blob: b17c7e1ec05fae9e2d33ceb82928f97a8cce9cf2 [file] [log] [blame]
# Copyright 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Classes and Methods related to invoking a test."""
from __future__ import print_function
import cPickle as pickle
import logging
import os
import signal
import sys
import tempfile
import threading
import time
import yaml
import factory_common # pylint: disable=unused-import
from cros.factory.device import device_utils
from cros.factory.test import device_data
from cros.factory.test.env import paths
from cros.factory.test.event import Event
from cros.factory.test import session
from cros.factory.test import state
from cros.factory.test.state import TestState
from cros.factory.test.test_lists import manager
from cros.factory.test.test_lists import test_object
from cros.factory.test.utils import pytest_utils
from cros.factory.testlog import testlog
from cros.factory.testlog import testlog_utils
from cros.factory.utils import file_utils
from cros.factory.utils import process_utils
from cros.factory.utils.service_utils import ServiceManager
from cros.factory.utils.string_utils import DecodeUTF8
from cros.factory.utils import time_utils
from cros.factory.external import syslog
# Maximum number of bytes read from the end of a test's log file and stored in
# the invocation metadata (as `log_tail`) when the test does not pass.
ERROR_LOG_TAIL_LENGTH = 8 * 1024
class InvocationError(Exception):
  """Exception type representing an invocation error."""
def ResolveTestArgs(goofy, test, test_list_id, dut_options):
  """Resolves the argument dictionary of a test.

  The unresolved dargs of `test` are handed to the owning test list's
  ResolveTestArgs() method, together with freshly created DUT and station
  interfaces and the test's locals.  According to the original notes, that
  method evaluates values that start with 'eval! ' and 'i18n! '.

  Args:
    goofy: a goofy instance; must provide GetTestList().
    test: a FactoryTest object whose dargs will be resolved.
    test_list_id: ID of the test list the `test` object belongs to.
    dut_options: DUT options for current test; a falsy value means "no
      options".

  Returns:
    Resolved dargs dictionary object.

  Raises:
    Whatever goofy.GetTestList() raises when the test list cannot be
    obtained (the failure is logged first and then re-raised).
  """
  unresolved_dargs, test_locals = test.dargs, test.locals_

  try:
    owning_test_list = goofy.GetTestList(test_list_id)
  except Exception:
    logging.exception('Goofy does not have test list `%s`', test_list_id)
    raise

  if not dut_options:
    dut_options = {}
  dut_interface = device_utils.CreateDUTInterface(**dut_options)
  # TODO(stimim): might need to override station options?
  station_interface = device_utils.CreateStationInterface()

  resolver_kwargs = {
      'dut': dut_interface,
      'station': station_interface,
      'locals_': test_locals,
  }
  return owning_test_list.ResolveTestArgs(unresolved_dargs, **resolver_kwargs)
class PytestInfo(object):
  """Plain container bundling the data needed to invoke one test.

  Properties:
    test_list: The test list name or ID to get the factory test info from.
    path: The path of the test in the test list.
    pytest_name: The name of the factory test to run.
    args: Arguments passing down to the factory test.
    results_path: The path to the result file.
    dut_options: The options to override default DUT target; always a dict
      (an empty one when no options were supplied).
  """

  def __init__(self, test_list, path, pytest_name, args, results_path,
               dut_options=None):
    # Normalize "no options" (None or any other falsy value) to a new dict.
    if not dut_options:
      dut_options = {}

    self.test_list = test_list
    self.path = path
    self.pytest_name = pytest_name
    self.args = args
    self.results_path = results_path
    self.dut_options = dut_options

  def ReadTestList(self):
    """Looks up `self.test_list` through a new test list manager.

    Returns:
      Whatever manager.Manager().GetTestListByID() returns for the stored ID.
    """
    test_list_manager = manager.Manager()
    return test_list_manager.GetTestListByID(self.test_list)
class Metadata(dict):
  """Placeholder for the metadata of a pytest execution.

  The instance itself is the dict of metadata fields.  It also provides
  methods to save/load the data to/from the storage (a YAML file).

  Properties:
    is_new: `True` if the instance is created from scratch; `False` if it
      is loaded from a file.
  """

  # Fields that must be present in a metadata file for it to be loadable.
  REQUIRED_FIELDS = ['path', 'dargs', 'invocation', 'label', 'init_time',
                     'start_time']

  def __init__(self, is_new, file_path):
    """Creates an empty metadata object.

    Args:
      is_new: Whether the metadata is created from scratch.
      file_path: Path of the file that `Flush` writes to.
    """
    super(Metadata, self).__init__()
    self.is_new = is_new
    self._file_path = file_path

  @classmethod
  def LoadFile(cls, pytest_name, file_path):
    """Create an instance by loading the metadata from the file.

    Args:
      pytest_name: Name of the target pytest.  When truthy, the file must
        also contain a `pytest_name` field.
      file_path: Path to the metadata file.

    Returns:
      An instance of `Metadata` with `is_new` set to `False`.

    Raises:
      `Exception` if the file content is invalid (not a mapping, or missing
      a required field).
    """
    def _Validate(metadata):
      # An empty or malformed file makes yaml.load() return None, a list or a
      # scalar; report that explicitly instead of failing with an unrelated
      # TypeError in the membership tests below.
      if not isinstance(metadata, dict):
        raise Exception('metadata is not a mapping (got %s)' %
                        type(metadata).__name__)
      for field in cls.REQUIRED_FIELDS:
        if field not in metadata:
          raise Exception('metadata missing field %s' % field)
      if pytest_name and 'pytest_name' not in metadata:
        raise Exception('metadata missing field pytest_name')

    with open(file_path, 'r') as f:
      # NOTE(review): yaml.load() may construct arbitrary Python objects, so
      # this is only safe while the file is one written by Flush().  It is
      # deliberately not replaced by safe_load() here, since files dumped by
      # yaml.dump() may carry Python-specific tags -- confirm before changing.
      metadata = yaml.load(f)
    _Validate(metadata)

    inst = cls(False, file_path)
    inst.Update(**metadata)
    return inst

  @classmethod
  def CreateNew(cls, file_path, fields):
    """Create an instance with some initial fields.

    The initial fields are written to `file_path` immediately.

    Args:
      file_path: Path to the metadata file.
      fields: A dict of initial values.

    Returns:
      An instance of `Metadata` with `is_new` set to `True`.
    """
    inst = cls(True, file_path)
    inst.UpdateAndFlush(**fields)
    return inst

  def Update(self, **kwargs):
    """Sets the given fields in memory only (nothing is written to disk)."""
    self.update(kwargs)

  def Flush(self):
    """Writes all fields to the metadata file as block-style YAML."""
    with file_utils.AtomicWrite(self._file_path) as f:
      yaml.dump(dict(self), f, default_flow_style=False)

  def UpdateAndFlush(self, **kwargs):
    """Sets the given fields and then writes everything to the file."""
    self.Update(**kwargs)
    self.Flush()
class TestInvocation(object):
  """State for an active test.

  Each instance owns a daemon thread (`thread`) whose target is `_Run`; the
  thread is started by `Start` and joined by `AbortAndJoin`.

  Properties:
    update_state_on_completion: State for Goofy to update on
      completion; Goofy will call test.update_state(
      **update_state_on_completion). So update_state_on_completion
      will have at least status and error_msg properties to update
      the test state.
    aborted_reason: A reason that the test was aborted (e.g.,
      'Stopped by operator' or 'Factory update').  Stored in the
      `_aborted_reason` attribute.
    uuid: Identifier of this invocation; reused from the stored
      post-shutdown state when resuming a shutdown test.
    output_dir: Directory holding the log and metadata of this invocation.
    dut: DUT interface created from the resolved DUT options.
  """

  def __init__(self, goofy, test, on_completion=None, on_test_failure=None):
    """Constructor.

    Besides storing its arguments, this creates the output directory (and the
    "latest run" symlink) and creates or restores the metadata file.

    Args:
      goofy: The controlling Goofy object.
      test: The FactoryTest object to test.
      on_completion: Callback to invoke in the goofy event queue
        on completion.
      on_test_failure: Callback to invoke in the goofy event queue
        when the test fails.
    """
    self.goofy = goofy
    self.test = test
    self.thread = threading.Thread(
        target=self._Run, name='TestInvocation-%s' % self.test.path)
    self.thread.daemon = True
    self.count = None
    self.update_state_on_completion = {}
    self._on_completion = on_completion
    self._on_test_failure = on_test_failure

    # A truthy value here means we are resuming a shutdown test.
    post_shutdown_state = state.DataShelfGetValue(
        state.KEY_POST_SHUTDOWN % self.test.path)
    self._is_post_shutdown = bool(post_shutdown_state)

    self.uuid = self._ResolveUUID(post_shutdown_state)
    self._env_additions = {session.ENV_TEST_INVOCATION: self.uuid,
                           session.ENV_TEST_PATH: self.test.path}
    self.output_dir = self._SetupOutputDir()
    self._metadata = self._ResolveMetadata()
    self._log_path = os.path.join(self.output_dir, 'log')
    self._dut_options = self._ResolveDUTOptions()
    self.dut = device_utils.CreateDUTInterface(**self._dut_options)

    self._lock = threading.Lock()
    # The following properties are guarded by the lock.
    self._aborted = False
    self._aborted_reason = None
    self._completed = False
    self._process = None

  @property
  def resolved_dargs(self):
    """The `dargs` entry of the metadata, or None if there is none."""
    return self._metadata.get('dargs')

  def __repr__(self):
    return 'TestInvocation(_aborted=%s, _completed=%s)' % (
        self._aborted, self._completed)

  def Start(self):
    """Starts the test threads."""
    self.thread.start()

  def AbortAndJoin(self, reason=None):
    """Aborts a test (must be called from the event controller thread).

    Marks the invocation as aborted, kills the pytest process tree if one
    has been spawned, and waits for the invocation thread to finish.

    Args:
      reason: Optional text describing why the test is aborted.
    """
    with self._lock:
      self._aborted = True
      self._aborted_reason = reason
      process = self._process
    # Kill outside the lock: the invocation thread takes the same lock.
    if process:
      process_utils.KillProcessTree(process, 'pytest')
    if self.thread:
      self.thread.join()
    with self._lock:
      # Should be set by the thread itself, but just in case...
      self._completed = True

  def IsCompleted(self):
    """Returns true if the test has finished."""
    return self._completed

  def _AbortedMessage(self):
    """Returns an error message describing why the test was aborted."""
    return 'Aborted' + (
        (': ' + self._aborted_reason) if self._aborted_reason else '')

  @classmethod
  def _ResolveUUID(cls, post_shutdown_state):
    """Returns the invocation UUID to use.

    Args:
      post_shutdown_state: The stored post-shutdown state of the test; falsy
        when this is not a post-shutdown run.

    Returns:
      The `invocation` entry of `post_shutdown_state` if it is truthy,
      otherwise a newly generated timed UUID.
    """
    if post_shutdown_state:
      # If this is going to be a post-shutdown run of an active shutdown test,
      # reuse the existing invocation as uuid so that we can accumulate all the
      # logs in the same log file.
      return post_shutdown_state['invocation']
    return time_utils.TimedUUID()

  def _SetupOutputDir(self):
    """Creates the output directory and the "latest run" symlink.

    Returns:
      Path of the output directory, `<DATA_TESTS_DIR>/<test path>-<uuid>`.
    """
    output_dir = os.path.join(paths.DATA_TESTS_DIR,
                              '%s-%s' % (self.test.path, self.uuid))
    file_utils.TryMakeDirs(output_dir)

    # Create a symlink for the latest test run, so if we're looking at the
    # logs we don't need to enter the whole UUID.
    latest_symlink = os.path.join(paths.DATA_TESTS_DIR, self.test.path)
    file_utils.TryUnlink(latest_symlink)
    try:
      os.symlink(os.path.basename(output_dir), latest_symlink)
    except OSError:
      # Best effort only: the symlink is a convenience.
      logging.exception('Unable to create symlink %s', latest_symlink)

    return output_dir

  def _ResolveMetadata(self):
    """Restores the metadata of a resumed test, or creates a new one.

    Returns:
      A `Metadata` instance backed by `<output_dir>/metadata`.
    """
    if self._is_post_shutdown:
      # Resuming from an active shutdown test, try to restore its metadata file.
      try:
        return Metadata.LoadFile(self.test.pytest_name,
                                 os.path.join(self.output_dir, 'metadata'))
      except Exception:
        # Fall through and start over with fresh metadata.
        logging.exception('Failed to load metadata from active shutdown test; '
                          'will continue, but logs will be inaccurate')

    return Metadata.CreateNew(os.path.join(self.output_dir, 'metadata'),
                              {'path': self.test.path,
                               'init_time': time.time(),
                               'invocation': self.uuid,
                               'label': self.test.label})

  def _ResolveDUTOptions(self):
    """Resolve dut_options.

    Climb the tree of test options and choose the first non-empty dut_options
    encountered. Note we are not stacking the options because most DUT targets
    don't share any options.

    Returns:
      The chosen dut_options; an empty dict if none is set anywhere.
    """
    dut_options = {}
    test_node = self.test
    while test_node and not dut_options:
      dut_options = test_node.dut_options
      test_node = test_node.parent
    if not dut_options:
      # Use the options in test list (via test.root).
      dut_options = self.test.root.options.dut_options or {}

    return dut_options

  def _InvokePytest(self):
    """Invokes a pyunittest-based test.

    Spawns the pytest through goofy's prespawner, tees its output lines to
    the invocation log and to stderr, and reads the pickled result back.

    Returns:
      A tuple (status, error_msg).
    """
    assert self.test.pytest_name
    assert self.resolved_dargs is not None

    files_to_delete = []
    try:
      def make_tmp(prefix):
        # mktemp() only picks a name.  The results file must NOT exist up
        # front, because its existence is used below to tell whether the
        # pytest completed.
        ret = tempfile.mktemp(
            prefix='%s-%s-' % (self.test.path, prefix))
        files_to_delete.append(ret)
        return ret

      results_path = make_tmp('results')

      file_utils.TryMakeDirs(paths.DATA_TESTS_DIR)

      pytest_name = self.test.pytest_name

      # Invoke the unittest driver in a separate process.
      with open(self._log_path, 'ab', 0) as log:
        print('Running test: %s' % self.test.path, file=log)

        self._env_additions['CROS_PROC_TITLE'] = (
            '%s.py (factory pytest %s)' % (pytest_name, self.output_dir))

        with self._lock:
          # Check for abort and publish the process under the same lock that
          # AbortAndJoin() uses, so an abort cannot slip in between.
          if self._aborted:
            return (TestState.FAILED,
                    'Before starting: %s' % self._AbortedMessage())

          self._process = self.goofy.pytest_prespawner.spawn(
              PytestInfo(test_list=self.goofy.test_list.test_list_id,
                         path=self.test.path,
                         pytest_name=pytest_name,
                         args=self.resolved_dargs,
                         results_path=results_path,
                         dut_options=self._dut_options),
              self._env_additions)

        def _LineCallback(line):
          log.write(line + '\n')
          sys.stderr.write('%s> %s\n' % (self.test.path.encode('utf-8'), line))

        # Tee process's stderr to both the log and our stderr.
        process_utils.PipeStdoutLines(self._process, _LineCallback)

        # Try to kill all subprocess created by the test.
        try:
          os.kill(-self._process.pid, signal.SIGKILL)
        except OSError:
          # Deliberately ignored (e.g. nothing is left to kill).
          pass

        with self._lock:
          if self._aborted:
            return TestState.FAILED, self._AbortedMessage()
        if self._process.returncode:
          return (TestState.FAILED,
                  'Test returned code %d' % self._process.returncode)

      if not os.path.exists(results_path):
        return TestState.FAILED, 'pytest did not complete'

      # NOTE(review): the results file is unpickled as-is; this presumably
      # relies on it being written by the pytest process spawned above.
      with open(results_path) as f:
        result = pickle.load(f)
      assert isinstance(result, pytest_utils.PytestExecutionResult)
      # TODO(yhong): Record the detailed failure reason for advanced
      #     analysis.
      return (result.status,
              '; '.join(failure.exc_repr for failure in result.failures))
    except Exception as e:
      return TestState.FAILED, 'Unable to retrieve pytest results: %r' % e
    finally:
      for tmp_path in files_to_delete:
        try:
          if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        except Exception:
          logging.exception('Unable to delete temporary file %s', tmp_path)

  def _Run(self):
    """Thread body: prepares, runs and tears down one test invocation."""
    iteration_string = ''
    retries_string = ''
    if self.test.iterations > 1:
      iteration_string = ' [%s/%s]' % (
          self.test.iterations -
          self.test.GetState().iterations_left + 1,
          self.test.iterations)
    if self.test.retries > 0:
      retries_string = ' [retried %s/%s]' % (
          self.test.retries -
          self.test.GetState().retries_left,
          self.test.retries)
    logging.info('Running test %s%s%s', self.test.path,
                 iteration_string, retries_string)

    service_manager = ServiceManager()
    service_manager.SetupServices(enable_services=self.test.enable_services,
                                  disable_services=self.test.disable_services)

    testlog_helper = _TestInvocationTestLogHelper()
    event_log_helper = _TestInvocationEventLogHelper(self.goofy.event_log)

    # A non-None status here means preparation already failed; the pytest is
    # then skipped but the teardown (and its logging) still happens.
    status, error_msg = self._PrepareRunPytest(testlog_helper, event_log_helper)

    try:
      # Run the pytest if everything was fine.
      if status is None:
        if self.test.pytest_name:
          status, error_msg = self._InvokePytest()
        else:
          status = TestState.FAILED
          error_msg = 'No pytest_name'
    finally:
      status, error_msg = self._TearDownAfterPytest(
          testlog_helper, event_log_helper, status, error_msg)

    service_manager.RestoreServices()

    log_func = (session.console.error if status == TestState.FAILED
                else logging.info)
    tag_decorator = (' (%s)' % self._metadata['tag']
                     if 'tag' in self._metadata else '')
    log_func(u'Test %s%s%s %s: %s', self.test.path, iteration_string,
             tag_decorator, status, error_msg)

    self._InvokeOnCompleteCallBack(status, error_msg)

  def _PrepareRunPytest(self, testlog_helper, event_log_helper):
    """Fills in the metadata and logs the start of the test.

    Args:
      testlog_helper: A `_TestInvocationTestLogHelper` instance.
      event_log_helper: A `_TestInvocationEventLogHelper` instance.

    Returns:
      A tuple (status, error_msg).  Both are None unless resolving the test
      arguments failed, in which case status is TestState.FAILED.
    """
    if self._is_post_shutdown:
      tag_name = 'post-shutdown'
      event_name = 'resume_test'
      progressing_verb = 'resuming'
    else:
      tag_name = 'pre-shutdown'
      event_name = 'start_test'
      progressing_verb = 'starting'

    status, error_msg = None, None

    # Resolve the start_time and the test arguments in metadata.
    if self._metadata.is_new:
      # Resolve the data and save to metadata.
      self._metadata.Update(start_time=time.time(),
                            serial_numbers=device_data.GetAllSerialNumbers())
      try:
        logging.debug('Resolving self.test.dargs from test list [%s]...',
                      self.goofy.test_list.test_list_id)
        self._metadata.Update(dargs=ResolveTestArgs(
            self.goofy,
            self.test,
            test_list_id=self.goofy.test_list.test_list_id,
            dut_options=self._dut_options))
      except Exception as e:
        logging.exception('Unable to resolve test arguments')
        # Although the test is considered failed already,
        # let's still follow the normal path, so everything is logged properly.
        status = TestState.FAILED
        error_msg = 'Unable to resolve test arguments: %s' % e
        self._metadata.Update(dargs=None)
      if self.test.pytest_name:
        self._metadata.Update(pytest_name=self.test.pytest_name)

    if isinstance(self.test, test_object.ShutdownStep):
      self._metadata.Update(tag=tag_name)

    self._metadata.Flush()

    # Log the starting event, continue even if fails.
    try:
      event_log_helper.LogStartEvent(event_name, self._metadata)
    except Exception:
      logging.exception('Unable to log %s event by event_log', event_name)

    try:
      testlog_helper.InitSubSession(self.dut, self._metadata)
      # Expose the sub-session path through the environment additions that
      # are handed to the pytest spawner.
      self._env_additions[
          testlog.TESTLOG_ENV_VARIABLE_NAME] = testlog_helper.session_json_path
      testlog_helper.LogStartEvent()
    except Exception:
      logging.exception('Unable to log %s event by testlog', event_name)

    syslog.syslog('Test %s (%s) %s' %
                  (self.test.path, self.uuid, progressing_verb))

    return status, error_msg

  def _TearDownAfterPytest(self, testlog_helper, event_log_helper, status,
                           error_msg):
    """Finalizes the metadata and logs the end of the test.

    Args:
      testlog_helper: A `_TestInvocationTestLogHelper` instance.
      event_log_helper: A `_TestInvocationEventLogHelper` instance.
      status: Status of the test so far.
      error_msg: Error message of the test so far; may be None.

    Returns:
      A tuple (status, error_msg).  The status is forced to
      TestState.FAILED when logging the end event fails.
    """
    def _SafelyAddLogTailToMetadata():
      # Best effort: stores at most the last ERROR_LOG_TAIL_LENGTH bytes of
      # the log in the metadata.
      if self._log_path and os.path.exists(self._log_path):
        try:
          log_size = os.path.getsize(self._log_path)
          offset = max(0, log_size - ERROR_LOG_TAIL_LENGTH)
          with open(self._log_path) as f:
            f.seek(offset)
            self._metadata.Update(log_tail=DecodeUTF8(f.read()))
        except Exception:
          logging.exception('Unable to read log tail')

    def _HandleLogEndTestEventFail(orig_status, logger_name):
      # The arguments follow the order of the placeholders: first the logger
      # that failed, then the status being overridden.
      session.console.exception('Unable to log end_test event by %s. '
                                'Change status from %s to FAILED',
                                logger_name, orig_status)
      status = TestState.FAILED
      error_msg = 'Unable to log end_test event'
      self._metadata.Update(status=status, error_msg=error_msg)
      return status, error_msg

    # Shutdown the test.
    if error_msg:
      error_msg = DecodeUTF8(error_msg)

    try:
      self.goofy.event_client.post_event(
          Event(Event.Type.DESTROY_TEST,
                test=self.test.path,
                invocation=self.uuid))
    except Exception:
      logging.exception('Unable to post DESTROY_TEST event')

    syslog.syslog('Test %s (%s) completed: %s%s' % (
        self.test.path, self.uuid, status,
        (' (%s)' % error_msg if error_msg else '')))

    # Update the metadata but flush it to the storage after logging
    # because the log might fail.
    end_time = time.time()
    self._metadata.Update(status=status, end_time=end_time,
                          duration=end_time - self._metadata['start_time'])
    if error_msg:
      self._metadata.Update(error_msg=error_msg)
    if status != TestState.PASSED:
      _SafelyAddLogTailToMetadata()

    # Log the end test event.
    try:
      testlog_helper.LogFinishEvent(self._metadata)
      del self._env_additions[testlog.TESTLOG_ENV_VARIABLE_NAME]
    except Exception:
      status, error_msg = _HandleLogEndTestEventFail(status, 'testlog')
    finally:
      # The event_log end event is attempted whether or not testlog worked,
      # and the metadata is flushed in any case.
      try:
        event_log_helper.LogEndEvent(self._metadata)
      except Exception:
        status, error_msg = _HandleLogEndTestEventFail(status, 'event_log')
      finally:
        self._metadata.Flush()

    return status, error_msg

  def _InvokeOnCompleteCallBack(self, status, error_msg):
    """Publishes the final state and schedules the completion callbacks.

    Args:
      status: Final status of the test.
      error_msg: Final error message of the test; may be None.
    """
    decrement_iterations_left = 0
    decrement_retries_left = 0

    if status == TestState.FAILED:
      if self.test.waived:
        status = TestState.FAILED_AND_WAIVED
      decrement_retries_left = 1
    elif status == TestState.PASSED:
      decrement_iterations_left = 1

    with self._lock:
      self.update_state_on_completion = dict(
          status=status,
          error_msg=error_msg,
          decrement_iterations_left=decrement_iterations_left,
          decrement_retries_left=decrement_retries_left)
      self._completed = True

    self.goofy.RunEnqueue(self.goofy.ReapCompletedTests)
    # Both callbacks are optional (they default to None in the constructor),
    # so only enqueue the ones that were actually provided.  Note that a
    # waived failure is no longer TestState.FAILED at this point.
    if status == TestState.FAILED and self._on_test_failure:
      self.goofy.RunEnqueue(self._on_test_failure)
    if self._on_completion:
      self.goofy.RunEnqueue(self._on_completion)
class _TestInvocationTestLogHelper(object):
  """Wraps the testlog calls made around one test invocation.

  Properties:
    session_json_path: Testlog's sub-session path; None until
      `InitSubSession` has been called.
  """

  # Maps a goofy test status to the status recorded by testlog.  A status
  # without an entry is passed through unchanged.
  _STATUS_CONVERSION = {
      # TODO(itspeter): No mapping for STARTING ?
      TestState.ACTIVE: testlog.StationTestRun.STATUS.RUNNING,
      TestState.PASSED: testlog.StationTestRun.STATUS.PASS,
      TestState.FAILED: testlog.StationTestRun.STATUS.FAIL,
      TestState.UNTESTED: testlog.StationTestRun.STATUS.UNKNOWN,
      # TODO(itspeter): Consider adding another status.
      TestState.FAILED_AND_WAIVED: testlog.StationTestRun.STATUS.PASS,
      TestState.SKIPPED: testlog.StationTestRun.STATUS.PASS}

  def __init__(self):
    self.session_json_path = None

  def InitSubSession(self, dut, metadata):
    """Creates the testlog sub-session for this invocation.

    Builds a station test run record from the metadata (identifiers, start
    time, flattened arguments, serial numbers and the optional tag) and
    stores the resulting sub-session path in `session_json_path`.

    Args:
      dut: DUT interface used to obtain the device ID.
      metadata: Metadata of the invocation.
    """
    def _DescribeDUT(device):
      if device.link.IsReady():
        reported_id = device.info.device_id
        # TODO(chuntsen): If the dutDeviceId won't be None anymore, remove this.
        if isinstance(reported_id, basestring):
          return reported_id
        logging.error('DUT device ID is an unexpected type (%s)',
                      type(reported_id))
        return str(reported_id)
      return 'device-offline'

    run_record = testlog.StationTestRun()
    run_record.Populate(dict(
        dutDeviceId=_DescribeDUT(dut),
        stationDeviceId=session.GetDeviceID(),
        stationInstallationId=session.GetInstallationID(),
        testRunId=metadata['invocation'],
        testName=metadata['path'],
        testType=metadata.get('pytest_name', None),
        status=testlog.StationTestRun.STATUS.RUNNING,
        startTime=metadata['start_time']))

    raw_dargs = metadata['dargs']
    if raw_dargs:
      # Only allow types that can be natively expressed in JSON.
      json_native_types = (int, long, float, basestring, type(None))
      for arg_name, arg_value in testlog_utils.FlattenAttrs(
          raw_dargs, allow_types=json_native_types):
        run_record.AddArgument(arg_name, arg_value)

    for sn_name, sn_value in metadata['serial_numbers'].iteritems():
      run_record.AddSerialNumber(sn_name, sn_value)

    shutdown_tag = metadata.get('tag')
    if shutdown_tag:
      run_record.LogParam(name='tag', value=shutdown_tag)
      run_record.UpdateParam(name='tag',
                             description='Indicate type of shutdown')

    self.session_json_path = testlog.InitSubSession(
        log_root=paths.DATA_LOG_DIR, station_test_run=run_record,
        uuid=metadata['invocation'])

  def LogStartEvent(self):
    """Logs a test run record whose status is STARTING."""
    status_update = {'status': testlog.StationTestRun.STATUS.STARTING}
    starting_record = testlog.StationTestRun()
    starting_record.Populate(status_update)
    testlog.LogTestRun(self.session_json_path,
                       station_test_run=starting_record)

  def LogFinishEvent(self, metadata):
    """Logs the final test run record.

    Args:
      metadata: Metadata of the invocation; must contain `status`,
        `end_time` and `duration`.  For a failed run, `error_msg` and
        `log_tail` are attached as failures when present.
    """
    goofy_status = metadata['status']
    testlog_status = self._STATUS_CONVERSION.get(goofy_status, goofy_status)

    final_record = testlog.StationTestRun()
    final_record.Populate(dict(
        endTime=metadata['end_time'],
        duration=metadata['duration'],
        status=testlog_status))

    if testlog_status == testlog.StationTestRun.STATUS.FAIL:
      failure_sources = (('error_msg', 'GoofyErrorMsg'),
                         ('log_tail', 'GoofyLogTail'))
      for metadata_key, failure_code in failure_sources:
        if metadata_key not in metadata:
          continue
        final_record.AddFailure(failure_code, metadata[metadata_key])

    testlog.LogFinalTestRun(self.session_json_path,
                            station_test_run=final_record)
class _TestInvocationEventLogHelper(object):
  """Helper class to log the event_log event.

  The arguments of the start event are remembered so that the end event can
  repeat the identifying ones (e.g. `path`, `invocation`, `pytest_name`).
  """

  def __init__(self, event_log):
    """Constructor.

    Args:
      event_log: Object providing `Log(event_name, **kwargs)`.
    """
    self._event_log = event_log
    self._event_log_args = {}

  def LogStartEvent(self, event_name, metadata):
    """Logs the start (or resume) event of a test.

    Args:
      event_name: Name of the event to log.
      metadata: Metadata of the invocation; must contain `path`, `dargs`,
        `serial_numbers` and `invocation`.  `pytest_name` and `tag` are
        logged as well when present.
    """
    self._event_log_args = {
        'path': metadata['path'],
        'dargs': metadata['dargs'],
        # This key used to be misspelled, so the pop() below never removed it
        # and the serial numbers leaked into the end event.
        'serial_numbers': metadata['serial_numbers'],
        'invocation': metadata['invocation']}
    self._UpdateArgsIfKeyExists(metadata, 'pytest_name')
    self._UpdateArgsIfKeyExists(metadata, 'tag')

    self._event_log.Log(event_name, **self._event_log_args)

    # These fields belong to the start event only; drop them so that they are
    # not repeated in the end event.
    self._event_log_args.pop('dargs', None)
    self._event_log_args.pop('serial_numbers', None)
    self._event_log_args.pop('tag', None)

  def LogEndEvent(self, metadata):
    """Logs the `end_test` event.

    Args:
      metadata: Metadata of the invocation; must contain `status` and
        `duration`.  `error_msg` is logged as well when present.
    """
    self._event_log_args.update({'status': metadata['status'],
                                 'duration': metadata['duration']})
    self._UpdateArgsIfKeyExists(metadata, 'error_msg')
    self._event_log.Log('end_test', **self._event_log_args)

  def _UpdateArgsIfKeyExists(self, metadata, key):
    """Copies `metadata[key]` into the event arguments if it exists."""
    if key in metadata:
      self._event_log_args[key] = metadata[key]