blob: 06b3209827d9e148ff2319735fe83464df4f48cc [file] [log] [blame]
# Copyright 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from __future__ import print_function
import asyncore
import factory_common # pylint: disable=unused-import
from cros.factory.utils import process_utils
from cros.factory.external import evdev
def GetDevices():
"""Gets all the input devices.
Returns:
A list of evdev.InputDevice() instances of the input devices.
"""
return [evdev.InputDevice(d) for d in evdev.list_devices()]
def FilterEvdevEcodes(dev, cnf):
"""Check if the capabilities of the device satisfy that of the CNF
Args:
dev: evdev.InputDevice
cnf: list of lists of evdev.ecodes
Returns:
True if dev satisfies cnf
"""
caps = set(dev.capabilities().get(evdev.ecodes.EV_KEY, []))
for clause in cnf:
if set(clause) & caps == set():
return False
return True
def IsLidEventDevice(dev):
"""Check if a device is with EV_SW and SW_LID capabilities.
Args:
dev: evdev.InputDevice
Returns:
True if dev is a lid event device.
"""
return evdev.ecodes.SW_LID in dev.capabilities().get(evdev.ecodes.EV_SW, [])
def IsTabletEventDevice(dev):
"""Check if a device is with EV_SW and SW_TABLET_MODE capabilities.
Args:
dev: evdev.InputDevice
Returns:
True if dev is a tablet event device.
"""
return evdev.ecodes.SW_TABLET_MODE in dev.capabilities().get(
evdev.ecodes.EV_SW, [])
def IsKeyboardDevice(dev):
"""Check if a device is with EV_KEY and KEY_ENTER capabilities.
Args:
dev: evdev.InputDevice
Returns:
True if dev is a keyboard device.
"""
keys = {
evdev.ecodes.KEY_ENTER,
evdev.ecodes.KEY_LEFTCTRL,
evdev.ecodes.KEY_LEFTALT
}
caps = set(dev.capabilities().get(evdev.ecodes.EV_KEY, []))
return keys.issubset(caps)
def SendKeys(key_sequence):
"""Sends the given key sequence through uinput.
Args:
key_sequence: A list of keys to send. For the list of valid key events, see
evdev.ecodes module.
"""
uinput = evdev.UInput()
for k in key_sequence:
uinput.write(evdev.ecodes.EV_KEY, k, 1)
for k in key_sequence:
uinput.write(evdev.ecodes.EV_KEY, k, 0)
uinput.syn()
uinput.close()
def IsTouchDevice(dev):
"""Check if a device is a touch device.
Args:
dev: evdev.InputDevice
Returns:
True if dev is a touch device.
"""
keycaps = dev.capabilities().get(evdev.ecodes.EV_KEY, [])
return evdev.ecodes.BTN_TOUCH in keycaps
def IsStylusDevice(dev):
"""Check if a device is a stylus device.
Args:
dev: evdev.InputDevice
Returns:
True if dev is a stylus device.
"""
return FilterEvdevEcodes(dev, [[
evdev.ecodes.BTN_STYLUS,
evdev.ecodes.BTN_STYLUS2,
evdev.ecodes.BTN_TOOL_PEN]])
def IsTouchpadDevice(dev):
"""Check if a device is a touchpad device.
Args:
dev: evdev.InputDevice
Returns:
True if dev is a touchpad device.
"""
keycaps = dev.capabilities().get(evdev.ecodes.EV_KEY, [])
return (evdev.ecodes.BTN_TOUCH in keycaps and
evdev.ecodes.BTN_MOUSE in keycaps)
def IsTouchscreenDevice(dev):
"""Check if a device is a touchscreen device.
Args:
dev: evdev.InputDevice
Returns:
True if dev is a touchscreen device.
"""
return (not IsTouchpadDevice(dev) and
evdev.ecodes.ABS_MT_SLOT in dict(
dev.capabilities().get(evdev.ecodes.EV_ABS, [])))
class DeviceNotFoundError(RuntimeError):
pass
class MultipleDevicesFoundError(RuntimeError):
pass
def FindDevice(*args):
"""Find a device that match all arguments.
Args:
Each argument should be None (skipped), int (event id), str (pattern to
search in evdev name), or a filter function with domain evdev.InputDevice.
Returns:
An evdev.InputDevice
"""
candidates = GetDevices()
for item in args:
# pylint: disable=cell-var-from-loop
if item is None:
continue
if isinstance(item, int):
dev_filter = lambda dev: dev.fn == '/dev/input/event%d' % item
elif isinstance(item, str):
if item in evdev.ecodes.__dict__:
dev_filter = lambda dev: FilterEvdevEcodes(
dev, [[evdev.ecodes.__dict__[item]]])
else:
dev_filter = lambda dev: item in dev.name
elif callable(item):
dev_filter = item
else:
raise ValueError('Invalid argument %r' % item)
candidates = list(filter(dev_filter, candidates))
if len(candidates) == 1:
return candidates[0]
elif not candidates:
raise DeviceNotFoundError("Can't find device.")
else:
raise MultipleDevicesFoundError('Not having exactly one candidate!')
def DeviceReopen(dev):
"""Reopen a device so that the event buffer is cleared.
Args:
dev: evdev.InputDevice
Returns:
A different evdev.InputDevice of the same device but with empty event
buffer.
"""
return evdev.InputDevice(dev.fn)
class InputDeviceDispatcher(asyncore.file_dispatcher):
"""Extends asyncore.file_dispatcher to read input device."""
def __init__(self, device, event_handler):
self.device = device
self.event_handler = event_handler
asyncore.file_dispatcher.__init__(self, device)
def handle_read(self):
# Spec - https://docs.python.org/2/library/asyncore.html mentions about
# that recv() may raise socket.error with EAGAIN or EWOULDBLOCK, even
# though select.select() or select.poll() has reported the socket ready
# for reading.
#
# We have the similar issue here; the buffer might be still empty when
# reading from an input device even though asyncore calls handle_read().
# As a result, we call read_one() here because it will return None when
# buffer is empty. On the other hand, if we call read() and iterate the
# returned generator object then an IOError - EAGAIN might be thrown but
# this behavior is not documented so can't leverage on it.
while True:
event = self.device.read_one()
if event is None:
break
self.event_handler(event)
def writable(self):
return False
def StartDaemon(self):
"""Start a daemon thread forwarding events to event_handler."""
process_utils.StartDaemonThread(target=asyncore.loop)