blob: b8f069b6517bf8bb501cff00e3dc354e86598556 [file] [log] [blame]
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Servo interface for the EC-3PO console interpreter."""
from __future__ import print_function
# pylint: disable=cros-logging-import
import ctypes
import logging
import multiprocessing
import os
import pty
import stat
import termios
import tty
import ec3po
import uart
class EC3PO(uart.Uart):
"""Class for an EC-3PO console interpreter instance.
This includes both the interpreter and the console objects for one UART.
"""
def __init__(self, raw_ec_uart, source_name):
"""Provides the interface to the EC-3PO console interpreter.
Args:
raw_ec_uart: A string representing the actual PTY of the EC UART.
source_name: A user friendly name documenting the source of this PTY.
"""
# Run Fuart init.
super(EC3PO, self).__init__()
self._logger = logging.getLogger('EC3PO Interface')
# Create the console and interpreter passing in the raw EC UART PTY.
self._raw_ec_uart = raw_ec_uart
self._source = source_name
# Create some pipes to communicate between the interpreter and the console.
# The command pipe is bidirectional.
cmd_pipe_interactive, cmd_pipe_interp = multiprocessing.Pipe()
# The debug pipe is unidirectional from interpreter to console only.
dbg_pipe_interactive, dbg_pipe_interp = multiprocessing.Pipe(duplex=False)
# Create an interpreter instance.
itpr = ec3po.interpreter.Interpreter(raw_ec_uart, cmd_pipe_interp,
dbg_pipe_interp, logging.INFO)
self._itpr = itpr
itpr._logger = logging.getLogger('Interpreter')
# Spawn an interpreter process.
itpr_process = multiprocessing.Process(target=ec3po.interpreter.StartLoop,
args=(itpr,))
# Make sure to kill the interpreter when we terminate.
itpr_process.daemon = True
# Start the interpreter.
itpr_process.start()
self.itpr_process = itpr_process
# The interpreter starts up in the connected state.
self._interp_connected = 'on'
# Open a new pseudo-terminal pair.
(master_pty, user_pty) = pty.openpty()
(interface_pty, control_pty) = pty.openpty()
tty.setraw(master_pty, termios.TCSADRAIN)
tty.setraw(interface_pty, termios.TCSADRAIN)
# Set the permissions to 660.
os.chmod(
os.ttyname(user_pty),
(stat.S_IRGRP | stat.S_IWGRP | stat.S_IRUSR | stat.S_IWUSR))
os.chmod(
os.ttyname(control_pty),
(stat.S_IRGRP | stat.S_IWGRP | stat.S_IRUSR | stat.S_IWUSR))
# Change the owner and group of the PTY to the user who started servod.
try:
uid = int(os.environ.get('SUDO_UID', -1))
except TypeError:
uid = -1
try:
gid = int(os.environ.get('SUDO_GID', -1))
except TypeError:
gid = -1
os.fchown(user_pty, uid, gid)
os.fchown(control_pty, uid, gid)
# Close pts to indicate HUP to ec3po.
user_pty_name = os.ttyname(user_pty)
os.close(user_pty)
# Create a console.
c = ec3po.console.Console(master_pty, user_pty_name, interface_pty,
cmd_pipe_interactive, dbg_pipe_interactive)
self._console = c
c._logger = logging.getLogger('Console')
# Spawn a console process.
v = multiprocessing.Value(ctypes.c_bool, False)
self._command_active = v
console_process = multiprocessing.Process(target=ec3po.console.StartLoop,
args=(c, v))
# Make sure to kill the console when we terminate.
console_process.daemon = True
# Start the console.
console_process.start()
self._logger.debug('Console: %s', self._console)
self._logger.debug('User console: %s', user_pty_name)
self._logger.debug('Control console: %s', os.ttyname(control_pty))
self._pty = user_pty_name
self._control_pty = os.ttyname(control_pty)
self._cmd_pipe_int = cmd_pipe_interactive
self._logger.info('-------------------- %s console on: %s', self._source,
user_pty_name)
def get_pty(self):
"""Gets the path of the served PTY."""
self._logger.debug('get_pty: %s', self._pty)
return self._pty
def get_control_pty(self):
"""Gets the path of the served control PTY."""
self._logger.debug('get_pty: %s', self._control_pty)
return self._control_pty
def get_command_lock(self):
self._command_active.value = True
self._logger.debug('acquire lock for %s: %s', self._control_pty,
self._command_active.value)
def release_command_lock(self):
self._command_active.value = False
self._logger.debug('release lock for %s: %s', self._control_pty,
self._command_active.value)
def set_interp_connect(self, state):
"""Set the interpreter's connection state to the UART.
Args:
state: An integer (0 or 1) indicating whether to connect to the UART or
not.
"""
self._logger.debug('EC3PO Interpreter connection request: \'%r\'', state)
if state == 1:
self._cmd_pipe_int.send('reconnect')
self._interp_connected = 'on'
else:
self._cmd_pipe_int.send('disconnect')
self._interp_connected = 'off'
return
def get_interp_connect(self):
"""Get the state of the interpreter connection to the UART."""
return self._interp_connected