blob: 2f37437a291d78aaa25906f12b115656bd236ee5 [file] [log] [blame]
# Copyright 2016 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
import contextlib
import inspect
from collections import namedtuple, defaultdict
from .util import ModuleInjectionSite, static_call, static_wraps
from .types import freeze
def combineify(name, dest, a, b, overwrite=False):
  """
  Combines dictionary members in two objects into a third one.

  The dictionary stored at `dest.<name>` is first updated with every entry of
  `a.<name>`, and the entries of `b.<name>` are then merged in. A key from `b`
  which is already present in the destination is either added to the existing
  value (with `+=`) or replaces it, depending on `overwrite`. The dictionaries
  of `a` and `b` themselves are not modified by this function.

  Args:
    name - the name of the member
    dest - the destination object
    a - the first source object
    b - the second source object
    overwrite - if True, for the same key, overwrite value from a with the one
                from b; otherwise, use addition to merge them.
  """
  dest_dict = getattr(dest, name)
  dest_dict.update(getattr(a, name))
  # items() (rather than the Python-2-only iteritems()) keeps this function
  # usable on both Python 2 and Python 3.
  for k, v in getattr(b, name).items():
    if k in dest_dict and not overwrite:
      dest_dict[k] += v
    else:
      dest_dict[k] = v
class BaseTestData(object):
  """Common base of the test data containers; carries an `enabled` flag."""

  def __init__(self, enabled=True):
    # Cooperative super() call so that subclasses which mix in other bases
    # still get those bases initialized.
    super(BaseTestData, self).__init__()
    self._enabled = enabled

  @property
  def enabled(self):
    """The `enabled` value this object was constructed with (read-only)."""
    return self._enabled
class PlaceholderTestData(BaseTestData):
  """Test data for a single placeholder: a `data` payload and optional `name`."""

  def __init__(self, data=None, name=None):
    super(PlaceholderTestData, self).__init__()
    self.data = data
    self.name = name

  def __repr__(self):
    # An unnamed placeholder is rendered with the literal marker DEFAULT.
    if self.name is not None:
      return "PlaceholderTestData(%r, %r)" % (self.name, self.data)
    return "PlaceholderTestData(DEFAULT, %r)" % (self.data,)
class StepTestData(BaseTestData):
  """
  Mutable container for per-step test data.

  This data is consumed while running the recipe (during
  annotated_run.run_steps).
  """

  def __init__(self):
    super(StepTestData, self).__init__()
    # { (module, placeholder, name) -> data }. Data are for output placeholders.
    self.placeholder_data = defaultdict(dict)
    # When True, adding this object to another StepTestData yields this object
    # unchanged instead of a merge (see __add__).
    self.override = False
    self._stdout = None
    self._stderr = None
    self._retcode = None
    self._times_out_after = None

  def __add__(self, other):
    """Returns the combination of this step data with `other`.

    If `other.override` is set, `other` itself is returned. Otherwise a new
    StepTestData is built in which:
      * placeholder data from both sides is merged, `other` winning on
        identical keys;
      * stdout/stderr come from `other` when set there, else from `self`;
      * retcode and times_out_after come from whichever side set them.

    Raises:
      ValueError - if both sides set different retcodes, or if both sides set
                   times_out_after (even to the same value).
    """
    assert isinstance(other, StepTestData)

    if other.override:
      return other

    ret = StepTestData()

    combineify('placeholder_data', ret, self, other, overwrite=True)

    # pylint: disable=W0212
    ret._stdout = other._stdout or self._stdout
    ret._stderr = other._stderr or self._stderr

    ret._retcode = self._retcode
    if other._retcode is not None:
      if ret._retcode is not None and ret._retcode != other._retcode:
        raise ValueError('Conflicting retcode values.')
      ret._retcode = other._retcode

    ret._times_out_after = self._times_out_after
    if other._times_out_after is not None:
      if ret._times_out_after is not None:
        raise ValueError('Conflicting times_out_after values.')
      ret._times_out_after = other._times_out_after

    return ret

  def unwrap_placeholder(self):
    """Returns the only placeholder datum held by this object.

    Raises:
      ValueError - if placeholder_data does not hold exactly one entry (this
                   includes the empty case, despite the message wording).
    """
    # {(module, placeholder, name): data} => data.
    if len(self.placeholder_data) != 1:
      raise ValueError('Cannot unwrap placeholder_data with length > 1: len=%d'
                       % len(self.placeholder_data))
    # Dict views cannot be indexed on Python 3, so take the first (and only)
    # value through an iterator; this works on Python 2 as well.
    return next(iter(self.placeholder_data.values()))

  def pop_placeholder(self, module_name, placeholder_name, name):
    """Removes and returns the datum stored for the given placeholder key.

    Returns an empty PlaceholderTestData if no datum was stored for the key.
    """
    return self.placeholder_data.pop(
        (module_name, placeholder_name, name), PlaceholderTestData())

  @property
  def retcode(self):  # pylint: disable=E0202
    """The step's return code; 0 when none was set."""
    return self._retcode or 0

  @retcode.setter
  def retcode(self, value):  # pylint: disable=E0202
    self._retcode = value

  @property
  def times_out_after(self):  # pylint: disable=E0202
    """The configured timeout value; 0 when none was set."""
    return self._times_out_after or 0

  @times_out_after.setter
  def times_out_after(self, value):  # pylint: disable=E0202
    self._times_out_after = value

  @property
  def stdout(self):
    """PlaceholderTestData for stdout; an empty one when none was set."""
    return self._stdout or PlaceholderTestData(None)

  @stdout.setter
  def stdout(self, value):
    assert isinstance(value, PlaceholderTestData)
    self._stdout = value

  @property
  def stderr(self):
    """PlaceholderTestData for stderr; an empty one when none was set."""
    return self._stderr or PlaceholderTestData(None)

  @stderr.setter
  def stderr(self, value):
    assert isinstance(value, PlaceholderTestData)
    self._stderr = value

  @property
  def stdin(self):  # pylint: disable=R0201
    """Always an empty PlaceholderTestData; stdin carries no test data."""
    return PlaceholderTestData(None)

  def __repr__(self):
    dct = {
      # Copy into a plain dict so the repr does not show the defaultdict
      # wrapper. dict(mapping) is equivalent to the former
      # dict(mapping.iteritems()) and also works on Python 3.
      'placeholder_data': dict(self.placeholder_data),
      'stdout': self._stdout,
      'stderr': self._stderr,
      'retcode': self._retcode,
      'override': self.override,
    }
    # times_out_after is only shown when it has a truthy value.
    if self.times_out_after:
      dct['times_out_after'] = self.times_out_after
    return "StepTestData(%r)" % dct
class ModuleTestData(BaseTestData, dict):
  """
  Mutable container for test data for a specific module.

  This test data is consumed at module load time (i.e. when create_recipe_api
  runs).
  """

  def __add__(self, other):
    """Returns a new ModuleTestData; entries of `other` win on equal keys."""
    assert isinstance(other, ModuleTestData)
    merged = ModuleTestData()
    for source in (self, other):
      merged.update(source)
    return merged

  def __repr__(self):
    # Note the %r: the inherited repr string is itself repr()'d, so it shows
    # up quoted inside the result.
    inherited_repr = super(ModuleTestData, self).__repr__()
    return "ModuleTestData(%r)" % inherited_repr
# The context describing where a post-process hook was added.
PostprocessHookContext = namedtuple(
    'PostprocessHookContext',
    ['func', 'args', 'kwargs', 'filename', 'lineno'])


# The details of a post-process hook.
#
# func, args and kwargs detail the actual objects to use to invoke the check.
# Context describes where the hook was added. Depending on whether post_process
# or post_check is used, the context may or may not contain the same func, args
# and kwargs.
PostprocessHook = namedtuple(
    'PostprocessHook',
    ['func', 'args', 'kwargs', 'context'])
class TestData(BaseTestData):
  """Test data for a single test case.

  Holds the test name, properties, per-module data, per-step data, the name of
  an expected exception (if any) and the post-process hooks. TestData objects
  are combined with `+`.
  """

  def __init__(self, name=None):
    super(TestData, self).__init__()
    self.name = name
    self.properties = {}  # key -> val
    self.mod_data = defaultdict(ModuleTestData)
    self.step_data = defaultdict(StepTestData)
    self.expected_exception = None
    self.post_process_hooks = []  # list(PostprocessHook)

  def __add__(self, other):
    """Returns a new TestData combining `self` and `other`.

    The name is taken from `self` when set, else from `other`. Properties are
    merged with `other` winning on equal keys. Module and step data are merged
    per key using `+`. Hooks are concatenated (`self` first). The expected
    exception of `other` takes precedence when it is set.
    """
    assert isinstance(other, TestData)
    ret = TestData(self.name or other.name)

    ret.properties.update(self.properties)
    ret.properties.update(other.properties)

    combineify('mod_data', ret, self, other)
    combineify('step_data', ret, self, other)

    ret.post_process_hooks.extend(self.post_process_hooks)
    ret.post_process_hooks.extend(other.post_process_hooks)

    ret.expected_exception = self.expected_exception
    if other.expected_exception:
      ret.expected_exception = other.expected_exception

    return ret

  @property
  def consumed(self):
    """True when no step data and no expected exception remain."""
    return not (self.step_data or self.expected_exception)

  def pop_step_test_data(self, step_name, step_test_data_fn):
    """Returns the test data to use for the step named `step_name`.

    The default data produced by calling `step_test_data_fn()` is combined
    with any data stored for the step, which is removed from this object.

    Raises:
      ValueError - if combining fails; the message is prefixed with the step
                   name.
    """
    step_test_data = step_test_data_fn()
    if step_name in self.step_data:
      try:
        step_test_data += self.step_data.pop(step_name)
      except ValueError as ve:
        raise ValueError('in step %r: %s' % (step_name, ve))
    return step_test_data

  def get_module_test_data(self, module_name):
    """Returns the ModuleTestData for `module_name` (an empty one if absent)."""
    # .get() is used on purpose: it does not insert a default entry into the
    # defaultdict the way indexing would.
    return self.mod_data.get(module_name, ModuleTestData())

  def expect_exception(self, exception):
    """Records the class name of the exception this test expects.

    Args:
      exception - a string containing the exception class name.

    Raises:
      ValueError - if an exception is already expected, or if `exception` is
                   not a string.
    """
    if self.expected_exception:
      raise ValueError('Cannot expect more than one exception')
    # `basestring` only exists on Python 2; fall back to `str` on Python 3.
    try:
      string_types = basestring  # pylint: disable=undefined-variable
    except NameError:
      string_types = str
    if not isinstance(exception, string_types):
      raise ValueError('expect_exception expects a string containing the '
                       'exception class name')
    self.expected_exception = exception

  @contextlib.contextmanager
  def should_raise_exception(self, exception):
    """
    Context manager which tells the caller if it should re-raise the exception.

    Should be called by something handling an exception caused by executing the
    test. The caller can handle the exception, taking any actions it wants with
    the exception. Once it is done, the caller should check the value given by
    this context manager. If it is true, it should re-raise the exception.

    The caller should reraise the exception, rather than this function, because
    that preserves the previous stack trace, which immensely helps with
    debugging.

    When the exception is the expected one (and test data is enabled), the
    yielded value is False and the expectation is cleared once the `with` body
    finishes normally.

    Usage:
      try:
        foo.bar()
        boom.baz()
      except Something as exc:
        with test_data.should_raise_exception(exc) as should_raise:
          print('exception occurred: %s' % exc)
          if should_raise:
            raise
    """
    name = exception.__class__.__name__
    should_raise = not (self.enabled and name == self.expected_exception)

    yield should_raise

    # Only reached when the caller's `with` body did not raise.
    if not should_raise:
      self.expected_exception = None

  def post_process(self, func, args, kwargs, context):
    """Appends a PostprocessHook built from the arguments to the hook list."""
    self.post_process_hooks.append(PostprocessHook(func, args, kwargs, context))

  def __repr__(self):
    # dict(mapping) is equivalent to the former dict(mapping.iteritems()) and
    # also works on Python 3. The copies hide the defaultdict wrappers.
    return "TestData(%r)" % ({
      'name': self.name,
      'properties': self.properties,
      'mod_data': dict(self.mod_data),
      'step_data': dict(self.step_data),
      'expected_exception': self.expected_exception,
    },)
class DisabledTestData(BaseTestData):
  """Stand-in test data whose `enabled` is False.

  Every accessor below, as well as any attribute that normal lookup does not
  find, evaluates to this same object.
  """

  def __init__(self):
    super(DisabledTestData, self).__init__(False)

  def __getattr__(self, name):
    # Only invoked for attributes that regular lookup failed to find.
    return self

  def pop_placeholder(self, _module_name, _placeholder_name, _name):
    return self

  def pop_step_test_data(self, _step_name, _step_test_data_fn):
    return self

  def get_module_test_data(self, _module_name):
    return self

  @contextlib.contextmanager
  def should_raise_exception(self, exception):  # pylint: disable=unused-argument
    # With test data disabled the caller is always told to re-raise.
    yield True
def mod_test_data(func):
  """Decorates a RecipeTestApi method so its result becomes module test data.

  The decorated method returns an unnamed TestData whose
  mod_data[<module NAME>][<wrapper __name__>] is whatever `func` returned.
  (The wrapper's __name__ is presumably func's own name, copied by
  static_wraps - confirm against .util.)
  """
  @static_wraps(func)
  def inner(self, *args, **kwargs):
    assert isinstance(self, RecipeTestApi)
    module_name = self._module.NAME  # pylint: disable=W0212
    wrapped = TestData(None)
    payload = static_call(self, func, *args, **kwargs)
    wrapped.mod_data[module_name][inner.__name__] = payload
    return wrapped
  return inner
def placeholder_step_data(func):
  """Decorates RecipeTestApi member functions to allow those functions to
  return just the output placeholder data, instead of the normally required
  StepTestData() object.

  The wrapped function may return either:
    * <placeholder data>, <retcode or None>, <name or None>
    * StepTestData containing exactly one PlaceholderTestData and possibly a
      retcode. This is useful for returning the result of another method which
      is wrapped with placeholder_step_data.

  In either case, the wrapper function will return a StepTestData object with
  the retcode and placeholder datum inserted with a name of:
    (<Test module name>, <wrapped function name>, <name>)

  Say you had a 'foo_module' with the following RecipeTestApi:
    class FooTestApi(RecipeTestApi):
      @placeholder_step_data
      @staticmethod
      def cool_method(data, retcode=None, name=None):
        return ("Test data (%s)" % data), retcode, name

      @placeholder_step_data
      def other_method(self, retcode=None, name=None):
        return self.cool_method('hammer time', retcode=retcode, name=name)

  Code calling cool_method('hello', name='cool1') would get a StepTestData:
    StepTestData(
      placeholder_data = {
        ('foo_module', 'cool_method', 'cool1') :
          PlaceholderTestData('Test data (hello)')
      },
      retcode = None
    )

  Code calling other_method(retcode=50, name='other1') would get a StepTestData:
    StepTestData(
      placeholder_data = {
        ('foo_module', 'other_method', 'other1'):
          PlaceholderTestData('Test data (hammer time)')
      },
      retcode = 50
    )

  Raises (from the wrapper):
    ValueError - if the wrapped function returns a StepTestData which does not
                 hold exactly one placeholder datum.
  """
  @static_wraps(func)
  def inner(self, *args, **kwargs):
    assert isinstance(self, RecipeTestApi)
    mod_name = self._module.NAME  # pylint: disable=W0212
    data = static_call(self, func, *args, **kwargs)
    if isinstance(data, StepTestData):
      # Materialize the values: dict views cannot be indexed on Python 3.
      all_data = list(data.placeholder_data.values())
      if len(all_data) != 1:
        raise ValueError(
          'placeholder_step_data is only expecting a single output placeholder '
          'datum. Got: %r' % data
        )
      # The datum keeps the name it was created with; only the module and
      # function parts of its key are rewritten below.
      placeholder_data, retcode = all_data[0], data._retcode
    else:
      placeholder_data, retcode, name = data
      placeholder_data = PlaceholderTestData(data=placeholder_data, name=name)

    ret = StepTestData()
    key = (mod_name, inner.__name__, placeholder_data.name)
    ret.placeholder_data[key] = placeholder_data
    ret.retcode = retcode
    return ret
  return inner
class RecipeTestApi(object):
  """Provides testing interface for GenTest method.

  There are two primary components to the test api:
    * Test data creation methods (test and step_data)
    * test_api's from all the modules in DEPS.

  Every test in GenTests(api) takes the form:
    yield <instance of TestData>

  There are 4 basic pieces to TestData:
    name       - The name of the test.
    properties - Simple key-value dictionary which is used as the combined
                 build_properties and factory_properties for this test.
    mod_data   - Module-specific testing data (see the platform module for a
                 good example). This is testing data which is only used once at
                 the start of the execution of the recipe. Modules should
                 provide methods to get their specific test information. See
                 the platform module's test_api for a good example of this.
    step_data  - Step-specific data. There are two major components to this.
                 retcode          - The return code of the step
                 placeholder_data - A mapping from placeholder name to the
                                    PlaceholderTestData object in the step.
                 stdout, stderr   - PlaceholderTestData objects for stdout and
                                    stderr.

  TestData objects are concatenatable, so it's convenient to phrase test cases
  as a series of added TestData objects. For example:

    DEPS = ['properties', 'platform', 'json']
    def GenTests(api):
      yield (
        api.test('try_win64') +
        api.properties.tryserver(power_level=9001) +
        api.platform('win', 64) +
        api.step_data(
          'some_step',
          api.json.output("bobface", name="a"),
          api.json.output({'key': 'value'}, name="b")
        )
      )

  This example would run a single test (named 'try_win64') with the standard
  tryserver properties (plus an extra property 'power_level' whose value was
  over 9000). The test would run as if it were being run on a 64-bit windows
  installation, and the step named 'some_step' would have the json output of
  the placeholder with name "a" be mocked to return '"bobface"', and the json
  output of the placeholder with name "b" be mocked to return
  '{"key": "value"}'.

  The properties.tryserver() call is documented in the 'properties' module's
  test_api.
  The platform() call is documented in the 'platform' module's test_api.
  The json.output() call is documented in the 'json' module's test_api.
  """

  def __init__(self, module=None):
    """Note: Injected dependencies are NOT available in __init__()."""
    # If we're the 'root' api, inject directly into 'self'.
    # Otherwise inject into 'self.m'
    self.m = self if module is None else ModuleInjectionSite()
    self._module = module

  @staticmethod
  def test(name):
    """Returns a new empty TestData with the name filled in.

    Use in GenTests:
      def GenTests(api):
        yield api.test('basic')
    """
    return TestData(name)

  @staticmethod
  def empty_test_data():
    """Returns a TestData with no information.

    This is the identity of the + operator for combining TestData.
    """
    return TestData(None)

  @staticmethod
  def _step_data(name, *data, **kwargs):
    """Returns a new TestData with the mock data filled in for a single step.

    Used by step_data and override_step_data.

    Args:
      name - The name of the step we're providing data for
      data - Zero or more StepTestData objects. These may fill in output
             placeholder data for zero or more modules, as well as possibly
             setting the retcode for this step.
      retcode=(int or None) - Override the retcode for this step, even if it
             was set by |data|. This must be set as a keyword arg.
      stdout - StepTestData object with a single output placeholder datum for a
               step's stdout.
      stderr - StepTestData object with a single output placeholder datum for a
               step's stderr.
      override=(bool) - This step data completely replaces any previously
                        generated step data, instead of adding on to it.
      times_out_after=(int) - Causes the step to timeout after the given number
                              of seconds.

    Use in GenTests:
      # Hypothetically, suppose that your recipe has default test data for two
      # steps 'init' and 'sync' (probably via recipe_api.inject_test_data()).
      # For this example, lets say that the default test data looks like:
      #   api.step_data('init', api.json.output({'some': ["cool", "json"]}))
      # AND
      #   api.step_data('sync', api.json.output({'src': {'rev': 100}}))
      # Then, your GenTests code may augment or replace this data like:

      def GenTests(api):
        yield (
          api.test('more') +
          api.step_data(  # Adds step data for a step with no default test data
            'mystep',
            api.json.output({'legend': ['...', 'DARY!']})
          ) +
          api.step_data(  # Adds retcode to default step_data for this step
            'init',
            retcode=1
          ) +
          api.override_step_data(  # Removes json output and overrides retcode
            'sync',
            retcode=100
          )
        )
    """
    assert all(isinstance(d, StepTestData) for d in data)
    ret = TestData(None)
    if data:
      # Left fold with `+`. Written as a loop because `reduce` is only a
      # builtin on Python 2.
      combined = data[0]
      for datum in data[1:]:
        combined = combined + datum
      ret.step_data[name] = combined
    # From here on, indexing ret.step_data[name] creates an empty StepTestData
    # on demand when no positional data was given.
    if 'retcode' in kwargs:
      ret.step_data[name].retcode = kwargs['retcode']
    if 'times_out_after' in kwargs:
      ret.step_data[name].times_out_after = kwargs['times_out_after']
    if 'override' in kwargs:
      ret.step_data[name].override = kwargs['override']
    for key in ('stdout', 'stderr'):
      if key in kwargs:
        stdio_test_data = kwargs[key]
        assert isinstance(stdio_test_data, StepTestData)
        setattr(ret.step_data[name], key, stdio_test_data.unwrap_placeholder())
    return ret

  def step_data(self, name, *data, **kwargs):
    """See _step_data()"""
    return self._step_data(name, *data, **kwargs)
  # Inside the class body `_step_data` is still the raw staticmethod object,
  # whose own __doc__ is not the function's docstring before Python 3.10; read
  # the docstring from the underlying function instead.
  step_data.__doc__ = _step_data.__func__.__doc__

  def override_step_data(self, name, *data, **kwargs):
    """See _step_data()"""
    kwargs['override'] = True
    return self._step_data(name, *data, **kwargs)
  override_step_data.__doc__ = _step_data.__func__.__doc__

  def expect_exception(self, exc_type):  # pylint: disable=R0201
    """Returns a TestData which expects an exception.

    Args:
      exc_type - a string containing the exception class name.
    """
    ret = TestData(None)
    ret.expect_exception(exc_type)
    return ret

  def post_process(self, func, *args, **kwargs):
    """Calling this adds a post-processing hook for this test's expectations.

    `func` should be a callable whose signature is in the form of:
      func(check, step_odict, *args, **kwargs) -> (step_odict or None)

    Where:
      * `step_odict` is an ordered dictionary of step dictionaries, as would be
        recorded into the JSON expectation file for this test. The dictionary
        key is the step's name. The dictionary has enhanced behavior such that
        indexing with a key that isn't in the dictionary won't raise a
        KeyError; instead the execution of your function is halted and a check
        failure will be reported. This allows you to write your check functions
        without having to worry about if a step is missing to provide usable
        failure output.

      * `check` is a semi-magical function which you can use to test things.
        Using `check` will allow you to see all the violated assertions from
        your post_process functions simultaneously. Always call `check`
        directly (i.e. with parens) to produce helpful check messages. `check`
        also has a second form that takes a human hint to print when the
        `check` fails. Hints should be written as the ___ in the sentence
        'check that ___.'. Essentially, check has the function signatures:

          `def check(<bool expression>) #=> bool`
          `def check(hint, <bool expression>) #=> bool`

        Check returns True iff the boolean expression was True.

        If the hint is omitted, then the boolean expression itself becomes the
        hint when the check failure message is printed.

        Note that check DOES NOT stop your function. It is not an assert. Your
        function will continue to execute after invoking the check function. If
        the boolean expression is False, the check will produce a helpful error
        message and cause the test case to fail.

      * args and kwargs are optional, and completely up to your implementation.
        They will be passed straight through to your function, and are provided
        to eliminate an extra `lambda` if your function needs to take
        additional inputs.

    Raising an exception will print the exception and will halt the
    postprocessing chain entirely.

    The function must return either `None`, or it may return a filtered subset
    of step_odict (e.g. omitting some steps and/or dictionary keys). This will
    be the new value of step_odict for the test. Returning an empty dict or
    OrderedDict will remove the expectations from disk altogether. Returning
    `None` (Python's implicit default return value) is equivalent to returning
    the unmodified step_odict. 'name' will always be preserved in every step,
    even if you remove it.

    Calling post_process multiple times will apply each function in order,
    chaining the output of one function to the input of the next function. This
    is intended to be used to compose the effects of multiple re-usable
    postprocessing functions, some of which are pre-defined in
    `recipe_engine.post_process` which you can import in your recipe.

    Example:
      from recipe_engine.post_process import (Filter, DoesNotRun,
                                              DropExpectation)

      def GenTests(api):
        yield api.test('no post processing')

        yield (api.test('only thing_step')
          + api.post_process(Filter('thing_step'))
        )

        tstepFilt = Filter()
        tstepFilt = tstepFilt.include('thing_step', 'cmd')
        yield (api.test("only thing_step's cmd")
          + api.post_process(tstepFilt)
        )

        yield (api.test('assert bob_step does not run')
          + api.post_process(DoesNotRun, 'bob_step')
        )

        yield (api.test('only care one step and the result')
          + api.post_process(Filter('one_step', '$result'))
        )

        def assertStuff(check, step_odict, to_check):
          check(to_check in step_odict['step_name']['cmd'])

        yield (api.test('assert something and have NO expectation file')
          + api.post_process(assertStuff, 'to_check_arg')
          + api.post_process(DropExpectation)
        )
    """
    ret = TestData()
    # Frame 1 of the stack is our caller: record where the hook was added.
    _, filename, lineno, _, _, _ = inspect.stack()[1]
    context = PostprocessHookContext(func, args, kwargs, filename, lineno)
    ret.post_process(func, args, kwargs, context)
    return ret

  def post_check(self, func, *args, **kwargs):
    """Add a check-only post-processing hook.

    See `post_process` for information on the arguments and behavior. The
    difference between `post_check` and `post_process` is the return value of
    `func` is ignored, so it's not possible for a hook added using `post_check`
    to propagate changes in the steps dictionary to later hooks. This enables
    the use of lambdas for performing simple checks.

    Example:
      from recipe_engine.post_process import DoesNotRun, DropExpectation

      def GenTests(api):
        yield (api.test('lambda-check')
          + api.post_check(lambda check, steps: check('foo' not in steps))
          + api.post_process(DropExpectation)
        )

        yield (api.test('reuse-existing-hook')
          + api.post_check(DoesNotRun, 'foo')
          + api.post_process(DropExpectation)
        )
    """
    # Adapter which invokes the user's function and discards its result, so
    # the hook can never replace the steps dictionary.
    def post_check(check, steps, f, *args, **kwargs):
      f(check, steps, *args, **kwargs)

    ret = TestData()
    # Frame 1 of the stack is our caller: record where the hook was added.
    _, filename, lineno, _, _, _ = inspect.stack()[1]
    # The context keeps the user's original func/args, while the registered
    # hook is the adapter with func prepended to its args.
    context = PostprocessHookContext(func, args, kwargs, filename, lineno)
    ret.post_process(post_check, (func,) + args, kwargs, context)
    return ret