blob: 0b1dd240ca218bec24c06913e1b868e6f6b8605d [file] [log] [blame]
# Copyright 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Common Umpire RPC Commands."""
import csv
import glob
import os
import shutil
import tarfile
import time
import xmlrpclib
from twisted.internet import threads
from twisted.web import xmlrpc
import factory_common # pylint: disable=unused-import
from cros.factory.umpire import common
from cros.factory.umpire.server import umpire_env
from cros.factory.umpire.server import umpire_rpc
from cros.factory.umpire.server import utils
from cros.factory.utils import file_utils
from cros.factory.utils import webservice_utils
def Fault(message, reason=xmlrpclib.INVALID_METHOD_PARAMS):
"""Instantiates an XMLRPC Fault() object.
xmlrpc.Fault() notifies the RPC client that remote function was terminated
incorrectly.
"""
return xmlrpc.Fault(reason, message)
def GetServerIpPortFromRequest(request, env):
server_host = request.requestHeaders.getRawHeaders('host')[0]
server_ip, unused_sep, server_port = server_host.partition(':')
# The Host HTTP header do contains port when using xmlrpclib.ServerProxy,
# but it doesn't contains port when using twisted.web.xmlrpc.Proxy.
# Since currently the only places that use twisted.web.xmlrpc.Proxy are
# all inside unittests, and the returned url is not used in unittests, we
# just add some default value to prevent unittest from failing.
# TODO(pihsun): Figure out a better way to handle the case when port is
# missing from Host header.
server_port = int(server_port or env.umpire_base_port)
return (server_ip, server_port)
class RootDUTCommands(umpire_rpc.UmpireRPC):
"""Root DUT (Device Under Test) remote procedures.
Root commands for v1 and v2 compatibilities.
RPC URL:
http://umpire_server_address:umpire_port/RPC2
"""
@umpire_rpc.RPCCall
def Ping(self):
return {
'version': common.UMPIRE_DUT_RPC_VERSION,
'project': self.env.project
}
class UmpireDUTCommands(umpire_rpc.UmpireRPC):
"""Umpire DUT remote procedures.
RPC URL:
http://umpire_server_address:umpire_port/umpire
"""
@umpire_rpc.RPCCall
def GetTime(self):
return time.time()
# TODO(hsinyi): Remove ListParameters and GetParameter and modify related
# pytest codes.
@umpire_rpc.RPCCall
@utils.Deprecate
def ListParameters(self, pattern):
"""Lists files that match the pattern in parameters directory.
Args:
pattern: A pattern string for glob to list matched files.
Returns:
A list of matched files.
Raises:
ValueError if caller is trying to query outside parameters directory.
"""
parameters_dir = os.path.join(self.env.base_dir, 'parameters')
glob_pathname = os.path.abspath(os.path.join(parameters_dir, pattern))
if not glob_pathname.startswith(parameters_dir):
raise ValueError('ListParameters is limited to parameter directory')
matched_file = glob.glob(glob_pathname)
# Only return files.
matched_file = list(filter(os.path.isfile, matched_file))
return [os.path.relpath(x, parameters_dir) for x in matched_file]
@umpire_rpc.RPCCall
@utils.Deprecate
def GetParameter(self, path):
"""Gets the assigned parameter file.
Args:
path: A relative path for locating the parameter.
Returns:
Content of the parameter. It is always wrapped in a shopfloor.Binary
object to provides best flexibility.
Raises:
ValueError if the parameter does not exist or is not under
parameters folder.
"""
parameters_dir = os.path.join(self.env.base_dir, 'parameters')
abspath = os.path.abspath(os.path.join(parameters_dir, path))
if not abspath.startswith(parameters_dir):
raise ValueError('GetParameter is limited to parameter directory')
if not os.path.isfile(abspath):
raise ValueError('File does not exist or it is not a file')
return xmlrpc.Binary(file_utils.ReadFile(abspath))
@umpire_rpc.RPCCall
def GetParameters(self, namespace=None, name=None):
"""Gets parameter components by querying namespace and component name.
Args:
namespace: relative directory path of queried component(s). None if
they are in root directory.
name: component name of queried component. None if targeting all
components under namespace.
Returns:
Content of the parameter. It is always wrapped in a shopfloor.Binary
object to provides best flexibility.
Raises:
ValueError if the parameter does not exist.
"""
abspaths = self.env.parameters.QueryParameters(namespace, name)
if not abspaths:
raise ValueError('File does not exist or it is not a file')
# Pack files to tar
with file_utils.UnopenedTemporaryFile() as tar_path:
tar = tarfile.open(tar_path, 'w')
for name, path in abspaths:
tar.add(path, arcname=name)
tar.close()
return xmlrpc.Binary(file_utils.ReadFile(tar_path))
@umpire_rpc.RPCCall
@xmlrpc.withRequest
def GetCROSPayloadURL(self, request, x_umpire_dut):
"""Gets cros_payload JSON file URL of the matched bundle.
Args:
x_umpire_dut: DUT information in GetXUmpireDUT (str) or _GetXUmpireDUTDict
(dict) format.
Returns:
URL of cros_payload JSON file, or empty string if no available bundle.
"""
del x_umpire_dut # Unused.
bundle = self.env.config.GetActiveBundle()
if bundle:
return 'http://%s:%d/res/%s' % (
GetServerIpPortFromRequest(request, self.env) + (bundle['payloads'],))
return ''
class ShopfloorServiceDUTCommands(umpire_rpc.UmpireRPC):
"""Shopfloor Service for DUT (Device Under Test) to invoke.
RPC URL:
http://umpire_server_address:umpire_port/umpire
"""
def __init__(self, daemon):
super(ShopfloorServiceDUTCommands, self).__init__(daemon)
# Reuse ServerProxy so that we don't need to create a new one for every
# request.
self._url = None
self._proxy = None
@property
def service(self):
url = self.env.shopfloor_service_url
if self._url != url:
self._proxy = webservice_utils.CreateWebServiceProxy(
url, use_twisted=True)
self._url = url
return self._proxy
@umpire_rpc.RPCCall
def GetVersion(self):
"""Returns the version of supported protocol."""
return self.service.callRemote('GetVersion')
@umpire_rpc.RPCCall
def NotifyStart(self, data, station):
"""Notifies shopfloor backend that DUT entered a manufacturing station."""
return self.service.callRemote('NotifyStart', data, station)
@umpire_rpc.RPCCall
def NotifyEnd(self, data, station):
"""Notifies shopfloor backend that DUT leaves a manufacturing station."""
return self.service.callRemote('NotifyEnd', data, station)
@umpire_rpc.RPCCall
def NotifyEvent(self, data, event):
"""Notifies shopfloor backend that the DUT has performed an event."""
return self.service.callRemote('NotifyEvent', data, event)
@umpire_rpc.RPCCall
def GetDeviceInfo(self, data):
"""Returns information about the device's expected configuration."""
return self.service.callRemote('GetDeviceInfo', data)
@umpire_rpc.RPCCall
def ActivateRegCode(self, ubind_attribute, gbind_attribute, hwid):
"""Notifies shopfloor backend that DUT has deployed a registration code."""
return self.service.callRemote('ActivateRegCode', ubind_attribute,
gbind_attribute, hwid)
@umpire_rpc.RPCCall
def UpdateTestResult(self, data, test_id, status, details=None):
"""Sends the specified test result to shopfloor backend."""
return self.service.callRemote('UpdateTestResult', data, test_id, status,
details)
@umpire_rpc.RPCCall
@utils.Deprecate
def Finalize(self, serial_number):
"""Legacy from inform_shopfloor, not in Shopfloor Service API 1.0."""
return self.NotifyEvent({'serial_number': serial_number}, 'Finalize')
@umpire_rpc.RPCCall
@utils.Deprecate
def FinalizeFQC(self, serial_number):
"""Legacy from inform_shopfloor, not in Shopfloor Service API 1.0."""
return self.NotifyEvent({'serial_number': serial_number}, 'Refinalize')
class NewlineTerminatedCSVDialect(csv.excel):
lineterminator = '\n'
class LogDUTCommands(umpire_rpc.UmpireRPC):
"""DUT log upload procedures.
RPC URL:
http://umpire_server_address:umpire_port/umpire
"""
def _ReturnTrue(self, result):
"""Returns true."""
del result # Unused.
return True
def _UnwrapBlob(self, blob):
"""Unwraps a blob object."""
return blob.data if isinstance(blob, xmlrpclib.Binary) else blob
def _Now(self):
"""Gets current time."""
return time.gmtime(time.time())
def _SaveUpload(self, upload_type, file_name, content, mode='wb'):
"""Saves log file.
This function saves DUT data. Since file saving is a blocking call,
_SaveUpload() should be called in a separate thread context.
Example:
@umpire_rpc.RPCCall
def DUTUpload(...)
d = threads.deferToThread(lambda: self.SaveUpload(type, name, data))
return d
Args:
upload_type: one of LogRPCCommand.LOG_TYPES.
file_name: full basename of log file.
content: binary data.
mode: open file mode. 'wb' to write binary file, 'a' to append file.
"""
with file_utils.UnopenedTemporaryFile() as temp_path:
# To support paths in file_name, the save_dir will be splitted after
# concatenate to full save_path.
save_path = os.path.join(self.env.umpire_data_dir, upload_type,
time.strftime('%Y%m%d', self._Now()), file_name)
save_dir = os.path.dirname(os.path.abspath(save_path))
file_utils.TryMakeDirs(save_dir)
if mode == 'a' and os.path.isfile(save_path):
shutil.copy2(save_path, temp_path)
with open(temp_path, mode) as f:
f.write(content)
# Do not use os.rename() to move file. os.rename() behavior is OS
# dependent.
shutil.move(temp_path, save_path)
# temp_path (created by UnopenedTemporaryFile) will always be mode 0600
# for security reason, so we do want to change its permission to
# u+rw,go+r.
os.chmod(save_path, 0o644)
def _AppendCSV(self, file_name, entry, mode='a'):
"""Saves an entry to CSV file."""
file_utils.TryMakeDirs(os.path.dirname(file_name))
with open(file_name, mode) as f:
csv.writer(f, dialect=NewlineTerminatedCSVDialect).writerow(entry)
os.fdatasync(f.fileno())
@umpire_rpc.RPCCall
def UploadReport(self, serial, report_blob, report_name=None, stage=None):
"""Uploads a report file.
Args:
serial: A string of device serial number.
report_blob: Blob of compressed report to be stored (must be prepared by
shopfloor.Binary)
report_name: (Optional) Suggested report file name. This is usually
assigned by factory test client programs (ex, gooftool); however
server implementations still may use other names to store the report.
stage: (Optional) Current testing stage: SMT, FAT, RUNIN, FFT, or GRT.
Returns:
Deferred object that waits for log saving thread to complete.
RPC returns:
True on success.
Raises:
ValueError if serial is invalid, or other exceptions defined by individual
modules. Note this will be converted to xmlrpclib.Fault when being used as
a XML-RPC server module.
"""
opt_name = ('-' + report_name) if report_name else ''
file_name = '{stage}{opt_name}-{serial}-{gmtime}.rpt.xz'.format(
stage=stage or 'Unknown', opt_name=opt_name, serial=serial,
gmtime=time.strftime('%Y%m%dT%H%M%SZ', self._Now()))
d = threads.deferToThread(lambda: self._SaveUpload(
'report', file_name, self._UnwrapBlob(report_blob)))
d.addCallback(self._ReturnTrue)
return d
@umpire_rpc.RPCCall
def UploadCSVEntry(self, csv_name, data):
"""Uploads a list-type data and save in CSV file.
This is usually used for storing device data that should not be associated
with device identifiers (like device serial number), for example
registration codes.
Devices that can be associated with serial numbers should use TestLog.
Args:
csv_name: The base file name of target CSV file, without suffix.
data: A list or entry to be appended into CSV file.
"""
file_name = os.path.join(self.env.umpire_data_dir, 'csv', csv_name + '.csv')
d = threads.deferToThread(lambda: self._AppendCSV(file_name, data))
d.addCallback(self._ReturnTrue)
return d
@umpire_rpc.RPCCall
@utils.Deprecate
def UploadEvent(self, log_name, chunk):
"""Uploads a chunk of events.
In addition to append events to a single file, we appends event to a
directory that split on an daily basis.
Args:
log_name: A string of the event log filename. Event logging module creates
event files with an unique identifier (uuid) as part of the filename.
chunk: A string containing one or more events. Events are in YAML format
and separated by a "---" as specified by YAML. A chunk contains one or
more events with separator.
Returns:
Deferred object that waits for log saving thread to complete.
RPC returns:
True on success.
Raises:
IOError if unable to save the chunk of events.
"""
d = threads.deferToThread(lambda: self._SaveUpload(
'eventlog', log_name, self._UnwrapBlob(chunk), mode='a'))
d.addCallback(self._ReturnTrue)
return d
@umpire_rpc.RPCCall
@utils.Deprecate
def SaveAuxLog(self, name, contents):
"""Saves an auxiliary log into the umpire_data/aux_logs directory.
In general, this should probably be compressed to save space.
Args:
name: Name of the report. Any existing log with the same name will be
overwritten. Subdirectories are allowed.
contents: Contents of the report. If this is binary, it should be
wrapped in a shopfloor.Binary object.
"""
# Disallow absolute paths and non-normalized paths.
if os.path.isabs(name):
raise ValueError('Disallow absolute paths')
if name != os.path.normpath(name):
raise ValueError('Disallow non-normalized paths')
d = threads.deferToThread(
lambda: self._SaveUpload('aux_log', name, self._UnwrapBlob(contents)))
d.addCallback(self._ReturnTrue)
return d
@umpire_rpc.RPCCall
@xmlrpc.withRequest
def GetFactoryLogPort(self, request):
"""Fetches system logs rsync port."""
unused_server_ip, server_port = GetServerIpPortFromRequest(
request, self.env)
return umpire_env.GetRsyncPortFromBasePort(server_port)