# blob: 2e24f51beee4dcf0eeab96e66fd9f74ff6bb31be [file] [log] [blame]
# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Cloud Storage attachment upload output plugin."""
from __future__ import print_function

import os

# pylint: disable=import-error
from google.cloud import storage
from google.oauth2 import service_account

import instalog_common  # pylint: disable=unused-import
from instalog import plugin_base
from instalog.utils.arg_utils import Arg
from instalog.utils import file_utils
# Chunk size, in bytes, passed as `chunk_size` when creating a storage.Blob
# for upload.
_CHUNK_SIZE = 4 * 1024 * 1024  # 4mb

# OAuth2 scope requested for the service account credentials built in
# OutputCloudStorage.BuildClient.
# NOTE(review): this definition was absent from the file even though
# BuildClient references `_GCS_SCOPE`; the read/write Cloud Storage scope is
# assumed here -- confirm against the original source.
_GCS_SCOPE = 'https://www.googleapis.com/auth/devstorage.read_write'
class OutputCloudStorage(plugin_base.OutputPlugin):
  """Output plugin that uploads event attachments to Google Cloud Storage.

  For each event that carries attachments, every attachment file is uploaded
  into the configured bucket and directory, the event payload gains an
  `__attachments__` mapping from attachment ID to its `gs://` URL, and the
  attachments are removed from the event.  If `enable_emit` is set, the
  processed events are then re-emitted.
  """

  # NOTE(review): the `default=` values below were missing from the file (the
  # Arg(...) calls were left unclosed); they are assumed -- confirm against
  # the original source.
  ARGS = [
      Arg('interval', (int, float),
          'Frequency to re-emit events, if no attachments are encountered. '
          'When attachments are encountered, events are re-emitted right '
          'after upload.',
          default=1),
      Arg('key_path', (str, unicode),
          'Path to Cloud Storage service account JSON key file.'),
      Arg('target_dir', (str, unicode),
          'Path to the target bucket and directory on Google Cloud.'),
      Arg('use_sha1', bool,
          'Use the attachment\'s SHA1 hex-encoded hash as its filename. '
          'Note that this means multiple attachments may point to the same '
          'file on Cloud Storage. If set to False, the attachment ID will '
          'be used as its filename.',
          default=False),
      Arg('enable_emit', bool,
          'Strip events of their attachments and re-emit.',
          default=False),
  ]

  def __init__(self, *args, **kwargs):
    """Initializes connection state; the real values are set in SetUp."""
    self.client = None
    self.bucket = None
    self.bucket_id = None
    self.dir_in_bucket = None
    super(OutputCloudStorage, self).__init__(*args, **kwargs)

  def SetUp(self):
    """Authenticates the connection to Cloud Storage."""
    self.args.target_dir = self.args.target_dir.strip('/')
    # The first path component is the bucket; everything after the first
    # slash is the directory inside that bucket (empty if there is no slash).
    self.bucket_id, _unused_slash, self.dir_in_bucket = (
        self.args.target_dir.partition('/'))
    self.client = self.BuildClient()
    self.bucket = self.BuildBucket()

  def Main(self):
    """Main thread of the plugin."""
    while not self.IsStopping():
      if not self.ProcessNextBatch():
        # TODO(kitching): Find a better way to block the plugin when we are in
        #                 one of the PAUSING, PAUSED, or UNPAUSING states.
        # NOTE(review): the body of this branch was missing from the file;
        # a short back-off via the base class's Sleep is assumed -- confirm.
        self.Sleep(1)

  def BuildClient(self):
    """Builds a Storage client object."""
    credentials = service_account.Credentials.from_service_account_file(
        self.args.key_path, scopes=(_GCS_SCOPE,))
    # Google Cloud Storage depends on the bucket rather than the project, so
    # we don't need to take a project name as an argument.  However, this is
    # a general Google Cloud client, so the project can't be None; instead it
    # can be an empty string.
    return storage.Client(project='', credentials=credentials)

  def BuildBucket(self):
    """Builds a Storage bucket object.

    Raises:
      ValueError: if the configured bucket does not exist.
    """
    bucket = storage.Bucket(self.client, self.bucket_id)
    if not bucket.exists():
      raise ValueError('Bucket %s doesn\'t exist! Please create it before you '
                       'run this plugin' % self.bucket_id)
    return bucket

  def ProcessNextBatch(self):
    """Gets the next event with attachments and uploads it.

    Returns:
      True if the next batch was successfully processed.  False if there were
      no events available for processing, or if an error occurred.
    """
    event_stream = self.NewStream()
    if not event_stream:
      return False

    events = []
    success = True
    try:
      for event in event_stream.iter(timeout=self.args.interval):
        if event.attachments:
          self.debug('Will upload %d attachments from event',
                     len(event.attachments))
          self.UploadEvent(event)
          events.append(event)
          # Stop here so that events are re-emitted right after an upload.
          break
        events.append(event)
    except Exception:
      self.exception('Exception encountered during upload, aborting')
      success = False

    # Re-emit events with their attachments removed.
    if success and self.args.enable_emit:
      if not self.Emit(events):
        self.error('Unable to emit, aborting')
        success = False

    # NOTE(review): the Commit()/Abort() calls were not present in the file
    # as received; they are assumed from the event stream's usual contract --
    # confirm against the stream API.
    if success:
      event_stream.Commit()
      self.debug('Processed batch of %d events', len(events))
    else:
      event_stream.Abort()

    # Return False if failure occurred, or if no events were processed.
    return success and bool(events)

  def UploadEvent(self, event):
    """Uploads attachments of given event.

    Each attachment is uploaded under `self.dir_in_bucket`, its `gs://` URL is
    recorded in the event payload under `__attachments__`, and finally the
    event's attachments are cleared.
    """
    for att_id, att_path in event.attachments.iteritems():
      target_filename = (file_utils.SHA1InHex(att_path) if self.args.use_sha1
                         else att_id)
      path_in_bucket = '%s/%s' % (self.dir_in_bucket, target_filename)

      # Upload the file.
      self.info('Uploading to GCS: /%s/%s', self.bucket_id, path_in_bucket)
      self.UploadFile(att_path, path_in_bucket)

      # Relocate the attachments entry into the event payload.
      event.setdefault('__attachments__', {})[att_id] = 'gs://%s/%s' % (
          self.bucket_id, path_in_bucket)

    # Remove attachments from the event for re-emitting.
    event.attachments = {}

  def UploadFile(self, local_path, target_path):
    """Attempts to upload a file to GCS, with resumability.

    If a blob with the same size and MD5 hash already exists at the target
    path, the upload is skipped.  If one exists but differs, it is
    overwritten.

    Args:
      local_path: Path to the file on local disk.
      target_path: Target path in self.bucket.

    Raises:
      google.cloud.exceptions.GoogleCloudError: if the upload response returns
        an error status.
      ValueError: if the uploaded file on GCS doesn't exist or has different
        size or MD5 hash than the local file.
    """
    local_md5 = file_utils.MD5InBase64(local_path)
    local_size = os.path.getsize(local_path)

    blob = storage.Blob(target_path, self.bucket, chunk_size=_CHUNK_SIZE)
    if blob.exists():
      # exists() alone does not populate the blob's metadata; fetch it so
      # that md5_hash and size can be compared.
      blob.reload()
      if blob.md5_hash == local_md5 and blob.size == local_size:
        self.warning('File already exists on remote end with same size (%d) '
                     'and same MD5 hash (%s); skipping',
                     blob.size, blob.md5_hash)
        return
      self.error('File already exists on remote end, but size or MD5 hash '
                 'doesn\'t match; size on remote %s = %d, size on local %s = '
                 '%d; will overwrite',
                 target_path, blob.size, local_path, local_size)

    # NOTE(review): the upload call itself was missing from the file as
    # received; upload_from_filename is assumed -- confirm.
    blob.upload_from_filename(local_path)

    # Verify the upload against what the server actually stored.
    if not blob.exists():
      raise ValueError('File doesn\'t exist after uploading')
    blob.reload()
    if blob.md5_hash != local_md5 or blob.size != local_size:
      raise ValueError('Size or MD5 mismatch after uploading; '
                       'local_size = %d, confirmed_size = %d; local_md5 = %s, '
                       'confirmed_md5 = %s' % (local_size, blob.size, local_md5,
                                               blob.md5_hash))
if __name__ == '__main__':