blob: 7c4a72de414de05c46a765375185df75135b3cf7 [file] [log] [blame]
# Copyright 2016 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
from collections import OrderedDict
import attr
from google.protobuf import json_format as jsonpb
from import sections as sections_pb2
from ...recipe_test_api import StepTestData, BaseTestData
from ...step_data import ExecutionResult
from ...third_party import luci_context
from ...engine_types import ResourceCost
from ..engine_env import FakeEnviron
from ..global_shutdown import GLOBAL_SHUTDOWN
from . import StepRunner, Step
class SimulationStepRunner(StepRunner):
"""Pretends to run steps, instead recording what would have been run.
This is the main workhorse of simulation_test. Returns the log of
steps that would have been run in steps_ran. Uses test_data to mock return
def __init__(self, test_data):
self._test_data = test_data
# dot-name -> StepTestData
self._used_steps = {}
# (dot-name, namespace, name) -> PlaceholderTestData
self._used_placeholders = {}
# (dot-name, handle_name) -> PlaceholderTestData
self._used_handle_placeholders = {}
# dot-name -> {
# env_prefixes: {str: List[str]}
# env_suffixes: {str: List[str]}
# env: {str: str}
# infra_step: bool
# timeout: int
# allow_subannotations: bool
# }
# NOTE: This data is merged with the ordered presentation (UI) data in the
# test command implementation. Thus this dictionary doesn't need to be
# ordered.
# TODO(iannucci): Make this expectation data a real type (either @attr.s or
# a protobuf message)
self._step_precursor_data = {}
# dot-name -> Step
self._step_history = {}
def register_step_config(self, name_tokens, step_config):
dot_name = '.'.join(name_tokens)
# This moves the test data from _test_data to _used_steps. This will return
# StepData() if `dot_name` isn't in self._test_data.
self._used_steps[dot_name] = self._test_data.pop_step_test_data(
dot_name, step_config.step_test_data or StepTestData)
if self._used_steps[dot_name].global_shutdown_event == 'before':
self._step_precursor_data[dot_name] = {
'env_prefixes': step_config.env_prefixes.mapping,
'env_suffixes': step_config.env_suffixes.mapping,
'env': {
k: v for k, v in step_config.env.items()
# Trim out LUCI_CONTEXT because it's useless information in tests, since
# the entire luci_context data is included in the test output.
if k.upper() != luci_context.ENV_KEY
'timeout': step_config.timeout,
'infra_step': step_config.infra_step,
'allow_subannotations': step_config.allow_subannotations,
if step_config.cost != ResourceCost():
self._step_precursor_data[dot_name]['cost'] = step_config.cost
def placeholder(self, name_tokens, placeholder):
dot_name = '.'.join(name_tokens)
# TODO(iannucci): this is janky; simplify all the placeholder naming stuff.
# See comment on step_data.StepData.
module_name, method_name = placeholder.namespaces
name =
key = (dot_name, module_name, method_name, name)
if key not in self._used_placeholders:
self._used_placeholders[key] = self._used_steps[dot_name].pop_placeholder(
module_name, method_name, name)
ret = self._used_placeholders[key]
return ret
def handle_placeholder(self, name_tokens, handle_name):
dot_name = '.'.join(name_tokens)
key = (dot_name, handle_name)
if key not in self._used_placeholders:
self._used_placeholders[key] = getattr(
self._used_steps[dot_name], handle_name)
return self._used_placeholders[key]
def now(self):
# Note that we COULD coordinate with some simulatable time system (e.g. the
# recipe_engine/time module)... however this is just used for adjusting
# the soft_deadline in LUCI_CONTEXT['deadline'] prior to invoking
# write_luci_context where the simulation currently discards it anyway.
return 0
def write_luci_context(self, section_values):
# We ignore this environment variable anyway.
return ""
def run(self, name_tokens, debug_log, step: Step):
del debug_log # unused
dot_name = '.'.join(name_tokens)
# Create the "recipe expectation" dict for this step.
# TODO(iannucci): Rationalize these:
# * use step.env instead of precursor
# * Always omit empty fields (right now cmd is kept)
step_obj = attr.asdict(
step, filter=lambda attr, value: bool(value))
step_obj['name'] = dot_name
if 'cmd' not in step_obj:
step_obj['cmd'] = []
precursor = self._step_precursor_data[dot_name]
step_obj.pop('luci_context', None)
if step.luci_context:
lctx = {}
for name, section in step.luci_context.items():
if name == 'deadline':
# This is the default deadline and is fully specified by the
# `timeout` parameter below. To avoid blowing out expectations, we
# omit the section.
default_deadline = sections_pb2.Deadline(
if section == default_deadline:
lctx[name] = jsonpb.MessageToDict(section)
# Finally, if any sections actually made it through, set it on step_obj
# here.
if lctx:
step_obj['luci_context'] = lctx
for handle_name in ('stdout', 'stderr'):
step_obj.pop(handle_name, None)
if 'cost' in precursor:
if precursor['cost'] is None:
step_obj['cost'] = None
step_obj['cost'] = attr.asdict(precursor['cost'])
if precursor['env_prefixes']:
step_obj['env_prefixes'] = precursor['env_prefixes']
if precursor['env_suffixes']:
step_obj['env_suffixes'] = precursor['env_suffixes']
if precursor['env']:
fake_env = FakeEnviron()
step_obj['env'] = {
k: (v if v is None else v % fake_env)
for k, v in precursor['env'].items()
step_obj.pop('env', None)
if precursor['infra_step']:
step_obj['infra_step'] = True
if precursor['allow_subannotations']:
step_obj['allow_subannotations'] = True
if precursor['timeout']:
step_obj['timeout'] = precursor['timeout']
self._step_history.setdefault(dot_name, {}).update(step_obj)
tdata = self._used_steps[dot_name]
if not step.cmd and tdata.retcode:
# If you're here, it means that you had recipe code which ran a step with
# no command at all (e.g. None or []). These sorts of steps are 'display
# only' and will never have a non-0 exit code when the recipe runs in
# production (on a builder, etc.).
# If the intent of the code was just to have a step that raises an
# exception, use `api.step.empty` with the `status` kwarg set, which will
# make a display-only step, and also raise an appropriate exception.
# If the intent of the code was to have a step which can sometimes fail
# for tests, change this step to actually run a real command, or pick
# a real step to simulate a non-zero return code for.
# See CL
raise ValueError(
f'Cannot simulate no-op step {"|".join(name_tokens)!r} with a retcode'
' other than 0. To simulate a step with a retcode, make it real'
' (e.g. give it a command). To just have a step which displays with'
' an error, use `api.step.empty()`.')
if tdata.global_shutdown_event == 'after':
if tdata.times_out_after and precursor['timeout']:
if tdata.times_out_after > precursor['timeout']:
return ExecutionResult(had_timeout=True)
if tdata.cancel:
return ExecutionResult(was_cancelled=True, retcode=tdata.retcode)
return ExecutionResult(retcode=tdata.retcode or 0)
def run_noop(self, name_tokens, debug_log):
return, debug_log, Step(
def export_steps_ran(self):
"""Returns a dictionary of all steps run.
This maps from the step's dot-name to dictionaries of:
* name (str) - The step's dot-name
* cmd (List[str]) - The command
* cwd (str) - The current working directory
* env (Dict[str, (str|None)]) - Mapping of direct environment
* env_prefixes (Dict[str, List[str]]) - Mapping of direct environment
replacements which should be joined at the beginning of the env key with
* env_suffixes (Dict[str, List[str]]) - Mapping of direct environment
replacements which should be joined at the end of the env key with
* infra_step (bool) - If this step was intended to be an 'infra step' or
* timeout (int) - The timeout, in seconds.
* luci_context (Dict[str, Dict{...}]) - The luci_context data for this
TODO(iannucci): Make this map to a real type.
return self._step_history.copy()