# Copyright 2014 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import fnmatch
import hashlib
import logging
import os
import signal
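# Python 3 renamed the low-level threading module to _thread; fall back to
# the Python 2 name if that import fails.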
try:
import _thread as thread
except ImportError:
import thread
import threading
from devil import base_error
from devil.android import crash_handler
from devil.android import device_errors
from devil.android.sdk import version_codes
from devil.android.tools import device_recovery
from devil.utils import signal_handler
from pylib.base import base_test_result
from pylib.base import test_collection
from pylib.base import test_exception
from pylib.base import test_run
from pylib.utils import device_dependencies
from pylib.local.device import local_device_environment
from lib.proto import exception_recorder
_SIGTERM_TEST_LOG = (
' Suite execution terminated, probably due to swarming timeout.\n'
' Your test may not have run.')
# If the percentage of failed tests exceeds this max value, the script
# will try to recover devices before the next try.
FAILED_TEST_PCT_MAX = 90
class TestsTerminated(Exception):
pass
class LocalDeviceTestRun(test_run.TestRun):
def __init__(self, env, test_instance):
super().__init__(env, test_instance)
# This is intended to be filled by a child class.
self._installed_packages = []
env.SetPreferredAbis(test_instance.GetPreferredAbis())
@local_device_environment.handle_shard_failures
def _RunTestsOnDevice(self, dev, tests, results, exit_now):
# This is performed here instead of during setup because restarting the
# device clears app compatibility flags, and a restart can happen whenever
# a device needs to be recovered.
SetAppCompatibilityFlagsIfNecessary(self._installed_packages, dev)
consecutive_device_errors = 0
for test in tests:
if not test:
logging.warning('No tests in shard. Continuing.')
tests.test_completed()
continue
if exit_now.isSet():
thread.exit()
result = None
rerun = None
try:
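# The default argument (t=test) binds the loop's current test into the
# lambda passed to RetryOnSystemCrash, so any retries rerun the same test.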
result, rerun = crash_handler.RetryOnSystemCrash(
lambda d, t=test: self._RunTest(d, t),
device=dev)
consecutive_device_errors = 0
if isinstance(result, base_test_result.BaseTestResult):
results.AddResult(result)
elif isinstance(result, list):
results.AddResults(result)
else:
raise Exception(
'Unexpected result type: %s' % type(result).__name__)
except device_errors.CommandTimeoutError as e:
exception_recorder.register(e)
# Test timeouts don't count as device errors for the purpose
# of bad device detection.
consecutive_device_errors = 0
if isinstance(test, list):
result_log = ''
if len(test) > 1:
result_log = ('The test command timed out when running multiple '
'tests including this test. It does not '
'necessarily mean this specific test timed out.')
# Ensure instrumentation tests are not batched on env-level retries.
for t in test:
# A |dict| type implies this is an instrumentation test.
if isinstance(t, dict) and t['annotations']:
t['annotations'].pop('Batch', None)
results.AddResults(
base_test_result.BaseTestResult(
self._GetUniqueTestName(t),
base_test_result.ResultType.TIMEOUT,
log=result_log) for t in test)
else:
results.AddResult(
base_test_result.BaseTestResult(
self._GetUniqueTestName(test),
base_test_result.ResultType.TIMEOUT))
except device_errors.DeviceUnreachableError as e:
exception_recorder.register(e)
# If the device is no longer reachable then terminate this
# _RunTestsOnDevice call.
raise
except base_error.BaseError as e:
exception_recorder.register(e)
# If we get a device error but believe the device is still
# reachable, attempt to continue using it.
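# When tests are dynamically sharded via a TestCollection, re-add the test
# so it can be picked up again (possibly by another device).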
if isinstance(tests, test_collection.TestCollection):
rerun = test
consecutive_device_errors += 1
if consecutive_device_errors >= 3:
# We believe the device is still reachable and may still be usable,
# but if it fails repeatedly, we shouldn't attempt to keep using
# it.
logging.error('Repeated failures on device %s. Abandoning.',
str(dev))
raise
logging.exception(
'Attempting to continue using device %s despite failure (%d/3).',
str(dev), consecutive_device_errors)
finally:
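# Always mark the test as completed so the TestCollection can eventually
# drain; re-add any test that should be rerun first.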
if isinstance(tests, test_collection.TestCollection):
if rerun:
tests.add(rerun)
tests.test_completed()
logging.info('Finished running tests on this device.')
#override
def RunTests(self, results, raw_logs_fh=None):
tests = self._GetTests()
total_test_count = len(FlattenTestList(tests))
exit_now = threading.Event()
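# SIGTERM handler: signal the worker threads to exit and unwind RunTests by
# raising TestsTerminated.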
def stop_tests(_signum, _frame):
logging.critical('Received SIGTERM. Stopping test execution.')
exit_now.set()
raise TestsTerminated()
try:
with signal_handler.AddSignalHandler(signal.SIGTERM, stop_tests):
self._env.ResetCurrentTry()
while self._env.current_try < self._env.max_tries and tests:
tries = self._env.current_try
flatten_tests = FlattenTestList(tests)
logging.info('STARTING TRY #%d/%d', tries + 1, self._env.max_tries)
if tries > 0 and self._env.recover_devices:
# The variable "tests" is reused to store the failed tests.
failed_test_pct = 100 * len(flatten_tests) // total_test_count
if failed_test_pct > FAILED_TEST_PCT_MAX:
logging.info(
'Attempting to recover devices as the percentage of failed '
'tests (%d%%) exceeds the threshold %d%%.', failed_test_pct,
FAILED_TEST_PCT_MAX)
self._RecoverDevices()
elif tries + 1 == self._env.max_tries:
logging.info(
'Attempting to recover devices prior to last test attempt.')
self._RecoverDevices()
logging.info(
'Will run %d tests, grouped into %d groups, on %d devices: %s',
len(flatten_tests), len(tests), len(self._env.devices),
', '.join(str(d) for d in self._env.devices))
for t in tests:
logging.debug(' %s', t)
try_results = base_test_result.TestRunResults()
test_names = (self._GetUniqueTestName(t) for t in flatten_tests)
try_results.AddResults(
base_test_result.BaseTestResult(
t, base_test_result.ResultType.NOTRUN)
for t in test_names if not t.endswith('*'))
# As soon as we know the names of the tests, we populate |results|.
# The tests in try_results will have their results updated by
# try_results.AddResult() as they are run.
results.append(try_results)
try:
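# When sharding, all devices pull work from a single shared TestCollection.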
if self._ShouldShardTestsForDevices():
tc = test_collection.TestCollection(
self._CreateShardsForDevices(tests))
self._env.parallel_devices.pMap(
self._RunTestsOnDevice,
tc, try_results, exit_now).pGet(None)
else:
self._env.parallel_devices.pMap(self._RunTestsOnDevice, tests,
try_results, exit_now).pGet(None)
except TestsTerminated:
for unknown_result in try_results.GetUnknown():
try_results.AddResult(
base_test_result.BaseTestResult(
unknown_result.GetName(),
base_test_result.ResultType.TIMEOUT,
log=_SIGTERM_TEST_LOG))
raise
self._env.IncrementCurrentTry()
tests = self._GetTestsToRetry(tests, try_results)
logging.info('FINISHED TRY #%d/%d', tries + 1, self._env.max_tries)
if tests:
logging.info('%d failed tests remain.', len(tests))
else:
logging.info('All tests completed.')
except TestsTerminated:
pass
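# Runs device_recovery.RecoverDevice on all attached devices in parallel;
# recovery may include rebooting a device.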
def _RecoverDevices(self):
self._env.parallel_devices.pMap(device_recovery.RecoverDevice, None)
def _GetTestsToRetry(self, tests, try_results):
"""Get the tests to retry.
Note "tests" can have groups of test. For gtest, we would like to add
the entire group to retry as they are PRE test group. For instrumentation,
we only keep the failed tests in that group.
"""
def is_failure_result(test_result):
if isinstance(test_result, list):
return any(is_failure_result(r) for r in test_result)
return (
test_result is None
or test_result.GetType() not in (
base_test_result.ResultType.PASS,
base_test_result.ResultType.SKIP))
def get_tests_to_retry(test_list, all_test_results):
failed_tests = []
for test in test_list:
if isinstance(test, list):
failed_test = get_tests_to_retry(test, all_test_results)
if failed_test:
failed_tests.append(
test if self._ShouldRetryFullGroup(test) else failed_test)
else:
name = self._GetUniqueTestName(test)
# When specifying a test filter, names can contain trailing wildcards.
# See local_device_gtest_run._ExtractTestsFromFilters()
if name.endswith('*'):
result = [
r for n, r in all_test_results.items()
if fnmatch.fnmatch(n, name)
]
else:
result = all_test_results.get(name)
if is_failure_result(result) and self._ShouldRetry(test, result):
failed_tests.append(test)
return failed_tests
all_test_results = {r.GetName(): r for r in try_results.GetAll()}
return get_tests_to_retry(tests, all_test_results)
def _ApplyExternalSharding(self, tests, shard_index, total_shards):
logging.info('Using external sharding settings. This is shard %d/%d',
shard_index, total_shards)
if total_shards < 0 or shard_index < 0 or total_shards <= shard_index:
raise test_exception.InvalidShardingSettings(shard_index, total_shards)
sharded_tests = []
grouped_tests = self._GroupTests(tests)
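# Assign each test (or test group) to a shard based on a stable hash of its
# unique name, so a given test always lands on the same shard regardless of
# the order in which tests are listed.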
for test in grouped_tests:
test_name = self._GetUniqueTestName(test)
if self._DeterministicHash(test_name) % total_shards == shard_index:
sharded_tests.append(test)
return sharded_tests
def _DeterministicHash(self, test_name):
"""Return the deterministic hash for a test name, as an integer."""
# pylint: disable=no-self-use
assert isinstance(test_name, str), 'Expecting a string.'
hash_bytes = hashlib.sha256(test_name.encode('utf-8')).digest()
# To speed things up, only take the last 3 bytes.
return int.from_bytes(hash_bytes[-3:], byteorder='big')
# Sort by hash so we don't put all tests in a slow suite in the same shard.
def _SortTests(self, tests):
return sorted(
tests,
key=lambda t: self._DeterministicHash(self._GetUniqueTestName(t)))
def _CreateShardsForDevices(self, tests):
raise NotImplementedError
def _GetUniqueTestName(self, test):
# pylint: disable=no-self-use
return test
def _ShouldRetry(self, test, result):
# pylint: disable=no-self-use,unused-argument
return True
def _ShouldRetryFullGroup(self, test_group):
"""Whether should retry the full group if the group has test failure."""
# pylint: disable=no-self-use,unused-argument
return False
#override
def GetTestsForListing(self):
ret = self._GetTests()
ret = FlattenTestList(ret)
ret.sort()
return ret
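# Returns a sorted, human-readable list of data dependencies in the form
# '<device path> <- <host path relative to the current directory>'.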
def GetDataDepsForListing(self):
device_root = '$CHROMIUM_TESTS_ROOT'
host_device_tuples = self._test_instance.GetDataDependencies()
host_device_tuples = device_dependencies.SubstituteDeviceRoot(
host_device_tuples, device_root)
host_device_tuples = device_dependencies.ExpandDataDependencies(
host_device_tuples)
return sorted(f'{d} <- {os.path.relpath(h)}' for h, d in host_device_tuples)
def _GetTests(self):
"""Get the tests to run on the assigned shard index.
Must be implemented by subclasses.
"""
raise NotImplementedError
def _GroupTests(self, tests):
"""Group tests by tests that should run in the same test invocation.
Can be overridden by subclasses if needed. Examples are:
- gtest: PRE_ tests
- instrumentation test: unit tests and batched tests.
Args:
tests: a flattened list of tests. Each element can be a string for gtest,
or a dict for instrumentation tests.
Returns a list whose elements can be a single test or a list of tests.
"""
# pylint: disable=no-self-use
return tests
def _RunTest(self, device, test):
raise NotImplementedError
def _ShouldShardTestsForDevices(self):
raise NotImplementedError
def FlattenTestList(values):
"""Returns a list with all nested lists (shard groupings) expanded."""
ret = []
for v in values:
if isinstance(v, list):
ret += v
else:
ret.append(v)
return ret
def SetAppCompatibilityFlagsIfNecessary(packages, device):
"""Sets app compatibility flags on the given packages and device.
Args:
packages: A list of strings containing package names to apply flags to.
device: A DeviceUtils instance to apply the flags on.
"""
def set_flag_for_packages(flag, enable):
enable_str = 'enable' if enable else 'disable'
for p in packages:
cmd = ['am', 'compat', enable_str, flag, p]
device.RunShellCommand(cmd)
sdk_version = device.build_version_sdk
if sdk_version >= version_codes.R:
# These flags are necessary to use the legacy storage permissions on R+.
# See crbug.com/1173699 for more information.
set_flag_for_packages('DEFAULT_SCOPED_STORAGE', False)
set_flag_for_packages('FORCE_ENABLE_SCOPED_STORAGE', False)
class NoTestsError(Exception):
"""Error for when no tests are found."""