// Copyright 2020 The LUCI Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package finalizer
import (
pb ""
// InitServer initializes a finalizer server.
func InitServer(srv *server.Server) {
// init() below takes care of everything.
func init() {
tasks.FinalizationTasks.AttachHandler(func(ctx context.Context, msg proto.Message) error {
task := msg.(*taskspb.TryFinalizeInvocation)
return tryFinalizeInvocation(ctx, invocations.ID(task.InvocationId))
// Invocation finalization is asynchronous. First, an invocation transitions
// from ACTIVE to FINALIZING state and transactionally an invocation task is
// enqueued to try to transition it from FINALIZING to FINALIZED.
// Then the task tries to finalize the invocation:
// 1. Check if the invocation is ready to be finalized.
// 2. Finalize the invocation.
// The invocation is ready to be finalized iff it is in FINALIZING state and it
// does not include, directly or indirectly, an active invocation.
// The latter involves a graph traversal.
// Given that a client cannot mutate inclusions of a FINALIZING/FINALIZED
// invocation, this means that once an invocation is ready to be finalized,
// it cannot become un-ready. This is why the check is done in a read-only
// transaction with minimal contention.
// If the invocation is not ready to finalize, the task is dropped.
// This check is implemented in readyToFinalize() function.
// The second part is actual finalization. It is done in a separate read-write
// transaction. First the task checks again if the invocation is still
// FINALIZING. If so, the task changes state to FINALIZED, enqueues BQExport
// tasks and tasks to try to finalize invocations that directly include the
// current one (more about this below).
// The finalization is implemented in finalizeInvocation() function.
// If we have a chain of inclusions A includes B, B includes C, where A and B
// are FINALIZING and C is active, then A and B are waiting for C to be
// finalized.
// In this state, tasks attempting to finalize A or B will conclude that they
// are not ready.
// Once C is finalized, a task to try to finalize B is enqueued.
// B gets finalized and it enqueues a task to try to finalize A.
// More generally speaking, whenever a node transitions from FINALIZING to
// FINALIZED, we ping incoming edges. This may cause a chain of pings along
// the edges.
// More specifically, given edge (A, B), when finalizing B, A is pinged only if
// it is FINALIZING. It does not make sense to do it if A is FINALIZED for
// obvious reasons; and there is no need to do it if A is ACTIVE because
// a transition ACTIVE->FINALIZING is always accompanied with enqueuing a task
// to try to finalize it.
// tryFinalizeInvocation finalizes the invocation unless it directly or
// indirectly includes an ACTIVE invocation.
// If the invocation is too early to finalize, logs the reason and returns nil.
// Idempotent.
func tryFinalizeInvocation(ctx context.Context, invID invocations.ID) error {
// The check whether the invocation is ready to finalize involves traversing
// the invocation graph and reading Invocations.State column. Doing so in a
// RW transaction will cause contention. Fortunately, once an invocation
// is ready to finalize, it cannot go back to being unready, so doing
// check and finalization in separate transactions is fine.
switch ready, err := readyToFinalize(ctx, invID); {
case err != nil:
return err
case !ready:
return nil
logging.Infof(ctx, "decided to finalize %s...", invID.Name())
return finalizeInvocation(ctx, invID)
var (
	// errAlreadyFinalized is returned by ensureFinalizing when the invocation
	// is already in FINALIZED state.
	errAlreadyFinalized = fmt.Errorf("the invocation is already finalized")

	// notReadyToFinalize tags errors which mean that the invocation is not
	// ready to finalize yet.
	// It is used exclusively inside readyToFinalize.
	notReadyToFinalize = errors.BoolTag{Key: errors.NewTagKey("not ready to get finalized")}
)
// readyToFinalize returns true if the invocation should be finalized.
// An invocation is ready to be finalized if no ACTIVE invocation is reachable
// from it.
func readyToFinalize(ctx context.Context, invID invocations.ID) (ready bool, err error) {
ctx, ts := tracing.Start(ctx, "resultdb.readyToFinalize")
defer func() { tracing.End(ts, err) }()
ctx, cancel := span.ReadOnlyTransaction(ctx)
defer cancel()
eg, ctx := errgroup.WithContext(ctx)
defer eg.Wait()
// Ensure the root invocation is in FINALIZING state.
eg.Go(func() error {
return ensureFinalizing(ctx, invID)
// Walk the graph of invocations, starting from the root, along the inclusion
// edges.
// Stop walking as soon as we encounter an active invocation.
seen := make(invocations.IDSet, 1)
var mu sync.Mutex
// Limit the number of concurrent queries.
sem := semaphore.NewWeighted(64)
var visit func(id invocations.ID)
visit = func(id invocations.ID) {
// Do not visit same node twice.
if seen.Has(id) {
// Concurrently fetch inclusions without a lock.
eg.Go(func() error {
// Limit concurrent Spanner queries.
if err := sem.Acquire(ctx, 1); err != nil {
return err
defer sem.Release(1)
// Ignore inclusions of FINALIZED invocations. An ACTIVE invocation is
// certainly not reachable from those.
st := spanner.NewStatement(`
SELECT included.InvocationId, included.State
FROM IncludedInvocations incl
JOIN Invocations included on incl.IncludedInvocationId = included.InvocationId
WHERE incl.InvocationId = @invID AND included.State != @finalized
st.Params = spanutil.ToSpannerMap(map[string]any{
"finalized": pb.Invocation_FINALIZED,
"invID": id,
var b spanutil.Buffer
return span.Query(ctx, st).Do(func(row *spanner.Row) error {
var includedID invocations.ID
var includedState pb.Invocation_State
switch err := b.FromSpanner(row, &includedID, &includedState); {
case err != nil:
return err
case includedState == pb.Invocation_ACTIVE:
return errors.Reason("%s is still ACTIVE", includedID.Name()).Tag(notReadyToFinalize).Err()
case includedState != pb.Invocation_FINALIZING:
return errors.Reason("%s has unexpected state %s", includedID.Name(), includedState).Err()
// The included invocation is FINALIZING and MAY include other
// still-active invocations. We must go deeper.
return nil
switch err := eg.Wait(); {
case errors.Unwrap(err) == errAlreadyFinalized:
// The invocation is already finalized.
return false, nil
case notReadyToFinalize.In(err):
logging.Infof(ctx, "not ready to finalize: %s", err.Error())
return false, nil
return err == nil, err
func ensureFinalizing(ctx context.Context, invID invocations.ID) error {
switch state, err := invocations.ReadState(ctx, invID); {
case err != nil:
return err
case state == pb.Invocation_FINALIZED:
return errAlreadyFinalized
case state != pb.Invocation_FINALIZING:
return errors.Reason("expected %s to be FINALIZING, but it is %s", invID.Name(), state).Err()
return nil
// finalizeInvocation updates the invocation state to FINALIZED and sets its
// FinalizeTime to the commit timestamp.
//
// Everything happens within one read-write transaction, which also:
//   - enqueues a finalization task for each FINALIZING invocation that
//     directly includes the given one;
//   - enqueues an "invocation finalized" notification;
//   - schedules test metadata update, artifact export and BigQuery export
//     tasks;
//   - schedules a baseline task via scheduleBaselineTask.
//
// Returns nil if the invocation turns out to be FINALIZED already. Returns an
// error if the invocation is in any other non-FINALIZING state or if any step
// above fails.
func finalizeInvocation(ctx context.Context, invID invocations.ID) error {
_, err := span.ReadWriteTransaction(ctx, func(ctx context.Context) error {
// Check the state before proceeding, so that if the invocation already
// finalized, we return errAlreadyFinalized.
if err := ensureFinalizing(ctx, invID); err != nil {
return err
// Run the enqueueing steps below as work items of parallel.FanOutIn.
// NOTE(review): only one `work <-` send is visible in this view; confirm
// how the steps below are split across work items before restructuring.
err := parallel.FanOutIn(func(work chan<- func() error) {
work <- func() error {
parentInvs, err := parentsInFinalizingState(ctx, invID)
if err != nil {
return err
// Enqueue tasks to try to finalize invocations that include ours.
// Note that MustAddTask in a Spanner transaction is essentially
// a BufferWrite (no RPCs inside), it's fine to call it sequentially
// and panic on errors.
for _, id := range parentInvs {
tq.MustAddTask(ctx, &tq.Task{
Payload: &taskspb.TryFinalizeInvocation{InvocationId: string(id)},
Title: string(id),
// Enqueue a notification to pub/sub listeners that the invocation
// has been finalized.
realm, err := invocations.ReadRealm(ctx, invID)
if err != nil {
return err
// Note that this submits the notification transactionally,
// i.e. conditionally on this transaction committing.
notification := &pb.InvocationFinalizedNotification{
Invocation: invID.Name(),
Realm: realm,
tasks.NotifyInvocationFinalized(ctx, notification)
// Enqueue update test metadata task transactionally.
if err := testmetadataupdator.Schedule(ctx, invID); err != nil {
return err
// Enqueue export artifact task transactionally.
if err := artifactexporter.Schedule(ctx, invID); err != nil {
return err
// Enqueue BigQuery exports transactionally.
return bqexporter.Schedule(ctx, invID)
// Abort the transaction if any of the enqueueing steps failed.
if err != nil {
return err
// Update the invocation.
span.BufferWrite(ctx, spanutil.UpdateMap("Invocations", map[string]any{
"InvocationId": invID,
"State": pb.Invocation_FINALIZED,
"FinalizeTime": spanner.CommitTimestamp,
// Schedule a baseline task if scheduleBaselineTask decides one is needed.
if err = scheduleBaselineTask(ctx, invID); err != nil {
return err
return nil
// An already finalized invocation is reported as success, not as an error.
switch {
case err == errAlreadyFinalized:
return nil
case err != nil:
return err
return nil
// parentsInFinalizingState returns IDs of invocations in FINALIZING state that
// directly include ours.
func parentsInFinalizingState(ctx context.Context, invID invocations.ID) (ids []invocations.ID, err error) {
st := spanner.NewStatement(`
SELECT including.InvocationId
FROM IncludedInvocations@{FORCE_INDEX=ReversedIncludedInvocations} incl
JOIN Invocations including ON incl.InvocationId = including.InvocationId
WHERE IncludedInvocationId = @invID AND including.State = @finalizing
st.Params = spanutil.ToSpannerMap(map[string]any{
"invID": invID.RowID(),
"finalizing": pb.Invocation_FINALIZING,
err = span.Query(ctx, st).Do(func(row *spanner.Row) error {
var id invocations.ID
if err := spanutil.FromSpanner(row, &id); err != nil {
return err
ids = append(ids, id)
return nil
return ids, err
func scheduleBaselineTask(ctx context.Context, invID invocations.ID) error {
submitted, err := invocations.ReadSubmitted(ctx, invID)
if err != nil {
return err
if submitted {
baselineupdater.Schedule(ctx, string(invID))
return nil