blob: 4def751b33a1d9afcecc1a20b2e77636d4c2164b [file] [log] [blame]
# Copyright 2016 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
"""This file contains post process filters for use with the
RecipeTestApi.post_process method in GenTests.
"""
from past.builtins import basestring
import re
from collections import defaultdict, OrderedDict, namedtuple
_filterRegexEntry = namedtuple('_filterRegexEntry', 'at_most at_least fields')
class Filter:
"""Filter is an implementation of a post_process callable which can remove
unwanted data from a step OrderedDict."""
def __init__(self, *steps):
"""Builds a new Filter object. It may be optionally prepopulated by
specifying steps.
Usage:
f = Filter('step_a', 'step_b')
yield TEST + api.post_process(f)
f = f.include('other_step')
yield TEST + api.post_process(f)
yield TEST + api.post_process(Filter('step_a', 'step_b', 'other_step'))
"""
self.data = {name: () for name in steps}
self.re_data = {}
def __call__(self, check, step_odict):
unused_includes = self.data.copy()
re_data = self.re_data.copy()
re_usage_count = defaultdict(int)
to_ret = OrderedDict()
for name, step in step_odict.items():
field_set = unused_includes.pop(name, None)
if field_set is None:
for exp, (_, _, fset) in re_data.items():
if exp.match(name):
re_usage_count[exp] += 1
field_set = fset
break
if field_set is None:
continue
if len(field_set) == 0:
to_ret[name] = step
else:
to_ret[name] = {
k: v for k, v in step.to_step_dict().items()
if k in field_set or k == 'name'
}
check(len(unused_includes) == 0)
for regex, (at_least, at_most, _) in re_data.items():
check(re_usage_count[regex] >= at_least)
if at_most is not None:
check(re_usage_count[regex] <= at_most)
return to_ret
def include(self, step_name, fields=()):
"""Include adds a step to the included steps set.
Additionally, if any specified fields are provided, they will be the total
set of fields in the filtered step. The 'name' field is always included. If
fields is omitted, the entire step will be included.
Args:
step_name (str) - The name of the step to include
fields (list(str)) - The field(s) to include. Omit to include all fields.
Returns the new filter.
"""
if isinstance(fields, basestring):
raise ValueError('Expected fields to be a non-string iterable')
new_data = self.data.copy()
new_data[step_name] = frozenset(fields)
ret = Filter()
ret.data = new_data
ret.re_data = self.re_data
return ret
def include_re(self, step_name_re, fields=(), at_least=1, at_most=None):
"""This includes all steps which match the given regular expression.
If a step matches both an include() directive as well as include_re(), the
include() directive will take precedence.
Args:
step_name_re (str or regex) - the regular expression of step names to
match.
fields (list(str)) - the field(s) to include in the matched steps. Omit to
include all fields.
at_least (int) - the number of steps that this regular expression MUST
match.
at_most (int) - the maximum number of steps that this regular expression
MUST NOT exceed.
Returns the new filter.
"""
if isinstance(fields, basestring):
raise ValueError('Expected fields to be a non-string iterable')
new_re_data = self.re_data.copy()
new_re_data[re.compile(step_name_re)] = _filterRegexEntry(
at_least, at_most, frozenset(fields))
ret = Filter()
ret.data = self.data
ret.re_data = new_re_data
return ret
def DoesNotRun(check, step_odict, *steps):
"""Asserts that the given steps don't run.
Usage:
yield TEST + api.post_process(DoesNotRun, 'step_a', 'step_b')
"""
banSet = set(steps)
for step_name in step_odict:
check(step_name not in banSet)
def DoesNotRunRE(check, step_odict, *step_regexes):
"""Asserts that no steps matching any of the regexes have run.
Args:
step_regexes (str) - The step name regexes to ban.
Usage:
yield TEST + api.post_process(DoesNotRunRE, '.*with_patch.*', '.*compile.*')
"""
step_regexes = [re.compile(r) for r in step_regexes]
for step_name in step_odict:
for r in step_regexes:
check(not r.match(step_name))
def MustRun(check, step_odict, *steps):
"""Asserts that steps with the given names are in the expectations.
Args:
steps (str) - The steps that must have run.
Usage:
yield TEST + api.post_process(MustRun, 'step_a', 'step_b')
"""
for step_name in steps:
check(step_name in step_odict)
def MustRunRE(check, step_odict, step_regex, at_least=1, at_most=None):
"""Assert that steps matching the given regex completely are in the
expectations.
Args:
step_regex (str, compiled regex) - The regular expression to match.
at_least (int) - Match at least this many steps. Matching fewer than this
is a CHECK failure.
at_most (int) - Optional upper bound on the number of matches. Matching
more than this is a CHECK failure.
Usage:
yield TEST + api.post_process(MustRunRE, r'.*with_patch.*', at_most=2)
"""
step_regex = re.compile(step_regex)
matches = 0
for step_name in step_odict:
if step_regex.match(step_name):
matches += 1
check(matches >= at_least)
if at_most is not None:
check(matches <= at_most)
def StepSuccess(check, step_odict, step):
"""Assert that a step succeeded.
Args:
step (str) - The step to check for success.
Usage:
yield (
TEST
+ api.post_process(StepSuccess, 'step-name')
)
"""
check(step_odict[step].status == 'SUCCESS')
def StepWarning(check, step_odict, step):
"""Assert that a step has the warning status.
Args:
step (str) - The step to check for warning.
Usage:
yield (
TEST
+ api.post_process(StepWarning, 'step-name')
)
"""
check(step_odict[step].status == 'WARNING')
def StepFailure(check, step_odict, step):
"""Assert that a step failed.
Args:
step (str) - The step to check for a failure.
Usage:
yield (
TEST
+ api.post_process(StepFailure, 'step-name')
)
"""
check(step_odict[step].status == 'FAILURE')
def StepException(check, step_odict, step):
"""Assert that a step had an exception.
Args:
step (str) - The step to check for an exception.
Usage:
yield (
TEST
+ api.post_process(StepException, 'step-name')
)
"""
check(step_odict[step].status == 'EXCEPTION')
def StepCanceled(check, step_odict, step):
"""Assert that a step had an exception.
Args:
step (str) - The step to check for an exception.
Usage:
yield (
TEST
+ api.post_process(StepCanceled, 'step-name')
)
"""
check(step_odict[step].status == 'CANCELED')
def _fullmatch(pattern, string):
m = re.match(pattern, string)
return m and m.span()[1] == len(string)
def StepCommandEquals(check, step_odict, step, expected_cmd):
"""Assert that a step's command matches a given list of strings.
Args:
step (str) - The step to check the command of.
expected_cmd (list(str)) - Strings to match the elements of the step's
command.
Usage:
yield (
TEST
+ api.post_process(StepCommandEquals, 'step-name',
['my', 'command'])
)
"""
assert all((isinstance(elem, str) for elem in expected_cmd)), \
'expected_cmd must be an iterable of strings'
cmd = step_odict[step].cmd
check(expected_cmd == cmd)
def StepCommandRE(check, step_odict, step, expected_patterns):
"""Assert that a step's command matches a given list of regular expressions.
Args:
step (str) - The step to check the command of.
expected_patterns (list(str, re.Pattern)) - Regular expressions to match the
elements of the step's command. The i-th element of the step's command
will be matched against the i-th regular expression. If the pattern does
not match the entire argument string, it is a CHECK failure.
Usage:
yield (
TEST
+ api.post_process(StepCommandRE, 'step-name',
['my', 'command', '.*'])
)
"""
cmd = step_odict[step].cmd
for expected, actual in zip(expected_patterns, cmd):
check(_fullmatch(expected, actual))
unmatched = cmd[len(expected_patterns):]
check('all arguments matched', not unmatched)
unused = expected_patterns[len(cmd):]
check('all patterns used', not unused)
def StepCommandContains(check, step_odict, step, argument_sequence):
"""Assert that a step's command contained the given sequence of arguments.
Args:
step (str) - The name of the step to check the command of.
argument_sequence (list of (str|regex)) - The expected sequence of
arguments. Strings will be compared for equality, while regex patterns
will be matched using the search method. The check will pass if the step's
command contains a subsequence where the elements are matched by the
corresponding elements of argument_sequence.
"""
check('command line for step %s contained %s' % (step, argument_sequence),
argument_sequence in step_odict[step].cmd)
def StepCommandDoesNotContain(check, step_odict, step, argument_sequence):
"""Assert that a step's command does not contain the given sequence of
arguments.
Args:
step (str) - The name of the step to check the command of.
argument_sequence (list of (str|regex)) - The sequence of arguments that
should not exist. The check will fail if the step's command contains a
subsequence where the elements are matched by the corresponding elements
of argument_sequence.
"""
check(
'command line for step %s does not contain %s' %
(step, argument_sequence), argument_sequence not in step_odict[step].cmd)
def StepCommandEmpty(check, step_odict, step):
"""Assert that a step ran no command.
Args:
step (str) - The name of the step to check the command of.
Usage:
yield (TEST + api.post_process(StepCommandEmpty, 'step-name')
"""
check(not step_odict[step].cmd)
def StepTextEquals(check, step_odict, step, expected):
"""Assert that a step's step_text is equal to a given string.
Args:
step (str) - The step to check the step_text of.
expected (str) - The expected value of the step_text.
Usage:
yield TEST + api.post_process(StepTextEquals, 'step-name', 'expected-text')
"""
check(step_odict[step].step_text == expected)
def StepTextContains(check, step_odict, step, expected_substrs):
"""Assert that a step's step_text contains given substrings.
Args:
step (str) - The step to check the step_text of.
expected_substrs (list(str)) - The expected substrings the step_text should
contain.
Usage:
yield (
TEST
+ api.post_process(StepTextContains, 'step-name',
['substr1', 'substr2'])
)
"""
assert not isinstance(expected_substrs, basestring), \
'expected_substrs must be an iterable of strings and must not be a string'
for expected in expected_substrs:
check(expected in step_odict[step].step_text)
def StepSummaryEquals(check, step_odict, step, expected):
"""Check that the step's step_summary_text equals given value.
Args:
step (str) - The step to check the step_text of
expected (str) - The expected value of the step_text
Usage:
yield TEST + \
api.post_process(StepSummaryEquals, 'step-name', 'expected-text')
"""
check(step_odict[step].step_summary_text == expected)
def StepEnvContains(check, step_odict, step, env_dict):
"""Assert that a step's env contains the given key/value pairs.
Args:
step (str) - The name of the step to check the env of.
env_dict (Dict[str, str]) - The expected key/value pairs to look for. The
check will pass if the steps's env contains all of the given key/value
pairs.
"""
for k, v in env_dict.items():
check('env for step %s contained %s: %s' % (step, k, v),
(k, v) in step_odict[step].env.items())
def StepEnvDoesNotContain(check, step_odict, step, env_dict):
"""Assert that a step's env does not contain the given key/value pairs.
Args:
step (str) - The name of the step to check the env of.
env_dict (Dict[str, str]) - The key/value pairs that should not be present.
The check will pass if the step's env does not contain any of the given
key/value pairs.
"""
for k, v in env_dict.items():
check('env for step %s did not contain %s: %s' % (step, k, v),
(k, v) not in step_odict[step].env.items())
def StepEnvEquals(check, step_odict, step, env_dict):
"""Assert that a step's env equals the given dict.
Args:
step (str) - The name of the step to check the env of.
env_dict (Dict[str, str]) - The expected key/value pairs to look for. The
check will pass if the given env_dict is equal to the step's env.
"""
check('env for step %s equaled %s' % (step, env_dict),
step_odict[step].env == env_dict)
def HasLog(check, step_odict, step, log):
"""Assert that a step contains a specific named log.
Args:
step (str) - The step to check the log of.
log (str) - The name of the log to check.
Usage:
yield (
TEST
+ api.post_process(HasLog, 'step-name', 'log-name')
)
"""
check(log in step_odict[step].logs)
def DoesNotHaveLog(check, step_odict, step, log):
"""Assert that a step does not contain a specific named log.
Args:
step (str) - The step to check the log of.
log (str) - The name of the log to check.
Usage:
yield (
TEST
+ api.post_process(DoesNotHaveLog, 'step-name', 'log-name')
)
"""
check(log not in step_odict[step].logs)
def LogEquals(check, step_odict, step, log, expected):
"""Assert that a step's log is equal to a given string.
Args:
step (str) - The step to check the log of.
log (str) - The name of the log to check.
expected (str) - The expected value of the log.
Usage:
yield (
TEST
+ api.post_process(LogEquals, 'step-name', 'log-name', 'expected-text')
)
"""
check(step_odict[step].logs[log] == expected)
def LogContains(check, step_odict, step, log, expected_substrs):
"""Assert that a step's log contains given substrings.
Args:
step (str) - The step to check the log of.
log (str) - The name of the log to check.
expected_substrs (list(str)) - The expected substrings the log should
contain.
Usage:
yield (
TEST
+ api.post_process(LogEquals, 'step-name', 'log-name',
['substr1', 'substr2'])
)
"""
assert not isinstance(expected_substrs, basestring), \
'expected_substrs must be an iterable of strings and must not be a string'
for expected in expected_substrs:
check(expected in step_odict[step].logs[log])
def LogDoesNotContain(check, step_odict, step, log, unexpected_substrs):
"""Assert that a step's log does not contain given substrings.
Args:
step (str) - The step to check the log of.
log (str) - The name of the log to check.
unexpected_substrs (list(str)) - The unexpected substrings the log should
not contain.
Usage:
yield (
TEST
+ api.post_process(LogDoesNotContain, 'step-name', 'log-name',
['substr1', 'substr2'])
)
"""
assert not isinstance(unexpected_substrs, basestring), (
'unexpected_substrs must be an iterable of strings and must not be a '
'string')
for unexpected in unexpected_substrs:
check(unexpected not in step_odict[step].logs[log])
def GetBuildProperties(step_odict):
"""Retrieves the build properties for a recipe."""
build_properties = {}
for name, step in step_odict.items():
if name == '$result':
continue
for prop, value in step.output_properties.items():
build_properties[prop] = value
return build_properties
def PropertyEquals(check, step_odict, key, value):
"""Assert that a recipe's output property `key` equals `value`.
Args:
key (str) - The key to look for in output properties.
value (jsonish) - The value to look for in output properties.
Usage:
yield (
TEST
+ api.post_process(PropertyEquals, 'do_not_retry', True)
)
"""
build_properties = GetBuildProperties(step_odict)
# Short circuiting of boolean expressions is broken in check().
# https://crbug.com/946015.
if check(key in build_properties):
check(build_properties[key] == value)
def PropertiesContain(check, step_odict, key):
"""Assert that a recipe's output properties contain `key`.
Args:
key (str) - The key to check for.
Usage:
yield (
TEST
+ api.post_process(PropertiesContain, 'property_key')
)
"""
build_properties = GetBuildProperties(step_odict)
check(key in build_properties)
def PropertiesDoNotContain(check, step_odict, key):
"""Assert that a recipe's output properties do not contain `key`.
Args:
key (str) - The key to check for.
Usage:
yield (
TEST
+ api.post_process(PropertiesDoNotContain, 'property_key')
)
"""
build_properties = GetBuildProperties(step_odict)
check(key not in build_properties)
def StatusCodeIn(check, step_odict, *codes):
"""Assert that recipe result status code is within expected codes.
DEPRECATED: Use StatusSuccess or StatusFailure instead.
Args:
codes (list): list of expected status codes (int).
"""
check(len(codes) == 1)
code = codes[0]
check(code in (0, 1, 2))
if code == 0:
StatusSuccess(check, step_odict)
else:
StatusAnyFailure(check, step_odict)
def StatusSuccess(check, step_odict):
"""Assert that the recipe finished successfully."""
failure = step_odict['$result'].get('failure')
check('recipe succeeded (found failure instead)', failure is None)
def StatusAnyFailure(check, step_odict):
"""Assert that the recipe failed."""
check('recipe failed (found success instead)',
'failure' in step_odict['$result'])
def StatusFailure(check, step_odict):
"""Assert that the recipe had a non-infra failure."""
result = step_odict['$result']
if not check('recipe failed (found success instead)', 'failure' in result):
return
check('expected failure but recipe had infra failure',
'failure' in result['failure'])
def StatusException(check, step_odict):
"""Assert that the recipe had an infra failure."""
result = step_odict['$result']
if not check('recipe had infra failure (found success instead)',
'failure' in result):
return
check('recipe had infra failure (found non-infra failure instead)',
'failure' not in result['failure'])
def ResultReason(check, step_odict, reason):
"""Assert that recipe result reason matches given reason.
DEPRECATED: Please use StatusAnyFailure + SummaryMarkdown instead.
Args:
reason (str): the string to match.
"""
StatusAnyFailure(check, step_odict)
SummaryMarkdown(check, step_odict, reason)
def ResultReasonRE(check, step_odict, reason_regex):
"""Assert that recipe result reason contains given regex.
DEPRECATED: Please use StatusAnyFailure + SummaryMarkdownRE instead.
Args:
reason_regex (str): the regular expression to match.
"""
StatusAnyFailure(check, step_odict)
SummaryMarkdownRE(check, step_odict, reason_regex)
def SummaryMarkdown(check, step_odict, summary):
"""Assert that recipe output summary is the same as the given summary.
Args:
summary (str): the string to match.
"""
result = step_odict['$result']
actual_summary = result.get('failure').get('humanReason') if (
result.get('failure')) else result.get('summaryMarkdown')
if not check('recipe doesn\'t output any summary',
actual_summary):
return
check(
'expected recipe output summary %r (found summary %r instead)' %
(summary, actual_summary), summary == actual_summary)
def SummaryMarkdownRE(check, step_odict, summary_regex):
"""Assert that recipe output summary matches given regex.
Args:
summary_regex (str): the regular expression to match.
"""
result = step_odict['$result']
actual_summary = result.get('failure').get('humanReason') if (
result.get('failure')) else result.get('summaryMarkdown')
if not check('recipe doesn\'t output any summary',
actual_summary):
return
check(
'expected recipe output summary matches %r (found summary %r instead)' %
(summary_regex, actual_summary), re.search(summary_regex, actual_summary))
def DropExpectation(_check, _step_odict):
"""Using this post-process hook will drop the expectations for this test
completely.
Usage:
yield TEST + api.post_process(DropExpectation)
"""
return {}