"""Common utility functions that are not touch specific.
The SimpleSystem() and SimpleSystemOutput() functions are grabbed from
hardware_Trackpad and were written by truty@.
Note that in order to be able to use this module on a system without the
factory stuffs, e.g., on a Beagle Bone, this module does not depend on
any factory modules on purpose.
from __future__ import print_function
import glob
import logging
import os
import re
import subprocess
from subprocess import PIPE
from subprocess import STDOUT
# Directory under which the I2C devices are looked up.
_SYSFS_I2C_PATH = '/sys/bus/i2c/devices'

# Root directory of the kernel debug paths.
_DEBUG_PATH = '/sys/kernel/debug'

# Vendor names.
ATMEL = 'atmel'

# Driver name associated with each vendor.
_TOUCH_DRIVER = {
    ATMEL: 'atmel_mxt_ts',
}

# Config name associated with each vendor.
_TOUCH_CONFIG = {
    ATMEL: 'maxtouch-ts.cfg',
}
class Error(Exception):
  """Exception raised by this module, e.g., when an iptables update fails."""
def IsSuccessful(value):
  """Tell whether a return code denotes success.

  Args:
    value: the return code to examine.

  Returns:
    The result of comparing value with 0.
  """
  succeeded = (value == 0)
  return succeeded
def SimpleSystem(cmd):
  """Execute a system command through the shell.

  A warning is logged when the command exits with a non-zero status.

  Args:
    cmd: the command line, as a single string interpreted by the shell.

  Returns:
    The exit status of the command.
  """
  ret = subprocess.call(cmd, shell=True)
  if ret:
    logging.warning('Command (%s) failed (ret=%s).', cmd, ret)
  return ret
def SimpleSystemOutput(cmd):
  """Execute a system command through the shell and get its output.

  The stderr of the command is merged into its stdout.

  Args:
    cmd: the command line, as a single string interpreted by the shell.

  Returns:
    The output of the command as text, with leading and trailing whitespace
    stripped, if the command exits with status 0. None if the command could
    not be executed or exits with a non-zero status.
  """
  try:
    # universal_newlines=True yields text instead of bytes on Python 3, so
    # the output can be split and matched against ordinary strings.
    proc = subprocess.Popen(cmd, shell=True, stdout=PIPE, stderr=STDOUT,
                            universal_newlines=True)
    stdout, unused_stderr = proc.communicate()
  except Exception as e:  # pylint: disable=broad-except
    logging.warning('Command (%s) failed (%s).', cmd, e)
    # proc may be unbound here, so do not fall through to the status check.
    return None
  return None if proc.returncode else stdout.strip()
def IsDestinationPortEnabled(port):
  """Check if the destination port is enabled in the iptables INPUT chain.

  If port 8000 is enabled, the matching rule looks like
    ACCEPT  tcp  --  0.0.0.0/0  0.0.0.0/0  ctstate NEW tcp dpt:8000

  Args:
    port: the destination TCP port number, as an integer.

  Returns:
    True if a rule accepting new TCP connections to the port is listed.
    False otherwise, including when the rules could not be listed.
  """
  # The trailing \b prevents port 80 from matching a rule for port 8000.
  pattern = re.compile(
      r'ACCEPT\s+tcp.+\s+ctstate\s+NEW\s+tcp\s+dpt:%d\b' % port)
  rules = SimpleSystemOutput('iptables -L INPUT -n --line-number')
  if not rules:
    # SimpleSystemOutput() returns None when the iptables command fails.
    return False
  return any(pattern.search(rule) for rule in rules.splitlines())
def EnableDestinationPort(port):
  """Make iptables accept new TCP connections to the destination port.

  Nothing is changed when IsDestinationPortEnabled() already reports the
  port as enabled.

  Args:
    port: the destination TCP port number, as an integer.

  Raises:
    Error: if the iptables command returns a non-zero status.
  """
  if IsDestinationPortEnabled(port):
    return

  rule_template = ('iptables -A INPUT -p tcp -m conntrack --ctstate NEW '
                   '--dport %d -j ACCEPT')
  status = SimpleSystem(rule_template % port)
  if status != 0:
    raise Error('Failed to enable destination port in iptables: %d.' % port)
def GetSysfsEntry(vendor=ATMEL):
  """Get the sys fs object file which is used to tune the device registers.

  A qualifying device should satisfy the following 2 conditions.
  Cond 1: Get those device paths with the matching driver name for the vendor.
          The device path looks like: '/sys/bus/i2c/devices/...'.
          An example about Samus
          For touchpad device:
            i2c-ATML0000:01/driver -> ../../../../../bus/i2c/drivers/atmel_mxt_ts
          For touchscreen device:
            i2c-ATML0001:01/driver -> ../../../../../bus/i2c/drivers/atmel_mxt_ts
          Note that both touch devices use the same driver.
  Cond 2: If there are multiple device paths with the same driver, use the
          config name to distinguish them. Here we assume that different
          touch devices would use different configs. As an example, on Samus
            the config name of its touchpad is maxtouch-tp.cfg, while
            the config name of its touchscreen is maxtouch-ts.cfg.

  Args:
    vendor: a vendor name, e.g., 'atmel'

  Returns:
    The path of the 'object' file under the sys fs path of the target touch
    device, e.g., '/sys/bus/i2c/devices/i2c-ATML0001:01/object', or None if
    no device satisfies both conditions.
  """

  def MatchingDriver(path, vendor):
    """Cond 1: Matching the path with the vendor associated driver."""
    expected_driver = _TOUCH_DRIVER.get(vendor)
    # The 'driver' entry is a symlink; its resolved basename is the driver.
    actual_driver = os.path.basename(
        os.path.realpath(os.path.join(path, 'driver')))
    return actual_driver == expected_driver

  def MatchingConfig(path, vendor):
    """Cond 2: Matching the path with the vendor associated config."""
    config_file = os.path.join(path, 'config_file')
    if not os.path.isfile(config_file):
      return False
    with open(config_file) as f:
      return f.read().strip() == _TOUCH_CONFIG.get(vendor)

  for path in glob.glob(os.path.join(_SYSFS_I2C_PATH, '*')):
    if MatchingDriver(path, vendor) and MatchingConfig(path, vendor):
      # If there are multiple qualifying sysfs paths which is very unlikely,
      # just use the first one.
      return os.path.join(path, 'object')
  return None
def GetDebugfs(vendor=ATMEL):
  """Get the kernel debug path for the specified device.

  Args:
    vendor: a vendor name, e.g., 'atmel'

  Returns:
    The kernel debug fs path of the touch device, e.g., if the driver is
    'atmel_mxt_ts' and the i2c device is 'i2c-ATML0001:01', return
    '/sys/kernel/debug/atmel_mxt_ts/i2c-ATML0001:01'.
    None if the vendor has no known driver, no sysfs entry is found, or the
    debug path does not contain an 'object' file.
  """
  driver = _TOUCH_DRIVER.get(vendor)
  # Look up the sysfs entry of the same vendor whose driver is used above.
  sysfs_entry = GetSysfsEntry(vendor)
  if driver is None or sysfs_entry is None:
    return None

  # sysfs_entry is the 'object' file; its parent directory name is the
  # i2c device name.
  i2c_device = os.path.basename(os.path.dirname(sysfs_entry))
  device_debug_path = os.path.join(_DEBUG_PATH, driver, i2c_device)
  object_file = os.path.join(device_debug_path, 'object')
  return device_debug_path if os.path.isfile(object_file) else None
class NetworkStatus(object):
  """Show the network and service status."""

  def __init__(self, BB_ip, shopfloor_ip):
    """Remember the addresses to ping.

    Args:
      BB_ip: the IP address of the Beagle Bone.
      shopfloor_ip: the IP address of the Shopfloor machine.
    """
    self._BB_ip = BB_ip
    self._shopfloor_ip = shopfloor_ip

  @staticmethod
  def GetHostIPs():
    """Get this host's IPs.

    Returns:
      A dictionary mapping each ethX interface name to its IP address, or to
      None if no address is reported for it, looking like
        {'eth0': '192.168.0.2', 'eth1': None}
      The dictionary is empty if no ethX interface is listed.
    """
    # SimpleSystemOutput() returns None when egrep matches nothing.
    interfaces_strings = SimpleSystemOutput('ifconfig | egrep "^eth[0-9]"')
    interface_dict = {}
    for line in (interfaces_strings or '').splitlines():
      # Each line look like
      #   eth0: flags=4163<UP,BROADCAST,RUNNING,MULTICAST>  mtu 1500
      interface = line.split(':')[0]
      cmd = "ifconfig %s | egrep 'inet .+netmask.+broadcast' | awk '{print $2}'"
      ip = SimpleSystemOutput(cmd % interface)
      interface_dict[interface] = ip or None
    return interface_dict

  def PingBB(self):
    """Ping the Beagle Bone.

    Returns:
      True if a single ping to the Beagle Bone succeeds.
    """
    return IsSuccessful(SimpleSystem('ping -c 1 %s' % self._BB_ip))

  def PingShopfloor(self):
    """Ping the Shopfloor machine.

    Returns:
      True if a single ping to the Shopfloor machine succeeds.
    """
    return IsSuccessful(SimpleSystem('ping -c 1 %s' % self._shopfloor_ip))
class KernelModule(object):
  """A simple class to manage a kernel module."""

  def __init__(self, name):
    """Remember the module name and look up the touch device sysfs entry.

    Args:
      name: the name of the kernel module.
    """
    self.name = name
    self.sysfs_entry = GetSysfsEntry()

  def IsLoaded(self):
    """Is the module loaded?

    Returns:
      True if the module name appears in the output of lsmod.
    """
    lsmod_result = SimpleSystemOutput('lsmod | grep %s' % self.name)
    return bool(lsmod_result) and self.name in lsmod_result

  def Remove(self):
    """Remove the module.

    Returns:
      True if the module is not loaded or rmmod succeeds; False otherwise.
    """
    if self.IsLoaded():
      return IsSuccessful(SimpleSystem('rmmod %s' % self.name))
    return True

  def Insert(self):
    """Insert the module.

    Returns:
      True if the module is already loaded or modprobe succeeds; False
      otherwise.
    """
    if not self.IsLoaded():
      return IsSuccessful(SimpleSystem('modprobe %s' % self.name))
    return True

  def IsDeviceDetected(self):
    """Is the device detected properly?

    Returns:
      True if a 'fw_version' file exists under the sysfs entry. False
      otherwise, including when no sysfs entry was found.
    """
    if self.sysfs_entry is None:
      # No qualifying device was found, so os.path.join() must not be called.
      return False
    # NOTE(review): GetSysfsEntry() returns a path ending in 'object', so
    # this checks '<device>/object/fw_version' -- confirm this is intended.
    return os.path.isfile(os.path.join(self.sysfs_entry, 'fw_version'))