blob: 4412ee4a73efe5d640fd0e97fb9b628e90cff8f4 [file] [log] [blame]
// Copyright 2020 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package txndefer implements a filter that calls best-effort callbacks on
// successful transaction commits.
//
// Useful when an activity inside a transaction has some best-effort follow up
// that should be done once the transaction has successfully landed.
package txndefer
import (
"context"
"sync"
ds "go.chromium.org/luci/gae/service/datastore"
)
// FilterRDS installs the datastore filter into the context.
func FilterRDS(ctx context.Context) context.Context {
return ds.AddRawFilters(ctx, func(_ context.Context, inner ds.RawInterface) ds.RawInterface {
return filteredDS{inner}
})
}
// Defer schedules `cb` for execution when the current transaction successfully
// lands.
//
// Intended for a best-effort non-transactional follow up to a successful
// transaction. Note that in presence of failures there's no guarantee the
// callback will be called. For example, the callback won't ever be called if
// the process crashes right after landing the transaction. Or if the
// transaction really landed, but RunInTransaction finished with "deadline
// exceeded" (or some similar) error.
//
// Callbacks are executed sequentially in the reverse order they were deferred.
// They receive the non-transactional version of the context initially passed to
// RunInTransaction so that they inherit the deadline of the entire transaction.
//
// Panics if the given context is not transactional or there's no txndefer
// filter installed.
func Defer(ctx context.Context, cb func(context.Context)) {
state, _ := ctx.Value(&ctxKey).(*txnState)
if state == nil {
panic("not a transactional context or no txndefer filter installed")
}
state.deferCB(cb)
}
////////////////////////////////////////////////////////////////////////////////
var ctxKey = "txndefer.txnState"
type txnState struct {
ctx context.Context // the original transaction context
m sync.Mutex
cbs []func(context.Context)
}
func (s *txnState) deferCB(cb func(context.Context)) {
s.m.Lock()
s.cbs = append(s.cbs, cb)
s.m.Unlock()
}
func (s *txnState) execCBs() {
// Note: execCBs happens after RunInTransaction has finished. If it spawned
// any goroutines, they must have been finished already too (calling Defer
// from a goroutine that outlives a transaction is rightfully a race). Thus
// all writes to `s.cbs` are finished already and we also passed some
// synchronization barrier that waited for the goroutines to join. It's fine
// to avoid locking s.m in this case saving 200ns on hot code path.
if len(s.cbs) != 0 {
ctx := ds.WithoutTransaction(s.ctx)
for i := len(s.cbs) - 1; i >= 0; i-- {
s.cbs[i](ctx)
}
}
}
type filteredDS struct {
ds.RawInterface
}
func (fds filteredDS) RunInTransaction(f func(ctx context.Context) error, opts *ds.TransactionOptions) error {
var state *txnState
err := fds.RawInterface.RunInTransaction(func(ctx context.Context) error {
state = &txnState{ctx: ctx}
return f(context.WithValue(ctx, &ctxKey, state))
}, opts)
if err == nil {
state.execCBs()
}
return err
}