| # Copyright 2022 The Chromium Authors | 
 | # Use of this source code is governed by a BSD-style license that can be | 
 | # found in the LICENSE file. | 
 | """Provide helpers for running Fuchsia's `ffx`.""" | 
 |  | 
 | import logging | 
 | import os | 
 | import json | 
 | import subprocess | 
 | import sys | 
 | import tempfile | 
 |  | 
 | from contextlib import AbstractContextManager | 
 | from typing import IO, Iterable, List, Optional | 
 |  | 
 | from common import run_continuous_ffx_command, run_ffx_command, SDK_ROOT | 
 |  | 
 | RUN_SUMMARY_SCHEMA = \ | 
 |     'https://fuchsia.dev/schema/ffx_test/run_summary-8d1dd964.json' | 
 |  | 
 |  | 
 | def get_config(name: str) -> Optional[str]: | 
 |     """Run a ffx config get command to retrieve the config value.""" | 
 |  | 
 |     try: | 
 |         return run_ffx_command(cmd=['config', 'get', name], | 
 |                                capture_output=True).stdout.strip() | 
 |     except subprocess.CalledProcessError as cpe: | 
 |         # A return code of 2 indicates no previous value set. | 
 |         if cpe.returncode == 2: | 
 |             return None | 
 |         raise | 
 |  | 
 |  | 
 | class ScopedFfxConfig(AbstractContextManager): | 
 |     """Temporarily overrides `ffx` configuration. Restores the previous value | 
 |     upon exit.""" | 
 |  | 
 |     def __init__(self, name: str, value: str) -> None: | 
 |         """ | 
 |         Args: | 
 |             name: The name of the property to set. | 
 |             value: The value to associate with `name`. | 
 |         """ | 
 |         self._old_value = None | 
 |         self._new_value = value | 
 |         self._name = name | 
 |  | 
 |     def __enter__(self): | 
 |         """Override the configuration.""" | 
 |  | 
 |         # Cache the old value. | 
 |         self._old_value = get_config(self._name) | 
 |         if self._new_value != self._old_value: | 
 |             run_ffx_command(cmd=['config', 'set', self._name, self._new_value]) | 
 |         return self | 
 |  | 
 |     def __exit__(self, exc_type, exc_val, exc_tb) -> bool: | 
 |         if self._new_value == self._old_value: | 
 |             return False | 
 |  | 
 |         # Allow removal of config to fail. | 
 |         remove_cmd = run_ffx_command(cmd=['config', 'remove', self._name], | 
 |                                      check=False) | 
 |         if remove_cmd.returncode != 0: | 
 |             logging.warning('Error when removing ffx config %s', self._name) | 
 |  | 
 |         # Explicitly set the value back only if removing the new value doesn't | 
 |         # already restore the old value. | 
 |         if self._old_value is not None and \ | 
 |            self._old_value != get_config(self._name): | 
 |             run_ffx_command(cmd=['config', 'set', self._name, self._old_value]) | 
 |  | 
 |         # Do not suppress exceptions. | 
 |         return False | 
 |  | 
 |  | 
 | def test_connection(target_id: Optional[str]) -> None: | 
 |     """Run an echo test to verify that the device can be connected to.""" | 
 |  | 
 |     run_ffx_command(cmd=('target', 'echo'), target_id=target_id) | 
 |  | 
 |  | 
 | class FfxTestRunner(AbstractContextManager): | 
 |     """A context manager that manages a session for running a test via `ffx`. | 
 |  | 
 |     Upon entry, an instance of this class configures `ffx` to retrieve files | 
 |     generated by a test and prepares a directory to hold these files either in a | 
 |     specified directory or in tmp. On exit, any previous configuration of | 
 |     `ffx` is restored and the temporary directory, if used, is deleted. | 
 |  | 
 |     The prepared directory is used when invoking `ffx test run`. | 
 |     """ | 
 |  | 
 |     def __init__(self, results_dir: Optional[str] = None) -> None: | 
 |         """ | 
 |         Args: | 
 |             results_dir: Directory on the host where results should be stored. | 
 |         """ | 
 |         self._results_dir = results_dir | 
 |         self._custom_artifact_directory = None | 
 |         self._temp_results_dir = None | 
 |         self._debug_data_directory = None | 
 |  | 
 |     def __enter__(self): | 
 |         if self._results_dir: | 
 |             os.makedirs(self._results_dir, exist_ok=True) | 
 |         else: | 
 |             self._temp_results_dir = tempfile.TemporaryDirectory() | 
 |             self._results_dir = self._temp_results_dir.__enter__() | 
 |         return self | 
 |  | 
 |     def __exit__(self, exc_type, exc_val, exc_tb) -> bool: | 
 |         if self._temp_results_dir: | 
 |             self._temp_results_dir.__exit__(exc_type, exc_val, exc_tb) | 
 |             self._temp_results_dir = None | 
 |  | 
 |         # Do not suppress exceptions. | 
 |         return False | 
 |  | 
 |     def run_test(self, | 
 |                  component_uri: str, | 
 |                  test_args: Optional[Iterable[str]] = None, | 
 |                  node_name: Optional[str] = None) -> subprocess.Popen: | 
 |         """Starts a subprocess to run a test on a target. | 
 |         Args: | 
 |             component_uri: The test component URI. | 
 |             test_args: Arguments to the test package, if any. | 
 |             node_name: The target on which to run the test. | 
 |         Returns: | 
 |             A subprocess.Popen object. | 
 |         """ | 
 |         command = [ | 
 |             'test', 'run', '--output-directory', self._results_dir, | 
 |             component_uri | 
 |         ] | 
 |         if test_args: | 
 |             command.append('--') | 
 |             command.extend(test_args) | 
 |         return run_continuous_ffx_command(command, | 
 |                                           node_name, | 
 |                                           stdout=subprocess.PIPE, | 
 |                                           stderr=subprocess.STDOUT) | 
 |  | 
 |     def _parse_test_outputs(self): | 
 |         """Parses the output files generated by the test runner. | 
 |  | 
 |         The instance's `_custom_artifact_directory` member is set to the | 
 |         directory holding output files emitted by the test. | 
 |  | 
 |         This function is idempotent, and performs no work if it has already been | 
 |         called. | 
 |         """ | 
 |         if self._custom_artifact_directory: | 
 |             return | 
 |  | 
 |         run_summary_path = os.path.join(self._results_dir, 'run_summary.json') | 
 |         try: | 
 |             with open(run_summary_path) as run_summary_file: | 
 |                 run_summary = json.load(run_summary_file) | 
 |         except IOError: | 
 |             logging.exception('Error reading run summary file.') | 
 |             return | 
 |         except ValueError: | 
 |             logging.exception('Error parsing run summary file %s', | 
 |                               run_summary_path) | 
 |             return | 
 |  | 
 |         assert run_summary['schema_id'] == RUN_SUMMARY_SCHEMA, \ | 
 |             'Unsupported version found in %s' % run_summary_path | 
 |  | 
 |         run_artifact_dir = run_summary.get('data', {})['artifact_dir'] | 
 |         for artifact_path, artifact in run_summary.get( | 
 |                 'data', {})['artifacts'].items(): | 
 |             if artifact['artifact_type'] == 'DEBUG': | 
 |                 self._debug_data_directory = os.path.join( | 
 |                     self._results_dir, run_artifact_dir, artifact_path) | 
 |                 break | 
 |  | 
 |         if run_summary['data']['outcome'] == "NOT_STARTED": | 
 |             logging.critical('Test execution was interrupted. Either the ' | 
 |                              'emulator crashed while the tests were still ' | 
 |                              'running or connection to the device was lost.') | 
 |             sys.exit(1) | 
 |  | 
 |         # There should be precisely one suite for the test that ran. | 
 |         suites_list = run_summary.get('data', {}).get('suites') | 
 |         if not suites_list: | 
 |             logging.error('Missing or empty list of suites in %s', | 
 |                           run_summary_path) | 
 |             return | 
 |         suite_summary = suites_list[0] | 
 |  | 
 |         # Get the top-level directory holding all artifacts for this suite. | 
 |         artifact_dir = suite_summary.get('artifact_dir') | 
 |         if not artifact_dir: | 
 |             logging.error('Failed to find suite\'s artifact_dir in %s', | 
 |                           run_summary_path) | 
 |             return | 
 |  | 
 |         # Get the path corresponding to artifacts | 
 |         for artifact_path, artifact in suite_summary['artifacts'].items(): | 
 |             if artifact['artifact_type'] == 'CUSTOM': | 
 |                 self._custom_artifact_directory = os.path.join( | 
 |                     self._results_dir, artifact_dir, artifact_path) | 
 |                 break | 
 |  | 
 |     def get_custom_artifact_directory(self) -> str: | 
 |         """Returns the full path to the directory holding custom artifacts | 
 |         emitted by the test or None if the directory could not be discovered. | 
 |         """ | 
 |         self._parse_test_outputs() | 
 |         return self._custom_artifact_directory | 
 |  | 
 |     def get_debug_data_directory(self): | 
 |         """Returns the full path to the directory holding debug data | 
 |         emitted by the test, or None if the path cannot be determined. | 
 |         """ | 
 |         self._parse_test_outputs() | 
 |         return self._debug_data_directory | 
 |  | 
 |  | 
 | def run_symbolizer(symbol_paths: List[str], input_fd: IO, | 
 |                    output_fd: IO) -> subprocess.Popen: | 
 |     """Runs symbolizer that symbolizes |input| and outputs to |output|.""" | 
 |  | 
 |     symbolize_cmd = ([ | 
 |         'debug', 'symbolize', '--', '--omit-module-lines', '--build-id-dir', | 
 |         os.path.join(SDK_ROOT, '.build-id') | 
 |     ]) | 
 |     for path in symbol_paths: | 
 |         symbolize_cmd.extend(['--ids-txt', path]) | 
 |     return run_continuous_ffx_command(symbolize_cmd, | 
 |                                       stdin=input_fd, | 
 |                                       stdout=output_fd, | 
 |                                       stderr=subprocess.STDOUT) |