blob: c4048121a910d2028cdbdfc8f8be62944998d092 [file] [log] [blame]
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
""" A library for shared code used in both wmatrix DB update scripts.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import logging
import time
from chromite.lib import metrics
from infra_libs import ts_mon
from src import settings
from src.backend import dbpump
from src.backend import get_schedule
from src.models import db_interface
# Exception class names that are reported verbatim to monitoring; any other
# exception type is bucketed as "UnrecognizedError" (see classify_error()).
METRICS_WHITELISTED_ERRORS = ["ImportError", "NameError", "AttributeError",
                              "SyntaxError", "TypeError", "OSError"]
def initialize_options():
  """Build the shared command-line parser and return the parsed arguments.

  Returns:
    An argparse.Namespace with 'config' and 'init_days' attributes.
  """
  arg_parser = argparse.ArgumentParser(
      formatter_class=argparse.ArgumentDefaultsHelpFormatter)
  arg_parser.add_argument(
      '--config',
      dest='config',
      default=settings.DEFAULT_CONFIG_INI,
      help='Configuration file')
  arg_parser.add_argument(
      '--init-days',
      dest='init_days',
      type=int,
      default=1,
      help=('How many days of data to copy when initializing a'
            ' fresh DB'))
  return arg_parser.parse_args()
def initialize_db_connection(src_db_name, dest_db_name, options):
  """Open DBInterface connections to the source and destination databases.

  Also configures DEBUG-level logging and logs a banner describing the
  destination DB that is about to be updated.

  Args:
    src_db_name: (string) Config-file section name of the source DB.
    dest_db_name: (string) Config-file section name of the destination DB.
    options: Parsed command-line options; options.config is the path to the
        configuration file to load.

  Returns:
    A (connection_settings, src_db, dest_db) tuple.
  """
  connection_settings = settings.Settings(options.config)
  logging.basicConfig(level=logging.DEBUG)
  src_cfg = connection_settings.get_db(src_db_name)
  dest_cfg = connection_settings.get_db(dest_db_name)
  logging.info("""
  \n%s
  %s DB update started on %s UTC
  dest_db: %s@%s
  """,
               '/' * 80,
               dest_db_name,
               time.asctime(time.gmtime()),
               dest_cfg.db_params['db'],
               dest_cfg.db_params['host'],
              )
  return (connection_settings,
          db_interface.DBInterface(src_cfg),
          db_interface.DBInterface(dest_cfg))
def retrieve_dest_db_label(settings, name):
  """Get the db@host information, as strings, from the settings blob.

  NOTE: the 'settings' parameter shadows the module-level 'settings' import;
  only the argument is used inside this function.

  Args:
    settings: A Settings-like object exposing get_db().
    name: (string) Config-file section name of the DB to look up.

  Returns:
    A (db, host) tuple of strings.
  """
  db_cfg = settings.get_db(name)
  return (db_cfg.db_params['db'], db_cfg.db_params['host'])
def log_update(error=None, dbname='None', hostname='None'):
  """Send metrics about the update, including the error if there was one.

  Args:
    error: The exception that aborted the update, or None on success.
    dbname: (string) Destination database name, reported as a metric field.
    hostname: (string) Destination database host, reported as a metric field.
  """
  field_spec = [ts_mon.BooleanField('success'),
                ts_mon.StringField('error'),
                ts_mon.StringField('host'),
                ts_mon.StringField('db_name')]
  counter = metrics.Counter('chromeos/wmatrix/db_update_tick',
                            "Wmatrix database pipeline",
                            field_spec=field_spec)
  counter.increment(fields={'success': not error,
                            'error': classify_error(error),
                            'host': hostname,
                            'db_name': dbname})
def wrapped_query(src_db, dest_db, raw, init_days=1):
  """Run the update query and the preliminaries needed for it.

  Args:
    src_db: (string) The database to pull data from.
    dest_db: (string) The database to push data to.
    raw: (boolean) Whether this query works with raw data (as opposed to
        formatted data.)
    init_days: (integer): The number of days of data to pre-populate the DB
        with. Ends at the present.
  """
  # The destination watermark lives in a different table for raw vs.
  # formatted pipelines.
  table_name = "tko_tests" if raw else "good_tests"
  test_max_idx = dest_db.run_query_scalar(
      'SELECT max(test_idx) FROM %s' % table_name)
  logging.info('Destination DB max test_idx is %s', test_max_idx)
  # An empty destination DB means we are initializing from scratch.
  is_init = not test_max_idx
  if is_init:
    logging.info('Initializing empty DB, init_days=%d',
                 init_days)
    idxs = dbpump.get_starting_idxs_for_init(src_db, init_days)
  else:
    idxs = dbpump.get_starting_idxs(src_db, dest_db, wmdb=(not raw))
  logging.info('Truncating import_* tables.')
  dest_db.run_query_cmd('CALL clean_import_tables()')
  if raw:
    copy_raw_tables(src_db, dest_db, idxs)
    logging.info("Running the SP to copy data from import_* tables.")
    dest_db.run_query_cmd('CALL copy_imported()')
  else:
    logging.info("Parsing and copying suite_scheduler.ini.")
    schedule = get_schedule.get_schedule(
        settings.settings.suite_scheduler_ini)
    dbpump.insert_dictlist(schedule,
                           get_schedule.db_fields,
                           dest_db,
                           'import_suite_schedule')
    copy_tables(src_db, dest_db, idxs, maxfetch=1000)
    logging.info("Running the SP to copy data from import_* tables.")
    start_time = time.time()
    # The formatted-data SP takes an init flag (rendered as TRUE/FALSE).
    dest_db.run_query_cmd('CALL copy_imported(%s)' % is_init)
    logging.info('copy_imported() took %0.3f seconds',
                 time.time() - start_time)
def classify_error(error):
  """Bucket an error between various types we care about.

  Args:
    error: An exception instance, or a falsy value meaning "no error".

  Returns:
    (string) The exception class name if it is in
    METRICS_WHITELISTED_ERRORS, "None" when there was no error, or
    "UnrecognizedError" for everything else.
  """
  if not error:
    return "None"
  name = error.__class__.__name__
  return name if name in METRICS_WHITELISTED_ERRORS else "UnrecognizedError"
def pprint_query(sql):
  """Log the given SQL at debug level, framed by ruler lines."""
  top_ruler = '-' * 80
  bottom_ruler = '=' * 80
  logging.debug('Using query: \n%s\n%s\n%s', top_ruler, sql, bottom_ruler)
def copy_raw_tables(src_db, dest_db, idxs):
  """Copy data from AutoTest db to the import_* tables.

  Args:
    src_db: DBInterface for the AutoTest (source) database.
    dest_db: DBInterface for the destination database.
    idxs: A dict with TEST_IDX, JOB_IDX and MAX_TIME keys giving the
        incremental-copy starting points (see dbpump.get_starting_idxs*).
  """
  # machine_idx 2300 is the fake 'hostless' machine. We don't want any
  # hostless tests.
  sql_tests = """
      SELECT * from tko_tests
      WHERE test_idx >= %(TEST_IDX)d
      AND (finished_time >= '%(MAX_TIME)s' or finished_time is NULL)
      """ % idxs
  sql_jobs = """
      SELECT * from tko_jobs
      WHERE job_idx >= %(JOB_IDX)d
      AND (finished_time >= '%(MAX_TIME)s' or finished_time is NULL)
      """ % idxs
  # Only import *_build keyvals from tko_job_keyvals, e.g., fwrw_build,
  # fwro_build and test_source_build. These builds will be used in FAFT view.
  # NOTE: the key disjunction must be parenthesized; AND binds tighter than
  # OR in SQL, so without the parentheses every 'test_source_build' keyval in
  # the table (not just those past the job_id watermark) was re-imported on
  # each run.
  sql_job_keyvals = """
      SELECT * from tko_job_keyvals
      WHERE job_id >= %(JOB_IDX)d
      AND (tko_job_keyvals.key LIKE 'fwr%%_build' OR
           tko_job_keyvals.key='test_source_build')
      """ % idxs
  ## See crbug.com/253992 for some info about this query.
  #sql_bugs = """
  #    SELECT kv.id as jkv_id, test_idx, convert(kv.value, unsigned) AS bug_id
  #    FROM tko_tests t
  #    JOIN tko_jobs tj1 USING (job_idx)
  #    JOIN tko_jobs tj2 ON (tj2.afe_job_id = tj1.afe_parent_job_id)
  #    JOIN tko_job_keyvals kv
  #        ON (kv.job_id = tj2.job_idx
  #            AND
  #            kv.`key` = CONCAT(t.test, '-', 'Bug_Id')
  #           )
  #    WHERE test_idx >= %(TEST_IDX)d
  #    """ % idxs
  logging.info("Copying tests")
  pprint_query(sql_tests)
  dbpump.pump_table(src_db, sql_tests, dest_db, 'import_tko_tests')
  logging.info("Copying jobs")
  pprint_query(sql_jobs)
  dbpump.pump_table(src_db, sql_jobs, dest_db, 'import_tko_jobs')
  logging.info("Copying job_keyvals")
  pprint_query(sql_job_keyvals)
  dbpump.pump_table(src_db, sql_job_keyvals, dest_db,
                    'import_tko_job_keyvals')
  # TODO(fdeng): Disable syncing bugs temporarily due to crbug.com/550996
  # This query has been taking ~1 hour to run.
  # logging.info("Copying bugs")
  # dbpump.pump_table(src_db, sql_bugs, dest_db, 'import_autobugs')
  logging.info("Copying small tables")
  # 2300 is the fake hostless machine.
  sql_machines = "SELECT * from tko_machines WHERE machine_idx <> 2300"
  dbpump.pump_table(src_db, sql_machines, dest_db, 'import_tko_machines')
  sql = "SELECT * from tko_status"
  dbpump.pump_table(src_db, sql, dest_db, 'import_tko_status')
def copy_tables(src_db, dest_db, idxs, maxfetch=1000):
  """Copy data from AutoTest db to the import_* tables.

  The main table 'tests' in wmdb is populated from a JOIN of:
  - tko_tests
  - tko_jobs (with more columns added by JobConverter)
  - tko_machines (provides platform name)
  - tko_status (converts numerical status to words like GOOD, FAIL etc.)
  'builds' tables contains data harvested from the buildbots including Chrome
  version.

  Args:
    src_db: DBInterface for the AutoTest (source) database.
    dest_db: DBInterface for the destination database.
    idxs: A dict with TEST_IDX, JOB_IDX and MAX_TIME starting points.
    maxfetch: (integer) Batch size passed through to dbpump.pump_table.
  """
  # machine_idx 2300 is the fake 'hostless' machine. We don't want any
  # hostless tests.
  sql_tests = """
      SELECT * from tko_tests
      WHERE test_idx >= %(TEST_IDX)d
      AND test <> 'SERVER_JOB'
      AND test NOT REGEXP '^CLIENT_JOB'
      AND test NOT REGEXP 'browsertests'
      AND machine_idx <> 2300
      AND status <>
          (SELECT status_idx FROM tko_status WHERE word = 'RUNNING')
      AND (finished_time >= '%(MAX_TIME)s' OR finished_time IS NULL)
      """ % idxs
  sql_jobs = """
      SELECT * from tko_jobs
      WHERE job_idx >= %(JOB_IDX)d
      AND (username REGEXP '^chromeos-.*' or username = 'moblab')
      AND machine_idx <> 2300
      AND `label` NOT REGEXP '(-paladin|-pgo-|/browsertests/)'
      AND (finished_time >= '%(MAX_TIME)s' OR finished_time IS NULL)
      """ % idxs
  logging.info("Copying tests")
  dbpump.pump_table(src_db, sql_tests, dest_db, 'import_tko_tests',
                    converter_class=dbpump.TimeConverter,
                    maxfetch=maxfetch)
  logging.info("Copying jobs")
  dbpump.pump_table(src_db, sql_jobs, dest_db, 'import_jobs',
                    converter_class=dbpump.JobsConverter,
                    maxfetch=maxfetch)
  logging.info("Copying bugs")
  sql_bugs = 'SELECT * FROM autobugs WHERE test_idx >= %(TEST_IDX)d' % idxs
  dbpump.pump_table(src_db, sql_bugs, dest_db, 'import_autobugs',
                    maxfetch=maxfetch)
  logging.info("Copying small tables")
  # 2300 is the fake hostless machine.
  sql_machines = "SELECT * from tko_machines WHERE machine_idx <> 2300"
  dbpump.pump_table(src_db, sql_machines, dest_db, 'import_tko_machines')
  sql_status = "SELECT * FROM tko_status"
  dbpump.pump_table(src_db, sql_status, dest_db, 'import_tko_status',
                    maxfetch=maxfetch)
  sql_builds = ('SELECT buildbot_root, builder_name, buildname, build, '
                'release_number, config, board, `number`, completed, result, '
                'simplified_result, start_time, end_time, chromever, reason, '
                'platform, revision '
                'FROM builds'
               )
  dbpump.pump_table(src_db, sql_builds, dest_db, 'import_images',
                    maxfetch=maxfetch)