blob: 31cb09d7969ee3be4e2d90328e640f9dc20821e8 [file] [log] [blame]
# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Provides a factory function of ArchiveBuilder."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from chromite.lib import cidb as cidb_lib
from google.cloud import bigquery # pylint: disable=import-error,no-name-in-module
from google.cloud import storage # pylint: disable=import-error,no-name-in-module
import pytz
from ci_results_archiver import archive_builder
from ci_results_archiver import bigquery_dumper
from ci_results_archiver import bigquery_exporter
from ci_results_archiver import checkpointer as checkpointer_lib
from ci_results_archiver import table_specs
from ci_results_archiver import table_types
from ci_results_archiver.importers import afe_job_importer
from ci_results_archiver.importers import cidb_build_importer
from ci_results_archiver.importers import grace_period as grace_period_lib
from ci_results_archiver.importers import tko_job_importer
from ci_results_archiver.modifiers import tko_job_test_invalidator
from ci_results_archiver.utils import afe_connection
from ci_results_archiver.utils import bigquery_tables as bigquery_tables_lib
from ci_results_archiver.utils import bigquery_tko_tables
from ci_results_archiver.utils import bigquery_wrapper as bigquery_wrapper_lib
from ci_results_archiver.utils import mysql_wrapper
from ci_results_archiver.utils import text_util
from ci_results_archiver.utils import tko_connection
def CreateBuilder(table_type, configs):
  """Builds a fully-wired ArchiveBuilder for one archive table.

  Args:
    table_type: TableType value of the desired archive table.
    configs: Configuration dictionary.

  Returns:
    ArchiveBuilder object.

  Raises:
    ValueError: When |table_type| is not a valid archive table type.
  """
  # GetTableSpec() raises ValueError when |table_type| is not a valid
  # archive table type.
  spec = table_specs.GetTableSpec(table_type)

  bq_configs = configs['bigquery']
  gcs_configs = configs['cloud_storage']

  bq_client = bigquery.Client(bq_configs['project'])
  gcs_client = storage.Client(gcs_configs['project'])
  scratch_bucket = gcs_client.bucket(gcs_configs['temporary_bucket'])
  archive_bucket = gcs_client.bucket(gcs_configs['persistent_bucket'])

  wrapper = bigquery_wrapper_lib.BigQueryWrapper(
      bigquery_client=bq_client,
      dataset=bq_client.get_dataset(bq_configs['dataset']),
      bucket=scratch_bucket)

  # TKO jobs need a specialized tables helper; every other table type uses
  # the generic one.
  if table_type == table_types.TableType.TKO_JOBS:
    tables_class = bigquery_tko_tables.BigQueryTkoTables
  else:
    tables_class = bigquery_tables_lib.BigQueryTables
  tables = tables_class(bigquery_wrapper=wrapper, table_spec=spec)

  # Dumping to the persistent bucket is on by default; each table's config
  # may opt out with 'dump': False.
  dumper = None
  if configs['tables'][table_type.value].get('dump', True):
    dumper = bigquery_dumper.BigQueryDumper(
        bigquery_tables=tables,
        table_spec=spec,
        bucket_name=archive_bucket.name)

  datastore_configs = configs['cloud_datastore']
  return archive_builder.ArchiveBuilder(
      table_type=table_type,
      importer=_CreateImporter(
          table_type=table_type, table_spec=spec, configs=configs),
      modifier=_CreateModifier(
          table_type=table_type, bigquery_tables=tables),
      exporter=bigquery_exporter.BigQueryExporter(
          bigquery_tables=tables, table_spec=spec),
      dumper=dumper,
      checkpointer=checkpointer_lib.Checkpointer(
          project=datastore_configs['project'],
          kind=datastore_configs['kind'],
          entity=datastore_configs['entity']))
def _CreateImporter(table_type, table_spec, configs):
  """Creates an Importer object.

  Args:
    table_type: TableType value of the desired archive table.
    table_spec: TableSpec object.
    configs: Configuration dictionary.

  Returns:
    AbstractImporter object.

  Raises:
    KeyError: When |configs| has no entry for |table_type|.
    ValueError: When |table_type| is not a supported archive table type.
  """
  table_configs = configs['tables'][table_type.value]
  max_entries = table_configs['max_entries']
  grace_period = _CreateGracePeriod(
      table_spec=table_spec,
      grace_period_configs=table_configs['grace_period'])
  if table_type == table_types.TableType.TKO_JOBS:
    tko = tko_connection.TkoConnection(_CreateMySQLWrapper('tko', configs))
    return tko_job_importer.TkoJobImporter(
        tko=tko, max_entries=max_entries, grace_period=grace_period)
  if table_type == table_types.TableType.AFE_JOBS:
    afe = afe_connection.AfeConnection(_CreateMySQLWrapper('afe', configs))
    return afe_job_importer.AfeJobImporter(
        afe=afe, max_entries=max_entries, grace_period=grace_period)
  if table_type == table_types.TableType.CIDB_BUILDS:
    cidb = _CreateCIDBConnection(configs)
    return cidb_build_importer.CidbBuildImporter(
        cidb=cidb, max_entries=max_entries, grace_period=grace_period)
  # Raise instead of `assert False`: assertions are stripped under
  # `python -O`, which would make this silently return None for an
  # unsupported table type.
  raise ValueError('Unexpected table_type: %r' % table_type)
def _CreateModifier(table_type, bigquery_tables):
  """Creates a Modifier object.

  Args:
    table_type: TableType value of the desired archive table.
    bigquery_tables: BigQueryTables object.

  Returns:
    AbstractModifier object, or None when the table type needs no modifier.

  Raises:
    ValueError: When |table_type| is not a supported archive table type.
  """
  if table_type == table_types.TableType.TKO_JOBS:
    return tko_job_test_invalidator.TkoJobTestInvalidator(
        bigquery_tables=bigquery_tables)
  if table_type in (table_types.TableType.AFE_JOBS,
                    table_types.TableType.CIDB_BUILDS):
    return None
  # Raise instead of `assert False`: assertions are stripped under
  # `python -O`, which would make this silently return None for an
  # unsupported table type.
  raise ValueError('Unexpected table_type: %r' % table_type)
def _CreateMySQLWrapper(database_name, configs):
  """Builds a MySQLWrapper from the credentials stored in the configs.

  Args:
    database_name: Database name.
    configs: Configuration dictionary.

  Returns:
    MySQLWrapper object.
  """
  database_configs = configs['databases'][database_name]
  credentials = database_configs['credentials']
  # The credential keys map one-to-one onto MySQLWrapper keyword arguments.
  connection_kwargs = {
      key: credentials[key]
      for key in ('hostname', 'username', 'password', 'database')
  }
  return mysql_wrapper.MySQLWrapper(
      timezone=pytz.timezone(database_configs['timezone']),
      **connection_kwargs)
def _CreateCIDBConnection(configs):
  """Builds a CIDBConnection from the credentials stored in the configs.

  Args:
    configs: Configuration dictionary.

  Returns:
    CIDBConnection object.
  """
  credentials_dir = configs['cidb']['credentials_dir']
  return cidb_lib.CIDBConnection(credentials_dir)
def _CreateGracePeriod(table_spec, grace_period_configs):
  """Builds a GracePeriod object from a config dictionary.

  Args:
    table_spec: TableSpec object.
    grace_period_configs: A config dictionary.

  Returns:
    GracePeriod object.
  """
  # Both 'timeout' and 'capacity' are optional; a missing or falsy timeout
  # string means no timeout at all.
  timeout = None
  raw_timeout = grace_period_configs.get('timeout')
  if raw_timeout:
    timeout = text_util.ParseTimeDelta(raw_timeout)
  return grace_period_lib.GracePeriod(
      id_column=table_spec.id_column,
      insertion_timestamp_column=table_spec.insertion_timestamp_column,
      timeout=timeout,
      capacity=grace_period_configs.get('capacity'))