package recorder
import (
pb ""
// validateBatchCreateInvocationsRequest checks that the individual requests
// are valid, that they match the batch request requestID and that their names
// are not repeated.
// It also returns an IDSet containing the ids of all the invocations to be
// included in the new invocations.
func validateBatchCreateInvocationsRequest(
now time.Time, reqs []*pb.CreateInvocationRequest, requestID string) (newInvs, includedInvs invocations.IDSet, err error) {
if err := pbutil.ValidateRequestID(requestID); err != nil {
return nil, nil, errors.Annotate(err, "request_id").Err()
if err := pbutil.ValidateBatchRequestCount(len(reqs)); err != nil {
return nil, nil, err
newInvs = make(invocations.IDSet, len(reqs))
allIncludedIDs := make(invocations.IDSet)
for i, req := range reqs {
if err := validateCreateInvocationRequest(req, now, allIncludedIDs); err != nil {
return nil, nil, errors.Annotate(err, "requests[%d]", i).Err()
// If there's multiple `CreateInvocationRequest`s their request id
// must either be empty or match the one in the batch request.
if req.RequestId != "" && req.RequestId != requestID {
return nil, nil, errors.Reason("requests[%d].request_id: %q does not match request_id %q", i, requestID, req.RequestId).Err()
invID := invocations.ID(req.InvocationId)
if newInvs.Has(invID) {
return nil, nil, errors.Reason("requests[%d].invocation_id: duplicated invocation id %q", i, req.InvocationId).Err()
return newInvs, allIncludedIDs, nil
// BatchCreateInvocations implements pb.RecorderServer.
func (s *recorderServer) BatchCreateInvocations(ctx context.Context, in *pb.BatchCreateInvocationsRequest) (*pb.BatchCreateInvocationsResponse, error) {
now := clock.Now(ctx).UTC()
for i, r := range in.Requests {
if err := verifyCreateInvocationPermissions(ctx, r); err != nil {
return nil, errors.Annotate(err, "requests[%d]", i).Err()
idSet, includedInvs, err := validateBatchCreateInvocationsRequest(now, in.Requests, in.RequestId)
if err != nil {
return nil, appstatus.BadRequest(err)
if err := permissions.VerifyInvocations(span.Single(ctx), includedInvs, permIncludeInvocation); err != nil {
return nil, err
invs, tokens, err := s.createInvocations(ctx, in.Requests, in.RequestId, now, idSet)
if err != nil {
return nil, err
return &pb.BatchCreateInvocationsResponse{Invocations: invs, UpdateTokens: tokens}, nil
// createInvocations is a shared implementation for CreateInvocation and BatchCreateInvocations RPCs.
func (s *recorderServer) createInvocations(ctx context.Context, reqs []*pb.CreateInvocationRequest, requestID string, now time.Time, idSet invocations.IDSet) ([]*pb.Invocation, []string, error) {
createdBy := string(auth.CurrentIdentity(ctx))
var err error
deduped := false
_, err = span.ReadWriteTransaction(ctx, func(ctx context.Context) error {
deduped, err = deduplicateCreateInvocations(ctx, idSet, requestID, createdBy)
if err != nil {
return err
if !deduped {
s.createInvocationsInternal(ctx, now, reqs, requestID, createdBy)
return nil
if err != nil {
return nil, nil, err
if !deduped {
for _, r := range reqs {
spanutil.IncRowCount(ctx, 1, spanutil.Invocations, spanutil.Inserted, r.Invocation.GetRealm())
return getCreatedInvocationsAndUpdateTokens(ctx, idSet, reqs)
// createInvocationsInternal creates each invocation requested. It enqueues
// task queue tasks for finalization and/or export notifications as appropriate.
// Must be called within a Spanner Read/Write transaction.
func (s *recorderServer) createInvocationsInternal(ctx context.Context, now time.Time, reqs []*pb.CreateInvocationRequest, requestID, createdBy string) {
ms := make([]*spanner.Mutation, 0, len(reqs))
// Compute mutations
for _, req := range reqs {
newInvState := req.Invocation.GetState()
if newInvState == pb.Invocation_STATE_UNSPECIFIED {
newInvState = pb.Invocation_ACTIVE
if newInvState != pb.Invocation_ACTIVE && newInvState != pb.Invocation_FINALIZING {
// validateCreateInvocationRequest should have rejected any other states.
panic("do not create invocations in states other than active or finalizing")
// Prepare the invocation we will save to spanner.
inv := &pb.Invocation{
Name: invocations.ID(req.InvocationId).Name(),
State: newInvState,
Deadline: req.Invocation.GetDeadline(),
Tags: req.Invocation.GetTags(),
IsExportRoot: req.Invocation.GetIsExportRoot(),
BigqueryExports: req.Invocation.GetBigqueryExports(),
CreatedBy: createdBy,
ProducerResource: req.Invocation.GetProducerResource(),
Realm: req.Invocation.GetRealm(),
Properties: req.Invocation.GetProperties(),
SourceSpec: req.Invocation.GetSourceSpec(),
IsSourceSpecFinal: req.Invocation.GetIsSourceSpecFinal(),
BaselineId: req.Invocation.GetBaselineId(),
Instructions: req.Invocation.GetInstructions(),
ExtendedProperties: req.Invocation.GetExtendedProperties(),
// Ensure the invocation has a deadline.
if inv.Deadline == nil {
inv.Deadline = pbutil.MustTimestampProto(now.Add(defaultInvocationDeadlineDuration))
// Create a mutation to create the invocation.
ms = append(ms, spanutil.InsertMap("Invocations", s.rowOfInvocation(ctx, inv, requestID)))
// Add any inclusions.
var includedInvocationIDs []string
for _, incName := range req.Invocation.IncludedInvocations {
invID := invocations.MustParseName(incName)
ms = append(ms, spanutil.InsertMap("IncludedInvocations", map[string]any{
"InvocationId": invocations.ID(req.InvocationId),
"IncludedInvocationId": invID,
includedInvocationIDs = append(includedInvocationIDs, string(invID))
if req.Invocation.GetIsExportRoot() {
// An export root has itself as an export root. Exportnotifier service
// will propagate the export root to included invocations in a separate
// task.
root := exportroots.ExportRoot{
Invocation: invocations.ID(req.InvocationId),
RootInvocation: invocations.ID(req.InvocationId),
IsInheritedSourcesSet: true, // Roots inherit nil sources.
InheritedSources: nil,
IsNotified: false,
ms = append(ms, exportroots.Create(root))
if len(includedInvocationIDs) > 0 {
// Enqueue task to propagate export root to any included invocations
// and send any notifications.
exportnotifier.EnqueueTask(ctx, &taskspb.RunExportNotifications{
InvocationId: req.InvocationId,
RootInvocationIds: []string{req.InvocationId},
IncludedInvocationIds: includedInvocationIDs,
if req.Invocation.State == pb.Invocation_FINALIZING {
// Enqueue finalization task and run export notifications task.
tasks.StartInvocationFinalization(ctx, invocations.ID(req.InvocationId), false)
span.BufferWrite(ctx, ms...)
// getCreatedInvocationsAndUpdateTokens reads the full details of the
// invocations just created in a separate read-only transaction, and
// generates an update token for each.
func getCreatedInvocationsAndUpdateTokens(ctx context.Context, idSet invocations.IDSet, reqs []*pb.CreateInvocationRequest) ([]*pb.Invocation, []string, error) {
ctx, cancel := span.ReadOnlyTransaction(ctx)
defer cancel()
invMap, err := invocations.ReadBatch(ctx, idSet)
if err != nil {
return nil, nil, err
// Arrange them in same order as the incoming requests.
// Ordering is important to match the tokens.
invs := make([]*pb.Invocation, len(reqs))
for i, req := range reqs {
invs[i] = invMap[invocations.ID(req.InvocationId)]
tokens, err := generateTokens(ctx, invs)
if err != nil {
return nil, nil, err
return invs, tokens, nil
// deduplicateCreateInvocations checks if the invocations have already been
// created with the given requestID and current requester.
// Returns a true if they have.
func deduplicateCreateInvocations(ctx context.Context, idSet invocations.IDSet, requestID, createdBy string) (bool, error) {
invCount := 0
columns := []string{"InvocationId", "CreateRequestId", "CreatedBy"}
err := span.Read(ctx, "Invocations", idSet.Keys(), columns).Do(func(r *spanner.Row) error {
var invID invocations.ID
var rowRequestID spanner.NullString
var rowCreatedBy spanner.NullString
switch err := spanutil.FromSpanner(r, &invID, &rowRequestID, &rowCreatedBy); {
case err != nil:
return err
case !rowRequestID.Valid || rowRequestID.StringVal != requestID:
return invocationAlreadyExists(invID)
case rowCreatedBy.StringVal != createdBy:
return invocationAlreadyExists(invID)
return nil
switch {
case err != nil:
return false, err
case invCount == len(idSet):
// All invocations were previously created with this request id.
return true, nil
case invCount == 0:
// None of the invocations exist already.
return false, nil
// Could happen if someone sent two different but overlapping batch create
// requests, but reused the request_id.
return false, appstatus.Errorf(codes.AlreadyExists, "some, but not all of the invocations already created with this request id")
// generateTokens generates an update token for each invocation.
func generateTokens(ctx context.Context, invs []*pb.Invocation) ([]string, error) {
ret := make([]string, len(invs))
for i, inv := range invs {
updateToken, err := generateInvocationToken(ctx, invocations.MustParseName(inv.Name))
if err != nil {
return nil, err
ret[i] = updateToken
return ret, nil