blob: b5c699d3cfeedbd128c57654041cb6de60551398 [file] [log] [blame]
# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Instalog plugin sandbox.
Loads the plugin class instance (using plugin_loader), manages the plugin's
state, and implements PluginAPI functions for the plugin.
"""
from __future__ import print_function
import inspect
import logging
import os
import sys
import threading
import time
import instalog_common # pylint: disable=unused-import
from instalog import datatypes
from instalog import flow_policy
from instalog import json_utils
from instalog import log_utils
from instalog import plugin_base
from instalog import plugin_loader
from instalog.utils import debug_utils
from instalog.utils import file_utils
from instalog.utils import sync_utils
from instalog.utils import time_utils
from instalog.utils import type_utils
# Cap on how many unexpected plugin accesses a PluginSandbox keeps around, so
# they can be inspected from unittests or from a live PluginSandbox instance
# while debugging.
_UNEXPECTED_ACCESSES_MAX = 5

# Plugin lifecycle states. Each state's value is simply its own name; these
# strings show up in log messages and in PluginSandbox's repr().
DOWN = 'DOWN'
STARTING = 'STARTING'
UP = 'UP'
PAUSING = 'PAUSING'
PAUSED = 'PAUSED'
UNPAUSING = 'UNPAUSING'
FLUSHING = 'FLUSHING'
STOPPING = 'STOPPING'
# TODO(kitching): Find a better home for this class definition.
class CoreAPI(object):
  """Interface a sandbox uses to interact with the Instalog core.

  Every method is abstract and raises NotImplementedError; the concrete
  implementation is provided by Core. PluginSandbox falls back to a bare
  instance of this class when no core_api is supplied, in which case any call
  into the core fails loudly.
  """

  def Emit(self, plugin, events):
    """Passes a plugin's emitted events to the core. See Core.Emit.

    Args:
      plugin: The PluginSandbox making the request.
      events: The events being emitted.
    """
    raise NotImplementedError()

  def NewStream(self, plugin):
    """Opens a new buffer event stream for a plugin. See Core.NewStream.

    Args:
      plugin: The PluginSandbox making the request.
    """
    raise NotImplementedError()

  def GetProgress(self, plugin):
    """Reports a plugin's progress through the buffer. See Core.GetProgress.

    Args:
      plugin: The PluginSandbox making the request.
    """
    raise NotImplementedError()

  def GetNodeID(self):
    """Returns the ID of this Instalog node. See Core.GetNodeID."""
    raise NotImplementedError()
class PluginSandbox(plugin_base.PluginAPI, log_utils.LoggerMixin):
"""Represents a running instance of a particular plugin.
Implementation for non-PluginAPI functions is not thread-safe. I.e., you
should not give multiple threads access to a PluginSandbox object, and run
Stop() and Pause() simultaneously. Bad things will happen. Plugins, however,
are expected to be able to run multiple threads, and run multiple PluginAPI
functions simultaneously. This is expected behaviour.
"""
# Different actions to take when a call is made into PluginAPI functions. See
# the _AskGatekeeper function.
_ALLOW = 'allow'
_WAIT = 'wait'
_ERROR = 'error'
# Commonly-used sets of Gatekeeper permissions.
_GATEKEEPER_ALLOW_ALL = {
STARTING: _ALLOW,
UP: _ALLOW,
STOPPING: _ALLOW,
FLUSHING: _ALLOW,
DOWN: _ERROR,
PAUSING: _ALLOW,
PAUSED: _ALLOW,
UNPAUSING: _ALLOW}
_GATEKEEPER_ALLOW_UP = {
STARTING: _WAIT,
UP: _ALLOW,
STOPPING: _WAIT,
FLUSHING: _ALLOW,
DOWN: _ERROR,
PAUSING: _WAIT,
PAUSED: _WAIT,
UNPAUSING: _WAIT}
_GATEKEEPER_ALLOW_UP_PAUSING_STOPPING = {
STARTING: _WAIT,
UP: _ALLOW,
STOPPING: _ALLOW,
FLUSHING: _ALLOW,
DOWN: _ERROR,
PAUSING: _ALLOW,
PAUSED: _WAIT,
UNPAUSING: _WAIT}
def __init__(self, plugin_type, plugin_id=None, superclass=None, config=None,
policy=None, store_path=None, data_dir=None, core_api=None,
_plugin_class=None):
"""Initializes the PluginSandbox.
Args:
plugin_type: The plugin type of this entry. Corresponds to the filename
of the plugin.
plugin_id: The unique identifier of this plugin entry. One plugin type
may have multiple plugin entries with different IDs. If
unspecified, will default to the same as plugin_type.
superclass: The superclass of this plugin. Can be one of:
BufferPlugin, InputPlugin, OutputPlugin. If unspecified,
will allow any of the three types to be created.
config: Configuration dict of the plugin entry. Defaults to an empty
dict.
policy: FlowPolicy object describing the allow/deny policy of this
plugin.
store_path: Path to this plugin's data store file.
data_dir: Path to the the data directory of this plugin.
core_api: Reference to an object that implements CoreAPI, usually Core.
Defaults to an instance of the CoreAPI interface, which will
throw NotImplementedError when any method is called. This may
be acceptible for testing.
_plugin_class: A "pre-loaded" plugin class for the plugin in question.
If provided, the module "loading" and "unloading" steps are
skipped, and the plugin class is directly initialized. For
testing purposes.
"""
self.plugin_type = plugin_type
self.plugin_id = plugin_id or plugin_type
self.config = config or {}
# Allow all events by default (usually used by run_plugin or testing).
self._policy = policy or flow_policy.FlowPolicy(allow=[{'rule': 'all'}])
self._store_path = store_path
if self._store_path:
self.store = self._LoadStore(self._store_path)
else:
self.store = {}
self._data_dir = data_dir
self._core_api = core_api or CoreAPI()
if not isinstance(self._core_api, CoreAPI):
raise TypeError('Invalid CoreAPI object provided')
# Create a logger this class to use.
self.logger = logging.getLogger('%s.plugin_sandbox' % self.plugin_id)
self._loader = plugin_loader.PluginLoader(
self.plugin_type, plugin_id=self.plugin_id,
superclass=superclass, config=self.config, store=self.store,
plugin_api=self, _plugin_class=_plugin_class)
self._plugin = None
self._state = DOWN
self._event_stream_map = {}
# Store the target processed event count and timeout for FLUSHING state.
self._flushing_target = None
self._flushing_timeout = None
# Store information about the last _UNEXPECTED_ACCESSES_MAX unexpected
# accesses.
self._unexpected_accesses = []
# Store the last exception caused by SetUp, Main or TearDown.
self._last_exception = None
self._setup_thread = None
self._main_thread = None
self._teardown_thread = None
def __repr__(self):
"""Implements repr function for debugging."""
return ('PluginSandbox(%s, state=%s)'
% (self.plugin_id, self._state))
def _LoadStore(self, store_path):
"""Loads the data store dictionary from disk.
Only used when the plugin is first initialized.
"""
if not os.path.isfile(store_path):
return {}
with open(store_path) as f:
return json_utils.JSONDecoder().decode(f.read())
def GetSuperclass(self):
"""Get the superclass of the plugin class.
Returns:
None if _plugin_class is not specified and GetClass() has not yet been
run. Afterwards, one of BufferPlugin, InputPlugin, or OutputPlugin.
"""
return self._loader.GetSuperclass()
def CallPlugin(self, method_name, *args, **kwargs):
"""Safely calls a method of the plugin instance.
Args:
method_name: Name of the method being called (string).
allowed_exceptions: A list of exceptions that the plugin is expected to
raise. These exceptions will be directly raised back
to the caller unmodified.
Returns:
The value returned by the called plugin method.
Raises:
PluginCallError if the plugin raises any unexpected exceptions.
Any exception in allowed_exceptions may also be raised.
"""
# TODO(kitching): Test this in unittest.
# TODO(kitching): Figure out what to do in the case when a
# BufferEventStream is returned.
allowed_exceptions = tuple(kwargs.pop('allowed_exceptions', ()))
try:
ret = getattr(self._plugin, method_name)(*args, **kwargs)
except allowed_exceptions: # pylint: disable=catching-non-exception
raise
except Exception:
_, exc, tb = sys.exc_info()
exc_message = '%s: %s' % (exc.__class__.__name__, str(exc))
new_exc = plugin_base.PluginCallError(
'Plugin call for %s unexpectedly failed: %s'
% (self.plugin_id, exc_message))
raise new_exc.__class__, new_exc, tb
return ret
def _RecordUnexpectedAccess(self, plugin_ref, caller_name, stack):
"""Record an unexpected access from the plugin (i.e. in a stopped state).
At most _UNEXPECTED_ACCESSES_MAX entries are stored in
self._unexpected_accesses for debugging purposes. This function is not
thread-safe, so it is possible that unexpected accesses may be inserted
out-of-order, or more than _UNEXPECTED_ACCESSES_MAX entries will be removed
in the while loop.
"""
self._unexpected_accesses.insert(0, {
'caller_name': caller_name,
'plugin_id': self.plugin_id,
'plugin_ref': plugin_ref,
'plugin_type': self.plugin_type,
'stack': stack,
'state': self._state,
'timestamp': time.time()})
while len(self._unexpected_accesses) > _UNEXPECTED_ACCESSES_MAX:
self._unexpected_accesses.pop()
def _AskGatekeeper(self, plugin, state_map):
"""Ensure a plugin is properly registered and in the correct state.
Args:
plugin: The plugin that has made the call to core.
state_map: A map of states to their actions. Actions can be one of:
self._ALLOW, self._WAIT, self._ERROR.
Raises:
WaitException if the plugin is currently unable to perform the
requested operation (action is self._WAIT).
UnexpectedAccess if the plugin instance is in some unexpected state and
is trying to access core functionality that it should not
(action is self._ERROR).
"""
caller_name = debug_utils.GetCallerName()
self.debug('_AskGatekeeper for plugin %s (%s) on function %s',
self.plugin_id, self._state, caller_name)
# Ensure that the plugin instance is currently registered. If the plugin
# has previously been restarted, and some remaining threads are still
# attempting to access core, we need to record the access for debugging
# purposes.
if plugin is not self._plugin:
self._RecordUnexpectedAccess(plugin, caller_name, inspect.stack())
self.critical(
'Plugin %s (%s) called core %s: Unexpected plugin instance',
self.plugin_id, self._state, caller_name)
raise plugin_base.UnexpectedAccess
# Map the plugin's state to our action (default self._ERROR).
action = state_map.get(self._state, self._ERROR)
if action is self._WAIT:
self.info(
'Plugin %s (%s) called core %s: Currently in a paused state',
self.plugin_id, self._state, caller_name)
raise plugin_base.WaitException
if action is self._ERROR:
self._RecordUnexpectedAccess(plugin, caller_name, inspect.stack())
self.info(
'Plugin %s (%s) called core %s: Unexpected access',
self.plugin_id, self._state, caller_name)
raise plugin_base.UnexpectedAccess
def _CheckStateCommand(self, allowed_states):
"""Checks to see whether a state command may be run.
Args:
allowed_states: A list of allowed states for this state command.
Raises:
StateCommandError if not allowed to use the given transition state
command.
"""
if not isinstance(allowed_states, list):
allowed_states = [allowed_states]
caller_name = debug_utils.GetCallerName()
self.debug(
'_CheckStateCommand for plugin %s (%s) on function %s',
self.plugin_id, self._state, caller_name)
if self._state not in allowed_states:
raise plugin_base.StateCommandError(
'Plugin %s (%s) called %s, but only allowed for %s'
% (self.plugin_id, self._state, caller_name, allowed_states))
def GetState(self):
"""Returns the current state of the plugin."""
self.debug('GetState called: %s', self._state)
return self._state
def GetProgress(self):
"""Returns the current progress through buffer for the specified plugin.
Args:
plugin: PluginSandbox object requesting BufferEventStream.
Returns:
A tuple (completed_count, total_count) representing how many Events have
been processed so far, and how many exist in total.
"""
return self._core_api.GetProgress(self)
def IsLoaded(self):
"""Returns whether the plugin is currently loaded (not DOWN)."""
self.debug('IsLoaded called: %s', self._state)
return self._state is not DOWN
def _Load(self):
"""Asks the PluginLoader factory to give us a new plugin instance."""
assert self._plugin is None
self._plugin = self._loader.Create()
def Start(self, sync=False):
"""Starts the plugin."""
self._CheckStateCommand(DOWN)
self._Load()
self._state = STARTING
if sync:
self.AdvanceState(sync)
def Stop(self, sync=False):
"""Stops the plugin."""
self._CheckStateCommand([UP, PAUSED])
self._state = STOPPING
if sync:
self.AdvanceState(sync)
def Flush(self, timeout, sync=False):
"""Flushes the plugin.
Returns:
If the sync argument is True, Flush will run asynchronously. True or
False will be returned depending on whether the sync succeeded within the
specified timeout.
"""
self._CheckStateCommand([UP, PAUSED])
# Prepare flushing_timeout and flushing_target for AdvanceState.
self._flushing_timeout = time_utils.MonotonicTime() + timeout
unused_completed_count, flushing_target = self.GetProgress()
self._flushing_target = flushing_target
self._state = FLUSHING
if sync:
self.AdvanceState(sync)
# Check to see if the flushing target has been surpassed.
current_count, unused_total_count = self.GetProgress()
return current_count >= flushing_target
def Pause(self, sync=False):
"""Pauses the plugin."""
self._CheckStateCommand(UP)
self._state = PAUSING
if sync:
self.AdvanceState(sync)
def Unpause(self, sync=False):
"""Unpauses the plugin."""
self._CheckStateCommand(PAUSED)
self._state = UNPAUSING
if sync:
self.AdvanceState(sync)
def TogglePause(self, sync=False):
"""Toggles the paused state on the plugin."""
self._CheckStateCommand([UP, PAUSED])
if self._state is UP:
self.Pause(sync)
elif self._state is PAUSED:
self.Unpause(sync)
def AdvanceState(self, sync=False):
"""Runs state machine transitions.
Needs an external thread to periodically run AdvanceState to run any pending
actions to take the plugin into its next requested state. For example, if
the state has been set to STOPPING, AdvanceState takes care of running the
appropriate actions and taking the plugin into the STOPPED state.
Args:
sync: Whether or not the call should be synchronous. E.g. if the state
has been set to STOPPING, AdvanceState won't return until the plugin
has been stopped.
"""
# TODO(kitching): Test SpawnFn and exception handling in unittest.
def SpawnFn(fn, sync=False):
"""Spawns a function in a thread and captures any exceptions thrown.
If we just let an exception go by uncaptured, it would be displayed to
stdout, but would go uncaptured by logging (which means only running
Instalog in the foreground would show the exception). Additionally, we
wouldn't have any way of when knowing the plugin encountered some failure.
Instead, we wrap calls to the plugin and capture exceptions, logging them
without re-raising. The last exception is saved into self._last_exception
for further processing in the next call to AdvanceState.
"""
def RunAndCaptureException(fn):
try:
fn()
except Exception as e:
self._last_exception = e
self.exception('Exception caused by %s', fn.__name__)
t = threading.Thread(target=RunAndCaptureException, args=(fn,))
t.start()
if sync:
t.join()
return t
# Check for the existence of self._last_exception, which denotes that the
# plugin thread spawned by SpawnFn encountered an error. Deal with the
# error appropriately.
if self._last_exception:
self.debug('AdvanceState last_exception exists')
self._last_exception = None
if self._state is DOWN:
self.error('Exception occurred, current state is DOWN')
else:
self.error('Exception occurred, forcing state to STOPPING')
self._state = STOPPING
# If we are in a stage where the main thread should be running, but it has
# stopped, something must have gone wrong. Force the plugin into a
# STOPPING state.
# TODO(kitching): Come up with a better way of differentiating plugins which
# define Main, and those which do not.
if (self._state in (UP, PAUSING, PAUSED) and
'Main' in self._plugin.__class__.__dict__ and
not self._main_thread.is_alive()):
self.debug('AdvanceState unexpected main thread dead')
self.error('Main thread died unexpectedly, '
'forcing state to STOPPING')
self._state = STOPPING
if self._state is STARTING:
self.debug('AdvanceState on STARTING')
if not self._setup_thread:
self._setup_thread = SpawnFn(self._plugin.SetUp, sync)
if self._setup_thread and not self._setup_thread.is_alive():
self._setup_thread = None
self._main_thread = SpawnFn(self._plugin.Main)
self._state = UP
elif self._state is STOPPING:
self.debug('AdvanceState on STOPPING')
if self._main_thread and sync:
self._main_thread.join()
if self._main_thread and not self._main_thread.is_alive():
self._main_thread = None
self._teardown_thread = SpawnFn(self._plugin.TearDown, sync)
if self._teardown_thread and not self._teardown_thread.is_alive():
self._teardown_thread = None
self._plugin = None
self._state = DOWN
elif self._state is FLUSHING:
self.debug('AdvanceState on FLUSHING')
if not self._flushing_target or not self._flushing_timeout:
self._flushing_target = None
self._flushing_timeout = None
self._state = UP
flushing_target = self._flushing_target
flushing_timeout = self._flushing_timeout
def FlushingTargetReached():
current_count, unused_completed_count = self.GetProgress()
return current_count >= flushing_target
if sync:
try:
sync_utils.WaitFor(
condition=FlushingTargetReached,
timeout_secs=flushing_timeout - time_utils.MonotonicTime() + 0.5,
poll_interval=0.5)
except type_utils.TimeoutError:
pass
if (FlushingTargetReached() or
time_utils.MonotonicTime() >= flushing_timeout):
self._flushing_target = None
self._flushing_timeout = None
self._state = UP
elif self._state is PAUSING:
self.debug('AdvanceState on PAUSING')
if not self._event_stream_map:
self._state = PAUSED
elif self._state is UNPAUSING:
self.debug('AdvanceState on UNPAUSING')
self._state = UP
############################################################
# Functions below implement plugin_base.PluginAPI.
############################################################
def SaveStore(self, plugin):
"""See PluginAPI.SaveStore."""
self._AskGatekeeper(plugin, self._GATEKEEPER_ALLOW_ALL)
self.debug('SaveStore called with state=%s', self._state)
with file_utils.AtomicWrite(self._store_path) as f:
f.write(json_utils.JSONEncoder().encode(self.store))
def GetDataDir(self, plugin):
"""See PluginAPI.GetDataDir."""
self._AskGatekeeper(plugin, self._GATEKEEPER_ALLOW_ALL)
self.debug('GetDataDir called with state=%s', self._state)
return self._data_dir
def GetNodeID(self, plugin):
"""See PluginAPI.GetNodeID."""
self._AskGatekeeper(plugin, self._GATEKEEPER_ALLOW_ALL)
self.debug('GetNodeID called with state=%s', self._state)
return self._core_api.GetNodeID()
def IsStopping(self, plugin):
"""See PluginAPI.IsStopping."""
self._AskGatekeeper(plugin, self._GATEKEEPER_ALLOW_ALL)
self.debug('IsStopping called with state=%s', self._state)
return self._state is STOPPING
def IsFlushing(self, plugin):
"""See PluginAPI.IsFlushing."""
self._AskGatekeeper(plugin, self._GATEKEEPER_ALLOW_ALL)
self.debug('IsFlushing called with state=%s', self._state)
if self._state is not FLUSHING:
return False
# Flushing may have already completed, despite the state not having left
# FLUSHING yet. Check on the flushing target manually.
flushing_target = self._flushing_target
if flushing_target:
completed_count, unused_total_count = self.GetProgress()
if completed_count >= flushing_target:
return False
return True
def Emit(self, plugin, events):
"""See PluginAPI.Emit."""
self._AskGatekeeper(plugin, self._GATEKEEPER_ALLOW_UP)
self.debug('Emit called with state=%s', self._state)
# TODO(kitching): Relocate the ProcessStage annotation into Core.
process_stage = datatypes.ProcessStage(
node_id=self._core_api.GetNodeID(),
time=time.time(),
plugin_id=self.plugin_id,
plugin_type=self.plugin_type,
target=datatypes.ProcessStage.BUFFER)
for event in events:
# Add the current step in this event's processing history.
event.AppendStage(process_stage)
return self._core_api.Emit(self, events)
def NewStream(self, plugin):
"""See PluginAPI.NewStream."""
self._AskGatekeeper(plugin, self._GATEKEEPER_ALLOW_UP_PAUSING_STOPPING)
self.debug('NewStream called with state=%s', self._state)
buffer_stream = self._core_api.NewStream(self)
plugin_stream = datatypes.EventStream(plugin, self)
self._event_stream_map[plugin_stream] = buffer_stream
return plugin_stream
def EventStreamNext(self, plugin, plugin_stream, timeout=1):
"""See PluginAPI.EventStreamNext."""
self._AskGatekeeper(plugin, self._GATEKEEPER_ALLOW_UP)
self.debug('EventStreamNext called with state=%s', self._state)
if plugin_stream not in self._event_stream_map:
raise plugin_base.UnexpectedAccess
ret = self._NextMatchingEvent(plugin_stream, timeout)
if ret:
# TODO(kitching): Relocate the ProcessStage annotation into Core.
process_stage = datatypes.ProcessStage(
node_id=self._core_api.GetNodeID(),
time=time.time(),
plugin_id=self.plugin_id,
plugin_type=self.plugin_type,
target=datatypes.ProcessStage.EXTERNAL)
ret.AppendStage(process_stage)
return ret
def _NextMatchingEvent(self, plugin_stream, timeout):
"""Retrieves the next event matching the plugin's FlowPolicy.
Args:
plugin_stream: A stream of events for an output plugin to process.
timeout: Seconds to wait for retrieving next event.
Returns:
None if timeout or no events are available.
"""
try:
def CheckEvent(event):
return event is None or self._policy.MatchEvent(event)
return sync_utils.PollForCondition(
poll_method=self._event_stream_map[plugin_stream].Next,
condition_method=CheckEvent,
timeout_secs=timeout,
poll_interval_secs=0)
except type_utils.TimeoutError:
return None
def EventStreamCommit(self, plugin, plugin_stream):
"""See PluginAPI.EventStreamCommit."""
self._AskGatekeeper(plugin, self._GATEKEEPER_ALLOW_UP_PAUSING_STOPPING)
self.debug('EventStreamCommit called with state=%s', self._state)
self._RecordUnexpectedAccess(plugin, 'EventStreamAbort', inspect.stack())
if plugin_stream not in self._event_stream_map:
raise plugin_base.UnexpectedAccess
return self._event_stream_map.pop(plugin_stream).Commit()
def EventStreamAbort(self, plugin, plugin_stream):
"""See PluginAPI.EventStreamAbort."""
# TODO(kitching): Test in unittest.
self._AskGatekeeper(plugin, self._GATEKEEPER_ALLOW_UP_PAUSING_STOPPING)
self.debug('EventStreamAbort called with state=%s', self._state)
self._RecordUnexpectedAccess(plugin, 'EventStreamAbort', inspect.stack())
if plugin_stream not in self._event_stream_map:
raise plugin_base.UnexpectedAccess
# If no events were processed, use Commit() instead of Abort(). This
# accounts for the case where all events were skipped because of the
# FlowPolicy. If no "valid" events are ever encountered, the plugin's
# consumer will never advance through the buffer, which could cause it to
# grow without the possibility of truncation. Thus we force Commit() to
# make sure any events "hidden" by the FlowPolicy are committed.
if plugin_stream.GetCount() == 0:
return self._event_stream_map.pop(plugin_stream).Commit()
return self._event_stream_map.pop(plugin_stream).Abort()