| #!/usr/bin/env vpython3 |
| # Copyright 2016 The LUCI Authors. All rights reserved. |
| # Use of this source code is governed under the Apache License, Version 2.0 |
| # that can be found in the LICENSE file. |
| |
| |
| import collections |
| from typing import Any, Callable, OrderedDict |
| |
| import test_env |
| |
| from recipe_engine import post_process |
| from recipe_engine.internal.test import magic_check_fn |
| from recipe_engine.recipe_test_api import RecipeTestApi |
| |
| from PB.recipe_engine.internal.test.runner import Outcome |
| |
| |
| def make_step(name: str, *fields: str) -> dict[str, Any]: |
| """Create a step dict containing the given fields and reasonable defaults. |
| |
| Args: |
| name: The name of the step. |
| fields: Names of step fields to include, such as 'cmd', 'cwd', 'env'. If not |
| specified, then all default fields will be included. |
| |
| Returns: |
| A dict representing a step, containing reasonable defaults for the given |
| fields. |
| """ |
| ret = { |
| 'name': name, |
| 'cmd': ['thing', 'other'], |
| 'cwd': 'some-directory', |
| 'env': {'var': 'value'}, |
| } |
| if fields: |
| return {k: v for k, v in ret.items() if k in fields or k == 'name'} |
| return ret |
| |
| |
| def make_step_dict(*names: str) -> OrderedDict[str, dict[str, Any]]: |
| """Create an OrderedDict of step dicts with given names and default fields. |
| |
| Args: |
| names: Step names to include. |
| |
| Returns: |
| An ordered dict of {step name: step dict}, in the same order that steps |
| were provided. Each step dict will contain reasonable default values for |
| fields such as 'cmd'. |
| """ |
| return collections.OrderedDict([(name, make_step(name)) for name in names]) |
| |
| |
| class PostProcessUnitTest(test_env.RecipeEngineUnitTest): |
| """Helper class for testing post_process functions.""" |
| |
| @property |
| def step_dict(self) -> dict[str, dict[str, Any]]: |
| """Return a standard step dict for this test case. |
| |
| Subclasses should override this property to set a step dict that makes sense |
| for the given test case. |
| |
| Raises: |
| NotImplementedError: If not overridden by subclasses. |
| """ |
| raise NotImplementedError() |
| |
| def post_process(self, func: Callable, *args, |
| **kwargs) -> tuple[Any, list[list[str]]]: |
| """Run the given post_process function with self.step_dict. |
| |
| Args: |
| func: A post_process checker function, such as MustRun. |
| *args: Additional positional args for post_process. |
| **kwargs: Additional keyword args for post-process. |
| |
| Returns: |
| A tuple (expectations, failures), where expectations is as returned by |
| magic_check_fn.post_process, and failures is a list of failures returned |
| by the check, each represented by a list of output strings. |
| """ |
| test_data = RecipeTestApi(None).post_process(func, *args, **kwargs) |
| results = Outcome.Results() |
| expectations = magic_check_fn.post_process(results, self.step_dict, |
| test_data) |
| return expectations, results.check |
| |
| def expect_pass(self, func: Callable, *args, **kwargs) -> None: |
| """Assert that the given post_process func passes. |
| |
| The given function will be called with self.step_dict. |
| |
| Args: |
| func: The post_process checker function, such as MustRun. |
| *args: Additional positional args for the post_process call. |
| **kwargs: Additional keyword args for the post_process call. |
| |
| Raises: |
| AssertionError: If the post_process check raised any failures. |
| """ |
| _, failures = self.post_process(func, *args, **kwargs) |
| self.assertEqual(len(failures), 0) |
| |
| def expect_fails(self, num_fails: int, func: Callable, *args, |
| **kwargs) -> list[list[str]]: |
| """Assert that the post_process func fails the expected number of times. |
| |
| The given function will be called with self.step_dict. |
| |
| Args: |
| num_fails: The number of failures expected. |
| func: The post_process checker function, such as MustRun. |
| *args: Additional positional args for the post_process call. |
| **kwargs: Additional keyword args for the post_process call. |
| |
| Returns: |
| A list of failures, each represented as a list of output lines. |
| |
| Raises: |
| AssertionError: If the post_process did not raise exactly the expected |
| number of failures. |
| """ |
| _, failures = self.post_process(func, *args, **kwargs) |
| self.assertEqual(len(failures), num_fails) |
| return failures |
| |
| def assertHas(self, failure: list[str], *text: str) -> None: |
| """Assert that the given failure contains all the given strings. |
| |
| Args: |
| failure: A failed post_process check, as a list of output lines. |
| *text: Strings that must be contained by the concatenated failure message. |
| |
| Raises: |
| AssertionError: If the failure message does not contain each of *text |
| """ |
| combined = '\n'.join(failure.lines) |
| for item in text: |
| self.assertIn(item, combined) |
| |
| |
| class TestFilter(PostProcessUnitTest): |
| """Test case for post_process.Filter.""" |
| f = post_process.Filter |
| |
| @property |
| def step_dict(self) -> dict[str, dict[str, Any]]: |
| """Return a standard step dict for this test case.""" |
| return make_step_dict('a', 'b', 'b.sub', 'b.sub2') |
| |
| def test_basic(self): |
| results, failures = self.post_process(self.f('a', 'b')) |
| self.assertEqual(results, list(make_step_dict('a', 'b').values())) |
| self.assertEqual(len(failures), 0) |
| |
| def test_built(self): |
| f = self.f() |
| f = f.include('b') |
| f = f.include('a') |
| results, failures = self.post_process(f) |
| self.assertEqual(results, list(make_step_dict('a', 'b').values())) |
| self.assertEqual(len(failures), 0) |
| |
| def test_built_fields(self): |
| f = self.f() |
| f = f.include('b', ['env']) |
| f = f.include('a', ['cmd']) |
| results, failures = self.post_process(f) |
| self.assertEqual(results, [make_step('a', 'cmd'), make_step('b', 'env')]) |
| self.assertEqual(len(failures), 0) |
| |
| def test_built_extra_includes(self): |
| f = self.f('a', 'b', 'x') |
| results, failures = self.post_process(f) |
| self.assertEqual(results, list(make_step_dict('a', 'b').values())) |
| self.assertEqual(len(failures), 1) |
| self.assertHas(failures[0], |
| 'check((len(unused_includes) == 0))', |
| "unused_includes: {'x': ()}") |
| |
| def test_re(self): |
| f = self.f().include_re(r'b\.') |
| results, failures = self.post_process(f) |
| self.assertEqual(results, list(make_step_dict('b.sub', 'b.sub2').values())) |
| self.assertEqual(len(failures), 0) |
| |
| def test_re_low_limit(self): |
| f = self.f().include_re(r'b\.', at_least=3) |
| results, failures = self.post_process(f) |
| self.assertEqual(results, list(make_step_dict('b.sub', 'b.sub2').values())) |
| self.assertEqual(len(failures), 1) |
| self.assertHas(failures[-1], 'check((re_usage_count[regex] >= at_least))', |
| 'at_least: 3', 're_usage_count[regex]: 2', |
| 'regex: re.compile(\'b\\\\.\'') |
| |
| def test_re_high_limit(self): |
| f = self.f().include_re(r'b\.', at_most=1) |
| results, failures = self.post_process(f) |
| self.assertEqual(results, list(make_step_dict('b.sub', 'b.sub2').values())) |
| self.assertEqual(len(failures), 1) |
| self.assertHas(failures[0], 'check((re_usage_count[regex] <= at_most))') |
| self.assertHas(failures[0], 'at_most: 1', 're_usage_count[regex]: 2', |
| 'regex: re.compile(\'b\\\\.\'') |
| |
| |
| class TestRun(PostProcessUnitTest): |
| """Test case for checks relating to which steps run.""" |
| |
| @property |
| def step_dict(self) -> dict[str, dict[str, Any]]: |
| """Return a standard step dict for this test case.""" |
| return make_step_dict('a', 'b', 'b.sub', 'b.sub2') |
| |
| def test_mr_pass(self): |
| self.expect_pass(post_process.MustRun, 'a') |
| |
| def test_mr_fail(self): |
| self.expect_fails(1, post_process.MustRun, 'x') |
| |
| def test_mr_pass_re(self): |
| self.expect_pass(post_process.MustRunRE, 'a') |
| self.expect_pass(post_process.MustRunRE, 'a', at_most=1) |
| self.expect_pass(post_process.MustRunRE, 'a', at_least=1, at_most=1) |
| |
| def test_mr_fail_re(self): |
| self.expect_fails(1, post_process.MustRunRE, 'x') |
| self.expect_fails(1, post_process.MustRunRE, 'b', at_most=1) |
| self.expect_fails(1, post_process.MustRunRE, 'b', at_least=4) |
| |
| def test_dnr_pass(self): |
| self.expect_pass(post_process.DoesNotRun, 'x') |
| |
| def test_dnr_fail(self): |
| self.expect_fails(1, post_process.DoesNotRun, 'a') |
| |
| def test_dnr_pass_re(self): |
| self.expect_pass(post_process.DoesNotRunRE, 'x') |
| |
| def test_dnr_fail_re(self): |
| self.expect_fails(3, post_process.DoesNotRunRE, 'b') |
| |
| |
| class TestStepStatus(PostProcessUnitTest): |
| """Test case for checks relating to step status.""" |
| |
| @property |
| def step_dict(self) -> dict[str, dict[str, Any]]: |
| """Return a standard step dict for this test case.""" |
| return collections.OrderedDict([ |
| ('success-step', { |
| 'name': 'success-step', |
| 'status': 'SUCCESS' |
| }), |
| ('failure-step', { |
| 'name': 'failure-step', |
| 'status': 'FAILURE' |
| }), |
| ('exception-step', { |
| 'name': 'exception-step', |
| 'status': 'EXCEPTION' |
| }), |
| ]) |
| |
| def test_step_success_pass(self): |
| self.expect_pass(post_process.StepSuccess, 'success-step') |
| |
| def test_step_success_fail(self): |
| failures = self.expect_fails(1, post_process.StepSuccess, 'failure-step') |
| self.assertHas(failures[0], |
| "check((step_odict[step].status == 'SUCCESS'))") |
| failures = self.expect_fails(1, post_process.StepSuccess, 'exception-step') |
| self.assertHas(failures[0], |
| "check((step_odict[step].status == 'SUCCESS'))") |
| |
| def test_step_failure_pass(self): |
| self.expect_pass(post_process.StepFailure, 'failure-step') |
| |
| def test_step_failure_fail(self): |
| failures = self.expect_fails(1, post_process.StepFailure, 'success-step') |
| self.assertHas(failures[0], |
| "check((step_odict[step].status == 'FAILURE'))") |
| failures = self.expect_fails(1, post_process.StepFailure, 'exception-step') |
| self.assertHas(failures[0], |
| "check((step_odict[step].status == 'FAILURE'))") |
| |
| def test_step_exception_pass(self): |
| self.expect_pass(post_process.StepException, 'exception-step') |
| |
| def test_step_exception_fail(self): |
| failures = self.expect_fails(1, post_process.StepException, 'success-step') |
| self.assertHas(failures[0], |
| "check((step_odict[step].status == 'EXCEPTION'))") |
| failures = self.expect_fails(1, post_process.StepException, 'failure-step') |
| self.assertHas(failures[0], |
| "check((step_odict[step].status == 'EXCEPTION'))") |
| |
| |
| class TestStepCommandEquals(PostProcessUnitTest): |
| """Test case for StepCommandEquals.""" |
| |
| @property |
| def step_dict(self) -> dict[str, dict[str, Any]]: |
| """Return a standard step dict for this test case.""" |
| return make_step_dict('my-step') |
| |
| def test_pass(self): |
| """Assert that comparing against the exact cmd list passes.""" |
| self.expect_pass(post_process.StepCommandEquals, 'my-step', |
| ['thing', 'other']) |
| |
| def test_too_many_args(self): |
| """Assert that comparing against the cmd list plus extra args fails.""" |
| self.expect_fails(1, post_process.StepCommandEquals, 'my-step', |
| ['thing', 'other', 'foo']) |
| |
| def test_too_few_args(self): |
| """Assert that comparing against the cmd list minus some args fails.""" |
| self.expect_fails(1, post_process.StepCommandEquals, 'my-step', ['thing']) |
| |
| def test_string_instead_of_list(self): |
| """Assert that comparing against a command string fails.""" |
| self.expect_fails(1, post_process.StepCommandEquals, 'my-step', |
| 'thing other') |
| |
| def test_regex_would_pass(self): |
| """Assert that comparing against a list of cmd regexes fails.""" |
| self.expect_pass(post_process.StepCommandRE, 'my-step', |
| ['thing', '[other]+']) |
| self.expect_fails(1, post_process.StepCommandEquals, 'my-step', |
| ['thing', '[other]+']) |
| |
| |
| class TestStepCommandRe(PostProcessUnitTest): |
| """Test case for StepCommandRE.""" |
| |
| @property |
| def step_dict(self) -> dict[str, dict[str, Any]]: |
| """Return a stadnard step dict for this test case.""" |
| return collections.OrderedDict([('x', { |
| 'name': 'x', |
| 'cmd': ['echo', 'foo', 'bar', 'baz'] |
| })]) |
| |
| def test_step_command_re_pass(self): |
| self.expect_pass(post_process.StepCommandRE, 'x', |
| ['echo', 'f.*', 'bar', '.*z']) |
| |
| def test_step_command_re_fail(self): |
| failures = self.expect_fails(2, post_process.StepCommandRE, 'x', |
| ['echo', 'fo', 'bar2', 'baz']) |
| self.assertHas(failures[0], |
| 'check(_fullmatch(expected, actual))', |
| "expected: 'fo'") |
| self.assertHas(failures[1], |
| 'check(_fullmatch(expected, actual))', |
| "expected: 'bar2'") |
| |
| failures = self.expect_fails(1, post_process.StepCommandRE, 'x', |
| ['echo', 'foo']) |
| self.assertHas(failures[0], |
| "CHECK 'all arguments matched'", |
| "unmatched: ['bar', 'baz']") |
| |
| failures = self.expect_fails(1, post_process.StepCommandRE, 'x', |
| ['echo', 'foo', 'bar', 'baz', 'quux', 'quuy']) |
| self.assertHas(failures[0], |
| "CHECK 'all patterns used'", |
| "unused: ['quux', 'quuy']") |
| |
| |
| class TestStepCommandContains(PostProcessUnitTest): |
| """Test case for StepCommandContains.""" |
| |
| @property |
| def step_dict(self) -> dict[str, dict[str, Any]]: |
| """Return a standard step dict for this test case.""" |
| return collections.OrderedDict([('two', { |
| 'name': 'two', |
| 'cmd': ['a', 'b'] |
| }), ('one', { |
| 'name': 'one', |
| 'cmd': ['a'] |
| }), ('zero', { |
| 'name': 'zero', |
| 'cmd': [] |
| }), ('x', { |
| 'name': 'x', |
| 'cmd': ['echo', 'foo', 'bar', 'baz'] |
| })]) |
| |
| def expect_fail(self, func, failure, *args, **kwargs): |
| _, failures = self.post_process(func, *args, **kwargs) |
| self.assertEqual(len(failures), 1) |
| self.assertHas(failures[0], 'CHECK %r' % failure) |
| return failures |
| |
| def test_step_command_contains_one_pass(self): |
| self.expect_pass(post_process.StepCommandContains, 'one', ['a']) |
| |
| def test_step_command_contains_one_pass_trivial(self): |
| self.expect_pass(post_process.StepCommandContains, 'one', []) |
| |
| def test_step_command_contains_one_fail(self): |
| self.expect_fail(post_process.StepCommandContains, |
| "command line for step one contained ['b']", |
| 'one', ['b']) |
| |
| def test_step_command_contains_two_fail_order(self): |
| self.expect_fail(post_process.StepCommandContains, |
| "command line for step two contained ['b', 'a']", |
| 'two', ['b', 'a']) |
| |
| def test_step_command_contains_zero_pass(self): |
| self.expect_pass(post_process.StepCommandContains, 'zero', []) |
| |
| def test_step_command_contains_zero_fail(self): |
| self.expect_fail(post_process.StepCommandContains, |
| "command line for step zero contained ['a']", |
| 'zero', ['a']) |
| |
| def test_step_command_contains_pass(self): |
| self.expect_pass(post_process.StepCommandContains, 'x', |
| ['echo', 'foo', 'bar']) |
| self.expect_pass(post_process.StepCommandContains, 'x', |
| ['foo', 'bar', 'baz']) |
| |
| def test_step_command_contains_fail(self): |
| self.expect_fail(post_process.StepCommandContains, |
| 'command line for step x contained %r' % ['foo', 'baz'], |
| 'x', ['foo', 'baz']) |
| |
| |
| class TestStepCommandDoesNotContain(PostProcessUnitTest): |
| """Test case for StepCommandDoesNotContain.""" |
| |
| @property |
| def step_dict(self) -> dict[str, dict[str, Any]]: |
| """Return a standard step dict for this test case.""" |
| return collections.OrderedDict([('two', { |
| 'name': 'two', |
| 'cmd': ['a', 'b'] |
| }), ('one', { |
| 'name': 'one', |
| 'cmd': ['a'] |
| }), ('zero', { |
| 'name': 'zero', |
| 'cmd': [] |
| }), ('x', { |
| 'name': 'x', |
| 'cmd': ['echo', 'foo', 'bar', 'baz'] |
| })]) |
| |
| def expect_pass(self, func, *args, **kwargs): |
| _, failures = self.post_process(func, *args, **kwargs) |
| self.assertEqual(len(failures), 0) |
| |
| def expect_fail(self, func, *args, **kwargs): |
| _, failures = self.post_process(func, *args, **kwargs) |
| self.assertEqual(len(failures), 1) |
| return failures |
| |
| def test_step_command_does_not_contain_one_pass(self): |
| self.expect_pass(post_process.StepCommandDoesNotContain, 'one', ['foo']) |
| |
| def test_step_command_does_not_contain_one_fail(self): |
| self.expect_fail(post_process.StepCommandDoesNotContain, 'one', ['a']) |
| |
| def test_step_command_does_not_contain_two_pass_order(self): |
| self.expect_pass(post_process.StepCommandDoesNotContain, 'two', ['b', 'a']) |
| |
| def test_step_command_does_not_contain_two_fail_order(self): |
| self.expect_fail(post_process.StepCommandDoesNotContain, 'two', ['a', 'b']) |
| |
| def test_step_command_does_not_contain_fail(self): |
| self.expect_fail(post_process.StepCommandDoesNotContain, 'x', |
| ['echo', 'foo', 'bar']) |
| self.expect_fail(post_process.StepCommandDoesNotContain, 'x', |
| ['foo', 'bar', 'baz']) |
| |
| def test_step_command_does_not_contain_pass(self): |
| self.expect_pass(post_process.StepCommandDoesNotContain, 'x', |
| ['foo', 'baz']) |
| |
| |
| class TestStepText(PostProcessUnitTest): |
| """Test case for checks that relate to step text and step summary.""" |
| |
| @property |
| def step_dict(self) -> dict[str, dict[str, Any]]: |
| """Return a standard step dict for this test case.""" |
| return collections.OrderedDict([('x', { |
| 'name': 'x', |
| 'step_text': 'foobar', |
| 'step_summary_text': 'test summary', |
| })]) |
| |
| def test_step_text_equals_pass(self): |
| self.expect_pass(post_process.StepTextEquals, 'x', 'foobar') |
| |
| def test_step_text_equals_fail(self): |
| failures = self.expect_fails(1, post_process.StepTextEquals, 'x', 'foo') |
| self.assertHas(failures[0], |
| 'check((step_odict[step].step_text == expected))') |
| |
| def test_step_text_contains_pass(self): |
| self.expect_pass(post_process.StepTextContains, 'x', ['foo', 'bar']) |
| |
| def test_step_summary_text_equals_pass(self): |
| self.expect_pass(post_process.StepSummaryEquals, 'x', 'test summary') |
| |
| def test_step_summary_text_equals_fail(self): |
| failures = self.expect_fails(1, post_process.StepSummaryEquals, 'x', |
| 'bad') |
| self.assertHas(failures[0], |
| 'check((step_odict[step].step_summary_text == expected))', |
| "expected: 'bad'") |
| |
| |
| def test_step_text_contains_fail(self): |
| failures = self.expect_fails( |
| 2, post_process.StepTextContains, 'x', ['food', 'bar', 'baz']) |
| self.assertHas(failures[0], |
| 'check((expected in step_odict[step].step_text))', |
| "expected: 'food'") |
| self.assertHas(failures[1], |
| 'check((expected in step_odict[step].step_text))', |
| "expected: 'baz'") |
| |
| |
| class TestLog(PostProcessUnitTest): |
| """Test case for checks that relate to logs.""" |
| |
| @property |
| def step_dict(self) -> dict[str, dict[str, Any]]: |
| """Return a standard step dict for this test case.""" |
| return collections.OrderedDict([('x', { |
| 'name': 'x', |
| 'logs': { |
| 'log-x': 'foo\nbar', |
| }, |
| })]) |
| |
| def test_log_equals_pass(self): |
| self.expect_pass(post_process.LogEquals, 'x', 'log-x', 'foo\nbar') |
| |
| def test_log_equals_fail(self): |
| failures = self.expect_fails(1, post_process.LogEquals, |
| 'x', 'log-x', 'foo\nbar\n') |
| self.assertHas(failures[0], |
| 'check((step_odict[step].logs[log] == expected))') |
| |
| def test_log_contains_pass(self): |
| self.expect_pass(post_process.LogContains, 'x', 'log-x', |
| ['foo\n', '\nbar', 'foo\nbar']) |
| |
| def test_log_contains_fail(self): |
| failures = self.expect_fails( |
| 3, post_process.LogContains, 'x', 'log-x', |
| ['food', 'bar', 'baz', 'foobar']) |
| self.assertHas(failures[0], |
| 'check((expected in step_odict[step].logs[log]))', |
| "expected: 'food'") |
| self.assertHas(failures[1], |
| 'check((expected in step_odict[step].logs[log]))', |
| "expected: 'baz'") |
| self.assertHas(failures[2], |
| 'check((expected in step_odict[step].logs[log]))', |
| "expected: 'foobar'") |
| |
| def test_log_does_not_contain_pass(self): |
| self.expect_pass(post_process.LogDoesNotContain, 'x', 'log-x', |
| ['i dont exist']) |
| |
| def test_log_does_not_contain_fail(self): |
| failures = self.expect_fails(1, post_process.LogDoesNotContain, 'x', |
| 'log-x', ['foo']) |
| self.assertHas(failures[0], |
| 'check((unexpected not in step_odict[step].logs[log]))', |
| "unexpected: 'foo'") |
| |
| |
| if __name__ == '__main__': |
| test_env.main() |