blob: 20ffad5da5e552fd3bf8d0f57f45857b4a6ae0bc [file] [log] [blame]
# Copyright 2017 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import apache_beam as beam
from apache_beam.pipeline import PipelineOptions
class EventsPipeline(beam.Pipeline):
  """A beam Pipeline whose options are parsed from the command line.

  PipelineOptions() with no arguments reads sys.argv, so constructing this
  class picks up whatever pipeline flags the process was started with.
  """

  def __init__(self):
    command_line_options = PipelineOptions()
    super(EventsPipeline, self).__init__(options=command_line_options)
class BQRead(beam.io.iobase.Read):
  """Read transform created from a BigQuerySource with convenient defaults.

  Unlike the raw BigQuerySource, this defaults to standard SQL
  (use_standard_sql=True) and unflattened results (flatten_results=False).
  """

  def __init__(self, query, validate=True, coder=None, use_standard_sql=True,
               flatten_results=False):
    """
    Args:
      query: The query to be run. Should specify the table in
        `project.dataset.table` form for standard SQL, and in
        [project:dataset.table] form if use_standard_sql is False.
      See beam.io.BigQuerySource for explanation of remaining arguments.
    """
    source = beam.io.BigQuerySource(query=query, validate=validate, coder=coder,
                                    flatten_results=flatten_results,
                                    use_standard_sql=use_standard_sql)
    super(BQRead, self).__init__(source)
class BQWrite(beam.io.Write):
  """Write transform created from a BigQuerySink with convenient defaults.

  beam.io.BigQuerySink will automatically add unique insert ids to rows,
  which BigQuery uses to prevent duplicate inserts.
  """

  def __init__(self, table, dataset='aggregated',
               write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE):
    # Destination project is fixed; table and dataset are caller-selectable,
    # with the write disposition defaulting to truncate-and-replace.
    destination = beam.io.BigQuerySink(
        table,
        dataset,
        project='chrome-infra-events',
        write_disposition=write_disposition)
    super(BQWrite, self).__init__(destination)