blob: e0a48b7a9ea0f12cfec34904b77e73e62ca56384 [file] [log] [blame]
# Copyright 2017 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file
import apache_beam as beam
class ConvertToCSV(beam.CombineFn):
"""Convert elements to CSV format to be written out
Transform for writing elements out in a CSV format. Can process elements
of type dictonary or list. This transform only supports consistent elements,
meaning dictionaries must all have the same keys and lists must have the
same length and order. If provided a header, a list must already be in that
order and a dictionary must have at least those fields.
"""
def __init__(self, header=None):
self.header = header
def create_accumulator(self):
return []
def iterable(self, obj):
"""Returns an iterable for a dictionary or list
Sorting dictionary keys assures that the CSV is in the same order
for all dictionaries. If given a header, use the header fields as
the iterable for the dictionary to preserve that order.
"""
if isinstance(obj, dict):
if self.header:
return self.header
keys = obj.keys()
return sorted(keys)
else:
return range(len(obj))
def add_input(self, accumulator, element):
element_string = []
for i in self.iterable(element):
element_string.append(str(element[i]))
accumulator.append(','.join(element_string) + '\n')
return accumulator
def merge_accumulators(self, accumulators):
return sum(accumulators, [])
def extract_output(self, accumulator):
if self.header:
accumulator.insert(0, ','.join(self.header) + '\n')
return ''.join(accumulator)