blob: dd08527d750a163c76f0e848a0a452f6b68ccad7 [file] [log] [blame]
#!/usr/bin/env vpython3
# Copyright 2020 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import copy
import json
import subprocess
import sys
from typing import List, Tuple
import unittest
import unittest.mock as mock
from unexpected_passes_common import builders
from unexpected_passes_common import constants
from unexpected_passes_common import data_types
from unexpected_passes_common import expectations
from unexpected_passes_common import multiprocessing_utils
from unexpected_passes_common import queries
from unexpected_passes_common import unittest_utils
# Override the module-level delay constant for the whole test module.
# NOTE(review): presumably this is the wait between query attempts, zeroed so
# the retry tests do not sleep - confirm against queries.QUERY_DELAY.
queries.QUERY_DELAY = 0
class HelperMethodUnittest(unittest.TestCase):
  """Tests for the private helper functions exposed by the queries module."""

  def testStripPrefixFromBuildIdValidId(self) -> None:
    """Tests that a well-formed 'build-<id>' string yields the bare ID."""
    stripped_id = queries._StripPrefixFromBuildId('build-1')
    self.assertEqual(stripped_id, '1')

  def testStripPrefixFromBuildIdInvalidId(self) -> None:
    """Tests that malformed build IDs raise an AssertionError."""
    # One ID lacks the '-' separator, the other has an extra one.
    for malformed_id in ('build1', 'build-1-2'):
      with self.assertRaises(AssertionError):
        queries._StripPrefixFromBuildId(malformed_id)

  def testConvertActualResultToExpectationFileFormatAbort(self) -> None:
    """Tests that an 'ABORT' result is converted to 'Timeout'."""
    converted = queries._ConvertActualResultToExpectationFileFormat('ABORT')
    self.assertEqual(converted, 'Timeout')
class QueryGeneratorUnittest(unittest.TestCase):
  """Tests for queries.SplitQueryGenerator."""

  def setUp(self) -> None:
    # A single CI builder entry shared by every generator under test.
    self._builder = data_types.BuilderEntry('ci', constants.BuilderTypes.CI,
                                            False)

  def testSplitQueryGeneratorInitialSplit(self) -> None:
    """Tests that initial query splitting works as expected."""
    # With a third argument of 2, the three IDs end up in two lists.
    test_filter = queries.SplitQueryGenerator(self._builder, ['1', '2', '3'], 2)
    self.assertEqual(test_filter._test_id_lists, [['1', '2'], ['3']])
    self.assertEqual(len(test_filter.GetClauses()), 2)

    # With a third argument of 3, all three IDs stay in a single list.
    test_filter = queries.SplitQueryGenerator(self._builder, ['1', '2', '3'], 3)
    self.assertEqual(test_filter._test_id_lists, [['1', '2', '3']])
    self.assertEqual(len(test_filter.GetClauses()), 1)

  def testSplitQueryGeneratorSplitQuery(self) -> None:
    """Tests that SplitQueryGenerator's query splitting works."""
    test_filter = queries.SplitQueryGenerator(self._builder, ['1', '2'], 10)
    self.assertEqual(len(test_filter.GetClauses()), 1)
    # An explicit split doubles the number of clauses.
    test_filter.SplitQuery()
    self.assertEqual(len(test_filter.GetClauses()), 2)

  def testSplitQueryGeneratorSplitQueryCannotSplitFurther(self) -> None:
    """Tests SplitQueryGenerator's failure mode when it cannot split again."""
    # A generator holding a single test ID has nothing left to split.
    test_filter = queries.SplitQueryGenerator(self._builder, ['1'], 1)
    with self.assertRaises(queries.QuerySplitError):
      test_filter.SplitQuery()
class QueryBuilderUnittest(unittest.TestCase):
  """Tests for the querier's QueryBuilder method."""

  def setUp(self) -> None:
    # Never spawn a real process; each test supplies its own FakeProcess.
    self._patcher = mock.patch.object(subprocess, 'Popen')
    self._popen_mock = self._patcher.start()
    self.addCleanup(self._patcher.stop)

    # Reset the registered implementations before creating the querier.
    builders.ClearInstance()
    expectations.ClearInstance()
    unittest_utils.RegisterGenericBuildersImplementation()
    unittest_utils.RegisterGenericExpectationsImplementation()
    self._querier = unittest_utils.CreateGenericQuerier()

    # Report no relevant expectation files by default; individual tests
    # override the return value or side effect when they need something else.
    self._relevant_file_patcher = mock.patch.object(
        self._querier,
        '_GetRelevantExpectationFilesForQueryResult',
        return_value=None)
    self._relevant_file_mock = self._relevant_file_patcher.start()
    self.addCleanup(self._relevant_file_patcher.stop)

    self._builder = data_types.BuilderEntry('builder',
                                            constants.BuilderTypes.CI, False)

  def testQueryFailureRaised(self) -> None:
    """Tests that a query failure is properly surfaced."""
    self._popen_mock.return_value = unittest_utils.FakeProcess(returncode=1)
    with self.assertRaises(RuntimeError):
      self._querier.QueryBuilder(self._builder)

  def testInvalidNumSamples(self) -> None:
    """Tests that the number of samples is validated."""
    with self.assertRaises(AssertionError):
      unittest_utils.CreateGenericQuerier(num_samples=-1)

  def testInvalidNumJobs(self) -> None:
    """Tests that the number of jobs is validated."""
    with self.assertRaises(AssertionError):
      unittest_utils.CreateGenericQuerier(num_jobs=0)

  def testNoResults(self) -> None:
    """Tests functionality if the query returns no results."""
    self._popen_mock.return_value = unittest_utils.FakeProcess(stdout='[]')
    results, expectation_files = self._querier.QueryBuilder(self._builder)
    self.assertEqual(results, [])
    # assertIsNone's second positional parameter is the failure message, not a
    # comparison value, so only the object under test is passed.
    self.assertIsNone(expectation_files)

  def testValidResults(self) -> None:
    """Tests functionality when valid results are returned."""
    self._relevant_file_mock.return_value = ['foo_expectations']
    query_results = [
        {
            'id': 'build-1234',
            'test_id': ('ninja://chrome/test:telemetry_gpu_integration_test/'
                        'gpu_tests.pixel_integration_test.'
                        'PixelIntegrationTest.test_name'),
            'status': 'FAIL',
            'typ_expectations': ['RetryOnFailure'],
            'typ_tags': [
                'win',
                'intel',
                'unknown_tag',  # This is expected to be removed.
            ],
            'step_name': 'step_name',
        },
    ]
    self._popen_mock.return_value = unittest_utils.FakeProcess(
        stdout=json.dumps(query_results))

    results, expectation_files = self._querier.QueryBuilder(self._builder)

    self.assertEqual(len(results), 1)
    self.assertEqual(
        results[0],
        data_types.Result('test_name', ['win', 'intel'], 'Failure', 'step_name',
                          '1234'))
    self.assertEqual(expectation_files, ['foo_expectations'])

  def testValidResultsNoneExpectations(self) -> None:
    """Tests when an implementation uses None for expectation files."""
    # Two rows that differ only in their GPU vendor tag.
    query_results = [
        {
            'id': 'build-1234',
            'test_id': ('ninja://chrome/test:telemetry_gpu_integration_test/'
                        'gpu_tests.pixel_integration_test.'
                        'PixelIntegrationTest.test_name'),
            'status': 'FAIL',
            'typ_expectations': ['RetryOnFailure'],
            'typ_tags': ['win', 'intel'],
            'step_name': 'step_name',
        },
        {
            'id': 'build-1234',
            'test_id': ('ninja://chrome/test:telemetry_gpu_integration_test/'
                        'gpu_tests.pixel_integration_test.'
                        'PixelIntegrationTest.test_name'),
            'status': 'FAIL',
            'typ_expectations': ['RetryOnFailure'],
            'typ_tags': ['win', 'nvidia'],
            'step_name': 'step_name',
        },
    ]
    self._popen_mock.return_value = unittest_utils.FakeProcess(
        stdout=json.dumps(query_results))

    # Patch again with a fresh mock so the call count only reflects this query.
    with mock.patch.object(
        self._querier, '_GetRelevantExpectationFilesForQueryResult') as ef_mock:
      ef_mock.return_value = None
      results, expectation_files = self._querier.QueryBuilder(self._builder)

      self.assertEqual(len(results), 2)
      self.assertIn(
          data_types.Result('test_name', ['win', 'intel'], 'Failure',
                            'step_name', '1234'), results)
      self.assertIn(
          data_types.Result('test_name', ['win', 'nvidia'], 'Failure',
                            'step_name', '1234'), results)
      self.assertIsNone(expectation_files)
      ef_mock.assert_called_once()

  def testValidResultsMultipleSteps(self) -> None:
    """Tests functionality when results from multiple steps are present."""

    def SideEffect(result: queries.QueryResult) -> List[str]:
      # Each step name maps to its own expectation file.
      if result['step_name'] == 'a step name':
        return ['foo_expectations']
      if result['step_name'] == 'another step name':
        return ['bar_expectations']
      raise RuntimeError('Unknown step %s' % result['step_name'])

    self._relevant_file_mock.side_effect = SideEffect
    query_results = [
        {
            'id': 'build-1234',
            'test_id': 'ninja://:blink_web_tests/some/test/with.test_name',
            'status': 'FAIL',
            'typ_expectations': ['Failure'],
            'typ_tags': ['linux', 'intel'],
            'step_name': 'a step name',
        },
        {
            'id': 'build-1234',
            'test_id': 'ninja://:blink_web_tests/some/test/with.test_name',
            'status': 'FAIL',
            'typ_expectations': ['Crash'],
            'typ_tags': ['linux', 'amd'],
            'step_name': 'another step name',
        },
    ]
    self._popen_mock.return_value = unittest_utils.FakeProcess(
        stdout=json.dumps(query_results))

    results, expectation_files = self._querier.QueryBuilder(self._builder)

    self.assertEqual(len(results), 2)
    self.assertIn(
        data_types.Result('test_name', ['linux', 'intel'], 'Failure',
                          'a step name', '1234'), results)
    self.assertIn(
        data_types.Result('test_name', ['linux', 'amd'], 'Failure',
                          'another step name', '1234'), results)
    self.assertEqual(len(expectation_files), 2)
    self.assertEqual(set(expectation_files),
                     {'foo_expectations', 'bar_expectations'})

  def testFilterInsertion(self) -> None:
    """Tests that test filters are properly inserted into the query."""
    query_generator = unittest_utils.SimpleFixedQueryGenerator(
        self._builder, 'a real filter')
    with mock.patch.object(self._querier,
                           '_GetQueryGeneratorForBuilder',
                           return_value=query_generator):
      with mock.patch.object(
          self._querier, '_RunBigQueryCommandsForJsonOutput') as query_mock:
        self._querier.QueryBuilder(self._builder)
    query_mock.assert_called_once()
    # First positional argument is the list of queries; inspect the first one.
    query = query_mock.call_args[0][0][0]
    self.assertIn('a real filter', query)

  def testEarlyReturnOnNoFilter(self) -> None:
    """Tests that the absence of a test filter results in an early return."""
    with mock.patch.object(self._querier,
                           '_GetQueryGeneratorForBuilder',
                           return_value=None):
      with mock.patch.object(
          self._querier, '_RunBigQueryCommandsForJsonOutput') as query_mock:
        results, expectation_files = self._querier.QueryBuilder(self._builder)
    query_mock.assert_not_called()
    self.assertEqual(results, [])
    self.assertIsNone(expectation_files)

  def testRetryOnMemoryLimit(self) -> None:
    """Tests that queries are split and retried if the memory limit is hit."""
    call_count = 0

    def SideEffect(*_, **__) -> list:
      # Fail only the first attempt so that the retry succeeds.
      nonlocal call_count
      call_count += 1
      if call_count == 1:
        raise queries.MemoryLimitError()
      return []

    query_generator = unittest_utils.SimpleSplitQueryGenerator(
        self._builder, ['filter_a', 'filter_b'], 10)
    with mock.patch.object(self._querier,
                           '_GetQueryGeneratorForBuilder',
                           return_value=query_generator):
      with mock.patch.object(
          self._querier, '_RunBigQueryCommandsForJsonOutput') as query_mock:
        query_mock.side_effect = SideEffect
        self._querier.QueryBuilder(self._builder)

    self.assertEqual(query_mock.call_count, 2)

    # The first attempt is a single query containing both filters.
    args, _ = unittest_utils.GetArgsForMockCall(query_mock.call_args_list, 0)
    first_query = args[0][0]
    self.assertIn('filter_a', first_query)
    self.assertIn('filter_b', first_query)

    # The retry carries two queries, one filter each.
    args, _ = unittest_utils.GetArgsForMockCall(query_mock.call_args_list, 1)
    second_query_first_half = args[0][0]
    self.assertIn('filter_a', second_query_first_half)
    self.assertNotIn('filter_b', second_query_first_half)

    second_query_second_half = args[0][1]
    self.assertIn('filter_b', second_query_second_half)
    self.assertNotIn('filter_a', second_query_second_half)
class FillExpectationMapForBuildersUnittest(unittest.TestCase):
  """Tests for the querier's FillExpectationMapForBuilders method."""

  def setUp(self) -> None:
    self._querier = unittest_utils.CreateGenericQuerier()

    # QueryBuilder is mocked so each test dictates the per-builder results.
    self._query_patcher = mock.patch.object(self._querier, 'QueryBuilder')
    self._query_mock = self._query_patcher.start()
    self.addCleanup(self._query_patcher.stop)

    # Swap the process pool for the FakePool test double.
    self._pool_patcher = mock.patch.object(multiprocessing_utils,
                                           'GetProcessPool')
    self._pool_mock = self._pool_patcher.start()
    self._pool_mock.return_value = unittest_utils.FakePool()
    self.addCleanup(self._pool_patcher.stop)

    # Make the inactive-builder filter a no-op that returns its first argument.
    self._filter_patcher = mock.patch.object(self._querier,
                                             '_FilterOutInactiveBuilders')
    self._filter_mock = self._filter_patcher.start()
    self._filter_mock.side_effect = lambda b, _: b
    self.addCleanup(self._filter_patcher.stop)

  def testErrorOnMixedBuilders(self) -> None:
    """Tests that providing builders of mixed type is an error."""
    ci_builder = data_types.BuilderEntry('ci_builder',
                                         constants.BuilderTypes.CI, False)
    try_builder = data_types.BuilderEntry('try_builder',
                                          constants.BuilderTypes.TRY, False)
    with self.assertRaises(AssertionError):
      self._querier.FillExpectationMapForBuilders(
          data_types.TestExpectationMap({}), [ci_builder, try_builder])

  def testValidResults(self) -> None:
    """Tests functionality when valid results are returned by the query."""

    def SideEffect(builder: data_types.BuilderEntry,
                   *args) -> Tuple[data_types.ResultListType, None]:
      del args
      # Result constructor arguments keyed by builder name; any other builder
      # gets the 'bar' result reported under the public step name.
      result_args_by_builder = {
          'matched_builder': ('foo', ['win'], 'Pass', 'step_name', 'build_id'),
          'matched_internal':
          ('foo', ['win'], 'Pass', 'step_name_internal', 'build_id'),
          'unmatched_internal':
          ('bar', [], 'Pass', 'step_name_internal', 'build_id'),
      }
      fallback_args = ('bar', [], 'Pass', 'step_name', 'build_id')
      result_args = result_args_by_builder.get(builder.name, fallback_args)
      return ([data_types.Result(*result_args)], None)

    self._query_mock.side_effect = SideEffect

    expectation = data_types.Expectation('foo', ['win'], 'RetryOnFailure')
    builder_map = data_types.ExpectationBuilderMap({
        expectation: data_types.BuilderStepMap(),
    })
    expectation_map = data_types.TestExpectationMap({'foo': builder_map})

    # (name, is_internal) pairs for the four CI builders being queried.
    builder_specs = [
        ('matched_builder', False),
        ('unmatched_builder', False),
        ('matched_internal', True),
        ('unmatched_internal', True),
    ]
    builders_to_fill = [
        data_types.BuilderEntry(name, constants.BuilderTypes.CI, is_internal)
        for name, is_internal in builder_specs
    ]

    unmatched_results = self._querier.FillExpectationMapForBuilders(
        expectation_map, builders_to_fill)

    stats = data_types.BuildStats()
    stats.AddPassedBuild(frozenset(['win']))
    expected_expectation_map = {
        'foo': {
            expectation: {
                'chromium/ci:matched_builder': {
                    'step_name': stats,
                },
                'chrome/ci:matched_internal': {
                    'step_name_internal': stats,
                },
            },
        },
    }
    self.assertEqual(expectation_map, expected_expectation_map)

    expected_unmatched_results = {
        'chromium/ci:unmatched_builder': [
            data_types.Result('bar', [], 'Pass', 'step_name', 'build_id'),
        ],
        'chrome/ci:unmatched_internal': [
            data_types.Result('bar', [], 'Pass', 'step_name_internal',
                              'build_id'),
        ],
    }
    self.assertEqual(unmatched_results, expected_unmatched_results)

  def testQueryFailureIsSurfaced(self) -> None:
    """Tests that a query failure is properly surfaced despite being async."""
    self._query_mock.side_effect = IndexError('failure')
    builders_to_fill = [
        data_types.BuilderEntry('matched_builder', constants.BuilderTypes.CI,
                                False)
    ]
    with self.assertRaises(IndexError):
      self._querier.FillExpectationMapForBuilders(
          data_types.TestExpectationMap(), builders_to_fill)
class FilterOutInactiveBuildersUnittest(unittest.TestCase):
  """Tests for the querier's _FilterOutInactiveBuilders method."""

  def setUp(self) -> None:
    # Patch Popen as seen from the queries module so no real process starts.
    self._subprocess_patcher = mock.patch(
        'unexpected_passes_common.queries.subprocess.Popen')
    self._subprocess_mock = self._subprocess_patcher.start()
    self.addCleanup(self._subprocess_patcher.stop)
    self._querier = unittest_utils.CreateGenericQuerier()

  def testAllActiveBuilders(self) -> None:
    """Tests that no builders are removed if no inactive builders are found."""
    results = [{
        'builder_name': 'foo_builder',
    }, {
        'builder_name': 'bar_builder',
    }]
    fake_process = unittest_utils.FakeProcess(stdout=json.dumps(results))
    self._subprocess_mock.return_value = fake_process
    initial_builders = [
        data_types.BuilderEntry('foo_builder', constants.BuilderTypes.CI,
                                False),
        data_types.BuilderEntry('bar_builder', constants.BuilderTypes.CI,
                                False),
    ]
    # Copy up front so the expectation does not alias the list that is passed
    # to the method under test.
    expected_builders = copy.copy(initial_builders)
    filtered_builders = self._querier._FilterOutInactiveBuilders(
        initial_builders, constants.BuilderTypes.CI)
    self.assertEqual(filtered_builders, expected_builders)

  def testInactiveBuilders(self) -> None:
    """Tests that inactive builders are removed."""
    # Only foo_builder appears in the query output.
    results = [{
        'builder_name': 'foo_builder',
    }]
    fake_process = unittest_utils.FakeProcess(stdout=json.dumps(results))
    self._subprocess_mock.return_value = fake_process
    initial_builders = [
        data_types.BuilderEntry('foo_builder', constants.BuilderTypes.CI,
                                False),
        data_types.BuilderEntry('bar_builder', constants.BuilderTypes.CI,
                                False),
    ]
    expected_builders = [
        data_types.BuilderEntry('foo_builder', constants.BuilderTypes.CI, False)
    ]
    filtered_builders = self._querier._FilterOutInactiveBuilders(
        initial_builders, constants.BuilderTypes.CI)
    self.assertEqual(filtered_builders, expected_builders)

  def testByteConversion(self) -> None:
    """Tests that bytes are properly handled if returned."""
    results = [{
        'builder_name': 'foo_builder',
    }]
    # Encode the output so the fake process hands back bytes rather than str;
    # with a str payload this test would just duplicate testInactiveBuilders.
    fake_process = unittest_utils.FakeProcess(
        stdout=json.dumps(results).encode('utf-8'))
    self._subprocess_mock.return_value = fake_process
    initial_builders = [
        data_types.BuilderEntry('foo_builder', constants.BuilderTypes.CI,
                                False),
        data_types.BuilderEntry('bar_builder', constants.BuilderTypes.CI,
                                False),
    ]
    expected_builders = [
        data_types.BuilderEntry('foo_builder', constants.BuilderTypes.CI, False)
    ]
    filtered_builders = self._querier._FilterOutInactiveBuilders(
        initial_builders, constants.BuilderTypes.CI)
    self.assertEqual(filtered_builders, expected_builders)
class RunBigQueryCommandsForJsonOutputUnittest(unittest.TestCase):
  """Tests for the querier's _RunBigQueryCommandsForJsonOutput method."""

  def setUp(self) -> None:
    # Every test drives the method through a mocked subprocess.Popen.
    self._popen_patcher = mock.patch.object(subprocess, 'Popen')
    self._popen_mock = self._popen_patcher.start()
    self.addCleanup(self._popen_patcher.stop)

    self._querier = unittest_utils.CreateGenericQuerier()

  def testJsonReturned(self) -> None:
    """Tests that valid JSON parsed from stdout is returned."""
    expected_rows = [{'foo': 'bar'}]
    self._popen_mock.return_value = unittest_utils.FakeProcess(
        stdout=json.dumps(expected_rows))

    rows = self._querier._RunBigQueryCommandsForJsonOutput([''], {})

    self.assertEqual(rows, expected_rows)
    self._popen_mock.assert_called_once()

  def testJsonReturnedSplitQuery(self) -> None:
    """Tests that valid JSON is returned when a split query is used."""
    popen_calls = 0

    def SideEffect(*_, **__) -> unittest_utils.FakeProcess:
      # The first process reports one row, every later process another.
      nonlocal popen_calls
      popen_calls += 1
      row = {'foo': 'bar'} if popen_calls == 1 else {'bar': 'baz'}
      return unittest_utils.FakeProcess(stdout=json.dumps([row]))

    self._popen_mock.side_effect = SideEffect

    rows = self._querier._RunBigQueryCommandsForJsonOutput(['1', '2'], {})

    self.assertEqual(len(rows), 2)
    for expected_row in ({'foo': 'bar'}, {'bar': 'baz'}):
      self.assertIn(expected_row, rows)

  def testExceptionRaisedOnFailure(self) -> None:
    """Tests that an exception is raised if the query fails."""
    failed_process = unittest_utils.FakeProcess(returncode=1)
    self._popen_mock.return_value = failed_process
    with self.assertRaises(RuntimeError):
      self._querier._RunBigQueryCommandsForJsonOutput([''], {})

  def testRateLimitRetrySuccess(self) -> None:
    """Tests that rate limit errors result in retries."""
    popen_calls = 0

    def SideEffect(*_, **__) -> unittest_utils.FakeProcess:
      # Only the first attempt reports a rate limit failure.
      nonlocal popen_calls
      popen_calls += 1
      if popen_calls > 1:
        return unittest_utils.FakeProcess(stdout='[]')
      return unittest_utils.FakeProcess(returncode=1,
                                        stdout='Exceeded rate limits for foo.')

    self._popen_mock.side_effect = SideEffect

    self._querier._RunBigQueryCommandsForJsonOutput([''], {})

    self.assertEqual(self._popen_mock.call_count, 2)

  def testRateLimitRetryFailure(self) -> None:
    """Tests that rate limit errors stop retrying after enough iterations."""
    rate_limited_process = unittest_utils.FakeProcess(
        returncode=1, stdout='Exceeded rate limits for foo.')
    self._popen_mock.return_value = rate_limited_process
    with self.assertRaises(RuntimeError):
      self._querier._RunBigQueryCommandsForJsonOutput([''], {})
    # Checked after the assertRaises block: a statement placed inside it, after
    # the raising call, would never execute.
    self.assertEqual(self._popen_mock.call_count, queries.MAX_QUERY_TRIES)

  def testBatching(self) -> None:
    """Tests that batching preferences are properly forwarded."""

    def GetOnlyCommand() -> list:
      # Returns the first positional argument of the single Popen call.
      self._popen_mock.assert_called_once()
      args, _ = unittest_utils.GetArgsForMockCall(
          self._popen_mock.call_args_list, 0)
      return args[0]

    self._popen_mock.return_value = unittest_utils.FakeProcess(
        stdout=json.dumps([{'foo': 'bar'}]))

    # The querier created in setUp is expected to request batching.
    self._querier._RunBigQueryCommandsForJsonOutput([''], {})
    self.assertIn('--batch', GetOnlyCommand())

    # A querier created with use_batching=False must omit the flag.
    self._querier = unittest_utils.CreateGenericQuerier(use_batching=False)
    self._popen_mock.reset_mock()
    self._querier._RunBigQueryCommandsForJsonOutput([''], {})
    self.assertNotIn('--batch', GetOnlyCommand())
class GenerateBigQueryCommandUnittest(unittest.TestCase):
  """Tests for queries.GenerateBigQueryCommand."""

  def testNoParametersSpecified(self) -> None:
    """Tests that no parameters are added if none are specified."""
    cmd = queries.GenerateBigQueryCommand('project', {})
    parameter_args = [arg for arg in cmd if arg.startswith('--parameter')]
    self.assertEqual(parameter_args, [])

  def testParameterAddition(self) -> None:
    """Tests that specified parameters are added appropriately."""
    # The outer keys are the type names; the empty key carries the string
    # parameter.
    parameters = {
        '': {
            'string': 'string_value'
        },
        'INT64': {
            'int': 1
        },
    }
    cmd = queries.GenerateBigQueryCommand('project', parameters)
    expected_args = (
        '--parameter=string::string_value',
        '--parameter=int:INT64:1',
    )
    for expected_arg in expected_args:
      self.assertIn(expected_arg, cmd)

  def testBatchMode(self) -> None:
    """Tests that batch mode adds the necessary arg."""
    batch_cmd = queries.GenerateBigQueryCommand('project', {}, batch=True)
    self.assertIn('--batch', batch_cmd)
if __name__ == '__main__':
  # Run every test case in this module with per-test (verbose) output.
  unittest.main(verbosity=2)