blob: 276e0c5cdb49bd67208c4b46ac88d86402fe0308 [file] [log] [blame]
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
""" Remote classes for touch devices on full DUTs being tested.
The subclasses in this file are groups as RemoteInSystemTouchDevices, which
use a lot of the same code. The way these devices are accesses is by remotely
connecting to the laptop/tablet/phone/etc that the touchs device is part of
and using command line tools to query the events. For instance, on ChromeOS
"ssh" is used to instigate the remote connection and "evtest" is run on the
remote device to query the touch events.
New device classes should be added as subclasses here if they are already
installed in a working computer system with an OS that offers a remote terminal.
"""
import inspect
import os
import re
import select
import stat
from subprocess import PIPE, Popen
import mt
from mt.input import linux_input
from remote import RemoteTouchDevice
class RemoteInSystemTouchDevice(RemoteTouchDevice):
""" This class impliments much of the shared functionality among in-system
remote touch devices. As such, specific implimentations should derive from
this class.
"""
FLUSH_TIMEOUT = 0.1
def __init__(self, addr, is_touchscreen=False, protocol='auto',
device_num=None):
RemoteTouchDevice.__init__(self)
self.addr = addr
self.is_touchscreen = is_touchscreen
self.event_stream_process = None
self.begin_event_stream_cmd = self.begin_event_stream_cmd or ''
self.most_recent_snapshot = None
self.flush_timeout = self.FLUSH_TIMEOUT
self.pressure_src = mt.MtStateMachine.PRESSURE_FROM_MT_PRESSURE
# Probe to find out which device we want to connect to on the DUT
self.device_num = device_num
if self.device_num is None:
self.device_num = self._GetDeviceNumber(self.is_touchscreen)
if self.device_num is None:
print 'ERROR: Unable to determind the device number!'
# Spawn the event gathering subprocess
self._InitializeEventGatheringSubprocess()
# Determine the ranges/resolutions of the various attributes of fingers
x, y, p = self._GetDimensions()
self._x_min, self._x_max, self._x_res = x['min'], x['max'], x['resolution']
self._y_min, self._y_max, self._y_res = y['min'], y['max'], y['resolution']
self._p_min, self._p_max = p.get('min', None), p.get('max', None)
# Determine which kind of state machine should be used with this device
self.protocol = self._GetMtProtocol() if protocol == 'auto' else protocol
if self.protocol == mt.MTA:
self.state_machine = mt.MtaStateMachine(self.pressure_src)
elif self.protocol == mt.MTB:
self.state_machine = mt.MtbStateMachine(self.pressure_src)
else:
self.state_machine = mt.StylusStateMachine(self.pressure_src)
def _CheckForAlternativePressureSources(self, p, touch_major):
""" Given dimensional dictionaries for the min/max values for p and touch
major, this function returns the best guess of what this touch device
should use for "pressure." If p is already known, simply return that,
otherwise try to use the touch_major values instead, and finally as a last
resort just assume a constant pressure of 1.
"""
if p:
return p
if touch_major:
self.Warn('This device does NOT report "pressure". Using the contact '
'size (TOUCH_MAJOR) instead, so we can continue!')
self.pressure_src = mt.MtStateMachine.PRESSURE_FROM_MT_TOUCH_MAJOR
return touch_major
self.Warn('This device does NOT report "pressure" and no substitue '
'was found. Using a constant value of 1 so we can continue!')
self.pressure_src = mt.MtStateMachine.PRESSURE_FROM_NOWHERE
return {'min': 0, 'max': 1}
def _InitializeEventGatheringSubprocess(self):
""" Initiate a stream of MTB events from the DUT
This function starts up an ongoing connection over which we can
receive MTB events on stdout. After calling this, repeated calls to
NextEvent() will return each event as they arrive.
Returns True on success, False it something went wrong
"""
# Initiate the streaming connection
self.event_stream_process = self._RunRemoteCmd(
self.begin_event_stream_cmd % self.device_num)
# Check to make sure it didn't terminate immediately
ret_code = self.event_stream_process.poll()
if ret_code is not None:
print 'ERROR: streaming terminated unexpectedly (%d)' % ret_code
return False
# Block until there's *something* to read, indicating everything is ready
readable, _, _, = select.select([self.event_stream_process.stdout], [], [])
return self.event_stream_process.stdout in readable
def _GetMtProtocol(self):
""" This function determines the Multitouch protocol used by this device.
For most devices mtb is used, but for a few devices mta has been
encountered.
This function is called for all devices in the constructor and needs to
be implimented by the subclass. If a particular class of devices will
always use one protocol this function can simply return that value without
contacting the device at all
"""
raise NotImplementedError(RemoteTouchDevice.not_implemented_msg)
def _ParseMtEvent(self, line):
""" Specifies how to parse a line of output into an MtEvent object
This function should take in a single line of output from stdout and
return None in the case the input does not contain an MT event or
the MtEvent object that it represents.
"""
raise NotImplementedError(RemoteTouchDevice.not_implemented_msg)
def _RunRemoteCmd(self, cmd):
""" This function should run cmd on the remote device.
Depending on the device type this may be implemented different ways.
For instance, on ChromeOS devices this would use SSH but on Android
it might used adb instead.
"""
raise NotImplementedError(RemoteTouchDevice.not_implemented_msg)
def _NextEvent(self, timeout=None):
""" Wait for and capture the next MT event
Once a connection has been initiated with BeginEventStream() this
function can be called to block until another MT event arrives
from the DUT. If the subprocess has stopped, None is returned,
otherwise a string with the MT event is returned.
"""
event = None
while not event:
if not self.event_stream_process:
return None
line = self._GetNextLine(timeout)
if not line:
return None
event = self._ParseMtEvent(line)
return event
def _GetNextLine(self, timeout=None):
if timeout:
inputs = [self.event_stream_process.stdout]
readable, _, _, = select.select(inputs, [], [], timeout)
if inputs[0] not in readable:
return None
line = self.event_stream_process.stdout.readline()
# If the event_stream_process had been terminated, just return None.
if self.event_stream_process is None:
return None
if line == '' and self.event_stream_process.poll() != None:
self.event_steam_process = None
return None
return line
def __del__(self):
""" Stops the stream of events
There are two steps.
Step 1: Kill the remote process; otherwise, it becomes a zombie process.
Step 2: Kill the local ssh/adb subprocess.
This terminates the subprocess that's maintaining the connection
with the DUT and returns its return code.
"""
# Step 1: kill the remote process; otherwise, it becomes a zombie process.
killing_process = self._RunRemoteCmd(self.kill_remote_process_cmd)
killing_process.wait()
# Step 2: Kill the local ssh/adb subprocess.
# If self.event_stream_process has been terminated, its value is None.
if self.event_stream_process is None:
return None
# Kill the subprocess if it is still alive with return_code as None.
return_code = self.event_stream_process.poll()
if return_code is None:
self.event_stream_process.terminate()
return_code = self.event_stream_process.wait()
if return_code is None:
print 'Error in killing the event_stream_process!'
self.event_stream_process = None
return return_code
def _GetDeviceNumber(self, is_touchscreen):
""" Collects the device ID for the touchpad/screen we're connecting to
This is so when we try to start streaming events we can make sure we're
connected to the right device on the DUT.
"""
def GetAllDeviceNumberMappings():
# This file contains the mappings so we can determine which event file
output = self._RunRemoteCmd('cat /proc/bus/input/devices').stdout.read()
# Build up a dictionary mapping device names -> device numbers
mappings = {}
current_name = None
for line in output.split('\n'):
line = str.replace(line, '\r', '')
matches = re.match('^N:\s+Name="(.+)"\s*$', line)
if matches:
current_name = matches.group(1)
matches = re.match('^H:\s+Handlers=.*event(\d+).*$', line, re.M)
if matches:
mappings[int(matches.group(1))] = current_name
return mappings
# First find all the device names/numbers on the DUT
device_names_by_number = GetAllDeviceNumberMappings()
# Ask the user to pick an input device from the list provided by evtest.
# It's difficult/impossible to autodetect this reliably
print 'Please select the correct device from the list below'
sorted_devices = sorted(device_names_by_number.items(), key=lambda x:x[0])
for device_number, device_name in sorted_devices:
print '%d: "%s"' % (device_number, device_name)
# Loop until the user makes a valid device selection for us.
selection = None
while selection not in device_names_by_number:
if selection:
print 'Error: That is not a legal device number'
selection = raw_input('Enter the device number you wish to test: ')
try:
selection = int(selection)
except:
pass
return selection
class AndroidTouchDevice(RemoteInSystemTouchDevice):
def __init__(self, addr, is_touchscreen=False, protocol='auto',
device_num=None):
self.begin_event_stream_cmd = 'getevent -tv /dev/input/event%d'
self.kill_remote_process_cmd = (
'for f in `ls /proc/`; do'
' grep getevent /proc/$f/cmdline &>/dev/null;'
' if [ "$?" == "0" -a "$f" != "self" ]; then'
' kill $f;'
' fi;'
'done')
RemoteInSystemTouchDevice.__init__(self, addr, is_touchscreen, protocol,
device_num)
def _GetMtProtocol(self):
""" Android devices usually use mtb, but for some devices they still use
mta. You can tell the difference by looking at the events that the touch
device emits. MTA devices use SYN_MT_REPORT events to delimit finger data
but that event isn't used at all in MTB. By reading events from the device
until a full frame has been read, we should be able to reliably determine
which protocol is being used by the presence of these events.
"""
print ('\033[1;32mPlease touch the screen so we may determine which '
'protocol this Android device uses.\033[0m')
# Discard events until the start of the next snapshot in case we started
# in the middle of one
while not self._NextEvent().is_SYN_REPORT:
pass
# Now check all the events of this next snapshot for the signs of MTA
event = self._NextEvent()
while not event.is_SYN_REPORT():
if event.is_SYN_MT_REPORT():
print 'MTA selected by the presence of SYN_MT_REPORT events'
return mt.MTA
event = self._NextEvent()
print 'MTB selected by the absence of SYN_MT_REPORT events'
return mt.MTB
def _GetResolution(self):
""" Android devices don't report their touchscreen resolution to the
kernel, but instead have their resolution tied to the LCD resolution.
To determine this we can pull down their LCD resolution and convert
from px/in to px/mm to remain consistent with other devices.
The output of Android's "getprop" command outputs a list of properties
that look like this:
...
[ro.setupwizard.mode]: [OPTIONAL]
[ro.sf.lcd_density]: [320]
[ro.somc.customerid]: [110]
...
We simply run that command on the DUT and then look for the line that
contains the "lcd_density" and parse out the value.
"""
MM_PER_INCH_CONVERSION = 25.4
cmd = 'getprop'
pattern = '^.*\[ro.sf.lcd_density\]:\s*\[(\d*)\].*$'
output = self._RunRemoteCmd(cmd).stdout.read()
for line in output.split('\n'):
matches = re.match(pattern, line, re.M)
if matches:
return float(matches.group(1)) / MM_PER_INCH_CONVERSION
return None
def _GetDimensions(self):
""" Get the dimensions of the touch device by using the getevent -p
tool. This program prints the range of each value that the touch
device will return, so this checks the range for X/Y positions and
pressure and returns them.
Running "getevent -p" on Android will output the details for the values
the device will output as shown below. The first number is a code that
specifies which parameter it is describing, then it has other information
about it such as min/max/etc.
...
0035 : value 0, min 0, max 1080, fuzz 0, flat 0, resolution 0
0036 : value 0, min 0, max 1920, fuzz 0, flat 0, resolution 0
003a : value 0, min 0, max 181, fuzz 0, flat 0, resolution 0
...
This function finds the line for X, Y, and pressure values and parses them
out to get their ranges.
Note: Android doesn't seem to report the X/Y resolution in the typical
fashion here, so it is computed in the function _GetResolution() as a
separate step before returning. To see how we compute resolution
information for Android, look in that function.
"""
x = y = p = touch_major = None
cmd = 'getevent -p /dev/input/event%d' % self.device_num
output = self._RunRemoteCmd(cmd).stdout.read()
for line in output.split('\n'):
pattern = '^.*([0-9a-f]{4})\s+:\s+.*min (\d*),\s+max (\d*),.*$'
matches = re.match(pattern, line, re.M)
if not matches:
continue
code = int(matches.group(1), 16)
min_value = int(matches.group(2))
max_value = int(matches.group(3))
if code in [linux_input.ABS_MT_POSITION_X, linux_input.ABS_X]:
x = {'min': min_value, 'max': max_value}
elif code in [linux_input.ABS_MT_POSITION_Y, linux_input.ABS_Y]:
y = {'min': min_value, 'max': max_value}
elif code in [linux_input.ABS_MT_PRESSURE, linux_input.ABS_PRESSURE]:
p = {'min': min_value, 'max': max_value}
elif code is linux_input.ABS_MT_TOUCH_MAJOR:
touch_major = {'min': min_value, 'max': max_value}
p = self._CheckForAlternativePressureSources(p, touch_major)
resolution = self._GetResolution()
x['resolution'] = resolution
y['resolution'] = resolution
return x, y, p
def _ToSignedInt(self, x, bits):
return x if x & (1 << (bits - 1)) == 0 else x - (1 << bits)
def _ParseMtEvent(self, line):
""" How to parse out the Android-specific event format
Android events look like this:
[ 583187.983948] 0003 003a 00000080
timestamp type code value
Note: Some versions of Android will include the device name as well:
[ 124555.734127] /dev/input/event2: 0003 0036 0000024d
"""
pattern = ('^\[\s*(\d+.\d+)\](.+):?' +
'([0-9a-f]+)\s+([0-9a-f]+)\s+([0-9a-f]+)\s*$')
matches = re.match(pattern, line, re.M | re.I)
if not matches:
return None
if ('event' in matches.group(2) and
'event%d' % self.device_num not in matches.group(2)):
return None
timestamp = float(matches.group(1))
event_type = int(matches.group(3), 16)
event_code = int(matches.group(4), 16)
value = self._ToSignedInt(int(matches.group(5), 16), bits=32)
return mt.MtEvent(timestamp, event_type, event_code, value)
def _RunRemoteCmd(self, cmd):
""" Run a command on the shell of a remote Android DUT """
args = ['adb', 'shell', cmd]
if self.addr:
args = ['adb', '-s', self.addr, 'shell', cmd]
return Popen(args, shell=False, stdin=PIPE, stdout=PIPE, stderr=PIPE)
class ChromeOSTouchDevice(RemoteInSystemTouchDevice):
def __init__(self, addr, is_touchscreen=False, protocol='auto', grab=True,
device_num=None):
program = '/usr/local/bin/evtest %s ' % ('--grab' if grab else '')
self.begin_event_stream_cmd = program + '/dev/input/event%d'
self.kill_remote_process_cmd = 'killall evtest'
RemoteInSystemTouchDevice.__init__(self, addr, is_touchscreen, protocol,
device_num)
# Stop ChromeOS's power daemon to prevent the laptop going to sleep
self._RunRemoteCmd('stop powerd')
def _GetMtProtocol(self):
""" ChromeOS devices generally use mtb, so we can simply return that. In
the rare case where this is invalid, the operator will know and can
specify the protocol manually. """
print ('\033[1;32mAssuming MTB is used. If this is a different kind, '
'please rerun the script with --protocol to specify.\033[0m')
return mt.MTB
def _GetDimensions(self):
""" Evtest simply outputs the legal ranges of values for X/Y/etc at
the beginning of its output. So instead of issuing a new remote
command to figure out the dimensions, this function simply consumes
the first few lines of output (before actual events are reported)
and parses out the ranges.
The first thing that evtest outputs is a bunch of information about
the values the device will produce. Among that information are blocks
that looks like this describing the range of x/y/etc values.
...
Event code 53 (ABS_MT_POSITION_X)
Value 0
Min 0
Max 3029
Resolution 31
...
This function finds them and parses out the max values and resolutions for
both X and Y values, in addition to the maximum value for pressure.
"""
def _GetDimensions_Aux(required_attributes):
""" This helper function parses out the ranges for a single dimension """
dim = {}
while not all([(attrib in dim) for attrib in required_attributes]):
line = self._GetNextLine()
matches = re.match('^\s*(\S+)\s*(\d+).*$', line)
if matches:
attribute = matches.group(1).lower()
value = int(matches.group(2))
if attribute in required_attributes:
dim[attribute] = value
return dim
x = y = p = touch_major = None
while not all([x, y, p]):
line = self._GetNextLine(timeout=2)
if line is None:
break
if '(ABS_MT_POSITION_X)' in line or '(ABS_X)' in line:
x = _GetDimensions_Aux(['min', 'max', 'resolution'])
elif '(ABS_MT_POSITION_Y)' in line or '(ABS_Y)' in line:
y = _GetDimensions_Aux(['min', 'max', 'resolution'])
elif '(ABS_MT_PRESSURE)' in line or '(ABS_PRESSURE)' in line:
p = _GetDimensions_Aux(['min', 'max'])
elif 'Event code 48 (ABS_MT_TOUCH_MAJOR)' in line:
touch_major = _GetDimensions_Aux(['min', 'max'])
p = self._CheckForAlternativePressureSources(p, touch_major)
return x, y, p
def _ParseMtEvent(self, line):
""" How to parse out the ChromeOS-specific event format
ChromeOS events look like this:
Event: time 1416420362.444933, type 3 (EV_ABS), code 1 (ABS_Y), value 599
Event: time 1416420362.425098, -------------- SYN_REPORT ------------
"""
matches = re.match('^Event: time (\d+.\d+), (.*)$', line)
if matches:
timestamp = float(matches.group(1))
contents = matches.group(2)
else:
return None
if 'SYN_REPORT' in contents:
return mt.MtEvent(timestamp, linux_input.EV_SYN, linux_input.SYN_REPORT)
pattern = '^.*type (\d+) .*, code (\d+) .*, value (-?[0-9a-fA-F]+).*$'
matches = re.match(pattern, contents)
if matches is None:
print 'ERROR: Unable to parse event!'
print '\tline: (%s)' % line
print '\ttimestamp: (%f)' % timestamp
print '\tcontents: (%s)' % contents
event_type = int(matches.group(1))
event_code = int(matches.group(2))
# evtest reports all values in decimal except 2 events, which are in hex
# Depending on the event type and code, parse using a different base
value_base = 10
if (event_type == linux_input.EV_MSC and
event_code in [linux_input.MSC_RAW, linux_input.MSC_SCAN]):
value_base = 16
value = int(matches.group(3), value_base)
return mt.MtEvent(timestamp, event_type, event_code, value)
def _RunRemoteCmd(self, cmd):
""" Run a command on the shell of a remote ChromeOS DUT """
RSA_KEY_PATH = os.path.dirname(
os.path.realpath(inspect.getfile(
inspect.currentframe()))) + '/data/testing_rsa'
if stat.S_IMODE(os.stat(RSA_KEY_PATH).st_mode) != 0600:
os.chmod(RSA_KEY_PATH, 0600)
args = ['ssh', 'root@%s' % self.addr,
'-i', RSA_KEY_PATH,
'-o', 'UserKnownHostsFile=/dev/null',
'-o', 'StrictHostKeyChecking=no',
cmd]
return Popen(args, shell=False, stdin=PIPE, stdout=PIPE, stderr=PIPE)