blob: 4c5a3aa30208694924963f03933e928f3764f100 [file] [log] [blame]
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Servo interface for the EC-3PO console interpreter."""
from __future__ import print_function
# pylint: disable=cros-logging-import
import logging
import multiprocessing
import os
import pty
import stat
import termios
import tty
import ec3po
import uart
class EC3PO(uart.Uart):
"""Class for an EC-3PO console interpreter instance.
This includes both the interpreter and the console objects for one UART.
"""
def __init__(self, raw_ec_uart):
"""Provides the interface to the EC-3PO console interpreter.
Args:
raw_ec_uart: A string representing the actual PTY of the EC UART.
"""
# Run Fuart init.
super(EC3PO, self).__init__()
self._logger = logging.getLogger('EC3PO Interface')
# Create the console and interpreter passing in the raw EC UART PTY.
self._raw_ec_uart = raw_ec_uart
# Create some pipes to communicate between the interpreter and the console.
# The command pipe is bidirectional.
cmd_pipe_interactive, cmd_pipe_interp = multiprocessing.Pipe()
# The debug pipe is unidirectional from interpreter to console only.
dbg_pipe_interactive, dbg_pipe_interp = multiprocessing.Pipe(duplex=False)
# Create an interpreter instance.
itpr = ec3po.interpreter.Interpreter(raw_ec_uart, cmd_pipe_interp,
dbg_pipe_interp, logging.INFO)
self._itpr = itpr
itpr._logger = logging.getLogger('Interpreter')
# Spawn an interpreter process.
itpr_process = multiprocessing.Process(target=ec3po.interpreter.StartLoop,
args=(itpr,))
# Make sure to kill the interpreter when we terminate.
itpr_process.daemon = True
# Start the interpreter.
itpr_process.start()
self.itpr_process = itpr_process
# The interpreter starts up in the connected state.
self._interp_connected = "on"
# Open a new pseudo-terminal pair.
(master_pty, user_pty) = pty.openpty()
tty.setraw(master_pty, termios.TCSADRAIN)
# Set the permissions to 660.
os.chmod(os.ttyname(user_pty), (stat.S_IRGRP | stat.S_IWGRP |
stat.S_IRUSR | stat.S_IWUSR))
# Change the owner and group of the PTY to the user who started servod.
try:
uid = int(os.environ.get('SUDO_UID', -1))
except TypeError:
uid = -1
try:
gid = int(os.environ.get('SUDO_GID', -1))
except TypeError:
gid = -1
os.fchown(user_pty, uid, gid)
# Create a console.
c = ec3po.console.Console(master_pty, os.ttyname(user_pty),
cmd_pipe_interactive,
dbg_pipe_interactive)
self._console = c
c._logger = logging.getLogger('Console')
# Spawn a console process.
console_process = multiprocessing.Process(target=ec3po.console.StartLoop,
args=(c,))
# Make sure to kill the console when we terminate.
console_process.daemon = True
# Start the console.
console_process.start()
self._logger.info('%s', os.ttyname(user_pty))
self._logger.debug('Console: %s', self._console)
self._pty = os.ttyname(user_pty)
self._cmd_pipe_int = cmd_pipe_interactive
def get_pty(self):
"""Gets the path of the served PTY."""
self._logger.debug('get_pty')
return self._pty
def set_interp_connect(self, state):
"""Set the interpreter's connection state to the UART.
Args:
state: An integer (0 or 1) indicating whether to connect to the UART or
not.
"""
self._logger.debug('EC3PO Interpreter connection request: \'%r\'', state)
if state == 1:
self._cmd_pipe_int.send("reconnect")
self._interp_connected = "on"
else:
self._cmd_pipe_int.send("disconnect")
self._interp_connected = "off"
return
def get_interp_connect(self):
"""Get the state of the interpreter connection to the UART."""
return self._interp_connected