blob: bd54e5890a7093546454846b1603515803b72fe3 [file] [log] [blame]
#!/usr/bin/env vpython
# coding=utf-8
# Copyright 2015 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
import base64
import datetime
import json
import logging
import os
import random
import sys
import unittest
import mock
from parameterized import parameterized
import test_env_handlers
from test_support import test_case
from google.appengine.ext import ndb
from protorpc.remote import protojson
import webapp2
import webtest
from components import auth
from components import ereporter2
from components import utils
import handlers_bot
import handlers_endpoints
import swarming_rpcs
from proto.config import realms_pb2
from server import acl
from server import bot_code
from server import bot_management
from server import config
from server import large
from server import service_accounts
from server import task_pack
from server import task_queues
from server import task_request
from server import task_result
from server import task_scheduler
# strftime() format with second resolution (no microseconds).
DATETIME_NO_MICRO = '%Y-%m-%dT%H:%M:%S'


def fmtdate(d, fmt=DATETIME_NO_MICRO):
  """Formats a datetime.datetime instance to the format generated by the API.

  Args:
    d: datetime.datetime instance to format.
    fmt: strftime() format string. Defaults to DATETIME_NO_MICRO so the
        default and the module constant cannot drift apart.

  Returns:
    The formatted timestamp as a unicode string.
  """
  return unicode(d.strftime(fmt))
def message_to_dict(rpc_message):
  """Returns the JSON-decoded form of a protorpc message.

  The message is first serialized with protojson, then the resulting JSON text
  is parsed back into plain Python data.
  """
  encoded = protojson.encode_message(rpc_message)
  return json.loads(encoded)
def _bot_event(event_type, bot_id, **kwargs):
  """Calls bot_management.bot_event() with a set of default arguments.

  Any keyword argument supplied by the caller overrides the matching default.
  """
  params = dict(
      authenticated_as='bot:whitelisted-ip',
      dimensions={u'id': [bot_id], u'pool': [u'default']},
      external_ip='8.8.4.4',
      state={'ram': 65},
      version='123456789',
      quarantined=False,
      maintenance_msg=None,
      task_id=None,
      task_name=None,
      # Enabled only for event types starting with 'request_'.
      register_dimensions=event_type.startswith('request_'))
  params.update(kwargs)
  return bot_management.bot_event(event_type, bot_id, **params)
class BaseTest(test_env_handlers.AppTestBase, test_case.EndpointsTestCase):
  """Common fixture for the endpoints API test cases in this file."""
  # These tests fail with 'Unknown bot ID, not in config'.
  # They need to run in test_seq.py.
  no_run = 1

  def setUp(self):
    """Installs the bot WSGI app, the task queue stubs and a fixed clock."""
    test_case.EndpointsTestCase.setUp(self)
    super(BaseTest, self).setUp()

    def fail_on_report(*args, **kwargs):
      # Any error report logged during a test is a test failure.
      return self.fail('%s, %s' % (args, kwargs))

    # handlers_bot is necessary to create fake tasks.
    bot_app = webapp2.WSGIApplication(handlers_bot.get_routes(), debug=True)
    environ = {
        'REMOTE_ADDR': self.source_ip,
        'SERVER_SOFTWARE': os.environ['SERVER_SOFTWARE'],
    }
    self.app = webtest.TestApp(bot_app, extra_environ=environ)
    self.mock(ereporter2, 'log_request', fail_on_report)
    # Client API test cases run by default as user.
    self.set_as_user()
    self.mock(utils, 'enqueue_task', self._enqueue_task)
    self.mock(utils, 'enqueue_task_async', self._enqueue_task_async)
    self.now = datetime.datetime(2010, 1, 2, 3, 4, 5)
    self.mock_now(self.now)

  @ndb.non_transactional
  def _enqueue_task(self, url, queue_name, **kwargs):
    """Replacement for utils.enqueue_task.

    Reports success for the 'cancel-children-tasks' and 'pubsub' queues and
    fails the test for any other queue.
    """
    del kwargs
    if queue_name not in ('cancel-children-tasks', 'pubsub'):
      return self.fail(url)
    return True

  @ndb.non_transactional
  def _enqueue_task_async(self, url, queue_name, payload):
    """Replacement for utils.enqueue_task_async.

    Runs the 'rebuild-task-cache' payload through task_queues directly and
    fails the test for any other queue.
    """
    if queue_name != 'rebuild-task-cache':
      return self.fail(url)
    return task_queues.rebuild_task_cache_async(payload)
class ServerApiTest(BaseTest):
  """Tests for handlers_endpoints.SwarmingServerService."""
  api_service_cls = handlers_endpoints.SwarmingServerService

  def test_details(self):
    """Asserts that server_details returns the correct version."""
    self.mock_default_pool_acl([])
    self.mock(config.config, 'config_service_hostname', lambda: 'a.server')
    git_env = {"CHOPS_GIT_VERSION": '5626-39642e9'}
    with mock.patch.dict("os.environ", git_env):
      resp = self.call_api('details')
    bot_version = bot_code.get_bot_version('https://testbed.example.com')[0]
    want = {
        u'bot_version': unicode(bot_version),
        u'display_server_url_template': u'',
        u'project_id': u'sample-app',
        u'chops_git_version': u'5626-39642e9',
        u'luci_config': u'a.server',
        u'default_isolate_server': u'https://pool.config.isolate.example.com',
        u'default_isolate_namespace': u'default-gzip',
        u'server_version': unicode(utils.get_app_version()),
        u'cas_viewer_server': u'https://test-cas-viewer-server.com',
    }
    self.assertEqual(want, resp.json)

  def test_public_permissions(self):
    """Asserts that permissions respond correctly to an unauthed user."""
    self.set_as_anonymous()
    resp = self.call_api('permissions')
    want = {
        u'cancel_task': False,
        u'cancel_tasks': False,
        u'delete_bot': False,
        u'get_bootstrap_token': False,
        u'get_configs': False,
        u'put_configs': False,
        u'terminate_bot': False,
    }
    self.assertEqual(want, resp.json)

  def test_user_permissions(self):
    """Asserts that permissions respond correctly to a basic user."""
    self.set_as_user()
    resp = self.call_api('permissions')
    want = {
        u'cancel_task': False,
        u'cancel_tasks': False,
        u'delete_bot': False,
        u'get_bootstrap_token': False,
        u'get_configs': False,
        u'put_configs': False,
        u'terminate_bot': False,
    }
    self.assertEqual(want, resp.json)

  def test_user_permissions_with_bot_id(self):
    """Asserts that permissions respond correctly to a user with pool
    permissions.
    """
    # Create a bot and a task.
    self.set_as_bot()
    self.bot_poll()
    self.set_as_admin()
    self.mock_default_pool_acl([])
    _, task_id = self.client_create_task_raw()

    # The user has realm permissions.
    self.set_as_user()
    granted = [
        auth.Permission('swarming.pools.cancelTask'),
        auth.Permission('swarming.pools.listBots'),
        auth.Permission('swarming.pools.listTasks'),
        auth.Permission('swarming.pools.terminateBot'),
    ]
    self.mock_auth_db(granted)
    body = {
        'bot_id': 'bot1',
        'task_id': task_id,
        'tags': ['pool:default'],
    }
    resp = self.call_api('permissions', body=body, status=200)
    want = {
        u'cancel_task': True,
        u'cancel_tasks': True,
        u'delete_bot': False,
        u'get_bootstrap_token': False,
        u'get_configs': False,
        u'list_bots': [u'default', u'template'],
        u'list_tasks': [u'default', u'template'],
        u'put_configs': False,
        u'terminate_bot': True,
    }
    self.assertEqual(want, resp.json)

  def test_privileged_user_permissions(self):
    """Asserts that permissions respond correctly to a privileged user."""
    self.set_as_privileged_user()
    resp = self.call_api('permissions')
    want = {
        u'cancel_task': True,
        u'cancel_tasks': False,
        u'delete_bot': False,
        u'get_bootstrap_token': False,
        u'get_configs': False,
        u'put_configs': False,
        u'terminate_bot': True,
    }
    self.assertEqual(want, resp.json)

  def test_admin_permissions(self):
    """Asserts that permissions respond correctly to an admin."""
    self.set_as_admin()
    resp = self.call_api('permissions')
    want = {
        u'cancel_task': True,
        u'cancel_tasks': True,
        u'delete_bot': True,
        u'get_bootstrap_token': True,
        u'get_configs': True,
        u'put_configs': True,
        u'terminate_bot': True,
    }
    self.assertEqual(want, resp.json)

  def _test_file(self, name, header):
    """Tests either get_bootstrap or get_bot_config.

    First checks that the file from swarming_bot/config is served with
    `header` prepended, then mocks the config lookup and checks that the
    mocked script is served instead.
    """
    self.set_as_admin()
    local_path = os.path.join(
        self.APP_DIR, 'swarming_bot', 'config', name + '.py')
    with open(local_path, 'rb') as f:
      local_content = f.read().decode('utf-8')
    want = {u'content': header + local_content}
    self.assertEqual(want, self.call_api('get_' + name).json)

    # Define a script on the luci-config server.
    def get_self_config_mock(path, revision=None, store_last_good=False):
      self.assertEqual('scripts/%s.py' % name, path)
      if not revision:
        self.assertEqual(None, revision)
        self.assertEqual(True, store_last_good)
        return 'abc', 'foo bar'
      self.assertEqual(False, store_last_good)
      return revision, 'old code'

    self.mock(bot_code.config, 'get_self_config', get_self_config_mock)
    self.mock(bot_code.config, 'config_service_hostname',
              lambda: 'localhost:1')
    want = {
        u'content': header + u'foo bar',
        u'version': u'abc',
        u'who': u'localhost:1',
    }
    self.assertEqual(want, self.call_api('get_' + name).json)

  def test_bootstrap(self):
    """Checks get_bootstrap, which prepends a generated header."""
    header = (
        '#!/usr/bin/env python\n'
        '# coding: utf-8\n'
        "host_url = ''\n"
        "bootstrap_token = ''\n")
    self._test_file('bootstrap', header)

  def test_bot_config(self):
    """Checks get_bot_config, which is served without a header."""
    self._test_file('bot_config', '')
class TasksApiTest(BaseTest):
api_service_cls = handlers_endpoints.SwarmingTasksService
def setUp(self):
  """Resets the cached settings and installs the default pool ACL."""
  super(TasksApiTest, self).setUp()
  utils.clear_cache(config.settings)
  allowed_accounts = ['service-account@example.com']
  self.mock_default_pool_acl(allowed_accounts)
def test_new_ok_raw(self):
  """Asserts that new generates appropriate metadata.

  The request is sent twice: first with evaluate_only set, then for real.
  """
  grant_calls = self.mock_task_service_accounts()
  self.mock(random, 'getrandbits', lambda _: 0x88)
  req = self.create_new_request(
      expiration_secs=30,
      properties=self.create_props(
          command=['rm', '-rf', '/'],
          execution_timeout_secs=30,
          grace_period_secs=15),
      pubsub_topic='projects/abc/topics/def',
      pubsub_auth_token='secret that must not be shown',
      pubsub_userdata='userdata',
      service_account='service-account@example.com',
  )
  tags = [
      u'a:tag',
      u'os:Amiga',
      u'pool:default',
      u'priority:20',
      u'realm:none',
      u'service_account:service-account@example.com',
      u'swarming.pool.template:none',
      u'swarming.pool.version:pools_cfg_rev',
      u'user:joe@localhost',
  ]
  want_props = self.gen_props(
      command=[u'rm', u'-rf', u'/'],
      execution_timeout_secs=u'30',
      grace_period_secs=u'15')
  want_request = self.gen_request(
      created_ts=fmtdate(self.now),
      expiration_secs=u'30',
      priority=u'20',
      properties=want_props,
      pubsub_topic=u'projects/abc/topics/def',
      pubsub_userdata=u'userdata',
      tags=list(tags),
      service_account=u'service-account@example.com',
      task_slices=[
          {
              u'expiration_secs': u'30',
              u'properties': want_props,
              u'wait_for_capacity': False,
          },
      ])
  want = {u'request': want_request}
  # The task was not created for real, so there's no task id.
  task_id = want[u'request'].pop(u'task_id')

  # Do an evaluate_only call first.
  req.evaluate_only = True
  resp = self.call_api('new', body=message_to_dict(req))
  self.assertEqual(want, resp.json)

  # Now create the task for real.
  req.evaluate_only = False
  want[u'task_id'] = u'5cee488008810'
  want[u'request'][u'task_id'] = task_id
  want[u'task_result'] = {
      u'abandoned_ts': u'2010-01-02T03:04:05',
      u'completed_ts': u'2010-01-02T03:04:05',
      u'created_ts': u'2010-01-02T03:04:05',
      u'current_task_slice': u'0',
      u'failure': False,
      u'internal_failure': False,
      u'modified_ts': u'2010-01-02T03:04:05',
      u'name': u'job1',
      u'server_versions': [u'v1a'],
      u'state': u'NO_RESOURCE',
      u'tags': list(tags),
      u'task_id': u'5cee488008810',
      u'user': u'joe@localhost',
  }
  resp = self.call_api('new', body=message_to_dict(req))
  # Refresh created_ts before comparing the second response.
  want['request']['created_ts'] = fmtdate(self.now)
  self.assertEqual(want, resp.json)

  # Asked for a correct grant(s), since both evaluate and non-evaluate modes
  # do auth check.
  want_grant = (
      u'service-account@example.com', datetime.timedelta(0, 30 + 30 + 15))
  self.assertEqual([want_grant] * 2, grant_calls)
def test_new_ok_template(self):
  """Asserts that new generates appropriate metadata for a templated task."""
  self.mock(random, 'getrandbits', lambda _: 0x88)
  self.mock(random, 'randint', lambda *_: 10000)  # always pick prod
  props = self.create_props(
      command=['rm', '-rf', '/'],
      execution_timeout_secs=30,
      grace_period_secs=15,
      dimensions=[
          {u'key': u'os', u'value': u'Amiga'},
          {u'key': u'pool', u'value': u'template'},
      ])
  # We want to observe the pool default.
  props[u'cipd_input'][u'client_package'] = None
  props[u'cipd_input'][u'server'] = None
  req = self.create_new_request(properties=props, tags=[u'a:tag'])

  want_props = self.gen_props(
      command=[u'rm', u'-rf', u'/'],
      execution_timeout_secs=u'30',
      grace_period_secs=u'15',
      env=[{u'key': u'VAR', u'value': u'prod'}],
      dimensions=[
          {u'key': u'os', u'value': u'Amiga'},
          {u'key': u'pool', u'value': u'template'},
      ],
      inputs_ref={
          u'isolatedserver': u'https://pool.config.isolate.example.com',
          u'namespace': u'default-gzip',
      },
  )
  want_props[u'cipd_input'][u'client_package'] = {
      u'package_name': u'cipd-client-pkg',
      u'version': u'from_pool_config',
  }
  want_props[u'cipd_input'][u'server'] = (
      u'https://pool.config.cipd.example.com')
  # Need to add the additional package that the template applies.
  template_pkg = {
      u'package_name': u'some-pkg',
      u'path': u'.',
      u'version': u'prod-version',
  }
  want_props[u'cipd_input'][u'packages'].insert(0, template_pkg)

  tags = [
      u'a:tag',
      u'os:Amiga',
      u'pool:template',
      u'priority:20',
      u'realm:none',
      u'service_account:none',
      u'swarming.pool.template:prod',
      u'swarming.pool.version:pools_cfg_rev',
      u'user:joe@localhost',
  ]
  want = {
      u'request': self.gen_request(
          created_ts=fmtdate(self.now),
          priority=u'20',
          properties=want_props,
          tags=list(tags),
          task_slices=[
              {
                  u'expiration_secs': u'86400',
                  u'properties': want_props,
                  u'wait_for_capacity': False,
              },
          ]),
      u'task_id': u'5cee488008810',
      u'task_result': {
          u'abandoned_ts': u'2010-01-02T03:04:05',
          u'completed_ts': u'2010-01-02T03:04:05',
          u'created_ts': u'2010-01-02T03:04:05',
          u'current_task_slice': u'0',
          u'failure': False,
          u'internal_failure': False,
          u'modified_ts': u'2010-01-02T03:04:05',
          u'name': u'job1',
          u'server_versions': [u'v1a'],
          u'state': u'NO_RESOURCE',
          u'tags': list(tags),
          u'task_id': u'5cee488008810',
          u'user': u'joe@localhost',
      },
  }
  resp = self.call_api('new', body=message_to_dict(req))
  self.assertEqual(want, resp.json)
def test_new_ok_template_no_additional_packages(self):
  """Asserts that a templated task with no cipd_input gets the pool defaults.
  """
  self.mock(random, 'getrandbits', lambda _: 0x88)
  self.mock(random, 'randint', lambda *_: 10000)  # always pick prod
  props = self.create_props(
      command=['echo', 'hello', 'world'],
      execution_timeout_secs=30,
      grace_period_secs=15,
      dimensions=[
          {u'key': u'os', u'value': u'Amiga'},
          {u'key': u'pool', u'value': u'template'},
      ])
  # We want to observe the pool defaults when no CIPD props were set at all
  # in the task request.
  props[u'cipd_input'] = None
  req = self.create_new_request(properties=props, tags=[u'a:tag'])
  want_cipd = {
      u'client_package': {
          u'version': u'from_pool_config',
          u'package_name': u'cipd-client-pkg',
      },
      u'server': u'https://pool.config.cipd.example.com',
      u'packages': [
          {
              u'path': u'.',
              u'version': u'prod-version',
              u'package_name': u'some-pkg',
          },
      ],
  }
  resp = self.call_api('new', body=message_to_dict(req))
  # Much of the response was verified in the previous template test, so just
  # verify the CIPD related props here.
  got_cipd = resp.json[u'request'][u'properties'][u'cipd_input']
  self.assertEqual(want_cipd, got_cipd)
def test_new_bad_service_account(self):
  """Asserts that a malformed service_account is rejected with HTTP 400.

  No OAuth grant must be requested for such a request.
  """
  grant_calls = self.mock_task_service_accounts()
  req = self.create_new_request(
      properties=self.create_props(command=['rm', '-rf', '/']),
      service_account='bad email')
  resp = self.call_api('new', body=message_to_dict(req), status=400)
  want = {
      u'error': {
          u'message': u'\'service_account\' must be an email, "bot" or '
                      '"none" string, got u\'bad email\''
      },
  }
  self.assertEqual(want, resp.json)
  self.assertFalse(grant_calls)
def test_new_forbidden_service_account(self):
  """Asserts that an AuthorizationError on the account yields HTTP 403."""
  denied = auth.AuthorizationError('forbidden account')
  self.mock_task_service_accounts(exc=denied)
  req = self.create_new_request(
      properties=self.create_props(command=['rm', '-rf', '/']),
      service_account='service-account@example.com')
  resp = self.call_api('new', body=message_to_dict(req), status=403)
  want = {u'error': {u'message': u'forbidden account'}}
  self.assertEqual(want, resp.json)
def test_new_ok_deduped(self):
  """Asserts that new returns task result for deduped.

  A first idempotent task is run to completion, then an equivalent request
  is sent 30 seconds later and is expected to be deduped from the first one.
  The 'list', 'count' and 'requests' endpoints are exercised along the way.
  """
  # Run a task to completion.
  self.mock(random, 'getrandbits', lambda _: 0x88)
  self.set_as_bot()
  self.bot_poll()
  self.set_as_user()
  self.client_create_task_raw(
      tags=['project:yay', 'commit:post'], properties={'idempotent': True})
  self.set_as_bot()
  self.bot_run_task()
  self.mock(random, 'getrandbits', lambda _: 0x66)
  now_30 = self.mock_now(self.now, 30)

  # Expectations for the first task.
  first_tags = [
      u'commit:post',
      u'os:Amiga',
      u'pool:default',
      u'priority:20',
      u'project:yay',
      u'realm:none',
      u'service_account:none',
      u'swarming.pool.template:none',
      u'swarming.pool.version:pools_cfg_rev',
      u'user:joe@localhost',
  ]
  first_result = self.gen_result_summary(
      completed_ts=fmtdate(self.now),
      costs_usd=[0.1],
      created_ts=fmtdate(self.now),
      duration=0.1,
      exit_code=u'0',
      modified_ts=fmtdate(self.now),
      started_ts=fmtdate(self.now),
      tags=list(first_tags),
      try_number=u'1')
  first_request = self.gen_request(
      created_ts=fmtdate(self.now),
      properties=self.gen_props(
          command=[u'python', u'run_test.py'], idempotent=True),
      priority=u'20',
      tags=list(first_tags),
      task_slices=[
          {
              u'expiration_secs': u'86400',
              u'properties': self.gen_props(
                  command=[u'python', u'run_test.py'], idempotent=True),
              u'wait_for_capacity': False,
          },
      ])

  # Make sure it completed.
  self.set_as_privileged_user()
  want = {u'items': [first_result], u'now': fmtdate(now_30)}
  query = handlers_endpoints.TasksRequest.combined_message_class()
  resp = self.call_api('list', body=message_to_dict(query))
  self.assertEqual(want, resp.json)

  want = {u'items': [first_request], u'now': fmtdate(now_30)}
  query = handlers_endpoints.TasksRequest.combined_message_class()
  resp = self.call_api('requests', body=message_to_dict(query))
  self.assertEqual(want, resp.json)

  # Expectations for the deduped task.
  dedup_tags = [
      u'commit:pre',
      u'os:Amiga',
      u'pool:default',
      u'priority:200',
      u'realm:none',
      u'service_account:none',
      u'swarming.pool.template:none',
      u'swarming.pool.version:pools_cfg_rev',
      u'user:joe@localhost',
  ]
  deduped_request = self.gen_request(
      created_ts=fmtdate(now_30),
      expiration_secs=u'30',
      name=u'job2',
      priority=u'200',
      properties=self.gen_props(
          command=[u'python', u'run_test.py'], idempotent=True),
      tags=list(dedup_tags),
      task_id=u'5cf59b8006610',
      task_slices=[
          {
              u'expiration_secs': u'30',
              u'properties': self.gen_props(
                  command=[u'python', u'run_test.py'], idempotent=True),
              u'wait_for_capacity': False,
          },
      ])
  deduped_result = self.gen_result_summary(
      completed_ts=fmtdate(self.now),
      cost_saved_usd=0.1,
      created_ts=fmtdate(now_30),
      duration=0.1,
      deduped_from=u'5cee488008811',
      exit_code=u'0',
      modified_ts=fmtdate(now_30),
      name=u'job2',
      task_id=u'5cf59b8006610',
      started_ts=fmtdate(self.now),
      tags=list(dedup_tags))
  want = {
      u'request': deduped_request,
      u'task_id': u'5cf59b8006610',
      u'task_result': deduped_result,
  }

  self.set_as_user()
  new_req = self.create_new_request(
      expiration_secs=30,
      name='job2',
      priority=200,
      tags=['commit:pre'],
      properties=self.create_props(
          command=['python', 'run_test.py'], idempotent=True))
  resp = self.call_api('new', body=message_to_dict(new_req))
  self.assertEqual(want, resp.json)

  self.set_as_privileged_user()
  want = {u'items': [deduped_result], u'now': fmtdate(now_30)}
  query = handlers_endpoints.TasksRequest.combined_message_class(
      state=swarming_rpcs.TaskStateQuery.DEDUPED)
  resp = self.call_api('list', body=message_to_dict(query))
  self.assertEqual(want, resp.json)

  # Assert the entity presence.
  self.assertEqual(2, task_request.TaskRequest.query().count())
  self.assertEqual(2, task_result.TaskResultSummary.query().count())
  self.assertEqual(1, task_result.TaskRunResult.query().count())

  # Deduped task have no performance data associated.
  query = handlers_endpoints.TasksRequest.combined_message_class(
      state=swarming_rpcs.TaskStateQuery.DEDUPED,
      include_performance_stats=True)
  resp = self.call_api('list', body=message_to_dict(query))
  self.assertEqual(want, resp.json)

  # Use the occasion to test 'count' and 'requests'.
  start = utils.datetime_to_timestamp(self.now) / 1000000. - 1
  end = utils.datetime_to_timestamp(now_30) / 1000000. + 1
  query = handlers_endpoints.TasksRequest.combined_message_class(
      start=start, end=end, state=swarming_rpcs.TaskStateQuery.DEDUPED)
  resp = self.call_api('count', body=message_to_dict(query))
  self.assertEqual({u'now': fmtdate(now_30), u'count': u'1'}, resp.json)

  want = {
      u'items': [deduped_request, first_request],
      u'now': fmtdate(now_30),
  }
  query = handlers_endpoints.TasksRequest.combined_message_class(
      start=start, end=end)
  resp = self.call_api('requests', body=message_to_dict(query))
  self.assertEqual(want, resp.json)
def test_new_ok_isolated(self):
  """Asserts that new generates appropriate metadata for isolated inputs."""
  self.mock(random, 'getrandbits', lambda _: 0x88)
  inputs = swarming_rpcs.FilesRef(
      isolated='1' * 40,
      isolatedserver='http://localhost:1',
      namespace='default-gzip')
  req = self.create_new_request(
      properties=self.create_props(inputs_ref=inputs))
  want_props = self.gen_props(
      inputs_ref={
          u'isolated': u'1' * 40,
          u'isolatedserver': u'http://localhost:1',
          u'namespace': u'default-gzip',
      })
  want = {
      u'request': self.gen_request(
          created_ts=fmtdate(self.now),
          properties=want_props,
          task_slices=[
              {
                  u'expiration_secs': u'86400',
                  u'properties': want_props,
                  u'wait_for_capacity': False,
              },
          ]),
      u'task_id': u'5cee488008810',
      u'task_result': {
          u'abandoned_ts': u'2010-01-02T03:04:05',
          u'completed_ts': u'2010-01-02T03:04:05',
          u'created_ts': u'2010-01-02T03:04:05',
          u'current_task_slice': u'0',
          u'failure': False,
          u'internal_failure': False,
          u'modified_ts': u'2010-01-02T03:04:05',
          u'name': u'job1',
          u'server_versions': [u'v1a'],
          u'state': u'NO_RESOURCE',
          u'tags': [
              u'a:tag',
              u'os:Amiga',
              u'pool:default',
              u'priority:20',
              u'realm:none',
              u'service_account:none',
              u'swarming.pool.template:none',
              u'swarming.pool.version:pools_cfg_rev',
              u'user:joe@localhost',
          ],
          u'task_id': u'5cee488008810',
          u'user': u'joe@localhost',
      },
  }
  resp = self.call_api('new', body=message_to_dict(req))
  self.assertEqual(want, resp.json)
def test_new_ok_isolated_with_defaults(self):
  """Asserts that an inputs_ref carrying only 'isolated' is completed.

  The response is expected to carry the isolate server and namespace that
  the request left out.
  """
  self.mock(random, 'getrandbits', lambda _: 0x88)
  inputs = swarming_rpcs.FilesRef(isolated='1' * 40)
  req = self.create_new_request(
      properties=self.create_props(inputs_ref=inputs))
  want_props = self.gen_props(
      inputs_ref={
          u'isolated': u'1' * 40,
          u'isolatedserver': u'https://pool.config.isolate.example.com',
          u'namespace': u'default-gzip',
      })
  want = {
      u'request': self.gen_request(
          created_ts=fmtdate(self.now),
          properties=want_props,
          task_slices=[
              {
                  u'expiration_secs': u'86400',
                  u'properties': want_props,
                  u'wait_for_capacity': False,
              },
          ]),
      u'task_id': u'5cee488008810',
      u'task_result': {
          u'abandoned_ts': u'2010-01-02T03:04:05',
          u'completed_ts': u'2010-01-02T03:04:05',
          u'created_ts': u'2010-01-02T03:04:05',
          u'current_task_slice': u'0',
          u'failure': False,
          u'internal_failure': False,
          u'modified_ts': u'2010-01-02T03:04:05',
          u'name': u'job1',
          u'server_versions': [u'v1a'],
          u'state': u'NO_RESOURCE',
          u'tags': [
              u'a:tag',
              u'os:Amiga',
              u'pool:default',
              u'priority:20',
              u'realm:none',
              u'service_account:none',
              u'swarming.pool.template:none',
              u'swarming.pool.version:pools_cfg_rev',
              u'user:joe@localhost',
          ],
          u'task_id': u'5cee488008810',
          u'user': u'joe@localhost',
      },
  }
  resp = self.call_api('new', body=message_to_dict(req))
  self.assertEqual(want, resp.json)
def test_new_ok_cas_input_root(self):
  """Asserts that new generates appropriate metadata for a CAS input root."""
  self.mock(random, 'getrandbits', lambda _: 0x88)
  cas_root = swarming_rpcs.CASReference(
      cas_instance='projects/test/instances/default',
      digest=swarming_rpcs.Digest(hash='12345', size_bytes=1))
  req = self.create_new_request(
      properties=self.create_props(
          command=['python', 'test.py'], cas_input_root=cas_root))
  want_props = self.gen_props(
      command=[u'python', u'test.py'],
      cas_input_root={
          u'cas_instance': u'projects/test/instances/default',
          u'digest': {
              u'hash': u'12345',
              u'size_bytes': u'1',
          },
      })
  want = {
      u'request': self.gen_request(
          created_ts=fmtdate(self.now),
          properties=want_props,
          task_slices=[
              {
                  u'expiration_secs': u'86400',
                  u'properties': want_props,
                  u'wait_for_capacity': False,
              },
          ]),
      u'task_id': u'5cee488008810',
      u'task_result': {
          u'abandoned_ts': u'2010-01-02T03:04:05',
          u'completed_ts': u'2010-01-02T03:04:05',
          u'created_ts': u'2010-01-02T03:04:05',
          u'current_task_slice': u'0',
          u'failure': False,
          u'internal_failure': False,
          u'modified_ts': u'2010-01-02T03:04:05',
          u'name': u'job1',
          u'server_versions': [u'v1a'],
          u'state': u'NO_RESOURCE',
          u'tags': [
              u'a:tag',
              u'os:Amiga',
              u'pool:default',
              u'priority:20',
              u'realm:none',
              u'service_account:none',
              u'swarming.pool.template:none',
              u'swarming.pool.version:pools_cfg_rev',
              u'user:joe@localhost',
          ],
          u'task_id': u'5cee488008810',
          u'user': u'joe@localhost',
      },
  }
  resp = self.call_api('new', body=message_to_dict(req))
  self.assertEqual(want, resp.json)
def test_new_conflict_inputs(self):
  """Asserts that setting both inputs_ref and cas_input_root is an error."""
  self.mock(random, 'getrandbits', lambda _: 0x88)
  cas_root = swarming_rpcs.CASReference(
      cas_instance='projects/test/instances/default',
      digest=swarming_rpcs.Digest(hash='12345', size_bytes=1))
  isolated_inputs = swarming_rpcs.FilesRef(
      isolated='1' * 40,
      isolatedserver='http://localhost:1',
      namespace='default-gzip')
  req = self.create_new_request(
      properties=self.create_props(
          command=['python', 'test.py'],
          cas_input_root=cas_root,
          inputs_ref=isolated_inputs))
  resp = self.call_api('new', body=message_to_dict(req), status=400)
  message = resp.json['error']['message']
  self.assertEqual("can't set both inputs_ref and cas_input_root", message)
def test_new_cipd_package_with_defaults(self):
  """Asserts that a cipd_input with only packages is completed.

  The response is expected to carry the CIPD server and client package that
  the request left out.
  """
  self.mock(random, 'getrandbits', lambda _: 0x88)
  want_cipd = {
      u'client_package': {
          u'package_name': u'cipd-client-pkg',
          u'version': u'from_pool_config',
      },
      u'packages': [
          {
              u'package_name': u'rm',
              u'path': u'.',
              u'version': u'latest',
          },
      ],
      u'server': u'https://pool.config.cipd.example.com',
  }
  want_props = self.gen_props(
      cipd_input=want_cipd,
      command=[u'rm', u'-rf', u'/'],
      env=[{u'key': u'PATH', u'value': u'/'}])
  want = {
      u'request': self.gen_request(
          created_ts=fmtdate(self.now),
          properties=want_props,
          task_slices=[
              {
                  u'expiration_secs': u'86400',
                  u'properties': want_props,
                  u'wait_for_capacity': False,
              },
          ]),
      u'task_id': u'5cee488008810',
      u'task_result': {
          u'abandoned_ts': u'2010-01-02T03:04:05',
          u'completed_ts': u'2010-01-02T03:04:05',
          u'created_ts': u'2010-01-02T03:04:05',
          u'current_task_slice': u'0',
          u'failure': False,
          u'internal_failure': False,
          u'modified_ts': u'2010-01-02T03:04:05',
          u'name': u'job1',
          u'server_versions': [u'v1a'],
          u'state': u'NO_RESOURCE',
          u'tags': [
              u'a:tag',
              u'os:Amiga',
              u'pool:default',
              u'priority:20',
              u'realm:none',
              u'service_account:none',
              u'swarming.pool.template:none',
              u'swarming.pool.version:pools_cfg_rev',
              u'user:joe@localhost',
          ],
          u'task_id': u'5cee488008810',
          u'user': u'joe@localhost',
      },
  }
  packages = [
      swarming_rpcs.CipdPackage(
          package_name='rm', path='.', version='latest'),
  ]
  req = self.create_new_request(
      properties=self.create_props(
          cipd_input=swarming_rpcs.CipdInput(packages=packages),
          command=['rm', '-rf', '/'],
          env=[swarming_rpcs.StringPair(key='PATH', value='/')]))
  resp = self.call_api('new', body=message_to_dict(req))
  self.assertEqual(want, resp.json)
def test_new_task_slices_one(self):
  """Asserts that a request made of a single task slice is accepted."""
  self.mock(random, 'getrandbits', lambda _: 0x88)
  self.mock_now(datetime.datetime(2010, 1, 2, 3, 4, 5))
  only_slice = {
      u'expiration_secs': 180,
      u'properties': self.create_props(command=['python', 'run_test.py']),
      u'wait_for_capacity': False,
  }
  got, _ = self.client_create_task(
      expiration_secs=None, task_slices=[only_slice])
  want_props = self.gen_props(command=[u'python', u'run_test.py'])
  want = {
      u'request': self.gen_request(
          created_ts=fmtdate(self.now),
          expiration_secs=u'180',
          properties=want_props,
          task_slices=[
              {
                  u'expiration_secs': u'180',
                  u'properties': want_props,
                  u'wait_for_capacity': False,
              },
          ]),
      u'task_id': u'5cee488008810',
      u'task_result': {
          u'abandoned_ts': u'2010-01-02T03:04:05',
          u'completed_ts': u'2010-01-02T03:04:05',
          u'created_ts': u'2010-01-02T03:04:05',
          u'current_task_slice': u'0',
          u'failure': False,
          u'internal_failure': False,
          u'modified_ts': u'2010-01-02T03:04:05',
          u'name': u'job1',
          u'server_versions': [u'v1a'],
          u'state': u'NO_RESOURCE',
          u'tags': [
              u'a:tag',
              u'os:Amiga',
              u'pool:default',
              u'priority:20',
              u'realm:none',
              u'service_account:none',
              u'swarming.pool.template:none',
              u'swarming.pool.version:pools_cfg_rev',
              u'user:joe@localhost',
          ],
          u'task_id': u'5cee488008810',
          u'user': u'joe@localhost',
      },
  }
  self.assertEqual(want, got)
def test_new_task_slices_two(self):
  """Asserts that a request made of two task slices is accepted.

  The request level expiration_secs is expected to be the sum of the slices'
  expirations (180 + 180).
  """
  self.mock(random, 'getrandbits', lambda _: 0x88)
  self.mock_now(datetime.datetime(2010, 1, 2, 3, 4, 5))
  first_slice = {
      u'expiration_secs': 180,
      u'properties': self.create_props(command=['python', 'run_test.py']),
      u'wait_for_capacity': False,
  }
  second_slice = {
      u'expiration_secs': 180,
      u'properties': self.create_props(
          command=['python', 'run_test.py'],
          dimensions=[{u'key': u'pool', u'value': u'default'}]),
      u'wait_for_capacity': False,
  }
  got, _ = self.client_create_task(
      expiration_secs=None, task_slices=[first_slice, second_slice])
  want_props_1 = self.gen_props(command=[u'python', u'run_test.py'])
  want_props_2 = self.gen_props(
      command=[u'python', u'run_test.py'],
      dimensions=[{u'key': u'pool', u'value': u'default'}])
  want = {
      u'request': self.gen_request(
          created_ts=fmtdate(self.now),
          expiration_secs=u'360',
          properties=want_props_1,
          task_slices=[
              {
                  u'expiration_secs': u'180',
                  u'properties': want_props_1,
                  u'wait_for_capacity': False,
              },
              {
                  u'expiration_secs': u'180',
                  u'properties': want_props_2,
                  u'wait_for_capacity': False,
              },
          ]),
      u'task_id': u'5cee488008810',
      u'task_result': {
          u'abandoned_ts': u'2010-01-02T03:04:05',
          u'completed_ts': u'2010-01-02T03:04:05',
          u'created_ts': u'2010-01-02T03:04:05',
          u'current_task_slice': u'0',
          u'failure': False,
          u'internal_failure': False,
          u'modified_ts': u'2010-01-02T03:04:05',
          u'name': u'job1',
          u'server_versions': [u'v1a'],
          u'state': u'NO_RESOURCE',
          u'tags': [
              u'a:tag',
              u'os:Amiga',
              u'pool:default',
              u'priority:20',
              u'realm:none',
              u'service_account:none',
              u'swarming.pool.template:none',
              u'swarming.pool.version:pools_cfg_rev',
              u'user:joe@localhost',
          ],
          u'task_id': u'5cee488008810',
          u'user': u'joe@localhost',
      },
  }
  self.assertEqual(want, got)
def test_new_task_slices_two_denied(self):
  """Asserts that a slice with expiration_secs=0 is rejected with HTTP 400."""
  self.mock(random, 'getrandbits', lambda _: 0x88)
  self.mock_now(datetime.datetime(2010, 1, 2, 3, 4, 5))
  valid_slice = {
      u'expiration_secs': 180,
      u'properties': self.create_props(command=['python', 'run_test.py']),
      u'wait_for_capacity': False,
  }
  invalid_slice = {
      # That's incorrect:
      u'expiration_secs': 0,
      u'properties': self.create_props(command=['python', 'run_test.py']),
      u'wait_for_capacity': False,
  }
  req = swarming_rpcs.NewTaskRequest(
      expiration_secs=None,
      name='hi',
      priority=10,
      tags=[],
      task_slices=[valid_slice, invalid_slice],
      user='joe@localhost')
  resp = self.call_api('new', body=message_to_dict(req), status=400)
  want = {
      u'error': {
          u'message': u'expiration_secs (0) must be between 1s and 7 days',
      },
  }
  self.assertEqual(want, resp.json)
def test_new_idempotent(self):
  """Asserts that resending a request with the same request_uuid is a no-op.

  The same NewTaskRequest is sent three times:
  - with evaluate_only set: the response carries the request but no task id;
  - for real: task_scheduler.schedule_request is called exactly once;
  - once more, unchanged: the same response is returned and
    task_scheduler.schedule_request is not called.
  """
  self.mock_task_service_accounts()
  self.mock(random, 'getrandbits', lambda _: 0x88)
  request = self.create_new_request(
      expiration_secs=30,
      properties=self.create_props(
          command=['rm', '-rf', '/'],
          execution_timeout_secs=30,
          grace_period_secs=15),
      pubsub_topic='projects/abc/topics/def',
      pubsub_auth_token='secret that must not be shown',
      pubsub_userdata='userdata',
      service_account='service-account@example.com',
      request_uuid=u'cf60878f-8f2a-4f1e-b1f5-8b5ec88813a9')
  expected_props = self.gen_props(
      command=[u'rm', u'-rf', u'/'],
      execution_timeout_secs=u'30',
      grace_period_secs=u'15')
  # The request and the task result are expected to carry the same tags.
  expected_tags = [
      u'a:tag',
      u'os:Amiga',
      u'pool:default',
      u'priority:20',
      u'realm:none',
      u'service_account:service-account@example.com',
      u'swarming.pool.template:none',
      u'swarming.pool.version:pools_cfg_rev',
      u'user:joe@localhost',
  ]
  expected = {
      u'request':
          self.gen_request(
              created_ts=fmtdate(self.now),
              expiration_secs=u'30',
              priority=u'20',
              properties=expected_props,
              pubsub_topic=u'projects/abc/topics/def',
              pubsub_userdata=u'userdata',
              tags=list(expected_tags),
              service_account=u'service-account@example.com',
              task_slices=[
                  {
                      u'expiration_secs': u'30',
                      u'properties': expected_props,
                      u'wait_for_capacity': False,
                  },
              ]),
  }
  # The task was not created for real, so there's no task id.
  task_id = expected[u'request'].pop(u'task_id')

  # Do an evaluate_only call first.
  request.evaluate_only = True
  response = self.call_api('new', body=message_to_dict(request))
  self.assertEqual(expected, response.json)

  # From now on the task is created for real: the response gains the task id
  # and the task result.
  request.evaluate_only = False
  expected[u'task_id'] = u'5cee488008810'
  expected[u'request'][u'task_id'] = task_id
  expected[u'task_result'] = {
      u'abandoned_ts': u'2010-01-02T03:04:05',
      u'completed_ts': u'2010-01-02T03:04:05',
      u'created_ts': u'2010-01-02T03:04:05',
      u'current_task_slice': u'0',
      u'failure': False,
      u'internal_failure': False,
      u'modified_ts': u'2010-01-02T03:04:05',
      u'name': u'job1',
      u'server_versions': [u'v1a'],
      u'state': u'NO_RESOURCE',
      u'tags': list(expected_tags),
      u'task_id': u'5cee488008810',
      u'user': u'joe@localhost',
  }

  with mock.patch(
      'server.task_scheduler.schedule_request',
      wraps=task_scheduler.schedule_request) as mock_schedule_request:
    response = self.call_api('new', body=message_to_dict(request))
  # Time advanced since the evaluate_only call.
  expected[u'request'][u'created_ts'] = fmtdate(self.now)
  self.assertEqual(expected, response.json)
  # Compare call_count explicitly: on mock releases that predate
  # Mock.assert_called_once(), that name is just an auto-created attribute and
  # calling it verifies nothing.
  self.assertEqual(1, mock_schedule_request.call_count)

  with mock.patch(
      'server.task_scheduler.schedule_request',
      wraps=task_scheduler.schedule_request) as mock_schedule_request:
    # Send the same request twice.
    response = self.call_api('new', body=message_to_dict(request))
  # Re-read created_ts from self.now before comparing the resent response.
  expected[u'request'][u'created_ts'] = fmtdate(self.now)
  self.assertEqual(expected, response.json)
  # schedule_request should not be called this time.
  self.assertFalse(mock_schedule_request.called)
def test_new_denied_pool(self):
  """Asserts that a request without a pool dimension gets an HTTP 400.

  The check must happen early enough that a 400, and not a 500, is returned.
  """
  id_only_dimensions = [{
      u'key': u'id',
      u'value': u'bot123'
  }]
  request = self.create_new_request(
      properties={u'dimensions': id_only_dimensions})
  response = self.call_api('new', body=message_to_dict(request), status=400)
  self.assertEqual(
      {
          u'error': {
              u'message': u"get_pool_config called with None",
          },
      }, response.json)
def test_new_denied_command(self):
  """Asserts that properties without a command are rejected with HTTP 400."""
  props_without_command = {
      u'dimensions': [{
          u'key': u'pool',
          u'value': u'default'
      }],
      u'execution_timeout_secs': 30,
  }
  request = self.create_new_request(properties=props_without_command)
  response = self.call_api('new', body=message_to_dict(request), status=400)
  self.assertEqual(
      {
          u'error': {
              u'message': u"'command' must be specified",
          },
      }, response.json)
def test_new_denied_execution_timeout_secs(self):
  """Asserts that properties without execution_timeout_secs get an HTTP 400."""
  props_without_timeout = {
      u'command': [u'echo', u'hi'],
      u'dimensions': [{
          u'key': u'pool',
          u'value': u'default'
      }],
  }
  request = self.create_new_request(properties=props_without_timeout)
  response = self.call_api('new', body=message_to_dict(request), status=400)
  self.assertEqual(
      {
          u'error': {
              u'message': u"'execution_timeout_secs' must be specified"
          },
      }, response.json)
def test_new_ok(self):
  """Asserts that a minimal valid request creates a task (HTTP 200)."""
  self.mock(random, 'getrandbits', lambda _: 0x88)
  minimal_props = {
      u'command': [u'echo', u'hi'],
      u'dimensions': [{
          u'key': u'pool',
          u'value': u'default'
      }],
      u'execution_timeout_secs': 30,
  }
  request = self.create_new_request(properties=minimal_props)
  response = self.call_api('new', body=message_to_dict(request), status=200)
  created_task_id = response.json[u'task_id']
  self.assertEqual(u'5cee488008810', created_task_id)
def fail_on_using_legacy_acls(self):
  """Replaces the legacy ACL entry points with a stub that always raises."""

  def err(*_args, **_kwargs):
    raise AssertionError('Must not be called')

  legacy_entry_points = (
      (task_scheduler, 'check_schedule_request_acl_caller'),
      (task_scheduler, 'check_schedule_request_acl_service_account'),
      (service_accounts, 'get_oauth_token_grant'),
  )
  for module, attr_name in legacy_entry_points:
    self.mock(module, attr_name, err)
def test_new_ok_in_realms_mode(self):
  """Asserts that a task with an explicit realm is created via realm ACLs."""
  self.mock(random, 'getrandbits', lambda _: 0x88)
  self.mock(service_accounts, 'has_token_server', lambda: True)
  self.mock_auth_db([
      auth.Permission('swarming.pools.createTask'),
      auth.Permission('swarming.tasks.createInRealm'),
  ])
  # Legacy pool ACLs must not be used in this mode, only realm ACLs are used.
  self.fail_on_using_legacy_acls()

  props = {
      u'command': [u'echo', u'hi'],
      u'dimensions': [{
          u'key': u'pool',
          u'value': u'default'
      }],
      u'execution_timeout_secs': 30,
  }
  request = self.create_new_request(
      properties=props,
      service_account='service-account@example.com',
      realm='test:task_realm')
  response = self.call_api('new', body=message_to_dict(request), status=200)
  created_task_id = response.json[u'task_id']
  self.assertEqual(u'5cee488008810', created_task_id)

  # Load the TaskRequest entity that was stored for the new task.
  request_key, _ = task_pack.get_request_and_result_keys(created_task_id)
  stored = request_key.get()

  # The task is associated with the requested realm.
  self.assertEqual('test:task_realm', stored.realm)

  # The service account is recorded, without a legacy token.
  self.assertEqual('service-account@example.com', stored.service_account)
  self.assertFalse(stored.service_account_token)
def test_new_ok_with_default_task_realm_not_enforced(self):
  """Asserts the pool's default realm is used while legacy ACLs still apply."""
  self.mock(random, 'getrandbits', lambda _: 0x88)
  self.mock(service_accounts, 'has_token_server', lambda: True)
  self.mock_default_pool_acl(
      service_accounts=['service-account@example.com'],
      default_task_realm='test:task_realm',
      enforced_realm_permissions=None)

  # Realm ACLs should be ignored. Note that they are still called to record
  # dry run check results.
  self.mock(auth, 'has_permission', lambda *_args, **_kwargs: False)

  # Uses legacy service account calls.
  self.mock(service_accounts, 'get_oauth_token_grant', lambda **_: 'tok')

  props = {
      u'command': [u'echo', u'hi'],
      u'dimensions': [{
          u'key': u'pool',
          u'value': u'default'
      }],
      u'execution_timeout_secs': 30,
  }
  request = self.create_new_request(
      properties=props, service_account='service-account@example.com')
  response = self.call_api('new', body=message_to_dict(request), status=200)
  created_task_id = response.json[u'task_id']
  self.assertEqual(u'5cee488008810', created_task_id)

  # Load the TaskRequest entity that was stored for the new task.
  request_key, _ = task_pack.get_request_and_result_keys(created_task_id)
  stored = request_key.get()

  # The task is associated with the pool's default realm.
  self.assertEqual('test:task_realm', stored.realm)

  # The legacy service account token is stored.
  self.assertEqual('service-account@example.com', stored.service_account)
  self.assertEqual('tok', stored.service_account_token)
def test_new_ok_with_default_task_realm_enforced(self):
  """Asserts the pool's default realm is used with enforced realm ACLs."""
  self.mock(random, 'getrandbits', lambda _: 0x88)
  self.mock(service_accounts, 'has_token_server', lambda: True)
  self.mock_auth_db([
      auth.Permission('swarming.pools.createTask'),
      auth.Permission('swarming.tasks.createInRealm'),
  ])
  enforced_permissions = {
      realms_pb2.REALM_PERMISSION_POOLS_CREATE_TASK,
      realms_pb2.REALM_PERMISSION_TASKS_CREATE_IN_REALM,
      realms_pb2.REALM_PERMISSION_TASKS_ACT_AS,
  }
  self.mock_default_pool_acl(
      service_accounts=['service-account@example.com'],
      default_task_realm='test:task_realm',
      enforced_realm_permissions=enforced_permissions)

  # Legacy pool ACLs must not be used in this mode, only realm ACLs are used.
  self.fail_on_using_legacy_acls()

  props = {
      u'command': [u'echo', u'hi'],
      u'dimensions': [{
          u'key': u'pool',
          u'value': u'default'
      }],
      u'execution_timeout_secs': 30,
  }
  request = self.create_new_request(
      properties=props, service_account='service-account@example.com')
  response = self.call_api('new', body=message_to_dict(request), status=200)
  created_task_id = response.json[u'task_id']
  self.assertEqual(u'5cee488008810', created_task_id)

  # Load the TaskRequest entity that was stored for the new task.
  request_key, _ = task_pack.get_request_and_result_keys(created_task_id)
  stored = request_key.get()

  # The task is associated with the pool's default realm.
  self.assertEqual('test:task_realm', stored.realm)

  # The service account is recorded, without a legacy token.
  self.assertEqual('service-account@example.com', stored.service_account)
  self.assertFalse(stored.service_account_token)
def test_new_invalid_realm(self):
  """Asserts that a realm not shaped like <project>:<name> gets an HTTP 400."""
  props = {
      u'command': [u'echo', u'hi'],
      u'dimensions': [{
          u'key': u'pool',
          u'value': u'default'
      }],
      u'execution_timeout_secs': 30,
  }
  # The realm below uses '/' where '<project>:<realm>' is expected.
  request = self.create_new_request(
      properties=props,
      service_account='service-account@example.com',
      realm='test/invalid')
  response = self.call_api('new', body=message_to_dict(request), status=400)
  expected = {
      u'error': {
          u'message': u'Bad realm u\'test/invalid\', want "<project>:<name>"',
      },
  }
  self.assertEqual(expected, response.json)
@ndb.tasklet
def _updateToken(self, *_args, **_kwargs):
  """Tasklet stub that ignores its arguments and yields 'update-token'.

  Used in this file as the side_effect of the patched
  server.resultdb.create_invocation_async.
  """
  raise ndb.Return('update-token')
def test_new_ok_with_resultdb_and_realm(self):
  """Asserts that ResultDB can be enabled on a task with an explicit realm."""
  self.mock(random, 'getrandbits', lambda _: 0x88)
  self.mock_auth_db([
      auth.Permission('swarming.pools.createTask'),
      auth.Permission('swarming.tasks.createInRealm'),
  ])

  patched_create_invocation = mock.patch(
      'server.resultdb.create_invocation_async',
      side_effect=self._updateToken)
  with patched_create_invocation:
    single_slice = {
        'expiration_secs': 180,
        'properties': self.create_props(command=['python', 'run_test.py']),
        'wait_for_capacity': True,
    }
    _, task_id = self.client_create_task(
        expiration_secs=None,
        task_slices=[single_slice],
        resultdb={'enable': True},
        realm='test:task_realm')

  # Load the stored TaskRequest and result summary, then check the realm and
  # the ResultDB invocation name.
  request_key, summary_key = task_pack.get_request_and_result_keys(task_id)
  stored_request = request_key.get()
  stored_summary = summary_key.get()
  self.assertEqual('test:task_realm', stored_request.realm)
  self.assertEqual('invocations/task-test-swarming.appspot.com-5cee488008811',
                   stored_summary.resultdb_info.invocation)
def test_new_ok_with_resultdb_and_default_task_realm(self):
  """Asserts that ResultDB works when the realm comes from the pool default."""
  self.mock(random, 'getrandbits', lambda _: 0x88)
  self.mock_auth_db([
      auth.Permission('swarming.pools.createTask'),
      auth.Permission('swarming.tasks.createInRealm'),
  ])
  enforced_permissions = {
      realms_pb2.REALM_PERMISSION_POOLS_CREATE_TASK,
      realms_pb2.REALM_PERMISSION_TASKS_CREATE_IN_REALM,
      realms_pb2.REALM_PERMISSION_TASKS_ACT_AS,
  }
  self.mock_default_pool_acl(
      service_accounts=['service-account@example.com'],
      default_task_realm='test:task_realm',
      enforced_realm_permissions=enforced_permissions)

  patched_create_invocation = mock.patch(
      'server.resultdb.create_invocation_async',
      side_effect=self._updateToken)
  with patched_create_invocation:
    single_slice = {
        'expiration_secs': 180,
        'properties': self.create_props(command=['python', 'run_test.py']),
        'wait_for_capacity': True,
    }
    # ResultDB is enabled but no realm is passed in the request.
    _, task_id = self.client_create_task(
        expiration_secs=None,
        task_slices=[single_slice],
        resultdb={'enable': True})

  # Load the stored TaskRequest and result summary, then check the realm and
  # the ResultDB invocation name.
  request_key, summary_key = task_pack.get_request_and_result_keys(task_id)
  stored_request = request_key.get()
  stored_summary = summary_key.get()
  self.assertEqual('test:task_realm', stored_request.realm)
  self.assertEqual('invocations/task-test-swarming.appspot.com-5cee488008811',
                   stored_summary.resultdb_info.invocation)
def _prepare_mass_cancel(self):
  """Creates three tasks: one complete, one running and one pending.

  Returns:
    Tuple (running_id, pending_id, now_120) where now_120 is the value
    returned by the last mock_now() call.
  """
  self.mock(random, 'getrandbits', lambda _: 0x88)
  self.set_as_bot()
  self.do_handshake(do_first_poll=True)

  # Task 'first' is created by the user and run by the bot: completed.
  self.set_as_user()
  _unused_request, _unused_id = self.client_create_task_raw(
      name='first', tags=['project:yay', 'commit:abcd', 'os:Win'])
  self.set_as_bot()
  self.bot_run_task()

  # Task 'second' is created 60s later and picked up by a bot poll: running.
  self.set_as_user()
  self.mock_now(self.now, 60)
  second_kwargs = {
      'name': 'second',
      'user': 'jack@localhost',
      'tags': ['project:yay', 'commit:efgh', 'os:Win'],
  }
  _, running_id = self.client_create_task_raw(**second_kwargs)
  self.set_as_bot()
  self.bot_poll()

  # Task 'third' is created 120s later and never polled for: pending.
  self.set_as_user()
  now_120 = self.mock_now(self.now, 120)
  third_kwargs = {
      'name': 'third',
      'user': 'jack@localhost',
      'tags': ['project:yay', 'commit:ijkhl', 'os:Linux'],
  }
  _, pending_id = self.client_create_task_raw(**third_kwargs)
  return running_id, pending_id, now_120
def test_mass_cancel_pending(self):
  """Asserts that a mass cancel without kill_running matches pending tasks."""
  _, pending_id, now_120 = self._prepare_mass_cancel()

  def fake_enqueue_task(url, name, payload):
    # Only the pending task may be handed to the cancellation task queue.
    self.assertEqual('/internal/taskqueue/important/tasks/cancel', url)
    self.assertEqual('cancel-tasks', name)
    self.assertEqual({
        'tasks': [pending_id],
        'kill_running': False
    }, json.loads(payload))
    return True

  self.mock(utils, 'enqueue_task', fake_enqueue_task)
  self.set_as_admin()
  response = self.call_api('cancel', body={u'tags': [u'project:yay']})
  self.assertEqual(
      {
          u'matched': u'1',
          u'now': fmtdate(now_120),
      }, response.json)
def test_mass_cancel_running(self):
  """Asserts that kill_running extends a mass cancel to the running task."""
  running_id, pending_id, now_120 = self._prepare_mass_cancel()

  def fake_enqueue_task(url, name, payload):
    # Both the pending and the running task are handed to the task queue.
    self.assertEqual('/internal/taskqueue/important/tasks/cancel', url)
    self.assertEqual('cancel-tasks', name)
    self.assertEqual({
        'tasks': [pending_id, running_id],
        'kill_running': True
    }, json.loads(payload))
    return True

  self.mock(utils, 'enqueue_task', fake_enqueue_task)
  self.set_as_admin()
  cancel_body = {
      u'tags': [u'project:yay'],
      'kill_running': True
  }
  response = self.call_api('cancel', body=cancel_body)
  self.assertEqual(
      {
          u'matched': u'2',
          u'now': fmtdate(now_120),
      }, response.json)
def test_mass_cancel_realm_permission(self):
  """Asserts the pool filter and permission checks of the mass cancel API."""

  def cancel(tags, status):
    # Sends a mass cancel request filtered on the given tags.
    return self.call_api('cancel', body={'tags': tags}, status=status)

  # Start as a non-privileged user holding no realm permission at all.
  self.set_as_user()
  self.mock_auth_db([])

  # A pool filter is mandatory.
  response = cancel(['foo:bar'], 403)
  self.assertErrorResponseMessage(u'No pool is specified', response)

  # With a pool filter, the missing permission is reported.
  response = cancel(['pool:default'], 403)
  self.assertErrorResponseMessage(
      u'user "user@example.com" does not have permission '
      '"swarming.pools.cancelTask"', response)

  # Grant the permission to the user.
  self.mock_auth_db([auth.Permission('swarming.pools.cancelTask')])

  # A pool filter is still mandatory.
  response = cancel(['foo:bar'], 403)
  self.assertErrorResponseMessage(u'No pool is specified', response)

  # The request succeeds once the pool is given and the permission is held.
  cancel(['pool:default'], 200)
def test_list_ok(self):
  """Asserts that 'list' returns TaskResultSummaries for various queries."""
  first, second, now_120, start, end = self._gen_two_tasks()
  first_no_perf = first.copy()
  del first_no_perf['performance_stats']

  TasksRequest = handlers_endpoints.TasksRequest.combined_message_class

  def list_tasks(request):
    # Calls the 'list' API and returns the decoded JSON response.
    return self.call_api('list', body=message_to_dict(request)).json

  # Time range only, with performance stats included.
  request = TasksRequest(end=end, start=start, include_performance_stats=True)
  expected = {u'now': fmtdate(now_120), u'items': [second, first]}
  actual = list_tasks(request)
  # The API returns packed item lists; unpack them so they can be compared
  # against the expected values.
  perf_stats = actual['items'][1]['performance_stats']
  for section in ('isolated_download', 'isolated_upload'):
    for field in ('items_cold', 'items_hot'):
      packed = base64.b64decode(perf_stats[section][field])
      perf_stats[section][field] = large.unpack(packed)
  self.assertEqual(expected, actual)

  # Sorted by CREATED_TS.
  request = TasksRequest(sort=swarming_rpcs.TaskSort.CREATED_TS)
  actual = list_tasks(request)
  expected = {
      u'now': fmtdate(now_120),
      u'items': [second, first_no_perf]
  }
  self.assertEqual(expected, actual)

  # Sorted by MODIFIED_TS.
  request = TasksRequest(sort=swarming_rpcs.TaskSort.MODIFIED_TS)
  actual = list_tasks(request)
  expected = {
      u'now': fmtdate(now_120),
      u'items': [first_no_perf, second]
  }
  self.assertEqual(expected, actual)

  # Filtered on two tags.
  request = TasksRequest(
      end=end, start=start, tags=['project:yay', 'commit:pre'])
  expected = {
      u'now': fmtdate(now_120),
      u'items': [second]
  }
  self.assertEqual(expected, list_tasks(request))

  # Filtered on a tag that no task carries.
  request = TasksRequest(end=end, start=start, tags=['foo:bar'])
  expected = {u'now': fmtdate(now_120)}
  self.assertEqual(expected, list_tasks(request))

  # Filtered on both a state and a tag.
  request = TasksRequest(
      end=end,
      start=start,
      tags=['commit:pre'],
      state=swarming_rpcs.TaskStateQuery.COMPLETED_SUCCESS)
  expected = {
      u'now': fmtdate(now_120),
      u'items': [second]
  }
  self.assertEqual(expected, list_tasks(request))

  # Adding a MODIFIED_TS sort to the state and tag filter is answered with
  # HTTP 400.
  request = TasksRequest(
      end=end,
      start=start,
      tags=['commit:pre'],
      sort=swarming_rpcs.TaskSort.MODIFIED_TS,
      state=swarming_rpcs.TaskStateQuery.COMPLETED_SUCCESS)
  self.call_api('list', body=message_to_dict(request), status=400)
@parameterized.expand(['list', 'count'])
def test_list_realm_permission(self, api):
  """Asserts the pool filter and permission checks of the given listing API."""
  # Start as a non-privileged user holding no realm permission at all.
  self.set_as_user()
  self.mock_auth_db([])
  one_day_ago = self.now - datetime.timedelta(days=1)
  start = utils.datetime_to_timestamp(one_day_ago) / 1000000.

  def query(status, **filters):
    # Calls the API with the start timestamp plus any extra filter.
    body = {'start': start}
    body.update(filters)
    return self.call_api(api, body=body, status=status)

  # A pool filter is mandatory.
  response = query(403)
  self.assertErrorResponseMessage(u'No pool is specified', response)

  # With a pool filter, the missing permission is reported.
  response = query(403, tags=['pool:default'])
  self.assertErrorResponseMessage(
      u'user "user@example.com" does not have permission '
      '"swarming.pools.listTasks"', response)

  # Grant the permission to the user.
  self.mock_auth_db([auth.Permission('swarming.pools.listTasks')])

  # A pool filter is still mandatory.
  response = query(403)
  self.assertErrorResponseMessage(u'No pool is specified', response)

  # The request succeeds once the pool is given and the permission is held.
  query(200, tags=['pool:default'])
def test_get_state_ok(self):
  """Asserts that 'get_states' reports the state of each requested task."""
  # Two tasks are created: the first one is run to completion, the second one
  # is left pending.
  self.mock(random, 'getrandbits', lambda _: 0x88)
  self.set_as_bot()
  self.bot_poll()

  # First task, executed by the bot right away.
  self.set_as_user()
  first_kwargs = {
      'name': 'first',
      'tags': ['project:yay', 'commit:post'],
      'properties': dict(idempotent=True),
  }
  _, first_id = self.client_create_task_raw(**first_kwargs)
  self.set_as_bot()
  self.bot_run_task()

  # Second task, created 60s later and never picked up.
  self.set_as_user()
  self.mock(random, 'getrandbits', lambda _: 0x66)
  self.mock_now(self.now, 60)
  second_kwargs = {
      'name': 'second',
      'user': 'jack@localhost',
      'tags': ['project:yay', 'commit:pre'],
  }
  _, second_id = self.client_create_task_raw(**second_kwargs)

  self.set_as_privileged_user()
  StatesRequest = handlers_endpoints.TaskStatesRequest.combined_message_class
  request = StatesRequest(task_id=[first_id, second_id])
  actual = self.call_api('get_states', body=message_to_dict(request)).json
  self.assertEqual({u'states': ['COMPLETED', 'PENDING']}, actual)
def test_count_indexes(self):
  """Asserts that 'count' accepts every state and tag filter combination."""
  _, _, now_120, start, end = self._gen_two_tasks()
  CountRequest = handlers_endpoints.TasksCountRequest.combined_message_class
  for state in swarming_rpcs.TaskStateQuery:
    for tags in ([], ['a:1'], ['a:1', 'b:2']):
      request = CountRequest(start=start, end=end, state=state, tags=tags)
      result = self.call_api('count', body=message_to_dict(request)).json
      # The count itself is not verified; the point is only that the query
      # does not throw because of a missing index.
      del result[u'count']
      self.assertEqual({u'now': fmtdate(now_120)}, result)
def test_list_indexes(self):
  """Asserts that no filter/state/sort combination crashes unexpectedly.

  Every combination of state, tags, start, end and sort is sent to the 'list'
  API. Combinations enumerated in `unsupported` must be answered with
  HTTP 400, every other one must succeed. The returned items are not checked.
  """
  TaskStateQuery = swarming_rpcs.TaskStateQuery
  TaskSort = swarming_rpcs.TaskSort
  # List of all unsupported combinations. These can be added either with a new
  # index or by massaging the way entities are stored.
  unsupported = [
      # (<Using start, end or tags>, TaskStateQuery, TaskSort)
      # None in the first position means "whether or not a filter is used".
      (None, TaskStateQuery.BOT_DIED, TaskSort.ABANDONED_TS),
      (None, TaskStateQuery.BOT_DIED, TaskSort.COMPLETED_TS),
      (None, TaskStateQuery.BOT_DIED, TaskSort.MODIFIED_TS),
      (None, TaskStateQuery.BOT_DIED, TaskSort.STARTED_TS),
      (None, TaskStateQuery.CANCELED, TaskSort.ABANDONED_TS),
      (None, TaskStateQuery.CANCELED, TaskSort.COMPLETED_TS),
      (None, TaskStateQuery.CANCELED, TaskSort.MODIFIED_TS),
      (None, TaskStateQuery.CANCELED, TaskSort.STARTED_TS),
      (None, TaskStateQuery.COMPLETED, TaskSort.ABANDONED_TS),
      (None, TaskStateQuery.COMPLETED, TaskSort.COMPLETED_TS),
      (None, TaskStateQuery.COMPLETED, TaskSort.MODIFIED_TS),
      (None, TaskStateQuery.COMPLETED, TaskSort.STARTED_TS),
      (None, TaskStateQuery.COMPLETED_FAILURE, TaskSort.ABANDONED_TS),
      (None, TaskStateQuery.COMPLETED_FAILURE, TaskSort.COMPLETED_TS),
      (None, TaskStateQuery.COMPLETED_FAILURE, TaskSort.MODIFIED_TS),
      (None, TaskStateQuery.COMPLETED_FAILURE, TaskSort.STARTED_TS),
      (None, TaskStateQuery.COMPLETED_SUCCESS, TaskSort.ABANDONED_TS),
      (None, TaskStateQuery.COMPLETED_SUCCESS, TaskSort.COMPLETED_TS),
      (None, TaskStateQuery.COMPLETED_SUCCESS, TaskSort.MODIFIED_TS),
      (None, TaskStateQuery.COMPLETED_SUCCESS, TaskSort.STARTED_TS),
      (None, TaskStateQuery.DEDUPED, TaskSort.ABANDONED_TS),
      (None, TaskStateQuery.DEDUPED, TaskSort.COMPLETED_TS),
      (None, TaskStateQuery.DEDUPED, TaskSort.MODIFIED_TS),
      (None, TaskStateQuery.DEDUPED, TaskSort.STARTED_TS),
      (None, TaskStateQuery.EXPIRED, TaskSort.ABANDONED_TS),
      (None, TaskStateQuery.EXPIRED, TaskSort.COMPLETED_TS),
      (None, TaskStateQuery.EXPIRED, TaskSort.MODIFIED_TS),
      (None, TaskStateQuery.EXPIRED, TaskSort.STARTED_TS),
      (None, TaskStateQuery.KILLED, TaskSort.ABANDONED_TS),
      (None, TaskStateQuery.KILLED, TaskSort.COMPLETED_TS),
      (None, TaskStateQuery.KILLED, TaskSort.MODIFIED_TS),
      (None, TaskStateQuery.KILLED, TaskSort.STARTED_TS),
      (None, TaskStateQuery.NO_RESOURCE, TaskSort.ABANDONED_TS),
      (None, TaskStateQuery.NO_RESOURCE, TaskSort.COMPLETED_TS),
      (None, TaskStateQuery.NO_RESOURCE, TaskSort.MODIFIED_TS),
      (None, TaskStateQuery.NO_RESOURCE, TaskSort.STARTED_TS),
      (None, TaskStateQuery.PENDING, TaskSort.ABANDONED_TS),
      (None, TaskStateQuery.PENDING, TaskSort.COMPLETED_TS),
      (None, TaskStateQuery.PENDING, TaskSort.MODIFIED_TS),
      (None, TaskStateQuery.PENDING, TaskSort.STARTED_TS),
      (None, TaskStateQuery.PENDING_RUNNING, TaskSort.ABANDONED_TS),
      (None, TaskStateQuery.PENDING_RUNNING, TaskSort.COMPLETED_TS),
      (None, TaskStateQuery.PENDING_RUNNING, TaskSort.MODIFIED_TS),
      (None, TaskStateQuery.PENDING_RUNNING, TaskSort.STARTED_TS),
      (None, TaskStateQuery.RUNNING, TaskSort.ABANDONED_TS),
      (None, TaskStateQuery.RUNNING, TaskSort.COMPLETED_TS),
      (None, TaskStateQuery.RUNNING, TaskSort.MODIFIED_TS),
      (None, TaskStateQuery.RUNNING, TaskSort.STARTED_TS),
      (None, TaskStateQuery.TIMED_OUT, TaskSort.ABANDONED_TS),
      (None, TaskStateQuery.TIMED_OUT, TaskSort.COMPLETED_TS),
      (None, TaskStateQuery.TIMED_OUT, TaskSort.MODIFIED_TS),
      (None, TaskStateQuery.TIMED_OUT, TaskSort.STARTED_TS),
      (True, TaskStateQuery.ALL, TaskSort.ABANDONED_TS),
      (True, TaskStateQuery.ALL, TaskSort.COMPLETED_TS),
      (True, TaskStateQuery.ALL, TaskSort.MODIFIED_TS),
      (True, TaskStateQuery.ALL, TaskSort.STARTED_TS),
  ]
  _, _, now_120, start, end = self._gen_two_tasks()
  # Successful responses are compared against this once 'items' is dropped.
  expected = {u'now': fmtdate(now_120)}
  for state in TaskStateQuery:
    for tags in ([], ['a:1'], ['a:1', 'b:2']):
      # Distinct loop names keep `start` and `end` from being rebound while
      # they are iterated over.
      for start_arg in (None, start):
        for end_arg in (None, end):
          for sort in TaskSort:
            request = handlers_endpoints.TasksRequest.combined_message_class(
                start=start_arg,
                end=end_arg,
                state=state,
                tags=tags,
                sort=sort)
            using_filter = bool(start_arg or end_arg or tags)
            if ((using_filter, state, sort) in unsupported or
                (None, state, sort) in unsupported):
              try:
                self.call_api(
                    'list', body=message_to_dict(request), status=400)
              except Exception as e:  # pylint: disable=broad-except
                # Report the underlying error instead of discarding it.
                self.fail('Is actually supported: (%s, %s, %s): %r' %
                          (using_filter, state, sort, e))
              continue
            try:
              result = self.call_api(
                  'list', body=message_to_dict(request)).json
            except Exception as e:  # pylint: disable=broad-except
              self.fail('Is unsupported: (%s, %s, %s): %r' %
                        (using_filter, state, sort, e))
            # Don't check for correctness here, just assert that it doesn't
            # throw due to missing index or invalid query.
            result.pop(u'items', None)
            self.assertEqual(expected, result)
def test_tags_ok(self):
  """Asserts that 'tags' returns the content of the stored TagAggregation."""
  self.set_as_privileged_user()
  stored_tags = [
      task_result.TagValues(tag='foo', values=['alpha', 'beta']),
      task_result.TagValues(tag='bar', values=['gamma', 'delta', 'epsilon']),
  ]
  aggregation = task_result.TagAggregation(
      key=task_result.TagAggregation.KEY, tags=stored_tags, ts=self.now)
  aggregation.put()

  expected = {
      u'tasks_tags': [
          {
              u'key': u'foo',
              u'value': [u'alpha', u'beta'],
          },
          {
              u'key': u'bar',
              u'value': [u'gamma', u'delta', u'epsilon'],
          },
      ],
      u'ts': fmtdate(self.now, DATETIME_NO_MICRO),
  }
  actual = self.call_api('tags', body={}).json
  self.assertEqual(expected, actual)
def _gen_two_tasks(self):
  """Creates two idempotent tasks and builds their expected result summaries.

  Returns:
    Tuple (first, deduped, now_120, start, end): the expected summaries of the
    two tasks, the value returned by the last mock_now() call, and two float
    timestamps computed from self.now minus and plus one day.
  """
  self.mock(random, 'getrandbits', lambda _: 0x88)
  self.set_as_bot()
  self.bot_poll()

  # First request, executed by the bot.
  self.set_as_user()
  first_kwargs = {
      'name': 'first',
      'tags': ['project:yay', 'commit:post'],
      'properties': dict(idempotent=True),
  }
  _, first_id = self.client_create_task_raw(**first_kwargs)
  self.set_as_bot()
  self.bot_run_task()

  # Second request, created 60s later.
  self.set_as_user()
  self.mock(random, 'getrandbits', lambda _: 0x66)
  now_60 = self.mock_now(self.now, 60)
  second_kwargs = {
      'name': 'second',
      'user': 'jack@localhost',
      'tags': ['project:yay', 'commit:pre'],
      'properties': dict(idempotent=True),
  }
  self.client_create_task_raw(**second_kwargs)

  # Hack the datastore so MODIFIED_TS returns in backward order compared to
  # CREATED_TS.
  now_120 = self.mock_now(self.now, 120)
  first_entity = task_pack.unpack_result_summary_key(first_id).get()
  first_entity.modified_ts = now_120
  first_entity.put()

  first_tags = [
      u'commit:post',
      u'os:Amiga',
      u'pool:default',
      u'priority:20',
      u'project:yay',
      u'realm:none',
      u'service_account:none',
      u'swarming.pool.template:none',
      u'swarming.pool.version:pools_cfg_rev',
      u'user:joe@localhost',
  ]
  first = self.gen_result_summary(
      completed_ts=fmtdate(self.now),
      costs_usd=[0.1],
      created_ts=fmtdate(self.now),
      duration=0.1,
      exit_code=u'0',
      modified_ts=fmtdate(now_120),
      name=u'first',
      performance_stats=self.gen_perf_stats(),
      started_ts=fmtdate(self.now),
      tags=first_tags,
      try_number=u'1')

  deduped_tags = [
      u'commit:pre',
      u'os:Amiga',
      u'pool:default',
      u'priority:20',
      u'project:yay',
      u'realm:none',
      u'service_account:none',
      u'swarming.pool.template:none',
      u'swarming.pool.version:pools_cfg_rev',
      u'user:jack@localhost',
  ]
  deduped = self.gen_result_summary(
      completed_ts=fmtdate(self.now),
      cost_saved_usd=0.1,
      created_ts=fmtdate(now_60),
      deduped_from=u'5cee488008811',
      duration=0.1,
      exit_code=u'0',
      modified_ts=fmtdate(now_60),
      name=u'second',
      run_id=u'5cee488008811',
      started_ts=fmtdate(self.now),
      tags=deduped_tags,
      task_id=u'5cfcee8006610',
      user=u'jack@localhost')

  one_day = datetime.timedelta(days=1)
  start = utils.datetime_to_timestamp(self.now - one_day) / 1000000.
  end = utils.datetime_to_timestamp(self.now + one_day) / 1000000.
  self.set_as_privileged_user()
  return first, deduped, now_120, start, end
class TaskApiTest(BaseTest):
api_service_cls = handlers_endpoints.SwarmingTaskService
def setUp(self):
  """Sets up the SwarmingTasksService client and an empty default pool ACL."""
  super(TaskApiTest, self).setUp()
  tasks_service_cls = handlers_endpoints.SwarmingTasksService
  self.tasks_api = test_case.Endpoints(tasks_service_cls)
  self.mock_default_pool_acl([])
def test_cancel_pending(self):
  """Asserts that cancelling a pending task moves it to the CANCELED state."""
  # Create and cancel a task as a non-privileged user.
  self.mock(random, 'getrandbits', lambda _: 0x88)
  self.set_as_bot()
  self.bot_poll()
  self.set_as_user()
  _, task_id = self.client_create_task_raw(
      pubsub_topic='projects/abc/topics/def', pubsub_userdata='blah')
  expected = {u'ok': True, u'was_running': False}
  response = self.call_api(
      'cancel', body={
          'task_id': task_id,
          'kill_running': False
      })
  self.assertEqual(expected, response.json)

  # determine that the task's state updates correctly
  expected = {
      u'abandoned_ts': fmtdate(self.now),
      u'completed_ts': fmtdate(self.now),
      u'created_ts': fmtdate(self.now),
      u'current_task_slice': u'0',
      u'failure': False,
      u'internal_failure': False,
      u'modified_ts': fmtdate(self.now),
      u'name': u'job1',
      u'server_versions': [u'v1a'],
      u'state': u'CANCELED',
      u'tags': [
          u'a:tag',
          u'os:Amiga',
          u'pool:default',
          u'priority:20',
          u'realm:none',
          u'service_account:none',
          u'swarming.pool.template:none',
          u'swarming.pool.version:pools_cfg_rev',
          u'user:joe@localhost',
      ],
      u'task_id': task_id,
      u'user': u'joe@localhost',
  }
  response = self.call_api('result', body={'task_id': task_id})
  self.assertEqual(expected, response.json)

  # TODO: the PubSub notification for topic 'projects/abc/topics/def' is not
  # verified by this test. Presumably a task is enqueued on a 'pubsub' queue
  # when the task is cancelled; add an assertion once there is a hook to
  # observe it.
def test_cancel_forbidden(self):
  """Asserts that a non-privileged non-owner gets HTTP 403 on 'cancel'."""
  # The task is created by an admin.
  self.mock(random, 'getrandbits', lambda _: 0x88)
  self.set_as_admin()
  _, task_id = self.client_create_task_raw(
      pubsub_topic='projects/abc/topics/def', pubsub_userdata='blah')

  # A plain user without any permission tries to cancel it -> HTTP 403.
  self.set_as_user()
  self.mock_auth_db([])
  cancel_body = {
      'task_id': task_id,
      'kill_running': False
  }
  self.call_api('cancel', body=cancel_body, status=403)
def test_cancel_with_realm_permission(self):
  """Asserts which permissions let a plain user cancel a single task."""
  # A privileged user creates one task with a realm and one without.
  self.set_as_privileged_user()
  self.mock_auth_db([
      auth.Permission('swarming.pools.createTask'),
      auth.Permission('swarming.tasks.createInRealm'),
  ])
  _, task_id_with_realm = self.client_create_task_raw(realm='test:task_realm')
  _, task_id_without_realm = self.client_create_task_raw(realm=None)

  def cancel(task_id, status):
    # Sends a 'cancel' request for one task, expecting the given HTTP status.
    return self.call_api('cancel', body={'task_id': task_id}, status=status)

  def assert_not_accessible(task_id):
    response = cancel(task_id, 403)
    self.assertErrorResponseMessage(u'Task "%s" is not accessible' % task_id,
                                    response)

  # A non-privileged user cannot cancel either task.
  self.set_as_user()
  assert_not_accessible(task_id_with_realm)
  assert_not_accessible(task_id_without_realm)

  # swarming.pools.cancelTask allows cancelling both tasks.
  self.mock_auth_db([auth.Permission('swarming.pools.cancelTask')])
  cancel(task_id_with_realm, 200)
  cancel(task_id_without_realm, 200)

  # swarming.tasks.cancel allows cancelling the task that has a realm...
  self.mock_auth_db([auth.Permission('swarming.tasks.cancel')])
  cancel(task_id_with_realm, 200)
  # ...but the task without a realm stays inaccessible.
  assert_not_accessible(task_id_without_realm)
def test_cancel_running(self):
  """Checks the 'cancel' API against a task that a bot is already running.

  A cancel with kill_running=False is expected to be refused. With
  kill_running=True it is accepted, the next task_update tells the bot to
  stop, and the task is reported as KILLED after the bot's final update.
  """
  self.mock(random, 'getrandbits', lambda _: 0x88)
  self.set_as_bot()
  self.bot_poll()
  self.set_as_user()
  _, task_id = self.client_create_task_raw(
      properties=dict(command=['python', 'runtest.py']))

  # The bot picks the task up; the poll manifest carries the run ID.
  self.set_as_bot()
  handshake = self.do_handshake()
  poll_data = self.post_json('/swarming/api/v1/bot/poll', handshake)
  run_id = poll_data['manifest']['task_id']

  update_url = '/swarming/api/v1/bot/task_update'

  def update_params(**overrides):
    # Baseline task_update payload; keyword arguments replace its fields.
    return dict(
        {
            'cost_usd': 0.1,
            'duration': None,
            'exit_code': None,
            'id': 'bot1',
            'output': None,
            'output_chunk_start': 0,
            'task_id': run_id,
        }, **overrides)

  def expected_summary(**overrides):
    # Fields shared by every expected summary below. A fresh dict and tag
    # list are built on each call so no state leaks between assertions.
    now = fmtdate(self.now)
    fields = {
        'costs_usd': [0.1],
        'created_ts': now,
        'modified_ts': now,
        'started_ts': now,
        'tags': [
            u'a:tag',
            u'os:Amiga',
            u'pool:default',
            u'priority:20',
            u'realm:none',
            u'service_account:none',
            u'swarming.pool.template:none',
            u'swarming.pool.version:pools_cfg_rev',
            u'user:joe@localhost',
        ],
        'try_number': u'1',
    }
    fields.update(overrides)
    return self.gen_result_summary(**fields)

  def cancel(kill_running):
    body = {'task_id': task_id, 'kill_running': kill_running}
    return self.call_api('cancel', body=body).json

  # First chunk of output: the bot is allowed to continue.
  self.set_as_bot()
  first_update = update_params(output=base64.b64encode('Oh '))
  reply = self.post_json(update_url, first_update)
  self.assertEqual({u'must_stop': False, u'ok': True}, reply)

  self.set_as_user()
  running = expected_summary(state=u'RUNNING')
  self.assertEqual(running, self.client_get_results(task_id))

  # Denied if kill_running == False.
  self.assertEqual({u'ok': False, u'was_running': True}, cancel(False))

  # Works if kill_running == True.
  self.assertEqual({u'ok': True, u'was_running': True}, cancel(True))

  # The next update from the bot is answered with must_stop.
  self.set_as_bot()
  second_update = update_params(
      output=base64.b64encode('hi'), output_chunk_start=3)
  reply = self.post_json(update_url, second_update)
  self.assertEqual({u'must_stop': True, u'ok': True}, reply)

  # abandoned_ts and completed_ts are expected, but state is still RUNNING.
  self.set_as_user()
  stopping = expected_summary(
      abandoned_ts=fmtdate(self.now),
      completed_ts=fmtdate(self.now),
      state=u'RUNNING')
  self.assertEqual(stopping, self.client_get_results(task_id))

  # Bot terminates the task.
  self.set_as_bot()
  final_update = update_params(
      output=base64.b64encode(' again'),
      output_chunk_start=6,
      duration=0.1,
      exit_code=0)
  reply = self.post_json(update_url, final_update)
  self.assertEqual({u'must_stop': True, u'ok': True}, reply)

  self.set_as_user()
  killed = expected_summary(
      abandoned_ts=fmtdate(self.now),
      completed_ts=fmtdate(self.now),
      duration=0.1,
      exit_code=u'0',
      state=u'KILLED')
  self.assertEqual(killed, self.client_get_results(task_id))
def test_result_unknown(self):
  """Checks that 'result' answers 404 for a task ID that was never created."""
  unknown_task = {'task_id': '12310'}
  self.call_api('result', body=unknown_task, status=404)
def test_result_long(self):
  """Checks that 'result' answers 400 for a wildly invalid task ID."""
  # '12310' repeated ten times.
  oversized_id = '12310' * 10
  self.call_api('result', body={'task_id': oversized_id}, status=400)
def test_result_ok(self):
  """Checks 'result' for a pending task, then for its run once started."""
  self.mock(random, 'getrandbits', lambda _: 0x88)
  self.set_as_bot()
  self.bot_poll()

  # Right after creation the task is expected to be PENDING.
  self.set_as_user()
  _, task_id = self.client_create_task_raw()
  summary_response = self.call_api('result', body={'task_id': task_id})
  expected_tags = [
      u'a:tag',
      u'os:Amiga',
      u'pool:default',
      u'priority:20',
      u'realm:none',
      u'service_account:none',
      u'swarming.pool.template:none',
      u'swarming.pool.version:pools_cfg_rev',
      u'user:joe@localhost',
  ]
  expected_summary = {
      u'created_ts': fmtdate(self.now),
      u'current_task_slice': u'0',
      u'failure': False,
      u'internal_failure': False,
      u'modified_ts': fmtdate(self.now),
      u'name': u'job1',
      u'server_versions': [u'v1a'],
      u'state': u'PENDING',
      u'tags': expected_tags,
      u'task_id': u'5cee488008810',
      u'user': u'joe@localhost',
  }
  self.assertEqual(expected_summary, summary_response.json)

  # No bot has started the task yet, so the run ID is expected to be unknown.
  run_id = '%s1' % task_id[:-1]
  self.call_api('result', body={'task_id': run_id}, status=404)

  # After a bot poll the same run ID is expected to resolve.
  self.set_as_bot()
  self.bot_poll()
  self.set_as_user()
  run_response = self.call_api('result', body={'task_id': run_id})
  now = fmtdate(self.now)
  expected_run = self.gen_run_result(
      created_ts=now, modified_ts=now, started_ts=now)
  self.assertEqual(expected_run, run_response.json)
def test_result_completed_task(self):
  """Checks the 'result' payload of a completed task, without and with stats."""
  self.set_as_bot()
  self.bot_poll()
  self.set_as_user()
  self.client_create_task_raw()
  self.set_as_bot()
  task_id = self.bot_run_task()

  # First query: performance stats are not requested.
  self.set_as_user()
  plain_response = self.call_api('result', body={'task_id': task_id})
  now = fmtdate(self.now)
  expected = self.gen_run_result(
      completed_ts=now,
      costs_usd=[0.1],
      created_ts=now,
      duration=0.1,
      exit_code=u'0',
      modified_ts=now,
      run_id=task_id,
      started_ts=now,
      state=u'COMPLETED',
      task_id=task_id)
  self.assertEqual(expected, plain_response.json)

  # Second query: same task, this time asking for the performance stats.
  expected[u'performance_stats'] = self.gen_perf_stats()
  stats_query = {'task_id': task_id, 'include_performance_stats': True}
  actual = self.call_api('result', body=stats_query).json

  # Decode the base64 item lists and unpack them with large.unpack before
  # comparing against the expected stats.
  stats = actual['performance_stats']
  for section in ('isolated_download', 'isolated_upload'):
    for field in ('items_cold', 'items_hot'):
      packed = base64.b64decode(stats[section][field])
      stats[section][field] = large.unpack(packed)
  self.assertEqual(expected, actual)
def test_stdout_ok(self):
  """Checks that 'stdout' returns a task's output, whole or as a slice."""
  self.set_as_bot()
  self.bot_poll()
  self.set_as_user()
  self.client_create_task_raw()

  # The task ID comes from the bot run.
  self.set_as_bot()
  task_id = self.bot_run_task()

  self.set_as_privileged_user()
  run_id = '%s1' % task_id[:-1]

  def stdout(**body):
    return self.call_api('stdout', body=body).json

  # Full output, queried through both IDs.
  full_output = {u'output': u'rÉsult string', u'state': u'COMPLETED'}
  for queried_id in (task_id, run_id):
    self.assertEqual(full_output, stdout(task_id=queried_id))

  # Partial fetch. Offset and length count bytes, not unicode characters,
  # which is why a length of 2 yields the single character 'É'.
  self.assertEqual(
      {u'output': u'É', u'state': u'COMPLETED'},
      stdout(task_id=task_id, offset=1, length=2))
  self.assertEqual(
      {u'output': u'sult ', u'state': u'COMPLETED'},
      stdout(task_id=task_id, offset=3, length=5))
def test_stdout_empty(self):
  """Asserts that a freshly created task has no output to report."""
  _, task_id = self.client_create_task_raw()
  summary_stdout = self.call_api('stdout', body={'task_id': task_id})
  # Only the state comes back; there is no 'output' key.
  self.assertEqual({u'state': u'NO_RESOURCE'}, summary_stdout.json)

  # The ID with the last character swapped for '1' is expected to be unknown.
  run_id = '%s1' % task_id[:-1]
  self.call_api('stdout', body={'task_id': run_id}, status=404)
def test_result_run_not_found(self):
  """Asserts that getting results from incipient tasks raises 404.

  The task is created but never picked up by a bot, so the run ID derived
  from it (last character replaced by '1') is expected to be unknown.
  """
  _, task_id = self.client_create_task_raw()
  run_id = task_id[:-1] + '1'
  # Query 'result', the endpoint this test is named after. The 'stdout' 404
  # for the same kind of ID is already asserted by test_stdout_empty.
  self.call_api('result', body={'task_id': run_id}, status=404)
def test_task_deduped(self):
  """Asserts that a second identical idempotent task reuses prior results."""
  self.set_as_bot()
  self.bot_poll()

  # First task: it is run by the bot.
  self.set_as_user()
  _, first_id = self.client_create_task_raw(properties=dict(idempotent=True))
  self.set_as_bot()
  bot_task_id = self.bot_run_task()
  # The ID seen by the bot ends in '1'; the client's ID ends in '0' instead.
  self.assertEqual(first_id, bot_task_id[:-1] + '0')
  self.assertEqual('1', bot_task_id[-1:])

  # Second task: its results should be available immediately.
  self.set_as_user()
  _, second_id = self.client_create_task_raw(
      name='second', user='jack@localhost', properties=dict(idempotent=True))

  # The bot is told to sleep on its next poll.
  self.set_as_bot()
  poll_reply = self.bot_poll()
  self.assertEqual('sleep', poll_reply['cmd'])

  # The second task reports the same output even though it was not executed.
  self.set_as_user()
  second_stdout = self.call_api('stdout', body={'task_id': second_id})
  self.assertEqual(
      {'output': u'rÉsult string', u'state': u'COMPLETED'},
      second_stdout.json)
def test_request_unknown(self):
  """Checks that 'request' answers 404 for a task ID that was never created."""
  unknown_task = {'task_id': '12310'}
  self.call_api('request', body=unknown_task, status=404)
@parameterized.expand(['', 'test:task_realm'])
def test_request_ok(self, realm):
  """Asserts that 'request' returns the task request that was created.

  Runs once with an empty realm (the task is created with realm=None) and
  once with 'test:task_realm'. The secret bytes are expected to come back
  as the base64 encoding of '<REDACTED>'.
  """
  self.mock(random, 'getrandbits', lambda _: 0x88)
  self.mock_task_service_accounts()
  self.mock_default_pool_acl(['service-account@example.com'])
  creation_permissions = [
      auth.Permission('swarming.pools.createTask'),
      auth.Permission('swarming.tasks.createInRealm'),
  ]
  self.mock_auth_db(creation_permissions)
  _, task_id = self.client_create_task_raw(
      properties={'secret_bytes': 'zekret'},
      service_account='service-account@example.com',
      realm=realm or None)

  # <REDACTED> in base64
  redacted = u'PFJFREFDVEVEPg=='
  expected_props = self.gen_props(
      command=[u'python', u'run_test.py'], secret_bytes=redacted)
  only_slice = {
      u'expiration_secs': u'86400',
      u'properties': expected_props,
      u'wait_for_capacity': False,
  }
  expected_tags = [
      u'a:tag',
      u'os:Amiga',
      u'pool:default',
      u'priority:20',
      u'realm:%s' % (realm or 'none'),
      u'service_account:service-account@example.com',
      u'swarming.pool.template:none',
      u'swarming.pool.version:pools_cfg_rev',
      u'user:joe@localhost',
  ]
  expected = self.gen_request(
      created_ts=fmtdate(self.now),
      properties=expected_props,
      service_account=u'service-account@example.com',
      tags=expected_tags,
      task_slices=[only_slice])
  # The 'realm' field is only expected when a realm was given.
  if realm:
    expected['realm'] = realm

  actual = self.call_api('request', body={'task_id': task_id}).json
  self.assertEqual(expected, actual)
@parameterized.expand(['request', 'result', 'stdout'])
def test_get_with_realm_permission(self, api):
  """Checks which permissions let a plain user read a task through `api`.

  Two tasks are created, one in realm 'test:task_realm' and one without a
  realm. With 'swarming.pools.listTasks' the user is expected to read both;
  with 'swarming.tasks.get' only the task that has a realm.
  """

  def fetch(task_id, status):
    return self.call_api(api, body={'task_id': task_id}, status=status)

  def assert_not_accessible(task_id):
    denied = fetch(task_id, 403)
    self.assertErrorResponseMessage(
        u'Task "%s" is not accessible' % task_id, denied)

  # A privileged user creates one task with a realm and one without.
  self.set_as_privileged_user()
  creation_permissions = [
      auth.Permission('swarming.pools.createTask'),
      auth.Permission('swarming.tasks.createInRealm'),
  ]
  self.mock_auth_db(creation_permissions)
  _, realm_task_id = self.client_create_task_raw(realm='test:task_realm')
  _, plain_task_id = self.client_create_task_raw(realm=None)

  # Without any read permission, a regular user is denied on both tasks.
  self.set_as_user()
  for task_id in (realm_task_id, plain_task_id):
    assert_not_accessible(task_id)

  # swarming.pools.listTasks lets the user read both tasks.
  self.mock_auth_db([auth.Permission('swarming.pools.listTasks')])
  for task_id in (realm_task_id, plain_task_id):
    fetch(task_id, 200)

  # swarming.tasks.get covers the task with a realm...
  self.mock_auth_db([auth.Permission('swarming.tasks.get')])
  fetch(realm_task_id, 200)
  # ...but the task with no realm stays inaccessible.
  assert_not_accessible(plain_task_id)
class QueuesApiTest(BaseTest):
api_service_cls = handlers_endpoints.SwarmingQueuesService
def test_list(self):
# Create 3 tasks with different dimensions, so it creates different task
# queues and one termination task. The termination task should not be