blob: aabf63e3ade9f54258f29b4967992be8fe4af1b9 [file] [log] [blame]
# Copyright 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from __future__ import print_function
import collections
import os
import re
import StringIO
import threading
import time
import xmlrpclib
import factory_common # pylint: disable=unused-import
from cros.factory.device import device_utils
from cros.factory.test import device_data
from cros.factory.test import event_log # TODO(chuntsen): Deprecate event log.
from cros.factory.test.fixture.touchscreen_calibration import fixture
from cros.factory.test.i18n import _
from cros.factory.test.pytests.touchscreen_calibration import sensors_server
from cros.factory.test.pytests.touchscreen_calibration import touchscreen_calibration_utils # pylint: disable=line-too-long
from cros.factory.test import session
from cros.factory.test import state
from cros.factory.test import test_case
from cros.factory.test.utils import media_utils
from cros.factory.testlog import testlog
from cros.factory.utils.arg_utils import Arg
from cros.factory.utils import process_utils
# The dotted module path looks like
# "cros.factory.test.pytests.touchscreen_calibration"; its last component,
# "touchscreen_calibration", is used as the test name.
test_name = __name__.rsplit('.', 1)[-1]
# Lightweight stand-in for a UI event: it only carries a "data" payload.
Event = collections.namedtuple('Event', 'data')
class Error(Exception):
  """Test-specific exception that also reports its message on the console.

  Args:
    msg: human-readable description of the failure.
  """

  def __init__(self, msg):
    session.console.error(msg)
    # Forward the message to Exception so that str(e) and tracebacks carry
    # it instead of an empty string.
    super(Error, self).__init__(msg)
def _CreateXMLRPCSensorsClient(addr=('localhost', 8000)):
  """Builds an XML-RPC proxy for the sensors data service.

  Args:
    addr: a (host, port) pair identifying the sensors server.

  Returns:
    An xmlrpclib.ServerProxy bound to http://<host>:<port>.
  """
  return xmlrpclib.ServerProxy('http://%s:%s' % addr)
class TouchscreenCalibration(test_case.TestCase):
  """Handles the calibration and controls the test fixture.

  The phase to execute is taken from the 'phase' test argument and compared
  against the PHASE_* constants defined below.
  """
  version = 1
  # Data categories handed to the sensors service when reading data.
  DELTAS = 'deltas'
  REFS = 'refs'
  TRX_OPENS = 'trx_opens'
  TRX_GND_SHORTS = 'trx_gnd_shorts'
  # NOTE(review): this value uses a hyphen while its siblings use
  # underscores -- confirm that the sensors service expects 'trx-shorts'.
  TRX_SHORTS = 'trx-shorts'
  # Names of the test phases accepted by the 'phase' argument.
  PHASE_SETUP_ENVIRONMENT = 'PHASE_SETUP_ENVIRONMENT'
  PHASE_REFS = 'PHASE_REFS'
  PHASE_DELTAS_UNTOUCHED = 'PHASE_DELTAS_UNTOUCHED'
  PHASE_DELTAS_TOUCHED = 'PHASE_DELTAS_TOUCHED'
  PHASE_TRX_OPENS = 'PHASE_TRX_OPENS'
  PHASE_TRX_GND_SHORTS = 'PHASE_TRX_GND_SHORTS'
  PHASE_TRX_SHORTS = 'PHASE_TRX_SHORTS'
  PHASE_FLASH_FIRMWARE = 'PHASE_FLASH_FIRMWARE'
  PHASE_CHECK_FIRMWARE_VERSION = 'PHASE_CHECK_FIRMWARE_VERSION'
  # Arguments supplied by the test list; read through self.args.
  ARGS = [
      Arg('shopfloor_ip', str, 'The IP address of the shopfloor', default=''),
      Arg('phase', str, 'The test phase of touchscreen calibration',
          default=''),
      Arg('remote_bin_root', str, 'The remote binary root path', default=''),
      Arg('remote_data_dir', str, 'The remote data directory', default=''),
      Arg('fw_update_tool', str, 'The firmware update tool', default=None),
      Arg('fw_file', str, 'The firmware file', default=None),
      Arg('fw_version', str, 'The firmware version', default=None),
      Arg('fw_config', str, 'The firmware config', default=None),
      Arg('hid_tool', str, 'The hid tool to query version information',
          default=None),
      Arg('tool', str, 'The test tool', default=''),
      # When False, _AttachLog() skips attaching raw logs through Testlog.
      Arg('keep_raw_logs', bool, 'Whether to attach the log by Testlog',
          default=True),
  ]
def setUp(self):
  """Sets up the object.

  Initializes the instance state, reads the board config, checks the
  network, obtains the sensors service and checks that the touch device is
  reachable. The statements are order dependent: the config must be read
  before the network and sensors steps, which use the values it provides.
  """
  self.dut = device_utils.CreateDUTInterface()
  self._calibration_thread = None
  self.fixture = None
  # Temp hack to determine it is sdb or sdc
  self.dev_path = '/dev/sdb' if os.path.exists('/dev/sdb1') else '/dev/sdc'
  # Number of delta frames dumped in PHASE_DELTAS_UNTOUCHED.
  self.dump_frames = 3
  self._monitor_thread = None
  self.query_fixture_state_flag = False
  # Assume the removable media is present; _CheckMountedMedia() clears this
  # flag when writing to the media fails.
  self._mounted_media_flag = True
  self._local_log_dir = '/var/tmp/%s' % test_name
  self._board = self._GetBoard()
  session.console.info('Get Board: %s', self._board)
  self.sensors = None
  self.start_time = None
  self.sensors_ip = None
  # _ReadConfig() fills in sensors_ip and the other config-driven settings.
  self._ReadConfig()
  # May replace sensors_ip with the direct sensors IP from the config.
  self._AssignDirectIPsIfTwoInterfaces()
  self.network_status = self.RefreshNetwork()
  # There are multiple boards running this test now.
  # The log path of a particular board is distinguished by the board name.
  self.aux_log_path = os.path.join('touchscreen_calibration', self._board)
  self._GetSensorService()
  self._ConnectTouchDevice()
  # Only log to the event log when the shopfloor is in use.
  self.log = event_log.Log if self.use_shopfloor else self._DummyLog
  session.console.info('Use shopfloor: %s', str(self.use_shopfloor))
  self.summary_file = None
  self.test_pass = None
  self.min_max_msg = None
  self.num_tx = 0
  self.num_rx = 0
  self.touchscreen_status = False
def _ReadConfig(self):
  """Loads the board config and caches the settings used by this test."""
  self.config = sensors_server.TSConfig(self._board)
  read = self.config.Read

  def _Flag(section, option):
    # Boolean options are stored as the literal string 'True'.
    return read(section, option) == 'True'

  self.use_sensors_server = _Flag('Sensors', 'USE_SENSORS_SERVER')
  self.sn_length = int(read('Misc', 'SN_LENGTH'))
  self.fake_fixture = _Flag('Misc', 'FAKE_FIXTURE')
  self.use_shopfloor = _Flag('Misc', 'USE_SHOPFLOOR')
  self.sensors_ip = read('Sensors', 'SENSORS_IP')
  self.direct_host_ip = read('Sensors', 'DIRECT_HOST_IP')
  self.direct_sensors_ip = read('Sensors', 'DIRECT_SENSORS_IP')
  self.sensors_port = int(read('Sensors', 'SENSORS_PORT'))
def _SetupEnvironment(self):
if not self.network_status:
self.FailTask('Check network status error.')
def _DummyLog(self, *args, **kwargs):
pass
def _GetBoard(self):
"""Get the target board."""
board_path = os.path.join(os.path.dirname(__file__), 'boards', 'board')
if os.path.isfile(board_path):
with open(board_path) as f:
return f.read().strip()
return None
def _GetSensorService(self):
  """Obtains the sensors service and verifies that it responds.

  When the config enables the sensors server, an XML-RPC client is created
  for the configured IP address and port, and PreTest() is invoked on it.
  Otherwise a board-specific sensors object is instantiated locally.
  In both cases the service status is checked, and an alert is shown on
  the UI when the service does not report a good status.

  Raises:
    Error: if the sensors server should be used but no IP is assigned.
  """
  def _ReportStatus(description):
    """Logs the service status; alerts the operator when it is bad."""
    try:
      if self.sensors.CheckStatus():
        session.console.info('Sensors service: %s', description)
        return
      session.console.info('No Sensors service: %s', description)
    except Exception as err:
      session.console.info('No Sensors service (%s): %s', err, description)
    self.ui.Alert(
        _('Fail to detect the touchscreen.\n'
          'Insert the traveler board, and restart the test.'))

  if not self.use_sensors_server:
    # Run the sensors object on the same host as the factory test.
    sensors_class = sensors_server.GetSensorServiceClass(self._board)
    is_setup_phase = self.args.phase == self.PHASE_SETUP_ENVIRONMENT
    self.sensors = sensors_class(
        self.sensors_ip, self.dut,
        remote_bin_root=self.args.remote_bin_root,
        remote_data_dir=self.args.remote_data_dir,
        tool=self.args.tool,
        fw_update_tool=self.args.fw_update_tool,
        hid_tool=self.args.hid_tool,
        fw_file=self.args.fw_file,
        install_flag=is_setup_phase,
        log=session.console)
    _ReportStatus('Use local sensors object.')
    return

  if not self.sensors_ip:
    self.ui.Alert(_('Fail to assign DIRECT_SENSORS_IP in ryu.conf'))
    raise Error('Failed to assign sensors_ip.')
  # Talk to the remote sensors_server at the configured address.
  remote_addr = (self.sensors_ip, self.sensors_port)
  self.sensors = _CreateXMLRPCSensorsClient(addr=remote_addr)
  self.sensors.PreTest()
  _ReportStatus(str(remote_addr))
def _AlertFixtureDisconnected(self):
  """Tells the operator the controller is gone and updates its UI status."""
  self.ui.Alert(_('Disconnected from controller'))
  has_fixture = self.fixture is not None
  self.ui.CallJSFunction('setControllerStatus', has_fixture)
def _CheckFixtureConnection(self):
"""Check if the fixture is still connected."""
if not self.fixture:
self._AlertFixtureDisconnected()
raise fixture.FixtureException('Fixture disconnected.')
def _CheckFixtureStateUp(self):
"""Check if the fixture probe is in the UP state."""
self._CheckFixtureConnection()
if not self.fixture.IsStateUp():
self.ui.Alert(_('Probe not in initial position, aborted'))
raise fixture.FixtureException('Fixture not in UP position.')
def ReadTest(self):
  """Reads the raw delta data once and shows it on the debug panel."""
  if not self.sensors:
    session.console.info('No sensors service found.')
    return
  deltas = self.sensors.Read(self.DELTAS)
  session.console.info('Get data %s', deltas)
  self.ui.CallJSFunction('displayDebugData', deltas)
def ProbeSelfTest(self):
"""Execute the probe self test to confirm the fixture works properly."""
self._CheckFixtureStateUp()
self.DriveProbeDown()
self.DriveProbeUp()
def RefreshFixture(self):
  """Re-creates the fixture object and reports its readiness on the UI.

  A fake fixture is used when the config asks for one; otherwise a serial
  fixture device is opened. Any failure leaves self.fixture as None and
  prompts the operator to check the USB cable. Afterwards the controller
  status is pushed to the UI, an alert is shown if the fixture reports the
  emergency stop state, and the native USB monitor is (re)started.
  """
  try:
    if not self.fake_fixture:
      self.fixture = fixture.FixtureSerialDevice()
    else:
      self.fixture = fixture.FakeFixture(self.ui, state='i')
    if not self.fixture:
      raise fixture.FixtureException(
          'Fail to create the fixture serial device.')
  except Exception as err:
    session.console.info('Refresh fixture serial device exception, %s', err)
    self.ui.Alert(
        _('Please check if the USB cable has been connected '
          'between the test fixture and the control host.\n'
          'Click "RefreshFixture" button on screen after connecting '
          'the USB cable.'))
    self.fixture = None

  # The fixture counts as ready only if it exists and is not stopped.
  if self.fixture:
    is_ready = not self.fixture.IsEmergencyStop()
  else:
    is_ready = False
  self.ui.CallJSFunction('setControllerStatus', is_ready)

  if self.fixture and self.fixture.IsEmergencyStop():
    self.ui.Alert(
        _('The test fixture is not ready.\n'
          '(1) It is possible that the test fixure is not powered on yet.\n'
          ' Turn on the power and click "RefreshFixture" button '
          'on screen.\n'
          '(2) The test fixture is already powered on. '
          'The fixture may be in the emergency stop state.\n'
          ' Press debug button on the test fixture and '
          'click "RefreshFixture" button on screen.'))
  self._CreateMonitorPort()
def RefreshTouchscreen(self):
  """Refreshes the saved touchscreen state and shows it on the UI.

  In the setup-environment phase the touchscreen is probed through the
  sensors service and the result (status and sensor dimensions) is stored
  in the data shelf. In every other phase the stored values are loaded
  back instead. Finally the status is displayed and the serial number is
  fetched, which in turn starts the calibration.
  """
  shelf_keys = ('touchscreen_status', 'num_tx', 'num_rx')
  if self.args.phase != self.PHASE_SETUP_ENVIRONMENT:
    # Reuse what the setup phase has recorded.
    for key in shelf_keys:
      setattr(self, key, state.DataShelfGetValue(key))
  else:
    self.touchscreen_status = False
    try:
      if not self.sensors.CheckStatus():
        session.console.info('touchscreen does not exist')
      else:
        self.num_tx, self.num_rx = self.sensors.GetSensorDimensions()
        self.touchscreen_status = True
        session.console.info('touchscreen exists')
    except Exception as err:
      session.console.info('Exception at refreshing touch screen: %s', err)
    finally:
      # Always persist the outcome, even after a failure.
      for key in shelf_keys:
        state.DataShelfSetValue(key, getattr(self, key))
  session.console.info('tx = %d, rx = %d', self.num_tx, self.num_rx)
  self.ui.CallJSFunction('setTouchscreenStatus', self.touchscreen_status)
  self.GetSerialNumber()
def _AssignDirectIPsIfTwoInterfaces(self):
  """Sets up the direct host/BB link when the host has two interfaces.

  Two host network layouts are supported.
    Case 1: a single network interface. The host and the BB sit on the
      shopfloor subnet and obtain their addresses from a dhcp server.
      Nothing is changed here.
    Case 2: exactly two network interfaces. One connects to the shopfloor
      subnet through dhcp; the other connects the host directly to the BB.
      The interface without an IP is given DIRECT_HOST_IP, and the sensors
      IP is switched to DIRECT_SENSORS_IP. Both values come from the board
      config file.
  More than two interfaces is reported as an error on the console.

  Raises:
    Error: if assigning the direct host IP with ifconfig fails.
  """
  utils = touchscreen_calibration_utils
  self.host_ip_dict = utils.NetworkStatus.GetHostIPs()
  interface_count = len(self.host_ip_dict)
  if interface_count > 2:
    msg = 'There should be no more than 2 network interfaces on the host.'
    session.console.error(msg)
    return
  if interface_count != 2:
    return

  for interface, ip in self.host_ip_dict.items():
    if ip is None:
      # This interface has no address yet: give it the direct host IP.
      cmd = 'ifconfig %s %s' % (interface, self.direct_host_ip)
      if not utils.IsSuccessful(utils.SimpleSystem(cmd)):
        raise Error('Failed to assign direct host ip.')
      session.console.info('Successfully assign direct host ip: %s',
                           self.direct_host_ip)
      self.host_ip_dict[interface] = self.direct_host_ip
      self.sensors_ip = self.direct_sensors_ip
    elif ip == self.direct_host_ip:
      # The direct link is already configured.
      self.sensors_ip = self.direct_sensors_ip
def RefreshNetwork(self):
  """Determines the network status and displays it on the UI.

  In the setup-environment phase the BB and the shopfloor are pinged as
  required by the config, and the results are stored in the data shelf.
  In every other phase the stored results are loaded back instead.

  Returns:
    A truthy value if the host has IPs, the shopfloor is reachable (when
    used) and the BB is reachable (when a sensors IP is set); a falsy
    value otherwise.
  """
  if self.args.phase != self.PHASE_SETUP_ENVIRONMENT:
    bb_status = state.DataShelfGetValue('bb_status')
    shopfloor_status = state.DataShelfGetValue('shopfloor_status')
  else:
    network_status = touchscreen_calibration_utils.NetworkStatus(
        self.sensors_ip, self.args.shopfloor_ip)
    if not self.use_sensors_server:
      bb_status = 'Not used'
    elif network_status.PingBB():
      bb_status = self.sensors_ip
    else:
      bb_status = False
    if not self.use_shopfloor:
      shopfloor_status = 'Skipped for debugging'
    elif network_status.PingShopfloor():
      shopfloor_status = self.args.shopfloor_ip
    else:
      shopfloor_status = False
    state.DataShelfSetValue('bb_status', bb_status)
    state.DataShelfSetValue('shopfloor_status', shopfloor_status)

  session.console.info('host_ips: %s', str(self.host_ip_dict))
  session.console.info('bb_status: %s', bb_status)
  session.console.info('shopfloor_status: %s', shopfloor_status)
  host_ips = str(self.host_ip_dict.values())
  self.ui.CallJSFunction('setHostNetworkStatus', host_ips)
  self.ui.CallJSFunction('setBBNetworkStatus', bb_status)
  self.ui.CallJSFunction('setShopfloorNetworkStatus', shopfloor_status)

  # Equivalent to chaining the three conditions with "and".
  if not self.host_ip_dict:
    return False
  shopfloor_ok = not self.use_shopfloor or shopfloor_status
  if not shopfloor_ok:
    return shopfloor_ok
  return not self.sensors_ip or bb_status
def DriveProbeDown(self):
"""A wrapper to drive the probe down."""
try:
self.fixture.DriveProbeDown()
except Exception:
self.ui.Alert(_('Probe not in the DOWN position, aborted'))
raise
def DriveProbeUp(self):
"""A wrapper to drive the probe up."""
try:
self.fixture.DriveProbeUp()
except Exception:
self.ui.Alert(_('Probe not in the UP position, aborted'))
raise
def _ExecuteCommand(self, command, fail_msg='Failed: '):
"""Execute a command."""
try:
os.system(command)
except Exception as e:
session.console.warn('%s: %s', fail_msg, e)
def _CommandOutputSearch(self, command_str, pattern_str, pattern_flags):
  """Runs a command and extracts the first regex match from its output.

  Args:
    command_str: the command line; it is split on whitespace.
    pattern_str: a regular expression containing at least one group.
    pattern_flags: flags passed to re.compile().

  Returns:
    Group 1 of the first output line that matches, or None if none does.
  """
  matcher = re.compile(pattern_str, pattern_flags)
  output = process_utils.SpawnOutput(command_str.split(), log=True)
  candidates = (matcher.search(line) for line in output.splitlines())
  first_hit = next((hit for hit in candidates if hit), None)
  if first_hit is None:
    return None
  return first_hit.group(1)
def Shutdown(self):
  """Halts the host by issuing the shutdown command."""
  halt_command = 'shutdown -H 0'
  self._ExecuteCommand(halt_command, fail_msg='Failed to shutdown the host')
def _AttachLog(self, log_name, log_data):
"""Attachs the data by Testlog."""
if self.args.keep_raw_logs:
testlog.AttachContent(
content=log_data,
name=log_name,
description='plain text log of %s' % log_name)
def _WriteLog(self, filename, content):
  """Appends the content to a log file and notes it on the console.

  The file lives on the mounted media while the media is in use, and in
  the local log directory otherwise.

  Args:
    filename: the name of the file to append the content to
    content: the text to be appended
  """
  def _AppendTo(directory):
    """Appends the content to filename inside the given directory."""
    target = os.path.join(directory, filename)
    with open(target, 'a') as out:
      out.write(content)
    session.console.info('Log written to "%s/%s".', directory, filename)

  if not self._mounted_media_flag:
    _AppendTo(self._local_log_dir)
    return
  with media_utils.MountedMedia(self.dev_path, 1) as mount_dir:
    _AppendTo(mount_dir)
def _WriteSensorDataToFile(self, logger, sn, phase, test_pass, data):
  """Formats the test result and sensor data, then writes them to a log.

  Args:
    logger: a buffer offering write() and getvalue().
    sn: the serial number; also used as the log file name.
    phase: the name of the test phase.
    test_pass: truthy if the phase passed.
    data: rows of sensor data; iterable rows are written space separated,
        anything else is written as is.
  """
  verdict = 'Pass' if test_pass else 'Fail'
  logger.write('%s: %s %s\n' % (phase, sn, verdict))
  for row in data:
    if not isinstance(row, collections.Iterable):
      logger.write('%s\n' % str(row))
      continue
    logger.write(' '.join(str(val) for val in row))
    logger.write('\n')
  self._WriteLog(sn, logger.getvalue())
def _GetTime(self):
"""Get the time format like 2014_1225.10:35:20"""
return time.strftime('%Y_%m%d.%H:%M:%S')
def _CheckSerialNumber(self, sn):
"""Check if the serial number is legitimate."""
# This is for development purpose.
if sn == '0000':
return True
if len(sn) != self.sn_length:
self.ui.Alert(_('Wrong serial number!'))
return False
return True
def _UpdateSummaryFile(self, sn, summary_line):
  """Appends a line to the per-serial-number summary file.

  Lines accumulate when a device is tested several times. A non-blank line
  gets the current time appended; a blank line is written unchanged.
  The file name is remembered in self.summary_file for FinishTest.

  Args:
    sn: the serial number that names the summary file.
    summary_line: the text to append.
  """
  self.summary_file = 'summary_%s.txt' % sn
  is_blank = not summary_line.strip()
  if not is_blank:
    summary_line = '%s (time: %s)\n' % (summary_line, self._GetTime())
  self._WriteLog(self.summary_file, summary_line)
def _ReadAndVerifyTRxData(self, sn, phase, category, verify_method):
  """Reads one set of TRx data, verifies it and records the outcome.

  Args:
    sn: the serial number of the touchscreen under test.
    phase: the name of the test phase.
    category: the TRx data category to read.
    verify_method: called as verify_method(data, category); its result is
        stored in self.test_pass.
  """
  # Fetch the data of the requested category and show it for debugging.
  trx_data = self.sensors.ReadTRx(category)
  self.ui.CallJSFunction('displayDebugData', trx_data)
  session.console.debug('%s: get %s data: %s', phase, category, trx_data)
  self.Sleep(1)

  # Let the verify_method decide whether the data is good.
  self.test_pass = verify_method(trx_data, category)
  session.console.info('Invoked verify_method: %s', verify_method.func_name)

  # Record the data and the verdict in the log file, the event log,
  # Testlog and the summary file.
  buf = StringIO.StringIO()
  self._WriteSensorDataToFile(buf, sn, phase, self.test_pass, trx_data)
  self.log('touchscreen_calibration', sn=sn, phase=phase,
           test_pass=self.test_pass, sensor_data=str(trx_data))
  params = (('phase', phase),
            ('test_pass', self.test_pass),
            ('sensor_data', trx_data))
  for param_name, param_value in params:
    testlog.LogParam(param_name, param_value)
  self._AttachLog('touchscreen_calibration.log', str(trx_data))
  verdict = 'pass' if self.test_pass else 'fail'
  self._UpdateSummaryFile(sn, '%s: %s (%s)' % (sn, verdict, phase))

  if self.test_pass:
    return
  self.FailTask('%s failed' % phase)
def _ReadAndVerifySensorData(self, sn, phase, category, verify_method):
  """Reads one frame of sensor data, verifies it and records the outcome.

  Args:
    sn: the serial number of the touchscreen under test.
    phase: the name of the test phase.
    category: the data category to read, i.e., REFS or DELTAS.
    verify_method: called as verify_method(data); expected to return
        (test_pass, failed_sensors, min_value, max_value).
  """
  # Fetch the data of the requested category and show it for debugging.
  frame = self.sensors.Read(category)
  self.ui.CallJSFunction('displayDebugData', frame)
  session.console.debug('%s: get %s data: %s', phase, category, frame)
  self.Sleep(1)

  # Let the verify_method decide whether the data is good.
  verdict = verify_method(frame)
  self.test_pass, bad_sensors, lowest, highest = verdict
  session.console.info('Invoked verify_method: %s', verify_method.func_name)
  for bad_sensor in bad_sensors:
    session.console.debug('Failed sensor at (%d, %d) value %d', *bad_sensor)
  session.console.info('Number of failed sensors: %d', len(bad_sensors))
  session.console.info('(min, max): (%d, %d)', lowest, highest)

  # Record the data and the verdict in the log file, the event log,
  # Testlog and the summary file.
  buf = StringIO.StringIO()
  self._WriteSensorDataToFile(buf, sn, phase, self.test_pass, frame)
  self.log('touchscreen_calibration', sn=sn, phase=phase,
           test_pass=self.test_pass, sensor_data=str(frame))
  params = (('phase', phase),
            ('test_pass', self.test_pass),
            ('sensor_data', frame))
  for param_name, param_value in params:
    testlog.LogParam(param_name, param_value)
  self._AttachLog('touchscreen_calibration.log', str(frame))
  outcome = 'pass' if self.test_pass else 'fail'
  self._UpdateSummaryFile(
      sn,
      '%s: %s (%s) [min: %d, max: %d]' % (sn, outcome, phase, lowest,
                                           highest))
  # The touched-deltas phase is followed by a separating line.
  if phase == 'PHASE_DELTAS_TOUCHED':
    self._UpdateSummaryFile(sn, '\n')

  if self.test_pass:
    return
  self.FailTask(
      '[min, max] of phase %s: [%d, %d]' % (phase, lowest, highest))
def _FlashFirmware(self, sn, phase):
  """Flashes the firmware through the sensors service and records the result.

  Args:
    sn: the serial number of the touchscreen under test.
    phase: the name of the test phase, used in the summary line.
  """
  wanted_version = self.args.fw_version
  wanted_config = self.args.fw_config
  firmware_path = self.args.fw_file
  flashed = self.sensors.FlashFirmware(wanted_version, wanted_config)
  outcome = 'pass' if flashed else 'fail'
  self._UpdateSummaryFile(
      sn,
      '%s: %s (%s) flashed fw %s:%s' % (sn, outcome, phase,
                                        self.args.fw_version,
                                        self.args.fw_config))
  if not flashed:
    self.FailTask('Fail to flash firmware: %s' % firmware_path)
    return
  session.console.info('Have flashed %s to %s', firmware_path, sn)
def _CheckFirmwareVersion(self, sn, phase):
  """Compares the device firmware version and config with the expected ones.

  Args:
    sn: the serial number of the touchscreen under test.
    phase: the name of the test phase, used in the summary line.
  """
  fw_version, fw_config = self.sensors.ReadFirmwareVersion()
  session.console.info('firmware version %s:%s', fw_version, fw_config)
  version_matches = fw_version == self.args.fw_version
  config_matches = fw_config == self.args.fw_config
  test_pass = version_matches and config_matches
  outcome = 'pass' if test_pass else 'fail'
  self._UpdateSummaryFile(
      sn,
      '%s: %s (%s) detected base fw %s:%s' % (sn, outcome, phase,
                                              fw_version, fw_config))
  if test_pass:
    return
  self.FailTask(
      'Firmware version failed. Expected %s:%s, but got %s:%s' %
      (self.args.fw_version, self.args.fw_config, fw_version, fw_config))
def _DoTest(self, sn, phase):
  """The actual calibration method; runs the steps of one test phase.

  Args:
    sn: the serial number of the touchscreen under test
    phase: the test phase, one of the PHASE_* constants of this class.
        Any other value is ignored after the serial number check.
  """
  session.console.info('Start testing SN %s for phase %s', sn, phase)
  if not self._CheckSerialNumber(sn):
    return

  # The three TRx phases differ only in the data category they read.
  trx_categories = {
      self.PHASE_TRX_OPENS: self.TRX_OPENS,
      self.PHASE_TRX_GND_SHORTS: self.TRX_GND_SHORTS,
      self.PHASE_TRX_SHORTS: self.TRX_SHORTS,
  }

  if phase == self.PHASE_SETUP_ENVIRONMENT:
    self._SetupEnvironment()
  elif phase == self.PHASE_FLASH_FIRMWARE:
    self._FlashFirmware(sn, phase)
  elif phase == self.PHASE_CHECK_FIRMWARE_VERSION:
    self._CheckFirmwareVersion(sn, phase)
  elif phase == self.PHASE_REFS:
    # Dump one frame of the baseline refs data before the probe touches the
    # panel, and verify the uniformity.
    self._ReadAndVerifySensorData(
        sn, phase, self.REFS, self.sensors.VerifyRefs)
  elif phase == self.PHASE_DELTAS_UNTOUCHED:
    # Dump delta values a few times before the probe touches the panel.
    for unused_time in range(self.dump_frames):
      self._ReadAndVerifySensorData(
          sn, phase, self.DELTAS, self.sensors.VerifyDeltasUntouched)
  elif phase in trx_categories:
    # Read the TRx test data and verify the test result.
    self._ReadAndVerifyTRxData(sn, phase, trx_categories[phase],
                               self.sensors.VerifyTRx)
  elif phase == self.PHASE_DELTAS_TOUCHED:
    # Dump delta values after the probe has touched the panel.
    # This test involves controlling the test fixture.
    self._CheckFixtureStateUp()
    if not self.sensors.PreRead():
      session.console.error('Failed to execute PreRead().')
    try:
      self.DriveProbeDown()
      try:
        # Wait a while to let the probe touch the panel stably.
        self.Sleep(10 if self.fake_fixture else 1)
        self._ReadAndVerifySensorData(
            sn, phase, self.DELTAS, self.sensors.VerifyDeltasTouched)
      finally:
        # Always lift the probe again, even when the verification fails,
        # so that the fixture is not left pressing on the panel.
        self.DriveProbeUp()
    finally:
      # PostRead() pairs with the PreRead() above and must not be skipped
      # when a step in between raises.
      if not self.sensors.PostRead():
        session.console.error('Failed to execute PostRead().')
def FinishTest(self):
  """Finish the test and do cleanup if needed.

  This method is invoked only for Ryu to do final calibration.

  The summary file is attached through Testlog. It is read from the place
  where _WriteLog() stores it: the mounted media while the media is in
  use, the local log directory otherwise. A missing summary file or a
  missing fixture is reported with a warning instead of aborting, so that
  the remaining cleanup steps still run.
  """
  def _ReadSummary(log_dir):
    """Returns the content of the summary file located in log_dir."""
    with open(os.path.join(log_dir, self.summary_file)) as f:
      return f.read()

  if not self.summary_file:
    session.console.warn('No summary file has been written yet.')
  else:
    try:
      if self._mounted_media_flag:
        with media_utils.MountedMedia(self.dev_path, 1) as mount_dir:
          summary = _ReadSummary(mount_dir)
      else:
        summary = _ReadSummary(self._local_log_dir)
      self._AttachLog('summary.log', summary)
    except Exception as e:
      session.console.warn('Failed to attach the summary file %s: %s',
                           self.summary_file, e)

  self.sensors.PostTest()
  # The fixture is None when it has never been refreshed or got
  # disconnected.
  if self.fixture:
    self.fixture.DriveProbeUpDone()
  else:
    session.console.warn('No fixture to notify about the end of the test.')
def _ConnectTouchDevice(self):
"""Make sure that the touch device is connected to the machine and
the touch kernel module is inserted properly.
"""
if not self.sensors.CheckStatus():
# The kernel module is inserted, but the touch device is not connected.
self.sensors.PostTest()
self.ui.Alert(
_('Fail to detect the touchscreen.\n'
'Insert the traveler board, and restart the test.'))
return False
return True
def _InsertAndDetectTouchKernelModule(self):
"""Insert the touch kernel module and make sure it is detected."""
if (not self.sensors.kernel_module.Insert() or
not self.sensors.kernel_module.IsDeviceDetected()):
self.sensors.kernel_module.Remove()
session.console.error('Failed to insert the kernel module: %s.',
self.sensors.kernel_module.name)
self.ui.Alert(
_('Fail to detect the touchscreen.\n'
'Remove and re-insert the traveler board. And restart the test.'))
return False
return True
def GetSerialNumber(self):
  """Reads the serial number from device data and starts the calibration."""
  serial_number = device_data.GetSerialNumber()
  self.ui.CallJSFunction('fillInSerialNumber', serial_number)
  start_event = Event({'sn': serial_number})
  self.StartCalibration(start_event)
def StartCalibration(self, event):
  """Launches the calibration of the current phase in a worker thread.

  Nothing is started while a previous calibration thread is still alive or
  when the event carries no serial number.

  Args:
    event: the triggering event; event.data['sn'] holds the serial number.

  Raises:
    Error: if the touch device cannot be detected.
  """
  running = self._calibration_thread
  if running and running.isAlive():
    self.ui.Alert(_('Current calibration has not completed yet'))
    return

  if not self._ConnectTouchDevice():
    raise Error('Cannot detect the touch device.')

  sn = event.data.get('sn', '')
  if not sn:
    self.ui.Alert(_('Please enter SN first'))
    self.ui.CallJSFunction('displayDebugData', [])
    return

  worker = threading.Thread(target=self._DoTest,
                            args=[sn, self.args.phase])
  self._calibration_thread = worker
  worker.start()
def _RegisterEvent(self, event):
  """Binds a UI event to the method of this object with the same name.

  Args:
    event: the event name; a method with this name must exist.
  """
  assert hasattr(self, event)
  session.console.debug('Registered event %s', event)

  def _Dispatch(unused_event):
    # Look the method up at call time and invoke it without arguments.
    return getattr(self, event)()

  self.event_loop.AddEventHandler(event, _Dispatch)
def _MakeLocalLogDir(self):
if not os.path.isdir(self._local_log_dir):
try:
os.makedirs(self._local_log_dir)
except Exception as e:
msg = 'Failed to create the local log directory %s: %s'
session.console.warn(msg, self._local_log_dir, e)
def _CheckMountedMedia(self):
  """Probes the mounted media by writing the test launch time to it.

  If the write fails, logging switches to the local log directory, which
  is created when necessary, and the launch time is written there instead.
  """
  launch_file = 'touchscreen_calibration_launch.txt'
  try:
    self._WriteLog(launch_file, '%s\n' % time.ctime())
  except Exception:
    # From now on _WriteLog() targets the local log directory.
    self._mounted_media_flag = False
    msg = 'Mounted media does not exist. Use %s instead.'
    session.console.warn(msg, self._local_log_dir)
    self._MakeLocalLogDir()
    self._WriteLog(launch_file, '%s\n' % time.ctime())
def QueryFixtureState(self):
"""Query the fixture internal state including all sensor values."""
if self.fixture.native_usb:
try:
self.fixture.native_usb.QueryFixtureState()
self.query_fixture_state_flag = True
except Exception as e:
session.console.warn('Failed to query fixture state: %s', e)
def _MonitorNativeUsb(self, native_usb):
  """Keeps polling the native USB port and reports state changes.

  The complete state is reported after a fixture state query; otherwise
  only the values that changed are reported. The 'state' value is also
  shown on the UI as the probe state. This method never returns.

  Args:
    native_usb: the native USB object of the fixture.
  """
  self.ui.CallJSFunction('showProbeState', 'N/A')
  self.QueryFixtureState()
  self.Sleep(0.5)
  while True:
    native_usb.GetState()
    if not self.query_fixture_state_flag:
      changes = native_usb.DiffState()
    else:
      changes = native_usb.CompleteState()
      self.query_fixture_state_flag = False
    if not changes:
      continue
    session.console.info('Internal state:')
    for name, value in changes:
      if name == 'state':
        try:
          self.ui.CallJSFunction('showProbeState', value)
        except Exception:
          msg = 'Not able to invoke CallJSFunction to show probe state.'
          session.console.warn(msg)
      session.console.info(' %s: %s', name, value)
def _CreateMonitorPort(self):
"""Create a thread to monitor the native USB port."""
if self.fixture and self.fixture.native_usb:
try:
self._monitor_thread = process_utils.StartDaemonThread(
target=self._MonitorNativeUsb, args=[self.fixture.native_usb])
except threading.ThreadError:
session.console.warn('Cannot start thread for _MonitorNativeUsb()')
def runTest(self):
  """Registers the UI event handlers and waits for the task to end."""
  os.environ['DISPLAY'] = ':0'
  self.start_time = self._GetTime()
  self._CheckMountedMedia()

  # Events that are emitted from buttons on the factory UI.
  button_events = (
      'ReadTest', 'RefreshFixture', 'RefreshTouchscreen', 'ProbeSelfTest',
      'DriveProbeDown', 'DriveProbeUp', 'Shutdown', 'QueryFixtureState',
      'RefreshNetwork',
  )
  # Events that are emitted from other callback functions.
  callback_events = ('FinishTest',)
  for event in button_events + callback_events:
    self._RegisterEvent(event)

  # StartCalibration needs the event payload, so it is bound directly.
  self.event_loop.AddEventHandler('StartCalibration', self.StartCalibration)
  self.ui.BindKeyJS('D', 'toggleDebugPanel();')
  self.WaitTaskEnd()