Instalog: Merge related code between output_archive and output_file

BUG=b:119449789
TEST=make test

Change-Id: I567db218b0a3cd3ec30ffee7a7d05d4100f9ca47
Reviewed-on: https://chromium-review.googlesource.com/1343635
Commit-Ready: Chun-Tsen Kuo <chuntsen@chromium.org>
Tested-by: Chun-Tsen Kuo <chuntsen@chromium.org>
Reviewed-by: Ting Shen <phoenixshen@chromium.org>
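
This change refactors OutputArchive to subclass the new OutputFile
plugin: event batching, the size threshold, and attachment handling
(attachments are now stored by hash rather than by event index and
name) move into the shared base class, leaving only tar.gz creation
and the disk/GCS dispatch here.  output_file.py itself is not part of
this diff, so the sketch below of the assumed base-class contract is
inferred from the calls this change makes (PrepareAndProcess,
ProcessEvents(base_dir), self.target_dir, and the shared 'interval'
and 'threshold_size' args); it is not the actual implementation.

  from instalog import plugin_base
  from instalog.utils.arg_utils import Arg


  class OutputFile(plugin_base.OutputPlugin):
    """Hypothetical sketch of the shared base plugin (not from this diff)."""

    ARGS = [
        Arg('interval', (int, float),
            'How long to wait, in seconds, before processing the next batch.',
            default=1 * 60 * 60),
        Arg('threshold_size', int,
            'Process the current batch once it grows past this many bytes.',
            default=200 * 1024 * 1024),
        Arg('target_dir', (str, unicode),
            'The directory in which to store output.  Uses the plugin\'s '
            'data directory by default.',
            default=None),
    ]

    def Main(self):
      """Main thread of the plugin."""
      while not self.IsStopping():
        if not self.PrepareAndProcess():
          self.Sleep(1)

    def PrepareAndProcess(self):
      # Pulls a batch of events into a temporary base_dir (events.json
      # plus hash-named attachments/), calls self.ProcessEvents(base_dir),
      # and commits or aborts the event stream based on its return value.
      raise NotImplementedError  # sketch only

    def ProcessEvents(self, base_dir):
      # Subclass hook: OutputArchive tars base_dir into a .tar.gz; other
      # subclasses may simply move base_dir into place.
      raise NotImplementedError
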
diff --git a/py/instalog/plugins/output_archive.py b/py/instalog/plugins/output_archive.py
index dacef2d..6cab88b 100755
--- a/py/instalog/plugins/output_archive.py
+++ b/py/instalog/plugins/output_archive.py
@@ -15,11 +15,10 @@
   InstalogEvents_YYYYmmddHHMMSS.tar.gz
     InstalogEvents_YYYYmmddHHMMSS/
       events.json
-      attachments/  # Will not have this dir if no attachment.
-        000_${EVENT_0_ATTACHMENT_0_NAME}
-        000_${EVENT_0_ATTACHMENT_1_NAME}
-        001_${EVENT_1_ATTACHMENT_0_NAME}
-        001_${EVENT_1_ATTACHMENT_1_NAME}
+      attachments/
+        ${ATTACHMENT_0_HASH}
+        ${ATTACHMENT_1_HASH}
+        ${ATTACHMENT_2_HASH}
         ...
 """
 
@@ -32,50 +31,40 @@
 
 import instalog_common  # pylint: disable=unused-import
 from instalog import plugin_base
+from instalog.plugins import output_file
+from instalog.utils import arg_utils
 from instalog.utils.arg_utils import Arg
 from instalog.utils import file_utils
-from instalog.utils import time_utils
 
 
-_ARCHIVE_MESSAGE_INTERVAL = 60  # 60sec
-_DEFAULT_INTERVAL = 1 * 60 * 60  # 1hr
-_DEFAULT_MAX_SIZE = 200 * 1024 * 1024  # 200mb
+class OutputArchive(output_file.OutputFile):
 
-
-class OutputArchive(plugin_base.OutputPlugin):
-
-  ARGS = [
-      Arg('interval', (int, float),
-          'How long to wait, in seconds, before creating the next archive.',
-          default=_DEFAULT_INTERVAL),
-      Arg('max_size', int,
-          'If the total_size bigger than max_size, archive these events.',
-          default=_DEFAULT_MAX_SIZE),
-      Arg('enable_disk', bool,
-          'Whether or not to save the archive to disk.  True by default.',
-          default=True),
-      Arg('target_dir', (str, unicode),
-          'The directory in which to store archives.  Uses the plugin\'s '
-          'data directory by default.',
-          default=None),
-      Arg('enable_gcs', bool,
-          'Whether or not to upload the archive to Google Cloud Storage.  '
-          'False by default.',
-          default=False),
-      Arg('key_path', (str, unicode),
-          'Path to Cloud Storage service account JSON key file.',
-          default=None),
-      Arg('gcs_target_dir', (str, unicode),
-          'Path to the target bucket and directory on Google Cloud Storage.',
-          default=None),
-  ]
+  ARGS = arg_utils.MergeArgs(
+      output_file.OutputFile.ARGS,
+      [
+          Arg('enable_disk', bool,
+              'Whether or not to save the archive to disk.  True by default.',
+              default=True),
+          Arg('enable_gcs', bool,
+              'Whether or not to upload the archive to Google Cloud Storage.  '
+              'False by default.',
+              default=False),
+          Arg('key_path', (str, unicode),
+              'Path to Cloud Storage service account JSON key file.',
+              default=None),
+          Arg('gcs_target_dir', (str, unicode),
+              'Path to the target bucket and directory on Google Cloud '
+              'Storage.',
+              default=None),
+      ])
 
   def __init__(self, *args, **kwargs):
-    self._gcs = None
     super(OutputArchive, self).__init__(*args, **kwargs)
+    self._gcs = None
 
   def SetUp(self):
     """Sets up the plugin."""
+    super(OutputArchive, self).SetUp()
     if not self.args.enable_disk and not self.args.enable_gcs:
       raise ValueError('Please enable at least one of "enable_disk" or '
                        '"enable_gcs"')
@@ -83,11 +72,6 @@
     if not self.args.enable_disk and self.args.target_dir:
       raise ValueError('If specifying a "target_dir", "enable_disk" must '
                        'be set to True')
-    # If saving to disk, ensure that the target_dir exists.
-    if self.args.enable_disk and self.args.target_dir is None:
-      self.args.target_dir = self.GetDataDir()
-    if self.args.target_dir and not os.path.isdir(self.args.target_dir):
-      os.makedirs(self.args.target_dir)
 
     if not self.args.enable_gcs:
       if self.args.key_path or self.args.gcs_target_dir:
@@ -101,100 +85,29 @@
       from instalog.utils import gcs_utils
       self._gcs = gcs_utils.CloudStorage(self.args.key_path)
 
-  def Main(self):
-    """Main thread of the plugin."""
-    while not self.IsStopping():
-      if not self.PrepareAndArchive():
-        self.Sleep(1)
+  def ProcessEvents(self, base_dir):
+    """Archives the events in base_dir to disk and/or Google Cloud Storage."""
+    # Create the archive.
+    cur_time = datetime.datetime.now()
+    archive_name = cur_time.strftime('InstalogEvents_%Y%m%d%H%M%S')
+    archive_filename = '%s.tar.gz' % archive_name
+    with file_utils.UnopenedTemporaryFile(
+        prefix='instalog_archive_', suffix='.tar.gz') as tmp_archive:
+      self.info('Creating temporary archive file: %s', tmp_archive)
+      with tarfile.open(tmp_archive, 'w:gz') as tar:
+        tar.add(base_dir, arcname=archive_name)
 
-  def ProcessEvent(self, event_id, event, base_dir):
-    """Copies an event's attachments and returns its serialized form."""
-    for att_id, att_path in event.attachments.iteritems():
-      if os.path.isfile(att_path):
-        att_name = os.path.basename(att_path)
-        att_newpath = os.path.join('attachments',
-                                   '%03d_%s' % (event_id, att_name))
-        shutil.copyfile(att_path, os.path.join(base_dir, att_newpath))
-        event.attachments[att_id] = att_newpath
-    return event.Serialize()
-
-  def GetEventAttachmentSize(self, event):
-    """Returns the total size of given event's attachments."""
-    total_size = 0
-    for _unused_att_id, att_path in event.attachments.iteritems():
-      if os.path.isfile(att_path):
-        total_size += os.path.getsize(att_path)
-    return total_size
-
-  def PrepareAndArchive(self):
-    """Retrieves events, and archives them."""
-    event_stream = self.NewStream()
-    if not event_stream:
-      return False
-
-    with file_utils.TempDirectory(prefix='instalog_archive_') as base_dir:
-      self.info('Creating temporary directory: %s', base_dir)
-      # Create the attachments directory.
-      att_dir = os.path.join(base_dir, 'attachments')
-      os.mkdir(att_dir)
-
-      # In order to save memory, write directly to a temp file on disk.
-      with open(os.path.join(base_dir, 'events.json'), 'w') as events_f:
-        num_events = 0
-        total_size = 0
-        time_last = time_utils.MonotonicTime()
-        for event in event_stream.iter(timeout=self.args.interval):
-          serialized_event = self.ProcessEvent(num_events, event, base_dir)
-          attachment_size = self.GetEventAttachmentSize(event)
-          events_f.write(serialized_event + '\n')
-
-          total_size += len(serialized_event) + attachment_size
-          num_events += 1
-          self.debug('num_events = %d', num_events)
-
-          # Throttle our status messages.
-          time_now = time_utils.MonotonicTime()
-          if (time_now - time_last) >= _ARCHIVE_MESSAGE_INTERVAL:
-            time_last = time_now
-            self.info('Currently at %.2f%% of %.2fMB before archiving',
-                      100.0 * total_size / self.args.max_size,
-                      self.args.max_size / 1024.0 / 1024)
-          if total_size >= self.args.max_size:
-            break
-
-      if self.IsStopping():
-        self.info('Plugin is stopping! Abort %d events', num_events)
-        event_stream.Abort()
-        return False
-
-      # Create the archive.
-      if num_events > 0:
-        cur_time = datetime.datetime.now()
-        archive_name = cur_time.strftime('InstalogEvents_%Y%m%d%H%M%S')
-        archive_filename = '%s.tar.gz' % archive_name
-        with file_utils.UnopenedTemporaryFile(
-            prefix='instalog_archive_', suffix='.tar.gz') as tmp_archive:
-          self.info('Creating temporary archive file: %s', tmp_archive)
-          self.info('Archiving %d events in %s', num_events, archive_name)
-          with tarfile.open(tmp_archive, 'w:gz') as tar:
-            tar.add(base_dir, arcname=archive_name)
-
-          # What should we do with the archive?
-          if self.args.enable_gcs:
-            gcs_target_dir = self.args.gcs_target_dir.strip('/')
-            gcs_target_path = '/%s/%s' % (gcs_target_dir, archive_filename)
-            if not self._gcs.UploadFile(
-                tmp_archive, gcs_target_path, overwrite=True):
-              self.error('Unable to upload to GCS, aborting')
-              event_stream.Abort()
-              return False
-          if self.args.enable_disk:
-            target_path = os.path.join(self.args.target_dir, archive_filename)
-            self.info('Saving archive to: %s', target_path)
-            shutil.move(tmp_archive, target_path)
-
-    self.info('Commit %d events', num_events)
-    event_stream.Commit()
+      # Dispatch the finished archive to the configured destinations.
+      if self.args.enable_gcs:
+        gcs_target_dir = self.args.gcs_target_dir.strip('/')
+        gcs_target_path = '/%s/%s' % (gcs_target_dir, archive_filename)
+        if not self._gcs.UploadFile(
+            tmp_archive, gcs_target_path, overwrite=True):
+          self.error('Unable to upload to GCS, aborting')
+          return False
+      if self.args.enable_disk:
+        target_path = os.path.join(self.target_dir, archive_filename)
+        self.info('Saving archive to: %s', target_path)
+        shutil.move(tmp_archive, target_path)
     return True
 
 
diff --git a/py/instalog/plugins/output_archive_unittest.py b/py/instalog/plugins/output_archive_unittest.py
index 2fa2741..f89bf73 100755
--- a/py/instalog/plugins/output_archive_unittest.py
+++ b/py/instalog/plugins/output_archive_unittest.py
@@ -13,7 +13,9 @@
 import logging
 import os
 import resource
+import shutil
 import tarfile
+import tempfile
 import time
 import unittest
 
@@ -24,7 +26,6 @@
 from instalog import log_utils
 from instalog import plugin_sandbox
 from instalog import testing
-from instalog.utils import file_utils
 
 
 class TestOutputArchive(unittest.TestCase):
@@ -32,10 +33,12 @@
   def setUp(self):
     self.core = testing.MockCore()
     self.stream = self.core.GetStream(0)
+    self.tmp_dir = tempfile.mkdtemp(prefix='output_archive_unittest_')
     self.event = datatypes.Event({'plugin': 'archive'})
 
   def tearDown(self):
     self.core.Close()
+    shutil.rmtree(self.tmp_dir)
 
   def _GetMemoryUsage(self):
     """Returns current process's memory usage in bytes."""
@@ -44,75 +47,73 @@
   def testMemoryUsage(self):
     big_event = datatypes.Event({'1mb': 'x' * 1024 * 1024})
     event_size = len(big_event.Serialize())
-    with file_utils.TempDirectory(prefix='test_output_archive') as data_dir:
-      config = {
-          'interval': 1000,  # arbitrary long time
-          'max_size': 1024 * 1024 * 1024,  # arbitrary large value
-      }
-      sandbox = plugin_sandbox.PluginSandbox(
-          'output_archive', config=config,
-          data_dir=data_dir, core_api=self.core)
-      sandbox.Start(True)
+    config = {
+        'interval': 1000,  # arbitrarily long time
+        'threshold_size': 1024 * 1024 * 1024,  # arbitrarily large value
+    }
+    sandbox = plugin_sandbox.PluginSandbox(
+        'output_archive', config=config,
+        data_dir=self.tmp_dir, core_api=self.core)
+    sandbox.Start(True)
 
-      mem_usage_start = self._GetMemoryUsage()
-      logging.info('Initial memory usage: %d', mem_usage_start)
-      # additional_memory = big_event(1mb) * 10 events * 20 iterations = ~200mb
-      # maximum_memory = (original_memory + additional_memory) plus 10% padding
-      mem_usage_max = (mem_usage_start + (event_size * 10 * 20)) * 1.1
-      for _unused_i in xrange(20):
-        events = [copy.deepcopy(big_event) for _unused_i in xrange(10)]
-        self.stream.Queue(events)
+    mem_usage_start = self._GetMemoryUsage()
+    logging.info('Initial memory usage: %d', mem_usage_start)
+    # additional_memory = big_event (1 MB) * 10 events * 20 iterations = ~200 MB
+    # maximum_memory = (original_memory + additional_memory) plus 10% padding
+    mem_usage_max = (mem_usage_start + (event_size * 10 * 20)) * 1.1
+    for _unused_i in xrange(20):
+      events = [copy.deepcopy(big_event) for _unused_j in xrange(10)]
+      self.stream.Queue(events)
 
-      sandbox.Flush(1, False)  # trigger archive creation
-      while not self.stream.Empty():
-        mem_usage = self._GetMemoryUsage()
-        logging.info('Current memory usage: %d/%d', mem_usage, mem_usage_max)
-        if mem_usage >= mem_usage_max:
-          # The test has failed, but we need to interrupt the archive plugin
-          # and get it to stop as quickly as possible.
-          # Stop new events from being accessed.
-          del self.core.streams[0]
-          # Force any open file handles shut so the plugin stops writing
-          # to the archive on disk.
-          proc = psutil.Process()
-          for f in proc.get_open_files():
-            os.close(f.fd)
-          # Manually set the plugin state to STOPPING and advance into this
-          # state.
-          # pylint: disable=protected-access
-          sandbox._state = plugin_sandbox.STOPPING
-          sandbox.AdvanceState(True)
-          # Once the plugin has really stopped, report our error.
-          self.fail('Memory usage exceeded: %d/%d' % (mem_usage, mem_usage_max))
-        time.sleep(0.1)
-      # pylint: disable=protected-access
-      sandbox._state = plugin_sandbox.STOPPING
-      sandbox.AdvanceState(True)
+    sandbox.Flush(1, False)  # trigger archive creation
+    while not self.stream.Empty():
+      mem_usage = self._GetMemoryUsage()
+      logging.info('Current memory usage: %d/%d', mem_usage, mem_usage_max)
+      if mem_usage >= mem_usage_max:
+        # The test has failed, but we need to interrupt the archive plugin
+        # and get it to stop as quickly as possible.
+        # Stop new events from being accessed.
+        del self.core.streams[0]
+        # Force any open file handles shut so the plugin stops writing
+        # to the archive on disk.
+        proc = psutil.Process()
+        for f in proc.get_open_files():
+          os.close(f.fd)
+        # Manually set the plugin state to STOPPING and advance into this
+        # state.
+        # pylint: disable=protected-access
+        sandbox._state = plugin_sandbox.STOPPING
+        sandbox.AdvanceState(True)
+        # Once the plugin has really stopped, report our error.
+        self.fail('Memory usage exceeded: %d/%d' % (mem_usage, mem_usage_max))
+      time.sleep(0.1)
+    # pylint: disable=protected-access
+    sandbox._state = plugin_sandbox.STOPPING
+    sandbox.AdvanceState(True)
 
   def testOneEvent(self):
-    with file_utils.TempDirectory(prefix='test_output_archive') as data_dir:
-      config = {
-          'interval': 1}
-      sandbox = plugin_sandbox.PluginSandbox(
-          'output_archive', config=config,
-          data_dir=data_dir, core_api=self.core)
-      sandbox.Start(True)
-      # pylint: disable=protected-access
-      plugin = sandbox._plugin
-      self.stream.Queue([self.event])
-      plugin.PrepareAndArchive()
-      sandbox.Flush(2, True)
-      sandbox.Stop()
+    config = {
+        'interval': 1}
+    sandbox = plugin_sandbox.PluginSandbox(
+        'output_archive', config=config,
+        data_dir=self.tmp_dir, core_api=self.core)
+    sandbox.Start(True)
+    # pylint: disable=protected-access
+    plugin = sandbox._plugin
+    self.stream.Queue([self.event])
+    plugin.PrepareAndProcess()
+    sandbox.Flush(2, True)
+    sandbox.Stop()
 
-      # Inspect the disk archive.
-      archive_path = glob.glob(os.path.join(data_dir, 'InstalogEvents*'))[0]
-      with tarfile.open(archive_path, 'r:gz') as tar:
-        events_member = [n for n in tar.getnames() if 'events.json' in n][0]
-        events_file = tar.extractfile(events_member)
-        lines = events_file.readlines()
-        self.assertEqual(1, len(lines))
-        event = datatypes.Event.Deserialize(lines[0])
-        self.assertEqual(event, self.event)
+    # Inspect the disk archive.
+    archive_path = glob.glob(os.path.join(self.tmp_dir, 'InstalogEvents*'))[0]
+    with tarfile.open(archive_path, 'r:gz') as tar:
+      events_member = [n for n in tar.getnames() if 'events.json' in n][0]
+      events_file = tar.extractfile(events_member)
+      lines = events_file.readlines()
+      self.assertEqual(1, len(lines))
+      event = datatypes.Event.Deserialize(lines[0])
+      self.assertEqual(event, self.event)
 
 
 if __name__ == '__main__':
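
For a quick manual check of the merged plugin outside of 'make test',
it can be driven through plugin_sandbox the same way testOneEvent does
above.  A minimal sketch, using only the calls and config keys that
appear in this unittest:

  import tempfile

  from instalog import datatypes
  from instalog import plugin_sandbox
  from instalog import testing

  core = testing.MockCore()
  sandbox = plugin_sandbox.PluginSandbox(
      'output_archive',
      config={'interval': 1, 'threshold_size': 4 * 1024 * 1024},
      data_dir=tempfile.mkdtemp(prefix='output_archive_demo_'),
      core_api=core)
  sandbox.Start(True)
  core.GetStream(0).Queue([datatypes.Event({'plugin': 'archive'})])
  sandbox.Flush(2, True)  # wait for the archive to be written to data_dir
  sandbox.Stop()
  core.Close()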