#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2020 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Inserts json or binary protobufs into a BigQuery table."""
from __future__ import print_function
import argparse
import importlib
from google.cloud import bigquery
from google.protobuf import json_format
import results_database
# The following messages are treated as lists, otherwise protobufs are
# treated as singletons.
LIST_MESSAGES = frozenset(['ResultList', 'TraceList'])
# The messages that are handled by this script, with a tuple of:
# - Full path to python object to instantiate
# - BigQuery table
RESULTS_PATH = 'chromiumos.config.api.test.results.v1'
GRAPHICS_PATH = 'chromiumos.config.api.test.results.graphics.v1'
MESSAGES = {
'Machine': (RESULTS_PATH + '.machine_pb2.Machine', 'machines'),
'ResultList': (RESULTS_PATH + '.result_pb2.ResultList', 'results'),
'SoftwareConfig': (RESULTS_PATH + '.software_config_pb2.SoftwareConfig',
'software_configs'),
'TraceList': (GRAPHICS_PATH + '.trace_pb2.TraceList', 'traces'),
}
def init_argparse():
"""Creates argument parser.
Returns:
An ArgumentParser.
"""
parser = argparse.ArgumentParser(
usage='%(prog)s [OPTION] [FILE]...',
description='Post protobufs to BigQuery database',
)
parser.add_argument('--class_path',
help='Override full python class to instantiate')
parser.add_argument('--dataset',
default='graphics',
help='Project dataset')
parser.add_argument('--deduplicate', action='store_true',
help='Do not upload any existing protobufs')
parser.add_argument('--dryrun', action='store_true',
help='Do a dry run without actually uploading results')
parser.add_argument('--message',
default='ResultList',
choices=[*MESSAGES],
help='Protobuf message to instantiate')
parser.add_argument('--project',
default='google.com:stainless-dev',
help='Google Cloud project')
parser.add_argument('--table',
help='Override table to insert results into')
parser.add_argument('--verbose', action='store_true',
help='Explain what is being done')
parser.add_argument('files', nargs='*',
help='results protobufs')
return parser
def main():
"""Main function."""
args = init_argparse().parse_args()
# Determine class path if not fully specified.
class_path = (args.class_path if args.class_path
else MESSAGES[args.message][0])
(module_path, class_name) = class_path.rsplit('.', 1)
if args.verbose:
print('module: %s' % module_path)
print('class: %s' % class_name)
# Load the appropriate python protobuf module based on the specified
# class path.
module = importlib.import_module(module_path)
class_type = getattr(module, class_name)
# Read in all the protobufs.
rows = []
row_ids = []
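# id_key records which proto field path ('id.value' or 'name.value')
# identifies a row; it is reused below to build the de-duplication query.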
id_key = None
for file in args.files:
pb = class_type()
results_database.read_pb(pb, file)
pb_list = pb.value if class_name in LIST_MESSAGES else [pb]
for p in pb_list:
rows.append(json_format.MessageToDict(
p,
including_default_value_fields=True,
preserving_proto_field_name=True))
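# Prefer the message's 'id' field as the row identifier, falling back to
# 'name' for messages without an id.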
if hasattr(p, 'id'):
row_ids.append(p.id.value)
id_key = 'id.value'
elif hasattr(p, 'name'):
row_ids.append(p.name.value)
id_key = 'name.value'
else:
row_ids.append(None)
print('WARNING: message missing id/name: %s' % p)
# BigQuery connection to the specified table.
client = bigquery.Client(args.project)
table_name = '.'.join([args.project, args.dataset,
args.table if args.table
else MESSAGES[args.message][1]])
if args.verbose:
print('table: %s' % table_name)
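# Verify the table exists; get_table raises if it cannot be found.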
client.get_table(table_name)
# If de-duplication is enabled, query for all ids and only insert new ones.
if id_key and args.deduplicate:
query = ('SELECT %s FROM `%s` WHERE %s IN (%s)' %
(id_key, table_name, id_key,
','.join(["'%s'" % id for id in row_ids])))
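# The generated query looks like (illustrative):
#   SELECT id.value FROM `project.dataset.table` WHERE id.value IN ('a','b')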
if args.verbose:
print('dedup query: %s' % query)
query_job = client.query(query) # API request
results = query_job.result() # Waits for query to finish
duplicates = []
if results:
duplicates = {row.value for row in results}
print('skipping %d duplicates:' % len(duplicates))
for d in duplicates:
print(' %s' % d)
# Filter duplicates if any were found.
if duplicates:
dedup_rows = []
dedup_ids = []
for row, row_id in zip(rows, row_ids):
if row_id not in duplicates:
dedup_rows.append(row)
dedup_ids.append(row_id)
rows = dedup_rows
row_ids = dedup_ids
# Actually insert the results.
if rows:
# TODO(davidriley): Split this into chunks if necessary.
print('inserting %d rows to %s' % (len(rows), table_name))
if not args.dryrun:
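# insert_rows_json returns a list of per-row insert errors; an empty list
# means all rows were inserted successfully.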
insert = client.insert_rows_json(table_name, rows)
if insert:
print('ERROR: could not insert rows: %s' % insert)
else:
print('dryrun: skipping inserts')
else:
print('no data to insert')
if __name__ == '__main__':
  main()