blob: 69d68f4c55e41cc3c163271feed873d78b0d4b86 [file] [log] [blame]
# -*- coding: utf-8 -*-
# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Python implementation of testlog JSON API.
Testlog (deprecates event_log) is designed to
- Define a stricter API that unifined logging action across different tests.
- More friendly for third-party to use.
File hierarchy and relation between event_log.py:
TODO(itspeter): Remove event_log related path once phased out.
TODO(itspeter): Move to Instalog folder and remove the dependency of
factory framework
[DATA_DIR] ─── .device_id
[LOG_ROOT]─┬─ testlog.json
├─ init_count
├─ installation_id
├─ events/ ─┬─ .boot_sequence
│ (Legacy │ (legacy file, replaced by [LOG_ROOT]/init_count)
│ ├─ .reimage_id
│ │ (legacy file, replaced by [LOG_ROOT]/installation_id)
│ └─ events
│ (legacy file, replaced by [LOG_ROOT]/testlog.json)
├─ running/ ─┬─ [UUID]-session.json
│ ├─ [UUID]-session.json
│ └─ [UUID]-session.json
└─ attachments/ ─┬─ [binary_file 1]
└─ [binary_file N]
[TEMPORARY_FOLDER]─┬─ event_log_seq
│ (legacy file, replaced by testlog_seq)
└─ testlog_seq
"""
from __future__ import print_function
import json
import logging
import os
import re
import tempfile
import threading
import time
from six import iterkeys
from . import hooks
from . import testlog_seq
from . import testlog_utils
from . import testlog_validator
from .utils import file_utils
from .utils import schema
from .utils import sys_utils
from .utils import time_utils
from .utils import type_utils
TESTLOG_API_VERSION = '0.21'
TESTLOG_ENV_VARIABLE_NAME = 'TESTLOG'
# Possible values for the `Stationstatus.parameters.type` field.
PARAM_TYPE = type_utils.Enum(['argument', 'measurement'])
# TODO(itspeter): Use config_utils for those default constants.
# The primary JSON file. It will be ingested by Instalog.
_DEFAULT_PRIMARY_JSON_FILE = 'testlog.json'
# The default template path for session JSON from log_root.
# Expected life cycle of a single test run. Parent process must call
# InitForChildProcess before invoking the new process.
_DEFAULT_SESSION_FOLDER = 'running'
_DEFAULT_SESSION_JSON_FILE_TEMPLATE = '%s-session.json'
# The default path for binary attachments from log_root.
_DEFAULT_ATTACHMENTS_FOLDER = 'attachments'
# This directory need to be cleared on each boot.
# The /run directory (or something writable by us if in the chroot).
# TODO(itspeter): Survey if we can find an equivalent folder in Windows.
# Otherwise, the SEQ_INCREMENT_ON_BOOT magic might not work.
_TMP_DIR = (os.environ.get('CROS_FACTORY_RUN_PATH') or
(tempfile.gettempdir() if sys_utils.InChroot() else '/run'))
# File containing the next sequence number to write. This is in
# /run so it is cleared on each boot.
_SEQUENCE_PATH = os.path.join(_TMP_DIR, 'testlog_seq')
# Get the progress pattern of FlushOutput return value.
# Example of FlushOutput return value: 'Flush for `output_uplink\' failed
# within the specified timeout (100 / 1000 events)'
_PROGRESS_RE = re.compile(r'\(.*\)')
# Use the lock to avoid two threads creating multiple writers.
_log_related_lock = threading.RLock()
# A global testlog writer. Since we expect each test is invoked
# separately as a process, each test will have their own 'global'
# testlog writer with a unique ID.
_global_testlog = None
_pylogger = None
_pylogger_handler = None
_update_session_json = threading.Event()
class FlushException(Exception):
"""Represents an exception when flushing to Instalog."""
pass
class Testlog(object):
"""Primarily a wrapper for variables that should exist in a singleton.
This class should be initialized only once per process.
Properties:
last_test_run: In memory object that keep the same content of session_json.
They should be the same so when updating the test_run we don't need to
read and parse from session_json repeatedly.
log_root: The root folder for logging.
primary_json: A JSON file that will ingested by Instalog.
session_json: A temporary JSON file to keep the test_run information.
attachments_folder: The folder for copying / moving binary files.
uuid: A unique ID for related to the process that using the testlog.
seq_generator: A sequence file that expected to increase monotonically.
during test.
"""
FIELDS = type_utils.Enum([
'LAST_TEST_RUN', 'LOG_ROOT', 'PRIMARY_JSON', 'SESSION_JSON',
'ATTACHMENTS_FOLDER', 'UUID', '_METADATA'])
def __init__(self, log_root=None, uuid=None,
stationDeviceId=None, stationInstallationId=None):
"""Initializes the Testlog singleton.
Args:
log_root: The path to root folder of testlog.
uuid: A unique ID for this process.
stationDeviceId: To use in saving Python logging calls.
stationInstallationId: To use in saving Python logging calls.
"""
global _global_testlog # pylint: disable=global-statement
assert _global_testlog is None, (
'_global_testlog should be initialized only once before Close().')
with _log_related_lock:
_global_testlog = self
self.instalog_plugin = None
self.in_subsession = False
if log_root and uuid:
# Indicate it initialized from a harness that no one will collect its
# session JSON file (so set to None)
session_data = {
Testlog.FIELDS.LOG_ROOT: log_root,
Testlog.FIELDS.PRIMARY_JSON:
os.path.join(log_root, _DEFAULT_PRIMARY_JSON_FILE),
Testlog.FIELDS.ATTACHMENTS_FOLDER:
os.path.join(log_root, _DEFAULT_ATTACHMENTS_FOLDER),
Testlog.FIELDS.UUID: uuid}
elif not log_root and not uuid:
# Get the related information from the OS environment variable.
self.in_subsession = True
session_data = Testlog._ReadSessionInfo()
else:
assert False, (
'Wrong initialization of _global_testlog with log_root:'
' %r, uuid: %r' % (log_root, uuid))
self.last_test_run = session_data.pop(self.FIELDS.LAST_TEST_RUN, None)
self.log_root = session_data.pop(self.FIELDS.LOG_ROOT)
self.primary_json = session_data.pop(self.FIELDS.PRIMARY_JSON)
self.session_json = session_data.pop(self.FIELDS.SESSION_JSON, None)
self.attachments_folder = session_data.pop(self.FIELDS.ATTACHMENTS_FOLDER)
self.uuid = session_data.pop(self.FIELDS.UUID)
metadata = session_data.pop(
self.FIELDS._METADATA, None) # pylint: disable=protected-access
if metadata:
# pylint: disable=protected-access
self.last_test_run[self.FIELDS._METADATA] = metadata
assert not session_data, 'Not all variable initialized.'
# Initialize the sequence generator
self.seq_generator = testlog_seq.SeqGenerator(
_SEQUENCE_PATH, self.primary_json)
self._CreateFolders()
self.hooks = None
# Reload the JSON paths into JSONLogFile for future writing.
if self.session_json:
self.session_json = JSONLogFile(
uuid=self.uuid, seq_generator=self.seq_generator,
path=self.session_json, mode='w')
if self.primary_json:
self.primary_json = JSONLogFile(
uuid=self.uuid, seq_generator=self.seq_generator,
path=self.primary_json, mode='a', check_event=True)
# Initialize testlog._pylogger
self.CaptureLogging(stationDeviceId, stationInstallationId)
if self.in_subsession:
self.stop_session_json_thread = threading.Event()
self.session_json_thread = threading.Thread(
target=self.UpdateSessionJSON)
self.session_json_thread.start()
def UpdateSessionJSON(self):
while (not self.stop_session_json_thread.wait(0.2) or
_update_session_json.is_set()):
if _update_session_json.is_set():
_update_session_json.clear()
Log(self.last_test_run)
def init_hooks(self, hooks_class):
# Initialize the Testlog hooks class
module, class_name = hooks_class.rsplit('.', 1)
self.hooks = getattr(__import__(module, fromlist=[class_name]),
class_name)()
assert isinstance(self.hooks, hooks.Hooks), (
'Testlog hooks should be of type Hooks but is %r' % type(self.hooks))
def CaptureLogging(self, stationDeviceId=None, stationInstallationId=None):
"""Captures calls to logging.* into primary_json."""
level = logging.getLogger().getEffectiveLevel()
def AnnotateAndLog(station_message):
if stationDeviceId:
station_message['stationDeviceId'] = stationDeviceId
if stationInstallationId:
station_message['stationInstallationId'] = stationInstallationId
# If we are in a subsession, use the UUID as testRunId.
if self.in_subsession:
station_message['testRunId'] = self.uuid
return self.primary_json.Log(station_message)
CapturePythonLogging(
callback=AnnotateAndLog, level=level)
logging.info('Testlog(%s) is capturing logging at level %s',
self.uuid, logging.getLevelName(level))
def Close(self):
# pylint: disable=global-statement
global _global_testlog, _pylogger, _pylogger_handler
if self.in_subsession:
self.stop_session_json_thread.set()
self.session_json_thread.join()
if self.primary_json:
self.primary_json.Close()
if self.session_json:
self.session_json.Close()
if _global_testlog:
_global_testlog = None
if _pylogger and _pylogger_handler:
_pylogger.removeHandler(_pylogger_handler)
_pylogger = None
_pylogger_handler = None
@staticmethod
def _ReadSessionInfo():
session_json_path = os.environ.get(TESTLOG_ENV_VARIABLE_NAME, None)
assert session_json_path, (
'Not able to find environment variable %r' % TESTLOG_ENV_VARIABLE_NAME)
# Read to load metadata.
metadata = None
with file_utils.FileLockContextManager(session_json_path, 'r') as fd:
last_test_run = json.loads(fd.read())
metadata = last_test_run.pop(
Testlog.FIELDS._METADATA) # pylint: disable=protected-access
return {
Testlog.FIELDS.LAST_TEST_RUN: Event.FromDict(last_test_run, False),
Testlog.FIELDS.LOG_ROOT: metadata[Testlog.FIELDS.LOG_ROOT],
Testlog.FIELDS.PRIMARY_JSON: metadata[Testlog.FIELDS.PRIMARY_JSON],
Testlog.FIELDS.SESSION_JSON: session_json_path,
Testlog.FIELDS.ATTACHMENTS_FOLDER:
metadata[Testlog.FIELDS.ATTACHMENTS_FOLDER],
Testlog.FIELDS.UUID: metadata[Testlog.FIELDS.UUID],
Testlog.FIELDS._METADATA: metadata} # pylint: disable=protected-access
def _CreateFolders(self):
for x in [self.log_root, self.attachments_folder]:
file_utils.TryMakeDirs(x)
def SetInstalogPlugin(self, instalog_plugin):
"""Sets a reference to the Goofy Instalog plugin."""
self.instalog_plugin = instalog_plugin
def Flush(self, uplink=True, local=True, timeout=None):
"""Flushes testlog logs through Instalog.
Args:
uplink: Flush the uplink (output_http) plugin.
local: Flush the local (output_file) plugin.
timeout: Time to wait before returning with failure.
Returns:
If successful, returns True and a string describing the flushing result.
Otherwise, returns False and a string describing the progress of flushing.
Raises:
FlushException if no instalog plugin.
"""
if self.instalog_plugin is None:
raise FlushException('Flush: No Instalog plugin available')
last_seq_output = self.seq_generator.Current()
input_success, input_msg = self.instalog_plugin.FlushInput(
last_seq_output, timeout)
if not input_success:
return False, 'Flushing input plugin: %s' % input_msg
output_success, output_msg = self.instalog_plugin.FlushOutput(
uplink, local, timeout)
output_msg = _PROGRESS_RE.search(output_msg).group()
if not output_success:
return False, 'Flushing output plugin: %s' % output_msg
return True, 'Success; %s; %s' % (input_msg, output_msg)
def InitSubSession(log_root, uuid, station_test_run=None):
"""Initializes session JSON file for future test in a separate process.
This is used for harness to generate the session JSON file for the
upcoming test. The upcoming test is expected to run in a separate process
and the path of session JSON file will be passed through environment
variable TESTLOG_ENV_VARIABLE_NAME.
Args:
log_root: Root folder that contains testlog.json, session JSONs
attachments.
uuid: Unique ID for the upcoming test run.
station_test_run: Any existed fields that need to propagate into the new
test session's station.test_run. For example, can be serial numbers or
harness-specific information (e.x.: stationDeviceId).
Returns:
Path to the session JSON file.
"""
# TODO(itspeter): Enable more fine setting on the testlog.json location, etc.
session_log_path = os.path.join(log_root, _DEFAULT_SESSION_FOLDER,
_DEFAULT_SESSION_JSON_FILE_TEMPLATE % uuid)
file_utils.TryMakeDirs(os.path.dirname(session_log_path))
if not station_test_run:
station_test_run = StationTestRun()
station_test_run.FromDict({
'status': StationTestRun.STATUS.STARTING,
'testRunId': uuid,
'startTime': time.time()
})
# pylint: disable=protected-access
station_test_run[Testlog.FIELDS._METADATA] = {
Testlog.FIELDS.LOG_ROOT: log_root,
Testlog.FIELDS.PRIMARY_JSON:
os.path.join(log_root, _DEFAULT_PRIMARY_JSON_FILE),
Testlog.FIELDS.SESSION_JSON: session_log_path,
Testlog.FIELDS.ATTACHMENTS_FOLDER:
os.path.join(log_root, _DEFAULT_ATTACHMENTS_FOLDER),
Testlog.FIELDS.UUID: uuid
}
with file_utils.FileLockContextManager(session_log_path, 'w') as fd:
fd.write(station_test_run.ToJSON())
fd.flush()
os.fsync(fd.fileno())
return session_log_path
def CollectExpiredSessions(log_root, station_test_run=None):
session_dir = os.path.join(log_root, _DEFAULT_SESSION_FOLDER)
for session_log_path in os.listdir(session_dir):
session_log_path = os.path.join(session_dir, session_log_path)
if os.path.isfile(session_log_path):
LogFinalTestRun(session_log_path, station_test_run)
def LogTestRun(session_json_path, station_test_run=None):
"""Merges the session JSON into the primary JSON and logs it.
Args:
session_json_path: Path to the session JSON.
station_test_run: Additional information might be appended.
"""
# TODO(itspeter): Check the file is already closed properly. (i.e.
# no lock exists or other process using it)
with file_utils.FileLockContextManager(session_json_path, 'r+') as fd:
content = fd.read()
try:
session_json = json.loads(content)
test_run = StationTestRun()
test_run.Populate(session_json)
# Merge the station_test_run information.
if station_test_run:
test_run.Populate(station_test_run.ToDict())
if 'startTime' in test_run and 'endTime' in test_run:
test_run['duration'] = test_run['endTime'] - test_run['startTime']
Log(test_run)
# pylint: disable=protected-access
test_run[Testlog.FIELDS._METADATA] = (
session_json[Testlog.FIELDS._METADATA])
fd.seek(0)
fd.truncate()
fd.write(test_run.ToJSON())
fd.flush()
os.fsync(fd.fileno())
except Exception:
# Not much we can do here.
logging.exception('Not able to collect %s. Last read: %s',
session_json_path, content)
# We should stop the pytest if it failed to log Testlog event.
raise
def LogFinalTestRun(session_json_path, station_test_run=None):
LogTestRun(session_json_path, station_test_run)
os.unlink(session_json_path)
def GetGlobalTestlog():
"""Gets the singleton instance of the global testlog writer."""
if _global_testlog is None:
with _log_related_lock:
if _global_testlog:
return _global_testlog
# Oops, the Testlog is not there yet.
Testlog()
return _global_testlog
def GetGlobalTestlogLock():
"""Gets locks that used in testlog.
Functions that want to keep action synchronized with testlog should use
this lock. For example:
with GetGlobalTestlogLock():
# Do log stuff that related to testlog.
Returns:
threading.RLock() used in testlog module.
"""
return _log_related_lock
def Log(event):
"""Logs the event using the global testlog writer.
This function is essentially a wrapper around JSONLogFile.Log(). It
creates or reuses the global log writer and calls the JSONLogFile.Log()
function. Note that this should only be called by other exposed API.
Args:
event: An instance of testlog.EventBase.
"""
# TODO(itspeter): expose flag to override the default flush behavior.
testlog_singleton = GetGlobalTestlog()
assert event, 'No event to write'
if testlog_singleton.hooks:
if isinstance(event, StationInit):
testlog_singleton.hooks.OnStationInit(event)
elif isinstance(event, StationMessage):
testlog_singleton.hooks.OnStationMessage(event)
elif isinstance(event, StationTestRun):
testlog_singleton.hooks.OnStationTestRun(event)
if not testlog_singleton.in_subsession:
testlog_singleton.primary_json.Log(event)
else:
testlog_singleton.session_json.Log(event, override=True)
def FlushEvent():
"""Flush the last_test_run event to primary JSON file.
Be careful that this function is slow and the time consumption depends on the
size of the event. Calling it too often will cause performance issue.
"""
testlog_singleton = GetGlobalTestlog()
if testlog_singleton.in_subsession:
testlog_singleton.primary_json.Log(testlog_singleton.last_test_run)
else:
raise testlog_utils.TestlogError(
'FlushEvent should be called in subsession.')
def _StationTestRunWrapperInSession(*args, **kwargs):
"""Wrapper for StationTestRun method function.
We provide this wrapper as a handy interface that caller can use
testlog.foo() instead of GetGlobalTestlog().last_test_run.foo().
This function is expected to call only in test session but not test harness,
because only test session expected to have GetGlobalTestlog().last_test_run.
Please see the StationTestRun for more details.
"""
method_name = kwargs.pop('_method_name')
if GetGlobalTestlog().last_test_run:
ret = getattr(
GetGlobalTestlog().last_test_run, method_name)(*args, **kwargs)
_update_session_json.set()
return ret
else:
raise testlog_utils.TestlogError(
'In memory station.test_run does not set. '
'Test harness need to set it manually.')
def AddSerialNumber(*args, **kwargs):
kwargs['_method_name'] = 'AddSerialNumber'
return _StationTestRunWrapperInSession(*args, **kwargs)
def LogParam(*args, **kwargs):
kwargs['_method_name'] = 'LogParam'
return _StationTestRunWrapperInSession(*args, **kwargs)
def CheckNumericParam(*args, **kwargs):
kwargs['_method_name'] = 'CheckNumericParam'
return _StationTestRunWrapperInSession(*args, **kwargs)
def CheckTextParam(*args, **kwargs):
kwargs['_method_name'] = 'CheckTextParam'
return _StationTestRunWrapperInSession(*args, **kwargs)
def GroupParam(*args, **kwargs):
kwargs['_method_name'] = 'GroupParam'
return _StationTestRunWrapperInSession(*args, **kwargs)
def AttachFile(*args, **kwargs):
kwargs['_method_name'] = 'AttachFile'
return _StationTestRunWrapperInSession(*args, **kwargs)
def AttachContent(*args, **kwargs):
kwargs['_method_name'] = 'AttachContent'
return _StationTestRunWrapperInSession(*args, **kwargs)
def UpdateParam(*args, **kwargs):
kwargs['_method_name'] = 'UpdateParam'
return _StationTestRunWrapperInSession(*args, **kwargs)
def AddArgument(*args, **kwargs):
kwargs['_method_name'] = 'AddArgument'
return _StationTestRunWrapperInSession(*args, **kwargs)
def AddFailure(*args, **kwargs):
kwargs['_method_name'] = 'AddFailure'
return _StationTestRunWrapperInSession(*args, **kwargs)
class JSONLogFile(file_utils.FileLockContextManager):
"""Represents a JSON log file on disk."""
def __init__(self, uuid, seq_generator, path, mode='a', check_event=False):
super(JSONLogFile, self).__init__(path=path, mode=mode)
self._thread_data = threading.local()
self.test_run_id = uuid
self.seq_generator = seq_generator
self.check_event = check_event
def Log(self, event, override=False):
"""Converts event into JSON string and writes into disk.
Warning: If this function or any code executed down the call stack makes
use of Python logging functions, they will be dropped from this function.
Otherwise, a deadlock will occur.
Args:
event: The event to output.
override: Ture to make sure the JSON log file contains only one event.
"""
if getattr(self._thread_data, 'in_log', False):
# We are already in a log call. Throw out any other subsequent logs
# until this call finishes.
return
self._thread_data.in_log = True
# Data that should be refreshed on every write operation.
event['uuid'] = time_utils.TimedUUID()
event['seq'] = self.seq_generator.Next()
if 'apiVersion' not in event:
event['apiVersion'] = TESTLOG_API_VERSION
if 'time' not in event:
event['time'] = time.time()
if self.check_event:
# Check the event, or it may be rejected by Instalog input plugin.
try:
event.CheckIsValid()
except Exception:
logging.exception('Not able to log the event: %s', event.ToJSON())
raise
line = event.ToJSON() + '\n'
with self:
if override:
self.file.seek(0)
self.file.write(line)
if override:
self.file.truncate()
self.file.flush()
os.fsync(self.file.fileno())
self._thread_data.in_log = False
return line
def CapturePythonLogging(callback, level=logging.DEBUG):
"""Starts capturing Python logging.
The output events will be sent to the specified callback function.
This function can only be used once to set up logging -- any subsequent
calls will return the existing Logger object.
Args:
callback: Function to be called when the Python logging library is called.
It accepts one argument, which will be the StationMessage object as
constructed by TestlogLogHandler.
level: Sets minimum verbosity of log messages that will be sent to the
callback. Default: logging.DEBUG.
"""
global _pylogger, _pylogger_handler # pylint: disable=global-statement
if _pylogger:
# We are already capturing Python logging.
return _pylogger
_pylogger_handler = TestlogLogHandler(callback)
_pylogger_handler.setFormatter(LogFormatter())
_pylogger = logging.getLogger()
_pylogger.addHandler(_pylogger_handler)
_pylogger.setLevel(level)
return _pylogger
class TestlogLogHandler(logging.Handler):
"""Formats records into events and send them to callback function.
Properties:
_callback: Function to be called when we have processed the logging message
and created a StationMessage object. It accepts one argument, which
will be the constructed StationMessage object.
_thread_data: Storage for the local thread. Used to track whether or not
the thread is currently in an emit call.
"""
def __init__(self, callback):
self._callback = callback
self._thread_data = threading.local()
super(TestlogLogHandler, self).__init__()
def emit(self, record):
"""Formats and emits event record.
Warning: If this function or any code executed down the call stack makes
use of Python logging functions, they will be dropped from this log handler.
Otherwise, an infinite loop will result.
"""
if getattr(self._thread_data, 'in_emit', False):
# We are already in an emit call. Throw out any other subsequent emits
# until this call finishes.
return
if record.name == 'console':
# logs sent to console (py/test/session.py@console) may be noisy and
# should be displayed only and not logged.
return
self._thread_data.in_emit = True
event = self.format(record)
result = self._callback(event)
self._thread_data.in_emit = False
return result
class LogFormatter(logging.Formatter):
"""Formats records into events."""
def format(self, record):
message = record.getMessage()
if record.exc_info:
message += '\n%s' % self.formatException(record.exc_info)
data = {
'filePath': getattr(record, 'pathname', None),
'lineNumber': getattr(record, 'lineno', None),
'functionName': getattr(record, 'funcName', None),
'logLevel': getattr(record, 'levelname', None),
'time': getattr(record, 'created', None),
'message': message}
return StationMessage(data)
class EventBase(object):
"""Base plumbing for Event class.
Includes functionality to map incoming data (JSON or Python dict) to its
corresponding Event class, through the GetEventType() method.
Properties:
_data: Stores the internal Python dict of this event. This should be
equivalent to the dict that gets returned from the ToDict class.
_type_class_map_cache: (class variable) This is a cached dict that maps
from event type to Python class. Used by FromJSON and FromDict to
know which class to initialize.
Properties in FIELDS:
- type (string, required): Type of the event. Its value determines
which fields are applicable to this event.
- _METADATA (object, optional): Special field reserved for testlog
to exchange information between processes.
"""
# TODO(itspeter): Consider to create a class that wraps properties in
# dictionary FIELDS.
# The FIELDS list data expected in for this class in a form where name is
# the key and value is a tuple of (required, validation function).
# Details of each fields can be found on either the docstring of this class
# or the Testlog API Playbook.
FIELDS = {
'type': (True, testlog_validator.Validator.String),
# pylint: disable=protected-access
Testlog.FIELDS._METADATA: (False, testlog_validator.Validator.Object)
}
def __init__(self, data=None):
"""Only allow initialization for classes that declared GetEventType()."""
try:
event_type = self.GetEventType()
if event_type:
default_data = self._data = {'type': event_type}
if isinstance(data, dict):
# Override the type in the data.
data.update(default_data)
self.Populate(data)
return
except NotImplementedError:
pass
raise testlog_utils.TestlogError(
'Must initialize directly from desired event class.')
def __eq__(self, other):
"""Equals operator."""
if isinstance(other, self.__class__):
return self._data == other._data # pylint: disable=protected-access
else:
return False
def __ne__(self, other):
"""Not equals operator."""
return not self.__eq__(other)
def __getitem__(self, name):
"""Dictionary operator."""
return self._data[name]
def __setitem__(self, key, value):
"""Simulates a dictionary operator.
Provides a dictionary-like operation to update the event. It will try to
get the corressponding validate function from FIELDS in current and all
superclass.
Raises:
ValueError if the value can not be converted.
TestlogError if the key is not whitelisted in the FIELD.
"""
# TODO(itspeter): Consider remove this handy function and make it
# explicitly to call other function to assign values.
mro = self.__class__.__mro__
# Find the corresponding validate function.
for cls in mro:
if cls is object:
break # Reach the root.
if not hasattr(cls, 'FIELDS'):
continue # Continue search in the parents'
if key in cls.FIELDS:
cls.FIELDS[key][1](self, key, value)
return
raise testlog_utils.TestlogError('Cannot find key %r for event %s' % (
key, self.__class__.__name__))
def __contains__(self, item):
"""Supports `in` operator."""
return item in self._data
def CheckIsValid(self):
"""Raises an exception if the event is invalid."""
mro = self.__class__.__mro__
missing_fields = []
# Find the corresponding requirement.
for cls in mro:
if cls is object:
break
for field_name, metadata in cls.FIELDS.iteritems():
if metadata[0] and field_name not in self._data:
missing_fields.append(field_name)
# Test run event with attachments should have at least one serial number.
if (self.GetEventType() == 'station.test_run' and
'attachments' in self._data and 'serialNumbers' not in self._data):
missing_fields.append('serialNumbers')
if missing_fields != []:
raise testlog_utils.TestlogError('Missing fields: %s' % missing_fields)
if self._data['apiVersion'] != TESTLOG_API_VERSION:
raise testlog_utils.TestlogError('Invalid Testlog API version: %s' %
self._data['apiVersion'])
# Check the length of the grouped parameters.
if 'parameters' in self._data:
group_length = {}
for param in self._data['parameters'].itervalues():
if 'group' in param:
group = param['group']
if group not in group_length:
group_length[group] = len(param['data'])
elif group_length[group] != len(param['data']):
raise testlog_utils.TestlogError(
'The parameters length in the group(%s) are not the same' %
group)
for key in iterkeys(self._data):
# Ignore keys that start with an underscore.
if key.startswith('_'):
continue
data_type = type(self._data[key])
if data_type == list:
if not self._data[key]:
raise testlog_utils.TestlogError('Empty list is invalid: %r' % key)
elif data_type == dict:
if not self._data[key]:
raise testlog_utils.TestlogError('Empty dict is invalid: %r' % key)
@classmethod
def GetEventType(cls):
"""Returns the event type of this particular object."""
raise NotImplementedError
def Populate(self, data):
"""Populates values one by one in dictionary data.
We iterate the data instead of directly asssigning it to self._data in
order to make sure that validate function is called.
Returns:
The event being modified (self).
"""
for key in iterkeys(data):
# Ignore keys that start with an underscore.
if key.startswith('_'):
continue
data_type = type(data[key])
if data_type == list:
if not data[key]:
raise testlog_utils.TestlogError('Empty list is invalid: %r' % key)
for value in data[key]:
self[key] = value
elif data_type == dict:
if not data[key]:
raise testlog_utils.TestlogError('Empty dict is invalid: %r' % key)
for sub_key, value in data[key].iteritems():
self[key] = {'key': sub_key, 'value': value}
else:
self[key] = data[key]
return self
def CastFields(self):
"""Casts fields to certain python types."""
pass
@classmethod
def _AllSubclasses(cls):
"""Returns all subclasses of this class recursively."""
subclasses = cls.__subclasses__()
# pylint: disable=protected-access
return subclasses + [subsub for sub in subclasses
for subsub in sub._AllSubclasses()]
@classmethod
def _TypeClassMap(cls):
"""Returns a map of EVENT_TYPE to EVENT_CLASS."""
if not hasattr(cls, '_type_class_map_cache'):
cls._type_class_map_cache = {event_cls.GetEventType(): event_cls
for event_cls in cls._AllSubclasses()
if event_cls.GetEventType()}
return cls._type_class_map_cache
@classmethod
def DetermineClass(cls, data):
"""Determines the appropriate Event subclass for a particular dataset."""
try:
return cls._TypeClassMap()[data['type']]
except (testlog_utils.TestlogError, KeyError):
raise testlog_utils.TestlogError(
'Input event does not have a valid `type`.')
@classmethod
def FromJSON(cls, json_string, check_valid=True):
"""Converts JSON data into an Event instance."""
return cls.FromDict(json.loads(json_string), check_valid)
@classmethod
def FromDict(cls, data, check_valid=True):
"""Converts Python dict data into an Event instance."""
event = cls.DetermineClass(data)()
event.Populate(data)
event.CastFields()
if check_valid:
event.CheckIsValid()
return event
def ToJSON(self):
"""Returns a JSON string representing this event."""
return json.dumps(self.ToDict(), default=testlog_utils.JSONHandler)
def ToDict(self):
"""Returns a Python dict representing this event."""
return self._data
def __repr__(self):
"""Repr operator for string printing."""
return '<{} data={}>'.format(self.__class__.__name__, repr(self._data))
class Event(EventBase):
"""Main event class.
Defines common fields of all events.
Properties in FIELDS:
- uuid (string, required): Unique UUID of the event.
- apiVersion (string, required): Version of the testlog API being
used.
- time (number, required): Time in seconds since the epoch of
the event.
- seq (integer, optional): Sequence number of the event, to help in
cases where the station date is unreliable. Should be monotonically
increasing.
"""
FIELDS = {
'uuid': (True, testlog_validator.Validator.String),
'apiVersion': (True, testlog_validator.Validator.String),
'time': (True, testlog_validator.Validator.Number),
'seq': (False, testlog_validator.Validator.Long),
}
@classmethod
def GetEventType(cls):
return None
class _StationBase(Event):
"""Base class for all "station" subtypes.
Cannot be initialized.
Properties in FIELDS:
- dutDeviceId (string, optional): ID of the device under test. This
should be a value tied to the device that will not change in the case
that the device is reimaged.
- stationDeviceId (string, optional): ID of the device being used as
the station. This should be a value tied to the device that will not
change in the case that the device is reimaged.
- stationInstallationId (string, optional): ID of the installation of the
station. Every time the station is reimaged, a new installation ID
should be generated (unique UUID).
"""
FIELDS = {
'dutDeviceId': (False, testlog_validator.Validator.String),
'stationDeviceId': (False, testlog_validator.Validator.String),
'stationInstallationId': (False, testlog_validator.Validator.String)
}
@classmethod
def GetEventType(cls):
return None
class _GroupChecker(object):
"""Context manager for checking grouped parameters."""
def __init__(self, event, name, param_list):
self.event = event
self.name = name
self.param_list = param_list
def __enter__(self):
if self.event.in_group:
raise ValueError('Can\'t enter the same GroupChecker twice')
self.event.in_group = self.name
def __exit__(self, exc_type, exc_value, traceback):
# If an exception occurs, we don't want to suppress it.
if traceback:
return False
if self.event.in_group != self.name:
raise ValueError('This should not happen! Exit the wrong group!')
self.event.in_group = None
length = len(self.event['parameters'][self.param_list[0]]['data'])
for param_name in self.param_list:
if length != len(self.event['parameters'][param_name]['data']):
raise ValueError('The parameters length in the group(%s) are not '
'the same' % self.name)
class StationStatus(_StationBase):
"""Represents the Station's status when Station is running.
Properties in FIELDS:
- filePath (string, optional): Name or path of the program that generated
this message.
- serialNumbers (dictionary, optional): A dictionary of serial numbers
associated with this device. May not be exhaustive (since some
components may not have been attached yet).
- parameters (dictionary, optional): The value can be any type. If
numeric, minimum and maximum limits (inclusive) may also be specified.
If text, a regex may be specified. If other types, the value will be
serialized. If limits/regex are specified, the status field should be
defined to show success (value match the expectation) or failure.
"""
@classmethod
def _NumericSchema(cls, label):
return schema.AnyOf([
schema.Scalar(label, int),
schema.Scalar(label, long),
schema.Scalar(label, float)])
def _ValidatorSerialNumberWrapper(*args, **kwargs):
# pylint: disable=no-method-argument
SCHEMA = schema.Optional(schema.Scalar('serialNumbers.value', basestring))
kwargs['schema'] = SCHEMA
return testlog_validator.Validator.Dict(*args, **kwargs)
def _ValidatorParameterWrapper(*args, **kwargs):
# pylint: disable=no-method-argument
DATA_SCHEMA = schema.List('data', schema.FixedDict(
'data',
items={},
optional_items={
'status': schema.Scalar('status', basestring, ['PASS', 'FAIL']),
'numericValue': StationStatus._NumericSchema('numericValue'),
'expectedMinimum': StationStatus._NumericSchema('expectedMinimum'),
'expectedMaximum': StationStatus._NumericSchema('expectedMaximum'),
'textValue': schema.Scalar('textValue', basestring),
'expectedRegex': schema.Scalar('expectedRegex', basestring),
'serializedValue': schema.Scalar('serializedValue', basestring)
}))
SCHEMA = schema.FixedDict(
'parameters.value',
items={},
optional_items={
'description': schema.Scalar('description', basestring),
'group': schema.Scalar('group', basestring),
'valueUnit': schema.Scalar('valueUnit', basestring),
'type': schema.Scalar('type', basestring, list(PARAM_TYPE)),
'data': DATA_SCHEMA})
kwargs['schema'] = SCHEMA
return testlog_validator.Validator.Dict(*args, **kwargs)
FIELDS = {
'filePath': (False, testlog_validator.Validator.String),
'serialNumbers': (False, _ValidatorSerialNumberWrapper),
'parameters': (False, _ValidatorParameterWrapper),
}
@classmethod
def GetEventType(cls):
return 'station.status'
@staticmethod
def _CreateParamValueDict(value, min_val=None, max_val=None, regex=None):
"""Checks types and returns a dict that aligns with Testlog Playbook."""
value_dict = {}
if isinstance(value, basestring):
value_dict['textValue'] = value
if min_val is not None or max_val is not None:
raise ValueError('This should not happen!')
if regex:
value_dict['expectedRegex'] = regex
elif isinstance(value, (int, long, float)):
value_dict['numericValue'] = value
if regex:
raise ValueError('This should not happen!')
if min_val is not None:
value_dict['expectedMinimum'] = min_val
if max_val is not None:
value_dict['expectedMaximum'] = max_val
else:
value_dict['serializedValue'] = json.dumps(value)
return value_dict
def _CheckAndCreateParam(self, name):
if 'parameters' not in self._data or name not in self['parameters']:
self['parameters'] = {
'key': name,
'value': {'type': PARAM_TYPE.measurement, 'data': []}
}
def _LogParamValue(self, name, value_dict):
self._CheckAndCreateParam(name)
group = self['parameters'][name].get('group', None)
if group and group != self.in_group:
raise ValueError('The grouped parameter should be used in the '
'GroupChecker')
self['parameters'][name]['data'].append(value_dict)
def AddSerialNumber(self, name, value):
"""Adds serial numbers."""
self['serialNumbers'] = {'key': name, 'value': value}
def LogParam(self, name, value):
"""Logs parameter as specified in Testlog API."""
value_dict = StationStatus._CreateParamValueDict(value)
self._LogParamValue(name, value_dict)
return self
# pylint: disable=redefined-builtin
def CheckNumericParam(self, name, value, min=None, max=None):
"""Checks and logs numeric parameter as specified in Testlog API.
We use testlog_utils.IsInRange to perform the check.
"""
if not isinstance(value, (int, long, float)):
raise ValueError('%r is not a numeric' % value)
value_dict = StationStatus._CreateParamValueDict(value, min, max)
# Check the result
result = testlog_utils.IsInRange(value, min, max)
value_dict['status'] = 'PASS' if result else 'FAIL'
self._LogParamValue(name, value_dict)
return result
def CheckTextParam(self, name, value, regex=None):
"""Checks and logs text parameter as specified in Testlog API.
We use re.search to perform the check.
"""
if not isinstance(value, basestring):
raise ValueError('%r is not a text' % value)
value_dict = StationStatus._CreateParamValueDict(value, regex=regex)
# Check the result
result = True
if not re.search(regex, value):
result = False
value_dict['status'] = 'PASS' if result else 'FAIL'
self._LogParamValue(name, value_dict)
return result
def UpdateParam(self, name, description=None, value_unit=None,
param_type=None):
"""Updates parameter's metedata."""
self._CheckAndCreateParam(name)
if description:
self['parameters'][name]['description'] = description
if value_unit:
self['parameters'][name]['valueUnit'] = value_unit
if param_type:
self['parameters'][name]['type'] = param_type
in_group = None
def GroupParam(self, name, param_list):
"""Groups a list of parameters."""
if not isinstance(name, basestring) or not name:
raise ValueError('name(%r) should be a string and not empty' % name)
if not isinstance(param_list, list) or not param_list:
raise ValueError('param_list(%r) should be a list and not empty' %
param_list)
for param in param_list:
self._CheckAndCreateParam(param)
if self['parameters'][param]['data']:
raise ValueError(
'parameter(%s) should not have data before grouping' % param)
if self['parameters'][param].get('group', None):
raise ValueError(
'parameter(%s) should not be grouped twice' % param)
self['parameters'][param]['group'] = name
return _GroupChecker(self, name, param_list)
class StationInit(_StationBase):
"""Represents the Station being brought up or initialized.
Properties in FIELDS:
- count (integer, required): Number of times that this station has
been initialized so far.
- success (boolean, required): Whether or not this station was
successfully initialized.
- failureMessage (string, optional): A failure string explaining why
the station could not initialize.
"""
FIELDS = {
'count': (True, testlog_validator.Validator.Long),
'success': (True, testlog_validator.Validator.Boolean),
'failureMessage': (False, testlog_validator.Validator.String)
}
@classmethod
def GetEventType(cls):
return 'station.init'
class StationMessage(_StationBase):
"""Represents a Python message on the Station.
Properties in FIELDS:
- message (string, required): Message text. Can include stacktrace or
other debugging information if applicable.
- filePath (string, optional): Name or path of the program that
generated this message.
- lineNumber (integer, optional): Line number within the program that
generated this message.
- functionName (string, optional): Function name within the program
that generated this message.
- logLevel (string, optional): Log level of this message. Possible
values: DEBUG, INFO, WARNING, ERROR, CRITICAL
- testRunId (string, optional): If this message was associated with a
particular test run, its ID should be specified here.
"""
FIELDS = {
'message': (True, testlog_validator.Validator.String),
'filePath': (False, testlog_validator.Validator.String),
'lineNumber': (False, testlog_validator.Validator.Long),
'functionName': (False, testlog_validator.Validator.String),
'logLevel': (False, testlog_validator.Validator.String),
'testRunId': (False, testlog_validator.Validator.String)
}
@classmethod
def GetEventType(cls):
return 'station.message'
class StationTestRun(StationStatus):
"""Represents a test run on the Station.
Properties in FIELDS:
- testRunId (string, required): Unique UUID of the test run. Since one
test run may output multiple test_run events (showing the progress of
the test run), we use testRunId to identify them as the same test.
- testName (string, required): A name identifying this test with its
particular configuration. Sometimes, a test might run multiple times
with different configurations in the same project. This field is used
to separate these configurations.
- testType (string, required): A name identifying this type of test.
If it runs multiple times with different configurations, use testName
to differentiate.
- arguments (dictionary, optional): A dictionary representing the arguments
of the test configuration.
- status (string, required): The current status of the test run.
Possible values: STARTING, RUNNING, FAIL, PASS, UNKNOWN
- startTime (number, required): Time in seconds since the epoch when the
test started.
- endTime (number, optional): Time in seconds since the epoch when the test
ended.
- duration (number, optional): How long the test took to complete.
Should be the same as endTime - startTime. Included for convenience.
Measured in seconds.
- operatorId (string, optional): A unique identifier for the operator
running this test.
- attachments (dictionary, optional): List of attachment files associated
with this test run. If the JSON's location does not imply the path to
attachment file, the full path can be specified.
May also be a gs:// path.
- failures (array, optional): List of failures associated with this test
run. It is recommended for each parameter or series failure to have an
entry in this list, but functional or environmental failures may also be
included (e.g. device not connected). The same failure code may be listed
multiple times with different details strings.
"""
def _ValidatorArgumentWrapper(*args, **kwargs):
# pylint: disable=no-method-argument
SCHEMA = schema.FixedDict(
'arguments.value',
items={'value': schema.Scalar('value', basestring)},
optional_items={
'description': schema.Scalar('description', basestring)})
kwargs['schema'] = SCHEMA
return testlog_validator.Validator.Dict(*args, **kwargs)
def _ValidatorFailureWrapper(*args, **kwargs):
# pylint: disable=no-method-argument
SCHEMA = schema.FixedDict(
'failures.value',
items={'code': schema.Scalar('code', basestring),
'details': schema.Scalar('details', basestring)})
kwargs['schema'] = SCHEMA
return testlog_validator.Validator.List(*args, **kwargs)
def _ValidatorAttachmentWrapper(*args, **kwargs):
# pylint: disable=no-method-argument
SCHEMA = schema.FixedDict(
'attachments.value',
items={'path': schema.Scalar('path', basestring),
'mimeType': schema.Scalar('mimeType', basestring)},
optional_items={
'description': schema.Scalar('description', basestring)})
kwargs['schema'] = SCHEMA
return testlog_validator.Validator.Dict(*args, **kwargs)
FIELDS = {
'testRunId': (True, testlog_validator.Validator.String),
'testName': (True, testlog_validator.Validator.String),
'testType': (True, testlog_validator.Validator.String),
'arguments': (False, _ValidatorArgumentWrapper),
'status': (True, testlog_validator.Validator.Status),
'startTime': (True, testlog_validator.Validator.Number),
'endTime': (False, testlog_validator.Validator.Number),
'duration': (False, testlog_validator.Validator.Number),
'operatorId': (False, testlog_validator.Validator.String),
'attachments': (False, _ValidatorAttachmentWrapper),
'failures': (False, _ValidatorFailureWrapper),
}
# Possible values for the `status` field.
# TODO(itspeter): Check on log when will UNKNOWN emitted.
STATUS = type_utils.Enum([
'STARTING', 'RUNNING', 'FAIL', 'PASS',
'UNKNOWN']) # States that doesn't apply for StationTestRun
@classmethod
def GetEventType(cls):
return 'station.test_run'
def AttachFile(self, path, mime_type, name, delete=True, description=None):
"""Attaches a file as specified in Testlog API."""
value = {'mimeType': mime_type,
'path': path}
if description:
value.update({'description': description})
testlog_validator.Validator.Attachment(
self, name, value, delete, GetGlobalTestlog)
self['attachments'] = {'key': name, 'value': value}
return self
def AttachContent(self, content, name, description=None):
"""Attaches a file with content."""
with file_utils.UnopenedTemporaryFile() as path:
with open(path, 'w') as f:
f.write(content)
return self.AttachFile(
path, 'text/plain', name, delete=False, description=description)
def AddArgument(self, key, value, description=None):
"""Adds arguments."""
value_dict = {'value': json.dumps(value)}
if description:
value_dict['description'] = description
self['arguments'] = {'key': key, 'value': value_dict}
return self
def AddFailure(self, code, details):
"""Adds failures."""
# TODO(itspeter): Unittest.
# Get the numeric code unified into hex format.
if isinstance(code, (int, long)):
code = '0x%x' % code
if not isinstance(code, basestring):
raise ValueError('code(%r) should be a string or an integer' % code)
if not isinstance(details, basestring):
raise ValueError('details(%r) should be a string' % details)
self['failures'] = {'code': code, 'details': details}
return self