blob: 0d38b43fe2b09cb31ab112c48c02c0e3e7313b88 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright (C) 2020 - 2023 Collabora Limited
# Authors:
# Gustavo Padovan <gustavo.padovan@collabora.com>
# Guilherme Gallo <guilherme.gallo@collabora.com>
#
# SPDX-License-Identifier: MIT
"""Send a job to LAVA, track it and collect log back"""
import contextlib
import json
import pathlib
import sys
import time
from collections import defaultdict
from dataclasses import dataclass, fields
from datetime import datetime, timedelta
from os import environ, getenv, path
from typing import Any, Optional
import fire
from lavacli.utils import flow_yaml as lava_yaml
from lava.exceptions import (
MesaCIException,
MesaCIParseException,
MesaCIRetryError,
MesaCITimeoutError,
)
from lava.utils import (
CONSOLE_LOG,
GitlabSection,
LAVAJob,
LAVAJobDefinition,
LogFollower,
LogSectionType,
call_proxy,
fatal_err,
hide_sensitive_data,
print_log,
setup_lava_proxy,
)
from lava.utils import DEFAULT_GITLAB_SECTION_TIMEOUTS as GL_SECTION_TIMEOUTS
# Initialize structural logging with a defaultdict, it can be changed for more
# sophisticated dict-like data abstractions.
STRUCTURAL_LOG = defaultdict(list)
try:
from ci.structured_logger import StructuredLogger
except ImportError as e:
print_log(
f"Could not import StructuredLogger library: {e}. "
"Falling back to defaultdict based structured logger."
)
# Timeout in seconds to decide if the device from the dispatched LAVA job has
# hung or not due to the lack of new log output.
DEVICE_HANGING_TIMEOUT_SEC = int(getenv("DEVICE_HANGING_TIMEOUT_SEC", 5*60))
# How many seconds the script should wait before try a new polling iteration to
# check if the dispatched LAVA job is running or waiting in the job queue.
WAIT_FOR_DEVICE_POLLING_TIME_SEC = int(
getenv("LAVA_WAIT_FOR_DEVICE_POLLING_TIME_SEC", 1)
)
# How many seconds the script will wait to let LAVA finalize the job and give
# the final details.
WAIT_FOR_LAVA_POST_PROCESSING_SEC = int(getenv("LAVA_WAIT_LAVA_POST_PROCESSING_SEC", 5))
WAIT_FOR_LAVA_POST_PROCESSING_RETRIES = int(
getenv("LAVA_WAIT_LAVA_POST_PROCESSING_RETRIES", 6)
)
# How many seconds to wait between log output LAVA RPC calls.
LOG_POLLING_TIME_SEC = int(getenv("LAVA_LOG_POLLING_TIME_SEC", 5))
# How many retries should be made when a timeout happen.
NUMBER_OF_RETRIES_TIMEOUT_DETECTION = int(
getenv("LAVA_NUMBER_OF_RETRIES_TIMEOUT_DETECTION", 2)
)
def raise_exception_from_metadata(metadata: dict, job_id: int) -> None:
"""
Investigate infrastructure errors from the job metadata.
If it finds an error, raise it as MesaCIException.
"""
if "result" not in metadata or metadata["result"] != "fail":
return
if "error_type" in metadata:
error_type = metadata["error_type"]
if error_type == "Infrastructure":
raise MesaCIException(
f"LAVA job {job_id} failed with Infrastructure Error. Retry."
)
if error_type == "Job":
# This happens when LAVA assumes that the job cannot terminate or
# with mal-formed job definitions. As we are always validating the
# jobs, only the former is probable to happen. E.g.: When some LAVA
# action timed out more times than expected in job definition.
raise MesaCIException(
f"LAVA job {job_id} failed with JobError "
"(possible LAVA timeout misconfiguration/bug). Retry."
)
if "case" in metadata and metadata["case"] == "validate":
raise MesaCIException(
f"LAVA job {job_id} failed validation (possible download error). Retry."
)
def raise_lava_error(job) -> None:
# Look for infrastructure errors, raise them, and retry if we see them.
results_yaml = call_proxy(job.proxy.results.get_testjob_results_yaml, job.job_id)
results = lava_yaml.load(results_yaml)
for res in results:
metadata = res["metadata"]
raise_exception_from_metadata(metadata, job.job_id)
# If we reach this far, it means that the job ended without hwci script
# result and no LAVA infrastructure problem was found
job.status = "fail"
def show_final_job_data(job, colour=f"{CONSOLE_LOG['BOLD']}{CONSOLE_LOG['FG_GREEN']}"):
with GitlabSection(
"job_data",
"LAVA job info",
type=LogSectionType.LAVA_POST_PROCESSING,
start_collapsed=True,
colour=colour,
):
wait_post_processing_retries: int = WAIT_FOR_LAVA_POST_PROCESSING_RETRIES
while not job.is_post_processed() and wait_post_processing_retries > 0:
# Wait a little until LAVA finishes processing metadata
time.sleep(WAIT_FOR_LAVA_POST_PROCESSING_SEC)
wait_post_processing_retries -= 1
if not job.is_post_processed():
waited_for_sec: int = (
WAIT_FOR_LAVA_POST_PROCESSING_RETRIES
* WAIT_FOR_LAVA_POST_PROCESSING_SEC
)
print_log(
f"Waited for {waited_for_sec} seconds "
"for LAVA to post-process the job, it haven't finished yet. "
"Dumping it's info anyway"
)
details: dict[str, str] = job.show()
for field, value in details.items():
print(f"{field:<15}: {value}")
job.refresh_log()
def fetch_logs(job, max_idle_time, log_follower) -> None:
is_job_hanging(job, max_idle_time)
time.sleep(LOG_POLLING_TIME_SEC)
new_log_lines = fetch_new_log_lines(job)
parsed_lines = parse_log_lines(job, log_follower, new_log_lines)
for line in parsed_lines:
print_log(line)
def is_job_hanging(job, max_idle_time):
# Poll to check for new logs, assuming that a prolonged period of
# silence means that the device has died and we should try it again
if datetime.now() - job.last_log_time > max_idle_time:
max_idle_time_min = max_idle_time.total_seconds() / 60
raise MesaCITimeoutError(
f"{CONSOLE_LOG['BOLD']}"
f"{CONSOLE_LOG['FG_YELLOW']}"
f"LAVA job {job.job_id} does not respond for {max_idle_time_min} "
"minutes. Retry."
f"{CONSOLE_LOG['RESET']}",
timeout_duration=max_idle_time,
)
def parse_log_lines(job, log_follower, new_log_lines):
if log_follower.feed(new_log_lines):
# If we had non-empty log data, we can assure that the device is alive.
job.heartbeat()
parsed_lines = log_follower.flush()
# Only parse job results when the script reaches the end of the logs.
# Depending on how much payload the RPC scheduler.jobs.logs get, it may
# reach the LAVA_POST_PROCESSING phase.
if log_follower.current_section.type in (
LogSectionType.TEST_CASE,
LogSectionType.LAVA_POST_PROCESSING,
):
parsed_lines = job.parse_job_result_from_log(parsed_lines)
return parsed_lines
def fetch_new_log_lines(job):
# The XMLRPC binary packet may be corrupted, causing a YAML scanner error.
# Retry the log fetching several times before exposing the error.
for _ in range(5):
with contextlib.suppress(MesaCIParseException):
new_log_lines = job.get_logs()
break
else:
raise MesaCIParseException
return new_log_lines
def submit_job(job):
try:
job.submit()
except Exception as mesa_ci_err:
raise MesaCIException(
f"Could not submit LAVA job. Reason: {mesa_ci_err}"
) from mesa_ci_err
def wait_for_job_get_started(job):
print_log(f"Waiting for job {job.job_id} to start.")
while not job.is_started():
time.sleep(WAIT_FOR_DEVICE_POLLING_TIME_SEC)
job.refresh_log()
print_log(f"Job {job.job_id} started.")
def bootstrap_log_follower() -> LogFollower:
gl = GitlabSection(
id="lava_boot",
header="LAVA boot",
type=LogSectionType.LAVA_BOOT,
start_collapsed=True,
)
print(gl.start())
return LogFollower(starting_section=gl)
def follow_job_execution(job, log_follower):
with log_follower:
max_idle_time = timedelta(seconds=DEVICE_HANGING_TIMEOUT_SEC)
# Start to check job's health
job.heartbeat()
while not job.is_finished:
fetch_logs(job, max_idle_time, log_follower)
structural_log_phases(job, log_follower)
# Mesa Developers expect to have a simple pass/fail job result.
# If this does not happen, it probably means a LAVA infrastructure error
# happened.
if job.status not in ["pass", "fail"]:
raise_lava_error(job)
# LogFollower does some cleanup after the early exit (trigger by
# `hwci: pass|fail` regex), let's update the phases after the cleanup.
structural_log_phases(job, log_follower)
def structural_log_phases(job, log_follower):
phases: dict[str, Any] = {
s.header.split(" - ")[0]: {
k: str(getattr(s, k)) for k in ("start_time", "end_time")
}
for s in log_follower.section_history
}
job.log["dut_job_phases"] = phases
def print_job_final_status(job):
if job.status == "running":
job.status = "hung"
color = LAVAJob.COLOR_STATUS_MAP.get(job.status, CONSOLE_LOG["FG_RED"])
print_log(
f"{color}"
f"LAVA Job finished with status: {job.status}"
f"{CONSOLE_LOG['RESET']}"
)
job.refresh_log()
show_final_job_data(job, colour=f"{CONSOLE_LOG['BOLD']}{color}")
def execute_job_with_retries(
proxy, job_definition, retry_count, jobs_log
) -> Optional[LAVAJob]:
last_failed_job = None
for attempt_no in range(1, retry_count + 2):
# Need to get the logger value from its object to enable autosave
# features, if AutoSaveDict is enabled from StructuredLogging module
jobs_log.append({})
job_log = jobs_log[-1]
job = LAVAJob(proxy, job_definition, job_log)
STRUCTURAL_LOG["dut_attempt_counter"] = attempt_no
try:
job_log["submitter_start_time"] = datetime.now().isoformat()
submit_job(job)
wait_for_job_get_started(job)
log_follower: LogFollower = bootstrap_log_follower()
follow_job_execution(job, log_follower)
return job
except (MesaCIException, KeyboardInterrupt) as exception:
job.handle_exception(exception)
finally:
print_job_final_status(job)
# If LAVA takes too long to post process the job, the submitter
# gives up and proceeds.
job_log["submitter_end_time"] = datetime.now().isoformat()
last_failed_job = job
print_log(
f"{CONSOLE_LOG['BOLD']}"
f"Finished executing LAVA job in the attempt #{attempt_no}"
f"{CONSOLE_LOG['RESET']}"
)
return last_failed_job
def retriable_follow_job(proxy, job_definition) -> LAVAJob:
number_of_retries = NUMBER_OF_RETRIES_TIMEOUT_DETECTION
last_attempted_job = execute_job_with_retries(
proxy, job_definition, number_of_retries, STRUCTURAL_LOG["dut_jobs"]
)
if last_attempted_job.exception is not None:
# Infra failed in all attempts
raise MesaCIRetryError(
f"{CONSOLE_LOG['BOLD']}"
f"{CONSOLE_LOG['FG_RED']}"
"Job failed after it exceeded the number of "
f"{number_of_retries} retries."
f"{CONSOLE_LOG['RESET']}",
retry_count=number_of_retries,
last_job=last_attempted_job,
)
return last_attempted_job
@dataclass
class PathResolver:
def __post_init__(self):
for field in fields(self):
value = getattr(self, field.name)
if not value:
continue
if field.type == pathlib.Path:
value = pathlib.Path(value)
setattr(self, field.name, value.resolve())
@dataclass
class LAVAJobSubmitter(PathResolver):
boot_method: str
ci_project_dir: str
device_type: str
job_timeout_min: int # The job timeout in minutes
build_url: str = None
dtb_filename: str = None
dump_yaml: bool = False # Whether to dump the YAML payload to stdout
first_stage_init: str = None
jwt_file: pathlib.Path = None
kernel_image_name: str = None
kernel_image_type: str = ""
kernel_url_prefix: str = None
kernel_external: str = None
lava_tags: str = "" # Comma-separated LAVA tags for the job
mesa_job_name: str = "mesa_ci_job"
pipeline_info: str = ""
rootfs_url_prefix: str = None
validate_only: bool = False # Whether to only validate the job, not execute it
visibility_group: str = None # Only affects LAVA farm maintainers
job_rootfs_overlay_url: str = None
structured_log_file: pathlib.Path = None # Log file path with structured LAVA log
ssh_client_image: str = None # x86_64 SSH client image to follow the job's output
project_name: str = None # Project name to be used in the job name
__structured_log_context = contextlib.nullcontext() # Structured Logger context
def __post_init__(self) -> None:
super().__post_init__()
# Remove mesa job names with spaces, which breaks the lava-test-case command
self.mesa_job_name = self.mesa_job_name.split(" ")[0]
if not self.structured_log_file:
return
self.__structured_log_context = StructuredLoggerWrapper(self).logger_context()
self.proxy = setup_lava_proxy()
def __prepare_submission(self) -> str:
# Overwrite the timeout for the testcases with the value offered by the
# user. The testcase running time should be at least 4 times greater than
# the other sections (boot and setup), so we can safely ignore them.
# If LAVA fails to stop the job at this stage, it will fall back to the
# script section timeout with a reasonable delay.
GL_SECTION_TIMEOUTS[LogSectionType.TEST_CASE] = timedelta(
minutes=self.job_timeout_min
)
job_definition = LAVAJobDefinition(self).generate_lava_job_definition()
if self.dump_yaml:
self.dump_job_definition(job_definition)
validation_job = LAVAJob(self.proxy, job_definition)
if errors := validation_job.validate():
fatal_err(f"Error in LAVA job definition: {errors}")
print_log("LAVA job definition validated successfully")
return job_definition
@classmethod
def is_under_ci(cls):
ci_envvar: str = getenv("CI", "false")
return ci_envvar.lower() == "true"
def dump_job_definition(self, job_definition) -> None:
with GitlabSection(
"yaml_dump",
"LAVA job definition (YAML)",
type=LogSectionType.LAVA_BOOT,
start_collapsed=True,
):
print(hide_sensitive_data(job_definition))
def submit(self) -> None:
"""
Prepares and submits the LAVA job.
If `validate_only` is True, it validates the job without submitting it.
If the job finishes with a non-pass status or encounters an exception,
the program exits with a non-zero return code.
"""
job_definition: str = self.__prepare_submission()
if self.validate_only:
return
with self.__structured_log_context:
last_attempt_job = None
try:
last_attempt_job = retriable_follow_job(self.proxy, job_definition)
except MesaCIRetryError as retry_exception:
last_attempt_job = retry_exception.last_job
except Exception as exception:
STRUCTURAL_LOG["job_combined_fail_reason"] = str(exception)
raise exception
finally:
self.finish_script(last_attempt_job)
def print_log_artifact_url(self):
relative_log_path = self.structured_log_file.relative_to(pathlib.Path.cwd())
full_path = f"$ARTIFACTS_BASE_URL/{relative_log_path}"
artifact_url = path.expandvars(full_path)
print_log(f"Structural Logging data available at: {artifact_url}")
def finish_script(self, last_attempt_job):
if self.is_under_ci() and self.structured_log_file:
self.print_log_artifact_url()
if not last_attempt_job:
# No job was run, something bad happened
STRUCTURAL_LOG["job_combined_status"] = "script_crash"
current_exception = str(sys.exc_info()[0])
STRUCTURAL_LOG["job_combined_fail_reason"] = current_exception
raise SystemExit(1)
STRUCTURAL_LOG["job_combined_status"] = last_attempt_job.status
if last_attempt_job.status != "pass":
raise SystemExit(1)
class StructuredLoggerWrapper:
def __init__(self, submitter: LAVAJobSubmitter) -> None:
self.__submitter: LAVAJobSubmitter = submitter
def _init_logger(self):
STRUCTURAL_LOG["fixed_tags"] = self.__submitter.lava_tags
STRUCTURAL_LOG["dut_job_type"] = self.__submitter.device_type
STRUCTURAL_LOG["job_combined_fail_reason"] = None
STRUCTURAL_LOG["job_combined_status"] = "not_submitted"
STRUCTURAL_LOG["dut_attempt_counter"] = 0
# Initialize dut_jobs list to enable appends
STRUCTURAL_LOG["dut_jobs"] = []
@contextlib.contextmanager
def _simple_logger_context(self):
log_file = pathlib.Path(self.__submitter.structured_log_file)
log_file.parent.mkdir(parents=True, exist_ok=True)
try:
# Truncate the file
log_file.write_text("")
yield
finally:
log_file.write_text(json.dumps(STRUCTURAL_LOG, indent=2))
def logger_context(self):
context = contextlib.nullcontext()
try:
global STRUCTURAL_LOG
STRUCTURAL_LOG = StructuredLogger(
self.__submitter.structured_log_file, truncate=True
).data
except NameError:
context = self._simple_logger_context()
self._init_logger()
return context
if __name__ == "__main__":
# given that we proxy from DUT -> LAVA dispatcher -> LAVA primary -> us ->
# GitLab runner -> GitLab primary -> user, safe to say we don't need any
# more buffering
sys.stdout.reconfigure(line_buffering=True)
sys.stderr.reconfigure(line_buffering=True)
# LAVA farm is giving datetime in UTC timezone, let's set it locally for the
# script run.
# Setting environ here will not affect the system time, as the os.environ
# lifetime follows the script one.
environ["TZ"] = "UTC"
time.tzset()
fire.Fire(LAVAJobSubmitter)