blob: 14dc3540288b6dee3962878a7987161acfdb0bce [file] [log] [blame]
# Copyright 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from __future__ import print_function
import collections
import logging
import os
import subprocess
import threading
import time
from ws4py.websocket import WebSocket
import factory_common # pylint: disable=unused-import
from cros.factory.test.env import paths
from cros.factory.test.event import Event
from cros.factory.test.event import ThreadingEventClient
from cros.factory.test import session
from cros.factory.test.utils.web_socket_utils import WebSocketHandshake
from cros.factory.utils import file_utils
from cros.factory.utils import process_utils
from cros.factory.utils import string_utils
# Number of lines to buffer for new clients.
TAIL_BUFFER_SIZE = 10
class WebSocketManager(object):
"""Object to manage web sockets for Goofy.
Brokers between events in the event client infrastructure
and on web sockets. Also tails the console log and sends
events on web sockets when new bytes become available.
Each Goofy instance is associated with a UUID. When a new web
socket is created, we send a hello event on the socket with the
current UUID. If we receive a keepalive event with the wrong
UUID, we disconnect the client. This insures that we are always
talking to a client that has a complete picture of our state
(i.e., if the server restarts, the client must restart as well).
Properties:
tail_buffer: A rotating buffer of the last TAIL_BUFFER_SIZE lines,
to give to new web clients.
"""
def __init__(self, uuid):
self.uuid = uuid
self.lock = threading.Lock()
self.web_sockets = set()
self.event_client = None
self.tail_process = None
self.has_confirmed_socket = threading.Event()
self.event_client = ThreadingEventClient(callback=self._handle_event,
name='WebSocketManager')
if not os.path.exists(paths.CONSOLE_LOG_PATH):
file_utils.TryMakeDirs(os.path.dirname(paths.CONSOLE_LOG_PATH))
# There's a small chance of race condition. Some data might already
# flushed to console log before the 'TouchFile' got executed.
# But it's fine though, since TouchFile() uses 'a' append mode.
file_utils.TouchFile(paths.CONSOLE_LOG_PATH)
self.tail_process = process_utils.Spawn(
['tail', '-F', paths.CONSOLE_LOG_PATH],
ignore_stdin=True,
stdout=subprocess.PIPE)
self.tail_thread = threading.Thread(target=self._tail_console)
self.tail_thread.start()
self.closed = False
self.tail_buffer = collections.deque()
def close(self):
with self.lock:
if self.closed:
return
self.closed = True
if self.event_client:
self.event_client.close()
self.event_client = None
with self.lock:
web_sockets = list(self.web_sockets)
for web_socket in web_sockets:
web_socket.close_connection()
if self.tail_process:
self.tail_process.kill()
self.tail_process.wait()
if self.tail_thread:
self.tail_thread.join()
def has_sockets(self):
"""Returns true if any web sockets are currently connected."""
with self.lock:
return len(self.web_sockets) > 0
def handle_web_socket(self, request):
"""Runs a web socket in the current thread.
request: A RequestHandler object containing the request.
"""
if not WebSocketHandshake(request):
return
class MyWebSocket(WebSocket):
def __init__(self, **kwargs):
# Add a per-socket lock to use for sending, since ws4py is not
# thread-safe.
self.send_lock = threading.Lock()
super(MyWebSocket, self).__init__(**kwargs)
def received_message(socket_self, message):
# pylint: disable=no-self-argument
event = Event.from_json(str(message))
if event.type == Event.Type.KEEPALIVE:
if event.uuid == self.uuid:
if not self.has_confirmed_socket.is_set():
logging.info('Chrome UI has come up')
self.has_confirmed_socket.set()
else:
logging.warning('Disconnecting web socket with '
'incorrect UUID')
socket_self.close_connection()
else:
self.event_client.post_event(event)
web_socket = MyWebSocket(sock=request.connection)
with self.lock:
lines = list(self.tail_buffer)
with web_socket.send_lock:
web_socket.send(Event(Event.Type.HELLO,
uuid=self.uuid).to_json())
for line in lines:
# Send the last n lines.
web_socket.send(
Event(Event.Type.LOG,
message=string_utils.DecodeUTF8(line)).to_json())
try:
with self.lock:
self.web_sockets.add(web_socket)
logging.info('Running web socket')
web_socket.run()
logging.info('Web socket closed gracefully')
except Exception:
logging.exception('Web socket closed with exception')
finally:
with self.lock:
self.web_sockets.discard(web_socket)
def wait(self):
"""Waits for one socket to connect successfully."""
while not self.has_confirmed_socket.is_set():
# Wait at most 100 ms at a time; without a timeout, this seems
# to eat SIGINT signals.
self.has_confirmed_socket.wait(0.1)
def _tail_console(self):
"""Tails the console log, generating an event whenever a new
line is available.
We send this event only to web sockets (not to event clients
in general) since only the UI is interested in these log
lines.
"""
# tail seems to have a bug where, when outputting to a pipe, it
# doesn't output the first batch of data until it receives some
# new output. Let tail start up, then output a single line to
# wake it up. This is a terrible hack, but it's better than
# missing a bunch of lines. A better fix might involve emulating
# tail directly in Python.
def target():
time.sleep(0.5)
session.console.info('Opened console.')
process_utils.StartDaemonThread(target=target)
while True:
line = self.tail_process.stdout.readline()
if line == '':
break
with self.lock:
self.tail_buffer.append(line)
while len(self.tail_buffer) > TAIL_BUFFER_SIZE:
self.tail_buffer.popleft()
self._handle_event(
Event(Event.Type.LOG,
message=string_utils.DecodeUTF8(line).rstrip('\n')))
def _handle_event(self, event):
"""Sends an event to each open WebSocket client."""
with self.lock:
web_sockets = list(self.web_sockets)
if not web_sockets:
return
event_json = event.to_json()
for web_socket in web_sockets:
try:
with web_socket.send_lock:
web_socket.send(event_json)
except Exception:
logging.exception('Unable to send event on web socket')