| # Copyright 2015 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """ Remote class for touch devices using Aardvark adapter. |
| |
| At this moment, the only subclass implemented is ElanAardvarkTouchPad, |
| however, it is expected to add more vendors soon. |
| The class RemoteAardvarkTouchDevice should contain all the codes that will stay |
| the same for all touch devices. |
| |
| New device classes should be added as subclasses here if they are not already |
| connected to the computer using an Ardvark i2c to USB adaptor. |
| """ |
| |
| from __future__ import absolute_import |
| from __future__ import division |
| from __future__ import print_function |
| |
| import inspect |
| import os |
| try: |
| import queue |
| except ImportError: |
| import Queue as queue |
| import re |
| import select |
| import sys |
| from subprocess import PIPE, Popen |
| import time |
| from importlib import import_module |
| |
| from . import mt |
| from .mt.input import linux_input |
| from .remote import RemoteTouchDevice |
| |
| class RemoteAardvarkTouchDevice(RemoteTouchDevice): |
| """ This class impliments much of the shared functionality among remote |
| touch devices that are connected to the system with an Aardvark adapter. |
| As such, specific implimentations should derive from this class. |
| """ |
| |
| # This flag indicates that 10 bit address is used |
| I2C_NO_FLAGS = 0x00 |
| |
| # Constants |
| I2C_BITRATE = 400 |
| AARDVARK_FLUSH_TIMEOUT = 0.5 |
| |
| def __init__(self, addr, |
| pressure_src = mt.MtStateMachine.PRESSURE_FROM_MT_PRESSURE): |
| try: |
| self.adapter = import_module('aardvark_py') |
| except ImportError: |
| print('The files "aardvark_py.py" and "aardvark.so" are needed.') |
| print('You can download these files from the following link.') |
| print('https://www.totalphase.com/downloadable/download/sample/sample_id/16/') |
| sys.exit() |
| |
| RemoteTouchDevice.__init__(self) |
| |
| self._addr = addr |
| self.state_machine = mt.MtaStateMachine(pressure_src) |
| self.flush_timeout = self.AARDVARK_FLUSH_TIMEOUT |
| |
| self._Initialize_Aardvark() |
| |
| self.adapter.aa_sleep_ms(200) |
| self._Initialize_Device() |
| |
| # Determine the ranges/resolutions of the various attributes of fingers |
| x, y, p = self._GetDimensions() |
| self._x_min, self._x_max, self._x_res = x['min'], x['max'], x['resolution'] |
| self._y_min, self._y_max, self._y_res = y['min'], y['max'], y['resolution'] |
| self._p_min, self._p_max = p['min'], p['max'] |
| # Currently no Aardvark-compatible touch devices report tilt events, so |
| # these are just set to 0. |
| self._tx_min, self._tx_max = 0, 0 |
| self._ty_min, self._ty_max = 0, 0 |
| |
| def __del__(self): |
| # Turning off the device |
| print('Turning off') |
| self.adapter.aa_target_power(self._handle, |
| self.adapter.AA_TARGET_POWER_NONE) |
| |
| def _Initialize_Aardvark(self): |
| self._port = self._detect_port() |
| handle = self.adapter.aa_open(self._port) |
| self._handle = handle |
| if handle <= 0: |
| print('Unable to open Aardvark device on port') |
| print('Error code = %d' % handle) |
| sys.exit() |
| # Ensure that the I2C subsystem is enabled |
| self.adapter.aa_configure(handle, self.adapter.AA_CONFIG_SPI_I2C) |
| # Enable the I2C bus pullup resistors (2.2k resistors). |
| self.adapter.aa_i2c_pullup(handle, self.adapter.AA_I2C_PULLUP_BOTH) |
| # Enable the Aardvark adapter's power supply. |
| self.adapter.aa_target_power(handle, self.adapter.AA_TARGET_POWER_BOTH) |
| # Set the bitrate |
| bitrate = self.adapter.aa_i2c_bitrate(handle, self.I2C_BITRATE) |
| |
| def _detect_port(self): |
| (num, ports, unique_ids) = self.adapter.aa_find_devices_ext(16, 16) |
| for port in ports: |
| # Determine if the device is in-use |
| if (port & self.adapter.AA_PORT_NOT_FREE): |
| port &= ~self.adapter.AA_PORT_NOT_FREE |
| return port |
| print('No Aardvark devices found.') |
| sys.exit() |
| |
| def _Initialize_Device(self): |
| """ This function should initialize the TP or TS device. |
| Depending on the device type this should be implemented different ways. |
| It mainly depend on the protocols and the device firmware. |
| """ |
| raise NotImplementedError(RemoteTouchDevice.not_implemented_msg) |
| |
| def _i2c_write(self, data): |
| data_len = 4 if (data & 0xffff0000) else 2 |
| data_out = self.adapter.array_u08(data_len) |
| for byte_num in range(data_len): |
| data_out[byte_num] = (data & (0xff << 8 * byte_num)) >> (8 * byte_num) |
| retVal = self.adapter.aa_i2c_write(self._handle, |
| self._addr, self.I2C_NO_FLAGS, data_out) |
| if retVal != data_len: |
| print('Error: Number of bytes written: %d' % retVal) |
| print(' Make sure that the peripheral address is correct') |
| print(' Peripheral address: %d' % self._addr) |
| sys.exit() |
| else: |
| return retVal |
| |
| def _cut_data(self, tmp_data, num_bytes): |
| start_index = -1 |
| end_index = -1 |
| for i in xrange(num_bytes): |
| if tmp_data[i] != 255: |
| start_index = i |
| break |
| if start_index != -1: |
| for i in xrange(num_bytes): |
| if tmp_data[num_bytes - i - 1] != 255: |
| end_index = num_bytes - i - 1 |
| break |
| if end_index == -1: |
| return 0, num_bytes - 1, tmp_data |
| return start_index, end_index , tmp_data[start_index:end_index + 1] |
| |
| def _i2c_read(self, num_bytes): |
| temp_len, tmp_data = self.adapter.aa_i2c_read(self._handle, self._addr, |
| self.I2C_NO_FLAGS, |
| self.adapter.array_u08(num_bytes)) |
| s, e, data = self._cut_data(tmp_data, num_bytes) |
| if e == num_bytes - 1 and s != 0: |
| temp_len, tmp_data = self.adapter.aa_i2c_read(self._handle, self._addr, |
| self.I2C_NO_FLAGS, |
| self.adapter.array_u08(num_bytes)) |
| s, e, data_extension = self._cut_data(tmp_data, num_bytes) |
| return len(data) + len(data_extension), data + data_extension |
| return len(data), data |
| |
| class ElanTouchDevice(RemoteAardvarkTouchDevice): |
| I2C_ETP_ADDRESS = 21 |
| |
| RESET_CMD = 0x01000005 |
| DEVICE_DESC_CMD = 0x0001 |
| REPORT_DESC_CMD = 0x0002 |
| ABS_MODE_CMD = 0x00010300 |
| WAKE_UP_CMD = 0x80000005 |
| PRODUCT_ID_CMD = 0x0101 |
| FW_VERSION_CMD = 0x0102 |
| SM_VERSION_CMD = 0x0103 |
| IAP_VERSION_CMD = 0x0110 |
| X_MAX_CMD = 0x0106 |
| Y_MAX_CMD = 0x0107 |
| RESOLUTION_CMD = 0x0108 |
| I2C_XY_TRACENUM_CMD = 0x0105 |
| EVENT_LEN = 34 |
| |
| REPORT_DESC_LEN = 158 |
| DEVICE_DESC_LEN = 30 |
| MAX_FINGERS = 5 |
| MAX_PRESSURE = 255 |
| FWIDTH_REDUCE = 90 |
| |
| REPORT_ID = 0x5D |
| BTN_CLICK_MASK = 0x01 |
| |
| REPORT_ID_INDEX = 2 |
| TAP_INFO_INDEX = 3 |
| I2C_FINGER_DATA_OFFSET = 4 |
| FINGER_DATA_LEN = 5 |
| |
| def __init__(self, addr): |
| RemoteAardvarkTouchDevice.__init__(self, ElanTouchDevice.I2C_ETP_ADDRESS) |
| self._eventQ = queue.Queue() |
| |
| self._last_event_time = time.time() |
| self._event_started = False |
| def _CheckReturnVal(self, val, expected, description): |
| if val != expected: |
| print('Error: The device does not reply to get', description, 'command.') |
| sys.exit() |
| |
| def _Initialize_Device(self): |
| # Elan RESET |
| self._i2c_write(ElanTouchDevice.RESET_CMD) |
| # Waiting for the device to return resets and sends the confirmation |
| self.adapter.aa_sleep_ms(100) |
| # Reset flag receive |
| self._i2c_read(2) |
| # Get device description |
| self._i2c_write(ElanTouchDevice.DEVICE_DESC_CMD) |
| desc_len, desc_data = self._i2c_read(ElanTouchDevice.DEVICE_DESC_LEN) |
| self._CheckReturnVal(desc_len, |
| ElanTouchDevice.DEVICE_DESC_LEN, 'description') |
| |
| # Get report description |
| self._i2c_write(ElanTouchDevice.REPORT_DESC_CMD) |
| report_len, report_data = self._i2c_read(ElanTouchDevice.REPORT_DESC_LEN) |
| # Absolute mode activation |
| self._i2c_write(ElanTouchDevice.ABS_MODE_CMD) |
| # Send wake up command |
| self._i2c_write(ElanTouchDevice.WAKE_UP_CMD) |
| # Get Product ID |
| self._i2c_write(ElanTouchDevice.PRODUCT_ID_CMD) |
| product_id_len, product_id = self._i2c_read(2) |
| self._CheckReturnVal(product_id_len, 2, 'product ID') |
| self._product_id = product_id[0] |
| # Get FW version |
| self._i2c_write(ElanTouchDevice.FW_VERSION_CMD) |
| fw_len, fw_version = self._i2c_read(2) |
| self._CheckReturnVal(fw_len, 2, 'FW version') |
| self._fw_version = fw_version[0] |
| # Get SM version |
| self._i2c_write(ElanTouchDevice.SM_VERSION_CMD) |
| sm_len, sm_version = self._i2c_read(2) |
| self._CheckReturnVal(sm_len, 2, 'SM version') |
| self._sm_version = sm_version[0] |
| # Get IAP version |
| self._i2c_write(ElanTouchDevice.IAP_VERSION_CMD) |
| iap_len, iap_version = self._i2c_read(2) |
| self._CheckReturnVal(iap_len, 2, 'IAP version') |
| self._iap_version = iap_version[0] |
| |
| def _GetDimensions(self): |
| self._i2c_write(ElanTouchDevice.I2C_XY_TRACENUM_CMD) |
| data_len, data_val = self._i2c_read(2) |
| self.trace_x = data_val[0] - 1 |
| self.trace_y = data_val[1] - 1 |
| |
| # Get X min and max |
| self._i2c_write(ElanTouchDevice.X_MAX_CMD) |
| data_len, max_x = self._i2c_read(2) |
| self._CheckReturnVal(data_len, 2, 'Max X') |
| |
| self._max_x = (0x0f & max_x[1]) << 8 | max_x[0] |
| x = {'min': 0, 'max': self._max_x} |
| |
| # Get Y Min and Max |
| self._i2c_write(ElanTouchDevice.Y_MAX_CMD) |
| data_len, max_y = self._i2c_read(2) |
| self._CheckReturnVal(data_len, 2, 'Max Y') |
| |
| self._max_y = (0x0f & max_y[1]) << 8 | max_y[0] |
| y = {'min': 0, 'max': self._max_y} |
| |
| self._i2c_write(ElanTouchDevice.RESOLUTION_CMD) |
| data_len, resolution = self._i2c_read(2) |
| self._CheckReturnVal(data_len, 2, 'resolution') |
| |
| self._x_resolution = self._ConvertResolution(resolution[0]) |
| self._y_resolution = self._ConvertResolution(resolution[1]) |
| x['resolution'] = self._x_resolution |
| y['resolution'] = self._y_resolution |
| |
| p = {'min': 0, 'max': ElanTouchDevice.MAX_PRESSURE} |
| return x, y, p |
| |
| def _NextEvent(self, timeout=None): |
| if not self._eventQ.empty(): |
| return self._eventQ.get() |
| timestamp = time.time() |
| self._event_started = False |
| while True: |
| input_len, data = self._i2c_read(ElanTouchDevice.EVENT_LEN) |
| if input_len < ElanTouchDevice.EVENT_LEN: |
| continue |
| if data[ElanTouchDevice.REPORT_ID_INDEX] != ElanTouchDevice.REPORT_ID: |
| print(' Incompatible format') |
| print(' Maybe your device is not isolated and has noise') |
| else: |
| if time.time() - timestamp > timeout: |
| timestamp = time.time() |
| self._event_started = False |
| return None |
| tp_info = data[ElanTouchDevice.TAP_INFO_INDEX] |
| # 5 most significant bits of tp_info are mapped to fingers |
| if tp_info & 0xf8: |
| break |
| |
| self._event_started = True |
| tp_info = data[ElanTouchDevice.TAP_INFO_INDEX] |
| btn_click = (tp_info & ElanTouchDevice.BTN_CLICK_MASK) |
| finger_offset = ElanTouchDevice.I2C_FINGER_DATA_OFFSET |
| |
| for i in xrange(0, ElanTouchDevice.MAX_FINGERS): |
| finger_on = (tp_info >> (3 + i)) & 0x01 |
| if finger_on: |
| pos_x = (((data[0 + finger_offset] & 0xf0) << 4) | |
| data[1 + finger_offset]) |
| pos_y = (((data[0 + finger_offset] & 0x0f) << 8) | |
| data[2 + finger_offset]) |
| pos_y = self._max_y - pos_y |
| mk_x = (data[3 + finger_offset] & 0x0f) |
| mk_y = (data[3 + finger_offset] >> 4) |
| pressure = data[4 + finger_offset] |
| area_x = mk_x * ((self._max_x/self.trace_x) |
| - ElanTouchDevice.FWIDTH_REDUCE) |
| area_y = mk_y * ((self._max_y/self.trace_y) |
| - ElanTouchDevice.FWIDTH_REDUCE) |
| major = max(area_x, area_y) |
| event_type = 3 |
| self._eventQ.put(mt.MtEvent(timestamp, linux_input.EV_ABS, |
| linux_input.ABS_MT_TRACKING_ID, i)) |
| self._eventQ.put(mt.MtEvent(timestamp, linux_input.EV_ABS, |
| linux_input.ABS_MT_POSITION_X, pos_x)) |
| self._eventQ.put(mt.MtEvent(timestamp, linux_input.EV_ABS, |
| linux_input.ABS_MT_POSITION_Y, pos_y)) |
| self._eventQ.put(mt.MtEvent(timestamp, linux_input.EV_ABS, |
| linux_input.ABS_MT_PRESSURE, pressure)) |
| self._eventQ.put(mt.MtEvent(timestamp, linux_input.EV_ABS, |
| linux_input.ABS_MT_TOUCH_MAJOR, major)) |
| finger_offset += ElanTouchDevice.FINGER_DATA_LEN |
| self._eventQ.put(mt.MtEvent(timestamp, linux_input.EV_SYN, |
| linux_input.SYN_MT_REPORT,0)) |
| self._eventQ.put(mt.MtEvent(timestamp, linux_input.EV_SYN, |
| linux_input.SYN_REPORT,0)) |
| return self._eventQ.get() |
| |
| def _ConvertResolution(self, val): |
| # TODO(asimjour): This function looks a bit magical and it has to be fixed. |
| # Unfortunately, exactly the same function appears in the driver. |
| # So I don't have any explanation for these numbers at this time. |
| if val & 0x80: |
| val = ~val + 1 |
| res = (790 - val * 10) * 10 / 254 |
| else: |
| res = (val * 10 + 790) * 10 / 254 |
| return res |
| |
| class ElanTouchScreenDevice(RemoteAardvarkTouchDevice): |
| I2C_ETS_ADDRESS = 0x10 |
| HEADER_SIZE = 4 |
| FW_HDR_TYPE = 0 |
| FW_HDR_LENGTH = 2 |
| FW_HDR_COUNT = 1 |
| |
| MAX_RETRIES = 3 |
| PACKET_SIZE = 55 |
| MAX_PACKET_SIZE = 169 |
| FW_POS_WIDTH = 35 |
| FW_POS_PRESSURE = 45 |
| MAX_CONTACT_NUM = 10 |
| FW_POS_STATE = 1 |
| FW_POS_XY = 3 |
| |
| QUEUE_HEADER_SINGLE = 0x62 |
| QUEUE_HEADER_NORMAL = 0X63 |
| QUEUE_HEADER_WAIT = 0x64 |
| |
| RESET_CMD = 0x77777777 |
| FAST_BOOT_CMD = 0x6E69614D |
| QUERY_FW_ID_CMD = 0x0100F053 |
| QUERY_FW_VERSION_CMD = 0x01000053 |
| QUERY_TEST_CMD = 0x0100E053 |
| QUERY_BC_VERSION_CMD = 0x01001053 |
| |
| GET_RES_CMD = 0x00000000005B |
| GET_OSR_CMD = 0x0100D653 |
| GET_PHYSICAL_SCAN_CMD = 0x0100D753 |
| GET_PHYSICAL_DRIVE_CMD = 0x0100D853 |
| |
| CMD_HEADER_HELLO = 0x55 |
| CMD_HEADER_REK = 0x66 |
| CMD_HEADER_RESP = 0x52 |
| |
| def __init__(self, addr): |
| RemoteAardvarkTouchDevice.__init__(self, |
| ElanTouchScreenDevice.I2C_ETS_ADDRESS) |
| self.flush_timeout = self.AARDVARK_FLUSH_TIMEOUT |
| self._eventQ = queue.Queue() |
| self._last_event_time = time.time() |
| self._event_started = False |
| |
| def _Initialize_Device(self): |
| # Reset command |
| self._i2c_write(self.RESET_CMD) |
| # Fast boot |
| self._i2c_write(self.FAST_BOOT_CMD) |
| |
| self.adapter.aa_sleep_ms(50) |
| |
| # Read the hello packet |
| # Hello packet should be (0x55 0x55 0x55 0x55) |
| hello_len, hello_packet = self._i2c_read(self.HEADER_SIZE) |
| for i in xrange(4): |
| if hello_packet[i] != 0x55: |
| print('Failed to initialize the device.') |
| print('Hello packaet does not have the correct format.') |
| |
| # Read FW Id |
| self._i2c_repeat_cmd(self.QUERY_FW_ID_CMD) |
| |
| # Read FW version |
| self._i2c_repeat_cmd(self.QUERY_FW_VERSION_CMD) |
| |
| # Query test |
| self._i2c_repeat_cmd(self.QUERY_TEST_CMD) |
| |
| # Query bc version |
| self._i2c_repeat_cmd(self.QUERY_BC_VERSION_CMD) |
| |
| def _GetDimensions(self): |
| #TODO (asimjour): all templen values should be used for error checking |
| templen, self._resolution = self._i2c_execute_cmd_size(self.GET_RES_CMD, |
| 6, 17) |
| |
| row = self._resolution[2] + self._resolution[6] + self._resolution[10] |
| col = self._resolution[3] + self._resolution[7] + self._resolution[11] |
| |
| # Interpolating trace |
| templen, osr_buf = self._i2c_execute_cmd(self.GET_OSR_CMD) |
| osr = osr_buf[3] |
| self._x_max = (row - 1) * osr |
| self._y_max = (col - 1) * osr |
| |
| phy_x_len, phy_x_buf = self._i2c_execute_cmd(self.GET_PHYSICAL_SCAN_CMD) |
| phy_y_len, phy_y_buf = self._i2c_execute_cmd(self.GET_PHYSICAL_DRIVE_CMD) |
| phy_x = (phy_x_buf[2] << 8) | phy_x_buf[3] |
| phy_y = (phy_y_buf[2] << 8) | phy_y_buf[3] |
| x_res = self._x_max // phy_x |
| y_res = self._y_max // phy_y |
| x = {'min': 0, 'max': self._x_max, 'resolution': x_res} |
| y = {'min': 0, 'max': self._y_max, 'resolution': y_res} |
| p = {'min': 0, 'max': 255} |
| return x, y, p |
| |
| def _i2c_execute_cmd_size(self, cmd, data_len, out_len): |
| data_out = self.adapter.array_u08(data_len) |
| for byte_num in range(data_len): |
| data_out[byte_num] = (cmd & (0xff << 8 * byte_num)) >> (8 * byte_num) |
| retVal = self.adapter.aa_i2c_write(self._handle, self._addr, |
| self.I2C_NO_FLAGS, data_out) |
| resp = self._i2c_read(out_len) |
| return resp |
| |
| def _i2c_execute_cmd(self, cmd): |
| data_len = 6 if cmd >> 32 else 4 |
| data_out = self.adapter.array_u08(data_len) |
| for byte_num in range(data_len): |
| data_out[byte_num] = (cmd & (0xff << 8 * byte_num)) >> (8 * byte_num) |
| retVal = self.adapter.aa_i2c_write(self._handle, self._addr, |
| self.I2C_NO_FLAGS, data_out) |
| resp = self._i2c_read(data_len) |
| return resp |
| |
| def _i2c_repeat_cmd(self, cmd): |
| for i in xrange(1, self.MAX_RETRIES): |
| resp = self._i2c_execute_cmd(cmd) |
| if resp: |
| return resp |
| print('ERROR: exiting after %d retries to issue command %s' % |
| (self.MAX_RETRIES, cmd)) |
| sys.exit() |
| |
| def _NextEvent(self, timeout=None): |
| if not self._eventQ.empty(): |
| return self._eventQ.get() |
| |
| while True: |
| timestamp = time.time() |
| if (self._event_started and |
| timestamp - self._last_event_time > self.AARDVARK_FLUSH_TIMEOUT): |
| self._event_started = False |
| return None |
| input_len, data = self._i2c_read(self.MAX_PACKET_SIZE) |
| if (data[self.FW_HDR_TYPE] == self.CMD_HEADER_HELLO or |
| data[self.FW_HDR_TYPE] == self.CMD_HEADER_RESP or |
| data[self.FW_HDR_TYPE] == self.CMD_HEADER_REK): |
| continue |
| if (data[self.FW_HDR_TYPE] == self.QUEUE_HEADER_WAIT): |
| print('Waiting...') |
| self.adapter.aa_sleep_ms(30) |
| continue |
| |
| if (data[self.FW_HDR_TYPE] == self.QUEUE_HEADER_SINGLE): |
| self._finger_report_parser(data, self.HEADER_SIZE, timestamp) |
| if not self._eventQ.empty(): |
| return self._eventQ.get() |
| continue |
| |
| if (data[self.FW_HDR_TYPE] == self.QUEUE_HEADER_NORMAL): |
| report_count = data[self.FW_HDR_COUNT] |
| if report_count > 3: |
| print('The report count is greater than the max limit') |
| continue |
| report_len = data[self.FW_HDR_LENGTH] |
| if not report_len == self.PACKET_SIZE * report_count: |
| print('Mismatching report length') |
| continue |
| for i in xrange(report_count): |
| self._finger_report_parser(data, self.HEADER_SIZE + i * self.PACKET_SIZE, |
| timestamp) |
| if not self._eventQ.empty(): |
| return self._eventQ.get() |
| continue |
| |
| def _finger_report_parser(self, data, start_point, timestamp): |
| """ This function reads the finger report from data that starts from the |
| start_point index. Packets with header QUEUE_HEADER_SINGLE have one finger |
| report and packets with QUEUE_HEADER_NORMAL might have up to 3 finger |
| reports. |
| This function reads the report, builds MtEvents based on the report, and |
| puts the MtEvents into the eventQ. |
| eventQ is used to send the _NextEvent and building the next snapshot |
| """ |
| n_fingers = data[start_point + self.FW_POS_STATE + 1] & 0x0f |
| n_fingers = min(n_fingers, self.MAX_CONTACT_NUM) |
| self._last_event_time = timestamp |
| if not self._event_started: |
| self._eventQ.put(None) |
| self._event_started = True |
| finger_state = (((data[start_point + self.FW_POS_STATE + 1] & 0x30) << 4) | |
| data[start_point + self.FW_POS_STATE]) |
| for i in xrange(n_fingers): |
| if finger_state & 1: |
| finger_start_ptr = start_point + self.FW_POS_XY + i * 3 |
| finger_data = data[finger_start_ptr: finger_start_ptr + 3] |
| x = ((finger_data[0] & 0xf0) << 4) | finger_data[1] |
| y = ((finger_data[0] & 0x0f) << 8) | finger_data[2] |
| p = data[start_point + self.FW_POS_PRESSURE + i] |
| w = data[start_point + self.FW_POS_WIDTH + i] |
| self._eventQ.put(mt.MtEvent(timestamp, linux_input.EV_ABS, |
| linux_input.ABS_MT_TRACKING_ID, i + 1)) |
| self._eventQ.put(mt.MtEvent(timestamp, linux_input.EV_ABS, |
| linux_input.ABS_MT_POSITION_X, x)) |
| self._eventQ.put(mt.MtEvent(timestamp, linux_input.EV_ABS, |
| linux_input.ABS_MT_POSITION_Y, y)) |
| self._eventQ.put(mt.MtEvent(timestamp, linux_input.EV_ABS, |
| linux_input.ABS_MT_PRESSURE, p)) |
| self._eventQ.put(mt.MtEvent(timestamp, linux_input.EV_ABS, |
| linux_input.ABS_MT_TOUCH_MAJOR, w)) |
| self._eventQ.put(mt.MtEvent(timestamp, linux_input.EV_SYN, |
| linux_input.SYN_MT_REPORT, 0)) |
| finger_state = finger_state >> 1 |
| self._eventQ.put(mt.MtEvent(timestamp, linux_input.EV_SYN, |
| linux_input.SYN_REPORT, 0)) |
| |
| class SynapticsTouchDevice(RemoteAardvarkTouchDevice): |
| I2C_SNPTCS_ADDRESS = 0x20 |
| |
| MAX_F11_TOUCH_WIDTH = 15 |
| MAX_RETRIES = 10 |
| PAGE_SELECT_LEN = 2 |
| PAGES_TO_SERVICE = 10 |
| |
| MASK_8BIT = 0xFF |
| MASK_4BIT = 0x0F |
| MASK_3BIT = 0x07 |
| MASK_2BIT = 0x03 |
| |
| PDT_END = 0x00D0 |
| PDT_ENTRY_SIZE = 0x0006 |
| PDT_START = 0x00E9 |
| |
| def __init__(self, addr): |
| RemoteAardvarkTouchDevice.__init__(self, |
| SynapticsTouchDevice.I2C_SNPTCS_ADDRESS, |
| mt.MtStateMachine.PRESSURE_FROM_MT_TOUCH_MAJOR) |
| self.flush_timeout = self.AARDVARK_FLUSH_TIMEOUT |
| self._eventQ = queue.Queue() |
| self._last_event_time = time.time() |
| self._event_started = False |
| |
| def _Initialize_Device(self): |
| print('initializing the device') |
| self.synaptics_rmi4_query_device() |
| return None |
| |
| """ f-functions have different rmi function implementations. |
| SynapticsTouchDevice is designed to support the touchscreens. Therefore, |
| only the functions f01 and f11 should be used. Other function are exist |
| """ |
| def synaptics_rmi4_query_device(self): |
| SYNAPTICS_RMI4_F = { |
| 0x01 : self.f01, |
| 0x11 : self.f11, |
| 0x12 : self.f12, |
| 0x1A : self.f1A, |
| 0x35 : self.f35, |
| } |
| intr_count = 0 |
| for page_number in xrange(self.PAGES_TO_SERVICE): |
| pdt_entry_addr = self.PDT_START |
| for pdt_entry_addr in xrange(self.PDT_START, self.PDT_END, |
| -self.PDT_ENTRY_SIZE): |
| read_addr = pdt_entry_addr | (page_number << 8) |
| read_result = self._rmi_i2c_read(read_addr, 6) |
| fn_number = read_result[5] |
| if fn_number in SYNAPTICS_RMI4_F.keys(): |
| SYNAPTICS_RMI4_F[fn_number](read_result, page_number) |
| intr_count += (fn_number & self.MASK_3BIT) |
| |
| """ Before any read or write operation the page address |
| is set using this function. |
| |
| Every address consist of a page and an offset. |
| In order to perform a read/write function on an address, first the page |
| should be set, and then the read/write command only contains the offset. |
| """ |
| def _rmi_i2c_set_page(self, rmi_addr): |
| data_out = self.adapter.array_u08(self.PAGE_SELECT_LEN) |
| data_out[0] = self.MASK_8BIT |
| data_out[1] = ((rmi_addr >> 8) & self.MASK_8BIT) |
| |
| for i in xrange(self.MAX_RETRIES): |
| retVal = self.adapter.aa_i2c_write(self._handle, |
| self._addr, self.I2C_NO_FLAGS, |
| data_out) |
| if retVal == self.PAGE_SELECT_LEN: |
| return retVal |
| |
| print('Page Select Error. Was not able to write all the bytes.') |
| sys.exit() |
| |
| def _rmi_i2c_read(self, rmi_addr, length): |
| self._rmi_i2c_set_page(rmi_addr) |
| |
| msg = self.adapter.array_u08(1) |
| msg[0] = rmi_addr & self.MASK_8BIT |
| |
| for i in xrange(self.MAX_RETRIES): |
| self.adapter.aa_i2c_write(self._handle, self._addr, |
| self.I2C_NO_FLAGS, msg) |
| temp_len, tmp_data = self.adapter.aa_i2c_read(self._handle, self._addr, |
| self.I2C_NO_FLAGS, |
| self.adapter.array_u08(length)) |
| if temp_len == length: |
| return tmp_data |
| |
| print('I2C Read Error. Was not able to read all the bytes.') |
| sys.exit() |
| |
| def _rmi_i2c_write(self, rmi_addr, data_out, length): |
| self._rmi_i2c_set_page(rmi_addr) |
| |
| msg = self.adapter.array_u08(length + 1) |
| msg[0] = rmi_addr & self.MASK_8BIT |
| for i in xrange(length): |
| msg[i + 1] = data_out[i] |
| for i in xrange(self.MAX_RETRIES): |
| retVal = self.adapter.aa_i2c_write(self._handle, |
| self._addr, self.I2C_NO_FLAGS, msg) |
| if retVal > 0: |
| return retVal |
| |
| print('I2C Write Error. Was not able to write all the bytes.') |
| sys.exit() |
| |
| |
| def _GetDimensions(self): |
| # TODO (asimjour): resolution values are temporary |
| x = {'min': 0, 'max': self._max_x, 'resolution': self._max_x} |
| y = {'min': 0, 'max': self._max_y, 'resolution': self._max_y} |
| p = {'min': 0, 'max': self.MAX_F11_TOUCH_WIDTH} |
| return x, y, p |
| |
| def f01(self, data, page_number): |
| self.data_base_addr = (data[3] | (page_number << 8)) + 2 |
| self.f01_found = True |
| |
| def f11(self, data, page_number): |
| query_0_5 = self._rmi_i2c_read(data[0] | (page_number << 8), 6) |
| self._max_fingers = query_0_5[1] & self.MASK_3BIT |
| control_6_9 = self._rmi_i2c_read((data[2] + 6) | (page_number << 8), 4) |
| self._max_y = control_6_9[0] | ((control_6_9[1] & self.MASK_4BIT) << 8) |
| self._max_x = control_6_9[2] | ((control_6_9[3] & self.MASK_4BIT) << 8) |
| self.f11_found = True |
| |
| # These functions weren't supposed to be used with touch panels. |
| # But we should add the support of these functions in the future. |
| def f12(self, data, page_number): |
| print('f12 found. This functionality is not supported.') |
| sys.exit() |
| def f1A(self, data, page_number): |
| print('f1A found. This functionality is not supported.') |
| sys.exit() |
| def f35(self, data, page_number): |
| print('f35 found. This functionality is not supported.') |
| sys.exit() |
| |
| def _NextEvent(self, timeout=None): |
| if not self._eventQ.empty(): |
| return self._eventQ.get() |
| |
| while True: |
| timestamp = time.time() |
| if (self._event_started and |
| timestamp - self._last_event_time > self.AARDVARK_FLUSH_TIMEOUT): |
| self._event_started = False |
| return None |
| fingers_supported = self._max_fingers |
| num_of_finger_status_regs = (fingers_supported + 3) / 4 |
| sensor_data = self._rmi_i2c_read(self.data_base_addr, |
| num_of_finger_status_regs) |
| if sensor_data[0] or (sensor_data[1] & self.MASK_2BIT): |
| break |
| self._last_event_time = time.time() |
| if not self._event_started: |
| self._event_started = True |
| return None |
| for finger in xrange(self._max_fingers): |
| finger_index = finger // 4 |
| finger_shift = (finger % 4) * 2 |
| finger_status = (sensor_data[finger_index] >> finger_shift) & self.MASK_2BIT |
| # Each 2-bit finger status field represents the following: |
| # 00 = finger not present |
| # 01 = finger present and data accurate |
| # 10 = finger present but data may be inaccurate |
| # 11 = reserved |
| if finger_status: |
| data_offset = (self.data_base_addr + 1 + |
| num_of_finger_status_regs + (finger * 5)) |
| sensor_finger_data = self._rmi_i2c_read(data_offset, 5) |
| y = (sensor_finger_data[0] << 4) | (sensor_finger_data[2] & self.MASK_4BIT) |
| x = (sensor_finger_data[1] << 4) | (sensor_finger_data[2] >> 4) |
| wx = sensor_finger_data[3] & self.MASK_4BIT |
| wy = sensor_finger_data[3] >> 4 |
| self._eventQ.put(mt.MtEvent(timestamp, linux_input.EV_ABS, |
| linux_input.ABS_MT_TRACKING_ID, finger + 1)) |
| self._eventQ.put(mt.MtEvent(timestamp, linux_input.EV_ABS, |
| linux_input.ABS_MT_POSITION_X, x)) |
| self._eventQ.put(mt.MtEvent(timestamp, linux_input.EV_ABS, |
| linux_input.ABS_MT_POSITION_Y, y)) |
| self._eventQ.put(mt.MtEvent(timestamp, linux_input.EV_ABS, |
| linux_input.ABS_MT_TOUCH_MAJOR, max(wx, wy))) |
| self._eventQ.put(mt.MtEvent(timestamp, linux_input.EV_SYN, |
| linux_input.SYN_MT_REPORT, 0)) |
| self._eventQ.put(mt.MtEvent(timestamp, linux_input.EV_SYN, |
| linux_input.SYN_REPORT, 0)) |
| return self._eventQ.get() |