blob: 86bfb1384b1b90288a6f7b1737e24fbb31ccc7d4 [file] [log] [blame]
# Copyright 2022 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Common methods and variables used by Cr-Fuchsia testing infrastructure."""
import json
import logging
import os
import re
import signal
import shutil
import subprocess
import sys
import time
from argparse import ArgumentParser
from typing import Iterable, List, Optional, Tuple
from compatible_utils import get_ssh_prefix, get_host_arch
def _find_src_root() -> str:
    """Locate the root of the src folder.

    A non-empty SRC_ROOT environment variable takes precedence; otherwise the
    root is the directory three levels above this file.
    """
    override = os.environ.get('SRC_ROOT')
    if override:
        return override
    three_levels_up = [os.pardir] * 3
    return os.path.join(os.path.dirname(__file__), *three_levels_up)


# The absolute path of the root folder to work on. It may not always be the
# src folder since there may not be source code at all, but it's expected to
# have folders like third_party/fuchsia-sdk in it.
DIR_SRC_ROOT = os.path.abspath(_find_src_root())
def _find_fuchsia_images_root() -> str:
    """Locate the root of the fuchsia images.

    A non-empty FUCHSIA_IMAGES_ROOT environment variable takes precedence;
    otherwise third_party/fuchsia-sdk/images under DIR_SRC_ROOT is used.
    """
    return (os.environ.get('FUCHSIA_IMAGES_ROOT')
            or os.path.join(DIR_SRC_ROOT, 'third_party', 'fuchsia-sdk',
                            'images'))


IMAGES_ROOT = os.path.abspath(_find_fuchsia_images_root())
def _find_fuchsia_internal_images_root() -> str:
    """Locate the root of the internal fuchsia images.

    A non-empty FUCHSIA_INTERNAL_IMAGES_ROOT environment variable takes
    precedence; otherwise IMAGES_ROOT with an '-internal' suffix is used.
    """
    return (os.environ.get('FUCHSIA_INTERNAL_IMAGES_ROOT')
            or f'{IMAGES_ROOT}-internal')


INTERNAL_IMAGES_ROOT = os.path.abspath(_find_fuchsia_internal_images_root())
# Repository alias used as the host part of the fuchsia-pkg:// URLs built in
# this file.
REPO_ALIAS = 'fuchsia.com'
def _find_fuchsia_sdk_root() -> str:
    """Locate the root of the fuchsia sdk.

    A non-empty FUCHSIA_SDK_ROOT environment variable takes precedence;
    otherwise third_party/fuchsia-sdk/sdk under DIR_SRC_ROOT is used.
    """
    return (os.environ.get('FUCHSIA_SDK_ROOT')
            or os.path.join(DIR_SRC_ROOT, 'third_party', 'fuchsia-sdk', 'sdk'))


SDK_ROOT = os.path.abspath(_find_fuchsia_sdk_root())
def _find_fuchsia_gn_sdk_root() -> str:
    """Locate the root of the fuchsia GN sdk.

    A non-empty FUCHSIA_GN_SDK_ROOT environment variable takes precedence;
    otherwise third_party/fuchsia-gn-sdk/src under DIR_SRC_ROOT is used.
    """
    return (os.environ.get('FUCHSIA_GN_SDK_ROOT')
            or os.path.join(DIR_SRC_ROOT, 'third_party', 'fuchsia-gn-sdk',
                            'src'))


GN_SDK_ROOT = os.path.abspath(_find_fuchsia_gn_sdk_root())
# Directory of the SDK host tools: <SDK_ROOT>/tools/<value returned by
# get_host_arch()>.
SDK_TOOLS_DIR = os.path.join(SDK_ROOT, 'tools', get_host_arch())
# Path of the ffx binary inside SDK_TOOLS_DIR; every ffx invocation in this
# file starts with it.
_FFX_TOOL = os.path.join(SDK_TOOLS_DIR, 'ffx')
def set_ffx_isolate_dir(isolate_dir: str) -> None:
    """Stores |isolate_dir| in the FFX_ISOLATE_DIR environment variable.

    The process environment is modified in place, so ffx calls made
    afterwards carry the isolate dir.

    Args:
        isolate_dir: The value to store in FFX_ISOLATE_DIR.
    """
    env_key = 'FFX_ISOLATE_DIR'
    os.environ[env_key] = isolate_dir
def get_hash_from_sdk():
    """Read the version info out of the SDK manifest.

    Returns:
        The value stored under the 'id' key of meta/manifest.json in
        SDK_ROOT.
    """
    manifest_path = os.path.join(SDK_ROOT, 'meta', 'manifest.json')
    assert os.path.exists(manifest_path), \
        'Could not detect version file. Make sure the SDK is downloaded.'
    with open(manifest_path, 'r') as manifest_file:
        manifest = json.load(manifest_file)
    return manifest['id']
def get_host_tool_path(tool):
    """Build the path of a host tool inside the SDK tools directory.

    Args:
        tool: File name of the tool, joined onto SDK_TOOLS_DIR.
    Returns:
        The joined path; existence of the file is not checked.
    """
    tool_path = os.path.join(SDK_TOOLS_DIR, tool)
    return tool_path
def get_host_os():
    """Get host operating system.

    Returns:
        'linux' or 'mac', chosen by the prefix of sys.platform.
    Raises:
        Exception: if sys.platform starts with neither 'linux' nor 'darwin'.
    """
    host_platform = sys.platform
    # Checked in order: sys.platform prefix -> name returned to callers.
    known_platforms = (('linux', 'linux'), ('darwin', 'mac'))
    for prefix, os_name in known_platforms:
        if host_platform.startswith(prefix):
            return os_name
    raise Exception('Unsupported host platform: %s' % host_platform)
def make_clean_directory(directory_name):
    """Leave |directory_name| as an existing, empty directory.

    An already existing path is removed recursively before the directory
    (and any missing parents) is created again.

    Args:
        directory_name: Path of the directory to (re)create.
    """
    already_present = os.path.exists(directory_name)
    if already_present:
        shutil.rmtree(directory_name)
    os.makedirs(directory_name)
def _get_daemon_status():
    """Determines daemon status via `ffx daemon socket`.

    Returns:
        dict of status of the socket, read from pid.status of the JSON
        output. Status will have a key Running or NotRunning to indicate if
        the daemon is running; {'NotRunning': True} is returned when the
        output has no pid.status entry.
    """
    socket_info = run_ffx_command(cmd=('daemon', 'socket'),
                                  check=True,
                                  capture_output=True,
                                  json_out=True,
                                  suppress_repair=True)
    parsed = json.loads(socket_info.stdout.strip())
    pid_info = parsed.get('pid', {})
    return pid_info.get('status', {'NotRunning': True})
def _is_daemon_running():
    """Returns True if the daemon status has a 'Running' key."""
    daemon_status = _get_daemon_status()
    return 'Running' in daemon_status
def _wait_for_daemon(start=True, timeout_seconds=100):
    """Waits for daemon to reach desired state in a polling loop.

    Sleeps for 5s between polls. The daemon is polled at least once, even
    when |timeout_seconds| is shorter than the sleep period.

    Args:
        start: bool. Indicates to wait for daemon to start up. If False,
            indicates waiting for daemon to die.
        timeout_seconds: int. Number of seconds to wait for the daemon to
            reach the desired status.
    Raises:
        TimeoutError: if the daemon does not reach the desired state in time.
    """
    wanted_status = 'start' if start else 'stop'
    sleep_period_seconds = 5
    # Always poll at least once so a short timeout still checks the state.
    attempts = max(1, int(timeout_seconds / sleep_period_seconds))
    for i in range(attempts):
        if _is_daemon_running() == start:
            return
        # Nothing polls the daemon again after the final attempt, so sleeping
        # there would only delay the TimeoutError.
        if i != attempts - 1:
            logging.info('Waiting for daemon to %s...', wanted_status)
            time.sleep(sleep_period_seconds)
    raise TimeoutError(f'Daemon did not {wanted_status} in time.')
def _run_repair_command(output):
    """Scans |output| for a self-repair command to run and, if found, runs it.

    After the repair command succeeds, waits for the daemon to be running.

    Args:
        output: Text to search for a backtick-quoted `ffx ...` command.
    Returns:
        True if a repair command was found and ran successfully. False if no
        command was found or it raised CalledProcessError.
    """
    # Check for a string along the lines of:
    # "Run `ffx doctor --restart-daemon` for further diagnostics."
    found = re.search('`ffx ([^`]+)`', output)
    if found is None:
        return False  # No repair command found.

    repair_args = found.group(1).split()
    try:
        run_ffx_command(cmd=repair_args, suppress_repair=True)
        # Need the daemon to be up at the end of this.
        _wait_for_daemon(start=True)
    except subprocess.CalledProcessError:
        return False  # Repair failed.
    return True  # Repair succeeded.
# The following two functions are a temporary workaround until
# https://fxbug.dev/92296 and https://fxbug.dev/125873 are fixed.
def start_ffx_daemon():
    """Starts the ffx daemon through `ffx doctor --restart-daemon`.

    `daemon start` is not used because it blocks the current shell.

    Note, doctor --restart-daemon usually fails since the timeout in ffx is
    short and won't be sufficient to wait for the daemon to really start, so
    its exit code is ignored and the daemon state is polled instead.
    Also, doctor --restart-daemon always restarts the daemon, so this
    function should be used with caution unless it's really needed to
    "restart" the daemon by explicitly calling stop daemon first.
    """
    assert not _is_daemon_running(), "Call stop_ffx_daemon first."
    restart_cmd = ('doctor', '--restart-daemon')
    run_ffx_command(cmd=restart_cmd, check=False)
    _wait_for_daemon(start=True)
def stop_ffx_daemon():
    """Stops the ffx daemon and waits until it is no longer running."""
    stop_cmd = ('daemon', 'stop', '-t', '10000')
    run_ffx_command(cmd=stop_cmd)
    _wait_for_daemon(start=False)
def run_ffx_command(suppress_repair: bool = False,
                    check: bool = True,
                    capture_output: Optional[bool] = None,
                    timeout: Optional[int] = None,
                    **kwargs) -> subprocess.CompletedProcess:
    """Runs `ffx` with the given arguments, waiting for it to exit.

    If `ffx` exits with a non-zero exit code, the output is scanned for a
    recommended repair command (e.g., "Run `ffx doctor --restart-daemon` for
    further diagnostics."). If such a command is found, it is run and then the
    original command is retried. This behavior can be suppressed via the
    `suppress_repair` argument.

    **
    Except for `suppress_repair`, the arguments below are named after
    |subprocess.run| arguments. They are overloaded to avoid them from being
    forwarded to |subprocess.Popen|.
    **
    See run_continuous_ffx_command for additional arguments.

    Args:
        suppress_repair: If True, do not attempt to find and run a repair
            command.
        check: If True, CalledProcessError is raised if ffx returns a non-zero
            exit code.
        capture_output: Whether to capture both stdout/stderr. Output is also
            captured whenever repair is not suppressed, because the repair
            command is looked up in it. stderr is merged into stdout.
        timeout: Optional timeout (in seconds). The ffx process is killed and
            subprocess.TimeoutExpired is raised if it does not complete in
            the timeout period.
    Returns:
        A CompletedProcess instance
    Raises:
        subprocess.CalledProcessError: if |check| is true and ffx fails and
            could not be repaired.
        subprocess.TimeoutExpired: if |timeout| elapses before ffx exits.
    """
    # Always capture output when:
    # - Repair does not need to be suppressed
    # - capture_output is Truthy
    if capture_output or not suppress_repair:
        kwargs['stdout'] = subprocess.PIPE
        kwargs['stderr'] = subprocess.STDOUT
    proc = None
    try:
        proc = run_continuous_ffx_command(**kwargs)
        try:
            # NOTE(review): the `stdin` kwarg is forwarded both to Popen and
            # as the communicate() input - confirm that is intended.
            stdout, stderr = proc.communicate(input=kwargs.get('stdin'),
                                              timeout=timeout)
        except subprocess.TimeoutExpired:
            # communicate() does not stop the child on timeout; kill it so a
            # hung ffx is not left running. Use wait() instead of a second
            # communicate() so this cannot block on pipes that processes
            # spawned by ffx may still hold open.
            proc.kill()
            proc.wait()
            raise
        completed_proc = subprocess.CompletedProcess(
            args=proc.args,
            returncode=proc.returncode,
            stdout=stdout,
            stderr=stderr)
        if check:
            completed_proc.check_returncode()
        return completed_proc
    except subprocess.CalledProcessError as cpe:
        if proc is None:
            raise
        logging.error('%s %s failed with returncode %s.',
                      os.path.relpath(_FFX_TOOL),
                      subprocess.list2cmdline(proc.args[1:]), cpe.returncode)
        if cpe.output:
            logging.error('stdout of the command: %s', cpe.output)
        if suppress_repair or (cpe.output
                               and not _run_repair_command(cpe.output)):
            raise

    # If the original command failed but a repair command was found and
    # succeeded, try one more time with the original command.
    return run_ffx_command(suppress_repair=True,
                           check=check,
                           capture_output=capture_output,
                           timeout=timeout,
                           **kwargs)
def run_continuous_ffx_command(cmd: Iterable[str],
                               target_id: Optional[str] = None,
                               configs: Optional[List[str]] = None,
                               json_out: bool = False,
                               encoding: Optional[str] = 'utf-8',
                               **kwargs) -> subprocess.Popen:
    """Runs `ffx` with the given arguments, returning immediately.

    Global options are placed before |cmd| in this order: `--machine json`,
    `--target`, then one `--config` per entry of |configs|.

    Args:
        cmd: A sequence of arguments to ffx.
        target_id: Whether to execute the command for a specific target. The
            target_id could be in the form of a nodename or an address.
        configs: A list of configs to be applied to the current command.
        json_out: Have command output returned as JSON. Must be parsed by
            caller.
        encoding: Optional, desired encoding for output/stderr pipes.
        **kwargs: Forwarded unchanged to subprocess.Popen.
    Returns:
        A subprocess.Popen instance
    """
    machine_args = ['--machine', 'json'] if json_out else []
    target_args = ['--target', target_id] if target_id else []
    config_args = [
        arg for config in (configs or []) for arg in ('--config', config)
    ]
    full_cmd = [
        _FFX_TOOL, *machine_args, *target_args, *config_args, *cmd
    ]
    return subprocess.Popen(full_cmd, encoding=encoding, **kwargs)
def read_package_paths(out_dir: str, pkg_name: str) -> List[str]:
    """Lists the FAR files a package depends on.

    The entries come from the 'packages' list of
    gen/package_metadata/<pkg_name>.meta inside the output directory.

    Args:
        out_dir: Output directory, joined onto DIR_SRC_ROOT.
        pkg_name: Name of the package whose .meta file is read.
    Returns:
        A list of the absolute path to all FAR files the package depends on.
    """
    meta_path = os.path.join(DIR_SRC_ROOT, out_dir, 'gen', 'package_metadata',
                             f'{pkg_name}.meta')
    with open(meta_path) as meta_file:
        metadata = json.load(meta_file)
    return [
        os.path.join(DIR_SRC_ROOT, out_dir, package)
        for package in metadata['packages']
    ]
def register_common_args(parser: ArgumentParser) -> None:
    """Adds the 'common' argument group to |parser|.

    The group holds --out-dir / -C, whose value is passed through
    os.path.realpath.

    Args:
        parser: The parser to add the argument group to.
    """
    group = parser.add_argument_group('common', 'common arguments')
    out_dir_help = 'Path to the directory in which build files are located. '
    group.add_argument('--out-dir',
                       '-C',
                       type=os.path.realpath,
                       help=out_dir_help)
def register_device_args(parser: ArgumentParser) -> None:
    """Register device arguments.

    Adds the 'device' argument group with --target-id. Its default is read
    from the FUCHSIA_NODENAME environment variable at registration time and
    is None when the variable is unset.

    Args:
        parser: The parser to add the argument group to.
    """
    device_args = parser.add_argument_group('device', 'device arguments')
    device_args.add_argument('--target-id',
                             default=os.environ.get('FUCHSIA_NODENAME'),
                             help=('Specify the target device. This could be '
                                   'a node-name (e.g. fuchsia-emulator) or '
                                   'an ip address along with an optional port '
                                   '(e.g. [fe80::e1c4:fd22:5ee5:878e]:22222, '
                                   '1.2.3.4, 1.2.3.4:33333). If unspecified, '
                                   'the default target in ffx will be used.'))
def register_log_args(parser: ArgumentParser) -> None:
    """Adds the 'logging' argument group to |parser|.

    The group holds --logs-dir, whose value is passed through
    os.path.realpath.

    Args:
        parser: The parser to add the argument group to.
    """
    group = parser.add_argument_group('logging', 'logging arguments')
    logs_dir_help = 'Directory to write logs to.'
    group.add_argument('--logs-dir',
                       type=os.path.realpath,
                       help=logs_dir_help)
def get_component_uri(package: str) -> str:
    """Retrieve the uri for a package.

    Args:
        package: A package name, or an already complete fuchsia-pkg:// uri.
    Returns:
        |package| unchanged if it starts with 'fuchsia-pkg://'; otherwise the
        uri of the <package>.cm component of that package in REPO_ALIAS.
    """
    if not package.startswith('fuchsia-pkg://'):
        return f'fuchsia-pkg://{REPO_ALIAS}/{package}#meta/{package}.cm'
    # The input is a full package uri already, do nothing.
    return package
def resolve_packages(packages: List[str], target_id: Optional[str]) -> None:
    """Ensure that all |packages| are installed on a device.

    Runs `pkgctl gc` over ssh first (its result is ignored), then
    `pkgctl resolve` for every package, retrying each resolve once.

    Args:
        packages: Names of the packages to resolve from REPO_ALIAS.
        target_id: The target to resolve the packages on.
    Raises:
        subprocess.CalledProcessError: if the last resolve attempt for a
            package fails.
    """
    ssh_prefix = get_ssh_prefix(get_ssh_address(target_id))
    gc_cmd = ssh_prefix + ['--', 'pkgctl', 'gc']
    subprocess.run(gc_cmd, check=False)

    def _retry_command(cmd: List[str],
                       retries: int = 2,
                       **kwargs) -> Optional[subprocess.CompletedProcess]:
        """Helper function for retrying a subprocess.run command."""
        final_attempt = retries - 1
        for attempt in range(retries):
            # Only the final attempt is allowed to raise on failure.
            is_last = attempt == final_attempt
            result = subprocess.run(cmd, **kwargs, check=is_last)
            if is_last or result.returncode == 0:
                return result
            time.sleep(3)
        return None

    for package in packages:
        package_url = f'fuchsia-pkg://{REPO_ALIAS}/{package}'
        _retry_command(ssh_prefix + ['--', 'pkgctl', 'resolve', package_url])
def get_ssh_address(target_id: Optional[str]) -> str:
    """Determines SSH address for given target.

    Args:
        target_id: The target to query, passed on to ffx.
    Returns:
        The stripped output of `ffx target get-ssh-address`.
    """
    result = run_ffx_command(cmd=('target', 'get-ssh-address'),
                             target_id=target_id,
                             capture_output=True)
    return result.stdout.strip()
def find_in_dir(target_name: str, parent_dir: str) -> Optional[str]:
    """Finds a directory by name below |parent_dir|.

    Args:
        target_name: Name of target to find, as a string. Only its basename
            is used.
        parent_dir: Directory to start search in.
    Returns:
        Full path to the first matching directory found by os.walk, None if
        not found.
    """
    # Doesn't make sense to look for a full path. Only extract the basename.
    wanted = os.path.basename(target_name)
    matches = (os.path.abspath(os.path.join(root, wanted))
               for root, dirs, _ in os.walk(parent_dir) if wanted in dirs)
    return next(matches, None)
def find_image_in_sdk(product_name: str) -> Optional[str]:
    """Finds image dir in SDK for product given.

    Looks for the product directory under the 'images' folder next to
    SDK_ROOT, then for an 'images' directory inside it.

    Args:
        product_name: Name of product's image directory to find.
    Returns:
        Full path to the target, None if not found.
    """
    images_root = os.path.join(SDK_ROOT, os.pardir, 'images')
    product_dir = find_in_dir(product_name, parent_dir=images_root)
    if not product_dir:
        return product_dir
    return find_in_dir('images', parent_dir=product_dir)
def catch_sigterm() -> None:
    """Installs a SIGTERM handler that calls sys.exit(0).

    This lets the process unwind through SystemExit when it receives the
    kill signal.
    """
    signal.signal(signal.SIGTERM, lambda *_: sys.exit(0))
def wait_for_sigterm(extra_msg: str = '') -> None:
    """Blocks until either ctrl+c or sigterm is received.

    KeyboardInterrupt and SystemExit are swallowed and logged, so the
    function returns normally. Caller can use try-finally statement to
    perform extra cleanup.

    Args:
        extra_msg: The extra message to be logged.
    """
    try:
        # We do expect receiving either ctrl+c or sigterm, so this loop
        # literally means sleep forever.
        while True:
            time.sleep(10000)
    except (KeyboardInterrupt, SystemExit) as stop_request:
        if isinstance(stop_request, KeyboardInterrupt):
            logging.info('Ctrl-C received; %s', extra_msg)
        else:
            logging.info('SIGTERM received; %s', extra_msg)
def get_system_info(target: Optional[str] = None) -> Tuple[str, str]:
    """Retrieves installed OS version from device.

    Reads the 'Build' section of the `ffx target show --json` output.

    Args:
        target: The target to query, passed on to ffx.
    Returns:
        Tuple of strings, containing (product, version number), or a pair of
        empty strings to indicate an error.
    """
    unknown_info = ('', '')
    result = run_ffx_command(cmd=('target', 'show', '--json'),
                             target_id=target,
                             capture_output=True,
                             check=False)
    if result.returncode != 0:
        return unknown_info
    for section in json.loads(result.stdout.strip()):
        if section['title'] == 'Build':
            children = section['child']
            return (children[1]['value'], children[0]['value'])
    # If the information was not retrieved, return empty strings to indicate
    # unknown system info.
    return unknown_info