blob: c483abbcc02dcc09caaf9fee9f8278c71db79d8f [file] [log] [blame] [edit]
# -*- coding: utf-8 -*-
# Copyright 2019 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Test Result generated by autotest."""
import datetime
import json
import logging # pylint: disable=cros-logging-import
import os
import pathlib
import re
import sys
from moblab_common import afe_connector
import test_result_interface
_AUTOTEST_ROOT = pathlib.Path("/usr/local/autotest")
ROOT_DIR = _AUTOTEST_ROOT / "results"
_LOGGER = logging.getLogger("moblab_uploader")
_LOGGER.setLevel(logging.DEBUG)
_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"
_STATUS_LOG = "status.log"
_JOB_INFO = "job.serialize"
# Add Autotest TKO directory to sys.path and import library to parse
# job.serialize.
sys.path.append(str(_AUTOTEST_ROOT))
from tko import tko_pb2
sys.path.pop()
def _query_speical_task_finished_time(id_):
"""Query AFE with task id and yield the finished time."""
afe = afe_connector.AFEConnector()
def func():
result = afe.get_special_tasks([id_])
if not result:
raise test_result_interface.TestResultError(
"Failed to query AFE special task {}".format(id_)
)
time_finished_str = result[0].get("time_finished")
if time_finished_str:
return datetime.datetime.strptime(
time_finished_str, _TIMESTAMP_FORMAT
).replace(tzinfo=datetime.timezone.utc)
else:
return None
return func
def _query_job_finished_time(id_):
"""Query AFE with job id and yield the finished time."""
afe = afe_connector.AFEConnector()
def func():
result = afe.get_jobs_by_ids([id_])
if not result:
raise test_result_interface.TestResultError(
"Failed to query AFE job {}".format(id_)
)
result = result[0]
hqes, error = afe.get_host_queue_entries(id_)
if error or not hqes:
_LOGGER.error("Error querying HQEs for job %s: %s", id_, error)
return None
running_hqes = [hqe for hqe in hqes if not hqe.get("complete")]
if running_hqes: # Test job is still running.
_LOGGER.debug("Determined the HQE for job %s are running", id_)
return None
hqes_finished_time = [
datetime.datetime.strptime(
hqe.get("finished_on"), _TIMESTAMP_FORMAT
)
for hqe in hqes
if hqe.get("finished_on")
]
# All autotest times are in UTC
hqes_finished_time = [
t.replace(tzinfo=datetime.timezone.utc) for t in hqes_finished_time
]
if hqes_finished_time:
return max(hqes_finished_time)
else:
# The value of 'finished_on' for failed/aborted HQE is NULL.
# Thus use the job creating time instead.
_LOGGER.info(
"Determined that all HQEs (%s) are completed.\n"
"No finished_on time, using job created_on time.\n"
"Job summary: %s\n"
"HQEs detailes: %s\n",
id_,
result,
hqes,
)
create_on_str = result.get("created_on")
if create_on_str:
created_time = datetime.datetime.strptime(
create_on_str, _TIMESTAMP_FORMAT
)
return created_time.replace(tzinfo=datetime.timezone.utc)
else:
_LOGGER.warn(
"No job finish or creation time for job id %s", id_
)
return None
return func
class _AutotestResult(test_result_interface.TestResult):
"""Base class for Autotest job and special tasks."""
@classmethod
def create(cls, *, root_dir, relative_path):
pattern_match = re.match(cls.REGEX_PATTERN, str(relative_path))
if not pattern_match:
return None
return cls(root_dir=root_dir, relative_path=relative_path)
def __init__(self, *, description, root_dir, relative_path):
super().__init__(
description=description,
root_dir=root_dir,
relative_path=relative_path,
)
# TODO(guocb) Cache the data to a local disk file.
self._finished_time = None
self._succeeded = None
self._afe_query = None
self._suite_name = None
@property
def id(self):
return self._id
@property
def job_id(self):
return None
@property
def finished_time(self):
if not self._finished_time:
self._finished_time = self._afe_query()
return self._finished_time
@property
def succeeded(self):
"""Check status.log to figure out the job/task exit status."""
if self._succeeded is not None:
return self._succeeded
succeeded = []
for status_log in self.abs_path.glob("**/{}".format(_STATUS_LOG)):
try:
with status_log.open() as f:
for line in f:
if not line.startswith("END "):
continue
exit_status = line.split(maxsplit=2)[1]
succeeded.append(exit_status == "GOOD")
except (FileNotFoundError, PermissionError, IndexError):
return None
self._succeeded = all(succeeded)
return self._succeeded
@property
def suite_name(self):
if self._suite_name:
return self._suite_name
job = tko_pb2.Job()
for host_job_info in self.abs_path.glob("*/{}".format(_JOB_INFO)):
# Just parse the 1st job.serialize file we found. The suite name
# is same for all host directory of each test.
try:
with host_job_info.open("rb") as f:
data = f.read()
except (FileNotFoundError, PermissionError) as err:
_LOGGER.warning("Can't parse %s: %s", _JOB_INFO, err)
return ""
job.ParseFromString(data)
self._suite_name = job.suite
return self._suite_name
return ""
class AutotestHostTask(_AutotestResult):
"""A class to represent autotest host tasks."""
GLOB_PATTERN = "hosts/*/[0-9]*-*/"
REGEX_PATTERN = r"^hosts/[^/]+/(?P<id>(?P<afe_id>\d+)-\S+)$"
def __init__(self, *, root_dir, relative_path):
super().__init__(
description="Autotest host task",
root_dir=root_dir,
relative_path=relative_path,
)
pattern_match = re.match(self.REGEX_PATTERN, str(relative_path))
if not pattern_match:
raise test_result_interface.TestResultError(
"Can't get id of {}".format(self)
)
self._id = pattern_match.group("id")
self._afe_query = _query_speical_task_finished_time(
pattern_match.group("afe_id")
)
@property
def pubsub_when_uploaded(self):
"""Don't pubsub event when upload an Autotest special task result."""
return False
@property
def skip_uploading_when_succeeded(self):
"""Don't upload the task directory when it succeeded."""
return True
@property
def is_logged(self):
"""No need to log information about non job results."""
return False
class AutotestTestJob(_AutotestResult):
"""A class to represent autotest test job."""
GLOB_PATTERN = "[0-9]*-*/"
REGEX_PATTERN = r"(?P<afe_id>\d+)-\S+"
def __init__(self, *, root_dir, relative_path):
super().__init__(
description="Autotest job",
root_dir=root_dir,
relative_path=relative_path,
)
pattern_match = re.match(self.REGEX_PATTERN, str(relative_path))
if not pattern_match:
raise test_result_interface.TestResultError(
"Can't get id of {}".format(self)
)
self._id = relative_path
self._afe_query = _query_job_finished_time(
pattern_match.group("afe_id")
)
self._job_id = pattern_match.group("afe_id")
@property
def job_id(self):
return self._job_id