| # Copyright 2012 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import abc |
| import cPickle as pickle |
| import errno |
| import json |
| import logging |
| import os |
| import Queue |
| import socket |
| import SocketServer |
| import sys |
| import tempfile |
| import threading |
| import time |
| import traceback |
| |
| import factory_common # pylint: disable=unused-import |
| from cros.factory.utils import file_utils |
| from cros.factory.utils import type_utils |
| |
| |
| # Environment variable storing the path to the endpoint. |
| CROS_FACTORY_EVENT = 'CROS_FACTORY_EVENT' |
| |
| # Maximum allowed size for messages. If messages are bigger than this, they |
| # will be truncated by the seqpacket sockets. |
| _MAX_MESSAGE_SIZE = 65535 |
| |
| # Maximum size of logged event data in debug log. Sometimes a test may pass |
| # large data to JavaScript functions. If all of it is logged, it can easily take |
| # up all disk space. |
| _MAX_EVENT_SIZE_FOR_DEBUG_LOG = 512 |
| |
| # Hello message send by the server and expected as the first datagram by |
| # the client. |
| _HELLO_MESSAGE = '\1' |
| |
| |
| def json_default_repr(obj): |
| """Converts an object into a suitable representation for |
| JSON-ification. |
| |
| If obj is an object, this returns a dict with all properties |
| not beginning in '_'. Otherwise, the original object is |
| returned. |
| """ |
| if isinstance(obj, object): |
| return dict([(k, v) for k, v in obj.__dict__.iteritems() |
| if k[0] != '_']) |
| else: |
| return obj |
| |
| |
| class Event(object): |
| """An event object that may be written to the event server. |
| |
| E.g.: |
| |
| event = Event(Event.Type.STATE_CHANGE, |
| test='foo.bar', |
| state=TestState(...)) |
| """ |
| Type = type('Event.Type', (), { |
| # The state of a test has changed. |
| 'STATE_CHANGE': 'goofy:state_change', |
| # The UI has come up. |
| 'UI_READY': 'goofy:ui_ready', |
| # Tells goofy to clear all state and restart testing. |
| 'RESTART_TESTS': 'goofy:restart_tests', |
| # Tells goofy to run all tests that haven't been run yet. |
| 'AUTO_RUN': 'goofy:auto_run', |
| # Tells goofy to set all failed tests' state to untested and re-run. |
| 'RUN_TESTS_WITH_STATUS': 'goofy:run_tests_with_status', |
| # Clears state of all tests underneath the given path. |
| 'CLEAR_STATE': 'goofy:clear_state', |
| # Tells the UI about a single new line in the log. |
| 'LOG': 'goofy:log', |
| # A hello message to a new WebSocket. Contains a 'uuid' parameter |
| # identification the particular invocation of the server. |
| 'HELLO': 'goofy:hello', |
| # A keepalive message from the UI. Contains a 'uuid' parameter |
| # containing the same 'uuid' value received when the client received |
| # its HELLO. |
| 'KEEPALIVE': 'goofy:keepalive', |
| # Initializes the test UI. |
| 'INIT_TEST_UI': 'goofy:init_test_ui', |
| # Sets layout for the test UI. |
| 'SET_TEST_UI_LAYOUT': 'goofy:set_test_ui_layout', |
| # Sets the UI in the test pane. |
| 'SET_HTML': 'goofy:set_html', |
| # Runs JavaScript in the test pane. |
| 'RUN_JS': 'goofy:run_js', |
| # Performs a remote procedure call to the Chrome extension inside UI. |
| 'EXTENSION_RPC': 'goofy:extension_rpc', |
| # Event from a test UI. |
| 'TEST_UI_EVENT': 'goofy:test_ui_event', |
| # Message from the test UI that it has finished. |
| 'END_TEST': 'goofy:end_test', |
| # Message to tell the test UI to destroy itself. |
| 'DESTROY_TEST': 'goofy:destroy_test', |
| # Message telling Goofy should re-read system info. |
| 'UPDATE_SYSTEM_INFO': 'goofy:update_system_info', |
| # Tells Goofy to stop all tests. |
| 'STOP': 'goofy:stop', |
| # Indicates a pending shutdown. |
| 'PENDING_SHUTDOWN': 'goofy:pending_shutdown', |
| # Cancels a pending shutdown. |
| 'CANCEL_SHUTDOWN': 'goofy:cancel_shutdown', |
| # Tells UI to update notes. |
| 'UPDATE_NOTES': 'goofy:update_notes', |
| # Diagnosis Tool's events |
| 'DIAGNOSIS_TOOL_EVENT': 'goofy:diagnosis_tool:event', |
| # Enable/disable key filtering |
| 'KEY_FILTER_MODE': 'goofy:key_filter_mode', |
| # Notifies that factory server config (URL, timeout) is changed. |
| 'FACTORY_SERVER_CONFIG_CHANGED': 'factory_server:config_changed', |
| }) |
| |
| def __init__(self, type, **kw): # pylint: disable=redefined-builtin |
| self.type = type |
| self.timestamp = time.time() |
| for k, v in kw.iteritems(): |
| setattr(self, k, v) |
| |
| def __repr__(self): |
| return type_utils.StdRepr( |
| self, |
| extra=[ |
| 'type=%s' % self.type, |
| 'timestamp=%s' % time.ctime(self.timestamp)], |
| excluded_keys=['type', 'timestamp']) |
| |
| def to_json(self): |
| return json.dumps(self, default=json_default_repr) |
| |
| @staticmethod |
| def from_json(encoded_event): |
| kw = type_utils.UnicodeToString(json.loads(encoded_event)) |
| type = kw.pop('type') # pylint: disable=redefined-builtin |
| return Event(type=type, **kw) |
| |
| _unique_id_lock = threading.Lock() |
| _unique_id = 1 |
| |
| |
| def get_unique_id(): |
| global _unique_id # pylint: disable=global-statement |
| with _unique_id_lock: |
| ret = _unique_id |
| _unique_id += 1 |
| return ret |
| |
| |
| class EventServerRequestHandler(SocketServer.BaseRequestHandler): |
| """Request handler for the event server. |
| |
| This class is agnostic to message format (except for logging). |
| """ |
| |
| def setup(self): |
| SocketServer.BaseRequestHandler.setup(self) |
| threading.current_thread().name = ( |
| 'EventServerRequestHandler-%d' % get_unique_id()) |
| # A thread to be used to send messages that are posted to the queue. |
| self.send_thread = None |
| # A queue containing messages. |
| self.queue = Queue.Queue() |
| |
| def handle(self): |
| # The handle() methods is run in a separate thread per client |
| # (since EventServer has ThreadingMixIn). |
| logging.debug('Event server: handling new client') |
| try: |
| self.server._subscribe(self.queue) # pylint: disable=protected-access |
| |
| # Send hello, now that we've subscribed. Client will wait for |
| # it before returning from the constructor. |
| self.request.send(_HELLO_MESSAGE) |
| |
| self.send_thread = threading.Thread( |
| target=self._run_send_thread, |
| name='EventServerSendThread-%d' % get_unique_id()) |
| self.send_thread.daemon = True |
| self.send_thread.start() |
| |
| # Process events: continuously read message and broadcast to all |
| # clients' queues. |
| while True: |
| msg = self.request.recv(_MAX_MESSAGE_SIZE + 1) |
| if len(msg) > _MAX_MESSAGE_SIZE: |
| logging.error('Event server: message too large') |
| if len(msg) == 0: |
| break # EOF |
| self.server._post_message(msg) # pylint: disable=protected-access |
| except socket.error, e: |
| if e.errno in [errno.ECONNRESET, errno.ESHUTDOWN, errno.EPIPE]: |
| pass # Client just quit |
| else: |
| raise e |
| finally: |
| logging.debug('Event server: client disconnected') |
| self.queue.put(None) # End of stream; make writer quit |
| self.server._unsubscribe(self.queue) # pylint: disable=protected-access |
| |
| def _run_send_thread(self): |
| while True: |
| message = self.queue.get() |
| if message is None: |
| return |
| try: |
| self.request.send(message) |
| except Exception: |
| return |
| |
| |
| class EventServer(SocketServer.ThreadingUnixStreamServer): |
| """An event server that broadcasts messages to all clients. |
| |
| This class is agnostic to message format (except for logging). |
| """ |
| allow_reuse_address = True |
| socket_type = socket.SOCK_SEQPACKET |
| daemon_threads = True |
| |
| def __init__(self, path=None): |
| """Constructor. |
| |
| Args: |
| path: Path at which to create a UNIX stream socket. |
| If None, uses a temporary path and sets the CROS_FACTORY_EVENT |
| environment variable for future clients to use. |
| """ |
| # pylint: disable=super-init-not-called |
| # A set of queues listening to messages. |
| self._queues = set() |
| # A lock guarding the _queues variable. |
| self._lock = threading.Lock() |
| self._temp_path = None |
| if not path: |
| path = tempfile.mktemp(prefix='cros_factory_event.') |
| os.environ[CROS_FACTORY_EVENT] = path |
| logging.info('Setting %s=%s', CROS_FACTORY_EVENT, path) |
| self._temp_path = path |
| # pylint: disable=non-parent-init-called |
| SocketServer.UnixStreamServer.__init__( |
| self, path, EventServerRequestHandler) |
| |
| def server_close(self): |
| """Cleanup temporary file""" |
| SocketServer.ThreadingUnixStreamServer.server_close(self) |
| if self._temp_path is not None: |
| file_utils.TryUnlink(self._temp_path) |
| |
| def _subscribe(self, queue): |
| """Subscribes a queue to receive events. |
| |
| Invoked only from the request handler. |
| """ |
| with self._lock: |
| self._queues.add(queue) |
| |
| def _unsubscribe(self, queue): |
| """Unsubscribes a queue to receive events. |
| |
| Invoked only from the request handler. |
| """ |
| with self._lock: |
| self._queues.discard(queue) |
| |
| def _post_message(self, message): |
| """Posts a message to all clients. |
| |
| Invoked only from the request handler. |
| """ |
| try: |
| if logging.getLogger().isEnabledFor(logging.DEBUG): |
| logging.debug('Event server: dispatching object %s', |
| pickle.loads(message)) |
| except Exception: |
| # Message isn't parseable as a pickled object; weird! |
| logging.info( |
| 'Event server: dispatching message %r', message) |
| |
| with self._lock: |
| for q in self._queues: |
| # Note that this is nonblocking (even if one of the |
| # clients is dead). |
| q.put(message) |
| |
| |
| class EventClientBase(object): |
| __metaclass__ = abc.ABCMeta |
| |
| """A client used to post and receive messages from an event server. |
| |
| All events sent through this class must be subclasses of Event. It |
| marshals Event classes through the server by pickling them. |
| |
| The _process_event() need to be called periodically. |
| |
| Inherit graph: |
| EventClientBase: |
| |-- ThreadingEventClient: A daemon thread to process events. |
| |-- BlockingEventClient: A while-loop on calling thread to process events. |
| """ |
| def __init__(self, path=None, callback=None): |
| """Constructor. |
| |
| Args: |
| path: The UNIX seqpacket socket endpoint path. If None, uses |
| the CROS_FACTORY_EVENT environment variable. |
| callback: A callback to call when events occur. The callback |
| takes one argument: the received event. |
| """ |
| self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET) |
| self.callbacks = set() |
| logging.debug('Initializing event client') |
| |
| if callback: |
| self.callbacks.add(callback) |
| |
| path = path or os.environ[CROS_FACTORY_EVENT] |
| self.socket.connect(path) |
| |
| hello = self.socket.recv(len(_HELLO_MESSAGE)) |
| if hello != _HELLO_MESSAGE: |
| raise socket.error('Event client expected hello (%r) but got %r' % |
| _HELLO_MESSAGE, hello) |
| |
| self._lock = threading.Lock() |
| |
| def close(self): |
| """Closes the client, waiting for any threads to terminate.""" |
| if not self.socket: |
| return |
| |
| self.socket.shutdown(socket.SHUT_RDWR) |
| self.socket.close() |
| self.socket = None |
| |
| def __del__(self): |
| self.close() |
| |
| def __enter__(self): |
| return self |
| |
| def __exit__(self, exc_type, exc_value, traceback): |
| # pylint: disable=redefined-outer-name |
| del exc_type, exc_value, traceback # Unused. |
| try: |
| self.close() |
| except Exception: |
| pass |
| return False |
| |
| def _truncate_event_for_debug_log(self, event): |
| """Truncates event to a size of _MAX_EVENT_SIZE_FOR_DEBUG_LOG. |
| |
| Args: |
| event: The event to be printed. |
| |
| Returns: |
| Truncated event string representation. |
| """ |
| event_repr = repr(event) |
| if len(event_repr) > _MAX_EVENT_SIZE_FOR_DEBUG_LOG: |
| return event_repr[:_MAX_EVENT_SIZE_FOR_DEBUG_LOG] + '...' |
| else: |
| return event_repr |
| |
| def post_event(self, event): |
| """Posts an event to the server.""" |
| if logging.getLogger().isEnabledFor(logging.DEBUG): |
| logging.debug('Event client: sending event %s', |
| self._truncate_event_for_debug_log(event)) |
| message = pickle.dumps(event, protocol=2) |
| if len(message) > _MAX_MESSAGE_SIZE: |
| # Log it first so we know what event caused the problem. |
| logging.error('Message too large (%d bytes): event is %s', |
| len(message), event) |
| raise IOError('Message too large (%d bytes)' % len(message)) |
| self.socket.sendall(message) |
| |
| def _process_event(self): |
| """Handles one incoming message from the socket. |
| |
| Returns: |
| (keep_going, event), where: |
| keep_going: True if event processing should continue (i.e., not EOF). |
| event: The message if any. |
| """ |
| msg_bytes = self.socket.recv(_MAX_MESSAGE_SIZE + 1) |
| if len(msg_bytes) > _MAX_MESSAGE_SIZE: |
| # The message may have been truncated - ignore it |
| logging.error('Event client: message too large') |
| return True, None |
| |
| if len(msg_bytes) == 0: |
| return False, None |
| |
| try: |
| event = pickle.loads(msg_bytes) |
| if logging.getLogger().isEnabledFor(logging.DEBUG): |
| logging.debug('Event client: dispatching event %s', |
| self._truncate_event_for_debug_log(event)) |
| except Exception: |
| logging.warn('Event client: bad message %r', msg_bytes) |
| traceback.print_exc(sys.stderr) |
| return True, None |
| |
| with self._lock: |
| callbacks = list(self.callbacks) |
| for callback in callbacks: |
| try: |
| callback(event) |
| except Exception: |
| logging.warn('Event client: error in callback') |
| traceback.print_exc(sys.stderr) |
| # Keep going |
| |
| return True, event |
| |
| @abc.abstractmethod |
| def request_response(self, request_event, check_response, timeout=None): |
| """Starts a request-response communication: sends a request event and waits |
| for an valid response event until timeout. |
| |
| Args: |
| request_event: An event to start protocol. None to send no events. |
| check_response: A function to evaluate if given event is an expected |
| response. The function takes one argument (an event to evaluate) and |
| returns whether it is valid. Note it may also get events "before" |
| request_event is sent, including the request_event itself. |
| timeout: A timeout in seconds, or None to wait forever. |
| |
| Returns: |
| The valid response event, or None if the connection was closed or timeout. |
| """ |
| raise NotImplementedError |
| |
| def wait(self, condition, timeout=None): |
| """Waits for an event matching a condition. |
| |
| Args: |
| condition: A function to evaluate. The function takes one |
| argument (an event to evaluate) and returns whether the condition |
| applies. |
| timeout: A timeout in seconds, or None to wait forever. |
| |
| Returns: |
| The event that matched the condition, or None if the connection |
| was closed or timeout. |
| """ |
| return self.request_response(None, condition, timeout) |
| |
| |
| class BlockingEventClient(EventClientBase): |
| """A blocking event client. |
| |
| A while-loop is used to serve as the event loop. This will block the |
| calling thread, until the specified condition is met. |
| |
| Note that, the event loop only runs in request_response() and wait() calls, |
| so the callbacks will be called only when these calls are invoked. |
| """ |
| def request_response(self, request_event, check_response, timeout=None): |
| """See EventClientBase.request_response.""" |
| assert not timeout, 'Timeout is not currently supported in Blocking mode.' |
| |
| if request_event: |
| self.post_event(request_event) |
| |
| while True: |
| keep_going, event = self._process_event() |
| if not keep_going: # Closed |
| return None |
| if event and check_response(event): |
| return event |
| |
| |
| class ThreadingEventClient(EventClientBase): |
| """A threaded event client. |
| |
| A daemon thread is created in constructor to process events. After instance is |
| constructed, callbacks will be called from that thread with incoming events. |
| """ |
| def __init__(self, path=None, callback=None, name=None): |
| """Constructor. |
| |
| Args: |
| path: See EventClientBase.__init__. |
| callback: See EventClientBase.__init__. |
| name: An optional name for the receving thread. |
| """ |
| super(ThreadingEventClient, self).__init__(path, callback) |
| |
| self.recv_thread = threading.Thread( |
| target=self._run_recv_thread, |
| name='EventServerRecvThread-%s' % (name or get_unique_id())) |
| self.recv_thread.daemon = True |
| self.recv_thread.start() |
| |
| def close(self): |
| super(ThreadingEventClient, self).close() |
| if self.recv_thread: |
| self.recv_thread.join() |
| self.recv_thread = None |
| |
| def _run_recv_thread(self): |
| """Thread to receive messages and broadcast them to callbacks.""" |
| while self._process_event()[0]: |
| pass |
| |
| def request_response(self, request_event, check_response, timeout=None): |
| """See EventClientBase.request_response.""" |
| queue = Queue.Queue() |
| |
| def check_response_callback(event): |
| if check_response(event): |
| queue.put(event) |
| |
| try: |
| with self._lock: |
| self.callbacks.add(check_response_callback) |
| if request_event: |
| self.post_event(request_event) |
| return queue.get(timeout=timeout) |
| except Queue.Empty: |
| return None |
| finally: |
| with self._lock: |
| self.callbacks.remove(check_response_callback) |
| |
| |
| def PostEvent(event): |
| """Post the specified event to the server.""" |
| # Use a BlockingEventClient is sufficient, since we don't need to call the |
| # callbacks from another thread. |
| with BlockingEventClient() as event_client: |
| # This will not blocked, since it's just a 'post' operation. |
| event_client.post_event(event) |
| |
| |
| def PostNewEvent(event_type, *args, **kwargs): |
| """Constructs an event from given type and parameters, and post it.""" |
| return PostEvent(Event(event_type, *args, **kwargs)) |
| |
| |
| def SendEvent(request_event, check_response, timeout=None): |
| """Send request_event to the server, and wait for a response until timeout. |
| |
| Args: |
| request_event: An event to start protocol. None to send no events. |
| check_response: A function to evaluate if given event is an expected |
| response. The function takes one argument (an event to evaluate) and |
| returns whether it is valid. Note it may also get events "before" |
| request_event is sent, including the request_event itself. |
| timeout: A timeout in seconds, or None to wait forever. |
| |
| Returns: |
| The valid response event, or None if the connection was closed or timeout. |
| """ |
| # To support timeout, we need to use ThreadingEventClient. |
| with ThreadingEventClient() as event_client: |
| return event_client.request_response(request_event, check_response, timeout) |