blob: 619208e5b49d8ef8a899a67eba97590b0b53a9a7 [file] [log] [blame]
# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from __future__ import print_function
# Fix for bug b/30904731: Import _strptime manually. Otherwise,
# threads may initially raise the exception `AttributeError: _strptime`.
import _strptime # pylint: disable=unused-import
import logging
import os
import threading
import time
import instalog_common # pylint: disable=unused-import
from instalog import flow_policy
from instalog import json_utils
from instalog import plugin_base
from instalog import plugin_sandbox
# pylint: disable=no-name-in-module
from instalog.external.jsonrpclib import SimpleJSONRPCServer
# Possible daemon states.
# An Instalog object starts out DOWN.  Run() moves it to STARTING, and then to
# UP once no plugin reports the plugin_sandbox.STARTING state any more.
# Stop() moves it to STOPPING while plugins and the buffer are being shut
# down, and back to DOWN when the shutdown has finished.
STARTING = 'STARTING'
UP = 'UP'
STOPPING = 'STOPPING'
DOWN = 'DOWN'
class Instalog(plugin_sandbox.CoreAPI):
def __init__(self, node_id, data_dir, cli_hostname, cli_port, buffer_plugin,
             input_plugins=None, output_plugins=None):
  """Constructor.

  Creates the sandboxes for the buffer, input and output plugins, then
  starts the CLI JSON-RPC server on a background thread.

  Args:
    node_id: ID of this Instalog node.
    data_dir: Path to Instalog's state directory.  Plugin state directories
              will be stored here.
    cli_hostname: Hostname used for the CLI RPC server.
    cli_port: Port used for the CLI RPC server.
    buffer_plugin: Configuration dict for the buffer plugin.  Keys should
                   consist of:
                   - plugin: Required, plugin module name.
                   - args: Optional, defines plugin arguments.
    input_plugins: Dict mapping input plugin ID to its configuration dict.
                   Configuration dicts should be the same format as
                   that of buffer_plugin, with the addition of:
                   - targets: Optional, defines target plugins.
                   Defaults to no input plugins.
    output_plugins: Dict mapping output plugin ID to its configuration
                    dict:
                    - plugin: Required, plugin module name.
                    - args: Optional, defines plugin arguments.
                    - allow: Optional, defines flow policy allow rules.
                    - deny: Optional, defines flow policy deny rules.
                    - targets: Optional, defines target plugins.
                    Defaults to no output plugins.

  The plugin configuration dicts may be modified in place while they are
  being processed (for example, `targets` entries are removed).
  """
  self._rpc_lock = threading.Lock()
  self._state = DOWN

  # Store the node ID.
  self._node_id = node_id

  # Plugin stores and plugin data directories live underneath this path.
  self._data_dir = data_dir

  # The plugin maps are iterated and indexed below, so the None defaults
  # must be replaced by empty dicts before use.
  if input_plugins is None:
    input_plugins = {}
  if output_plugins is None:
    output_plugins = {}

  # Create plugin sandboxes.
  self._PreprocessConfigEntries(input_plugins, output_plugins)
  self._buffer = self._ConfigEntryToSandbox(
      plugin_base.BufferPlugin, 'buffer', buffer_plugin)
  self._plugins = {}
  self._plugins.update(self._ConfigEntriesToSandboxes(
      plugin_base.InputPlugin, input_plugins))
  self._plugins.update(self._ConfigEntriesToSandboxes(
      plugin_base.OutputPlugin, output_plugins))

  # Start the RPC server.  Only the functions registered here can be
  # called through the CLI RPC interface.
  self._rpc_server = SimpleJSONRPCServer.SimpleJSONRPCServer(
      (cli_hostname, cli_port))
  self._rpc_server.register_function(self.IsUp)
  self._rpc_server.register_function(self.Stop)
  self._rpc_server.register_function(self.Inspect)
  self._rpc_server.register_function(self.Flush)
  self._rpc_server.register_function(self.GetAllProgress)
  self._rpc_thread = threading.Thread(target=self._rpc_server.serve_forever)
  self._rpc_thread.start()
def _ShutdownRPCServer(self):
  """Shuts down and closes the CLI RPC server on a separate thread.

  This method only starts the thread; it does not wait for the shutdown to
  complete.

  NOTE(review): presumably a separate thread is used because the server's
  shutdown() blocks until serve_forever() has returned -- confirm against
  the SimpleJSONRPCServer implementation.
  """
  def _ShutdownAndClose():
    rpc_server = self._rpc_server
    rpc_server.shutdown()
    rpc_server.server_close()

  threading.Thread(target=_ShutdownAndClose).start()
def _PreprocessConfigEntries(self, input_plugins, output_plugins):
  """Preprocesses config entries to allow the "targets" argument.

  Every `targets` entry is removed from its plugin configuration and turned
  into a `history` allow rule on the targeted output plugin.  The
  configuration dicts are modified in place.

  Args:
    input_plugins: Dict mapping input plugin ID to its configuration dict.
    output_plugins: Dict mapping output plugin ID to its configuration dict.

  Raises:
    ValueError if a plugin ID is used by both an input and an output plugin.
    ConfigError if a target refers to a non-existent output plugin, or if an
    output plugin ends up without any allow rule.
  """
  # Ensure that plugin IDs don't overlap across input and output.
  overlapping_ids = [plugin_id for plugin_id in input_plugins
                     if plugin_id in output_plugins]
  if overlapping_ids:
    raise ValueError(
        'Plugin IDs used by both input and output plugins: %s'
        % ', '.join(sorted(str(plugin_id) for plugin_id in overlapping_ids)))

  # Next, convert 'targets' entries to corresponding allow policy rule.
  # Only the inner configuration dicts are mutated while iterating, never
  # the plugin maps themselves, so iterating over items() is safe.
  for plugins in (input_plugins, output_plugins):
    for plugin_id, plugin_config in plugins.items():
      if 'targets' not in plugin_config:
        continue
      targets = plugin_config.pop('targets')
      # A single target may be given without wrapping it in a list.
      if not isinstance(targets, list):
        targets = [targets]
      for target in targets:
        if target not in output_plugins:
          raise plugin_base.ConfigError(
              'Non-existent target output plugin ID `%s\' referenced in '
              'plugin `%s\' config' % (target, plugin_id))
        target_allow = output_plugins[target].setdefault('allow', [])
        target_allow.append({'rule': 'history',
                             'plugin_id': plugin_id,
                             'position': -1})

  # Ensure that all output plugins have at least one event source.
  for plugin_id, plugin_config in output_plugins.items():
    if not plugin_config.get('allow'):
      raise plugin_base.ConfigError(
          'No plugin is targetting output plugin `%s\'.  Please (1) disable '
          'this plugin, (2) add allow/deny rules, or (3) configure '
          '`targets\' of another plugin to point to it.' % plugin_id)
def _ConfigEntryToSandbox(self, superclass, plugin_id, config):
  """Parses configuration for a particular plugin entry.

  Also creates the plugin's data directory underneath Instalog's data
  directory if it does not exist yet.

  Args:
    superclass: Plugin base class, passed on to the PluginSandbox as its
                `superclass` argument.
    plugin_id: ID of the plugin.  Also used to name the plugin's store file
               and data directory.
    config: Configuration dict of the plugin.  Recognized keys are `plugin`
            (required), `args`, `allow`, `deny` and `enable_recursion`.
            The dict itself is left unmodified.

  Returns:
    PluginSandbox object representing the plugin.

  Raises:
    ConfigError if the config dict does not include the plugin module to
    load, or if it contains keys that are not recognized.
  """
  # The plugin type is included along with its configuration.  Extract it.
  if not isinstance(config, dict) or 'plugin' not in config:
    raise plugin_base.ConfigError(
        'Plugin %s must have a config dictionary which includes the key '
        '`plugin` to specify which plugin module to load' % plugin_id)

  # Consume the keys from a shallow copy, so that the caller's dict is not
  # emptied as a side effect of parsing it.
  config = dict(config)
  plugin_type = config.pop('plugin')
  allow = config.pop('allow', [])
  deny = config.pop('deny', [])
  args = config.pop('args', {})

  # Disallow recursion by default.  Any events emitted by a plugin should
  # never be processed by that plugin again.
  enable_recursion = config.pop('enable_recursion', False)

  # Whatever is left over at this point is an unrecognized key.
  if config:
    raise plugin_base.ConfigError(
        'Plugin %s has extra arguments: %s' % (plugin_id, ', '.join(config)))

  # Create FlowPolicy object.
  policy = flow_policy.FlowPolicy(allow, deny)
  if not enable_recursion:
    policy.deny.append(
        flow_policy.HistoryRule(plugin_id=plugin_id,
                                node_id=self._node_id))

  # Make sure we have a store_path and data_dir for the plugin.
  store_path = os.path.join(self._data_dir, '%s.json' % plugin_id)
  data_dir = os.path.join(self._data_dir, plugin_id)
  if not os.path.exists(data_dir):
    os.makedirs(data_dir)

  return plugin_sandbox.PluginSandbox(
      plugin_type=plugin_type,
      plugin_id=plugin_id,
      superclass=superclass,
      config=args,
      policy=policy,
      store_path=store_path,
      data_dir=data_dir,
      core_api=self)
def _ConfigEntriesToSandboxes(self, superclass, entries):
  """Creates a plugin sandbox for every entry of a plugin configuration map.

  Args:
    superclass: Plugin base class handed to _ConfigEntryToSandbox for every
                entry.
    entries: Dict mapping plugin ID to the plugin's configuration dict.

  Returns:
    Dict mapping plugin ID to the object returned by _ConfigEntryToSandbox
    for that plugin.
  """
  plugins = {}
  # items() rather than iteritems(): the latter only exists on Python 2.
  for plugin_id, plugin_config in entries.items():
    plugins[plugin_id] = self._ConfigEntryToSandbox(
        superclass=superclass,
        plugin_id=plugin_id,
        config=plugin_config)
  return plugins
def _StartBuffer(self):
  """Starts the buffer plugin, then syncs its consumer list with ours.

  NOTE(review): the True passed to Start() is presumably a "wait until
  started" flag -- confirm against PluginSandbox.Start.
  """
  buffer_sandbox = self._buffer
  buffer_sandbox.Start(True)
  self._SyncConsumerList()
def _SyncConsumerList(self):
  """Synchronizes consumer list with buffer.

  The consumers we expect are the IDs of all output plugins.  Consumers
  which the buffer knows about but we do not are removed from the buffer;
  consumers which we know about but the buffer does not are added to it.
  """
  consumers = []
  for sandbox in self._plugins.values():
    if sandbox.GetSuperclass() is plugin_base.OutputPlugin:
      consumers.append(sandbox.plugin_id)
  buffer_consumers = self._buffer.CallPlugin('ListConsumers').keys()

  logging.info('Syncing consumer lists')
  logging.debug('Our consumer list: %s', consumers)
  logging.debug('Buffer consumer list: %s', buffer_consumers)

  # Drop the consumers that no longer correspond to an output plugin.
  for consumer_id in buffer_consumers:
    if consumer_id in consumers:
      continue
    self._buffer.CallPlugin('RemoveConsumer', consumer_id)

  # Register the output plugins that the buffer has not seen yet.
  for consumer_id in consumers:
    if consumer_id in buffer_consumers:
      continue
    self._buffer.CallPlugin('AddConsumer', consumer_id)
def Run(self):
  """Runs Instalog until it is stopped.

  Starts the buffer and all plugins, then polls once per second: every
  plugin's state is advanced and state changes are logged.  The loop ends
  when Instalog's state becomes STOPPING or DOWN.  Exceptions are logged
  instead of being propagated, and Stop is always called synchronously
  before this method returns.
  """
  try:
    self._state = STARTING
    self._Start()

    plugin_states = {}
    for plugin in self._plugins.values():
      plugin_states[plugin] = plugin.GetState()

    while self._state not in (STOPPING, DOWN):
      # If Instalog is just starting, check to see that all plugins have left
      # the STARTING state.  When this occurs, Instalog's state should change
      # to UP.
      if (self._state is STARTING and
          all(state is not plugin_sandbox.STARTING
              for state in plugin_states.values())):
        self._state = UP

      for plugin in self._plugins.values():
        plugin.AdvanceState()
        # Read the state once, so that the comparison, the log message and
        # the recorded state all refer to the same value.
        new_state = plugin.GetState()
        if plugin_states[plugin] != new_state:
          logging.info('Plugin %s changed state from %s to %s',
                       plugin.plugin_id, plugin_states[plugin], new_state)
          plugin_states[plugin] = new_state
      time.sleep(1)
  except Exception as e:
    logging.exception(e)
    # Stop(sync=True) waits for Instalog to leave the STARTING state, and
    # the loop above is the only place where that transition happens.  If
    # we failed before reaching UP, leave STARTING here; otherwise the Stop
    # call below would wait forever.  IsUp() may briefly report True until
    # Stop takes over.
    if self._state is STARTING:
      self._state = UP

  # In case there was some error in the Run function (exception or otherwise),
  # call Stop synchronously at the end just in case.
  self.Stop(True)
  logging.warning('Stopped')
def _Start(self):
  """Starts the buffer, followed by all input and output plugins.

  Start() is first called on every plugin; only afterwards is each plugin's
  state advanced with AdvanceState(True) and its start logged.
  """
  logging.info('Starting buffer...')
  self._StartBuffer()
  logging.info('Started buffer')

  # First pass: request the start of every plugin.
  for sandbox in self._plugins.values():
    logging.info('Starting %s...', sandbox.plugin_id)
    sandbox.Start()

  # Second pass: advance each plugin's state and report it as started.
  for sandbox in self._plugins.values():
    sandbox.AdvanceState(True)
    logging.info('Started %s', sandbox.plugin_id)
def IsUp(self):
  """Returns whether Instalog is currently in the UP state.

  Registered as an RPC function.  The state is read while holding the RPC
  lock.
  """
  with self._rpc_lock:
    currently_up = self._state is UP
  return currently_up
def Stop(self, sync=False):
  """Stops Instalog.

  Stops all loaded plugins, then the buffer, marks Instalog as DOWN and
  finally shuts down the RPC server.  Registered as an RPC function.

  Args:
    sync: If true, only returns when Instalog has stopped running.  If
          false, the stop is performed on a new thread and this call returns
          immediately.
  """
  # If called in asynchronous mode, kick off a thread to perform the stop.
  if not sync:
    threading.Thread(target=self.Stop, args=(True,)).start()
    return

  # NOTE(review): as laid out here the RPC lock is held for the whole
  # shutdown, including the sleeps below, so other RPC functions which take
  # the lock block until the shutdown is done -- confirm that this is the
  # intended lock scope.
  with self._rpc_lock:
    # If Instalog is still starting up, wait for it to finish.
    # Nothing in this method moves the state out of STARTING; this loop
    # relies on another thread (the one executing Run) doing so.
    while self._state is STARTING:
      time.sleep(0.5)

    # Check for _state here just in case of multiple Stop calls.
    if self._state is STOPPING:
      while self._state is not DOWN:
        time.sleep(0.5)
    # Already stopped (or never started): nothing left to do.
    if self._state is DOWN:
      return
    self._state = STOPPING

    # Request the stop of every loaded plugin first...
    for plugin in self._plugins.values():
      if plugin.IsLoaded():
        logging.info('Stopping %s...', plugin.plugin_id)
        plugin.Stop()

    # ...then advance each plugin's state and report it as stopped.
    for plugin in self._plugins.values():
      plugin.AdvanceState(True)
      logging.info('Stopped %s', plugin.plugin_id)

    # The buffer is stopped last, after all plugins.
    logging.info('Stopping buffer...')
    self._buffer.Stop(True)
    logging.info('Stopped buffer')

    self._state = DOWN
    self._ShutdownRPCServer()
def Inspect(self, plugin_id, json_path):
  """Returns JSON-encoded data from the store of the given plugin.

  Registered as an RPC function.

  Args:
    plugin_id: ID of the plugin whose store should be inspected.
    json_path: Path handed to json_utils.WalkJSONPath in order to select the
               part of the store to return.

  Returns:
    A tuple (success, value).  On success, value is the JSON encoding of the
    data found at json_path.  Otherwise, value is an error message.
  """
  with self._rpc_lock:
    if plugin_id not in self._plugins:
      return False, 'Plugin `%s\' not found' % plugin_id
    try:
      store_data = self._plugins[plugin_id].store
      return True, json_utils.JSONEncoder().encode(
          json_utils.WalkJSONPath(json_path, store_data))
    except Exception as e:
      # Format the exception itself instead of reading `e.message`: that
      # attribute is deprecated, is empty for exceptions created with
      # several arguments, and does not exist on Python 3 -- where reading
      # it would raise from within this handler instead of returning the
      # error to the RPC caller.
      return False, ('Error on inspect with JSON path `%s\': %s'
                     % (json_path, e))
def Flush(self, plugin_id, timeout):
  """Flushes the given output plugin.

  Registered as an RPC function.

  Args:
    plugin_id: ID of the output plugin to flush.
    timeout: Timeout handed to the plugin sandbox's Flush call.

  Returns:
    A tuple (success, message).  The message of a performed flush includes
    the first two elements of the plugin's progress, as reported after the
    flush.
  """
  with self._rpc_lock:
    try:
      sandbox = self._plugins[plugin_id]
    except KeyError:
      return False, 'Plugin `%s\' not found' % plugin_id

    if sandbox.GetSuperclass() is not plugin_base.OutputPlugin:
      return False, ('Only output plugins can be flushed (tried: `%s\')'
                     % plugin_id)

    # The progress is read after the flush attempt, whatever its outcome.
    flushed = sandbox.Flush(timeout, True)
    progress = sandbox.GetProgress()
    summary = (plugin_id, progress[0], progress[1])
    if flushed:
      return True, ('Flushed `%s\' successfully (%d / %d events)'
                    % summary)
    return False, ('Flush for `%s\' failed within the specified timeout '
                   '(%d / %d events)'
                   % summary)
def GetAllProgress(self, details=0):
  """Returns the result of the buffer plugin's `ListConsumers` call.

  Registered as an RPC function.

  Args:
    details: Passed through unchanged as the argument of `ListConsumers`.
  """
  buffer_sandbox = self._buffer
  return buffer_sandbox.CallPlugin('ListConsumers', details)
############################################################
# Functions below implement plugin_sandbox.CoreAPI.
############################################################
def Emit(self, plugin, events):
  """Hands events emitted by a plugin over to the buffer.

  Args:
    plugin: PluginSandbox object of plugin performing Emit.  Not used by
            this implementation.
    events: List of events to be emitted.

  Returns:
    The result of the buffer plugin's `Produce` call, which is expected to
    be True if successful, False if any failure occurred.

  Raises:
    PluginCallError if Buffer fails unexpectedly.
  """
  buffer_sandbox = self._buffer
  return buffer_sandbox.CallPlugin('Produce', events)
def NewStream(self, plugin):
  """Requests a new event stream from the buffer for the given plugin.

  Args:
    plugin: PluginSandbox object requesting BufferEventStream.  Its
            plugin_id is used as the argument of the buffer's `Consume`
            call.

  Returns:
    The result of the buffer plugin's `Consume` call, which is expected to
    be an object implementing plugin_base.BufferEventStream.

  Raises:
    PluginCallError if Buffer fails unexpectedly.
  """
  buffer_sandbox = self._buffer
  return buffer_sandbox.CallPlugin('Consume', plugin.plugin_id)
def GetProgress(self, plugin):
  """Returns the current progress through buffer for the specified plugin.

  Args:
    plugin: PluginSandbox object whose progress is requested.  Its
            plugin_id is used to look up the entry in the result of the
            buffer's `ListConsumers` call.

  Returns:
    The `ListConsumers` entry of the plugin, which is expected to be a
    tuple (completed_count, total_count) representing how many Events have
    been processed so far, and how many exist in total.

  Raises:
    PluginCallError if Buffer fails unexpectedly.
  """
  progress_by_consumer = self._buffer.CallPlugin('ListConsumers')
  return progress_by_consumer[plugin.plugin_id]
def GetNodeID(self):
  """Returns the ID of this Instalog node.

  Returns:
    The node ID stored on this object, which is expected to be a string.
  """
  node_id = self._node_id
  return node_id