| #!/usr/bin/env vpython |
| # Copyright 2014 The LUCI Authors. All rights reserved. |
| # Use of this source code is governed under the Apache License, Version 2.0 |
| # that can be found in the LICENSE file. |
| |
| import datetime |
| import logging |
| import os |
| import random |
| import sys |
| import unittest |
| |
| # Sets up the environment. |
| APP_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) |
| sys.path.insert(0, APP_DIR) |
| import test_env_handlers |
| |
| import webtest |
| |
| from google.appengine.api import datastore_errors |
| from google.appengine.ext import ndb |
| |
| import handlers_backend |
| |
| from components import auth_testing |
| from components import utils |
| from test_support import test_case |
| |
| from proto.config import pools_pb2 |
| from server import bot_management |
| from server import pools_config |
| from server import task_queues |
| from server import task_request |
| from server import task_to_run |
| |
| # pylint: disable=W0212 |
| # Method could be a function - pylint: disable=R0201 |
| |
| |
| def _gen_cipd(**kwargs): |
| """Creates a CipdInputs.""" |
| args = { |
| u'client_package': |
| task_request.CipdPackage( |
| package_name=u'infra/tools/cipd/${platform}', |
| version=u'git_revision:deadbeef'), |
| u'packages': [ |
| task_request.CipdPackage( |
| package_name=u'rm', path=u'bin', |
| version=u'git_revision:deadbeef'), |
| ], |
| u'server': |
| u'https://chrome-infra-packages.appspot.com', |
| } |
| args.update(kwargs) |
| return task_request.CipdInput(**args) |
| |
| |
| def _gen_properties(**kwargs): |
| """Creates a TaskProperties.""" |
| args = { |
| u'cipd_input': |
| _gen_cipd(), |
| u'command': [u'command1', u'arg1'], |
| u'dimensions': { |
| u'OS': [u'Windows-3.1.1'], |
| u'hostname': [u'localhost'], |
| u'pool': [u'default'], |
| }, |
| u'env': { |
| u'foo': u'bar', |
| u'joe': u'2' |
| }, |
| u'env_prefixes': { |
| u'PATH': [u'local/path'] |
| }, |
| u'execution_timeout_secs': |
| 24 * 60 * 60, |
| u'grace_period_secs': |
| 30, |
| u'idempotent': |
| False, |
| u'io_timeout_secs': |
| None, |
| u'has_secret_bytes': |
| u'secret_bytes' in kwargs, |
| } |
| args.update(kwargs) |
| args[u'dimensions_data'] = args.pop(u'dimensions') |
| return task_request.TaskProperties(**args) |
| |
| |
| def _gen_request_slices(**kwargs): |
| """Creates a TaskRequest.""" |
| now = utils.utcnow() |
| args = { |
| 'created_ts': now, |
| 'manual_tags': [u'tag:1'], |
| 'name': u'Request name', |
| 'priority': 50, |
| 'user': u'Jesus', |
| 'scheduling_algorithm': pools_pb2.Pool.SCHEDULING_ALGORITHM_LIFO, |
| } |
| args.update(kwargs) |
| req = task_request.TaskRequest(**args) |
| task_request.init_new_request(req, True, task_request.TEMPLATE_AUTO) |
| return req |
| |
| |
| def _gen_request(properties=None, **kwargs): |
| """Creates a TaskRequest with one task slice.""" |
| return _gen_request_slices( |
| task_slices=[ |
| task_request.TaskSlice( |
| expiration_secs=60, properties=properties or _gen_properties()), |
| ], |
| **kwargs) |
| |
| |
| class TaskToRunApiTest(test_env_handlers.AppTestBase): |
| |
| def setUp(self): |
| super(TaskToRunApiTest, self).setUp() |
| self.now = datetime.datetime(2019, 1, 2, 3, 4, 5, 6) |
| self.mock_now(self.now) |
| auth_testing.mock_get_current_identity(self) |
| # Set up the backend to handle task queues. |
| self.app = webtest.TestApp( |
| handlers_backend.create_application(True), |
| extra_environ={ |
| 'REMOTE_ADDR': self.source_ip, |
| 'SERVER_SOFTWARE': os.environ['SERVER_SOFTWARE'], |
| }) |
| self._enqueue_orig = self.mock(utils, 'enqueue_task_async', self._enqueue) |
| self.mock_pool_config('default') |
| |
| def _enqueue(self, *args, **kwargs): |
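| """Forwards to the original enqueue_task_async with |
| use_dedicated_module=False so enqueued tasks can be handled by the test |
| app set up in setUp.""" |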
| return self._enqueue_orig(*args, use_dedicated_module=False, **kwargs) |
| |
| def _yield_next_available_task_to_dispatch(self, bot_dimensions): |
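| """Simulates a bot with the given dimensions connecting and polling, and |
| returns the matching TaskToRunShard entities as dicts.""" |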
| bot_id = bot_dimensions[u'id'][0] |
| bot_management.bot_event(event_type='bot_connected', |
| bot_id=bot_id, |
| external_ip='1.2.3.4', |
| authenticated_as='joe@localhost', |
| dimensions=bot_dimensions, |
| state={'state': 'real'}, |
| version='1234', |
| register_dimensions=False) |
| task_queues.assert_bot(bot_dimensions) |
| self.execute_tasks() |
| queues = task_queues.freshen_up_queues(bot_id) |
| matcher = task_to_run.dimensions_matcher(bot_dimensions) |
| return [ |
| to_run.to_dict() |
| for to_run in task_to_run.yield_next_available_task_to_dispatch( |
| bot_id, 'pool-for-monitoring', queues, matcher, |
| utils.utcnow() + datetime.timedelta(minutes=1)) |
| ] |
| |
| def mkreq(self, req): |
| """Stores a new initialized TaskRequest.""" |
| # It is important that the task queues be asserted before the request is |
| # stored. |
| task_queues.assert_task_async(req).get_result() |
| self.execute_tasks() |
| req.key = task_request.new_request_key() |
| req.put() |
| return req |
| |
| def _gen_new_task_to_run(self, **kwargs): |
| """Returns TaskRequest, TaskToRunShard saved in the DB.""" |
| request = self.mkreq(_gen_request(**kwargs)) |
| to_run = task_to_run.new_task_to_run(request, 0) |
| to_run.put() |
| return request, to_run |
| |
| def _gen_new_task_to_run_slices(self, **kwargs): |
| """Returns TaskRequest, TaskToRunShard saved in the DB.""" |
| request = self.mkreq(_gen_request_slices(**kwargs)) |
| to_run = task_to_run.new_task_to_run(request, 0) |
| to_run.put() |
| return request, to_run |
| |
| def mock_pool_config(self, name, scheduling_algorithm=None): |
| """Mocks up a pool with the given scheduling algorithm.""" |
| def mocked_get_pool_config(pool): |
| if pool == name: |
| return pools_config.init_pool_config( |
| name=name, |
| scheduling_algorithm=(scheduling_algorithm |
| or pools_pb2.Pool.SCHEDULING_ALGORITHM_LIFO), |
| ) |
| return None |
| self.mock(pools_config, 'get_pool_config', mocked_get_pool_config) |
| |
| def test_all_apis_are_tested(self): |
| actual = frozenset(i[5:] for i in dir(self) if i.startswith('test_')) |
| # The set of all public functions in task_to_run. |
| expected = frozenset( |
| i for i in dir(task_to_run) |
| if i[0] != '_' and hasattr(getattr(task_to_run, i), 'func_name')) |
| missing = expected - actual |
| self.assertFalse(missing) |
| |
| def test_task_to_run_key_to_request_key(self): |
| request = self.mkreq(_gen_request()) |
| task_key = task_to_run.request_to_task_to_run_key(request, 0) |
| actual = task_to_run.task_to_run_key_to_request_key(task_key) |
| self.assertEqual(request.key, actual) |
| |
| def test_request_to_task_to_run_key(self): |
| self.mock(random, 'getrandbits', lambda _: 0x88) |
| request = self.mkreq(_gen_request()) |
| shard = request.task_slice( |
| 0).properties.dimensions_hash % task_to_run.N_SHARDS |
| expected_kind = 'TaskToRunShard%d' % shard |
| # Ensures that the hash value is constant for the same input. |
| self.assertEqual( |
| ndb.Key('TaskRequest', 0x7bddaa9d777ff77e, expected_kind, 1), |
| task_to_run.request_to_task_to_run_key(request, 0)) |
| |
| def test_gen_queue_number(self): |
| # Tuples of ((dimensions_hash, timestamp, priority), |
| # (expected queue_number, extracted priority)). |
| # 0x3fc00000 is the priority mask. |
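| # A sketch of what the expected values below imply (inferred from this |
| # table, not taken from the production code): the low 31 bits of |
| # queue_number encode the priority and the timestamp together, with the |
| # priority in the topmost of those bits, so shifting recovers the |
| # priority, skewed by time, e.g.: |
| # (0xd28c0300 & 0x7fffffff) >> 22 # == 330 |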
| data = [ |
| # Priorities. |
| ((1, '1970-01-01 00:00:00.000', 0), (0x92cc0300, 75)), |
| ((1, '1970-01-01 00:00:00.000', 1), (0x930c0300, 76)), |
| ((1, '1970-01-01 00:00:00.000', 2), (0x934c0300, 77)), |
| ((1, '1970-01-01 00:00:00.000', 3), (0x938c0300, 78)), |
| ((1, '1970-01-01 00:00:00.000', 255), (0xd28c0300, 330)), |
| # Largest hash. |
| ((0xffffffff, '1970-01-01 00:00:00.000', 255), (0x7fffffffd28c0300, |
| 330)), |
| # Time resolution. |
| ((1, '1970-01-01 00:00:00.040', 0), (0x92cc0300, 75)), |
| ((1, '1970-01-01 00:00:00.050', 0), (0x92cc0300, 75)), |
| ((1, '1970-01-01 00:00:00.100', 0), (0x92cc02ff, 75)), |
| ((1, '1970-01-01 00:00:00.900', 0), (0x92cc02f7, 75)), |
| ((1, '1970-01-01 00:00:01.000', 0), (0x92cc02f6, 75)), # 10 |
| ((1, '2010-01-02 03:04:05.060', 0), (0x92bd248d, 74)), |
| ((1, '2010-01-02 03:04:05.060', 1), (0x92fd248d, 75)), |
| ((1, '2010-12-31 23:59:59.999', 0), (0x80000000, 0)), |
| ((1, '2010-12-31 23:59:59.999', 1), (0x80400000, 1)), |
| ((1, '2010-12-31 23:59:59.999', 2), (0x80800000, 2)), |
| ((1, '2010-12-31 23:59:59.999', 255), (0xbfc00000, 255)), |
| # It's the end of the world as we know it... |
| ((1, '9998-12-31 23:59:59.999', 0), (0x80000000, 0)), |
| ((1, '9998-12-31 23:59:59.999', 1), (0x80400000, 1)), |
| ((1, '9998-12-31 23:59:59.999', 255), (0xbfc00000, 255)), |
| ] |
| for i, ((dimensions_hash, timestamp, priority), |
| (expected_v, expected_p)) in enumerate(data): |
| d = datetime.datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S.%f') |
| actual = task_to_run._gen_queue_number( |
| dimensions_hash, d, priority, |
| pools_pb2.Pool.SCHEDULING_ALGORITHM_LIFO) |
| self.assertEqual((i, '0x%016x' % expected_v), (i, '0x%016x' % actual)) |
| # Ensure the priority can be extracted back. Note that it is skewed by |
| # the time component. |
| v = task_to_run._TaskToRunBase(queue_number=actual) |
| self.assertEqual((i, expected_p), |
| (i, task_to_run._queue_number_priority(v))) |
| |
| def test_new_task_to_run(self): |
| self.mock(random, 'getrandbits', lambda _: 0x12) |
| request_dimensions = {u'os': [u'Windows-3.1.1'], u'pool': [u'default']} |
| data = _gen_request_slices( |
| task_slices=[ |
| task_request.TaskSlice( |
| expiration_secs=31, |
| properties=_gen_properties( |
| command=[u'command1', u'arg1'], |
| dimensions=request_dimensions, |
| env={u'foo': u'bar'}, |
| execution_timeout_secs=30)), |
| task_request.TaskSlice( |
| expiration_secs=30, |
| properties=_gen_properties( |
| command=[u'command2'], |
| dimensions=request_dimensions, |
| execution_timeout_secs=30)), |
| ], |
| priority=20, |
| created_ts=self.now) |
| request = self.mkreq(data) |
| # For task_slice_index == 0, request.created_ts is used. |
| self.mock_now(self.now, 1) |
| expected = { |
| 'created_ts': self.now, |
| 'expiration_ts': self.now + datetime.timedelta(seconds=31), |
| 'expiration_delay': None, |
| 'queue_number': '0x1a3aa66317bd248e', |
| 'task_slice_index': 0, |
| } |
| actual = task_to_run.new_task_to_run(request, 0).to_dict() |
| self.assertEqual(expected, actual) |
| # now is used when task_slice_index != 0. |
| expected['created_ts'] = self.now + datetime.timedelta(seconds=1) |
| expected['task_slice_index'] = 1 |
| expected['expiration_ts'] = self.now + datetime.timedelta( |
| minutes=1, seconds=1) |
| actual = task_to_run.new_task_to_run(request, 1).to_dict() |
| self.assertEqual(expected, actual) |
| |
| def test_new_task_to_run_limits(self): |
| # Generate a TaskRequest with eight TaskSlices. |
| slices = [ |
| task_request.TaskSlice( |
| expiration_secs=60, |
| properties=_gen_properties(dimensions={ |
| u'pool': [u'default'], |
| u'v': [unicode(i)] |
| })) for i in range(8) |
| ] |
| request = self.mkreq(_gen_request_slices(task_slices=slices)) |
| task_to_run.new_task_to_run(request, 0) |
| task_to_run.new_task_to_run(request, 7) |
| with self.assertRaises(IndexError): |
| task_to_run.new_task_to_run(request, 8) |
| |
| def test_task_to_run_key_slice_index(self): |
| slices = [ |
| task_request.TaskSlice( |
| expiration_secs=60, |
| properties=_gen_properties(dimensions={ |
| u'pool': [u'default'], |
| u'v': [unicode(i)] |
| })) for i in range(8) |
| ] |
| request = self.mkreq(_gen_request_slices(task_slices=slices)) |
| for i in range(len(slices)): |
| to_run = task_to_run.new_task_to_run(request, i) |
| self.assertEqual(i, to_run.task_slice_index) |
| self.assertEqual(i, task_to_run.task_to_run_key_slice_index(to_run.key)) |
| |
| def test_new_task_to_run_list(self): |
| self.mock(random, 'getrandbits', lambda _: 0x12) |
| request_dimensions = {u'os': [u'Windows-3.1.1'], u'pool': [u'default']} |
| data = _gen_request_slices( |
| priority=20, |
| created_ts=self.now, |
| task_slices=[ |
| task_request.TaskSlice( |
| expiration_secs=31, |
| properties=_gen_properties( |
| command=[u'command1', u'arg1'], |
| dimensions=request_dimensions, |
| env={u'foo': u'bar'}, |
| execution_timeout_secs=30)), |
| ]) |
| request = self.mkreq(data) |
| task_to_run.new_task_to_run(request, 0).put() |
| |
| # Create a second request with a higher priority (lower priority value). |
| self.mock(random, 'getrandbits', lambda _: 0x23) |
| data = _gen_request_slices( |
| priority=10, |
| created_ts=self.now, |
| task_slices=[ |
| task_request.TaskSlice( |
| expiration_secs=31, |
| properties=_gen_properties( |
| command=[u'command1', u'arg1'], |
| dimensions=request_dimensions, |
| env={u'foo': u'bar'}, |
| execution_timeout_secs=30)), |
| ]) |
| task_to_run.new_task_to_run(self.mkreq(data), 0).put() |
| |
| expected = [ |
| { |
| 'created_ts': self.now, |
| 'expiration_ts': self.now + datetime.timedelta(seconds=31), |
| 'expiration_delay': None, |
| 'request_key': '0x7bddaa9d777ffdce', |
| # Lower priority value means higher priority. |
| 'queue_number': '0x1a3aa663153d248e', |
| 'task_slice_index': 0, |
| }, |
| { |
| 'created_ts': self.now, |
| 'expiration_ts': self.now + datetime.timedelta(seconds=31), |
| 'expiration_delay': None, |
| 'request_key': '0x7bddaa9d777ffede', |
| 'queue_number': '0x1a3aa66317bd248e', |
| 'task_slice_index': 0, |
| }, |
| ] |
| |
| def flatten(i): |
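| """Converts a TaskToRunShard to a dict with its request key in hex.""" |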
| out = i.to_dict() |
| out['request_key'] = '0x%016x' % i.request_key.integer_id() |
| return out |
| |
| # Warning: Ordering by key doesn't work because of TaskToRunShard: the |
| # entity key ordering DOES NOT correlate with .queue_number. Query each |
| # shard ordered by queue_number to get entities in the expected order. |
| actual = [] |
| for shard in range(task_to_run.N_SHARDS): |
| to_runs = task_to_run.get_shard_kind(shard).query().order( |
| task_to_run._TaskToRunBase.queue_number).fetch() |
| actual.extend(to_runs) |
| self.assertEqual(expected, map(flatten, actual)) |
| |
| def test_new_task_to_run_scheduling_algorithm(self): |
| self.mock_now(self.now, 60) |
| |
| # LIFO. |
| request = self.mkreq( |
| _gen_request( |
| properties=_gen_properties(), |
| scheduling_algorithm=pools_pb2.Pool.SCHEDULING_ALGORITHM_LIFO, |
| )) |
| shard = task_to_run.new_task_to_run(request, 0).put() |
| self.assertEqual(shard.get().queue_number, 0x4bef13b79f3d2236) |
| |
| # FIFO. |
| request = self.mkreq( |
| _gen_request( |
| properties=_gen_properties(), |
| scheduling_algorithm=pools_pb2.Pool.SCHEDULING_ALGORITHM_FIFO, |
| )) |
| shard = task_to_run.new_task_to_run(request, 0).put() |
| self.assertEqual(shard.get().queue_number, 0x4bef13b78c8ee0ca) |
| |
| # UNKNOWN is interpreted as FIFO. |
| request = self.mkreq( |
| _gen_request( |
| properties=_gen_properties(), |
| scheduling_algorithm=pools_pb2.Pool.SCHEDULING_ALGORITHM_UNKNOWN, |
| )) |
| shard = task_to_run.new_task_to_run(request, 0).put() |
| self.assertEqual(shard.get().queue_number, 0x4bef13b78c8ee0ca) |
| |
| def test_dimensions_matcher(self): |
| def match_dimensions(request_dimensions, bot_dimensions): |
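| """True if a bot with bot_dimensions can run a task with |
| request_dimensions.""" |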
| return task_to_run.dimensions_matcher(bot_dimensions)(request_dimensions) |
| |
| data_true = ( |
| ({}, {}), |
| ({}, { |
| 'a': 'b' |
| }), |
| ({ |
| 'a': ['b'] |
| }, { |
| 'a': ['b'] |
| }), |
| ({ |
| 'os': ['amiga'] |
| }, { |
| 'os': ['amiga', 'amiga-3.1'] |
| }), |
| ({ |
| 'os': ['amiga'], |
| 'foo': ['bar'] |
| }, { |
| 'os': ['amiga', 'amiga-3.1'], |
| 'a': 'b', |
| 'foo': 'bar' |
| }), |
| ({ |
| 'os': ['amiga', 'amiga-3.1'], |
| 'foo': ['bar'] |
| }, { |
| 'os': ['amiga', 'amiga-3.1'], |
| 'a': 'b', |
| 'foo': 'bar' |
| }), |
| ({ |
| 'os': ['amiga|amiga-3.1'], |
| 'foo': ['a|b', 'c'] |
| }, { |
| 'os': 'amiga', |
| 'foo': ['b', 'c'], |
| 'more': ['gsu'], |
| }), |
| ) |
| |
| for request_dimensions, bot_dimensions in data_true: |
| self.assertEqual(True, match_dimensions(request_dimensions, |
| bot_dimensions)) |
| |
| data_false = ( |
| ({ |
| 'os': ['amiga'] |
| }, { |
| 'os': ['Win', 'Win-3.1'] |
| }), |
| ({ |
| 'os': ['amiga'] |
| }, { |
| 'foo': 'bar' |
| }), |
| ({ |
| 'foo': ['a|b', 'c'], |
| }, { |
| 'foo': 'b' |
| }), |
| ) |
| for request_dimensions, bot_dimensions in data_false: |
| self.assertEqual(False, |
| match_dimensions(request_dimensions, bot_dimensions)) |
| |
| def test_yield_next_available_task_to_dispatch_none(self): |
| request_dimensions = {u'os': [u'Windows-3.1.1'], u'pool': [u'default']} |
| self._gen_new_task_to_run(properties=_gen_properties( |
| dimensions=request_dimensions)) |
| # Bot doesn't declare the required 'os' dimension, so it will fail to |
| # match. |
| bot_dimensions = {u'id': [u'bot1'], u'pool': [u'default']} |
| actual = self._yield_next_available_task_to_dispatch(bot_dimensions) |
| self.assertEqual([], actual) |
| |
| def test_yield_next_available_task_to_dispatch_none_mismatch(self): |
| request_dimensions = {u'os': [u'Windows-3.1.1'], u'pool': [u'default']} |
| self._gen_new_task_to_run(properties=_gen_properties( |
| dimensions=request_dimensions)) |
| # Bot declares a different 'os' value, so it will fail to match. |
| bot_dimensions = { |
| u'id': [u'bot1'], |
| u'os': [u'Windows-3.0'], |
| u'pool': [u'default'], |
| } |
| actual = self._yield_next_available_task_to_dispatch(bot_dimensions) |
| self.assertEqual([], actual) |
| |
| def test_yield_next_available_task_to_dispatch(self): |
| request_dimensions = { |
| u'foo': [u'bar'], |
| u'os': [u'Windows-3.1.1'], |
| u'pool': [u'default'], |
| } |
| _request, _ = self._gen_new_task_to_run(properties=_gen_properties( |
| dimensions=request_dimensions)) |
| # Bot declares exactly the same dimensions, so it matches. |
| bot_dimensions = request_dimensions.copy() |
| bot_dimensions[u'id'] = [u'bot1'] |
| actual = self._yield_next_available_task_to_dispatch(bot_dimensions) |
| expected = [ |
| { |
| 'created_ts': self.now, |
| 'expiration_ts': self.now + datetime.timedelta(minutes=1), |
| 'expiration_delay': None, |
| 'queue_number': '0x613fbb331f3d248e', |
| 'task_slice_index': 0, |
| }, |
| ] |
| self.assertEqual(expected, actual) |
| |
| def test_yield_next_available_task_to_dispatch_subset(self): |
| request_dimensions = {u'os': [u'Windows-3.1.1'], u'pool': [u'default']} |
| _request, _ = self._gen_new_task_to_run(properties=_gen_properties( |
| dimensions=request_dimensions)) |
| # Bot declares more dimensions than needed; this is fine and it matches. |
| bot_dimensions = { |
| u'id': [u'localhost'], |
| u'os': [u'Windows-3.1.1'], |
| u'pool': [u'default'], |
| } |
| actual = self._yield_next_available_task_to_dispatch(bot_dimensions) |
| expected = [ |
| { |
| 'created_ts': self.now, |
| 'expiration_ts': self.now + datetime.timedelta(minutes=1), |
| 'expiration_delay': None, |
| 'queue_number': '0x1a3aa6631f3d248e', |
| 'task_slice_index': 0, |
| }, |
| ] |
| self.assertEqual(expected, actual) |
| |
| def test_yield_next_available_task_to_dispatch_subset_multivalue(self): |
| request_dimensions = {u'os': [u'Windows-3.1.1'], u'pool': [u'default']} |
| _request, _ = self._gen_new_task_to_run(properties=_gen_properties( |
| dimensions=request_dimensions)) |
| # Bot declares more dimensions than needed. |
| bot_dimensions = { |
| u'id': [u'localhost'], |
| u'os': [u'Windows', u'Windows-3.1.1'], |
| u'pool': [u'default'], |
| } |
| actual = self._yield_next_available_task_to_dispatch(bot_dimensions) |
| expected = [ |
| { |
| 'created_ts': self.now, |
| 'expiration_ts': self.now + datetime.timedelta(minutes=1), |
| 'expiration_delay': None, |
| 'queue_number': '0x1a3aa6631f3d248e', |
| 'task_slice_index': 0, |
| }, |
| ] |
| self.assertEqual(expected, actual) |
| |
| def test_yield_next_available_task_to_dispatch_multi_normal(self): |
| # Tasks added one after the other, the normal case. |
| request_dimensions_1 = { |
| u'foo': [u'bar'], |
| u'os': [u'Windows-3.1.1'], |
| u'pool': [u'default'], |
| } |
| _request1, _ = self._gen_new_task_to_run(properties=_gen_properties( |
| dimensions=request_dimensions_1)) |
| |
| # It's normally time ordered. |
| self.mock_now(self.now, 1) |
| request_dimensions_2 = {u'id': [u'localhost'], u'pool': [u'default']} |
| _request2, _ = self._gen_new_task_to_run(properties=_gen_properties( |
| dimensions=request_dimensions_2)) |
| |
| bot_dimensions = { |
| u'foo': [u'bar'], |
| u'id': [u'localhost'], |
| u'os': [u'Windows-3.1.1'], |
| u'pool': [u'default'], |
| } |
| actual = self._yield_next_available_task_to_dispatch(bot_dimensions) |
| expected = [ |
| { |
| 'created_ts': self.now, |
| 'expiration_ts': self.now + datetime.timedelta(minutes=1), |
| 'expiration_delay': None, |
| 'queue_number': '0x613fbb331f3d248e', |
| 'task_slice_index': 0, |
| }, |
| { |
| 'created_ts': |
| self.now + datetime.timedelta(seconds=1), |
| 'expiration_ts': |
| self.now + datetime.timedelta(minutes=1, seconds=1), |
| 'expiration_delay': |
| None, |
| 'queue_number': |
| '0x5385bf749f3d2484', |
| 'task_slice_index': |
| 0, |
| }, |
| ] |
| # The iteration order is not deterministic, so compare sorted. |
| self.assertEqual(sorted(expected), sorted(actual)) |
| |
| def test_yield_next_available_task_to_dispatch_clock_skew(self): |
| # Asserts that a TaskToRunShard added later in the DB (with a key with a |
| # higher value) but with an earlier timestamp (for example, due to time |
| # desynchronization between machines) is still returned in timestamp |
| # order, i.e. dispatch order is based on timestamp and priority only. |
| request_dimensions_1 = { |
| u'foo': [u'bar'], |
| u'os': [u'Windows-3.1.1'], |
| u'pool': [u'default'], |
| } |
| _request1, _ = self._gen_new_task_to_run(properties=_gen_properties( |
| dimensions=request_dimensions_1)) |
| |
| # The second TaskToRunShard is given an earlier timestamp, as if the |
| # requester's clock was behind. Under LIFO scheduling it'll have lower |
| # priority. |
| self.mock_now(self.now, -1) |
| request_dimensions_2 = {u'id': [u'localhost'], u'pool': [u'default']} |
| _request2, _ = self._gen_new_task_to_run(properties=_gen_properties( |
| dimensions=request_dimensions_2)) |
| |
| bot_dimensions = { |
| u'foo': [u'bar'], |
| u'id': [u'localhost'], |
| u'os': [u'Windows-3.1.1'], |
| u'pool': [u'default'], |
| } |
| actual = self._yield_next_available_task_to_dispatch(bot_dimensions) |
| expected = [ |
| { |
| 'created_ts': |
| self.now + datetime.timedelta(seconds=-1), |
| # One second early, due to the clock being behind on the second |
| # requester's frontend. |
| 'expiration_ts': |
| self.now + datetime.timedelta(minutes=1, seconds=-1), |
| 'expiration_delay': |
| None, |
| 'queue_number': |
| '0x5385bf749f3d2498', |
| 'task_slice_index': |
| 0, |
| }, |
| { |
| 'created_ts': self.now, |
| 'expiration_ts': self.now + datetime.timedelta(minutes=1), |
| 'expiration_delay': None, |
| 'queue_number': '0x613fbb331f3d248e', |
| 'task_slice_index': 0, |
| }, |
| ] |
| # The iteration order is not deterministic, so compare sorted. |
| self.assertEqual(expected, sorted(actual, key=lambda x: x['queue_number'])) |
| |
| def test_yield_next_available_task_to_dispatch_priority(self): |
| # Tasks added earlier but with higher priority are returned first. |
| request_dimensions = {u'os': [u'Windows-3.1.1'], u'pool': [u'default']} |
| self._gen_new_task_to_run( |
| properties=_gen_properties(dimensions=request_dimensions), |
| priority=10) |
| |
| # This one is later but has lower priority. |
| self.mock_now(self.now, 60) |
| request = self.mkreq( |
| _gen_request( |
| properties=_gen_properties(dimensions=request_dimensions), |
| priority=50)) |
| task_to_run.new_task_to_run(request, 0).put() |
| |
| # It should return them all, in the expected order: highest priority |
| # (lowest priority value) first. |
| expected = [ |
| { |
| 'created_ts': self.now, |
| 'expiration_ts': self.now + datetime.timedelta(minutes=1), |
| 'expiration_delay': None, |
| 'queue_number': '0x1a3aa663153d248e', |
| 'task_slice_index': 0, |
| }, |
| { |
| 'created_ts': self.now + datetime.timedelta(minutes=1), |
| 'expiration_ts': self.now + datetime.timedelta(minutes=2), |
| 'expiration_delay': None, |
| 'queue_number': '0x1a3aa6631f3d2236', |
| 'task_slice_index': 0, |
| }, |
| ] |
| bot_dimensions = { |
| u'id': [u'localhost'], |
| u'os': [u'Windows-3.1.1'], |
| u'pool': [u'default'], |
| } |
| actual = self._yield_next_available_task_to_dispatch(bot_dimensions) |
| self.assertEqual(expected, actual) |
| |
| def test_yield_next_available_task_to_dispatch_fifo(self): |
| request_dimensions = {u'os': [u'Windows-3.1.1'], u'pool': [u'default']} |
| self._gen_new_task_to_run( |
| properties=_gen_properties(dimensions=request_dimensions), |
| priority=50, |
| scheduling_algorithm=pools_pb2.Pool.SCHEDULING_ALGORITHM_FIFO) |
| |
| self.mock_now(self.now, 60) |
| request = self.mkreq( |
| _gen_request( |
| properties=_gen_properties(dimensions=request_dimensions), |
| priority=50, |
| scheduling_algorithm=pools_pb2.Pool.SCHEDULING_ALGORITHM_FIFO)) |
| task_to_run.new_task_to_run(request, 0).put() |
| |
| # It should return them all, in the expected order: first in, first out. |
| expected = [ |
| { |
| 'created_ts': self.now, |
| 'expiration_ts': self.now + datetime.timedelta(minutes=1), |
| 'expiration_delay': None, |
| 'queue_number': '0x1a3aa6630c8ede72', |
| 'task_slice_index': 0, |
| }, |
| { |
| 'created_ts': self.now + datetime.timedelta(minutes=1), |
| 'expiration_ts': self.now + datetime.timedelta(minutes=2), |
| 'expiration_delay': None, |
| 'queue_number': '0x1a3aa6630c8ee0ca', |
| 'task_slice_index': 0, |
| }, |
| ] |
| bot_dimensions = { |
| u'id': [u'localhost'], |
| u'os': [u'Windows-3.1.1'], |
| u'pool': [u'default'], |
| } |
| actual = self._yield_next_available_task_to_dispatch(bot_dimensions) |
| self.assertEqual(expected, actual) |
| |
| def test_yield_next_available_task_to_dispatch_lifo(self): |
| request_dimensions = {u'os': [u'Windows-3.1.1'], u'pool': [u'default']} |
| self._gen_new_task_to_run( |
| properties=_gen_properties(dimensions=request_dimensions), |
| priority=50, |
| scheduling_algorithm=pools_pb2.Pool.SCHEDULING_ALGORITHM_LIFO) |
| |
| self.mock_now(self.now, 60) |
| request = self.mkreq( |
| _gen_request( |
| properties=_gen_properties(dimensions=request_dimensions), |
| priority=50, |
| scheduling_algorithm=pools_pb2.Pool.SCHEDULING_ALGORITHM_LIFO)) |
| task_to_run.new_task_to_run(request, 0).put() |
| |
| # It should return them all, in the expected order: last in, first out. |
| expected = [ |
| { |
| 'created_ts': self.now + datetime.timedelta(minutes=1), |
| 'expiration_ts': self.now + datetime.timedelta(minutes=2), |
| 'expiration_delay': None, |
| 'queue_number': '0x1a3aa6631f3d2236', |
| 'task_slice_index': 0, |
| }, |
| { |
| 'created_ts': self.now, |
| 'expiration_ts': self.now + datetime.timedelta(minutes=1), |
| 'expiration_delay': None, |
| 'queue_number': '0x1a3aa6631f3d248e', |
| 'task_slice_index': 0, |
| }, |
| ] |
| bot_dimensions = { |
| u'id': [u'localhost'], |
| u'os': [u'Windows-3.1.1'], |
| u'pool': [u'default'], |
| } |
| actual = self._yield_next_available_task_to_dispatch(bot_dimensions) |
| self.assertEqual(expected, actual) |
| |
| def test_yield_next_available_task_to_dispatch_multi_priority(self): |
| # High priority tasks added earlier with other dimensions are returned |
| # first. |
| request_dimensions_1 = {u'os': [u'Windows-3.1.1'], u'pool': [u'default']} |
| _request1, _ = self._gen_new_task_to_run( |
| properties=_gen_properties(dimensions=request_dimensions_1), |
| priority=10) |
| |
| # This one is later but has lower priority. |
| self.mock_now(self.now, 60) |
| request_dimensions_2 = {u'id': [u'localhost'], u'pool': [u'default']} |
| request2 = self.mkreq( |
| _gen_request( |
| properties=_gen_properties(dimensions=request_dimensions_2), |
| priority=50)) |
| task_to_run.new_task_to_run(request2, 0).put() |
| |
| # It should return them all, in the expected order: highest priority |
| # (lowest priority value) first. |
| expected = [ |
| { |
| 'created_ts': self.now, |
| 'expiration_ts': self.now + datetime.timedelta(minutes=1), |
| 'expiration_delay': None, |
| 'queue_number': '0x1a3aa663153d248e', |
| 'task_slice_index': 0, |
| }, |
| { |
| 'created_ts': self.now + datetime.timedelta(minutes=1), |
| 'expiration_ts': self.now + datetime.timedelta(minutes=2), |
| 'expiration_delay': None, |
| 'queue_number': '0x5385bf749f3d2236', |
| 'task_slice_index': 0, |
| }, |
| ] |
| bot_dimensions = { |
| u'id': [u'localhost'], |
| u'os': [u'Windows-3.1.1'], |
| u'pool': [u'default'], |
| } |
| actual = self._yield_next_available_task_to_dispatch(bot_dimensions) |
| self.assertEqual(expected, actual) |
| |
| def test_yield_next_available_task_to_run_task_terminate(self): |
| request_dimensions = {u'id': [u'fake-id']} |
| _request, task = self._gen_new_task_to_run( |
| priority=0, |
| scheduling_algorithm=pools_pb2.Pool.SCHEDULING_ALGORITHM_FIFO, |
| properties=_gen_properties(cipd_input=None, |
| command=[], |
| dimensions=request_dimensions, |
| env=None, |
| env_prefixes=None, |
| execution_timeout_secs=0, |
| grace_period_secs=0)) |
| self.assertTrue( |
| task.key.parent().get().task_slice(0).properties.is_terminate) |
| # Bot declares exactly the same dimensions, so it matches. |
| bot_dimensions = request_dimensions.copy() |
| bot_dimensions[u'pool'] = [u'default'] |
| actual = self._yield_next_available_task_to_dispatch(bot_dimensions) |
| expected = [ |
| { |
| 'created_ts': self.now, |
| 'expiration_ts': self.now + datetime.timedelta(minutes=1), |
| 'expiration_delay': None, |
| 'queue_number': '0x54795e3c800ede72', |
| 'task_slice_index': 0, |
| }, |
| ] |
| self.assertEqual(expected, actual) |
| |
| def test_yield_next_available_task_to_dispatch_large_queue(self): |
| submitted = [] |
| |
| def submit_bunch(count, request_dimensions): |
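| """Stores `count` tasks with the given dimensions, one second apart.""" |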
| for _ in range(count): |
| self.mock_now(self.now, len(submitted)) |
| request = self.mkreq( |
| _gen_request( |
| properties=_gen_properties(dimensions=request_dimensions), |
| priority=50)) |
| ttr = task_to_run.new_task_to_run(request, 0) |
| ttr.put() |
| submitted.append(ttr.to_dict()) |
| |
| submit_bunch(51, {u'os': [u'Windows-3.1.1'], u'pool': [u'p1']}) |
| submit_bunch(11, {u'os': [u'Windows-3.1.1'], u'pool': [u'p2']}) |
| submit_bunch(9, {u'os': [u'Windows-3.1.1'], u'pool': [u'p3']}) |
| |
| # Got them all. |
| bot_dimensions = { |
| u'id': [u'localhost'], |
| u'os': [u'Windows-3.1.1'], |
| u'pool': [u'p1', u'p2', u'p3'], |
| } |
| collected = list( |
| self._yield_next_available_task_to_dispatch(bot_dimensions)) |
| self.assertEqual(len(submitted), len(collected)) |
| |
| # Same items. Ignore order; there are other tests for that. |
| submitted.sort(key=lambda ttr: ttr['created_ts']) |
| collected.sort(key=lambda ttr: ttr['created_ts']) |
| self.assertEqual(submitted, collected) |
| |
| def test_yield_next_available_task_checks_cache(self): |
| request_dimensions = {u'os': [u'Windows-3.1.1'], u'pool': [u'p1']} |
| bot_dimensions = { |
| u'id': [u'localhost'], |
| u'os': [u'Windows-3.1.1'], |
| u'pool': [u'p1', u'p2', u'p3'], |
| } |
| |
| submitted = [] |
| |
| def submit_bunch(count, mark_as_claimed): |
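| """Stores `count` tasks, optionally claiming them, and returns their |
| dicts.""" |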
| bunch = [] |
| for _ in range(count): |
| self.mock_now(self.now, len(submitted)) |
| request = self.mkreq( |
| _gen_request( |
| properties=_gen_properties(dimensions=request_dimensions), |
| priority=50)) |
| ttr = task_to_run.new_task_to_run(request, 0) |
| ttr.put() |
| submitted.append(ttr) |
| bunch.append(ttr.to_dict()) |
| if mark_as_claimed: |
| task_to_run.Claim.obtain(ttr.key) |
| return bunch |
| |
| available = [] |
| |
| available.extend(submit_bunch(7, False)) |
| submit_bunch(7, True) |
| available.extend(submit_bunch(7, False)) |
| |
| # Got only the ones that weren't claimed. |
| collected = list( |
| self._yield_next_available_task_to_dispatch(bot_dimensions)) |
| self.assertEqual(len(available), len(collected)) |
| |
| # Same items. Ignore order; there are other tests for that. |
| available.sort(key=lambda ttr: ttr['created_ts']) |
| collected.sort(key=lambda ttr: ttr['created_ts']) |
| self.assertEqual(available, collected) |
| |
| def test_yield_next_available_task_to_dispatch_deadline(self): |
| request_dimensions = {u'os': [u'Windows-3.1.1'], u'pool': [u'p1']} |
| for _ in range(40): |
| request = self.mkreq( |
| _gen_request( |
| properties=_gen_properties(dimensions=request_dimensions), |
| priority=50)) |
| ttr = task_to_run.new_task_to_run(request, 0) |
| ttr.put() |
| |
| bot_id = u'localhost' |
| bot_dimensions = { |
| u'id': [bot_id], |
| u'os': [u'Windows-3.1.1'], |
| u'pool': [u'p1'], |
| } |
| task_queues.assert_bot(bot_dimensions) |
| self.execute_tasks() |
| queues = task_queues.freshen_up_queues(bot_id) |
| matcher = task_to_run.dimensions_matcher(bot_dimensions) |
| |
| seen = 0 |
| raised = False |
| try: |
| deadline = utils.utcnow() + datetime.timedelta(seconds=23) |
| for _ in task_to_run.yield_next_available_task_to_dispatch( |
| bot_id, 'pool-for-monitoring', queues, matcher, deadline): |
| seen += 1 |
| self.mock_now(self.now, seen) # 1 sec per iteration |
| except task_to_run.ScanDeadlineError: |
| raised = True |
| |
| # Gave up soon enough. |
| self.assertTrue(raised) |
| self.assertEqual(23, seen) |
| |
| def test_yield_expired_task_to_run(self): |
| # There's a cutoff at 2019-09-01, so the default self.now on Jan 2nd |
| # doesn't work when looking back 4 weeks. |
| self.now = datetime.datetime(2019, 10, 10, 3, 4, 5, 6) |
| self.mock_now(self.now, 0) |
| # task_to_run_1: still active |
| _, _to_run_1 = self._gen_new_task_to_run_slices( |
| created_ts=self.now, |
| task_slices=[{ |
| 'expiration_secs': 60, |
| 'properties': _gen_properties() |
| }]) |
| # task_to_run_2: just reached the expiration time |
| _, to_run_2 = self._gen_new_task_to_run_slices( |
| created_ts=self.now - datetime.timedelta(seconds=61), |
| task_slices=[{ |
| 'expiration_secs': 60, |
| 'properties': _gen_properties() |
| }]) |
| # task_to_run_3: already passed the expiration time 1 day ago |
| _, to_run_3 = self._gen_new_task_to_run_slices( |
| created_ts=self.now - datetime.timedelta(days=1), |
| task_slices=[{ |
| 'expiration_secs': 60, |
| 'properties': _gen_properties() |
| }]) |
| # task_to_run_4: already passed the expiration time a long time ago |
| _, _to_run_4 = self._gen_new_task_to_run_slices( |
| created_ts=self.now - datetime.timedelta(weeks=4), |
| task_slices=[{ |
| 'expiration_secs': 60, |
| 'properties': _gen_properties() |
| }]) |
| |
| bot_dimensions = {u'id': [u'bot1'], u'pool': [u'default']} |
| |
| self.assertEqual( |
| 0, len(self._yield_next_available_task_to_dispatch(bot_dimensions))) |
| |
| actual = list(task_to_run.yield_expired_task_to_run(0.0)) |
| |
| # Only to_run_2 and to_run_3 should be yielded. to_run_4 is too old and is |
| # ignored. |
| expected = [to_run_3, to_run_2] |
| self.assertEqual(expected, actual) |
| |
| def test_is_reapable(self): |
| request_dimensions = {u'os': [u'Windows-3.1.1'], u'pool': [u'default']} |
| _, to_run = self._gen_new_task_to_run(properties=_gen_properties( |
| dimensions=request_dimensions)) |
| bot_dimensions = { |
| u'id': [u'localhost'], |
| u'os': [u'Windows-3.1.1'], |
| u'pool': [u'default'], |
| } |
| self.assertEqual( |
| 1, len(self._yield_next_available_task_to_dispatch(bot_dimensions))) |
| |
| self.assertEqual(True, to_run.is_reapable) |
| to_run.consume(None) |
| to_run.put() |
| self.assertEqual(False, to_run.is_reapable) |
| |
| def test_claim(self): |
| request = self.mkreq(_gen_request()) |
| to_run_key = task_to_run.new_task_to_run(request, 0).key |
| self.assertFalse(task_to_run.Claim.check(to_run_key)) |
| with task_to_run.Claim.obtain(to_run_key): |
| self.assertTrue(task_to_run.Claim.check(to_run_key)) |
| # The claim is released once the context manager exits. |
| self.assertFalse(task_to_run.Claim.check(to_run_key)) |
| |
| def test_pre_put_hook(self): |
| _, to_run = self._gen_new_task_to_run() |
| |
| # No error if expiration_ts and queue_number have values. |
| to_run.put() |
| |
| # Raises an error if expiration_ts is None but queue_number has a value. |
| to_run.expiration_ts = None |
| with self.assertRaises(datastore_errors.BadValueError): |
| to_run.put() |
| |
| # No error if both expiration_ts and queue_number are None. |
| to_run.queue_number = None |
| to_run.put() |
| |
| def test_get_shard_kind(self): |
| for i in range(task_to_run.N_SHARDS): |
| k = task_to_run.get_shard_kind(i) |
| self.assertEqual(k.__name__, 'TaskToRunShard%d' % i) |
| self.assertTrue(issubclass(k, task_to_run._TaskToRunBase)) |
| # The next call should return the cached kind. |
| self.assertEqual(k, task_to_run.get_shard_kind(i)) |
| |
| with self.assertRaises(AssertionError): |
| task_to_run.get_shard_kind(task_to_run.N_SHARDS) |
| |
| def test_get_task_to_runs(self): |
| request = self.mkreq(_gen_request()) |
| to_run = task_to_run.new_task_to_run(request, 0) |
| to_run.put() |
| |
| actual = task_to_run.get_task_to_runs(request, 0) |
| expected = [to_run] |
| self.assertEqual(expected, actual) |
| |
| def test_task_to_run_key_from_parts(self): |
| request = self.mkreq(_gen_request()) |
| to_run = task_to_run.new_task_to_run(request, 0) |
| |
| from_parts = task_to_run.task_to_run_key_from_parts(request.key, |
| to_run.shard_index, |
| to_run.key.id()) |
| |
| self.assertEqual(from_parts, to_run.key) |
| |
| |
| if __name__ == '__main__': |
| if '-v' in sys.argv: |
| unittest.TestCase.maxDiff = None |
| logging.basicConfig( |
| level=logging.DEBUG if '-v' in sys.argv else logging.ERROR) |
| unittest.main() |