blob: eff52841c0c4c005e99ccdec3202709d7a1d267a [file] [log] [blame]
#!/usr/bin/env vpython
# coding: utf-8
# Copyright 2015 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
import base64
import datetime
import logging
import os
import random
import StringIO
import sys
import unittest
import zipfile
import mock
# Setups environment.
import test_env_handlers
from google.appengine.api import datastore_errors
from google.appengine.ext import ndb
from google.protobuf import json_format
import webapp2
import webtest
import handlers_bot
from components import auth
from components import ereporter2
from components import utils
from proto.api import plugin_pb2
from server import bot_archive
from server import bot_auth
from server import bot_code
from server import bot_groups_config
from server import bot_management
from server import external_scheduler
from server import pools_config
from server import service_accounts
from server import task_pack
from server import task_queues
def fmtdate(d):
  """Formats a datetime.datetime instance to the format generated by the API.

  Args:
    d: datetime.datetime instance to format.

  Returns:
    The timestamp as text, e.g. u'2010-01-02T03:04:05'. Anything below one
    second is dropped by the format string.
  """
  # Interpolating into a unicode literal yields `unicode` on Python 2, exactly
  # like the former unicode() call, without relying on a builtin that Python 3
  # removed.
  return u'%s' % d.strftime('%Y-%m-%dT%H:%M:%S')
class BotApiTest(test_env_handlers.AppTestBase):
# crbug.com/10569967
no_run = 1
def setUp(self):
  """Builds the webtest app for the bot routes and installs default mocks."""
  super(BotApiTest, self).setUp()

  # By default requests in tests are coming from bot with fake IP.
  wsgi_app = webapp2.WSGIApplication(handlers_bot.get_routes(), debug=True)
  environ = {
      'REMOTE_ADDR': self.source_ip,
      'SERVER_SOFTWARE': os.environ['SERVER_SOFTWARE'],
  }
  self.app = webtest.TestApp(wsgi_app, extra_environ=environ)

  def fail_on_log_request(*args, **kwargs):
    # Any ereporter2 report fails the test unless the test installs its own
    # recorder over this default.
    self.fail('%s, %s' % (args, kwargs))

  self.mock(ereporter2, 'log_request', fail_on_log_request)

  # Bot API test cases run by default as bot.
  self.set_as_bot()

  # Route task queue enqueues to the in-process fakes defined on this class.
  self._enqueue_task_orig = self.mock(
      utils, 'enqueue_task', self._enqueue_task)
  self._enqueue_task_async_orig = self.mock(
      utils, 'enqueue_task_async', self._enqueue_task_async)

  # Freeze the clock so timestamps in assertions are deterministic.
  self.now = datetime.datetime(2010, 1, 2, 3, 4, 5)
  self.mock_now(self.now)
  self.mock_default_pool_acl([])
def tearDown(self):
  """Tears down the base fixture and stops every started mock.patch patcher."""
  try:
    super(BotApiTest, self).tearDown()
  finally:
    # Always stop patchers started with mock.patch(...).start(), even when the
    # base tearDown raises, so they cannot leak into the following tests.
    mock.patch.stopall()
@ndb.non_transactional
def _enqueue_task(self, url, queue_name, **kwargs):
  """Stands in for utils.enqueue_task (installed by setUp).

  'es-notify-tasks' is executed inline through
  external_scheduler.notify_request_now(); 'cancel-children-tasks' and
  'pubsub' are accepted without doing anything; any other queue fails the
  test with the URL as the message.
  """
  if queue_name == 'es-notify-tasks':
    task_params = kwargs['params']
    es_host = task_params['es_host']
    request = plugin_pb2.NotifyTasksRequest()
    json_format.Parse(task_params['request_json'], request)
    return external_scheduler.notify_request_now(es_host, request)

  if queue_name not in ('cancel-children-tasks', 'pubsub'):
    self.fail(url)
  return True
@ndb.non_transactional
def _enqueue_task_async(self, url, queue_name, **kwargs):
  """Stands in for utils.enqueue_task_async (installed by setUp).

  Only 'rebuild-task-cache' is supported; any other queue fails the test with
  the URL as the message.
  """
  if queue_name != 'rebuild-task-cache':
    self.fail(url)
  # Call directly into it.
  return task_queues.rebuild_task_cache_async(kwargs['payload'])
def mock_bot_group_config(self, **kwargs):
  """Makes bot_auth.validate_bot_id_and_fetch_config return a fixed config.

  Args:
    **kwargs: forwarded unchanged to bot_groups_config.BotGroupConfig().
  """
  group_cfg = bot_groups_config.BotGroupConfig(**kwargs)

  def fetch_config(*_args, **_kwargs):
    return group_cfg

  self.mock(bot_auth, 'validate_bot_id_and_fetch_config', fetch_config)
def test_handshake(self):
  # A well-formed handshake is answered and reports nothing to ereporter2.
  logged = []

  def record(request, source, message):
    self.assertTrue(request)
    self.assertEqual('bot', source)
    logged.append(message)

  self.mock(ereporter2, 'log_request', record)

  dimensions = {
      'id': ['id1'],
      'pool': ['default'],
  }
  state = {
      u'running_time': 0,
      u'sleep_streak': 0
  }
  body = {'dimensions': dimensions, 'state': state, 'version': '1'}
  reply = self.app.post_json(
      '/swarming/api/v1/bot/handshake', params=body).json

  expected_keys = [
      u'bot_group_cfg',
      u'bot_group_cfg_version',
      u'bot_version',
      u'server_version',
  ]
  self.assertEqual(expected_keys, sorted(reply))
  self.assertEqual({u'dimensions': {}}, reply['bot_group_cfg'])
  self.assertEqual('default', reply['bot_group_cfg_version'])
  self.assertEqual(64, len(reply['bot_version']))
  self.assertEqual(u'v1a', reply['server_version'])
  self.assertEqual([], logged)
def test_handshake_minimum(self):
  # A handshake carrying only the bot id is answered, but the missing keys are
  # reported through ereporter2.
  logged = []

  def record(request, source, message):
    self.assertTrue(request)
    self.assertEqual('bot', source)
    logged.append(message)

  self.mock(ereporter2, 'log_request', record)

  body = {'dimensions': {'id': ['id1']}}
  reply = self.app.post_json('/swarming/api/v1/bot/handshake', body).json

  expected_keys = [
      u'bot_group_cfg',
      u'bot_group_cfg_version',
      u'bot_version',
      u'server_version',
  ]
  self.assertEqual(expected_keys, sorted(reply))
  self.assertEqual(64, len(reply['bot_version']))
  self.assertEqual(u'v1a', reply['server_version'])

  quarantine_msg = (
      'Quarantined Bot\n'
      'https://test-swarming.appspot.com/restricted/bot/id1\n'
      'Unexpected keys missing: [u\'state\', u\'version\']; '
      'did you make a typo?')
  self.assertEqual([quarantine_msg], logged)
def test_handshake_duplicate_dimension_value(self):
  # Test a specific failure mode where '<key>:<value>' is duplicate.
  # This throws a BadValueError while trying to save a
  # BotEvent.dimensions_flat, even if the bot is forcibly quarantined.
  logged = []

  def record(request, source, message):
    self.assertTrue(request)
    self.assertEqual('bot', source)
    logged.append(message)

  self.mock(ereporter2, 'log_request', record)

  dimensions = {
      'id': ['id1'],
      'pool': ['default', 'default'],
  }
  state = {
      u'running_time': 0,
      u'sleep_streak': 0
  }
  body = {'dimensions': dimensions, 'state': state, 'version': '1'}
  reply = self.app.post_json(
      '/swarming/api/v1/bot/handshake', params=body).json

  expected_keys = [
      u'bot_group_cfg',
      u'bot_group_cfg_version',
      u'bot_version',
      u'server_version',
  ]
  self.assertEqual(expected_keys, sorted(reply))
  self.assertEqual({u'dimensions': {}}, reply['bot_group_cfg'])
  self.assertEqual('default', reply['bot_group_cfg_version'])
  self.assertEqual(64, len(reply['bot_version']))
  self.assertEqual(u'v1a', reply['server_version'])

  quarantine_msg = (
      u'Quarantined Bot\n'
      'https://test-swarming.appspot.com/restricted/bot/id1\n'
      'Dimension values include duplication. key: pool, values: '
      '[u\'default\', u\'default\']')
  self.assertEqual([quarantine_msg], logged)
def test_handshake_long_dimension(self):
  # Test a specific failure mode where '<key>:<value>' is over 1500 bytes.
  # This throws a BadValueError while trying to save a
  # BotEvent.dimensions_flat, even if the bot is forcibly quarantined.
  logged = []

  def record(request, source, message):
    self.assertTrue(request)
    self.assertEqual('bot', source)
    logged.append(message)

  self.mock(ereporter2, 'log_request', record)

  dimensions = {
      'id': ['id1'],
      'pool': ['default'],
      'a': ['b' * 1499],
  }
  state = {
      u'running_time': 0,
      u'sleep_streak': 0
  }
  body = {'dimensions': dimensions, 'state': state, 'version': '1'}
  reply = self.app.post_json(
      '/swarming/api/v1/bot/handshake', params=body).json

  expected_keys = [
      u'bot_group_cfg',
      u'bot_group_cfg_version',
      u'bot_version',
      u'server_version',
  ]
  self.assertEqual(expected_keys, sorted(reply))
  self.assertEqual({u'dimensions': {}}, reply['bot_group_cfg'])
  self.assertEqual('default', reply['bot_group_cfg_version'])
  self.assertEqual(64, len(reply['bot_version']))
  self.assertEqual(u'v1a', reply['server_version'])

  quarantine_msg = (
      u'Quarantined Bot\n'
      'https://test-swarming.appspot.com/restricted/bot/id1\n'
      'Invalid dimension value. key: a, value: %s' % ('b' * 1499))
  self.assertEqual([quarantine_msg], logged)
def test_handshake_extra(self):
  # A handshake with an unknown top-level key is answered, and the extra key
  # is reported through ereporter2.
  logged = []

  def record(request, source, message):
    self.assertTrue(request)
    self.assertEqual('bot', source)
    logged.append(message)

  self.mock(ereporter2, 'log_request', record)

  dimensions = {
      'id': ['bot1'],
      'pool': ['default'],
  }
  state = {
      'bar': 2,
      'ip': '127.0.0.1',
      'running_time': 1234.0,
      'sleep_streak': 0,
  }
  body = {
      # Works with unknown items but logs an error. This permits catching
      # typos.
      'foo': 1,
      'dimensions': dimensions,
      'state': state,
      'version': '123',
  }
  reply = self.app.post_json(
      '/swarming/api/v1/bot/handshake', params=body).json

  expected_keys = [
      u'bot_group_cfg',
      u'bot_group_cfg_version',
      u'bot_version',
      u'server_version',
  ]
  self.assertEqual(expected_keys, sorted(reply))
  self.assertEqual(64, len(reply['bot_version']))
  self.assertEqual(u'v1a', reply['server_version'])

  quarantine_msg = (
      u'Quarantined Bot\n'
      u'https://test-swarming.appspot.com/restricted/bot/bot1\n'
      u'Unexpected keys superfluous: [u\'foo\']; did you make a typo?')
  self.assertEqual([quarantine_msg], logged)
def test_poll_maintenance(self):
  # A bot reporting a maintenance message is told to sleep as quarantined.
  poll_url = '/swarming/api/v1/bot/poll'
  body = self.do_handshake()
  body['state']['maintenance'] = 'very busy'
  reply = self.post_json(poll_url, body)
  self.assertTrue(reply.pop(u'duration'))
  self.assertEqual({u'cmd': u'sleep', u'quarantined': True}, reply)

  # with non string maintenance message.
  body['state']['maintenance'] = True
  self.post_json(poll_url, body, status=400)
def test_poll_bad_bot(self):
  # If bot is not sending required keys but report right version, enforce
  # sleeping.
  logged = []

  def record(request, source, message):
    self.assertTrue(request)
    self.assertEqual('bot', source)
    logged.append(message)

  self.mock(ereporter2, 'log_request', record)

  body = self.do_handshake()
  del body['state']
  reply = self.post_json('/swarming/api/v1/bot/poll', body)
  self.assertTrue(reply.pop(u'duration'))
  self.assertEqual({u'cmd': u'sleep', u'quarantined': True}, reply)

  quarantine_msg = (
      'Quarantined Bot\n'
      'https://test-swarming.appspot.com/restricted/bot/bot1\n'
      'Unexpected keys missing: [u\'state\']; '
      'did you make a typo?')
  self.assertEqual([quarantine_msg], logged)
def test_poll_no_bot_id(self):
  # Polling without the 'id' dimension is rejected with HTTP 403.
  body = self.do_handshake()
  del body['dimensions']['id']
  reply = self.post_json('/swarming/api/v1/bot/poll', body, status=403)
  self.assertEqual({u'text': u'Bot ID is not specified'}, reply)
def test_poll_empty_dimension_value(self):
  # A dimension key with an empty value list quarantines the bot and is
  # reported through ereporter2.
  body = self.do_handshake()
  body['dimensions']['empty_key'] = []
  logged = []

  def record(request, source, message):
    self.assertTrue(request)
    self.assertEqual('bot', source)
    logged.append(message)

  self.mock(ereporter2, 'log_request', record)

  reply = self.post_json('/swarming/api/v1/bot/poll', body)
  self.assertTrue(reply['quarantined'])

  quarantine_msg = (
      'Quarantined Bot\n'
      'https://test-swarming.appspot.com/restricted/bot/bot1\n'
      'Dimension values should not be empty. key: empty_key')
  self.assertEqual([quarantine_msg], logged)
def test_poll_bad_version(self):
  # A bot reporting an unknown version is told to update to the version
  # obtained from the handshake.
  body = self.do_handshake()
  latest_version = body['version']
  body['version'] = 'badversion'
  reply = self.post_json('/swarming/api/v1/bot/poll', body)
  self.assertEqual({u'cmd': u'update', u'version': latest_version}, reply)
def test_poll_bad_dimensions(self):
  # A poll carrying a malformed dimension value must quarantine the bot, log an
  # explicit message, record a BotEvent and keep the last valid dimensions.
  errors = []

  def add_error(request, source, message):
    self.assertTrue(request)
    self.assertEqual('bot', source)
    errors.append(message)

  self.mock(ereporter2, 'log_request', add_error)
  params = self.do_handshake()
  params['dimensions']['foo'] = [['bar']]  # list is invalid.
  response = self.post_json('/swarming/api/v1/bot/poll', params)
  self.assertTrue(response.pop(u'duration'))
  expected = {
      u'cmd': u'sleep',
      u'quarantined': True,
  }
  self.assertEqual(expected, response)

  # Quarantine message should be explicit.
  msg = (u'Quarantined Bot\n'
         'https://test-swarming.appspot.com/restricted/bot/bot1\n'
         'Invalid dimension value. key: foo, value: [u\'bar\']')
  self.assertEqual([msg], errors)

  # Quarantine event should be registered, too.
  expected_event = {
      'authenticated_as': u'bot:whitelisted-ip',
      # last valid dimensions should be used in BotEvent.
      'dimensions': {
          u'id': [u'bot1'],
          u'pool': [u'default']
      },
      'event_type': u'request_sleep',
      'external_ip': u'192.168.2.2',
      'last_seen_ts': None,
      'lease_expiration_ts': None,
      'lease_id': None,
      'leased_indefinitely': None,
      'machine_lease': None,
      'machine_type': None,
      'maintenance_msg': None,
      'message': u"Invalid dimension value. key: foo, value: [u'bar']",
      'quarantined': True,
      'state': {
          u'bot_group_cfg_version': u'default',
          u'running_time': 1234.0,
          u'sleep_streak': 0,
          u'started_ts': 1410990411.111,
      },
      'task_id': None,
      'ts': self.now,
      'version': self.bot_version,
  }
  events = [
      e.to_dict() for e in bot_management.get_events_query('bot1', True)
  ]
  # Expected value goes first, like every other assertion in this file.
  self.assertEqual(expected_event, events[0])

  # BotInfo should be changed to quarantined state, too.
  bot_info = bot_management.get_info_key('bot1').get()
  self.assertTrue(bot_info.quarantined)
  # last valid dimensions should be kept in BotInfo.
  self.assertEqual([u'id:bot1', u'pool:default'], bot_info.dimensions_flat)
def test_poll_sleep(self):
  # A bot polls, gets nothing.
  body = self.do_handshake()

  def poll_expecting_sleep():
    reply = self.post_json('/swarming/api/v1/bot/poll', body)
    self.assertTrue(reply.pop(u'duration'))
    self.assertEqual({u'cmd': u'sleep', u'quarantined': False}, reply)

  poll_expecting_sleep()

  # Sleep again
  body['state']['sleep_streak'] += 1
  poll_expecting_sleep()
def test_poll_update(self):
  # Polling with a version other than the handshake one yields an 'update'
  # command naming the handshake version.
  body = self.do_handshake()
  old_version = body['version']
  body['version'] = 'badversion'
  reply = self.post_json('/swarming/api/v1/bot/poll', body)
  self.assertEqual({u'cmd': u'update', u'version': old_version}, reply)
def test_poll_bot_group_config_change(self):
  # A stale bot_group_cfg_version in the state makes the server ask the bot to
  # restart.
  body = self.do_handshake()
  body['state']['bot_group_cfg_version'] = 'badversion'
  reply = self.post_json('/swarming/api/v1/bot/poll', body)
  restart_cmd = {
      u'cmd': u'bot_restart',
      u'message': u'Restarting to pick up new bots.cfg config',
  }
  self.assertEqual(restart_cmd, reply)
def test_poll_bot_group_config_change_with_quarantined_flag(self):
  # The restart request is still issued when the bot also reports itself as
  # quarantined.
  body = self.do_handshake()
  body['state'].update({
      'bot_group_cfg_version': 'badversion',
      'quarantined': True,
  })
  reply = self.post_json('/swarming/api/v1/bot/poll', body)
  restart_cmd = {
      u'cmd': u'bot_restart',
      u'message': u'Restarting to pick up new bots.cfg config',
  }
  self.assertEqual(restart_cmd, reply)
def test_poll_task_raw(self):
  # Checks the complete 'run' manifest handed to a bot for a raw task that sets
  # relative_cwd, then the run result as seen by the user.
  # Successfully poll a task.
  # random.getrandbits is pinned so the generated task id is stable.
  self.mock(random, 'getrandbits', lambda _: 0x88)
  # A bot polls, gets a task, updates it, completes it.
  params = self.do_handshake(do_first_poll=True)
  # Enqueue a task.
  self.set_as_user()
  _, task_id = self.client_create_task_raw(
      properties={u'relative_cwd': u'de/ep'})
  self.assertEqual('0', task_id[-1])
  # Convert TaskResultSummary reference to TaskRunResult.
  task_id = task_id[:-1] + '1'
  self.set_as_bot()
  response = self.post_json('/swarming/api/v1/bot/poll', params)
  expected = {
      u'cmd': u'run',
      u'manifest': {
          u'bot_id': u'bot1',
          u'bot_authenticated_as': u'bot:whitelisted-ip',
          u'caches': [],
          u'cipd_input': {
              u'client_package': {
                  u'package_name': u'infra/tools/cipd/${platform}',
                  u'path': None,
                  u'version': u'git_revision:deadbeef',
              },
              u'packages': [{
                  u'package_name': u'rm',
                  u'path': u'bin',
                  u'version': u'git_revision:deadbeef',
              }],
              u'server': u'https://pool.config.cipd.example.com',
          },
          u'command': [u'python', u'run_test.py'],
          u'containment': {
              u'lower_priority': True,
              u'containment_type': 2,
              u'limit_processes': 1000,
              u'limit_total_committed_memory': 1024**3,
          },
          u'relative_cwd': u'de/ep',
          u'dimensions': {
              u'os': [u'Amiga'],
              u'pool': [u'default'],
          },
          u'env': {},
          u'env_prefixes': {},
          u'grace_period': 30,
          u'hard_timeout': 3600,
          u'host': u'http://localhost:8080',
          u'isolated': {
              u'input': None,
              u'namespace': u'default-gzip',
              u'server': u'https://pool.config.isolate.example.com',
          },
          u'cas_input_root': None,
          u'secret_bytes': None,
          u'realm': {},
          u'resultdb': None,
          u'io_timeout': 1200,
          u'outputs': [u'foo', u'path/to/foobar'],
          u'service_accounts': {
              u'system': {
                  u'service_account': u'none'
              },
              u'task': {
                  u'service_account': u'none'
              },
          },
          u'task_id': task_id,
      },
  }
  self.assertEqual(expected, response)
  # The run result read back by the user must carry the mocked timestamps.
  self.set_as_user()
  response = self.client_get_results(task_id)
  expected = self.gen_run_result(
      created_ts=fmtdate(self.now),
      modified_ts=fmtdate(self.now),
      started_ts=fmtdate(self.now))
  self.assertEqual(expected, response)
def test_poll_task_with_bot_service_account(self):
  # A task created with service_account='bot' must surface that value under
  # manifest['service_accounts']['task'].
  params = self.do_handshake(do_first_poll=True)
  self.set_as_user()
  _, task_id = self.client_create_task_raw(service_account='bot')
  self.assertEqual('0', task_id[-1])
  # Switch the trailing '0' to '1': the manifest reports this id form.
  task_id = task_id[:-1] + '1'
  self.set_as_bot()
  response = self.post_json('/swarming/api/v1/bot/poll', params)
  expected = {
      u'cmd': u'run',
      u'manifest': {
          u'bot_id': u'bot1',
          u'bot_authenticated_as': u'bot:whitelisted-ip',
          u'caches': [],
          u'cipd_input': {
              u'client_package': {
                  u'package_name': u'infra/tools/cipd/${platform}',
                  u'path': None,
                  u'version': u'git_revision:deadbeef',
              },
              u'packages': [{
                  u'package_name': u'rm',
                  u'path': u'bin',
                  u'version': u'git_revision:deadbeef',
              }],
              u'server': u'https://pool.config.cipd.example.com',
          },
          u'command': [u'python', u'run_test.py'],
          u'containment': {
              u'lower_priority': True,
              u'containment_type': 2,
              u'limit_processes': 1000,
              u'limit_total_committed_memory': 1024**3,
          },
          u'relative_cwd': None,
          u'dimensions': {
              u'os': [u'Amiga'],
              u'pool': [u'default'],
          },
          u'env': {},
          u'env_prefixes': {},
          u'grace_period': 30,
          u'hard_timeout': 3600,
          u'host': u'http://localhost:8080',
          u'isolated': {
              u'input': None,
              u'namespace': u'default-gzip',
              u'server': u'https://pool.config.isolate.example.com',
          },
          u'cas_input_root': None,
          u'secret_bytes': None,
          u'realm': {},
          u'resultdb': None,
          u'io_timeout': 1200,
          u'outputs': [u'foo', u'path/to/foobar'],
          u'service_accounts': {
              u'system': {
                  u'service_account': u'none'
              },
              # The value requested at task creation time.
              u'task': {
                  u'service_account': u'bot'
              },
          },
          u'task_id': task_id,
      },
  }
  self.assertEqual(expected, response)
def test_poll_task_with_caches(self):
  # A task requesting a named cache must list it in manifest['caches'], where
  # the server adds a 'hint' entry next to the requested name and path.
  params = self.do_handshake(do_first_poll=True)
  self.set_as_user()
  _, task_id = self.client_create_task_raw({
      'caches': [{
          'name': 'git_infra',
          'path': 'git_cache',
      }],
  })
  self.assertEqual('0', task_id[-1])
  # Switch the trailing '0' to '1': the manifest reports this id form.
  task_id = task_id[:-1] + '1'
  self.set_as_bot()
  response = self.post_json('/swarming/api/v1/bot/poll', params)
  expected = {
      u'cmd': u'run',
      u'manifest': {
          u'bot_id': u'bot1',
          u'bot_authenticated_as': u'bot:whitelisted-ip',
          u'caches': [{
              u'hint': '-1',
              u'name': u'git_infra',
              u'path': u'git_cache',
          },],
          u'cipd_input': {
              u'client_package': {
                  u'package_name': u'infra/tools/cipd/${platform}',
                  u'path': None,
                  u'version': u'git_revision:deadbeef',
              },
              u'packages': [{
                  u'package_name': u'rm',
                  u'path': u'bin',
                  u'version': u'git_revision:deadbeef',
              }],
              u'server': u'https://pool.config.cipd.example.com',
          },
          u'command': [u'python', u'run_test.py'],
          u'containment': {
              u'lower_priority': True,
              u'containment_type': 2,
              u'limit_processes': 1000,
              u'limit_total_committed_memory': 1024**3,
          },
          u'relative_cwd': None,
          u'dimensions': {
              u'os': [u'Amiga'],
              u'pool': [u'default'],
          },
          u'env': {},
          u'env_prefixes': {},
          u'grace_period': 30,
          u'hard_timeout': 3600,
          u'host': u'http://localhost:8080',
          u'isolated': {
              u'input': None,
              u'namespace': u'default-gzip',
              u'server': u'https://pool.config.isolate.example.com',
          },
          u'cas_input_root': None,
          u'io_timeout': 1200,
          u'outputs': [u'foo', u'path/to/foobar'],
          u'secret_bytes': None,
          u'realm': {},
          u'resultdb': None,
          u'service_accounts': {
              u'system': {
                  u'service_account': u'none'
              },
              u'task': {
                  u'service_account': u'none'
              },
          },
          u'task_id': task_id,
      },
  }
  self.assertEqual(expected, response)
@ndb.tasklet
def _mock_create_invocation_async(self, _task_run_id, _realm):
  """Tasklet stand-in that always yields a fixed ResultDB update token."""
  update_token = 'resultdb-update-token'
  raise ndb.Return(update_token)
def test_poll_with_resultdb(self):
  # A task created with ResultDB enabled must expose the invocation name and
  # update token under manifest['resultdb'] when the bot polls.
  params = self.do_handshake(do_first_poll=True)
  self.set_as_user()
  self.mock(auth, 'has_permission', lambda *_args, **_kwargs: True)
  # NOTE(review): the patch is assumed to wrap only the task creation and its
  # assertion; the extent of this `with` block should be confirmed.
  with mock.patch(
      'server.resultdb.create_invocation_async',
      mock.Mock(
          side_effect=self._mock_create_invocation_async)) as mocked_call:
    response, _ = self.client_create_task_raw(
        resultdb={'enable': True}, realm='infra:try')
    invocation_name = response['task_result']['resultdb_info']['invocation']
    mocked_call.assert_called_once()
  self.set_as_bot()
  response = self.post_json('/swarming/api/v1/bot/poll', params)
  expected = {
      u'hostname': u'test-resultdb-server.com',
      u'current_invocation': {
          u'name': invocation_name,
          # The token returned by _mock_create_invocation_async.
          u'update_token': u'resultdb-update-token',
      }
  }
  self.assertEqual(expected, response['manifest']['resultdb'])
def test_poll_with_realm(self):
  # A task created in a realm must report that realm under manifest['realm'].
  params = self.do_handshake(do_first_poll=True)
  self.mock(auth, 'has_permission', lambda *_args, **_kwargs: True)
  self.mock(service_accounts, 'has_token_server', lambda: True)
  self.set_as_user()
  # Only the side effect of creating the task matters; the creation response
  # was previously bound to a name that was overwritten before being read.
  self.client_create_task_raw(realm='test:task_realm')
  self.set_as_bot()
  response = self.post_json('/swarming/api/v1/bot/poll', params)
  expected = {
      u'name': u'test:task_realm',
  }
  self.assertEqual(expected, response['manifest']['realm'])
def test_poll_with_cas_input_root(self):
  # A task created with a CAS input root must carry it in the manifest, with
  # 'isolated' set to None.
  # fix task_id.
  self.mock(random, 'getrandbits', lambda _: 0x88)
  params = self.do_handshake(do_first_poll=True)
  self.mock(auth, 'has_permission', lambda *_args, **_kwargs: True)
  self.mock(service_accounts, 'has_token_server', lambda: True)
  self.set_as_user()
  _, task_id = self.client_create_task_cas_input_root()
  task_id = task_id[:-1] + '1'  # convert to run_id.
  self.set_as_bot()
  response = self.post_json('/swarming/api/v1/bot/poll', params)
  expected = {
      u'cmd': u'run',
      u'manifest': {
          u'bot_id': u'bot1',
          u'bot_authenticated_as': u'bot:whitelisted-ip',
          u'caches': [],
          u'cipd_input': {
              u'client_package': {
                  u'package_name': u'infra/tools/cipd/${platform}',
                  u'path': None,
                  u'version': u'git_revision:deadbeef',
              },
              u'packages': [{
                  u'package_name': u'rm',
                  u'path': u'bin',
                  u'version': u'git_revision:deadbeef',
              }],
              u'server': u'https://pool.config.cipd.example.com',
          },
          u'command': [u'python', u'run_test.py'],
          u'containment': {
              u'lower_priority': True,
              u'containment_type': 2,
              u'limit_processes': 1000,
              u'limit_total_committed_memory': 1024**3,
          },
          u'relative_cwd': None,
          u'dimensions': {
              u'os': [u'Amiga'],
              u'pool': [u'default'],
          },
          u'env': {},
          u'env_prefixes': {},
          u'grace_period': 30,
          u'hard_timeout': 3600,
          u'host': u'http://localhost:8080',
          u'isolated': None,
          u'cas_input_root': {
              u'cas_instance': u'projects/test/instances/default',
              u'digest': {
                  u'hash': u'12345',
                  u'size_bytes': 1,
              }
          },
          u'secret_bytes': None,
          u'realm': {},
          u'resultdb': None,
          u'io_timeout': 1200,
          u'outputs': [u'foo', u'path/to/foobar'],
          u'service_accounts': {
              u'system': {
                  u'service_account': u'none'
              },
              u'task': {
                  u'service_account': u'none'
              },
          },
          u'task_id': task_id,
      },
  }
  self.assertEqual(expected, response)
  # The run result read back by the user must carry the mocked timestamps.
  self.set_as_user()
  response = self.client_get_results(task_id)
  expected = self.gen_run_result(
      created_ts=fmtdate(self.now),
      modified_ts=fmtdate(self.now),
      started_ts=fmtdate(self.now))
  self.assertEqual(expected, response)
def test_poll_conflicting_dimensions(self):
  # The bot keeps polling normally when its own 'pool' dimension differs from
  # the one defined by the server-side bot group config.
  body = self.do_handshake()
  self.assertEqual(body['dimensions']['pool'], ['default'])

  bot_auth_cfg = bot_groups_config.BotAuth(
      log_if_failed=False,
      require_luci_machine_token=False,
      require_service_account=None,
      require_gce_vm_token=None,
      ip_whitelist=None,
  )
  self.mock_bot_group_config(
      version='default',
      owners=None,
      auth=(bot_auth_cfg,),
      dimensions={u'pool': [u'server-side']},
      bot_config_script=None,
      bot_config_script_rev=None,
      bot_config_script_content=None,
      system_service_account=None,
      is_default=True)

  # Bot sends 'default' pool, but server config defined it as 'server-side'.
  reply = self.post_json('/swarming/api/v1/bot/poll', body)
  self.assertTrue(reply.pop(u'duration'))
  self.assertEqual({u'cmd': u'sleep', u'quarantined': False}, reply)
def test_poll_extra_bot_config(self):
  # The script content set in the bot group config is returned by
  # do_handshake() under 'bot_config'.
  bot_auth_cfg = bot_groups_config.BotAuth(
      log_if_failed=False,
      require_luci_machine_token=False,
      require_service_account=None,
      require_gce_vm_token=None,
      ip_whitelist=None,
  )
  script_content = 'print("Hi");import sys; sys.exit(1)'
  self.mock_bot_group_config(
      version='default',
      owners=None,
      auth=(bot_auth_cfg,),
      dimensions={},
      bot_config_script='foo.py',
      bot_config_script_rev='abcd',
      bot_config_script_content=script_content,
      system_service_account=None,
      is_default=True)

  handshake = self.do_handshake()
  self.assertEqual(
      u'print("Hi");import sys; sys.exit(1)', handshake['bot_config'])
def test_poll_with_external_scheduler(self):
  # Drives /bot/poll against a mocked external scheduler client and checks the
  # AssignTasks / NotifyTasks requests issued for: no assignment, one
  # assignment, a child task and a grand child task.
  # Inject external scheduler config.
  es_cfg = pools_config.ExternalSchedulerConfig(
      address='qscheduler-test.example.com',
      id='all',
      dimensions=[],
      all_dimensions=[],
      any_dimensions=[],
      enabled=True,
      allow_es_fallback=False)
  # These patchers are started manually; tearDown stops them through
  # mock.patch.stopall().
  mock.patch('server.external_scheduler.config_for_bot').start(
  ).return_value = es_cfg
  mock.patch('server.external_scheduler.config_for_task').start(
  ).return_value = es_cfg
  # Inject prpc client.
  es_client_mock = mock.Mock()
  mock.patch('server.external_scheduler._get_client').start(
  ).return_value = es_client_mock
  # Register bot.
  params = self.do_handshake()
  # Enqueue a task.
  self.set_as_user()
  resp, task_id = self.client_create_task_raw()
  task = resp['request']
  task_dimensions_flat = [
      '%s:%s' % (d['key'], d['value'])
      for d in task['task_slices'][0]['properties']['dimensions']
  ]
  run_id = task_id[:-1] + '1'
  self.set_as_bot()
  bot_dimensions_flat = task_queues.bot_dimensions_to_flat(
      params['dimensions'])
  # First, no task assigned.
  es_client_mock.AssignTasks.return_value = plugin_pb2.AssignTasksResponse()
  response = self.post_json('/swarming/api/v1/bot/poll', params)
  expected = {
      'duration': mock.ANY,
      'cmd': 'sleep',
      'quarantined': False,
  }
  self.assertEqual(expected, response)
  # Assert prpc calls.
  es_client_mock.AssignTasks.assert_called_once()
  es_client_mock.NotifyTasks.assert_called_once()
  assign_req = es_client_mock.AssignTasks.call_args[0][0]
  # The time fields are copied from the captured requests, so only the
  # remaining fields are effectively compared.
  self.assertEqual(
      assign_req,
      plugin_pb2.AssignTasksRequest(
          scheduler_id=u'all',
          idle_bots=[
              plugin_pb2.IdleBot(
                  bot_id=u'bot1', dimensions=bot_dimensions_flat),
          ],
          time=assign_req.time,
      ))
  notify_req = es_client_mock.NotifyTasks.call_args[0][0]
  self.assertEqual(
      notify_req,
      plugin_pb2.NotifyTasksRequest(
          scheduler_id=u'all',
          notifications=[
              plugin_pb2.NotifyTasksItem(
                  task=plugin_pb2.TaskSpec(
                      id=task_id,
                      tags=task['tags'],
                      slices=[
                          plugin_pb2.SliceSpec(
                              dimensions=task_dimensions_flat),
                      ],
                      state=u'PENDING',
                      enqueued_time=notify_req.notifications[0].time,
                  ),
                  time=notify_req.notifications[0].time,
              ),
          ],
      ))
  # Forget the recorded calls so assert_called_once() can be reused below.
  es_client_mock.reset_mock()
  # Second, one task assigned.
  es_client_mock.AssignTasks.return_value = plugin_pb2.AssignTasksResponse(
      assignments=[
          plugin_pb2.TaskAssignment(
              bot_id=u'bot1', task_id=task_id, slice_number=0),
      ])
  es_client_mock.NotifyTasks.return_value = plugin_pb2.NotifyTasksResponse()
  response = self.post_json('/swarming/api/v1/bot/poll', params)
  self.assertEqual(response['cmd'], u'run')
  self.assertEqual(response['manifest']['task_id'], run_id)
  # Assert prpc calls.
  es_client_mock.AssignTasks.assert_called_once()
  es_client_mock.NotifyTasks.assert_called_once()
  notify_req = es_client_mock.NotifyTasks.call_args[0][0]
  self.assertEqual(
      notify_req,
      plugin_pb2.NotifyTasksRequest(
          scheduler_id=u'all',
          notifications=[
              plugin_pb2.NotifyTasksItem(
                  task=plugin_pb2.TaskSpec(
                      id=task_id,
                      tags=task['tags'],
                      slices=[
                          plugin_pb2.SliceSpec(
                              dimensions=task_dimensions_flat),
                      ],
                      state=u'RUNNING',
                      bot_id=u'bot1',
                      enqueued_time=notify_req.notifications[0].time,
                  ),
                  time=notify_req.notifications[0].time,
              ),
          ],
      ))
  es_client_mock.reset_mock()
  # Enqueue a task that has a parent task.
  self.set_as_user()
  _, child_task_id = self.client_create_task_raw(parent_task_id=run_id)
  child_run_id = child_task_id[:-1] + '1'
  self.set_as_bot()
  es_client_mock.AssignTasks.return_value = plugin_pb2.AssignTasksResponse(
      assignments=[
          plugin_pb2.TaskAssignment(
              bot_id=u'bot1', task_id=child_task_id, slice_number=0),
      ])
  response = self.post_json('/swarming/api/v1/bot/poll', params)
  self.assertEqual(response['cmd'], u'run')
  self.assertEqual(response['manifest']['task_id'], child_run_id)
  # Enqueue a task that has a parent task and a grand parent task.
  self.set_as_user()
  _, grand_child_task_id = self.client_create_task_raw(
      parent_task_id=child_run_id)
  grand_child_run_id = grand_child_task_id[:-1] + '1'
  self.set_as_bot()
  es_client_mock.AssignTasks.return_value = plugin_pb2.AssignTasksResponse(
      assignments=[
          plugin_pb2.TaskAssignment(
              bot_id=u'bot1', task_id=grand_child_task_id, slice_number=0),
      ])
  response = self.post_json('/swarming/api/v1/bot/poll', params)
  self.assertEqual(response['cmd'], u'run')
  self.assertEqual(response['manifest']['task_id'], grand_child_run_id)
# TODO(crbug.com/1131822): add a test case for _ensure_active_slice() with
# a task has multiple slices.
def test_complete_task_isolated(self):
  # Runs an isolated task end to end: poll for the manifest, post a final
  # task_update with outputs_ref and cipd_pins, then check the stored result.
  # Successfully poll a task.
  self.mock(random, 'getrandbits', lambda _: 0x88)
  # A bot polls, gets a task, updates it, completes it.
  params = self.do_handshake(do_first_poll=True)
  # Enqueue a task.
  self.set_as_user()
  _, task_id = self.client_create_task_isolated()
  self.assertEqual('0', task_id[-1])
  # Convert TaskResultSummary reference to TaskRunResult.
  task_id = task_id[:-1] + '1'
  self.set_as_bot()
  response = self.post_json('/swarming/api/v1/bot/poll', params)
  expected = {
      u'cmd': u'run',
      u'manifest': {
          u'bot_id': u'bot1',
          u'bot_authenticated_as': u'bot:whitelisted-ip',
          u'caches': [],
          u'cipd_input': {
              u'client_package': {
                  u'package_name': u'infra/tools/cipd/${platform}',
                  u'path': None,
                  u'version': u'git_revision:deadbeef',
              },
              u'packages': [{
                  u'package_name': u'rm',
                  u'path': u'bin',
                  u'version': u'git_revision:deadbeef',
              }],
              u'server': u'https://pool.config.cipd.example.com',
          },
          u'command': ['python', '-c', 'print(1)'],
          u'containment': {
              u'lower_priority': True,
              u'containment_type': 2,
              u'limit_processes': 1000,
              u'limit_total_committed_memory': 1024**3,
          },
          u'relative_cwd': None,
          u'dimensions': {
              u'os': [u'Amiga'],
              u'pool': [u'default'],
          },
          u'env': {},
          u'env_prefixes': {},
          u'hard_timeout': 3600,
          u'grace_period': 30,
          u'host': u'http://localhost:8080',
          u'isolated': {
              u'input': u'0123456789012345678901234567890123456789',
              u'server': u'http://localhost:1',
              u'namespace': u'default-gzip',
          },
          u'cas_input_root': None,
          u'secret_bytes': None,
          u'realm': {},
          u'resultdb': None,
          u'io_timeout': 1200,
          u'outputs': [u'foo', u'path/to/foobar'],
          u'service_accounts': {
              u'system': {
                  u'service_account': u'none'
              },
              u'task': {
                  u'service_account': u'none'
              },
          },
          u'task_id': task_id,
      },
  }
  self.assertEqual(expected, response)
  # Complete the isolated task.
  params = {
      'cost_usd': 0.1,
      'duration': 3.,
      'bot_overhead': 0.1,
      'exit_code': 0,
      'id': 'bot1',
      'isolated_stats': {
          'download': {
              'duration': 0.1,
              'initial_number_items': 10,
              'initial_size': 1000,
              'items_cold': '',
              'items_hot': '',
          },
          'upload': {
              'duration': 0.1,
              'items_cold': '',
              'items_hot': '',
          },
      },
      # The output is sent base64-encoded.
      'output': base64.b64encode('Ahahah'),
      'output_chunk_start': 0,
      'outputs_ref': {
          u'isolated': u'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb',
          u'isolatedserver': u'http://localhost:1',
          u'namespace': u'default-gzip',
      },
      'cipd_pins': {
          u'client_package': {
              u'package_name': u'infra/tools/cipd/windows-amd64',
              u'version': u'deadbeef' * 5,
          },
          u'packages': [{
              u'package_name': u'rm',
              u'path': u'bin',
              u'version': u'badc0fee' * 5,
          }]
      },
      'task_id': task_id,
  }
  response = self.post_json('/swarming/api/v1/bot/task_update', params)
  self.assertEqual({u'must_stop': False, u'ok': True}, response)
  # The stored run result must reflect the values posted above.
  self.set_as_user()
  response = self.client_get_results(task_id)
  expected = self.gen_run_result(
      cipd_pins={
          u'client_package': {
              u'package_name': u'infra/tools/cipd/windows-amd64',
              u'version': u'deadbeef' * 5,
          },
          u'packages': [{
              u'package_name': u'rm',
              u'path': u'bin',
              u'version': u'badc0fee' * 5,
          }]
      },
      completed_ts=fmtdate(self.now),
      costs_usd=[0.1],
      created_ts=fmtdate(self.now),
      duration=3.0,
      exit_code=u'0',
      modified_ts=fmtdate(self.now),
      outputs_ref={
          u'isolated': u'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb',
          u'isolatedserver': u'http://localhost:1',
          u'namespace': u'default-gzip',
      },
      started_ts=fmtdate(self.now),
      state=u'COMPLETED')
  self.assertEqual(expected, response)
def test_complete_task_cas_output_root(self):
  # A bot polls, gets a task with a CAS input root, and completes it while
  # reporting a cas_output_root; the user then reads the run result back.

  def _cipd_pins():
    # Built fresh on every call so the request payload and the expected result
    # never alias the same dict.
    return {
        'client_package': {
            'package_name': 'infra/tools/cipd/windows-amd64',
            'version': 'deadbeef' * 5,
        },
        'packages': [{
            'package_name': 'rm',
            'path': 'bin',
            'version': 'badc0fee' * 5,
        }],
    }

  self.mock(random, 'getrandbits', lambda _: 0x88)
  poll_params = self.do_handshake(do_first_poll=True)

  # Enqueue a task as a user.
  self.set_as_user()
  _, summary_id = self.client_create_task_cas_input_root()
  self.assertEqual('0', summary_id[-1])
  # Convert TaskResultSummary reference to TaskRunResult.
  task_id = summary_id[:-1] + '1'

  # The bot polls again, this time with a task pending.
  self.set_as_bot()
  self.post_json('/swarming/api/v1/bot/poll', poll_params)

  # Complete the task.
  download_stats = {
      'duration': 0.1,
      'initial_number_items': 10,
      'initial_size': 1000,
      'items_cold': '',
      'items_hot': '',
  }
  upload_stats = {
      'duration': 0.1,
      'items_cold': '',
      'items_hot': '',
  }
  update = {
      'cost_usd': 0.1,
      'duration': 3.,
      'bot_overhead': 0.1,
      'exit_code': 0,
      'id': 'bot1',
      'isolated_stats': {
          'download': download_stats,
          'upload': upload_stats,
      },
      'output': base64.b64encode('Ahahah'),
      'output_chunk_start': 0,
      'cas_output_root': {
          'cas_instance': 'projects/test/instances/default',
          'digest': {
              'hash': '12345',
              'size_bytes': 1,
          }
      },
      'cipd_pins': _cipd_pins(),
      'task_id': task_id,
  }
  reply = self.post_json('/swarming/api/v1/bot/task_update', update)
  self.assertEqual({u'must_stop': False, u'ok': True}, reply)

  # Read the result back as a user. size_bytes is expected back as a string.
  self.set_as_user()
  actual = self.client_get_results(task_id)
  now = fmtdate(self.now)
  expected = self.gen_run_result(
      cas_output_root={
          'cas_instance': 'projects/test/instances/default',
          'digest': {
              'hash': '12345',
              'size_bytes': '1',
          }
      },
      cipd_pins=_cipd_pins(),
      completed_ts=now,
      costs_usd=[0.1],
      created_ts=now,
      duration=3.0,
      exit_code=u'0',
      modified_ts=now,
      started_ts=now,
      state=u'COMPLETED')
  self.assertEqual(expected, actual)
def test_bot_ereporter2_error(self):
  # ereporter2's //client/utils/on_error.py traps unhandled exceptions
  # automatically.
  self.mock(random, 'getrandbits', lambda _: 0x88)
  logged = []

  def _record(*args, **kwargs):
    logged.append((args, kwargs))

  self.mock(ereporter2, 'log_request', _record)
  params = self.do_handshake()

  def _assert_bot_sleeps():
    # Polls as the bot and checks it is told to sleep, not quarantined.
    reply = self.post_json('/swarming/api/v1/bot/poll', params)
    self.assertTrue(reply.pop(u'duration'))
    self.assertEqual({u'cmd': u'sleep', u'quarantined': False}, reply)

  _assert_bot_sleeps()

  # The bot fails somehow.
  report = {
      'args': ['a', 'b'],
      'category': 'junk',
      'cwd': '/root',
      'duration': 0.1,
      'endpoint': '/root',
      'env': {
          'a': 'b'
      },
      'exception_type': 'FooError',
      'hostname': 'localhost',
      'message': 'Something happened',
      'method': 'GET',
      'os': 'Amiga',
      'params': {
          'a': 123
      },
      'python_version': '2.7',
      'request_id': '123',
      'source': params['dimensions']['id'][0],
      'source_ip': '127.0.0.1',
      'stack': 'stack trace...',
      'user': 'Joe',
      'version': '12',
  }
  error_params = {'v': 1, 'r': report}
  # The report goes through ereporter2's own frontend routes, served by a
  # separate test app.
  frontend = webapp2.WSGIApplication(
      ereporter2.get_frontend_routes(), debug=True)
  ereporter2_app = webtest.TestApp(
      frontend,
      extra_environ={
          'REMOTE_ADDR': self.source_ip,
          'SERVER_SOFTWARE': os.environ['SERVER_SOFTWARE'],
      })
  reply = ereporter2_app.post_json('/ereporter2/api/v1/on_error',
                                   error_params)
  self.assertEqual(
      {
          u'id': 1,
          u'url': u'http://localhost/restricted/ereporter2/errors/1',
      }, reply.json)

  # A bot error currently does not result in permanent quarantine. It will
  # eventually.
  _assert_bot_sleeps()
  self.assertEqual([], logged)
def test_bot_event_error(self):
  # Native bot error reporting.
  self.mock(random, 'getrandbits', lambda _: 0x88)
  logged = []

  def _record(*args, **kwargs):
    logged.append((args, kwargs))

  self.mock(ereporter2, 'log_request', _record)
  params = self.do_handshake()

  def _assert_bot_sleeps():
    # Polls as the bot and checks it is told to sleep, not quarantined.
    reply = self.post_json('/swarming/api/v1/bot/poll', params)
    self.assertTrue(reply.pop(u'duration'))
    self.assertEqual({u'cmd': u'sleep', u'quarantined': False}, reply)

  _assert_bot_sleeps()

  # The bot fails somehow.
  event = self.do_handshake()
  event['event'] = 'bot_error'
  event['message'] = 'for the worst'
  reply = self.post_json('/swarming/api/v1/bot/event', event)
  self.assertEqual({}, reply)

  # A bot error currently does not result in permanent quarantine. It will
  # eventually.
  _assert_bot_sleeps()

  # Only the keyword arguments passed to log_request are compared.
  expected = [
      {
          'message': u'for the worst\n\n'
                     'https://test-swarming.appspot.com/restricted/bot/bot1',
          'source': 'bot',
      },
  ]
  self.assertEqual(expected, [entry[1] for entry in logged])
def test_bot_event(self):
  # Posts every allowed event except bot_error, then checks the stored events.
  self.mock(random, 'getrandbits', lambda _: 0x88)
  params = self.do_handshake()
  dimensions = params['dimensions']
  # bot_error is tested specifically since it also logs an error message.
  event_names = [
      name for name in handlers_bot.BotEventHandler.ALLOWED_EVENTS
      if name != 'bot_error'
  ]
  for name in event_names:
    params['event'] = name
    params['message'] = 'for the best'
    reply = self.post_json('/swarming/api/v1/bot/event', params)
    self.assertEqual({}, reply)

  # TODO(maruel): Replace with client api to query last BotEvent.
  actual = [
      entity.to_dict()
      for entity in bot_management.get_events_query('bot1', True)
  ]

  def _stored_event(event_type, message, state, version):
    # Expected to_dict() form of one event; only these four fields vary.
    return {
        'authenticated_as': u'bot:whitelisted-ip',
        'dimensions': dimensions,
        'event_type': event_type,
        'external_ip': u'192.168.2.2',
        'last_seen_ts': None,
        'lease_id': None,
        'lease_expiration_ts': None,
        'leased_indefinitely': None,
        'machine_type': None,
        'machine_lease': None,
        'message': message,
        'quarantined': False,
        'maintenance_msg': None,
        'state': state,
        'task_id': None,
        'ts': self.now,
        'version': version,
    }

  # The expected list walks the posted events in reverse order and ends with
  # the bot_connected event.
  expected = []
  for name in reversed(event_names):
    posted_state = {
        u'bot_group_cfg_version': u'default',
        u'running_time': 1234.0,
        u'sleep_streak': 0,
        u'started_ts': 1410990411.111,
    }
    expected.append(
        _stored_event(
            unicode(name), u'for the best', posted_state, self.bot_version))
  connected_state = {
      u'running_time': 1234.0,
      u'sleep_streak': 0,
      u'started_ts': 1410990411.111,
  }
  expected.append(
      _stored_event(u'bot_connected', None, connected_state, u'123'))
  self.assertEqual(expected, actual)
def test_bot_event_bad_request(self):
  # An event whose state carries a non-string maintenance message gets a 400.
  payload = self.do_handshake()
  payload.update(
      event='bot_log', message='I have an invalid maintenance message.')
  # non-string maintenance message.
  payload['state']['maintenance'] = True
  self.post_json('/swarming/api/v1/bot/event', payload, status=400)
def test_task_complete(self):
  # Runs a task up to completion through a sequence of task_update calls,
  # checking the user-visible run result after each one.
  self.mock(random, 'getrandbits', lambda _: 0x88)
  params = self.do_handshake(do_first_poll=True)
  self.set_as_user()
  self.client_create_task_raw(
      properties=dict(command=['python', 'runtest.py']))

  def _params(**kwargs):
    """Returns a task_update payload, with kwargs overriding the defaults."""
    # task_id is looked up when this is called, i.e. after the poll below has
    # assigned it.
    out = {
        'cost_usd': 0.1,
        'duration': None,
        'exit_code': None,
        'id': 'bot1',
        'output': None,
        'output_chunk_start': 0,
        'task_id': task_id,
    }
    out.update(**kwargs)
    return out

  def _cycle(params, expected, must_stop):
    """Cycles between bot update and user retrieving results."""
    self.set_as_bot()
    response = self.post_json('/swarming/api/v1/bot/task_update', params)
    self.assertEqual({u'must_stop': must_stop, u'ok': True}, response)
    self.set_as_user()
    self.assertEqual(expected, self.client_get_results(task_id))

  # 1. Initial task update with no data.
  self.set_as_bot()
  response = self.post_json('/swarming/api/v1/bot/poll', params)
  task_id = response['manifest']['task_id']
  params = _params()
  response = self.post_json('/swarming/api/v1/bot/task_update', params)
  self.assertEqual({u'must_stop': False, u'ok': True}, response)
  self.set_as_user()
  response = self.client_get_results(task_id)
  expected = self.gen_run_result(
      costs_usd=[0.1],
      created_ts=fmtdate(self.now),
      modified_ts=fmtdate(self.now),
      started_ts=fmtdate(self.now))
  self.assertEqual(expected, response)

  # 2. Task update with some output. The run result is expected to stay the
  # same as in step 1. (A stale re-assertion of the step 1 response was
  # removed here; _cycle() fetches and checks a fresh result.)
  params = _params(output=base64.b64encode('Oh '))
  _cycle(params, expected, False)

  # 3. Task update with some more output.
  params = _params(output=base64.b64encode('hi'), output_chunk_start=3)
  _cycle(params, expected, False)

  # 4. Task update with completion of the command. A non-zero exit code is
  # expected to mark the run as a failure.
  params = _params(
      duration=0.1, exit_code=23, output=base64.b64encode('Ahahah'))
  expected = self.gen_run_result(
      completed_ts=fmtdate(self.now),
      costs_usd=[0.1],
      created_ts=fmtdate(self.now),
      duration=0.1,
      exit_code=u'23',
      failure=True,
      modified_ts=fmtdate(self.now),
      started_ts=fmtdate(self.now),
      state=u'COMPLETED')
  _cycle(params, expected, False)
def test_task_update_db_failure(self):
  # A datastore timeout while storing the update is reported as a retryable
  # HTTP 500. The error is caught in task_scheduler.bot_update_task().
  self.set_as_bot()
  self.bot_poll()
  self.set_as_user()
  self.client_create_task_raw(properties={'command': ['python', 'runtest.py']})
  self.set_as_bot()
  handshake = self.do_handshake()
  polled = self.post_json('/swarming/api/v1/bot/poll', handshake)
  task_id = polled['manifest']['task_id']

  def _raise_timeout(*_):
    raise datastore_errors.Timeout('Sorry!')

  self.mock(ndb, 'put_multi', _raise_timeout)
  update = dict(
      cost_usd=0.1,
      duration=0.1,
      exit_code=0,
      id='bot1',
      output=base64.b64encode('result string'),
      output_chunk_start=0,
      task_id=task_id)
  reply = self.post_json(
      '/swarming/api/v1/bot/task_update', update, status=500)
  self.assertEqual({u'error': u'Failed to update, please retry'}, reply)
def test_task_update_failure(self):
  # An arbitrary exception raised while storing the update surfaces as an
  # HTTP 500 carrying the exception's message.
  self.set_as_bot()
  self.bot_poll()
  self.set_as_user()
  self.client_create_task_raw(properties={'command': ['python', 'runtest.py']})
  self.set_as_bot()
  handshake = self.do_handshake()
  polled = self.post_json('/swarming/api/v1/bot/poll', handshake)
  task_id = polled['manifest']['task_id']

  class NewError(Exception):
    pass

  def _raise_new_error(*_):
    raise NewError('Sorry!')

  self.mock(ndb, 'put_multi', _raise_new_error)
  update = dict(
      bot_overhead=0.,
      cost_usd=0.1,
      duration=0.1,
      exit_code=0,
      id='bot1',
      isolated_stats={'download': {}},
      output=base64.b64encode('result string'),
      output_chunk_start=0,
      task_id=task_id)
  reply = self.post_json(
      '/swarming/api/v1/bot/task_update', update, status=500)
  self.assertEqual({u'error': u'Sorry!'}, reply)
def test_task_failure(self):
  # A task reported with exit code 1 ends up COMPLETED with failure=True.
  self.mock(random, 'getrandbits', lambda _: 0x88)
  poll_params = self.do_handshake(do_first_poll=True)
  self.set_as_user()
  self.client_create_task_raw()
  self.set_as_bot()
  polled = self.post_json('/swarming/api/v1/bot/poll', poll_params)
  task_id = polled['manifest']['task_id']

  update = dict(
      cost_usd=0.1,
      duration=0.1,
      exit_code=1,
      id='bot1',
      output=base64.b64encode('result string'),
      output_chunk_start=0,
      task_id=task_id)
  self.post_json('/swarming/api/v1/bot/task_update', update)

  self.set_as_user()
  actual = self.client_get_results(task_id)
  now = fmtdate(self.now)
  expected = self.gen_run_result(
      completed_ts=now,
      costs_usd=[0.1],
      created_ts=now,
      duration=0.1,
      exit_code=u'1',
      failure=True,
      modified_ts=now,
      started_ts=now,
      state=u'COMPLETED')
  self.assertEqual(expected, actual)
def test_task_internal_failure(self):
  # E.g. task_runner blew up.
  self.mock(random, 'getrandbits', lambda _: 0x88)
  logged = []

  def _record(*args, **kwargs):
    logged.append((args, kwargs))

  self.mock(ereporter2, 'log_request', _record)
  poll_params = self.do_handshake(do_first_poll=True)
  self.set_as_user()
  self.client_create_task_raw()
  self.set_as_bot()
  polled = self.post_json('/swarming/api/v1/bot/poll', poll_params)
  task_id = polled['manifest']['task_id']

  # Let's say it failed to start task_runner because the new bot code is
  # broken. The end result is still BOT_DIED. The big change is that it
  # doesn't need to wait for a cron job to set this status.
  error_report = {
      'id': poll_params['dimensions']['id'][0],
      'message': 'Oh',
      'task_id': task_id,
  }
  self.post_json('/swarming/api/v1/bot/task_error', error_report)

  self.set_as_user()
  actual = self.client_get_results(task_id)
  now = fmtdate(self.now)
  expected = self.gen_run_result(
      abandoned_ts=now,
      completed_ts=now,
      created_ts=now,
      internal_failure=True,
      modified_ts=now,
      started_ts=now,
      state=u'BOT_DIED')
  self.assertEqual(expected, actual)

  # Exactly one error is logged; only its keyword arguments are compared.
  self.assertEqual(1, len(logged))
  expected_logs = [
      {
          'message':
              u'Bot: https://test-swarming.appspot.com/restricted/bot/bot1\n'
              'Task failed: '
              'https://test-swarming.appspot.com/user/task/5cee488008811\nOh',
          'source': 'bot',
      },
  ]
  self.assertEqual(expected_logs, [entry[1] for entry in logged])
def test_task_canceled(self):
  # Task was canceled while running, resulting in KILLED.
  self.mock(random, 'getrandbits', lambda _: 0x88)
  self.do_handshake(do_first_poll=True)
  self.set_as_user()
  self.client_create_task_raw()
  self.set_as_bot()
  poll_params = self.do_handshake()
  polled = self.post_json('/swarming/api/v1/bot/poll', poll_params)
  self.assertEqual(u'run', polled.get(u'cmd'))
  task_id = polled['manifest']['task_id']

  def _killed_result():
    # Expected run result once the bot has reported completion.
    return self.gen_run_result(
        abandoned_ts=fmtdate(self.now),
        completed_ts=fmtdate(self.now),
        costs_usd=[0.1],
        created_ts=fmtdate(self.now),
        duration=0.1,
        exit_code=u'0',
        modified_ts=fmtdate(self.now),
        started_ts=fmtdate(self.now),
        state=u'KILLED')

  # Cancel the task while it's running.
  self.set_as_user()
  reply = self.client_cancel_task(task_id, True)
  self.assertEqual({u'ok': True, u'was_running': True}, reply)

  # Now tagged with abandoned_ts, but state not yet updated.
  expected = self.gen_run_result(
      abandoned_ts=fmtdate(self.now),
      completed_ts=fmtdate(self.now),
      created_ts=fmtdate(self.now),
      modified_ts=fmtdate(self.now),
      started_ts=fmtdate(self.now))
  self.assertEqual(expected, self.client_get_results(task_id))

  # The bot reports completion and is told to stop.
  self.set_as_bot()
  reply = self.bot_complete_task(task_id=task_id)
  self.assertEqual({u'must_stop': True, u'ok': True}, reply)
  self.set_as_user()
  expected = _killed_result()
  self.assertEqual(expected, self.client_get_results(task_id))

  # Canceling again is denied.
  reply = self.client_cancel_task(task_id, False)
  self.assertEqual({u'ok': False, u'was_running': False}, reply)
  expected = _killed_result()
  self.assertEqual(expected, self.client_get_results(task_id))
def test_bot_code_as_bot(self):
  # The versioned bot_code URL serves a zip holding the bot archive files plus
  # the two config files.
  version = '0' * 64
  self.mock(bot_code, 'get_bot_version', lambda _: (version, None, 'rev1'))
  archive = self.app.get('/swarming/api/v1/bot/bot_code/' + version)
  config_files = {'config/bot_config.py', 'config/config.json'}
  expected = config_files.union(bot_archive.FILES)
  with zipfile.ZipFile(StringIO.StringIO(archive.body), 'r') as bundle:
    names = set(bundle.namelist())
    self.assertEqual(expected, names)
def test_bot_code_as_bot_query_string(self):
  # A versioned bot_code URL carrying a bot_id query string gets a 302.
  version = '0' * 64
  self.mock(bot_code, 'get_bot_version', lambda _: (version, None, 'rev1'))
  url = '/swarming/api/v1/bot/bot_code/' + version + '?bot_id=1'
  self.app.get(url, status=302)
def test_bot_code_without_token(self):
  # An anonymous request for /bot_code without a bootstrap token gets a 403.
  self.set_as_anonymous()
  url = '/bot_code'
  self.app.get(url, status=403)
def test_bot_code_with_token(self):
  # An anonymous request carrying a generated bootstrap token gets a 302.
  version = '0' * 64
  self.mock(bot_code, 'get_bot_version', lambda _: (version, None, 'rev1'))
  self.set_as_anonymous()
  token = bot_code.generate_bootstrap_token()
  url = '/bot_code?tok=%s' % token
  self.app.get(url, status=302)
def test_bot_code_redirect(self):
  # /bot_code redirects to the URL naming the current bot version.
  version = '0' * 64
  self.mock(bot_code, 'get_bot_version', lambda _: (version, None, 'rev1'))
  reply = self.app.get('/bot_code')
  self.assertEqual(reply.status_int, 302)  # Found
  target = 'http://localhost/swarming/api/v1/bot/bot_code/' + version
  self.assertEqual(reply.location, target)
def test_bot_code_wrong_version(self):
  # Requesting a version other than the current one redirects to /bot_code,
  # carrying the bot id from the request header as a query parameter.
  current = '0' * 64
  requested = '1' * 64
  self.mock(bot_code, 'get_bot_version', lambda _: (current, None, 'rev1'))
  reply = self.app.get(
      '/swarming/api/v1/bot/bot_code/' + requested,
      headers={'X-Luci-Swarming-Bot-ID': 'bot1'})
  self.assertEqual(reply.status_int, 302)  # Found
  self.assertEqual(reply.location, 'http://localhost/bot_code?bot_id=bot1')
def test_bot_code_wrong_version_without_bot_id(self):
  # Requesting a version other than the current one, without a bot id header,
  # redirects to the bare /bot_code URL.
  # The mock returns the same 3-tuple shape as every other get_bot_version
  # mock in this file, so the handler can unpack it the same way.
  self.mock(bot_code, 'get_bot_version', lambda _: ('0' * 64, None, 'rev1'))
  response = self.app.get('/swarming/api/v1/bot/bot_code/' + '1' * 64)
  self.assertEqual(response.status_int, 302)  # Found
  self.assertEqual(response.location, 'http://localhost/bot_code')
def test_oauth_token_bad_scopes(self):
  # "scopes" given as a plain string instead of a list is rejected with 400.
  self.set_as_bot()
  payload = {
      'account_id': 'system',
      'id': 'bot1',
      'scopes': 'not a list of strings',
  }
  reply = self.app.post_json(
      '/swarming/api/v1/bot/oauth_token', params=payload, expect_errors=True)
  self.assertEqual(400, reply.status_code)
  self.assertEqual({u'error': u'"scopes" must be a list of strings'},
                   reply.json)
def test_oauth_token_unknown_account_id(self):
  # An account_id other than "task" or "system" is rejected with 400.
  self.set_as_bot()
  payload = {
      'account_id': 'blah',
      'id': 'bot1',
      'scopes': ['scope_a', 'scope_b'],
  }
  reply = self.app.post_json(
      '/swarming/api/v1/bot/oauth_token', params=payload, expect_errors=True)
  self.assertEqual(400, reply.status_code)
  expected = {u'error': u'Unknown "account_id", expecting "task" or "system"'}
  self.assertEqual(expected, reply.json)
def test_oauth_token_task_account_without_task_id(self):
  # Asking for the "task" account without a task_id is rejected with 400.
  self.set_as_bot()
  payload = {
      'account_id': 'task',
      'id': 'bot1',
      'scopes': ['scope_a', 'scope_b'],
  }
  reply = self.app.post_json(
      '/swarming/api/v1/bot/oauth_token', params=payload, expect_errors=True)
  self.assertEqual(400, reply.status_code)
  expected = {
      u'error': u'"task_id" is required when using "account_id" == "task"',
  }
  self.assertEqual(expected, reply.json)
def test_oauth_token_task_account(self):
  # Covers the "task" account token flow: wrong task id, success, a
  # permission error (403) and an internal error (500).
  calls = []

  # Parameter names are kept as-is in case the handler passes them by keyword.
  def _grant_token(task_id, bot_id, scopes):
    calls.append((task_id, bot_id, scopes))
    return 'blah@example.com', service_accounts.AccessToken('blah', 126240504)

  def _request_token(requested_task_id, **kwargs):
    # Posts a task-account token request for bot1; kwargs go to post_json.
    return self.app.post_json(
        '/swarming/api/v1/bot/oauth_token',
        params={
            'account_id': 'task',
            'id': 'bot1',
            'scopes': ['scope_a', 'scope_b'],
            'task_id': requested_task_id,
        },
        **kwargs)

  self.mock(service_accounts, 'get_task_account_token', _grant_token)
  self.mock_task_service_accounts()
  self.mock_default_pool_acl(['blah@example.com'])
  self.set_as_bot()
  self.bot_poll()

  # Create a task.
  self.set_as_user()
  _, task_summary_id = self.client_create_task_raw(
      service_account='blah@example.com')
  self.assertEqual('0', task_summary_id[-1])
  # Convert TaskResultSummary reference to TaskRunResult.
  task_id = task_summary_id[:-1] + '1'

  # Make the bot grab it.
  self.set_as_bot()
  manifest = self.bot_poll()['manifest']
  self.assertEqual(u'bot1', manifest['bot_id'])
  self.assertEqual(task_id, manifest['task_id'])

  # Pass wrong task_id to oauth_token.
  reply = _request_token(task_id[:-1] + '2', expect_errors=True)
  self.assertEqual(400, reply.status_code)
  self.assertEqual(
      {u'error': u'Wrong task_id: the bot is not executing this task'},
      reply.json)
  self.assertFalse(calls)

  # Pass correct task_id this time.
  reply = _request_token(task_id)
  self.assertEqual(
      {
          u'access_token': u'blah',
          u'expiry': 126240504,
          u'service_account': u'blah@example.com',
      }, reply.json)
  self.assertEqual([(task_id, 'bot1', [u'scope_a', u'scope_b'])], calls)

  # Emulate fatal error.
  def _deny(*_args, **_kwargs):
    raise service_accounts.PermissionError('Fatal error')

  self.mock(service_accounts, 'get_task_account_token', _deny)
  reply = _request_token(task_id, expect_errors=True)
  self.assertEqual(403, reply.status_code)
  self.assertEqual({u'error': u'Fatal error'}, reply.json)

  # Emulate transient error.
  def _flake(*_args, **_kwargs):
    raise service_accounts.InternalError('Transient error')

  self.mock(service_accounts, 'get_task_account_token', _flake)
  reply = _request_token(task_id, expect_errors=True)
  self.assertEqual(500, reply.status_code)
  self.assertEqual({u'error': u'Transient error'}, reply.json)
def test_oauth_token_system_account(self):
  # With a system service account set in the bot group config, the "system"
  # token request is served for that account.
  self.set_as_bot()
  bot_auth_cfg = bot_groups_config.BotAuth(
      log_if_failed=False,
      require_luci_machine_token=False,
      require_service_account=None,
      require_gce_vm_token=None,
      ip_whitelist=None,
  )
  self.mock_bot_group_config(
      version='default',
      owners=None,
      auth=(bot_auth_cfg,),
      dimensions={},
      bot_config_script=None,
      bot_config_script_rev=None,
      bot_config_script_content=None,
      system_service_account='system@example.com',
      is_default=True)

  calls = []

  # Parameter names are kept as-is in case the handler passes them by keyword.
  def _grant_token(account, scopes):
    calls.append((account, scopes))
    return account, service_accounts.AccessToken('blah', 126240504)

  self.mock(service_accounts, 'get_system_account_token', _grant_token)
  payload = {
      'account_id': 'system',
      'id': 'bot1',
      'scopes': ['scope_a', 'scope_b'],
  }
  reply = self.app.post_json(
      '/swarming/api/v1/bot/oauth_token', params=payload)
  body = reply.json
  self.assertEqual(
      {
          u'access_token': u'blah',
          u'expiry': 126240504,
          u'service_account': u'system@example.com',
      }, body)
  self.assertEqual([('system@example.com', [u'scope_a', u'scope_b'])], calls)
def test_oauth_token_system_account_none(self):
  # When the token provider returns ('none', None), the response carries only
  # the service account name.
  self.set_as_bot()

  # Parameter names are kept as-is in case the handler passes them by keyword.
  def _no_account(account, scopes):
    self.assertFalse(account)
    self.assertEqual(['scope_a', 'scope_b'], scopes)
    return 'none', None

  self.mock(service_accounts, 'get_system_account_token', _no_account)
  payload = {
      'account_id': 'system',
      'id': 'bot1',
      'scopes': ['scope_a', 'scope_b'],
  }
  reply = self.app.post_json(
      '/swarming/api/v1/bot/oauth_token', params=payload)
  body = reply.json
  self.assertEqual({u'service_account': u'none'}, body)
if __name__ == '__main__':
  # -v turns on full assertion diffs and debug logging; otherwise only
  # critical log records are shown.
  verbose = '-v' in sys.argv
  if verbose:
    unittest.TestCase.maxDiff = None
  log_level = logging.DEBUG if verbose else logging.CRITICAL
  logging.basicConfig(
      level=log_level,
      format='%(levelname)-7s %(filename)s:%(lineno)3d %(message)s')
  unittest.main()