#!/usr/bin/env python
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Minijack is a real-time log converter for on-site factory log analysis.
It runs in the same device of the shopfloor service and keeps monitoring
the event log directory. When new logs come, it converts these event logs
and dumps them to a database, such that factory engineers can easily analyse
these logs using SQL queries.
This file starts a Minijack process which services forever until an user
presses Ctrl-C to terminate it. To use it, invoke as a standalone program:
./main.py [options]
"""

import logging
import multiprocessing
import optparse
import os
import signal
import sys
import time

import factory_common  # pylint: disable=W0611
from cros.factory.test import utils

import minijack_common  # pylint: disable=W0611
import db
from datatypes import EventPacket
from workers import FileScanner, IdentityWorker, EventLoadingWorker

SHOPFLOOR_DATA_DIR = 'shopfloor_data'
EVENT_LOG_DB_FILE = 'event_log_db'
MINIJACK_DB_FILE = 'minijack_db'
DEFAULT_WATCH_INTERVAL = 30 # seconds
DEFAULT_JOB_NUMBER = 6
DEFAULT_QUEUE_SIZE = 10
DEFAULT_NUM_LOG_PER_CALLBACK = 10


class EventSinker(object):
  """Event sinker which invokes the proper exporters to sink events to the database.

  TODO(waihong): Unit tests.

  Properties:
    _database: The database object.
    _all_exporters: A list of all registered exporters.
    _event_invokers: A dict of lists, where the event id is the key and the
        list of handler functions is the value.
  """

  def __init__(self, database):
    self._database = database
    self._all_exporters = []
    self._event_invokers = {}
    self.RegisterDefaultExporters()

  def RegisterDefaultExporters(self):
    """Registers the default exporters."""
    # Find all exporter modules named xxx_exporter.
    exporter_pkg = __import__('exporters')
    for exporter_name in dir(exporter_pkg):
      if exporter_name.endswith('_exporter'):
        exporter_module = getattr(exporter_pkg, exporter_name)
        # Class name conversion: XxxExporter.
        class_name = ''.join(
            [s.capitalize() for s in exporter_name.split('_')])
        exporter_class = getattr(exporter_module, class_name)
        exporter = exporter_class(self._database)
        # Register the exporter instance.
        self.RegisterExporter(exporter)

  def RegisterExporter(self, exporter):
    """Registers an exporter object."""
    logging.debug('Register the exporter: %s', exporter)
    self._all_exporters.append(exporter)
    # Search all Handle_xxx() methods in the exporter instance.
    for handler_name in dir(exporter):
      if handler_name.startswith('Handle_'):
        event_id = handler_name.split('_', 1)[1]
        # Create a new list if not present.
        if event_id not in self._event_invokers:
          self._event_invokers[event_id] = []
        # Add the handler function to the list.
        handler_func = getattr(exporter, handler_name)
        self._event_invokers[event_id].append(handler_func)
    logging.debug('Call the setup method of the exporter: %s', exporter)
    exporter.Setup()
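
  # A minimal sketch of an exporter that follows the conventions above; the
  # module name, class name, and event id are hypothetical, for illustration:
  #
  #   # exporters/device_exporter.py
  #   class DeviceExporter(object):
  #     def __init__(self, database):
  #       self._database = database
  #
  #     def Setup(self):
  #       pass  # e.g. prepare the tables this exporter writes to
  #
  #     def Handle_all(self, packet):
  #       pass  # invoked for every event packet
  #
  #     def Handle_scan(self, packet):
  #       pass  # invoked only for events whose EVENT field is 'scan'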

  def SinkEventStream(self, stream):
    """Sinks the given event stream."""
    start_time = time.time()
    for event in stream:
      packet = EventPacket(stream.metadata, stream.preamble, event)
      self.SinkEventPacket(packet)
    logging.info('Sank to database (%s, %d events, %.3f sec)',
                 stream.metadata.get('log_name'),
                 len(stream),
                 time.time() - start_time)

  def SinkEventPacket(self, packet):
    """Sinks the given event packet."""
    # Event id 'all' is a special case, which means the handlers accept
    # all kinds of events.
    for event_id in ('all', packet.event['EVENT']):
      invokers = self._event_invokers.get(event_id, [])
      for invoker in invokers:
        try:
          invoker(packet)
        except:  # pylint: disable=W0702
          logging.exception('Error on invoking the exporter: %s',
                            utils.FormatExceptionOnly())


class Minijack(object):
  """The main Minijack flow.

  TODO(waihong): Unit tests.

  Properties:
    _database: The database object.
    _file_scanner: The file scanner object.
    _worker_processes: A list of worker processes.
    _event_blob_queue: The queue storing event blobs.
    _event_stream_queue: The queue storing event streams.
  """

  def __init__(self):
    self._database = None
    self._file_scanner = None
    self._worker_processes = []
    self._event_blob_queue = None
    self._event_stream_queue = None

  def Init(self):
    """Initializes Minijack."""
    # Ignore Ctrl-C for all processes. The main process will be changed later.
    # We don't want Ctrl-C to break the sub-processes' work. The terminations
    # of sub-processes are controlled by the main process.
    signal.signal(signal.SIGINT, signal.SIG_IGN)

    # Pick the default event log dir depending on factory run or chroot run.
    event_log_dir = SHOPFLOOR_DATA_DIR
    if not os.path.exists(event_log_dir) and (
        'CROS_WORKON_SRCROOT' in os.environ):
      event_log_dir = os.path.join(
          os.environ['CROS_WORKON_SRCROOT'],
          'src', 'platform', 'factory', 'shopfloor_data')

    # TODO(waihong): Add more options for customization.
    # TODO(waihong): Use hacked_argparse.py which is helpful for args parsing.
    parser = optparse.OptionParser()
    parser.add_option('--event_log_dir', dest='event_log_dir', type='string',
                      metavar='PATH', default=event_log_dir,
                      help='path of the event log dir (default: %default)')
    parser.add_option('--event_log_db', dest='event_log_db', type='string',
                      metavar='PATH', default=EVENT_LOG_DB_FILE,
                      help='path of the event log db file (default: %default)')
    parser.add_option('--minijack_db', dest='minijack_db', type='string',
                      metavar='PATH', default=MINIJACK_DB_FILE,
                      help='path of the Minijack db file (default: %default)')
    parser.add_option('--log', dest='log', type='string', metavar='PATH',
                      help='write log to this file instead of stderr')
    parser.add_option('-i', '--interval', dest='interval', type='int',
                      default=DEFAULT_WATCH_INTERVAL,
                      help='file scanning interval in sec (default: %default)')
    parser.add_option('-j', '--jobs', dest='jobs', type='int',
                      default=DEFAULT_JOB_NUMBER,
                      help='jobs to load events in parallel (default: %default)')
    parser.add_option('-s', '--queue_size', dest='queue_size', type='int',
                      metavar='SIZE', default=DEFAULT_QUEUE_SIZE,
                      help='max size of the queue (default: %default)')
    parser.add_option('-v', '--verbose', action='count', dest='verbose',
                      help='increase message verbosity')
    parser.add_option('-q', '--quiet', action='store_true', dest='quiet',
                      help='turn off verbose messages')
    (options, args) = parser.parse_args()
    if args:
      parser.error('Invalid args: %s' % ' '.join(args))
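
    # No -v keeps the default INFO level, one -v enables DEBUG, and two or
    # more -v fall through to NOTSET, which disables level filtering entirely.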
    verbosity_map = {0: logging.INFO,
                     1: logging.DEBUG}
    verbosity = verbosity_map.get(options.verbose or 0, logging.NOTSET)
    log_format = '%(asctime)s %(levelname)s '
    if options.verbose > 0:
      log_format += '(%(filename)s:%(lineno)d) '
    log_format += '%(message)s'
    log_config = {'level': verbosity,
                  'format': log_format}
    if options.log:
      log_config.update({'filename': options.log})
    logging.basicConfig(**log_config)

    if options.quiet:
      logging.disable(logging.INFO)

    if not os.path.exists(options.event_log_dir):
      logging.error('Event log directory "%s" does not exist\n',
                    options.event_log_dir)
      parser.print_help()
      sys.exit(os.EX_NOINPUT)

    if options.jobs < 1:
      logging.error('Job number should be larger than or equal to 1.\n')
      parser.print_help()
      sys.exit(os.EX_NOINPUT)
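
    # Worker pipeline (each stage runs in its own process):
    #   FileScanner --(event blobs)--> EventLoadingWorker x jobs
    #     --(event streams)--> IdentityWorker --> EventSinker --> database
    # The two JoinableQueues below connect the stages.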

    # TODO(waihong): Study the performance impact of the queue max size.
    maxsize = options.queue_size
    self._event_blob_queue = multiprocessing.JoinableQueue(maxsize)
    self._event_stream_queue = multiprocessing.JoinableQueue(maxsize)

    logging.debug('Init file scanner, interval = %d', options.interval)
    self._file_scanner = FileScanner(
        scan_dir=options.event_log_dir,
        scan_db_file=options.event_log_db,
        scan_period_sec=options.interval)
    self._worker_processes.append(multiprocessing.Process(
        target=self._file_scanner,
        kwargs=dict(output_writer=self._event_blob_queue.put)))

    logging.debug('Init event loading workers, jobs = %d', options.jobs)
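    # Each loading worker turns raw event blobs from the blob queue into event
    # streams on the stream queue. The iter(get, None) idiom yields queue
    # items until a None sentinel is retrieved.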
    self._worker_processes.extend([multiprocessing.Process(
        target=EventLoadingWorker(options.event_log_dir),
        kwargs=dict(
            output_writer=self._event_stream_queue.put,
            input_reader=iter(self._event_blob_queue.get, None),
            input_done=self._event_blob_queue.task_done)
        ) for _ in range(options.jobs)])

    logging.debug('Init event sinking workers')
    self._database = db.Database(options.minijack_db)
    try:
      sinker = EventSinker(self._database)
    except db.DatabaseException as e:
      logging.exception('Error on initializing database: %s', str(e))
      sys.exit(os.EX_DATAERR)
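
    # The sink stage: an IdentityWorker whose output_writer is
    # sinker.SinkEventStream, so every event stream taken from the stream
    # queue is handed to the sinker, which writes it to the Minijack database.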
    self._worker_processes.append(multiprocessing.Process(
        target=IdentityWorker(),
        kwargs=dict(
            output_writer=sinker.SinkEventStream,
            input_reader=iter(self._event_stream_queue.get, None),
            input_done=lambda: (
                self._event_stream_queue.task_done(),
                # TODO(waihong): Move the queue monitoring to the main loop
                # such that it has better controls to create/terminate
                # processes.
                self.CheckQueuesEmpty()))))

  def Destroy(self):
    """Destroys Minijack."""
    logging.info('Stopping file scanner...')
    if self._file_scanner:
      self._file_scanner.Stop()
    logging.info('Emptying all queues...')
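    # JoinableQueue.join() returns only after every item put on the queue has
    # been marked done with task_done(), i.e. after all in-flight events have
    # been drained through the pipeline.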
    for queue in (self._event_blob_queue, self._event_stream_queue):
      if queue:
        queue.join()
    logging.info('Terminating all worker processes...')
    for process in self._worker_processes:
      if process and process.is_alive():
        process.terminate()
        process.join()
    if self._database:
      self._database.Close()
      self._database = None
    logging.info('Minijack is shut down gracefully.')

  def CheckQueuesEmpty(self):
    """Checks whether the queues are empty and logs that Minijack is idle."""
    if all((self._event_blob_queue.empty(), self._event_stream_queue.empty())):
      logging.info('Minijack is idle.')

  def Main(self):
    """The main Minijack logic."""
    self.Init()
    logging.debug('Start %d subprocesses.', len(self._worker_processes))
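    # daemon=True: the worker processes are killed automatically if the main
    # process exits without going through the normal shutdown path.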
    for process in self._worker_processes:
      process.daemon = True
      process.start()

    # Exit main process when receiving Ctrl-C or a default kill signal.
    signal_handler = lambda signum, frame: sys.exit(0)
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)
    signal.pause()


if __name__ == '__main__':
  minijack = Minijack()
  try:
    minijack.Main()
  finally:
    minijack.Destroy()