blob: ff2ab5543110d135bcc693480342a17d016567a2 [file] [log] [blame]
# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Instalog plugin base.
Defines plugin classes (buffer, input, output), and a PluginAPI interface for
plugins to access.
"""
import inspect
import os
import time
import instalog_common # pylint: disable=W0611
from instalog import json_utils
from instalog import log_utils
from instalog.utils import arg_utils
from instalog.utils import file_utils
from instalog.utils import time_utils
class LoadPluginError(Exception):
  """Raised when a plugin runs into an error while it is being loaded."""
class WaitException(Exception):
  """Raised when the requested operation cannot be performed right now."""
class UnexpectedAccess(Exception):
  """Raised when a plugin accesses data while it is supposed to be stopped."""
class StateCommandError(Exception):
  """Raised when a state command cannot be run on the plugin sandbox."""
class EventStreamExpired(Exception):
  """Raised when an event stream has expired and may no longer be used."""
class PluginCallError(Exception):
  """Raised when calling a method on the plugin instance fails."""
class ConfigError(Exception):
  """Raised when the config file cannot be loaded."""
class PluginAPI(object):
  """Defines an interface for plugins to call.

  Every method takes the calling plugin instance as its first argument.  This
  base class only declares the interface: each method raises
  NotImplementedError until a concrete implementation overrides it.
  """

  def SaveStore(self, plugin):
    """See Plugin.SaveStore."""
    raise NotImplementedError

  def GetDataDir(self, plugin):
    """See Plugin.GetDataDir."""
    raise NotImplementedError

  def IsStopping(self, plugin):
    """See Plugin.IsStopping."""
    raise NotImplementedError

  def IsFlushing(self, plugin):
    """See Plugin.IsFlushing."""
    raise NotImplementedError

  def Emit(self, plugin, events):
    """See InputPlugin.Emit."""
    raise NotImplementedError

  def NewStream(self, plugin):
    """See OutputPlugin.NewStream."""
    raise NotImplementedError

  def EventStreamNext(self, plugin, plugin_stream):
    """See BufferEventStream.Next."""
    raise NotImplementedError

  def EventStreamCommit(self, plugin, plugin_stream):
    """See BufferEventStream.Commit."""
    raise NotImplementedError

  def EventStreamAbort(self, plugin, plugin_stream):
    """See BufferEventStream.Abort."""
    raise NotImplementedError
class Plugin(log_utils.LoggerMixin, object):
  """Base class for a buffer plugin, input plugin, or output plugin in Instalog.

  This is a base class for BufferPlugin, InputPlugin and OutputPlugin. Plugins
  should subclass from these three classes.

  This base class processes plugin arguments set through the ARGS variable, and
  sets some shortcut functions to the logger.
  """

  def __init__(self, config, logger, store, plugin_api):
    """Plugin constructor.

    Args:
      config: A dictionary representing arguments for this plugin.  Will be
              validated against the specification in ARGS.
      logger: A reference to the logger for this plugin instance.
      store: A reference to the plugin's store dictionary.
      plugin_api: An instance of a class implementing PluginAPI.

    Raises:
      arg_utils.ArgError if the arguments fail to validate.
    """
    # Try parsing the arguments according to the spec in ARGS.  A subclass
    # that does not define ARGS is treated as taking no arguments.
    arg_spec = getattr(self, 'ARGS', [])
    setattr(self, 'args', arg_utils.Args(*arg_spec).Parse(config))

    # log_utils.LoggerMixin creates shortcut functions for convenience.
    self.logger = logger

    # Plugin data store dictionary.
    self.store = store

    # Save the core API to a private instance variable.
    self._plugin_api = plugin_api

  def SetUp(self):
    """Sets up any connections or threads needed.

    This function should return to the caller after the plugin has been
    initialized.  The default implementation does nothing.
    """
    return

  def Main(self):
    """Main thread of the plugin, started by Instalog.

    Should regularly check self.IsStopping().  In the case that IsStopping()
    returns True, this thread should complete execution as soon as possible.
    The default implementation does nothing.
    """
    return

  def TearDown(self):
    """Shuts down any extra threads and connections used by the plugin.

    This function should only return to the caller after all threads and
    extra processes used by the plugin have stopped.  The default
    implementation does nothing.
    """
    return

  def SaveStore(self):
    """Saves the data store dictionary to disk.

    Plugins may make many updates to the store (inefficient to write on every
    change), or might only want to write it to disk in certain situations to
    ensure atomicity.  Thus the action of saving the store is exposed for the
    plugin to handle.

    Returns:
      Whatever the underlying PluginAPI.SaveStore implementation returns.
    """
    return self._plugin_api.SaveStore(self)

  def GetDataDir(self):
    """Returns the data directory of this plugin.

    This directory is set aside by Instalog core for the plugin to store any
    data.  Its value can be expected to be consistent across plugin restarts or
    Instalog restarts.

    Raises:
      UnexpectedAccess if the plugin instance is in some unexpected state and
      is trying to access core functionality that it should not.
    """
    return self._plugin_api.GetDataDir(self)

  def IsStopping(self):
    """Returns whether or not the plugin has been asked to stop.

    If False is returned, the plugin should continue running as usual.  If True
    is returned, the plugin should shut down as soon as it finishes its work.
    Should be checked regularly in the Main thread, as well as any other threads
    started by the plugin.

    Raises:
      UnexpectedAccess if the plugin instance is in some unexpected state and
      is trying to access core functionality that it should not.
    """
    return self._plugin_api.IsStopping(self)

  def IsFlushing(self):
    """Returns whether or not the plugin is flushing.

    If False is returned, the plugin should continue running as usual.  If True
    is returned, the plugin should process any remaining data, and not wait for
    further data to be included in the current "batch".

    Raises:
      UnexpectedAccess if the plugin instance is in some unexpected state and
      is trying to access core functionality that it should not.
    """
    return self._plugin_api.IsFlushing(self)

  def Sleep(self, secs):
    """Suspends execution of the current thread for the given number of seconds.

    When a plugin is requested to stop, it might be in the middle of a
    time.sleep call.  This provides an alternative sleep function, which wakes
    up at least once per second and returns early as soon as it notices that
    the plugin is stopping or flushing.

    Should typically be used at the end of an iteration of a plugin's Main
    while loop.  For example:

      while not self.IsStopping():
        # ... do some work ...
        self.Sleep(self.args.interval)

    Args:
      secs: Maximum number of seconds to sleep.  Zero or a negative value
            returns immediately without querying the plugin state.
    """
    end_time = time_utils.MonotonicTime() + secs
    while True:
      remaining = end_time - time_utils.MonotonicTime()
      # The deadline is checked first so that an already-elapsed sleep never
      # touches the plugin API.
      if remaining <= 0 or self.IsStopping() or self.IsFlushing():
        break
      # Sleep only for the time that is left (capped at one second so state
      # changes are noticed promptly); sleeping a fixed min(1, secs) would
      # overshoot the deadline for non-integer durations.
      time.sleep(min(1, remaining))
class BufferPlugin(Plugin):
  """Base class for a buffer plugin in Instalog.

  Declares the interface a buffer has to provide; every method here raises
  NotImplementedError and is meant to be overridden by a concrete buffer.
  """

  def AddConsumer(self, consumer_id):
    """Subscribes the specified consumer ID to the buffer.

    Args:
      consumer_id: Unique identifier of the consumer being added.
    """
    raise NotImplementedError

  def RemoveConsumer(self, consumer_id):
    """Unsubscribes the specified consumer ID from the buffer.

    Args:
      consumer_id: Unique identifier of the consumer being removed.
    """
    raise NotImplementedError

  def ListConsumers(self):
    """Returns information about consumers subscribed to the buffer.

    Returns:
      A dictionary, where keys are consumer IDs, and values are tuples
      of (completed_count, total_count) representing progress through
      Event processing.
    """
    raise NotImplementedError

  def Produce(self, events):
    """Produces events to be stored into the buffer.

    Args:
      events: List of Event objects to be inserted into the buffer.

    Returns:
      True if successful, False otherwise.
    """
    raise NotImplementedError

  def Consume(self, consumer_id):
    """Returns a BufferEventStream to consume events from the buffer.

    Args:
      consumer_id: ID of the consumer for which to create a BufferEventStream.

    Returns:
      An object implementing the BufferEventStream interface, through which
      the given consumer can retrieve events.
    """
    raise NotImplementedError
class BufferEventStream(object):
  """Interface for the event streams handed out by a buffer plugin.

  A buffer plugin's Consume method is expected to return an object that
  implements this interface.  All methods here are placeholders that raise
  NotImplementedError.
  """

  def Next(self):
    """Retrieves the next Event that is available on this stream."""
    raise NotImplementedError()

  def Commit(self):
    """Records that this batch of Events was processed successfully.

    Afterwards this BufferEventStream is considered expired.

    Raises:
      EventStreamExpired if this BufferEventStream is expired.
    """
    raise NotImplementedError()

  def Abort(self):
    """Gives up on processing this batch of Events.

    Afterwards this BufferEventStream is considered expired.  The Events it
    handed out are not lost: later Next calls on other BufferEventStream
    objects will return them again.

    Raises:
      EventStreamExpired if this BufferEventStream is expired.
    """
    raise NotImplementedError()
class InputPlugin(Plugin):
  """Base class that every Instalog input plugin derives from."""

  def Emit(self, events):
    """Hands a set of Event objects over to Instalog's buffer.

    Args:
      events: Either a single Event or a list of Event objects to be emitted.

    Returns:
      True on success, False on failure.  A WaitException raised by the plugin
      API is reported as False.  In either case, the plugin is expected to
      deal appropriately with retrying, or letting its source know that a
      failure occurred.

    Raises:
      UnexpectedAccess if the plugin instance is in some unexpected state and
      is trying to access core functionality that it should not.
    """
    try:
      emit_result = self._plugin_api.Emit(self, events)
    except WaitException:
      # The core cannot take the events right now; report it as a failure.
      emit_result = False
    return emit_result
class OutputPlugin(InputPlugin):
  """Base class that every Instalog output plugin derives from.

  Output plugins are allowed to Emit events as well, which is why this class
  extends InputPlugin rather than Plugin directly.
  """

  def NewStream(self):
    """Requests a fresh EventStream object for retrieving output events.

    Returns:
      An EventStream object (see datatypes module).  None if we currently do not
      have permission to create a new EventStream object (i.e. plugin is not in
      one of the allowed states), or if the data the buffer would need to access
      in the EventStream is currently unavailable.

    Raises:
      UnexpectedAccess if the plugin instance is in some unexpected state and
      is trying to access core functionality that it should not.
    """
    try:
      new_stream = self._plugin_api.NewStream(self)
    except WaitException:
      # No stream can be created at the moment; signal that with None.
      new_stream = None
    return new_stream
def main():
  """Runs plugins as executables.

  Forwards to run_plugin.main, passing the plugin type.  The plugin type is
  derived from the file name (without directory or extension) of the module
  that called this function.

  Plugins can enable their executable bit, and call this function from a
  __main__ snippet at the bottom of their file in order to provide
  self-running abilities for test purposes.  For example (assuming this module
  is imported under the name plugin_base -- adjust to the actual import name):

    if __name__ == '__main__':
      plugin_base.main()

  See run_plugin.main for more details.
  """
  # Entry 1 of the stack is the direct caller of main(); element 1 of that
  # frame record is the caller's file name.
  caller_path = inspect.stack()[1][1]
  plugin_type = os.path.splitext(os.path.basename(caller_path))[0]

  # NOTE(review): imported here rather than at module level, presumably to
  # avoid a circular import -- confirm before moving it.
  from instalog import run_plugin
  run_plugin.main(plugin_type)