Add the entry point of ci_results_archiver.
BUG=chromium:733103
TEST=bin/run_lint && bin/run_tests && bin/run_yapf
TEST=bin/ci_results_archiver --config=config.yaml --run-soon
CQ-DEPEND=CL:683976
Change-Id: I535500bb4c1d6eda6e6d41d19afb382dec1282f0
Reviewed-on: https://chromium-review.googlesource.com/649937
Commit-Ready: Shuhei Takahashi <nya@chromium.org>
Tested-by: Shuhei Takahashi <nya@chromium.org>
Reviewed-by: Prathmesh Prabhu <pprabhu@chromium.org>
diff --git a/bin/ci_results_archiver b/bin/ci_results_archiver
new file mode 100755
index 0000000..0e39072
--- /dev/null
+++ b/bin/ci_results_archiver
@@ -0,0 +1,11 @@
+#!/bin/bash
+# Copyright 2017 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+#
+# The launch script of ci_results_archiver: runs the package in its venv.
+set -eu
+
+readonly bin_dir="$(readlink -e -- "$(dirname -- "$0")")"
+
+exec "${bin_dir}/python_venv" -m ci_results_archiver.main "$@"
diff --git a/ci_results_archiver/main.py b/ci_results_archiver/main.py
new file mode 100644
index 0000000..cd8fb68
--- /dev/null
+++ b/ci_results_archiver/main.py
@@ -0,0 +1,188 @@
+# Copyright 2017 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+"""BigQuery table builder.
+
+This program runs in one of following two modes depending on command line
+arguments:
+
+1. One-shot mode (if --one-shot option is given). The program runs a builder
+ for tables of the type specified by --table-type and exits. This mode is
+ usually invoked by a daemon described next, but is also useful for debugging.
+
+2. Daemon mode (if --one-shot option is NOT given). The program starts as
+ a daemon process running forever. It periodically executes itself in one-shot
+ mode as subprocesses.
+"""
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import argparse
+import datetime
+import logging
+import multiprocessing
+import signal
+import sys
+
+from apscheduler.executors import pool as pool_executors
+from apscheduler.schedulers import blocking as blocking_scheduler
+from apscheduler.triggers import interval as interval_triggers
+import subprocess32
+
+from ci_results_archiver import archive_builder_factory
+from ci_results_archiver import config_loader
+from ci_results_archiver import table_types
+from ci_results_archiver.utils import text_util
+
+import pytz
+
+
+def _DaemonMain(run_soon, configs):
+  """The entry point for a daemon process.
+
+  Args:
+    run_soon: If True, start builders soon.
+    configs: Configuration dictionary.
+  """
+  # We do not use ProcessPoolExecutor because its underlying multiprocessing
+  # module easily deadlocks on signals.
+  executor = pool_executors.ThreadPoolExecutor(
+      max_workers=len(table_types.TableType))
+  scheduler = blocking_scheduler.BlockingScheduler(
+      executors={'default': executor})
+  if run_soon:
+    # Start jobs in 3 seconds.
+    start_date = datetime.datetime.now() + datetime.timedelta(seconds=3)
+  else:
+    # Set the start date to the UNIX epoch so jobs start in consistent periods.
+    start_date = datetime.datetime.fromtimestamp(0, pytz.utc)
+  interval = text_util.ParseTimeDelta(configs['scheduler']['interval'])
+  trigger = interval_triggers.IntervalTrigger(
+      seconds=interval.total_seconds(), start_date=start_date)
+
+  timeout = text_util.ParseTimeDelta(configs['scheduler']['timeout'])
+
+  for table_type in table_types.TableType:
+    scheduler.add_job(
+        name=table_type.value,
+        func=_RunBuilder,
+        kwargs={'table_type': table_type, 'timeout': timeout},
+        trigger=trigger,
+        max_instances=1)
+
+  try:
+    scheduler.start()
+  except KeyboardInterrupt:
+    logging.info('Received a signal. Exiting.')
+
+
+def _RunBuilder(table_type, timeout):
+  """Spawns a subprocess to run a builder for the specified tables.
+
+  Args:
+    table_type: TableType value.
+    timeout: Timeout in datetime.timedelta.
+
+  Raises:
+    subprocess32.CalledProcessError: When a subprocess exited with errors.
+  """
+  builder_argv = (
+      [sys.executable] + sys.argv +
+      ['--one-shot', '--table-type=%s' % table_type.value])
+  subprocess32.check_call(builder_argv, timeout=timeout.total_seconds())
+
+
+def _BuilderMain(table_type, configs):
+  """The entry point for one-shot builder processes.
+
+  Args:
+    table_type: TableType to build tables of.
+    configs: Configuration dictionary.
+  """
+  builder = archive_builder_factory.CreateBuilder(
+      table_type=table_type, configs=configs)
+  builder.Run()
+
+
+def _SetupLogging(verbose, process_name):
+  """Sets up logging and names the current process for log output.
+
+  Args:
+    verbose: If True, turns on verbose debug logging.
+    process_name: Process name to show in log lines.
+  """
+  multiprocessing.current_process().name = process_name
+  logging.basicConfig(
+      level=(logging.DEBUG if verbose else logging.INFO),
+      format=('%(levelname)s %(asctime)-15s %(processName)s '
+              '[%(filename)s:%(lineno)d] %(message)s'),
+      datefmt='%Y-%m-%d %H:%M:%S')
+
+
+def _ParseArgs():
+  """Parses command line arguments.
+
+  Returns:
+    The parsed command line options as an argparse.Namespace.
+  """
+  parser = argparse.ArgumentParser(
+      prog='ci_results_archiver',
+      description=__doc__,
+      formatter_class=argparse.RawDescriptionHelpFormatter)
+
+  parser.add_argument(
+      '-v', '--verbose', action='store_true', help='Enable verbose logging.')
+  parser.add_argument(
+      '-c',
+      '--config',
+      dest='config_path',
+      metavar='PATH',
+      required=True,
+      help='Path to a config file.')
+  parser.add_argument(
+      '--run-soon',
+      action='store_true',
+      help='Runs jobs soon after the start of the script.')
+  parser.add_argument(
+      '--one-shot', action='store_true', help='Runs in one-shot mode.')
+  parser.add_argument(
+      '--table-type',
+      type=table_types.TableType,
+      metavar='TYPE',
+      help='If this option is set, the program builds tables of the specified '
+      'type and exits.')
+  options = parser.parse_args()
+
+  if options.one_shot:
+    if not options.table_type:
+      logging.error('--table-type must be set in one-shot mode')
+      sys.exit(1)
+  else:
+    if options.table_type:
+      logging.error('--table-type must not be set in daemon mode')
+      sys.exit(1)
+
+  return options
+
+
+def main():
+  """The entry point."""
+  # Map SIGTERM to KeyboardInterrupt so the process can clean up on exit.
+  signal.signal(signal.SIGTERM, signal.default_int_handler)
+
+  options = _ParseArgs()
+  configs = config_loader.Load(options.config_path)
+
+  process_name = options.table_type.value if options.table_type else 'scheduler'
+  _SetupLogging(verbose=options.verbose, process_name=process_name)
+
+  if options.table_type:
+    _BuilderMain(table_type=options.table_type, configs=configs)
+  else:
+    _DaemonMain(run_soon=options.run_soon, configs=configs)
+
+
+if __name__ == '__main__':
+  main()
diff --git a/ci_results_archiver/test/config_example.yaml b/ci_results_archiver/test/config_example.yaml
index a523524..a11b68f 100644
--- a/ci_results_archiver/test/config_example.yaml
+++ b/ci_results_archiver/test/config_example.yaml
@@ -1,3 +1,7 @@
+scheduler:
+ interval: 20m
+ timeout: 19m30s
+
tables:
afe_jobs:
max_entries: 1000
diff --git a/venv/requirements.txt b/venv/requirements.txt
index b850541..d6cfe51 100644
--- a/venv/requirements.txt
+++ b/venv/requirements.txt
@@ -1,3 +1,4 @@
+APScheduler==3.3.1
enum34==1.1.6
google-cloud-bigquery==0.25.0
google-cloud-storage==1.2.0
@@ -5,6 +6,7 @@
pytest-cov==2.5.1
pytz==2017.2
PyYAML==3.12
+subprocess32==3.2.7
# yapf
yapf==0.18.0