| #!/usr/bin/env vpython3 |
| # Copyright 2020 The Chromium Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import copy |
| import json |
| import subprocess |
| import sys |
| from typing import List, Tuple |
| import unittest |
| |
| import unittest.mock as mock |
| |
| from unexpected_passes_common import builders |
| from unexpected_passes_common import constants |
| from unexpected_passes_common import data_types |
| from unexpected_passes_common import expectations |
| from unexpected_passes_common import multiprocessing_utils |
| from unexpected_passes_common import queries |
| from unexpected_passes_common import unittest_utils |
| |
# Zero out the queries module's QUERY_DELAY for every test in this file.
# NOTE(review): presumably this is a pause applied around issuing queries, so
# zeroing it keeps the tests fast -- confirm against queries.py.
queries.QUERY_DELAY = 0
| |
| |
class HelperMethodUnittest(unittest.TestCase):
  """Tests for private module-level helper functions in queries."""

  def testStripPrefixFromBuildIdValidId(self) -> None:
    """Tests that a well-formed build ID has its prefix removed."""
    stripped_id = queries._StripPrefixFromBuildId('build-1')
    self.assertEqual(stripped_id, '1')

  def testStripPrefixFromBuildIdInvalidId(self) -> None:
    """Tests that malformed build IDs trigger an assertion."""
    # The first ID has no '-' separator, the second has one too many.
    for malformed_id in ('build1', 'build-1-2'):
      with self.assertRaises(AssertionError):
        queries._StripPrefixFromBuildId(malformed_id)

  def testConvertActualResultToExpectationFileFormatAbort(self) -> None:
    """Tests that an 'ABORT' result is converted to 'Timeout'."""
    converted = queries._ConvertActualResultToExpectationFileFormat('ABORT')
    self.assertEqual(converted, 'Timeout')
| |
| |
class QueryGeneratorUnittest(unittest.TestCase):
  """Tests for queries.SplitQueryGenerator."""

  def setUp(self) -> None:
    # A single CI builder entry shared by every generator under test.
    self._builder = data_types.BuilderEntry('ci', constants.BuilderTypes.CI,
                                            False)

  def testSplitQueryGeneratorInitialSplit(self) -> None:
    """Tests that initial query splitting works as expected."""
    # Three test IDs with a target of two per list must yield two lists.
    test_filter = queries.SplitQueryGenerator(self._builder, ['1', '2', '3'], 2)
    self.assertEqual(test_filter._test_id_lists, [['1', '2'], ['3']])
    self.assertEqual(len(test_filter.GetClauses()), 2)
    # Three test IDs with a target of three fit into a single list.
    test_filter = queries.SplitQueryGenerator(self._builder, ['1', '2', '3'], 3)
    self.assertEqual(test_filter._test_id_lists, [['1', '2', '3']])
    self.assertEqual(len(test_filter.GetClauses()), 1)

  def testSplitQueryGeneratorSplitQuery(self) -> None:
    """Tests that SplitQueryGenerator's query splitting works."""
    test_filter = queries.SplitQueryGenerator(self._builder, ['1', '2'], 10)
    self.assertEqual(len(test_filter.GetClauses()), 1)
    test_filter.SplitQuery()
    self.assertEqual(len(test_filter.GetClauses()), 2)

  def testSplitQueryGeneratorSplitQueryCannotSplitFurther(self) -> None:
    """Tests SplitQueryGenerator's failure mode when it cannot split further."""
    # A single test ID cannot be divided, so splitting must raise.
    test_filter = queries.SplitQueryGenerator(self._builder, ['1'], 1)
    with self.assertRaises(queries.QuerySplitError):
      test_filter.SplitQuery()
| |
| |
class QueryBuilderUnittest(unittest.TestCase):
  """Tests for the generic querier's QueryBuilder() method.

  subprocess.Popen is mocked out in setUp(), so tests control the query output
  by installing a unittest_utils.FakeProcess on the mock.
  """

  def setUp(self) -> None:
    # Make sure no test ever spawns a real process.
    self._patcher = mock.patch.object(subprocess, 'Popen')
    self._popen_mock = self._patcher.start()
    self.addCleanup(self._patcher.stop)

    # Reset the registered implementations before registering generic ones.
    builders.ClearInstance()
    expectations.ClearInstance()
    unittest_utils.RegisterGenericBuildersImplementation()
    unittest_utils.RegisterGenericExpectationsImplementation()
    self._querier = unittest_utils.CreateGenericQuerier()

    # By default, report no relevant expectation files for any query result.
    # Individual tests override return_value/side_effect as needed.
    self._relevant_file_patcher = mock.patch.object(
        self._querier,
        '_GetRelevantExpectationFilesForQueryResult',
        return_value=None)
    self._relevant_file_mock = self._relevant_file_patcher.start()
    self.addCleanup(self._relevant_file_patcher.stop)

    # The builder that every test in this class queries.
    self._builder = data_types.BuilderEntry('builder',
                                            constants.BuilderTypes.CI, False)

  def testQueryFailureRaised(self) -> None:
    """Tests that a query failure is properly surfaced."""
    self._popen_mock.return_value = unittest_utils.FakeProcess(returncode=1)
    with self.assertRaises(RuntimeError):
      self._querier.QueryBuilder(self._builder)

  def testInvalidNumSamples(self) -> None:
    """Tests that the number of samples is validated."""
    with self.assertRaises(AssertionError):
      unittest_utils.CreateGenericQuerier(num_samples=-1)

  def testInvalidNumJobs(self) -> None:
    """Tests that the number of jobs is validated."""
    with self.assertRaises(AssertionError):
      unittest_utils.CreateGenericQuerier(num_jobs=0)

  def testNoResults(self) -> None:
    """Tests functionality if the query returns no results."""
    self._popen_mock.return_value = unittest_utils.FakeProcess(stdout='[]')
    results, expectation_files = self._querier.QueryBuilder(self._builder)
    self.assertEqual(results, [])
    self.assertIsNone(expectation_files)

  def testValidResults(self) -> None:
    """Tests functionality when valid results are returned."""
    self._relevant_file_mock.return_value = ['foo_expectations']
    query_results = [
        {
            'id':
            'build-1234',
            'test_id': ('ninja://chrome/test:telemetry_gpu_integration_test/'
                        'gpu_tests.pixel_integration_test.'
                        'PixelIntegrationTest.test_name'),
            'status':
            'FAIL',
            'typ_expectations': [
                'RetryOnFailure',
            ],
            'typ_tags': [
                'win',
                'intel',
                'unknown_tag',  # This is expected to be removed.
            ],
            'step_name':
            'step_name',
        },
    ]
    self._popen_mock.return_value = unittest_utils.FakeProcess(
        stdout=json.dumps(query_results))
    results, expectation_files = self._querier.QueryBuilder(self._builder)
    self.assertEqual(len(results), 1)
    self.assertEqual(
        results[0],
        data_types.Result('test_name', ['win', 'intel'], 'Failure', 'step_name',
                          '1234'))
    self.assertEqual(expectation_files, ['foo_expectations'])

  def testValidResultsNoneExpectations(self) -> None:
    """Tests when an implementation uses None for expectation files."""
    # Two rows from the same build and step that differ only in their tags.
    query_results = [
        {
            'id':
            'build-1234',
            'test_id': ('ninja://chrome/test:telemetry_gpu_integration_test/'
                        'gpu_tests.pixel_integration_test.'
                        'PixelIntegrationTest.test_name'),
            'status':
            'FAIL',
            'typ_expectations': [
                'RetryOnFailure',
            ],
            'typ_tags': [
                'win',
                'intel',
            ],
            'step_name':
            'step_name',
        },
        {
            'id':
            'build-1234',
            'test_id': ('ninja://chrome/test:telemetry_gpu_integration_test/'
                        'gpu_tests.pixel_integration_test.'
                        'PixelIntegrationTest.test_name'),
            'status':
            'FAIL',
            'typ_expectations': [
                'RetryOnFailure',
            ],
            'typ_tags': [
                'win',
                'nvidia',
            ],
            'step_name':
            'step_name',
        },
    ]
    self._popen_mock.return_value = unittest_utils.FakeProcess(
        stdout=json.dumps(query_results))
    with mock.patch.object(
        self._querier,
        '_GetRelevantExpectationFilesForQueryResult') as ef_mock:
      ef_mock.return_value = None
      results, expectation_files = self._querier.QueryBuilder(self._builder)
      self.assertEqual(len(results), 2)
      self.assertIn(
          data_types.Result('test_name', ['win', 'intel'], 'Failure',
                            'step_name', '1234'), results)
      self.assertIn(
          data_types.Result('test_name', ['win', 'nvidia'], 'Failure',
                            'step_name', '1234'), results)
      self.assertIsNone(expectation_files)
      ef_mock.assert_called_once()

  def testValidResultsMultipleSteps(self) -> None:
    """Tests functionality when results from multiple steps are present."""

    def SideEffect(result: queries.QueryResult) -> List[str]:
      # Each step maps to its own expectation file; anything else is a bug.
      if result['step_name'] == 'a step name':
        return ['foo_expectations']
      if result['step_name'] == 'another step name':
        return ['bar_expectations']
      raise RuntimeError('Unknown step %s' % result['step_name'])

    self._relevant_file_mock.side_effect = SideEffect
    query_results = [
        {
            'id': 'build-1234',
            'test_id': 'ninja://:blink_web_tests/some/test/with.test_name',
            'status': 'FAIL',
            'typ_expectations': [
                'Failure',
            ],
            'typ_tags': [
                'linux',
                'intel',
            ],
            'step_name': 'a step name',
        },
        {
            'id': 'build-1234',
            'test_id': 'ninja://:blink_web_tests/some/test/with.test_name',
            'status': 'FAIL',
            'typ_expectations': [
                'Crash',
            ],
            'typ_tags': [
                'linux',
                'amd',
            ],
            'step_name': 'another step name',
        },
    ]
    self._popen_mock.return_value = unittest_utils.FakeProcess(
        stdout=json.dumps(query_results))
    results, expectation_files = self._querier.QueryBuilder(self._builder)
    self.assertEqual(len(results), 2)
    self.assertIn(
        data_types.Result('test_name', ['linux', 'intel'], 'Failure',
                          'a step name', '1234'), results)
    self.assertIn(
        data_types.Result('test_name', ['linux', 'amd'], 'Failure',
                          'another step name', '1234'), results)
    # Checking the length as well ensures no file was reported twice.
    self.assertEqual(len(expectation_files), 2)
    self.assertEqual(set(expectation_files),
                     {'foo_expectations', 'bar_expectations'})

  def testFilterInsertion(self) -> None:
    """Tests that test filters are properly inserted into the query."""
    with mock.patch.object(
        self._querier,
        '_GetQueryGeneratorForBuilder',
        return_value=unittest_utils.SimpleFixedQueryGenerator(
            self._builder, 'a real filter')), mock.patch.object(
                self._querier,
                '_RunBigQueryCommandsForJsonOutput') as query_mock:
      self._querier.QueryBuilder(self._builder)
      query_mock.assert_called_once()
      # First positional argument is the list of queries; take the first one.
      query = query_mock.call_args[0][0][0]
      self.assertIn('a real filter', query)

  def testEarlyReturnOnNoFilter(self) -> None:
    """Tests that the absence of a test filter results in an early return."""
    with mock.patch.object(
        self._querier, '_GetQueryGeneratorForBuilder',
        return_value=None), mock.patch.object(
            self._querier, '_RunBigQueryCommandsForJsonOutput') as query_mock:
      results, expectation_files = self._querier.QueryBuilder(self._builder)
      query_mock.assert_not_called()
      self.assertEqual(results, [])
      self.assertIsNone(expectation_files)

  def testRetryOnMemoryLimit(self) -> None:
    """Tests that queries are split and retried if the memory limit is hit."""

    def SideEffect(*_, **__) -> list:
      # Fail only the first attempt so that exactly one retry is needed.
      SideEffect.call_count += 1
      if SideEffect.call_count == 1:
        raise queries.MemoryLimitError()
      return []

    SideEffect.call_count = 0

    with mock.patch.object(
        self._querier,
        '_GetQueryGeneratorForBuilder',
        return_value=unittest_utils.SimpleSplitQueryGenerator(
            self._builder, ['filter_a', 'filter_b'], 10)), mock.patch.object(
                self._querier,
                '_RunBigQueryCommandsForJsonOutput') as query_mock:
      query_mock.side_effect = SideEffect
      self._querier.QueryBuilder(self._builder)
      self.assertEqual(query_mock.call_count, 2)

      # The first attempt is a single query containing both filters.
      args, _ = unittest_utils.GetArgsForMockCall(query_mock.call_args_list, 0)
      first_query = args[0][0]
      self.assertIn('filter_a', first_query)
      self.assertIn('filter_b', first_query)

      # The retry carries two queries, each with exactly one of the filters.
      args, _ = unittest_utils.GetArgsForMockCall(query_mock.call_args_list, 1)
      second_query_first_half = args[0][0]
      self.assertIn('filter_a', second_query_first_half)
      self.assertNotIn('filter_b', second_query_first_half)

      second_query_second_half = args[0][1]
      self.assertIn('filter_b', second_query_second_half)
      self.assertNotIn('filter_a', second_query_second_half)
| |
| |
class FillExpectationMapForBuildersUnittest(unittest.TestCase):
  """Tests for the generic querier's FillExpectationMapForBuilders() method."""

  def setUp(self) -> None:
    self._querier = unittest_utils.CreateGenericQuerier()

    # Per-builder querying is mocked so each test decides what comes back.
    self._query_patcher = mock.patch.object(self._querier, 'QueryBuilder')
    self._query_mock = self._query_patcher.start()
    self.addCleanup(self._query_patcher.stop)

    # GetProcessPool hands back a FakePool for the duration of each test.
    self._pool_patcher = mock.patch.object(
        multiprocessing_utils,
        'GetProcessPool',
        return_value=unittest_utils.FakePool())
    self._pool_mock = self._pool_patcher.start()
    self.addCleanup(self._pool_patcher.stop)

    # Builder filtering becomes a passthrough of whatever builders are given.
    self._filter_patcher = mock.patch.object(
        self._querier,
        '_FilterOutInactiveBuilders',
        side_effect=lambda b, _: b)
    self._filter_mock = self._filter_patcher.start()
    self.addCleanup(self._filter_patcher.stop)

  def testErrorOnMixedBuilders(self) -> None:
    """Tests that providing builders of mixed type is an error."""
    ci_builder = data_types.BuilderEntry('ci_builder',
                                         constants.BuilderTypes.CI, False)
    try_builder = data_types.BuilderEntry('try_builder',
                                          constants.BuilderTypes.TRY, False)
    with self.assertRaises(AssertionError):
      self._querier.FillExpectationMapForBuilders(
          data_types.TestExpectationMap({}), [ci_builder, try_builder])

  def testValidResults(self) -> None:
    """Tests functionality when valid results are returned by the query."""
    # Maps a builder name to the (test, tags, step) of the one passing result
    # its query produces. Unlisted builders get |fallback_fields|.
    fields_by_builder = {
        'matched_builder': ('foo', ['win'], 'step_name'),
        'matched_internal': ('foo', ['win'], 'step_name_internal'),
        'unmatched_internal': ('bar', [], 'step_name_internal'),
    }
    fallback_fields = ('bar', [], 'step_name')

    def SideEffect(builder: data_types.BuilderEntry,
                   *_) -> Tuple[data_types.ResultListType, None]:
      test, tags, step = fields_by_builder.get(builder.name, fallback_fields)
      # Copy the tags so every Result is built from its own fresh list.
      result = data_types.Result(test, list(tags), 'Pass', step, 'build_id')
      return ([result], None)

    self._query_mock.side_effect = SideEffect

    expectation = data_types.Expectation('foo', ['win'], 'RetryOnFailure')
    builder_map = data_types.ExpectationBuilderMap({
        expectation:
        data_types.BuilderStepMap(),
    })
    expectation_map = data_types.TestExpectationMap({'foo': builder_map})

    # Each entry is (builder name, whether the builder is internal).
    builder_specs = [
        ('matched_builder', False),
        ('unmatched_builder', False),
        ('matched_internal', True),
        ('unmatched_internal', True),
    ]
    builders_to_fill = [
        data_types.BuilderEntry(name, constants.BuilderTypes.CI, is_internal)
        for name, is_internal in builder_specs
    ]

    unmatched_results = self._querier.FillExpectationMapForBuilders(
        expectation_map, builders_to_fill)

    passed_stats = data_types.BuildStats()
    passed_stats.AddPassedBuild(frozenset(['win']))
    matched_builders = {
        'chromium/ci:matched_builder': {
            'step_name': passed_stats,
        },
        'chrome/ci:matched_internal': {
            'step_name_internal': passed_stats,
        },
    }
    expected_expectation_map = {'foo': {expectation: matched_builders}}
    self.assertEqual(expectation_map, expected_expectation_map)

    expected_unmatched_results = {
        'chromium/ci:unmatched_builder': [
            data_types.Result('bar', [], 'Pass', 'step_name', 'build_id'),
        ],
        'chrome/ci:unmatched_internal': [
            data_types.Result('bar', [], 'Pass', 'step_name_internal',
                              'build_id'),
        ],
    }
    self.assertEqual(unmatched_results, expected_unmatched_results)

  def testQueryFailureIsSurfaced(self) -> None:
    """Tests that a query failure is properly surfaced despite being async."""
    self._query_mock.side_effect = IndexError('failure')
    failing_builder = data_types.BuilderEntry('matched_builder',
                                              constants.BuilderTypes.CI, False)
    with self.assertRaises(IndexError):
      self._querier.FillExpectationMapForBuilders(
          data_types.TestExpectationMap(), [failing_builder])
| |
| |
class FilterOutInactiveBuildersUnittest(unittest.TestCase):
  """Tests for the generic querier's _FilterOutInactiveBuilders() method.

  Each test installs a FakeProcess whose stdout is a JSON list of
  {'builder_name': ...} rows and checks which of the initial builders survive.
  """

  def setUp(self) -> None:
    # Patch Popen as seen through the queries module so no process is spawned.
    self._subprocess_patcher = mock.patch(
        'unexpected_passes_common.queries.subprocess.Popen')
    self._subprocess_mock = self._subprocess_patcher.start()
    self.addCleanup(self._subprocess_patcher.stop)

    self._querier = unittest_utils.CreateGenericQuerier()

  def testAllActiveBuilders(self) -> None:
    """Tests that no builders are removed if no inactive builders are found."""
    results = [{
        'builder_name': 'foo_builder',
    }, {
        'builder_name': 'bar_builder',
    }]
    fake_process = unittest_utils.FakeProcess(stdout=json.dumps(results))
    self._subprocess_mock.return_value = fake_process
    initial_builders = [
        data_types.BuilderEntry('foo_builder', constants.BuilderTypes.CI,
                                False),
        data_types.BuilderEntry('bar_builder', constants.BuilderTypes.CI,
                                False),
    ]
    # Snapshot the input list so the comparison still holds if it is mutated.
    expected_builders = copy.copy(initial_builders)
    filtered_builders = self._querier._FilterOutInactiveBuilders(
        initial_builders, constants.BuilderTypes.CI)
    self.assertEqual(filtered_builders, expected_builders)

  def testInactiveBuilders(self) -> None:
    """Tests that inactive builders are removed."""
    # Only foo_builder is reported, so bar_builder must be dropped.
    results = [{
        'builder_name': 'foo_builder',
    }]
    fake_process = unittest_utils.FakeProcess(stdout=json.dumps(results))
    self._subprocess_mock.return_value = fake_process
    initial_builders = [
        data_types.BuilderEntry('foo_builder', constants.BuilderTypes.CI,
                                False),
        data_types.BuilderEntry('bar_builder', constants.BuilderTypes.CI,
                                False),
    ]
    expected_builders = [
        data_types.BuilderEntry('foo_builder', constants.BuilderTypes.CI, False)
    ]
    filtered_builders = self._querier._FilterOutInactiveBuilders(
        initial_builders, constants.BuilderTypes.CI)
    self.assertEqual(filtered_builders, expected_builders)

  def testByteConversion(self) -> None:
    """Tests that bytes are properly handled if returned."""
    results = [{
        'builder_name': 'foo_builder',
    }]
    # Encode the output so the fake process really does hand back bytes; with
    # str output this test would merely duplicate testInactiveBuilders.
    fake_process = unittest_utils.FakeProcess(
        stdout=json.dumps(results).encode('utf-8'))
    self._subprocess_mock.return_value = fake_process
    initial_builders = [
        data_types.BuilderEntry('foo_builder', constants.BuilderTypes.CI,
                                False),
        data_types.BuilderEntry('bar_builder', constants.BuilderTypes.CI,
                                False),
    ]
    expected_builders = [
        data_types.BuilderEntry('foo_builder', constants.BuilderTypes.CI, False)
    ]
    filtered_builders = self._querier._FilterOutInactiveBuilders(
        initial_builders, constants.BuilderTypes.CI)
    self.assertEqual(filtered_builders, expected_builders)
| |
| |
class RunBigQueryCommandsForJsonOutputUnittest(unittest.TestCase):
  """Tests for the generic querier's _RunBigQueryCommandsForJsonOutput()."""

  def setUp(self) -> None:
    # subprocess.Popen is mocked so tests decide what each "process" returns.
    self._popen_patcher = mock.patch.object(subprocess, 'Popen')
    self._popen_mock = self._popen_patcher.start()
    self.addCleanup(self._popen_patcher.stop)

    self._querier = unittest_utils.CreateGenericQuerier()

  def testJsonReturned(self) -> None:
    """Tests that valid JSON parsed from stdout is returned."""
    expected_rows = [{'foo': 'bar'}]
    fake_process = unittest_utils.FakeProcess(stdout=json.dumps(expected_rows))
    self._popen_mock.return_value = fake_process
    actual_rows = self._querier._RunBigQueryCommandsForJsonOutput([''], {})
    self.assertEqual(actual_rows, expected_rows)
    self._popen_mock.assert_called_once()

  def testJsonReturnedSplitQuery(self) -> None:
    """Tests that valid JSON is returned when a split query is used."""
    num_calls = 0

    def SideEffect(*_, **__) -> unittest_utils.FakeProcess:
      # The first process reports one row, every later one a different row.
      nonlocal num_calls
      num_calls += 1
      rows = [{'foo': 'bar'}] if num_calls == 1 else [{'bar': 'baz'}]
      return unittest_utils.FakeProcess(stdout=json.dumps(rows))

    self._popen_mock.side_effect = SideEffect
    combined_rows = self._querier._RunBigQueryCommandsForJsonOutput(['1', '2'],
                                                                    {})

    self.assertEqual(len(combined_rows), 2)
    self.assertIn({'foo': 'bar'}, combined_rows)
    self.assertIn({'bar': 'baz'}, combined_rows)

  def testExceptionRaisedOnFailure(self) -> None:
    """Tests that an exception is raised if the query fails."""
    failed_process = unittest_utils.FakeProcess(returncode=1)
    self._popen_mock.return_value = failed_process
    with self.assertRaises(RuntimeError):
      self._querier._RunBigQueryCommandsForJsonOutput([''], {})

  def testRateLimitRetrySuccess(self) -> None:
    """Tests that rate limit errors result in retries."""
    num_calls = 0

    def SideEffect(*_, **__) -> unittest_utils.FakeProcess:
      # Only the first attempt reports a rate limit failure.
      nonlocal num_calls
      num_calls += 1
      if num_calls != 1:
        return unittest_utils.FakeProcess(stdout='[]')
      return unittest_utils.FakeProcess(returncode=1,
                                        stdout='Exceeded rate limits for foo.')

    self._popen_mock.side_effect = SideEffect
    self._querier._RunBigQueryCommandsForJsonOutput([''], {})
    self.assertEqual(self._popen_mock.call_count, 2)

  def testRateLimitRetryFailure(self) -> None:
    """Tests that rate limit errors stop retrying after enough iterations."""
    rate_limited_process = unittest_utils.FakeProcess(
        returncode=1, stdout='Exceeded rate limits for foo.')
    self._popen_mock.return_value = rate_limited_process
    with self.assertRaises(RuntimeError):
      self._querier._RunBigQueryCommandsForJsonOutput([''], {})
    self.assertEqual(self._popen_mock.call_count, queries.MAX_QUERY_TRIES)

  def testBatching(self) -> None:
    """Tests that batching preferences are properly forwarded."""

    def RunQueryAndGetCommand():
      # Runs one empty query and returns the first positional argument of the
      # only resulting Popen call.
      self._querier._RunBigQueryCommandsForJsonOutput([''], {})
      self._popen_mock.assert_called_once()
      popen_args, _ = unittest_utils.GetArgsForMockCall(
          self._popen_mock.call_args_list, 0)
      return popen_args[0]

    self._popen_mock.return_value = unittest_utils.FakeProcess(
        stdout=json.dumps([{'foo': 'bar'}]))

    # The querier created in setUp() is expected to pass --batch.
    self.assertIn('--batch', RunQueryAndGetCommand())

    # A querier created with batching disabled must omit the flag.
    self._querier = unittest_utils.CreateGenericQuerier(use_batching=False)
    self._popen_mock.reset_mock()
    self.assertNotIn('--batch', RunQueryAndGetCommand())
| |
| |
class GenerateBigQueryCommandUnittest(unittest.TestCase):
  """Tests for queries.GenerateBigQueryCommand()."""

  def testNoParametersSpecified(self) -> None:
    """Tests that no parameters are added if none are specified."""
    cmd = queries.GenerateBigQueryCommand('project', {})
    parameter_args = [arg for arg in cmd if arg.startswith('--parameter')]
    self.assertEqual(parameter_args, [])

  def testParameterAddition(self) -> None:
    """Tests that specified parameters are added appropriately."""
    # The outer key is rendered as the middle field of each --parameter
    # argument; the empty key yields an empty middle field.
    parameters = {
        '': {
            'string': 'string_value'
        },
        'INT64': {
            'int': 1
        },
    }
    cmd = queries.GenerateBigQueryCommand('project', parameters)
    for expected_arg in ('--parameter=string::string_value',
                         '--parameter=int:INT64:1'):
      self.assertIn(expected_arg, cmd)

  def testBatchMode(self) -> None:
    """Tests that batch mode adds the necessary arg."""
    batch_cmd = queries.GenerateBigQueryCommand('project', {}, batch=True)
    self.assertIn('--batch', batch_cmd)
| |
| |
# When executed directly as a script, run every test in this module with
# verbose (per-test) output.
if __name__ == '__main__':
  unittest.main(verbosity=2)