| # Copyright (c) 2013 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """ A library for shared code used in both wmatrix DB update scripts. |
| """ |
| |
| from __future__ import absolute_import |
| from __future__ import division |
| from __future__ import print_function |
| |
| import argparse |
| import logging |
| import time |
| |
| from chromite.lib import metrics |
| from infra_libs import ts_mon |
| from src import settings |
| from src.backend import dbpump |
| from src.backend import get_schedule |
| from src.models import db_interface |
| |
# Exception class names that are reported to monitoring by name; any other
# exception type is bucketed as "UnrecognizedError" (see classify_error()).
METRICS_WHITELISTED_ERRORS = ["ImportError", "NameError", "AttributeError",
                              "SyntaxError", "TypeError", "OSError"]
| |
def initialize_options():
    """Add shared arguments to the command-line parser and parse all args.

    Returns:
      The argparse.Namespace produced by parsing sys.argv.
    """
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument(
        '--config',
        dest='config',
        default=settings.DEFAULT_CONFIG_INI,
        help='Configuration file')
    parser.add_argument(
        '--init-days',
        dest='init_days',
        type=int,
        default=1,
        help='How many days of data to copy when initializing a fresh DB')
    return parser.parse_args()
| |
| |
def initialize_db_connection(src_db_name, dest_db_name, options):
    """Initialize the connections to the source and destination databases.

    Args:
      src_db_name: Name of the source DB section in the config file.
      dest_db_name: Name of the destination DB section in the config file.
      options: Parsed command-line options; options.config is the config path.

    Returns:
      A (connection_settings, src_db, dest_db) tuple.
    """
    connection_settings = settings.Settings(options.config)

    logging.basicConfig(level=logging.DEBUG)

    # Look up the per-database connection parameters from the config blob.
    src_settings = connection_settings.get_db(src_db_name)
    dest_settings = connection_settings.get_db(dest_db_name)

    # Emit a prominent banner so successive runs are easy to find in the log.
    logging.info("""
    \n%s
    %s DB update started on %s UTC
    dest_db: %s@%s

    """,
                 '/' * 80,
                 dest_db_name,
                 time.asctime(time.gmtime()),
                 dest_settings.db_params['db'],
                 dest_settings.db_params['host'],
                 )

    return (connection_settings,
            db_interface.DBInterface(src_settings),
            db_interface.DBInterface(dest_settings))
| |
| |
def retrieve_dest_db_label(settings, name):
    """Get the db@host information, as strings, from the settings blob.

    Args:
      settings: A settings object exposing get_db(name).
      name: Name of the DB section to look up.

    Returns:
      A (db, host) tuple of strings.
    """
    # NOTE(review): the parameter shadows the module-level `settings` import;
    # kept as-is so keyword callers (settings=...) are not broken.
    params = settings.get_db(name).db_params
    return params['db'], params['host']
| |
| |
def log_update(error=None, dbname='None', hostname='None'):
    """Send metrics about the update, including the error if there was one.

    Args:
      error: The exception that aborted the update, or None on success.
      dbname: Destination database name, as a string.
      hostname: Destination database host, as a string.
    """
    field_spec = [ts_mon.BooleanField('success'),
                  ts_mon.StringField('error'),
                  ts_mon.StringField('host'),
                  ts_mon.StringField('db_name')]
    # A falsy error means the run succeeded; the error field carries the
    # whitelisted class name (or a bucket) from classify_error().
    fields = {'success': not error,
              'error': classify_error(error),
              'host': hostname,
              'db_name': dbname}
    counter = metrics.Counter('chromeos/wmatrix/db_update_tick',
                              "Wmatrix database pipeline",
                              field_spec=field_spec)
    counter.increment(fields=fields)
| |
| |
def wrapped_query(src_db, dest_db, raw, init_days=1):
    """Run the update query and the preliminaries needed for it.

    Decides between fresh-DB initialization (empty destination) and an
    incremental update, truncates the import_* staging tables, copies fresh
    rows into them, then runs the stored procedure that folds the staged
    rows into the destination tables.

    Args:
      src_db: The database connection to pull data from (handed to the
        dbpump helpers; a DBInterface, not a name string).
      dest_db: The database connection to push data to (run_query_scalar /
        run_query_cmd are invoked on it; a DBInterface, not a name string).
      raw: (boolean) Whether this query works with raw data (as opposed to
        formatted data.)
      init_days: (integer): The number of days of data to pre-populate the DB
        with. Ends at the present.
    """
    # The raw pipeline mirrors tko_tests verbatim; the formatted pipeline
    # reads from the pre-filtered good_tests table instead.
    if raw:
        table_name = "tko_tests"
    else:
        table_name = "good_tests"
    sql = 'SELECT max(test_idx) FROM %s' % table_name

    # An empty destination yields no max test_idx (falsy), which selects the
    # initialization path below.
    test_max_idx = dest_db.run_query_scalar(sql)
    logging.info('Destination DB max test_idx is %s', test_max_idx)
    if not test_max_idx:
        logging.info('Initializing empty DB, init_days=%d',
                     init_days)
        is_init = True
        idxs = dbpump.get_starting_idxs_for_init(src_db, init_days)
    else:
        is_init = False
        idxs = dbpump.get_starting_idxs(src_db, dest_db, wmdb=(not raw))

    # Staging tables must be emptied first so leftovers from a previous
    # (possibly failed) run are not re-imported.
    logging.info('Truncating import_* tables.')
    dest_db.run_query_cmd('CALL clean_import_tables()')
    if raw:
        copy_raw_tables(src_db, dest_db, idxs)
        logging.info("Running the SP to copy data from import_* tables.")
        dest_db.run_query_cmd('CALL copy_imported()')
    else:
        # The formatted pipeline also loads the parsed suite_scheduler.ini
        # into its staging table before copying the main tables.
        logging.info("Parsing and copying suite_scheduler.ini.")
        schedule = get_schedule.get_schedule(
            settings.settings.suite_scheduler_ini)
        dbpump.insert_dictlist(schedule,
                               get_schedule.db_fields,
                               dest_db,
                               'import_suite_schedule')
        copy_tables(src_db, dest_db, idxs, maxfetch=1000)
        logging.info("Running the SP to copy data from import_* tables.")
        start_time = time.time()
        # NOTE(review): is_init is a Python bool, so this renders as
        # 'CALL copy_imported(True/False)' — relies on the SQL dialect
        # accepting TRUE/FALSE literals case-insensitively; confirm.
        dest_db.run_query_cmd('CALL copy_imported(%s)' % is_init)
        logging.info('copy_imported() took %0.3f seconds',
                     time.time() - start_time)
| |
| |
def classify_error(error):
    """Bucket an error between various types we care about.

    Args:
      error: An exception instance, or any falsy value for "no error".

    Returns:
      "None" for no error, the exception's class name if whitelisted,
      otherwise "UnrecognizedError".
    """
    if not error:
        return "None"

    name = type(error).__name__
    return name if name in METRICS_WHITELISTED_ERRORS else "UnrecognizedError"
| |
| |
def pprint_query(sql):
    """Log a SQL query at DEBUG level, framed by separator rules."""
    logging.debug('Using query: \n%s\n%s\n%s', '-' * 80, sql, '=' * 80)
| |
| |
def copy_raw_tables(src_db, dest_db, idxs):
    """Copy data from AutoTest db to the import_* tables.

    Pulls the raw tko_* tables (tests, jobs, job_keyvals, machines, status)
    from the source DB into the corresponding import_* staging tables on
    the destination.

    Args:
      src_db: Source DB connection, handed to dbpump.pump_table.
      dest_db: Destination DB connection receiving the import_* rows.
      idxs: Dict with TEST_IDX, JOB_IDX and MAX_TIME keys used to bound the
        incremental queries (produced by the dbpump.get_starting_idxs*
        helpers).
    """

    sql_tests = """
    SELECT * from tko_tests
    WHERE test_idx >= %(TEST_IDX)d
    AND (finished_time >= '%(MAX_TIME)s' or finished_time is NULL)
    """ % idxs

    sql_jobs = """
    SELECT * from tko_jobs
    WHERE job_idx >= %(JOB_IDX)d
    AND (finished_time >= '%(MAX_TIME)s' or finished_time is NULL)
    """ % idxs

    # Only import *_build keyvals from tko_job_keyvals, e.g., fwrw_build,
    # fwro_build and test_source_build. These builds will be used in FAFT view.
    # BUG FIX: the key filter is now parenthesized. Previously AND bound
    # tighter than OR, so every historical 'test_source_build' row matched
    # regardless of the job_id >= bound, re-importing old rows on every run.
    sql_job_keyvals = """
    SELECT * from tko_job_keyvals
    WHERE job_id >= %(JOB_IDX)d
    AND (tko_job_keyvals.key LIKE 'fwr%%_build' OR
         tko_job_keyvals.key='test_source_build')
    """ % idxs

    ## See crbug.com/253992 for some info about this query.
    #sql_bugs = """
    #  SELECT kv.id as jkv_id, test_idx, convert(kv.value, unsigned) AS bug_id
    #  FROM tko_tests t
    #  JOIN tko_jobs tj1 USING (job_idx)
    #  JOIN tko_jobs tj2 ON (tj2.afe_job_id = tj1.afe_parent_job_id)
    #  JOIN tko_job_keyvals kv
    #    ON (kv.job_id = tj2.job_idx
    #        AND
    #        kv.`key` = CONCAT(t.test, '-', 'Bug_Id')
    #       )
    #  WHERE test_idx >= %(TEST_IDX)d
    #  """ % idxs

    logging.info("Copying tests")
    pprint_query(sql_tests)
    dbpump.pump_table(src_db, sql_tests, dest_db, 'import_tko_tests')

    logging.info("Copying jobs")
    pprint_query(sql_jobs)
    dbpump.pump_table(src_db, sql_jobs, dest_db, 'import_tko_jobs')

    logging.info("Copying job_keyvals")
    pprint_query(sql_job_keyvals)
    dbpump.pump_table(src_db, sql_job_keyvals, dest_db,
                      'import_tko_job_keyvals')

    # TODO(fdeng): Disable syncing bugs temporarily due to crbug.com/550996
    # This query has been taking ~1 hour to run.
    # logging.info("Copying bugs")
    # dbpump.pump_table(src_db, sql_bugs, dest_db, 'import_autobugs')

    logging.info("Copying small tables")
    # machine_idx 2300 is the fake 'hostless' machine. We don't want any
    # hostless tests, so exclude it from the machines table.
    sql_machines = "SELECT * from tko_machines WHERE machine_idx <> 2300"
    dbpump.pump_table(src_db, sql_machines, dest_db, 'import_tko_machines')

    sql = "SELECT * from tko_status"
    dbpump.pump_table(src_db, sql, dest_db, 'import_tko_status')
| |
| |
def copy_tables(src_db, dest_db, idxs, maxfetch=1000):
    """Copy data from AutoTest db to the import_* tables.

    The main table 'tests' in wmdb is populated from a JOIN of:
    - tko_tests
    - tko_jobs (with more columns added by JobConverter)
    - tko_machines (provides platform name)
    - tko_status (converts numerical status to words like GOOD, FAIL etc.)

    'builds' tables contains data harvested from the buildbots including Chrome
    version.

    Args:
      src_db: Source DB connection, handed to dbpump.pump_table.
      dest_db: Destination DB connection receiving the import_* rows.
      idxs: Dict with TEST_IDX, JOB_IDX and MAX_TIME keys bounding the
        incremental queries (from the dbpump.get_starting_idxs* helpers).
      maxfetch: Batch size passed through to dbpump.pump_table for the
        large tables.
    """

    # machine_idx 2300 is the fake 'hostless' machine. We don't want any
    # hostless tests.
    # Also skip container jobs (SERVER_JOB/CLIENT_JOB), browsertests, and
    # tests still in RUNNING state (they would be imported unfinished).
    sql_tests = """
    SELECT * from tko_tests
    WHERE test_idx >= %(TEST_IDX)d
        AND test <> 'SERVER_JOB'
        AND test NOT REGEXP '^CLIENT_JOB'
        AND test NOT REGEXP 'browsertests'
        AND machine_idx <> 2300
        AND status <>
            (SELECT status_idx FROM tko_status WHERE word = 'RUNNING')
        AND (finished_time >= '%(MAX_TIME)s' OR finished_time IS NULL)
    """ % idxs

    # Restrict jobs to the official chromeos-* / moblab users and drop
    # paladin/pgo/browsertests builders.
    sql_jobs = """
    SELECT * from tko_jobs
    WHERE job_idx >= %(JOB_IDX)d
        AND (username REGEXP '^chromeos-.*' or username = 'moblab')
        AND machine_idx <> 2300
        AND `label` NOT REGEXP '(-paladin|-pgo-|/browsertests/)'
        AND (finished_time >= '%(MAX_TIME)s' OR finished_time IS NULL)
    """ % idxs

    logging.info("Copying tests")
    dbpump.pump_table(src_db, sql_tests, dest_db, 'import_tko_tests',
                      converter_class=dbpump.TimeConverter, maxfetch=maxfetch)

    logging.info("Copying jobs")
    dbpump.pump_table(src_db, sql_jobs, dest_db, 'import_jobs',
                      converter_class=dbpump.JobsConverter, maxfetch=maxfetch)

    logging.info("Copying bugs")
    sql = 'SELECT * FROM autobugs WHERE test_idx >= %(TEST_IDX)d' % idxs
    dbpump.pump_table(src_db, sql, dest_db, 'import_autobugs',
                      maxfetch=maxfetch)

    logging.info("Copying small tables")
    # 2300 is the fake hostless machine.
    sql_machines = "SELECT * from tko_machines WHERE machine_idx <> 2300"
    dbpump.pump_table(src_db, sql_machines, dest_db, 'import_tko_machines')

    sql = "SELECT * FROM tko_status"
    dbpump.pump_table(src_db, sql, dest_db, 'import_tko_status',
                      maxfetch=maxfetch)

    # Explicit column list: import_images expects exactly these build
    # metadata columns from the buildbot-harvested 'builds' table.
    sql = ('SELECT buildbot_root, builder_name, buildname, build, '
           'release_number, config, board, `number`, completed, result, '
           'simplified_result, start_time, end_time, chromever, reason, '
           'platform, revision '
           'FROM builds'
          )
    dbpump.pump_table(src_db, sql, dest_db, 'import_images', maxfetch=maxfetch)