blob: ef74823ee727b75da98c98e087c5f46028cd17b7 [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Handles Whale's button click event."""
import logging
import optparse
import sys
import time
import factory_common # pylint: disable=W0611
from cros.factory.test.fixture.whale import keyboard_emulator
from cros.factory.test.fixture.whale import serial_client
from cros.factory.test.fixture.whale import servo_client
from cros.factory.test.fixture.whale.host import poll_client
from cros.factory.test.utils import Enum
from cros.factory.utils.process_utils import Spawn
ActionType = Enum(['CLOSE_COVER', 'HOOK_COVER', 'PUSH_NEEDLE',
'PLUG_LATERAL', 'FIXTURE_STARTED'])
def TimeClassMethodDebug(func):
"""A decorator to log method running time on debug level."""
def Wrapped(*args, **kwargs):
logging.debug('Invoking %s()', func.__name__)
start_time = time.time()
result = func(*args, **kwargs)
logging.debug('%s() finished in %.4f secs', func.__name__,
time.time() - start_time)
return result
return Wrapped
class InterruptHandler(object):
"""Waits for Whale's I/O expanders' interrupt and dispatches it.
It connects to BeagleBone's servod and polld, where servod is used to get
I/O expanders' input status and reset SR latches; polld is used to wait
GPIO 7, the interrupt pin from Whale's I/O expanders.
"""
# Shortcuts to Whale's button and control dict.
# pylint: disable=E1101
_BUTTON = servo_client.WHALE_BUTTON
_CONTROL = servo_client.WHALE_CONTROL
_FIXTURE_FEEDBACK = servo_client.FIXTURE_FEEDBACK
_PLANKTON_FEEDBACK = servo_client.PLANKTON_FEEDBACK
_WHALE_DEBUG_MODE_EN = servo_client.WHALE_DEBUG_MODE_EN
# List of buttons and feedbacks to scan.
# Difference between button and feedback is: button is latched;
# no latch for feedback.
_BUTTON_LIST = servo_client.WHALE_BUTTONS
_FEEDBACK_LIST = servo_client.WHALE_FEEDBACKS
# Buttons that operator can use (non debug mode).
_OPERATOR_BUTTON_LIST = (_BUTTON.FIXTURE_START, _BUTTON.FIXTURE_STOP)
_INPUT_LIST = _BUTTON_LIST + _FEEDBACK_LIST
_INPUT_INTERRUPT_GPIO = 7
# Used to avoid toggle battery too fast.
_BATTERY_CEASE_TOGGLE_SECS = 1.0
_FixtureState = Enum(['WAIT', 'CLOSED', 'CLOSING', 'OPENING'])
# Fixture state to LED light and LCD message (green, red, message).
_FixtureStateParams = {
_FixtureState.WAIT: ('on', 'on', 'ready'),
_FixtureState.CLOSED: ('off', 'off', 'closed'),
_FixtureState.CLOSING: ('off', 'on', 'closing'),
_FixtureState.OPENING: ('off', 'on', 'opening')}
def __init__(self, host, polld_port, servod_port, dolphin_port, rpc_debug,
polling_wait_secs):
"""Constructor.
Args:
host: BeagleBone's hostname or IP address.
polld_port: port that polld listens. Set to None if not using polld.
servod_port: port that servod listens.
dolphin_port: port that dolphin server listens. Set to None if not using
dolphin server.
rpc_debug: True to enable XMLRPC debug message.
polling_wait_secs: # seconds for polling button clicking event.
"""
self._poll = poll_client.PollClient(
use_polld=polld_port is not None, host=host, tcp_port=polld_port,
verbose=rpc_debug)
self._dolphin = None
if dolphin_port:
self._dolphin = serial_client.SerialClient(
host=host, tcp_port=dolphin_port, verbose=rpc_debug)
self._servo = servo_client.ServoClient(host=host, port=servod_port,
verbose=rpc_debug)
self._polling_wait_secs = polling_wait_secs
# Store last feedback value. The value is initialzed in the very first
# ScanFeedback call.
self._last_feedback = {}
self._starting_fixture_action = None
# Used to avoid toggle battery too fast.
self._last_battery_toggle_time = time.time()
@TimeClassMethodDebug
def Init(self):
"""Resets button latch and records feedback value."""
self._last_feedback = self._servo.MultipleIsOn(self._FEEDBACK_LIST)
self._servo.MultipleSet([(self._CONTROL.LCM_CMD, 'clear'),
(self._CONTROL.LCM_TEXT, 'Initializing...')])
self.ResetLatch()
self.ResetInterrupt()
self.ResetKeyboard()
# Initial fixture state: cover open.
self._HandleStopFixture(show_state=False)
self._SetState(self._FixtureState.WAIT)
def ResetKeyboard(self):
keyboard = keyboard_emulator.KeyboardEmulator(self._servo)
keyboard.SimulateKeystrokes()
def _SetState(self, state):
green, red, message = self._FixtureStateParams[state]
self._servo.MultipleSet([(self._CONTROL.PASS_LED, green),
(self._CONTROL.FAIL_LED, red),
(self._CONTROL.LCM_CMD, 'clear'),
(self._CONTROL.LCM_TEXT, message)])
@TimeClassMethodDebug
def _HandleStopFixture(self, show_state=True):
"""Stop Fixture Step"""
logging.info('Stopping fixture...')
if show_state:
self._SetState(self._FixtureState.OPENING)
# Disable battery first for safety.
self._servo.Disable(self._CONTROL.BATTERY)
while True:
feedback_status = self._servo.MultipleIsOn(self._FEEDBACK_LIST)
if (not feedback_status[self._FIXTURE_FEEDBACK.FB7] or
not feedback_status[self._FIXTURE_FEEDBACK.FB9]):
logging.info('[HandleStopFixture] unplug lateral')
self._servo.Disable(self._CONTROL.FIXTURE_PLUG_LATERAL)
continue
if (not feedback_status[self._FIXTURE_FEEDBACK.FB1] or
not feedback_status[self._FIXTURE_FEEDBACK.FB3]):
logging.info('[HandleStopFixture] pull needle')
self._servo.Disable(self._CONTROL.FIXTURE_PUSH_NEEDLE)
continue
# Retry disable fixture hook till both left and right hook
# are released.
if (feedback_status[self._FIXTURE_FEEDBACK.FB5] or
feedback_status[self._FIXTURE_FEEDBACK.FB6]):
logging.info('[HandleStopFixture] release cover hook')
# Before release cover hook, be sure to close cover otherwise latch
# cannot be released.
self._servo.Enable(self._CONTROL.FIXTURE_CLOSE_COVER)
self._servo.Disable(self._CONTROL.FIXTURE_HOOK_COVER)
continue
if (feedback_status[self._FIXTURE_FEEDBACK.FB12] or
not feedback_status[self._FIXTURE_FEEDBACK.FB11]):
logging.info('[HandleStopFixture] open cover')
self._servo.Disable(self._CONTROL.FIXTURE_CLOSE_COVER)
continue
if feedback_status[self._FIXTURE_FEEDBACK.FB11]:
self._starting_fixture_action = None
logging.info('[Fixture stopped]')
break
self._SetState(self._FixtureState.WAIT)
@TimeClassMethodDebug
def _HandleStartFixtureFeedbackChange(self, feedback_status):
"""Processing Start Fixture feedback information"""
if self._servo.IsOn(self._BUTTON.FIXTURE_START):
if (self._starting_fixture_action == ActionType.CLOSE_COVER and
feedback_status[self._FIXTURE_FEEDBACK.FB12]):
self._starting_fixture_action = ActionType.HOOK_COVER
elif (self._starting_fixture_action == ActionType.HOOK_COVER and
feedback_status[self._FIXTURE_FEEDBACK.FB5] and
feedback_status[self._FIXTURE_FEEDBACK.FB6]):
self._starting_fixture_action = ActionType.PUSH_NEEDLE
elif (self._starting_fixture_action == ActionType.PUSH_NEEDLE and
feedback_status[self._FIXTURE_FEEDBACK.FB2] and
feedback_status[self._FIXTURE_FEEDBACK.FB4]):
self._starting_fixture_action = ActionType.PLUG_LATERAL
elif (self._starting_fixture_action == ActionType.PLUG_LATERAL and
feedback_status[self._FIXTURE_FEEDBACK.FB8] and
feedback_status[self._FIXTURE_FEEDBACK.FB10]):
logging.info('[HandleStartFixture] fixture closed')
self._starting_fixture_action = ActionType.FIXTURE_STARTED
self._SetState(self._FixtureState.CLOSED)
@TimeClassMethodDebug
def _HandleStartFixture(self):
"""Start Fixture Step"""
logging.info('[Fixture Start ...]')
if self._starting_fixture_action == ActionType.FIXTURE_STARTED:
logging.info('[HandleStartFixture] ACTION = FIXTURE_STARTED')
return
if self._starting_fixture_action is None:
if self._servo.IsOn(self._FIXTURE_FEEDBACK.DUT_SENSOR):
logging.info(
'[HandleStartFixture] OOPS! Cannot close cover without DUT')
return
self._ResetWhaleDeviceBeforeClosing()
self._ResetDolphinDeviceBeforeClosing()
self._starting_fixture_action = ActionType.CLOSE_COVER
self._SetState(self._FixtureState.CLOSING)
if self._starting_fixture_action == ActionType.CLOSE_COVER:
logging.info('[HandleStartFixture] closing cover')
self._servo.Enable(self._CONTROL.FIXTURE_CLOSE_COVER)
elif self._starting_fixture_action == ActionType.HOOK_COVER:
logging.info('[HandleStartFixture] hooking cover')
self._servo.Enable(self._CONTROL.FIXTURE_HOOK_COVER)
elif self._starting_fixture_action == ActionType.PUSH_NEEDLE:
logging.info('[HandleStartFixture] pushing needle')
self._servo.Enable(self._CONTROL.FIXTURE_PUSH_NEEDLE)
elif self._starting_fixture_action == ActionType.PLUG_LATERAL:
logging.info('[HandleStartFixture] plugging lateral')
self._servo.Enable(self._CONTROL.FIXTURE_PLUG_LATERAL)
@TimeClassMethodDebug
def _ResetWhaleDeviceBeforeClosing(self):
"""Resets devices on Whale if necessary before closing fixture."""
# Release DUT CC2 pull-high
self._servo.Disable(self._CONTROL.DC)
self._servo.Disable(self._CONTROL.OUTPUT_RESERVE_1)
@TimeClassMethodDebug
def _ResetDolphinDeviceBeforeClosing(self):
"""Resets Dolphin if necessary before closing fixture."""
if self._dolphin is None:
return
# Set dolphin to discharging mode, if dolphin is charging, DUT will fail to
# boot up after battery connection.
self._dolphin.Send(0, 'usbc_action dev')
self._dolphin.Send(1, 'usbc_action dev')
@TimeClassMethodDebug
def _ToggleBattery(self):
"""Toggles battery status.
If battery is on, switches it to off and vise versa.
"""
if (time.time() - self._last_battery_toggle_time <
self._BATTERY_CEASE_TOGGLE_SECS):
logging.debug('Toggle too fast, cease toggle for %f second.',
self._BATTERY_CEASE_TOGGLE_SECS)
return
new_battery_status = ('off' if self._servo.IsOn(self._CONTROL.BATTERY)
else 'on')
logging.info('[Toggle battery to %s]', new_battery_status)
self._servo.Set(self._CONTROL.BATTERY, new_battery_status)
self._last_battery_toggle_time = time.time()
@TimeClassMethodDebug
def ScanButton(self):
"""Scans all buttons and invokes button click handler for clicked buttons.
Returns:
True if a button is clicked.
"""
logging.debug('[Scanning button....]')
status = self._servo.MultipleIsOn(self._BUTTON_LIST)
if status[self._BUTTON.FIXTURE_STOP]:
logging.info('Calling _HandleStopFixture because FIXTURE_STOP is True.')
self._HandleStopFixture()
# Disable stop button, and use 'i2cset' to set it back to input mode.
self._servo.Disable(self._BUTTON.FIXTURE_STOP)
Spawn(['i2cset', '-y', '1', '0x77', '0x07', '0xff'])
return True
if (self._starting_fixture_action != ActionType.FIXTURE_STARTED and
not status[self._BUTTON.FIXTURE_START]):
logging.info('Calling _HandleStopFixture because FIXTURE_START is False.')
self._HandleStopFixture()
return False
button_clicked = any(status.values())
if not button_clicked:
return False
operator_mode = not self._servo.IsOn(self._WHALE_DEBUG_MODE_EN)
for button, clicked in status.iteritems():
if not clicked:
continue
if operator_mode and button not in self._OPERATOR_BUTTON_LIST:
logging.debug('Supress button %s click because debug mode is off.',
button)
continue
if button == self._BUTTON.FIXTURE_START:
if self._starting_fixture_action == ActionType.FIXTURE_STARTED:
logging.info('[START] ACTION = FIXTURE_STARTED')
else:
self._HandleStartFixture()
elif button == self._BUTTON.RESERVE_1:
self._ToggleBattery()
logging.info('Button %s clicked', button)
return button_clicked
@TimeClassMethodDebug
def ScanFeedback(self):
"""Scans all feedback and invokes handler for those changed feedback.
Returns:
True if any feedback value is clicked.
"""
logging.debug('[Scanning feedback....]')
feedback_status = self._servo.MultipleIsOn(self._FEEDBACK_LIST)
feedback_changed = False
for name, value in feedback_status.iteritems():
if self._last_feedback[name] == value:
continue
self._HandleStartFixtureFeedbackChange(feedback_status)
logging.info('Feedback %s value changed to %r', name, value)
self._last_feedback[name] = value
feedback_changed = True
return feedback_changed
@TimeClassMethodDebug
def ResetLatch(self):
"""Resets SR latch for buttons."""
self._servo.Click(self._CONTROL.INPUT_RESET)
@TimeClassMethodDebug
def WaitForInterrupt(self):
logging.debug('Polling interrupt (GPIO %d %s) for %r seconds',
self._INPUT_INTERRUPT_GPIO, self._poll.GPIO_EDGE_FALLING,
self._polling_wait_secs)
if self._poll.PollGPIO(self._INPUT_INTERRUPT_GPIO,
self._poll.GPIO_EDGE_FALLING,
self._polling_wait_secs):
logging.debug('Interrupt polled')
else:
logging.debug('Polling interrupt timeout')
@TimeClassMethodDebug
def ResetInterrupt(self):
"""Resets I/O expanders' interrupt.
We have four I/O expanders (TCA9539), three of them have inputs. As
BeagleBone can only accept one interrupt, we cascade two expanders'
(0x75, 0x77) INT to 0x76 input pins. So any input changes from 0x75, 0x76,
0x77 will generate interrupt to BeagleBone.
According to TCA9539 manual:
"resetting the interrupt circuit is achieved when data on the port is
changed to the original setting or data is read from the port that
generated the interrupt. ... Because each 8-bit port is read
independently, the interrupt caused by port 0 is not cleared by a read
of port 1, or vice versa",
to reset interrupt, we need to read each changing bit. However, as servod
reads a byte each time we read an input pin, so we only need to read 0x77
byte-0, byte-1, 0x75 byte-1, and 0x76 byte-0, byte-1, in sequence to reset
INT. The reason to read in sequence is that we need to read 0x76 at last
as 0x77 and 0x75 INT reset could change P02 and P03 pin in 0x76.
"""
# Touch I/O expander 0x77 byte 0 & 1, 0x75 byte 1, 0x76 byte 0 & 1.
# Note that we skip I/O expander 0x75 byte-0 as it contains no input
# pin, won't trigger interrupt.
self._servo.MultipleGet([
self._FIXTURE_FEEDBACK.FB1, self._BUTTON.FIXTURE_START,
self._PLANKTON_FEEDBACK.FB1, self._WHALE_DEBUG_MODE_EN,
self._BUTTON.RESERVE_1])
def Run(self):
"""Waits for Whale's button click interrupt and dispatches it."""
while True:
button_clicked = self.ScanButton()
feedback_changed = self.ScanFeedback()
# The reason why we don't poll interrupt right after reset latch is that
# it might be possible that a button is clicked after latch is cleared
# but before I/O expander is touched. In this case, the button is latched
# but the interrupt is consumed (after touching I/O expander) so that the
# following click of that button won't trigger interrupt again, and
# polling is blocked.
#
# The solution is to read button again without waiting for interrupt.
if button_clicked or feedback_changed:
if button_clicked:
self.ResetLatch()
self.ResetInterrupt()
continue
self.WaitForInterrupt()
def ParseArgs():
"""Parses command line arguments.
Returns:
tuple (options, args) from optparse.parse_args().
"""
parser = optparse.OptionParser(usage='usage: %prog [options]')
parser.description = '%prog handles Whale button click event.'
parser.add_option('-d', '--debug', action='store_true', default=False,
help='enable debug messages')
parser.add_option('', '--rpc_debug', action='store_true', default=False,
help='enable debug messages for XMLRPC call')
parser.add_option('', '--nouse_dolphin', action='store_false', default=True,
dest='use_dolphin', help='whether to skip dolphin control '
'(remote server). default: %default')
parser.add_option('', '--use_polld', action='store_true', default=False,
help='whether to use polld (for polling GPIO port on '
'remote server) or poll local GPIO port, default: %default')
parser.add_option('', '--host', default='127.0.0.1', type=str,
help='hostname of server, default: %default')
parser.add_option('', '--dolphin_port', default=9997, type=int,
help='port that dolphin_server listens, default: %default')
parser.add_option('', '--polld_port', default=9998, type=int,
help='port that polld listens, default: %default')
parser.add_option('', '--servod_port', default=9999, type=int,
help='port that servod listens, default: %default')
parser.add_option('', '--polling_wait_secs', default=5, type=int,
help=('# seconds for polling button clicking event, '
'default: %default'))
return parser.parse_args()
def main():
options = ParseArgs()[0]
logging.basicConfig(
level=logging.DEBUG if options.debug else logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')
polld_port = options.polld_port if options.use_polld else None
dolphin_port = options.dolphin_port if options.use_dolphin else None
handler = InterruptHandler(options.host, polld_port, options.servod_port,
dolphin_port, options.rpc_debug,
options.polling_wait_secs)
handler.Init()
handler.Run()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
sys.exit(0)
except poll_client.PollClientError as e:
sys.stderr.write(e.message + '\n')
sys.exit(1)