# blob: 73eb2adf6c39104912621b187e698690490ac47e [file] [log] [blame]
# Copyright 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import abc
import cPickle as pickle
import errno
import json
import logging
import os
import Queue
import socket
import SocketServer
import sys
import tempfile
import threading
import time
import traceback
import factory_common # pylint: disable=unused-import
from cros.factory.utils import file_utils
from cros.factory.utils import type_utils
# Environment variable storing the path to the endpoint.
CROS_FACTORY_EVENT = 'CROS_FACTORY_EVENT'
# Maximum allowed size for messages. If messages are bigger than this, they
# will be truncated by the seqpacket sockets.
_MAX_MESSAGE_SIZE = 65535
# Maximum size of logged event data in debug log. Sometimes a test may pass
# large data to JavaScript functions. If all of it is logged, it can easily take
# up all disk space.
_MAX_EVENT_SIZE_FOR_DEBUG_LOG = 512
# Hello message sent by the server and expected as the first datagram by
# the client.
_HELLO_MESSAGE = '\1'
def json_default_repr(obj):
  """Converts an object into a suitable representation for JSON-ification.

  Suitable for use as the ``default`` hook of ``json.dumps``.

  Args:
    obj: The object to convert.

  Returns:
    If obj carries an instance ``__dict__``, a dict of all of its attributes
    whose names do not begin with '_'. Otherwise, the original object is
    returned unchanged.
  """
  # isinstance(obj, object) is true for every value, so it cannot tell
  # "object with attributes" apart from "plain value"; test for __dict__.
  if not hasattr(obj, '__dict__'):
    return obj
  return {k: v for k, v in vars(obj).items() if not k.startswith('_')}
class Event(object):
  """An event object that may be written to the event server.

  Every keyword argument given to the constructor becomes an attribute of
  the event.

  E.g.:

    event = Event(Event.Type.STATE_CHANGE,
                  test='foo.bar',
                  state=TestState(...))
  """
  Type = type('Event.Type', (), {
      # The state of a test has changed.
      'STATE_CHANGE': 'goofy:state_change',
      # The UI has come up.
      'UI_READY': 'goofy:ui_ready',
      # Tells goofy to clear all state and restart testing.
      'RESTART_TESTS': 'goofy:restart_tests',
      # Tells goofy to run all tests that haven't been run yet.
      'AUTO_RUN': 'goofy:auto_run',
      # Tells goofy to set all failed tests' state to untested and re-run.
      'RUN_TESTS_WITH_STATUS': 'goofy:run_tests_with_status',
      # Clears state of all tests underneath the given path.
      'CLEAR_STATE': 'goofy:clear_state',
      # Tells the UI about a single new line in the log.
      'LOG': 'goofy:log',
      # A hello message to a new WebSocket. Contains a 'uuid' parameter
      # identifying the particular invocation of the server.
      'HELLO': 'goofy:hello',
      # A keepalive message from the UI. Contains a 'uuid' parameter
      # containing the same 'uuid' value received when the client received
      # its HELLO.
      'KEEPALIVE': 'goofy:keepalive',
      # Initializes the test UI.
      'INIT_TEST_UI': 'goofy:init_test_ui',
      # Sets layout for the test UI.
      'SET_TEST_UI_LAYOUT': 'goofy:set_test_ui_layout',
      # Sets the UI in the test pane.
      'SET_HTML': 'goofy:set_html',
      # Runs JavaScript in the test pane.
      'RUN_JS': 'goofy:run_js',
      # Performs a remote procedure call to the Chrome extension inside UI.
      'EXTENSION_RPC': 'goofy:extension_rpc',
      # Event from a test UI.
      'TEST_UI_EVENT': 'goofy:test_ui_event',
      # Message from the test UI that it has finished.
      'END_TEST': 'goofy:end_test',
      # Message to tell the test UI to destroy itself.
      'DESTROY_TEST': 'goofy:destroy_test',
      # Message telling Goofy should re-read system info.
      'UPDATE_SYSTEM_INFO': 'goofy:update_system_info',
      # Tells Goofy to stop all tests.
      'STOP': 'goofy:stop',
      # Indicates a pending shutdown.
      'PENDING_SHUTDOWN': 'goofy:pending_shutdown',
      # Cancels a pending shutdown.
      'CANCEL_SHUTDOWN': 'goofy:cancel_shutdown',
      # Tells UI to update notes.
      'UPDATE_NOTES': 'goofy:update_notes',
      # Diagnosis Tool's events
      'DIAGNOSIS_TOOL_EVENT': 'goofy:diagnosis_tool:event',
      # Enable/disable key filtering
      'KEY_FILTER_MODE': 'goofy:key_filter_mode',
      # Notifies that factory server config (URL, timeout) is changed.
      'FACTORY_SERVER_CONFIG_CHANGED': 'factory_server:config_changed',
  })

  def __init__(self, type, **kw):  # pylint: disable=redefined-builtin
    """Constructor.

    Args:
      type: The event type, normally one of the Event.Type values.
      **kw: Additional attributes to set on the event. Because these are
        applied after the defaults, a 'timestamp' keyword replaces the
        creation time; from_json() relies on this to restore the original
        timestamp.
    """
    self.type = type
    self.timestamp = time.time()
    for k, v in kw.items():
      setattr(self, k, v)

  def __repr__(self):
    """Returns a representation built by type_utils.StdRepr.

    The type and a human-readable (time.ctime) timestamp are passed as extra
    fields, and the raw 'type'/'timestamp' attributes are passed as
    excluded keys.
    """
    return type_utils.StdRepr(
        self,
        extra=[
            'type=%s' % self.type,
            'timestamp=%s' % time.ctime(self.timestamp)],
        excluded_keys=['type', 'timestamp'])

  def to_json(self):
    """Returns the event encoded as a JSON string.

    The event itself is encoded through json_default_repr.
    """
    return json.dumps(self, default=json_default_repr)

  @classmethod
  def from_json(cls, encoded_event):
    """Builds an event from a JSON string such as the one made by to_json().

    Args:
      encoded_event: A JSON object string containing at least a 'type' key.

    Returns:
      An instance of the class this method is called on.

    Raises:
      KeyError: If the decoded object has no 'type' key.
    """
    kw = type_utils.UnicodeToString(json.loads(encoded_event))
    type = kw.pop('type')  # pylint: disable=redefined-builtin
    return cls(type=type, **kw)
# Guards _unique_id so that concurrent callers never receive the same value.
_unique_id_lock = threading.Lock()
# The next identifier to hand out.
_unique_id = 1


def get_unique_id():
  """Returns an integer that is unique within this process.

  Identifiers start at 1 and grow by one on every call.
  """
  global _unique_id  # pylint: disable=global-statement
  with _unique_id_lock:
    allocated, _unique_id = _unique_id, _unique_id + 1
  return allocated
class EventServerRequestHandler(SocketServer.BaseRequestHandler):
  """Request handler for the event server.

  Each connected client gets its own handler, which subscribes a private
  queue to the server, forwards everything the client sends to the server
  for broadcast, and runs a helper thread that sends queued messages back
  to the client.

  This class is agnostic to message format (except for logging).
  """

  def setup(self):
    """Names the handler thread and creates the per-client send queue."""
    SocketServer.BaseRequestHandler.setup(self)
    threading.current_thread().name = (
        'EventServerRequestHandler-%d' % get_unique_id())
    # A thread to be used to send messages that are posted to the queue.
    self.send_thread = None
    # A queue containing messages.
    self.queue = Queue.Queue()

  def handle(self):
    """Serves one client until it disconnects.

    Raises:
      socket.error: For socket failures other than the client going away
        (ECONNRESET, ESHUTDOWN, EPIPE).
    """
    # The handle() method is run in a separate thread per client
    # (since EventServer has ThreadingMixIn).
    logging.debug('Event server: handling new client')
    try:
      self.server._subscribe(self.queue)  # pylint: disable=protected-access
      # Send hello, now that we've subscribed. Client will wait for
      # it before returning from the constructor.
      self.request.send(_HELLO_MESSAGE)
      self.send_thread = threading.Thread(
          target=self._run_send_thread,
          name='EventServerSendThread-%d' % get_unique_id())
      self.send_thread.daemon = True
      self.send_thread.start()
      # Process events: continuously read message and broadcast to all
      # clients' queues.
      while True:
        msg = self.request.recv(_MAX_MESSAGE_SIZE + 1)
        if not msg:
          break  # EOF
        if len(msg) > _MAX_MESSAGE_SIZE:
          # One byte more than the limit was requested precisely to detect
          # this case: the datagram may have been truncated, so it is
          # dropped instead of being broadcast to every client.
          logging.error('Event server: message too large')
          continue
        self.server._post_message(msg)  # pylint: disable=protected-access
    except socket.error as e:
      if e.errno not in (errno.ECONNRESET, errno.ESHUTDOWN, errno.EPIPE):
        # Bare raise keeps the original traceback.
        raise
      # Otherwise the client just quit.
    finally:
      logging.debug('Event server: client disconnected')
      self.queue.put(None)  # End of stream; make writer quit
      self.server._unsubscribe(self.queue)  # pylint: disable=protected-access

  def _run_send_thread(self):
    """Sends queued messages to the client until told to stop.

    Stops when None is read from the queue or when a send fails.
    """
    while True:
      message = self.queue.get()
      if message is None:
        return
      try:
        self.request.send(message)
      except Exception:  # pylint: disable=broad-except
        # Best effort: a failed send ends this thread; leave a trace for
        # debugging instead of failing silently.
        logging.debug('Event server: send failed; stopping send thread',
                      exc_info=True)
        return
class EventServer(SocketServer.ThreadingUnixStreamServer):
  """An event server that broadcasts messages to all clients.

  Every message received from any client is put on the queue of every
  subscribed client, including the sender's.

  This class is agnostic to message format (except for logging).
  """
  allow_reuse_address = True
  socket_type = socket.SOCK_SEQPACKET
  daemon_threads = True

  def __init__(self, path=None):
    """Constructor.

    Args:
      path: Path at which to create a UNIX seqpacket socket.
        If None, uses a temporary path and sets the CROS_FACTORY_EVENT
        environment variable for future clients to use.
    """
    # pylint: disable=super-init-not-called
    # A set of queues listening to messages.
    self._queues = set()
    # A lock guarding the _queues variable.
    self._lock = threading.Lock()
    # Set only when this server picked the path itself, so that
    # server_close() removes only a path it owns.
    self._temp_path = None
    if not path:
      # NOTE(review): tempfile.mktemp only picks a name, so another process
      # could create that path before the socket is bound. A private
      # directory from tempfile.mkdtemp would avoid the race.
      path = tempfile.mktemp(prefix='cros_factory_event.')
      os.environ[CROS_FACTORY_EVENT] = path
      logging.info('Setting %s=%s', CROS_FACTORY_EVENT, path)
      self._temp_path = path
    # pylint: disable=non-parent-init-called
    SocketServer.UnixStreamServer.__init__(
        self, path, EventServerRequestHandler)

  def server_close(self):
    """Closes the server and cleans up the temporary socket path, if any."""
    SocketServer.ThreadingUnixStreamServer.server_close(self)
    if self._temp_path is not None:
      file_utils.TryUnlink(self._temp_path)

  def _subscribe(self, queue):
    """Subscribes a queue to receive events.

    Invoked only from the request handler.
    """
    with self._lock:
      self._queues.add(queue)

  def _unsubscribe(self, queue):
    """Unsubscribes a queue from receiving events.

    Unsubscribing a queue that is not subscribed is a no-op.

    Invoked only from the request handler.
    """
    with self._lock:
      self._queues.discard(queue)

  @staticmethod
  def _truncate_for_log(text):
    """Limits text to _MAX_EVENT_SIZE_FOR_DEBUG_LOG characters for logging."""
    if len(text) > _MAX_EVENT_SIZE_FOR_DEBUG_LOG:
      return text[:_MAX_EVENT_SIZE_FOR_DEBUG_LOG] + '...'
    return text

  def _post_message(self, message):
    """Posts a message to all clients.

    Invoked only from the request handler.

    Args:
      message: The raw message to put on every subscribed queue.
    """
    if logging.getLogger().isEnabledFor(logging.DEBUG):
      # Decoding is done for logging only. The logged text is truncated,
      # like on the client side, so that large events cannot fill the disk.
      try:
        # NOTE(review): pickle.loads runs on data received from whichever
        # process connected to the socket; unpickling untrusted data can
        # execute arbitrary code.
        decoded = self._truncate_for_log(str(pickle.loads(message)))
        logging.debug('Event server: dispatching object %s', decoded)
      except Exception:  # pylint: disable=broad-except
        # Message isn't parseable as a pickled object; weird!
        logging.info('Event server: dispatching message %s',
                     self._truncate_for_log(repr(message)))
    with self._lock:
      for q in self._queues:
        # Note that this is nonblocking (even if one of the
        # clients is dead).
        q.put(message)
class EventClientBase(object):
  """A client used to post and receive messages from an event server.

  All events sent through this class must be subclasses of Event. It
  marshals Event classes through the server by pickling them.

  The _process_event() method needs to be called periodically to receive
  events and dispatch them to the registered callbacks.

  Inheritance graph:

  EventClientBase:
    |-- ThreadingEventClient: A daemon thread to process events.
    |-- BlockingEventClient: A while-loop on calling thread to process events.
  """
  # Must come after the docstring; a string literal that is not the first
  # statement of the class body is not used as the class docstring.
  __metaclass__ = abc.ABCMeta

  def __init__(self, path=None, callback=None):
    """Constructor.

    Connects to the server and waits for its hello message.

    Args:
      path: The UNIX seqpacket socket endpoint path. If None, uses
        the CROS_FACTORY_EVENT environment variable.
      callback: A callback to call when events occur. The callback
        takes one argument: the received event.

    Raises:
      KeyError: If path is None and CROS_FACTORY_EVENT is not set.
      socket.error: If connecting fails or the server's first message is not
        the expected hello.
    """
    self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
    self.callbacks = set()
    # Guards self.callbacks.
    self._lock = threading.Lock()
    logging.debug('Initializing event client')
    if callback:
      self.callbacks.add(callback)
    path = path or os.environ[CROS_FACTORY_EVENT]
    self.socket.connect(path)
    hello = self.socket.recv(len(_HELLO_MESSAGE))
    if hello != _HELLO_MESSAGE:
      # Both values must be passed as one tuple to the % operator.
      raise socket.error('Event client expected hello (%r) but got %r' %
                         (_HELLO_MESSAGE, hello))

  def close(self):
    """Closes the client socket. Calling it more than once is harmless."""
    sock = getattr(self, 'socket', None)
    if not sock:
      return
    try:
      sock.shutdown(socket.SHUT_RDWR)
    except socket.error as e:
      # Shutdown can fail (for instance when the socket never got
      # connected); the descriptor must still be released below.
      logging.debug('Event client: socket shutdown failed: %s', e)
    finally:
      sock.close()
      self.socket = None

  def __del__(self):
    self.close()

  def __enter__(self):
    return self

  def __exit__(self, exc_type, exc_value, traceback):
    # pylint: disable=redefined-outer-name
    del exc_type, exc_value, traceback  # Unused.
    try:
      self.close()
    except Exception:  # pylint: disable=broad-except
      # Closing is best effort; never mask the exception (if any) that is
      # leaving the with-block.
      logging.debug('Event client: error while closing', exc_info=True)
    return False

  def _truncate_event_for_debug_log(self, event):
    """Truncates event to a size of _MAX_EVENT_SIZE_FOR_DEBUG_LOG.

    Args:
      event: The event to be printed.

    Returns:
      Truncated event string representation.
    """
    event_repr = repr(event)
    if len(event_repr) > _MAX_EVENT_SIZE_FOR_DEBUG_LOG:
      return event_repr[:_MAX_EVENT_SIZE_FOR_DEBUG_LOG] + '...'
    return event_repr

  def post_event(self, event):
    """Posts an event to the server.

    Args:
      event: The event to pickle and send.

    Raises:
      IOError: If the pickled event is larger than _MAX_MESSAGE_SIZE.
    """
    if logging.getLogger().isEnabledFor(logging.DEBUG):
      logging.debug('Event client: sending event %s',
                    self._truncate_event_for_debug_log(event))
    message = pickle.dumps(event, protocol=2)
    if len(message) > _MAX_MESSAGE_SIZE:
      # Log it first so we know what event caused the problem.
      logging.error('Message too large (%d bytes): event is %s',
                    len(message), event)
      raise IOError('Message too large (%d bytes)' % len(message))
    self.socket.sendall(message)

  def _process_event(self):
    """Handles one incoming message from the socket.

    Returns:
      (keep_going, event), where:
        keep_going: True if event processing should continue (i.e., not EOF).
        event: The message if any.
    """
    sock = self.socket
    if sock is None:
      # close() has already run; treat it like EOF.
      return False, None
    msg_bytes = sock.recv(_MAX_MESSAGE_SIZE + 1)
    if len(msg_bytes) > _MAX_MESSAGE_SIZE:
      # The message may have been truncated - ignore it
      logging.error('Event client: message too large')
      return True, None
    if not msg_bytes:
      return False, None
    try:
      # NOTE(review): pickle.loads trusts whatever arrives on the socket;
      # unpickling untrusted data can execute arbitrary code.
      event = pickle.loads(msg_bytes)
      if logging.getLogger().isEnabledFor(logging.DEBUG):
        logging.debug('Event client: dispatching event %s',
                      self._truncate_event_for_debug_log(event))
    except Exception:  # pylint: disable=broad-except
      logging.warning('Event client: bad message %r', msg_bytes)
      # The stream has to be passed as 'file'; the first positional
      # parameter of print_exc is 'limit'.
      traceback.print_exc(file=sys.stderr)
      return True, None
    # Copy under the lock so that callbacks run without holding it.
    with self._lock:
      callbacks = list(self.callbacks)
    for callback in callbacks:
      try:
        callback(event)
      except Exception:  # pylint: disable=broad-except
        logging.warning('Event client: error in callback')
        traceback.print_exc(file=sys.stderr)
        # Keep going
    return True, event

  @abc.abstractmethod
  def request_response(self, request_event, check_response, timeout=None):
    """Starts a request-response communication: sends a request event and waits
    for a valid response event until timeout.

    Args:
      request_event: An event to start protocol. None to send no events.
      check_response: A function to evaluate if given event is an expected
        response. The function takes one argument (an event to evaluate) and
        returns whether it is valid. Note it may also get events "before"
        request_event is sent, including the request_event itself.
      timeout: A timeout in seconds, or None to wait forever.

    Returns:
      The valid response event, or None if the connection was closed or timeout.
    """
    raise NotImplementedError

  def wait(self, condition, timeout=None):
    """Waits for an event matching a condition.

    Args:
      condition: A function to evaluate. The function takes one
        argument (an event to evaluate) and returns whether the condition
        applies.
      timeout: A timeout in seconds, or None to wait forever.

    Returns:
      The event that matched the condition, or None if the connection
      was closed or timeout.
    """
    return self.request_response(None, condition, timeout)
class BlockingEventClient(EventClientBase):
  """An event client that processes events on the calling thread.

  There is no background thread: incoming events are read and callbacks are
  invoked only while request_response() or wait() is executing, and those
  calls block the caller until the requested condition is met or the
  connection is closed.
  """

  def request_response(self, request_event, check_response, timeout=None):
    """See EventClientBase.request_response.

    A timeout is not supported by this implementation.
    """
    assert not timeout, 'Timeout is not currently supported in Blocking mode.'
    if request_event:
      self.post_event(request_event)
    connected = True
    while connected:
      connected, received = self._process_event()
      if connected and received and check_response(received):
        return received
    # The connection was closed before a valid response arrived.
    return None
class ThreadingEventClient(EventClientBase):
  """A threaded event client.

  A daemon thread is created in constructor to process events. After instance is
  constructed, callbacks will be called from that thread with incoming events.
  """

  def __init__(self, path=None, callback=None, name=None):
    """Constructor.

    Args:
      path: See EventClientBase.__init__.
      callback: See EventClientBase.__init__.
      name: An optional name for the receiving thread.
    """
    # Set before the base constructor runs so that close() (also reached
    # through __del__) works even when connecting fails.
    self.recv_thread = None
    # Queues of request_response() calls currently waiting for a response;
    # guarded by self._lock.
    self._waiters = set()
    # True once the receiving thread has finished; guarded by self._lock.
    self._recv_done = False
    super(ThreadingEventClient, self).__init__(path, callback)
    self.recv_thread = threading.Thread(
        target=self._run_recv_thread,
        name='EventServerRecvThread-%s' % (name or get_unique_id()))
    self.recv_thread.daemon = True
    self.recv_thread.start()

  def close(self):
    """Closes the client, waiting for the receiving thread to terminate."""
    super(ThreadingEventClient, self).close()
    thread = getattr(self, 'recv_thread', None)
    # A thread cannot join itself; this happens when close() is called from
    # a callback, which runs on the receiving thread.
    if thread is not None and thread is not threading.current_thread():
      thread.join()
    self.recv_thread = None

  def _run_recv_thread(self):
    """Thread to receive messages and broadcast them to callbacks."""
    try:
      while self._process_event()[0]:
        pass
    finally:
      # No more events will ever arrive: wake every pending
      # request_response() so that it returns None instead of blocking.
      with self._lock:
        self._recv_done = True
        waiters = list(self._waiters)
      for waiter in waiters:
        waiter.put(None)

  def request_response(self, request_event, check_response, timeout=None):
    """See EventClientBase.request_response."""
    responses = Queue.Queue()

    def check_response_callback(event):
      if check_response(event):
        responses.put(event)

    with self._lock:
      self.callbacks.add(check_response_callback)
      self._waiters.add(responses)
      if self._recv_done:
        # The receiving thread is already gone; do not wait for it.
        responses.put(None)
    try:
      if request_event:
        self.post_event(request_event)
      # A None item means the connection was closed.
      return responses.get(timeout=timeout)
    except Queue.Empty:
      return None
    finally:
      with self._lock:
        self.callbacks.discard(check_response_callback)
        self._waiters.discard(responses)
def PostEvent(event):
  """Posts the specified event to the server.

  Args:
    event: The event to post.
  """
  # A BlockingEventClient is sufficient here: nothing is received, so no
  # callbacks have to be called from another thread.
  client = BlockingEventClient()
  with client:
    # This does not block, since it is just a 'post' operation.
    client.post_event(event)
def PostNewEvent(event_type, *args, **kwargs):
  """Constructs an event from the given type and parameters, and posts it.

  Args:
    event_type: The type of the new event.
    *args: Extra positional arguments for the Event constructor.
    **kwargs: Keyword arguments for the Event constructor.

  Returns:
    Whatever PostEvent returns.
  """
  new_event = Event(event_type, *args, **kwargs)
  return PostEvent(new_event)
def SendEvent(request_event, check_response, timeout=None):
  """Sends request_event to the server and waits for a response.

  Args:
    request_event: An event to start protocol. None to send no events.
    check_response: A function to evaluate if given event is an expected
      response. The function takes one argument (an event to evaluate) and
      returns whether it is valid. Note it may also get events "before"
      request_event is sent, including the request_event itself.
    timeout: A timeout in seconds, or None to wait forever.

  Returns:
    The valid response event, or None if the connection was closed or timeout.
  """
  # A ThreadingEventClient is needed because it is the one that supports
  # timeouts.
  with ThreadingEventClient() as client:
    response = client.request_response(request_event, check_response, timeout)
  return response