blob: e259751354b6d04cb09861aea942bc09fde9af8f [file] [log] [blame] [edit]
# Copyright 2018 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Driver for common sequences for image management on switchable usb port."""
import errno
import glob
import os
import subprocess
import tempfile
import time
import usb
from servo.drv import hw_driver
import servo.utils.usb_hierarchy as usb_hierarchy
# If a hub is attached to the usual image usb storage slot, use this port on the
# hub to search for the usb storage.
STORAGE_ON_HUB_PORT = 1
class UsbImageManagerError(hw_driver.HwDriverError):
"""Error class for UsbImageManager errors."""
pass
# pylint: disable=invalid-name
# Servod driver discovery logic requires this naming convention
class usbImageManager(hw_driver.HwDriver):
"""Driver to handle common tasks on the switchable usb port."""
# Polling rate to poll for image usb dev to appear if setting mux to
# servo_sees_usbkey
_POLLING_DELAY_S = 0.1
# Timeout to wait before giving up on hoping the image usb dev will enumerate
_WAIT_TIMEOUT_S = 10
# Timeout to settle all pending tasks on the device before writing to it.
_SETTLE_TIMEOUT_S = 60
# Control aliases to the image mux and power intended for image management
_IMAGE_USB_MUX = "image_usbkey_mux"
_USB_MUX_BOTTOM = "bottom_usbkey_mux" # only for servo v4.1
_IMAGE_USB_PWR = "image_usbkey_pwr"
_USB_PWR_BOTTOM = "bottom_usbkey_pwr"
_IMAGE_DEV = "image_usbkey_dev"
_IMAGE_MUX_TO_SERVO = "servo_sees_usbkey"
_HTTP_PREFIX = "http://"
_DEFAULT_ERROR_MSG = "No USB storage device found for image transfer."
def _drv_init(self):
"""Driver specific initializer."""
super(usbImageManager, self)._drv_init()
# This delay is required to safely switch the usb image mux direction
self._poweroff_delay = self._params.get("usb_power_off_delay", 0)
if self._poweroff_delay:
self._poweroff_delay = float(self._poweroff_delay)
# This is required to determine if the usbkey is connected to the host.
# The |hub_ports| is a comma-separated string of usb hub port numbers
# that the image usbkey enumerates under for a given servo device.
# NOTE: initialization here is shared among multiple controls, some of which
# do not use the hub_ports logic. Do not raise error here if the param
# is not available, but rather inside the controls that leverage it.
self._image_usbkey_hub_ports = None
if "hub_ports" in self._params:
self._image_usbkey_hub_ports = self._params.get("hub_ports").split(",")
# Flag to indicate whether the usb port supports having a hub attached to
# it. In that case, the image will be searched on the |STORAGE_ON_HUB_PORT|
# of the hub.
self._supports_hub_on_port = self._params.get("hub_on_port", False)
self._error_msg = self._DEFAULT_ERROR_MSG
if "error_amendment" in self._params:
self._error_msg += " " + self._params["error_amendment"]
# Retrieve map_params(if exists) and create a reversed dict to allow reverse lookup
self._MAP_DICT = self._params.get("map_params")
if isinstance(self._MAP_DICT, dict):
self._MAP_DICT_REVERSED = {val: key for key, val in self._MAP_DICT.items()}
else:
self._MAP_DICT_REVERSED = None
def _Get_image_usbkey_direction(self):
"""Return direction of image usbkey mux (as a usbkey string)."""
return self._servod_get(self._IMAGE_USB_MUX)
def _Get_second_usbkey_direction(self):
"""Return direction of bottom usbkey mux.
Please note that this is only compatible with servo_v4p1
"""
return self._servod_get(self._USB_MUX_BOTTOM)
def _Set_image_usbkey_direction(self, mux_direction):
"""Connect USB flash stick to either servo or DUT.
This function switches 'usb_mux_sel1' to provide electrical
connection between the USB port J3 and either servo or DUT side.
Args:
mux_direction: string/int values of "servo_sees_usbkey" or "dut_sees_usbkey".
Raises:
UsbImageManagerError: if an invalid usbkey int is passed in
"""
# If mux_direction is an int, change it to its corresponding string
if isinstance(mux_direction, int) and self._MAP_DICT_REVERSED is not None:
try:
mux_direction = self._MAP_DICT_REVERSED[str(mux_direction)]
except KeyError as e:
raise UsbImageManagerError(
"Invalid usbkey value. Try one of these values: "
+ str(self._MAP_DICT)
) from e
self._SafelySwitchMux(mux_direction)
if self._servod_get(self._IMAGE_USB_MUX) == self._IMAGE_MUX_TO_SERVO:
# This will ensure that we make a best-effort attempt to only
# return when the block device of the attached usb stick fully
# enumerates.
self._servod_get(self._IMAGE_DEV)
def _Set_second_usbkey_direction(self, mux_direction):
"""Connect USB port to either servo or DUT.
This function switches 'usb_mux_sel2' to provide electrical
connection between the USB port and either servo or DUT side.
Args:
mux_direction: string/int values of "servo_sees_usbkey" or "dut_sees_usbkey".
Raises:
UsbImageManagerError: if an invalid usbkey int is passed in
"""
# If mux_direction is an int, change it to its corresponding string
if isinstance(mux_direction, int) and self._MAP_DICT_REVERSED is not None:
try:
mux_direction = self._MAP_DICT_REVERSED[str(mux_direction)]
except KeyError as e:
raise UsbImageManagerError(
"Invalid usbkey value. Try one of these values: "
+ str(self._MAP_DICT)
) from e
self._SafelySwitchMux(mux_direction, self._USB_MUX_BOTTOM)
# Note: We don't check that image usb is connected as this port isn't currently used for images
def _SafelySwitchMux(self, mux_direction, usb_mux=None):
"""Helper to switch the usb mux.
Switching the usb mux is accompanied by powercycling
of the USB stick, because it sometimes gets wedged if the mux
is switched while the stick power is on.
Args:
mux_direction: string values of "servo_sees_usbkey" or "dut_sees_usbkey".
"""
# Older servos have just one usb "image port", so the default mux is image.
if usb_mux is None:
usb_mux = self._IMAGE_USB_MUX
# Select corresponding usb_pwr
usb_pwr = (
self._IMAGE_USB_PWR
if usb_mux == self._IMAGE_USB_MUX
else self._USB_PWR_BOTTOM
)
if self._servod_get(usb_mux) != mux_direction:
self._servod_set(usb_pwr, "off")
time.sleep(self._poweroff_delay)
self._servod_set(usb_mux, mux_direction)
time.sleep(self._poweroff_delay)
if self._servod_get(usb_pwr) != "on":
# Enforce that power is supplied.
self._servod_set(usb_pwr, "on")
def _PathIsHub(self, usb_sysfs_path):
"""Return whether |usb_sysfs_path| is a usb hub."""
if not os.path.exists(usb_sysfs_path):
return False
with open(os.path.join(usb_sysfs_path, "bDeviceClass"), "r") as classf:
return int(classf.read().strip(), 16) == usb.CLASS_HUB
def _Get_image_usbkey_dev(self):
"""Probe the USB disk device plugged in the servo from the host side.
Returns:
USB disk path if one and only one USB disk path is found, otherwise an
empty string.
Raises:
UsbImageManagerError: if 'hub_ports' was not defined in params
"""
if self._image_usbkey_hub_ports is None:
raise UsbImageManagerError("hub_ports need to be defined in params.")
servod = self._servod
# When the user is requesting the usb_dev they most likely intend for the
# usb to the facing the servo, and be powered. Enforce that.
self._SafelySwitchMux(self._IMAGE_MUX_TO_SERVO, self._IMAGE_USB_MUX)
# Look for own servod usb device
# pylint: disable=protected-access
# Need servod information to find own servod instance.
# hub device can be the cluster root device, or the main device if there
# is only 1 device on this servod instance
hub_device = servod.get_root_device()
if not hub_device.template.HUB_SERVO:
raise UsbImageManagerError("There is no USB hub device connected.")
hub_on_servo = hub_device.dev_entry.hub_stub
# Image usb is one of the hub ports |self._image_usbkey_hub_ports|
image_location_candidates = [
"%s.%s" % (hub_on_servo, p) for p in self._image_usbkey_hub_ports
]
# all |image_location_candidates| here assume that the device is on the same
# bus as the servo device. If the servo is attached to a usb controller
# that has both usb2 and usb3 busses, we need to search both.
busnum = usb_hierarchy.Hierarchy.bus_num_from_sysfs(hub_on_servo)
usb3_busnum = usb_hierarchy.Hierarchy.complement_bus_num(busnum)
if usb3_busnum is not None:
# It can be none if the bus only appears in one speed.
usb3_location_candidates = [
c.replace("/%d-" % busnum, "/%d-" % usb3_busnum)
for c in image_location_candidates
]
image_location_candidates.extend(usb3_location_candidates)
hub_location_candidates = []
if self._supports_hub_on_port:
# Here the config says that |image_usbkey_sysfs| might actually have a hub
# and not storage attached to it. In that case, the |STORAGE_ON_HUB_PORT|
# on that hub will house the storage.
hub_location_candidates = [
"%s.%d" % (path, STORAGE_ON_HUB_PORT)
for path in image_location_candidates
]
image_location_candidates.extend(hub_location_candidates)
self._logger.debug(
"usb image dev file candidates: %s", ", ".join(image_location_candidates)
)
# Let the device settle first before pushing out any data onto it.
subprocess.call(["/bin/udevadm", "settle", "-t", str(self._SETTLE_TIMEOUT_S)])
self._logger.debug("All udev events have settled.")
end = time.time() + self._WAIT_TIMEOUT_S
while image_location_candidates:
active_storage_candidate = image_location_candidates.pop(0)
if os.path.exists(active_storage_candidate):
if self._PathIsHub(active_storage_candidate):
# Do not check the hub, only devices.
continue
# Use /sys/block/ entries to see which block device is the |hub_device|.
# Use sd* to avoid querying any non-external block devices.
for candidate in glob.glob("/sys/block/sd*"):
# |candidate| is a link to a sys hw device file
devicepath = os.path.realpath(candidate)
# |active_storage_candidate| is also a link to a sys hw device file
if devicepath.startswith(
os.path.realpath(active_storage_candidate)
):
devpath = "/dev/%s" % os.path.basename(candidate)
try:
f = open(devpath, "rb")
except FileNotFoundError:
continue
# ensure that the block device is readable
# by reading the first sector.
read_bytes = 512
read_len = 0
with f:
try:
read_len = len(f.read(read_bytes))
except OSError as error:
self._logger.warning(
"open() or read() of {!r} failed with errno {:d} {} ({}), skipping it as a USB mux drive candidate.".format(
devpath,
error.errno,
errno.errorcode.get(error.errno, "UNKNOWN"),
os.strerror(error.errno),
)
)
continue
if read_len < read_bytes:
self._logger.warning(
"read() returned only {:d} bytes out of {:d} requested from {!r}, the drive is likely not functional, skipping it as a USB mux drive candidate.".format(
read_len, read_bytes, devpath
)
)
continue
return devpath
# Enqueue the candidate again in hopes that it will eventually enumerate.
image_location_candidates.append(active_storage_candidate)
if time.time() >= end:
break
time.sleep(self._POLLING_DELAY_S)
# Split and join to help with error message formatting from XML that might
# introduce multiple white-spaces.
self._logger.warning(" ".join(self._error_msg.split()))
self._logger.warning(
"Stick should be at one of the usb image dev file candidates: %s",
", ".join(image_location_candidates),
)
if self._supports_hub_on_port:
self._logger.warning(
"If using a hub on the image key port, please make "
"sure to use port %d on the hub. This should be at "
"one of: %s.",
STORAGE_ON_HUB_PORT,
", ".join(hub_location_candidates),
)
return ""
def _Set_make_image_noninteractive(self, usb_dev_partition):
"""Makes the recovery image noninteractive.
A noninteractive image will reboot automatically after installation
instead of waiting for the USB device to be removed to initiate a system
reboot.
Mounts |usb_dev_partition| and creates a file called "non_interactive" so
that the image will become noninteractive.
Args:
usb_dev_partition: string of dev partition file e.g. sdb1 or sdc3
Raises:
UsbImageManagerError: if error occurred during the process
"""
# pylint: disable=broad-except
# Ensure that any issue gets caught & reported as UsbImageError
if not usb_dev_partition or not os.path.exists(usb_dev_partition):
msg = "Usb partition device file provided %r invalid." % usb_dev_partition
self._logger.error(msg)
raise UsbImageManagerError(msg)
# Create TempDirectory
tmpdir = tempfile.mkdtemp()
result = True
if not tmpdir:
self._logger.error("Failed to create temp directory.")
result = False
else:
# Mount drive to tmpdir.
rc = subprocess.call(["mount", usb_dev_partition, tmpdir])
if rc == 0:
# Create file 'non_interactive'
non_interactive_file = os.path.join(tmpdir, "non_interactive")
try:
open(non_interactive_file, "w").close()
except IOError as e:
self._logger.error(
"Failed to create file %s : %s ( %d )",
non_interactive_file,
e.strerror,
e.errno,
)
result = False
except BaseException as e:
self._logger.error(
"Unexpected Exception creating file %s : %s",
non_interactive_file,
str(e),
)
result = False
# Unmount drive regardless if file creation worked or not.
rc = subprocess.call(["umount", usb_dev_partition])
if rc != 0:
self._logger.error("Failed to unmount USB Device")
result = False
else:
self._logger.error("Failed to mount USB Device")
result = False
# Delete tmpdir. May throw exception if 'umount' failed.
try:
os.rmdir(tmpdir)
except OSError as e:
self._logger.error(
"Failed to remove temp directory %s : %s", tmpdir, str(e)
)
result = False
except BaseException as e:
self._logger.error(
"Unexpected Exception removing tempdir %s : %s", tmpdir, str(e)
)
result = False
if not result:
raise UsbImageManagerError("Failed to make image noninteractive.")