# Copyright 2018 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Implements export of builds from datastore to BigQuery."""

import datetime
import json
import logging

from google.appengine.api import app_identity
from google.appengine.api import taskqueue
from google.appengine.ext import ndb
import webapp2

from components import decorators
from components import net
from components import utils
import bqh

from go.chromium.org.luci.buildbucket.proto import build_pb2

import model
import tq
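
# Approximate cap on the total size of rows sent in one insertAll request;
# BigQuery rejects streaming insert requests larger than 10 MB.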
_REQUEST_ROWS_SIZE_LIMIT = 9e6
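# Per-row export outcomes; used as values in the dict returned by
# _export_builds.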
_SUCCESS, _TRANSIENT_FAILURE, _PERMANENT_FAILURE = range(3)


def enqueue_bq_export_async(build):
  """Enqueues a task to export a completed build to BigQuery."""
  assert ndb.in_transaction()
  assert build
  assert build.is_ended
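  # The build id appears both in the task URL and in the payload;
  # TaskExport below reads it from the payload.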
  task_def = {
      'url': '/internal/task/bq/export/%d' % build.key.id(),
      'payload': {'id': build.key.id()},
  }
  return tq.enqueue_async('backend-default', [task_def])


class TaskExport(webapp2.RequestHandler):  # pragma: no cover
  """Exports builds to a BigQuery table."""

  @decorators.require_taskqueue('backend-default')
  def post(self, build_id):
    build_id = json.loads(self.request.body)['id']
    bundle = model.BuildBundle.get(
        build_id,
        infra=True,
        input_properties=True,
        output_properties=True,
        steps=True,
    )
    if not bundle:
      return

    build = build_pb2.Build()
    bundle.to_proto(build, True)
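    # Leave headroom under the request deadline that App Engine gives push
    # task handlers (roughly 10 minutes on automatic scaling).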
    result = _export_builds(
        'raw', 'completed_builds', [build],
        utils.utcnow() + datetime.timedelta(minutes=9)
    )[build_id]
    if result == _TRANSIENT_FAILURE:
      logging.warning('transient failure')
      # Raise a 500 to trigger a retry at the Cloud Tasks level.
      self.abort(500)
    if result == _PERMANENT_FAILURE:
      logging.error('permanent failure')


# TODO(crbug/1042991): Remove cron once pull queue is drained.
class CronExportBuilds(webapp2.RequestHandler):  # pragma: no cover
  """Exports builds to a BigQuery table."""

  @decorators.require_cronjob
  def get(self):
    deadline = utils.utcnow() + datetime.timedelta(minutes=9)
    while utils.utcnow() < deadline:
      inserted, total = _process_pull_task_batch(
          'bq-export', 'raw', 'completed_builds'
      )
      if total > 0 and inserted == 0:
        logging.error('Failed to insert a single row out of %d', total)
        self.abort(500)
      if total < 100:
        # Too few for a tight loop.
        return


def _process_pull_task_batch(queue_name, dataset, table_name):
  """Exports up to 150 builds to BigQuery.

  Leases pull tasks, fetches build entities and inserts them into BigQuery.

  If the build is not finalized and it has been 20m or more since the build was
  completed, the following strategies apply:
  - if the build infra-failed with BOT_DIED or TIMED_OUT task status,
    saves build as is.
  - if the build infra-failed with BOOTSTRAPPER_ERROR and there are no steps,
    assumes the build failed to register LogDog prefix and saves it as is.
  - otherwise logs a warning/error, does not save to BigQuery and retries the
    task later.

  Returns: (inserted_count, total_count) tuple.
  """
  now = utils.utcnow()

  # Lease tasks.
  lease_duration = datetime.timedelta(minutes=5)
  lease_deadline = now + lease_duration
  q = taskqueue.Queue(queue_name)
  # https://cloud.google.com/bigquery/quotas#streaming_inserts
  # says "We recommend using about 500 rows per request".
  # We use fewer rows because otherwise we tend to hit the 10 MB per-request
  # limit.
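  # Leased tasks are invisible to other lease calls until the lease expires;
  # any task not deleted below becomes available for leasing again.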
  tasks = q.lease_tasks(lease_duration.total_seconds(), 150)
  if not tasks:
    return 0, 0

  build_ids = [json.loads(t.payload)['id'] for t in tasks]

  # IDs of builds that we could not save and want to retry later.
  ids_to_retry = set()
  # model.Build objects to insert to BigQuery.
  to_insert = []
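  # ndb.get_multi returns entities in the same order as the keys, with None
  # for missing entities, so zip(build_ids, builds) pairs ids with builds.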
  builds = ndb.get_multi(ndb.Key(model.Build, bid) for bid in build_ids)
  for bid, b in zip(build_ids, builds):
    if not b:
      logging.error('skipping build %d: not found', bid)
    elif not b.is_ended:
      logging.error('will retry build: not complete\n%d', bid)
      ids_to_retry.add(bid)
    else:
      to_insert.append(b)
  inserted_count = 0
  if to_insert:
    pairs = [(b, build_pb2.Build()) for b in to_insert]
    model.builds_to_protos_async(
        pairs,
        load_tags=True,
        load_input_properties=True,
        load_output_properties=True,
        load_steps=True,
        load_infra=True,
    ).get_result()
    statuses = _export_builds(
        dataset, table_name, [pb for _, pb in pairs], lease_deadline
    )
    inserted_count = len([1 for s in statuses.itervalues() if s == _SUCCESS])
    ids_to_retry.update(
        id for id, s in statuses.iteritems() if s == _TRANSIENT_FAILURE
    )

  if ids_to_retry:
    logging.warning('will retry builds %r later', sorted(ids_to_retry))
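  # Delete tasks for builds that were inserted, permanently failed or are
  # missing; tasks for transient failures keep their lease and are retried
  # after the lease expires.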
  done_tasks = [
      t for bid, t in zip(build_ids, tasks) if bid not in ids_to_retry
  ]
  q.delete_tasks(done_tasks)
  logging.info(
      'inserted %d rows, processed %d tasks', inserted_count, len(done_tasks)
  )
  return len(done_tasks), len(tasks)


def _export_builds(
    dataset,
    table_name,
    build_protos,
    deadline,
    request_size_limit=_REQUEST_ROWS_SIZE_LIMIT,
):
  """Saves builds to BigQuery.

  Mutates build_protos in-place.

  Logs insert errors and returns a dict {build_id: status} where status
  is one of _PERMANENT_FAILURE, _TRANSIENT_FAILURE, _SUCCESS.
  """
  # BigQuery API doc:
  # https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll
  logging.info('sending %d rows', len(build_protos))
  # Clear fields that we don't want in BigQuery.
  for proto in build_protos:
    proto.infra.buildbucket.hostname = ''
    for s in proto.steps:
      s.summary_markdown = ''
      for log in s.logs:
        # Clear everything but the name.
        name = log.name
        log.Clear()
        log.name = name
  row_statuses = {}  # build_id -> status.

  # Ensure we are under the request size limit.
  request_size = 0
  to_insert = []
  for proto in build_protos:
    row_size = len(json.dumps(bqh.message_to_dict(proto)))
    if request_size + row_size > request_size_limit:
      row_statuses[proto.id] = _TRANSIENT_FAILURE
    else:
      request_size += row_size
      to_insert.append(proto)
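  # insertId enables best-effort deduplication in BigQuery, so a retried
  # export task should not normally create duplicate rows for a build.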
  res = net.json_request(
      url=((
          'https://www.googleapis.com/bigquery/v2/'
          'projects/%s/datasets/%s/tables/%s/insertAll'
      ) % (app_identity.get_application_id(), dataset, table_name)),
      method='POST',
      payload={
          'kind': 'bigquery#tableDataInsertAllRequest',
          # Do not fail entire request because of one bad build.
          # We handle invalid rows below.
          'skipInvalidRows': True,
          'ignoreUnknownValues': False,
          'rows': [{
              'insertId': str(p.id),
              'json': bqh.message_to_dict(p),
          } for p in to_insert],
      },
      scopes=bqh.INSERT_ROWS_SCOPE,
      # deadline parameter here is duration in seconds.
      deadline=(deadline - utils.utcnow()).total_seconds(),
  )
  for err in res.get('insertErrors', []):
    # err['index'] refers to the 'rows' array of the request, i.e. to_insert,
    # which may be shorter than build_protos if some rows were skipped above.
    bp = to_insert[err['index']]
    if any(_is_max_size_error(e) for e in err['errors']):
      row_statuses[bp.id] = _PERMANENT_FAILURE
      logging.error(
          'permanently failed to insert row for build %d: %r',
          bp.id,
          err['errors'],
      )
    else:
      row_statuses[bp.id] = _TRANSIENT_FAILURE
      logging.warning(
          'transiently failed to insert row for build %d: %r',
          bp.id,
          err['errors'],
      )
  for p in to_insert:
    row_statuses.setdefault(p.id, _SUCCESS)
  assert len(row_statuses) == len(build_protos)
  return row_statuses


def _is_max_size_error(err):
  # No better way than this.
  return 'Maximum allowed row size exceeded' in err.get('message', '')