# blob: 569730bc641cdb08343f20034e7bad275f60a666 [file] [log] [blame] [edit]
# -*- coding: utf-8 -*-
# Copyright 2019 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""This module defines classes and/or other utils for uploading to GS."""
from __future__ import print_function
import asyncio
import logging # pylint: disable=cros-logging-import
import os
from concurrent import futures
from moblab_common import moblab_info
from moblab_common import pubsub_client
from moblab_common import (
result_upload_status_connector as upload_status_connector,
)
import processing_step
import result_dir_status
# Logger used by every class in this module.
_LOGGER = logging.getLogger("moblab_uploader")
# Device identifiers, looked up once when the module is imported. They are
# used as components of the GS upload path and are sent along with the
# "job offloaded" pubsub message.
_SERIAL_NUMBER = moblab_info.get_serial_number()
_MOBLAB_ID = moblab_info.get_or_create_id()
# First component of the GS path under which results are uploaded.
_GS_BUCKET_ROOT = "results"
class _TestResultExpiredError(Exception):
    """An error raised when a dir is expired before uploading.

    Raised by UploadingContext.__aenter__ when the test result could not be
    marked as "uploading started". Uploader.try_upload_dir catches it and
    logs it at info level instead of uploading the directory.
    """
class Uploader(processing_step.ProcessStep):
    """A class to upload a directory to GS in parallel.

    `try_upload_dir` drives the upload of one test result directory and
    waits for `uploading_queue` to be fully processed, while `upload_files`
    is a never-ending consumer that takes files from `uploading_queue` and
    uploads them to GS from a thread pool.
    """

    def __init__(
        self, *, timeout, gs_api, next_step=None, notify_uploaded=None
    ):
        """Constructor.

        Args:
            timeout: The timeout for uploading each directory in type of
                datetime.timedelta.
            gs_api: An object of gs_api_wrapper.GsApiWrapper which wraps
                Google cloud storage API.
            next_step: A coroutine will be awaited as the next step in the
                whole pipeline.
            notify_uploaded: Callback to notify that a batch of files has
                been successfully uploaded.
        """
        super().__init__(next_step=next_step)
        self._gs_api = gs_api
        self._timeout = timeout
        self._gs_path_prefix = os.path.join(
            _GS_BUCKET_ROOT, _SERIAL_NUMBER, _MOBLAB_ID
        )
        self._uploading_context = None
        # We use this queue to allow parallel files uploading.
        self.uploading_queue = asyncio.Queue(maxsize=0)
        self._notify_uploaded = notify_uploaded

    def uploading_context(self, ctx):
        """Store the context object used to track each directory upload."""
        self._uploading_context = ctx

    # Write-only property: `uploader.uploading_context = ctx` is supported,
    # reading the attribute is not.
    uploading_context = property(None, uploading_context)

    async def try_upload_dir(self, test_result):
        """Upload one test result directory inside the uploading context.

        The resulting GS path (or None when the upload raised) is stored on
        the context, whose exit handler then runs. A test result that the
        context reports as expired is logged and skipped.

        Args:
            test_result: The test result directory object to upload.
        """
        try:
            async with self._uploading_context(test_result) as ctx:
                try:
                    ctx.gs_path = await self._try_upload_dir(test_result)
                    if test_result.is_logged:
                        _LOGGER.debug(
                            "%s Try upload dir success %s",
                            test_result,
                            ctx.gs_path,
                        )
                except Exception:
                    ctx.gs_path = None
                    _LOGGER.exception("Try upload dir failed")
        except _TestResultExpiredError as err:
            _LOGGER.info(err)

    async def _try_upload_dir(self, test_result):
        """Try to upload a directory with timeout checking.

        Awaits the next pipeline step and then waits, for at most
        `timeout`, until every item of `uploading_queue` has been marked
        done.

        Args:
            test_result: The test result directory object to upload.

        Returns:
            The GS path of the uploaded test result, or "" when the upload
            timed out or the uploading context recorded an error.

        Raises:
            KeyboardInterrupt: re-raised after being recorded on the context.
        """
        if test_result.is_logged:
            _LOGGER.info("Try to upload dir %s", test_result)
        # The GS path of uploaded test result.
        test_result_gs_path = ""
        try:
            await self._next_step(test_result)
            await asyncio.wait_for(
                self.uploading_queue.join(), self._timeout.total_seconds()
            )
            if not self._uploading_context.error:
                test_result_gs_path = os.path.join(
                    self._gs_path_prefix, test_result.name
                )
        except (asyncio.TimeoutError, KeyboardInterrupt) as err:
            _LOGGER.error(
                "Error occurs when uploading %s: %s", test_result, type(err)
            )
            # These exceptions usually carry no message, so str(err) is ""
            # and would be stored as a falsy "error". Fall back to the
            # exception class name so the failure is never empty.
            self._uploading_context.error = str(err) or type(err).__name__
            # `err` is an instance, so it must be tested with isinstance();
            # an identity comparison against the class is always False.
            if isinstance(err, KeyboardInterrupt):
                raise
        return test_result_gs_path

    async def upload_files(self, jobs=1, dry_run=False):
        """Upload files in the queue to GS in background.

        Loops forever: takes files from `uploading_queue`, schedules each
        upload on a thread pool and, once the queue is empty or `jobs`
        uploads are pending, waits for the whole batch. If any upload of a
        batch fails, the error is recorded on the uploading context and the
        files still waiting in the queue are discarded.

        Args:
            jobs: Maximum number of uploads awaited together; also the size
                of the thread pool.
            dry_run: When true, files are only logged and never uploaded.
        """
        loop = asyncio.get_event_loop()
        executor = futures.ThreadPoolExecutor(max_workers=jobs)
        awaitables = []
        result_filenames = []
        while True:
            f = await self.uploading_queue.get()
            if dry_run:
                _LOGGER.debug("DRYRUN: Uploading %s", f)
                # Nothing is awaited for this item, so mark it done here;
                # otherwise uploading_queue.join() could never complete.
                self.uploading_queue.task_done()
            else:
                awaitables.append(
                    loop.run_in_executor(executor, self._upload_file, f)
                )
                result_filenames.append(f)
                if self._uploading_context._test_result.is_logged:
                    _LOGGER.debug(
                        "Uploading %s, awaitables %d", f, len(awaitables)
                    )
            if self.uploading_queue.empty() or len(awaitables) >= jobs:
                if self._uploading_context._test_result.is_logged:
                    _LOGGER.debug(
                        "%s Gathering Queue len %d, awaitables %d",
                        self._uploading_context._test_result,
                        self.uploading_queue.qsize(),
                        len(awaitables),
                    )
                try:
                    results = await asyncio.gather(
                        *awaitables, return_exceptions=True
                    )
                    # Reset right away so that a failure in the handling
                    # below cannot cause finished uploads to be gathered
                    # (and counted) a second time.
                    awaitables = []
                    _LOGGER.debug(
                        "%s Gathering Results %s",
                        self._uploading_context._test_result,
                        results,
                    )
                    for result in results:
                        # If we got an error uploading a single file give up.
                        if result:
                            # One file returned error mark the upload as bad
                            self._uploading_context.error = result
                            result_filenames.clear()
                            # The only way to end the queue join is to empty
                            # the queue.
                            _LOGGER.debug(
                                "%s Error found abandoning upload. Error %s",
                                self._uploading_context._test_result,
                                result,
                            )
                            for _ in range(self.uploading_queue.qsize()):
                                try:
                                    self.uploading_queue.get_nowait()
                                    self.uploading_queue.task_done()
                                except asyncio.QueueEmpty:
                                    # This should be impossible to happen, we
                                    # have only one loop removing items from
                                    # the queue.
                                    _LOGGER.error(
                                        "Error trying to remove "
                                        "item from empty queue."
                                    )
                        # One task_done() per awaited upload balances the
                        # get() that took the file off the queue.
                        self.uploading_queue.task_done()
                    if self._notify_uploaded:
                        self._notify_uploaded(result_filenames)
                    # Always start the next batch with an empty list, even
                    # when no callback is registered, so it cannot grow
                    # without bound.
                    result_filenames.clear()
                except Exception as e:
                    _LOGGER.debug(
                        "%s Exception thrown %s",
                        self._uploading_context._test_result,
                        str(e),
                    )

    def _upload_file(self, result_file):
        """Upload a file to GS.

        Args:
            result_file: The file to upload; its `abs_path` is the local
                source and its test result name plus `relative_path` form
                the remote name below the GS path prefix.
        """
        remote_name = os.path.join(
            self._gs_path_prefix,
            result_file.test_result.name,
            result_file.relative_path,
        )
        self._gs_api.upload_from_filename(
            local_name=result_file.abs_path, remote_name=remote_name
        )
class UploadingContext(object):
    """Async context manager recording the upload status of a test result.

    Calling the instance with a test result binds it and returns the
    instance itself, so it is used as
    `async with context(test_result) as ctx:`. Entering marks the upload as
    started; exiting records success or failure and, if the test result
    requests it, publishes a pubsub message for the uploaded GS path.
    """

    def __init__(self, *, gs_bucket, dry_run, resume_aborted=False):
        """Constructor.

        Args:
            gs_bucket: Name of the GS bucket, used to build the gs:// URI
                that is published once a directory is uploaded.
            dry_run: When true, entering does not mark the upload as
                started and a successful exit publishes nothing.
            resume_aborted: When true, entering does not mark the upload as
                started.
        """
        self._test_result = None
        self._gs_bucket = gs_bucket
        # GS path of the uploaded directory; falsy means "not uploaded".
        self.gs_path = None
        self._dry_run = dry_run
        self._resume_aborted = resume_aborted
        # Error recorded while uploading, or None.
        self.error = None
        self._result_upload_status_store = (
            upload_status_connector.ResultUploadStatusConnector()
        )

    def __call__(self, test_result):
        """Bind `test_result` to this context and return the context."""
        self._test_result = test_result
        return self

    @property
    def uploaded(self):
        """True when a non-empty GS path has been set on the context."""
        return bool(self.gs_path)

    async def __aenter__(self):
        """Reset the error and mark the upload of the test result as started.

        Returns:
            This context.

        Raises:
            _TestResultExpiredError: the upload could not be marked as
                started.
        """
        self.error = None
        if self._resume_aborted or self._dry_run:
            return self
        if not await self._set_upload_status_started():
            # Format the message here: passing the format string and its
            # argument as two separate exception args would make str(err) a
            # tuple repr when the error is logged.
            raise _TestResultExpiredError(
                "%s is expired. Skip the uploading." % (self._test_result,)
            )
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Record the outcome of the upload and publish it if requested.

        Always returns None, so an exception raised inside the `async with`
        body is never suppressed.
        """
        if not self.uploaded or self.error:
            _LOGGER.debug(
                "%s Directory upload timed out,"
                " or failed uploaded=%s error=%s",
                self._test_result,
                self.uploaded,
                self.error is not None,
            )
            if self.error:
                # An exception instance without a message has an empty
                # str(); fall back to repr() so a failure is never reported
                # with an empty message.
                error_message = str(self.error) or repr(self.error)
            else:
                error_message = "Unknown error"
            await self._set_upload_status_complete(error_message)
            return
        # From here on the directory was uploaded and no error was recorded.
        if self._dry_run:
            if self._test_result.pubsub_when_uploaded:
                _LOGGER.debug(
                    "DRYRUN: publishing message: %s",
                    os.path.join("gs://", self._gs_bucket, self.gs_path),
                )
            return
        if not self._test_result.pubsub_when_uploaded:
            if self._test_result.is_logged:
                _LOGGER.debug("%s Pubsub not requested", self._test_result)
            await self._set_upload_status_complete(None)
            return
        loop = asyncio.get_event_loop()
        console_client = pubsub_client.PubSubBasedClient()
        pubsub_uri = os.path.join("gs://", self._gs_bucket, self.gs_path)
        _LOGGER.debug(
            "Sending pubsub %s serialno %s moblab_id %s",
            pubsub_uri,
            _SERIAL_NUMBER,
            _MOBLAB_ID,
        )
        # The pubsub client call is run on the loop's default executor.
        posted = await loop.run_in_executor(
            None,
            console_client.send_test_job_offloaded_message,
            pubsub_uri,
            _SERIAL_NUMBER,
            _MOBLAB_ID,
        )
        await self._set_upload_status_complete(
            None if posted else "Failed to send message"
        )

    async def _set_upload_status_complete(self, error_message):
        """Record the final upload status of the bound test result.

        Args:
            error_message: None when the upload succeeded, otherwise a
                description of the failure.
        """
        # Use one flag for both records so that an empty message can never
        # be stored as "failed" in one place and "uploaded" in the other.
        succeeded = error_message is None
        await result_dir_status.mark_uploading_result(
            self._test_result, succeeded=succeeded
        )
        if succeeded:
            status = upload_status_connector.Status.UPLOADED
        else:
            status = upload_status_connector.Status.UPLOAD_FAILED
        if self._test_result.job_id:
            self._result_upload_status_store.set_job_upload_status(
                self._test_result.id,
                status,
                error_message or "",
            )

    async def _set_upload_status_started(self):
        """Mark the bound test result as being uploaded.

        Returns:
            False when result_dir_status.mark_uploading_started reports a
            falsy result, True otherwise.
        """
        if not await result_dir_status.mark_uploading_started(
            self._test_result
        ):
            return False
        if self._test_result.job_id:
            self._result_upload_status_store.set_job_upload_status(
                self._test_result.id, upload_status_connector.Status.UPLOADING
            )
        return True