Instalog: Merge related code between output_archive and output_file
BUG=b:119449789
TEST=make test
Change-Id: I567db218b0a3cd3ec30ffee7a7d05d4100f9ca47
Reviewed-on: https://chromium-review.googlesource.com/1343635
Commit-Ready: Chun-Tsen Kuo <chuntsen@chromium.org>
Tested-by: Chun-Tsen Kuo <chuntsen@chromium.org>
Reviewed-by: Ting Shen <phoenixshen@chromium.org>
diff --git a/py/instalog/plugins/output_archive.py b/py/instalog/plugins/output_archive.py
index dacef2d..6cab88b 100755
--- a/py/instalog/plugins/output_archive.py
+++ b/py/instalog/plugins/output_archive.py
@@ -15,11 +15,10 @@
InstalogEvents_YYYYmmddHHMMSS.tar.gz
InstalogEvents_YYYYmmddHHMMSS/
events.json
- attachments/ # Will not have this dir if no attachment.
- 000_${EVENT_0_ATTACHMENT_0_NAME}
- 000_${EVENT_0_ATTACHMENT_1_NAME}
- 001_${EVENT_1_ATTACHMENT_0_NAME}
- 001_${EVENT_1_ATTACHMENT_1_NAME}
+ attachments/
+ ${ATTACHMENT_0_HASH}
+ ${ATTACHMENT_1_HASH}
+ ${ATTACHMENT_2_HASH}
...
"""
@@ -32,50 +31,40 @@
import instalog_common # pylint: disable=unused-import
from instalog import plugin_base
+from instalog.plugins import output_file
+from instalog.utils import arg_utils
from instalog.utils.arg_utils import Arg
from instalog.utils import file_utils
-from instalog.utils import time_utils
-_ARCHIVE_MESSAGE_INTERVAL = 60 # 60sec
-_DEFAULT_INTERVAL = 1 * 60 * 60 # 1hr
-_DEFAULT_MAX_SIZE = 200 * 1024 * 1024 # 200mb
+class OutputArchive(output_file.OutputFile):
-
-class OutputArchive(plugin_base.OutputPlugin):
-
- ARGS = [
- Arg('interval', (int, float),
- 'How long to wait, in seconds, before creating the next archive.',
- default=_DEFAULT_INTERVAL),
- Arg('max_size', int,
- 'If the total_size bigger than max_size, archive these events.',
- default=_DEFAULT_MAX_SIZE),
- Arg('enable_disk', bool,
- 'Whether or not to save the archive to disk. True by default.',
- default=True),
- Arg('target_dir', (str, unicode),
- 'The directory in which to store archives. Uses the plugin\'s '
- 'data directory by default.',
- default=None),
- Arg('enable_gcs', bool,
- 'Whether or not to upload the archive to Google Cloud Storage. '
- 'False by default.',
- default=False),
- Arg('key_path', (str, unicode),
- 'Path to Cloud Storage service account JSON key file.',
- default=None),
- Arg('gcs_target_dir', (str, unicode),
- 'Path to the target bucket and directory on Google Cloud Storage.',
- default=None),
- ]
+ ARGS = arg_utils.MergeArgs(
+ output_file.OutputFile.ARGS,
+ [
+ Arg('enable_disk', bool,
+ 'Whether or not to save the archive to disk. True by default.',
+ default=True),
+ Arg('enable_gcs', bool,
+ 'Whether or not to upload the archive to Google Cloud Storage. '
+ 'False by default.',
+ default=False),
+ Arg('key_path', (str, unicode),
+ 'Path to Cloud Storage service account JSON key file.',
+ default=None),
+ Arg('gcs_target_dir', (str, unicode),
+ 'Path to the target bucket and directory on Google Cloud '
+ 'Storage.',
+ default=None),
+ ])
def __init__(self, *args, **kwargs):
- self._gcs = None
super(OutputArchive, self).__init__(*args, **kwargs)
+ self._gcs = None
def SetUp(self):
"""Sets up the plugin."""
+ super(OutputArchive, self).SetUp()
if not self.args.enable_disk and not self.args.enable_gcs:
raise ValueError('Please enable at least one of "enable_disk" or '
'"enable_gcs"')
@@ -83,11 +72,6 @@
if not self.args.enable_disk and self.args.target_dir:
raise ValueError('If specifying a "target_dir", "enable_disk" must '
'be set to True')
- # If saving to disk, ensure that the target_dir exists.
- if self.args.enable_disk and self.args.target_dir is None:
- self.args.target_dir = self.GetDataDir()
- if self.args.target_dir and not os.path.isdir(self.args.target_dir):
- os.makedirs(self.args.target_dir)
if not self.args.enable_gcs:
if self.args.key_path or self.args.gcs_target_dir:
@@ -101,100 +85,29 @@
from instalog.utils import gcs_utils
self._gcs = gcs_utils.CloudStorage(self.args.key_path)
- def Main(self):
- """Main thread of the plugin."""
- while not self.IsStopping():
- if not self.PrepareAndArchive():
- self.Sleep(1)
+ def ProcessEvents(self, base_dir):
+ # Create the archive.
+ cur_time = datetime.datetime.now()
+ archive_name = cur_time.strftime('InstalogEvents_%Y%m%d%H%M%S')
+ archive_filename = '%s.tar.gz' % archive_name
+ with file_utils.UnopenedTemporaryFile(
+ prefix='instalog_archive_', suffix='.tar.gz') as tmp_archive:
+ self.info('Creating temporary archive file: %s', tmp_archive)
+ with tarfile.open(tmp_archive, 'w:gz') as tar:
+ tar.add(base_dir, arcname=archive_name)
- def ProcessEvent(self, event_id, event, base_dir):
- """Copies an event's attachments and returns its serialized form."""
- for att_id, att_path in event.attachments.iteritems():
- if os.path.isfile(att_path):
- att_name = os.path.basename(att_path)
- att_newpath = os.path.join('attachments',
- '%03d_%s' % (event_id, att_name))
- shutil.copyfile(att_path, os.path.join(base_dir, att_newpath))
- event.attachments[att_id] = att_newpath
- return event.Serialize()
-
- def GetEventAttachmentSize(self, event):
- """Returns the total size of given event's attachments."""
- total_size = 0
- for _unused_att_id, att_path in event.attachments.iteritems():
- if os.path.isfile(att_path):
- total_size += os.path.getsize(att_path)
- return total_size
-
- def PrepareAndArchive(self):
- """Retrieves events, and archives them."""
- event_stream = self.NewStream()
- if not event_stream:
- return False
-
- with file_utils.TempDirectory(prefix='instalog_archive_') as base_dir:
- self.info('Creating temporary directory: %s', base_dir)
- # Create the attachments directory.
- att_dir = os.path.join(base_dir, 'attachments')
- os.mkdir(att_dir)
-
- # In order to save memory, write directly to a temp file on disk.
- with open(os.path.join(base_dir, 'events.json'), 'w') as events_f:
- num_events = 0
- total_size = 0
- time_last = time_utils.MonotonicTime()
- for event in event_stream.iter(timeout=self.args.interval):
- serialized_event = self.ProcessEvent(num_events, event, base_dir)
- attachment_size = self.GetEventAttachmentSize(event)
- events_f.write(serialized_event + '\n')
-
- total_size += len(serialized_event) + attachment_size
- num_events += 1
- self.debug('num_events = %d', num_events)
-
- # Throttle our status messages.
- time_now = time_utils.MonotonicTime()
- if (time_now - time_last) >= _ARCHIVE_MESSAGE_INTERVAL:
- time_last = time_now
- self.info('Currently at %.2f%% of %.2fMB before archiving',
- 100.0 * total_size / self.args.max_size,
- self.args.max_size / 1024.0 / 1024)
- if total_size >= self.args.max_size:
- break
-
- if self.IsStopping():
- self.info('Plugin is stopping! Abort %d events', num_events)
- event_stream.Abort()
- return False
-
- # Create the archive.
- if num_events > 0:
- cur_time = datetime.datetime.now()
- archive_name = cur_time.strftime('InstalogEvents_%Y%m%d%H%M%S')
- archive_filename = '%s.tar.gz' % archive_name
- with file_utils.UnopenedTemporaryFile(
- prefix='instalog_archive_', suffix='.tar.gz') as tmp_archive:
- self.info('Creating temporary archive file: %s', tmp_archive)
- self.info('Archiving %d events in %s', num_events, archive_name)
- with tarfile.open(tmp_archive, 'w:gz') as tar:
- tar.add(base_dir, arcname=archive_name)
-
- # What should we do with the archive?
- if self.args.enable_gcs:
- gcs_target_dir = self.args.gcs_target_dir.strip('/')
- gcs_target_path = '/%s/%s' % (gcs_target_dir, archive_filename)
- if not self._gcs.UploadFile(
- tmp_archive, gcs_target_path, overwrite=True):
- self.error('Unable to upload to GCS, aborting')
- event_stream.Abort()
- return False
- if self.args.enable_disk:
- target_path = os.path.join(self.args.target_dir, archive_filename)
- self.info('Saving archive to: %s', target_path)
- shutil.move(tmp_archive, target_path)
-
- self.info('Commit %d events', num_events)
- event_stream.Commit()
+ # What should we do with the archive?
+ if self.args.enable_gcs:
+ gcs_target_dir = self.args.gcs_target_dir.strip('/')
+ gcs_target_path = '/%s/%s' % (gcs_target_dir, archive_filename)
+ if not self._gcs.UploadFile(
+ tmp_archive, gcs_target_path, overwrite=True):
+ self.error('Unable to upload to GCS, aborting')
+ return False
+ if self.args.enable_disk:
+ target_path = os.path.join(self.target_dir, archive_filename)
+ self.info('Saving archive to: %s', target_path)
+ shutil.move(tmp_archive, target_path)
return True
diff --git a/py/instalog/plugins/output_archive_unittest.py b/py/instalog/plugins/output_archive_unittest.py
index 2fa2741..f89bf73 100755
--- a/py/instalog/plugins/output_archive_unittest.py
+++ b/py/instalog/plugins/output_archive_unittest.py
@@ -13,7 +13,9 @@
import logging
import os
import resource
+import shutil
import tarfile
+import tempfile
import time
import unittest
@@ -24,7 +26,6 @@
from instalog import log_utils
from instalog import plugin_sandbox
from instalog import testing
-from instalog.utils import file_utils
class TestOutputArchive(unittest.TestCase):
@@ -32,10 +33,12 @@
def setUp(self):
self.core = testing.MockCore()
self.stream = self.core.GetStream(0)
+ self.tmp_dir = tempfile.mkdtemp(prefix='output_archive_unittest_')
self.event = datatypes.Event({'plugin': 'archive'})
def tearDown(self):
self.core.Close()
+ shutil.rmtree(self.tmp_dir)
def _GetMemoryUsage(self):
"""Returns current process's memory usage in bytes."""
@@ -44,75 +47,73 @@
def testMemoryUsage(self):
big_event = datatypes.Event({'1mb': 'x' * 1024 * 1024})
event_size = len(big_event.Serialize())
- with file_utils.TempDirectory(prefix='test_output_archive') as data_dir:
- config = {
- 'interval': 1000, # arbitrary long time
- 'max_size': 1024 * 1024 * 1024, # arbitrary large value
- }
- sandbox = plugin_sandbox.PluginSandbox(
- 'output_archive', config=config,
- data_dir=data_dir, core_api=self.core)
- sandbox.Start(True)
+ config = {
+      'interval': 1000,  # arbitrarily long time
+      'threshold_size': 1024 * 1024 * 1024,  # arbitrarily large value
+ }
+ sandbox = plugin_sandbox.PluginSandbox(
+ 'output_archive', config=config,
+ data_dir=self.tmp_dir, core_api=self.core)
+ sandbox.Start(True)
- mem_usage_start = self._GetMemoryUsage()
- logging.info('Initial memory usage: %d', mem_usage_start)
- # additional_memory = big_event(1mb) * 10 events * 20 iterations = ~200mb
- # maximum_memory = (original_memory + additional_memory) plus 10% padding
- mem_usage_max = (mem_usage_start + (event_size * 10 * 20)) * 1.1
- for _unused_i in xrange(20):
- events = [copy.deepcopy(big_event) for _unused_i in xrange(10)]
- self.stream.Queue(events)
+ mem_usage_start = self._GetMemoryUsage()
+ logging.info('Initial memory usage: %d', mem_usage_start)
+ # additional_memory = big_event(1mb) * 10 events * 20 iterations = ~200mb
+ # maximum_memory = (original_memory + additional_memory) plus 10% padding
+ mem_usage_max = (mem_usage_start + (event_size * 10 * 20)) * 1.1
+ for _unused_i in xrange(20):
+ events = [copy.deepcopy(big_event) for _unused_j in xrange(10)]
+ self.stream.Queue(events)
- sandbox.Flush(1, False) # trigger archive creation
- while not self.stream.Empty():
- mem_usage = self._GetMemoryUsage()
- logging.info('Current memory usage: %d/%d', mem_usage, mem_usage_max)
- if mem_usage >= mem_usage_max:
- # The test has failed, but we need to interrupt the archive plugin
- # and get it to stop as quickly as possible.
- # Stop new events from being accessed.
- del self.core.streams[0]
- # Force any open file handles shut so the plugin stops writing
- # to the archive on disk.
- proc = psutil.Process()
- for f in proc.get_open_files():
- os.close(f.fd)
- # Manually set the plugin state to STOPPING and advance into this
- # state.
- # pylint: disable=protected-access
- sandbox._state = plugin_sandbox.STOPPING
- sandbox.AdvanceState(True)
- # Once the plugin has really stopped, report our error.
- self.fail('Memory usage exceeded: %d/%d' % (mem_usage, mem_usage_max))
- time.sleep(0.1)
- # pylint: disable=protected-access
- sandbox._state = plugin_sandbox.STOPPING
- sandbox.AdvanceState(True)
+ sandbox.Flush(1, False) # trigger archive creation
+ while not self.stream.Empty():
+ mem_usage = self._GetMemoryUsage()
+ logging.info('Current memory usage: %d/%d', mem_usage, mem_usage_max)
+ if mem_usage >= mem_usage_max:
+ # The test has failed, but we need to interrupt the archive plugin
+ # and get it to stop as quickly as possible.
+ # Stop new events from being accessed.
+ del self.core.streams[0]
+ # Force any open file handles shut so the plugin stops writing
+ # to the archive on disk.
+ proc = psutil.Process()
+ for f in proc.get_open_files():
+ os.close(f.fd)
+ # Manually set the plugin state to STOPPING and advance into this
+ # state.
+ # pylint: disable=protected-access
+ sandbox._state = plugin_sandbox.STOPPING
+ sandbox.AdvanceState(True)
+ # Once the plugin has really stopped, report our error.
+ self.fail('Memory usage exceeded: %d/%d' % (mem_usage, mem_usage_max))
+ time.sleep(0.1)
+ # pylint: disable=protected-access
+ sandbox._state = plugin_sandbox.STOPPING
+ sandbox.AdvanceState(True)
def testOneEvent(self):
- with file_utils.TempDirectory(prefix='test_output_archive') as data_dir:
- config = {
- 'interval': 1}
- sandbox = plugin_sandbox.PluginSandbox(
- 'output_archive', config=config,
- data_dir=data_dir, core_api=self.core)
- sandbox.Start(True)
- # pylint: disable=protected-access
- plugin = sandbox._plugin
- self.stream.Queue([self.event])
- plugin.PrepareAndArchive()
- sandbox.Flush(2, True)
- sandbox.Stop()
+ config = {
+ 'interval': 1}
+ sandbox = plugin_sandbox.PluginSandbox(
+ 'output_archive', config=config,
+ data_dir=self.tmp_dir, core_api=self.core)
+ sandbox.Start(True)
+ # pylint: disable=protected-access
+ plugin = sandbox._plugin
+ self.stream.Queue([self.event])
+ plugin.PrepareAndProcess()
+ sandbox.Flush(2, True)
+ sandbox.Stop()
- # Inspect the disk archive.
- archive_path = glob.glob(os.path.join(data_dir, 'InstalogEvents*'))[0]
- with tarfile.open(archive_path, 'r:gz') as tar:
- events_member = [n for n in tar.getnames() if 'events.json' in n][0]
- events_file = tar.extractfile(events_member)
- lines = events_file.readlines()
- self.assertEqual(1, len(lines))
- event = datatypes.Event.Deserialize(lines[0])
- self.assertEqual(event, self.event)
+ # Inspect the disk archive.
+ archive_path = glob.glob(os.path.join(self.tmp_dir, 'InstalogEvents*'))[0]
+ with tarfile.open(archive_path, 'r:gz') as tar:
+ events_member = [n for n in tar.getnames() if 'events.json' in n][0]
+ events_file = tar.extractfile(events_member)
+ lines = events_file.readlines()
+ self.assertEqual(1, len(lines))
+ event = datatypes.Event.Deserialize(lines[0])
+ self.assertEqual(event, self.event)
if __name__ == '__main__':