blob: 5198f0759bc9a5299055cd344d9cb2c8ad918b9f [file] [log] [blame]
#!/usr/bin/env python2
#
# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Priority multi-file-based buffer.
A priority multi-file-based buffer plugin which separates its events into
several priority levels and writes each level to different files. Every level
has several files, so that multiple input plugins do not block waiting on the
same file lock.
This plugin is constructed by many simple file buffer, so the consumer of this
plugin has many consumers of simple file buffer.
Since this is a priority multi-file-based buffer plugin, it doesn't guarantee
the order of its events."""
from __future__ import division
from __future__ import print_function
import itertools
import multiprocessing
import os
import shutil
from six import iteritems
from six.moves import xrange
import instalog_common # pylint: disable=unused-import
from instalog import json_utils
from instalog import lock_utils
from instalog import log_utils
from instalog import plugin_base
from instalog.plugins import buffer_file_common
from instalog.utils.arg_utils import Arg
from instalog.utils import file_utils
_PRIORITY_LEVEL = 4
_PARTITION = 4
_LOCK_ACQUIRE_TIMEOUT = 0.1
_LOCK_ACQUIRE_LOOP_TIMES = 25 # emit timeout in
# (_LOCK_ACQUIRE_LOOP_TIMES * _PARTITION *
# _LOCK_ACQUIRE_TIMEOUT) = 10sec
_PROCESSES_NUMBER = 10
_TEMPORARY_METADATA_DIR = 'metadata_tmp_dir'
_TEMPORARY_ATTACHMENT_DIR = 'attachments_tmp_dir'
_DEFAULT_TRUNCATE_INTERVAL = 0 # truncating disabled
_DEFAULT_COPY_ATTACHMENTS = False # use move instead of copy by default
_DEFAULT_ENABLE_FSYNC = True # fsync when it receives events
class BufferPriorityFile(plugin_base.BufferPlugin):
  """Priority multi-file-based buffer plugin.

  Maintains a grid of _PRIORITY_LEVEL x _PARTITION simple file buffers.
  Each incoming event is routed to one priority level (see EventLevel);
  the partitions within a level exist only to reduce lock contention
  between concurrent producers.  Event ordering across partitions is not
  guaranteed.
  """

  ARGS = [
      Arg('truncate_interval', (int, float),
          'How often truncating the buffer file should be attempted. '
          'If set to 0, truncating functionality will be disabled (default).',
          default=_DEFAULT_TRUNCATE_INTERVAL),
      Arg('copy_attachments', bool,
          'Instead of moving an attachment into the buffer, perform a copy '
          'operation, and leave the source file intact.',
          default=_DEFAULT_COPY_ATTACHMENTS),
      Arg('enable_fsync', bool,
          'Synchronize the buffer file when it receives events. '
          'Default is True.',
          default=_DEFAULT_ENABLE_FSYNC)
  ]

  def __init__(self, *args, **kwargs):
    # buffer_file[pri_level][file_num] holds one BufferFile per cell; the
    # cells are populated in SetUp.
    self.buffer_file = [[[] for _unused_j in xrange(_PARTITION)]
                        for _unused_i in xrange(_PRIORITY_LEVEL)]
    self.attachments_tmp_dir = None
    self.metadata_tmp_dir = None
    self.consumers = {}
    # One lock per partition; a producer must hold a partition's lock while
    # writing to any priority level of that partition.
    self._file_num_lock = [None] * _PARTITION
    self.process_pool = None
    # Round-robin cursors so producers and the truncater spread their work
    # over all partitions.
    self._produce_partition = 0
    self._consume_partition = 0
    super(BufferPriorityFile, self).__init__(*args, **kwargs)

  def SetUp(self):
    """Sets up the plugin."""
    self.attachments_tmp_dir = os.path.join(self.GetDataDir(),
                                            _TEMPORARY_ATTACHMENT_DIR)
    # Remove the attachments tmp dir, if Instalog terminated last time.
    if os.path.exists(self.attachments_tmp_dir):
      shutil.rmtree(self.attachments_tmp_dir)
    os.makedirs(self.attachments_tmp_dir)

    self.metadata_tmp_dir = os.path.join(self.GetDataDir(),
                                         _TEMPORARY_METADATA_DIR)
    # Recover temporary metadata snapshots left behind by a Produce call
    # that was interrupted before it could commit or roll back.
    if os.path.isdir(self.metadata_tmp_dir):
      for file_name in os.listdir(self.metadata_tmp_dir):
        file_path = os.path.join(self.metadata_tmp_dir, file_name)
        if os.path.isfile(file_path):
          self.RecoverTemporaryMetadata(file_path)
    else:
      os.makedirs(self.metadata_tmp_dir)

    for pri_level in xrange(_PRIORITY_LEVEL):
      for file_num in xrange(_PARTITION):
        self.buffer_file[pri_level][file_num] = buffer_file_common.BufferFile(
            self.args,
            self.logger.name,
            os.path.join(self.GetDataDir(), '%d_%d' % (pri_level, file_num)))

    for file_num in xrange(_PARTITION):
      self._file_num_lock[file_num] = lock_utils.Lock(self.logger.name)

    # Re-create the Consumer wrappers for consumer names already recorded
    # on disk (all buffer files track the same consumer set, so [0][0] is
    # representative).
    for name in self.buffer_file[0][0].consumers.keys():
      self.consumers[name] = Consumer(name, self)

    self.process_pool = multiprocessing.Pool(processes=_PROCESSES_NUMBER)

  def TearDown(self):
    """Tears down the plugin."""
    self.process_pool.close()
    self.info('Joining the processes in the process pool')
    self.process_pool.join()
    self.info('Finished joining the processes')

  def Main(self):
    """Main thread of the plugin."""
    while not self.IsStopping():
      if not self.args.truncate_interval:
        # Truncating is disabled. But we should keep the main thread running,
        # or else PluginSandbox will assume the plugin has crashed, and will
        # take the plugin down.
        # TODO(kitching): Consider altering PluginSandbox to allow Main to
        #                 return some particular value which signifies "I am
        #                 exiting of my own free will and I should be allowed
        #                 to continue running normally."
        self.Sleep(100)
        continue

      self.Truncate()
      self.info('Truncating complete. Sleeping %d secs...',
                self.args.truncate_interval / _PARTITION)
      self.Sleep(self.args.truncate_interval / _PARTITION)

  def ProduceOrderIter(self):
    """Returns an iterator giving the produce order of partitioned buffers.

    Each call starts one partition later than the previous call, so
    concurrent producers spread their lock attempts across partitions.
    """
    first_level = self._produce_partition
    self._produce_partition = (self._produce_partition + 1) % _PARTITION
    return itertools.chain(xrange(first_level, _PARTITION),
                           xrange(0, first_level))

  def ConsumeOrderIter(self):
    """Returns an iterator giving the consume order of partitioned buffers.

    Unlike ProduceOrderIter, the starting partition is only advanced by
    Truncate, not by this call.
    """
    first_level = self._consume_partition
    return itertools.chain(xrange(first_level, _PARTITION),
                           xrange(0, first_level))

  def Truncate(self):
    """Truncates all data files to only contain unprocessed records."""
    # A buffer can be truncated faster after it is consumed for a while.
    file_num = self._consume_partition
    self._consume_partition = (self._consume_partition + 1) % _PARTITION
    with self._file_num_lock[file_num]:
      for pri_level in xrange(_PRIORITY_LEVEL):
        self.info('Truncating database %d_%d...', pri_level, file_num)
        self.buffer_file[pri_level][file_num].Truncate(
            process_pool=self.process_pool)

  def EventLevel(self, event):
    """Decides the priority level of the event.

    Events whose 'priority' field is not a valid level (missing, out of
    range, or non-integer) fall into the last level, _PRIORITY_LEVEL - 1.

    Returns:
      An integer of priority level.
    """
    pri_levels = xrange(_PRIORITY_LEVEL)
    pri = event.get('priority')
    return pri if pri in pri_levels else _PRIORITY_LEVEL - 1

  def PrioritizeEvents(self, events):
    """Splits the list of events into one list per priority level.

    Returns:
      A list of _PRIORITY_LEVEL lists; entry i holds the events whose
      priority level is i.
    """
    priority_events = [[] for _unused_i in xrange(_PRIORITY_LEVEL)]
    for event in events:
      priority_events[self.EventLevel(event)].append(event)
    return priority_events

  def SaveTemporaryMetadata(self, file_num):
    """Saves all metadata of one partition into a single temporary file.

    The snapshot allows Produce to roll back every priority level of the
    partition at once if Instalog fails in the middle of a write.

    Returns:
      The path of the temporary metadata file.
    """
    # We didn't use file_utils.AtomicWrite since it creates another file on
    # self.metadata_tmp_dir.
    with file_utils.UnopenedTemporaryFile() as tmp_path:
      tmp_metadata_path = os.path.join(self.metadata_tmp_dir,
                                       os.path.basename(tmp_path))
      all_metadata = {}
      for pri_level in xrange(_PRIORITY_LEVEL):
        metadata_path = self.buffer_file[pri_level][file_num].metadata_path
        if os.path.isfile(metadata_path):
          all_metadata[metadata_path] = file_utils.ReadFile(metadata_path)
        else:
          # Record the absence, so recovery knows to delete the file.
          all_metadata[metadata_path] = None
      with open(tmp_path, 'w') as f:
        f.write(json_utils.encoder.encode(all_metadata))
      file_utils.AtomicCopy(tmp_path, tmp_metadata_path)
      file_utils.SyncDirectory(self.metadata_tmp_dir)
      return tmp_metadata_path

  def RecoverTemporaryMetadata(self, tmp_metadata_path):
    """Restores the metadata files recorded in the temporary snapshot."""
    all_metadata = json_utils.decoder.decode(
        file_utils.ReadFile(tmp_metadata_path))
    for path, metadata in iteritems(all_metadata):
      self.info('Recover metadata: `%s` New: `%s` Old: `%s`', path, metadata,
                file_utils.ReadFile(path) if os.path.exists(path) else 'None')
      if metadata is None:
        # The file did not exist at snapshot time; remove any leftover.
        if os.path.isfile(path):
          os.unlink(path)
      else:
        with file_utils.AtomicWrite(path) as f:
          f.write(metadata)

    os.unlink(tmp_metadata_path)

  def AcquireLock(self):
    """Acquires the lock of one partition, in round-robin produce order.

    First tries every partition without blocking; then retries with a short
    timeout for up to _LOCK_ACQUIRE_LOOP_TIMES rounds (roughly 10 seconds
    in total).

    Returns:
      The acquired partition number, or None if no lock could be acquired.
    """
    for file_num in self.ProduceOrderIter():
      if self._file_num_lock[file_num].acquire(block=False):
        return file_num

    for _unused_i in xrange(_LOCK_ACQUIRE_LOOP_TIMES):
      for file_num in self.ProduceOrderIter():
        if self._file_num_lock[file_num].acquire(
            timeout=_LOCK_ACQUIRE_TIMEOUT):
          return file_num

    # Return None rather than False on failure: Produce tests the result
    # with `is None`, and False would look like a valid partition index (0).
    return None

  def Produce(self, events):
    """See BufferPlugin.Produce.

    Note the careful edge cases with attachment files. We want them *all* to
    be either moved or copied into the buffer's database, or *none* at all.

    Returns:
      True if all events were stored successfully; False otherwise.
    """
    file_num = None
    tmp_metadata_path = ''
    with file_utils.TempDirectory(dir=self.attachments_tmp_dir) as tmp_dir:
      try:
        # Step 1: Copy attachments.
        source_paths = []
        for event in events:
          for att_id, att_path in iteritems(event.attachments):
            source_paths.append(att_path)
            event.attachments[att_id] = os.path.join(
                tmp_dir, att_path.replace('/', '_'))
        if not self.process_pool.apply(
            buffer_file_common.CopyAttachmentsToTempDir,
            (source_paths, tmp_dir, self.logger.name)):
          return False

        # Step 2: Acquire a lock on one partition.
        file_num = self.AcquireLock()
        if file_num is None:
          return False

        # Snapshot this partition's metadata so it can be rolled back if
        # anything below fails.
        tmp_metadata_path = self.SaveTemporaryMetadata(file_num)
        priority_events = self.PrioritizeEvents(events)

        # Step 3: Write the new events to the file.
        for pri_level in xrange(_PRIORITY_LEVEL):
          if not priority_events[pri_level]:
            continue
          self.buffer_file[pri_level][file_num].ProduceEvents(
              priority_events[pri_level], self.process_pool)

        # Step 4: Remove source attachment files if necessary.
        if not self.args.copy_attachments:
          for path in source_paths:
            try:
              os.unlink(path)
            except Exception:
              self.exception('One of source attachment files (%s) could not '
                             'be deleted; silently ignoring', path)
        os.unlink(tmp_metadata_path)
        return True
      except Exception:
        self.exception('Exception encountered in Produce')
        try:
          # Roll back this partition's metadata to the snapshot.
          if os.path.isfile(tmp_metadata_path):
            self.RecoverTemporaryMetadata(tmp_metadata_path)
          if file_num is not None:
            for pri_level in xrange(_PRIORITY_LEVEL):
              self.buffer_file[pri_level][file_num].RestoreMetadata()
        except Exception:
          self.exception('Exception encountered in RecoverTemporaryMetadata '
                         '(%s)', tmp_metadata_path)
        return False
      finally:
        if file_num is not None:
          self._file_num_lock[file_num].CheckAndRelease()

  def AddConsumer(self, name):
    """See BufferPlugin.AddConsumer."""
    self.consumers[name] = Consumer(name, self)
    for pri_level in xrange(_PRIORITY_LEVEL):
      for file_num in xrange(_PARTITION):
        self.buffer_file[pri_level][file_num].AddConsumer(name)

  def RemoveConsumer(self, name):
    """See BufferPlugin.RemoveConsumer."""
    # Also drop the wrapper Consumer, so ListConsumers does not later look
    # up a name that the underlying buffer files no longer track.
    self.consumers.pop(name, None)
    for pri_level in xrange(_PRIORITY_LEVEL):
      for file_num in xrange(_PARTITION):
        self.buffer_file[pri_level][file_num].RemoveConsumer(name)

  def ListConsumers(self, details=0):
    """See BufferPlugin.ListConsumers.

    Args:
      details: 0 or less reports one aggregated progress tuple per consumer;
               1 also reports per-priority-level tuples; 2 or more also
               reports per-(level, partition) tuples.
    """
    consumers_dict = {}
    progress_dict = {}
    for name in self.consumers:
      progress_dict[name] = {}
      for pri_level in xrange(_PRIORITY_LEVEL):
        progress_dict[name][pri_level] = {}
        for file_num in xrange(_PARTITION):
          progress_dict[name][pri_level][file_num] = (
              self.buffer_file[pri_level][file_num].ListConsumers()[name])
          if details >= 2:
            consumers_dict['%s(%d-%d)' % (name, pri_level, file_num)] = (
                progress_dict[name][pri_level][file_num])
        # Sum progress element-wise across the partitions of this level.
        progress_dict[name][pri_level] = tuple(
            map(sum, list(zip(*progress_dict[name][pri_level].values()))))
        if details == 1:
          consumers_dict['%s(%d)' % (name, pri_level)] = (
              progress_dict[name][pri_level])
      # Sum progress element-wise across all levels.
      progress_dict[name] = tuple(
          map(sum, list(zip(*progress_dict[name].values()))))
      if details <= 0:
        consumers_dict[name] = progress_dict[name]
    return consumers_dict

  def Consume(self, name):
    """See BufferPlugin.Consume."""
    return self.consumers[name].CreateStream()
class Consumer(log_utils.LoggerMixin, plugin_base.BufferEventStream):
  """Represents a Consumer and its BufferEventStream.

  Wraps one underlying stream per (priority level, partition) pair, and
  drains them in sequence, priority level 0 first.
  """

  def __init__(self, name, priority_buffer):
    self.name = name
    self.priority_buffer = priority_buffer
    self.streams = []
    self.streams_index = 0

  def CreateStream(self):
    """Creates a BufferEventStream object to be used by Instalog core.

    Returns:
      self on success; None if any underlying stream is unavailable, in
      which case every stream acquired so far is aborted.
    """
    for pri_level in xrange(_PRIORITY_LEVEL):
      for file_num in self.priority_buffer.ConsumeOrderIter():
        stream = self.priority_buffer.buffer_file[pri_level][
            file_num].consumers[self.name].CreateStream()
        if stream is None:
          # Could not acquire every underlying stream; roll back the ones
          # we did get and report failure.
          for acquired in self.streams:
            acquired.Abort()
          self.streams = []
          return None
        self.streams.append(stream)
    self.streams_index = 0
    return self

  def _Next(self):
    """Returns the next event from the current stream position onward."""
    while self.streams_index < len(self.streams):
      candidate = self.streams[self.streams_index].Next()
      if candidate is not None:
        return candidate
      # This stream has no event for now; advance to the next one.
      self.streams_index += 1
    return None

  def Next(self):
    """See BufferEventStream.Next."""
    result = self._Next()
    if result is None:
      # Reached the end; rescan every stream once from the beginning, in
      # case an earlier stream received new events in the meantime.  If
      # nothing turns up this time either, None is the final answer.
      self.streams_index = 0
      result = self._Next()
    return result

  def Commit(self):
    """See BufferEventStream.Commit."""
    for active in self.streams:
      active.Commit()
    self.streams = []

  def Abort(self):
    """See BufferEventStream.Abort."""
    for active in self.streams:
      active.Abort()
    self.streams = []
# Standalone entry point: plugin_base.main() handles command-line startup
# of this plugin.
if __name__ == '__main__':
  plugin_base.main()