blob: 948805d7a0d725faf34e539fb580b1272a67743c [file] [log] [blame]
# -*- coding: utf-8 -*-
# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Command line util library"""
import subprocess
import os
import shutil
import logging
import errno
import collections
import re
LOGGER = logging.getLogger(__name__)
class RunCommandError(subprocess.CalledProcessError):
def __init__(self, returncode, cmd):
super(RunCommandError, self).__init__(returncode, cmd)
def run_command(cmd, error_code_ok=False, log=False, shell=False):
"""Run a command.
Args:
cmd: string array of the command, and the command line arguments to
pass into the command, ex:
['echo', 'hello']
error_code_ok: do not throw RunCommandError on non-zero exit code
log: log the command and the output
Return:
string containing the stdout of the output, None if no output
"""
try:
output = subprocess.check_output(cmd, shell=shell)
if log:
LOGGER.info(' '.join(cmd))
LOGGER.info(output)
return output
except subprocess.CalledProcessError as e:
if log:
LOGGER.info(' '.join(cmd))
LOGGER.info(str(e))
if not error_code_ok:
raise RunCommandError(e.returncode, e.cmd)
def sudo_run_command(cmd, user='root', error_code_ok=False, shell=False):
"""Run a command via sudo. All calls are logged.
Args:
cmd: string array of the command and args, see run_command for details
user: what user to run the command as, default root
error_code_ok: do not throw RunCommandError on non-zero exit code
Return:
string containing stdout of the output, None if no output
"""
if user == 'root':
sudo_cmd = ['sudo'] + cmd
else:
sudo_cmd = ['sudo', '-u', user] + cmd
return run_command(sudo_cmd, log=True, error_code_ok=error_code_ok,
shell=shell)
def create_tarball(tarball_name, directory):
"""Creates a gzipped tar archive
Args:
tarball_name: string name for the tarball
directory: directory to archive the contents of
"""
run_command(['tar', '-czf', tarball_name, directory])
def safe_mkdir(path):
"""Create a directory and any parent directories if they don't exist.
Args:
path: string path of the new directory
"""
run_command(['mkdir', '-p', path])
def rm_dir(path, ignore_missing=False, sudo=False):
"""Recursively remove a directory.
Args:
path: Path of directory to remove.
ignore_missing: Do not error when path does not exist.
sudo: Remove directories as root.
"""
if sudo:
try:
sudo_run_command(
['rm', '-r%s' % ('f' if ignore_missing else '',), '--', path])
except RunCommandError as e:
if not ignore_missing or os.path.exists(path):
# If we're not ignoring the rm ENOENT equivalent, throw it;
# if the pathway still exists, something failed, thus throw it.
raise
else:
try:
shutil.rmtree(path)
except EnvironmentError as e:
if not ignore_missing or e.errno != errno.ENOENT:
raise
def resolve_symlink(file_name, root='/'):
"""Resolve a symlink |file_name| relative to |root|.
For example:
ROOT-A/absolute_symlink --> /an/abs/path
ROOT-A/relative_symlink --> a/relative/path
absolute_symlink will be resolved to ROOT-A/an/abs/path
relative_symlink will be resolved to ROOT-A/a/relative/path
Args:
file_name: A path to the file.
root: A path to the root directory.
Returns:
file_name if file_name is not a symlink. Otherwise, the ultimate path
that file_name points to, with links resolved relative to |root|.
"""
count = 0
while os.path.islink(file_name):
count += 1
if count > 128:
raise ValueError('Too many link levels for %s.' % file_name)
link = os.readlink(file_name)
if link.startswith('/'):
file_name = os.path.join(root, link[1:])
else:
file_name = os.path.join(os.path.dirname(file_name), link)
return file_name
def list_block_devices():
"""Lists all block devices.
Returns:
A list of BlockDevice items with attributes 'NAME', 'RM', 'TYPE',
'SIZE' (RM stands for removable).
"""
keys = ['NAME', 'RM', 'TYPE', 'SIZE']
BlockDevice = collections.namedtuple('BlockDevice', keys)
cmd = ['lsblk', '--pairs', '--bytes']
cmd += ['--output', ','.join(keys)]
output = run_command(cmd).strip()
devices = []
for line in output.splitlines():
d = {}
for k, v in re.findall(r'(\S+?)=\"(\w*?)\"', line):
d[k] = v
devices.append(BlockDevice(**d))
return devices
def list_connected_dut_ips():
"""List duts by ip currently connected to test subnet
Returns:
A list of connected ips on the test subnet
"""
# regex to search an ip n reachable or cached line such as
# 192.168.231.100 dev lxcbr0 lladdr 60:38:e0:e3:10:f5 STALE
# grabs ip, state
# note this will not match an 'INCOMPLETE' line that indicates
# no network connection
regex = re.compile('^(192\.168\.231\.1\d\d)\sdev\s\S+\slladdr\s\S+\s(\S+)')
duts = []
ip_n_output = run_command(['ip', 'n']).strip()
for line in ip_n_output.splitlines():
match = regex.search(line)
if match is not None:
ip = match.group(1)
state = match.group(2)
if state == 'REACHABLE':
duts.append(ip)
else:
# a stale record, ping the ip to be sure it is reachable
cmd = ['ping', '-c', '1', '-W', '1', ip]
ping_output = run_command(cmd, error_code_ok=True)
if (ping_output is not None and
'64 bytes from %s' % ip in ping_output):
duts.append(ip)
return duts