blob: f40421c9ce4b871d449335f3f60169eb070be4de [file] [log] [blame] [edit]
# Copyright 2016 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
import collections
import contextlib
import traceback
from ... import recipe_api
from ... import recipe_test_api
from .. import stream
from . import StepRunner, OpenStep, FakeEnviron
from . import construct_step_result, render_step, merge_envs
class SimulationStepRunner(StepRunner):
"""Pretends to run steps, instead recording what would have been run.
This is the main workhorse of recipes.py simulation_test. Returns the log of
steps that would have been run in steps_ran. Uses test_data to mock return
values.
"""
def __init__(self, stream_engine, test_data, annotator):
self._test_data = test_data
self._stream_engine = stream_engine
self._annotator = annotator
self._step_history = collections.OrderedDict()
@property
def stream_engine(self):
return self._stream_engine
def open_step(self, step_config):
try:
test_data_fn = step_config.step_test_data or recipe_test_api.StepTestData
step_test = self._test_data.pop_step_test_data(step_config.name,
test_data_fn)
rendered_step = render_step(step_config, step_test)
# Merge our environment. Note that do NOT apply prefixes when rendering
# expectations, as they are rendered independently.
step_env = merge_envs(FakeEnviron(), rendered_step.config.env, {}, {},
None)
rendered_step = rendered_step._replace(
config=rendered_step.config._replace(env=step_env.data))
step_config = None # Make sure we use rendered step config.
# Layer the simulation step on top of the given stream engine.
step_stream = self._stream_engine.new_step_stream(rendered_step.config)
except:
with self.stream_engine.make_step_stream('Step Preparation Exception') as s:
s.set_step_status('EXCEPTION')
with s.new_log_stream('exception') as l:
l.write_split(traceback.format_exc())
raise
class ReturnOpenStep(OpenStep):
# pylint: disable=no-self-argument
def run(inner):
timeout = rendered_step.config.timeout
if (timeout and step_test.times_out_after and
step_test.times_out_after > timeout):
raise recipe_api.StepTimeout(rendered_step.config.name, timeout)
# Install a placeholder for order.
self._step_history[rendered_step.config.name] = None
return construct_step_result(rendered_step, step_test.retcode)
def finalize(inner):
rs = rendered_step
# note that '~' sorts after 'z' so that this will be last on each
# step. also use _step to get access to the mutable step
# dictionary.
buf = self._annotator.step_buffer(rs.config.name)
lines = filter(None, buf.getvalue()).splitlines()
# Only keep @@@annotation@@@ lines.
lines = [stream.encode_str(x) for x in lines if x.startswith('@@@')]
if lines:
# This magically floats into step_history, which we have already
# added step_config to.
rs = rs._replace(followup_annotations=lines)
step_stream.close()
self._step_history[rs.config.name] = rs
@property
def stream(inner):
return step_stream
return ReturnOpenStep()
@contextlib.contextmanager
def run_context(self):
try:
yield
except Exception as ex:
with self._test_data.should_raise_exception(ex) as should_raise:
if should_raise:
raise
assert_msg = (
"Unconsumed test data for steps: %s. Ran the following steps "
"(in order):\n%s" % (
self._test_data.step_data.keys(),
'\n'.join(repr(s) for s in self._step_history.keys())))
if self._test_data.expected_exception:
assert_msg += ", (exception %s)" % self._test_data.expected_exception
assert self._test_data.consumed, assert_msg
def _rendered_step_to_dict(self, rs):
d = rs.config.render_to_dict()
if rs.followup_annotations:
d['~followup_annotations'] = rs.followup_annotations
return d
@property
def steps_ran(self):
return collections.OrderedDict(
(name, self._rendered_step_to_dict(rs))
for name, rs in self._step_history.iteritems())