blob: 4412ee4a73efe5d640fd0e97fb9b628e90cff8f4 [file] [log] [blame]
// Copyright 2020 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package txndefer implements a filter that calls best-effort callbacks on
// successful transaction commits.
// Useful when an activity inside a transaction has some best-effort follow up
// that should be done once the transaction has successfully landed.
package txndefer
import (
ds ""
// FilterRDS installs the datastore filter into the context.
func FilterRDS(ctx context.Context) context.Context {
return ds.AddRawFilters(ctx, func(_ context.Context, inner ds.RawInterface) ds.RawInterface {
return filteredDS{inner}
// Defer schedules `cb` for execution when the current transaction successfully
// lands.
// Intended for a best-effort non-transactional follow up to a successful
// transaction. Note that in presence of failures there's no guarantee the
// callback will be called. For example, the callback won't ever be called if
// the process crashes right after landing the transaction. Or if the
// transaction really landed, but RunInTransaction finished with "deadline
// exceeded" (or some similar) error.
// Callbacks are executed sequentially in the reverse order they were deferred.
// They receive the non-transactional version of the context initially passed to
// RunInTransaction so that they inherit the deadline of the entire transaction.
// Panics if the given context is not transactional or there's no txndefer
// filter installed.
func Defer(ctx context.Context, cb func(context.Context)) {
state, _ := ctx.Value(&ctxKey).(*txnState)
if state == nil {
panic("not a transactional context or no txndefer filter installed")
// ctxKey identifies the per-transaction *txnState stored in the context. Its
// address (&ctxKey), not its string value, is what gets used as the key.
var ctxKey = "txndefer.txnState"
// txnState holds the callbacks deferred during a single transaction attempt.
type txnState struct {
	ctx context.Context // the original transaction context
	m   sync.Mutex      // protects cbs while callbacks are being registered
	cbs []func(context.Context)
}

// deferCB appends `cb` to the list of callbacks to run once the transaction
// lands.
//
// Takes the mutex, so it may be called concurrently from several goroutines.
func (s *txnState) deferCB(cb func(context.Context)) {
	s.m.Lock()
	defer s.m.Unlock()
	s.cbs = append(s.cbs, cb)
}
func (s *txnState) execCBs() {
// Note: execCBs happens after RunInTransaction has finished. If it spawned
// any goroutines, they must have been finished already too (calling Defer
// from a goroutine that outlives a transaction is rightfully a race). Thus
// all writes to `` are finished already and we also passed some
// synchronization barrier that waited for the goroutines to join. It's fine
// to avoid locking s.m in this case saving 200ns on hot code path.
if len( != 0 {
ctx := ds.WithoutTransaction(s.ctx)
for i := len( - 1; i >= 0; i-- {[i](ctx)
type filteredDS struct {
func (fds filteredDS) RunInTransaction(f func(ctx context.Context) error, opts *ds.TransactionOptions) error {
var state *txnState
err := fds.RawInterface.RunInTransaction(func(ctx context.Context) error {
state = &txnState{ctx: ctx}
return f(context.WithValue(ctx, &ctxKey, state))
}, opts)
if err == nil {
return err