blob: a79402c27a8d9b7657ed1fe07685075cd13fccb1 [file] [log] [blame] [edit]
# -*- coding: utf-8 -*-
# Copyright 2019 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
# TODO(guocb) Move this wrapper to moblab_common and integrate it with
# moblab_common/versioned_upload.py
"""A wrapper of Google Cloud Storage API."""
import asyncio
import gzip
import logging # pylint: disable=cros-logging-import
import os
import random
import requests
import shutil
import tempfile
import time
import urllib3
from google.cloud import exceptions as gcp_exceptions
from google.cloud import storage as gs
from retry.api import retry_call
# Name of the environment variable that _get_gs_bucket sets to the path of
# the credentials file before creating the Google Cloud Storage client.
ENV_GOOGLE_APP_CREDENTIALS = "GOOGLE_APPLICATION_CREDENTIALS"
# Logger shared by everything in this module.
_LOGGER = logging.getLogger("moblab_uploader")


def do_not_gzip(path):
    """Tell whether a file must be uploaded as-is, without gzipping it.

    Defines what GsApiWrapper will compress before uploading, and what it
    will not compress before uploading. A file is left alone when its last
    suffix ends with "gz" or "zip" (for example ".gz", ".tgz", ".zip").

    Args:
        path: a pathlib.Path object representing the file targeted for
            upload.

    Returns:
        Boolean that is true if the file will not be gzipped before upload.
    """
    # str.endswith accepts a tuple, so one call covers both endings.
    return path.suffix.endswith(("gz", "zip"))
class GsAccessError(Exception):
    """Signals that the Google Storage bucket could not be accessed.

    Raised by this module when no credentials file is given, when the bucket
    lookup is rejected or the bucket does not exist, or when the caller has
    no write permission on the bucket.
    """
def _get_gs_bucket(gs_bucket, credentials_file):
    """Get GS bucket object with permission checking.

    Args:
        gs_bucket: The Google Storage bucket name, without 'gs://' prefix.
        credentials_file: The path of the credentials file.

    Returns:
        A google.cloud.bucket object.

    Raises:
        GsAccessError: when no credentials file is given, when the bucket
            lookup is forbidden or the bucket is not found, or when we
            don't have write permission to the bucket.
    """
    _LOGGER.info("Verify the access to Google Storage.")
    if not credentials_file:
        raise GsAccessError("No credential file specified.")

    # Side effect: this changes the environment of the whole process. The
    # client below is built without explicit credentials, so it is expected
    # to pick them up from this variable.
    os.environ[ENV_GOOGLE_APP_CREDENTIALS] = credentials_file
    client = gs.Client()
    try:
        bucket = client.get_bucket(gs_bucket)
    except (gcp_exceptions.Forbidden, gcp_exceptions.NotFound) as err:
        # Chain explicitly so the original API error stays attached as the
        # __cause__ of the error we expose to callers.
        raise GsAccessError(str(err)) from err

    # Test the write permission. A falsy result means the permission we
    # asked about was not granted.
    if not bucket.test_iam_permissions(["storage.objects.create"]):
        raise GsAccessError(
            "You don't have write permission to the bucket:{}".format(
                gs_bucket
            )
        )
    return bucket
class GsApiWrapper(object):
    """A wrapper of Google Cloud Storage API.

    An instance is bound to one bucket and uploads local files to it,
    gzipping them first unless do_not_gzip() says otherwise.
    """

    # Chunk size handed to the blob for uploads: 100 MiB.
    _UPLOAD_CHUNK_SIZE = 1024 * 1024 * 100

    def __init__(self, gs_bucket, credentials_file):
        """Constructor.

        Args:
            gs_bucket: The Google Storage bucket name, without 'gs://'
                prefix.
            credentials_file: The path of the credentials file.

        Raises:
            GsAccessError: propagated from _get_gs_bucket when the bucket
                can't be accessed or is not writable.
        """
        self._bucket_name = gs_bucket
        self._credentials_file = credentials_file
        self._gs_bucket = _get_gs_bucket(
            self._bucket_name, self._credentials_file
        )
        self._client = gs.Client()

    def _gzip_file(self, local_name):
        """Compress `local_name` into a newly created temporary file.

        Args:
            local_name: path of the file to compress.

        Returns:
            The path (a string) of the temporary gzip file. The caller owns
            the file and is responsible for deleting it.
        """
        # mkstemp() creates the file atomically, unlike the deprecated
        # tempfile.mktemp() which only returns a name that another process
        # could claim before we open it.
        fd, zipped_file = tempfile.mkstemp()
        os.close(fd)
        try:
            with open(local_name, "rb") as f_in, gzip.open(
                zipped_file, "wb"
            ) as f_out:
                shutil.copyfileobj(f_in, f_out)
        except Exception:
            # Don't leave a half-written temporary file behind.
            try:
                os.remove(zipped_file)
            except OSError:
                pass
            raise
        return zipped_file

    def upload_from_filename(self, *, local_name, remote_name):
        """Upload the file `local_name` to GS as a blob named `remote_name`.

        Uploads to the GCS bucket that GsApiWrapper has been initialized
        to. Anything that is not a regular file, and any symlink, is skipped
        with a debug log message. Unless do_not_gzip(local_name) is true,
        the file is gzipped into a temporary file first and the blob's
        content encoding is set to "gzip"; the temporary file is removed
        once the upload has finished or failed.

        The upload is attempted up to 20 times on connection/protocol
        errors, with the `retry` package's delay=1 and backoff=1.25
        settings.

        Args:
            local_name: a pathlib.Path object representing the file targeted
                for upload.
            remote_name: a string representing the intended name of the file
                once uploaded.
        """
        # We just upload regular files.
        if not local_name.is_file():
            _LOGGER.debug("Not attempting upload - not a file %s", local_name)
            return
        if local_name.is_symlink():
            _LOGGER.debug("Not attempting upload - symlink %s", local_name)
            return

        blob = self._gs_bucket.blob(
            remote_name, chunk_size=self._UPLOAD_CHUNK_SIZE
        )
        # Stays None when nothing was gzipped, so cleanup knows to do nothing.
        temp_zipped_file_path = None
        try:
            if do_not_gzip(local_name):
                target_file_path = local_name
            else:
                temp_zipped_file_path = self._gzip_file(local_name)
                target_file_path = temp_zipped_file_path
                blob.content_encoding = "gzip"
            retry_call(
                blob.upload_from_filename,
                fargs=[str(target_file_path)],
                fkwargs={"client": self._client},
                exceptions=(
                    requests.exceptions.ConnectionError,
                    urllib3.exceptions.ProtocolError,
                ),
                tries=20,
                delay=1,
                backoff=1.25,
            )
        finally:
            if temp_zipped_file_path is not None:
                try:
                    os.remove(temp_zipped_file_path)
                except OSError as err:
                    # Cleanup is best effort; raising here would hide any
                    # error coming out of the upload itself.
                    _LOGGER.warning(
                        "Failed to remove temporary file %s: %s",
                        temp_zipped_file_path,
                        err,
                    )

    @property
    def bucket_name(self):
        """The name of the bucket this wrapper uploads to."""
        return self._bucket_name