# blob: 35724d7b7bbbdbfab2aa06470c8157e08fa8fb78 [file] [log] [blame]
# Copyright 2020 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""This module integrates buildbucket with resultdb."""
import hashlib
import json
import logging
from google.appengine.api import app_identity
from google.appengine.ext import ndb
import webapp2
from components import decorators
from components import net
from components.prpc import client
from components.prpc import codes
from go.chromium.org.luci.buildbucket.proto import common_pb2
from go.chromium.org.luci.resultdb.proto.v1 import recorder_pb2
from go.chromium.org.luci.resultdb.proto.v1 import recorder_prpc_pb2
from go.chromium.org.luci.resultdb.proto.v1 import invocation_pb2
import config
import model
import tq
def _split_by_project(builds_and_configs):
  """Groups (build, builder_cfg) pairs by the project of the build's builder.

  Args:
    builds_and_configs: an iterable of (build, builder_cfg) tuples.

  Returns:
    The values of a dict keyed by build.proto.builder.project, i.e. one list
    of (build, builder_cfg) tuples per distinct project.
  """
  by_project = {}
  for build, cfg in builds_and_configs:
    # Create the project's group on first sight, then add the pair to it.
    group = by_project.setdefault(build.proto.builder.project, [])
    group.append((build, cfg))
  return by_project.values()
@ndb.tasklet
def create_invocations_async(builds_and_configs):
  """Creates ResultDB invocations for the given builds.

  Does nothing unless a ResultDB hostname is present in the service settings.

  For every build an invocation with id "build-<build id>" is requested. Its
  name is written to build.proto.infra.resultdb.invocation and, once the RPC
  has returned, the matching update token is written to
  build.resultdb_update_token. A build that has a build number additionally
  gets a second invocation, in FINALIZING state, whose id is derived from the
  sha256 of the builder id and the build number and which includes the first
  invocation.

  Args:
    builds_and_configs: a list of (build, builder_cfg) tuples.
  """
  if not builds_and_configs:  # pragma: no cover
    return

  settings = yield config.get_settings_async()
  resultdb_host = settings.resultdb.hostname
  if not resultdb_host:
    # ResultDB integration has to be switched on at service level, i.e.
    # globally per buildbucket deployment.
    return

  bb_host = app_identity.get_default_version_hostname()

  # The rpc that creates invocations uses per-project credentials, hence one
  # batch request per project.
  batches = _split_by_project(builds_and_configs)
  rpc_args = []
  for batch in batches:
    first_build = batch[0][0]
    project = first_build.proto.builder.project
    # Request id format:
    # build-<first build id>+<number of other builds in the batch>
    req = recorder_pb2.BatchCreateInvocationsRequest(
        request_id='build-%d+%d' % (first_build.proto.id, len(batch) - 1)
    )

    for build, cfg in batch:
      history_options = invocation_pb2.HistoryOptions()
      history_options.use_invocation_timestamp = (
          cfg.resultdb.history_options.use_invocation_timestamp
      )

      inv_id = 'build-%d' % build.proto.id
      inv_name = 'invocations/%s' % inv_id
      producer = '//%s/builds/%s' % (bb_host, build.key.id())

      req.requests.add(
          invocation_id=inv_id,
          invocation=invocation_pb2.Invocation(
              realm=build.realm,
              bigquery_exports=cfg.resultdb.bq_exports,
              producer_resource=producer,
              history_options=history_options,
          ),
      )
      build.proto.infra.resultdb.invocation = inv_name

      if build.proto.number:
        # Numbered builds get a second, already finalizing invocation that
        # merely includes the per-build invocation requested above.
        builder_hash = hashlib.sha256(build.builder_id).hexdigest()
        req.requests.add(
            invocation_id='build-%s-%d' % (builder_hash, build.proto.number),
            invocation=invocation_pb2.Invocation(
                realm=build.realm,
                included_invocations=[inv_name],
                state=invocation_pb2.Invocation.State.FINALIZING,
                producer_resource=producer,
            ),
        )

    # One (request, credentials) pair per batch.
    rpc_args.append((req, client.project_credentials(project)))

  rec_client = _recorder_client(resultdb_host)
  # Do rpcs in parallel.
  resps = yield [
      rec_client.BatchCreateInvocationsAsync(req, credentials=creds)
      for req, creds in rpc_args
  ]

  for batch, res in zip(batches, resps):
    assert len(res.invocations) == len(res.update_tokens)
    # The response may hold more invocations than the batch has builds,
    # because two invocations are requested for numbered builds. Invocations
    # and update tokens are expected to line up by index, so map invocation
    # name -> token and pick each build's token by its invocation name.
    tokens = dict(
        (inv.name, tok) for inv, tok in zip(res.invocations, res.update_tokens)
    )
    for build, _ in batch:
      name = build.proto.infra.resultdb.invocation
      assert name in tokens
      build.resultdb_update_token = tokens[name]
def enqueue_invocation_finalization_async(build):
  """Enqueues a task that calls ResultDB to finalize the build's invocation.

  Asserts that it runs inside an ndb transaction and that the build has ended.

  Args:
    build: the build whose invocation is to be finalized; its key id is used
      for both the task URL and the task payload.

  Returns:
    The result of tq.enqueue_async for the single enqueued task.
  """
  assert ndb.in_transaction()
  assert build
  assert build.is_ended

  build_id = build.key.id()
  task_def = {
      'url': '/internal/task/resultdb/finalize/%d' % build_id,
      'payload': {'id': build_id},
      'retry_options': {
          'task_age_limit': model.BUILD_TIMEOUT.total_seconds(),
      },
  }
  return tq.enqueue_async('backend-default', [task_def])
class FinalizeInvocation(webapp2.RequestHandler):  # pragma: no cover
  """Task handler that calls ResultDB to finalize a build's invocation."""

  @decorators.require_taskqueue('backend-default')
  def post(self, build_id):  # pylint: disable=unused-argument
    """Finalizes the invocation of the build named in the task payload.

    The build id is read from the "id" key of the JSON request body; the
    build_id URL argument is not used.
    """
    payload = json.loads(self.request.body)
    _finalize_invocation(payload['id'])
def _finalize_invocation(build_id):
  """Calls ResultDB to finalize the invocation recorded for a build.

  Returns without calling ResultDB when the build's infra has no resultdb
  hostname or no invocation. FAILED_PRECONDITION and PERMISSION_DENIED rpc
  errors are logged and swallowed; any other RpcError is re-raised.

  Args:
    build_id: id of the build whose invocation should be finalized.
  """
  bundle = model.BuildBundle.get(build_id, infra=True)
  rdb = bundle.infra.parse().resultdb
  if not (rdb.hostname and rdb.invocation):
    # A missing hostname or invocation means resultdb integration is not
    # enabled for this build.
    return

  try:
    recorder = _recorder_client(rdb.hostname)
    recorder.FinalizeInvocation(
        recorder_pb2.FinalizeInvocationRequest(name=rdb.invocation),
        credentials=client.service_account_credentials(),
        metadata={'update-token': bundle.build.resultdb_update_token},
    )
  except client.RpcError as rpce:
    logged_only = (
        codes.StatusCode.FAILED_PRECONDITION,
        codes.StatusCode.PERMISSION_DENIED,
    )
    if rpce.status_code not in logged_only:
      raise  # Retry other errors.
    logging.error('RpcError when finalizing %s: %s', rdb.invocation, rpce)
def _recorder_client(hostname):  # pragma: no cover
  """Returns a pRPC client for the Recorder service at the given hostname."""
  service_description = recorder_prpc_pb2.RecorderServiceDescription
  return client.Client(hostname, service_description)