blob: 3c14ebbea123ef13577129fb74db420895b5308a [file] [log] [blame]
# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Exports new entries into BigQuery tables."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import collections
import logging
class BigQueryExporter(object):
    """Exports new entries into BigQuery tables.

    An exporter is the "sink" part of the BigQuery archive table builder:
    it takes prepared entries and writes them into date-partitioned
    BigQuery tables.
    """

    def __init__(self, bigquery_tables, table_spec):
        """Initializes the exporter.

        Args:
            bigquery_tables: BigQueryTables object used to query existing IDs
                and load new rows.
            table_spec: TableSpec object describing the table schema, the ID
                column, and the timestamp column used for partitioning.
        """
        self._bigquery_tables = bigquery_tables
        self._table_spec = table_spec

    def ExportNewEntries(self, entries):
        """Exports new entries to BigQuery tables.

        Designed to be idempotent: entries whose IDs already exist in the
        target table are preserved as-is and never re-inserted, so re-running
        an export cannot create conflicting rows.

        Args:
            entries: A list of dictionaries representing entries.

        Returns:
            (exported_table_suffixes, unexported_entries) where:
                exported_table_suffixes: A list of name suffixes (YYYYMMDD) of
                    updated BigQuery tables.
                unexported_entries: Entries that could not be exported to
                    bigquery (their partition timestamp column was unset).
        """
        if not entries:
            return [], []
        partitioned, leftovers = _PartitionByDate(
            entries, self._table_spec.partition_timestamp_column)
        updated_suffixes = []
        # Process dates in ascending order so table updates are deterministic.
        for day in sorted(partitioned):
            suffix = day.strftime('%Y%m%d')
            self._ExportNewEntriesToTable(suffix, partitioned[day])
            updated_suffixes.append(suffix)
        return updated_suffixes, leftovers

    def _ExportNewEntriesToTable(self, table_suffix, entries):
        """Loads entries that are not already present into one BigQuery table.

        Args:
            table_suffix: Table name suffix.
            entries: A list of dictionaries representing entries.
        """
        id_column = self._table_spec.id_column
        known_ids = set(self._bigquery_tables.QueryAllIds(table_suffix))
        fresh = [e for e in entries if e[id_column] not in known_ids]
        skipped = len(entries) - len(fresh)
        if skipped:
            logging.warning('Skipping %d already existing entries.', skipped)
        if fresh:
            self._bigquery_tables.LoadEntries(
                table_suffix, self._table_spec.schema, fresh)
def _PartitionByDate(entries, timestamp_column):
"""Partitions entries by date.
Args:
entries: A list of dictionaries representing entries.
timestamp_column: Name of timestamp column to be used on partitioning
entries.
Returns:
({date: entries_in_date}, unpartitioned_entries) where:
date: datetime.date object.
entries_in_date: A list of dictionaries representing entries in the
corresponding date.
unpartitioned_entries: A list of entries that could not be partitioned
into any table. This can happen if the column used to partition the
entries is unset.
"""
entries_by_date = collections.defaultdict(list)
unpartitioned_entries = []
for entry in entries:
entry_timestamp = entry.get(timestamp_column)
if not entry_timestamp:
unpartitioned_entries.append(entry)
continue
date = entry_timestamp.date()
entries_by_date[date].append(entry)
return dict(entries_by_date), unpartitioned_entries