# Copyright 2017 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging

from google.protobuf import duration_pb2
from google.protobuf import json_format
from google.protobuf import message as message_pb
from google.protobuf import struct_pb2
from google.protobuf import timestamp_pb2

# This module is used as a symlink in the buildbucket GAE app.
# Do not import packages that are not available on GAE.

_BATCH_DEFAULT = 500
_BATCH_LIMIT = 10000

# OAuth 2.0 scope required to insert rows.
INSERT_ROWS_SCOPE = 'https://www.googleapis.com/auth/bigquery.insertdata'


def message_to_dict(msg):
  """Converts a protobuf message to a dict, with field names as keys.

  The conversion follows the rules described in
  https://godoc.org/go.chromium.org/luci/tools/cmd/bqschemaupdater

  Also omits None values and empty lists.

  Args:
    msg: an instance of google.protobuf.message.Message.

  Returns:
    A dict with BQ-compatible fields. If there are no BQ-compatible fields,
    returns None.
  """
  row = {}
  for f in msg.DESCRIPTOR.fields:
    if f.message_type:
      if _is_empty_message_type(f.message_type):
        # Omit message fields that would result in RECORD fields with no
        # fields.
        continue
      if f.label != f.LABEL_REPEATED and not msg.HasField(f.name):
        # Omit non-repeated message fields that are not set.
        continue

    val = getattr(msg, f.name)
    if f.label == f.LABEL_REPEATED:
      if val:  # Omit empty arrays.
        if hasattr(val, 'items'):  # It's a map<K, V>.
          # Use the synthesized key/value message type.
          synth_desc = f.message_type
          row[f.name] = [
              {
                  'key': _to_bq_value(key, synth_desc.fields_by_name['key']),
                  'value': _to_bq_value(
                      value, synth_desc.fields_by_name['value']),
              }
              for key, value in sorted(val.items())
          ]
        else:
          row[f.name] = [_to_bq_value(elem, f) for elem in val]
    else:
      bq_value = _to_bq_value(val, f)
      if bq_value is not None:  # Omit NULL values.
        row[f.name] = bq_value
  return row or None  # Return None if there are no fields.
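
# Illustrative example (the message type below is hypothetical, not part of
# this module): given
#   message Step { string name = 1; repeated string log_urls = 2; }
# with name set to 'compile' and no log_urls, message_to_dict returns
# {'name': 'compile'}; the empty repeated field is omitted per the rules
# above.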


def _to_bq_value(value, field_desc):
  if field_desc.enum_type:
    # Enums are stored as strings.
    enum_val = field_desc.enum_type.values_by_number.get(value)
    if enum_val is None:
      raise ValueError(
          'Invalid value %r for enum type %s' %
          (value, field_desc.enum_type.full_name))
    return enum_val.name
  if isinstance(value, duration_pb2.Duration):
    return value.ToTimedelta().total_seconds()
  if isinstance(value, struct_pb2.Struct):
    # Structs are stored as JSONPB strings,
    # see https://bit.ly/chromium-bq-struct
    return json_format.MessageToJson(value)
  if isinstance(value, timestamp_pb2.Timestamp):
    return value.ToDatetime().isoformat()
  if isinstance(value, message_pb.Message):
    return message_to_dict(value)
  return value
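
# For reference, the well-known types above convert roughly as follows
# (values are illustrative):
#   Duration(seconds=90)   -> 90.0 (seconds as FLOAT)
#   Timestamp(seconds=0)   -> '1970-01-01T00:00:00' (ISO 8601 STRING)
#   Struct with {'a': 1}   -> a JSONPB STRING such as '{"a": 1.0}'
#   an enum number         -> its symbolic name (STRING)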


def send_rows(bq_client, dataset_id, table_id, rows, batch_size=_BATCH_DEFAULT):
  """Sends rows to BigQuery.

  Please use google.protobuf.message.Message instances moving forward;
  tuples are deprecated.

  Args:
    bq_client: an instance of google.cloud.bigquery.client.Client.
    dataset_id, table_id (str): identifiers of the table into which the rows
      will be inserted.
    rows: an iterable whose elements are any of the following:
      * a tuple containing data of the correct type for each schema field on
        the current table, in the same order as the schema fields.
      * a google.protobuf.message.Message instance.
    batch_size (int): the maximum number of rows to send to BigQuery in a
      single request. Values above _BATCH_LIMIT are clamped to _BATCH_LIMIT;
      values less than 1 fall back to _BATCH_DEFAULT.
  """
  if batch_size > _BATCH_LIMIT:
    batch_size = _BATCH_LIMIT
  elif batch_size <= 0:
    batch_size = _BATCH_DEFAULT

  rows_to_send = []
  for row in rows:
    if isinstance(row, tuple):
      rows_to_send.append(row)
    elif isinstance(row, message_pb.Message):
      rows_to_send.append(message_to_dict(row))
    else:
      raise UnsupportedTypeError(type(row).__name__)

  table = bq_client.get_table(bq_client.dataset(dataset_id).table(table_id))
  for row_set in _batch(rows_to_send, batch_size):
    insert_errors = bq_client.create_rows(table, row_set)
    if insert_errors:
      logging.error('Failed to send rows to BigQuery: %s', insert_errors)
      raise BigQueryInsertError(insert_errors)
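
# A minimal usage sketch, assuming `events` is a list of protobuf messages
# and that default credentials with INSERT_ROWS_SCOPE are configured
# ('my-project', 'my_dataset', and 'my_table' are placeholders):
#
#   from google.cloud import bigquery
#   client = bigquery.Client(project='my-project')
#   send_rows(client, 'my_dataset', 'my_table', events)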


def _batch(rows, batch_size):
  for i in range(0, len(rows), batch_size):
    yield rows[i:i + batch_size]
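
# For example, _batch([1, 2, 3, 4, 5], 2) yields [1, 2], [3, 4], then [5].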


class UnsupportedTypeError(Exception):
  """Raised when send_rows is given a row type it does not support.

  Args:
    row_type (str): string representation of the offending type.
  """

  def __init__(self, row_type):
    msg = 'Unsupported row type for send_rows: %s' % row_type
    super(UnsupportedTypeError, self).__init__(msg)


class BigQueryInsertError(Exception):
  """Error from a create_rows() call on the BigQuery client.

  insert_errors is a list of mappings, where each mapping contains an "index"
  key identifying a row and an "errors" key listing the errors for that row.
  """

  def __init__(self, insert_errors):
    message = self._construct_message(insert_errors)
    super(BigQueryInsertError, self).__init__(message)

  @staticmethod
  def _construct_message(insert_errors):
    message = ''
    for row_mapping in insert_errors:
      index = row_mapping.get('index')
      for err in row_mapping.get('errors') or []:
        message += 'Error inserting row %s: %s\n' % (index, err)
    return message
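
# For reference, `insert_errors` follows the shape of the BigQuery
# tabledata.insertAll response, e.g. (illustrative):
#   [{'index': 0, 'errors': [{'reason': 'invalid', 'message': '...'}]}]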


def _is_empty_message_type(desc):
  """Returns True if the message type results in an empty RECORD BQ field.

  Note: hangs on recursive messages.
  """
  for f in desc.fields:
    f_empty = f.message_type and _is_empty_message_type(f.message_type)
    if not f_empty:  # pragma: no branch
      return False
  return True
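
# For example, google.protobuf.Empty (which declares no fields) is an empty
# message type, as is any message whose fields are all themselves of empty
# message types; a message with any scalar field is not.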