| # Copyright 2020 The Chromium Authors. All rights reserved. | 
 | # Use of this source code is governed by a BSD-style license that can be | 
 | # found in the LICENSE file. | 
 | """Helper methods for unittests.""" | 
 |  | 
 | from __future__ import print_function | 
 |  | 
 | from typing import Any, Callable, Iterable, List, Optional, Tuple, Type | 
 | import unittest.mock as mock | 
 |  | 
 | from unexpected_passes_common import builders | 
 | from unexpected_passes_common import expectations | 
 | from unexpected_passes_common import data_types | 
 | from unexpected_passes_common import queries as queries_module | 
 |  | 
 |  | 
 | def CreateStatsWithPassFails(passes: int, fails: int) -> data_types.BuildStats: | 
 |   stats = data_types.BuildStats() | 
 |   for _ in range(passes): | 
 |     stats.AddPassedBuild() | 
 |   for i in range(fails): | 
 |     stats.AddFailedBuild('build_id%d' % i) | 
 |   return stats | 
 |  | 
 |  | 
 | def _CreateSimpleQueries(clauses: Iterable[str]) -> List[str]: | 
 |   queries = [] | 
 |   # Not actually a valid query since we don't specify the table, but it works. | 
 |   for c in clauses: | 
 |     queries.append("""\ | 
 | SELECT * | 
 | WHERE %s | 
 | """ % c) | 
 |   return queries | 
 |  | 
 |  | 
 | class SimpleFixedQueryGenerator(queries_module.FixedQueryGenerator): | 
 |   def GetQueries(self) -> List[str]: | 
 |     return _CreateSimpleQueries(self.GetClauses()) | 
 |  | 
 |  | 
 | class SimpleSplitQueryGenerator(queries_module.SplitQueryGenerator): | 
 |   def GetQueries(self) -> List[str]: | 
 |     return _CreateSimpleQueries(self.GetClauses()) | 
 |  | 
 |  | 
 | class SimpleBigQueryQuerier(queries_module.BigQueryQuerier): | 
 |   def _GetQueryGeneratorForBuilder(self, builder: data_types.BuilderEntry | 
 |                                    ) -> queries_module.BaseQueryGenerator: | 
 |     if not self._large_query_mode: | 
 |       return SimpleFixedQueryGenerator(builder, 'AND True') | 
 |     return SimpleSplitQueryGenerator(builder, ['test_id'], 200) | 
 |  | 
 |   def _GetRelevantExpectationFilesForQueryResult(self, _) -> None: | 
 |     return None | 
 |  | 
 |   def _StripPrefixFromTestId(self, test_id: str) -> str: | 
 |     return test_id.split('.')[-1] | 
 |  | 
 |   def _GetActiveBuilderQuery(self, _, __) -> str: | 
 |     return '' | 
 |  | 
 |  | 
 | def CreateGenericQuerier( | 
 |     suite: Optional[str] = None, | 
 |     project: Optional[str] = None, | 
 |     num_samples: Optional[int] = None, | 
 |     large_query_mode: Optional[bool] = None, | 
 |     cls: Optional[Type[queries_module.BigQueryQuerier]] = None | 
 | ) -> queries_module.BigQueryQuerier: | 
 |   suite = suite or 'pixel' | 
 |   project = project or 'project' | 
 |   num_samples = num_samples or 5 | 
 |   large_query_mode = large_query_mode or False | 
 |   cls = cls or SimpleBigQueryQuerier | 
 |   return cls(suite, project, num_samples, large_query_mode) | 
 |  | 
 |  | 
 | def GetArgsForMockCall(call_args_list: List[tuple], | 
 |                        call_number: int) -> Tuple[tuple, dict]: | 
 |   """Helper to more sanely get call args from a mocked method. | 
 |  | 
 |   Args: | 
 |     call_args_list: The call_args_list member from the mock in question. | 
 |     call_number: The call number to pull args from, starting at 0 for the first | 
 |         call to the method. | 
 |  | 
 |   Returns: | 
 |     A tuple (args, kwargs). |args| is a list of arguments passed to the method. | 
 |     |kwargs| is a dict containing the keyword arguments padded to the method. | 
 |   """ | 
 |   args = call_args_list[call_number][0] | 
 |   kwargs = call_args_list[call_number][1] | 
 |   return args, kwargs | 
 |  | 
 |  | 
 | class FakePool(): | 
 |   """A fake pathos.pools.ProcessPool instance. | 
 |  | 
 |   Real pools don't like being given MagicMocks, so this allows testing of | 
 |   code that uses pathos.pools.ProcessPool by returning this from | 
 |   multiprocessing_utils.GetProcessPool(). | 
 |   """ | 
 |  | 
 |   def map(self, f: Callable[[Any], Any], inputs: Iterable[Any]) -> List[Any]: | 
 |     retval = [] | 
 |     for i in inputs: | 
 |       retval.append(f(i)) | 
 |     return retval | 
 |  | 
 |   def apipe(self, f: Callable[[Any], Any], | 
 |             inputs: Iterable[Any]) -> 'FakeAsyncResult': | 
 |     return FakeAsyncResult(f(inputs)) | 
 |  | 
 |  | 
 | class FakeAsyncResult(): | 
 |   """A fake AsyncResult like the one from multiprocessing or pathos.""" | 
 |  | 
 |   def __init__(self, result: Any): | 
 |     self._result = result | 
 |  | 
 |   def ready(self) -> bool: | 
 |     return True | 
 |  | 
 |   def get(self) -> Any: | 
 |     return self._result | 
 |  | 
 |  | 
 | class FakeProcess(): | 
 |   """A fake subprocess Process object.""" | 
 |  | 
 |   def __init__(self, | 
 |                returncode: Optional[int] = None, | 
 |                stdout: Optional[str] = None, | 
 |                stderr: Optional[str] = None, | 
 |                finish: bool = True): | 
 |     if finish: | 
 |       self.returncode = returncode or 0 | 
 |     else: | 
 |       self.returncode = None | 
 |     self.stdout = stdout or '' | 
 |     self.stderr = stderr or '' | 
 |     self.finish = finish | 
 |  | 
 |   def communicate(self, _) -> Tuple[str, str]: | 
 |     return self.stdout, self.stderr | 
 |  | 
 |   def terminate(self) -> None: | 
 |     if self.finish: | 
 |       raise OSError('Tried to terminate a finished process') | 
 |  | 
 |  | 
 | class GenericBuilders(builders.Builders): | 
 |   #pylint: disable=useless-super-delegation | 
 |   def __init__(self, | 
 |                suite: Optional[str] = None, | 
 |                include_internal_builders: bool = False): | 
 |     super().__init__(suite, include_internal_builders) | 
 |   #pylint: enable=useless-super-delegation | 
 |  | 
 |   def _BuilderRunsTestOfInterest(self, _test_map) -> bool: | 
 |     return True | 
 |  | 
 |   def GetIsolateNames(self) -> dict: | 
 |     return {} | 
 |  | 
 |   def GetFakeCiBuilders(self) -> dict: | 
 |     return {} | 
 |  | 
 |   def GetNonChromiumBuilders(self) -> dict: | 
 |     return {} | 
 |  | 
 |  | 
 | def RegisterGenericBuildersImplementation() -> None: | 
 |   builders.RegisterInstance(GenericBuilders()) | 
 |  | 
 |  | 
 | class GenericExpectations(expectations.Expectations): | 
 |   def GetExpectationFilepaths(self) -> list: | 
 |     return [] | 
 |  | 
 |   def _GetExpectationFileTagHeader(self, _) -> str: | 
 |     return """\ | 
 | # tags: [ linux mac win ] | 
 | # results: [ Failure RetryOnFailure Skip Pass ] | 
 | """ | 
 |  | 
 |  | 
 | def CreateGenericExpectations() -> GenericExpectations: | 
 |   return GenericExpectations() |