# blob: 1435275703652d52c6f16d2a984f165e9b372aa1
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""GPIO polling module. It also supports basic GPIO read/write."""
import atexit
import logging
import os
import select
import socket
import threading
import poll_common
# Sysfs GPIO interface, see
# https://www.kernel.org/doc/Documentation/gpio/sysfs.txt
_GPIO_ROOT = '/sys/class/gpio'
# Control files for exporting/unexporting a pin, followed by the per-pin
# directory pattern (to be %-formatted with the integer port number).
_EXPORT_FILE, _UNEXPORT_FILE, _GPIO_PIN_PATTERN = (
    os.path.join(_GPIO_ROOT, leaf)
    for leaf in ('export', 'unexport', 'gpio%d'))
class PollGpioError(Exception):
  """Error raised by PollGpio when a GPIO operation fails."""
class PollGpio(object):
  """Monitors or controls the status of one GPIO port.

  Do not create PollGpio objects directly. Use the factory method
  PollGpio.get_instance() to get a PollGpio object to use. Otherwise, it'd be
  problematic when two PollGpio objects control the same port.
  """

  # Mapping from GPIO port to (object, edge).
  _instances = dict()

  # Lock around _instances.
  _instance_lock = threading.Lock()

  # Mapping values for /sys/class/gpio/gpioN/edge.
  _EDGE_VALUES = {
      poll_common.GPIO_EDGE_RISING: 'rising',
      poll_common.GPIO_EDGE_FALLING: 'falling',
      poll_common.GPIO_EDGE_BOTH: 'both'
  }

  @classmethod
  def get_instance(cls, port, edge=None):
    """Constructs or returns an existing PollGpio object.

    Args:
      port: GPIO port.
      edge: value in GPIO_EDGE_LIST[], or None for read/write action.

    Returns:
      PollGpio object for the port.

    Raises:
      PollGpioError: Invalid Operation.
    """
    # It's possible to support different edge types for one GPIO by setting
    # edge='both' in sysfs. But it's impractical in real-world use case because
    # hardware design should already demand one specific edge type.
    with cls._instance_lock:
      if port not in cls._instances:
        # Create an instance for specified port. If edge is None, this port
        # could be assigned an edge afterwards.
        cls._instances[port] = (cls(port), edge)
      else:
        instance, known_edge = cls._instances[port]
        if known_edge and edge and known_edge != edge:
          # It's not allowed to assign different polling edge to the same
          # port, but it has no constraint to do read/write (edge is None).
          raise PollGpioError('The gpio %d was assigned different edge' % port)
        if not known_edge:
          # If this port instance has not been initiated with an edge (only
          # did read/write before), it can be assigned an edge at this moment.
          cls._instances[port] = (instance, edge)
      return cls._instances[port][0]

  def __init__(self, port):
    """Constructor.

    Exports the GPIO through sysfs and registers an atexit hook that undoes
    the export.

    Args:
      port: GPIO port

    Attributes:
      _port: Same as argument 'port'.
      _edge: Edge type written to sysfs; None until the first poll() call.
      _logger: Logger.
      _thread: GPIO-polling thread; None until the first poll() call.
      _wait_cond: Conditional variable to notify all client threads waiting for
          GPIO edge. It also serializes the first-time setup in poll().
      _stop_sockets: Socket pair to interrupt poll() call in polling thread.

    Raises:
      PollGpioError
    """
    try:
      self._port = port
      self._edge = None  # will be assigned at first-time polling
      self._logger = logging.getLogger('PollGpio')
      self._thread = None  # will be started at first-time polling
      self._wait_cond = threading.Condition()
      self._stop_sockets = socket.socketpair()
      self._export_sysfs()
      atexit.register(self._cleanup)  # must release system resource
    except Exception as e:
      raise PollGpioError('Fail to __init__ GPIO %d: %s' % (self._port, e))

  def _cleanup(self):
    """Stops the monitor thread and unexports the sysfs interface.

    Best effort: each step is attempted independently so that a failure to
    stop the thread does not leave the GPIO exported. Errors are logged, never
    raised.
    """
    self._logger.debug('clean up GPIO port %d', self._port)
    for step in (self._stop_thread, self._unexport_sysfs):
      try:
        step()
      except Exception as e:
        self._logger.error('Fail to clean up GPIO %d: %s', self._port, e)

  def _start_thread(self):
    """Starts polling thread."""
    thread = threading.Thread(target=self._polling_loop)
    thread.daemon = True
    thread.start()
    # Publish only after a successful start so that a failed start can be
    # retried by the next poll() call.
    self._thread = thread

  def _stop_thread(self):
    """Stops polling thread.

    Does nothing when the polling thread was never started. Otherwise wakes
    the thread through the stop socket and waits up to one second for it.
    """
    if not self._thread:  # thread wasn't started
      return
    # Must be a byte string: socket.send() rejects text strings on Python 3.
    self._stop_sockets[0].send(b'.')
    self._thread.join(timeout=1.0)
    if self._thread.is_alive():
      self._logger.warning('fail to stop thread of GPIO %d', self._port)

  def _get_sysfs_path(self):
    """Gets the path of GPIO sysfs interface."""
    return _GPIO_PIN_PATTERN % self._port

  def _export_sysfs(self):
    """Exports GPIO sysfs interface.

    The export is skipped when the gpioN directory already exists.
    """
    self._logger.debug('export GPIO port %d', self._port)
    if not os.path.exists(self._get_sysfs_path()):
      with open(_EXPORT_FILE, 'w') as f:
        f.write(str(self._port))

  def _assign_edge(self, edge):
    """Writes edge value to GPIO sysfs interface.

    Args:
      edge: value in GPIO_EDGE_LIST[]

    Raises:
      KeyError: edge is not a key of _EDGE_VALUES.
    """
    self._logger.debug('assign GPIO port %d %s', self._port, edge)
    edge_value = self._EDGE_VALUES[edge]
    with open(os.path.join(self._get_sysfs_path(), 'edge'), 'w') as f:
      f.write(edge_value)
    # Record the edge only once sysfs accepted it, so a failed attempt does
    # not make the object look configured.
    self._edge = edge

  def _unexport_sysfs(self):
    """Unexports GPIO sysfs interface."""
    self._logger.debug('unexport GPIO port %d', self._port)
    with open(_UNEXPORT_FILE, 'w') as f:
      f.write(str(self._port))

  def _read_value(self):
    """Reads the GPIO value from sysfs.

    Returns:
      (int) 1 for GPIO high, 0 for low.
    """
    with open(os.path.join(self._get_sysfs_path(), 'value'), 'r') as f:
      return int(f.read().strip())

  def _write_value(self, value):
    """Writes the GPIO value to sysfs.

    Args:
      value: (int) 1 for GPIO high, 0 for low.
    """
    # Set GPIO direction to output mode.
    # NOTE(review): per the kernel GPIO sysfs documentation, writing 'out'
    # presumably initializes the pin low before the value below is written;
    # confirm whether that transient matters for callers.
    with open(os.path.join(self._get_sysfs_path(), 'direction'), 'w') as f:
      f.write('out')
    with open(os.path.join(self._get_sysfs_path(), 'value'), 'w') as f:
      f.write(str(value))

  def _polling_loop(self):
    """Main loop of polling thread.

    Blocks in poll() on the GPIO 'value' file and on the receiving end of the
    stop socket pair. Every event on the value file wakes all clients waiting
    in poll(); a byte received on the stop socket ends the loop.
    """
    stop_socket = self._stop_sockets[1]
    with open(os.path.join(self._get_sysfs_path(), 'value'), 'r') as f:
      poller = select.poll()
      poller.register(f, select.POLLPRI | select.POLLERR)
      poller.register(stop_socket, select.POLLIN | select.POLLERR)
      value_fd = f.fileno()
      stop_fd = stop_socket.fileno()
      stop_flag = False
      while not stop_flag:
        # After edge is triggered, re-read from head of 'gpio[N]/value'.
        # Or poll() will return immediately next time.
        f.seek(0)
        f.read()
        self._logger.debug('poll()-ing on gpio %d', self._port)
        ret = poller.poll()
        self._logger.debug('poll() on gpio %d returns %s', self._port, ret)
        for fd, _ in ret:
          if fd == value_fd:
            with self._wait_cond:
              self._wait_cond.notify_all()
          elif fd == stop_fd:
            if stop_socket.recv(1):
              self._logger.debug('stopping thread')
              stop_flag = True

  def poll(self, edge):
    """Waits for a GPIO port being edge triggered.

    The first call writes the edge type to sysfs and starts the polling
    thread; for later calls the 'edge' argument is not used.

    Args:
      edge: value in GPIO_EDGE_LIST[]

    Raises:
      PollGpioError
    """
    # The condition's lock is held across the first-time setup so that
    # concurrent callers cannot start two polling threads, and so that the
    # polling thread cannot notify before this caller is actually waiting
    # (wait() releases the lock).
    with self._wait_cond:
      if self._thread is None:
        # Only for the first time polling, assigns edge value to sysfs
        # interface and starts a new thread.
        try:
          self._assign_edge(edge)
          self._start_thread()
        except Exception as e:
          raise PollGpioError(
              'Fail to start up thread GPIO %d: %s' % (self._port, e))
      try:
        self._logger.debug('client starts waiting')
        self._wait_cond.wait()
        self._logger.debug('client finishes waiting')
      except Exception as e:
        raise PollGpioError('Fail to poll GPIO %d: %s' % (self._port, e))

  def read(self):
    """Reads GPIO port value.

    Returns:
      (int) 1 for GPIO high, 0 for low.

    Raises:
      PollGpioError
    """
    try:
      self._logger.debug('client reads value')
      return self._read_value()
    except Exception as e:
      raise PollGpioError('Fail to read GPIO %d: %s' % (self._port, e))

  def write(self, value):
    """Writes GPIO port value.

    Be aware that this action will set GPIO direction to output mode.

    Args:
      value: (int) 1 for GPIO high, 0 for low.

    Raises:
      PollGpioError
    """
    try:
      self._logger.debug('client writes value')
      self._write_value(value)
    except Exception as e:
      raise PollGpioError('Fail to write GPIO %d: %s' % (self._port, e))