blob: 0bec281474919b9700d0e8b2e1d168501f08c595 [file] [log] [blame]
#!/usr/bin/env vpython
# -*- coding: utf-8 -*-
# Copyright 2014 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
import datetime
import logging
import os
import random
import string
import sys
import unittest
import mock
from parameterized import parameterized
import test_env
test_env.setup_test_env()
from google.protobuf import duration_pb2
from google.appengine.api import datastore_errors
from google.appengine.api import memcache
from google.appengine.ext import ndb
import webtest
from components import auth_testing
from components import utils
from test_support import test_case
from proto.api import swarming_pb2 # pylint: disable=no-name-in-module
from server import large
from server import task_pack
from server import task_request
from server import task_result
from server import task_scheduler
from server import task_to_run
import ts_mon_metrics
# pylint: disable=W0212
def _gen_properties(**kwargs):
"""Creates a TaskProperties."""
args = {
'command': [u'command1'],
'containment': {
u'lower_priority': True,
u'containment_type': None,
u'limit_processes': None,
u'limit_total_committed_memory': None,
},
'dimensions': {
u'pool': [u'default']
},
'env': {},
'execution_timeout_secs': 24 * 60 * 60,
'io_timeout_secs': None,
}
args.update(kwargs or {})
args['dimensions_data'] = args.pop('dimensions')
return task_request.TaskProperties(**args)
def _gen_request_slice(**kwargs):
"""Creates a TaskRequest."""
now = utils.utcnow()
args = {
'created_ts': now,
'manual_tags': [u'tag:1'],
'name': 'Request name',
'priority': 50,
'task_slices': [
task_request.TaskSlice(
expiration_secs=60, properties=_gen_properties()),
],
'user': 'Jesus',
'bot_ping_tolerance_secs': 120,
}
args.update(kwargs)
ret = task_request.TaskRequest(**args)
task_request.init_new_request(ret, True, task_request.TEMPLATE_AUTO)
ret.key = task_request.new_request_key()
ret.put()
return ret
def _gen_request(properties=None, **kwargs):
"""Creates a TaskRequest."""
return _gen_request_slice(
task_slices=[
task_request.TaskSlice(
expiration_secs=60, properties=properties or _gen_properties()),
],
**kwargs)
def _gen_summary_result(**kwargs):
"""Creates a TaskRunResult."""
request = _gen_request(**kwargs)
result_summary = task_result.new_result_summary(request)
result_summary.modified_ts = utils.utcnow()
ndb.transaction(result_summary.put)
return result_summary.key.get()
def _gen_run_result(**kwargs):
"""Creates a TaskRunResult."""
result_summary = _gen_summary_result(**kwargs)
request = result_summary.request_key.get()
to_run = task_to_run.new_task_to_run(request, 0)
bot_details = task_scheduler.BotDetails('abc', 'test_project')
run_result = task_result.new_run_result(request, to_run, 'localhost',
bot_details, {},
result_summary.resultdb_info)
run_result.started_ts = result_summary.modified_ts
run_result.modified_ts = utils.utcnow()
run_result.dead_after_ts = utils.utcnow() + datetime.timedelta(
seconds=request.bot_ping_tolerance_secs)
ndb.transaction(lambda: result_summary.set_from_run_result(
run_result, request))
ndb.transaction(lambda: ndb.put_multi((result_summary, run_result)))
return run_result.key.get()
def _safe_cmp(a, b):
# cmp(datetime.datetime.utcnow(), None) throws TypeError. Workaround.
return cmp(utils.encode_to_json(a), utils.encode_to_json(b))
def _need_update_from_run_result(summary, run_result):
"""Returns True if set_from_run_result() would modify this instance.
E.g. they are different and TaskResultSummary needs to be updated from the
corresponding TaskRunResult.
"""
assert isinstance(run_result, task_result.TaskRunResult), run_result
# A previous try is still sending update. Ignore it from a result summary
# PoV.
if summary.try_number and summary.try_number > run_result.try_number:
return False
# Compare all fields defined through shared _TaskResultCommon.
for property_name in task_result._TaskResultCommon._properties_fixed():
if getattr(summary, property_name) != getattr(run_result, property_name):
return True
# Compare fields that are defined separately in TaskRunResult and
# TaskResultSummary. They have slightly different types.
return (
summary.bot_id != run_result.bot_id or
summary.state != run_result.state or
summary.try_number != run_result.try_number)
def get_entities(entity_model):
return sorted((i.to_dict() for i in entity_model.query().fetch()),
cmp=_safe_cmp)
class TestCase(test_case.TestCase):
APP_DIR = test_env.APP_DIR
def setUp(self):
super(TestCase, self).setUp()
auth_testing.mock_get_current_identity(self)
class TaskResultApiTest(TestCase):
def setUp(self):
super(TaskResultApiTest, self).setUp()
self.now = datetime.datetime(2014, 1, 2, 3, 4, 5, 6)
self.mock_now(self.now)
self.mock(random, 'getrandbits', lambda _: 0x88)
def assertEntities(self, expected, entity_model):
self.assertEqual(expected, get_entities(entity_model))
def _gen_summary(self, **kwargs):
"""Returns TaskResultSummary.to_dict()."""
out = {
'abandoned_ts':
None,
'bot_dimensions':
None,
'bot_id':
None,
'bot_idle_since_ts':
None,
'bot_version':
None,
'bot_logs_cloud_project':
None,
'cipd_pins':
None,
'completed_ts':
None,
'costs_usd': [],
'cost_saved_usd':
None,
'created_ts':
self.now,
'current_task_slice':
0,
'deduped_from':
None,
'duration':
None,
'exit_code':
None,
'expiration_delay':
None,
'failure':
False,
# Constant due to the mock of both utils.utcnow() and
# random.getrandbits().
'id':
'1d69b9f088008810',
'internal_failure':
False,
'missing_cas': [],
'missing_cipd': [],
'modified_ts':
None,
'name':
u'Request name',
'priority':
50,
'request_authenticated':
auth_testing.DEFAULT_MOCKED_IDENTITY,
'request_bot_id':
None,
'request_pool':
u'default',
'request_realm':
None,
'cas_output_root':
None,
'resultdb_info':
None,
'server_versions': [u'v1a'],
'started_ts':
None,
'state':
task_result.State.PENDING,
'tags': [
u'authenticated:user:mocked@example.com',
u'pool:default',
u'priority:50',
u'realm:none',
u'service_account:none',
u'swarming.pool.template:no_config',
u'tag:1',
u'user:Jesus',
],
'try_number':
None,
'user':
u'Jesus',
}
out.update(kwargs)
return out
def _gen_result(self, **kwargs):
"""Returns TaskRunResult.to_dict()."""
out = {
'abandoned_ts': None,
'bot_dimensions': {
u'id': [u'localhost'],
u'foo': [u'bar', u'biz']
},
'bot_id': u'localhost',
'bot_idle_since_ts': None,
'bot_version': u'abc',
'bot_logs_cloud_project': None,
'cas_output_root': None,
'cipd_pins': None,
'completed_ts': None,
'cost_usd': 0.,
'current_task_slice': 0,
'dead_after_ts': None,
'duration': None,
'exit_code': None,
'failure': False,
# Constant due to the mock of both utils.utcnow() and
# random.getrandbits().
'id': '1d69b9f088008811',
'internal_failure': False,
'killing': None,
'modified_ts': None,
'missing_cipd': [],
'missing_cas': [],
'resultdb_info': None,
'server_versions': [u'v1a'],
'started_ts': None,
'state': task_result.State.RUNNING,
'try_number': 1,
}
out.update(kwargs)
return out
def test_all_apis_are_tested(self):
# Ensures there's a test for each public API.
module = task_result
expected = set(
i for i in dir(module)
if i[0] != '_' and hasattr(getattr(module, i), 'func_name'))
missing = expected - set(i[5:] for i in dir(self) if i.startswith('test_'))
self.assertFalse(missing)
def test_State(self):
for i in task_result.State.STATES:
self.assertTrue(task_result.State.to_string(i))
with self.assertRaises(ValueError):
task_result.State.to_string(0)
self.assertEqual(
set(task_result.State._NAMES), set(task_result.State.STATES))
items = (
task_result.State.STATES_RUNNING + task_result.State.STATES_DONE +
task_result.State.STATES_ABANDONED)
self.assertEqual(set(items), set(task_result.State.STATES))
self.assertEqual(len(items), len(set(items)))
def test_filter_query(self):
# This only tests that the function doesn't raise exception for typical
# args.
now = self.now
self.assertIsNotNone(
task_result.filter_query(task_result.TaskResultSummary,
task_result.TaskResultSummary.query(), now,
now, 'created_ts', 'all'))
self.assertIsNotNone(
task_result.filter_query(task_result.TaskResultSummary,
task_result.TaskResultSummary.query(), None,
None, 'started_ts', 'pending_running'))
def test_state_to_string(self):
# Same code as State.to_string() except that it works for
# TaskResultSummary too.
class Foo(ndb.Model):
deduped_from = None
state = task_result.StateProperty()
failure = ndb.BooleanProperty(default=False)
internal_failure = ndb.BooleanProperty(default=False)
for i in task_result.State.STATES:
self.assertTrue(task_result.State.to_string(i))
for i in task_result.State.STATES:
self.assertTrue(task_result.state_to_string(Foo(state=i)))
f = Foo(state=task_result.State.COMPLETED)
f.deduped_from = '123'
self.assertEqual('Deduped', task_result.state_to_string(f))
def test_new_result_summary(self):
request = _gen_request()
actual = task_result.new_result_summary(request)
actual.modified_ts = self.now
# Trigger _pre_put_hook().
actual.put()
expected = self._gen_summary(modified_ts=self.now)
self.assertEqual(expected, actual.to_dict())
self.assertEqual(50, actual.request.priority)
self.assertEqual(True, actual.can_be_canceled)
actual.state = task_result.State.RUNNING
self.assertEqual(True, actual.can_be_canceled)
actual.state = task_result.State.TIMED_OUT
actual.duration = 0.1
actual.completed_ts = self.now
self.assertEqual(False, actual.can_be_canceled)
def test_new_run_result(self):
request = _gen_request()
to_run = task_to_run.new_task_to_run(request, 0)
bot_details = task_scheduler.BotDetails(u'abc', None)
actual = task_result.new_run_result(
request, to_run, u'localhost', bot_details, {
u'id': [u'localhost'],
u'foo': [u'bar', u'biz']
},
task_result.ResultDBInfo(hostname='hostname', invocation='invocation'))
actual.modified_ts = self.now
actual.started_ts = self.now
actual.dead_after_ts = self.now + datetime.timedelta(
seconds=request.bot_ping_tolerance_secs)
# Trigger _pre_put_hook().
actual.put()
expected = self._gen_result(
modified_ts=self.now,
started_ts=self.now,
dead_after_ts=self.now +
datetime.timedelta(seconds=request.bot_ping_tolerance_secs),
resultdb_info={
'hostname': 'hostname',
'invocation': 'invocation'
})
self.assertEqual(expected, actual.to_dict())
self.assertEqual(50, actual.request.priority)
self.assertEqual(True, actual.can_be_canceled)
self.assertEqual(0, actual.current_task_slice)
def test_result_summary_post_hook_sends_metric_at_completion(self):
request = _gen_request()
summary = task_result.new_result_summary(request)
summary.modified_ts = self.now
# on_task_completed should not be called when state is pending.
self.mock(ts_mon_metrics, 'on_task_completed', self.fail)
summary.put()
# change state to completed
summary.completed_ts = self.now
summary.modified_ts = self.now
summary.started_ts = self.now
summary.duration = 1.
summary.exit_code = 0
summary.state = task_result.State.COMPLETED
# on_task_completed should be called when state got updated to
# completed.
calls = []
def on_task_completed(smry):
calls.append(smry)
self.mock(ts_mon_metrics, 'on_task_completed', on_task_completed)
summary.put()
self.assertEqual(len(calls), 1)
def test_result_summary_post_hook_call_finalize_invocation(self):
request = _gen_request(resultdb_update_token='secret')
summary = task_result.new_result_summary(request)
summary.modified_ts = self.now
# Store current state for summary._prev_state
summary.put()
# change state to completed
summary.completed_ts = self.now
summary.modified_ts = self.now
summary.started_ts = self.now
summary.duration = 1.
summary.exit_code = 0
summary.state = task_result.State.COMPLETED
summary.try_number = 1
def on_task_completed(_):
pass
self.mock(ts_mon_metrics, 'on_task_completed', on_task_completed)
@ndb.tasklet
def nop_async(_run_id, _update_token):
pass
with mock.patch('server.resultdb.finalize_invocation_async',
mock.Mock(side_effect=nop_async)) as mocked:
summary.put()
mocked.assert_called_once_with('1d69b9f088008811', 'secret')
def test_result_summary_post_hook_sends_metric_at_no_resource_failure(self):
request = _gen_request()
summary = task_result.new_result_summary(request)
summary.completed_ts = self.now
summary.modified_ts = self.now
summary.state = task_result.State.NO_RESOURCE
# on_task_completed should be called even at the initial write if it's the
# end with no resource error.
calls = []
def on_task_completed(smry):
calls.append(smry)
self.mock(ts_mon_metrics, 'on_task_completed', on_task_completed)
summary.put()
self.assertEqual(len(calls), 1)
def test_new_run_result_duration_no_exit_code(self):
request = _gen_request()
to_run = task_to_run.new_task_to_run(request, 0)
bot_details = task_scheduler.BotDetails(u'abc', None)
actual = task_result.new_run_result(request, to_run, u'localhost',
bot_details, {
u'id': [u'localhost'],
u'foo': [u'bar', u'biz']
}, None)
actual.completed_ts = self.now
actual.modified_ts = self.now
actual.started_ts = self.now
actual.duration = 1.
actual.state = task_result.State.COMPLETED
# Trigger _pre_put_hook().
with self.assertRaises(datastore_errors.BadValueError):
actual.put()
actual.state = task_result.State.TIMED_OUT
actual.put()
expected = self._gen_result(
completed_ts=self.now,
duration=1.,
modified_ts=self.now,
failure=True,
started_ts=self.now,
state=task_result.State.TIMED_OUT)
self.assertEqual(expected, actual.to_dict())
def test_integration(self):
# Creates a TaskRequest, along its TaskResultSummary and TaskToRunShard.
# Have a bot reap the task, and complete the task. Ensure the resulting
# TaskResultSummary and TaskRunResult are properly updated.
#
# Force tedious chunking.
self.mock(task_result.TaskOutput, 'CHUNK_SIZE', 2)
request = _gen_request()
result_summary = task_result.new_result_summary(request)
to_run = task_to_run.new_task_to_run(request, 0)
result_summary.modified_ts = utils.utcnow()
ndb.transaction(lambda: ndb.put_multi([result_summary, to_run]))
expected = self._gen_summary(modified_ts=self.now)
self.assertEqual(expected, result_summary.to_dict())
# Nothing changed 2 secs later except latency.
self.mock_now(self.now, 2)
self.assertEqual(expected, result_summary.to_dict())
# Task is reaped after 2 seconds (4 secs total).
reap_ts = self.now + datetime.timedelta(seconds=4)
self.mock_now(reap_ts)
to_run.consume(None)
to_run.put()
bot_details = task_scheduler.BotDetails(u'abc', 'test_project')
run_result = task_result.new_run_result(request, to_run, u'localhost',
bot_details, {},
result_summary.resultdb_info)
run_result.started_ts = utils.utcnow()
run_result.modified_ts = run_result.started_ts
run_result.dead_after_ts = utils.utcnow() + datetime.timedelta(
seconds=request.bot_ping_tolerance_secs)
ndb.transaction(lambda: result_summary.set_from_run_result(
run_result, request))
ndb.transaction(lambda: ndb.put_multi((result_summary, run_result)))
expected = self._gen_summary(
bot_dimensions={},
bot_version=u'abc',
bot_id=u'localhost',
bot_logs_cloud_project=u'test_project',
costs_usd=[0.],
modified_ts=reap_ts,
state=task_result.State.RUNNING,
started_ts=reap_ts,
try_number=1)
self.assertEqual(expected, result_summary.key.get().to_dict())
# Task completed after 2 seconds (6 secs total), the task has been running
# for 2 seconds.
complete_ts = self.now + datetime.timedelta(seconds=6)
self.mock_now(complete_ts)
run_result.completed_ts = complete_ts
run_result.duration = 0.1
run_result.exit_code = 0
run_result.state = task_result.State.COMPLETED
run_result.modified_ts = utils.utcnow()
run_result.dead_after_ts = None
task_result.PerformanceStats(
key=task_pack.run_result_key_to_performance_stats_key(run_result.key),
bot_overhead=0.1,
cache_trim=task_result.OperationStats(duration=0.01),
package_installation=task_result.OperationStats(duration=0.01),
named_caches_install=task_result.OperationStats(duration=0.01),
named_caches_uninstall=task_result.OperationStats(duration=0.01),
isolated_download=task_result.CASOperationStats(
duration=0.05,
initial_number_items=10,
initial_size=10000,
items_cold=large.pack([1, 2]),
items_hot=large.pack([3, 4, 5])),
isolated_upload=task_result.CASOperationStats(
duration=0.01, items_cold=large.pack([10])),
cleanup=task_result.OperationStats(duration=0.01)).put()
ndb.transaction(lambda: ndb.put_multi(run_result.append_output('foo', 0)))
ndb.transaction(lambda: result_summary.set_from_run_result(
run_result, request))
ndb.transaction(lambda: ndb.put_multi((result_summary, run_result)))
expected = self._gen_summary(
bot_dimensions={},
bot_version=u'abc',
bot_id=u'localhost',
bot_logs_cloud_project=u'test_project',
completed_ts=complete_ts,
costs_usd=[0.],
duration=0.1,
exit_code=0,
modified_ts=complete_ts,
state=task_result.State.COMPLETED,
started_ts=reap_ts,
try_number=1)
self.assertEqual(expected, result_summary.key.get().to_dict())
expected = {
'bot_overhead': 0.1,
'cache_trim': {
'duration': 0.01,
},
'named_caches_install': {
'duration': 0.01,
},
'named_caches_uninstall': {
'duration': 0.01,
},
'isolated_download': {
'duration': 0.05,
'initial_number_items': 10,
'initial_size': 10000,
'items_cold': large.pack([1, 2]),
'items_hot': large.pack([3, 4, 5]),
'num_items_cold': 2,
'total_bytes_items_cold': 3,
'num_items_hot': 3,
'total_bytes_items_hot': 12,
},
'isolated_upload': {
'duration': 0.01,
'initial_number_items': None,
'initial_size': None,
'items_cold': large.pack([10]),
'items_hot': None,
'num_items_cold': 1,
'total_bytes_items_cold': 10,
'num_items_hot': None,
'total_bytes_items_hot': None,
},
'package_installation': {
'duration': 0.01,
},
'cleanup': {
'duration': 0.01,
},
}
self.assertEqual(expected, result_summary.performance_stats.to_dict())
self.assertEqual('foo', result_summary.get_output(0, 0))
self.assertEqual(
datetime.timedelta(seconds=2),
result_summary.duration_as_seen_by_server)
self.assertEqual(
datetime.timedelta(seconds=0.1),
result_summary.duration_now(utils.utcnow()))
self.assertEqual(datetime.timedelta(seconds=4), result_summary.pending)
self.assertEqual(
datetime.timedelta(seconds=4),
result_summary.pending_now(utils.utcnow()))
self.assertEqual(
task_pack.pack_result_summary_key(result_summary.key),
result_summary.task_id)
self.assertEqual(complete_ts, result_summary.ended_ts)
self.assertEqual(
task_pack.pack_run_result_key(run_result.key), run_result.task_id)
self.assertEqual(complete_ts, run_result.ended_ts)
def test_yield_result_summary_by_parent_task_id(self):
# prepare parent task
parent_run_result = _gen_run_result()
parent_run_result_id = parent_run_result.task_id
parent_summary = parent_run_result.key.parent().get()
# create child task result summaries
self.mock_now(self.now, 1)
child_summary_1 = _gen_summary_result(parent_task_id=parent_run_result_id)
self.mock_now(self.now, 2)
child_summary_2 = _gen_summary_result(parent_task_id=parent_run_result_id)
# should find the children by parent_task_id
result_summary_iter = task_result.yield_result_summary_by_parent_task_id(
parent_summary.task_id)
expected = [child_summary_1.key, child_summary_2.key]
self.assertEqual(sorted(expected),
sorted(s.key for s in result_summary_iter))
def test_set_from_run_result(self):
request = _gen_request()
result_summary = task_result.new_result_summary(request)
to_run = task_to_run.new_task_to_run(request, 0)
bot_details = task_scheduler.BotDetails('abc', 'test_project')
run_result = task_result.new_run_result(request, to_run, 'localhost',
bot_details, {},
result_summary.resultdb_info)
run_result.started_ts = utils.utcnow()
self.assertTrue(_need_update_from_run_result(result_summary, run_result))
result_summary.modified_ts = utils.utcnow()
run_result.modified_ts = utils.utcnow()
run_result.dead_after_ts = utils.utcnow() + datetime.timedelta(
seconds=request.bot_ping_tolerance_secs)
ndb.transaction(lambda: ndb.put_multi((result_summary, run_result)))
self.assertTrue(_need_update_from_run_result(result_summary, run_result))
ndb.transaction(lambda: result_summary.set_from_run_result(
run_result, request))
ndb.transaction(lambda: ndb.put_multi([result_summary]))
self.assertFalse(_need_update_from_run_result(result_summary, run_result))
def test_set_from_run_result_two_server_versions(self):
request = _gen_request()
result_summary = task_result.new_result_summary(request)
to_run = task_to_run.new_task_to_run(request, 0)
bot_details = task_scheduler.BotDetails('abc', 'test_project')
run_result = task_result.new_run_result(request, to_run, 'localhost',
bot_details, {},
result_summary.resultdb_info)
run_result.started_ts = utils.utcnow()
self.assertTrue(_need_update_from_run_result(result_summary, run_result))
result_summary.modified_ts = utils.utcnow()
run_result.modified_ts = utils.utcnow()
run_result.dead_after_ts = utils.utcnow() + datetime.timedelta(
seconds=request.bot_ping_tolerance_secs)
ndb.transaction(lambda: ndb.put_multi((result_summary, run_result)))
self.assertTrue(_need_update_from_run_result(result_summary, run_result))
ndb.transaction(lambda: result_summary.set_from_run_result(
run_result, request))
ndb.transaction(lambda: ndb.put_multi([result_summary]))
self.mock(utils, 'get_app_version', lambda: 'new-version')
run_result.signal_server_version()
run_result.modified_ts = utils.utcnow()
run_result.dead_after_ts = utils.utcnow() + datetime.timedelta(
seconds=request.bot_ping_tolerance_secs)
ndb.transaction(lambda: result_summary.set_from_run_result(
run_result, request))
ndb.transaction(lambda: ndb.put_multi((result_summary, run_result)))
self.assertEqual(['v1a', 'new-version'],
run_result.key.get().server_versions)
self.assertEqual(['v1a', 'new-version'],
result_summary.key.get().server_versions)
def test_run_result_duration(self):
run_result = task_result.TaskRunResult(
started_ts=datetime.datetime(2010, 1, 1, 0, 0, 0),
completed_ts=datetime.datetime(2010, 1, 1, 0, 2, 0))
self.assertEqual(
datetime.timedelta(seconds=120), run_result.duration_as_seen_by_server)
self.assertEqual(
datetime.timedelta(seconds=120),
run_result.duration_now(utils.utcnow()))
run_result = task_result.TaskRunResult(
started_ts=datetime.datetime(2010, 1, 1, 0, 0, 0),
abandoned_ts=datetime.datetime(2010, 1, 1, 0, 1, 0))
self.assertEqual(None, run_result.duration_as_seen_by_server)
self.assertEqual(None, run_result.duration_now(utils.utcnow()))
def test_run_result_timeout(self):
request = _gen_request()
result_summary = task_result.new_result_summary(request)
result_summary.modified_ts = utils.utcnow()
ndb.transaction(result_summary.put)
to_run = task_to_run.new_task_to_run(request, 0)
bot_details = task_scheduler.BotDetails('abc', 'test_project')
run_result = task_result.new_run_result(request, to_run, 'localhost',
bot_details, {},
result_summary.resultdb_info)
run_result.state = task_result.State.TIMED_OUT
run_result.duration = 0.1
run_result.exit_code = -1
run_result.started_ts = utils.utcnow()
run_result.completed_ts = run_result.started_ts
run_result.modified_ts = run_result.started_ts
ndb.transaction(lambda: result_summary.set_from_run_result(
run_result, request))
ndb.transaction(lambda: ndb.put_multi((run_result, result_summary)))
run_result = run_result.key.get()
result_summary = result_summary.key.get()
self.assertEqual(True, run_result.failure)
self.assertEqual(True, result_summary.failure)
def test_result_task_state(self):
def check(expected, **kwargs):
self.assertEqual(expected,
task_result.TaskResultSummary(**kwargs).task_state)
# That's an incorrect state:
check(swarming_pb2.TASK_STATE_INVALID, state=task_result.State.BOT_DIED)
check(swarming_pb2.PENDING, state=task_result.State.PENDING)
# https://crbug.com/915342: PENDING_DEDUPING
check(swarming_pb2.RUNNING, state=task_result.State.RUNNING)
# https://crbug.com/796757: RUNNING_OVERHEAD_SETUP
# https://crbug.com/813412: RUNNING_OVERHEAD_TEARDOWN
# https://crbug.com/916560: TERMINATING
check(
swarming_pb2.RAN_INTERNAL_FAILURE,
internal_failure=True,
state=task_result.State.BOT_DIED)
# https://crbug.com/902807: DUT_FAILURE
# https://crbug.com/916553: BOT_DISAPPEARED
# https://crbug.com/916559: PREEMPTED
check(swarming_pb2.COMPLETED, state=task_result.State.COMPLETED)
check(swarming_pb2.TIMED_OUT, state=task_result.State.TIMED_OUT)
# https://crbug.com/916556: TIMED_OUT_SILENCE
check(swarming_pb2.KILLED, state=task_result.State.KILLED)
# https://crbug.com/916553: MISSING_INPUTS
check(
swarming_pb2.DEDUPED,
state=task_result.State.COMPLETED,
deduped_from=u'123')
check(swarming_pb2.EXPIRED, state=task_result.State.EXPIRED)
check(swarming_pb2.CANCELED, state=task_result.State.CANCELED)
check(swarming_pb2.NO_RESOURCE, state=task_result.State.NO_RESOURCE)
# https://crbug.com/916562: LOAD_SHED
# https://crbug.com/916557: RESOURCE_EXHAUSTED
def test_TaskRunResult_to_proto(self):
cipd_client_pkg = task_request.CipdPackage(
package_name=u'infra/tools/cipd/${platform}',
version=u'git_revision:deadbeef')
# Grand parent entity must have a valid key id and be stored.
# This task uses user:Jesus, which will be inherited automatically.
grand_parent = _gen_request()
grand_parent.key = task_request.new_request_key()
grand_parent.put()
# Parent entity must have a valid key id and be stored.
# Create them 1 second apart to differentiate create_time.
self.mock_now(self.now, 1)
grand_parent_run_id = grand_parent.task_id[:-1] + u'1'
parent = _gen_request(parent_task_id=grand_parent_run_id)
parent.key = task_request.new_request_key()
parent.put()
self.mock_now(self.now, 2)
run_result = _gen_run_result(
parent_task_id=parent.task_id[:-1] + u'1',
properties=_gen_properties(
cipd_input={
u'client_package': cipd_client_pkg,
u'packages': [
task_request.CipdPackage(
package_name=u'rm', path=u'bin', version=u'latest'),
],
u'server': u'http://localhost:2'
},
containment=task_request.Containment(lower_priority=True),
),
)
run_result.bot_idle_since_ts = self.now + datetime.timedelta(seconds=10)
run_result.started_ts = self.now + datetime.timedelta(seconds=20)
run_result.abandoned_ts = self.now + datetime.timedelta(seconds=30)
run_result.completed_ts = self.now + datetime.timedelta(seconds=40)
run_result.modified_ts = self.now + datetime.timedelta(seconds=50)
run_result.duration = 1.
run_result.current_task_slice = 2
run_result.exit_code = 1
run_result.cas_output_root = task_request.CASReference(
cas_instance=u'projects/test/instances/default',
digest={
'hash': u'12345',
'size_bytes': 1,
})
run_result.cipd_pins = task_result.CipdPins(
client_package=cipd_client_pkg,
packages=[
task_request.CipdPackage(
package_name=u'rm', path=u'bin', version=u'stable'),
])
task_result.PerformanceStats(
key=task_pack.run_result_key_to_performance_stats_key(run_result.key),
bot_overhead=0.1,
cache_trim=task_result.OperationStats(duration=0.001),
package_installation=task_result.OperationStats(duration=0.002),
named_caches_install=task_result.OperationStats(duration=0.003),
named_caches_uninstall=task_result.OperationStats(duration=0.004),
isolated_download=task_result.CASOperationStats(
duration=0.05,
initial_number_items=10,
initial_size=10000,
items_cold=large.pack([1, 2]),
items_hot=large.pack([3, 4, 5])),
isolated_upload=task_result.CASOperationStats(
duration=0.01, items_cold=large.pack([10])),
cleanup=task_result.OperationStats(duration=0.01)).put()
# Note: It cannot be both TIMED_OUT and have run_result.deduped_from set.
run_result.state = task_result.State.TIMED_OUT
run_result.bot_dimensions = {u'id': [u'bot1'], u'pool': [u'default']}
run_result.dead_after_ts = None
run_result.put()
props_h = '09057c5f724649e6f5566ecc1594e73515c41d507aa68dd79b5466394edbda3c'
expected = swarming_pb2.TaskResult(
request=swarming_pb2.TaskRequest(
task_slices=[
swarming_pb2.TaskSlice(
properties=swarming_pb2.TaskProperties(
cipd_inputs=[
swarming_pb2.CIPDPackage(
package_name=u'rm',
version=u'latest',
dest_path=u'bin',
),
],
containment=swarming_pb2.Containment(
lower_priority=True),
command=[u'command1'],
dimensions=[
swarming_pb2.StringListPair(
key=u'pool', values=[u'default']),
],
execution_timeout=duration_pb2.Duration(seconds=86400),
grace_period=duration_pb2.Duration(seconds=30),
),
expiration=duration_pb2.Duration(seconds=60),
properties_hash=props_h,
),
],
priority=50,
service_account=u'none',
name=u'Request name',
authenticated=u"user:mocked@example.com",
tags=[
u"authenticated:user:mocked@example.com",
u'parent_task_id:1d69b9f470008811',
u'pool:default',
u'priority:50',
u'realm:none',
u'service_account:none',
u'swarming.pool.template:no_config',
u'tag:1',
u'user:Jesus',
],
user=u'Jesus',
task_id=u'1d69b9f858008810',
parent_task_id='1d69b9f470008810',
parent_run_id='1d69b9f470008811',
),
duration=duration_pb2.Duration(seconds=1),
state=swarming_pb2.TIMED_OUT,
state_category=swarming_pb2.CATEGORY_EXECUTION_DONE,
try_number=1,
current_task_slice=2,
bot=swarming_pb2.Bot(
bot_id=u'bot1',
pools=[u'default'],
dimensions=[
swarming_pb2.StringListPair(key=u'id', values=[u'bot1']),
swarming_pb2.StringListPair(key=u'pool', values=[u'default']),
],
info=swarming_pb2.BotInfo(),
),
server_versions=[u'v1a'],
task_id=u'1d69b9f858008810',
run_id=u'1d69b9f858008811',
cipd_pins=swarming_pb2.CIPDPins(
server=u'http://localhost:2',
client_package=swarming_pb2.CIPDPackage(
package_name=u'infra/tools/cipd/${platform}',
version=u'git_revision:deadbeef',
),
packages=[
swarming_pb2.CIPDPackage(
package_name=u'rm',
version=u'stable',
dest_path=u'bin',
),
],
),
performance=swarming_pb2.TaskPerformance(
total_overhead=duration_pb2.Duration(nanos=100000000),
other_overhead=duration_pb2.Duration(nanos=20000000),
setup=swarming_pb2.TaskOverheadStats(
duration=duration_pb2.Duration(nanos=56000000),
cold=swarming_pb2.CASEntriesStats(
num_items=2,
total_bytes_items=3,
),
hot=swarming_pb2.CASEntriesStats(
num_items=3,
total_bytes_items=12,
),
),
setup_overhead=swarming_pb2.TaskSetupOverhead(
duration=duration_pb2.Duration(nanos=56000000),
cache_trim=swarming_pb2.CacheTrimOverhead(
duration=duration_pb2.Duration(nanos=1000000)),
cipd=swarming_pb2.CIPDOverhead(
duration=duration_pb2.Duration(nanos=2000000)),
named_cache=swarming_pb2.NamedCacheOverhead(
duration=duration_pb2.Duration(nanos=3000000)),
cas=swarming_pb2.CASOverhead(
duration=duration_pb2.Duration(nanos=50000000),
cold=swarming_pb2.CASEntriesStats(
num_items=2,
total_bytes_items=3,
),
hot=swarming_pb2.CASEntriesStats(
num_items=3,
total_bytes_items=12,
),
),
),
teardown=swarming_pb2.TaskOverheadStats(
duration=duration_pb2.Duration(nanos=24000000),
cold=swarming_pb2.CASEntriesStats(
num_items=1,
total_bytes_items=10,
),
),
teardown_overhead=swarming_pb2.TaskTeardownOverhead(
duration=duration_pb2.Duration(nanos=24000000),
cas=swarming_pb2.CASOverhead(
duration=duration_pb2.Duration(nanos=10000000),
cold=swarming_pb2.CASEntriesStats(
num_items=1,
total_bytes_items=10,
)),
named_cache=swarming_pb2.NamedCacheOverhead(
duration=duration_pb2.Duration(nanos=4000000)),
cleanup=swarming_pb2.CleanupOverhead(
duration=duration_pb2.Duration(nanos=10000000)),
),
),
exit_code=1,
cas_output_root=swarming_pb2.CASReference(
cas_instance=u'projects/test/instances/default',
digest=swarming_pb2.Digest(hash='12345', size_bytes=1),
),
)
expected.request.create_time.FromDatetime(self.now +
datetime.timedelta(seconds=2))
expected.create_time.FromDatetime(self.now + datetime.timedelta(seconds=2))
expected.bot.info.idle_since_ts.FromDatetime(self.now +
datetime.timedelta(seconds=10))
expected.start_time.FromDatetime(self.now + datetime.timedelta(seconds=20))
expected.abandon_time.FromDatetime(self.now +
datetime.timedelta(seconds=30))
expected.end_time.FromDatetime(self.now + datetime.timedelta(seconds=40))
actual = swarming_pb2.TaskResult()
run_result.to_proto(actual)
self.assertEqual(unicode(expected), unicode(actual))
# Make sure the root task id is the grand parent.
self.assertEqual(u'1d69b9f088008810', grand_parent.task_id)
self.assertEqual(u'1d69b9f088008811', grand_parent_run_id)
# Confirming that the parent and grand parent have different task ID.
self.assertEqual(u'1d69b9f470008810', parent.task_id)
self.assertEqual(expected.request.parent_task_id, parent.task_id)
actual = swarming_pb2.TaskResult()
expected.request.root_task_id = grand_parent.task_id
expected.request.root_run_id = grand_parent_run_id
run_result.to_proto(actual, append_root_ids=True)
self.assertEqual(unicode(expected), unicode(actual))
@parameterized.expand([
(2**31 - 1,),
(-2**31,),
(3221225786,), # 0xc000013a, STATUS_CONTROL_C_EXIT on Windows
(2**31,), # 0x80000000
(2**32 - 1,), # 0xffffffff
(2**32,), # 33bit
(-2**31 - 1,), # 33bit
])
def test_TaskRunResult_to_proto_exitcode(self, exit_code):
actual = swarming_pb2.TaskResult()
req = task_request.TaskRequest(id=1230)
res = task_result.TaskResultSummary(parent=req.key)
res._request_cache = req
res.exit_code = exit_code
res.to_proto(actual)
self.assertEqual(actual.exit_code, exit_code)
def test_TaskResultSummary_to_proto_empty(self):
# Assert that it doesn't throw on empty entity.
actual = swarming_pb2.TaskResult()
# It's unreasonable to expect the entity key to be unset, which complicates
# this test a bit.
req = task_request.TaskRequest(id=1230)
res = task_result.TaskResultSummary(parent=req.key)
res._request_cache = req
res.to_proto(actual)
expected = swarming_pb2.TaskResult(
request=swarming_pb2.TaskRequest(task_id='7ffffffffffffb310'),
state=swarming_pb2.PENDING,
state_category=swarming_pb2.CATEGORY_PENDING,
task_id='7ffffffffffffb310')
self.assertEqual(expected, actual)
def test_TaskRunResult_to_proto_empty(self):
# Assert that it doesn't throw on empty entity.
actual = swarming_pb2.TaskResult()
# It's unreasonable to expect the entity key to be unset, which complicates
# this test a bit.
req = task_request.TaskRequest(id=1230)
res_sum = task_result.TaskResultSummary(parent=req.key, id=1)
res_sum._request_cache = req
res = task_result.TaskRunResult(parent=res_sum.key, id=1)
res._request_cache = req
res.to_proto(actual)
expected = swarming_pb2.TaskResult(
request=swarming_pb2.TaskRequest(task_id='7ffffffffffffb310'),
state=swarming_pb2.RUNNING,
state_category=swarming_pb2.CATEGORY_RUNNING,
try_number=1,
task_id='7ffffffffffffb310',
run_id='7ffffffffffffb311')
self.assertEqual(expected, actual)
def test_performance_stats_pre_put_hook(self):
with self.assertRaises(datastore_errors.BadValueError):
task_result.PerformanceStats().put()
def test_get_result_summaries_query(self):
# Indirectly tested by API.
pass
def test_get_run_results_query(self):
# Indirectly tested by API.
pass
def test_fetch_task_results(self):
running_res = _gen_run_result() # RUNNING
pending_res = _gen_summary_result() # PENDING
ndb.get_context().clear_cache()
memcache.flush_all()
running_res_cache = _gen_run_result() # RUNNING in cache
pending_res_cache = _gen_summary_result() # PENDING in cache
results = task_result.fetch_task_results([
running_res.task_id, pending_res.task_id, running_res_cache.task_id,
pending_res_cache.task_id, '1d69b9f188008811'
])
self.assertEqual(
results,
[running_res, pending_res, running_res_cache, pending_res_cache, None])
def test_fetch_task_result_summaries(self):
# Tested by test_fetch_task_results already.
pass
class TestOutput(TestCase):
def assertTaskOutputChunk(self, expected):
q = task_result.TaskOutputChunk.query().order(
task_result.TaskOutputChunk.key)
self.assertEqual(expected, [t.to_dict() for t in q.fetch()])
def test_append_output(self):
# Force tedious chunking.
self.mock(task_result.TaskOutput, 'CHUNK_SIZE', 2)
run_result = _gen_run_result()
# Test that one can stream output and it is returned fine.
def run(*args):
ndb.put_multi(run_result.append_output(*args))
run('Part1\n', 0)
run('Part2\n', len('Part1\n'))
run('Part3\n', len('Part1P\n'))
self.assertEqual('Part1\nPPart3\n', run_result.get_output(0, 0))
def test_append_output_max_chunk(self):
# Ensures that data is dropped.
# Force tedious chunking.
self.mock(task_result.TaskOutput, 'CHUNK_SIZE', 2)
self.mock(task_result.TaskOutput, 'PUT_MAX_CHUNKS', 16)
self.assertEqual(2 * 16, task_result.TaskOutput.PUT_MAX_CONTENT())
run_result = _gen_run_result()
calls = []
self.mock(logging, 'warning', lambda *args: calls.append(args))
max_chunk = 'x' * task_result.TaskOutput.PUT_MAX_CONTENT()
entities = run_result.append_output(max_chunk, 0)
self.assertEqual(task_result.TaskOutput.PUT_MAX_CHUNKS, len(entities))
ndb.put_multi(entities)
self.assertEqual([], calls)
# Try with PUT_MAX_CONTENT + 1 bytes, so the last byte is discarded.
entities = run_result.append_output(max_chunk + 'x', 0)
self.assertEqual(task_result.TaskOutput.PUT_MAX_CHUNKS, len(entities))
ndb.put_multi(entities)
self.assertEqual(1, len(calls))
self.assertTrue(calls[0][0].startswith('Dropping '), calls[0][0])
self.assertEqual(1, calls[0][1])
def test_append_output_partial(self):
run_result = _gen_run_result()
ndb.put_multi(run_result.append_output('Foo', 10))
expected_output = '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00Foo'
self.assertEqual(expected_output, run_result.get_output(0, 0))
self.assertTaskOutputChunk([{'chunk': expected_output, 'gaps': [0, 10]}])
def test_append_output_partial_hole(self):
run_result = _gen_run_result()
ndb.put_multi(run_result.append_output('Foo', 0))
ndb.put_multi(run_result.append_output('Bar', 10))
expected_output = 'Foo\x00\x00\x00\x00\x00\x00\x00Bar'
self.assertEqual(expected_output, run_result.get_output(0, 0))
self.assertTaskOutputChunk([{'chunk': expected_output, 'gaps': [3, 10]}])
def test_append_output_partial_far(self):
run_result = _gen_run_result()
ndb.put_multi(
run_result.append_output('Foo', 10 + task_result.TaskOutput.CHUNK_SIZE))
expected_output = '\x00' * (task_result.TaskOutput.CHUNK_SIZE + 10) + 'Foo'
self.assertEqual(expected_output, run_result.get_output(0, 0))
expected = [
{
'chunk': '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00Foo',
'gaps': [0, 10]
},
]
self.assertTaskOutputChunk(expected)
def test_append_output_partial_far_split(self):
# Missing, writing happens on two different TaskOutputChunk entities.
self.mock(task_result.TaskOutput, 'CHUNK_SIZE', 16)
run_result = _gen_run_result()
ndb.put_multi(
run_result.append_output('FooBar',
2 * task_result.TaskOutput.CHUNK_SIZE - 3))
expected_output = ('\x00' * (task_result.TaskOutput.CHUNK_SIZE * 2 - 3) +
'FooBar')
self.assertEqual(expected_output, run_result.get_output(0, 0))
expected = [
{
'chunk': '\x00' * (task_result.TaskOutput.CHUNK_SIZE - 3) + 'Foo',
'gaps': [0, 13],
},
{
'chunk': 'Bar',
'gaps': []
},
]
self.assertTaskOutputChunk(expected)
def test_append_output_overwrite(self):
# Overwrite previously written data.
# Force tedious chunking.
self.mock(task_result.TaskOutput, 'CHUNK_SIZE', 2)
run_result = _gen_run_result()
ndb.put_multi(run_result.append_output('FooBar', 0))
ndb.put_multi(run_result.append_output('X', 3))
self.assertEqual('FooXar', run_result.get_output(0, 0))
self.assertTaskOutputChunk([
{
'chunk': 'Fo',
'gaps': []
},
{
'chunk': 'oX',
'gaps': []
},
{
'chunk': 'ar',
'gaps': []
},
])
def test_append_output_reverse_order(self):
# Write the data in reverse order in multiple calls.
# TODO(maruel): This isn't working perfectly, for example this fails if
# CHUNK_SIZE is mocked to 8.
run_result = _gen_run_result()
ndb.put_multi(run_result.append_output('Wow', 11))
ndb.put_multi(run_result.append_output('Foo', 8))
ndb.put_multi(run_result.append_output('Baz', 0))
ndb.put_multi(run_result.append_output('Bar', 4))
expected_output = 'Baz\x00Bar\x00FooWow'
self.assertEqual(expected_output, run_result.get_output(0, 0))
self.assertTaskOutputChunk([{
'chunk': expected_output,
'gaps': [3, 4, 7, 8]
}])
def test_append_output_reverse_order_second_chunk(self):
# Write the data in reverse order in multiple calls.
self.mock(task_result.TaskOutput, 'CHUNK_SIZE', 16)
run_result = _gen_run_result()
ndb.put_multi(
run_result.append_output('Wow', task_result.TaskOutput.CHUNK_SIZE + 11))
ndb.put_multi(
run_result.append_output('Foo', task_result.TaskOutput.CHUNK_SIZE + 8))
ndb.put_multi(
run_result.append_output('Baz', task_result.TaskOutput.CHUNK_SIZE + 0))
ndb.put_multi(
run_result.append_output('Bar', task_result.TaskOutput.CHUNK_SIZE + 4))
expected_output = (
task_result.TaskOutput.CHUNK_SIZE * '\x00' + 'Baz\x00Bar\x00FooWow')
self.assertEqual(expected_output, run_result.get_output(0, 0))
self.assertTaskOutputChunk([{
'chunk': 'Baz\x00Bar\x00FooWow',
'gaps': [3, 4, 7, 8]
}])
def test_get_output_subset(self):
self.mock(task_result.TaskOutput, 'CHUNK_SIZE', 16)
run_result = _gen_run_result()
data = string.ascii_letters
ndb.put_multi(run_result.append_output(data, 0))
self.assertEqual(data[12:18], run_result.get_output(12, 6))
self.assertEqual(data[12:], run_result.get_output(12, 0))
def test_get_output_utf8(self):
self.mock(task_result.TaskOutput, 'CHUNK_SIZE', 4)
run_result = _gen_run_result()
ndb.put_multi(run_result.append_output('Foo๐Ÿค Bar', 0))
self.assertEqual('Foo๐Ÿค Bar', run_result.get_output(0, 0))
self.assertTaskOutputChunk([
{
'chunk': b'Foo\xf0',
'gaps': []
},
{
'chunk': b'\x9f\xa4\xa0B',
'gaps': []
},
{
'chunk': b'ar',
'gaps': []
},
])
def test_get_output_utf8_range(self):
run_result = _gen_run_result()
ndb.put_multi(run_result.append_output('Foo๐Ÿค Bar', 0))
self.assertEqual('๐Ÿค ', run_result.get_output(3, 4))
def test_get_output_utf8_limit(self):
run_result = _gen_run_result()
ndb.put_multi(run_result.append_output('๐Ÿ˜€๐Ÿ˜ƒ๐Ÿ˜„๐Ÿ˜๐Ÿ˜†', 0))
self.assertEqual('๐Ÿ˜€๐Ÿ˜ƒ๐Ÿ˜„๐Ÿ˜', run_result.get_output(0, 16))
if __name__ == '__main__':
logging.basicConfig(
level=logging.DEBUG if '-v' in sys.argv else logging.ERROR)
if '-v' in sys.argv:
unittest.TestCase.maxDiff = None
unittest.main()