blob: c49b3a2a6164f659af27ee371fdc3044de66c563 [file] [log] [blame] [edit]
# -*- coding: utf-8 -*-
# Copyright 2019 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""This module defines some coroutines to check test result status."""
from __future__ import print_function
import collections
import datetime
import enum
import logging
import processing_step
import test_result_interface
from moblab_common import lock_util
MARKER_FILE = ".uploader_status"
_MARKER_LOCK_FILE = ".uploader_status.lock"
_LOGGER = logging.getLogger("moblab_uploader")
Status = enum.Enum(
"Status", ["NOT_READY", "READY", "UPLOADING", "UPLOADED", "EXPIRED"]
)
ResultAndStatus = collections.namedtuple(
"ResultAndStatus", ["status", "test_result"]
)
async def _load_status(test_result):
"""Load status of the `test_result` from status file."""
try:
status_str = (test_result.abs_path / MARKER_FILE).read_text()
except FileNotFoundError:
return Status.NOT_READY
for status in Status:
if status.name == status_str:
return status
return Status.NOT_READY
async def _write_status(test_result, status):
"""Write new `status` to status file of the `test_result_state`."""
if test_result.is_logged:
_LOGGER.info("change status of %s to %s", test_result, status)
(test_result.abs_path / MARKER_FILE).write_text(status.name)
async def mark_uploading_started(test_result):
"""Change the test result status to UPLOADING."""
with lock_util.file_lock(
test_result.abs_path / _MARKER_LOCK_FILE, exclusive=True
):
previous_status = await _load_status(test_result)
# Should have been expired, so skip the uploading.
if previous_status != Status.READY:
return False
await _write_status(test_result, Status.UPLOADING)
return True
async def mark_uploading_result(test_result, *, succeeded):
"""Change the test result status according to uploading result."""
with lock_util.file_lock(
test_result.abs_path / _MARKER_LOCK_FILE, exclusive=True
):
await _write_status(
test_result, Status.UPLOADED if succeeded else Status.NOT_READY
)
class ResultStatusChecker(processing_step.ProcessStep):
"""A class to check current test result status for uploading."""
def __init__(self, *, min_age_to_upload, configuration, next_step=None):
self._min_age_to_upload = min_age_to_upload
self._configuration = configuration
self._next_step = next_step
def _check_latest_status(self, test_result, previous_status):
# check test_result finished time to determine the status.
try:
finished_time = test_result.finished_time
except test_result_interface.TestResultError as ex:
_LOGGER.warning(
"Cannot get info of %s from scheduler, keep retrying. "
"Exception: %s",
test_result,
ex,
)
# TODO(haddowk) Figure out a way to expire these jobs after
# a certain amount of time.
return Status.NOT_READY
if not finished_time:
return Status.NOT_READY
age = datetime.timedelta()
try:
age = datetime.datetime.now(datetime.timezone.utc) - finished_time
except TypeError:
_LOGGER.error("Unable to determine age - default to 0")
_LOGGER.error(finished_time)
if previous_status not in [Status.UPLOADED]:
if test_result.is_logged:
_LOGGER.debug(
"%s finished on %s age %s", test_result, finished_time, age
)
if age < self._min_age_to_upload:
return Status.NOT_READY
if (
age > self._configuration.get_min_age_to_prune()
and previous_status in [Status.UPLOADED]
):
if test_result.is_logged:
_LOGGER.debug(
"%s Job is past the max age - expire", test_result
)
return Status.EXPIRED
if previous_status == Status.NOT_READY:
if test_result.is_logged:
_LOGGER.debug(
"%s Not running and in the age range to upload.",
test_result,
)
return Status.READY
return previous_status
async def check(self, test_result):
"""Check current status and upload the test result if OK.
We use MARKER_FILE to save result directory status:
- EXPIRED: which means it should be pruned, but not
complete yet.
- UPLOADING: which means a thread is uploading it. We don't prune
directory in this state even though it's expired.
- Other status (i.e. NOT_READY, UPLOADED), we will check the latest
time according to the finished time to see if it is ready for UPLOADING
or has EXPIRED.
"""
try:
with lock_util.file_lock(
test_result.abs_path / _MARKER_LOCK_FILE, exclusive=True
):
previous_status = await _load_status(test_result)
if test_result.is_logged and previous_status not in [
Status.UPLOADED
]:
_LOGGER.debug(
"%s Previous status %s", test_result, previous_status
)
current_status = previous_status
if previous_status not in [Status.EXPIRED, Status.UPLOADING]:
current_status = self._check_latest_status(
test_result, previous_status
)
if test_result.is_logged and previous_status != current_status:
_LOGGER.debug(
"%s Current status %s", test_result, current_status
)
if previous_status != current_status:
await _write_status(test_result, current_status)
except FileNotFoundError:
_LOGGER.exception(
"The directory may be pruned just before checking."
)
return
if current_status in [Status.EXPIRED, Status.READY, Status.UPLOADING]:
await self._next_step(
ResultAndStatus(status=current_status, test_result=test_result)
)
async def get_last_status(self, test_result):
"""Run next step with 'last' (may out of date) directory status."""
last_status = await _load_status(test_result)
await self._next_step(
ResultAndStatus(status=last_status, test_result=test_result)
)
class StatusFilter(processing_step.ProcessStep):
def __init__(self, *status, next_step=None):
super().__init__(next_step=next_step)
self._status = status
async def filter(self, result_and_status):
if result_and_status.status in self._status:
await self._next_step(result_and_status.test_result)