blob: 9744f40a7f025038654eb448e7feb61bb1201a39 [file] [log] [blame]
# Copyright 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Fixture connection interface and implementation.
FixtureConnection defines the interface for communicating with a fixture.
Currently the following kind of FixtureConnection is supported:
MockFixtureConnection: a fake FixtureConnection for testing purpose.
SerialFixtureConnection: a serial port based FixtureConnection.
"""
import abc
import time
import serial
from cros.factory.test.utils import serial_utils
class FixtureConnectionError(Exception):
pass
class FixtureConnection(metaclass=abc.ABCMeta):
"""Abstract fixture connection."""
@abc.abstractmethod
def Connect(self):
"""Establishes the fixture connection.
Raise:
FixtureConnectionError
"""
raise NotImplementedError
@abc.abstractmethod
def Disconnect(self):
"""Destroys the fixture connection.
Raise:
FixtureConnectionError
"""
raise NotImplementedError
@abc.abstractmethod
def Send(self, msg, read_response=False):
"""Sends message to fixture.
Args:
msg: string or bytes to be sent to the fixture.
read_response: whether to read response after sending the message.
Raise:
FixtureConnectionError
"""
raise NotImplementedError
@abc.abstractmethod
def Recv(self, length=0):
"""Receives message from fixture.
Args:
length: length of characters to receive from fixture. If length == 0
then Recv all available characters in the buffer.
Raise:
FixtureConnectionError
"""
raise NotImplementedError
class MockFixtureConnection(FixtureConnection):
"""A fake FixtureConnection for simulation."""
def __init__(self, script):
"""Constructor.
Args:
script: A dictionary containing the mapping of command and response.
"""
super(MockFixtureConnection, self).__init__()
self._script = script
self._curr_cmd = None
def Connect(self):
pass
def Disconnect(self):
pass
def Send(self, msg, read_response=False):
self._curr_cmd = msg.strip()
if read_response:
return self.Recv()
return None
def Recv(self, length=0):
if self._curr_cmd in self._script:
return self._script[self._curr_cmd]
raise FixtureConnectionError("Unexpected fixture command '%s'" %
self._curr_cmd)
class SerialFixtureConnection(FixtureConnection):
def __init__(self, driver, serial_delay, serial_params, response_delay,
retries=5):
"""Constructor.
Args:
driver: name of the driver, e.g. pl2303
serial_delay: delay time in seconds between writing each character to
serial port
serial_params: a dictionary containing the following keys:
{
'baudrate': 9600,
'bytesize': 8,
'parity': 'N',
'stopbits': 1,
'xonxoff': False,
'rtscts': False,
'timeout': None
}
response_delay: delay time in seconds before reading the response
retries: number of retires when write failed
"""
super(SerialFixtureConnection, self).__init__()
self._tty = None
self._driver = driver
self._serial_delay = serial_delay
self._serial_params = serial_params
self._response_delay = response_delay
self._retries = retries
def Connect(self):
port = serial_utils.FindTtyByDriver(self._driver)
if not port:
raise FixtureConnectionError('Cannot find TTY with driver %s' %
self._driver)
self._tty = serial_utils.OpenSerial(port=port, **self._serial_params)
self._tty.flush()
def Disconnect(self):
self._tty.close()
def Send(self, msg, read_response=False):
for c in msg:
retries = self._retries
while True:
try:
self._tty.write(str(c))
self._tty.flush()
time.sleep(self._serial_delay)
except serial.SerialTimeoutException as e:
if retries <= 0:
raise FixtureConnectionError(str(e))
retries -= 1
else:
break
time.sleep(self._response_delay)
if read_response:
return self.Recv()
def Recv(self, length=0):
if length:
return self._tty.read(length)
return self._tty.read(self._tty.inWaiting())