blob: 970d155cc09236448f7ed62118d014b01f55a089 [file] [log] [blame]
# Copyright 2020 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Helper methods for unittests."""
from __future__ import print_function
from typing import Any, Callable, Iterable, List, Optional, Set, Tuple, Type
import unittest.mock as mock
from unexpected_passes_common import builders
from unexpected_passes_common import expectations
from unexpected_passes_common import data_types
from unexpected_passes_common import queries as queries_module
def CreateStatsWithPassFails(passes: int, fails: int) -> data_types.BuildStats:
stats = data_types.BuildStats()
for _ in range(passes):
stats.AddPassedBuild(frozenset())
for i in range(fails):
stats.AddFailedBuild('build_id%d' % i, frozenset())
return stats
def _CreateSimpleQueries(clauses: Iterable[str]) -> List[str]:
queries = []
# Not actually a valid query since we don't specify the table, but it works.
for c in clauses:
queries.append("""\
SELECT *
WHERE %s
""" % c)
return queries
class SimpleFixedQueryGenerator(queries_module.FixedQueryGenerator):
def GetQueries(self) -> List[str]:
return _CreateSimpleQueries(self.GetClauses())
class SimpleSplitQueryGenerator(queries_module.SplitQueryGenerator):
def GetQueries(self) -> List[str]:
return _CreateSimpleQueries(self.GetClauses())
class SimpleBigQueryQuerier(queries_module.BigQueryQuerier):
def _GetQueryGeneratorForBuilder(self, builder: data_types.BuilderEntry
) -> queries_module.BaseQueryGenerator:
if not self._large_query_mode:
return SimpleFixedQueryGenerator(builder, 'AND True')
return SimpleSplitQueryGenerator(builder, ['test_id'], 200)
def _GetRelevantExpectationFilesForQueryResult(self, _) -> None:
return None
def _StripPrefixFromTestId(self, test_id: str) -> str:
return test_id.split('.')[-1]
def _GetActiveBuilderQuery(self, _, __) -> str:
return ''
def CreateGenericQuerier(
suite: Optional[str] = None,
project: Optional[str] = None,
num_samples: Optional[int] = None,
large_query_mode: Optional[bool] = None,
num_jobs: Optional[int] = None,
use_batching: bool = True,
cls: Optional[Type[queries_module.BigQueryQuerier]] = None
) -> queries_module.BigQueryQuerier:
suite = suite or 'pixel'
project = project or 'project'
num_samples = num_samples or 5
large_query_mode = large_query_mode or False
cls = cls or SimpleBigQueryQuerier
return cls(suite, project, num_samples, large_query_mode, num_jobs,
use_batching)
def GetArgsForMockCall(call_args_list: List[tuple],
call_number: int) -> Tuple[tuple, dict]:
"""Helper to more sanely get call args from a mocked method.
Args:
call_args_list: The call_args_list member from the mock in question.
call_number: The call number to pull args from, starting at 0 for the first
call to the method.
Returns:
A tuple (args, kwargs). |args| is a list of arguments passed to the method.
|kwargs| is a dict containing the keyword arguments padded to the method.
"""
args = call_args_list[call_number][0]
kwargs = call_args_list[call_number][1]
return args, kwargs
class FakePool():
"""A fake pathos.pools.ProcessPool instance.
Real pools don't like being given MagicMocks, so this allows testing of
code that uses pathos.pools.ProcessPool by returning this from
multiprocessing_utils.GetProcessPool().
"""
def map(self, f: Callable[[Any], Any], inputs: Iterable[Any]) -> List[Any]:
retval = []
for i in inputs:
retval.append(f(i))
return retval
def apipe(self, f: Callable[[Any], Any],
inputs: Iterable[Any]) -> 'FakeAsyncResult':
return FakeAsyncResult(f(inputs))
def close(self) -> None:
pass
def join(self) -> None:
pass
class FakeAsyncResult():
"""A fake AsyncResult like the one from multiprocessing or pathos."""
def __init__(self, result: Any):
self._result = result
def ready(self) -> bool:
return True
def get(self) -> Any:
return self._result
class FakeProcess():
"""A fake subprocess Process object."""
def __init__(self,
returncode: Optional[int] = None,
stdout: Optional[str] = None,
stderr: Optional[str] = None,
finish: bool = True):
if finish:
self.returncode = returncode or 0
else:
self.returncode = None
self.stdout = stdout or ''
self.stderr = stderr or ''
self.finish = finish
def communicate(self, _) -> Tuple[str, str]:
return self.stdout, self.stderr
def terminate(self) -> None:
if self.finish:
raise OSError('Tried to terminate a finished process')
class GenericBuilders(builders.Builders):
#pylint: disable=useless-super-delegation
def __init__(self,
suite: Optional[str] = None,
include_internal_builders: bool = False):
super().__init__(suite, include_internal_builders)
#pylint: enable=useless-super-delegation
def _BuilderRunsTestOfInterest(self, _test_map) -> bool:
return True
def GetIsolateNames(self) -> dict:
return {}
def GetFakeCiBuilders(self) -> dict:
return {}
def GetNonChromiumBuilders(self) -> dict:
return {}
def RegisterGenericBuildersImplementation() -> None:
builders.RegisterInstance(GenericBuilders())
class GenericExpectations(expectations.Expectations):
def GetExpectationFilepaths(self) -> list:
return []
def _GetExpectationFileTagHeader(self, _) -> str:
return """\
# tags: [ linux mac win ]
# tags: [ amd intel nvidia ]
# results: [ Failure RetryOnFailure Skip Pass ]
"""
def _GetKnownTags(self) -> Set[str]:
return set(['linux', 'mac', 'win', 'amd', 'intel', 'nvidia'])
def CreateGenericExpectations() -> GenericExpectations:
return GenericExpectations()
def RegisterGenericExpectationsImplementation() -> None:
expectations.RegisterInstance(CreateGenericExpectations())