blob: f89bf73a09dbb5264c9ea0275ca62bf7baab20e6 [file] [log] [blame]
#!/usr/bin/python2
#
# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Unittests for output archive plugin."""
from __future__ import print_function
import copy
import glob
import logging
import os
import resource
import shutil
import tarfile
import tempfile
import time
import unittest
import psutil
import instalog_common # pylint: disable=unused-import
from instalog import datatypes
from instalog import log_utils
from instalog import plugin_sandbox
from instalog import testing
class TestOutputArchive(unittest.TestCase):
def setUp(self):
self.core = testing.MockCore()
self.stream = self.core.GetStream(0)
self.tmp_dir = tempfile.mkdtemp(prefix='output_archive_unittest_')
self.event = datatypes.Event({'plugin': 'archive'})
def tearDown(self):
self.core.Close()
shutil.rmtree(self.tmp_dir)
def _GetMemoryUsage(self):
"""Returns current process's memory usage in bytes."""
return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss * 1024
def testMemoryUsage(self):
big_event = datatypes.Event({'1mb': 'x' * 1024 * 1024})
event_size = len(big_event.Serialize())
config = {
'interval': 1000, # arbitrary long time
'threshold_size': 1024 * 1024 * 1024, # arbitrary large value
}
sandbox = plugin_sandbox.PluginSandbox(
'output_archive', config=config,
data_dir=self.tmp_dir, core_api=self.core)
sandbox.Start(True)
mem_usage_start = self._GetMemoryUsage()
logging.info('Initial memory usage: %d', mem_usage_start)
# additional_memory = big_event(1mb) * 10 events * 20 iterations = ~200mb
# maximum_memory = (original_memory + additional_memory) plus 10% padding
mem_usage_max = (mem_usage_start + (event_size * 10 * 20)) * 1.1
for _unused_i in xrange(20):
events = [copy.deepcopy(big_event) for _unused_j in xrange(10)]
self.stream.Queue(events)
sandbox.Flush(1, False) # trigger archive creation
while not self.stream.Empty():
mem_usage = self._GetMemoryUsage()
logging.info('Current memory usage: %d/%d', mem_usage, mem_usage_max)
if mem_usage >= mem_usage_max:
# The test has failed, but we need to interrupt the archive plugin
# and get it to stop as quickly as possible.
# Stop new events from being accessed.
del self.core.streams[0]
# Force any open file handles shut so the plugin stops writing
# to the archive on disk.
proc = psutil.Process()
for f in proc.get_open_files():
os.close(f.fd)
# Manually set the plugin state to STOPPING and advance into this
# state.
# pylint: disable=protected-access
sandbox._state = plugin_sandbox.STOPPING
sandbox.AdvanceState(True)
# Once the plugin has really stopped, report our error.
self.fail('Memory usage exceeded: %d/%d' % (mem_usage, mem_usage_max))
time.sleep(0.1)
# pylint: disable=protected-access
sandbox._state = plugin_sandbox.STOPPING
sandbox.AdvanceState(True)
def testOneEvent(self):
config = {
'interval': 1}
sandbox = plugin_sandbox.PluginSandbox(
'output_archive', config=config,
data_dir=self.tmp_dir, core_api=self.core)
sandbox.Start(True)
# pylint: disable=protected-access
plugin = sandbox._plugin
self.stream.Queue([self.event])
plugin.PrepareAndProcess()
sandbox.Flush(2, True)
sandbox.Stop()
# Inspect the disk archive.
archive_path = glob.glob(os.path.join(self.tmp_dir, 'InstalogEvents*'))[0]
with tarfile.open(archive_path, 'r:gz') as tar:
events_member = [n for n in tar.getnames() if 'events.json' in n][0]
events_file = tar.extractfile(events_member)
lines = events_file.readlines()
self.assertEqual(1, len(lines))
event = datatypes.Event.Deserialize(lines[0])
self.assertEqual(event, self.event)
if __name__ == '__main__':
log_utils.InitLogging(log_utils.GetStreamHandler(logging.INFO))
unittest.main()