#!/usr/bin/env python
"""Wrapper around 'bq' tool from Google Cloud SDK.
It automates some common tasks, like dataset and table creation and schema
changes.
It reads a definition of the desired state of tables from config files,
compares it to what's in BigQuery now, and performs necessary updates. It will
not execute any removals though. The user must execute removals manually, to
acknowledge the data loss.
Prerequisites:
* 'bq' tool is in PATH.
* Application default credentials are configured.
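
Example invocation (project, dataset and file names are illustrative):

  ./bq.py -p my-project -d my_dataset update-tables path/to/my_table.schema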
"""
import argparse
import ast
import collections
import difflib
import json
import logging
import subprocess
import sys
import tempfile
class Error(Exception):
pass
class BigQueryError(Error):
pass
class DefinitionError(Error):
pass
class BigQuery(object):
"""A thin wrapper around 'bq' tool.
  Hides its CLI interface and argument marshaling. In theory, it can be
  replaced with direct REST API calls, which would speed everything up
  significantly.
All methods may raise BigQueryError.
"""
def __init__(self, project_id):
self.project_id = str(project_id)
def _call(self, cmd, args, decode_json=True):
full_cmd = [
'bq',
        '--api_version', 'v2',  # we rely on the exact JSON response format
'--project_id', self.project_id,
'--format', 'prettyjson',
cmd
] + list(args)
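    # E.g. _call('ls', ['my-project:']) runs the following command (the
    # project ID is illustrative):
    #   bq --api_version v2 --project_id my-project --format prettyjson \
    #       ls my-project: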
logging.debug('Calling %s', full_cmd)
try:
output = subprocess.check_output(full_cmd)
except subprocess.CalledProcessError as exc:
      logging.debug('Failed with code %d: %s', exc.returncode, exc.output)
raise BigQueryError(exc.output.strip())
logging.debug('Output:\n%s', output or '<empty>')
if decode_json:
return json.loads(output) if output else None
return output
def _tid(self, dataset_id, table_id):
"""Returns fully-qualified table ID."""
return '%s:%s.%s' % (self.project_id, dataset_id, table_id)
### Datasets
def list_datasets(self):
"""Returns a list of dataset IDs in the project."""
reply = self._call('ls', ['%s:' % self.project_id]) or []
return [d['datasetId'] for d in reply]
def create_dataset(self, dataset_id, description=''):
# 'mk' returns "Dataset '...' successfully created." instead of JSON.
self._call(
'mk', ['--description', description, '-d', dataset_id],
decode_json=False)
### Tables
def list_tables(self, dataset_id):
"""Returns a list of table IDs within the dataset."""
reply = self._call('ls', ['%s:%s.' % (self.project_id, dataset_id)]) or []
return [t['tableId'] for t in reply]
def get_table_metadata(self, dataset_id, table_id):
"""Returns information about an existing table (as a dict).
The format of the dict matches 'bigquery#table' resource defined here:
https://cloud.google.com/bigquery/docs/reference/rest/v2/tables
"""
out = self._call('show', [self._tid(dataset_id, table_id)])
assert out.get('kind') == 'bigquery#table'
return out
def create_table(
self,
dataset_id,
table_id,
schema,
time_partitioning,
time_partitioning_exp_days,
description):
"""Creates a table (time partitioned or not).
Args:
dataset_id: ID of an existing dataset.
table_id: ID of the table to create.
schema: a dict with table fields description (in REST API format).
time_partitioning: True to enable time partitioning.
time_partitioning_exp_days: number of days to keep or 0 for forever.
description: a friendly text description.
"""
with tempfile.NamedTemporaryFile() as schema_tmp:
json.dump(schema, schema_tmp)
schema_tmp.flush()
args = ['--schema', schema_tmp.name]
if time_partitioning:
args.extend(['--time_partitioning_type', 'DAY'])
if time_partitioning_exp_days:
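          # 'bq' expects the expiration in seconds, the config uses days.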
args.extend([
'--time_partitioning_expiration',
str(time_partitioning_exp_days*24*3600),
])
if description:
args.extend(['--description', description])
args.append(self._tid(dataset_id, table_id))
self._call('mk', args, decode_json=False)
def update_table(
self,
dataset_id,
table_id,
schema=None,
time_partitioning=None,
time_partitioning_exp_days=None,
description=None):
"""Updates a table definition.
Only non-None fields are updated.
Args:
dataset_id: ID of an existing dataset.
      table_id: ID of the table to update.
schema: a dict with table fields description (in REST API format).
time_partitioning: True to enable time partitioning.
time_partitioning_exp_days: number of days to keep or 0 for forever.
description: a friendly text description.
"""
args = []
schema_tmp = None
if schema is not None:
schema_tmp = tempfile.NamedTemporaryFile()
json.dump(schema, schema_tmp)
schema_tmp.flush()
args.extend(['--schema', schema_tmp.name])
if time_partitioning is not None:
if not time_partitioning:
raise BigQueryError('Removing time partitioning is not supported')
args.extend(['--time_partitioning_type', 'DAY'])
if time_partitioning_exp_days is not None:
if time_partitioning_exp_days > 0:
args.extend([
'--time_partitioning_expiration',
str(time_partitioning_exp_days*24*3600),
])
else:
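        # Per 'bq' semantics, a negative expiration means "never expire".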
args.extend(['--time_partitioning_expiration', '-1'])
if description is not None:
args.extend(['--description', description])
if not args:
return # nothing to update
args.append(self._tid(dataset_id, table_id))
try:
self._call('update', args, decode_json=False)
finally:
if schema_tmp:
schema_tmp.close()
class Table(object):
"""A description of one table (including its schema).
Includes only properties we will ever touch. For example, we don't use
'expirationTime', and so it's completely omitted from this description. All
omitted properties are left unchanged by updates.
"""
def __init__(self, dataset_id, table_id):
self.dataset_id = dataset_id
self.table_id = table_id
self.description = ''
self.time_partitioning = False
self.time_partitioning_exp_days = 0
self.fields = TableFields()
@classmethod
def load_from_definition(cls, dataset_id, path):
"""Loads the table definition from Python literal eval file (*.schema).
The file at 'path' is expected to contain python dict with following
structure:
{
'table_id': '<must match table_id>',
'description': '<arbitrary text description>',
'time_partitioning': <True or False>,
'time_partitioning_exp_days': <int with number of days to keep>,
        'fields': <TableFieldSchema list>,
}
Where <TableFieldSchema list> is a list:
[
{
'name': '<field name>',
        'type': '<field type, e.g. INTEGER or RECORD>',
'mode': '<NULLABLE, REQUIRED or REPEATED (default is NULLABLE)>',
'description': '<arbitrary text description>',
'fields': <TableFieldSchema list with structure for RECORD fields>,
},
...
]
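
    For example, a minimal illustrative *.schema file:

      {
        'table_id': 'events',
        'description': 'Interesting events.',
        'time_partitioning': True,
        'time_partitioning_exp_days': 90,
        'fields': [
          {'name': 'timestamp', 'type': 'TIMESTAMP', 'mode': 'REQUIRED'},
          {'name': 'event', 'type': 'RECORD', 'fields': [
            {'name': 'message', 'type': 'STRING'},
          ]},
        ],
      }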
"""
with open(path, 'rt') as f:
text = f.read()
obj = ast.literal_eval(text)
if not isinstance(obj, dict):
raise DefinitionError('A schema document should be a dict')
table_id = obj.pop('table_id', None)
if not table_id or not isinstance(table_id, basestring):
raise DefinitionError('"table_id" field must be a string')
table = cls(dataset_id, table_id)
description = obj.pop('description', '')
if not isinstance(description, basestring):
raise DefinitionError('"description" must be a string')
table.description = description
time_partitioning = obj.pop('time_partitioning', False)
if not isinstance(time_partitioning, bool):
raise DefinitionError('"time_partitioning" must be True or False')
table.time_partitioning = time_partitioning
time_partitioning_exp_days = obj.pop('time_partitioning_exp_days', 0)
if not isinstance(time_partitioning_exp_days, int):
raise DefinitionError('"time_partitioning_exp_days" must be an int')
table.time_partitioning_exp_days = time_partitioning_exp_days
fields = obj.pop('fields', [])
if not isinstance(fields, list):
raise DefinitionError('"fields" must be a list')
table.fields = TableFields.load_from_definition(fields)
if obj:
raise DefinitionError(
'Unknown table definition keys in %s: %s' % (path, ', '.join(obj)))
return table
@classmethod
def load_from_bq(cls, bq, dataset_id, table_id):
"""Loads the table definition by fetching BigQuery table metadata."""
meta = bq.get_table_metadata(dataset_id, table_id)
table = cls(dataset_id, table_id)
    # 'description' may be absent from the metadata if it was never set.
    table.description = meta.get('description', '')
partitioning = meta.get('timePartitioning')
if partitioning:
# BigQuery only supports daily partitioning currently.
assert partitioning['type'] == 'DAY'
table.time_partitioning = True
ms = int(partitioning.get('expirationMs', '0'))
table.time_partitioning_exp_days = ms / 1000 / 3600 / 24
# Only 1 day resolution is actually supported.
assert ms == table.time_partitioning_exp_days * 24 * 3600 * 1000
    # The BQ fields schema format is the same as the on-disk *.schema format.
table.fields = TableFields.load_from_definition(
meta.get('schema', {}).get('fields', []))
return table
def __eq__(self, other):
assert isinstance(other, Table)
keys = (
'dataset_id', 'table_id', 'description', 'time_partitioning',
'time_partitioning_exp_days', 'fields')
for k in keys:
if getattr(self, k) != getattr(other, k):
return False
return True
def __ne__(self, other):
return not self == other
def create(self, bq):
"""Creates this table in BigQuery."""
bq.create_table(
dataset_id=self.dataset_id,
table_id=self.table_id,
schema=self.fields.to_definition(),
time_partitioning=self.time_partitioning,
time_partitioning_exp_days=self.time_partitioning_exp_days,
description=self.description)
def apply_diff(self, bq, other):
"""Updates this table in BigQuery to match 'other'.
Updates only changed fields. Prints diff to the console.
"""
assert self.dataset_id == other.dataset_id
assert self.table_id == other.table_id
schema = None
if self.fields != other.fields:
print 'Schema diff:'
print diff_table_fields(self.fields, other.fields)
schema = other.fields.to_definition()
time_partitioning = None
if self.time_partitioning != other.time_partitioning:
print 'Time partitioning: %s' % other.time_partitioning
time_partitioning = other.time_partitioning
time_partitioning_exp_days = None
if self.time_partitioning_exp_days != other.time_partitioning_exp_days:
      val = (
          '%d days' % other.time_partitioning_exp_days
          if other.time_partitioning_exp_days else 'no expiration')
print 'Time partitioning expiration: %s' % val
time_partitioning_exp_days = other.time_partitioning_exp_days
description = None
if self.description != other.description:
print 'Description: %s' % other.description
description = other.description
bq.update_table(
dataset_id=self.dataset_id,
table_id=self.table_id,
schema=schema,
time_partitioning=time_partitioning,
time_partitioning_exp_days=time_partitioning_exp_days,
description=description)
class TableFields(object):
"""A schema of a row or a structured field."""
Field = collections.namedtuple('Field', 'name type mode description fields')
  # Supported field types, minus confusing aliases (e.g. we always use BOOLEAN,
  # not BOOL).
KNOWN_FIELD_TYPES = frozenset([
'BOOLEAN',
'BYTES',
'DATE',
'DATETIME',
'FLOAT',
'INTEGER',
'RECORD',
'STRING',
'TIME',
'TIMESTAMP',
])
KNOWN_FIELD_MODES = frozenset(['NULLABLE', 'REQUIRED', 'REPEATED'])
def __init__(self, fields=None):
self.fields = list(fields or [])
def __eq__(self, other):
"""Returns True if this schema is semantically identical to 'other'.
Ignores order of fields, just like BigQuery does.
"""
return {f.name: f for f in self.fields} == {f.name: f for f in other.fields}
def __ne__(self, other):
return not self == other
@classmethod
def load_from_definition(cls, fields_list):
"""Loads fields description from the python list."""
obj = cls()
for f in fields_list:
if not isinstance(f, dict):
raise DefinitionError('A field must be described in a dict')
f = f.copy()
name = f.pop('name', None)
if not name or not isinstance(name, basestring):
raise DefinitionError('Field name must be a string, not %r' % (name,))
typ = f.pop('type', None)
if typ not in cls.KNOWN_FIELD_TYPES:
raise DefinitionError('Unknown field type %r' % (typ,))
mode = f.pop('mode', 'NULLABLE')
if mode not in cls.KNOWN_FIELD_MODES:
raise DefinitionError('Unknown field mode %r' % (mode,))
desc = f.pop('description', '')
if not isinstance(desc, basestring):
raise DefinitionError(
'Field description must be a string, not %r' % (desc,))
inner_fields_obj = None
fields = f.pop('fields', [])
if not isinstance(fields, list):
raise DefinitionError(
'Fields schema must be a list, not %r' % (fields,))
if typ == 'RECORD':
inner_fields_obj = TableFields.load_from_definition(fields)
elif fields:
raise DefinitionError('Only RECORD fields can have schema')
if f:
raise DefinitionError(
'Unknown keys in field definition: %s' % ', '.join(f))
obj.fields.append(cls.Field(name, typ, mode, desc, inner_fields_obj))
return obj
def to_definition(self):
"""Produces a JSONish dict with fields definition.
The format of the dict matches REST API format, and on disk *.schema
representation.
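
    For example (illustrative):

      [
        {'name': 'status', 'type': 'STRING', 'description': 'Final status.'},
        {'name': 'attempts', 'type': 'INTEGER', 'mode': 'REPEATED'},
      ]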
"""
out = []
for f in self.fields:
d = {'name': f.name, 'type': f.type}
      if f.mode != 'NULLABLE':  # this is the default
d['mode'] = f.mode
if f.description:
d['description'] = f.description
if f.fields:
d['fields'] = f.fields.to_definition()
out.append(d)
return out
def diff_table_fields(old, new):
"""Returns text diff between two TableFields objects.
Ignores order of fields.
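
  Output lines carry difflib.Differ prefixes ('  ', '- ', '+ ', '? '), e.g.
  (illustrative):

      "name": "attempts",
    - "type": "STRING"
    + "type": "INTEGER"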
"""
def sort_fields(fields):
assert isinstance(fields, list)
fields.sort(key=lambda f: f['name'])
for field in fields:
if field.get('fields'):
sort_fields(field.get('fields'))
return fields
def normalized_lines(fields):
return json.dumps(
sort_fields(fields.to_definition()),
sort_keys=True,
indent=2,
separators=(',', ': ')).splitlines()
d = difflib.Differ()
return '\n'.join(d.compare(normalized_lines(old), normalized_lines(new)))
def update_tables_cmd(bq, args):
print 'Ensuring that the dataset "%s" exists...' % args.dataset_id
if args.dataset_id not in bq.list_datasets():
bq.create_dataset(args.dataset_id)
desired_tables = {}
for path in getattr(args, 'schema-file'):
t = Table.load_from_definition(args.dataset_id, path)
desired_tables[t.table_id] = t
# Create completely new tables.
all_tables = bq.list_tables(args.dataset_id)
for table_id in sorted(set(desired_tables) - set(all_tables)):
print 'Creating table "%s"...' % table_id
desired_tables[table_id].create(bq)
# Update existing tables.
for table_id in sorted(set(desired_tables) & set(all_tables)):
print 'Fetching the state of the table "%s"...' % table_id
old = Table.load_from_bq(bq, args.dataset_id, table_id)
new = desired_tables[table_id]
if old == new:
print 'Table "%s" is up-to-date.' % table_id
else:
print 'Updating table "%s"...' % table_id
old.apply_diff(bq, new)
def main(argv):
parser = argparse.ArgumentParser(description='Big Query helper.')
parser.set_defaults(verbose=False)
parser.add_argument(
'-p', '--project-id', metavar='PROJECT_ID', type=str,
help='Cloud ProjectID that contains the dataset')
parser.add_argument(
'-d', '--dataset-id', metavar='DATASET_ID', type=str,
      help='Dataset ID to operate on (will be created if missing)')
parser.add_argument(
'-v', '--verbose', action='store_true', help='More logging')
subparsers = parser.add_subparsers()
update_args = subparsers.add_parser('update-tables')
update_args.add_argument(
'schema-file', metavar='SCHEMA_FILE', type=str, nargs='+',
help='Path to a file with a table definition to apply')
update_args.set_defaults(subcmd_func=update_tables_cmd)
args = parser.parse_args(argv[1:])
if not args.project_id:
print '--project-id is required'
parser.print_help()
return 1
if not args.dataset_id:
print '--dataset-id is required'
parser.print_help()
return 1
logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO)
try:
return args.subcmd_func(BigQuery(args.project_id), args)
except Error as exc:
print >> sys.stderr, '-'*30 + ' ERROR ' + '-'*30
print >> sys.stderr, str(exc)
print >> sys.stderr, '-'*30 + ' ERROR ' + '-'*30
return 1
if __name__ == '__main__':
sys.exit(main(sys.argv))