blob: 2d88e18caff64e9eebbb4dfc379ec3ba28cfdc4c [file] [log] [blame]
# Copyright 2016 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
"""This file contains post process filters for use with the
RecipeTestApi.post_process method in GenTests.
"""
import json
import re
from collections import defaultdict, OrderedDict, namedtuple
_filterRegexEntry = namedtuple('_filterRegexEntry', 'at_most at_least fields')
class Filter(object):
"""Filter is an implementation of a post_process callable which can remove
unwanted data from a step OrderedDict."""
def __init__(self, *steps):
"""Builds a new Filter object. It may be optionally prepopulated by
specifying steps.
Usage:
f = Filter('step_a', 'step_b')
yield TEST + api.post_process(f)
f = f.include('other_step')
yield TEST + api.post_process(f)
yield TEST + api.post_process(Filter('step_a', 'step_b', 'other_step'))
"""
self.data = {name: () for name in steps}
self.re_data = {}
def __call__(self, check, step_odict):
unused_includes = self.data.copy()
re_data = self.re_data.copy()
re_usage_count = defaultdict(int)
to_ret = OrderedDict()
for name, step in step_odict.iteritems():
field_set = unused_includes.pop(name, None)
if field_set is None:
for exp, (_, _, fset) in re_data.iteritems():
if exp.match(name):
re_usage_count[exp] += 1
field_set = fset
break
if field_set is None:
continue
if len(field_set) == 0:
to_ret[name] = step
else:
to_ret[name] = {
k: v for k, v in step.to_step_dict().iteritems()
if k in field_set or k == 'name'
}
check(len(unused_includes) == 0)
for regex, (at_least, at_most, _) in re_data.iteritems():
check(re_usage_count[regex] >= at_least)
if at_most is not None:
check(re_usage_count[regex] <= at_most)
return to_ret
def include(self, step_name, fields=()):
"""Include adds a step to the included steps set.
Additionally, if any specified fields are provided, they will be the total
set of fields in the filtered step. The 'name' field is always included. If
fields is omitted, the entire step will be included.
Args:
step_name (str) - The name of the step to include
fields (list(str)) - The field(s) to include. Omit to include all fields.
Returns the new filter.
"""
if isinstance(fields, basestring):
raise ValueError('Expected fields to be a non-string iterable')
new_data = self.data.copy()
new_data[step_name] = frozenset(fields)
ret = Filter()
ret.data = new_data
ret.re_data = self.re_data
return ret
def include_re(self, step_name_re, fields=(), at_least=1, at_most=None):
"""This includes all steps which match the given regular expression.
If a step matches both an include() directive as well as include_re(), the
include() directive will take precedence.
Args:
step_name_re (str or regex) - the regular expression of step names to
match.
fields (list(str)) - the field(s) to include in the matched steps. Omit to
include all fields.
at_least (int) - the number of steps that this regular expression MUST
match.
at_most (int) - the maximum number of steps that this regular expression
MUST NOT exceed.
Returns the new filter.
"""
if isinstance(fields, basestring):
raise ValueError('Expected fields to be a non-string iterable')
new_re_data = self.re_data.copy()
new_re_data[re.compile(step_name_re)] = _filterRegexEntry(
at_least, at_most, frozenset(fields))
ret = Filter()
ret.data = self.data
ret.re_data = new_re_data
return ret
def DoesNotRun(check, step_odict, *steps):
"""Asserts that the given steps don't run.
Usage:
yield TEST + api.post_process(DoesNotRun, 'step_a', 'step_b')
"""
banSet = set(steps)
for step_name in step_odict:
check(step_name not in banSet)
def DoesNotRunRE(check, step_odict, *step_regexes):
"""Asserts that no steps matching any of the regexes have run.
Args:
step_regexes (str) - The step name regexes to ban.
Usage:
yield TEST + api.post_process(DoesNotRunRE, '.*with_patch.*', '.*compile.*')
"""
step_regexes = [re.compile(r) for r in step_regexes]
for step_name in step_odict:
for r in step_regexes:
check(not r.match(step_name))
def MustRun(check, step_odict, *steps):
"""Asserts that steps with the given names are in the expectations.
Args:
steps (str) - The steps that must have run.
Usage:
yield TEST + api.post_process(MustRun, 'step_a', 'step_b')
"""
for step_name in steps:
check(step_name in step_odict)
def MustRunRE(check, step_odict, step_regex, at_least=1, at_most=None):
"""Assert that steps matching the given regex completely are in the
exepectations.
Args:
step_regex (str, compiled regex) - The regular expression to match.
at_least (int) - Match at least this many steps. Matching fewer than this
is a CHECK failure.
at_most (int) - Optional upper bound on the number of matches. Matching
more than this is a CHECK failure.
Usage:
yield TEST + api.post_process(MustRunRE, r'.*with_patch.*', at_most=2)
"""
step_regex = re.compile(step_regex)
matches = 0
for step_name in step_odict:
if step_regex.match(step_name):
matches += 1
check(matches >= at_least)
if at_most is not None:
check(matches <= at_most)
def StepSuccess(check, step_odict, step):
"""Assert that a step succeeded.
Args:
step (str) - The step to check for success.
Usage:
yield (
TEST
+ api.post_process(StepSuccess, 'step-name')
)
"""
check(step_odict[step].status == 'SUCCESS')
def StepWarning(check, step_odict, step):
"""Assert that a step has the warning status.
Args:
step (str) - The step to check for warning.
Usage:
yield (
TEST
+ api.post_process(StepWarning, 'step-name')
)
"""
check(step_odict[step].status == 'WARNING')
def StepFailure(check, step_odict, step):
"""Assert that a step failed.
Args:
step (str) - The step to check for a failure.
Usage:
yield (
TEST
+ api.post_process(StepFailure, 'step-name')
)
"""
check(step_odict[step].status == 'FAILURE')
def StepException(check, step_odict, step):
"""Assert that a step had an exception.
Args:
step (str) - The step to check for an exception.
Usage:
yield (
TEST
+ api.post_process(Step, 'step-name')
)
"""
check(step_odict[step].status == 'EXCEPTION')
def _fullmatch(pattern, string):
m = re.match(pattern, string)
return m and m.span()[1] == len(string)
def StepCommandRE(check, step_odict, step, expected_patterns):
"""Assert that a step's command matches a given list of regular expressions.
Args:
step (str) - The step to check the command of.
expected_patterns (list(str, re.Pattern)) - Regular expressions to match the
elements of the step's command. The i-th element of the step's command
will be matched against the i-th regular expression. If the pattern does
not match the entire argument string, it is a CHECK failure.
Usage:
yield (
TEST
+ api.post_process(StepCommandRE, 'step-name',
['my', 'command', '.*'])
)
"""
cmd = step_odict[step].cmd
for expected, actual in zip(expected_patterns, cmd):
check(_fullmatch(expected, actual))
unmatched = cmd[len(expected_patterns):]
check('all arguments matched', not unmatched)
unused = expected_patterns[len(cmd):]
check('all patterns used', not unused)
def StepCommandContains(check, step_odict, step, argument_sequence):
"""Assert that a step's command contained the given sequence of arguments.
Args:
step (str) - The name of the step to check the command of.
argument_sequence (list of str) - The expected sequence of arguments.
Does not need to contain all of the command's arguments.
Arguments in the sequence are expected to be found consecutively and
in order.
"""
def subsequence(containing, contained):
for i in xrange(len(containing) - len(contained) + 1):
if containing[i:i+len(contained)] == contained:
return True
return False
step_cmd = step_odict[step].cmd
check('command line for step %s contained %s' % (
step, argument_sequence),
subsequence(step_cmd, argument_sequence))
def StepTextEquals(check, step_odict, step, expected):
"""Assert that a step's step_text is equal to a given string.
Args:
step (str) - The step to check the step_text of.
expected (str) - The expected value of the step_text.
Usage:
yield TEST + api.post_process(StepTextEquals, 'step-name', 'expected-text')
"""
check(step_odict[step].step_text == expected)
def StepTextContains(check, step_odict, step, expected_substrs):
"""Assert that a step's step_text contains given substrings.
Args:
step (str) - The step to check the step_text of.
expected_substrs (list(str)) - The expected substrings the step_text should
contain.
Usage:
yield (
TEST
+ api.post_process(StepTextContains, 'step-name',
['substr1', 'substr2'])
)
"""
assert not isinstance(expected_substrs, basestring), \
'expected_substrs must be an iterable of strings and must not be a string'
for expected in expected_substrs:
check(expected in step_odict[step].step_text)
def LogEquals(check, step_odict, step, log, expected):
"""Assert that a step's log is equal to a given string.
Args:
step (str) - The step to check the log of.
log (str) - The name of the log to check.
expected (str) - The expected value of the log.
Usage:
yield (
TEST
+ api.post_process(LogEquals, 'step-name', 'log-name', 'expected-text')
)
"""
check(step_odict[step].logs[log] == expected)
def LogContains(check, step_odict, step, log, expected_substrs):
"""Assert that a step's log contains given substrings.
Args:
step (str) - The step to check the log of.
log (str) - The name of the log to check.
expected_substrs (list(str)) - The expected substrings the log should
contain.
Usage:
yield (
TEST
+ api.post_process(LogEquals, 'step-name', 'log-name',
['substr1', 'substr2'])
)
"""
assert not isinstance(expected_substrs, basestring), \
'expected_substrs must be an iterable of strings and must not be a string'
for expected in expected_substrs:
check(expected in step_odict[step].logs[log])
def GetBuildProperties(step_odict):
"""Retrieves the build properties for a recipe."""
build_properties = {}
for name, step in step_odict.iteritems():
if name == '$result':
continue
for prop, value in step.output_properties.iteritems():
build_properties[prop] = json.loads(value)
return build_properties
def PropertyEquals(check, step_odict, key, value):
"""Assert that a recipe's output property `key` equals `value`.
Args:
key (str) - The key to look for in output properties.
value (jsonish) - The value to look for in output properties.
Usage:
yield (
TEST
+ api.post_process(PropertyEquals, 'do_not_retry', True)
)
"""
build_properties = GetBuildProperties(step_odict)
# Short circuiting of boolean expressions is broken in check().
# https://crbug.com/946015.
if check(key in build_properties):
check(build_properties[key] == value)
def PropertiesDoNotContain(check, step_odict, key):
"""Assert that a recipe's output properties do not contain `key`.
Args:
key (str) - The key to check for.
Usage:
yield (
TEST
+ api.post_process(PropertiesDoNotContain, 'property_key')
)
"""
build_properties = GetBuildProperties(step_odict)
check(key not in build_properties)
def StatusCodeIn(check, step_odict, *codes):
"""Assert that recipe result status code is within expected codes.
DEPRECATED: Use StatusSuccess or StatusFailure instead.
Args:
codes (list): list of expected status codes (int).
"""
check(len(codes) == 1)
code = codes[0]
check(code in (0, 1, 2))
if code == 0:
StatusSuccess(check, step_odict)
else:
StatusAnyFailure(check, step_odict)
def StatusSuccess(check, step_odict):
"""Assert that the recipe finished successfully."""
check(not 'failure' in step_odict['$result'])
def StatusAnyFailure(check, step_odict):
"""Assert that the recipe failed."""
check('failure' in step_odict['$result'])
def StatusFailure(check, step_odict):
"""Assert that the recipe had a non-infra failure."""
check('failure' in step_odict['$result']['failure'])
def StatusException(check, step_odict):
"""Assert that the recipe had an infra failure."""
check('failure' not in step_odict['$result']['failure'])
def ResultReasonRE(check, step_odict, reason_regex):
"""Assert that recipe result reason matches given regex.
Args:
reason_regex (str): the regular expression to match.
"""
result = step_odict['$result']
if not check('failure' in result):
return
if not check('humanReason' in result['failure']):
return
check(re.match(reason_regex, result['failure']['humanReason']))
def DropExpectation(_check, _step_odict):
"""Using this post-process hook will drop the expectations for this test
completely.
Usage:
yield TEST + api.post_process(DropExpectation)
"""
return {}