blob: 0ffc9f0371eaba0d817553531b0f2281a207a7d4 [file] [log] [blame]
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from collections import namedtuple
from event import MtEvent, EV_ABS, ABS_MT_TRACKING_ID
# A single contact (finger or stylus tip) as reported in one snapshot.
MtFinger = namedtuple('MtFinger', (
    'tid',          # tracking ID identifying this contact
    'syn_time',     # timestamp of the SYN_REPORT that produced it
    'x',
    'y',
    'pressure',
    'tilt_x',
    'tilt_y',
    'touch_major',
    'touch_minor',
))

# The complete state of the device at the time of one SYN_REPORT.
MtSnapshot = namedtuple('MtSnapshot', (
    'syn_time',        # timestamp of the SYN_REPORT
    'button_pressed',  # state of the physical click button
    'fingers',         # list of MtFinger tuples
    'raw_events',      # the events consumed to build this snapshot
))
class MtStateMachine(object):
  """ This is an abstract base class that defines the interface of a multitouch
  state machine.  This class should never be instantiated directly, but rather
  any actual state machines should derive from this class to guarantee that it
  conforms to the same interfaces.

  Subclasses are expected to override add_event() and get_current_snapshot();
  the versions defined here only raise NotImplementedError.
  """

  # The legal values for the pressure_src argument of __init__().
  PRESSURE_FROM_MT_PRESSURE = 'from_mt_pressure'
  PRESSURE_FROM_MT_TOUCH_MAJOR = 'from_mt_touch_major'
  PRESSURE_FROM_NOWHERE = 'from_nowhere'
  PRESSURE_SOURCES = [PRESSURE_FROM_MT_PRESSURE, PRESSURE_FROM_MT_TOUCH_MAJOR,
                      PRESSURE_FROM_NOWHERE]

  def __init__(self, pressure_src):
    """ Setup the parts of the state machine that are uniform across all MT
    protocol state machines.

    Currently this is only one option: pressure source.  This tells the state
    machine where to get the "pressure" readings from.  While the default
    way is to use MT_PRESSURE messages, some touchpads/screens use the
    MT_TOUCH_MAJOR message instead or do not report it at all.  This parameter
    allows you to specify which you want the state machine to use.

    Args:
      pressure_src: one of the values listed in PRESSURE_SOURCES.

    Raises:
      ValueError: if pressure_src is not one of PRESSURE_SOURCES.
    """
    if pressure_src not in MtStateMachine.PRESSURE_SOURCES:
      # Wrap the value in a 1-tuple so that a tuple argument is still
      # formatted as a single value instead of being unpacked by '%'.
      raise ValueError('The pressure source "%s" is not valid.' %
                       (pressure_src,))
    self.pressure_src = pressure_src

  def add_event(self, event):
    """ Feed a single event into the state machine.  Must be overridden. """
    raise NotImplementedError('Error: add_event() not implemented.')

  def get_current_snapshot(self, request_data_ready=True):
    """ Return the most recent snapshot.  Must be overridden. """
    raise NotImplementedError('Error: get_current_snapshot() not implemented.')
class MtaStateMachine(MtStateMachine):
  """ The state machine for MTA events

  This class accepts MtEvents via the add_event() member function and maintains
  the state that an MTA system would.  MTA is slightly different than MTB, but
  uses similar events.  Without delta compression, MTA can get rid of the
  concept of slots, since all the finger data is reported before each SYN_REPORT
  and the fingers themselves are separated by SYN_MT_REPORTs instead of a
  new SLOT.

  Note that because our system expects new fingers to have new TIDs and in MTA
  this is not the case (they reuse TIDs all the time) the MtaStateMachine
  keeps track of which TIDs have already been used and remaps them when
  it detects a repeated TID to an unused value to maintain transparency
  between MTB and MTA.
  """

  def __init__(self, pressure_src=MtStateMachine.PRESSURE_FROM_MT_PRESSURE):
    """ Create an empty MTA state machine.

    Args:
      pressure_src: one of MtStateMachine.PRESSURE_SOURCES, selecting where
          the reported finger pressure comes from.

    Raises:
      ValueError: if pressure_src is not a valid pressure source.
    """
    # This dictionary maps the raw (non-unique) tids reported in the events to
    # new (unique) tids so we can easily differentiate any two fingers even if
    # there is a gap of time in between them.
    self.tid_mappings = {}
    self.next_unique_tid = 0

    self.button_pressed = False
    self.last_snapshot = None
    self._clear()

    MtStateMachine.__init__(self, pressure_src)

  def _clear(self):
    """ Clear out the state accumulated for the report currently in flight. """
    # Finished fingers of the current report, stored as tuples of
    # (raw_tid, x, y, pressure, touch_major, touch_minor).
    self.fingers_data = []
    self.raw_events_since_last_syn = []
    # Values seen so far for the finger currently being described.
    self.tid = self.x = self.y = self.pressure = self.touch_major \
             = self.touch_minor = None

  def add_event(self, event):
    """ Update the internal state with the input MtEvent.

    A SYN_MT_REPORT finishes the description of one finger and a SYN_REPORT
    finishes the whole report, at which point a new snapshot is built and
    becomes available through get_current_snapshot().
    """
    self.raw_events_since_last_syn.append(event)

    # If this is the end of the information for all the fingers for this report
    if event.is_SYN_REPORT():
      # First build up the snapshot
      fingers = []
      for tid, x, y, p, touch_major, touch_minor in self.fingers_data:
        mapped_tid = self.tid_mappings[tid]
        # Tilt is not tracked by this state machine, so it is reported as 0.
        fingers.append(MtFinger(mapped_tid, event.timestamp, x, y,
                                p, 0, 0, touch_major, touch_minor))
      self.last_snapshot = MtSnapshot(event.timestamp, self.button_pressed,
                                      fingers, self.raw_events_since_last_syn)

      # Delete the tid mappings for any leaving fingers
      # Find all the raw tids that have currently been reported.  The raw tid
      # is read by index so that this does not depend on how many values are
      # stored per finger.
      raw_tids_used = set(finger_data[0] for finger_data in self.fingers_data)
      # Detect any mappings that are for fingers no longer on the pad
      leaving_raw_tids = [raw_tid for raw_tid in self.tid_mappings
                          if raw_tid not in raw_tids_used]
      # Remove those out-dated mappings from the mapping dictionary
      for raw_tid in leaving_raw_tids:
        del self.tid_mappings[raw_tid]

      # Then clear out the fingers_data since this starts a new snapshot
      self._clear()

    # If this is the end of a finger's information stash all the data about it
    # until the end of the data for all the fingers
    elif event.is_SYN_MT_REPORT():
      if self.pressure_src == MtStateMachine.PRESSURE_FROM_MT_TOUCH_MAJOR:
        pressure = self.touch_major
      elif self.pressure_src == MtStateMachine.PRESSURE_FROM_NOWHERE:
        pressure = 1
      else:
        pressure = self.pressure

      # Only record the finger if everything required to describe it was seen.
      if all(v is not None for v in (self.tid, self.x, self.y, pressure)):
        # The touch dimensions are optional and default to 0 when unreported.
        touch_major = 0
        touch_minor = 0
        if self.touch_major is not None:
          touch_major = self.touch_major
        if self.touch_minor is not None:
          touch_minor = self.touch_minor
        self.fingers_data.append((self.tid, self.x, self.y, pressure,
                                  touch_major, touch_minor))

    # Otherwise, check what information it's giving us and store it
    elif event.is_ABS_MT_TRACKING_ID():
      self.tid = event.value
      # A raw tid with no current mapping belongs to a new finger, so give it
      # a fresh unique tid.
      if self.tid not in self.tid_mappings:
        self.tid_mappings[self.tid] = self.next_unique_tid
        self.next_unique_tid += 1
    elif event.is_ABS_MT_POSITION_X():
      self.x = event.value
    elif event.is_ABS_MT_POSITION_Y():
      self.y = event.value
    elif event.is_ABS_MT_PRESSURE():
      self.pressure = event.value
    elif event.is_ABS_MT_TOUCH_MAJOR():
      self.touch_major = event.value
    elif event.is_ABS_MT_TOUCH_MINOR():
      self.touch_minor = event.value
    elif event.is_BTN_LEFT():
      self.button_pressed = (event.value == 1)

  def get_current_snapshot(self, request_data_ready=True):
    """ Return the snapshot built at the last SYN_REPORT, or None if there
    has not been one yet.  The request_data_ready argument is not used here.
    """
    return self.last_snapshot
class MtbStateMachine(MtStateMachine):
  """ The state machine for MTB events.

  It traces the slots, tracking IDs, x coordinates, y coordinates, etc.  If
  these values are not changed explicitly, the values are kept across events.

  Note that the kernel driver only reports what is changed.  Due to its
  internal state machine, it is possible that either x or y for a slot
  is None initially even though the instance has been created.
  """
  # Tracking ID assigned to the current slot when the event stream does not
  # begin with an ABS_MT_TRACKING_ID event.
  DUMMY_TRACKING_ID = 999999

  def __init__(self, pressure_src=MtStateMachine.PRESSURE_FROM_MT_PRESSURE):
    """ Create an empty MTB state machine.

    Args:
      pressure_src: one of MtStateMachine.PRESSURE_SOURCES, selecting where
          the reported finger pressure comes from.

    Raises:
      ValueError: if pressure_src is not a valid pressure source.
    """
    # Set the default slot to 0 as it may not be displayed in the MT events
    #
    # Some abnormal event files may not display the tracking ID in the
    # beginning.  To handle this situation, we need to initialize
    # the per-slot state below before any event arrives.
    #
    # As an example, refer to the following event file which is one of
    # the golden samples with this problem.
    #   tests/data/stationary_finger_shift_with_2nd_finger_tap.dat
    self.slot = 0
    self.slots_in_use = set()

    # Per-slot state: each dictionary is keyed by slot number.
    self.tid = {}
    self.x = {}
    self.y = {}
    self.distance = {}
    self.pressure = {}
    self.touch_major = {}
    self.touch_minor = {}

    self.syn_time = None
    self.leaving_slots = []
    self.button_pressed = False
    self.raw_events_since_last_syn = []
    self.last_snapshot = None
    self.is_first_event = True

    MtStateMachine.__init__(self, pressure_src)

  def add_event(self, event):
    """ Update the internal states with the input MtEvent """
    # If the stream does not start with a tracking ID, inject a dummy one
    # first.  The nested call sees an ABS_MT_TRACKING_ID event and therefore
    # does not recurse any further.
    if self.is_first_event and not event.is_ABS_MT_TRACKING_ID():
      self.add_event(MtEvent(event.timestamp,
                             EV_ABS, ABS_MT_TRACKING_ID,
                             MtbStateMachine.DUMMY_TRACKING_ID))
    self.is_first_event = False

    self.raw_events_since_last_syn.append(event)

    # Note: The physical click button is not associated with any finger/slot
    if event.is_BTN_LEFT():
      self.button_pressed = (event.value == 1)

    # Handle all the finger-related updates below here
    # Switch the slot.
    elif event.is_ABS_MT_SLOT():
      self.slot = event.value

    # Update tracking ID, noting if the slot is no longer used
    elif event.is_ABS_MT_TRACKING_ID():
      self.tid[self.slot] = event.value
      if event.value == MtEvent.LEAVING_TRACKING_ID:
        self.slots_in_use.discard(self.slot)
      else:
        self.slots_in_use.add(self.slot)

    # Update the x, y, pressure, etc values for a given slot
    elif event.is_ABS_MT_POSITION_X():
      self.x[self.slot] = event.value
    elif event.is_ABS_MT_POSITION_Y():
      self.y[self.slot] = event.value
    elif event.is_ABS_MT_PRESSURE():
      self.pressure[self.slot] = event.value
    elif event.is_ABS_MT_TOUCH_MAJOR():
      self.touch_major[self.slot] = event.value
    elif event.is_ABS_MT_TOUCH_MINOR():
      self.touch_minor[self.slot] = event.value
    elif event.is_ABS_MT_DISTANCE():
      self.distance[self.slot] = event.value

    # Use the SYN_REPORT time as the snapshot time
    elif event.is_SYN_REPORT():
      self.syn_time = event.timestamp
      self.last_snapshot = self._build_current_snapshot()
      # Rebind rather than clear in place: the snapshot that was just built
      # keeps a reference to the old list.
      self.raw_events_since_last_syn = []

  def _build_current_snapshot(self):
    """Build current packet's data including x, y, pressure, and
    the syn_time for all tids.

    Returns:
      An MtSnapshot holding one MtFinger for every slot in use that is not
      hovering and has all of its required values available.
    """
    current_fingers = []
    for slot in self.slots_in_use:
      tid = self.tid.get(slot, None)
      x = self.x.get(slot, None)
      y = self.y.get(slot, None)
      distance = self.distance.get(slot, None)
      # The touch dimensions are optional and default to 0 when unreported.
      major = self.touch_major.get(slot, 0)
      minor = self.touch_minor.get(slot, 0)

      # Skip any fingers that are hovering (they have a distance)
      if distance:
        continue

      if self.pressure_src == MtStateMachine.PRESSURE_FROM_MT_TOUCH_MAJOR:
        pressure = self.touch_major.get(slot, None)
      elif self.pressure_src == MtStateMachine.PRESSURE_FROM_NOWHERE:
        pressure = 1
      else:
        pressure = self.pressure.get(slot, None)

      # Only report the finger once every required value has been seen.
      data_ready = all(v is not None for v in
                       (x, y, pressure, tid, self.syn_time))
      if data_ready:
        # Tilt is not tracked by this state machine, so it is reported as 0.
        finger = MtFinger(tid, self.syn_time, x, y, pressure, 0, 0,
                          major, minor)
        current_fingers.append(finger)

    current_snapshot = MtSnapshot(self.syn_time, self.button_pressed,
                                  current_fingers,
                                  self.raw_events_since_last_syn)
    return current_snapshot

  def get_current_snapshot(self, request_data_ready=True):
    """ Return the snapshot built at the last SYN_REPORT, or None if there
    has not been one yet.

    The request_data_ready argument is accepted so that the signature matches
    MtStateMachine.get_current_snapshot(); it is not used here.
    """
    return self.last_snapshot
class StylusStateMachine(MtStateMachine):
  """ State machine for parsing simplified stylus protocol.

  Only a single stylus is supported.  Hovering reports are ignored, so
  snapshots are only produced while the stylus is touching the surface, plus
  one empty snapshot right after it stops touching.
  """

  def __init__(self, pressure_src):
    """ Create a stylus state machine with no position known yet.

    Args:
      pressure_src: one of MtStateMachine.PRESSURE_SOURCES.
    """
    # Most recent position, pressure and tilt reported by the stylus.
    self.x = None
    self.y = None
    self.p = None
    self.tilt_x = None
    self.tilt_y = None
    self.last_snapshot = None

    # Tool/contact flags reported by the device.
    self.btn_touch = False
    self.btn_tool_pen = False
    self.is_hovering = False

    self.raw_events_since_last_syn = []
    self.last_report_was_empty = True

    MtStateMachine.__init__(self, pressure_src)

  def __update_hover_state(self):
    """ Recompute is_hovering from the current touch and tool flags. """
    if self.btn_touch:
      self.is_hovering = False
    else:
      self.is_hovering = self.btn_tool_pen

  def add_event(self, event):
    """ Update the internal state with the input event.

    On a SYN_REPORT a new snapshot is stored if the stylus is touching, or if
    it just stopped touching (an empty snapshot); otherwise the stored
    snapshot is reset to None.
    """
    self.raw_events_since_last_syn.append(event)

    if event.is_ABS_X():
      self.x = event.value
    elif event.is_ABS_Y():
      self.y = event.value
    elif event.is_ABS_TILT_X():
      self.tilt_x = event.value
    elif event.is_ABS_TILT_Y():
      self.tilt_y = event.value
    elif event.is_ABS_PRESSURE():
      self.p = event.value
    elif event.is_BTN_TOUCH():
      self.btn_touch = (event.value == 1)
      self.__update_hover_state()
    elif event.is_BTN_TOOL_PEN():
      self.btn_tool_pen = (event.value == 1)
      self.__update_hover_state()
      # The pen tool was released, so forget its last position and pressure.
      if event.value == 0:
        self.x = None
        self.y = None
        self.p = None
    elif event.is_SYN_REPORT():
      if self.btn_touch:
        # The stylus is on the surface: report it as a single finger.
        contacts = [MtFinger(0, event.timestamp, self.x, self.y, self.p,
                             self.tilt_x, self.tilt_y, 0, 0)]
        self.last_report_was_empty = False
      elif not self.last_report_was_empty:
        # The stylus just stopped touching: report one empty snapshot.
        contacts = []
        self.last_report_was_empty = True
      else:
        # Nothing to report.  The raw events are kept so that they end up in
        # the next snapshot that does get built.
        self.last_snapshot = None
        return

      self.last_snapshot = MtSnapshot(event.timestamp, False, contacts,
                                      self.raw_events_since_last_syn)
      self.raw_events_since_last_syn = []

  def get_current_snapshot(self, request_data_ready=True):
    """ Return the snapshot stored by the last SYN_REPORT (possibly None). """
    return self.last_snapshot