blob: 1dc7ee17694117284b19b979a20fea17e73b7d4c [file] [log] [blame]
#!/usr/bin/env vpython
# coding=utf-8
# Copyright 2018 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
import datetime
import logging
import os
import random
import sys
import unittest
import json
import cgi
import mock
import base64
from parameterized import parameterized
import test_env_handlers
from google.appengine.ext import ndb
import google.protobuf as proto
import webapp2
import webtest
from test_support import test_case
from components import auth
from components import ereporter2
from components.prpc import encoding
from components import utils
from proto.api_v2 import swarming_pb2
from proto.internals import rbe_pb2
from proto.config import realms_pb2
from server import acl
from server import bot_code
from server import bot_management
from server import bot_management
from server import config
from server import rbe
from server import service_accounts
from server import task_pack
from server import task_request
from server import task_result
from server import task_scheduler
from server import task_to_run
import handlers_bot
import handlers_prpc
import message_conversion_prpc
# 2010-01-02T03:04:05Z
def fmtdate(d, fmt='%Y-%m-%dT%H:%M:%SZ'):
  """Renders a datetime.datetime the way the API serializes timestamps.

  Args:
    d: datetime.datetime instance to format.
    fmt: strftime pattern. The default ends with a literal 'Z' because API
      datetime responses are returned at UTC+0.

  Returns:
    The formatted timestamp as a unicode string.
  """
  rendered = d.strftime(fmt)
  return unicode(rendered)
def _decode(raw, dst):
assert raw[:5] == ')]}\'\n', raw[:5]
return encoding.get_decoder(encoding.Encoding.JSON)(raw[5:], dst)
def _decode_dict(dct, out):
  """Parses the JSON-style dict `dct` into the message `out`.

  Thin wrapper around json_format.ParseDict; returns its result.
  """
  parse_dict = proto.json_format.ParseDict
  return parse_dict(dct, out)
def _encode(d):
  """Encodes the message `d` as pRPC JSON and strips the response prefix.

  The encoder output is asserted to start with the 5 character prefix
  ")]}'\\n"; only the part after the prefix is returned.
  """
  encoder = encoding.get_encoder(encoding.Encoding.JSON)
  raw = encoder(d)
  prefix = raw[:5]
  assert prefix == ')]}\'\n', prefix
  return raw[5:]
def _bot_event(event_type, bot_id, **kwargs):
  """Records a bot event through bot_management.bot_event with test defaults.

  Args:
    event_type: event name; dimensions are registered only when it starts
      with 'request_'.
    bot_id: id of the bot, also used as its 'id' dimension.
    **kwargs: overrides for any of the default event arguments below.

  Returns:
    The result of bot_management.bot_event().
  """
  params = dict(
      authenticated_as='bot:whitelisted-ip',
      dimensions={
          u'id': [bot_id],
          u'pool': [u'default'],
      },
      external_ip='8.8.4.4',
      state={'ram': 65},
      version='123456789',
      quarantined=False,
      maintenance_msg=None,
      task_id=None,
      task_name=None,
      register_dimensions=event_type.startswith('request_'),
  )
  # Caller supplied values win over the defaults.
  params.update(kwargs)
  return bot_management.bot_event(event_type, bot_id, **params)
def apply_default_for_task_props(props):
  """Fills a task properties message with the defaults used by these tests.

  Mutates `props` in place: CIPD input, command, containment, dimensions,
  timeouts, idempotency and outputs. Returns nothing.
  """
  cipd = props.cipd_input
  cipd.client_package.package_name = 'infra/tools/cipd/${platform}'
  cipd.client_package.version = 'git_revision:deadbeef'
  rm_package = swarming_pb2.CipdPackage(package_name='rm',
                                        path='bin',
                                        version='git_revision:deadbeef')
  cipd.packages.extend([rm_package])
  cipd.server = 'https://pool.config.cipd.example.com'

  props.command[:] = ['python', '-c', 'print(1)']
  props.containment.containment_type = swarming_pb2.Containment.AUTO
  props.dimensions.extend([
      swarming_pb2.StringPair(key=key, value=value)
      for key, value in (('os', 'Amiga'), ('pool', 'default'))
  ])
  props.execution_timeout_secs = 3600
  props.grace_period_secs = 30
  props.idempotent = False
  props.io_timeout_secs = 1200
  props.outputs[:] = ['foo', 'path/to/foobar']
def apply_defaults_for_request(request):
  """Sets the default expected field values on a TaskRequestResponse.

  Meant for building test expectations. `request` is mutated in place and
  nothing is returned.
  """
  # The task id below assumes:
  #   self.mock(random, 'getrandbits', lambda _: 0x88)
  leading_fields = (
      ('authenticated', 'user:user@example.com'),
      ('expiration_secs', 86400),
      ('task_id', '5cee488008810'),
      ('name', 'job1'),
      ('priority', 20),
      ('service_account', 'none'),
  )
  for field_name, value in leading_fields:
    setattr(request, field_name, value)
  # Slice assignment replaces whatever tags were already present.
  request.tags[:] = [
      u'a:tag',
      u'authenticated:user:user@example.com',
      u'os:Amiga',
      u'pool:default',
      u'priority:20',
      u'realm:none',
      u'service_account:none',
      u'swarming.pool.template:none',
      u'swarming.pool.version:pools_cfg_rev',
      u'user:joe@localhost',
  ]
  request.user = 'joe@localhost'
  request.bot_ping_tolerance_secs = 600
class PrpcTest(test_env_handlers.AppTestBase):
  """Base fixture for the pRPC handler tests.

  Builds a webtest app serving the pRPC and bot routes, and provides helpers
  to post pRPC requests and to fill expectation messages. Subclasses set
  `service` to the pRPC service they exercise.
  """
  # NOTE(review): presumably tells the test harness not to run this base class
  # on its own - confirm against test_env_handlers.
  no_run = 1
  # pRPC service name used by post_prpc() when no `service` argument is given.
  service = None

  def setUp(self):
    """Creates the WSGI test app and the default JSON request headers."""
    super(PrpcTest, self).setUp()
    prpc_routes = handlers_prpc.get_routes(debug=True)
    bot_routes = handlers_bot.get_routes()
    wsgi_app = webapp2.WSGIApplication(prpc_routes + bot_routes, debug=True)
    environ = {
        'REMOTE_ADDR': self.source_ip,
        'SERVER_SOFTWARE': os.environ['SERVER_SOFTWARE'],
    }
    self.app = webtest.TestApp(wsgi_app, extra_environ=environ)
    json_media_type = encoding.Encoding.JSON[1]
    self._headers = {
        'Content-Type': json_media_type,
        'Accept': json_media_type,
    }

  def apply_defaults_for_result_summary(self, result):
    """Applies default expectations to a TaskResultResponse initialized from a
    TaskResultSummary ndb entity.

    To be used for expectations. `result` is mutated in place; nothing is
    returned.
    """
    # The ids below assume:
    #   self.mock(random, 'getrandbits', lambda _: 0x88)
    del result.bot_dimensions[:]
    result.bot_dimensions.extend([
        swarming_pb2.StringListPair(key=key, value=[value])
        for key, value in (('id', 'bot1'), ('os', 'Amiga'), ('pool',
                                                             'default'))
    ])
    result.bot_id = 'bot1'
    result.bot_version = self.bot_version
    result.current_task_slice = 0
    result.failure = False
    result.internal_failure = False
    result.name = 'job1'
    result.run_id = '5cee488008811'
    result.server_versions[:] = [u'v1a']
    result.state = swarming_pb2.TaskState.COMPLETED
    result.tags[:] = [
        'a:tag',
        'os:Amiga',
        'pool:default',
        'priority:20',
        'realm:none',
        'service_account:none',
        'swarming.pool.template:no_config',
        'user:joe@localhost',
    ]
    result.task_id = '5cee488008810'
    result.user = 'joe@localhost'

  def apply_defaults_for_run_result(self, result):
    """Applies default expectations to a TaskResultResponse describing a
    running TaskRunResult.

    To be used for expectations. Repeated fields are appended to, not reset.

    Returns:
      The same `result` message, for convenience.
    """
    result.bot_dimensions.extend([
        swarming_pb2.StringListPair(key=key, value=[value])
        for key, value in (('id', 'bot1'), ('os', 'Amiga'), ('pool',
                                                             'default'))
    ])
    result.bot_id = 'bot1'
    result.bot_version = self.bot_version
    result.costs_usd.extend([0.0])
    result.current_task_slice = 0
    result.failure = False
    result.internal_failure = False
    result.name = 'job1'
    result.run_id = '5cee488008811'
    result.server_versions.extend(['v1a'])
    result.state = swarming_pb2.TaskState.RUNNING
    result.task_id = '5cee488008811'
    return result

  def gen_perf_stats_prpc(self, stats):
    """Fills a performance stats message with the expected test values.

    To be used for expectations. `stats` is mutated in place; nothing is
    returned.
    """
    stats.bot_overhead = 0.1
    stats.cache_trim.duration = 0.1

    download = stats.isolated_download
    download.duration = 1.0
    download.initial_size = 100000
    download.initial_number_items = 10
    download.items_cold = b'x\234\023\001\000\000\025\000\025'
    download.items_hot = b'x\234\223\343\002\000\000H\000)'
    download.num_items_cold = 1
    download.total_bytes_items_cold = 20
    download.num_items_hot = 2
    download.total_bytes_items_hot = 70

    upload = stats.isolated_upload
    upload.duration = 2.0
    upload.items_cold = b'x\234cdT\003\000\000.\000)'
    upload.items_hot = b'x\234cdd\324\007\000\000<\0003'
    upload.num_items_cold = 3
    upload.total_bytes_items_cold = 43
    upload.num_items_hot = 4
    upload.total_bytes_items_hot = 56

    stats.cleanup.duration = 0.1
    stats.package_installation.duration = 3.0
    stats.named_caches_install.duration = 0.1
    stats.named_caches_uninstall.duration = 0.1

  def _new_task_request_prpc(self, use_default_slice=False):
    """Builds the NewTaskRequest used by the tests.

    Args:
      use_default_slice: when True, attach one TaskSlice carrying the default
        task properties (with the command replaced by 'python run_test.py')
        and clear the request level `expiration_secs`.

    Returns:
      A swarming_pb2.NewTaskRequest.
    """
    ntr = swarming_pb2.NewTaskRequest(expiration_secs=24 * 60 * 60,
                                      name='job1',
                                      priority=20,
                                      tags=[u'a:tag'],
                                      user='joe@localhost',
                                      bot_ping_tolerance_secs=600)
    if use_default_slice:
      ts = swarming_pb2.TaskSlice(expiration_secs=180, wait_for_capacity=True)
      # Reuse the shared defaults instead of duplicating them here; only the
      # command differs for this request.
      apply_default_for_task_props(ts.properties)
      ts.properties.command[:] = ['python', 'run_test.py']
      ntr.task_slices.extend([ts])
      ntr.ClearField('expiration_secs')
    return ntr

  def _client_create_task_prpc(self, request):
    """Creates a task via the pRPC API.

    Returns:
      Tuple (decoded TaskRequestMetadataResponse, its task_id).
    """
    http_response = self.post_prpc('NewTask',
                                   request,
                                   service='swarming.v2.Tasks')
    metadata = swarming_pb2.TaskRequestMetadataResponse()
    _decode(http_response.body, metadata)
    return metadata, metadata.task_id

  def _client_cancel_task_prpc(self, task_id):
    """Cancels a pending task via pRPC API and asserts it was canceled."""
    cancel_request = swarming_pb2.TaskCancelRequest(task_id=task_id,
                                                    kill_running=False)
    http_response = self.post_prpc('CancelTask',
                                   cancel_request,
                                   service='swarming.v2.Tasks')
    outcome = swarming_pb2.CancelResponse()
    _decode(http_response.body, outcome)
    self.assertTrue(outcome.canceled)

  def post_prpc(self, rpc, request, expect_errors=False, service=None):
    """POSTs `request` to /prpc/<service>/<rpc> and returns the response.

    Args:
      rpc: method name, e.g. 'GetBot'.
      request: message to encode as the request body.
      expect_errors: forwarded to the webtest app's post().
      service: pRPC service name; defaults to `self.service`, which must then
        be set.
    """
    if not service:
      assert self.service, "Child classes must define service"
      service = self.service
    url = '/prpc/%s/%s' % (service, rpc)
    return self.app.post(url,
                         _encode(request),
                         self._headers,
                         expect_errors=expect_errors)
class BotServicePrpcTest(PrpcTest):
"""Tests the pRPC handlers."""
def setUp(self):
  """Targets the swarming.v2.Bots service and pins the mocked clock."""
  super(BotServicePrpcTest, self).setUp()
  self.service = "swarming.v2.Bots"
  frozen_now = datetime.datetime(2010, 1, 2, 3, 4, 5)
  self.now = frozen_now
  self.mock_now(frozen_now)
  self.mock_default_pool_acl([])
  self.mock_tq_tasks()
def test_bot_get(self):
  """GetBot returns the stored info of a bot that has handshaked."""
  self.set_as_bot()
  self.do_handshake()
  self.set_as_privileged_user()
  http_response = self.post_prpc("GetBot",
                                 swarming_pb2.BotRequest(bot_id="bot1"))
  got = swarming_pb2.BotInfo()
  _decode(http_response.body, got)
  stored_bot = bot_management.get_info_key('bot1').get()
  want = message_conversion_prpc.bot_info_response(stored_bot)
  self.assertEqual(want, got)
def test_bot_in_maintenance(self):
  """GetBot exposes the maintenance message of a bot in maintenance."""
  self.set_as_privileged_user()
  _bot_event('request_sleep', 'bot1', maintenance_msg='boiling water')
  http_response = self.post_prpc("GetBot",
                                 swarming_pb2.BotRequest(bot_id="bot1"))
  got = swarming_pb2.BotInfo()
  _decode(http_response.body, got)
  stored_bot = bot_management.get_info_key('bot1').get()
  want = message_conversion_prpc.bot_info_response(stored_bot)
  self.assertTrue(got.maintenance_msg)
  self.assertEqual(want, got)
def test_bot_get_not_found(self):
  """GetBot answers 404 for a bot id that was never seen."""
  self.set_as_privileged_user()
  missing = self.post_prpc("GetBot",
                           swarming_pb2.BotRequest(bot_id="bot1"),
                           expect_errors=True)
  self.assertEqual(missing.status, '404 Not Found')
  self.assertEqual(missing.body, 'bot1 not found.')
def test_get_deleted_bot(self):
  """GetBot still answers for a deleted bot and flags it as deleted."""
  bot_id = "bot1"
  _bot_event('bot_connected', bot_id=bot_id, state={'foo': 0})
  self.set_as_admin()
  request = swarming_pb2.BotRequest(bot_id=bot_id)
  resp = self.post_prpc("DeleteBot", request)
  expected = swarming_pb2.DeleteResponse(deleted=True)
  # Decode into an empty message: pre-seeding `deleted=True` could mask a
  # response that does not carry the field at all.
  actual = swarming_pb2.DeleteResponse()
  _decode(resp.body, actual)
  self.assertEqual(expected, actual)
  # The deleted bot can still be fetched; it is reported with deleted=True
  # rather than as not found.
  request = swarming_pb2.BotRequest(bot_id=bot_id)
  resp = self.post_prpc("GetBot", request)
  self.assertEqual(resp.status, '200 OK')
  ghost_bot = swarming_pb2.BotInfo()
  _decode(resp.body, ghost_bot)
  self.assertTrue(ghost_bot.deleted)
def test_unauthorized_bot_not_found(self):
  """GetBot rejects anonymous callers with 403."""
  self.set_as_anonymous()
  denied = self.post_prpc("GetBot",
                          swarming_pb2.BotRequest(bot_id="bot1"),
                          expect_errors=True)
  self.assertEqual(denied.status, '403 Forbidden')
  self.assertEqual(denied.body, 'Access is denied.')
@parameterized.expand([
    'GetBot',
    'ListBotEvents',
])
def test_get_ok_realm(self, api):
  """A user holding swarming.pools.listBots may read the bot.

  Counterpart of test_get_forbidden, which covers the user without the
  permission.
  """
  # non-privileged user with realm permission.
  self.mock_auth_db([auth.Permission('swarming.pools.listBots')])
  self.set_as_user()
  _bot_event('bot_connected', bot_id='id1')
  # No expect_errors: with the permission granted the call must succeed.
  resp = self.post_prpc(api, swarming_pb2.BotRequest(bot_id='id1'))
  self.assertEqual(resp.status, '200 OK')
@parameterized.expand([
    'GetBot',
    'ListBotEvents',
])
def test_get_forbidden(self, api):
  """A user without realm permission gets 403 for alive and deleted bots."""

  def _assert_forbidden():
    # Issues `api` for bot 'id1' as the current identity and expects 403.
    denied = self.post_prpc(api,
                            swarming_pb2.BotRequest(bot_id='id1'),
                            expect_errors=True)
    self.assertEqual(denied.status, '403 Forbidden')

  self.mock_auth_db([])
  # non-privileged user with no realm permission.
  self.set_as_user()

  # alive bot
  _bot_event('bot_connected', bot_id='id1')
  _assert_forbidden()

  # deleted bot; admin rights are patched in only for the deletion itself.
  with mock.patch('server.acl._is_admin', return_value=True):
    self.post_prpc('DeleteBot', swarming_pb2.BotRequest(bot_id='id1'))
  _assert_forbidden()
def test_delete_ok(self):
  """DeleteBot removes an existing bot; deleting it again yields 404."""
  self.set_as_admin()
  self.mock(acl, '_is_admin', lambda *_args, **_kwargs: True)
  bot_state = {
      'dict': {
          'random': 'values'
      },
      'float': 0.,
      'list': ['of', 'things'],
      'str': u'uni',
  }
  _bot_event('request_sleep', bot_id='id1', state=bot_state)

  # delete the bot
  first = self.post_prpc('DeleteBot', swarming_pb2.BotRequest(bot_id='id1'))
  outcome = swarming_pb2.DeleteResponse()
  _decode(first.body, outcome)
  self.assertEqual(swarming_pb2.DeleteResponse(deleted=True), outcome)

  # is it gone?
  second = self.post_prpc('DeleteBot',
                          swarming_pb2.BotRequest(bot_id='id1'),
                          expect_errors=True)
  self.assertEqual(second.status, '404 Not Found')
def test_events(self):
  """ListBotEvents returns the bot's events and honors the start/end window.

  Five events are produced at five mocked timestamps (handshake, idle poll,
  poll that receives a task, task completion, manual 'bot_rebooting' event).
  The full window must return all of them, the second window only the last
  two.
  """
  # Run one task, push an event manually.
  second = datetime.timedelta(seconds=1)
  self.mock(random, 'getrandbits', lambda _: 0x88)
  first_ticker = test_case.Ticker(self.now, datetime.timedelta(seconds=1))
  t1 = self.mock_now(first_ticker())
  self.set_as_bot()
  params = self.do_handshake()
  t2 = self.mock_now(first_ticker())
  self.bot_poll(params=params)
  self.set_as_user()
  self.client_create_task_raw()
  self.set_as_bot()
  t3 = self.mock_now(first_ticker())
  res = self.bot_poll(params=params)
  task_id = res['manifest']['task_id']
  # The remaining events happen one minute later, on a second ticker, so that
  # they can be selected separately below.
  now_60 = first_ticker.last() + datetime.timedelta(seconds=60)
  second_ticker = test_case.Ticker(now_60, datetime.timedelta(seconds=1))
  t4 = self.mock_now(second_ticker())
  resp = self.bot_complete_task(task_id=task_id)
  self.assertEqual({u'must_stop': False, u'ok': True}, resp)
  params['event'] = 'bot_rebooting'
  params['message'] = 'for the best'
  t5 = self.mock_now(second_ticker())
  resp = self.post_json('/swarming/api/v1/bot/event', params)
  self.assertEqual({}, resp)

  start = first_ticker.first()
  end = second_ticker.last()
  self.set_as_privileged_user()
  request = swarming_pb2.BotEventsRequest(bot_id="bot1", limit=200)
  request.start.FromDatetime(start)
  request.end.FromDatetime(end + second)
  resp = self.post_prpc("ListBotEvents", request)

  dimensions = [
      {
          u'key': u'id',
          u'value': [u'bot1']
      },
      {
          u'key': u'os',
          u'value': [u'Amiga']
      },
      {
          u'key': u'pool',
          u'value': [u'default']
      },
  ]
  state_dict = {
      'bot_group_cfg_version': 'default',
      'running_time': 1234.,
      'sleep_streak': 0,
      'started_ts': 1410990411.111,
  }
  state = unicode(
      json.dumps(state_dict, sort_keys=True, separators=(',', ':')))
  state_dict.pop('bot_group_cfg_version')
  state_no_cfg_ver = unicode(
      json.dumps(state_dict, sort_keys=True, separators=(',', ':')))

  def _event(event_type, ts, **overrides):
    # Builds one expected event; `overrides` adds or replaces fields.
    event = {
        u'authenticated_as': u'bot:whitelisted-ip',
        u'dimensions': dimensions,
        u'event_type': event_type,
        u'external_ip': unicode(self.source_ip),
        u'quarantined': False,
        u'state': state,
        u'ts': fmtdate(ts),
        u'version': unicode(self.bot_version),
    }
    event.update(overrides)
    return event

  # Listed from the latest event to the earliest.
  items = [
      _event(u'bot_rebooting', t5, message=u'for the best'),
      _event(u'task_completed', t4, task_id=task_id),
      _event(u'request_task', t3, task_id=task_id),
      _event(u'request_sleep', t2),
      _event(u'bot_connected', t1, state=state_no_cfg_ver, version=u'123'),
  ]
  expected = swarming_pb2.BotEventsResponse()
  _decode_dict(dict(items=items), expected)
  actual = swarming_pb2.BotEventsResponse()
  _decode(resp.body, actual)
  # Verify the full window; without this the three earliest events would never
  # be checked. Slicing converts the repeated fields to plain lists.
  self.assertEqual(expected.items[:], actual.items[:])

  # Now test with a subset.
  start = second_ticker.first()
  end = second_ticker.last()
  request = swarming_pb2.BotEventsRequest(bot_id="bot1", limit=200)
  request.start.FromDatetime(start)
  request.end.FromDatetime(end + second)
  resp = self.post_prpc("ListBotEvents", request)
  actual = swarming_pb2.BotEventsResponse()
  _decode(resp.body, actual)
  # actual.items[:] converts to a list so it can be compared with expected
  # items
  self.assertEqual(expected.items[:2], actual.items[:])
def test_terminate_admin(self):
  """An admin can terminate a bot; the termination task id is returned."""
  self.set_as_bot()
  self.bot_poll()
  self.mock(random, 'getrandbits', lambda _: 0x88)
  self.set_as_admin()
  http_response = self.post_prpc(
      'TerminateBot', swarming_pb2.TerminateRequest(bot_id="bot1"))
  got = swarming_pb2.TerminateResponse()
  _decode(http_response.body, got)
  want = swarming_pb2.TerminateResponse(task_id='5cee488008810')
  self.assertEqual(want, got)
def test_terminate_privileged_user(self):
  """A privileged user can terminate a bot and supply a reason.

  The stored task name is expected to carry the reason with its whitespace
  runs collapsed to single spaces.
  """
  self.set_as_bot()
  self.bot_poll()
  self.mock(random, 'getrandbits', lambda _: 0x88)
  self.set_as_privileged_user()
  terminate = swarming_pb2.TerminateRequest(bot_id="bot1",
                                            reason="hello\n\n\tworld")
  http_response = self.post_prpc('TerminateBot', terminate)
  got = swarming_pb2.TerminateResponse()
  _decode(http_response.body, got)
  want = swarming_pb2.TerminateResponse(task_id='5cee488008810')
  self.assertEqual(want, got)
  request_key, _ = task_pack.get_request_and_result_keys(got.task_id)
  stored_request = request_key.get()
  self.assertEqual("Terminate bot1: hello world", stored_request.name)
def test_terminate_with_extremely_long_reason(self):
  """TerminateBot answers 400 when the reason is extremely long."""
  self.set_as_bot()
  self.bot_poll()
  self.mock(random, 'getrandbits', lambda _: 0x88)
  self.set_as_privileged_user()
  oversized_reason = "hello world" * 10000
  terminate = swarming_pb2.TerminateRequest(bot_id="bot1",
                                            reason=oversized_reason)
  rejected = self.post_prpc('TerminateBot', terminate, expect_errors=True)
  self.assertEqual(rejected.status, '400 Bad Request')
def test_terminate_user(self):
  """TerminateBot is gated on the swarming.pools.terminateBot permission."""
  self.set_as_bot()
  self.bot_poll()
  self.mock(random, 'getrandbits', lambda _: 0x88)

  # without realm permission.
  self.set_as_user()
  self.mock_auth_db([])
  request = swarming_pb2.TerminateRequest(bot_id="bot1")
  resp = self.post_prpc('TerminateBot', request, expect_errors=True)
  # The error body is compared against the HTML-escaped message.
  expected = cgi.escape(
      'user "user@example.com" does not have '
      'permission "swarming.pools.terminateBot"',
      quote=True)
  self.assertEqual(expected, resp.body)

  # give permission.
  self.mock_auth_db([auth.Permission('swarming.pools.terminateBot')])
  # Use the RPC's own request type, as in every other TerminateBot call here.
  request = swarming_pb2.TerminateRequest(bot_id="bot1")
  resp = self.post_prpc('TerminateBot', request)
  actual = swarming_pb2.TerminateResponse()
  _decode(resp.body, actual)
  expected = swarming_pb2.TerminateResponse(task_id='5cee488008810')
  self.assertEqual(expected, actual)
def test_terminate_via_rbe(self):
  """The termination task records the RBE instance configured for the bot."""
  self.set_as_bot()
  self.bot_poll()
  self.mock(random, 'getrandbits', lambda _: 0x88)
  # Every bot is reported as served by this RBE instance, non-hybrid.
  self.mock(
      rbe, 'get_rbe_config_for_bot',
      lambda *_args: rbe.RBEBotConfig(instance='some/rbe/instance',
                                      hybrid_mode=False))
  self.set_as_admin()
  http_response = self.post_prpc(
      'TerminateBot', swarming_pb2.TerminateRequest(bot_id="bot1"))
  got = swarming_pb2.TerminateResponse()
  _decode(http_response.body, got)
  want = swarming_pb2.TerminateResponse(task_id='5cee488008810')
  self.assertEqual(want, got)
  request_key, _ = task_pack.get_request_and_result_keys(got.task_id)
  stored_request = request_key.get()
  self.assertEqual(stored_request.rbe_instance, 'some/rbe/instance')
def test_tasks_ok(self):
  """ListBotTasks returns the bot's task run results inside the window.

  Two tasks are run one mocked second apart; the requested window starts
  half a second after the first one, and the first returned item is checked
  against the second (failing) task.
  """
  ok_reply = {u'must_stop': False, u'ok': True}

  # First task, at self.now.
  self.mock(random, 'getrandbits', lambda _: 0x88)
  self.set_as_bot()
  self.bot_poll()
  self.set_as_user()
  self.client_create_task_raw()
  self.set_as_bot()
  polled = self.bot_poll()
  completion = self.bot_complete_task(task_id=polled['manifest']['task_id'])
  self.assertEqual(ok_reply, completion)

  # Second task, one second later, exiting with code 1.
  now_1 = self.mock_now(self.now, 1)
  self.mock(random, 'getrandbits', lambda _: 0x55)
  self.set_as_user()
  self.client_create_task_raw(name='philbert')
  self.set_as_bot()
  polled = self.bot_poll()
  completion = self.bot_complete_task(exit_code=1,
                                      task_id=polled['manifest']['task_id'])
  self.assertEqual(ok_reply, completion)

  half_second = datetime.timedelta(seconds=0.5)
  window_start = self.now + half_second
  window_end = now_1 + half_second
  self.set_as_privileged_user()
  request = swarming_pb2.BotTasksRequest()
  request.bot_id = 'bot1'
  request.start.FromDatetime(window_start)
  request.end.FromDatetime(window_end)
  request.sort = swarming_pb2.QUERY_CREATED_TS
  request.state = swarming_pb2.QUERY_ALL
  request.include_performance_stats = True
  request.limit = 100
  http_response = self.post_prpc('ListBotTasks', request)
  listing = swarming_pb2.TaskListResponse()
  _decode(http_response.body, listing)

  want = swarming_pb2.TaskResultResponse()
  self.apply_defaults_for_run_result(want)
  # All timestamps of the second task equal the mocked time it ran at.
  for timestamp in (want.bot_idle_since_ts, want.completed_ts,
                    want.modified_ts, want.started_ts, want.created_ts):
    timestamp.FromDatetime(now_1)
  want.duration = 0.1
  want.name = 'philbert'
  want.exit_code = 1
  want.failure = True
  want.run_id = '5cee870005511'
  want.state = swarming_pb2.TaskState.COMPLETED
  want.task_id = '5cee870005511'
  want.costs_usd[:] = [0.1]
  self.gen_perf_stats_prpc(want.performance_stats)
  self.assertEqual(want, listing.items[0])
def test_all_task_sorts(self):
  """ListBotTasks accepts the created/started/completed sort orders."""
  self.set_as_privileged_user()
  supported_sorts = (
      swarming_pb2.QUERY_CREATED_TS,
      swarming_pb2.QUERY_STARTED_TS,
      swarming_pb2.QUERY_COMPLETED_TS,
  )
  for sort in supported_sorts:
    request = swarming_pb2.BotTasksRequest(bot_id='bot1',
                                           sort=sort,
                                           state=swarming_pb2.QUERY_ALL,
                                           include_performance_stats=True,
                                           limit=100)
    http_response = self.post_prpc('ListBotTasks', request)
    self.assertEqual(http_response.status, '200 OK')
def test_abandoned_has_error(self):
  """abandoned_ts does not have an index with bot_id. So we test that it
  specifically will return a 400 bad request.
  """
  self.set_as_privileged_user()
  request = swarming_pb2.BotTasksRequest(
      bot_id='bot1',
      sort=swarming_pb2.QUERY_ABANDONED_TS,
      state=swarming_pb2.QUERY_ALL,
      include_performance_stats=True,
      limit=100)
  rejected = self.post_prpc('ListBotTasks', request, expect_errors=True)
  self.assertEqual(rejected.status, '400 Bad Request')
def test_all_task_state_filters(self):
  """ListBotTasks accepts every state filter except the disallowed ones."""
  self.set_as_privileged_user()
  # state cannot be pending since it is a disallowed state for
  # a TaskRunResult. See task_result._validate_state_not_pending
  skipped = (
      swarming_pb2.QUERY_PENDING,
      swarming_pb2.QUERY_PENDING_RUNNING,
      swarming_pb2.QUERY_DEDUPED,
  )
  for state in swarming_pb2.StateQuery.DESCRIPTOR.values_by_number:
    if state in skipped:
      continue
    request = swarming_pb2.BotTasksRequest(bot_id='bot1',
                                           state=state,
                                           include_performance_stats=True,
                                           limit=100)
    http_response = self.post_prpc('ListBotTasks', request)
    self.assertEqual(http_response.status, '200 OK')
def test_all_task_state_filters_disallowed_states(self):
  """ListBotTasks answers 400 for each disallowed state filter."""
  self.set_as_privileged_user()
  # These states cannot be used when listing a bot's tasks, which are
  # TaskRunResult entities. See task_result._DISALLOWED_STATES
  disallowed = (
      swarming_pb2.QUERY_PENDING,
      swarming_pb2.QUERY_PENDING_RUNNING,
      swarming_pb2.QUERY_DEDUPED,
  )
  for state in disallowed:
    request = swarming_pb2.BotTasksRequest(bot_id='bot1',
                                           state=state,
                                           include_performance_stats=True,
                                           limit=100)
    rejected = self.post_prpc('ListBotTasks', request, expect_errors=True)
    self.assertEqual(rejected.status, '400 Bad Request')
def test_dimensions_not_found(self):
  """GetBotDimensions answers 404 for a pool with no stored aggregation."""
  self.set_as_privileged_user()
  missing = self.post_prpc('GetBotDimensions',
                           swarming_pb2.BotsDimensionsRequest(pool='unknown'),
                           expect_errors=True)
  self.assertEqual(missing.status, '404 Not Found')
def test_dimensions_forbidden(self):
  """GetBotDimensions answers 403 when the user lacks the needed access."""

  def _assert_forbidden(request):
    # Sends GetBotDimensions as the current identity and expects 403.
    denied = self.post_prpc('GetBotDimensions', request, expect_errors=True)
    self.assertEqual(denied.status, '403 Forbidden')

  self.set_as_user()
  self.mock_auth_db([])
  # the user doesn't have permission to get dimensions.
  _assert_forbidden(swarming_pb2.BotsDimensionsRequest())
  _assert_forbidden(swarming_pb2.BotsDimensionsRequest(pool='default'))

  # pool permission isn't sufficient to get all dimensions.
  self.mock_auth_db([
      auth.Permission('swarming.pools.listBots'),
  ])
  _assert_forbidden(swarming_pb2.BotsDimensionsRequest())
def test_dimensions_pool(self):
  """A user with swarming.pools.listBots can read one pool's dimensions."""
  self.set_as_user()
  self.mock_auth_db([
      auth.Permission('swarming.pools.listBots'),
  ])
  # Store the aggregation for pool 'default'.
  aggregation = bot_management.DimensionAggregation(
      key=bot_management.get_aggregation_key('default'),
      dimensions=[
          bot_management.DimensionValues(dimension='foo',
                                         values=['alpha', 'beta']),
      ],
      ts=self.now)
  aggregation.put()

  want = swarming_pb2.BotsDimensions(
      bots_dimensions=[
          swarming_pb2.StringListPair(key='foo', value=['alpha', 'beta']),
      ],
      ts=message_conversion_prpc.date(self.now),
  )
  http_response = self.post_prpc(
      'GetBotDimensions', swarming_pb2.BotsDimensionsRequest(pool='default'))
  got = swarming_pb2.BotsDimensions()
  _decode(http_response.body, got)
  self.assertEqual(want, got)
def test_dimensions_all(self):
  """Asserts that BotsDimensions is returned with the right data."""
  self.set_as_privileged_user()
  # Store the aggregation under the global key (no pool).
  aggregation = bot_management.DimensionAggregation(
      key=bot_management.DimensionAggregation.KEY,
      dimensions=[
          bot_management.DimensionValues(dimension='foo',
                                         values=['alpha', 'beta']),
          bot_management.DimensionValues(dimension='bar',
                                         values=['gamma', 'delta',
                                                 'epsilon']),
      ],
      ts=self.now)
  aggregation.put()

  want = swarming_pb2.BotsDimensions(
      bots_dimensions=[
          swarming_pb2.StringListPair(key='foo', value=['alpha', 'beta']),
          swarming_pb2.StringListPair(key='bar',
                                      value=['gamma', 'delta', 'epsilon']),
      ],
      ts=message_conversion_prpc.date(self.now),
  )
  http_response = self.post_prpc('GetBotDimensions',
                                 swarming_pb2.BotsDimensionsRequest())
  got = swarming_pb2.BotsDimensions()
  _decode(http_response.body, got)
  self.assertEqual(want, got)
def test_list_ok(self):
  """Asserts that BotInfo is returned for the appropriate set of bots.

  Four bots are registered (id1 healthy, id2 quarantined, id3 last seen a
  year before self.now, id4 in maintenance) and ListBots is queried with many
  combinations of the quarantined / in_maintenance / is_dead / is_busy
  filters and of dimensions.
  """
  self.set_as_privileged_user()
  then = datetime.datetime(2009, 1, 2, 3, 4, 5)
  self.mock_now(then)
  # Add four bot events, corresponding to one dead bot, one quarantined bot,
  # one bot in maintenance, and one good bot
  _bot_event('request_sleep', bot_id='id3')
  self.mock_now(self.now)
  _bot_event('request_sleep', bot_id='id1')
  _bot_event('request_sleep', bot_id='id2', quarantined=True)
  _bot_event('request_sleep', bot_id='id4', maintenance_msg='very busy')
  def _default_bot(bot_id, the_date):
    # Expected BotInfo for a bot created by _bot_event() at `the_date` with
    # that helper's default arguments.
    out = swarming_pb2.BotInfo(
        authenticated_as='bot:whitelisted-ip',
        bot_id=bot_id,
        deleted=False,
        dimensions=[
            swarming_pb2.StringListPair(
                key='id',
                value=[bot_id],
            ),
            swarming_pb2.StringListPair(
                key='pool',
                value=['default'],
            )
        ],
        external_ip='8.8.4.4',
        first_seen_ts=message_conversion_prpc.date(the_date),
        is_dead=False,
        last_seen_ts=message_conversion_prpc.date(the_date),
        quarantined=False,
        state='{"ram":65}',
        version='123456789')
    return out
  # setup expected bots
  bot1 = _default_bot('id1', self.now)
  bot2 = _default_bot('id2', self.now)
  bot2.quarantined = True
  bot3 = _default_bot('id3', then)
  bot4 = _default_bot('id4', self.now)
  bot4.maintenance_msg = 'very busy'
  expected = swarming_pb2.BotInfoListResponse(
      items=[bot1, bot2, bot3, bot4],
      death_timeout=config.settings().bot_death_timeout_secs,
      now=message_conversion_prpc.date(self.now))
  def _verify(items,
              quarantined=swarming_pb2.NULL,
              in_maintenance=swarming_pb2.NULL,
              is_dead=swarming_pb2.NULL,
              is_busy=swarming_pb2.NULL,
              dimensions=None):
    # Calls ListBots with the given filters and checks that exactly `items`
    # are returned, along with the expected death_timeout.
    if dimensions is None:
      dimensions = []
    request = swarming_pb2.BotsRequest(limit=100,
                                       quarantined=quarantined,
                                       in_maintenance=in_maintenance,
                                       is_dead=is_dead,
                                       is_busy=is_busy,
                                       dimensions=dimensions)
    response = self.post_prpc('ListBots', request)
    actual = swarming_pb2.BotInfoListResponse()
    _decode(response.body, actual)
    # `expected.items` is rebuilt from `items` on every call.
    expected.ClearField('items')
    expected.items.extend(items)
    self.assertEqual(expected.items, actual.items)
    self.assertEqual(expected.death_timeout, actual.death_timeout)
  _verify([bot1, bot2, bot3, bot4])
  # This will mark bot3 as dead since it was created before self.now
  self.assertEqual(1, bot_management.cron_update_bot_info())
  bot3.is_dead = True
  # Kept for clarity only: _verify() replaces expected.items from its `items`
  # argument, so the bot3 object above is what actually gets compared.
  expected.items[2].is_dead = True
  _verify([bot1, bot2, bot3, bot4])
  # All bots should be returned if we don't care about quarantined
  _verify([bot1, bot2, bot3, bot4], quarantined=swarming_pb2.NULL)
  # All bots should be returned if we don't care about is_dead
  _verify([bot1, bot2, bot3, bot4], is_dead=swarming_pb2.NULL)
  # Only bot1 corresponds to these two dimensions
  _verify(items=[bot1],
          dimensions=[
              swarming_pb2.StringPair(key='pool', value='default'),
              swarming_pb2.StringPair(key='id', value='id1'),
          ])
  # Only bot1 corresponds to being not dead and not quarantined and
  # not in maintenance and this dimension
  _verify(items=[bot1],
          dimensions=[swarming_pb2.StringPair(key='pool', value='default')],
          quarantined=swarming_pb2.FALSE,
          in_maintenance=swarming_pb2.FALSE,
          is_dead=swarming_pb2.FALSE)
  # exclude bot2 only, which is quarantined
  _verify(items=[bot1, bot3, bot4], quarantined=swarming_pb2.FALSE)
  # exclude bot3 only, which is dead
  _verify(items=[bot1, bot2, bot4], is_dead=swarming_pb2.FALSE)
  # only bot2 is quarantined
  _verify(items=[bot2], quarantined=swarming_pb2.TRUE)
  # only bot4 is in maintenance
  _verify(items=[bot4], in_maintenance=swarming_pb2.TRUE)
  # quarantined:true can be paired with other dimensions and still work
  _verify(items=[bot2],
          quarantined=swarming_pb2.TRUE,
          dimensions=[swarming_pb2.StringPair(key='pool', value='default')])
  # in_maintenance:true can be paired with other dimensions and still work
  _verify(items=[bot4],
          in_maintenance=swarming_pb2.TRUE,
          dimensions=[swarming_pb2.StringPair(key='pool', value='default')])
  # only bot3 is dead
  _verify(items=[bot3], is_dead=swarming_pb2.TRUE)
  # is_dead:true can be paired with other dimensions and still work
  _verify(items=[bot3],
          is_dead=swarming_pb2.TRUE,
          dimensions=[swarming_pb2.StringPair(key='pool', value='default')])
  # only 1 bot is "ready for work"
  _verify(items=[bot1],
          is_busy=swarming_pb2.FALSE,
          is_dead=swarming_pb2.FALSE,
          quarantined=swarming_pb2.FALSE)
  # not:existing is a dimension that doesn't exist, nothing returned.
  _verify(items=[],
          dimensions=[swarming_pb2.StringPair(key='not', value='existing')])
  # quarantined:true can be paired with other non-existing dimensions and
  # still work
  _verify(items=[],
          quarantined=swarming_pb2.TRUE,
          dimensions=[swarming_pb2.StringPair(key='not', value='existing')])
  # in_maintenance:true can be paired with other non-existing dimensions and
  # still work
  _verify(items=[],
          in_maintenance=swarming_pb2.TRUE,
          dimensions=[swarming_pb2.StringPair(key='not', value='existing')])
  # is_dead:true can be paired with other non-existing dimensions and
  # still work
  _verify(items=[],
          is_dead=swarming_pb2.TRUE,
          dimensions=[swarming_pb2.StringPair(key='not', value='existing')])
  # No bot is both dead and quarantined
  _verify(items=[], is_dead=swarming_pb2.TRUE, quarantined=swarming_pb2.TRUE)
  # OR dimension finds bot1 and bot2
  _verify(items=[bot1, bot2],
          dimensions=[swarming_pb2.StringPair(key='id', value='id1|id2')])
def test_count_ok(self):
  """CountBots reports totals for unfiltered and dimension-filtered queries."""
  self.set_as_privileged_user()
  # id3 checks in under an old mocked time; the other bots use self.now.
  long_ago = datetime.datetime(2009, 1, 2, 3, 4, 5)
  self.mock_now(long_ago)
  _bot_event('request_sleep', bot_id='id3', quarantined=True)
  self.mock_now(self.now)
  _bot_event('request_task', bot_id='id1', task_id='987')
  _bot_event('request_sleep', bot_id='id2', quarantined=True)
  _bot_event('request_sleep', bot_id='id4', maintenance_msg='very busy')

  def dim(key, value):
    # Shorthand for a single dimension filter.
    return swarming_pb2.StringPair(key=key, value=value)

  def _verify(dimensions,
              count=0,
              quarantined=0,
              maintenance=0,
              dead=0,
              busy=0):
    # Any counter that is not passed explicitly is expected to be zero.
    response = self.post_prpc(
        'CountBots', swarming_pb2.BotsCountRequest(dimensions=dimensions))
    counts = swarming_pb2.BotsCount()
    _decode(response.body, counts)
    wanted = (
        ('count', count),
        ('quarantined', quarantined),
        ('maintenance', maintenance),
        ('dead', dead),
        ('busy', busy),
    )
    for field, want in wanted:
      self.assertEqual(want, getattr(counts, field))

  # Before the cron job runs, no bot is counted as dead.
  _verify(dimensions=[],
          count=4,
          quarantined=2,
          maintenance=1,
          dead=0,
          busy=4)
  self.assertEqual(1, bot_management.cron_update_bot_info())
  # After the cron job updated one bot, one bot is counted as dead.
  _verify(dimensions=[],
          count=4,
          quarantined=2,
          maintenance=1,
          dead=1,
          busy=4)
  _verify(dimensions=[dim('pool', 'default'), dim('id', 'id1')],
          count=1,
          busy=1)
  # An OR filter on id matches both id1 and id2.
  _verify(dimensions=[dim('id', 'id1|id2')],
          count=2,
          quarantined=1,
          maintenance=0,
          dead=0,
          busy=2)
  _verify(dimensions=[dim('pool', 'default'), dim('id', 'id3')],
          count=1,
          quarantined=1,
          dead=1,
          busy=1)
  _verify(dimensions=[dim('pool', 'default'), dim('id', 'id4')],
          busy=1,
          maintenance=1,
          count=1)
  # An unknown dimension matches nothing, so every counter stays at zero.
  _verify(dimensions=[dim('non', 'existing')])
def test_count_bad_request(self):
  """CountBots answers 400 for a dimension whose value is empty."""
  self.set_as_privileged_user()
  empty_value = swarming_pb2.StringPair(key='a', value='')
  bad_request = swarming_pb2.BotsCountRequest(dimensions=[empty_value])
  response = self.post_prpc('CountBots', bad_request, expect_errors=True)
  self.assertEqual(response.status, '400 Bad Request')
@parameterized.expand([
    ('ListBots', swarming_pb2.BotsRequest(limit=10)),
    ('CountBots', swarming_pb2.BotsCountRequest()),
])
def test_ok_realm(self, rpc, request):
  """A user holding swarming.pools.listBots may query the default pool."""
  # non-privileged user, with realm permission.
  self.set_as_user()
  granted = [auth.Permission('swarming.pools.listBots')]
  self.mock_auth_db(granted)
  default_pool = swarming_pb2.StringPair(key='pool', value='default')
  request.dimensions.extend([default_pool])
  # post_prpc is called without expect_errors, so the call must succeed.
  self.post_prpc(rpc, request)
@parameterized.expand([
    ('ListBots', swarming_pb2.BotsRequest(limit=10)),
    ('CountBots', swarming_pb2.BotsCountRequest()),
])
def test_forbidden_realm(self, rpc, request):
  """A user without permissions gets 403 for every pool filter variant.

  Three cases are exercised on the same request object: no pool dimension,
  a known pool the user has no permission on, and an unknown pool. The
  status of every response is checked on that response itself.
  """
  # non-privileged user, with no permissions.
  self.mock_auth_db([])
  self.set_as_user()

  # the user needs to specify a pool dimension.
  response = self.post_prpc(rpc, request, expect_errors=True)
  self.assertEqual(response.status, '403 Forbidden')

  # the user needs to have permissions of the requested pools.
  request.dimensions.extend(
      [swarming_pb2.StringPair(key='pool', value='default')])
  response = self.post_prpc(rpc, request, expect_errors=True)
  self.assertEqual(response.status, '403 Forbidden')
  expected = cgi.escape(
      'user "user@example.com" does not have permission '
      '"swarming.pools.listBots"',
      quote=True)
  self.assertEqual(expected, response.body)

  # the user needs to specify known pools.
  request.ClearField('dimensions')
  request.dimensions.extend(
      [swarming_pb2.StringPair(key='pool', value='unknown')])
  response = self.post_prpc(rpc, request, expect_errors=True)
  self.assertEqual(response.status, '403 Forbidden')
  expected = cgi.escape('No such pool or no permission to use it: unknown',
                        quote=True)
  self.assertEqual(expected, response.body)
class TaskServicePrpcTest(PrpcTest):
def setUp(self):
  """Prepares the fixture shared by the swarming.v2.Tasks tests.

  Runs the parent setUp, selects the Tasks service name, mocks the current
  time to 2010-01-02T03:04:05 and installs the pool ACL, task queue and
  token server mocks.
  """
  super(TaskServicePrpcTest, self).setUp()
  self.service = "swarming.v2.Tasks"
  # Tests compare timestamps against self.now.
  self.now = datetime.datetime(2010, 1, 2, 3, 4, 5)
  self.mock_now(self.now)
  self.mock_default_pool_acl([])
  self.mock_tq_tasks()
  # Make service_accounts report that a token server is available.
  self.mock(service_accounts, 'has_token_server', lambda: True)
def test_result_unknown(self):
  """GetResult answers 404 when the task ID does not exist."""
  self.set_as_privileged_user()
  unknown_task = swarming_pb2.TaskIdWithPerfRequest(task_id='12310')
  response = self.post_prpc('GetResult', unknown_task, expect_errors=True)
  self.assertEqual(response.status, '404 Not Found')
def test_result_long(self):
  """GetResult answers 400 when the task ID is far too long."""
  self.set_as_privileged_user()
  # Ten repetitions of '12310' give a 50 character ID.
  oversized_id = '12310' * 10
  request = swarming_pb2.TaskIdWithPerfRequest(task_id=oversized_id)
  response = self.post_prpc('GetResult', request, expect_errors=True)
  self.assertEqual(response.status, '400 Bad Request')
def test_result_ok(self):
  """GetResult returns the pending summary, then the run result once started."""
  self.mock(random, 'getrandbits', lambda _: 0x88)
  self.set_as_bot()
  self.bot_poll()

  # Step 1: the task was created and is still pending.
  self.set_as_user()
  _, task_id = self.client_create_task_raw()
  summary_request = swarming_pb2.TaskIdWithPerfRequest(task_id=task_id)
  response = self.post_prpc('GetResult', summary_request)
  want_pending = swarming_pb2.TaskResultResponse()
  want_pending.task_id = '5cee488008810'
  want_pending.name = 'job1'
  want_pending.user = 'joe@localhost'
  want_pending.state = swarming_pb2.TaskState.PENDING
  want_pending.failure = False
  want_pending.internal_failure = False
  want_pending.server_versions.extend(['v1a'])
  want_pending.tags.extend([
      u'a:tag',
      u'authenticated:user:user@example.com',
      u'os:Amiga',
      u'pool:default',
      u'priority:20',
      u'realm:none',
      u'service_account:none',
      u'swarming.pool.template:none',
      u'swarming.pool.version:pools_cfg_rev',
      u'user:joe@localhost',
  ])
  for stamp in (want_pending.created_ts, want_pending.modified_ts):
    stamp.FromDatetime(self.now)
  got_pending = swarming_pb2.TaskResultResponse()
  _decode(response.body, got_pending)
  self.assertEqual(want_pending, got_pending)

  # Step 2: no bot has started the task, so the run ID is not found yet.
  run_id = task_id[:-1] + '1'
  run_request = swarming_pb2.TaskIdWithPerfRequest(task_id=run_id)
  response = self.post_prpc('GetResult', run_request, expect_errors=True)
  self.assertEqual(response.status, '404 Not Found')

  # Step 3: after the bot polls, the run result is served.
  self.set_as_bot()
  self.bot_poll()
  self.set_as_user()
  response = self.post_prpc(
      'GetResult', swarming_pb2.TaskIdWithPerfRequest(task_id=run_id))
  want_run = swarming_pb2.TaskResultResponse()
  # Defaults go first so that the explicit fields below take precedence.
  self.apply_defaults_for_run_result(want_run)
  want_run.current_task_slice = 0
  for stamp in (want_run.bot_idle_since_ts, want_run.created_ts,
                want_run.modified_ts, want_run.started_ts):
    stamp.FromDatetime(self.now)
  got_run = swarming_pb2.TaskResultResponse()
  _decode(response.body, got_run)
  self.assertEqual(want_run, got_run)
def test_result_completed_task(self):
  """Tests that completed tasks are correctly reported."""
  # Register the bot, create a task as a user, then let the bot run it.
  self.set_as_bot()
  self.bot_poll()
  self.set_as_user()
  self.client_create_task_raw()
  self.set_as_bot()
  task_id = self.bot_run_task()
  # Ask for the result together with its performance stats.
  self.set_as_user()
  response = self.post_prpc(
      'GetResult',
      swarming_pb2.TaskIdWithPerfRequest(task_id=task_id,
                                         include_performance_stats=True))
  expected = swarming_pb2.TaskResultResponse()
  self.apply_defaults_for_run_result(expected)
  self.gen_perf_stats_prpc(expected.performance_stats)
  expected.bot_idle_since_ts.FromDatetime(self.now)
  expected.completed_ts.FromDatetime(self.now)
  expected.modified_ts.FromDatetime(self.now)
  expected.created_ts.FromDatetime(self.now)
  expected.started_ts.FromDatetime(self.now)
  expected.state = swarming_pb2.COMPLETED
  expected.costs_usd[:] = [0.1]
  # The ID returned by bot_run_task is expected as both run_id and task_id.
  expected.run_id = task_id
  expected.task_id = task_id
  expected.duration = 0.1
  actual = swarming_pb2.TaskResultResponse()
  _decode(response.body, actual)
  self.assertEqual(expected, actual)
def test_request_unknown(self):
  """GetRequest answers 404 when the task ID does not exist."""
  self.set_as_user()
  unknown_task = swarming_pb2.TaskIdRequest(task_id='12310')
  response = self.post_prpc('GetRequest', unknown_task, expect_errors=True)
  self.assertEqual(response.status, '404 Not Found')
def test_request_ok(self):
  """GetRequest returns the stored request, with secret bytes redacted."""
  self.set_as_user()
  self.mock(random, 'getrandbits', lambda _: 0x88)
  self.mock_default_pool_acl(['service-account@example.com'])
  self.mock_auth_db([
      auth.Permission('swarming.pools.createTask'),
      auth.Permission('swarming.tasks.createInRealm'),
  ])
  _, task_id = self.client_create_task_raw(
      properties={'secret_bytes': 'zekret'},
      service_account='service-account@example.com',
      realm='test:task_realm')

  # The submitted secret is expected back as a fixed placeholder.
  want_props = swarming_pb2.TaskProperties()
  apply_default_for_task_props(want_props)
  want_props.secret_bytes = b'<REDACTED>'
  want_props.command[:] = ['python', 'run_test.py']
  only_slice = swarming_pb2.TaskSlice(expiration_secs=86400,
                                      properties=want_props,
                                      wait_for_capacity=False)
  want = swarming_pb2.TaskRequestResponse(properties=want_props,
                                          task_slices=[only_slice])
  # Defaults go first so that the explicit fields below take precedence.
  apply_defaults_for_request(want)
  want.realm = 'test:task_realm'
  want.service_account = 'service-account@example.com'
  want.expiration_secs = 86400
  want.bot_ping_tolerance_secs = 600
  want.created_ts.FromDatetime(self.now)
  want.tags[:] = [
      u'a:tag',
      u'authenticated:user:user@example.com',
      u'os:Amiga',
      u'pool:default',
      u'priority:20',
      u'realm:test:task_realm',
      u'service_account:service-account@example.com',
      u'swarming.pool.template:none',
      u'swarming.pool.version:pools_cfg_rev',
      u'user:joe@localhost',
  ]

  lookup = swarming_pb2.TaskIdRequest(task_id=task_id)
  response = self.post_prpc('GetRequest', lookup)
  got = swarming_pb2.TaskRequestResponse()
  _decode(response.body, got)
  self.assertEqual(want, got)
def test_cancel_pending(self):
  """Cancelling a pending task succeeds and the task ends up CANCELED."""
  self.mock(random, 'getrandbits', lambda _: 0x88)
  self.set_as_bot()
  self.bot_poll()

  # Create and cancel a task as a non-privileged user.
  self.set_as_user()
  _, task_id = self.client_create_task_raw(
      pubsub_topic='projects/abc/topics/def', pubsub_userdata='blah')
  cancel_request = swarming_pb2.TaskCancelRequest(task_id=task_id,
                                                  kill_running=False)
  response = self.post_prpc('CancelTask', cancel_request)
  cancel_result = swarming_pb2.CancelResponse()
  _decode(response.body, cancel_result)
  self.assertEqual(
      swarming_pb2.CancelResponse(canceled=True, was_running=False),
      cancel_result)

  # determine that the task's state updates correctly
  now_ts = message_conversion_prpc.date(self.now)
  want_tags = [
      'a:tag',
      'authenticated:user:user@example.com',
      'os:Amiga',
      'pool:default',
      'priority:20',
      'realm:none',
      'service_account:none',
      'swarming.pool.template:none',
      'swarming.pool.version:pools_cfg_rev',
      'user:joe@localhost',
  ]
  want_result = swarming_pb2.TaskResultResponse(
      abandoned_ts=now_ts,
      completed_ts=now_ts,
      created_ts=now_ts,
      current_task_slice=0,
      failure=False,
      internal_failure=False,
      modified_ts=now_ts,
      name='job1',
      server_versions=['v1a'],
      state='CANCELED',
      tags=want_tags,
      task_id=task_id,
      user=u'joe@localhost',
  )
  result_request = swarming_pb2.TaskIdWithPerfRequest(
      task_id=task_id, include_performance_stats=False)
  response = self.post_prpc('GetResult', result_request)
  got_result = swarming_pb2.TaskResultResponse()
  _decode(response.body, got_result)
  self.assertEqual(want_result, got_result)
def test_cancel_forbidden(self):
  """A non-privileged user cannot cancel a task created by an admin."""
  self.mock(random, 'getrandbits', lambda _: 0x88)

  # The admin owns the task.
  self.set_as_admin()
  _, task_id = self.client_create_task_raw(
      pubsub_topic='projects/abc/topics/def', pubsub_userdata='blah')

  # A plain user with an empty auth DB is refused with HTTP 403.
  self.set_as_user()
  self.mock_auth_db([])
  cancel_request = swarming_pb2.TaskCancelRequest(task_id=task_id,
                                                  kill_running=False)
  response = self.post_prpc('CancelTask', cancel_request, expect_errors=True)
  self.assertEqual(response.status, '403 Forbidden')
def test_cancel_with_realm_permission(self):
  """CancelTask honors pool-level and task-level realm permissions."""
  # someone creates tasks with/without realm.
  self.set_as_privileged_user()
  self.mock_auth_db([
      auth.Permission('swarming.pools.createTask'),
      auth.Permission('swarming.tasks.createInRealm'),
  ])
  _, realm_task = self.client_create_task_raw(realm='test:task_realm')
  _, plain_task = self.client_create_task_raw(realm=None)

  def cancel(task_id, **post_kwargs):
    # Issues CancelTask for task_id without killing a running task.
    request = swarming_pb2.TaskCancelRequest(task_id=task_id,
                                             kill_running=False)
    return self.post_prpc('CancelTask', request, **post_kwargs)

  def assertTaskIsNotAccessible(task_id):
    response = cancel(task_id, expect_errors=True)
    self.assertEqual(response.status, '403 Forbidden')
    message = 'Task "%s" is not accessible' % task_id
    self.assertEqual(cgi.escape(message, quote=True), response.body)

  # non-privileged user can't cancel either task without permission.
  self.set_as_user()
  for task_id in (realm_task, plain_task):
    assertTaskIsNotAccessible(task_id)

  # the user can cancel both tasks with swarming.pools.cancelTask
  # permission.
  self.mock_auth_db([auth.Permission('swarming.pools.cancelTask')])
  cancel(realm_task)
  cancel(plain_task)

  # the user can cancel with swarming.tasks.cancel permission.
  self.mock_auth_db([auth.Permission('swarming.tasks.cancel')])
  cancel(realm_task)
  # but the task without realm is not accessible.
  assertTaskIsNotAccessible(plain_task)
def test_cancel_running(self):
  """Walks a running task through cancellation until it is reported KILLED.

  The bot reports output three times. The CancelTask calls and GetResult
  checks are interleaved with those updates, so the statement order matters.
  """
  self.mock(random, 'getrandbits', lambda _: 0x88)
  self.set_as_bot()
  self.bot_poll()
  self.set_as_user()
  _, task_id = self.client_create_task_raw(properties=dict(
      command=['python', 'runtest.py']))
  # The bot picks up the task; the manifest carries the run ID.
  self.set_as_bot()
  params = self.do_handshake()
  data = self.post_json('/swarming/api/v1/bot/poll', params)
  run_id = data['manifest']['task_id']

  def _params(**kwargs):
    # Payload for the bot task_update endpoint; kwargs override the defaults.
    out = {
        'cost_usd': 0.1,
        'duration': None,
        'exit_code': None,
        'id': 'bot1',
        'output': None,
        'output_chunk_start': 0,
        'task_id': run_id,
    }
    out.update(**kwargs)
    return out

  # First update: the task is running and nothing asks it to stop.
  self.set_as_bot()
  params = _params(output=base64.b64encode('Oh '))
  response = self.post_json('/swarming/api/v1/bot/task_update', params)
  self.assertEqual({u'must_stop': False, u'ok': True}, response)
  self.set_as_user()
  expected = swarming_pb2.TaskResultResponse()
  self.apply_defaults_for_result_summary(expected)
  expected.bot_idle_since_ts.FromDatetime(self.now)
  expected.costs_usd[:] = [0.1]
  expected.created_ts.FromDatetime(self.now)
  expected.modified_ts.FromDatetime(self.now)
  expected.started_ts.FromDatetime(self.now)
  expected.state = swarming_pb2.TaskState.RUNNING
  expected.tags[:] = [
      u'a:tag', u'authenticated:user:user@example.com', u'os:Amiga',
      u'pool:default', u'priority:20', u'realm:none', u'service_account:none',
      u'swarming.pool.template:none', u'swarming.pool.version:pools_cfg_rev',
      u'user:joe@localhost'
  ]
  actual = swarming_pb2.TaskResultResponse()
  response = self.post_prpc(
      'GetResult',
      swarming_pb2.TaskIdWithPerfRequest(task_id=task_id,
                                         include_performance_stats=False))
  _decode(response.body, actual)
  self.assertEqual(expected, actual)
  # Denied if kill_running == False.
  # NOTE(review): expect_errors=True is passed although the body is decoded
  # as a regular CancelResponse -- confirm whether the flag is needed.
  response = self.post_prpc('CancelTask',
                            swarming_pb2.TaskCancelRequest(
                                task_id=task_id, kill_running=False),
                            expect_errors=True)
  actual = swarming_pb2.CancelResponse()
  _decode(response.body, actual)
  self.assertEqual(
      swarming_pb2.CancelResponse(canceled=False, was_running=True), actual)
  # Works if kill_running == True.
  response = self.post_prpc('CancelTask',
                            swarming_pb2.TaskCancelRequest(task_id=task_id,
                                                           kill_running=True),
                            expect_errors=True)
  actual = swarming_pb2.CancelResponse()
  _decode(response.body, actual)
  self.assertEqual(
      swarming_pb2.CancelResponse(canceled=True, was_running=True), actual)
  # Second update: the bot is now told to stop.
  self.set_as_bot()
  params = _params(output=base64.b64encode('hi'), output_chunk_start=3)
  response = self.post_json('/swarming/api/v1/bot/task_update', params)
  self.assertEqual({u'must_stop': True, u'ok': True}, response)
  # abandoned_ts is set but state isn't changed yet.
  self.set_as_user()
  expected = swarming_pb2.TaskResultResponse()
  self.apply_defaults_for_result_summary(expected)
  expected.abandoned_ts.FromDatetime(self.now)
  expected.bot_idle_since_ts.FromDatetime(self.now)
  expected.costs_usd[:] = [0.1]
  expected.created_ts.FromDatetime(self.now)
  expected.modified_ts.FromDatetime(self.now)
  expected.started_ts.FromDatetime(self.now)
  expected.state = swarming_pb2.TaskState.RUNNING
  expected.tags[:] = [
      'a:tag',
      'authenticated:user:user@example.com',
      'os:Amiga',
      'pool:default',
      'priority:20',
      'realm:none',
      'service_account:none',
      'swarming.pool.template:none',
      'swarming.pool.version:pools_cfg_rev',
      'user:joe@localhost',
  ]
  actual = swarming_pb2.TaskResultResponse()
  response = self.post_prpc(
      'GetResult',
      swarming_pb2.TaskIdWithPerfRequest(task_id=task_id,
                                         include_performance_stats=False))
  _decode(response.body, actual)
  self.assertEqual(expected, actual)
  # Bot terminates the task.
  self.set_as_bot()
  params = _params(output=base64.b64encode(' again'),
                   output_chunk_start=6,
                   duration=0.1,
                   exit_code=0)
  response = self.post_json('/swarming/api/v1/bot/task_update', params)
  self.assertEqual({u'must_stop': True, u'ok': True}, response)
  # The final update carried duration and exit code; the task is now KILLED.
  self.set_as_user()
  expected = swarming_pb2.TaskResultResponse()
  self.apply_defaults_for_result_summary(expected)
  expected.abandoned_ts.FromDatetime(self.now)
  expected.bot_idle_since_ts.FromDatetime(self.now)
  expected.completed_ts.FromDatetime(self.now)
  expected.costs_usd[:] = [0.1]
  expected.created_ts.FromDatetime(self.now)
  expected.duration = 0.1
  expected.exit_code = 0
  expected.modified_ts.FromDatetime(self.now)
  expected.started_ts.FromDatetime(self.now)
  expected.state = swarming_pb2.TaskState.KILLED
  expected.tags[:] = [
      u'a:tag',
      u'authenticated:user:user@example.com',
      u'os:Amiga',
      u'pool:default',
      u'priority:20',
      u'realm:none',
      u'service_account:none',
      u'swarming.pool.template:none',
      u'swarming.pool.version:pools_cfg_rev',
      u'user:joe@localhost',
  ]
  actual = swarming_pb2.TaskResultResponse()
  response = self.post_prpc(
      'GetResult',
      swarming_pb2.TaskIdWithPerfRequest(task_id=task_id,
                                         include_performance_stats=False))
  _decode(response.body, actual)
  self.assertEqual(expected, actual)
def test_cancel_with_too_long_key(self):
  """CancelTask answers 400 when the task ID is far too long."""
  self.set_as_privileged_user()
  # Ten repetitions of '12310' give a 50 character ID.
  oversized_id = '12310' * 10
  request = swarming_pb2.TaskCancelRequest(task_id=oversized_id,
                                           kill_running=False)
  response = self.post_prpc('CancelTask', request, expect_errors=True)
  self.assertEqual(response.status, '400 Bad Request')
def test_stdout_ok(self):
  """Asserts that stdout reports a task's output.

  The full output is fetched through both the summary ID (suffix '0') and
  the run ID (suffix '1'). Two partial fetches then check that offset and
  length are counted in bytes.
  """
  self.set_as_bot()
  self.bot_poll()
  self.set_as_user()
  self.client_create_task_raw()
  # task_id determined by bot run
  self.set_as_bot()
  task_id = self.bot_run_task()
  self.set_as_privileged_user()
  # Derive both forms of the ID from the one the bot returned, so that the
  # loop below really queries two different IDs.
  summary_id = task_id[:-1] + '0'
  run_id = task_id[:-1] + '1'
  expected = swarming_pb2.TaskOutputResponse(
      output=u'rÉsult string'.encode('utf-8'),
      state=swarming_pb2.TaskState.COMPLETED)
  for i in (summary_id, run_id):
    response = self.post_prpc('GetStdout',
                              swarming_pb2.TaskIdWithOffsetRequest(task_id=i))
    actual = swarming_pb2.TaskOutputResponse()
    _decode(response.body, actual)
    self.assertEqual(expected, actual)

  # Partial fetch.
  response = self.post_prpc(
      'GetStdout',
      swarming_pb2.TaskIdWithOffsetRequest(task_id=task_id,
                                           offset=1,
                                           length=2))
  actual = swarming_pb2.TaskOutputResponse()
  _decode(response.body, actual)
  # This is because it's counting in bytes, not in unicode characters:
  expected = swarming_pb2.TaskOutputResponse(
      output=u'É'.encode('utf-8'), state=swarming_pb2.TaskState.COMPLETED)
  self.assertEqual(expected, actual)

  response = self.post_prpc(
      'GetStdout',
      swarming_pb2.TaskIdWithOffsetRequest(task_id=task_id,
                                           offset=3,
                                           length=5))
  actual = swarming_pb2.TaskOutputResponse()
  _decode(response.body, actual)
  expected = swarming_pb2.TaskOutputResponse(
      output=u'sult '.encode('utf-8'), state=swarming_pb2.TaskState.COMPLETED)
  self.assertEqual(expected, actual)
def test_stdout_empty(self):
  """GetStdout for a task that never ran returns no output."""
  self.set_as_user()
  _, task_id = self.client_create_task_raw()
  stdout_request = swarming_pb2.TaskIdWithOffsetRequest(task_id=task_id)
  response = self.post_prpc('GetStdout', stdout_request)
  got = swarming_pb2.TaskOutputResponse()
  _decode(response.body, got)
  # Only the state is set; the output field stays at its default.
  want = swarming_pb2.TaskOutputResponse(
      state=swarming_pb2.TaskState.NO_RESOURCE)
  self.assertEqual(want, got)
def test_result_run_not_found(self):
  """GetStdout answers 404 for the run ID of a task that has not started."""
  self.set_as_user()
  _, summary_id = self.client_create_task_raw()
  # Swap the last character for '1' to address the run.
  run_id = summary_id[:-1] + '1'
  stdout_request = swarming_pb2.TaskIdWithOffsetRequest(task_id=run_id)
  response = self.post_prpc('GetStdout', stdout_request, expect_errors=True)
  self.assertEqual(response.status, '404 Not Found')
def test_task_deduped(self):
  """A second idempotent task reuses the output of the first one."""
  self.set_as_user()
  self.set_as_bot()
  self.bot_poll()

  # first task; created as a user and run by the bot
  self.set_as_user()
  _, first_id = self.client_create_task_raw(
      properties={'idempotent': True})
  self.set_as_bot()
  bot_run_id = self.bot_run_task()
  # The bot reports the run ID: same prefix as the summary ID, suffix '1'.
  self.assertEqual(first_id, bot_run_id[:-1] + '0')
  self.assertEqual('1', bot_run_id[-1:])

  # second task; this one's results should be returned immediately
  self.set_as_user()
  _, second_id = self.client_create_task_raw(
      name='second',
      user='jack@localhost',
      properties={'idempotent': True})
  # The bot gets nothing to run on its next poll.
  self.set_as_bot()
  poll_reply = self.bot_poll()
  self.assertEqual('sleep', poll_reply['cmd'])

  # results shouldn't change, even if the second task wasn't executed
  self.set_as_user()
  stdout_request = swarming_pb2.TaskIdWithOffsetRequest(task_id=second_id)
  response = self.post_prpc('GetStdout', stdout_request)
  got = swarming_pb2.TaskOutputResponse()
  _decode(response.body, got)
  want = swarming_pb2.TaskOutputResponse(
      output=u'rÉsult string'.encode('utf-8'),
      state=swarming_pb2.TaskState.COMPLETED)
  self.assertEqual(want, got)
@parameterized.expand(['GetRequest', 'GetResult', 'GetStdout'])
def test_get_with_realm_permission(self, api):
  """Read RPCs honor pool-level and task-level realm permissions.

  Args:
    api: name of the RPC under test; selects the request message type.
  """
  # someone creates tasks with/without realm.
  self.set_as_privileged_user()
  self.mock_auth_db([
      auth.Permission('swarming.pools.createTask'),
      auth.Permission('swarming.tasks.createInRealm'),
  ])
  _, task_id_with_realm = self.client_create_task_raw(realm='test:task_realm')
  _, task_id_without_realm = self.client_create_task_raw(realm=None)

  def create_request(task_id):
    if api == 'GetRequest':
      return swarming_pb2.TaskIdRequest(task_id=task_id)
    if api == 'GetResult':
      return swarming_pb2.TaskIdWithPerfRequest(task_id=task_id)
    if api == 'GetStdout':
      return swarming_pb2.TaskIdWithOffsetRequest(task_id=task_id)
    raise Exception('Unknown api %s' % api)

  def assertTaskIsNotAccessible(task_id):
    request = create_request(task_id)
    response = self.post_prpc(api, request, expect_errors=True)
    # Check the status as well as the body, like the CancelTask realm test.
    self.assertEqual(response.status, '403 Forbidden')
    self.assertEqual(
        cgi.escape('Task "%s" is not accessible' % task_id, quote=True),
        response.body)

  # non-privileged user can't access either task without permission.
  self.set_as_user()
  assertTaskIsNotAccessible(task_id_with_realm)
  assertTaskIsNotAccessible(task_id_without_realm)

  # the user can access both tasks with swarming.pools.listTasks
  # permission.
  self.mock_auth_db([auth.Permission('swarming.pools.listTasks')])
  self.post_prpc(api, create_request(task_id_with_realm))
  self.post_prpc(api, create_request(task_id_without_realm))

  # the user can access with swarming.tasks.get permission.
  self.mock_auth_db([auth.Permission('swarming.tasks.get')])
  self.post_prpc(api, create_request(task_id_with_realm))
  # but the task with no realm is not accessible.
  assertTaskIsNotAccessible(task_id_without_realm)
def test_new_ok(self):
  """NewTask creates a task and returns its ID."""
  self.mock(random, 'getrandbits', lambda _: 0x88)
  self.set_as_privileged_user()
  new_task = self._new_task_request_prpc(use_default_slice=True)
  response = self.post_prpc('NewTask', new_task)
  metadata = swarming_pb2.TaskRequestMetadataResponse()
  _decode(response.body, metadata)
  self.assertEqual(u'5cee488008810', metadata.task_id)
def test_new_task_performance_stats_are_empty(self):
  """A task that has not run yet carries no performance stats."""
  # first create a new task.
  self.mock(random, 'getrandbits', lambda _: 0x88)
  self.set_as_privileged_user()
  new_task = self._new_task_request_prpc(use_default_slice=True)
  response = self.post_prpc('NewTask', new_task)
  metadata = swarming_pb2.TaskRequestMetadataResponse()
  _decode(response.body, metadata)

  # Stats are requested explicitly, yet the field must stay unset while the
  # task is pending.
  result_request = swarming_pb2.TaskIdWithPerfRequest(
      task_id=metadata.task_id, include_performance_stats=True)
  response = self.post_prpc('GetResult', result_request)
  result = swarming_pb2.TaskResultResponse()
  _decode(response.body, result)
  self.assertEqual(swarming_pb2.PENDING, result.state)
  self.assertFalse(result.HasField('performance_stats'))
def fail_on_using_legacy_acls(self):
  """Replaces both legacy ACL checks with a stub that raises when called."""

  def forbidden_call(*_args, **_kwargs):
    raise AssertionError('Must not be called')

  legacy_checks = (
      'check_schedule_request_acl_caller',
      'check_schedule_request_acl_service_account',
  )
  for check_name in legacy_checks:
    self.mock(task_scheduler, check_name, forbidden_call)
def test_new_ok_in_realms_mode(self):
  """NewTask with an explicit realm stores realm and service account."""
  self.set_as_user()
  self.mock(random, 'getrandbits', lambda _: 0x88)
  self.mock(service_accounts, 'has_token_server', lambda: True)
  self.mock_auth_db([
      auth.Permission('swarming.pools.createTask'),
      auth.Permission('swarming.tasks.createInRealm'),
  ])
  # Legacy pool ACLs must not be used in this mode, only realm ACLs are used.
  self.fail_on_using_legacy_acls()

  new_task = self._new_task_request_prpc()
  props = new_task.properties
  props.command.extend([u'echo', u'hi'])
  props.dimensions.extend(
      [swarming_pb2.StringPair(key=u'pool', value=u'default')])
  props.execution_timeout_secs = 30
  new_task.service_account = 'service-account@example.com'
  new_task.realm = 'test:task_realm'

  response = self.post_prpc('NewTask', new_task)
  metadata = swarming_pb2.TaskRequestMetadataResponse()
  _decode(response.body, metadata)
  self.assertEqual(u'5cee488008810', metadata.task_id)

  # Load the stored TaskRequest entity and check what was recorded on it.
  request_key, _ = task_pack.get_request_and_result_keys(metadata.task_id)
  stored = request_key.get()
  self.assertEqual('test:task_realm', stored.realm)
  self.assertEqual('service-account@example.com', stored.service_account)
def test_new_ok_with_default_task_realm_not_enforced(self):
  """NewTask falls back to the pool's default realm when none is enforced."""
  self.set_as_user()
  self.mock(random, 'getrandbits', lambda _: 0x88)
  self.mock_default_pool_acl(service_accounts=[],
                             default_task_realm='test:task_realm',
                             enforced_realm_permissions=None)

  # Realm ACLs should be ignored. Note that they are still called to record
  # dry run check results.
  def deny_all(*_args, **_kwargs):
    return False

  self.mock(auth, 'has_permission', deny_all)

  new_task = self._new_task_request_prpc()
  props = new_task.properties
  props.command.extend([u'echo', u'hi'])
  props.dimensions.extend(
      [swarming_pb2.StringPair(key=u'pool', value=u'default')])
  props.execution_timeout_secs = 30

  response = self.post_prpc('NewTask', new_task)
  metadata = swarming_pb2.TaskRequestMetadataResponse()
  _decode(response.body, metadata)
  self.assertEqual(u'5cee488008810', metadata.task_id)

  # The stored TaskRequest must carry the pool's default realm.
  request_key, _ = task_pack.get_request_and_result_keys(metadata.task_id)
  stored = request_key.get()
  self.assertEqual('test:task_realm', stored.realm)
def test_new_ok_with_default_task_realm_enforced(self):
  """NewTask uses the pool's default realm with enforced realm permissions."""
  self.set_as_user()
  self.mock(random, 'getrandbits', lambda _: 0x88)
  self.mock(service_accounts, 'has_token_server', lambda: True)
  self.mock_auth_db([
      auth.Permission('swarming.pools.createTask'),
      auth.Permission('swarming.tasks.createInRealm'),
  ])
  enforced = {
      realms_pb2.REALM_PERMISSION_POOLS_CREATE_TASK,
      realms_pb2.REALM_PERMISSION_TASKS_CREATE_IN_REALM,
      realms_pb2.REALM_PERMISSION_TASKS_ACT_AS,
  }
  self.mock_default_pool_acl(
      service_accounts=['service-account@example.com'],
      default_task_realm='test:task_realm',
      enforced_realm_permissions=enforced)
  # Legacy pool ACLs must not be used in this mode, only realm ACLs are used.
  self.fail_on_using_legacy_acls()

  # The request names a service account but no realm.
  new_task = self._new_task_request_prpc()
  props = new_task.properties
  props.command.extend([u'echo', u'hi'])
  props.dimensions.extend(
      [swarming_pb2.StringPair(key=u'pool', value=u'default')])
  props.execution_timeout_secs = 30
  new_task.service_account = 'service-account@example.com'

  response = self.post_prpc('NewTask', new_task)
  metadata = swarming_pb2.TaskRequestMetadataResponse()
  _decode(response.body, metadata)
  self.assertEqual(u'5cee488008810', metadata.task_id)

  # Load the stored TaskRequest entity and check what was recorded on it.
  request_key, _ = task_pack.get_request_and_result_keys(metadata.task_id)
  stored = request_key.get()
  self.assertEqual('test:task_realm', stored.realm)
  self.assertEqual('service-account@example.com', stored.service_account)
def test_new_invalid_realm(self):
  """NewTask rejects a realm that is not of the form <project>:<name>."""
  self.set_as_user()
  new_task = self._new_task_request_prpc()
  props = new_task.properties
  props.command.extend([u'echo', u'hi'])
  props.dimensions.extend(
      [swarming_pb2.StringPair(key=u'pool', value=u'default')])
  props.execution_timeout_secs = 30
  new_task.service_account = 'service-account@example.com'
  # A slash is used where the expected format has a colon.
  new_task.realm = 'test/invalid'

  response = self.post_prpc('NewTask', new_task, expect_errors=True)
  # The error body comes back HTML-escaped.
  want_body = ("Bad realm u'test/invalid', want "
               "&quot;&lt;project&gt;:&lt;name&gt;&quot;")
  self.assertEqual(want_body, response.body)
@ndb.tasklet
def _updateToken(self, *_args, **_kwargs):
  """Tasklet stub that ignores its arguments and returns 'update-token'.

  The resultdb tests in this class pass it as the side_effect when patching
  server.resultdb.create_invocation_async.
  """
  raise ndb.Return('update-token')
def test_new_ok_with_resultdb_and_realm(self):
  """A task with ResultDB enabled and a realm records the invocation name."""
  self.set_as_user()
  self.mock(random, 'getrandbits', lambda _: 0x88)
  self.mock_auth_db([
      auth.Permission('swarming.pools.createTask'),
      auth.Permission('swarming.tasks.createInRealm'),
  ])
  # create_invocation_async is replaced by the _updateToken stub.
  with mock.patch('server.resultdb.create_invocation_async',
                  side_effect=self._updateToken):
    new_task = self._new_task_request_prpc()
    # Expiration is set on the slice below instead of on the request.
    new_task.ClearField('expiration_secs')
    only_slice = swarming_pb2.TaskSlice(expiration_secs=180,
                                        wait_for_capacity=True)
    apply_default_for_task_props(only_slice.properties)
    only_slice.properties.command[:] = ['python', 'run_test.py']
    new_task.task_slices.extend([only_slice])
    new_task.realm = 'test:task_realm'
    new_task.resultdb.enable = True
    _, task_id = self._client_create_task_prpc(new_task)

  # Get the produced TaskRequest, and verify the realm and resultdb config.
  request_key, summary_key = task_pack.get_request_and_result_keys(task_id)
  stored_request = request_key.get()
  stored_summary = summary_key.get()
  self.assertEqual('test:task_realm', stored_request.realm)
  self.assertEqual('invocations/task-test-swarming.appspot.com-5cee488008811',
                   stored_summary.resultdb_info.invocation)
def test_new_ok_with_resultdb_and_default_task_realm(self):
  """ResultDB-enabled task creation in a pool that has a default task realm."""
  self.set_as_user()
  self.mock(random, 'getrandbits', lambda _: 0x88)
  self.mock_auth_db([
      auth.Permission('swarming.pools.createTask'),
      auth.Permission('swarming.tasks.createInRealm'),
  ])
  enforced = {
      realms_pb2.REALM_PERMISSION_POOLS_CREATE_TASK,
      realms_pb2.REALM_PERMISSION_TASKS_CREATE_IN_REALM,
      realms_pb2.REALM_PERMISSION_TASKS_ACT_AS,
  }
  self.mock_default_pool_acl(
      service_accounts=['service-account@example.com'],
      default_task_realm='test:task_realm',
      enforced_realm_permissions=enforced)
  # create_invocation_async is replaced by the _updateToken stub.
  with mock.patch('server.resultdb.create_invocation_async',
                  side_effect=self._updateToken):
    new_task = self._new_task_request_prpc()
    # Expiration is set on the slice below instead of on the request.
    new_task.ClearField('expiration_secs')
    only_slice = swarming_pb2.TaskSlice(expiration_secs=180,
                                        wait_for_capacity=True)
    apply_default_for_task_props(only_slice.properties)
    only_slice.properties.command[:] = ['python', 'run_test.py']
    new_task.task_slices.extend([only_slice])
    # NOTE(review): the realm is set explicitly here, so the pool's default
    # realm may never be exercised -- confirm this is intended.
    new_task.realm = 'test:task_realm'
    new_task.resultdb.enable = True
    _, task_id = self._client_create_task_prpc(new_task)

  # Get the produced TaskRequest, and verify the realm and resultdb config.
  request_key, summary_key = task_pack.get_request_and_result_keys(task_id)
  stored_request = request_key.get()
  stored_summary = summary_key.get()
  self.assertEqual('test:task_realm', stored_request.realm)
  self.assertEqual('invocations/task-test-swarming.appspot.com-5cee488008811',
                   stored_summary.resultdb_info.invocation)
def _prepare_mass_cancel(self):
  """Creates one completed, one running and one pending task.

  Returns:
    Tuple (running_id, pending_id, now_120), where now_120 is the value
    returned by mock_now for the pending task's creation time.
  """
  self.mock(random, 'getrandbits', lambda _: 0x88)
  self.set_as_bot()
  self.do_handshake(do_first_poll=True)

  def build_request(name, tags, user=None):
    # Default-slice request; the user is only set when one is given.
    request = self._new_task_request_prpc(use_default_slice=True)
    request.name = name
    if user is not None:
      request.user = user
    request.tags[:] = tags
    return request

  # Completed: created at self.now and run to the end by the bot.
  self.set_as_user()
  first = build_request("first", ['project:yay', 'commit:abcd', 'os:Win'])
  _, _ = self._client_create_task_prpc(first)
  self.set_as_bot()
  self.bot_run_task()

  # Running: created 60 seconds later and picked up by the bot's poll.
  self.set_as_user()
  self.mock_now(self.now, 60)
  second = build_request('second', ['project:yay', 'commit:efgh', 'os:Win'],
                         user='jack@localhost')
  _, running_id = self._client_create_task_prpc(second)
  self.set_as_bot()
  self.bot_poll()

  # Pending: created 120 seconds after self.now; the bot does not poll again.
  self.set_as_user()
  now_120 = self.mock_now(self.now, 120)
  third = build_request('third', ['project:yay', 'commit:ijkhl', 'os:Linux'],
                        user='jack@localhost')
  _, pending_id = self._client_create_task_prpc(third)
  return running_id, pending_id, now_120
def test_mass_cancel_pending(self):
  """CancelTasks without kill_running matches only the pending task."""
  _, pending_id, now_120 = self._prepare_mass_cancel()
  # NOTE: the enqueued payload carries kill_running=True even though the
  # request below leaves kill_running unset.
  expected_payload = {'tasks': [pending_id], 'kill_running': True}

  def fake_enqueue_task(url, name, payload):
    # Stand-in for utils.enqueue_task that checks the cancellation request.
    self.assertEqual('/internal/taskqueue/important/tasks/cancel', url)
    self.assertEqual('cancel-tasks', name)
    self.assertEqual(expected_payload, json.loads(payload))
    return True

  self.mock(utils, 'enqueue_task', fake_enqueue_task)

  self.set_as_admin()
  # Query window covering all three task creation times, with a 1s margin.
  margin = datetime.timedelta(seconds=1)
  window_start = message_conversion_prpc.date(self.now - margin)
  window_end = message_conversion_prpc.date(now_120 + margin)
  tcr = swarming_pb2.TasksCancelRequest(
      limit=100,
      tags=['project:yay'],
      start=window_start,
      end=window_end,
  )
  resp = self.post_prpc('CancelTasks', tcr)

  actual = swarming_pb2.TasksCancelResponse()
  _decode(resp.body, actual)
  expected = swarming_pb2.TasksCancelResponse(
      matched=1, now=message_conversion_prpc.date(now_120))
  self.assertEqual(expected, actual)
def test_mass_cancel_running(self):
  """CancelTasks with kill_running=True matches pending and running tasks."""
  running_id, pending_id, now_120 = self._prepare_mass_cancel()
  expected_payload = {
      'tasks': [pending_id, running_id],
      'kill_running': True,
  }

  def fake_enqueue_task(url, name, payload):
    # Stand-in for utils.enqueue_task that checks the cancellation request.
    self.assertEqual('/internal/taskqueue/important/tasks/cancel', url)
    self.assertEqual('cancel-tasks', name)
    self.assertEqual(expected_payload, json.loads(payload))
    return True

  self.mock(utils, 'enqueue_task', fake_enqueue_task)

  self.set_as_admin()
  # Query window covering all three task creation times, with a 1s margin.
  margin = datetime.timedelta(seconds=1)
  window_start = message_conversion_prpc.date(self.now - margin)
  window_end = message_conversion_prpc.date(now_120 + margin)
  tcr = swarming_pb2.TasksCancelRequest(
      limit=100,
      tags=['project:yay'],
      kill_running=True,
      start=window_start,
      end=window_end,
  )
  resp = self.post_prpc('CancelTasks', tcr)

  actual = swarming_pb2.TasksCancelResponse()
  _decode(resp.body, actual)
  expected = swarming_pb2.TasksCancelResponse(
      matched=2, now=message_conversion_prpc.date(now_120))
  self.assertEqual(expected, actual)
def test_mass_cancel_realm_permission(self):
  """CancelTasks needs a pool filter and the pool's cancelTask permission."""

  def _assert_forbidden(request, expected_body):
    # Posts CancelTasks and expects a 403 carrying exactly expected_body.
    resp = self.post_prpc('CancelTasks', request, expect_errors=True)
    self.assertEqual(resp.status, '403 Forbidden')
    self.assertEqual(expected_body, resp.body)

  # Non-privileged user without realm permission.
  self.set_as_user()
  self.mock_auth_db([])

  # The user needs to specify a pool filter.
  tcr = swarming_pb2.TasksCancelRequest(
      limit=100,
      tags=['project:yay'],
      kill_running=True,
  )
  _assert_forbidden(tcr, u'No pool is specified')

  # The user can't access the tasks without permission.
  tcr.tags[:] = ['pool:default']
  _assert_forbidden(
      tcr,
      cgi.escape((u'user "user@example.com" does not have '
                  u'permission "swarming.pools.cancelTask"'),
                 quote=True))

  # Give permission to the user.
  self.mock_auth_db([auth.Permission('swarming.pools.cancelTask')])

  # The user still needs to specify a pool filter.
  tcr.tags[:] = ['foo:bar']
  _assert_forbidden(tcr, u'No pool is specified')

  # OK if the user has a permission of the specified pool.
  tcr.tags[:] = ['pool:default']
  self.post_prpc('CancelTasks', tcr)
def _gen_two_tasks(self):
  """Creates two idempotent tasks and the result protos expected for them.

  The first task is run by the bot; the second is created 60 seconds later
  and is expected to be deduped from the first one's run.

  Returns:
    Tuple (first, second, now_120, start, end): the expected
    TaskResultResponse for each task, the mocked time written to the first
    task's modified_ts, and self.now -/+ one day converted with
    message_conversion_prpc.date().
  """
  self.mock_now(self.now)
  self.mock(random, 'getrandbits', lambda _: 0x88)
  self.set_as_bot()
  self.bot_poll()

  # First task: created by the user and run by the bot.
  self.set_as_user()
  first_req = self._new_task_request_prpc(use_default_slice=True)
  first_req.name = "first"
  first_req.tags[:] = ['project:yay', 'commit:post', 'pool:default']
  first_req.task_slices[0].properties.idempotent = True
  first_id = self._client_create_task_prpc(first_req)[1]
  self.set_as_bot()
  self.bot_run_task()

  # Hack the datastore so MODIFIED_TS returns in backward order compared to
  # CREATED_TS.
  now_120 = self.mock_now(self.now, 120)
  summary = task_pack.unpack_result_summary_key(first_id).get()
  summary.modified_ts = now_120
  summary.put()

  # Second task: same properties, created 60 seconds after the first.
  self.set_as_user()
  self.mock(random, 'getrandbits', lambda _: 0x66)
  now_60 = self.mock_now(self.now, 60)
  second_req = self._new_task_request_prpc(use_default_slice=True)
  second_req.name = 'second'
  second_req.user = 'jack@localhost'
  second_req.tags[:] = ['project:yay', 'commit:pre', 'pool:default']
  second_req.task_slices[0].properties.idempotent = True
  second_id = self._client_create_task_prpc(second_req)[1]

  # Expected result of the first task.
  first = swarming_pb2.TaskResultResponse()
  self.apply_defaults_for_result_summary(first)
  first.task_id = first_id
  first.name = "first"
  first.user = 'joe@localhost'
  first.bot_idle_since_ts.FromDatetime(self.now)
  first.created_ts.FromDatetime(self.now)
  first.started_ts.FromDatetime(self.now)
  first.completed_ts.FromDatetime(self.now)
  first.modified_ts.FromDatetime(now_120)
  first.costs_usd[:] = [0.1]
  first.duration = 0.1
  first.exit_code = 0
  self.gen_perf_stats_prpc(first.performance_stats)
  first.tags[:] = [
      u'authenticated:user:user@example.com',
      u'commit:post',
      u'os:Amiga',
      u'pool:default',
      u'priority:20',
      u'project:yay',
      u'realm:none',
      u'service_account:none',
      u'swarming.pool.template:none',
      u'swarming.pool.version:pools_cfg_rev',
      u'user:joe@localhost',
  ]

  # Expected result of the second task, deduped from the first one's run.
  second = swarming_pb2.TaskResultResponse()
  self.apply_defaults_for_result_summary(second)
  second.task_id = second_id
  second.name = "second"
  second.user = 'jack@localhost'
  second.bot_idle_since_ts.FromDatetime(self.now)
  second.created_ts.FromDatetime(now_60)
  second.started_ts.FromDatetime(self.now)
  second.completed_ts.FromDatetime(self.now)
  second.modified_ts.FromDatetime(now_60)
  second.deduped_from = '5cee488008811'
  second.run_id = '5cee488008811'
  second.cost_saved_usd = 0.1
  second.duration = 0.1
  second.exit_code = 0
  second.tags[:] = [
      u'authenticated:user:user@example.com',
      u'commit:pre',
      u'os:Amiga',
      u'pool:default',
      u'priority:20',
      u'project:yay',
      u'realm:none',
      u'service_account:none',
      u'swarming.pool.template:none',
      u'swarming.pool.version:pools_cfg_rev',
      u'user:jack@localhost',
  ]

  one_day = datetime.timedelta(days=1)
  start = message_conversion_prpc.date(self.now - one_day)
  end = message_conversion_prpc.date(self.now + one_day)
  return first, second, now_120, start, end
def test_list_ok(self):
  """ListTasks honors state and tag filters and the perf stats flag."""

  def _list_items(request):
    # Posts ListTasks and returns the items of the decoded response.
    resp = self.post_prpc('ListTasks', request)
    listed = swarming_pb2.TaskListResponse()
    _decode(resp.body, listed)
    return listed.items

  utils.clear_cache(config.settings)
  first, second, now_120, start, end = self._gen_two_tasks()

  # Basic request. Default sort is CREATED_TS
  self.set_as_privileged_user()
  request = swarming_pb2.TasksWithPerfRequest(
      limit=100,
      start=start,
      end=end,
      state=swarming_pb2.QUERY_COMPLETED,
      include_performance_stats=True)
  expected = swarming_pb2.TaskListResponse(
      now=message_conversion_prpc.date(now_120), items=[second, first])
  self.mock_now(self.now)
  self.assertEqual(expected.items, _list_items(request))

  # Only return the second with specific tags
  request = swarming_pb2.TasksWithPerfRequest(
      limit=100,
      start=start,
      end=end,
      state=swarming_pb2.QUERY_COMPLETED,
      include_performance_stats=True,
      tags=['project:yay', 'commit:pre'])
  expected = swarming_pb2.TaskListResponse(items=[second])
  self.assertEqual(expected.items, _list_items(request))

  # Return both since OR tags are used. Performance stats are not requested,
  # so they are dropped from the expected first result.
  request = swarming_pb2.TasksWithPerfRequest(
      limit=100,
      end=end,
      start=start,
      tags=['commit:post|pre'],
      state=swarming_pb2.QUERY_COMPLETED,
      include_performance_stats=False,
  )
  first.ClearField('performance_stats')
  expected = swarming_pb2.TaskListResponse(items=[second, first])
  self.assertEqual(expected.items, _list_items(request))

  # A tag that no task carries matches nothing.
  request = swarming_pb2.TasksWithPerfRequest(
      limit=100,
      state=swarming_pb2.QUERY_COMPLETED,
      end=end,
      start=start,
      tags=['foo:bar'],
  )
  self.assertEqual(0, len(_list_items(request)))

  # Both state and tag.
  request = swarming_pb2.TasksWithPerfRequest(
      limit=100,
      end=end,
      start=start,
      tags=['commit:pre'],
      state=swarming_pb2.QUERY_COMPLETED_SUCCESS)
  expected = swarming_pb2.TaskListResponse(items=[second])
  self.assertEqual(expected.items, _list_items(request))
def test_count_indexes(self):
  """Asserts that no combination of state and tag count crashes."""
  _, _, _, start, end = self._gen_two_tasks()
  self.set_as_privileged_user()
  tag_sets = ([], ['a:1'], ['a:1', 'b:2'])
  for state_number in swarming_pb2.StateQuery.DESCRIPTOR.values_by_number:
    for tag_set in tag_sets:
      self.post_prpc(
          'CountTasks',
          swarming_pb2.TasksCountRequest(
              start=start,
              end=end,
              state=state_number,
              tags=tag_set,
          ))
@parameterized.expand([
    ('ListTasks', swarming_pb2.TasksWithPerfRequest(limit=10)),
    ('CountTasks', swarming_pb2.TasksCountRequest()),
])
def test_realm_permissions_count(self, rpc, request):
  """Listing and counting need a pool filter and listTasks permission."""
  # Non-privileged user without realm permission.
  self.set_as_user()
  self.mock_auth_db([])
  request.start.FromDatetime(self.now - datetime.timedelta(days=1))

  # The user needs to specify a pool filter.
  no_pool_body = cgi.escape('No pool is specified', quote=True)
  rejected = self.post_prpc(rpc, request, expect_errors=True)
  self.assertEqual(no_pool_body, rejected.body)

  # The user can't access the tasks without permission.
  request.tags.extend(['pool:default'])
  no_permission_body = cgi.escape(
      'user "user@example.com" does not have '
      'permission "swarming.pools.listTasks"',
      quote=True)
  rejected = self.post_prpc(rpc, request, expect_errors=True)
  self.assertEqual(no_permission_body, rejected.body)

  # Give permission to the user; this call is made without expect_errors.
  self.mock_auth_db([auth.Permission('swarming.pools.listTasks')])
  self.post_prpc(rpc, request)
def test_count(self):
  """CountTasks honors the state, tag and time range filters."""
  _, _, _, the_start, the_end = self._gen_two_tasks()
  self.set_as_privileged_user()

  def _verify(count,
              state=swarming_pb2.QUERY_COMPLETED_SUCCESS,
              tags=None,
              start=None,
              end=None):
    # Counts tasks with the given filters. The time range defaults to the one
    # returned by _gen_two_tasks() and tags default to an empty list.
    request = swarming_pb2.TasksCountRequest(
        start=the_start if start is None else start,
        end=the_end if end is None else end,
        tags=[] if tags is None else tags,
        state=state,
    )
    response = self.post_prpc('CountTasks', request)
    counted = swarming_pb2.TasksCount()
    _decode(response.body, counted)
    self.assertEqual(count, counted.count)

  _verify(2)
  _verify(1, tags=['commit:pre'])
  _verify(0, state=swarming_pb2.QUERY_PENDING)

  # A window that starts several days after the default one matches nothing.
  later_start = message_conversion_prpc.date(
      the_start.ToDatetime() + datetime.timedelta(days=5))
  later_end = message_conversion_prpc.date(
      the_end.ToDatetime() + datetime.timedelta(days=10))
  _verify(0, start=later_start, end=later_end)
def test_list_request(self):
  """ListTaskRequests filters by time range, state and tags."""
  self.mock_now(self.now)
  self.mock(random, 'getrandbits', lambda _: 0x88)
  ntrs = []
  expected_requests = []
  ticker = test_case.Ticker(self.now, datetime.timedelta(seconds=60))
  times = [ticker()]

  # Build the three requests to submit and the responses expected for them.
  for name, unique_tag in zip(["foo", "bar", "baz"], ["a:1", "b:2", "c:3"]):
    tags = ['project:yay', 'commit:post', 'pool:default', unique_tag]
    ntr = self._new_task_request_prpc(use_default_slice=True)
    ntr.name = name
    ntr.tags[:] = tags
    ntrs.append(ntr)

    expected = swarming_pb2.TaskRequestResponse(
        name=name,
        service_account="none",
        user=ntr.user,
        priority=ntr.priority,
        authenticated="user:priv@example.com",
        expiration_secs=ntr.task_slices[0].expiration_secs,
        bot_ping_tolerance_secs=ntr.bot_ping_tolerance_secs,
    )
    expected.properties.CopyFrom(ntr.task_slices[0].properties)
    expected.task_slices.extend(ntr.task_slices)
    expected.resultdb.CopyFrom(swarming_pb2.ResultDBCfg())
    expected.tags[:] = sorted(tags + [
        "priority:20",
        "os:Amiga",
        "realm:none",
        "service_account:none",
        "swarming.pool.template:none",
        "swarming.pool.version:pools_cfg_rev",
        "authenticated:user:priv@example.com",
        "user:" + ntr.user,
    ])
    expected_requests.append(expected)

  # The first task is created at self.now and then run by the bot.
  self.set_as_privileged_user()
  first_id = self._client_create_task_prpc(ntrs[0])[1]
  expected_requests[0].created_ts.FromDatetime(self.now)
  expected_requests[0].task_id = first_id
  times.append(ticker())
  self.mock_now(times[-1])
  self.set_as_bot()
  self.bot_poll()
  self.bot_run_task()

  # The remaining tasks are created one ticker step apart and never run.
  self.set_as_privileged_user()
  for ntr, expected in zip(ntrs[1:], expected_requests[1:]):
    created_at = ticker()
    times.append(created_at)
    self.mock_now(created_at)
    expected.task_id = self._client_create_task_prpc(ntr)[1]
    expected.created_ts.FromDatetime(created_at)

  def _verify(present,
              start=None,
              end=None,
              state=swarming_pb2.QUERY_ALL,
              sort=swarming_pb2.QUERY_CREATED_TS,
              tags=None,
              timeoffset=None):
    # Lists requests and checks that exactly expected_requests[i], for i in
    # `present`, come back in that order. The time range defaults to
    # [times[0] - timeoffset, times[-1] + timeoffset].
    if timeoffset is None:
      timeoffset = datetime.timedelta(seconds=1)
    request = swarming_pb2.TasksRequest(
        limit=10,
        state=state,
        sort=sort,
        tags=[] if tags is None else tags)
    request.start.FromDatetime(
        times[0] - timeoffset if start is None else start)
    request.end.FromDatetime(times[-1] + timeoffset if end is None else end)
    response = self.post_prpc('ListTaskRequests', request)
    listed = swarming_pb2.TaskRequestsResponse()
    _decode(response.body, listed)
    wanted = [expected_requests[i] for i in present]
    self.assertEqual(wanted, list(listed.items))

  _verify([0], end=times[1])
  _verify([], tags=["e:1"])
  _verify([0], state=swarming_pb2.QUERY_COMPLETED_SUCCESS)
  _verify([2, 1, 0])
  _verify([0], tags=["a:1"])
def test_list_task_states_ok(self):
  """ListTaskStates returns one state per requested ID, in request order.

  The third ID is a literal that this test never creates; PENDING is
  expected for it.
  """
  self.set_as_privileged_user()
  ntr = self._new_task_request_prpc(use_default_slice=True)
  pending_id = self._client_create_task_prpc(ntr)[1]
  canceled_id = self._client_create_task_prpc(ntr)[1]
  self._client_cancel_task_prpc(canceled_id)

  queried_ids = [canceled_id, pending_id, '1d69b9f088008810']
  response = self.post_prpc(
      'ListTaskStates', swarming_pb2.TaskStatesRequest(task_id=queried_ids))
  actual = swarming_pb2.TaskStates()
  _decode(response.body, actual)

  expected = swarming_pb2.TaskStates(states=[
      swarming_pb2.CANCELED,
      swarming_pb2.PENDING,
      swarming_pb2.PENDING,
  ])
  self.assertEqual(expected, actual)
def test_list_task_states_invalid(self):
  """ListTaskStates answers 400 when given a malformed task ID."""
  self.set_as_privileged_user()
  bad_request = swarming_pb2.TaskStatesRequest(task_id=['invalid'])
  response = self.post_prpc('ListTaskStates', bad_request, True)
  self.assertEqual(response.status, '400 Bad Request')
  # The response body is compared against the HTML-escaped message.
  escaped_message = cgi.escape(
      "Invalid task_id: Invalid key u'invali'", quote=True)
  self.assertEqual(escaped_message, response.body)
def test_batch_get_result(self):
  """BatchGetResult reports a state or an error for each requested ID."""
  self.set_as_privileged_user()

  def create_task(realm):
    # Grants task creation permissions for `realm`, creates a task in it and
    # returns the new task's ID.
    self.mock_auth_db(
        [
            auth.Permission('swarming.pools.createTask'),
            auth.Permission('swarming.tasks.createInRealm'),
        ],
        task_realm=realm)
    ntr = self._new_task_request_prpc(use_default_slice=True)
    ntr.realm = realm
    return self._client_create_task_prpc(ntr)[1]

  invisible_pending_id = create_task('test:invisible')
  invisible_canceled_id = create_task('test:invisible')
  self._client_cancel_task_prpc(invisible_canceled_id)

  visible_pending_id = create_task('test:visible')
  visible_canceled_id = create_task('test:visible')
  self._client_cancel_task_prpc(visible_canceled_id)

  run_result_id = visible_pending_id[:-1] + '1'
  missing_id = '1d69b9f088008810'

  # Need to switch to a different user, otherwise all tasks will be visible,
  # since whoever created the task is always allowed to see it. Also need
  # a non-privileged user that doesn't have acl.can_view_all_tasks() perm.
  self.set_as_user()
  self.mock_auth_db([auth.Permission('swarming.tasks.get')],
                    task_realm='test:visible')

  request = swarming_pb2.BatchGetResultRequest(task_ids=[
      visible_pending_id,
      visible_canceled_id,
      invisible_pending_id,
      invisible_canceled_id,
      run_result_id,  # won't work, need summary ID
      missing_id,
  ])
  response = self.post_prpc('BatchGetResult', request)
  actual = swarming_pb2.BatchGetResultResponse()
  _decode(response.body, actual)

  def _outcome(res):
    # Reduces one entry to (task_id, error message) or (task_id, state).
    if res.HasField('error'):
      return (res.task_id, res.error.message)
    self.assertEqual(res.task_id, res.result.task_id)
    return (res.task_id, res.result.state)

  outcomes = [_outcome(res) for res in actual.results]
  no_access = u'No access to see the status of this task'
  self.assertEqual(outcomes, [
      (visible_pending_id, swarming_pb2.PENDING),
      (visible_canceled_id, swarming_pb2.CANCELED),
      (invisible_pending_id, no_access),
      (invisible_canceled_id, no_access),
      (
          run_result_id,
          u'Bad task ID "%s": Can\'t reference to a specific try result.' %
          run_result_id,
      ),
      (missing_id, u'No such task'),
  ])
class InternalsServicePrpcTest(PrpcTest):
  """Tests for the 'swarming.internals.rbe.Internals' pRPC service."""

  def setUp(self):
    super(InternalsServicePrpcTest, self).setUp()
    self.service = 'swarming.internals.rbe.Internals'

  def _post_expire_slice(self, reason, details=None):
    """Posts ExpireSlice for a new request key, acting as Swarming itself.

    Args:
      reason: rbe_pb2.ExpireSliceRequest reason value to send.
      details: optional details string; the field is left unset when None.

    Returns:
      Tuple (req_key, task_id) identifying the task the call referred to.
    """
    req_key = task_request.new_request_key()
    summary_key = task_pack.request_key_to_result_summary_key(req_key)
    task_id = task_pack.pack_result_summary_key(summary_key)
    self.set_as_swarming_itself()
    fields = {
        'task_id': task_id,
        'task_to_run_shard': 15,
        'task_to_run_id': 32,  # slice #2
        'reason': reason,
    }
    if details is not None:
      fields['details'] = details
    self.post_prpc('ExpireSlice', rbe_pb2.ExpireSliceRequest(**fields))
    return req_key, task_id

  def _assert_single_error_report(self, category, task_id):
    """Checks that exactly one ereporter2 error was stored for the slice."""
    errs = list(ereporter2.Error.query())
    self.assertEqual(len(errs), 1)
    self.assertEqual(
        errs[0].to_dict(include=['category', 'message', 'params']), {
            'category': category,
            'message': u'Boo',
            'params': {
                u'slice_index': 2,
                u'task_id': task_id,
            },
        })

  @mock.patch('server.task_scheduler.expire_slice')
  def test_expire_slice_no_resource(self, expire_slice_mock):
    req_key, _ = self._post_expire_slice(
        rbe_pb2.ExpireSliceRequest.NO_RESOURCE)
    expire_slice_mock.assert_called_once_with(
        task_to_run.task_to_run_key_from_parts(req_key, 15, 32),
        task_result.State.NO_RESOURCE,
    )

  @mock.patch('server.task_scheduler.expire_slice')
  def test_expire_slice_permission_denied(self, expire_slice_mock):
    req_key, task_id = self._post_expire_slice(
        rbe_pb2.ExpireSliceRequest.PERMISSION_DENIED, details='Boo')
    expire_slice_mock.assert_called_once_with(
        task_to_run.task_to_run_key_from_parts(req_key, 15, 32),
        task_result.State.EXPIRED,
    )
    self._assert_single_error_report(u'PERMISSION_DENIED', task_id)

  @mock.patch('server.task_scheduler.expire_slice')
  def test_expire_slice_bot_died(self, expire_slice_mock):
    req_key, task_id = self._post_expire_slice(
        rbe_pb2.ExpireSliceRequest.BOT_INTERNAL_ERROR, details='Boo')
    expire_slice_mock.assert_called_once_with(
        task_to_run.task_to_run_key_from_parts(req_key, 15, 32),
        task_result.State.BOT_DIED,
    )
    self._assert_single_error_report(u'BOT_INTERNAL_ERROR', task_id)
class SwarmingServicePrpcTest(PrpcTest):
  """Tests for the 'swarming.v2.Swarming' pRPC service."""

  def setUp(self):
    super(SwarmingServicePrpcTest, self).setUp()
    self.service = 'swarming.v2.Swarming'
    self.now = datetime.datetime(2010, 1, 2, 3, 4, 5)
    self.mock_now(self.now)
    self.mock_tq_tasks()

  def _get_permissions(self, request):
    """Posts GetPermissions and returns the decoded ClientPermissions."""
    response = self.post_prpc('GetPermissions', request)
    permissions = swarming_pb2.ClientPermissions()
    _decode(response.body, permissions)
    return permissions

  @parameterized.expand(['GetDetails', 'GetToken'])
  def test_forbidden(self, rpc):
    self.set_as_anonymous()
    response = self.post_prpc(
        rpc, proto.empty_pb2.Empty(), expect_errors=True)
    self.assertEqual(response.status, '403 Forbidden')

  def test_details(self):
    self.set_as_privileged_user()
    bot_version = bot_code.get_bot_version('https://testbed.example.com')[0]
    expected = swarming_pb2.ServerDetails(
        bot_version=bot_version,
        server_version=utils.get_app_version(),
        cas_viewer_server='https://test-cas-viewer-server.com',
    )
    response = self.post_prpc('GetDetails', proto.empty_pb2.Empty())
    actual = swarming_pb2.ServerDetails()
    _decode(response.body, actual)
    self.assertEqual(expected, actual)

  @mock.patch('server.bot_code.generate_bootstrap_token')
  def test_get_token_ok(self, token_mock):
    self.set_as_admin()
    token_mock.return_value = 'the_token'
    response = self.post_prpc('GetToken', proto.empty_pb2.Empty())
    actual = swarming_pb2.BootstrapToken()
    _decode(response.body, actual)
    expected = swarming_pb2.BootstrapToken(bootstrap_token='the_token')
    self.assertEqual(expected, actual)

  def test_public_permissions(self):
    self.set_as_anonymous()
    expected = swarming_pb2.ClientPermissions(
        cancel_task=False,
        cancel_tasks=False,
        delete_bot=False,
        delete_bots=False,
        get_bootstrap_token=False,
        get_configs=False,
        put_configs=False,
        terminate_bot=False)
    actual = self._get_permissions(swarming_pb2.PermissionsRequest())
    self.assertEqual(expected, actual)

  def test_user_permissions(self):
    self.set_as_user()
    expected = swarming_pb2.ClientPermissions(
        cancel_task=False,
        cancel_tasks=False,
        delete_bot=False,
        delete_bots=False,
        get_bootstrap_token=False,
        get_configs=False,
        put_configs=False,
        terminate_bot=False)
    actual = self._get_permissions(swarming_pb2.PermissionsRequest())
    self.assertEqual(expected, actual)

  def test_user_permissions_with_bot_id(self):
    self.mock_default_pool_acl([])
    self.set_as_bot()
    self.bot_poll()

    self.set_as_user()
    self.mock_auth_db([
        auth.Permission('swarming.pools.deleteBot'),
        auth.Permission('swarming.pools.listBots'),
        auth.Permission('swarming.pools.terminateBot'),
    ])
    expected = swarming_pb2.ClientPermissions(
        cancel_task=False,
        cancel_tasks=False,
        delete_bot=True,
        delete_bots=False,
        get_bootstrap_token=False,
        list_bots=['default', 'template'],
        get_configs=False,
        put_configs=False,
        terminate_bot=True)
    actual = self._get_permissions(
        swarming_pb2.PermissionsRequest(bot_id='bot1'))
    self.assertEqual(expected, actual)

  def test_user_permissions_with_task_id(self):
    self.mock_default_pool_acl([])
    self.set_as_admin()
    ntr = self._new_task_request_prpc(use_default_slice=True)
    task_id = self._client_create_task_prpc(ntr)[1]

    self.set_as_user()
    self.mock_auth_db([
        auth.Permission('swarming.pools.cancelTask'),
        auth.Permission('swarming.pools.listTasks'),
    ])
    expected = swarming_pb2.ClientPermissions(
        cancel_task=True,
        cancel_tasks=False,
        delete_bot=False,
        delete_bots=False,
        get_bootstrap_token=False,
        list_tasks=['default', 'template'],
        get_configs=False,
        put_configs=False,
        terminate_bot=False)
    actual = self._get_permissions(
        swarming_pb2.PermissionsRequest(task_id=task_id))
    self.assertEqual(expected, actual)

  def test_user_permissions_with_pool_tags(self):
    """Asserts that permissions are reported correctly for a user who has
    pool permissions.
    """
    self.mock_default_pool_acl([])

    # The user has realm permissions.
    self.set_as_user()
    self.mock_auth_db([
        auth.Permission('swarming.pools.cancelTask'),
        auth.Permission('swarming.pools.deleteBot'),
        auth.Permission('swarming.pools.listBots'),
        auth.Permission('swarming.pools.listTasks'),
    ])
    expected = swarming_pb2.ClientPermissions(
        cancel_task=False,
        cancel_tasks=True,
        delete_bot=False,
        delete_bots=True,
        get_bootstrap_token=False,
        list_tasks=['default', 'template'],
        list_bots=['default', 'template'],
        get_configs=False,
        put_configs=False,
        terminate_bot=False)
    actual = self._get_permissions(
        swarming_pb2.PermissionsRequest(tags=['pool:default']))
    self.assertEqual(expected, actual)

  def test_privileged_user_permissions(self):
    self.set_as_privileged_user()
    expected = swarming_pb2.ClientPermissions(
        cancel_task=True,
        cancel_tasks=False,
        delete_bot=False,
        delete_bots=False,
        get_bootstrap_token=False,
        get_configs=False,
        put_configs=False,
        terminate_bot=True)
    actual = self._get_permissions(swarming_pb2.PermissionsRequest())
    self.assertEqual(expected, actual)

  def test_admin_permissions(self):
    self.set_as_admin()
    expected = swarming_pb2.ClientPermissions(
        cancel_task=True,
        cancel_tasks=True,
        delete_bot=True,
        delete_bots=True,
        get_bootstrap_token=True,
        get_configs=False,
        put_configs=False,
        terminate_bot=True)
    actual = self._get_permissions(swarming_pb2.PermissionsRequest())
    self.assertEqual(expected, actual)
# Script entry point: '-v' on the command line turns on full assertion diffs
# and DEBUG logging; otherwise only FATAL log records are emitted.
if __name__ == '__main__':
  verbose = '-v' in sys.argv
  if verbose:
    unittest.TestCase.maxDiff = None
  logging.basicConfig(level=logging.DEBUG if verbose else logging.FATAL)
  unittest.main()