| # Copyright 2014 The Chromium Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import fnmatch |
| import hashlib |
| import logging |
| import os |
| import signal |
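| # Python 3 renamed the low-level |thread| module to |_thread|; fall back |
| # to the old name when running under Python 2. |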
| try: |
| import _thread as thread |
| except ImportError: |
| import thread |
| import threading |
| |
| from devil import base_error |
| from devil.android import crash_handler |
| from devil.android import device_errors |
| from devil.android.sdk import version_codes |
| from devil.android.tools import device_recovery |
| from devil.utils import signal_handler |
| from pylib.base import base_test_result |
| from pylib.base import test_collection |
| from pylib.base import test_exception |
| from pylib.base import test_run |
| from pylib.utils import device_dependencies |
| from pylib.local.device import local_device_environment |
| |
| from lib.proto import exception_recorder |
| |
| |
| _SIGTERM_TEST_LOG = ( |
| ' Suite execution terminated, probably due to swarming timeout.\n' |
| ' Your test may not have run.') |
| |
| # If the percentage of failed tests exceeds this value, the script will |
| # try to recover devices before the next try. |
| FAILED_TEST_PCT_MAX = 90 |
| |
| class TestsTerminated(Exception): |
| """Raised when test execution is terminated, e.g. by SIGTERM.""" |
| |
| |
| class LocalDeviceTestRun(test_run.TestRun): |
| |
| def __init__(self, env, test_instance): |
| super().__init__(env, test_instance) |
| # This is intended to be filled by a child class. |
| self._installed_packages = [] |
| env.SetPreferredAbis(test_instance.GetPreferredAbis()) |
| |
| @local_device_environment.handle_shard_failures |
| def _RunTestsOnDevice(self, dev, tests, results, exit_now): |
| # This is done here instead of during setup because restarting the |
| # device, which happens when a device needs to be recovered, clears |
| # any app compatibility flags that were previously set. |
| SetAppCompatibilityFlagsIfNecessary(self._installed_packages, dev) |
| consecutive_device_errors = 0 |
| for test in tests: |
| if not test: |
| logging.warning('No tests in shard. Continuing.') |
| tests.test_completed() |
| continue |
| if exit_now.is_set(): |
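| # thread.exit() raises SystemExit, unwinding only this worker thread. |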
| thread.exit() |
| |
| result = None |
| rerun = None |
| try: |
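| # Bind |test| through a default argument so the lambda does not |
| # capture the loop variable by reference. |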
| result, rerun = crash_handler.RetryOnSystemCrash( |
| lambda d, t=test: self._RunTest(d, t), |
| device=dev) |
| consecutive_device_errors = 0 |
| if isinstance(result, base_test_result.BaseTestResult): |
| results.AddResult(result) |
| elif isinstance(result, list): |
| results.AddResults(result) |
| else: |
| raise Exception( |
| 'Unexpected result type: %s' % type(result).__name__) |
| except device_errors.CommandTimeoutError as e: |
| exception_recorder.register(e) |
| # Test timeouts don't count as device errors for the purpose |
| # of bad device detection. |
| consecutive_device_errors = 0 |
| |
| if isinstance(test, list): |
| result_log = '' |
| if len(test) > 1: |
| result_log = ('The test command timed out when running multiple ' |
| 'tests including this test. It does not ' |
| 'necessarily mean this specific test timed out.') |
| # Drop the 'Batch' annotation so these instrumentation tests are not |
| # batched at the env level on retries. |
| for t in test: |
| # A |dict| test implies an instrumentation test. |
| if isinstance(t, dict) and t['annotations']: |
| t['annotations'].pop('Batch', None) |
| |
| results.AddResults( |
| base_test_result.BaseTestResult( |
| self._GetUniqueTestName(t), |
| base_test_result.ResultType.TIMEOUT, |
| log=result_log) for t in test) |
| else: |
| results.AddResult( |
| base_test_result.BaseTestResult( |
| self._GetUniqueTestName(test), |
| base_test_result.ResultType.TIMEOUT)) |
| except device_errors.DeviceUnreachableError as e: |
| exception_recorder.register(e) |
| # If the device is no longer reachable then terminate this |
| # _RunTestsOnDevice call. |
| raise |
| except base_error.BaseError as e: |
| exception_recorder.register(e) |
| # If we get a device error but believe the device is still |
| # reachable, attempt to continue using it. |
| if isinstance(tests, test_collection.TestCollection): |
| rerun = test |
| |
| consecutive_device_errors += 1 |
| if consecutive_device_errors >= 3: |
| # We believe the device is still reachable and may still be usable, |
| # but if it fails repeatedly, we shouldn't attempt to keep using |
| # it. |
| logging.error('Repeated failures on device %s. Abandoning.', |
| str(dev)) |
| raise |
| |
| logging.exception( |
| 'Attempting to continue using device %s despite failure (%d/3).', |
| str(dev), consecutive_device_errors) |
| |
| finally: |
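| # Always update the collection's bookkeeping: re-queue the test if a |
| # rerun was requested, then mark this slot completed. |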
| if isinstance(tests, test_collection.TestCollection): |
| if rerun: |
| tests.add(rerun) |
| tests.test_completed() |
| |
| logging.info('Finished running tests on this device.') |
| |
| #override |
| def RunTests(self, results, raw_logs_fh=None): |
| tests = self._GetTests() |
| total_test_count = len(FlattenTestList(tests)) |
| |
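| # Set by the SIGTERM handler below and checked by the per-device worker |
| # threads so that they stop pulling new tests. |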
| exit_now = threading.Event() |
| |
| def stop_tests(_signum, _frame): |
| logging.critical('Received SIGTERM. Stopping test execution.') |
| exit_now.set() |
| raise TestsTerminated() |
| |
| try: |
| with signal_handler.AddSignalHandler(signal.SIGTERM, stop_tests): |
| self._env.ResetCurrentTry() |
| while self._env.current_try < self._env.max_tries and tests: |
| tries = self._env.current_try |
| flatten_tests = FlattenTestList(tests) |
| logging.info('STARTING TRY #%d/%d', tries + 1, self._env.max_tries) |
| if tries > 0 and self._env.recover_devices: |
| # The variable "tests" is reused to store the failed tests. |
| failed_test_pct = 100 * len(flatten_tests) // total_test_count |
| if failed_test_pct > FAILED_TEST_PCT_MAX: |
| logging.info( |
| 'Attempting to recover devices as the percentage of failed ' |
| 'tests (%d%%) exceeds the threshold %d%%.', failed_test_pct, |
| FAILED_TEST_PCT_MAX) |
| self._RecoverDevices() |
| elif tries + 1 == self._env.max_tries: |
| logging.info( |
| 'Attempting to recover devices prior to last test attempt.') |
| self._RecoverDevices() |
| logging.info( |
| 'Will run %d tests, grouped into %d groups, on %d devices: %s', |
| len(flatten_tests), len(tests), len(self._env.devices), |
| ', '.join(str(d) for d in self._env.devices)) |
| for t in tests: |
| logging.debug(' %s', t) |
| |
| try_results = base_test_result.TestRunResults() |
| test_names = (self._GetUniqueTestName(t) for t in flatten_tests) |
| try_results.AddResults( |
| base_test_result.BaseTestResult( |
| t, base_test_result.ResultType.NOTRUN) |
| for t in test_names if not t.endswith('*')) |
| |
| # As soon as we know the names of the tests, we populate |results|. |
| # The tests in try_results will have their results updated by |
| # try_results.AddResult() as they are run. |
| results.append(try_results) |
| |
| try: |
| if self._ShouldShardTestsForDevices(): |
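| # All devices share a single TestCollection; each device worker pulls |
| # the next available shard, dynamically balancing load across devices. |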
| tc = test_collection.TestCollection( |
| self._CreateShardsForDevices(tests)) |
| self._env.parallel_devices.pMap( |
| self._RunTestsOnDevice, |
| tc, try_results, exit_now).pGet(None) |
| else: |
| self._env.parallel_devices.pMap(self._RunTestsOnDevice, tests, |
| try_results, exit_now).pGet(None) |
| except TestsTerminated: |
| for unknown_result in try_results.GetUnknown(): |
| try_results.AddResult( |
| base_test_result.BaseTestResult( |
| unknown_result.GetName(), |
| base_test_result.ResultType.TIMEOUT, |
| log=_SIGTERM_TEST_LOG)) |
| raise |
| |
| self._env.IncrementCurrentTry() |
| tests = self._GetTestsToRetry(tests, try_results) |
| |
| logging.info('FINISHED TRY #%d/%d', tries + 1, self._env.max_tries) |
| if tests: |
| logging.info('%d failed tests remain.', len(tests)) |
| else: |
| logging.info('All tests completed.') |
| except TestsTerminated: |
| pass |
| |
| def _RecoverDevices(self): |
| self._env.parallel_devices.pMap(device_recovery.RecoverDevice, None) |
| |
| def _GetTestsToRetry(self, tests, try_results): |
| """Get the tests to retry. |
| |
| Note "tests" can have groups of test. For gtest, we would like to add |
| the entire group to retry as they are PRE test group. For instrumentation, |
| we only keep the failed tests in that group. |
| """ |
| |
| def is_failure_result(test_result): |
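| # A result of None means the test never ran, which counts as a failure |
| # for retry purposes. |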
| if isinstance(test_result, list): |
| return any(is_failure_result(r) for r in test_result) |
| return ( |
| test_result is None |
| or test_result.GetType() not in ( |
| base_test_result.ResultType.PASS, |
| base_test_result.ResultType.SKIP)) |
| |
| def get_tests_to_retry(test_list, all_test_results): |
| failed_tests = [] |
| for test in test_list: |
| if isinstance(test, list): |
| failed_test = get_tests_to_retry(test, all_test_results) |
| if failed_test: |
| failed_tests.append( |
| test if self._ShouldRetryFullGroup(test) else failed_test) |
| else: |
| name = self._GetUniqueTestName(test) |
| # When specifying a test filter, names can contain trailing wildcards. |
| # See local_device_gtest_run._ExtractTestsFromFilters() |
| if name.endswith('*'): |
| result = [ |
| r for n, r in all_test_results.items() |
| if fnmatch.fnmatch(n, name) |
| ] |
| else: |
| result = all_test_results.get(name) |
| if is_failure_result(result) and self._ShouldRetry(test, result): |
| failed_tests.append(test) |
| return failed_tests |
| |
| all_test_results = {r.GetName(): r for r in try_results.GetAll()} |
| return get_tests_to_retry(tests, all_test_results) |
| |
| def _ApplyExternalSharding(self, tests, shard_index, total_shards): |
| logging.info('Using external sharding settings. This is shard %d/%d', |
| shard_index, total_shards) |
| |
| if total_shards < 0 or shard_index < 0 or total_shards <= shard_index: |
| raise test_exception.InvalidShardingSettings(shard_index, total_shards) |
| |
| sharded_tests = [] |
| |
| grouped_tests = self._GroupTests(tests) |
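| # Every shard computes the same deterministic partition without any |
| # coordination. For example, with total_shards=4, a test whose name |
| # hashes to 10 runs only on the shard with shard_index 2 (10 % 4 == 2). |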
| for test in grouped_tests: |
| test_name = self._GetUniqueTestName(test) |
| if self._DeterministicHash(test_name) % total_shards == shard_index: |
| sharded_tests.append(test) |
| |
| return sharded_tests |
| |
| def _DeterministicHash(self, test_name): |
| """Return the deterministic hash for a test name, as an integer.""" |
| # pylint: disable=no-self-use |
| assert isinstance(test_name, str), 'Expecting a string.' |
| hash_bytes = hashlib.sha256(test_name.encode('utf-8')).digest() |
| # To speed things up, only use the last 3 bytes, i.e. a value in |
| # [0, 2**24). |
| return int.from_bytes(hash_bytes[-3:], byteorder='big') |
| |
| # Sort by hash so we don't put all tests in a slow suite in the same shard. |
| def _SortTests(self, tests): |
| return sorted( |
| tests, |
| key=lambda t: self._DeterministicHash(self._GetUniqueTestName(t))) |
| |
| def _CreateShardsForDevices(self, tests): |
| raise NotImplementedError |
| |
| def _GetUniqueTestName(self, test): |
| # pylint: disable=no-self-use |
| return test |
| |
| def _ShouldRetry(self, test, result): |
| # pylint: disable=no-self-use,unused-argument |
| return True |
| |
| def _ShouldRetryFullGroup(self, test_group): |
| """Whether should retry the full group if the group has test failure.""" |
| # pylint: disable=no-self-use,unused-argument |
| return False |
| |
| #override |
| def GetTestsForListing(self): |
| ret = self._GetTests() |
| ret = FlattenTestList(ret) |
| ret.sort() |
| return ret |
| |
| def GetDataDepsForListing(self): |
| device_root = '$CHROMIUM_TESTS_ROOT' |
| host_device_tuples = self._test_instance.GetDataDependencies() |
| host_device_tuples = device_dependencies.SubstituteDeviceRoot( |
| host_device_tuples, device_root) |
| host_device_tuples = device_dependencies.ExpandDataDependencies( |
| host_device_tuples) |
| |
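| # Each entry reads '<device path> <- <host path relative to cwd>'. |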
| return sorted(f'{d} <- {os.path.relpath(h)}' for h, d in host_device_tuples) |
| |
| def _GetTests(self): |
| """Get the tests to run on the assigned shard index. |
| |
| Must be implemented by subclasses. |
| """ |
| raise NotImplementedError |
| |
| def _GroupTests(self, tests): |
| """Group tests by tests that should run in the same test invocation. |
| |
| Can be overridden by subclasses if needed. Examples are: |
| - gtest: PRE_ tests |
| - instrumentation test: unit tests and batched tests. |
| |
| Args: |
| tests: a flat list of tests. Each element can be a string for gtest, |
| or a dict for instrumentation tests. |
| |
| Returns a list whose elements are either a single test or a list of tests. |
| """ |
| # pylint: disable=no-self-use |
| return tests |
| |
| def _RunTest(self, device, test): |
| raise NotImplementedError |
| |
| def _ShouldShardTestsForDevices(self): |
| raise NotImplementedError |
| |
| |
| def FlattenTestList(values): |
| """Returns a list with all nested lists (shard groupings) expanded.""" |
| ret = [] |
| for v in values: |
| if isinstance(v, list): |
| ret += v |
| else: |
| ret.append(v) |
| return ret |
| |
| |
| def SetAppCompatibilityFlagsIfNecessary(packages, device): |
| """Sets app compatibility flags on the given packages and device. |
| |
| Args: |
| packages: A list of strings containing package names to apply flags to. |
| device: A DeviceUtils instance to apply the flags on. |
| """ |
| |
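| # 'am compat enable|disable <CHANGE_ID> <package>' toggles the given app |
| # compatibility change for a single package on the device. |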
| def set_flag_for_packages(flag, enable): |
| enable_str = 'enable' if enable else 'disable' |
| for p in packages: |
| cmd = ['am', 'compat', enable_str, flag, p] |
| device.RunShellCommand(cmd) |
| |
| sdk_version = device.build_version_sdk |
| if sdk_version >= version_codes.R: |
| # These flags are necessary to use the legacy storage permissions on R+. |
| # See crbug.com/1173699 for more information. |
| set_flag_for_packages('DEFAULT_SCOPED_STORAGE', False) |
| set_flag_for_packages('FORCE_ENABLE_SCOPED_STORAGE', False) |
| |
| |
| class NoTestsError(Exception): |
| """Error for when no tests are found.""" |