#!/usr/bin/env python2
#
# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Unittests for simple file-based buffer."""
# TODO(kitching): Add tests that deal with "out of disk space" situations.
# TODO(kitching): Add tests for reading data from corrupted databases.
# - data.json is smaller than pos in metadata.json
# - metadata.json does not contain the right version
# - metadata.json is an empty dict {}
# - metadata.json does not exist
# - data.json does not exist
# - metadata recovery: uncorrupted data.json
# - metadata recovery: corruptions at the beginning of data.json
# - metadata recovery: corruptions at the end of data.json
# - metadata recovery: fully corrupted data.json
# - consumer metadata: seq smaller than first_seq
# - consumer metadata: seq larger than last_seq
# - consumer metadata: pos smaller than start_pos
# - consumer metadata: pos larger than end_pos
# - consumer metadata: pos not synchronized with seq
# - consumer metadata is an empty dict {}
# - consumer metadata missing cur_pos or cur_seq
# TODO(kitching): Add tests for failure during Truncate operation.
from __future__ import print_function
import collections
import functools
import logging
import os
import Queue
import random
import shutil
import tempfile
import threading
import time
import unittest
from six.moves import xrange
import instalog_common # pylint: disable=unused-import
from instalog import datatypes
from instalog import log_utils
from instalog import plugin_base
# pylint: disable=no-name-in-module
from instalog.plugins import buffer_file_common
from instalog.plugins import buffer_simple_file
from instalog.utils import file_utils
# pylint: disable=protected-access
def _WithBufferSize(buffer_size):
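  """Decorator that temporarily overrides _BUFFER_SIZE_BYTES for one test."""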
def ModifyFn(fn):
@functools.wraps(fn)
def Wrapper(*args, **kwargs):
old_buffer_size_bytes = (
buffer_simple_file.buffer_file_common._BUFFER_SIZE_BYTES)
buffer_simple_file.buffer_file_common._BUFFER_SIZE_BYTES = buffer_size
try:
fn(*args, **kwargs)
finally:
buffer_simple_file.buffer_file_common._BUFFER_SIZE_BYTES = (
old_buffer_size_bytes)
return Wrapper
return ModifyFn
class TestBufferSimpleFile(unittest.TestCase):
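  """Unittests for the BufferSimpleFile buffer plugin."""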
def _CreateBuffer(self, config=None):
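    """Sets up a fresh BufferSimpleFile in a new temporary directory."""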
# Remove previous temporary folder if any.
if self.data_dir is not None:
shutil.rmtree(self.data_dir)
self.data_dir = tempfile.mkdtemp(prefix='buffer_simple_file_unittest_')
logging.info('Create state directory: %s', self.data_dir)
self.sf = buffer_simple_file.BufferSimpleFile(
config={} if config is None else config,
logger_name=self.logger.name,
store={},
plugin_api=None)
self.sf.GetDataDir = lambda: self.data_dir
self.sf.SetUp()
self.e1 = datatypes.Event({'test1': 'event'})
self.e2 = datatypes.Event({'test22': 'event'})
self.e3 = datatypes.Event({'test333': 'event'})
self.e4 = datatypes.Event({'test4444': 'event'})
self.e5 = datatypes.Event({'test55555': 'event'})
def setUp(self):
self.logger = logging.getLogger('simple_file')
self.data_dir = None
self._CreateBuffer()
def tearDown(self):
shutil.rmtree(self.data_dir)
def testFormatParseRecord(self):
"""Tests internal format and parse of data.json record."""
SEQ = 1989
RECORD = '{1: "hello world"}'
seq, record = buffer_file_common.ParseRecord(
buffer_file_common.FormatRecord(SEQ, RECORD), self.logger.name)
self.assertEqual(SEQ, seq)
self.assertEqual(RECORD, record)
# TODO(chuntsen): Remove old format.
seq, record = buffer_file_common.ParseRecord(
'[1989, {1: "hello world"}, 15fa0ea0]', self.logger.name)
self.assertEqual(SEQ, seq)
self.assertEqual(RECORD, record)
def testAddRemoveConsumer(self):
"""Tests adding and removing a Consumer."""
self.assertEqual({}, self.sf.ListConsumers())
self.sf.AddConsumer('a')
self.assertEqual(['a'], list(self.sf.ListConsumers()))
self.sf.RemoveConsumer('a')
self.assertEqual({}, self.sf.ListConsumers())
def testWriteRead(self):
"""Tests writing and reading back an Event."""
self.sf.Produce([self.e1])
self.sf.AddConsumer('a')
stream = self.sf.Consume('a')
self.assertEqual(self.e1, stream.Next())
def testLongCorruptedRecord(self):
"""Tests reading from a data store with a long corrupted record."""
# Ensure that the size of the event is greater than _BUFFER_SIZE_BYTES.
# pylint: disable=protected-access
e = datatypes.Event(
{'data':
'x' * buffer_simple_file.buffer_file_common._BUFFER_SIZE_BYTES})
self.sf.Produce([e])
# Purposely corrupt the data file.
with open(self.sf.buffer_file.data_path, 'r+') as f:
f.seek(1)
f.write('x')
self.sf.Produce([self.e2])
self.sf.AddConsumer('a')
stream = self.sf.Consume('a')
self.assertEqual(self.e2, stream.Next())
def testSkippedRecords(self):
"""Tests recovery from skipped records due to corruption.
Previously (prior to this test), a bug existed where, after a corrupt record
was skipped, its length was not included in calculating "new_pos" for a
consumer when processing subsequent records.
To illustrate this bug, we design a situation where we have two "buffer
refills". The first one includes a garbage event (e2) which will be
dropped. Padding is inserted into event e to ensure that the last event e3
is pushed into buffer refill #2. Events are retrieved sequentially from
buffer refill #1. As long as len(e2) > len(e1), after retrieving the last
event from buffer refill #1, the consumer's new_pos will be set to a
location *before* the last event in buffer refill #1. Thus the next buffer
will include both e1 and e3.
|--------------refill buffer #1----------------||---refill buffer #2---|
[ e1 ] [ e2 GARBAGE ] [ e ] [ e1 ] [ e3 ]
To calculate buffer size needed in e:
-------- (doesn't count) ------- -------- +1 to push over "<" limit
The fix is to ensure that the length of any previous "garbage records" are
included in the stored "length" of any event in the buffer. E.g. in this
case, the length of e2(GARBAGE) would be included in the length of event e.
"""
self.sf.Produce([self.e1])
e1_end = os.path.getsize(self.sf.buffer_file.data_path)
self.sf.Produce([self.e2])
e2_end = os.path.getsize(self.sf.buffer_file.data_path)
# Corrupt event e2 by writing garbage at the end.
with open(self.sf.buffer_file.data_path, 'r+') as f:
f.seek(e2_end - 10)
f.write('x' * 5)
# pylint: disable=protected-access
    # Ensure that both e and e1 are included in the first buffer refill. The
    # length of e can be based on that of e1 (same base payload).
bytes_left = (buffer_simple_file.buffer_file_common._BUFFER_SIZE_BYTES -
(e1_end * 3) + 1)
e = datatypes.Event({'test1': 'event' + ('x' * bytes_left)})
self.sf.Produce([e])
self.sf.Produce([self.e1])
self.sf.Produce([self.e3])
self.sf.AddConsumer('a')
stream = self.sf.Consume('a')
self.assertEqual(self.e1, stream.Next())
self.assertEqual(e, stream.Next())
self.assertEqual(self.e1, stream.Next())
self.assertEqual(self.e3, stream.Next())
def testAppendedJunkStore(self):
"""Tests reading from a data store that has appended junk."""
self.sf.Produce([self.e1])
    # Purposely append junk to the data store.
with open(self.sf.buffer_file.data_path, 'a') as f:
f.write('xxxxxxxx')
self.sf.Produce([self.e2])
self.sf.AddConsumer('a')
stream = self.sf.Consume('a')
self.assertEqual(self.e1, stream.Next())
self.assertEqual(self.e2, stream.Next())
def testTwoBufferEventStreams(self):
"""Tries creating two BufferEventStream objects for one Consumer."""
self.sf.AddConsumer('a')
stream1 = self.sf.Consume('a')
stream2 = self.sf.Consume('a')
self.assertIsInstance(stream1, plugin_base.BufferEventStream)
    self.assertIsNone(stream2)
def testUseExpiredBufferEventStream(self):
"""Tests continuing to use an expired BufferEventStream."""
self.sf.AddConsumer('a')
stream = self.sf.Consume('a')
stream.Commit()
with self.assertRaises(plugin_base.EventStreamExpired):
stream.Next()
with self.assertRaises(plugin_base.EventStreamExpired):
stream.Abort()
with self.assertRaises(plugin_base.EventStreamExpired):
stream.Commit()
def testFirstLastSeq(self):
"""Checks the proper tracking of first_seq and last_seq."""
self.assertEqual(self.sf.buffer_file.first_seq, 1)
self.assertEqual(self.sf.buffer_file.last_seq, 0)
first_seq, _ = self.sf.buffer_file._GetFirstUnconsumedRecord()
self.assertEqual(first_seq, 1)
self.sf.buffer_file.Truncate()
self.assertEqual(self.sf.buffer_file.first_seq, 1)
self.assertEqual(self.sf.buffer_file.last_seq, 0)
first_seq, _ = self.sf.buffer_file._GetFirstUnconsumedRecord()
self.assertEqual(first_seq, 1)
self.sf.Produce([self.e1])
self.assertEqual(self.sf.buffer_file.first_seq, 1)
self.assertEqual(self.sf.buffer_file.last_seq, 1)
first_seq, _ = self.sf.buffer_file._GetFirstUnconsumedRecord()
self.assertEqual(first_seq, 2)
self.sf.Produce([self.e1])
self.assertEqual(self.sf.buffer_file.first_seq, 1)
self.assertEqual(self.sf.buffer_file.last_seq, 2)
first_seq, _ = self.sf.buffer_file._GetFirstUnconsumedRecord()
self.assertEqual(first_seq, 3)
def testTruncate(self):
"""Checks that Truncate truncates up to the last unread event."""
self.sf.AddConsumer('a')
self.assertEqual(self.sf.buffer_file.first_seq, 1)
self.assertEqual(self.sf.buffer_file.last_seq, 0)
self.sf.Produce([self.e1, self.e2])
self.assertEqual(self.sf.buffer_file.first_seq, 1)
self.assertEqual(self.sf.buffer_file.last_seq, 2)
self.sf.buffer_file.Truncate()
self.assertEqual(self.sf.buffer_file.first_seq, 1)
self.assertEqual(self.sf.buffer_file.last_seq, 2)
stream = self.sf.Consume('a')
self.assertEqual(self.e1, stream.Next())
stream.Commit()
self.sf.buffer_file.Truncate()
self.assertEqual(self.sf.buffer_file.first_seq, 2)
self.assertEqual(self.sf.buffer_file.last_seq, 2)
def testSeqOrder(self):
"""Checks that the order of sequence keys is consistent."""
self.sf.AddConsumer('a')
self.sf.buffer_file.Truncate()
self.sf.Produce([self.e1])
stream = self.sf.Consume('a')
seq, _ = stream._Next()
self.assertEqual(seq, 1)
stream.Commit()
self.sf.buffer_file.Truncate()
self.sf.Produce([self.e1, self.e1])
stream = self.sf.Consume('a')
seq, _ = stream._Next()
self.assertEqual(seq, 2)
seq, _ = stream._Next()
self.assertEqual(seq, 3)
stream.Commit()
@_WithBufferSize(0) # Force only keeping one record in buffer.
def testReloadBufferAfterTruncate(self):
"""Tests re-loading buffer of a BufferEventStream after Truncate."""
self.sf.AddConsumer('a')
self.sf.Produce([self.e1, self.e2, self.e3])
stream1 = self.sf.Consume('a')
self.assertEqual(self.e1, stream1.Next())
stream1.Commit()
stream2 = self.sf.Consume('a')
# Explicitly check that stream2's buffer only contains one item. This
# means the buffer will need to be reloaded after the following sequence
# of Next and Truncate.
self.assertEqual(1, len(stream2._Buffer()))
self.assertEqual(self.e2, stream2.Next())
self.sf.buffer_file.Truncate()
self.assertEqual(self.e3, stream2.Next())
stream2.Commit()
def testRecreateConsumer(self):
"""Tests for same position after removing and recreating Consumer."""
self.sf.Produce([self.e1, self.e2, self.e3])
self.sf.AddConsumer('a')
stream1 = self.sf.Consume('a')
self.assertEqual(self.e1, stream1.Next())
stream1.Commit()
self.sf.RemoveConsumer('a')
self.sf.AddConsumer('a')
stream2 = self.sf.Consume('a')
self.assertEqual(self.e2, stream2.Next())
stream2.Commit()
def testRecreateConsumerAfterTruncate(self):
"""Tests that recreated Consumer updates position after truncate."""
self.sf.Produce([self.e1, self.e2, self.e3])
self.sf.AddConsumer('a')
stream1 = self.sf.Consume('a')
self.assertEqual(self.e1, stream1.Next())
stream1.Commit()
self.sf.RemoveConsumer('a')
self.sf.AddConsumer('b')
stream2 = self.sf.Consume('b')
self.assertEqual(self.e1, stream2.Next())
self.assertEqual(self.e2, stream2.Next())
stream2.Commit()
self.sf.buffer_file.Truncate()
# Verify that the metadata is consistent after running Truncate.
self.sf.SetUp()
self.sf.AddConsumer('a')
stream3 = self.sf.Consume('a')
# Skips self.e2, since Truncate occurred while Consumer 'a' did not exist.
self.assertEqual(self.e3, stream3.Next())
stream3.Commit()
def testMultiThreadProduce(self):
"""Tests for correct output with multiple threads Producing events."""
random.seed(0)
def ProducerThread():
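      """Produces 10 batches of [e1, e2, e3] after a small random delay."""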
      # Random sleep so that the threads' Produce calls aren't in sync.
time.sleep(random.randrange(3) * 0.1)
for _unused_i in xrange(10):
self.sf.Produce([self.e1, self.e2, self.e3])
threads = []
for _unused_i in xrange(10):
t = threading.Thread(target=ProducerThread)
threads.append(t)
t.start()
for t in threads:
t.join()
# 10 threads, 10 * 3 events each = expected 300 events, 100 of each type.
self.sf.AddConsumer('a')
stream = self.sf.Consume('a')
cur_seq = 1
record_count = collections.defaultdict(int)
while True:
seq, record = stream._Next()
if not seq:
break
# Make sure the sequence numbers are correct.
self.assertEqual(cur_seq, seq)
cur_seq += 1
record_count[record] += 1
self.assertEqual(3, len(record_count))
self.assertTrue(all([x == 100 for x in record_count.values()]))
  @_WithBufferSize(80) # Each line is around 35 characters.
def testMultiThreadConsumeTruncate(self):
"""Tests multiple Consumers reading simultaneously when Truncate occurs."""
record_count_queue = Queue.Queue()
def ConsumerThread(consumer_id):
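      """Consumes events until exhausted and reports per-payload counts."""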
stream = self.sf.Consume(consumer_id)
record_count = collections.defaultdict(int)
count = 0
while True:
# Commit and start a new BufferEventStream every 10 events.
if count % 10 == 0:
logging.info('Committing after 10 events...')
stream.Commit()
stream = self.sf.Consume(consumer_id)
event = stream.Next()
if not event:
break
record_count[repr(event.payload)] += 1
count += 1
stream.Commit()
record_count_queue.put(record_count)
self.sf.Produce([self.e1, self.e2, self.e3] * 25)
for i in xrange(2):
self.sf.AddConsumer(str(i))
threads = []
for i in xrange(2):
t = threading.Thread(target=ConsumerThread, args=(str(i),))
threads.append(t)
t.start()
for t in threads:
while t.isAlive():
        # Add a small sleep to avoid continually holding read_lock.
time.sleep(0.01)
self.sf.buffer_file.Truncate()
t.join()
self.sf.buffer_file.Truncate()
self.assertEqual(25 * 3 + 1, self.sf.buffer_file.first_seq)
while not record_count_queue.empty():
record_count = record_count_queue.get()
self.assertEqual(3, len(record_count))
self.assertTrue(all([x == 25 for x in record_count.values()]))
def _CountAttachmentsInBuffer(self, sf):
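    """Returns the number of attachment files stored in the buffer."""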
return len(os.listdir(sf.buffer_file.attachments_dir))
def _TestAttachment(self, with_copy):
"""Helper function to test basic attachment functionality."""
FILE_STRING = 'Hello World!'
self._CreateBuffer({'copy_attachments': with_copy})
with file_utils.UnopenedTemporaryFile() as path:
with open(path, 'w') as f:
f.write(FILE_STRING)
self.assertTrue(os.path.isfile(path))
event = datatypes.Event({}, {'a': path})
self.assertEqual(True, self.sf.Produce([event]))
self.assertTrue(os.path.isfile(path) == with_copy)
      # Get the event out of the buffer to verify that the internal
      # attachment exists.
self.sf.AddConsumer('a')
stream = self.sf.Consume('a')
internal_event = stream.Next()
internal_path = internal_event.attachments['a']
self.assertEqual(FILE_STRING, file_utils.ReadFile(internal_path))
# Ensure that an absolute path is returned.
self.assertTrue(internal_path.startswith('/'))
self.assertEqual(1, self._CountAttachmentsInBuffer(self.sf))
def testCopyAttachment(self):
"""Tests that an attachment is properly copied into the buffer state."""
self._TestAttachment(True)
def testMoveAttachment(self):
"""Tests that an attachment is properly moved into the buffer state."""
self._TestAttachment(False)
def testNonExistentAttachment(self):
"""Tests behaviour when a non-existent attachment is provided."""
event = datatypes.Event({}, {'a': '/tmp/non_existent_file'})
self.assertEqual(False, self.sf.Produce([event]))
self.assertEqual(0, self._CountAttachmentsInBuffer(self.sf))
def testPartFailMoveAttachmentTwoEvents(self):
"""Tests moving two attachments in separate events (real and fake)."""
self._CreateBuffer({'copy_attachments': False})
with file_utils.UnopenedTemporaryFile() as path:
real_event = datatypes.Event({}, {'a': path})
fake_event = datatypes.Event({}, {'a': '/tmp/non_existent_file'})
self.assertEqual(False, self.sf.Produce([real_event, fake_event]))
# Make sure source file still exists since Produce failed.
self.assertTrue(os.path.isfile(path))
# Make sure attachments_dir is empty.
self.assertEqual(0, self._CountAttachmentsInBuffer(self.sf))
def testPartFailMoveAttachmentOneEvent(self):
"""Tests moving two attachments in a single event (real and fake)."""
self._CreateBuffer({'copy_attachments': False})
with file_utils.UnopenedTemporaryFile() as path:
event = datatypes.Event({}, {
'a': path,
'b': '/tmp/non_existent_file'})
self.assertEqual(False, self.sf.Produce([event]))
# Make sure source file still exists since Produce failed.
self.assertTrue(os.path.isfile(path))
# Make sure attachments_dir is empty.
self.assertEqual(0, self._CountAttachmentsInBuffer(self.sf))
def testTruncateAttachments(self):
"""Tests that truncate removes attachments of truncated events."""
FILE_STRING = 'Hello World!'
with file_utils.UnopenedTemporaryFile() as path:
with open(path, 'w') as f:
f.write(FILE_STRING)
event = datatypes.Event({}, {'a': path})
self.sf.Produce([event])
self.assertEqual(1, self._CountAttachmentsInBuffer(self.sf))
self.sf.buffer_file.Truncate(truncate_attachments=False)
self.assertEqual(1, self._CountAttachmentsInBuffer(self.sf))
self.sf.buffer_file.Truncate()
self.assertEqual(0, self._CountAttachmentsInBuffer(self.sf))
if __name__ == '__main__':
log_utils.InitLogging(log_utils.GetStreamHandler(logging.INFO))
unittest.main()