blob: e9897645110f03ab4c23e39b55bc98b0895147be [file] [log] [blame]
# Copyright 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from __future__ import print_function
import collections
import threading
import time
from six.moves import xrange
import factory_common # pylint: disable=unused-import
from cros.factory.test import event
from cros.factory.test.i18n import _
from cros.factory.test import session
from cros.factory.test.utils import serial_utils
# Define the driver name and the interface protocols to find the arduino ports.
# NATIVE_USB_PORT: used to monitor the internal state of test fixture.
# PROGRAMMING_PORT: used to upload the firmware from host to the arduino and
# issue calibration commands to control the test fixture.
NATIVE_USB_PORT = 0
PROGRAMMING_PORT = 1
ARDUINO_DRIVER = 'cdc_acm'
interface_protocol_dict = {NATIVE_USB_PORT: '00', PROGRAMMING_PORT: '01'}
ArduinoCommand = collections.namedtuple(
'ArduinoCommand', ['DOWN', 'UP', 'STATE', 'RESET'])
COMMAND = ArduinoCommand('d', 'u', 's', 'r')
ArduinoState = collections.namedtuple(
'ArduinoState', ['INIT', 'STOP_DOWN', 'STOP_UP', 'GOING_DOWN', 'GOING_UP',
'EMERGENCY_STOP'])
STATE = ArduinoState('i', 'D', 'U', 'd', 'u', 'e')
class FixtureException(Exception):
"""A dummy exception class for FixtureSerialDevice."""
pass
class FixutreNativeUSB(serial_utils.SerialDevice):
"""A native usb port used to monitor the internal state of the fixture."""
def __init__(self, driver=ARDUINO_DRIVER,
interface_protocol=interface_protocol_dict[NATIVE_USB_PORT],
timeout=86400):
super(FixutreNativeUSB, self).__init__()
self.driver = driver
self.interface_protocol = interface_protocol
self.timeout = timeout
self.port = self._GetPort()
self._Connect(self.port)
self.state_string = None
self.last_state_string = None
# The ordering of the state names should match that in
# touchscreen_calibration.ino
self.state_name_dict = [
'state',
'jumper',
'button debug',
'sensor extreme up',
'sensor up',
'sensor down',
'sensor safety',
'motor direction',
'motor enabled',
'motor locked',
'motor duty cycle',
'pwm frequency',
'count',
]
def _GetPort(self):
return serial_utils.FindTtyByDriver(self.driver, self.interface_protocol)
def _Connect(self, port):
try:
self.Connect(port=port, timeout=self.timeout)
msg = 'Connect to native USB port "%s" for monitoring internal state.'
session.console.info(msg, port)
except Exception:
msg = 'FixtureNativeUSB: failed to connect to native usb port: %s'
session.console.warn(msg, port)
def _CheckReconnection(self):
"""Reconnect the native usb port if it has been refreshed."""
curr_port = self._GetPort()
if curr_port != self.port:
self.Disconnect()
self._Connect(curr_port)
self.port = curr_port
session.console.info('Reconnect to new port: %s', curr_port)
def GetState(self):
"""Get the fixture state from the native usb port.
The complete state_string looks like: <i1001000000.6000.0>
Its format is defined in self.state_name_dict in __init__() above.
The first character describes the main state.
This call is blocked until a complete fixture state has been received.
Call this method with a new thread if needed.
"""
self._CheckReconnection()
reply = []
while True:
ch = self.Receive()
reply.append(ch)
if ch == '>':
self.last_state_string = self.state_string
self.state_string = ''.join(reply)
return self.state_string
def QueryFixtureState(self):
"""Query fixture internal state."""
self._CheckReconnection()
self.Send('s')
def _ExtractStateList(self, state_string):
if state_string:
state, pwm_freq, count = state_string.strip().strip('<>').split('.')
state_list = [s for s in state]
state_list.extend([pwm_freq, count])
else:
state_list = []
return state_list
def DiffState(self):
"""Get the difference of between this state and the last state."""
old_state_list = self._ExtractStateList(self.last_state_string)
new_state_list = self._ExtractStateList(self.state_string)
return [(self.state_name_dict[i], new_state_list[i])
for i in xrange(len(new_state_list))
if old_state_list == [] or new_state_list[i] != old_state_list[i]]
def CompleteState(self):
"""Get the complete state snap shot."""
state_list = self._ExtractStateList(self.state_string)
return [(self.state_name_dict[i], state_list[i])
for i in xrange(len(state_list))]
class BaseFixture(serial_utils.SerialDevice):
"""A base fixture class."""
def __init__(self, state=None):
super(BaseFixture, self).__init__()
self.state = state
self.native_usb = None
class FakeFixture(BaseFixture):
"""A fake fixture class used for development purpose only."""
TIMEOUT = 10
def __init__(self, ui, state=None):
super(FakeFixture, self).__init__(state)
self.ui = ui
self.final_calibration_lock = threading.Event()
def QueryState(self):
"""Queries the state of the arduino board."""
return self.state
def IsStateUp(self):
"""Checks if the fixture is in the INIT or STOP_UP state."""
return (self.state in [STATE.INIT, STATE.STOP_UP])
def IsEmergencyStop(self):
"""Checks if the fixture is in the EMERGENCY_STOP state."""
return self.state == STATE.EMERGENCY_STOP
def DriveProbeDown(self):
"""Drives the probe to the 'down' position."""
session.console.info('Drive Probe Down....')
self.ui.Alert(_('Pull the lever down.'))
def DriveProbeUp(self):
"""Drives the probe to the 'up' position."""
session.console.info('Drive Probe Up....')
self.ui.Alert(_('Pull the lever up.'))
self.final_calibration_lock.wait(self.TIMEOUT)
self.ui.PostEvent(event.Event(event.Event.Type.TEST_UI_EVENT,
subtype='FinishTest'))
def DriveProbeUpDone(self):
"""Notify that the DriveProbeUp has been done."""
self.final_calibration_lock.set()
class FixtureSerialDevice(BaseFixture):
"""A serial device to control touchscreen fixture."""
def __init__(self, driver=ARDUINO_DRIVER,
interface_protocol=interface_protocol_dict[PROGRAMMING_PORT],
timeout=20):
super(FixtureSerialDevice, self).__init__()
try:
port = serial_utils.FindTtyByDriver(driver, interface_protocol)
self.Connect(port=port, timeout=timeout)
msg = 'Connect to programming port "%s" for issuing commands.'
session.console.info(msg, port)
session.console.info('Wait up to %d seconds for arduino initialization.',
timeout)
except Exception:
raise FixtureException('Failed to connect the test fixture.')
self.AssertStateWithTimeout([STATE.INIT, STATE.STOP_UP,
STATE.EMERGENCY_STOP], timeout)
# The 2nd-generation tst fixture has a native usb port.
self.native_usb = FixutreNativeUSB()
if not self.native_usb:
raise FixtureException('Fail to connect the native usb port.')
def QueryState(self):
"""Queries the state of the arduino board."""
try:
self.state = self.SendReceive(COMMAND.STATE)
except Exception:
raise FixtureException('QueryState failed.')
return self.state
def IsStateUp(self):
"""Checks if the fixture is in the INIT or STOP_UP state."""
return (self.QueryState() in [STATE.INIT, STATE.STOP_UP])
def IsEmergencyStop(self):
"""Checks if the fixture is in the EMERGENCY_STOP state."""
return self.QueryState() == STATE.EMERGENCY_STOP
def AssertStateWithTimeout(self, expected_states, timeout):
"""Assert the state with timeout."""
while True:
result, state = self._AssertState(expected_states)
if result is True:
session.console.info('state: %s (expected)', state)
return
session.console.info('state: %s (transient, probe still moving)', state)
time.sleep(1)
timeout -= 1
if timeout == 0:
break
msg = 'AssertState failed: actual state: "%s", expected_states: "%s".'
raise FixtureException(msg % (state, str(expected_states)))
def _AssertState(self, expected_states):
"""Confirms that the arduino is in the specified state.
It returns True if the actual state is in the expected states;
otherwise, it returns the actual state.
"""
if not isinstance(expected_states, list):
expected_states = [expected_states]
actual_state = self.QueryState()
return (actual_state in expected_states, actual_state)
def AssertState(self, expected_states):
result, _ = self._AssertState(expected_states)
if result is not True:
msg = 'AssertState failed: actual state: "%s", expected_states: "%s".'
raise FixtureException(msg % (result, str(expected_states)))
def DriveProbeDown(self):
"""Drives the probe to the 'down' position."""
try:
response = self.SendReceive(COMMAND.DOWN)
session.console.info('Send COMMAND.DOWN(%s). Receive state(%s).',
COMMAND.DOWN, response)
except Exception:
raise FixtureException('DriveProbeDown failed.')
self.AssertState(STATE.STOP_DOWN)
def DriveProbeUp(self):
"""Drives the probe to the 'up' position."""
try:
response = self.SendReceive(COMMAND.UP)
session.console.info('Send COMMAND.UP(%s). Receive state(%s).',
COMMAND.UP, response)
except Exception:
raise FixtureException('DriveProbeUp failed.')
self.AssertState(STATE.STOP_UP)