blob: e142812f8f87b383cbf0fc9f018dfbe01d295ab7 [file] [log] [blame]
#!/usr/bin/env vpython
# Copyright 2014 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
import base64
import datetime
import json
import logging
import os
import random
import sys
import unittest
import uuid
from parameterized import parameterized
import mock
# Setups environment.
APP_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, APP_DIR)
import test_env_handlers
from google.appengine.api import datastore_errors
from google.appengine.ext import ndb
import webtest
import handlers_backend
import ts_mon_metrics
from components import auth
from components import auth_testing
from components import datastore_utils
from components import net
from components import pubsub
from components import utils
from components.auth.proto import delegation_pb2
from server import bot_management
from server import config
from server import external_scheduler
from server import pools_config
from server import rbe
from server import task_pack
from server import task_queues
from server import task_request
from server import task_result
from server import task_scheduler
from server import task_to_run
from server.task_result import State
from proto.config import pools_pb2
from proto.plugin import plugin_pb2
from bb.go.chromium.org.luci.buildbucket.proto import common_pb2
from bb.go.chromium.org.luci.buildbucket.proto import task_pb2
# pylint: disable=W0212,W0612
def _gen_properties(**kwargs):
  """Constructs a task_request.TaskProperties with overridable test defaults.

  Any keyword argument overrides the corresponding default; 'dimensions' is
  accepted as a convenience and stored as 'dimensions_data' on the entity.
  """
  props = {
      'command': [u'command1'],
      'dimensions': {
          u'os': [u'Windows-3.1.1'],
          u'pool': [u'default']
      },
      'env': {},
      'execution_timeout_secs': 24 * 60 * 60,
      'io_timeout_secs': None,
  }
  props.update(kwargs)
  # The datastore model exposes the dimensions under 'dimensions_data'.
  props['dimensions_data'] = props.pop('dimensions')
  return task_request.TaskProperties(**props)
def _gen_request_slices(properties=None, **kwargs):
  """Returns an initialized task_request.TaskRequest."""
  now = utils.utcnow()
  params = {
      # Don't be confused, this is not part of the API. This code is
      # constructing a DB entity, not a swarming_rpcs.NewTaskRequest.
      u'created_ts': now,
      u'manual_tags': [u'tag:1'],
      u'name': u'yay',
      u'priority': 50,
      u'task_slices': [
          task_request.TaskSlice(expiration_secs=60,
                                 properties=properties or _gen_properties(),
                                 wait_for_capacity=False),
      ],
      u'user': u'Jesus',
      u'bot_ping_tolerance_secs': 120,
  }
  # 'build_task' is consumed by _gen_build_task(), not by TaskRequest.
  params.update(
      (key, value) for key, value in kwargs.items() if key != 'build_task')
  req = task_request.TaskRequest(**params)
  task_request.init_new_request(req, True, task_request.TEMPLATE_AUTO)
  return req
def _gen_request_id(rid=None):
if rid is None:
return str(uuid.uuid4())
return rid
def _gen_build_task(**kwargs):
return kwargs.get("build_task", None)
def _get_results(request_key):
  """Fetches all task results for a specified TaskRequest ndb.Key.

  Returns:
    tuple(TaskResultSummary, list of TaskRunResult that exist).
  """
  result_summary_key = task_pack.request_key_to_result_summary_key(request_key)
  result_summary = result_summary_key.get()
  # There are two ways to look at it, either use a DB query or fetch all the
  # entities that could exist, at most 255. In general, there will be <3
  # entities so just fetching them by key would be faster. This function is
  # exclusively used in unit tests so it's not performance critical.
  q = task_result.TaskRunResult.query(ancestor=result_summary_key)
  q = q.order(task_result.TaskRunResult.key)
  return result_summary, q.fetch()
def _update_fields_schedule(**kwargs):
  """Returns metric fields for schedule-latency metric lookups.

  Defaults describe a COMPLETED task in the 'default' pool; any keyword
  argument overrides the corresponding field.
  """
  fields = {
      u'pool': 'default',
      u'spec_name': '',
      u'status': State.to_string(State.COMPLETED),
      u'device_type': '',
  }
  for key, value in kwargs.items():
    fields[key] = value
  return fields
def _update_fields_pubsub(**kwargs):
  """Returns metric fields for pubsub-notify metric lookups, with overrides."""
  defaults = {
      u'pool': 'default',
      u'status': State.to_string(State.COMPLETED),
  }
  defaults.update(kwargs)
  return defaults
def _run_result_to_to_run_key(run_result):
  """Returns a TaskToRunShard ndb.Key that was used to trigger
  the TaskRunResult.
  """
  request = run_result.request_key.get()
  return task_to_run.request_to_task_to_run_key(request,
                                                run_result.current_task_slice)
def _bot_update_task(run_result_key, **kwargs):
  """Calls task_scheduler.bot_update_task() with test-friendly defaults.

  Any keyword argument overrides the corresponding default.
  """
  params = dict(
      bot_id='localhost',
      cas_output_root=None,
      cipd_pins=None,
      output='hi',
      output_chunk_start=0,
      exit_code=None,
      duration=None,
      hard_timeout=False,
      io_timeout=False,
      cost_usd=0.1,
      performance_stats=None,
      canceled=None)
  params.update(kwargs)
  return task_scheduler.bot_update_task(run_result_key, **params)
def _deadline():
  """Returns a deadline 60 seconds in the future."""
  return datetime.timedelta(seconds=60) + utils.utcnow()
def _decode_tq_task_body(body):
return json.loads(base64.b64decode(body))
class TaskSchedulerApiTest(test_env_handlers.AppTestBase):
  def setUp(self):
    """Freezes time, mocks identity/randomness and wires the TQ backend."""
    super(TaskSchedulerApiTest, self).setUp()
    # Freeze the clock so all generated timestamps are deterministic.
    self.now = datetime.datetime(2014, 1, 2, 3, 4, 5, 6)
    self.mock_now(self.now)
    auth_testing.mock_get_current_identity(self)
    # Setup the backend to handle task queues.
    self.app = webtest.TestApp(
        handlers_backend.create_application(True),
        extra_environ={
            'REMOTE_ADDR': self.source_ip,
            'SERVER_SOFTWARE': os.environ['SERVER_SOFTWARE'],
        })
    self._enqueue_orig = self.mock(utils, 'enqueue_task', self._enqueue)
    self._enqueue_async_orig = self.mock(utils, 'enqueue_task_async',
                                         self._enqueue_async)
    self.mock(task_scheduler, '_route_to_go', lambda **_kwargs: False)
    # See mock_pub_sub()
    self._pub_sub_mocked = False
    self.publish_successful = True
    # Deterministic replacement for random.getrandbits(); makes task ids
    # predictable (see _getrandbits()).
    self._random = 0x88
    self.mock(random, 'getrandbits', self._getrandbits)
    self.bot_dimensions = {
        u'foo': [u'bar'],
        u'id': [u'localhost'],
        u'os': [u'Windows', u'Windows-3.1.1'],
        u'pool': [u'default'],
    }
    self._known_pools = None
    self._last_registered_bot_dims = self.bot_dimensions.copy()
def _enqueue(self, *args, **kwargs):
# Only then add use_dedicated_module as default False.
kwargs = kwargs.copy()
kwargs.setdefault('use_dedicated_module', False)
return self._enqueue_orig(*args, **kwargs)
def _enqueue_async(self, *args, **kwargs):
# Only then add use_dedicated_module as default False.
kwargs = kwargs.copy()
kwargs.setdefault('use_dedicated_module', False)
return self._enqueue_async_orig(*args, **kwargs)
def _getrandbits(self, bits):
self.assertEqual(16, bits)
self._random += 1
return self._random
def mock_pub_sub(self):
self.assertFalse(self._pub_sub_mocked)
self._pub_sub_mocked = True
calls = []
def pubsub_publish(**kwargs):
if not self.publish_successful:
e = net.Error('Fail', 404, json.dumps({'error': 'some error'}))
raise pubsub.TransientError(e)
calls.append(('directly', kwargs))
self.mock(pubsub, 'publish', pubsub_publish)
return calls
def mock_enqueue_rbe_task(self):
calls = []
def mocked_enqueue_rbe_task(request, to_run):
calls.append((request, to_run))
self.mock(rbe, 'enqueue_rbe_task', mocked_enqueue_rbe_task)
return calls
def mock_enqueue_rbe_cancel(self):
calls = []
def mocked_enqueue_rbe_cancel(request, to_run):
calls.append((request, to_run))
self.mock(rbe, 'enqueue_rbe_cancel', mocked_enqueue_rbe_cancel)
return calls
  def _gen_result_summary_pending(self, **kwargs):
    """Returns the dict for a TaskResultSummary for a pending task.

    Any keyword argument overrides the corresponding expected field.
    """
    expected = {
        'abandoned_ts': None,
        'bot_dimensions': None,
        'bot_id': None,
        'bot_idle_since_ts': None,
        'bot_version': None,
        'bot_logs_cloud_project': None,
        'cipd_pins': None,
        'completed_ts': None,
        'costs_usd': [],
        'cost_saved_usd': None,
        'created_ts': self.now,
        'current_task_slice': 0,
        'deduped_from': None,
        'duration': None,
        'exit_code': None,
        'expiration_delay': None,
        'failure': False,
        'internal_failure': False,
        'missing_cas': [],
        'missing_cipd': [],
        'modified_ts': self.now,
        'name': u'yay',
        'priority': 50,
        'request_authenticated': auth_testing.DEFAULT_MOCKED_IDENTITY,
        'request_bot_id': None,
        'request_pool': u'default',
        'request_realm': None,
        'cas_output_root': None,
        'resultdb_info': None,
        'server_versions': [u'v1a'],
        'started_ts': None,
        'state': State.PENDING,
        'tags': [
            u'authenticated:user:mocked@example.com',
            u'os:Windows-3.1.1',
            u'pool:default',
            u'priority:50',
            u'realm:none',
            u'service_account:none',
            u'swarming.pool.template:no_config',
            u'tag:1',
            u'user:Jesus',
        ],
        'try_number': None,
        'user': u'Jesus',
    }
    expected.update(kwargs)
    return expected
  def _gen_result_summary_reaped(self, **kwargs):
    """Returns the dict for a TaskResultSummary for a reaped (running) task.

    Same as _gen_result_summary_pending() but with bot fields filled in and
    state RUNNING.
    """
    kwargs.setdefault(u'bot_dimensions', self.bot_dimensions.copy())
    kwargs.setdefault(u'bot_id', u'localhost')
    kwargs.setdefault(u'bot_idle_since_ts', self.now)
    kwargs.setdefault(u'bot_version', u'abc')
    kwargs.setdefault(u'state', State.RUNNING)
    kwargs.setdefault(u'try_number', 1)
    return self._gen_result_summary_pending(**kwargs)
  def _gen_run_result(self, **kwargs):
    """Returns the dict for a TaskRunResult for a freshly reaped task.

    Any keyword argument overrides the corresponding expected field.
    """
    expected = {
        'abandoned_ts': None,
        'bot_dimensions': self.bot_dimensions,
        'bot_id': u'localhost',
        'bot_idle_since_ts': self.now,
        'bot_version': u'abc',
        'bot_logs_cloud_project': None,
        'cas_output_root': None,
        'cipd_pins': None,
        'completed_ts': None,
        'cost_usd': 0.,
        'current_task_slice': 0,
        'dead_after_ts': None,
        'duration': None,
        'exit_code': None,
        'failure': False,
        'internal_failure': False,
        'killing': None,
        'missing_cas': [],
        'missing_cipd': [],
        'modified_ts': self.now,
        'resultdb_info': None,
        'server_versions': [u'v1a'],
        'started_ts': self.now,
        'state': State.RUNNING,
        'try_number': 1,
    }
    expected.update(**kwargs)
    return expected
def _assert_buildbucket_update_status(self, pubsub_msg, expected_status):
# Checks that the BuildTaskUpdate message sent in pubsub_msg has the
# expected status provided.
result = task_pb2.BuildTaskUpdate()
result.ParseFromString(pubsub_msg)
self.assertEqual(result.task.status, expected_status)
  def _quick_schedule(self, request_id=None, secret_bytes=None, **kwargs):
    """Schedules a task.

    Arguments:
      request_id: optional idempotency id; a uuid4 is generated when None.
      secret_bytes: optional task_request.SecretBytes.
      kwargs: passed to _gen_request_slices().

    Returns:
      The TaskResultSummary of the scheduled task.
    """
    # Drain any task queue work left over from earlier setup.
    self.execute_tasks()
    request = _gen_request_slices(**kwargs)
    request_id = _gen_request_id(request_id)
    build_task = _gen_build_task(**kwargs)
    result_summary = task_scheduler.schedule_request(request,
                                                     request_id,
                                                     secret_bytes=secret_bytes,
                                                     build_task=build_task)
    # State will be either PENDING or COMPLETED (for deduped task)
    self.execute_tasks()
    # Second pass must be a no-op: everything was drained above.
    self.assertEqual(0, self.execute_tasks())
    return result_summary
  def _register_bot(self, bot_dimensions):
    """Registers the bot so the task queues knows there's a worker than can run
    the task.

    Arguments:
      bot_dimensions: bot dimensions to assert.
    """
    # Nothing should be pending before registration.
    self.assertEqual(0, self.execute_tasks())
    bot_id = bot_dimensions[u'id'][0]
    bot_management.bot_event(event_type='request_sleep',
                             bot_id=bot_id,
                             external_ip='1.2.3.4',
                             authenticated_as='joe@localhost',
                             dimensions=bot_dimensions,
                             state={'state': 'real'},
                             version='1234',
                             register_dimensions=True)
    task_queues.assert_bot(bot_dimensions)
    self.execute_tasks()
    # Remembered so _bot_reap_task() can default to the last registered bot.
    self._last_registered_bot_dims = bot_dimensions.copy()
def _bot_reap_task(self, bot_dimensions=None, version=None):
bot_dimensions = bot_dimensions or self._last_registered_bot_dims
bot_id = bot_dimensions['id'][0]
queues = task_queues.freshen_up_queues(bot_id)
bot_details = task_scheduler.BotDetails(version or 'abc', None)
return task_scheduler.bot_reap_task(bot_dimensions, queues, bot_details,
_deadline())
  def _quick_reap(self, **kwargs):
    """Makes sure the bot is registered and have it reap a task.

    Arguments:
      kwargs: passed to _quick_schedule(); 'pubsub_topic' and 'has_build_task'
          additionally control which notification queues are expected.

    Returns:
      The TaskRunResult of the reaped task.
    """
    self._register_bot(self.bot_dimensions)
    self._quick_schedule(**kwargs)
    reaped_request, _, run_result = self._bot_reap_task()
    queued_tasks = 0
    # Reaping causes a pubsub task if pubsub is specified.
    if 'pubsub_topic' in kwargs:
      self.assertEqual(1, len(self._taskqueue_stub.GetTasks('pubsub')))
      queued_tasks += 1
    if kwargs.get('has_build_task', False):
      self.assertEqual(1,
                       len(self._taskqueue_stub.GetTasks('buildbucket-notify')))
      queued_tasks += 1
    self.assertEqual(queued_tasks, self.execute_tasks())
    return run_result
  def _cancel_running_task(self, run_result):
    """Cancels a running task and asserts the kill was initiated."""
    canceled, was_running = task_scheduler.cancel_task(run_result.request,
                                                       run_result.key, True,
                                                       run_result.bot_id)
    self.assertTrue(canceled)
    self.assertTrue(was_running)
    self.execute_tasks()
    # Refetch: cancellation marks the entity as being killed.
    run_result = run_result.key.get()
    self.assertTrue(run_result.killing)
def test_all_apis_are_tested(self):
# Ensures there's a test for each public API.
# TODO(maruel): Remove this once coverage is asserted.
module = task_scheduler
expected = set(
i for i in dir(module)
if i[0] != '_' and hasattr(getattr(module, i), 'func_name'))
missing = expected - set(i[5:] for i in dir(self) if i.startswith('test_'))
self.assertFalse(missing)
  def test_bot_reap_task(self):
    # Essentially check _quick_reap() works.
    run_result = self._quick_reap()
    self.assertEqual('localhost', run_result.bot_id)
    self.assertEqual(1, run_result.try_number)
    # Once reaped, the TaskToRun is consumed: no longer queued nor expiring.
    to_run_key = task_to_run.request_to_task_to_run_key(
        run_result.request_key.get(), 0)
    self.assertIsNone(to_run_key.get().queue_number)
    self.assertIsNone(to_run_key.get().expiration_ts)
  def test_bot_reap_build_task(self):
    # Reaping a task with an attached BuildTask must notify buildbucket over
    # the backend pubsub topic with a STARTED status.
    pub_sub_calls = self.mock_pub_sub()
    run_result = self._quick_reap(
        has_build_task=True,
        build_task=task_request.BuildTask(
            build_id="1234",
            buildbucket_host="buildbucket_host",
            latest_task_status=task_result.State.PENDING,
            pubsub_topic="backend_pubsub_topic",
            update_id=0))
    self.assertEqual('localhost', run_result.bot_id)
    self.assertEqual(1, run_result.try_number)
    to_run_key = task_to_run.request_to_task_to_run_key(
        run_result.request_key.get(), 0)
    self.assertIsNone(to_run_key.get().queue_number)
    self.assertIsNone(to_run_key.get().expiration_ts)
    self.assertEqual(1, len(pub_sub_calls))
    self.assertEqual(pub_sub_calls[0][1]["topic"], "backend_pubsub_topic")
    self._assert_buildbucket_update_status(pub_sub_calls[0][1]['message'],
                                           common_pb2.STARTED)
  @parameterized.expand([
      ({
          u'pool': [u'default'],
          u'os': [u'Windows-3.1.1|Windows-3.2.1'],
      },),
      ({
          u'pool': [u'default'],
          u'os': [u'Windows-3.1.1|Windows-3.2.1'],
          u'foo': [u'bar|A|B|C'],
      },),
  ])
  def test_bot_reap_task_or_dimensions(self, or_dimensions):
    # A bot can reap a task whose dimensions use '|' OR-alternatives, as long
    # as one alternative matches the bot.
    run_result = self._quick_reap(
        task_slices=[
            task_request.TaskSlice(
                expiration_secs=60,
                properties=_gen_properties(dimensions=or_dimensions),
                wait_for_capacity=False,
            )
        ])
    self.assertEqual('localhost', run_result.bot_id)
    self.assertEqual(1, run_result.try_number)
    to_run_key = task_to_run.request_to_task_to_run_key(
        run_result.request_key.get(), 0)
    self.assertIsNone(to_run_key.get().queue_number)
    self.assertIsNone(to_run_key.get().expiration_ts)
  def test_bot_reap_tasks_using_or_dimensions(self):
    # The first part of this case checks for bot -> task
    # Register a bot first
    bot1_dimensions = self.bot_dimensions.copy()
    bot1_dimensions[u'id'] = [u'bot1']
    bot1_dimensions[u'os'] = [u'v1', u'v2']
    bot1_dimensions[u'gpu'] = [u'nv', u'sega']
    self._register_bot(bot1_dimensions)
    # Then send a request
    task_slices = [
        task_request.TaskSlice(
            expiration_secs=60,
            properties=_gen_properties(
                dimensions={
                    u'pool': [u'default'],
                    u'os': [u'v1|v2'],
                    u'gpu': [u'sega', u'amd|nv'],
                }),
            wait_for_capacity=False),
    ]
    self._quick_schedule(task_slices=task_slices)
    _, _, run_result = self._bot_reap_task()
    self.assertEqual(u'bot1', run_result.bot_id)
    to_run_key = _run_result_to_to_run_key(run_result)
    self.assertIsNone(to_run_key.get().queue_number)
    self.assertEqual(
        State.COMPLETED,
        _bot_update_task(
            run_result.key, bot_id=u'bot1', exit_code=0, duration=0.1))
    self.assertEqual(1, self.execute_tasks())
    # The second part checks for task -> bot
    # Send an identical request
    self._quick_schedule(task_slices=task_slices)
    # Then register another bot
    bot2_dimensions = self.bot_dimensions.copy()
    bot2_dimensions[u'id'] = [u'bot2']
    bot2_dimensions[u'os'] = [u'v2']
    bot2_dimensions[u'gpu'] = [u'amd', u'sega']
    self._register_bot(bot2_dimensions)
    _, _, run_result = self._bot_reap_task()
    self.assertEqual(u'bot2', run_result.bot_id)
    to_run_key = _run_result_to_to_run_key(run_result)
    self.assertIsNone(to_run_key.get().queue_number)
    self.assertEqual(
        State.COMPLETED,
        _bot_update_task(
            run_result.key, bot_id=u'bot2', exit_code=0, duration=0.1))
    self.assertEqual(1, self.execute_tasks())
  @parameterized.expand([(0,), (1,)])
  def test_either_bot_reap_tasks_using_or_dimensions(self, bi):
    # Either of two differently-dimensioned bots can reap the same
    # OR-dimensions task; parameter bi picks which bot reaps.
    bot1_dimensions = self.bot_dimensions.copy()
    bot1_dimensions[u'id'] = [u'bot1']
    bot1_dimensions[u'os'] = [u'v1', u'v2']
    bot1_dimensions[u'gpu'] = [u'nv', u'sega']
    self._register_bot(bot1_dimensions)
    bot2_dimensions = self.bot_dimensions.copy()
    bot2_dimensions[u'id'] = [u'bot2']
    bot2_dimensions[u'os'] = [u'v2']
    bot2_dimensions[u'gpu'] = [u'amd', u'sega']
    self._register_bot(bot2_dimensions)
    task_slices = [
        task_request.TaskSlice(
            expiration_secs=60,
            properties=_gen_properties(
                dimensions={
                    u'pool': [u'default'],
                    u'os': [u'v1|v2'],
                    u'gpu': [u'sega', u'amd|nv'],
                }),
            wait_for_capacity=False),
    ]
    self._quick_schedule(task_slices=task_slices)
    if bi == 0:
      test_bot_id = u'bot1'
      test_bot_dimensions = bot1_dimensions
    else:
      test_bot_id = u'bot2'
      test_bot_dimensions = bot2_dimensions
    _, _, run_result = self._bot_reap_task(test_bot_dimensions)
    self.assertEqual(test_bot_id, run_result.bot_id)
    to_run_key = _run_result_to_to_run_key(run_result)
    self.assertIsNone(to_run_key.get().queue_number)
    self.assertEqual(
        State.COMPLETED,
        _bot_update_task(
            run_result.key, bot_id=test_bot_id, exit_code=0, duration=0.1))
    self.assertEqual(1, self.execute_tasks())
  def test_bot_claim_slice(self):
    # Claiming a slice is idempotent per claim ID and exclusive across IDs.
    self.mock_enqueue_rbe_task()
    result_summary = self._quick_schedule(
        secret_bytes=task_request.SecretBytes(secret_bytes='blob'),
        properties=_gen_properties(has_secret_bytes=True),
        rbe_instance='some-instance',
    )
    to_run_key = task_to_run.request_to_task_to_run_key(
        result_summary.request_key.get(), 0)
    self._register_bot(self.bot_dimensions)

    def claim(claim_id):
      return task_scheduler.bot_claim_slice(
          self.bot_dimensions,
          task_scheduler.BotDetails(None, None),
          to_run_key,
          claim_id,
      )

    def claim_txn(claim_id):
      # NOTE: closes over `to_run`, which is assigned below before this
      # function is first called.
      request = to_run.request_key.get()
      run_result, secret_bytes = task_scheduler._reap_task(
          self.bot_dimensions, task_scheduler.BotDetails(None, None),
          to_run_key, request, claim_id, 3, False)
      return request, secret_bytes, run_result

    # The first call actually makes the change.
    request, secret_bytes, run_result = claim('some-claim-id')
    self.assertEqual(request.key, result_summary.request_key)
    self.assertEqual(secret_bytes.secret_bytes, 'blob')
    self.assertEqual(run_result.bot_dimensions, self.bot_dimensions)
    # TaskToRun is updated.
    to_run = to_run_key.get()
    self.assertFalse(to_run.is_reapable)
    self.assertEqual(to_run.claim_id, 'some-claim-id')
    # The second call with the same claim ID is noop.
    request_dup, secret_bytes_dup, run_result_dup = claim('some-claim-id')
    self.assertEqual(request_dup, request)
    self.assertEqual(secret_bytes_dup, secret_bytes)
    self.assertEqual(run_result_dup, run_result)
    # If _reap_task is called due to a race, it notices the claim as well.
    request_dup, secret_bytes_dup, run_result_dup = claim_txn('some-claim-id')
    self.assertEqual(request_dup, request)
    self.assertEqual(secret_bytes_dup, secret_bytes)
    self.assertEqual(run_result_dup, run_result)
    # A call with a different claim ID results in an error.
    with self.assertRaises(task_scheduler.ClaimError):
      claim('another-claim-id')
    with self.assertRaises(task_scheduler.ClaimError):
      claim_txn('another-claim-id')
  def test_bot_claim_slice_dimensions_mismatch(self):
    # A bot whose dimensions don't match the task can't claim the slice.
    self.mock_enqueue_rbe_task()
    result_summary = self._quick_schedule(rbe_instance='some-instance')
    to_run_key = task_to_run.request_to_task_to_run_key(
        result_summary.request_key.get(), 0)
    bot_dimensions = self.bot_dimensions.copy()
    # Mismatched pool: the task was scheduled in 'default'.
    bot_dimensions['pool'] = ['another']
    self._register_bot(bot_dimensions)
    with self.assertRaises(task_scheduler.ClaimError):
      task_scheduler.bot_claim_slice(
          bot_dimensions,
          task_scheduler.BotDetails(None, None),
          to_run_key,
          'some-claim-id',
      )
  def test_schedule_request(self):
    # It is tested indirectly in the other functions.
    # Essentially check _quick_schedule() and _register_bot() works.
    self._register_bot(self.bot_dimensions)
    result_summary = self._quick_schedule()
    to_run_key = task_to_run.request_to_task_to_run_key(
        result_summary.request_key.get(), 0)
    self.assertTrue(to_run_key.get().queue_number)
    self.assertEqual(State.PENDING, result_summary.state)
    # No pubsub notification metric is recorded for a plain pending task.
    status = State.to_string(State.PENDING)
    self.assertIsNone(
        ts_mon_metrics._task_state_change_pubsub_notify_latencies.get(
            fields=_update_fields_pubsub(status=status, http_status_code=200)))
  def test_schedule_request_task_id_collision(self):
    # On a task id collision, schedule_request retries with the next key.
    rbe_enqueues = self.mock_enqueue_rbe_task()
    self._register_bot(self.bot_dimensions)
    key1 = task_request.new_request_key()
    key2 = task_request.new_request_key()
    self.assertNotEqual(key1, key2)
    keys = []
    # Control exactly which keys new_request_key() hands out.
    self.mock(task_request, 'new_request_key', lambda: keys.pop(0))
    # Creates the request without any collisions.
    keys[:] = [key1]
    result_summary_1 = self._quick_schedule(rbe_instance='some-instance')
    self.assertEqual(key1, result_summary_1.request_key)
    self.assertFalse(keys)
    del rbe_enqueues[:]
    # Collides on the existing key, but switches to the next one.
    keys[:] = [key1, key2]
    result_summary_2 = self._quick_schedule(rbe_instance='some-instance')
    self.assertEqual(key2, result_summary_2.request_key)
    self.assertFalse(keys)
    # Submitted the correct RBE reservation (using key2 as the base task ID).
    self.assertEqual(1, len(rbe_enqueues))
    _, ttr = rbe_enqueues[0]
    self.assertEqual(ttr.rbe_reservation, rbe.gen_rbe_reservation_id(key2, 0))
  def test_schedule_request_no_capacity(self):
    # No capacity, denied. That's the default.
    pub_sub_calls = self.mock_pub_sub()
    request = _gen_request_slices(pubsub_topic='projects/abc/topics/def',
                                  created_ts=(self.now -
                                              datetime.timedelta(seconds=1)))
    result_summary = task_scheduler.schedule_request(request)
    self.assertEqual(State.NO_RESOURCE, result_summary.state)
    self.execute_tasks()
    # The terminal NO_RESOURCE state is published to the requested topic.
    expected = [
        (
            'directly',
            {
                'attributes': None,
                'message': '{"task_id":"1d69b9f088008910"}',
                'topic': u'projects/abc/topics/def',
            },
        ),
    ]
    self.assertEqual(expected, pub_sub_calls)
    # The pubsub notification latency metric is recorded.
    status = State.to_string(State.NO_RESOURCE)
    self.assertLessEqual(
        0,
        ts_mon_metrics._task_state_change_pubsub_notify_latencies.get(
            fields=_update_fields_pubsub(status=status,
                                         http_status_code=200)).sum)
  def test_schedule_request_no_capacity_backend_task(self):
    # No capacity, denied. That's the default.
    pub_sub_calls = self.mock_pub_sub()
    self.mock(utils, "time_time_ns", lambda: 12345678)
    request = _gen_request_slices(pubsub_topic='projects/abc/topics/def',
                                  has_build_task=True,
                                  created_ts=(self.now -
                                              datetime.timedelta(seconds=1)))
    result_summary = task_scheduler.schedule_request(
        request,
        request_id="1234",
        build_task=task_request.BuildTask(
            build_id="1234",
            buildbucket_host="buildbucket_host",
            latest_task_status=task_result.State.PENDING,
            pubsub_topic="backend_pubsub_topic",
            update_id=0))
    self.assertEqual(State.NO_RESOURCE, result_summary.state)
    self.execute_tasks()
    # Two notifications: buildbucket (INFRA_FAILURE) then the task's topic.
    self.assertEqual(2, len(pub_sub_calls))
    self.assertEqual(pub_sub_calls[0][1]["topic"], "backend_pubsub_topic")
    self._assert_buildbucket_update_status(pub_sub_calls[0][1]['message'],
                                           common_pb2.INFRA_FAILURE)
    self.assertEqual(pub_sub_calls[1][1]["topic"], "projects/abc/topics/def")
    status = State.to_string(State.NO_RESOURCE)
    self.assertLessEqual(
        0,
        ts_mon_metrics._task_state_change_pubsub_notify_latencies.get(
            fields=_update_fields_pubsub(status=status,
                                         http_status_code=200)).sum)
  def test_schedule_request_no_check_capacity(self):
    # No capacity, but check disabled, allowed.
    request = _gen_request_slices(task_slices=[
        task_request.TaskSlice(
            expiration_secs=60,
            properties=_gen_properties(),
            # wait_for_capacity=True skips the capacity check.
            wait_for_capacity=True),
    ])
    result_summary = task_scheduler.schedule_request(request)
    self.assertEqual(State.PENDING, result_summary.state)
    self.execute_tasks()
  @ndb.tasklet
  def _mock_create_invocation_async(self, _task_run_id, _realm, _deadline):
    """Tasklet stand-in for resultdb.create_invocation_async; yields a fixed
    update token (ndb tasklets return via raise ndb.Return)."""
    raise ndb.Return('resultdb-update-token')
  def test_schedule_request_resultdb(self):
    # Scheduling with enable_resultdb creates an invocation and stores its
    # update token on the request.
    self._register_bot(self.bot_dimensions)
    with mock.patch(
        'server.resultdb.create_invocation_async',
        mock.Mock(side_effect=self._mock_create_invocation_async)) as mock_call:
      request = _gen_request_slices(realm='infra:try')
      result_summary = task_scheduler.schedule_request(request,
                                                       enable_resultdb=True)
      mock_call.assert_called_once_with(
          '1d69b9f088008911', 'infra:try',
          datetime.datetime(2014, 1, 3, 3, 5, 35, 6))
    self.assertEqual(
        result_summary.resultdb_info,
        task_result.ResultDBInfo(
            hostname='test-resultdb-server.com',
            invocation=(
                'invocations/task-test-swarming.appspot.com-1d69b9f088008911')))
    self.assertEqual(result_summary.request_key.get().resultdb_update_token,
                     'resultdb-update-token')
    self.execute_tasks()
  def test_schedule_request_scheduling_algorithm(self):
    # UNKNOWN and FIFO produce the same queue_number ordering; LIFO inverts it.
    self.mock_now(self.now, 60)
    request = _gen_request_slices(
        task_slices=[
            task_request.TaskSlice(expiration_secs=60,
                                   properties=_gen_properties(),
                                   wait_for_capacity=True),
        ],
        scheduling_algorithm=pools_pb2.Pool.SCHEDULING_ALGORITHM_UNKNOWN)
    result_summary = task_scheduler.schedule_request(request)
    to_run_key = task_to_run.request_to_task_to_run_key(
        result_summary.request_key.get(), 0)
    self.assertEqual(to_run_key.get().queue_number, 0x1a3aa6630c8ee0ca)
    request = _gen_request_slices(
        task_slices=[
            task_request.TaskSlice(expiration_secs=60,
                                   properties=_gen_properties(),
                                   wait_for_capacity=True),
        ],
        scheduling_algorithm=pools_pb2.Pool.SCHEDULING_ALGORITHM_FIFO)
    result_summary = task_scheduler.schedule_request(request)
    to_run_key = task_to_run.request_to_task_to_run_key(
        result_summary.request_key.get(), 0)
    self.assertEqual(to_run_key.get().queue_number, 0x1a3aa6630c8ee0ca)
    request = _gen_request_slices(
        task_slices=[
            task_request.TaskSlice(expiration_secs=60,
                                   properties=_gen_properties(),
                                   wait_for_capacity=True),
        ],
        scheduling_algorithm=pools_pb2.Pool.SCHEDULING_ALGORITHM_LIFO)
    result_summary = task_scheduler.schedule_request(request)
    to_run_key = task_to_run.request_to_task_to_run_key(
        result_summary.request_key.get(), 0)
    self.assertEqual(to_run_key.get().queue_number, 0x1a3aa6631f3d2236)
    self.execute_tasks()
  def test_schedule_request_rbe_mode(self):
    # Scheduling with an rbe_instance enqueues an RBE reservation.
    enqueued = self.mock_enqueue_rbe_task()
    request = _gen_request_slices(rbe_instance='some-instance')
    result_summary = task_scheduler.schedule_request(request)
    self.assertEqual(State.PENDING, result_summary.state)
    self.assertEqual(len(enqueued), 1)
    self.assertEqual(request.key, enqueued[0][0].key)
    self.assertEqual('sample-app-1d69b9f088008910-0',
                     enqueued[0][1].key.get().rbe_reservation)
  def test_bot_reap_task_expired(self):
    # Reaping after the expiration deadline expires the task instead.
    self._register_bot(self.bot_dimensions)
    result_summary = self._quick_schedule()
    # Forwards clock to get past expiration.
    request = result_summary.request_key.get()
    self.mock_now(request.expiration_ts, 1)
    actual_request, _, run_result = self._bot_reap_task()
    # The task is not returned because it's expired.
    self.assertIsNone(actual_request)
    self.assertIsNone(run_result)
    # It's effectively expired.
    to_run_key = task_to_run.request_to_task_to_run_key(request, 0)
    self.assertIsNone(to_run_key.get().queue_number)
    self.assertIsNone(to_run_key.get().expiration_ts)
    self.assertEqual(State.EXPIRED, result_summary.key.get().state)
    # The schedule-latency metric records created_ts -> expiration in ms.
    latency = ((request.expiration_ts - self.now).total_seconds() + 1) * 1000.0
    self.assertEqual(
        latency,
        ts_mon_metrics._task_state_change_schedule_latencies.get(
            fields=_update_fields_schedule(
                status=State.to_string(State.EXPIRED))).sum)
  def test_bot_reap_task_6_expired_fifo(self):
    # A lot of tasks are expired, eventually stop expiring them.
    self._register_bot(self.bot_dimensions)
    result_summaries = []
    for i in range(6):
      # Stagger creation times so FIFO order is well defined.
      self.mock_now(self.now, i)
      result_summaries.append(
          self._quick_schedule(
              scheduling_algorithm=pools_pb2.Pool.SCHEDULING_ALGORITHM_FIFO))
    # Forwards clock to get past expiration.
    self.mock_now(result_summaries[-1].request_key.get().expiration_ts, 1)
    # Fail to reap a task.
    actual_request, _, run_result = self._bot_reap_task()
    self.assertIsNone(actual_request)
    self.assertIsNone(run_result)
    # They all got expired ...
    for result_summary in result_summaries[:-1]:
      result_summary = result_summary.key.get()
      self.assertEqual(State.EXPIRED, result_summary.state)
    # ... except for the very last one because of the limit of 5 task expired
    # per poll.
    result_summary = result_summaries[-1]
    result_summary = result_summary.key.get()
    self.assertEqual(State.PENDING, result_summary.state)
  def test_bot_reap_task_6_expired_lifo(self):
    # A lot of tasks are expired, eventually stop expiring them.
    self._register_bot(self.bot_dimensions)
    result_summaries = []
    for i in range(6):
      # Stagger creation times so LIFO order is well defined.
      self.mock_now(self.now, i)
      result_summaries.append(
          self._quick_schedule(
              scheduling_algorithm=pools_pb2.Pool.SCHEDULING_ALGORITHM_LIFO))
    # Forwards clock to get past expiration.
    self.mock_now(result_summaries[-1].request_key.get().expiration_ts, 1)
    # Fail to reap a task.
    actual_request, _, run_result = self._bot_reap_task()
    self.assertIsNone(actual_request)
    self.assertIsNone(run_result)
    # They all got expired ...
    for result_summary in result_summaries[1:]:
      result_summary = result_summary.key.get()
      self.assertEqual(State.EXPIRED, result_summary.state)
    # ... except for the most recent one because of the limit of 5 task expired
    # per poll.
    result_summary = result_summaries[0]
    result_summary = result_summary.key.get()
    self.assertEqual(State.PENDING, result_summary.state)
  @ndb.tasklet
  def nop_async(self, *_args, **_kwargs):
    """No-op ndb tasklet, usable as a side_effect for async mocks."""
    pass
  def test_resultdb_task_expired(self):
    # Expiring a resultdb-enabled task finalizes its invocation.
    self._register_bot(self.bot_dimensions)
    with mock.patch(
        'server.resultdb.create_invocation_async',
        mock.Mock(side_effect=self._mock_create_invocation_async)) as mock_call:
      request = _gen_request_slices(realm='infra:try')
      result_summary = task_scheduler.schedule_request(request,
                                                       enable_resultdb=True)
      mock_call.assert_called_once_with('1d69b9f088008911', 'infra:try',
                                        mock.ANY)
    with mock.patch('server.resultdb.finalize_invocation_async',
                    mock.Mock(side_effect=self.nop_async)) as mock_call:
      to_run_key = task_to_run.request_to_task_to_run_key(request, 0)
      self.mock_now(self.now, 60)
      task_scheduler._expire_slice(request, to_run_key, State.EXPIRED, False, 1,
                                   True, 'some-reason')
      mock_call.assert_called_once_with('1d69b9f088008911',
                                        u'resultdb-update-token')
    self.execute_tasks()
  def _setup_es(self, allow_es_fallback):
    """Set up mock es_config.

    Installs two ExternalSchedulerConfig entries on the 'default' pool: one
    disabled (with an extra dimension) and one enabled, both sharing the
    given allow_es_fallback setting.
    """
    es_address = 'externalscheduler_address'
    es_id = 'es_id'
    external_schedulers = [
        pools_config.ExternalSchedulerConfig(
            address=es_address,
            id=es_id,
            dimensions=set([u'foo:bar', u'label-pool:CTS']),
            all_dimensions=None,
            any_dimensions=None,
            enabled=False,
            allow_es_fallback=allow_es_fallback),
        pools_config.ExternalSchedulerConfig(
            address=es_address,
            id=es_id,
            dimensions=set([u'foo:bar']),
            all_dimensions=None,
            any_dimensions=None,
            enabled=True,
            allow_es_fallback=allow_es_fallback),
    ]
    self.mock_pool_config('default', external_schedulers=external_schedulers)
  def _mock_reap_calls(self):
    """Mock out external scheduler and native scheduler reap calls.

    Returns: (list of es calls, list of native reap calls)
    """
    er_calls = []

    # External scheduler reap: records the call and yields nothing.
    def ext_reap(*args):
      er_calls.append(args)
      return None, None, None

    self._register_bot(self.bot_dimensions)
    result_summary = self._quick_schedule(
        task_slices=[
            task_request.TaskSlice(
                expiration_secs=60,
                properties=_gen_properties(),
                # The tests that use this mock rely on tasks that wait
                # for capacity.
                wait_for_capacity=True),
        ])
    to_run_key = task_to_run.request_to_task_to_run_key(result_summary.request,
                                                        0)
    r_calls = []

    # Native reap: records the call and yields the single scheduled TaskToRun.
    def reap(*args):
      r_calls.append(args)
      return [to_run_key.get()]

    self.mock(task_scheduler, '_bot_reap_task_external_scheduler', ext_reap)
    self.mock(task_to_run, 'yield_next_available_task_to_dispatch', reap)
    return er_calls, r_calls
def _mock_es_assign(self, task_id, slice_number):
"""Mock out the return behavior from external_scheduler.assign_task"""
# pylint: disable=unused-argument
def mock_assign(*args):
return task_id, slice_number
self.mock(external_scheduler, "assign_task", mock_assign)
  def _mock_es_notify(self):
    """Mock out external_scheduler.notify_requests

    Returns a list that will receive any calls that were made to notify.
    """
    calls = []

    # pylint: disable=unused-argument
    def mock_notify(es_cfg, requests, use_tq, is_callback):
      # Validate argument types before recording the call.
      assert isinstance(es_cfg, pools_config.ExternalSchedulerConfig)
      for request, result in requests:
        assert isinstance(request, task_request.TaskRequest)
        assert isinstance(result, task_result._TaskResultCommon)
      calls.append([es_cfg, requests, use_tq, is_callback])

    self.mock(external_scheduler, "notify_requests", mock_notify)
    return calls
def test_bot_reap_task_es_with_fallback(self):
self._setup_es(True)
notify_calls = self._mock_es_notify()
er_calls, r_calls = self._mock_reap_calls()
# Ignore es notifications that were side-effects of the setup code, they
# are incidental to this test.
del notify_calls[:]
self._bot_reap_task()
self.assertEqual(len(er_calls), 1, 'external scheduler was not called')
self.assertEqual(len(r_calls), 1, 'native scheduler was not called')
  def test_bot_reap_task_es_no_task(self):
    """Reaping completes without error when the ES assigns nothing."""
    self._setup_es(False)
    self._mock_es_assign(None, 0)
    self._bot_reap_task()
  def test_bot_reap_task_es_with_nonpending_task(self):
    """An ES-assigned task that is no longer PENDING only triggers a notify."""
    self._setup_es(False)
    notify_calls = self._mock_es_notify()
    result_summary = self._quick_schedule()
    self._mock_es_assign(result_summary.task_id, 0)
    # Ignore es notifications that were side-effects of the setup code, they
    # are incidental to this test.
    del notify_calls[:]
    # It should notify to external scheduler. But an exception won't be raised
    # because the task is already running or has finished including failures.
    self._bot_reap_task()
    self.assertEqual(len(notify_calls), 1)
  def test_bot_reap_task_es_with_pending_task(self):
    """A PENDING task assigned by the external scheduler is reaped."""
    self._setup_es(False)
    self._mock_es_notify()
    self._register_bot(self.bot_dimensions)
    result_summary = self._quick_schedule(
        task_slices=[
            task_request.TaskSlice(
                expiration_secs=180,
                properties=_gen_properties(),
                wait_for_capacity=True)
        ])
    self._mock_es_assign(result_summary.task_id, 0)
    # Able to successfully reap given PENDING task from external scheduler.
    request, _, _ = self._bot_reap_task()
    self.assertEqual(request.task_id, result_summary.task_id)
  def test_bot_reap_task_for_nonexternal_pool(self):
    """A bot matching only the disabled ES config never notifies the ES."""
    self._setup_es(False)
    notify_calls = self._mock_es_notify()
    dimensions = {u'os': [u'Windows-3.1.1'], u'pool': [u'label-pool:CTS']}
    slices = [
        task_request.TaskSlice(
            expiration_secs=60,
            properties=_gen_properties(dimensions=dimensions),
            wait_for_capacity=False),
    ]
    result_summary = self._quick_schedule(task_slices=slices)
    self._mock_es_assign(result_summary.task_id, 0)
    # Drop notifications caused by scheduling above; only the reap matters.
    del notify_calls[:]
    bot_dimensions = {
        u'foo': [u'bar'],
        u'id': [u'localhost'],
        u'label-pool': [u'CTS'],
    }
    # CTS pool has disabled external scheduler, so notify_calls should have
    # nothing recorded.
    self._bot_reap_task(bot_dimensions)
    self.assertEqual(len(notify_calls), 0)
def test_schedule_request_slice_fallback_to_second_immediate(self):
# First TaskSlice couldn't run so it was immediately skipped, the second ran
# instead.
self._register_bot(self.bot_dimensions)
self._quick_schedule(
task_slices=[
task_request.TaskSlice(
expiration_secs=180,
properties=_gen_properties(dimensions={
u'nonexistent': [u'really'],
u'pool': [u'default'],
}),
wait_for_capacity=False),
task_request.TaskSlice(
expiration_secs=180,
properties=_gen_properties(),
wait_for_capacity=False),
])
request, _, run_result = self._bot_reap_task()
self.assertEqual(1, run_result.current_task_slice)
def test_schedule_request_slice_use_first_after_expiration(self):
# First TaskSlice runs so it is reaped before expiring cron job.
self._register_bot(self.bot_dimensions)
self._quick_schedule(
task_slices=[
task_request.TaskSlice(
expiration_secs=180,
properties=_gen_properties(io_timeout_secs=61)),
task_request.TaskSlice(
expiration_secs=180, properties=_gen_properties()),
])
self.mock_now(self.now, 181)
# The first slice is returned from bot_reap_task if it is called earlier
# than cron_abort_expired_task_to_run.
request, _, run_result = self._bot_reap_task()
self.assertEqual(0, run_result.current_task_slice)
  def test_schedule_request_slice_fallback_to_different_property(self):
    """After the first slice expires, only a matching bot can reap slice 2.

    Two bots are registered: the default one (dims1) and an Atari one (dims2).
    The second slice requires os:Atari, so after expiration only the second
    bot can reap it.
    """
    dims1 = self.bot_dimensions
    self._register_bot(dims1)
    dims2 = self.bot_dimensions.copy()
    dims2[u'id'] = [u'second']
    dims2[u'os'] = [u'Atari']
    self._register_bot(dims2)
    # The first TaskSlice couldn't run so it was eventually expired and the
    # second couldn't be run by the bot that was polling.
    self._quick_schedule(
        task_slices=[
            task_request.TaskSlice(
                expiration_secs=180,
                properties=_gen_properties(io_timeout_secs=61)),
            task_request.TaskSlice(
                expiration_secs=180,
                properties=_gen_properties(dimensions={
                    u'pool': [u'default'],
                    u'os': [u'Atari']
                })),
        ])
    # The second bot can't reap the task.
    _, _, run_result = self._bot_reap_task(dims2, 'second')
    self.assertIsNone(run_result)
    self.mock_now(self.now, 181)
    # Expire first task here.
    task_scheduler.cron_abort_expired_task_to_run()
    tasks = self._taskqueue_stub.GetTasks('task-expire')
    self.assertEqual(1, len(tasks))
    self.assertEqual(1, self.execute_tasks())
    # The first is explicitly expired, and the second TaskSlice cannot be
    # reaped by this bot.
    _, _, run_result = self._bot_reap_task(dims1)
    self.assertIsNone(run_result)
    # The second bot is able to reap it immediately. This is because when the
    # first bot tried to reap the task, it expired the first TaskToRunShard and
    # created a new one, which the second bot *can* reap.
    _, _, run_result = self._bot_reap_task(dims2, 'second')
    self.assertEqual(1, run_result.current_task_slice)
  def test_schedule_request_slice_no_capacity(self):
    """With no bots at all and no wait_for_capacity, the task is denied.

    Also checks the schedule-latency and pubsub-notify-latency metrics
    recorded for the NO_RESOURCE transition.
    """
    # Create the task 1s in the past so the schedule latency is exactly 1000ms.
    created_ts = self.now - datetime.timedelta(seconds=1)
    self.mock_pub_sub()
    result_summary = self._quick_schedule(
        task_slices=[
            task_request.TaskSlice(
                expiration_secs=180,
                properties=_gen_properties(dimensions={
                    u'nonexistent': [u'really'],
                    u'pool': [u'default'],
                }),
                wait_for_capacity=False),
            task_request.TaskSlice(expiration_secs=180,
                                   properties=_gen_properties(),
                                   wait_for_capacity=False),
        ],
        created_ts=created_ts,
        pubsub_topic='projects/abc/topics/def')
    # The task is immediately denied, without waiting.
    self.assertEqual(State.NO_RESOURCE, result_summary.state)
    self.assertEqual(created_ts, result_summary.abandoned_ts)
    self.assertEqual(created_ts, result_summary.completed_ts)
    self.assertIsNone(result_summary.try_number)
    self.assertEqual(0, result_summary.current_task_slice)
    self.assertEqual((self.now - created_ts).total_seconds() * 1000.0,
                     ts_mon_metrics._task_state_change_schedule_latencies.get(
                         fields=_update_fields_schedule(
                             status=State.to_string(State.NO_RESOURCE))).sum)
    status = State.to_string(State.NO_RESOURCE)
    self.assertLessEqual(
        0,
        ts_mon_metrics._task_state_change_pubsub_notify_latencies.get(
            fields=_update_fields_pubsub(status=status,
                                         http_status_code=200)).sum)
def test_schedule_request_slice_wait_for_capacity(self):
result_summary = self._quick_schedule(
task_slices=[
task_request.TaskSlice(
expiration_secs=180,
properties=_gen_properties(dimensions={
u'nonexistent': [u'really'],
u'pool': [u'default'],
}),
wait_for_capacity=False),
task_request.TaskSlice(
expiration_secs=180,
properties=_gen_properties(),
wait_for_capacity=True),
])
# Pending on the second slice, even if there's no capacity.
self.assertEqual(State.PENDING, result_summary.state)
self.assertEqual(1, result_summary.current_task_slice)
def test_schedule_request_slice_no_capacity_fallback_second(self):
self._register_bot(self.bot_dimensions)
result_summary = self._quick_schedule(
task_slices=[
task_request.TaskSlice(
expiration_secs=180,
properties=_gen_properties(dimensions={
u'nonexistent': [u'really'],
u'pool': [u'default'],
}),
wait_for_capacity=False),
task_request.TaskSlice(
expiration_secs=180,
properties=_gen_properties(),
wait_for_capacity=False),
])
# The task fell back to the second slice, still pending.
self.assertEqual(State.PENDING, result_summary.state)
self.assertIsNone(result_summary.abandoned_ts)
self.assertIsNone(result_summary.completed_ts)
self.assertIsNone(result_summary.try_number)
self.assertEqual(1, result_summary.current_task_slice)
def test_exponential_backoff(self):
self.mock(task_scheduler.random,
'random', lambda: task_scheduler._PROBABILITY_OF_QUICK_COMEBACK)
self.mock(utils, 'is_dev', lambda: False)
data = [
(0, 2),
(1, 2),
(2, 3),
(3, 5),
(4, 8),
(5, 11),
(6, 17),
(7, 26),
(8, 38),
(9, 58),
(10, 60),
(11, 60),
]
for value, expected in data:
actual = int(round(task_scheduler.exponential_backoff(value)))
self.assertEqual(expected, actual, (value, expected, actual))
def test_exponential_backoff_quick(self):
self.mock(
task_scheduler.random,
'random', lambda: task_scheduler._PROBABILITY_OF_QUICK_COMEBACK - 0.01)
self.assertEqual(1.0, task_scheduler.exponential_backoff(235))
def test_task_handle_pubsub_task(self):
calls = []
def publish_mock(**kwargs):
calls.append(kwargs)
self.mock(task_scheduler.pubsub, 'publish', publish_mock)
task_scheduler.task_handle_pubsub_task({
'topic':
'projects/abc/topics/def',
'task_id':
'abcdef123',
'auth_token':
'token',
'userdata':
'userdata',
'state':
State.PENDING,
'tags': [
u'authenticated:user:mocked@example.com',
u'os:Windows-3.1.1',
u'pool:default',
u'priority:50',
u'realm:none',
u'service_account:none',
u'swarming.pool.template:no_config',
u'tag:1',
u'user:Jesus',
],
'start_time':
0
})
self.assertEqual([{
'attributes': {
'auth_token': 'token'
},
'message': '{"task_id":"abcdef123","userdata":"userdata"}',
'topic': 'projects/abc/topics/def',
}], calls)
  def _task_ran_successfully(self):
    """Runs an idempotent task successfully and returns the task_id.

    The success matters: only a successfully completed idempotent task gets a
    properties_hash, which later identical tasks can be deduped against.
    """
    run_result = self._quick_reap(task_slices=[
        task_request.TaskSlice(expiration_secs=60,
                               properties=_gen_properties(idempotent=True),
                               wait_for_capacity=False),
    ])
    self.assertEqual('localhost', run_result.bot_id)
    to_run_key = _run_result_to_to_run_key(run_result)
    # The TaskToRunShard was consumed: no longer queued nor expiring.
    self.assertIsNone(to_run_key.get().queue_number)
    self.assertIsNone(to_run_key.get().expiration_ts)
    # It's important to complete the task with success.
    self.assertEqual(
        State.COMPLETED,
        _bot_update_task(run_result.key, exit_code=0, duration=0.1))
    # An idempotent task has properties_hash set after it succeeded.
    self.assertTrue(run_result.result_summary_key.get().properties_hash)
    self.assertEqual(1, self.execute_tasks())
    return unicode(run_result.task_id)
  def _task_deduped(self,
                    new_ts,
                    deduped_from,
                    task_id,
                    now=None,
                    created_ts=None):
    """Schedules an idempotent task and asserts it was deduped.

    Args:
      new_ts: expected modified_ts of the deduped result.
      deduped_from: task_id of the original run this task should dedupe to.
      task_id: expected id of the new (deduped) result summary.
      now: expected started/completed timestamps; defaults to self.now.
      created_ts: creation time for the new request; defaults to utcnow().
    """
    # TODO(maruel): Test with SecretBytes.
    self._register_bot(self.bot_dimensions)
    result_summary = self._quick_schedule(
        task_slices=[
            task_request.TaskSlice(expiration_secs=60,
                                   properties=_gen_properties(idempotent=True),
                                   wait_for_capacity=False),
        ],
        created_ts=created_ts or utils.utcnow())
    request = result_summary.request_key.get()
    to_run_key = task_to_run.request_to_task_to_run_key(request, 0)
    # TaskToRunShard was not stored.
    self.assertIsNone(to_run_key.get())
    # Bot can't reap.
    reaped_request, _, _ = self._bot_reap_task()
    self.assertIsNone(reaped_request)
    result_summary_duped, run_results_duped = _get_results(request.key)
    # A deduped task cannot be deduped again so properties_hash is None.
    expected = self._gen_result_summary_reaped(
        completed_ts=now or self.now,
        cost_saved_usd=0.1,
        created_ts=created_ts or new_ts,
        deduped_from=deduped_from,
        duration=0.1,
        exit_code=0,
        id=task_id,
        # Only this value is updated to 'now', the rest uses the previous run
        # timestamps.
        modified_ts=new_ts,
        started_ts=now or self.now,
        state=State.COMPLETED,
        try_number=0)
    self.assertEqual(expected, result_summary_duped.to_dict())
    # No TaskRunResult was created for the deduped task.
    self.assertEqual([], run_results_duped)
def test_request_idempotent_same_request_id(self):
request_id = "124890823507283423"
result_summary = self._quick_schedule(request_id=request_id)
tr_id = task_request.TaskRequestID.create_key(request_id).get()
assert (tr_id is not None)
result_summary_dedupe = self._quick_schedule(request_id=request_id)
self.assertEqual(result_summary, result_summary_dedupe)
def test_request_idempotent_diff_request_id(self):
result_summary = self._quick_schedule(request_id="124890823507283423")
tr_id = task_request.TaskRequestID.create_key("124890823507283423").get()
assert (tr_id is not None)
result_summary_2 = self._quick_schedule(request_id="124890823507283")
tr_id_2 = task_request.TaskRequestID.create_key("124890823507283").get()
assert (tr_id_2 is not None)
self.assertNotEqual(result_summary, result_summary_2)
  def test_request_not_idempotent_no_request_id(self):
    """Without a request_id, identical scheduling calls create new tasks."""
    # Since no request_id was provided, tasks created with the same request
    # parameters are not deduplicated against each other.
    result_summary = self._quick_schedule()
    result_summary_2 = self._quick_schedule()
    self.assertNotEqual(result_summary, result_summary_2)
  def test_task_idempotent(self):
    """A second idempotent task inside the reuse window is deduped."""
    # First task is idempotent.
    task_id = self._task_ran_successfully()
    # Second task is deduped against first task.
    new_ts = self.mock_now(self.now,
                           config.settings().reusable_task_age_secs - 1)
    self._task_deduped(new_ts,
                       task_id,
                       '1d8dc670a0008a10',
                       created_ts=utils.utcnow() -
                       datetime.timedelta(seconds=1))
    # Created 1s before the dedupe, so a 1000ms COMPLETED schedule latency
    # must have been recorded.
    self.assertEqual(
        1000.0,
        ts_mon_metrics._task_state_change_schedule_latencies.get(
            fields=_update_fields_schedule(
                status=State.to_string(State.COMPLETED))).sum)
def test_task_idempotent_old(self):
# First task is idempotent.
self._task_ran_successfully()
# Second task is scheduled, first task is too old to be reused.
new_ts = self.mock_now(self.now, config.settings().reusable_task_age_secs)
result_summary = self._quick_schedule(
task_slices=[
task_request.TaskSlice(
expiration_secs=60,
properties=_gen_properties(idempotent=True),
wait_for_capacity=False),
])
# The task was enqueued for execution.
to_run_key = task_to_run.request_to_task_to_run_key(
result_summary.request_key.get(), 0)
self.assertTrue(to_run_key.get().queue_number)
  def test_task_idempotent_three(self):
    """A deduped task cannot itself be deduped against once the original ages.

    Three tasks: the first runs, the second dedupes against it, the third is
    scheduled for real because the first is too old and the second (being a
    dedupe) has no properties_hash.
    """
    # First task is idempotent.
    task_id = self._task_ran_successfully()
    # Second task is deduped against first task.
    new_ts = self.mock_now(self.now,
                           config.settings().reusable_task_age_secs - 1)
    self._task_deduped(new_ts,
                       task_id,
                       '1d8dc670a0008a10',
                       created_ts=utils.utcnow() -
                       datetime.timedelta(seconds=1))
    self.assertEqual(
        1000.0,
        ts_mon_metrics._task_state_change_schedule_latencies.get(
            fields=_update_fields_schedule(
                status=State.to_string(State.COMPLETED))).sum)
    # Third task is scheduled, second task is not dedupable, first task is too
    # old.
    new_ts = self.mock_now(self.now, config.settings().reusable_task_age_secs)
    result_summary = self._quick_schedule(
        task_slices=[
            task_request.TaskSlice(
                expiration_secs=60,
                properties=_gen_properties(idempotent=True),
                wait_for_capacity=False),
        ])
    # The task was enqueued for execution.
    to_run_key = task_to_run.request_to_task_to_run_key(
        result_summary.request_key.get(), 0)
    self.assertTrue(to_run_key.get().queue_number)
  def test_task_idempotent_variable(self):
    """Dedupe picks the most recent candidate when the reuse window grows."""
    # Test the edge case where config.settings().reusable_task_age_secs is being
    # modified. This ensure TaskResultSummary.order(TRS.key) works.
    cfg = config.settings()
    cfg.reusable_task_age_secs = 10
    self.mock(config, 'settings', lambda: cfg)
    # First task is idempotent.
    self._task_ran_successfully()
    # Second task is scheduled, first task is too old to be reused.
    second_ts = self.mock_now(self.now, 10)
    task_id = self._task_ran_successfully()
    # Now any of the 2 tasks could be reused. Assert the right one (the most
    # recent) is reused.
    cfg.reusable_task_age_secs = 100
    # Third task is deduped against second task. That ensures ordering works
    # correctly.
    third_ts = self.mock_now(self.now, 20)
    self._task_deduped(third_ts, task_id, '1d69ba3ea8008b10', now=second_ts)
  def test_task_idempotent_second_slice(self):
    """A task dedupes against its *second* slice, skipping the first."""
    # A task will dedupe against a second slice, and skip the first slice.
    # First task is idempotent.
    task_id = self._task_ran_successfully()
    # Second task's second task slice is deduped against first task.
    new_ts = self.mock_now(self.now,
                           config.settings().reusable_task_age_secs - 1)
    result_summary = self._quick_schedule(
        task_slices=[
            task_request.TaskSlice(
                expiration_secs=180,
                properties=_gen_properties(dimensions={
                    u'inexistant': [u'really'],
                    u'pool': [u'default'],
                }),
                wait_for_capacity=False),
            task_request.TaskSlice(expiration_secs=180,
                                   properties=_gen_properties(idempotent=True),
                                   wait_for_capacity=False),
        ],
        created_ts=utils.utcnow() - datetime.timedelta(seconds=1))
    # Neither slice got a TaskToRunShard: the task completed via dedupe.
    to_run_key = task_to_run.request_to_task_to_run_key(
        result_summary.request_key.get(), 0)
    self.assertIsNone(to_run_key.get())
    to_run_key = task_to_run.request_to_task_to_run_key(
        result_summary.request_key.get(), 1)
    self.assertIsNone(to_run_key.get())
    self.assertEqual(State.COMPLETED, result_summary.state)
    self.assertEqual(task_id, result_summary.deduped_from)
    # The dedupe happened on slice index 1 and never actually ran (try 0).
    self.assertEqual(1, result_summary.current_task_slice)
    self.assertEqual(0, result_summary.try_number)
    self.assertEqual(
        1000.0,
        ts_mon_metrics._task_state_change_schedule_latencies.get(
            fields=_update_fields_schedule(
                status=State.to_string(State.COMPLETED))).sum)
def test_task_invalid_parent(self):
parent_id = self._task_ran_successfully()
self.assertTrue(parent_id.endswith('1'))
invalid_parent_id = parent_id[:-1] + '2'
# Try to create a children task with invalid parent_task_id.
# task should be scheduled without error
result_summary = self._quick_schedule(parent_task_id=invalid_parent_id)
  def test_task_parent(self):
    """A completed task's id is accepted as parent_task_id of a new task."""
    run_result = self._quick_reap(
        task_slices=[
            task_request.TaskSlice(
                expiration_secs=60,
                properties=_gen_properties(
                    command=['python'],
                    cas_input_root=task_request.CASReference(
                        cas_instance='projects/test/instances/default_instance',
                        digest=task_request.Digest(hash='1' * 32,
                                                   size_bytes=1))),
                wait_for_capacity=False),
        ])
    self.assertEqual('localhost', run_result.bot_id)
    to_run_key = _run_result_to_to_run_key(run_result)
    # Reaped: no longer queued nor expiring.
    self.assertIsNone(to_run_key.get().queue_number)
    self.assertIsNone(to_run_key.get().expiration_ts)
    # It's important to terminate the task with success.
    self.assertEqual(
        State.COMPLETED,
        _bot_update_task(run_result.key, exit_code=0, duration=0.1))
    self.assertEqual(1, self.execute_tasks())
    parent_id = run_result.task_id
    result_summary = self._quick_schedule(parent_task_id=parent_id)
    # The child's request stored the parent linkage.
    self.assertEqual(parent_id, result_summary.request_key.get().parent_task_id)
  def test_task_timeout(self):
    """A hard timeout reported without exit_code/duration still records both.

    The resulting run shows exit_code=-1 and duration equal to the elapsed
    wall time (10.5s here).
    """
    # Create a task, but the bot tries to timeout but fails to report exit code
    # and duration.
    pub_sub_calls = self.mock_pub_sub()
    run_result = self._quick_reap(pubsub_topic='projects/abc/topics/def')
    to_run_key = _run_result_to_to_run_key(run_result)
    self.mock_now(self.now, 10.5)
    self.assertEqual(State.TIMED_OUT,
                     _bot_update_task(run_result.key, hard_timeout=True))
    self.assertEqual(1, self.execute_tasks())
    run_result = run_result.key.get()
    # Synthesized values: -1 exit code, elapsed wall time as duration.
    self.assertEqual(-1, run_result.exit_code)
    self.assertEqual(10.5, run_result.duration)
    status = State.to_string(State.TIMED_OUT)
    self.assertLessEqual(
        0,
        ts_mon_metrics._task_state_change_pubsub_notify_latencies.get(
            fields=_update_fields_pubsub(status=status,
                                         http_status_code=200)).sum)
  def test_get_results(self):
    """Walks a task through pending -> reaped -> completed, checking results.

    At each stage, _get_results() is compared against the expected
    TaskResultSummary / TaskRunResult dicts. Also checks the RUNNING schedule
    latency metric and that a duplicated completion RPC is idempotent.
    """
    # TODO(maruel): Split in more focused tests.
    created_ts = self.now
    self.mock_now(created_ts)
    self._register_bot(self.bot_dimensions)
    result_summary = self._quick_schedule()
    # The TaskRequest was enqueued, the TaskResultSummary was created but no
    # TaskRunResult exist yet since the task was not scheduled on any bot.
    result_summary, run_results = _get_results(result_summary.request_key)
    expected = self._gen_result_summary_pending(
        created_ts=created_ts, id='1d69b9f088008910', modified_ts=created_ts)
    self.assertEqual(expected, result_summary.to_dict())
    self.assertEqual([], run_results)
    # A bot reaps the TaskToRunShard.
    reaped_ts = self.now + datetime.timedelta(seconds=60)
    self.mock_now(reaped_ts)
    reaped_request, _, run_result = self._bot_reap_task()
    self.assertEqual(result_summary.request_key.get(), reaped_request)
    self.assertTrue(run_result)
    result_summary, run_results = _get_results(result_summary.request_key)
    expected = self._gen_result_summary_reaped(
        created_ts=created_ts,
        costs_usd=[0.0],
        id='1d69b9f088008910',
        modified_ts=reaped_ts,
        started_ts=reaped_ts)
    self.assertEqual(expected, result_summary.to_dict())
    expected = [
        self._gen_run_result(
            id='1d69b9f088008911',
            modified_ts=reaped_ts,
            started_ts=reaped_ts,
            # The run is considered dead if not pinged within the request's
            # bot_ping_tolerance_secs after starting.
            dead_after_ts=reaped_ts +
            datetime.timedelta(seconds=reaped_request.bot_ping_tolerance_secs)),
    ]
    self.assertEqual(expected, [i.to_dict() for i in run_results])
    # The bot completes the task.
    # 60s elapsed between creation and reap -> RUNNING schedule latency.
    self.assertEqual((reaped_ts - self.now).total_seconds() * 1000.0,
                     ts_mon_metrics._task_state_change_schedule_latencies.get(
                         fields=_update_fields_schedule(
                             status=State.to_string(State.RUNNING))).sum)
    done_ts = self.now + datetime.timedelta(seconds=120)
    self.mock_now(done_ts)
    cas_output_root = task_request.CASReference(
        cas_instance='projects/test/instances/default_instance',
        digest=task_request.Digest(hash='a' * 32, size_bytes=1))
    performance_stats = task_result.PerformanceStats(
        bot_overhead=0.1,
        isolated_download=task_result.CASOperationStats(
            duration=0.1,
            initial_number_items=10,
            initial_size=1000,
            items_cold='aa',
            items_hot='bb'),
        isolated_upload=task_result.CASOperationStats(
            duration=0.1, items_cold='aa', items_hot='bb'))
    self.assertEqual(
        State.COMPLETED,
        _bot_update_task(run_result.key,
                         exit_code=0,
                         duration=3.,
                         cas_output_root=cas_output_root,
                         performance_stats=performance_stats))
    # Simulate an unexpected retry, e.g. the response of the previous RPC never
    # got the client even if it succeedded.
    self.assertEqual(
        State.COMPLETED,
        _bot_update_task(run_result.key,
                         exit_code=0,
                         duration=3.,
                         cas_output_root=cas_output_root,
                         performance_stats=performance_stats))
    result_summary, run_results = _get_results(result_summary.request_key)
    expected = self._gen_result_summary_reaped(
        completed_ts=done_ts,
        costs_usd=[0.1],
        created_ts=created_ts,
        duration=3.0,
        exit_code=0,
        id='1d69b9f088008910',
        modified_ts=done_ts,
        cas_output_root={
            'cas_instance': u'projects/test/instances/default_instance',
            'digest': {
                'hash': u'a' * 32,
                'size_bytes': 1,
            }
        },
        started_ts=reaped_ts,
        state=State.COMPLETED,
        try_number=1)
    self.assertEqual(expected, result_summary.to_dict())
    expected = [
        self._gen_run_result(completed_ts=done_ts,
                             cost_usd=0.1,
                             duration=3.0,
                             exit_code=0,
                             id='1d69b9f088008911',
                             modified_ts=done_ts,
                             cas_output_root={
                                 'cas_instance':
                                     u'projects/test/instances/default_instance',
                                 'digest': {
                                     'hash': u'a' * 32,
                                     'size_bytes': 1,
                                 }
                             },
                             started_ts=reaped_ts,
                             state=State.COMPLETED),
    ]
    self.assertEqual(expected, [t.to_dict() for t in run_results])
    self.assertEqual(2, self.execute_tasks())
  def test_exit_code_failure(self):
    """A non-zero exit code yields COMPLETED with failure=True."""
    run_result = self._quick_reap()
    self.assertEqual(
        State.COMPLETED,
        _bot_update_task(run_result.key, exit_code=1, duration=0.1))
    result_summary, run_results = _get_results(run_result.request_key)
    # The task completed (it ran to the end) but is marked as a failure.
    expected = self._gen_result_summary_reaped(
        completed_ts=self.now,
        costs_usd=[0.1],
        duration=0.1,
        exit_code=1,
        failure=True,
        id='1d69b9f088008910',
        started_ts=self.now,
        state=State.COMPLETED,
        try_number=1)
    self.assertEqual(expected, result_summary.to_dict())
    expected = [
        self._gen_run_result(
            completed_ts=self.now,
            cost_usd=0.1,
            duration=0.1,
            exit_code=1,
            failure=True,
            id='1d69b9f088008911',
            started_ts=self.now,
            state=State.COMPLETED),
    ]
    self.assertEqual(expected, [t.to_dict() for t in run_results])
    self.assertEqual(1, self.execute_tasks())
def test_schedule_request_id_without_pool(self):
auth_testing.mock_is_admin(self)
self._register_bot(self.bot_dimensions)
with self.assertRaises(datastore_errors.BadValueError):
self._quick_schedule(
task_slices=[
task_request.TaskSlice(
expiration_secs=60,
properties=_gen_properties(dimensions={u'id': [u'abc']}),
wait_for_capacity=False),
])
  def test_bot_update_task(self):
    """Output chunks accumulate across updates; completion notifies PubSub."""
    # Tiny chunk size forces the output to span multiple chunks.
    self.mock(task_result.TaskOutput, 'CHUNK_SIZE', 2)
    self.mock_pub_sub()
    run_result = self._quick_reap(pubsub_topic='projects/abc/topics/def')
    self.assertEqual(
        State.RUNNING,
        _bot_update_task(run_result.key, output='hi', output_chunk_start=0))
    self.assertEqual(
        State.COMPLETED,
        _bot_update_task(
            run_result.key,
            output='hey',
            output_chunk_start=2,
            exit_code=0,
            duration=0.1))
    self.assertLessEqual(
        0,
        ts_mon_metrics._task_state_change_pubsub_notify_latencies.get(
            fields=_update_fields_pubsub(http_status_code=200)).sum)
    # Both chunks concatenated in order.
    self.assertEqual('hihey', run_result.key.get().get_output(0, 0))
    self.assertEqual(1, self.execute_tasks())
def test_bot_update_task_new_overwrite(self):
self.mock(task_result.TaskOutput, 'CHUNK_SIZE', 2)
run_result = self._quick_reap()
self.assertEqual(
State.RUNNING,
_bot_update_task(
run_result_key=run_result.key, output='hi', output_chunk_start=0))
self.assertEqual(
State.RUNNING,
_bot_update_task(
run_result_key=run_result.key, output='hey', output_chunk_start=1))
self.assertEqual('hhey', run_result.key.get().get_output(0, 0))
def test_bot_update_exception(self):
run_result = self._quick_reap()
def r(*_):
raise datastore_utils.CommitError('Sorry!')
self.mock(ndb, 'put_multi', r)
self.assertIsNone(
None, _bot_update_task(run_result.key, exit_code=0, duration=0.1))
self.assertIsNone(
ts_mon_metrics._task_state_change_pubsub_notify_latencies.get(
fields=_update_fields_pubsub(http_status_code=200)))
def test_bot_update_pubsub_negative_latency(self):
pub_sub_calls = self.mock_pub_sub()
run_result = self._quick_reap(pubsub_topic='projects/abc/topics/def')
self.assertEqual(
State.COMPLETED,
_bot_update_task(run_result.key, exit_code=0, duration=0.1))
self.assertEqual(2, len(pub_sub_calls)) # notification is sent
self.assertEqual(1, self.execute_tasks())
self.assertLessEqual(
0,
ts_mon_metrics._task_state_change_pubsub_notify_latencies.get(
fields=_update_fields_pubsub(http_status_code=200)).sum)
  def test_bot_update_pubsub_error(self):
    """A PubSub publish failure fails the update; a retry then succeeds."""
    pub_sub_calls = self.mock_pub_sub()
    run_result = self._quick_reap(pubsub_topic='projects/abc/topics/def')
    # Attempt to terminate the task with success, but make PubSub call fail.
    self.publish_successful = False
    # The update is reported as failed (returns None).
    self.assertIsNone(
        _bot_update_task(run_result.key, exit_code=0, duration=0.1))
    # The failed publish is recorded under its HTTP 404 status.
    self.assertLessEqual(
        0,
        ts_mon_metrics._task_state_change_pubsub_notify_latencies.get(
            fields=_update_fields_pubsub(http_status_code=404)).sum)
    # Bot retries bot_update, now PubSub works and notification is sent.
    self.publish_successful = True
    self.assertEqual(
        State.COMPLETED,
        _bot_update_task(run_result.key, exit_code=0, duration=0.1))
    self.assertEqual(2, len(pub_sub_calls))  # notification is sent
    self.assertEqual(1, self.execute_tasks())
    self.assertLessEqual(
        0,
        ts_mon_metrics._task_state_change_pubsub_notify_latencies.get(
            fields=_update_fields_pubsub(http_status_code=200)).sum)
  def test_bot_update_buildbucket_pubsub_ok(self):
    """A buildbucket-backed task sends STARTED then SUCCESS to its topic."""
    self.mock(utils, "time_time_ns", lambda: 12345678)
    pub_sub_calls = self.mock_pub_sub()
    run_result = self._quick_reap(
        has_build_task=True,
        build_task=task_request.BuildTask(
            build_id="1234",
            buildbucket_host="buildbucket_host",
            latest_task_status=task_result.State.PENDING,
            pubsub_topic="backend_pubsub_topic",
            update_id=0))
    self.assertEqual(
        State.COMPLETED,
        _bot_update_task(run_result.key, exit_code=0, duration=0.1))
    # 2 calls because _quick_reap sends one and _bot_update_task sends another
    self.assertEqual(2, len(pub_sub_calls))
    self.assertEqual(1, self.execute_tasks())
    # Asserting that first pubsub call sent a STARTED status (_quick_reap ran)
    self._assert_buildbucket_update_status(pub_sub_calls[0][1]['message'],
                                           common_pb2.STARTED)
    self.assertEqual(pub_sub_calls[0][1]["topic"], "backend_pubsub_topic")
    # Asserting that second pubsub call sent a SUCCESS status
    self._assert_buildbucket_update_status(pub_sub_calls[1][1]['message'],
                                           common_pb2.SUCCESS)
    self.assertEqual(pub_sub_calls[1][1]["topic"], "backend_pubsub_topic")
  def test_bot_update_buildbucket_pubsub_ok_failed_task(self):
    """A failing buildbucket-backed task sends STARTED then FAILURE."""
    # NOTE(review): the sibling tests mock utils.time_time_ns; this one mocks
    # utils.time_time — confirm which function BuildTask update ids actually
    # read; this may be a typo that leaves the intended mock ineffective.
    self.mock(utils, "time_time", lambda: 12345678)
    pub_sub_calls = self.mock_pub_sub()
    run_result = self._quick_reap(
        has_build_task=True,
        build_task=task_request.BuildTask(
            build_id="1234",
            buildbucket_host="buildbucket_host",
            latest_task_status=task_result.State.PENDING,
            pubsub_topic="backend_pubsub_topic",
            update_id=0))
    # exit_code=1 -> the task completes as a failure.
    self.assertEqual(
        State.COMPLETED,
        _bot_update_task(run_result.key, exit_code=1, duration=0.1))
    self.assertEqual(2, len(pub_sub_calls))  # notification is sent
    self.assertEqual(1, self.execute_tasks())
    self._assert_buildbucket_update_status(pub_sub_calls[0][1]['message'],
                                           common_pb2.STARTED)
    self.assertEqual(pub_sub_calls[0][1]["topic"], "backend_pubsub_topic")
    self._assert_buildbucket_update_status(pub_sub_calls[1][1]['message'],
                                           common_pb2.FAILURE)
    self.assertEqual(pub_sub_calls[1][1]["topic"], "backend_pubsub_topic")
  def test_task_buildbucket_update(self):
    """task_buildbucket_update only notifies on state changes with newer ids.

    Covers four cases: same state (no send), new state (send), stale
    update_id (no send), and a PubSub transient error (raises, no send).
    """
    self.mock(utils, "time_time_ns", lambda: 12345678)
    pub_sub_calls = self.mock_pub_sub()
    run_result = self._quick_reap(
        has_build_task=True,
        build_task=task_request.BuildTask(
            build_id="1234",
            buildbucket_host="buildbucket_host",
            latest_task_status=task_result.State.PENDING,
            pubsub_topic="backend_pubsub_topic",
            update_id=0))
    # Check that an update is not sent due to no change of state
    task_scheduler.task_buildbucket_update({
        "task_id":
            task_pack.pack_result_summary_key(run_result.result_summary_key),
        "state":
            task_result.State.RUNNING,
        'update_id':
            12345678000000002
    })
    self.assertEqual(1, len(pub_sub_calls))  # notification is sent
    result = task_pb2.BuildTaskUpdate()
    result.ParseFromString(pub_sub_calls[0][1]['message'])
    self.assertEqual(result.task.status, common_pb2.STARTED)
    self.assertEqual(result.build_id, "1234")
    self.assertEqual(result.task.id.id, "1d69b9f088008910")
    self.assertEqual(result.task.update_id, 12345678)
    # Check that an update is sent due to change of state
    task_scheduler.task_buildbucket_update({
        "task_id":
            task_pack.pack_result_summary_key(run_result.result_summary_key),
        "state":
            task_result.State.BOT_DIED,
        'update_id':
            12345678000000003
    })
    self.assertEqual(2, len(pub_sub_calls))  # second notification is sent
    result = task_pb2.BuildTaskUpdate()
    result.ParseFromString(pub_sub_calls[1][1]['message'])
    self.assertEqual(result.task.status, common_pb2.INFRA_FAILURE)
    self.assertEqual(result.build_id, "1234")
    self.assertEqual(result.task.id.id, "1d69b9f088008910")
    self.assertEqual(result.task.update_id, 12345678000000003)
    # Check that no update was made due to prior update_id
    task_scheduler.task_buildbucket_update({
        "task_id":
            task_pack.pack_result_summary_key(run_result.result_summary_key),
        "state":
            task_result.State.CLIENT_ERROR,
        'update_id':
            12345678000000001
    })
    self.assertEqual(2, len(pub_sub_calls))
    # Check that an update was not sent due to pubsub transient error
    self.publish_successful = False
    with self.assertRaises(task_scheduler.Error):
      task_scheduler.task_buildbucket_update({
          "task_id":
              task_pack.pack_result_summary_key(run_result.result_summary_key),
          "state":
              task_result.State.CANCELED,
          'update_id':
              12345678000000004
      })
    self.assertEqual(2, len(pub_sub_calls))
  def _bot_update_timeouts(self, hard, io):
    """Reports a timeout on a reaped task and asserts TIMED_OUT + failure.

    Args:
      hard: whether the bot reports a hard (execution) timeout.
      io: whether the bot reports an I/O timeout.
    """
    run_result = self._quick_reap()
    self.assertEqual(
        State.TIMED_OUT,
        _bot_update_task(
            run_result.key,
            exit_code=0,
            duration=0.1,
            hard_timeout=hard,
            io_timeout=io))
    # A timeout marks the task as a failure even with exit_code=0.
    expected = self._gen_result_summary_reaped(
        completed_ts=self.now,
        costs_usd=[0.1],
        duration=0.1,
        exit_code=0,
        failure=True,
        id='1d69b9f088008910',
        started_ts=self.now,
        state=State.TIMED_OUT,
        try_number=1)
    self.assertEqual(expected, run_result.result_summary_key.get().to_dict())
    expected = self._gen_run_result(
        completed_ts=self.now,
        cost_usd=0.1,
        duration=0.1,
        exit_code=0,
        failure=True,
        id='1d69b9f088008911',
        started_ts=self.now,
        state=State.TIMED_OUT,
        try_number=1)
    self.assertEqual(expected, run_result.key.get().to_dict())
    self.assertEqual(1, self.execute_tasks())
def test_bot_update_hard_timeout(self):
self._bot_update_timeouts(True, False)
def test_bot_update_io_timeout(self):
self._bot_update_timeouts(False, True)
def test_bot_update_child_with_cancelled_parent(self):
  """Children of a canceled parent task end up KILLED.

  Schedules one parent and three children on distinct bots, cancels the
  parent while it runs, then verifies each child (running or still
  pending) is killed via the cancel-children-tasks task queue chain.
  """
  self._register_bot(self.bot_dimensions)
  # Run parent task.
  parent_request = _gen_request_slices()
  parent_result_summary = task_scheduler.schedule_request(parent_request)
  self.execute_tasks()
  _, _, parent_run_result = self._bot_reap_task()
  # Run a child task.
  child_request = _gen_request_slices(
      parent_task_id=parent_run_result.task_id)
  child_result_summary = task_scheduler.schedule_request(child_request)
  self.execute_tasks()
  # A second bot is needed since the first is busy with the parent task.
  bot2_dimensions = self.bot_dimensions.copy()
  bot2_dimensions['id'] = [bot2_dimensions['id'][0] + '2']
  self._register_bot(bot2_dimensions)
  _, _, child_run_result = self._bot_reap_task()
  self.execute_tasks()
  # Run a child task 2.
  child_request2 = _gen_request_slices(
      parent_task_id=parent_run_result.task_id)
  child_result2_summary = task_scheduler.schedule_request(child_request2)
  self.execute_tasks()
  bot3_dimensions = self.bot_dimensions.copy()
  bot3_dimensions['id'] = [bot3_dimensions['id'][0] + '3']
  self._register_bot(bot3_dimensions)
  _, _, child_run_result2 = self._bot_reap_task()
  self.execute_tasks()
  # Run a child task 3. This will be cancelled before running.
  child_request3 = _gen_request_slices(
      parent_task_id=parent_run_result.task_id)
  child_result3_summary = task_scheduler.schedule_request(child_request3)
  # Cancel parent task.
  ok, was_running = task_scheduler.cancel_task(
      parent_run_result.request_key.get(),
      parent_run_result.result_summary_key, True, None)
  self.assertEqual(True, ok)
  self.assertEqual(True, was_running)
  # parent_task should push task to cancel-children-tasks.
  self.execute_task(
      '/internal/taskqueue/important/tasks/cancel-children-tasks',
      'cancel-children-tasks',
      utils.encode_to_json({
          'task': parent_result_summary.task_id,
      }))
  # and child tasks should be cancelled via task queue.
  appended_child_task_ids = [
      child_result3_summary.task_id,
      child_result2_summary.task_id,
      child_result_summary.task_id,
  ]
  self.execute_task(
      '/internal/taskqueue/important/tasks/cancel', 'cancel-tasks',
      utils.encode_to_json({
          'kill_running': True,
          'tasks': appended_child_task_ids,
      }))
  # The parent's own completion report now resolves to KILLED.
  self.assertEqual(
      State.KILLED,
      _bot_update_task(parent_run_result.key, exit_code=0, duration=0.1))
  # Child task is KILLED when parent task is cancelled.
  self.assertEqual(
      State.KILLED,
      _bot_update_task(
          child_run_result.key,
          bot_id='localhost2',
          exit_code=0,
          duration=0.1))
  self.assertEqual('hi', child_run_result.key.get().get_output(0, 0))
  # Child task is KILLED when parent task is cancelled and not being
  # completed.
  self.assertEqual(
      State.KILLED,
      _bot_update_task(child_run_result2.key, bot_id='localhost3'))
  self.assertEqual('hi', child_run_result2.key.get().get_output(0, 0))
  # flush tasks
  self.execute_tasks()
def test_task_priority(self):
  """Verifies tasks are reaped in increasing priority-number order.

  Schedules tasks with shuffled priorities and checks that bot polling
  hands them out sorted by priority (lower number served first).
  """
  # Create N tasks of various priority not in order.
  priorities = [200, 100, 20, 30, 50, 40, 199]
  # Call the expected ordered list out for clarity.
  expected = [20, 30, 40, 50, 100, 199, 200]
  self.assertEqual(expected, sorted(priorities))

  self._register_bot(self.bot_dimensions)
  # Triggers many tasks of different priorities.
  for p in priorities:
    self._quick_schedule(priority=p)
  self.execute_tasks()

  # Make sure they are scheduled in priority order. Bot polling should hand
  # out tasks in the expected order. In practice the order is not 100%
  # deterministic when running on GAE but it should be deterministic in the
  # unit test.
  # Fix: the original iterated `for i, e in enumerate(expected)` but never
  # used the index `i`; iterate the values directly.
  for priority in expected:
    request, _, _ = self._bot_reap_task()
    self.assertEqual(request.priority, priority)
def test_bot_terminate_task(self):
  """A bot reporting a task kill moves the task to BOT_DIED."""
  pub_sub_calls = self.mock_pub_sub()
  run_result = self._quick_reap(pubsub_topic='projects/abc/topics/def')
  # One notification so far: PENDING -> RUNNING.
  self.assertEqual(1, len(pub_sub_calls))
  no_client_error = {'missing_cas': None, 'missing_cipd': []}
  err = task_scheduler.bot_terminate_task(run_result.key, 'localhost',
                                          self.now, no_client_error)
  self.assertEqual(None, err)
  summary_expected = self._gen_result_summary_reaped(
      abandoned_ts=self.now,
      completed_ts=self.now,
      costs_usd=[0.],
      id='1d69b9f088008910',
      internal_failure=True,
      started_ts=self.now,
      state=State.BOT_DIED)
  self.assertEqual(summary_expected,
                   run_result.result_summary_key.get().to_dict())
  run_expected = self._gen_run_result(
      abandoned_ts=self.now,
      completed_ts=self.now,
      id='1d69b9f088008911',
      internal_failure=True,
      state=State.BOT_DIED)
  self.assertEqual(run_expected, run_result.key.get().to_dict())
  self.assertEqual(1, self.execute_tasks())
  # Second notification: RUNNING -> BOT_DIED.
  self.assertEqual(2, len(pub_sub_calls))
  status = State.to_string(State.BOT_DIED)
  fields = _update_fields_pubsub(status=status, http_status_code=200)
  self.assertLessEqual(
      0,
      ts_mon_metrics._task_state_change_pubsub_notify_latencies.get(
          fields=fields).sum)
def test_bot_terminate_backend_task(self):
  """bot_terminate_task on a Buildbucket-backed task also notifies BB.

  The task carries a BuildTask, so each swarming pub/sub notification is
  paired with a buildbucket task update on the backend topic (hence the
  doubled pub/sub call counts).
  """
  pub_sub_calls = self.mock_pub_sub()
  run_result = self._quick_reap(
      pubsub_topic='projects/abc/topics/def',
      task_slices=[
          task_request.TaskSlice(expiration_secs=1200,
                                 properties=_gen_properties(idempotent=True),
                                 wait_for_capacity=False),
      ],
      has_build_task=True,
      build_task=task_request.BuildTask(
          build_id="1234",
          buildbucket_host="buildbucket_host",
          latest_task_status=task_result.State.PENDING,
          pubsub_topic="backend_pubsub_topic",
          update_id=0))
  self.assertEqual(2, len(pub_sub_calls))  # PENDING -> RUNNING
  self._assert_buildbucket_update_status(pub_sub_calls[0][1]['message'],
                                         common_pb2.STARTED)
  self.assertEqual(
      None,
      task_scheduler.bot_terminate_task(run_result.key, 'localhost', self.now,
                                        {
                                            'missing_cas': None,
                                            'missing_cipd': []
                                        }))
  expected = self._gen_result_summary_reaped(abandoned_ts=self.now,
                                             completed_ts=self.now,
                                             costs_usd=[0.],
                                             id='1d69b9f088008910',
                                             internal_failure=True,
                                             started_ts=self.now,
                                             state=State.BOT_DIED)
  self.assertEqual(expected, run_result.result_summary_key.get().to_dict())
  expected = self._gen_run_result(abandoned_ts=self.now,
                                  completed_ts=self.now,
                                  id='1d69b9f088008911',
                                  internal_failure=True,
                                  state=State.BOT_DIED)
  self.assertEqual(expected, run_result.key.get().to_dict())
  self.assertEqual(2, self.execute_tasks())
  self.assertEqual(4, len(pub_sub_calls))  # RUNNING -> BOT_DIED
  # BOT_DIED is mirrored to buildbucket as INFRA_FAILURE.
  self._assert_buildbucket_update_status(pub_sub_calls[2][1]['message'],
                                         common_pb2.INFRA_FAILURE)
  status = State.to_string(State.BOT_DIED)
  self.assertLessEqual(
      0,
      ts_mon_metrics._task_state_change_pubsub_notify_latencies.get(
          fields=_update_fields_pubsub(status=status,
                                      http_status_code=200)).sum)
def test_bot_terminate_canceled_task(self):
  """bot_terminate_task on a task already being canceled yields KILLED.

  The task was asked to be killed before the bot reported its death, so
  the final state is KILLED (not BOT_DIED), and the dead-task detection
  latency metric is populated from the provided start time.
  """
  pub_sub_calls = self.mock_pub_sub()
  run_result = self._quick_reap(pubsub_topic='projects/abc/topics/def')
  self.assertEqual(1, len(pub_sub_calls))  # PENDING -> RUNNING
  # 100ms in the past, so the detection latency below is non-zero.
  start_time = utils.milliseconds_since_epoch() - 100
  # cancel task
  self._cancel_running_task(run_result)
  self.assertEqual(2, len(pub_sub_calls))  # RUNNING -> killing
  # execute termination task
  err = task_scheduler.bot_terminate_task(run_result.key, 'localhost',
                                          start_time, {
                                              'missing_cas': None,
                                              'missing_cipd': []
                                          })
  self.assertEqual(None, err)
  # check result summary
  expected = self._gen_result_summary_reaped(
      abandoned_ts=self.now,
      completed_ts=self.now,
      costs_usd=[0.],
      id='1d69b9f088008910',
      internal_failure=True,
      started_ts=self.now,
      state=State.KILLED,
      duration=0.0,
      exit_code=-1,
      failure=True,
  )
  self.assertEqual(expected, run_result.result_summary_key.get().to_dict())
  # check run result
  expected = self._gen_run_result(
      abandoned_ts=self.now,
      completed_ts=self.now,
      id='1d69b9f088008911',
      internal_failure=True,
      state=State.KILLED,
      killing=False,
      duration=0.0,
      exit_code=-1,
      failure=True,
  )
  self.assertEqual(expected, run_result.key.get().to_dict())
  self.assertEqual(1, self.execute_tasks())
  self.assertEqual(3, len(pub_sub_calls))  # killing -> KILLED
  status = State.to_string(State.KILLED)
  self.assertLessEqual(
      0,
      ts_mon_metrics._task_state_change_pubsub_notify_latencies.get(
          fields=_update_fields_pubsub(status=status,
                                       http_status_code=200)).sum)
  # The detection latency metric was fed from start_time above.
  self.assertLessEqual(
      0,
      ts_mon_metrics._dead_task_detection_latencies.get(fields={
          'pool': 'default',
          'cron': False,
      }).sum)
def test_bot_terminate_task_missing_cas(self):
  """A kill report citing missing CAS inputs ends as CLIENT_ERROR.

  The raw '<hash>/<size>' digest string is parsed into a structured
  missing_cas entry, and the failure is attributed to the client rather
  than recorded as an internal failure.
  """
  pub_sub_calls = self.mock_pub_sub()
  run_result = self._quick_reap(pubsub_topic='projects/abc/topics/def')
  self.assertEqual(1, len(pub_sub_calls))  # PENDING -> RUNNING
  client_error = {
      'missing_cas': [{
          'digest':
              '93b45bab427ab9fe55asdq123324adsdaf8d5/1292',
          'instance':
              'projects/chromium-swarm/instances/default_instance',
      }],
      'missing_cipd': [],
  }
  self.assertEqual(
      None,
      task_scheduler.bot_terminate_task(run_result.key, 'localhost', self.now,
                                        client_error))
  # The '<hash>/<size>' string above, split into its structured form.
  expected_missing_cas = [{
      'cas_instance': 'projects/chromium-swarm/instances/default_instance',
      'digest': {
          'hash': '93b45bab427ab9fe55asdq123324adsdaf8d5',
          'size_bytes': 1292
      }
  }]
  # check result summary
  expected = self._gen_result_summary_reaped(
      abandoned_ts=self.now,
      completed_ts=self.now,
      costs_usd=[0.],
      id='1d69b9f088008910',
      internal_failure=False,
      missing_cas=expected_missing_cas,
      started_ts=self.now,
      state=State.CLIENT_ERROR,
      duration=0.0,
      exit_code=-1,
      failure=True,
  )
  self.assertEqual(expected, run_result.result_summary_key.get().to_dict())
  # check run result
  expected = self._gen_run_result(
      abandoned_ts=self.now,
      completed_ts=self.now,
      id='1d69b9f088008911',
      internal_failure=False,
      missing_cas=expected_missing_cas,
      state=State.CLIENT_ERROR,
      killing=None,
      duration=0.0,
      exit_code=-1,
      failure=True,
  )
  self.assertEqual(expected, run_result.key.get().to_dict())
  self.assertEqual(1, self.execute_tasks())
  self.assertEqual(2, len(pub_sub_calls))  # RUNNING -> CLIENT_ERROR
  status = State.to_string(State.CLIENT_ERROR)
  self.assertLessEqual(
      0,
      ts_mon_metrics._task_state_change_pubsub_notify_latencies.get(
          fields=_update_fields_pubsub(status=status,
                                       http_status_code=200)).sum)
def test_bot_terminate_task_missing_cipd(self):
  """A kill report citing missing CIPD packages ends as CLIENT_ERROR.

  The missing packages are recorded verbatim on both the run result and
  the result summary, without marking an internal failure.
  """
  pub_sub_calls = self.mock_pub_sub()
  run_result = self._quick_reap(pubsub_topic='projects/abc/topics/def')
  # NOTE: the original comment said "PENDING -> CLIENT_ERROR" but at this
  # point the task was only reaped; this is the PENDING -> RUNNING message.
  self.assertEqual(1, len(pub_sub_calls))  # PENDING -> RUNNING
  client_error = {
      'missing_cas':
          None,
      'missing_cipd': [{
          'package_name': u'foo',
          'version': u'deadbeef',
          'path': u'not/found/here',
      }],
  }
  self.assertEqual(
      None,
      task_scheduler.bot_terminate_task(run_result.key, 'localhost', self.now,
                                        client_error))
  # check result summary
  expected = self._gen_result_summary_reaped(
      abandoned_ts=self.now,
      completed_ts=self.now,
      costs_usd=[0.],
      id='1d69b9f088008910',
      internal_failure=False,
      missing_cipd=client_error['missing_cipd'],
      started_ts=self.now,
      state=State.CLIENT_ERROR,
      duration=0.0,
      exit_code=-1,
      failure=True,
  )
  self.assertEqual(expected, run_result.result_summary_key.get().to_dict())
  # check run result
  expected = self._gen_run_result(
      abandoned_ts=self.now,
      completed_ts=self.now,
      id='1d69b9f088008911',
      internal_failure=False,
      missing_cipd=client_error['missing_cipd'],
      state=State.CLIENT_ERROR,
      killing=None,
      duration=0.0,
      exit_code=-1,
      failure=True,
  )
  self.assertEqual(expected, run_result.key.get().to_dict())
  self.assertEqual(1, self.execute_tasks())
  self.assertEqual(2, len(pub_sub_calls))  # RUNNING -> CLIENT_ERROR
  status = State.to_string(State.CLIENT_ERROR)
  self.assertLessEqual(
      0,
      ts_mon_metrics._task_state_change_pubsub_notify_latencies.get(
          fields=_update_fields_pubsub(status=status,
                                       http_status_code=200)).sum)
def test_bot_terminate_task_wrong_bot(self):
  """A kill report from a bot that doesn't own the task is rejected."""
  run_result = self._quick_reap()
  err = task_scheduler.bot_terminate_task(run_result.key, 'bot1', 0, {
      'missing_cas': None,
      'missing_cipd': []
  })
  expected_err = (
      'Bot bot1 sent task kill for task 1d69b9f088008911 owned by bot '
      'localhost')
  self.assertEqual(expected_err, err)
def test_cancel_task(self):
  """Canceling a still-pending task succeeds and notifies over pub/sub."""
  pub_sub_calls = self.mock_pub_sub()
  self._register_bot(self.bot_dimensions)
  result_summary = self._quick_schedule(
      pubsub_topic='projects/abc/topics/def')
  # No notification before the cancellation.
  self.assertEqual(0, len(pub_sub_calls))
  ok, was_running = task_scheduler.cancel_task(
      result_summary.request_key.get(), result_summary.key, False, None)
  self.assertEqual(True, ok)
  self.assertEqual(False, was_running)
  self.assertEqual(1, self.execute_tasks())
  # Exactly one CANCELED notification.
  self.assertEqual(1, len(pub_sub_calls))
  result_summary = result_summary.key.get()
  self.assertEqual(State.CANCELED, result_summary.state)
  self.assertEqual(1, len(pub_sub_calls))  # No other message.
  canceled = State.to_string(State.CANCELED)
  self.assertLessEqual(
      0,
      ts_mon_metrics._task_state_change_pubsub_notify_latencies.get(
          fields=_update_fields_pubsub(status=canceled,
                                       http_status_code=200)).sum)
def test_cancel_task_backend_task(self):
  """Canceling a pending Buildbucket-backed task also updates buildbucket.

  Besides the regular pubsub-notify task, a buildbucket-update task runs
  and reports CANCELED on the backend topic.
  """
  # Cancel a pending task.
  pub_sub_calls = self.mock_pub_sub()
  self._register_bot(self.bot_dimensions)
  result_summary = self._quick_schedule(
      pubsub_topic='projects/abc/topics/def',
      has_build_task=True,
      build_task=task_request.BuildTask(
          build_id="1234",
          buildbucket_host="buildbucket_host",
          latest_task_status=task_result.State.PENDING,
          pubsub_topic="backend_pubsub_topic",
          update_id=0))
  self.assertEqual(0, len(pub_sub_calls))  # Nothing yet.
  ok, was_running = task_scheduler.cancel_task(
      result_summary.request_key.get(), result_summary.key, False, None)
  self.assertEqual(True, ok)
  self.assertEqual(False, was_running)
  # 2 because both pubsub-notify and bb update
  self.assertEqual(2, self.execute_tasks())
  self.assertEqual(2, len(pub_sub_calls))  # CANCELED
  self._assert_buildbucket_update_status(pub_sub_calls[0][1]['message'],
                                         common_pb2.CANCELED)
  result_summary = result_summary.key.get()
  self.assertEqual(State.CANCELED, result_summary.state)
  status = State.to_string(State.CANCELED)
  self.assertLessEqual(
      0,
      ts_mon_metrics._task_state_change_pubsub_notify_latencies.get(
          fields=_update_fields_pubsub(status=status,
                                       http_status_code=200)).sum)
def test_cancel_task_with_id(self):
  """cancel_task_with_id() cancels a pending task given its packed id."""
  pub_sub_calls = self.mock_pub_sub()
  self._register_bot(self.bot_dimensions)
  result_summary = self._quick_schedule(
      pubsub_topic='projects/abc/topics/def')
  self.assertEqual(0, len(pub_sub_calls))  # Nothing yet.
  ok, was_running = task_scheduler.cancel_task_with_id(
      result_summary.task_id, False, None)
  self.assertEqual(True, ok)
  self.assertEqual(False, was_running)
  self.assertEqual(1, self.execute_tasks())
  self.assertEqual(1, len(pub_sub_calls))  # CANCELED
  canceled = State.to_string(State.CANCELED)
  self.assertLessEqual(
      0,
      ts_mon_metrics._task_state_change_pubsub_notify_latencies.get(
          fields=_update_fields_pubsub(status=canceled,
                                       http_status_code=200)).sum)
  fresh_summary = result_summary.key.get()
  self.assertEqual(State.CANCELED, fresh_summary.state)
  self.assertEqual(1, len(pub_sub_calls))  # No other message.
def test_cancel_task_running(self):
  """Canceling a running task goes through the graceful-kill dance.

  cancel_task() with kill_running=False is denied; with True it flips the
  killing flag, the bot learns about it on its next update, and the task
  only reaches KILLED once the bot reports completion.
  """
  # Cancel a running task.
  pub_sub_calls = self.mock_pub_sub()
  run_result = self._quick_reap(pubsub_topic='projects/abc/topics/def')
  self.assertEqual(1, len(pub_sub_calls))  # RUNNING
  # Denied if kill_running == False.
  ok, was_running = task_scheduler.cancel_task(run_result.request_key.get(),
                                               run_result.result_summary_key,
                                               False, None)
  self.assertEqual(False, ok)
  self.assertEqual(True, was_running)
  self.assertEqual(0, self.execute_tasks())
  self.assertEqual(1, len(pub_sub_calls))  # No message.
  # Works if kill_running == True.
  ok, was_running = task_scheduler.cancel_task(run_result.request_key.get(),
                                               run_result.result_summary_key,
                                               True, None)
  self.assertEqual(True, ok)
  self.assertEqual(True, was_running)
  self.assertEqual(2, self.execute_tasks())
  self.assertEqual(2, len(pub_sub_calls))  # CANCELED
  # At this point, the task is still running and the bot is unaware.
  run_result = run_result.key.get()
  self.assertEqual(State.RUNNING, run_result.state)
  self.assertEqual(True, run_result.killing)
  # Repeatedly canceling works.
  ok, was_running = task_scheduler.cancel_task(run_result.request_key.get(),
                                               run_result.result_summary_key,
                                               True, None)
  self.assertEqual(True, ok)
  self.assertEqual(True, was_running)
  self.assertEqual(2, self.execute_tasks())
  self.assertEqual(3, len(pub_sub_calls))  # CANCELED (again)
  # Bot pulls once, gets the signal about killing, which starts the graceful
  # termination dance.
  self.assertEqual(State.KILLED, _bot_update_task(run_result.key))
  # At this point, it is still running, until the bot completes the task.
  run_result = run_result.key.get()
  self.assertEqual(State.RUNNING, run_result.state)
  self.assertEqual(True, run_result.killing)
  # Close the task.
  self.assertEqual(
      State.KILLED,
      _bot_update_task(
          run_result.key, output_chunk_start=3, exit_code=0, duration=0.1))
  run_result = run_result.key.get()
  self.assertEqual(False, run_result.killing)
  self.assertEqual(State.KILLED, run_result.state)
  self.assertEqual(4, len(pub_sub_calls))  # KILLED
  self.assertEqual(1, self.execute_tasks())
  status = State.to_string(State.KILLED)
  self.assertLessEqual(
      0,
      ts_mon_metrics._task_state_change_pubsub_notify_latencies.get(
          fields=_update_fields_pubsub(status=status,
                                       http_status_code=200)).sum)
def test_cancel_task_bot_id(self):
  """cancel_task() with a bot_id filter only applies when the bot matches."""
  pub_sub_calls = self.mock_pub_sub()
  run_result = self._quick_reap(pubsub_topic='projects/abc/topics/def')
  self.assertEqual(1, len(pub_sub_calls))  # RUNNING
  # Denied if bot_id ('foo') doesn't match.
  ok, was_running = task_scheduler.cancel_task(
      run_result.request_key.get(), run_result.result_summary_key, True,
      'foo')
  self.assertEqual(False, ok)
  self.assertEqual(True, was_running)
  self.assertEqual(1, self.execute_tasks())
  self.assertEqual(1, len(pub_sub_calls))  # No message.
  # Works if bot_id matches.
  ok, was_running = task_scheduler.cancel_task(
      run_result.request_key.get(), run_result.result_summary_key, True,
      'localhost')
  self.assertEqual(True, ok)
  self.assertEqual(True, was_running)
  self.assertEqual(2, self.execute_tasks())
  self.assertEqual(2, len(pub_sub_calls))  # CANCELED
def test_cancel_task_completed(self):
  """Cancellation of an already-completed task is denied."""
  pub_sub_calls = self.mock_pub_sub()
  run_result = self._quick_reap(pubsub_topic='projects/abc/topics/def')
  self.assertEqual(1, len(pub_sub_calls))  # RUNNING
  # The task completes successfully.
  self.assertEqual(
      State.COMPLETED,
      _bot_update_task(run_result.key, exit_code=0, duration=0.1))
  self.assertEqual(2, len(pub_sub_calls))  # COMPLETED
  # Cancel request is denied.
  ok, was_running = task_scheduler.cancel_task(
      run_result.request_key.get(), run_result.result_summary_key, False,
      None)
  self.assertEqual(False, ok)
  self.assertEqual(False, was_running)
  fresh_result = run_result.key.get()
  self.assertIsNone(fresh_result.killing)
  self.assertEqual(State.COMPLETED, fresh_result.state)
  self.assertEqual(2, len(pub_sub_calls))  # No other message.
  self.assertEqual(1, self.execute_tasks())
def test_cancel_task_running_setup(self):
  """Canceling a task the bot reaped but has not actually started yet.

  The bot acknowledges the kill signal and reports the task as canceled
  (canceled=True), so the final state is CANCELED rather than KILLED.
  """
  # Cancel an assigned task before it starts running.
  pub_sub_calls = self.mock_pub_sub()
  run_result = self._quick_reap(pubsub_topic='projects/abc/topics/def')
  self.assertEqual(1, len(pub_sub_calls))  # RUNNING
  # Request cancel
  ok, was_running = task_scheduler.cancel_task(run_result.request_key.get(),
                                               run_result.result_summary_key,
                                               True, None)
  self.assertEqual(True, ok)
  self.assertEqual(True, was_running)
  self.assertEqual(2, self.execute_tasks())
  self.assertEqual(2, len(pub_sub_calls))  # CANCELED
  # At this point, the task status is still running and the bot is unaware.
  run_result = run_result.key.get()
  self.assertEqual(State.RUNNING, run_result.state)
  self.assertEqual(True, run_result.killing)
  # Bot pulls once, gets the signal about killing, which starts the graceful
  # termination dance.
  self.assertEqual(State.KILLED, _bot_update_task(run_result.key))
  # At this point, the status is still running, until the bot cancels
  # the task.
  run_result = run_result.key.get()
  self.assertEqual(State.RUNNING, run_result.state)
  self.assertEqual(True, run_result.killing)
  # Close the task.
  self.assertEqual(
      State.CANCELED,
      _bot_update_task(
          run_result.key, exit_code=0, duration=0.1, canceled=True))
  run_result = run_result.key.get()
  self.assertEqual(False, run_result.killing)
  self.assertEqual(State.CANCELED, run_result.state)
  self.assertEqual(3, len(pub_sub_calls))  # CANCELED
  self.assertEqual(1, self.execute_tasks())
  status = State.to_string(State.CANCELED)
  self.assertLessEqual(
      0,
      ts_mon_metrics._task_state_change_pubsub_notify_latencies.get(
          fields=_update_fields_pubsub(status=status,
                                       http_status_code=200)).sum)
def test_cancel_task_rbe_mode(self):
  """Canceling a pending RBE task also enqueues a reservation cancel."""
  self.mock_enqueue_rbe_task()
  cancels = self.mock_enqueue_rbe_cancel()
  self._register_bot(self.bot_dimensions)
  result_summary = self._quick_schedule(rbe_instance='some-instance')
  ok, was_running = task_scheduler.cancel_task(
      result_summary.request_key.get(), result_summary.key, False, None)
  self.assertEqual(True, ok)
  self.assertEqual(False, was_running)
  self.assertEqual(State.CANCELED, result_summary.key.get().state)
  # Exactly one TQ task to cancel the RBE reservation was enqueued.
  self.assertEqual(1, len(cancels))
  req, ttr = cancels[0]
  self.assertEqual('some-instance', req.rbe_instance)
  self.assertEqual('sample-app-1d69b9f088008910-0', ttr.rbe_reservation)
def test_cancel_tasks(self):
  """cancel_tasks() cancels both pending and running matching tasks."""
  pub_sub_calls = self.mock_pub_sub()
  # A RUNNING task.
  self._quick_reap(pubsub_topic='projects/abc/topics/def')
  # A PENDING task.
  pending_properties = _gen_properties(
      dimensions={
          u'pool': [u'default'],
          u'os': [u'v1|v2'],
          u'gpu': [u'sega', u'amd|nv'],
      })
  self._quick_schedule(task_slices=[
      task_request.TaskSlice(
          expiration_secs=60,
          properties=pending_properties,
          wait_for_capacity=True),
  ])
  query = task_result.get_result_summaries_query(
      start=None,
      end=None,
      sort='created_ts',
      state='pending_running',
      tags=[])
  cursor, results = task_scheduler.cancel_tasks(3, query)
  self.assertIsNone(cursor)
  self.assertEqual(len(results), 2)
  self.execute_tasks()
def test_cancel_tasks_skip_running(self):
  """cancel_tasks() with a 'pending' query leaves the running task alone."""
  pub_sub_calls = self.mock_pub_sub()
  # A RUNNING task, which must not be touched.
  running_result = self._quick_reap(pubsub_topic='projects/abc/topics/def')
  # A PENDING task, the only cancellation candidate.
  pending_properties = _gen_properties(
      dimensions={
          u'pool': [u'default'],
          u'os': [u'v1|v2'],
          u'gpu': [u'sega', u'amd|nv'],
      })
  pending_result = self._quick_schedule(task_slices=[
      task_request.TaskSlice(
          expiration_secs=60,
          properties=pending_properties,
          wait_for_capacity=True),
  ])
  query = task_result.get_result_summaries_query(
      start=None, end=None, sort='created_ts', state='pending', tags=[])
  self.mock_now(self.now, seconds=1)
  cursor, results = task_scheduler.cancel_tasks(3, query)
  self.assertIsNone(cursor)
  self.assertEqual(len(results), 1)
  self.execute_tasks()
  # The pending task waited 1s (1000ms) before being canceled.
  self.assertEqual(
      1000.0,
      ts_mon_metrics._task_state_change_schedule_latencies.get(
          fields=_update_fields_schedule(
              status=State.to_string(State.CANCELED))).sum)
def test_cancel_tasks_conditions(self):
# Create PENDING tasks
task_slices = [
task_request.TaskSlice(
expiration_secs=60,
properties=_gen_properties(
dimensions={
u'pool': [u'default'],
u'os': [u'v1|v2'],
u'gpu': [u'sega', u'amd|nv'],
}),
wait_for_capacity=True),
]
pending_result_1 = self._quick_schedule(task_slices=task_slices,
manual_tags=['tag:1', 'tag:2'])
pending_result_2 = self._quick_schedule(task_slices=task_slices,
manual_tags=['tag:2', 'tag:1'])
self._quick_schedule(task_slices=task_slices,
manual_tags=['tag:3', 'tag:2'])
self._quick_schedule(task_slices=task_slices, manual_tags=['tag:4'])
pending_results = [pending_result_1, pending_result_2]
query <