blob: 3c3f09bdbe1ec1138823174afaabad099beba574 [file] [log] [blame]
# Copyright 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging
import os
import shutil
import subprocess
import tempfile
from cros.factory.external import pyudev
# udev action strings. MediaMonitor._UdevEventCallback compares the `action`
# argument it receives against these to tell insertion from removal.
_UDEV_ACTION_INSERT = 'add'
_UDEV_ACTION_REMOVE = 'remove'
class MediaMonitor:
    """A wrapper to monitor media events.

    This class offers an easy way to monitor the insertion and removal
    activities of media devices.

    Attributes:
        on_insert: Function to invoke (with the udev device as its only
            argument) when receiving an insert event. None means ignoring
            the event.
        on_remove: Function to invoke (with the udev device as its only
            argument) when receiving a remove event. None means ignoring
            the event.
        is_monitoring: Bool indicating whether it's monitoring or not.

    Usage example:
        monitor = MediaMonitor('block', 'disk')
        monitor.Start(on_insert=on_insert, on_remove=on_remove)
        monitor.Stop()
    """

    def __init__(self, subsystem, device_type):
        """Creates an idle monitor; nothing is observed until Start().

        Args:
            subsystem: udev subsystem to filter on (e.g. 'block').
            device_type: udev device type to filter on (e.g. 'disk').
        """
        self.on_insert = None
        self.on_remove = None
        self.is_monitoring = False
        self._observer = None
        self._subsystem = subsystem
        self._device_type = device_type

    def _UdevEventCallback(self, action, device):
        """Dispatches one udev event to the registered callback.

        Events are dropped while the monitor is not running, and actions
        other than insert/remove are ignored.

        Args:
            action: The udev action string of the event.
            device: The udev device object; its device_node is logged and
                the object itself is handed to the callback.
        """
        if not self.is_monitoring:
            return

        if action == _UDEV_ACTION_INSERT:
            logging.info('Device inserted: %s', device.device_node)
            if self.on_insert:
                self.on_insert(device)
        elif action == _UDEV_ACTION_REMOVE:
            logging.info('Device removed: %s', device.device_node)
            if self.on_remove:
                self.on_remove(device)

    def Start(self, on_insert, on_remove):
        """Starts monitoring and registers the event callbacks.

        Args:
            on_insert: Callable invoked with the device on insertion, or
                None to ignore insert events.
            on_remove: Callable invoked with the device on removal, or
                None to ignore remove events.

        Raises:
            Exception: If the monitor is already running.
        """
        if self.is_monitoring:
            raise Exception('Multiple start() call is not allowed')

        self.on_insert = on_insert
        self.on_remove = on_remove

        # Set up the udev monitor, filtered to the requested devices.
        context = pyudev.Context()
        monitor = pyudev.Monitor.from_netlink(context)
        monitor.filter_by(subsystem=self._subsystem,
                          device_type=self._device_type)
        observer = pyudev.MonitorObserver(monitor, self._UdevEventCallback)

        # Flip the flag *before* starting the observer: _UdevEventCallback
        # drops every event while is_monitoring is False, so setting it
        # afterwards could lose events delivered right after start().
        self._observer = observer
        self.is_monitoring = True
        try:
            observer.start()
        except Exception:
            # Roll back so a later Start() is possible and Stop() is a no-op.
            self.is_monitoring = False
            self._observer = None
            raise
        logging.info('Start monitoring media activities.')

    def Stop(self):
        """Stops monitoring; does nothing if the monitor is not running."""
        if not self.is_monitoring:
            return

        self._observer.stop()
        self.is_monitoring = False
        # Drop the reference to the stopped observer; Start() always
        # creates a fresh one.
        self._observer = None
        logging.info('Stop monitoring media activities.')
class RemovableDiskMonitor(MediaMonitor):
    """A MediaMonitor preconfigured for removable storage devices.

    It fixes the udev filter to the 'block' subsystem and the 'disk'
    device type, so callers only need to supply the callbacks to Start().
    """

    def __init__(self):
        super().__init__(subsystem='block', device_type='disk')
class MountedMedia:
    """A context manager to automatically mount and unmount a device.

    Usage example:
        To mount the third partition of /dev/sda.

        with MountedMedia('/dev/sda', 3) as media_path:
            print("Mounted at %s." % media_path)
    """

    def __init__(self, dev_path, partition=None):
        """Constructs a context manager to automatically mount/umount.

        Args:
            dev_path: The absolute path to the device.
            partition: An optional number indicating which partition of the
                device should be mounted. If None is given, dev_path itself
                is mounted.

        Example:
            with MountedMedia('/dev/sdb', 1) as path:
                with open(os.path.join(path, 'test'), 'w') as f:
                    f.write('test')
        """
        self._mount_dir = None
        self._mounted = False

        if partition is None:
            self._dev_path = dev_path
            return

        if dev_path[-1].isdigit():
            # Devices enumerated in numbers (ex, mmcblk0) use a 'p' separator.
            self._dev_path = '%sp%d' % (dev_path, partition)
        else:
            # Devices enumerated in alphabets (ex, sda).
            self._dev_path = '%s%d' % (dev_path, partition)

        # For devices not using partition table (floppy mode),
        # allow using whole device as first partition.
        if partition == 1 and not os.path.exists(self._dev_path):
            logging.info('Using device without partition table - %s', dev_path)
            self._dev_path = dev_path

    def __enter__(self):
        """Mounts the media and returns the mount directory path."""
        self._MountMedia()
        return self._mount_dir

    def __exit__(self, exc_type, exc_value, traceback):
        """Unmounts the media if it was successfully mounted."""
        if self._mounted:
            self._UmountMedia()

    @staticmethod
    def _RunCommand(args):
        """Runs a command without a shell and collects its output.

        The arguments are passed as a list, so paths containing spaces or
        shell metacharacters reach the program verbatim instead of being
        interpreted by a shell.

        Args:
            args: The command and its arguments as a list of strings.

        Returns:
            A (exit_code, output) tuple, where output is the combined
            stdout/stderr text with one trailing newline removed. If the
            program cannot be launched at all, exit_code is 127 and output
            is the error description.
        """
        try:
            proc = subprocess.run(args, stdout=subprocess.PIPE,
                                  stderr=subprocess.STDOUT,
                                  universal_newlines=True, check=False)
        except OSError as e:
            # Report a launch failure as a non-zero exit so callers can run
            # their normal failure cleanup.
            return 127, str(e)
        output = proc.stdout
        if output.endswith('\n'):
            output = output[:-1]
        return proc.returncode, output

    def _MountMedia(self):
        """Mounts the device at a freshly created temporary directory.

        Raises:
            Exception: If the mount command fails; the temporary directory
                is removed before raising.
        """
        # Create a temporary directory to mount on.
        self._mount_dir = tempfile.mkdtemp(prefix='MountedMedia')
        logging.info('Media mount directory created: %s', self._mount_dir)
        exit_code, output = self._RunCommand(
            ['mount', self._dev_path, self._mount_dir])
        if exit_code != 0:
            shutil.rmtree(self._mount_dir)
            raise Exception('Failed to mount. Message-%s' % output)
        self._mounted = True

    def _UmountMedia(self):
        """Unmounts the media and deletes the temporary mount directory.

        Raises:
            Exception: If the umount command fails; the mount directory is
                left in place in that case.
        """
        exit_code, output = self._RunCommand(['umount', self._mount_dir])
        if exit_code != 0:
            raise Exception('Failed to umount. Message-%s' % output)
        shutil.rmtree(self._mount_dir)
        self._mounted = False