blob: e57b93ae01a382cc35451b19ad0b8ddcd0f8f202 [file] [log] [blame]
#!/usr/bin/env python
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
'''
This module provides both the client and server side of a JSON-RPC based server
which can be used to handle factory test states (status) and shared persistent
data.
'''
import factory_common # pylint: disable=W0611
import glob
import logging
import mimetypes
import os
import Queue
import re
import shelve
import shutil
import SocketServer
import sys
import threading
import time
import yaml
from hashlib import sha1
from uuid import uuid4
from jsonrpclib import jsonclass
from jsonrpclib import jsonrpc
from jsonrpclib import SimpleJSONRPCServer
from cros.factory import system
from cros.factory.test import factory
from cros.factory.test.factory import TestState
from cros.factory.test import unicode_to_string
from cros.factory.utils.shelve_utils import OpenShelfOrBackup
from cros.factory.utils.string_utils import CleanUTF8
# Default TCP port of the state server (0x0FAC == 4012); used by both
# get_instance() and create_server() when no port is given.
DEFAULT_FACTORY_STATE_PORT = 0x0FAC
# Default host that clients connect to in get_instance().
DEFAULT_FACTORY_STATE_ADDRESS = 'localhost'
# Default address the server binds to in create_server().
DEFAULT_FACTORY_STATE_BIND_ADDRESS = 'localhost'
# Default directory holding the persisted state; used by clear_state() and
# FactoryState when no explicit path is given.
DEFAULT_FACTORY_STATE_FILE_PATH = factory.get_state_root()
def _synchronized(f):
'''
Decorates a function to grab a lock.
'''
def wrapped(self, *args, **kw):
with self._lock: # pylint: disable=W0212
return f(self, *args, **kw)
return wrapped
def clear_state(state_file_path=None):
    '''Clears test state by removing the whole state directory.

    Nothing is removed (but the warning is still logged) when the directory
    does not exist.

    Args:
      state_file_path: Path to state; the default path is used when this is
          None (or otherwise empty).
    '''
    target = state_file_path or DEFAULT_FACTORY_STATE_FILE_PATH
    logging.warn('Clearing state file path %s' % target)
    if not os.path.exists(target):
        return
    shutil.rmtree(target)
class PathResolver(object):
    '''Maps URL path prefixes onto local filesystem paths.'''

    def __init__(self):
        # URL path prefix -> local path.
        self._paths = {}

    def AddPath(self, url_path, local_path):
        '''Registers a prefix mapping.

        For example,

          AddPath('/foo', '/usr/local/docs')

        will cause paths to be resolved as follows:

          /foo -> /usr/local/docs
          /foo/index.html -> /usr/local/docs/index.html

        Args:
          url_path: The path prefix in the URL.
          local_path: The local path that the prefix maps to.
        '''
        self._paths[url_path] = local_path

    def Resolve(self, url_path):
        '''Resolves a URL path to a local path.

        The longest registered prefix wins: the full path is tried first,
        then the path with its last '/'-separated component removed, and so
        on.  A mapping registered for '/' is used as the final fallback.

        Args:
          url_path: A path in a URL (starting with /).

        Returns:
          The local path, or None if url_path does not start with '/' or no
          registered prefix matches.
        '''
        if not url_path.startswith('/'):
            return None

        candidate = url_path
        while candidate:
            mapped = self._paths.get(candidate)
            if mapped:
                # Re-attach whatever followed the matched prefix.
                return mapped + url_path[len(candidate):]
            candidate = candidate.rpartition('/')[0]

        fallback = self._paths.get('/')
        if not fallback:
            return None
        return fallback + url_path
@unicode_to_string.UnicodeToStringClass
class FactoryState(object):
    '''
    The core implementation for factory state control.

    The major provided features are:

    SHARED DATA
      You can get/set simple data into the states and share between all
      tests.  See get_shared_data(name) and set_shared_data(name, value) for
      more information.

    TEST STATUS
      To track the execution status of factory auto tests, you can use
      get_test_state, get_test_states methods, and update_test_state
      methods.

    All arguments may be provided either as strings, or as Unicode strings in
    which case they are converted to strings using UTF-8.  All returned
    values are strings (not Unicode).

    This object is thread-safe.

    See help(FactoryState.[methodname]) for more information.

    Properties:
      _generated_files: Map from UUID to paths on disk.  These are
          not persisted on disk (though they could be if necessary).
      _generated_data: Map from UUID to (mime_type, data) pairs for
          transient objects to serve.
      _generated_data_expiration: Priority queue of expiration times
          for objects in _generated_data.
    '''

    def __init__(self, state_file_path=None):
        '''
        Initializes the state server.

        Args:
          state_file_path: Directory in which to store the state
              information; created if missing.  Defaults to
              DEFAULT_FACTORY_STATE_FILE_PATH.
        '''
        state_file_path = state_file_path or DEFAULT_FACTORY_STATE_FILE_PATH
        if not os.path.exists(state_file_path):
            os.makedirs(state_file_path)
        self._tests_shelf = OpenShelfOrBackup(state_file_path + '/tests')
        self._data_shelf = OpenShelfOrBackup(state_file_path + '/data')
        self._lock = threading.RLock()
        self.test_list_struct = None

        self._generated_files = {}
        self._generated_data = {}
        self._generated_data_expiration = Queue.PriorityQueue()
        self._resolver = PathResolver()

        # Let jsonrpclib serialize TestState objects.
        if TestState not in jsonclass.supported_types:
            jsonclass.supported_types.append(TestState)

    @_synchronized
    def close(self):
        '''
        Shuts down the state instance.

        Each shelf is closed independently; a failure to close one is logged
        and does not prevent closing the other.
        '''
        for shelf in [self._tests_shelf,
                      self._data_shelf]:
            try:
                shelf.close()
            except Exception:  # pylint: disable=W0703
                logging.exception('Unable to close shelf')

    @_synchronized
    def update_test_state(self, path, **kw):
        '''
        Updates the state of a test.

        See TestState.update for the allowable keyword arguments.

        Args:
          path: The path to the test (see FactoryTest for a description
              of test paths).
          kw: See TestState.update for allowable arguments (e.g.,
              status and increment_count).

        Returns:
          A tuple containing the new state, and a boolean indicating
          whether the state was just changed.
        '''
        state = self._tests_shelf.get(path)
        old_state_repr = repr(state)
        changed = False

        if not state:
            # First time this test is seen: start from a fresh state.
            changed = True
            state = TestState()

        changed = changed | state.update(**kw)  # Don't short-circuit

        if changed:
            logging.debug('Updating test state for %s: %s -> %s',
                          path, old_state_repr, state)
            self._tests_shelf[path] = state
            self._tests_shelf.sync()

        return state, changed

    @_synchronized
    def get_test_state(self, path):
        '''
        Returns the state of a test.

        Raises:
          KeyError: If no state is stored for path.
        '''
        return self._tests_shelf[path]

    @_synchronized
    def get_test_paths(self):
        '''
        Returns a list of all tests' paths.
        '''
        return self._tests_shelf.keys()

    @_synchronized
    def get_test_states(self):
        '''
        Returns a map of each test's path to its state.
        '''
        return dict(self._tests_shelf)

    @_synchronized
    def clear_test_state(self):
        '''
        Clears all test state.
        '''
        self._tests_shelf.clear()
        # Flush like the other mutators do, so the cleared state is what is
        # on disk.
        self._tests_shelf.sync()

    def get_test_list(self):
        '''
        Returns the test list.

        NOTE(review): `test_list` is not assigned in __init__ (only
        `test_list_struct` is); presumably the owner of this object sets it
        before this method is called -- confirm.
        '''
        return self.test_list.to_struct()

    @_synchronized
    def set_shared_data(self, *key_value_pairs):
        '''
        Sets shared data items.

        Args:
          key_value_pairs: A series of alternating keys and values
              (k1, v1, k2, v2...).  In the simple case this can just
              be a single key and value.
        '''
        assert len(key_value_pairs) % 2 == 0, repr(key_value_pairs)
        for i in range(0, len(key_value_pairs), 2):
            self._data_shelf[key_value_pairs[i]] = key_value_pairs[i + 1]
        self._data_shelf.sync()

    @_synchronized
    def get_shared_data(self, key, optional=False):
        '''
        Retrieves a shared data item.

        Args:
          key: The key whose value to retrieve.
          optional: True to return None if not found; False to raise
              a KeyError.
        '''
        if optional:
            return self._data_shelf.get(key)
        else:
            return self._data_shelf[key]

    @_synchronized
    def has_shared_data(self, key):
        '''
        Returns if a shared data item exists.
        '''
        return key in self._data_shelf

    @_synchronized
    def del_shared_data(self, key, optional=False):
        '''
        Deletes a shared data item.

        Args:
          key: The key whose value to delete.
          optional: False to raise a KeyError if not found.
        '''
        try:
            del self._data_shelf[key]
        except KeyError:
            if not optional:
                raise
        else:
            self._data_shelf.sync()

    @_synchronized
    def update_shared_data_dict(self, key, new_data):
        '''
        Updates values a shared data item whose value is a dictionary.

        This is roughly equivalent to

          data = get_shared_data(key) or {}
          data.update(new_data)
          set_shared_data(key, data)
          return data

        except that it is atomic.

        Args:
          key: The key for the data item to update.
          new_data: A dictionary of items to update.

        Returns:
          The updated value.
        '''
        data = self._data_shelf.get(key, {})
        data.update(new_data)
        self._data_shelf[key] = data
        self._data_shelf.sync()
        return data

    @_synchronized
    def delete_shared_data_dict_item(self, shared_data_key,
                                     delete_keys, optional):
        '''
        Deletes items from a shared data item whose value is a dict.

        This is roughly equivalent to

          data = get_shared_data(shared_data_key) or {}
          for key in delete_keys:
            try:
              del data[key]
            except KeyError:
              if not optional:
                raise
          set_shared_data(shared_data_key, data)
          return data

        except that it is atomic.

        Args:
          shared_data_key: The key for the data item to update.
          delete_keys: A list of keys to delete from the dict.
          optional: False to raise a KeyError if not found.

        Returns:
          The updated value.
        '''
        data = self._data_shelf.get(shared_data_key, {})
        for key in delete_keys:
            try:
                del data[key]
            except KeyError:
                if not optional:
                    raise
        self._data_shelf[shared_data_key] = data
        self._data_shelf.sync()
        return data

    @_synchronized
    def append_shared_data_list(self, key, new_item):
        '''
        Appends an item to a shared data item whose value is a list.

        This is roughly equivalent to

          data = get_shared_data(key) or []
          data.append(new_item)
          set_shared_data(key, data)
          return data

        except that it is atomic.

        Args:
          key: The key for the data item to append.
          new_item: The item to be appended.

        Returns:
          The updated value.
        '''
        data = self._data_shelf.get(key, [])
        data.append(new_item)
        self._data_shelf[key] = data
        self._data_shelf.sync()
        return data

    def get_test_history(self, *test_paths):
        '''Returns metadata for all previous (and current) runs of a test.

        Metadata files that cannot be read or parsed are logged and skipped.

        Args:
          test_paths: Paths of the tests whose history to load.

        Returns:
          A list of the loaded metadata objects, sorted by their 'init_time'
          entry.
        '''
        ret = []

        for path in test_paths:
            pattern = os.path.join(factory.get_test_data_root(),
                                   path + '-*',
                                   'metadata')
            for f in glob.glob(pattern):
                try:
                    # NOTE(review): yaml.load can construct arbitrary
                    # objects; presumably these files are only written by
                    # the local test harness -- confirm before exposing
                    # this to untrusted data.
                    with open(f) as metadata_file:
                        ret.append(yaml.load(metadata_file))
                except Exception:  # pylint: disable=W0703
                    logging.exception('Unable to load test metadata %s', f)

        ret.sort(key=lambda item: item.get('init_time', None))
        return ret

    def get_test_history_entry(self, path, invocation):
        '''Returns metadata and log for one test invocation.

        Args:
          path: Path of the test.
          invocation: Identifier of the invocation; the data is read from
              the '<path>-<invocation>' directory under the test data root.

        Returns:
          A dict with keys 'metadata' (the parsed metadata file) and 'log'
          (the cleaned-up log contents, or None if the log is unreadable).
        '''
        test_dir = os.path.join(factory.get_test_data_root(),
                                '%s-%s' % (path, invocation))

        log_file = os.path.join(test_dir, 'log')
        try:
            with open(log_file) as f:
                log = CleanUTF8(f.read())
        except Exception:  # pylint: disable=W0703
            # Oh well
            logging.exception('Unable to read log file %s', log_file)
            log = None

        # A missing or broken metadata file is not tolerated: the error
        # propagates to the caller.
        with open(os.path.join(test_dir, 'metadata')) as f:
            metadata = yaml.load(f)

        return {'metadata': metadata,
                'log': log}

    @_synchronized
    def url_for_file(self, path):
        '''Returns a URL that can be used to serve a local file.

        Args:
          path: path to the local file

        Returns:
          url: A (possibly relative) URL that refers to the file
        '''
        uuid = str(uuid4())
        uri_path = '/generated-files/%s/%s' % (uuid, os.path.basename(path))
        self._generated_files[uuid] = path
        return uri_path

    @_synchronized
    def url_for_data(self, mime_type, data, expiration_secs=None):
        '''Returns a URL that can be used to serve a static collection
        of bytes.

        Expired entries are only reaped while handling a call that itself
        supplies a (non-zero) expiration_secs.

        Args:
          mime_type: MIME type for the data
          data: Data to serve
          expiration_secs: If not None, the number of seconds in which
              the data will expire.
        '''
        uuid = str(uuid4())
        self._generated_data[uuid] = mime_type, data
        if expiration_secs:
            now = time.time()
            self._generated_data_expiration.put(
                (now + expiration_secs, uuid))

            # Reap old items.
            while True:
                try:
                    item = self._generated_data_expiration.get_nowait()
                except Queue.Empty:
                    break

                if item[0] < now:
                    del self._generated_data[item[1]]
                else:
                    # Not expired yet; put it back and we're done
                    self._generated_data_expiration.put(item)
                    break

        uri_path = '/generated-data/%s' % uuid
        return uri_path

    @_synchronized
    def register_path(self, url_path, local_path):
        '''Maps a URL path prefix to a local path (see PathResolver.AddPath).

        Args:
          url_path: The path prefix in the URL.
          local_path: The local path that the prefix maps to.
        '''
        self._resolver.AddPath(url_path, local_path)

    def get_system_status(self):
        '''Returns system status information.

        This may include system load, battery status, etc.  See
        system.SystemStatus().
        '''
        return system.SystemStatus().__dict__
def get_instance(address=DEFAULT_FACTORY_STATE_ADDRESS,
                 port=DEFAULT_FACTORY_STATE_PORT):
    '''
    Builds a client-side proxy for talking to the state server.

    Args:
      address: Address of the server to be connected.
      port: Port of the server to be connected.

    Returns:
      An object with all public functions from FactoryState.
      See help(FactoryState) for more information.
    '''
    server_url = 'http://%s:%d' % (address, port)
    return jsonrpc.ServerProxy(server_url, verbose=False)
class MyJSONRPCRequestHandler(SimpleJSONRPCServer.SimpleJSONRPCRequestHandler):
    '''JSON/RPC request handler that additionally serves HTTP GET requests.

    A GET request is dispatched to the first of the following that applies:

      1. A callback registered in server.handlers for the exact path.
      2. Transient data registered under /generated-data/<uuid>.
      3. A local file registered under /generated-files/<uuid>/<name>.
      4. A file located through the server's path resolver.
    '''

    def _send_not_found(self):
        '''Sends a complete (header-terminated) 404 response.'''
        self.send_response(404)
        self.end_headers()

    def do_GET(self):
        '''Handles one GET request; see the class docstring for dispatch.'''
        # pylint: disable=W0212
        logging.debug('HTTP request for path %s', self.path)

        handler = self.server.handlers.get(self.path)
        if handler:
            return handler(self)

        match = re.match('^/generated-data/([-0-9a-f]+)$', self.path)
        if match:
            generated_data = self.server._generated_data.get(match.group(1))
            if not generated_data:
                logging.warn('Unknown or expired generated data %s',
                             match.group(1))
                self._send_not_found()
                return

            mime_type, data = generated_data

            self.send_response(200)
            self.send_header('Content-Type', mime_type)
            self.send_header('Content-Length', len(data))
            self.end_headers()
            self.wfile.write(data)
            # The request is fully answered; do not fall through to the
            # static-file handling below.
            return

        if self.path.endswith('/'):
            self.path += 'index.html'

        # Refuse anything that tries to climb out of the served directories.
        if ".." in self.path.split("/"):
            logging.warn("Invalid path")
            self._send_not_found()
            return

        # guess_type returns a (type, encoding) tuple; only the type is used.
        mime_type, _ = mimetypes.guess_type(self.path)
        if not mime_type:
            # Such files used to be served with a bogus "None" content type;
            # keep serving them, but with a valid generic type.
            logging.warn("Unable to guess MIME type")
            mime_type = 'application/octet-stream'

        local_path = None
        match = re.match('^/generated-files/([-0-9a-f]+)/', self.path)
        if match:
            local_path = self.server._generated_files.get(match.group(1))
            if not local_path:
                logging.warn('Unknown generated file %s in path %s',
                             match.group(1), self.path)
                self._send_not_found()
                return
        else:
            # Only consult the resolver for paths that are not generated
            # files, so the lookup above is not overwritten.
            local_path = self.server._resolver.Resolve(self.path)

        if not local_path or not os.path.exists(local_path):
            logging.warn("File not found: %s", (local_path or self.path))
            self._send_not_found()
            return

        self.send_response(200)
        self.send_header("Content-Type", mime_type)
        self.send_header("Content-Length", os.path.getsize(local_path))
        self.end_headers()
        # Binary mode: the bytes sent must match the advertised length.
        with open(local_path, 'rb') as f:
            shutil.copyfileobj(f, self.wfile)
class ThreadedJSONRPCServer(SocketServer.ThreadingMixIn,
                            SimpleJSONRPCServer.SimpleJSONRPCServer):
    '''JSON/RPC server that handles each request in its own thread.

    Properties:
      handlers: A map from URLs to callbacks handling them.  (The callback
          takes a single argument: the request to handle.)
    '''

    def __init__(self, *args, **kwargs):
        '''Forwards all arguments to SimpleJSONRPCServer's constructor.'''
        SimpleJSONRPCServer.SimpleJSONRPCServer.__init__(
            self, *args, **kwargs)
        self.handlers = dict()

    def add_handler(self, url, callback):
        '''Registers callback as the handler for url.

        Args:
          url: The URL to handle.
          callback: Callable taking the request to handle as its only
              argument.
        '''
        self.handlers.update({url: callback})
def create_server(state_file_path=None, bind_address=None, port=None):
    '''
    Creates a FactoryState object and a JSON/RPC server to serve it.

    Args:
      state_file_path: The path containing the saved state.
      bind_address: Address to bind to, defaulting to
          DEFAULT_FACTORY_STATE_BIND_ADDRESS.
      port: Port to bind to, defaulting to DEFAULT_FACTORY_STATE_PORT.

    Returns:
      A tuple of the FactoryState instance and the SimpleJSONRPCServer
      instance.
    '''
    # We have some icons in SVG format, but this isn't recognized in
    # the standard Python mimetypes set.
    mimetypes.add_type('image/svg+xml', '.svg')

    bind_address = bind_address or DEFAULT_FACTORY_STATE_BIND_ADDRESS
    port = port or DEFAULT_FACTORY_STATE_PORT

    state = FactoryState(state_file_path)
    static_root = os.path.join(factory.FACTORY_PACKAGE_PATH, 'goofy/static')
    state._resolver.AddPath('/', static_root)

    rpc_server = ThreadedJSONRPCServer(
        (bind_address, port),
        requestHandler=MyJSONRPCRequestHandler,
        logRequests=False)

    # Share with the server the objects it needs to resolve URLs.
    for attr in ('_generated_files', '_generated_data', '_resolver'):
        setattr(rpc_server, attr, getattr(state, attr))

    rpc_server.register_introspection_functions()
    rpc_server.register_instance(state)
    rpc_server.web_socket_handler = None
    return state, rpc_server