# -*- coding: utf-8 -*-
# Copyright 2019 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""The implementation of subcommand `run`."""
import argparse
import asyncio
import datetime
import functools
import logging # pylint: disable=cros-logging-import
import os
import shutil
import stat
import yaml
from uploader_configuration import UploaderConfiguration
from moblab_common import afe_connector
from moblab_common import config_connector
from moblab_common import moblabrpc_connector
from moblab_common.moblab_configuration_connector import (
MoblabConfigurationRpcConnector,
)
from moblab_common import result_upload_status_connector
from moblab_common.result_upload_status_connector import Status as UploadStatus
from moblab_common.proto import moblabrpc_pb2
import gs_api_wrapper
import processing_step
import result_dir_status
import result_file_filter
import result_uploader
import test_result_registry as test_result_registry_module
SUB_COMMAND_NAME = "run"
_LOGGER = logging.getLogger("moblab_uploader")
_DEFAULT_UPLOAD_TIMEOUT = datetime.timedelta(days=3)
_DEFAULT_MIN_AGE_TO_UPLOAD = datetime.timedelta(days=0)
_RUN_INTERVAL = datetime.timedelta(minutes=1)
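# How long the scan/poll loops sleep between iterations.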
# We cache all live test result objects in a dictionary keyed by the absolute
# path of the result. Entries are only removed from the cache when the
# corresponding directory is removed from disk.
_TEST_RESULT_CACHE = {}
async def _enumerate_test_results(test_result_registry):
"""Enumerate and yield all test results registered."""
for registry_entry in test_result_registry:
for cls in registry_entry.classes:
root_dir = registry_entry.root_dir
for d in root_dir.glob(cls.GLOB_PATTERN):
test_result = cls.create(
root_dir=root_dir, relative_path=d.relative_to(root_dir)
)
if test_result:
                    # Give other coroutines a chance to run.
await asyncio.sleep(0.01)
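                    # setdefault() reuses any cached object for this path so
                    # per-result state survives across scan passes.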
yield _TEST_RESULT_CACHE.setdefault(
test_result.abs_path, test_result
)
async def _prune_test_result(dry_run, test_result):
"""Check the `test_result` and try to prune it if we should."""
    # In the results directory, remove all subdirectories before deleting the
    # root directory, as the root is where the marker file is assumed to be.
    # Leaving the marker file until as late as possible in the process ensures
    # that cleanup will resume in the event of a process crash.
try:
_LOGGER.debug("Removing test result %s", test_result)
        if dry_run:
            _LOGGER.debug("DRYRUN: Removed: %s", test_result)
            return
        def log_error(func, path, _):
            "Log prune failures, catching them so they are not re-raised."
            # Even the chmod failed to fix the issue; just log and leave the
            # problem behind, there is nothing more we can do.
            _LOGGER.error("PRUNE: Failed to remove file %s", path)
        def remove_readonly(func, path, _):
            "Clear the readonly bit and re-attempt the removal."
            _LOGGER.error("PRUNE: Fixing read only %s", path)
            try:
                os.chmod(path, stat.S_IWRITE | stat.S_IREAD | stat.S_IEXEC)
            # If the process is not the owner of the file then just give up
            # without crashing.
            except PermissionError as e:
                _LOGGER.exception(e)
            else:
                # Successful chmod on the problem file; retry the removal and
                # log, rather than raise, if it still fails.
                try:
                    func(path)
                except OSError:
                    log_error(func, path, None)
        # To be safe, delete all the subdirs first, then the whole directory,
        # which leaves the marker file until last.
# example structure
# 1-moblab/
# .uploader_status
# 192.168.238.100/
# status.log
# debug/
        _, topdirs, _ = next(os.walk(test_result.abs_path), (None, [], None))
        # In the example this will delete directory 192.168.238.100 and all
        # the subdirs; it will also delete any sibling directories - there are
        # none in this example.
for topdir in topdirs:
shutil.rmtree(
os.path.join(test_result.abs_path, topdir),
onerror=remove_readonly,
)
# This will remove 1-moblab and all its subdirs
shutil.rmtree(test_result.abs_path, onerror=remove_readonly)
del _TEST_RESULT_CACHE[test_result.abs_path]
        # Update the upload status after the result's deletion.
_result_upload_status_store = (
result_upload_status_connector.ResultUploadStatusConnector()
)
_result_upload_status_store.set_job_upload_status(
test_result.id, result_upload_status_connector.Status.DELETED
)
_LOGGER.debug("Removed: %s", test_result)
except Exception as e:
_LOGGER.exception(e)
async def _async_prune(args, registry, configuration):
"""Scan and prune all EXPIRED test results."""
status_checker = result_dir_status.ResultStatusChecker(
min_age_to_upload=args.min_age_to_upload,
configuration=configuration,
)
status_filter = result_dir_status.StatusFilter(
result_dir_status.Status.EXPIRED
)
# Steps needed to prune a result directory:
    # check status -> for each EXPIRED directory -> prune it.
status_checker.next_step = status_filter.filter
status_filter.next_step = functools.partial(
_prune_test_result, args.dry_run
)
while True:
async for test_result in _enumerate_test_results(registry):
await status_checker.check(test_result)
if args.once:
break
await asyncio.sleep(_RUN_INTERVAL.total_seconds())
async def _resume_aborted_uploading(args, registry, status_checker, uploader):
"""Resume the uploading of last run.
This script indented to be run as a daemon. When the daemon interrupted for
any reason, it aborts the current uploading job and leave it in UPLOADING
status. When restart the daemon, we resume it and eventually change the
state to allow future processing (pruning, or re-uploading).
This is only for one time before we start the main loop.
"""
    # We hook up a status filter which is only interested in directories in
    # the UPLOADING state, and an uploading context which doesn't change the
    # directory status before uploading.
uploader.uploading_context = result_uploader.UploadingContext(
gs_bucket=args.gs_api.bucket_name,
dry_run=args.dry_run,
resume_aborted=True,
)
status_filter = result_dir_status.StatusFilter(
result_dir_status.Status.UPLOADING
)
status_checker.next_step = status_filter.filter
status_filter.next_step = uploader.try_upload_dir
_LOGGER.info("First scan for aborted uploading jobs.")
async for test_result in _enumerate_test_results(registry):
await status_checker.get_last_status(test_result)
async def _run_upload_status_backfill_migration(registry, status_checker):
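    """One-time backfill of job upload statuses into the moblab DB."""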
    # 0. Get all jobs with their statuses;
jobs = moblabrpc_connector.MoblabRpcConnector.get_jobs(
query_start=0, query_limit=10000
)
    # 1. If at least one record already has an upload status, skip the
    #    migration;
_LOGGER.info("Checking whether job upload status backfill is needed.")
if not jobs or any(
job.upload_state.status
!= moblabrpc_pb2.Job.UploadStatus.UPLOAD_STATUS_UNKNOWN
for job in jobs
):
return
_LOGGER.info("Migrating jobs upload stutuses to moblab db.")
# 2. Initialize upload status based on completed/not completed job status;
jobs_upload_status = {
job.job_id: UploadStatus.UPLOADED
if _is_job_finished(job.status)
else UploadStatus.NOT_READY
for job in jobs
}
    # 3. Iterate over existing result dirs and update each job's upload status
    #    based on the on-disk status;
async for test_result in _enumerate_test_results(registry):
if (
test_result.id in jobs_upload_status
and jobs_upload_status[test_result.id] == UploadStatus.UPLOADED
):
upload_file_status = await result_dir_status._load_status(
test_result
)
if upload_file_status == result_dir_status.Status.NOT_READY:
jobs_upload_status[test_result.id] = UploadStatus.NOT_READY
elif upload_file_status == result_dir_status.Status.READY:
jobs_upload_status[test_result.id] = UploadStatus.QUEUED
elif upload_file_status == result_dir_status.Status.UPLOADING:
jobs_upload_status[test_result.id] = UploadStatus.UPLOADING
elif upload_file_status == result_dir_status.Status.UPLOADED:
jobs_upload_status[test_result.id] = UploadStatus.UPLOADED
elif upload_file_status == result_dir_status.Status.EXPIRED:
jobs_upload_status[test_result.id] = UploadStatus.DELETED
# 4. Save back all the upload statuses;
upload_status_store = (
result_upload_status_connector.ResultUploadStatusConnector()
)
for job_id in jobs_upload_status:
upload_status_store.set_job_upload_status(
job_id, jobs_upload_status[job_id]
)
def _is_job_finished(job_status):
    # Job status values: COMPLETE = 5, FAILED = 6, ABORTED = 7.
    return job_status in {5, 6, 7}
async def _async_upload(args, registry, configuration):
"""Scan and upload all 'READY' test results."""
status_checker = result_dir_status.ResultStatusChecker(
min_age_to_upload=args.min_age_to_upload,
configuration=configuration,
)
uploader = result_uploader.Uploader(
timeout=datetime.timedelta(minutes=60),
gs_api=args.gs_api,
notify_uploaded=result_file_filter.mark_files_uploaded,
)
file_filter = result_file_filter.ResultFileFilter(
filter_config=args.uploading_filter_config
)
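    # Wire up the uploading pipeline: result files are filtered and then
    # queued for the background consumer task created below.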
uploader.next_step = file_filter.filter_files
file_filter.next_step = uploader.uploading_queue.put
consumer = asyncio.ensure_future(
uploader.upload_files(args.jobs, args.dry_run)
)
await _run_upload_status_backfill_migration(registry, status_checker)
# Finish unfinished jobs of last run.
await _resume_aborted_uploading(args, registry, status_checker, uploader)
# Then start the main loop.
    # We hook up a new status filter which is only interested in directories
    # in the READY state, and a new uploading context which changes the
    # directory state to UPLOADING before uploading.
status_filter = result_dir_status.StatusFilter(
result_dir_status.Status.READY
)
status_checker.next_step = status_filter.filter
status_filter.next_step = uploader.try_upload_dir
uploader.uploading_context = result_uploader.UploadingContext(
gs_bucket=args.gs_api.bucket_name,
dry_run=args.dry_run,
resume_aborted=False,
)
_LOGGER.info("Main uploading scan loop started.")
while True:
async for test_result in _enumerate_test_results(registry):
await status_checker.check(test_result)
if args.once:
break
await asyncio.sleep(_RUN_INTERVAL.total_seconds())
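    # Only reached with --once: stop the background upload consumer task.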
consumer.cancel()
def _accessible_file_type(file_path):
"""Check if input is an accessible file."""
if not file_path:
raise argparse.ArgumentTypeError("No value specified.")
try:
with open(file_path):
pass
return file_path
except (FileNotFoundError, PermissionError) as err:
raise argparse.ArgumentTypeError(err)
def add_arguments(parent_parser):
"""Add arguments for subcommand 'run'.
Args:
parent_parser: The argparse.ArgumentParser object to add arguments.
"""
parser = parent_parser.add_parser(
SUB_COMMAND_NAME,
help="Scan, upload and prune the test result directories.",
)
parser.add_argument(
"--once", action="store_true", help="Just run once and exit."
)
parser.add_argument(
"-n",
"--dry-run",
action="store_true",
help="Dry run mode: just print out directories processed. Only valid "
"when specified with option --once.",
)
parser.add_argument(
"-j",
"--jobs",
type=int,
default=1,
help="Allow N jobs run simultaneously to work.",
)
parser.add_argument(
"--gs-bucket",
default=config_connector.MoblabConfigConnector(
afe_connector.AFEConnector()
).get_cloud_bucket(),
        help='The Google Storage bucket to upload to (without the "gs://" '
        "prefix). Default is read from the Autotest config file, "
        "i.e. %(default)s",
)
parser.add_argument(
"--credentials-file",
default=os.environ.get(gs_api_wrapper.ENV_GOOGLE_APP_CREDENTIALS, ""),
type=_accessible_file_type,
help="Google cloud application credentials file. Default is the value "
'of environment variable {}, i.e. "%(default)s".'.format(
gs_api_wrapper.ENV_GOOGLE_APP_CREDENTIALS
),
)
parser.add_argument(
"--timeout-days",
type=lambda x: datetime.timedelta(days=float(x)),
default=_DEFAULT_UPLOAD_TIMEOUT,
dest="upload_timeout",
help="The timeout time in days (can be a float number) to give up the "
"uploading (default: %(default)s).",
)
parser.add_argument(
"--min-age-to-upload",
type=lambda x: datetime.timedelta(days=float(x)),
default=_DEFAULT_MIN_AGE_TO_UPLOAD,
help="Minimum job age in days (can be a float number) before a result "
"can be uploaded, but not removed from local storage "
"(default: %(default)s).",
)
parser.set_defaults(
func=run, post_parse_args=functools.partial(_post_parse_args, parser)
)
def _post_parse_args(arg_parser, args):
"""massage input arguments."""
_LOGGER.debug("Input arguments: %s", args)
if not args.gs_bucket:
gs_bucket = config_connector.MoblabConfigConnector(
afe_connector.AFEConnector()
).get_cloud_bucket()
if not gs_bucket:
raise Exception("No GS bucket specified or configured.")
else:
gs_bucket = args.gs_bucket
try:
        args.gs_api = gs_api_wrapper.GsApiWrapper(
            gs_bucket, args.credentials_file
        )
except gs_api_wrapper.GsAccessError as err:
arg_parser.error(str(err))
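    # args.gs_api now owns the bucket and credentials, so drop the raw values.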
delattr(args, "credentials_file")
delattr(args, "gs_bucket")
    # It makes no sense to dry run a daemon, so just run once.
    if args.dry_run:
        args.once = True
async def _async_config_poll(args, configuration: UploaderConfiguration):
"""Polls the config changes and updates the configuration object."""
previous_value = -1
while True:
results_retention_time = (
MoblabConfigurationRpcConnector.get_local_results_retention_duration()
)
if results_retention_time and previous_value != results_retention_time:
timeperiod = datetime.timedelta(seconds=results_retention_time)
configuration.set_min_age_to_prune(timeperiod)
_LOGGER.info(
"Results retention period set to: %s",
timeperiod,
)
previous_value = results_retention_time
await asyncio.sleep(_RUN_INTERVAL.total_seconds())
def run(args, test_result_registry=None):
"""Main logic to scan, upload and prune the test results.
Args:
args: The command line arguments specified.
        test_result_registry: A list of TestResultRegistryEntry objects which
            hold the needed test result directory information.
"""
_LOGGER.info("Starting to scan, upload and prune test results.")
_LOGGER.setLevel(logging.DEBUG)
registry = test_result_registry or test_result_registry_module.REGISTRY
loop = asyncio.get_event_loop()
configuration = UploaderConfiguration()
try:
loop.run_until_complete(
asyncio.gather(
_async_config_poll(args, configuration),
_async_upload(args, registry, configuration),
_async_prune(args, registry, configuration),
)
)
    except KeyboardInterrupt:
        _LOGGER.info("Program terminated by the user.")
finally:
_LOGGER.info("Shutting down. Bye!")
loop.close()