| #!/usr/bin/env vpython |
| # Copyright 2014 The LUCI Authors. All rights reserved. |
| # Use of this source code is governed under the Apache License, Version 2.0 |
| # that can be found in the LICENSE file. |
| |
| import base64 |
| import datetime |
| import json |
| import logging |
| import os |
| import random |
| import sys |
| import unittest |
| import uuid |
| |
| from parameterized import parameterized |
| import mock |
| |
| # Setups environment. |
| APP_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) |
| sys.path.insert(0, APP_DIR) |
| import test_env_handlers |
| |
| from google.appengine.api import datastore_errors |
| from google.appengine.ext import ndb |
| |
| import webtest |
| |
| import handlers_backend |
| import ts_mon_metrics |
| |
| from components import auth |
| from components import auth_testing |
| from components import datastore_utils |
| from components import net |
| from components import pubsub |
| from components import utils |
| from components.auth.proto import delegation_pb2 |
| |
| from server import bot_management |
| from server import config |
| from server import external_scheduler |
| from server import pools_config |
| from server import rbe |
| from server import task_pack |
| from server import task_queues |
| from server import task_request |
| from server import task_result |
| from server import task_scheduler |
| from server import task_to_run |
| from server.task_result import State |
| |
| from proto.config import pools_pb2 |
| from proto.plugin import plugin_pb2 |
| |
| from bb.go.chromium.org.luci.buildbucket.proto import common_pb2 |
| from bb.go.chromium.org.luci.buildbucket.proto import task_pb2 |
| |
| |
| # pylint: disable=W0212,W0612 |
| |
| |
def _gen_properties(**kwargs):
  """Constructs a task_request.TaskProperties with overridable defaults.

  Keyword arguments override the defaults below; 'dimensions' is accepted as
  a convenience and forwarded as the entity's 'dimensions_data' field.
  """
  defaults = {
      'command': [u'command1'],
      'dimensions': {
          u'os': [u'Windows-3.1.1'],
          u'pool': [u'default'],
      },
      'env': {},
      'execution_timeout_secs': 24 * 60 * 60,
      'io_timeout_secs': None,
  }
  defaults.update(kwargs)
  # The entity field is named 'dimensions_data', not 'dimensions'.
  defaults['dimensions_data'] = defaults.pop('dimensions')
  return task_request.TaskProperties(**defaults)
| |
| |
def _gen_request_slices(properties=None, **kwargs):
  """Returns an initialized task_request.TaskRequest.

  Note: this constructs a DB entity directly, not a
  swarming_rpcs.NewTaskRequest. A 'build_task' kwarg, if present, is ignored
  here since it is not a TaskRequest constructor argument (it is consumed by
  _gen_build_task() instead).
  """
  args = {
      u'created_ts': utils.utcnow(),
      u'manual_tags': [u'tag:1'],
      u'name': u'yay',
      u'priority': 50,
      u'task_slices': [
          task_request.TaskSlice(expiration_secs=60,
                                 properties=properties or _gen_properties(),
                                 wait_for_capacity=False),
      ],
      u'user': u'Jesus',
      u'bot_ping_tolerance_secs': 120,
  }
  args.update((k, v) for k, v in kwargs.items() if k != 'build_task')
  request = task_request.TaskRequest(**args)
  task_request.init_new_request(request, True, task_request.TEMPLATE_AUTO)
  return request
| |
| |
| def _gen_request_id(rid=None): |
| if rid is None: |
| return str(uuid.uuid4()) |
| return rid |
| |
| |
| def _gen_build_task(**kwargs): |
| return kwargs.get("build_task", None) |
| |
| |
def _get_results(request_key):
  """Fetches all task results for a specified TaskRequest ndb.Key.

  Returns:
    tuple(TaskResultSummary, list of TaskRunResult that exist).
  """
  summary_key = task_pack.request_key_to_result_summary_key(request_key)
  # At most 255 TaskRunResult entities can exist (usually <3), so either a
  # query or a multi-get by key would work; a query is used since this helper
  # is exclusively for unit tests and is not performance critical.
  runs_query = (task_result.TaskRunResult.query(ancestor=summary_key)
                .order(task_result.TaskRunResult.key))
  return summary_key.get(), runs_query.fetch()
| |
| |
def _update_fields_schedule(**kwargs):
  """Default metric fields for schedule-latency assertions, with overrides."""
  base = {
      u'pool': 'default',
      u'spec_name': '',
      u'status': State.to_string(State.COMPLETED),
      u'device_type': '',
  }
  base.update(kwargs)
  return base
| |
| |
def _update_fields_pubsub(**kwargs):
  """Default metric fields for pubsub-latency assertions, with overrides."""
  base = {
      u'pool': 'default',
      u'status': State.to_string(State.COMPLETED),
  }
  base.update(kwargs)
  return base
| |
| |
def _run_result_to_to_run_key(run_result):
  """Returns the TaskToRunShard ndb.Key that was used to trigger
  the TaskRunResult.
  """
  request = run_result.request_key.get()
  return task_to_run.request_to_task_to_run_key(request,
                                                run_result.current_task_slice)
| |
| |
def _bot_update_task(run_result_key, **kwargs):
  """Calls task_scheduler.bot_update_task() with sensible test defaults.

  Any kwarg overrides the matching default below.
  """
  params = {
      'bot_id': 'localhost',
      'cas_output_root': None,
      'cipd_pins': None,
      'output': 'hi',
      'output_chunk_start': 0,
      'exit_code': None,
      'duration': None,
      'hard_timeout': False,
      'io_timeout': False,
      'cost_usd': 0.1,
      'performance_stats': None,
      'canceled': None,
  }
  params.update(kwargs)
  return task_scheduler.bot_update_task(run_result_key, **params)
| |
| |
def _deadline():
  """Returns a timestamp 60 seconds past the current (possibly mocked) time."""
  return utils.utcnow() + datetime.timedelta(seconds=60)
| |
| |
| def _decode_tq_task_body(body): |
| return json.loads(base64.b64decode(body)) |
| |
| |
| class TaskSchedulerApiTest(test_env_handlers.AppTestBase): |
| |
  def setUp(self):
    """Builds the backend webtest app and mocks time, identity, task queues,
    randomness and the Python/Go scheduling route."""
    super(TaskSchedulerApiTest, self).setUp()
    self.now = datetime.datetime(2014, 1, 2, 3, 4, 5, 6)
    self.mock_now(self.now)
    auth_testing.mock_get_current_identity(self)
    # Setup the backend to handle task queues.
    self.app = webtest.TestApp(
        handlers_backend.create_application(True),
        extra_environ={
            'REMOTE_ADDR': self.source_ip,
            'SERVER_SOFTWARE': os.environ['SERVER_SOFTWARE'],
        })
    # Keep the originals around so the _enqueue*() wrappers below can delegate
    # to them after injecting a default 'use_dedicated_module' argument.
    self._enqueue_orig = self.mock(utils, 'enqueue_task', self._enqueue)
    self._enqueue_async_orig = self.mock(utils, 'enqueue_task_async',
                                         self._enqueue_async)
    # Always take the Python scheduling code path (never route to Go).
    self.mock(task_scheduler, '_route_to_go', lambda **_kwargs: False)

    # See mock_pub_sub()
    self._pub_sub_mocked = False
    self.publish_successful = True
    # Deterministic "randomness" (see _getrandbits()) so generated values are
    # reproducible across runs.
    self._random = 0x88
    self.mock(random, 'getrandbits', self._getrandbits)
    self.bot_dimensions = {
        u'foo': [u'bar'],
        u'id': [u'localhost'],
        u'os': [u'Windows', u'Windows-3.1.1'],
        u'pool': [u'default'],
    }
    self._known_pools = None
    self._last_registered_bot_dims = self.bot_dimensions.copy()
| |
| def _enqueue(self, *args, **kwargs): |
| # Only then add use_dedicated_module as default False. |
| kwargs = kwargs.copy() |
| kwargs.setdefault('use_dedicated_module', False) |
| return self._enqueue_orig(*args, **kwargs) |
| |
| def _enqueue_async(self, *args, **kwargs): |
| # Only then add use_dedicated_module as default False. |
| kwargs = kwargs.copy() |
| kwargs.setdefault('use_dedicated_module', False) |
| return self._enqueue_async_orig(*args, **kwargs) |
| |
  def _getrandbits(self, bits):
    """Deterministic replacement for random.getrandbits().

    Only 16-bit requests are expected; returns an incrementing counter so the
    values are reproducible across runs.
    """
    self.assertEqual(16, bits)
    self._random += 1
    return self._random
| |
| def mock_pub_sub(self): |
| self.assertFalse(self._pub_sub_mocked) |
| self._pub_sub_mocked = True |
| calls = [] |
| |
| def pubsub_publish(**kwargs): |
| if not self.publish_successful: |
| e = net.Error('Fail', 404, json.dumps({'error': 'some error'})) |
| raise pubsub.TransientError(e) |
| calls.append(('directly', kwargs)) |
| |
| self.mock(pubsub, 'publish', pubsub_publish) |
| return calls |
| |
| def mock_enqueue_rbe_task(self): |
| calls = [] |
| |
| def mocked_enqueue_rbe_task(request, to_run): |
| calls.append((request, to_run)) |
| |
| self.mock(rbe, 'enqueue_rbe_task', mocked_enqueue_rbe_task) |
| return calls |
| |
| def mock_enqueue_rbe_cancel(self): |
| calls = [] |
| |
| def mocked_enqueue_rbe_cancel(request, to_run): |
| calls.append((request, to_run)) |
| |
| self.mock(rbe, 'enqueue_rbe_cancel', mocked_enqueue_rbe_cancel) |
| return calls |
| |
| def _gen_result_summary_pending(self, **kwargs): |
| """Returns the dict for a TaskResultSummary for a pending task.""" |
| expected = { |
| 'abandoned_ts': |
| None, |
| 'bot_dimensions': |
| None, |
| 'bot_id': |
| None, |
| 'bot_idle_since_ts': |
| None, |
| 'bot_version': |
| None, |
| 'bot_logs_cloud_project': |
| None, |
| 'cipd_pins': |
| None, |
| 'completed_ts': |
| None, |
| 'costs_usd': [], |
| 'cost_saved_usd': |
| None, |
| 'created_ts': |
| self.now, |
| 'current_task_slice': |
| 0, |
| 'deduped_from': |
| None, |
| 'duration': |
| None, |
| 'exit_code': |
| None, |
| 'expiration_delay': |
| None, |
| 'failure': |
| False, |
| 'internal_failure': |
| False, |
| 'missing_cas': [], |
| 'missing_cipd': [], |
| 'modified_ts': |
| self.now, |
| 'name': |
| u'yay', |
| 'priority': |
| 50, |
| 'request_authenticated': |
| auth_testing.DEFAULT_MOCKED_IDENTITY, |
| 'request_bot_id': |
| None, |
| 'request_pool': |
| u'default', |
| 'request_realm': |
| None, |
| 'cas_output_root': |
| None, |
| 'resultdb_info': |
| None, |
| 'server_versions': [u'v1a'], |
| 'started_ts': |
| None, |
| 'state': |
| State.PENDING, |
| 'tags': [ |
| u'authenticated:user:mocked@example.com', |
| u'os:Windows-3.1.1', |
| u'pool:default', |
| u'priority:50', |
| u'realm:none', |
| u'service_account:none', |
| u'swarming.pool.template:no_config', |
| u'tag:1', |
| u'user:Jesus', |
| ], |
| 'try_number': |
| None, |
| 'user': |
| u'Jesus', |
| } |
| expected.update(kwargs) |
| return expected |
| |
| def _gen_result_summary_reaped(self, **kwargs): |
| """Returns the dict for a TaskResultSummary for a pending task.""" |
| kwargs.setdefault(u'bot_dimensions', self.bot_dimensions.copy()) |
| kwargs.setdefault(u'bot_id', u'localhost') |
| kwargs.setdefault(u'bot_idle_since_ts', self.now) |
| kwargs.setdefault(u'bot_version', u'abc') |
| kwargs.setdefault(u'state', State.RUNNING) |
| kwargs.setdefault(u'try_number', 1) |
| return self._gen_result_summary_pending(**kwargs) |
| |
| def _gen_run_result(self, **kwargs): |
| expected = { |
| 'abandoned_ts': None, |
| 'bot_dimensions': self.bot_dimensions, |
| 'bot_id': u'localhost', |
| 'bot_idle_since_ts': self.now, |
| 'bot_version': u'abc', |
| 'bot_logs_cloud_project': None, |
| 'cas_output_root': None, |
| 'cipd_pins': None, |
| 'completed_ts': None, |
| 'cost_usd': 0., |
| 'current_task_slice': 0, |
| 'dead_after_ts': None, |
| 'duration': None, |
| 'exit_code': None, |
| 'failure': False, |
| 'internal_failure': False, |
| 'killing': None, |
| 'missing_cas': [], |
| 'missing_cipd': [], |
| 'modified_ts': self.now, |
| 'resultdb_info': None, |
| 'server_versions': [u'v1a'], |
| 'started_ts': self.now, |
| 'state': State.RUNNING, |
| 'try_number': 1, |
| } |
| expected.update(**kwargs) |
| return expected |
| |
| def _assert_buildbucket_update_status(self, pubsub_msg, expected_status): |
| # Checks that the BuildTaskUpdate message sent in pubsub_msg has the |
| # expected status provided. |
| result = task_pb2.BuildTaskUpdate() |
| result.ParseFromString(pubsub_msg) |
| self.assertEqual(result.task.status, expected_status) |
| |
  def _quick_schedule(self, request_id=None, secret_bytes=None, **kwargs):
    """Schedules a task.

    Arguments:
      request_id: optional request id; a random UUID is generated when None.
      secret_bytes: optional task_request.SecretBytes to attach.
      kwargs: passed to _gen_request_slices().
    """
    # Flush any pending task queue task from previous calls first.
    self.execute_tasks()
    request = _gen_request_slices(**kwargs)
    request_id = _gen_request_id(request_id)
    build_task = _gen_build_task(**kwargs)
    result_summary = task_scheduler.schedule_request(request,
                                                     request_id,
                                                     secret_bytes=secret_bytes,
                                                     build_task=build_task)
    # State will be either PENDING or COMPLETED (for deduped task)
    self.execute_tasks()
    # The second run must be a no-op: everything was flushed above.
    self.assertEqual(0, self.execute_tasks())
    return result_summary
| |
  def _register_bot(self, bot_dimensions):
    """Registers the bot so the task queues knows there's a worker that can run
    the task.

    Arguments:
      bot_dimensions: bot dimensions to assert.
    """
    # No task queue task should be pending before registering.
    self.assertEqual(0, self.execute_tasks())
    bot_id = bot_dimensions[u'id'][0]
    bot_management.bot_event(event_type='request_sleep',
                             bot_id=bot_id,
                             external_ip='1.2.3.4',
                             authenticated_as='joe@localhost',
                             dimensions=bot_dimensions,
                             state={'state': 'real'},
                             version='1234',
                             register_dimensions=True)
    task_queues.assert_bot(bot_dimensions)
    self.execute_tasks()
    # Remembered as the default dimensions for _bot_reap_task().
    self._last_registered_bot_dims = bot_dimensions.copy()
| |
| def _bot_reap_task(self, bot_dimensions=None, version=None): |
| bot_dimensions = bot_dimensions or self._last_registered_bot_dims |
| bot_id = bot_dimensions['id'][0] |
| queues = task_queues.freshen_up_queues(bot_id) |
| bot_details = task_scheduler.BotDetails(version or 'abc', None) |
| return task_scheduler.bot_reap_task(bot_dimensions, queues, bot_details, |
| _deadline()) |
| |
  def _quick_reap(self, **kwargs):
    """Makes sure the bot is registered and have it reap a task.

    Arguments:
      kwargs: forwarded to _quick_schedule(); 'pubsub_topic' and
          'has_build_task' additionally determine which notification task
          queue tasks are expected as a side effect of the reap.
    """
    self._register_bot(self.bot_dimensions)
    self._quick_schedule(**kwargs)

    reaped_request, _, run_result = self._bot_reap_task()

    queued_tasks = 0
    # Reaping causes a pubsub task if pubsub is specified.
    if 'pubsub_topic' in kwargs:
      self.assertEqual(1, len(self._taskqueue_stub.GetTasks('pubsub')))
      queued_tasks += 1

    # Reaping a task with a BuildTask also notifies buildbucket.
    if kwargs.get('has_build_task', False):
      self.assertEqual(1,
                       len(self._taskqueue_stub.GetTasks('buildbucket-notify')))
      queued_tasks += 1

    self.assertEqual(queued_tasks, self.execute_tasks())
    return run_result
| |
  def _cancel_running_task(self, run_result):
    """Cancels a running task and asserts it entered the 'killing' phase."""
    canceled, was_running = task_scheduler.cancel_task(run_result.request,
                                                       run_result.key, True,
                                                       run_result.bot_id)
    self.assertTrue(canceled)
    self.assertTrue(was_running)
    self.execute_tasks()
    # Cancellation of a running task is not instantaneous: the entity is
    # flagged as 'killing' until the bot acknowledges.
    run_result = run_result.key.get()
    self.assertTrue(run_result.killing)
| |
  def test_all_apis_are_tested(self):
    # Ensures there's a test for each public API.
    # TODO(maruel): Remove this once coverage is asserted.
    module = task_scheduler
    # 'func_name' is the Python 2 function attribute; this filters the module
    # members down to plain public functions.
    expected = set(
        i for i in dir(module)
        if i[0] != '_' and hasattr(getattr(module, i), 'func_name'))
    # Each public function f must have a matching test_<f> method here.
    missing = expected - set(i[5:] for i in dir(self) if i.startswith('test_'))
    self.assertFalse(missing)
| |
  def test_bot_reap_task(self):
    # Essentially check _quick_reap() works.
    run_result = self._quick_reap()
    self.assertEqual('localhost', run_result.bot_id)
    self.assertEqual(1, run_result.try_number)
    to_run_key = task_to_run.request_to_task_to_run_key(
        run_result.request_key.get(), 0)
    # Once reaped, the TaskToRun is neither queued nor expiring anymore.
    self.assertIsNone(to_run_key.get().queue_number)
    self.assertIsNone(to_run_key.get().expiration_ts)
| |
  def test_bot_reap_build_task(self):
    # Reaping a task carrying a BuildTask publishes a STARTED BuildTaskUpdate
    # on the task's backend pubsub topic.
    pub_sub_calls = self.mock_pub_sub()
    run_result = self._quick_reap(
        has_build_task=True,
        build_task=task_request.BuildTask(
            build_id="1234",
            buildbucket_host="buildbucket_host",
            latest_task_status=task_result.State.PENDING,
            pubsub_topic="backend_pubsub_topic",
            update_id=0))
    self.assertEqual('localhost', run_result.bot_id)
    self.assertEqual(1, run_result.try_number)
    to_run_key = task_to_run.request_to_task_to_run_key(
        run_result.request_key.get(), 0)
    self.assertIsNone(to_run_key.get().queue_number)
    self.assertIsNone(to_run_key.get().expiration_ts)
    self.assertEqual(1, len(pub_sub_calls))
    self.assertEqual(pub_sub_calls[0][1]["topic"], "backend_pubsub_topic")
    self._assert_buildbucket_update_status(pub_sub_calls[0][1]['message'],
                                           common_pb2.STARTED)
| |
  @parameterized.expand([
      ({
          u'pool': [u'default'],
          u'os': [u'Windows-3.1.1|Windows-3.2.1'],
      },),
      ({
          u'pool': [u'default'],
          u'os': [u'Windows-3.1.1|Windows-3.2.1'],
          u'foo': [u'bar|A|B|C'],
      },),
  ])
  def test_bot_reap_task_or_dimensions(self, or_dimensions):
    # '|' in a requested dimension value expresses OR-ed alternatives; the
    # registered bot satisfies one alternative of each.
    run_result = self._quick_reap(
        task_slices=[
            task_request.TaskSlice(
                expiration_secs=60,
                properties=_gen_properties(dimensions=or_dimensions),
                wait_for_capacity=False,
            )
        ])

    self.assertEqual('localhost', run_result.bot_id)
    self.assertEqual(1, run_result.try_number)
    to_run_key = task_to_run.request_to_task_to_run_key(
        run_result.request_key.get(), 0)
    self.assertIsNone(to_run_key.get().queue_number)
    self.assertIsNone(to_run_key.get().expiration_ts)
| |
  def test_bot_reap_tasks_using_or_dimensions(self):
    # The first part of this case checks for bot -> task
    # Register a bot first
    bot1_dimensions = self.bot_dimensions.copy()
    bot1_dimensions[u'id'] = [u'bot1']
    bot1_dimensions[u'os'] = [u'v1', u'v2']
    bot1_dimensions[u'gpu'] = [u'nv', u'sega']
    self._register_bot(bot1_dimensions)
    # Then send a request
    task_slices = [
        task_request.TaskSlice(
            expiration_secs=60,
            properties=_gen_properties(
                dimensions={
                    u'pool': [u'default'],
                    u'os': [u'v1|v2'],
                    u'gpu': [u'sega', u'amd|nv'],
                }),
            wait_for_capacity=False),
    ]
    self._quick_schedule(task_slices=task_slices)
    _, _, run_result = self._bot_reap_task()
    self.assertEqual(u'bot1', run_result.bot_id)
    to_run_key = _run_result_to_to_run_key(run_result)
    self.assertIsNone(to_run_key.get().queue_number)
    self.assertEqual(
        State.COMPLETED,
        _bot_update_task(
            run_result.key, bot_id=u'bot1', exit_code=0, duration=0.1))
    self.assertEqual(1, self.execute_tasks())

    # The second part checks for task -> bot
    # Send an identical request
    self._quick_schedule(task_slices=task_slices)
    # Then register another bot
    bot2_dimensions = self.bot_dimensions.copy()
    bot2_dimensions[u'id'] = [u'bot2']
    bot2_dimensions[u'os'] = [u'v2']
    bot2_dimensions[u'gpu'] = [u'amd', u'sega']
    self._register_bot(bot2_dimensions)

    _, _, run_result = self._bot_reap_task()
    self.assertEqual(u'bot2', run_result.bot_id)
    to_run_key = _run_result_to_to_run_key(run_result)
    self.assertIsNone(to_run_key.get().queue_number)
    self.assertEqual(
        State.COMPLETED,
        _bot_update_task(
            run_result.key, bot_id=u'bot2', exit_code=0, duration=0.1))
    self.assertEqual(1, self.execute_tasks())
| |
  @parameterized.expand([(0,), (1,)])
  def test_either_bot_reap_tasks_using_or_dimensions(self, bi):
    # Both registered bots satisfy the OR-ed dimensions; `bi` selects which
    # one polls and must therefore receive the task.
    bot1_dimensions = self.bot_dimensions.copy()
    bot1_dimensions[u'id'] = [u'bot1']
    bot1_dimensions[u'os'] = [u'v1', u'v2']
    bot1_dimensions[u'gpu'] = [u'nv', u'sega']
    self._register_bot(bot1_dimensions)

    bot2_dimensions = self.bot_dimensions.copy()
    bot2_dimensions[u'id'] = [u'bot2']
    bot2_dimensions[u'os'] = [u'v2']
    bot2_dimensions[u'gpu'] = [u'amd', u'sega']
    self._register_bot(bot2_dimensions)

    task_slices = [
        task_request.TaskSlice(
            expiration_secs=60,
            properties=_gen_properties(
                dimensions={
                    u'pool': [u'default'],
                    u'os': [u'v1|v2'],
                    u'gpu': [u'sega', u'amd|nv'],
                }),
            wait_for_capacity=False),
    ]
    self._quick_schedule(task_slices=task_slices)

    if bi == 0:
      test_bot_id = u'bot1'
      test_bot_dimensions = bot1_dimensions
    else:
      test_bot_id = u'bot2'
      test_bot_dimensions = bot2_dimensions

    _, _, run_result = self._bot_reap_task(test_bot_dimensions)
    self.assertEqual(test_bot_id, run_result.bot_id)
    to_run_key = _run_result_to_to_run_key(run_result)
    self.assertIsNone(to_run_key.get().queue_number)
    self.assertEqual(
        State.COMPLETED,
        _bot_update_task(
            run_result.key, bot_id=test_bot_id, exit_code=0, duration=0.1))
    self.assertEqual(1, self.execute_tasks())
| |
  def test_bot_claim_slice(self):
    # Claiming a slice is idempotent per claim ID and rejects mismatched ids.
    self.mock_enqueue_rbe_task()

    result_summary = self._quick_schedule(
        secret_bytes=task_request.SecretBytes(secret_bytes='blob'),
        properties=_gen_properties(has_secret_bytes=True),
        rbe_instance='some-instance',
    )
    to_run_key = task_to_run.request_to_task_to_run_key(
        result_summary.request_key.get(), 0)

    self._register_bot(self.bot_dimensions)

    def claim(claim_id):
      return task_scheduler.bot_claim_slice(
          self.bot_dimensions,
          task_scheduler.BotDetails(None, None),
          to_run_key,
          claim_id,
      )

    def claim_txn(claim_id):
      # NOTE(review): closes over `to_run`, which is only assigned after the
      # first claim() call below; claim_txn must not be called before that.
      request = to_run.request_key.get()
      run_result, secret_bytes = task_scheduler._reap_task(
          self.bot_dimensions, task_scheduler.BotDetails(None, None),
          to_run_key, request, claim_id, 3, False)
      return request, secret_bytes, run_result

    # The first call actually makes the change.
    request, secret_bytes, run_result = claim('some-claim-id')
    self.assertEqual(request.key, result_summary.request_key)
    self.assertEqual(secret_bytes.secret_bytes, 'blob')
    self.assertEqual(run_result.bot_dimensions, self.bot_dimensions)

    # TaskToRun is updated.
    to_run = to_run_key.get()
    self.assertFalse(to_run.is_reapable)
    self.assertEqual(to_run.claim_id, 'some-claim-id')

    # The second call with the same claim ID is noop.
    request_dup, secret_bytes_dup, run_result_dup = claim('some-claim-id')
    self.assertEqual(request_dup, request)
    self.assertEqual(secret_bytes_dup, secret_bytes)
    self.assertEqual(run_result_dup, run_result)

    # If _reap_task is called due to a race, it notices the claim as well.
    request_dup, secret_bytes_dup, run_result_dup = claim_txn('some-claim-id')
    self.assertEqual(request_dup, request)
    self.assertEqual(secret_bytes_dup, secret_bytes)
    self.assertEqual(run_result_dup, run_result)

    # A call with a different claim ID results in an error.
    with self.assertRaises(task_scheduler.ClaimError):
      claim('another-claim-id')
    with self.assertRaises(task_scheduler.ClaimError):
      claim_txn('another-claim-id')
| |
  def test_bot_claim_slice_dimensions_mismatch(self):
    self.mock_enqueue_rbe_task()

    result_summary = self._quick_schedule(rbe_instance='some-instance')
    to_run_key = task_to_run.request_to_task_to_run_key(
        result_summary.request_key.get(), 0)

    # The bot is in a different pool than the one the task requested, so the
    # claim must be rejected.
    bot_dimensions = self.bot_dimensions.copy()
    bot_dimensions['pool'] = ['another']
    self._register_bot(bot_dimensions)

    with self.assertRaises(task_scheduler.ClaimError):
      task_scheduler.bot_claim_slice(
          bot_dimensions,
          task_scheduler.BotDetails(None, None),
          to_run_key,
          'some-claim-id',
      )
| |
  def test_schedule_request(self):
    # It is tested indirectly in the other functions.
    # Essentially check _quick_schedule() and _register_bot() works.
    self._register_bot(self.bot_dimensions)
    result_summary = self._quick_schedule()
    to_run_key = task_to_run.request_to_task_to_run_key(
        result_summary.request_key.get(), 0)
    self.assertTrue(to_run_key.get().queue_number)
    self.assertEqual(State.PENDING, result_summary.state)
    status = State.to_string(State.PENDING)
    # No pubsub topic was supplied, so the notify latency metric must not
    # have been recorded.
    self.assertIsNone(
        ts_mon_metrics._task_state_change_pubsub_notify_latencies.get(
            fields=_update_fields_pubsub(status=status, http_status_code=200)))
| |
  def test_schedule_request_task_id_collision(self):
    rbe_enqueues = self.mock_enqueue_rbe_task()

    self._register_bot(self.bot_dimensions)

    key1 = task_request.new_request_key()
    key2 = task_request.new_request_key()
    self.assertNotEqual(key1, key2)

    # Make new_request_key() hand out predetermined keys so a collision can
    # be simulated deterministically.
    keys = []
    self.mock(task_request, 'new_request_key', lambda: keys.pop(0))

    # Creates the request without any collisions.
    keys[:] = [key1]
    result_summary_1 = self._quick_schedule(rbe_instance='some-instance')
    self.assertEqual(key1, result_summary_1.request_key)
    self.assertFalse(keys)
    del rbe_enqueues[:]

    # Collides on the existing key, but switches to the next one.
    keys[:] = [key1, key2]
    result_summary_2 = self._quick_schedule(rbe_instance='some-instance')
    self.assertEqual(key2, result_summary_2.request_key)
    self.assertFalse(keys)

    # Submitted the correct RBE reservation (using key2 as the base task ID).
    self.assertEqual(1, len(rbe_enqueues))
    _, ttr = rbe_enqueues[0]
    self.assertEqual(ttr.rbe_reservation, rbe.gen_rbe_reservation_id(key2, 0))
| |
  def test_schedule_request_no_capacity(self):
    # No capacity, denied. That's the default.
    pub_sub_calls = self.mock_pub_sub()
    request = _gen_request_slices(pubsub_topic='projects/abc/topics/def',
                                  created_ts=(self.now -
                                              datetime.timedelta(seconds=1)))
    result_summary = task_scheduler.schedule_request(request)
    self.assertEqual(State.NO_RESOURCE, result_summary.state)
    self.execute_tasks()
    # The terminal state is notified on the caller-supplied pubsub topic.
    expected = [
        (
            'directly',
            {
                'attributes': None,
                'message': '{"task_id":"1d69b9f088008910"}',
                'topic': u'projects/abc/topics/def',
            },
        ),
    ]
    self.assertEqual(expected, pub_sub_calls)
    status = State.to_string(State.NO_RESOURCE)
    # The notify latency metric was recorded with a non-negative sum.
    self.assertLessEqual(
        0,
        ts_mon_metrics._task_state_change_pubsub_notify_latencies.get(
            fields=_update_fields_pubsub(status=status,
                                         http_status_code=200)).sum)
| |
  def test_schedule_request_no_capacity_backend_task(self):
    # No capacity, denied. That's the default.
    pub_sub_calls = self.mock_pub_sub()
    self.mock(utils, "time_time_ns", lambda: 12345678)
    request = _gen_request_slices(pubsub_topic='projects/abc/topics/def',
                                  has_build_task=True,
                                  created_ts=(self.now -
                                              datetime.timedelta(seconds=1)))
    result_summary = task_scheduler.schedule_request(
        request,
        request_id="1234",
        build_task=task_request.BuildTask(
            build_id="1234",
            buildbucket_host="buildbucket_host",
            latest_task_status=task_result.State.PENDING,
            pubsub_topic="backend_pubsub_topic",
            update_id=0))
    self.assertEqual(State.NO_RESOURCE, result_summary.state)
    self.execute_tasks()

    # Two notifications: an INFRA_FAILURE BuildTaskUpdate on the backend
    # topic first, then the plain task_id message on the request's topic.
    self.assertEqual(2, len(pub_sub_calls))
    self.assertEqual(pub_sub_calls[0][1]["topic"], "backend_pubsub_topic")
    self._assert_buildbucket_update_status(pub_sub_calls[0][1]['message'],
                                           common_pb2.INFRA_FAILURE)
    self.assertEqual(pub_sub_calls[1][1]["topic"], "projects/abc/topics/def")
    status = State.to_string(State.NO_RESOURCE)
    self.assertLessEqual(
        0,
        ts_mon_metrics._task_state_change_pubsub_notify_latencies.get(
            fields=_update_fields_pubsub(status=status,
                                         http_status_code=200)).sum)
| |
  def test_schedule_request_no_check_capacity(self):
    # No capacity, but check disabled, allowed.
    # wait_for_capacity=True disables the capacity check, so the task stays
    # PENDING instead of being denied with NO_RESOURCE.
    request = _gen_request_slices(task_slices=[
        task_request.TaskSlice(
            expiration_secs=60,
            properties=_gen_properties(),
            wait_for_capacity=True),
    ])
    result_summary = task_scheduler.schedule_request(request)
    self.assertEqual(State.PENDING, result_summary.state)
    self.execute_tasks()
| |
  @ndb.tasklet
  def _mock_create_invocation_async(self, _task_run_id, _realm, _deadline):
    """Mock of server.resultdb.create_invocation_async.

    Resolves to a static ResultDB update token.
    """
    raise ndb.Return('resultdb-update-token')
| |
  def test_schedule_request_resultdb(self):
    self._register_bot(self.bot_dimensions)

    with mock.patch(
        'server.resultdb.create_invocation_async',
        mock.Mock(side_effect=self._mock_create_invocation_async)) as mock_call:
      request = _gen_request_slices(realm='infra:try')
      result_summary = task_scheduler.schedule_request(request,
                                                       enable_resultdb=True)
      # The invocation is created for the run id ('...11') with a fixed
      # absolute deadline derived from the mocked current time.
      mock_call.assert_called_once_with(
          '1d69b9f088008911', 'infra:try',
          datetime.datetime(2014, 1, 3, 3, 5, 35, 6))

    # The invocation and its update token are recorded on the entities.
    self.assertEqual(
        result_summary.resultdb_info,
        task_result.ResultDBInfo(
            hostname='test-resultdb-server.com',
            invocation=(
                'invocations/task-test-swarming.appspot.com-1d69b9f088008911')))
    self.assertEqual(result_summary.request_key.get().resultdb_update_token,
                     'resultdb-update-token')

    self.execute_tasks()
| |
  def test_schedule_request_scheduling_algorithm(self):
    self.mock_now(self.now, 60)

    request = _gen_request_slices(
        task_slices=[
            task_request.TaskSlice(expiration_secs=60,
                                   properties=_gen_properties(),
                                   wait_for_capacity=True),
        ],
        scheduling_algorithm=pools_pb2.Pool.SCHEDULING_ALGORITHM_UNKNOWN)
    result_summary = task_scheduler.schedule_request(request)
    to_run_key = task_to_run.request_to_task_to_run_key(
        result_summary.request_key.get(), 0)
    self.assertEqual(to_run_key.get().queue_number, 0x1a3aa6630c8ee0ca)

    # UNKNOWN yields the same queue_number as the explicit FIFO below.
    request = _gen_request_slices(
        task_slices=[
            task_request.TaskSlice(expiration_secs=60,
                                   properties=_gen_properties(),
                                   wait_for_capacity=True),
        ],
        scheduling_algorithm=pools_pb2.Pool.SCHEDULING_ALGORITHM_FIFO)
    result_summary = task_scheduler.schedule_request(request)
    to_run_key = task_to_run.request_to_task_to_run_key(
        result_summary.request_key.get(), 0)
    self.assertEqual(to_run_key.get().queue_number, 0x1a3aa6630c8ee0ca)

    # LIFO yields a different queue_number for the same request.
    request = _gen_request_slices(
        task_slices=[
            task_request.TaskSlice(expiration_secs=60,
                                   properties=_gen_properties(),
                                   wait_for_capacity=True),
        ],
        scheduling_algorithm=pools_pb2.Pool.SCHEDULING_ALGORITHM_LIFO)
    result_summary = task_scheduler.schedule_request(request)
    to_run_key = task_to_run.request_to_task_to_run_key(
        result_summary.request_key.get(), 0)
    self.assertEqual(to_run_key.get().queue_number, 0x1a3aa6631f3d2236)

    self.execute_tasks()
| |
  def test_schedule_request_rbe_mode(self):
    # Scheduling with an rbe_instance enqueues one RBE reservation.
    enqueued = self.mock_enqueue_rbe_task()

    request = _gen_request_slices(rbe_instance='some-instance')
    result_summary = task_scheduler.schedule_request(request)
    self.assertEqual(State.PENDING, result_summary.state)

    self.assertEqual(len(enqueued), 1)
    self.assertEqual(request.key, enqueued[0][0].key)
    # Reservation id follows "<app>-<task id>-<slice index>".
    self.assertEqual('sample-app-1d69b9f088008910-0',
                     enqueued[0][1].key.get().rbe_reservation)
| |
  def test_bot_reap_task_expired(self):
    self._register_bot(self.bot_dimensions)
    result_summary = self._quick_schedule()
    # Forwards clock to get past expiration.
    request = result_summary.request_key.get()
    self.mock_now(request.expiration_ts, 1)

    actual_request, _, run_result = self._bot_reap_task()
    # The task is not returned because it's expired.
    self.assertIsNone(actual_request)
    self.assertIsNone(run_result)
    # It's effectively expired.
    to_run_key = task_to_run.request_to_task_to_run_key(request, 0)
    self.assertIsNone(to_run_key.get().queue_number)
    self.assertIsNone(to_run_key.get().expiration_ts)
    self.assertEqual(State.EXPIRED, result_summary.key.get().state)

    # The schedule latency metric records creation-to-expiration time, in
    # milliseconds.
    latency = ((request.expiration_ts - self.now).total_seconds() + 1) * 1000.0
    self.assertEqual(
        latency,
        ts_mon_metrics._task_state_change_schedule_latencies.get(
            fields=_update_fields_schedule(
                status=State.to_string(State.EXPIRED))).sum)
| |
  def test_bot_reap_task_6_expired_fifo(self):
    # A lot of tasks are expired, eventually stop expiring them.
    self._register_bot(self.bot_dimensions)
    result_summaries = []
    for i in range(6):
      self.mock_now(self.now, i)
      result_summaries.append(
          self._quick_schedule(
              scheduling_algorithm=pools_pb2.Pool.SCHEDULING_ALGORITHM_FIFO))
    # Forwards clock to get past expiration.
    self.mock_now(result_summaries[-1].request_key.get().expiration_ts, 1)

    # Fail to reap a task.
    actual_request, _, run_result = self._bot_reap_task()
    self.assertIsNone(actual_request)
    self.assertIsNone(run_result)
    # They all got expired ...
    for result_summary in result_summaries[:-1]:
      result_summary = result_summary.key.get()
      self.assertEqual(State.EXPIRED, result_summary.state)
    # ... except for the very last one because of the limit of 5 task expired
    # per poll. With FIFO, the oldest tasks are handled first, so the newest
    # task is the one that survives.
    result_summary = result_summaries[-1]
    result_summary = result_summary.key.get()
    self.assertEqual(State.PENDING, result_summary.state)
| |
  def test_bot_reap_task_6_expired_lifo(self):
    # A lot of tasks are expired, eventually stop expiring them.
    self._register_bot(self.bot_dimensions)
    result_summaries = []
    for i in range(6):
      self.mock_now(self.now, i)
      result_summaries.append(
          self._quick_schedule(
              scheduling_algorithm=pools_pb2.Pool.SCHEDULING_ALGORITHM_LIFO))
    # Forwards clock to get past expiration.
    self.mock_now(result_summaries[-1].request_key.get().expiration_ts, 1)

    # Fail to reap a task.
    actual_request, _, run_result = self._bot_reap_task()
    self.assertIsNone(actual_request)
    self.assertIsNone(run_result)
    # They all got expired ...
    for result_summary in result_summaries[1:]:
      result_summary = result_summary.key.get()
      self.assertEqual(State.EXPIRED, result_summary.state)
    # ... except for the most recent one because of the limit of 5 task expired
    # per poll. With LIFO, the newest tasks are handled first, so the oldest
    # task is the one that survives.
    result_summary = result_summaries[0]
    result_summary = result_summary.key.get()
    self.assertEqual(State.PENDING, result_summary.state)
| |
  @ndb.tasklet
  def nop_async(self, *_args, **_kwargs):
    """No-op ndb tasklet, used as a side_effect to stub out async calls."""
    pass
| |
  def test_resultdb_task_expired(self):
    # A task scheduled with ResultDB enabled must have its invocation
    # finalized when its slice is expired.
    self._register_bot(self.bot_dimensions)

    # Scheduling with enable_resultdb=True creates an invocation in the
    # request's realm.
    with mock.patch(
        'server.resultdb.create_invocation_async',
        mock.Mock(side_effect=self._mock_create_invocation_async)) as mock_call:
      request = _gen_request_slices(realm='infra:try')
      result_summary = task_scheduler.schedule_request(request,
                                                       enable_resultdb=True)
      mock_call.assert_called_once_with('1d69b9f088008911', 'infra:try',
                                        mock.ANY)

    # Expiring the slice must finalize the invocation with the update token.
    with mock.patch('server.resultdb.finalize_invocation_async',
                    mock.Mock(side_effect=self.nop_async)) as mock_call:
      to_run_key = task_to_run.request_to_task_to_run_key(request, 0)
      self.mock_now(self.now, 60)
      task_scheduler._expire_slice(request, to_run_key, State.EXPIRED, False, 1,
                                   True, 'some-reason')
      mock_call.assert_called_once_with('1d69b9f088008911',
                                        u'resultdb-update-token')

    self.execute_tasks()
| |
| def _setup_es(self, allow_es_fallback): |
| """Set up mock es_config.""" |
| es_address = 'externalscheduler_address' |
| es_id = 'es_id' |
| external_schedulers = [ |
| pools_config.ExternalSchedulerConfig( |
| address=es_address, |
| id=es_id, |
| dimensions=set([u'foo:bar', u'label-pool:CTS']), |
| all_dimensions=None, |
| any_dimensions=None, |
| enabled=False, |
| allow_es_fallback=allow_es_fallback), |
| pools_config.ExternalSchedulerConfig( |
| address=es_address, |
| id=es_id, |
| dimensions=set([u'foo:bar']), |
| all_dimensions=None, |
| any_dimensions=None, |
| enabled=True, |
| allow_es_fallback=allow_es_fallback), |
| ] |
| self.mock_pool_config('default', external_schedulers=external_schedulers) |
| |
  def _mock_reap_calls(self):
    """Mock out external scheduler and native scheduler reap calls.

    Schedules one wait-for-capacity task, then replaces both reap entry
    points with recorders.

    Returns: (list of es calls, list of native reap calls)
    """
    er_calls = []

    def ext_reap(*args):
      # Record the call; the external scheduler offers nothing.
      er_calls.append(args)
      return None, None, None

    self._register_bot(self.bot_dimensions)
    result_summary = self._quick_schedule(
        task_slices=[
            task_request.TaskSlice(
                expiration_secs=60,
                properties=_gen_properties(),
                # The tests that use this mock rely on tasks that wait
                # for capacity.
                wait_for_capacity=True),
        ])
    to_run_key = task_to_run.request_to_task_to_run_key(result_summary.request,
                                                        0)

    r_calls = []

    def reap(*args):
      # Record the call and hand back the scheduled TaskToRunShard.
      r_calls.append(args)
      return [to_run_key.get()]

    self.mock(task_scheduler, '_bot_reap_task_external_scheduler', ext_reap)
    self.mock(task_to_run, 'yield_next_available_task_to_dispatch', reap)

    return er_calls, r_calls
| |
| def _mock_es_assign(self, task_id, slice_number): |
| """Mock out the return behavior from external_scheduler.assign_task""" |
| |
| # pylint: disable=unused-argument |
| def mock_assign(*args): |
| return task_id, slice_number |
| |
| self.mock(external_scheduler, "assign_task", mock_assign) |
| |
  def _mock_es_notify(self):
    """Mock out external_scheduler.notify_requests

    Returns a list that will receive any calls that were made to notify.
    """
    calls = []

    # pylint: disable=unused-argument
    def mock_notify(es_cfg, requests, use_tq, is_callback):
      # Sanity-check argument types before recording the call.
      assert isinstance(es_cfg, pools_config.ExternalSchedulerConfig)
      for request, result in requests:
        assert isinstance(request, task_request.TaskRequest)
        assert isinstance(result, task_result._TaskResultCommon)
      calls.append([es_cfg, requests, use_tq, is_callback])

    self.mock(external_scheduler, "notify_requests", mock_notify)
    return calls
| |
  def test_bot_reap_task_es_with_fallback(self):
    # With allow_es_fallback, a single reap consults the external scheduler
    # first, then falls back to the native scheduler.
    self._setup_es(True)
    notify_calls = self._mock_es_notify()
    er_calls, r_calls = self._mock_reap_calls()

    # Ignore es notifications that were side-effects of the setup code, they
    # are incidental to this test.
    del notify_calls[:]

    self._bot_reap_task()

    self.assertEqual(len(er_calls), 1, 'external scheduler was not called')
    self.assertEqual(len(r_calls), 1, 'native scheduler was not called')
| |
  def test_bot_reap_task_es_no_task(self):
    # The external scheduler assigns no task; the reap attempt must complete
    # without raising.
    self._setup_es(False)
    self._mock_es_assign(None, 0)

    self._bot_reap_task()
| |
  def test_bot_reap_task_es_with_nonpending_task(self):
    # The external scheduler assigns a task that is not PENDING anymore.
    self._setup_es(False)
    notify_calls = self._mock_es_notify()
    result_summary = self._quick_schedule()
    self._mock_es_assign(result_summary.task_id, 0)

    # Ignore es notifications that were side-effects of the setup code, they
    # are incidental to this test.
    del notify_calls[:]

    # It should notify to external scheduler. But an exception won't be raised
    # because the task is already running or has finished including failures.
    self._bot_reap_task()
    self.assertEqual(len(notify_calls), 1)
| |
  def test_bot_reap_task_es_with_pending_task(self):
    # A PENDING task assigned by the external scheduler can be reaped.
    self._setup_es(False)
    self._mock_es_notify()

    self._register_bot(self.bot_dimensions)
    result_summary = self._quick_schedule(
        task_slices=[
            task_request.TaskSlice(
                expiration_secs=180,
                properties=_gen_properties(),
                wait_for_capacity=True)
        ])

    self._mock_es_assign(result_summary.task_id, 0)

    # Able to successfully reap given PENDING task from external scheduler.
    request, _, _ = self._bot_reap_task()
    self.assertEqual(request.task_id, result_summary.task_id)
| |
  def test_bot_reap_task_for_nonexternal_pool(self):
    self._setup_es(False)
    notify_calls = self._mock_es_notify()
    dimensions = {u'os': [u'Windows-3.1.1'], u'pool': [u'label-pool:CTS']}
    slices = [
        task_request.TaskSlice(
            expiration_secs=60,
            properties=_gen_properties(dimensions=dimensions),
            wait_for_capacity=False),
    ]
    result_summary = self._quick_schedule(task_slices=slices)
    self._mock_es_assign(result_summary.task_id, 0)

    # Drop notifications generated as side-effects of the setup above.
    del notify_calls[:]
    bot_dimensions = {
        u'foo': [u'bar'],
        u'id': [u'localhost'],
        u'label-pool': [u'CTS'],
    }
    # CTS pool has disabled external scheduler, so notify_calls should have
    # nothing recorded.
    self._bot_reap_task(bot_dimensions)
    self.assertEqual(len(notify_calls), 0)
| |
  def test_schedule_request_slice_fallback_to_second_immediate(self):
    # First TaskSlice couldn't run so it was immediately skipped, the second ran
    # instead.
    self._register_bot(self.bot_dimensions)
    self._quick_schedule(
        task_slices=[
            task_request.TaskSlice(
                expiration_secs=180,
                # No bot has the 'nonexistent' dimension, so this slice can
                # never be matched.
                properties=_gen_properties(dimensions={
                    u'nonexistent': [u'really'],
                    u'pool': [u'default'],
                }),
                wait_for_capacity=False),
            task_request.TaskSlice(
                expiration_secs=180,
                properties=_gen_properties(),
                wait_for_capacity=False),
        ])
    request, _, run_result = self._bot_reap_task()
    # The bot picked up slice index 1, not 0.
    self.assertEqual(1, run_result.current_task_slice)
| |
  def test_schedule_request_slice_use_first_after_expiration(self):
    # First TaskSlice runs so it is reaped before expiring cron job.
    self._register_bot(self.bot_dimensions)
    self._quick_schedule(
        task_slices=[
            task_request.TaskSlice(
                expiration_secs=180,
                properties=_gen_properties(io_timeout_secs=61)),
            task_request.TaskSlice(
                expiration_secs=180, properties=_gen_properties()),
        ])
    # Move the clock past the first slice's 180s expiration.
    self.mock_now(self.now, 181)
    # The first slice is returned from bot_reap_task if it is called earlier
    # than cron_abort_expired_task_to_run.
    request, _, run_result = self._bot_reap_task()
    self.assertEqual(0, run_result.current_task_slice)
| |
  def test_schedule_request_slice_fallback_to_different_property(self):
    dims1 = self.bot_dimensions
    self._register_bot(dims1)

    # A second bot with a different OS; only the second slice below targets
    # it.
    dims2 = self.bot_dimensions.copy()
    dims2[u'id'] = [u'second']
    dims2[u'os'] = [u'Atari']
    self._register_bot(dims2)

    # The first TaskSlice couldn't run so it was eventually expired and the
    # second couldn't be run by the bot that was polling.
    self._quick_schedule(
        task_slices=[
            task_request.TaskSlice(
                expiration_secs=180,
                properties=_gen_properties(io_timeout_secs=61)),
            task_request.TaskSlice(
                expiration_secs=180,
                properties=_gen_properties(dimensions={
                    u'pool': [u'default'],
                    u'os': [u'Atari']
                })),
        ])
    # The second bot can't reap the task.
    _, _, run_result = self._bot_reap_task(dims2, 'second')
    self.assertIsNone(run_result)

    self.mock_now(self.now, 181)

    # Expire first task here.
    task_scheduler.cron_abort_expired_task_to_run()
    tasks = self._taskqueue_stub.GetTasks('task-expire')
    self.assertEqual(1, len(tasks))
    self.assertEqual(1, self.execute_tasks())

    # The first is explicitly expired, and the second TaskSlice cannot be
    # reaped by this bot.
    _, _, run_result = self._bot_reap_task(dims1)
    self.assertIsNone(run_result)
    # The second bot is able to reap it immediately. This is because when the
    # first bot tried to reap the task, it expired the first TaskToRunShard and
    # created a new one, which the second bot *can* reap.
    _, _, run_result = self._bot_reap_task(dims2, 'second')
    self.assertEqual(1, run_result.current_task_slice)
| |
  def test_schedule_request_slice_no_capacity(self):
    # No bot is registered and neither slice waits for capacity, so the task
    # is denied at schedule time.
    created_ts = self.now - datetime.timedelta(seconds=1)
    self.mock_pub_sub()
    result_summary = self._quick_schedule(
        task_slices=[
            task_request.TaskSlice(
                expiration_secs=180,
                properties=_gen_properties(dimensions={
                    u'nonexistent': [u'really'],
                    u'pool': [u'default'],
                }),
                wait_for_capacity=False),
            task_request.TaskSlice(expiration_secs=180,
                                   properties=_gen_properties(),
                                   wait_for_capacity=False),
        ],
        created_ts=created_ts,
        pubsub_topic='projects/abc/topics/def')
    # The task is immediately denied, without waiting.
    self.assertEqual(State.NO_RESOURCE, result_summary.state)
    self.assertEqual(created_ts, result_summary.abandoned_ts)
    self.assertEqual(created_ts, result_summary.completed_ts)
    self.assertIsNone(result_summary.try_number)
    self.assertEqual(0, result_summary.current_task_slice)
    # The schedule latency metric is measured from created_ts (1s ago).
    self.assertEqual((self.now - created_ts).total_seconds() * 1000.0,
                     ts_mon_metrics._task_state_change_schedule_latencies.get(
                         fields=_update_fields_schedule(
                             status=State.to_string(State.NO_RESOURCE))).sum)

    status = State.to_string(State.NO_RESOURCE)
    self.assertLessEqual(
        0,
        ts_mon_metrics._task_state_change_pubsub_notify_latencies.get(
            fields=_update_fields_pubsub(status=status,
                                         http_status_code=200)).sum)
| |
| def test_schedule_request_slice_wait_for_capacity(self): |
| result_summary = self._quick_schedule( |
| task_slices=[ |
| task_request.TaskSlice( |
| expiration_secs=180, |
| properties=_gen_properties(dimensions={ |
| u'nonexistent': [u'really'], |
| u'pool': [u'default'], |
| }), |
| wait_for_capacity=False), |
| task_request.TaskSlice( |
| expiration_secs=180, |
| properties=_gen_properties(), |
| wait_for_capacity=True), |
| ]) |
| # Pending on the second slice, even if there's no capacity. |
| self.assertEqual(State.PENDING, result_summary.state) |
| self.assertEqual(1, result_summary.current_task_slice) |
| |
  def test_schedule_request_slice_no_capacity_fallback_second(self):
    # A bot is registered: the first slice has no matching capacity but the
    # second does, so the task falls back to it instead of being denied.
    self._register_bot(self.bot_dimensions)
    result_summary = self._quick_schedule(
        task_slices=[
            task_request.TaskSlice(
                expiration_secs=180,
                properties=_gen_properties(dimensions={
                    u'nonexistent': [u'really'],
                    u'pool': [u'default'],
                }),
                wait_for_capacity=False),
            task_request.TaskSlice(
                expiration_secs=180,
                properties=_gen_properties(),
                wait_for_capacity=False),
        ])
    # The task fell back to the second slice, still pending.
    self.assertEqual(State.PENDING, result_summary.state)
    self.assertIsNone(result_summary.abandoned_ts)
    self.assertIsNone(result_summary.completed_ts)
    self.assertIsNone(result_summary.try_number)
    self.assertEqual(1, result_summary.current_task_slice)
| |
| def test_exponential_backoff(self): |
| self.mock(task_scheduler.random, |
| 'random', lambda: task_scheduler._PROBABILITY_OF_QUICK_COMEBACK) |
| self.mock(utils, 'is_dev', lambda: False) |
| data = [ |
| (0, 2), |
| (1, 2), |
| (2, 3), |
| (3, 5), |
| (4, 8), |
| (5, 11), |
| (6, 17), |
| (7, 26), |
| (8, 38), |
| (9, 58), |
| (10, 60), |
| (11, 60), |
| ] |
| for value, expected in data: |
| actual = int(round(task_scheduler.exponential_backoff(value))) |
| self.assertEqual(expected, actual, (value, expected, actual)) |
| |
| def test_exponential_backoff_quick(self): |
| self.mock( |
| task_scheduler.random, |
| 'random', lambda: task_scheduler._PROBABILITY_OF_QUICK_COMEBACK - 0.01) |
| self.assertEqual(1.0, task_scheduler.exponential_backoff(235)) |
| |
| def test_task_handle_pubsub_task(self): |
| calls = [] |
| |
| def publish_mock(**kwargs): |
| calls.append(kwargs) |
| |
| self.mock(task_scheduler.pubsub, 'publish', publish_mock) |
| task_scheduler.task_handle_pubsub_task({ |
| 'topic': |
| 'projects/abc/topics/def', |
| 'task_id': |
| 'abcdef123', |
| 'auth_token': |
| 'token', |
| 'userdata': |
| 'userdata', |
| 'state': |
| State.PENDING, |
| 'tags': [ |
| u'authenticated:user:mocked@example.com', |
| u'os:Windows-3.1.1', |
| u'pool:default', |
| u'priority:50', |
| u'realm:none', |
| u'service_account:none', |
| u'swarming.pool.template:no_config', |
| u'tag:1', |
| u'user:Jesus', |
| ], |
| 'start_time': |
| 0 |
| }) |
| self.assertEqual([{ |
| 'attributes': { |
| 'auth_token': 'token' |
| }, |
| 'message': '{"task_id":"abcdef123","userdata":"userdata"}', |
| 'topic': 'projects/abc/topics/def', |
| }], calls) |
| |
  def _task_ran_successfully(self):
    """Runs an idempotent task successfully and returns the task_id."""
    run_result = self._quick_reap(task_slices=[
        task_request.TaskSlice(expiration_secs=60,
                               properties=_gen_properties(idempotent=True),
                               wait_for_capacity=False),
    ])
    self.assertEqual('localhost', run_result.bot_id)
    to_run_key = _run_result_to_to_run_key(run_result)
    # The TaskToRunShard was consumed by the reap.
    self.assertIsNone(to_run_key.get().queue_number)
    self.assertIsNone(to_run_key.get().expiration_ts)
    # It's important to complete the task with success.
    self.assertEqual(
        State.COMPLETED,
        _bot_update_task(run_result.key, exit_code=0, duration=0.1))
    # An idempotent task has properties_hash set after it succeeded.
    self.assertTrue(run_result.result_summary_key.get().properties_hash)
    self.assertEqual(1, self.execute_tasks())
    return unicode(run_result.task_id)
| |
  def _task_deduped(self,
                    new_ts,
                    deduped_from,
                    task_id,
                    now=None,
                    created_ts=None):
    """Runs a task that was deduped.

    Arguments:
      new_ts: expected modified_ts on the deduped result.
      deduped_from: task id of the original successful run.
      task_id: expected id of the new (deduped) task.
      now: expected started_ts/completed_ts; defaults to self.now.
      created_ts: creation time for the new request; defaults to utcnow.
    """
    # TODO(maruel): Test with SecretBytes.
    self._register_bot(self.bot_dimensions)
    result_summary = self._quick_schedule(
        task_slices=[
            task_request.TaskSlice(expiration_secs=60,
                                   properties=_gen_properties(idempotent=True),
                                   wait_for_capacity=False),
        ],
        created_ts=created_ts or utils.utcnow())
    request = result_summary.request_key.get()
    to_run_key = task_to_run.request_to_task_to_run_key(request, 0)
    # TaskToRunShard was not stored.
    self.assertIsNone(to_run_key.get())
    # Bot can't reap.
    reaped_request, _, _ = self._bot_reap_task()
    self.assertIsNone(reaped_request)

    result_summary_duped, run_results_duped = _get_results(request.key)
    # A deduped task cannot be deduped again so properties_hash is None.
    expected = self._gen_result_summary_reaped(
        completed_ts=now or self.now,
        cost_saved_usd=0.1,
        created_ts=created_ts or new_ts,
        deduped_from=deduped_from,
        duration=0.1,
        exit_code=0,
        id=task_id,
        # Only this value is updated to 'now', the rest uses the previous run
        # timestamps.
        modified_ts=new_ts,
        started_ts=now or self.now,
        state=State.COMPLETED,
        try_number=0)
    self.assertEqual(expected, result_summary_duped.to_dict())
    # No TaskRunResult was created for the deduped task.
    self.assertEqual([], run_results_duped)
| |
| def test_request_idempotent_same_request_id(self): |
| request_id = "124890823507283423" |
| result_summary = self._quick_schedule(request_id=request_id) |
| tr_id = task_request.TaskRequestID.create_key(request_id).get() |
| assert (tr_id is not None) |
| result_summary_dedupe = self._quick_schedule(request_id=request_id) |
| self.assertEqual(result_summary, result_summary_dedupe) |
| |
| def test_request_idempotent_diff_request_id(self): |
| result_summary = self._quick_schedule(request_id="124890823507283423") |
| tr_id = task_request.TaskRequestID.create_key("124890823507283423").get() |
| assert (tr_id is not None) |
| result_summary_2 = self._quick_schedule(request_id="124890823507283") |
| tr_id_2 = task_request.TaskRequestID.create_key("124890823507283").get() |
| assert (tr_id_2 is not None) |
| self.assertNotEqual(result_summary, result_summary_2) |
| |
| def test_request_not_idempotent_no_request_id(self): |
| # since no request_id was provided, tasks created with same |
| # request paramaters are not idempotent. |
| result_summary = self._quick_schedule() |
| result_summary_2 = self._quick_schedule() |
| self.assertNotEqual(result_summary, result_summary_2) |
| |
  def test_task_idempotent(self):
    # First task is idempotent.
    task_id = self._task_ran_successfully()
    # Second task is deduped against first task.
    new_ts = self.mock_now(self.now,
                           config.settings().reusable_task_age_secs - 1)
    self._task_deduped(new_ts,
                       task_id,
                       '1d8dc670a0008a10',
                       created_ts=utils.utcnow() -
                       datetime.timedelta(seconds=1))
    # The deduped task was created 1s before "now", so the recorded schedule
    # latency is 1000ms.
    self.assertEqual(
        1000.0,
        ts_mon_metrics._task_state_change_schedule_latencies.get(
            fields=_update_fields_schedule(
                status=State.to_string(State.COMPLETED))).sum)
| |
  def test_task_idempotent_old(self):
    # A successful idempotent run that is reusable_task_age_secs old is too
    # old to dedupe against.
    # First task is idempotent.
    self._task_ran_successfully()

    # Second task is scheduled, first task is too old to be reused.
    new_ts = self.mock_now(self.now, config.settings().reusable_task_age_secs)
    result_summary = self._quick_schedule(
        task_slices=[
            task_request.TaskSlice(
                expiration_secs=60,
                properties=_gen_properties(idempotent=True),
                wait_for_capacity=False),
        ])
    # The task was enqueued for execution.
    to_run_key = task_to_run.request_to_task_to_run_key(
        result_summary.request_key.get(), 0)
    self.assertTrue(to_run_key.get().queue_number)
| |
  def test_task_idempotent_three(self):
    # First task is idempotent.
    task_id = self._task_ran_successfully()

    # Second task is deduped against first task.
    new_ts = self.mock_now(self.now,
                           config.settings().reusable_task_age_secs - 1)
    self._task_deduped(new_ts,
                       task_id,
                       '1d8dc670a0008a10',
                       created_ts=utils.utcnow() -
                       datetime.timedelta(seconds=1))
    # The deduped task was created 1s before "now", so the recorded schedule
    # latency is 1000ms.
    self.assertEqual(
        1000.0,
        ts_mon_metrics._task_state_change_schedule_latencies.get(
            fields=_update_fields_schedule(
                status=State.to_string(State.COMPLETED))).sum)
    # Third task is scheduled, second task is not dedupable, first task is too
    # old.
    new_ts = self.mock_now(self.now, config.settings().reusable_task_age_secs)
    result_summary = self._quick_schedule(
        task_slices=[
            task_request.TaskSlice(
                expiration_secs=60,
                properties=_gen_properties(idempotent=True),
                wait_for_capacity=False),
        ])
    # The task was enqueued for execution.
    to_run_key = task_to_run.request_to_task_to_run_key(
        result_summary.request_key.get(), 0)
    self.assertTrue(to_run_key.get().queue_number)
| |
  def test_task_idempotent_variable(self):
    # Test the edge case where config.settings().reusable_task_age_secs is
    # being modified. This ensures TaskResultSummary.order(TRS.key) works.
    cfg = config.settings()
    cfg.reusable_task_age_secs = 10
    self.mock(config, 'settings', lambda: cfg)

    # First task is idempotent.
    self._task_ran_successfully()

    # Second task is scheduled, first task is too old to be reused.
    second_ts = self.mock_now(self.now, 10)
    task_id = self._task_ran_successfully()

    # Now any of the 2 tasks could be reused. Assert the right one (the most
    # recent) is reused.
    cfg.reusable_task_age_secs = 100

    # Third task is deduped against second task. That ensures ordering works
    # correctly.
    third_ts = self.mock_now(self.now, 20)
    self._task_deduped(third_ts, task_id, '1d69ba3ea8008b10', now=second_ts)
| |
  def test_task_idempotent_second_slice(self):
    # A task will dedupe against a second slice, and skip the first slice.
    # First task is idempotent.
    task_id = self._task_ran_successfully()

    # Second task's second task slice is deduped against first task.
    new_ts = self.mock_now(self.now,
                           config.settings().reusable_task_age_secs - 1)
    result_summary = self._quick_schedule(
        task_slices=[
            task_request.TaskSlice(
                expiration_secs=180,
                properties=_gen_properties(dimensions={
                    u'inexistant': [u'really'],
                    u'pool': [u'default'],
                }),
                wait_for_capacity=False),
            task_request.TaskSlice(expiration_secs=180,
                                   properties=_gen_properties(idempotent=True),
                                   wait_for_capacity=False),
        ],
        created_ts=utils.utcnow() - datetime.timedelta(seconds=1))
    # Neither slice was enqueued: the request completed by deduplication.
    to_run_key = task_to_run.request_to_task_to_run_key(
        result_summary.request_key.get(), 0)
    self.assertIsNone(to_run_key.get())
    to_run_key = task_to_run.request_to_task_to_run_key(
        result_summary.request_key.get(), 1)
    self.assertIsNone(to_run_key.get())
    self.assertEqual(State.COMPLETED, result_summary.state)
    self.assertEqual(task_id, result_summary.deduped_from)
    # The dedupe happened on slice index 1 and nothing ever ran.
    self.assertEqual(1, result_summary.current_task_slice)
    self.assertEqual(0, result_summary.try_number)
    self.assertEqual(
        1000.0,
        ts_mon_metrics._task_state_change_schedule_latencies.get(
            fields=_update_fields_schedule(
                status=State.to_string(State.COMPLETED))).sum)
| |
  def test_task_invalid_parent(self):
    parent_id = self._task_ran_successfully()
    self.assertTrue(parent_id.endswith('1'))
    # Derive an id that looks valid but doesn't exist by bumping the last
    # character.
    invalid_parent_id = parent_id[:-1] + '2'
    # Try to create a children task with invalid parent_task_id.
    # task should be scheduled without error
    result_summary = self._quick_schedule(parent_task_id=invalid_parent_id)
| |
  def test_task_parent(self):
    # Runs a task to completion, then schedules a child task referencing it;
    # the parent_task_id must be recorded on the child's request.
    run_result = self._quick_reap(
        task_slices=[
            task_request.TaskSlice(
                expiration_secs=60,
                properties=_gen_properties(
                    command=['python'],
                    cas_input_root=task_request.CASReference(
                        cas_instance='projects/test/instances/default_instance',
                        digest=task_request.Digest(hash='1' * 32,
                                                   size_bytes=1))),
                wait_for_capacity=False),
        ])
    self.assertEqual('localhost', run_result.bot_id)
    to_run_key = _run_result_to_to_run_key(run_result)
    self.assertIsNone(to_run_key.get().queue_number)
    self.assertIsNone(to_run_key.get().expiration_ts)
    # It's important to terminate the task with success.
    self.assertEqual(
        State.COMPLETED,
        _bot_update_task(run_result.key, exit_code=0, duration=0.1))
    self.assertEqual(1, self.execute_tasks())

    parent_id = run_result.task_id
    result_summary = self._quick_schedule(parent_task_id=parent_id)
    self.assertEqual(parent_id, result_summary.request_key.get().parent_task_id)
| |
  def test_task_timeout(self):
    # Create a task, but the bot tries to timeout but fails to report exit code
    # and duration.
    pub_sub_calls = self.mock_pub_sub()
    run_result = self._quick_reap(pubsub_topic='projects/abc/topics/def')
    to_run_key = _run_result_to_to_run_key(run_result)
    self.mock_now(self.now, 10.5)
    self.assertEqual(State.TIMED_OUT,
                     _bot_update_task(run_result.key, hard_timeout=True))
    self.assertEqual(1, self.execute_tasks())
    run_result = run_result.key.get()
    # Since the bot reported neither, exit_code is filled with -1 and duration
    # with the elapsed 10.5s.
    self.assertEqual(-1, run_result.exit_code)
    self.assertEqual(10.5, run_result.duration)

    status = State.to_string(State.TIMED_OUT)
    self.assertLessEqual(
        0,
        ts_mon_metrics._task_state_change_pubsub_notify_latencies.get(
            fields=_update_fields_pubsub(status=status,
                                         http_status_code=200)).sum)
| |
  def test_get_results(self):
    # Walks a task through its whole lifecycle (scheduled, reaped, completed)
    # and checks _get_results at each step.
    # TODO(maruel): Split in more focused tests.
    created_ts = self.now
    self.mock_now(created_ts)
    self._register_bot(self.bot_dimensions)
    result_summary = self._quick_schedule()

    # The TaskRequest was enqueued, the TaskResultSummary was created but no
    # TaskRunResult exist yet since the task was not scheduled on any bot.
    result_summary, run_results = _get_results(result_summary.request_key)
    expected = self._gen_result_summary_pending(
        created_ts=created_ts, id='1d69b9f088008910', modified_ts=created_ts)
    self.assertEqual(expected, result_summary.to_dict())
    self.assertEqual([], run_results)

    # A bot reaps the TaskToRunShard.
    reaped_ts = self.now + datetime.timedelta(seconds=60)
    self.mock_now(reaped_ts)
    reaped_request, _, run_result = self._bot_reap_task()
    self.assertEqual(result_summary.request_key.get(), reaped_request)
    self.assertTrue(run_result)
    result_summary, run_results = _get_results(result_summary.request_key)
    expected = self._gen_result_summary_reaped(
        created_ts=created_ts,
        costs_usd=[0.0],
        id='1d69b9f088008910',
        modified_ts=reaped_ts,
        started_ts=reaped_ts)
    self.assertEqual(expected, result_summary.to_dict())
    expected = [
        self._gen_run_result(
            id='1d69b9f088008911',
            modified_ts=reaped_ts,
            started_ts=reaped_ts,
            # The bot is considered dead if it does not ping within the
            # request's bot_ping_tolerance_secs of starting.
            dead_after_ts=reaped_ts +
            datetime.timedelta(seconds=reaped_request.bot_ping_tolerance_secs)),
    ]
    self.assertEqual(expected, [i.to_dict() for i in run_results])

    # The bot completes the task.
    # Schedule latency is the time between creation and being reaped.
    self.assertEqual((reaped_ts - self.now).total_seconds() * 1000.0,
                     ts_mon_metrics._task_state_change_schedule_latencies.get(
                         fields=_update_fields_schedule(
                             status=State.to_string(State.RUNNING))).sum)

    done_ts = self.now + datetime.timedelta(seconds=120)
    self.mock_now(done_ts)
    cas_output_root = task_request.CASReference(
        cas_instance='projects/test/instances/default_instance',
        digest=task_request.Digest(hash='a' * 32, size_bytes=1))
    performance_stats = task_result.PerformanceStats(
        bot_overhead=0.1,
        isolated_download=task_result.CASOperationStats(
            duration=0.1,
            initial_number_items=10,
            initial_size=1000,
            items_cold='aa',
            items_hot='bb'),
        isolated_upload=task_result.CASOperationStats(
            duration=0.1, items_cold='aa', items_hot='bb'))
    self.assertEqual(
        State.COMPLETED,
        _bot_update_task(run_result.key,
                         exit_code=0,
                         duration=3.,
                         cas_output_root=cas_output_root,
                         performance_stats=performance_stats))
    # Simulate an unexpected retry, e.g. the response of the previous RPC never
    # got the client even if it succeedded. The second update must be
    # idempotent and still report COMPLETED.
    self.assertEqual(
        State.COMPLETED,
        _bot_update_task(run_result.key,
                         exit_code=0,
                         duration=3.,
                         cas_output_root=cas_output_root,
                         performance_stats=performance_stats))
    result_summary, run_results = _get_results(result_summary.request_key)
    expected = self._gen_result_summary_reaped(
        completed_ts=done_ts,
        costs_usd=[0.1],
        created_ts=created_ts,
        duration=3.0,
        exit_code=0,
        id='1d69b9f088008910',
        modified_ts=done_ts,
        cas_output_root={
            'cas_instance': u'projects/test/instances/default_instance',
            'digest': {
                'hash': u'a' * 32,
                'size_bytes': 1,
            }
        },
        started_ts=reaped_ts,
        state=State.COMPLETED,
        try_number=1)
    self.assertEqual(expected, result_summary.to_dict())
    expected = [
        self._gen_run_result(completed_ts=done_ts,
                             cost_usd=0.1,
                             duration=3.0,
                             exit_code=0,
                             id='1d69b9f088008911',
                             modified_ts=done_ts,
                             cas_output_root={
                                 'cas_instance':
                                     u'projects/test/instances/default_instance',
                                 'digest': {
                                     'hash': u'a' * 32,
                                     'size_bytes': 1,
                                 }
                             },
                             started_ts=reaped_ts,
                             state=State.COMPLETED),
    ]
    self.assertEqual(expected, [t.to_dict() for t in run_results])
    self.assertEqual(2, self.execute_tasks())
| |
  def test_exit_code_failure(self):
    # A non-zero exit code completes the task but flags it as a failure.
    run_result = self._quick_reap()
    self.assertEqual(
        State.COMPLETED,
        _bot_update_task(run_result.key, exit_code=1, duration=0.1))
    result_summary, run_results = _get_results(run_result.request_key)

    expected = self._gen_result_summary_reaped(
        completed_ts=self.now,
        costs_usd=[0.1],
        duration=0.1,
        exit_code=1,
        failure=True,
        id='1d69b9f088008910',
        started_ts=self.now,
        state=State.COMPLETED,
        try_number=1)
    self.assertEqual(expected, result_summary.to_dict())

    expected = [
        self._gen_run_result(
            completed_ts=self.now,
            cost_usd=0.1,
            duration=0.1,
            exit_code=1,
            failure=True,
            id='1d69b9f088008911',
            started_ts=self.now,
            state=State.COMPLETED),
    ]
    self.assertEqual(expected, [t.to_dict() for t in run_results])
    self.assertEqual(1, self.execute_tasks())
| |
  def test_schedule_request_id_without_pool(self):
    # Requesting a specific bot 'id' dimension without a 'pool' dimension is
    # rejected at validation time, even for an admin.
    auth_testing.mock_is_admin(self)
    self._register_bot(self.bot_dimensions)
    with self.assertRaises(datastore_errors.BadValueError):
      self._quick_schedule(
          task_slices=[
              task_request.TaskSlice(
                  expiration_secs=60,
                  properties=_gen_properties(dimensions={u'id': [u'abc']}),
                  wait_for_capacity=False),
          ])
| |
  def test_bot_update_task(self):
    # Output is stored in chunks (CHUNK_SIZE mocked to 2 bytes); successive
    # updates append output and the final one completes the task.
    self.mock(task_result.TaskOutput, 'CHUNK_SIZE', 2)
    self.mock_pub_sub()
    run_result = self._quick_reap(pubsub_topic='projects/abc/topics/def')

    self.assertEqual(
        State.RUNNING,
        _bot_update_task(run_result.key, output='hi', output_chunk_start=0))
    self.assertEqual(
        State.COMPLETED,
        _bot_update_task(
            run_result.key,
            output='hey',
            output_chunk_start=2,
            exit_code=0,
            duration=0.1))
    self.assertLessEqual(
        0,
        ts_mon_metrics._task_state_change_pubsub_notify_latencies.get(
            fields=_update_fields_pubsub(http_status_code=200)).sum)

    # Both writes are concatenated in order.
    self.assertEqual('hihey', run_result.key.get().get_output(0, 0))
    self.assertEqual(1, self.execute_tasks())
| |
  def test_bot_update_task_new_overwrite(self):
    # Writing a chunk that overlaps a previous write overwrites the
    # overlapping bytes: 'hi' at offset 0 then 'hey' at offset 1 -> 'hhey'.
    self.mock(task_result.TaskOutput, 'CHUNK_SIZE', 2)
    run_result = self._quick_reap()
    self.assertEqual(
        State.RUNNING,
        _bot_update_task(
            run_result_key=run_result.key, output='hi', output_chunk_start=0))
    self.assertEqual(
        State.RUNNING,
        _bot_update_task(
            run_result_key=run_result.key, output='hey', output_chunk_start=1))
    self.assertEqual('hhey', run_result.key.get().get_output(0, 0))
| |
| def test_bot_update_exception(self): |
| run_result = self._quick_reap() |
| |
| def r(*_): |
| raise datastore_utils.CommitError('Sorry!') |
| |
| self.mock(ndb, 'put_multi', r) |
| self.assertIsNone( |
| None, _bot_update_task(run_result.key, exit_code=0, duration=0.1)) |
| |
| self.assertIsNone( |
| ts_mon_metrics._task_state_change_pubsub_notify_latencies.get( |
| fields=_update_fields_pubsub(http_status_code=200))) |
| |
  def test_bot_update_pubsub_negative_latency(self):
    """Notify latency metric stays non-negative even with skewed clocks."""
    pub_sub_calls = self.mock_pub_sub()
    run_result = self._quick_reap(pubsub_topic='projects/abc/topics/def')
    self.assertEqual(
        State.COMPLETED,
        _bot_update_task(run_result.key, exit_code=0, duration=0.1))
    self.assertEqual(2, len(pub_sub_calls))  # notification is sent
    self.assertEqual(1, self.execute_tasks())
    # Recorded latency must be clamped to >= 0.
    self.assertLessEqual(
        0,
        ts_mon_metrics._task_state_change_pubsub_notify_latencies.get(
            fields=_update_fields_pubsub(http_status_code=200)).sum)
| |
  def test_bot_update_pubsub_error(self):
    """A PubSub publish failure fails the update; a retry then succeeds."""
    pub_sub_calls = self.mock_pub_sub()
    run_result = self._quick_reap(pubsub_topic='projects/abc/topics/def')
    # Attempt to terminate the task with success, but make PubSub call fail.
    self.publish_successful = False
    # The failed publish makes the whole update return None.
    self.assertIsNone(
        _bot_update_task(run_result.key, exit_code=0, duration=0.1))

    # The failure is still recorded in the latency metric under HTTP 404.
    self.assertLessEqual(
        0,
        ts_mon_metrics._task_state_change_pubsub_notify_latencies.get(
            fields=_update_fields_pubsub(http_status_code=404)).sum)

    # Bot retries bot_update, now PubSub works and notification is sent.
    self.publish_successful = True

    self.assertEqual(
        State.COMPLETED,
        _bot_update_task(run_result.key, exit_code=0, duration=0.1))
    self.assertEqual(2, len(pub_sub_calls))  # notification is sent
    self.assertEqual(1, self.execute_tasks())
    self.assertLessEqual(
        0,
        ts_mon_metrics._task_state_change_pubsub_notify_latencies.get(
            fields=_update_fields_pubsub(http_status_code=200)).sum)
| |
  def test_bot_update_buildbucket_pubsub_ok(self):
    """Buildbucket-backed task completion publishes STARTED then SUCCESS."""
    self.mock(utils, "time_time_ns", lambda: 12345678)
    pub_sub_calls = self.mock_pub_sub()
    run_result = self._quick_reap(
        has_build_task=True,
        build_task=task_request.BuildTask(
            build_id="1234",
            buildbucket_host="buildbucket_host",
            latest_task_status=task_result.State.PENDING,
            pubsub_topic="backend_pubsub_topic",
            update_id=0))
    self.assertEqual(
        State.COMPLETED,
        _bot_update_task(run_result.key, exit_code=0, duration=0.1))
    # 2 calls because _quick_reap sends one and _bot_update_task sends another
    self.assertEqual(2, len(pub_sub_calls))
    self.assertEqual(1, self.execute_tasks())

    # Asserting that first pubsub call sent a STARTED status (_quick_reap ran)
    self._assert_buildbucket_update_status(pub_sub_calls[0][1]['message'],
                                           common_pb2.STARTED)
    self.assertEqual(pub_sub_calls[0][1]["topic"], "backend_pubsub_topic")

    # Asserting that second pubsub call sent a SUCCESS status
    self._assert_buildbucket_update_status(pub_sub_calls[1][1]['message'],
                                           common_pb2.SUCCESS)
    self.assertEqual(pub_sub_calls[1][1]["topic"], "backend_pubsub_topic")
| |
  def test_bot_update_buildbucket_pubsub_ok_failed_task(self):
    """A failing (exit_code=1) buildbucket task publishes FAILURE."""
    # NOTE(review): sibling tests mock utils.time_time_ns; this one mocks
    # utils.time_time — possibly a typo. Confirm which clock the code under
    # test reads before changing.
    self.mock(utils, "time_time", lambda: 12345678)
    pub_sub_calls = self.mock_pub_sub()
    run_result = self._quick_reap(
        has_build_task=True,
        build_task=task_request.BuildTask(
            build_id="1234",
            buildbucket_host="buildbucket_host",
            latest_task_status=task_result.State.PENDING,
            pubsub_topic="backend_pubsub_topic",
            update_id=0))
    self.assertEqual(
        State.COMPLETED,
        _bot_update_task(run_result.key, exit_code=1, duration=0.1))
    self.assertEqual(2, len(pub_sub_calls))  # notification is sent
    self.assertEqual(1, self.execute_tasks())

    self._assert_buildbucket_update_status(pub_sub_calls[0][1]['message'],
                                           common_pb2.STARTED)
    self.assertEqual(pub_sub_calls[0][1]["topic"], "backend_pubsub_topic")

    self._assert_buildbucket_update_status(pub_sub_calls[1][1]['message'],
                                           common_pb2.FAILURE)
    self.assertEqual(pub_sub_calls[1][1]["topic"], "backend_pubsub_topic")
| |
  def test_task_buildbucket_update(self):
    """task_buildbucket_update publishes only on state change, honoring
    update_id ordering, and raises on PubSub transient errors."""
    self.mock(utils, "time_time_ns", lambda: 12345678)
    pub_sub_calls = self.mock_pub_sub()
    run_result = self._quick_reap(
        has_build_task=True,
        build_task=task_request.BuildTask(
            build_id="1234",
            buildbucket_host="buildbucket_host",
            latest_task_status=task_result.State.PENDING,
            pubsub_topic="backend_pubsub_topic",
            update_id=0))

    # Check that an update is not sent due to no change of state
    task_scheduler.task_buildbucket_update({
        "task_id":
            task_pack.pack_result_summary_key(run_result.result_summary_key),
        "state":
            task_result.State.RUNNING,
        'update_id':
            12345678000000002
    })
    self.assertEqual(1, len(pub_sub_calls))  # notification is sent
    result = task_pb2.BuildTaskUpdate()
    result.ParseFromString(pub_sub_calls[0][1]['message'])
    # The only publish so far is from _quick_reap, with the mocked clock.
    self.assertEqual(result.task.status, common_pb2.STARTED)
    self.assertEqual(result.build_id, "1234")
    self.assertEqual(result.task.id.id, "1d69b9f088008910")
    self.assertEqual(result.task.update_id, 12345678)

    # Check that an update is sent due to change of state
    task_scheduler.task_buildbucket_update({
        "task_id":
            task_pack.pack_result_summary_key(run_result.result_summary_key),
        "state":
            task_result.State.BOT_DIED,
        'update_id':
            12345678000000003
    })
    self.assertEqual(2, len(pub_sub_calls))  # second notification is sent
    result = task_pb2.BuildTaskUpdate()
    result.ParseFromString(pub_sub_calls[1][1]['message'])
    self.assertEqual(result.task.status, common_pb2.INFRA_FAILURE)
    self.assertEqual(result.build_id, "1234")
    self.assertEqual(result.task.id.id, "1d69b9f088008910")
    self.assertEqual(result.task.update_id, 12345678000000003)

    # Check that no update was made due to prior update_id
    task_scheduler.task_buildbucket_update({
        "task_id":
            task_pack.pack_result_summary_key(run_result.result_summary_key),
        "state":
            task_result.State.CLIENT_ERROR,
        'update_id':
            12345678000000001
    })
    self.assertEqual(2, len(pub_sub_calls))

    # Check that an update was not sent due to pubsub transient error
    self.publish_successful = False
    with self.assertRaises(task_scheduler.Error):
      task_scheduler.task_buildbucket_update({
          "task_id":
              task_pack.pack_result_summary_key(run_result.result_summary_key),
          "state":
              task_result.State.CANCELED,
          'update_id':
              12345678000000004
      })
    self.assertEqual(2, len(pub_sub_calls))
| |
  def _bot_update_timeouts(self, hard, io):
    """Shared helper: a hard and/or IO timeout yields State.TIMED_OUT.

    Args:
      hard: bool, bot reports a hard (execution) timeout.
      io: bool, bot reports an I/O timeout.
    """
    run_result = self._quick_reap()
    self.assertEqual(
        State.TIMED_OUT,
        _bot_update_task(
            run_result.key,
            exit_code=0,
            duration=0.1,
            hard_timeout=hard,
            io_timeout=io))
    # Timed out tasks are marked as failures on the summary...
    expected = self._gen_result_summary_reaped(
        completed_ts=self.now,
        costs_usd=[0.1],
        duration=0.1,
        exit_code=0,
        failure=True,
        id='1d69b9f088008910',
        started_ts=self.now,
        state=State.TIMED_OUT,
        try_number=1)
    self.assertEqual(expected, run_result.result_summary_key.get().to_dict())

    # ...and on the run result.
    expected = self._gen_run_result(
        completed_ts=self.now,
        cost_usd=0.1,
        duration=0.1,
        exit_code=0,
        failure=True,
        id='1d69b9f088008911',
        started_ts=self.now,
        state=State.TIMED_OUT,
        try_number=1)
    self.assertEqual(expected, run_result.key.get().to_dict())
    self.assertEqual(1, self.execute_tasks())
| |
  def test_bot_update_hard_timeout(self):
    """A hard (execution) timeout alone results in TIMED_OUT."""
    self._bot_update_timeouts(True, False)
| |
  def test_bot_update_io_timeout(self):
    """An I/O timeout alone results in TIMED_OUT."""
    self._bot_update_timeouts(False, True)
| |
  def test_bot_update_child_with_cancelled_parent(self):
    """Cancelling a parent task propagates KILLED to all its children."""
    self._register_bot(self.bot_dimensions)

    # Run parent task.
    parent_request = _gen_request_slices()
    parent_result_summary = task_scheduler.schedule_request(parent_request)
    self.execute_tasks()

    _, _, parent_run_result = self._bot_reap_task()

    # Run a child task.
    child_request = _gen_request_slices(
        parent_task_id=parent_run_result.task_id)
    child_result_summary = task_scheduler.schedule_request(child_request)
    self.execute_tasks()

    # Each child runs on its own distinctly-named bot.
    bot2_dimensions = self.bot_dimensions.copy()
    bot2_dimensions['id'] = [bot2_dimensions['id'][0] + '2']
    self._register_bot(bot2_dimensions)
    _, _, child_run_result = self._bot_reap_task()
    self.execute_tasks()

    # Run a child task 2.
    child_request2 = _gen_request_slices(
        parent_task_id=parent_run_result.task_id)
    child_result2_summary = task_scheduler.schedule_request(child_request2)
    self.execute_tasks()

    bot3_dimensions = self.bot_dimensions.copy()
    bot3_dimensions['id'] = [bot3_dimensions['id'][0] + '3']
    self._register_bot(bot3_dimensions)
    _, _, child_run_result2 = self._bot_reap_task()
    self.execute_tasks()

    # Run a child task 3. This will be cancelled before running.
    child_request3 = _gen_request_slices(
        parent_task_id=parent_run_result.task_id)
    child_result3_summary = task_scheduler.schedule_request(child_request3)

    # Cancel parent task.
    ok, was_running = task_scheduler.cancel_task(
        parent_run_result.request_key.get(),
        parent_run_result.result_summary_key, True, None)
    self.assertEqual(True, ok)
    self.assertEqual(True, was_running)

    # parent_task should push task to cancel-children-tasks.
    self.execute_task(
        '/internal/taskqueue/important/tasks/cancel-children-tasks',
        'cancel-children-tasks',
        utils.encode_to_json({
            'task': parent_result_summary.task_id,
        }))
    # and child tasks should be cancelled via task queue.
    appended_child_task_ids = [
        child_result3_summary.task_id,
        child_result2_summary.task_id,
        child_result_summary.task_id,
    ]
    self.execute_task(
        '/internal/taskqueue/important/tasks/cancel', 'cancel-tasks',
        utils.encode_to_json({
            'kill_running': True,
            'tasks': appended_child_task_ids,
        }))

    self.assertEqual(
        State.KILLED,
        _bot_update_task(parent_run_result.key, exit_code=0, duration=0.1))

    # Child task is KILLED when parent task is cancelled.
    self.assertEqual(
        State.KILLED,
        _bot_update_task(
            child_run_result.key,
            bot_id='localhost2',
            exit_code=0,
            duration=0.1))

    self.assertEqual('hi', child_run_result.key.get().get_output(0, 0))

    # Child task is KILLED when parent task is cancelled and not being
    # completed.
    self.assertEqual(
        State.KILLED,
        _bot_update_task(child_run_result2.key, bot_id='localhost3'))
    self.assertEqual('hi', child_run_result2.key.get().get_output(0, 0))

    # flush tasks
    self.execute_tasks()
| |
| def test_task_priority(self): |
| # Create N tasks of various priority not in order. |
| priorities = [200, 100, 20, 30, 50, 40, 199] |
| # Call the expected ordered list out for clarity. |
| expected = [20, 30, 40, 50, 100, 199, 200] |
| self.assertEqual(expected, sorted(priorities)) |
| |
| self._register_bot(self.bot_dimensions) |
| # Triggers many tasks of different priorities. |
| for p in priorities: |
| self._quick_schedule(priority=p) |
| self.execute_tasks() |
| |
| # Make sure they are scheduled in priority order. Bot polling should hand |
| # out tasks in the expected order. In practice the order is not 100% |
| # deterministic when running on GAE but it should be deterministic in the |
| # unit test. |
| for i, e in enumerate(expected): |
| request, _, _ = self._bot_reap_task() |
| self.assertEqual(request.priority, e) |
| |
  def test_bot_terminate_task(self):
    """bot_terminate_task on a running task produces BOT_DIED + metrics."""
    pub_sub_calls = self.mock_pub_sub()
    run_result = self._quick_reap(pubsub_topic='projects/abc/topics/def')
    self.assertEqual(1, len(pub_sub_calls))  # PENDING -> RUNNING

    # No error message means the termination was accepted.
    self.assertEqual(
        None,
        task_scheduler.bot_terminate_task(run_result.key, 'localhost', self.now,
                                          {
                                              'missing_cas': None,
                                              'missing_cipd': []
                                          }))
    expected = self._gen_result_summary_reaped(
        abandoned_ts=self.now,
        completed_ts=self.now,
        costs_usd=[0.],
        id='1d69b9f088008910',
        internal_failure=True,
        started_ts=self.now,
        state=State.BOT_DIED)
    self.assertEqual(expected, run_result.result_summary_key.get().to_dict())
    expected = self._gen_run_result(
        abandoned_ts=self.now,
        completed_ts=self.now,
        id='1d69b9f088008911',
        internal_failure=True,
        state=State.BOT_DIED)
    self.assertEqual(expected, run_result.key.get().to_dict())
    self.assertEqual(1, self.execute_tasks())
    self.assertEqual(2, len(pub_sub_calls))  # RUNNING -> BOT_DIED
    status = State.to_string(State.BOT_DIED)
    self.assertLessEqual(
        0,
        ts_mon_metrics._task_state_change_pubsub_notify_latencies.get(
            fields=_update_fields_pubsub(status=status,
                                         http_status_code=200)).sum)
| |
  def test_bot_terminate_backend_task(self):
    """Terminating a buildbucket-backed task also notifies INFRA_FAILURE."""
    pub_sub_calls = self.mock_pub_sub()
    run_result = self._quick_reap(
        pubsub_topic='projects/abc/topics/def',
        task_slices=[
            task_request.TaskSlice(expiration_secs=1200,
                                   properties=_gen_properties(idempotent=True),
                                   wait_for_capacity=False),
        ],
        has_build_task=True,
        build_task=task_request.BuildTask(
            build_id="1234",
            buildbucket_host="buildbucket_host",
            latest_task_status=task_result.State.PENDING,
            pubsub_topic="backend_pubsub_topic",
            update_id=0))
    self.assertEqual(2, len(pub_sub_calls))  # PENDING -> RUNNING
    self._assert_buildbucket_update_status(pub_sub_calls[0][1]['message'],
                                           common_pb2.STARTED)

    self.assertEqual(
        None,
        task_scheduler.bot_terminate_task(run_result.key, 'localhost', self.now,
                                          {
                                              'missing_cas': None,
                                              'missing_cipd': []
                                          }))
    expected = self._gen_result_summary_reaped(abandoned_ts=self.now,
                                               completed_ts=self.now,
                                               costs_usd=[0.],
                                               id='1d69b9f088008910',
                                               internal_failure=True,
                                               started_ts=self.now,
                                               state=State.BOT_DIED)
    self.assertEqual(expected, run_result.result_summary_key.get().to_dict())
    expected = self._gen_run_result(abandoned_ts=self.now,
                                    completed_ts=self.now,
                                    id='1d69b9f088008911',
                                    internal_failure=True,
                                    state=State.BOT_DIED)
    self.assertEqual(expected, run_result.key.get().to_dict())
    self.assertEqual(2, self.execute_tasks())
    self.assertEqual(4, len(pub_sub_calls))  # RUNNING -> BOT_DIED
    # BOT_DIED maps to INFRA_FAILURE on the buildbucket side.
    self._assert_buildbucket_update_status(pub_sub_calls[2][1]['message'],
                                           common_pb2.INFRA_FAILURE)
    status = State.to_string(State.BOT_DIED)
    self.assertLessEqual(
        0,
        ts_mon_metrics._task_state_change_pubsub_notify_latencies.get(
            fields=_update_fields_pubsub(status=status,
                                         http_status_code=200)).sum)
| |
  def test_bot_terminate_canceled_task(self):
    """Terminating a task already being cancelled yields KILLED, not BOT_DIED."""
    pub_sub_calls = self.mock_pub_sub()
    run_result = self._quick_reap(pubsub_topic='projects/abc/topics/def')
    self.assertEqual(1, len(pub_sub_calls))  # PENDING -> RUNNING
    start_time = utils.milliseconds_since_epoch() - 100

    # cancel task
    self._cancel_running_task(run_result)
    self.assertEqual(2, len(pub_sub_calls))  # RUNNING -> killing

    # execute termination task
    err = task_scheduler.bot_terminate_task(run_result.key, 'localhost',
                                            start_time, {
                                                'missing_cas': None,
                                                'missing_cipd': []
                                            })

    self.assertEqual(None, err)

    # check result summary
    expected = self._gen_result_summary_reaped(
        abandoned_ts=self.now,
        completed_ts=self.now,
        costs_usd=[0.],
        id='1d69b9f088008910',
        internal_failure=True,
        started_ts=self.now,
        state=State.KILLED,
        duration=0.0,
        exit_code=-1,
        failure=True,
    )
    self.assertEqual(expected, run_result.result_summary_key.get().to_dict())

    # check run result
    expected = self._gen_run_result(
        abandoned_ts=self.now,
        completed_ts=self.now,
        id='1d69b9f088008911',
        internal_failure=True,
        state=State.KILLED,
        killing=False,
        duration=0.0,
        exit_code=-1,
        failure=True,
    )
    self.assertEqual(expected, run_result.key.get().to_dict())

    self.assertEqual(1, self.execute_tasks())
    self.assertEqual(3, len(pub_sub_calls))  # killing -> KILLED

    status = State.to_string(State.KILLED)
    self.assertLessEqual(
        0,
        ts_mon_metrics._task_state_change_pubsub_notify_latencies.get(
            fields=_update_fields_pubsub(status=status,
                                         http_status_code=200)).sum)
    # Dead-task detection latency is measured from start_time (100ms ago).
    self.assertLessEqual(
        0,
        ts_mon_metrics._dead_task_detection_latencies.get(fields={
            'pool': 'default',
            'cron': False,
        }).sum)
| |
  def test_bot_terminate_task_missing_cas(self):
    """A missing-CAS client error yields CLIENT_ERROR, not internal failure."""
    pub_sub_calls = self.mock_pub_sub()
    run_result = self._quick_reap(pubsub_topic='projects/abc/topics/def')
    self.assertEqual(1, len(pub_sub_calls))  # PENDING -> RUNNING
    # Digest string uses the '<hash>/<size>' wire form, split below.
    client_error = {
        'missing_cas': [{
            'digest':
                '93b45bab427ab9fe55asdq123324adsdaf8d5/1292',
            'instance':
                'projects/chromium-swarm/instances/default_instance',
        }],
        'missing_cipd': [],
    }

    self.assertEqual(
        None,
        task_scheduler.bot_terminate_task(run_result.key, 'localhost', self.now,
                                          client_error))
    expected_missing_cas = [{
        'cas_instance': 'projects/chromium-swarm/instances/default_instance',
        'digest': {
            'hash': '93b45bab427ab9fe55asdq123324adsdaf8d5',
            'size_bytes': 1292
        }
    }]
    # check result summary
    expected = self._gen_result_summary_reaped(
        abandoned_ts=self.now,
        completed_ts=self.now,
        costs_usd=[0.],
        id='1d69b9f088008910',
        internal_failure=False,
        missing_cas=expected_missing_cas,
        started_ts=self.now,
        state=State.CLIENT_ERROR,
        duration=0.0,
        exit_code=-1,
        failure=True,
    )
    self.assertEqual(expected, run_result.result_summary_key.get().to_dict())

    # check run result
    expected = self._gen_run_result(
        abandoned_ts=self.now,
        completed_ts=self.now,
        id='1d69b9f088008911',
        internal_failure=False,
        missing_cas=expected_missing_cas,
        state=State.CLIENT_ERROR,
        killing=None,
        duration=0.0,
        exit_code=-1,
        failure=True,
    )
    self.assertEqual(expected, run_result.key.get().to_dict())
    self.assertEqual(1, self.execute_tasks())
    self.assertEqual(2, len(pub_sub_calls))  # RUNNING -> CLIENT_ERROR

    status = State.to_string(State.CLIENT_ERROR)
    self.assertLessEqual(
        0,
        ts_mon_metrics._task_state_change_pubsub_notify_latencies.get(
            fields=_update_fields_pubsub(status=status,
                                         http_status_code=200)).sum)
| |
  def test_bot_terminate_task_missing_cipd(self):
    """A missing-CIPD-package client error yields CLIENT_ERROR."""
    pub_sub_calls = self.mock_pub_sub()
    run_result = self._quick_reap(pubsub_topic='projects/abc/topics/def')
    self.assertEqual(1, len(pub_sub_calls))  # PENDING -> RUNNING
    client_error = {
        'missing_cas':
            None,
        'missing_cipd': [{
            'package_name': u'foo',
            'version': u'deadbeef',
            'path': u'not/found/here',
        }],
    }

    self.assertEqual(
        None,
        task_scheduler.bot_terminate_task(run_result.key, 'localhost', self.now,
                                          client_error))
    # check result summary
    expected = self._gen_result_summary_reaped(
        abandoned_ts=self.now,
        completed_ts=self.now,
        costs_usd=[0.],
        id='1d69b9f088008910',
        internal_failure=False,
        missing_cipd=client_error['missing_cipd'],
        started_ts=self.now,
        state=State.CLIENT_ERROR,
        duration=0.0,
        exit_code=-1,
        failure=True,
    )
    self.assertEqual(expected, run_result.result_summary_key.get().to_dict())

    # check run result
    expected = self._gen_run_result(
        abandoned_ts=self.now,
        completed_ts=self.now,
        id='1d69b9f088008911',
        internal_failure=False,
        missing_cipd=client_error['missing_cipd'],
        state=State.CLIENT_ERROR,
        killing=None,
        duration=0.0,
        exit_code=-1,
        failure=True,
    )
    self.assertEqual(expected, run_result.key.get().to_dict())
    self.assertEqual(1, self.execute_tasks())
    self.assertEqual(2, len(pub_sub_calls))  # RUNNING -> CLIENT_ERROR

    status = State.to_string(State.CLIENT_ERROR)
    self.assertLessEqual(
        0,
        ts_mon_metrics._task_state_change_pubsub_notify_latencies.get(
            fields=_update_fields_pubsub(status=status,
                                         http_status_code=200)).sum)
| |
| def test_bot_terminate_task_wrong_bot(self): |
| run_result = self._quick_reap() |
| expected = ( |
| 'Bot bot1 sent task kill for task 1d69b9f088008911 owned by bot ' |
| 'localhost') |
| err = task_scheduler.bot_terminate_task(run_result.key, 'bot1', 0, { |
| 'missing_cas': None, |
| 'missing_cipd': [] |
| }) |
| self.assertEqual(expected, err) |
| |
  def test_cancel_task(self):
    """Cancelling a PENDING task succeeds and sends one CANCELED message."""
    # Cancel a pending task.
    pub_sub_calls = self.mock_pub_sub()

    self._register_bot(self.bot_dimensions)
    result_summary = self._quick_schedule(
        pubsub_topic='projects/abc/topics/def')
    self.assertEqual(0, len(pub_sub_calls))  # Nothing yet.

    ok, was_running = task_scheduler.cancel_task(
        result_summary.request_key.get(), result_summary.key, False, None)
    self.assertEqual(True, ok)
    self.assertEqual(False, was_running)
    self.assertEqual(1, self.execute_tasks())
    self.assertEqual(1, len(pub_sub_calls))  # CANCELED

    result_summary = result_summary.key.get()
    self.assertEqual(State.CANCELED, result_summary.state)
    self.assertEqual(1, len(pub_sub_calls))  # No other message.

    status = State.to_string(State.CANCELED)
    self.assertLessEqual(
        0,
        ts_mon_metrics._task_state_change_pubsub_notify_latencies.get(
            fields=_update_fields_pubsub(status=status,
                                         http_status_code=200)).sum)
| |
  def test_cancel_task_backend_task(self):
    """Cancelling a pending buildbucket-backed task also notifies buildbucket."""
    # Cancel a pending task.
    pub_sub_calls = self.mock_pub_sub()

    self._register_bot(self.bot_dimensions)
    result_summary = self._quick_schedule(
        pubsub_topic='projects/abc/topics/def',
        has_build_task=True,
        build_task=task_request.BuildTask(
            build_id="1234",
            buildbucket_host="buildbucket_host",
            latest_task_status=task_result.State.PENDING,
            pubsub_topic="backend_pubsub_topic",
            update_id=0))
    self.assertEqual(0, len(pub_sub_calls))  # Nothing yet.

    ok, was_running = task_scheduler.cancel_task(
        result_summary.request_key.get(), result_summary.key, False, None)
    self.assertEqual(True, ok)
    self.assertEqual(False, was_running)
    # 2 because both pubsub-notify and bb update
    self.assertEqual(2, self.execute_tasks())
    self.assertEqual(2, len(pub_sub_calls))  # CANCELED
    self._assert_buildbucket_update_status(pub_sub_calls[0][1]['message'],
                                           common_pb2.CANCELED)

    result_summary = result_summary.key.get()
    self.assertEqual(State.CANCELED, result_summary.state)

    status = State.to_string(State.CANCELED)
    self.assertLessEqual(
        0,
        ts_mon_metrics._task_state_change_pubsub_notify_latencies.get(
            fields=_update_fields_pubsub(status=status,
                                         http_status_code=200)).sum)
| |
  def test_cancel_task_with_id(self):
    """cancel_task_with_id (task id string variant) cancels a pending task."""
    # Cancel a pending task.
    pub_sub_calls = self.mock_pub_sub()
    self._register_bot(self.bot_dimensions)
    result_summary = self._quick_schedule(
        pubsub_topic='projects/abc/topics/def')
    self.assertEqual(0, len(pub_sub_calls))  # Nothing yet.

    ok, was_running = task_scheduler.cancel_task_with_id(
        result_summary.task_id, False, None)
    self.assertEqual(True, ok)
    self.assertEqual(False, was_running)
    self.assertEqual(1, self.execute_tasks())
    self.assertEqual(1, len(pub_sub_calls))  # CANCELED

    status = State.to_string(State.CANCELED)
    self.assertLessEqual(
        0,
        ts_mon_metrics._task_state_change_pubsub_notify_latencies.get(
            fields=_update_fields_pubsub(status=status,
                                         http_status_code=200)).sum)

    result_summary = result_summary.key.get()
    self.assertEqual(State.CANCELED, result_summary.state)
    self.assertEqual(1, len(pub_sub_calls))  # No other message.
| |
  def test_cancel_task_running(self):
    """Cancelling a RUNNING task requires kill_running and goes through the
    killing -> KILLED handshake with the bot."""
    # Cancel a running task.
    pub_sub_calls = self.mock_pub_sub()
    run_result = self._quick_reap(pubsub_topic='projects/abc/topics/def')
    self.assertEqual(1, len(pub_sub_calls))  # RUNNING

    # Denied if kill_running == False.
    ok, was_running = task_scheduler.cancel_task(run_result.request_key.get(),
                                                 run_result.result_summary_key,
                                                 False, None)
    self.assertEqual(False, ok)
    self.assertEqual(True, was_running)
    self.assertEqual(0, self.execute_tasks())
    self.assertEqual(1, len(pub_sub_calls))  # No message.

    # Works if kill_running == True.
    ok, was_running = task_scheduler.cancel_task(run_result.request_key.get(),
                                                 run_result.result_summary_key,
                                                 True, None)
    self.assertEqual(True, ok)
    self.assertEqual(True, was_running)
    self.assertEqual(2, self.execute_tasks())
    self.assertEqual(2, len(pub_sub_calls))  # CANCELED

    # At this point, the task is still running and the bot is unaware.
    run_result = run_result.key.get()
    self.assertEqual(State.RUNNING, run_result.state)
    self.assertEqual(True, run_result.killing)

    # Repeatedly canceling works.
    ok, was_running = task_scheduler.cancel_task(run_result.request_key.get(),
                                                 run_result.result_summary_key,
                                                 True, None)
    self.assertEqual(True, ok)
    self.assertEqual(True, was_running)
    self.assertEqual(2, self.execute_tasks())
    self.assertEqual(3, len(pub_sub_calls))  # CANCELED (again)

    # Bot pulls once, gets the signal about killing, which starts the graceful
    # termination dance.
    self.assertEqual(State.KILLED, _bot_update_task(run_result.key))

    # At this point, it is still running, until the bot completes the task.
    run_result = run_result.key.get()
    self.assertEqual(State.RUNNING, run_result.state)
    self.assertEqual(True, run_result.killing)

    # Close the task.
    self.assertEqual(
        State.KILLED,
        _bot_update_task(
            run_result.key, output_chunk_start=3, exit_code=0, duration=0.1))

    run_result = run_result.key.get()
    self.assertEqual(False, run_result.killing)
    self.assertEqual(State.KILLED, run_result.state)
    self.assertEqual(4, len(pub_sub_calls))  # KILLED
    self.assertEqual(1, self.execute_tasks())

    status = State.to_string(State.KILLED)
    self.assertLessEqual(
        0,
        ts_mon_metrics._task_state_change_pubsub_notify_latencies.get(
            fields=_update_fields_pubsub(status=status,
                                         http_status_code=200)).sum)
| |
  def test_cancel_task_bot_id(self):
    """The bot_id filter of cancel_task must match the task's bot."""
    # Cancel a running task.
    pub_sub_calls = self.mock_pub_sub()
    run_result = self._quick_reap(pubsub_topic='projects/abc/topics/def')
    self.assertEqual(1, len(pub_sub_calls))  # RUNNING

    # Denied if bot_id ('foo') doesn't match.
    ok, was_running = task_scheduler.cancel_task(run_result.request_key.get(),
                                                 run_result.result_summary_key,
                                                 True, 'foo')
    self.assertEqual(False, ok)
    self.assertEqual(True, was_running)
    self.assertEqual(1, self.execute_tasks())
    self.assertEqual(1, len(pub_sub_calls))  # No message.

    # Works if bot_id matches.
    ok, was_running = task_scheduler.cancel_task(run_result.request_key.get(),
                                                 run_result.result_summary_key,
                                                 True, 'localhost')
    self.assertEqual(True, ok)
    self.assertEqual(True, was_running)
    self.assertEqual(2, self.execute_tasks())
    self.assertEqual(2, len(pub_sub_calls))  # CANCELED
| |
  def test_cancel_task_completed(self):
    """Cancelling an already-COMPLETED task is a no-op denial."""
    # Cancel a completed task.
    pub_sub_calls = self.mock_pub_sub()
    run_result = self._quick_reap(pubsub_topic='projects/abc/topics/def')
    self.assertEqual(1, len(pub_sub_calls))  # RUNNING

    # The task completes successfully.
    self.assertEqual(
        State.COMPLETED,
        _bot_update_task(run_result.key, exit_code=0, duration=0.1))
    self.assertEqual(2, len(pub_sub_calls))  # COMPLETED

    # Cancel request is denied.
    ok, was_running = task_scheduler.cancel_task(run_result.request_key.get(),
                                                 run_result.result_summary_key,
                                                 False, None)
    self.assertEqual(False, ok)
    self.assertEqual(False, was_running)

    run_result = run_result.key.get()
    self.assertIsNone(run_result.killing)
    self.assertEqual(State.COMPLETED, run_result.state)
    self.assertEqual(2, len(pub_sub_calls))  # No other message.
    self.assertEqual(1, self.execute_tasks())
| |
  def test_cancel_task_running_setup(self):
    """A task cancelled before it starts running ends as CANCELED, not KILLED,
    when the bot reports canceled=True."""
    # Cancel a assigned task before running
    pub_sub_calls = self.mock_pub_sub()
    run_result = self._quick_reap(pubsub_topic='projects/abc/topics/def')
    self.assertEqual(1, len(pub_sub_calls))  # RUNNING

    # Request cancel
    ok, was_running = task_scheduler.cancel_task(run_result.request_key.get(),
                                                 run_result.result_summary_key,
                                                 True, None)
    self.assertEqual(True, ok)
    self.assertEqual(True, was_running)
    self.assertEqual(2, self.execute_tasks())
    self.assertEqual(2, len(pub_sub_calls))  # CANCELED

    # At this point, the task status is still running and the bot is unaware.
    run_result = run_result.key.get()
    self.assertEqual(State.RUNNING, run_result.state)
    self.assertEqual(True, run_result.killing)

    # Bot pulls once, gets the signal about killing, which starts the graceful
    # termination dance.
    self.assertEqual(State.KILLED, _bot_update_task(run_result.key))

    # At this point, the status is still running, until the bot cancels
    # the task.
    run_result = run_result.key.get()
    self.assertEqual(State.RUNNING, run_result.state)
    self.assertEqual(True, run_result.killing)

    # Close the task.
    self.assertEqual(
        State.CANCELED,
        _bot_update_task(
            run_result.key, exit_code=0, duration=0.1, canceled=True))

    run_result = run_result.key.get()
    self.assertEqual(False, run_result.killing)
    self.assertEqual(State.CANCELED, run_result.state)
    self.assertEqual(3, len(pub_sub_calls))  # CANCELED
    self.assertEqual(1, self.execute_tasks())

    status = State.to_string(State.CANCELED)
    self.assertLessEqual(
        0,
        ts_mon_metrics._task_state_change_pubsub_notify_latencies.get(
            fields=_update_fields_pubsub(status=status,
                                         http_status_code=200)).sum)
| |
  def test_cancel_task_rbe_mode(self):
    """Cancelling an RBE-mode pending task enqueues a reservation cancel."""
    self.mock_enqueue_rbe_task()
    cancels = self.mock_enqueue_rbe_cancel()

    self._register_bot(self.bot_dimensions)
    result_summary = self._quick_schedule(rbe_instance='some-instance')

    ok, was_running = task_scheduler.cancel_task(
        result_summary.request_key.get(), result_summary.key, False, None)
    self.assertEqual(True, ok)
    self.assertEqual(False, was_running)

    result_summary = result_summary.key.get()
    self.assertEqual(State.CANCELED, result_summary.state)

    # Enqueued a TQ task to cancel the reservation.
    self.assertEqual(1, len(cancels))
    req, ttr = cancels[0]
    self.assertEqual('some-instance', req.rbe_instance)
    self.assertEqual('sample-app-1d69b9f088008910-0', ttr.rbe_reservation)
| |
  def test_cancel_tasks(self):
    """cancel_tasks over a pending_running query cancels both tasks."""
    # Create RUNNING task
    pub_sub_calls = self.mock_pub_sub()
    self._quick_reap(pubsub_topic='projects/abc/topics/def')

    # Create PENDING task
    task_slices = [
        task_request.TaskSlice(
            expiration_secs=60,
            properties=_gen_properties(
                dimensions={
                    u'pool': [u'default'],
                    u'os': [u'v1|v2'],
                    u'gpu': [u'sega', u'amd|nv'],
                }),
            wait_for_capacity=True),
    ]
    self._quick_schedule(task_slices=task_slices)

    query = task_result.get_result_summaries_query(start=None,
                                                   end=None,
                                                   sort='created_ts',
                                                   state='pending_running',
                                                   tags=[])
    # Page size 3 > 2 matching tasks, so no continuation cursor.
    cursor, results = task_scheduler.cancel_tasks(3, query)
    self.assertIsNone(cursor)
    self.assertEqual(len(results), 2)
    self.execute_tasks()
| |
  def test_cancel_tasks_skip_running(self):
    """cancel_tasks with state='pending' leaves the running task alone."""
    # Create RUNNING task
    pub_sub_calls = self.mock_pub_sub()
    running_result = self._quick_reap(pubsub_topic='projects/abc/topics/def')

    # Create PENDING task
    task_slices = [
        task_request.TaskSlice(
            expiration_secs=60,
            properties=_gen_properties(
                dimensions={
                    u'pool': [u'default'],
                    u'os': [u'v1|v2'],
                    u'gpu': [u'sega', u'amd|nv'],
                }),
            wait_for_capacity=True),
    ]
    pending_result = self._quick_schedule(task_slices=task_slices)
    query = task_result.get_result_summaries_query(start=None,
                                                   end=None,
                                                   sort='created_ts',
                                                   state='pending',
                                                   tags=[])
    # Advance the clock by 1s so the schedule latency metric is 1000ms.
    self.mock_now(self.now, seconds=1)
    cursor, results = task_scheduler.cancel_tasks(3, query)
    self.assertIsNone(cursor)
    self.assertEqual(len(results), 1)
    self.execute_tasks()
    self.assertEqual(
        1000.0,
        ts_mon_metrics._task_state_change_schedule_latencies.get(
            fields=_update_fields_schedule(
                status=State.to_string(State.CANCELED))).sum)
| |
| def test_cancel_tasks_conditions(self): |
| # Create PENDING tasks |
| task_slices = [ |
| task_request.TaskSlice( |
| expiration_secs=60, |
| properties=_gen_properties( |
| dimensions={ |
| u'pool': [u'default'], |
| u'os': [u'v1|v2'], |
| u'gpu': [u'sega', u'amd|nv'], |
| }), |
| wait_for_capacity=True), |
| ] |
| pending_result_1 = self._quick_schedule(task_slices=task_slices, |
| manual_tags=['tag:1', 'tag:2']) |
| |
| pending_result_2 = self._quick_schedule(task_slices=task_slices, |
| manual_tags=['tag:2', 'tag:1']) |
| self._quick_schedule(task_slices=task_slices, |
| manual_tags=['tag:3', 'tag:2']) |
| self._quick_schedule(task_slices=task_slices, manual_tags=['tag:4']) |
| pending_results = [pending_result_1, pending_result_2] |
| |
| query <
|