blob: 3bfb5ee31512273930017dcdfea10bf41e9c2270 [file] [log] [blame]
# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""BigQuery table builder.
This program runs in one of the following two modes, depending on the
command-line arguments:
1. One-shot mode (if --one-shot option is given). The program runs a builder
for tables of the type specified by --table-type and exits. This mode is
usually invoked by a daemon described next, but is also useful for debugging.
2. Daemon mode (if --one-shot option is NOT given). The program starts as
a daemon process running forever. It periodically executes itself in one-shot
mode as subprocesses.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import contextlib
import datetime
import logging
import multiprocessing
import signal
import sys
from apscheduler.executors import pool as pool_executors
from apscheduler.schedulers import blocking as blocking_scheduler
from apscheduler.triggers import interval as interval_triggers
from chromite.lib import metrics
from chromite.lib import ts_mon_config
import subprocess32
from ci_results_archiver import archive_builder_factory
from ci_results_archiver import config_loader
from ci_results_archiver import constants
from ci_results_archiver import table_types
from ci_results_archiver.utils import text_util
import pytz
_TS_MON_SERVICE_NAME = 'ci_results_archiver'
def _DaemonMain(run_soon, configs):
  """The entry point for a daemon process.

  Schedules one recurring builder job per table type and blocks forever,
  running the jobs on a thread pool.

  Args:
    run_soon: If True, start builders soon.
    configs: Configuration dictionary.
  """
  # A ThreadPoolExecutor is used instead of ProcessPoolExecutor because the
  # multiprocessing module underlying the latter easily deadlocks on signals.
  thread_pool = pool_executors.ThreadPoolExecutor(
      max_workers=len(table_types.TableType))
  scheduler = blocking_scheduler.BlockingScheduler(
      executors={'default': thread_pool})

  if run_soon:
    # Start jobs in 3 seconds.
    first_run = datetime.datetime.now() + datetime.timedelta(seconds=3)
  else:
    # Anchor the schedule at the UNIX epoch so that jobs start in
    # consistent periods.
    first_run = datetime.datetime.fromtimestamp(0, pytz.utc)

  scheduler_configs = configs['scheduler']
  interval = text_util.ParseTimeDelta(scheduler_configs['interval'])
  timeout = text_util.ParseTimeDelta(scheduler_configs['timeout'])
  trigger = interval_triggers.IntervalTrigger(
      seconds=interval.total_seconds(), start_date=first_run)

  for table_type in table_types.TableType:
    scheduler.add_job(
        name=table_type.value,
        func=_RunBuilder,
        kwargs={'table_type': table_type, 'timeout': timeout},
        trigger=trigger,
        max_instances=1)

  try:
    scheduler.start()
  except KeyboardInterrupt:
    logging.info('Received a signal. Exiting.')
def _RunBuilder(table_type, timeout):
  """Spawns a subprocess to run a builder for the specified tables.

  The subprocess re-executes this very script, with flags appended to
  switch it into one-shot mode for the given table type.

  Args:
    table_type: TableType value.
    timeout: Timeout in datetime.timedelta.

  Raises:
    subprocess.CalledProcessError: When a subprocess exited with errors.
  """
  one_shot_args = ['--one-shot', '--table-type=%s' % table_type.value]
  builder_argv = [sys.executable] + sys.argv + one_shot_args
  subprocess32.check_call(builder_argv, timeout=timeout.total_seconds())
def _BuilderMain(table_type, configs):
  """The entry point for builder processes.

  Args:
    table_type: TableType to build tables of.
    configs: Configuration dictionary.
  """
  metric_prefix = '%s/builder/%s' % (constants.METRIC_BASE, table_type.value)
  # Record success/failure of this build run as a ts_mon metric.
  with metrics.SuccessCounter('%s/tick' % metric_prefix):
    archive_builder_factory.CreateBuilder(
        table_type=table_type, configs=configs).Run()
@contextlib.contextmanager
def _SetupTsMon(short_lived):
  """Sets up ts_mon and cleans it up on exit.

  Args:
    short_lived: A boolean indicating whether this process is short-lived.
  """
  # Long-lived processes rely on auto-flush; short-lived processes flush
  # manually on exit instead.
  auto_flush = not short_lived
  ts_mon_state = ts_mon_config.SetupTsMonGlobalState(
      _TS_MON_SERVICE_NAME, short_lived=short_lived, auto_flush=auto_flush)
  with ts_mon_state:
    try:
      yield
    finally:
      if short_lived:
        metrics.Flush()
def _SetupLogging(verbose, process_name):
  """Sets up logging.

  Args:
    verbose: If True, turns on verbose debug logging.
    process_name: Process name to show in log lines.
  """
  # Rename the current process so %(processName)s in the format below shows
  # which role (scheduler or a table builder) emitted each line.
  multiprocessing.current_process().name = process_name
  log_level = logging.DEBUG if verbose else logging.INFO
  log_format = ('%(levelname)s %(asctime)-15s %(processName)s '
                '[%(filename)s:%(lineno)d] %(message)s')
  logging.basicConfig(
      level=log_level, format=log_format, datefmt='%Y-%m-%d %H:%M:%S')
def _ParseArgs():
  """Parses command line arguments.

  Exits the process with an error when an invalid option combination is
  given.

  Returns:
    argparse.Namespace with the parsed options.
  """
  parser = argparse.ArgumentParser(
      prog='ci_results_archiver',
      description=__doc__,
      formatter_class=argparse.RawDescriptionHelpFormatter)
  parser.add_argument(
      '-v', '--verbose', action='store_true', help='Enable verbose logging.')
  parser.add_argument(
      '-c',
      '--config',
      dest='config_path',
      metavar='PATH',
      required=True,
      help='Path to a config file.')
  parser.add_argument(
      '--run-soon',
      action='store_true',
      help='Runs jobs soon after the start of the script.')
  parser.add_argument(
      '--one-shot', action='store_true', help='Runs in one-shot mode.')
  parser.add_argument(
      '--table-type',
      type=table_types.TableType,
      metavar='TYPE',
      help='If this option is set, the program builds tables of the specified '
      'type and exits.')
  options = parser.parse_args()

  # --table-type is mandatory in one-shot mode and forbidden in daemon mode.
  if options.one_shot and not options.table_type:
    logging.error('--table-type must be set in one-shot mode')
    sys.exit(1)
  if not options.one_shot and options.table_type:
    logging.error('--table-type must not be set in daemon mode')
    sys.exit(1)
  return options
def main():
  """The entry point."""
  # Translate SIGTERM into KeyboardInterrupt so that cleanup code (context
  # managers, scheduler shutdown) gets a chance to run.
  signal.signal(signal.SIGTERM, signal.default_int_handler)

  options = _ParseArgs()
  configs = config_loader.Load(options.config_path)

  if options.table_type:
    process_name = options.table_type.value
  else:
    process_name = 'scheduler'
  _SetupLogging(verbose=options.verbose, process_name=process_name)

  with _SetupTsMon(short_lived=options.one_shot):
    if options.one_shot:
      _BuilderMain(table_type=options.table_type, configs=configs)
    else:
      _DaemonMain(run_soon=options.run_soon, configs=configs)
# Script entry point; the daemon re-executes this file in one-shot mode.
if __name__ == '__main__':
  main()