blob: 341e584633247b817f74d3c489071e1f12a09659 [file] [log] [blame]
# Copyright 2018 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
import json
from recipe_engine import recipe_test_api
from PB.recipe_modules.recipe_engine.swarming import properties
from .api import TaskRequest
from .state import TaskState
class SwarmingTestApi(recipe_test_api.RecipeTestApi):
TaskState = TaskState
def __init__(self, *args, **kwargs):
super(SwarmingTestApi, self).__init__(*args, **kwargs)
self._task_id_count = 0
self._saved_task_for_show_request = None
def check_triggered_request(self, check, step_odict, step, *checkers):
"""Check the input request of a swarming trigger call.
Args:
check, step_odict: provided by post_process
step (str): the step name to check
checkers (Seq[lambda]): a list of functions that take in two args: |check|
and a TaskRequest object.
Example usage:
c1 = lambda check, req: check(req[0].dimensions == {'os': 'Linux'})
c2 = lambda check, req: check(req[0].env == {'FOO': '42'})
post_check(api.swarming.check_triggered_request, 'trigger foo', c1, c2)
"""
# step.json.input is not available.
cmd = step_odict[step].cmd
json_d = cmd[cmd.index('-json-input') + 1]
json_reqs = json.loads(json_d)['requests']
for jr in json_reqs:
req = TaskRequest(self.m)._from_jsonish(jr)
for c in checkers:
c(check, req)
def example_task_request_jsonish(self):
"""Returns a dict that can be parsed by task_request_from_jsonish()."""
return {
'name': 'QEMU',
'priority': 20,
'service_account': 'foo@example.com',
'task_slices': [{
'expiration_secs': '18000',
'properties': {
'cipd_input': {
'packages': [],
'server': ''
},
'command': ['/bin/true'],
'relative_cwd': 'some/dir',
'containment': {
'containment_type': 'NONE',
'limit_processes': False,
'limit_total_committed_memory': False,
'lower_priority': False,
},
'dimensions': [{
'key': 'pool',
'value': 'swarming-pool',
}],
'env': [],
'env_prefixes': [],
'execution_timeout_secs': '2400',
'grace_period_secs': '30',
'cas_input_root': {
'cas_instance':
'projects/example-project/instances/default_instance',
'digest': {
'hash':
'24b2420bc49d8b8fdc1d011a163708927532b37dc9f91d7d8d6877e3a86559ca',
'size_bytes':
'73',
},
},
'idempotent': False,
'io_timeout_secs': '430',
'outputs': []
}
}],
'realm': 'project:bucket',
'resultdb': {
'enable': True,
},
}
def properties(self, task_id='fake-task-id', bot_id='fake-bot-id'):
return self.m.properties.environ(
properties.EnvProperties(
SWARMING_TASK_ID=task_id, SWARMING_BOT_ID=bot_id))
def trigger(self, task_names, initial_id=None, resultdb=True):
"""Generates step test data intended to mock api.swarming.trigger()
Args:
task_names (seq[str]): A sequence of task names representing the tasks we
want to trigger.
initial_id (int): The beginning of the ID range.
resultdb (bool|seq[bool]): If true, adds an invocation name to the trigger
output. A sequence of bools can be passed to add an invocation on a
task-by-task basis.
"""
start = self._task_id_count if initial_id is None else initial_id
self._task_id_count += len(task_names)
if resultdb in (True, False):
resultdb = [resultdb] * len(task_names)
assert len(resultdb) == len(task_names)
trigger_output = {'tasks': []}
for idx, (name, add_invocation) in enumerate(
zip(task_names, resultdb), start=start):
task_output = {
'task_id': '%d' % idx,
'request': {
'name': name,
},
}
if add_invocation:
task_output['task_result'] = {
'resultdb_info': {
'invocation': 'invocations/%d' % idx,
},
}
trigger_output['tasks'].append(task_output)
return self.m.json.output(trigger_output)
@staticmethod
def task_result(id,
name,
state=TaskState.COMPLETED,
duration=62.35,
failure=False,
output='hello world!',
outputs=(),
bot_id='vm-123'):
"""
Returns the raw results of a Swarming task.
Args:
id (str): The ID of the task.
name (str): The name of the task.
state (TaskState|None): The final state of the task; if None, the task is
regarded to be in an unknown state.
duration (int): The duration of the task
failure (bool): Whether the task failed; ignored if state is not
TaskState.COMPLETE.
output (str): That raw output of the task.
outputs (seq(str)):
"""
assert isinstance(state, TaskState) or state == None
assert state not in [
TaskState.INVALID,
TaskState.PENDING,
TaskState.RUNNING,
], 'state %s invalid or not final' % state.name
if state == None:
return {
'error': 'Bot could not be contacted',
'results': {
'task_id': id
},
}
cas_hash = '24b2420bc49d8b8fdc1d011a163708927532b37dc9f91d7d8d6877e3a86559ca'
raw_results = {
'output': output,
'outputs': outputs,
'results': {
'bot_id': bot_id,
'name': name,
'task_id': id,
'state': state.name,
'duration': duration,
'cas_output_root': {
'cas_instance':
'projects/example-project/instances/default_instance',
'digest': {
'hash': cas_hash,
'size_bytes': '73',
},
},
'resultdb_info': {
'invocation': 'invocations/some-inv-name',
},
},
}
if state == TaskState.COMPLETED:
raw_results['results']['exit_code'] = str(int(failure))
return raw_results
def collect(self, task_results):
"""Generates test step data for the swarming API collect method.
Args:
task_results (seq[dict]): A sequence of dicts encoding swarming task
results.
Returns:
Step test data in the form of JSON output intended to mock a swarming API
collect method call.
"""
id_to_result = {
result['results']['task_id']: result for result in task_results
}
return self.m.json.output(id_to_result)
def set_task_for_show_request(self, task):
self._saved_task_for_show_request = task
def show_request(self):
"""Return saved TaskRequest jsonish data for the Swarming API show-request
method.
Returns:
Step test data in the form of JSON output intended to mock a swarming API
show-request method call.
"""
task = self._saved_task_for_show_request
if isinstance(task, TaskRequest):
task = task.to_jsonish()
return self.m.json.output_stream(task)
def generate_bot_json(self,
bot_id,
is_dead=False,
quarantined=False,
maintenance_msg=None,
dimensions=None,
state=None):
bot = {'bot_id': bot_id}
if is_dead:
bot['is_dead'] = True
if quarantined:
bot['quarantined'] = True
if maintenance_msg:
bot['maintenance_msg'] = maintenance_msg
if dimensions:
bot['dimensions'] = [{
'key': k,
'value': v
} for k, v in dimensions.items()]
if state:
bot['state'] = self.m.json.dumps(state)
return bot
def list_bots(self, dimensions=None):
"""Generates step test data intended to mock api.swarming.list_bots()
Args:
dimensions (None|Dict[str, str]): Select bots that match the given
dimensions.
"""
raw_results = [
self.generate_bot_json(
'build11-dead--device1',
is_dead=True,
dimensions=dimensions),
self.generate_bot_json(
'build12-quarantined--device1',
quarantined=True,
dimensions=dimensions),
self.generate_bot_json(
'build13-maintenance--device1',
maintenance_msg='In maintenance mode',
dimensions=dimensions),
self.generate_bot_json(
'build14-alive--device1',
dimensions=dimensions,
state={'devices': {
'build13-alive-1-serial': {
'max_uid': 19999,
}
}}),
]
return self.m.json.output(raw_results)