# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""File-based buffer common.
A file-based buffer which writes its events to a single file on disk, and
separately maintains metadata.
There are three files maintained, plus one for each consumer created:
data.json:
Stores actual data. Each line corresponds to one event. As events are
written to disk, each one is given a sequence number. Format of each line:
[SEQ_NUM, {EVENT_DATA}, CRC_SUM]
Writing SEQ_NUM to data.json is not strictly necessary since we keep track
of sequence numbers in metadata, but it is useful for debugging, and could
also help when restoring a corrupt database.
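For example, a line might look like this (all values, including the
checksum, are illustrative):
[4, {"time": 1500000000.0, ...}, "1c2d3e4f"]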
metadata.json:
Stores current sequence numbers and cursor positions. The "first seq" and
"start pos" are taken to be absolute to the original untruncated data file,
and refer to the beginning of data currently stored on disk.
So if seq=1 was consumed by all Consumers, and Truncate removed it from
disk, first_seq would be set to 2.
Note that since the cursor positions are absolute, start_pos must be
subtracted to get the actual position in the file on disk, e.g.:
f.seek(current_pos - start_pos)
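For example, metadata.json might contain (illustrative values):
{"0a1b2c3d": {"first_seq": 2, "last_seq": 10, "start_pos": 58,
"end_pos": 512, "version": "0a1b2c3d"}}
where the outer key is the version (see "Versioning" below).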
consumers.json:
Stores a list of all active Consumers. If a Consumer is removed, it will be
removed from this list, but its metadata file will continue to exist. If it
is ever re-created, the existing metadata will be used. If this is
undesired behaviour, the metadata file for that Consumer should be manually
deleted.
consumer_X.json:
Stores the sequence number and cursor position of a particular Consumer.
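For example (illustrative values):
{"cur_seq": 7, "cur_pos": 391}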
Versioning:
Another concept that is worth explaining separately is "versioning". We want
to support truncation: when our file contains N records which have already
been consumed by all Consumers, followed by M remaining records, the first N
records are removed from the main data file in order to save disk space.
After rewriting the data file, the metadata is updated accordingly.
But what happens if a failure occurs in between these two steps? Our "old"
metadata is now paired with a "new" data file, which means we will likely
be unable to read anything properly.
To solve this problem, before truncating the main data file, we save a
metadata file to disk containing both the "old" and "new" metadata versions.
Each version is keyed by a CRC hash of the first line that the main data file
contains under that version. When the buffer first starts, it checks which
key matches the current first line, and uses that metadata version.
Thus, if a failure occurs *before* writing the main data file, the "old"
metadata can be used. If a failure occurs *after* writing the main data file,
the "new" metadata can be used.
"""
from __future__ import print_function
import copy
import json
import logging
import os
import shutil
import zlib
import instalog_common # pylint: disable=unused-import
from instalog import datatypes
from instalog import lock_utils
from instalog import log_utils
from instalog import plugin_base
from instalog.utils import file_utils
# The number of bytes to buffer when retrieving events from a file.
_BUFFER_SIZE_BYTES = 4 * 1024 # 4kb
class SimpleFileException(Exception):
"""General exception type for this plugin."""
pass
def GetChecksum(data):
"""Generates an 8-character CRC32 checksum of given string."""
return '{:08x}'.format(abs(zlib.crc32(data)))
def FormatRecord(seq, record):
"""Returns a record formatted as a line to be written to disk."""
data = '%d, %s' % (seq, record)
checksum = GetChecksum(data)
return '[%s, "%s"]\n' % (data, checksum)
def ParseRecord(line, logger_name=None):
"""Parses and returns a line from disk as a record.
Returns:
A tuple of (seq_number, record), or None on failure.
"""
logger = logging.getLogger(logger_name)
line_inner = line.rstrip()[1:-1] # Strip [] and newline
data, _, checksum = line_inner.rpartition(', ')
# TODO(chuntsen): Change this method after a long time.
checksum = checksum.strip('"')
seq, _, record = data.partition(', ')
if not seq or not record:
logger.warning('Parsing error for record %s', line.rstrip())
return None, None
if checksum != GetChecksum(data):
logger.warning('Checksum error for record %s', line.rstrip())
return None, None
return int(seq), record
def TryLoadJSON(path, logger_name=None):
"""Attempts to load JSON from the given file.
Returns:
Parsed data from the file. None if the file does not exist.
Raises:
Exception if there was some other problem reading the file, or if something
went wrong parsing the data.
"""
logger = logging.getLogger(logger_name)
if not os.path.isfile(path):
logger.debug('%s: does not exist', path)
return None
try:
with open(path, 'r') as f:
return json.load(f)
except Exception:
logger.exception('%s: Error reading disk or loading JSON', path)
raise
def CopyAttachmentsToTempDir(att_paths, tmp_dir, logger_name=None):
"""Copys attachments to the temporary directory."""
logger = logging.getLogger(logger_name)
try:
for att_path in att_paths:
# Check that the source file exists.
if not os.path.isfile(att_path):
raise ValueError('Attachment path `%s` specified in event does not '
'exist' % att_path)
target_path = os.path.join(tmp_dir, att_path.replace('/', '_'))
logger.debug('Copying attachment: %s --> %s',
att_path, target_path)
with open(target_path, 'w') as dst_f:
with open(att_path, 'r') as src_f:
shutil.copyfileobj(src_f, dst_f)
# Fsync the file to make sure it is flushed to disk.
dst_f.flush()
os.fdatasync(dst_f)
# Fsync the containing directory to make sure all attachments are flushed
# to disk.
dirfd = os.open(tmp_dir, os.O_DIRECTORY)
os.fsync(dirfd)
os.close(dirfd)
return True
except Exception:
logger.exception('Exception encountered when copying attachments')
return False
def MoveAndWrite(config_dct, events):
"""Moves the atts, serializes the events and writes them to the data_path."""
logger = logging.getLogger(config_dct['logger_name'])
metadata_dct = RestoreMetadata(config_dct)
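# cur_seq is the next sequence number to assign; cur_pos is the write offset
# within the on-disk data file (positions in metadata are absolute, so
# start_pos must be subtracted).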
cur_seq = metadata_dct['last_seq'] + 1
cur_pos = metadata_dct['end_pos'] - metadata_dct['start_pos']
# Truncate the size of the file in case of a previously unfinished
# transaction.
with open(config_dct['data_path'], 'a') as f:
f.truncate(cur_pos)
with open(config_dct['data_path'], 'a') as f:
# On some machines, the file handle offset isn't set to EOF until
# a write occurs. Thus we must manually seek to the end to ensure
# that f.tell() will return useful results.
f.seek(0, 2) # 2 means use EOF as the reference point.
assert f.tell() == cur_pos
for event in events:
for att_id, att_path in event.attachments.iteritems():
target_name = '%s_%s' % (cur_seq, att_id)
target_path = os.path.join(config_dct['attachments_dir'], target_name)
event.attachments[att_id] = target_name
logger.debug('Relocating attachment %s: %s --> %s',
att_id, att_path, target_path)
# Note: This could potentially overwrite an existing file that was
# written just before the Instalog process stopped unexpectedly.
os.rename(att_path, target_path)
logger.debug('Writing event with cur_seq=%d, cur_pos=%d',
cur_seq, cur_pos)
output = FormatRecord(cur_seq, event.Serialize())
# Store the version for SaveMetadata to use.
if cur_pos == 0:
metadata_dct['version'] = GetChecksum(output)
f.write(output)
cur_seq += 1
cur_pos += len(output)
if config_dct['args'].enable_fsync:
# Fsync the file and the containing directory to make sure it
# is flushed to disk.
f.flush()
os.fdatasync(f)
dirfd = os.open(os.path.dirname(config_dct['data_path']),
os.O_DIRECTORY)
os.fsync(dirfd)
os.close(dirfd)
metadata_dct['last_seq'] = cur_seq - 1
metadata_dct['end_pos'] = metadata_dct['start_pos'] + cur_pos
SaveMetadata(config_dct, metadata_dct)
def SaveMetadata(config_dct, metadata_dct, old_metadata_dct=None):
"""Writes metadata of main database to disk."""
if not metadata_dct['version']:
raise SimpleFileException('No `version` available for SaveMetadata')
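# Metadata versions are keyed by the checksum of the data file's first line
# (see "Versioning" in the file-level docstring).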
data = {metadata_dct['version']: metadata_dct}
if old_metadata_dct and old_metadata_dct['version']:
if metadata_dct['version'] == old_metadata_dct['version']:
raise SimpleFileException(
'Same `version` from new metadata and old metadata')
data[old_metadata_dct['version']] = old_metadata_dct
with file_utils.AtomicWrite(config_dct['metadata_path'], fsync=True) as f:
json.dump(data, f)
def RestoreMetadata(config_dct):
"""Restores version from the main data file on disk.
If the metadata file does not exist, will silently return.
"""
logger = logging.getLogger(config_dct['logger_name'])
metadata_dct = {'first_seq': 1, 'last_seq': 0,
'start_pos': 0, 'end_pos': 0,
'version': '00000000'}
data = TryLoadJSON(config_dct['metadata_path'], logger.name)
if data is not None:
try:
with open(config_dct['data_path'], 'r') as f:
metadata_dct['version'] = GetChecksum(f.readline())
except Exception:
logger.error('Data file unexpectedly missing; resetting metadata')
return metadata_dct
if metadata_dct['version'] not in data:
logger.error('Could not find metadata version %s (available: %s); '
'recovering metadata from data file',
metadata_dct['version'], ', '.join(data.keys()))
RecoverMetadata(config_dct, metadata_dct)
return metadata_dct
if len(data) > 1:
logger.info('Metadata contains multiple versions %s; choosing %s',
', '.join(data.keys()), metadata_dct['version'])
metadata_dct.update(data[metadata_dct['version']])
if (metadata_dct['end_pos'] >
metadata_dct['start_pos'] + os.path.getsize(config_dct['data_path'])):
logger.error('end_pos in restored metadata is larger than start_pos + '
'data file; recovering metadata from data file')
RecoverMetadata(config_dct, metadata_dct)
else:
if os.path.isfile(config_dct['data_path']):
logger.error('Could not find metadata file, but we have data file; '
'recovering metadata from data file')
RecoverMetadata(config_dct, metadata_dct)
else:
logger.info('Creating metadata file and data file')
SaveMetadata(config_dct, metadata_dct)
file_utils.TouchFile(config_dct['data_path'])
return metadata_dct
def RecoverMetadata(config_dct, metadata_dct):
"""Recovers metadata from the main data file on disk.
Uses the first valid record for first_seq and start_pos, and the last
valid record for last_seq and end_pos.
"""
logger = logging.getLogger(config_dct['logger_name'])
first_record = True
cur_pos = 0
with open(config_dct['data_path'], 'r') as f:
for line in f:
seq, _unused_record = ParseRecord(line, config_dct['logger_name'])
if first_record and seq:
metadata_dct['first_seq'] = seq
metadata_dct['start_pos'] = cur_pos
first_record = False
cur_pos += len(line)
if seq:
metadata_dct['last_seq'] = seq
metadata_dct['end_pos'] = cur_pos
logger.info('Finished recovering metadata; sequence range found: %d to %d',
metadata_dct['first_seq'], metadata_dct['last_seq'])
SaveMetadata(config_dct, metadata_dct)
def TruncateAttachments(config_dct):
"""Deletes attachments of events no longer stored within data.json."""
logger = logging.getLogger(config_dct['logger_name'])
metadata_dct = RestoreMetadata(config_dct)
for fname in os.listdir(config_dct['attachments_dir']):
fpath = os.path.join(config_dct['attachments_dir'], fname)
if not os.path.isfile(fpath):
continue
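# Attachment filenames have the form '<seq>_<att_id>' (see MoveAndWrite).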
seq, unused_underscore, unused_att_id = fname.partition('_')
if not seq.isdigit():
continue
if (int(seq) < metadata_dct['first_seq'] or
int(seq) > metadata_dct['last_seq']):
logger.debug('Truncating attachment (<seq=%d or >seq=%d): %s',
metadata_dct['first_seq'], metadata_dct['last_seq'], fname)
os.unlink(fpath)
def Truncate(config_dct, min_seq, min_pos, truncate_attachments=True):
"""Truncates the main data file to only contain unprocessed records.
See file-level docstring for more information about versions.
Args:
truncate_attachments: Whether or not to truncate attachments.
For testing.
"""
logger = logging.getLogger(config_dct['logger_name'])
metadata_dct = RestoreMetadata(config_dct)
# Does the buffer already have data in it?
if not metadata_dct['version']:
return
if metadata_dct['first_seq'] == min_seq:
logger.info('No need to truncate')
return
try:
logger.debug('Will truncate up until seq=%d, pos=%d', min_seq, min_pos)
# Prepare the old vs. new metadata to write to disk.
old_metadata_dct = copy.deepcopy(metadata_dct)
metadata_dct['first_seq'] = min_seq
metadata_dct['start_pos'] = min_pos
with file_utils.AtomicWrite(config_dct['data_path'], fsync=True) as new_f:
# AtomicWrite opens a file handle to a temporary file right next to
# the real file (data_path), so we can open a "read" handle on data_path
# without affecting AtomicWrite's handle. Only when AtomicWrite's context
# block ends will the temporary be moved to replace data_path.
with open(config_dct['data_path'], 'r') as old_f:
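# Cursor positions are absolute, so subtract the old start_pos to get the
# offset within the current on-disk data file.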
old_f.seek(min_pos - old_metadata_dct['start_pos'])
# Deal with the first line separately to get the new version.
first_line = old_f.readline()
metadata_dct['version'] = GetChecksum(first_line)
new_f.write(first_line)
shutil.copyfileobj(old_f, new_f)
# Before performing the "replace" step of write-replace (when
# the file_utils.AtomicWrite context ends), save metadata to disk in
# case of disk failure.
SaveMetadata(config_dct, metadata_dct, old_metadata_dct)
# After we use AtomicWrite, we can remove old metadata.
SaveMetadata(config_dct, metadata_dct)
# Now that we have written the new data and metadata to disk, remove any
# unused attachments.
if truncate_attachments:
TruncateAttachments(config_dct)
except Exception:
logger.exception('Exception occurred during Truncate operation')
raise
class BufferFile(log_utils.LoggerMixin):
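"""A file-based buffer which stores its events in a single data file.
Manages the main data file, its metadata, and the set of active Consumers.
"""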
def __init__(self, args, logger_name, data_dir):
"""Sets up the plugin."""
self.args = args
self.logger = logging.getLogger(logger_name)
self.data_dir = data_dir
self.data_path = os.path.join(
data_dir, 'data.json')
self.metadata_path = os.path.join(
data_dir, 'metadata.json')
self.consumers_list_path = os.path.join(
data_dir, 'consumers.json')
self.consumer_path_format = os.path.join(
data_dir, 'consumer_%s.json')
self.attachments_dir = os.path.join(
data_dir, 'attachments')
if not os.path.exists(self.attachments_dir):
os.makedirs(self.attachments_dir)
# Lock for writing to the self.data_path file. Used by
# Produce and Truncate.
self.data_write_lock = lock_utils.Lock()
# Lock for modifying the self.consumers variable or for
# preventing other threads from changing it.
self._consumer_lock = lock_utils.Lock()
self.consumers = {}
self._RestoreConsumers()
# Try truncating any attachments from any partial Truncate operations.
TruncateAttachments(self.ConfigToDict())
@property
def first_seq(self):
return RestoreMetadata(self.ConfigToDict())['first_seq']
@property
def last_seq(self):
return RestoreMetadata(self.ConfigToDict())['last_seq']
@property
def start_pos(self):
return RestoreMetadata(self.ConfigToDict())['start_pos']
@property
def end_pos(self):
return RestoreMetadata(self.ConfigToDict())['end_pos']
@property
def version(self):
return RestoreMetadata(self.ConfigToDict())['version']
def _SaveConsumers(self):
"""Saves the current list of active Consumers to disk."""
with file_utils.AtomicWrite(self.consumers_list_path, fsync=True) as f:
json.dump(self.consumers.keys(), f)
def _RestoreConsumers(self):
"""Restore Consumers from disk.
Creates a corresponding Consumer object for each Consumer listed on disk.
Only ever called when the buffer first starts up, so we don't need to
check for any existing Consumers in self.consumers.
"""
data = TryLoadJSON(self.consumers_list_path, self.logger.name)
if data:
for name in data:
self.consumers[name] = self._CreateConsumer(name)
def ExternalizeEvent(self, event):
"""Modifies attachment paths of given event to be absolute."""
for att_id in event.attachments.keys():
# Reconstruct the full path to the attachment on disk.
event.attachments[att_id] = os.path.abspath(os.path.join(
self.attachments_dir, event.attachments[att_id]))
return event
def ConfigToDict(self):
return {'logger_name': self.logger.name, 'args': self.args,
'data_dir': self.data_dir, 'data_path': self.data_path,
'metadata_path': self.metadata_path,
'consumers_list_path': self.consumers_list_path,
'consumer_path_format': self.consumer_path_format,
'attachments_dir': self.attachments_dir}
def ProduceEvents(self, events, process_pool=None):
"""Moves attachments, serializes events and writes them to the data_path."""
with self.data_write_lock:
# If we are going to write the first line which will change the version,
# we should prevent the data and metadata from being read by consumers.
metadata_dct = RestoreMetadata(self.ConfigToDict())
first_line = (metadata_dct['start_pos'] == metadata_dct['end_pos'])
try:
if first_line:
self._consumer_lock.acquire()
for consumer in self.consumers.values():
consumer.read_lock.acquire()
if process_pool is None:
MoveAndWrite(self.ConfigToDict(), events)
else:
process_pool.apply(MoveAndWrite, (self.ConfigToDict(), events))
except Exception:
self.exception('Exception occurred during ProduceEvents operation')
raise
finally:
# Ensure that regardless of any errors, locks are released.
if first_line:
for consumer in self.consumers.values():
try:
consumer.read_lock.release()
except Exception:
pass
try:
self._consumer_lock.release()
except Exception:
pass
def _GetFirstUnconsumedRecord(self):
"""Returns the seq and pos of the first unprocessed record.
Checks each Consumer to find the earliest unprocessed record, and returns
that record's seq and pos.
"""
metadata_dct = RestoreMetadata(self.ConfigToDict())
min_seq = metadata_dct['last_seq'] + 1
min_pos = metadata_dct['end_pos']
for consumer in self.consumers.values():
min_seq = min(min_seq, consumer.cur_seq)
min_pos = min(min_pos, consumer.cur_pos)
return min_seq, min_pos
def Truncate(self, truncate_attachments=True, process_pool=None):
"""Truncates the main data file to only contain unprocessed records.
See file-level docstring for more information about versions.
Args:
truncate_attachments: Whether or not to truncate attachments.
For testing.
"""
with self.data_write_lock, self._consumer_lock:
min_seq, min_pos = self._GetFirstUnconsumedRecord()
try:
for consumer in self.consumers.values():
consumer.read_lock.acquire()
if process_pool is None:
Truncate(self.ConfigToDict(), min_seq, min_pos, truncate_attachments)
else:
process_pool.apply(
Truncate,
(self.ConfigToDict(), min_seq, min_pos, truncate_attachments))
except Exception:
self.exception('Exception occurred during Truncate operation')
# If any exception occurred, we aren't sure whether the write succeeded or
# not; metadata will be re-read from disk on the next operation, which
# selects the correct version.
raise
finally:
# Ensure that regardless of any errors, locks are released.
for consumer in self.consumers.values():
try:
consumer.read_lock.release()
except Exception:
pass
def _CreateConsumer(self, name):
"""Returns a new Consumer object with the given name."""
return Consumer(
name, self, self.consumer_path_format % name, self.logger.name)
def AddConsumer(self, name):
"""See BufferPlugin.AddConsumer."""
self.debug('Add consumer %s', name)
with self.data_write_lock, self._consumer_lock:
if name in self.consumers:
raise SimpleFileException('Consumer %s already exists' % name)
self.consumers[name] = self._CreateConsumer(name)
self._SaveConsumers()
def RemoveConsumer(self, name):
"""See BufferPlugin.RemoveConsumer."""
self.debug('Remove consumer %s', name)
with self.data_write_lock, self._consumer_lock:
if name not in self.consumers:
raise SimpleFileException('Consumer %s does not exist' % name)
del self.consumers[name]
self._SaveConsumers()
def ListConsumers(self):
"""See BufferPlugin.ListConsumers."""
with self._consumer_lock:
# cur_seq represents the sequence ID of the consumer's next event. If
# that event doesn't exist yet, it will be set to the next (non-existent)
# sequence ID. We must subtract 1 to get the "last completed" event.
cur_seqs = {key: consumer.cur_seq - 1
for key, consumer in self.consumers.iteritems()}
# Grab last_seq at the end, in order to guarantee that for any consumer,
# last_seq >= cur_seq, and that all last_seq are equal.
last_seq = self.last_seq
return {key: (cur_seq, last_seq)
for key, cur_seq in cur_seqs.iteritems()}
def Consume(self, name):
"""See BufferPlugin.Consume."""
return self.consumers[name].CreateStream()
class Consumer(log_utils.LoggerMixin, plugin_base.BufferEventStream):
"""Represents a Consumer and its BufferEventStream.
Since SimpleFile has only a single database file, there can only ever be one
functioning BufferEventStream at any given time. So we bundle the Consumer
and its BufferEventStream into one object. When CreateStream is called, a
lock is acquired and the Consumer object is returned. The lock must first be
acquired before any of Next, Commit, or Abort can be used.
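Typical usage (illustrative):
stream = consumer.CreateStream()  # returns None if already in use
event = stream.Next()             # returns None when no events are pending
stream.Commit()                   # or stream.Abort()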
"""
def __init__(self, name, simple_file, metadata_path, logger_name):
self.name = name
self.simple_file = simple_file
self.metadata_path = metadata_path
self.logger = logging.getLogger(logger_name)
self._stream_lock = lock_utils.Lock()
self.read_lock = lock_utils.Lock()
self.read_buf = []
with self.read_lock:
metadata_dct = RestoreMetadata(self.simple_file.ConfigToDict())
self.cur_seq = metadata_dct['first_seq']
self.cur_pos = metadata_dct['start_pos']
self.new_seq = self.cur_seq
self.new_pos = self.cur_pos
# Try restoring metadata, if it exists.
self.RestoreConsumerMetadata()
self._SaveMetadata()
def CreateStream(self):
"""Creates a BufferEventStream object to be used by Instalog core.
Since this class doubles as BufferEventStream, we mark that the
BufferEventStream is "unexpired" by setting self._stream_lock,
and return self.
Returns:
`self` if BufferEventStream not already in use, None if busy.
"""
return self if self._stream_lock.acquire(False) else None
def _SaveMetadata(self):
"""Saves metadata for this Consumer to disk (seq and pos)."""
data = {'cur_seq': self.cur_seq,
'cur_pos': self.cur_pos}
with file_utils.AtomicWrite(self.metadata_path, fsync=True) as f:
json.dump(data, f)
def RestoreConsumerMetadata(self):
"""Restores metadata for this Consumer from disk (seq and pos).
On each restore, check that the window of records available on disk has not
moved past our own current record, and clamp cur_seq and cur_pos back into
that window if it has. How would this happen? If the Consumer is removed,
records it still hasn't read are truncated from the main database, and the
Consumer is re-added under the same name.
If the metadata file does not exist, will silently return.
"""
data = TryLoadJSON(self.metadata_path, self.logger.name)
if data is not None:
if 'cur_seq' not in data or 'cur_pos' not in data:
self.error('Consumer %s metadata file invalid; resetting', self.name)
return
# Make sure we are still ahead of simple_file.
with self.read_lock:
metadata_dct = RestoreMetadata(self.simple_file.ConfigToDict())
self.cur_seq = min(max(metadata_dct['first_seq'], data['cur_seq']),
metadata_dct['last_seq'] + 1)
self.cur_pos = min(max(metadata_dct['start_pos'], data['cur_pos']),
metadata_dct['end_pos'])
if (data['cur_seq'] < metadata_dct['first_seq'] or
data['cur_seq'] > (metadata_dct['last_seq'] + 1)):
self.error('Consumer %s cur_seq=%d is out of buffer range %d to %d, '
'correcting to %d', self.name, data['cur_seq'],
metadata_dct['first_seq'], metadata_dct['last_seq'] + 1,
self.cur_seq)
self.new_seq = self.cur_seq
self.new_pos = self.cur_pos
def _Buffer(self):
"""Returns a list of pending records.
Stores the current buffer internally at self.read_buf. If it already has
data in it, self.read_buf will be returned as-is. It will be "refilled"
when it is empty.
Reads up to _BUFFER_SIZE_BYTES from the file on each "refill".
Returns:
A list of records, where each is a three-element tuple:
(record_seq, record_data, line_bytes).
"""
if self.read_buf:
return self.read_buf
# Does the buffer already have data in it?
if not os.path.exists(self.simple_file.data_path):
return self.read_buf
self.debug('_Buffer: waiting for read_lock')
with self.read_lock:
metadata_dct = RestoreMetadata(self.simple_file.ConfigToDict())
with open(self.simple_file.data_path, 'r') as f:
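# new_pos is absolute; convert it to an offset within the on-disk file.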
cur = self.new_pos - metadata_dct['start_pos']
f.seek(cur)
total_bytes = 0
skipped_bytes = 0
for line in f:
if total_bytes > _BUFFER_SIZE_BYTES:
break
size = len(line)
cur += size
if cur > (metadata_dct['end_pos'] - metadata_dct['start_pos']):
break
seq, record = ParseRecord(line, self.logger.name)
if seq is None:
# Parsing of this line failed for some reason.
skipped_bytes += size
continue
# Only add to total_bytes for a valid line.
total_bytes += size
# Include any skipped bytes from previously skipped records in the
# "size" of this record, in order to allow the consumer to skip to the
# proper offset.
self.read_buf.append((seq, record, size + skipped_bytes))
skipped_bytes = 0
return self.read_buf
def _Next(self):
"""Helper for Next, also used for testing purposes.
Returns:
A tuple of (seq, record), or (None, None) if no records available.
"""
if not self._stream_lock.locked():
raise plugin_base.EventStreamExpired
buf = self._Buffer()
if not buf:
return None, None
seq, record, size = buf.pop(0)
self.new_seq = seq + 1
self.new_pos += size
return seq, record
def Next(self):
"""See BufferEventStream.Next."""
seq, record = self._Next()
if not seq:
return None
event = datatypes.Event.Deserialize(record)
return self.simple_file.ExternalizeEvent(event)
def Commit(self):
"""See BufferEventStream.Commit."""
if not self._stream_lock.locked():
raise plugin_base.EventStreamExpired
self.cur_seq = self.new_seq
self.cur_pos = self.new_pos
# Ensure that regardless of any errors, locks are released.
try:
self._SaveMetadata()
except Exception:
# TODO(kitching): Instalog core or PluginSandbox should catch this
# exception and attempt to safely shut down.
self.exception('Commit: Write exception occurred, Events may be '
'processed by output plugin multiple times')
finally:
try:
self._stream_lock.release()
except Exception:
# TODO(kitching): Instalog core or PluginSandbox should catch this
# exception and attempt to safely shut down.
self.exception('Commit: Internal error occurred')
def Abort(self):
"""See BufferEventStream.Abort."""
if not self._stream_lock.locked():
raise plugin_base.EventStreamExpired
self.new_seq = self.cur_seq
self.new_pos = self.cur_pos
self.read_buf = []
try:
self._stream_lock.release()
except Exception:
# TODO(kitching): Instalog core or PluginSandbox should catch this
# exception and attempt to safely shut down.
self.exception('Abort: Internal error occurred')