| # -*- coding: utf-8 -*- |
| # Copyright 2019 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Test Result generated by autotest.""" |
| |
| import datetime |
| import json |
| import logging # pylint: disable=cros-logging-import |
| import os |
| import pathlib |
| import re |
| import sys |
| |
| from moblab_common import afe_connector |
| |
| import test_result_interface |
| |
| |
| _AUTOTEST_ROOT = pathlib.Path("/usr/local/autotest") |
| ROOT_DIR = _AUTOTEST_ROOT / "results" |
| |
| _LOGGER = logging.getLogger("moblab_uploader") |
| _LOGGER.setLevel(logging.DEBUG) |
| _TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S" |
| _STATUS_LOG = "status.log" |
| _JOB_INFO = "job.serialize" |
| |
| # Add Autotest TKO directory to sys.path and import library to parse |
| # job.serialize. |
| sys.path.append(str(_AUTOTEST_ROOT)) |
| from tko import tko_pb2 |
| |
| sys.path.pop() |
| |
| |
| def _query_speical_task_finished_time(id_): |
| """Query AFE with task id and yield the finished time.""" |
| afe = afe_connector.AFEConnector() |
| |
| def func(): |
| result = afe.get_special_tasks([id_]) |
| if not result: |
| raise test_result_interface.TestResultError( |
| "Failed to query AFE special task {}".format(id_) |
| ) |
| |
| time_finished_str = result[0].get("time_finished") |
| if time_finished_str: |
| return datetime.datetime.strptime( |
| time_finished_str, _TIMESTAMP_FORMAT |
| ).replace(tzinfo=datetime.timezone.utc) |
| else: |
| return None |
| |
| return func |
| |
| |
| def _query_job_finished_time(id_): |
| """Query AFE with job id and yield the finished time.""" |
| afe = afe_connector.AFEConnector() |
| |
| def func(): |
| result = afe.get_jobs_by_ids([id_]) |
| if not result: |
| raise test_result_interface.TestResultError( |
| "Failed to query AFE job {}".format(id_) |
| ) |
| result = result[0] |
| hqes, error = afe.get_host_queue_entries(id_) |
| if error or not hqes: |
| _LOGGER.error("Error querying HQEs for job %s: %s", id_, error) |
| return None |
| |
| running_hqes = [hqe for hqe in hqes if not hqe.get("complete")] |
| if running_hqes: # Test job is still running. |
| _LOGGER.debug("Determined the HQE for job %s are running", id_) |
| return None |
| |
| hqes_finished_time = [ |
| datetime.datetime.strptime( |
| hqe.get("finished_on"), _TIMESTAMP_FORMAT |
| ) |
| for hqe in hqes |
| if hqe.get("finished_on") |
| ] |
| # All autotest times are in UTC |
| hqes_finished_time = [ |
| t.replace(tzinfo=datetime.timezone.utc) for t in hqes_finished_time |
| ] |
| if hqes_finished_time: |
| return max(hqes_finished_time) |
| else: |
| # The value of 'finished_on' for failed/aborted HQE is NULL. |
| # Thus use the job creating time instead. |
| _LOGGER.info( |
| "Determined that all HQEs (%s) are completed.\n" |
| "No finished_on time, using job created_on time.\n" |
| "Job summary: %s\n" |
| "HQEs detailes: %s\n", |
| id_, |
| result, |
| hqes, |
| ) |
| create_on_str = result.get("created_on") |
| if create_on_str: |
| created_time = datetime.datetime.strptime( |
| create_on_str, _TIMESTAMP_FORMAT |
| ) |
| return created_time.replace(tzinfo=datetime.timezone.utc) |
| else: |
| _LOGGER.warn( |
| "No job finish or creation time for job id %s", id_ |
| ) |
| return None |
| |
| return func |
| |
| |
| class _AutotestResult(test_result_interface.TestResult): |
| """Base class for Autotest job and special tasks.""" |
| |
| @classmethod |
| def create(cls, *, root_dir, relative_path): |
| pattern_match = re.match(cls.REGEX_PATTERN, str(relative_path)) |
| if not pattern_match: |
| return None |
| return cls(root_dir=root_dir, relative_path=relative_path) |
| |
| def __init__(self, *, description, root_dir, relative_path): |
| super().__init__( |
| description=description, |
| root_dir=root_dir, |
| relative_path=relative_path, |
| ) |
| # TODO(guocb) Cache the data to a local disk file. |
| self._finished_time = None |
| self._succeeded = None |
| self._afe_query = None |
| self._suite_name = None |
| |
| @property |
| def id(self): |
| return self._id |
| |
| @property |
| def job_id(self): |
| return None |
| |
| @property |
| def finished_time(self): |
| if not self._finished_time: |
| self._finished_time = self._afe_query() |
| |
| return self._finished_time |
| |
| @property |
| def succeeded(self): |
| """Check status.log to figure out the job/task exit status.""" |
| if self._succeeded is not None: |
| return self._succeeded |
| |
| succeeded = [] |
| for status_log in self.abs_path.glob("**/{}".format(_STATUS_LOG)): |
| try: |
| with status_log.open() as f: |
| for line in f: |
| if not line.startswith("END "): |
| continue |
| exit_status = line.split(maxsplit=2)[1] |
| succeeded.append(exit_status == "GOOD") |
| |
| except (FileNotFoundError, PermissionError, IndexError): |
| return None |
| |
| self._succeeded = all(succeeded) |
| return self._succeeded |
| |
| @property |
| def suite_name(self): |
| if self._suite_name: |
| return self._suite_name |
| |
| job = tko_pb2.Job() |
| for host_job_info in self.abs_path.glob("*/{}".format(_JOB_INFO)): |
| # Just parse the 1st job.serialize file we found. The suite name |
| # is same for all host directory of each test. |
| try: |
| with host_job_info.open("rb") as f: |
| data = f.read() |
| except (FileNotFoundError, PermissionError) as err: |
| _LOGGER.warning("Can't parse %s: %s", _JOB_INFO, err) |
| return "" |
| |
| job.ParseFromString(data) |
| self._suite_name = job.suite |
| return self._suite_name |
| |
| return "" |
| |
| |
| class AutotestHostTask(_AutotestResult): |
| """A class to represent autotest host tasks.""" |
| |
| GLOB_PATTERN = "hosts/*/[0-9]*-*/" |
| REGEX_PATTERN = r"^hosts/[^/]+/(?P<id>(?P<afe_id>\d+)-\S+)$" |
| |
| def __init__(self, *, root_dir, relative_path): |
| super().__init__( |
| description="Autotest host task", |
| root_dir=root_dir, |
| relative_path=relative_path, |
| ) |
| pattern_match = re.match(self.REGEX_PATTERN, str(relative_path)) |
| if not pattern_match: |
| raise test_result_interface.TestResultError( |
| "Can't get id of {}".format(self) |
| ) |
| |
| self._id = pattern_match.group("id") |
| self._afe_query = _query_speical_task_finished_time( |
| pattern_match.group("afe_id") |
| ) |
| |
| @property |
| def pubsub_when_uploaded(self): |
| """Don't pubsub event when upload an Autotest special task result.""" |
| return False |
| |
| @property |
| def skip_uploading_when_succeeded(self): |
| """Don't upload the task directory when it succeeded.""" |
| return True |
| |
| @property |
| def is_logged(self): |
| """No need to log information about non job results.""" |
| return False |
| |
| |
| class AutotestTestJob(_AutotestResult): |
| """A class to represent autotest test job.""" |
| |
| GLOB_PATTERN = "[0-9]*-*/" |
| REGEX_PATTERN = r"(?P<afe_id>\d+)-\S+" |
| |
| def __init__(self, *, root_dir, relative_path): |
| super().__init__( |
| description="Autotest job", |
| root_dir=root_dir, |
| relative_path=relative_path, |
| ) |
| pattern_match = re.match(self.REGEX_PATTERN, str(relative_path)) |
| if not pattern_match: |
| raise test_result_interface.TestResultError( |
| "Can't get id of {}".format(self) |
| ) |
| |
| self._id = relative_path |
| self._afe_query = _query_job_finished_time( |
| pattern_match.group("afe_id") |
| ) |
| self._job_id = pattern_match.group("afe_id") |
| |
| @property |
| def job_id(self): |
| return self._job_id |