blob: d06e2910fda192ed15d4f9e6f5f2a00f1d653a01 [file] [log] [blame]
// Copyright 2020 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package span
import (
"context"
"sync"
"time"
"cloud.google.com/go/spanner"
sppb "cloud.google.com/go/spanner/apiv1/spannerpb"
"google.golang.org/grpc/codes"
"go.chromium.org/luci/common/errors"
)
// Transaction is a common interface of spanner.ReadOnlyTransaction and
// spanner.ReadWriteTransaction.
type Transaction interface {
// ReadRow reads a single row from the database.
ReadRow(ctx context.Context, table string, key spanner.Key, columns []string) (*spanner.Row, error)
// ReadRowWithOptions reads a single row from the database, and allows customizing options.
ReadRowWithOptions(ctx context.Context, table string, key spanner.Key, columns []string, opts *spanner.ReadOptions) (*spanner.Row, error)
// Read reads multiple rows from the database.
Read(ctx context.Context, table string, keys spanner.KeySet, columns []string) *spanner.RowIterator
// ReadWithOptions reads multiple rows from the database, and allows
// customizing options.
ReadWithOptions(ctx context.Context, table string, keys spanner.KeySet, columns []string, opts *spanner.ReadOptions) *spanner.RowIterator
// Query reads multiple rows returned by SQL statement.
Query(ctx context.Context, statement spanner.Statement) *spanner.RowIterator
// QueryWithOptions reads multiple rows returned by SQL statement, and allows
// customizing options.
QueryWithOptions(ctx context.Context, statement spanner.Statement, opts spanner.QueryOptions) *spanner.RowIterator
}
// UseClient installs a Spanner client into the context.
//
// Primarily used by the module initialization code. May be useful in tests as
// well.
func UseClient(ctx context.Context, client *spanner.Client) context.Context {
return context.WithValue(ctx, &clientContextKey, client)
}
// Apply applies a list of mutations atomically to the database.
//
// Panics if called from inside a read-write transaction. Use BufferWrite to
// apply mutations there.
func Apply(ctx context.Context, ms []*spanner.Mutation, opts ...spanner.ApplyOption) (commitTimestamp time.Time, err error) {
panicOnNestedRW(ctx)
if ctxOpts, ok := ctx.Value(&requestOptionsContextKey).(*RequestOptions); ok {
opts = append([]spanner.ApplyOption{spanner.Priority(ctxOpts.Priority)}, opts...)
}
return client(ctx).Apply(ctx, ms, opts...)
}
// PartitionedUpdate executes a DML statement in parallel across the database,
// using separate, internal transactions that commit independently.
//
// Panics if called from inside a read-write transaction.
func PartitionedUpdate(ctx context.Context, st spanner.Statement) (count int64, err error) {
panicOnNestedRW(ctx)
return client(ctx).PartitionedUpdateWithOptions(ctx, st, queryOptionsFromContext(ctx))
}
// Single returns a derived context with a single-use read-only transaction.
//
// It provides a read-only snapshot transaction optimized for the case where
// only a single read or query is needed. This is more efficient than using
// ReadOnlyTransaction() for a single read or query.
//
// The transaction object can be obtained through RO(ctx) or Txn(ctx). It is
// also transparently used by ReadRow, Read, Query, etc. wrappers.
//
// Panics if `ctx` already holds a transaction (either read-write or read-only).
func Single(ctx context.Context) context.Context {
panicOnNestedRO(ctx)
return setTxnState(ctx, &txnState{ro: client(ctx).Single()})
}
// ReadOnlyTransaction returns a derived context with a read-only transaction.
//
// It can be used for multiple reads from the database. To avoid leaking
// resources on the server this context *must* be canceled as soon as all reads
// are done.
//
// The transaction object can be obtained through RO(ctx) or Txn(ctx). It is
// also transparently used by ReadRow, Read, Query, etc. wrappers.
//
// Panics if `ctx` already holds a transaction (either read-write or read-only).
func ReadOnlyTransaction(ctx context.Context) (context.Context, context.CancelFunc) {
panicOnNestedRO(ctx)
txn := client(ctx).ReadOnlyTransaction()
ctx, cancel := context.WithCancel(setTxnState(ctx, &txnState{ro: txn}))
return ctx, func() { cancel(); txn.Close() }
}
// ReadWriteTransaction executes a read-write transaction, with retries as
// necessary.
//
// The callback may be called multiple times if Spanner client decides to retry
// the transaction. In particular this happens if the callback returns (perhaps
// wrapped) ABORTED error. This error is returned by Spanner client methods if
// they encounter a stale transaction.
//
// See https://godoc.org/cloud.google.com/go/spanner#ReadWriteTransaction for
// more details.
//
// The callback can access the transaction object via RW(ctx) or Txn(ctx). It is
// also transparently used by ReadRow, Read, Query, BufferWrite, etc. wrappers.
//
// Panics if `ctx` already holds a read-write transaction. Starting a read-write
// transaction from a read-only transaction is OK though, but beware that they
// are completely separate unrelated transactions.
func ReadWriteTransaction(ctx context.Context, f func(ctx context.Context) error) (commitTimestamp time.Time, err error) {
panicOnNestedRW(ctx)
var state *txnState
cts, err := client(ctx).ReadWriteTransaction(ctx, func(ctx context.Context, rw *spanner.ReadWriteTransaction) error {
state = &txnState{rw: rw}
err := f(setTxnState(ctx, state))
if unwrapped := errors.Unwrap(err); spanner.ErrCode(unwrapped) == codes.Aborted {
err = unwrapped
}
return err
})
if err == nil {
state.execCBs(ctx)
}
return cts, err
}
// RO returns the current read-only transaction in the context or nil if it's
// not a read-only transactional context.
func RO(ctx context.Context) *spanner.ReadOnlyTransaction {
if s := getTxnState(ctx); s != nil {
return s.ro
}
return nil
}
// MustRO is like RO except it panics if `ctx` is not read-only transactional.
func MustRO(ctx context.Context) *spanner.ReadOnlyTransaction {
if ro := RO(ctx); ro != nil {
return ro
}
panic("not a read-only Spanner transactional context")
}
// RW returns the current read-write transaction in the context or nil if it's
// not a read-write transactional context.
func RW(ctx context.Context) *spanner.ReadWriteTransaction {
if s := getTxnState(ctx); s != nil {
return s.rw
}
return nil
}
// MustRW is like RW except it panics if `ctx` is not read-write transactional.
func MustRW(ctx context.Context) *spanner.ReadWriteTransaction {
if rw := RW(ctx); rw != nil {
return rw
}
panic("not a read-write Spanner transactional context")
}
// Txn returns an interface that can be used to read data in the current
// read-only or read-write transaction.
//
// Returns nil if `ctx` is not a transactional context.
func Txn(ctx context.Context) Transaction {
switch s := getTxnState(ctx); {
case s == nil:
return nil
case s.ro != nil:
return s.ro
default:
return s.rw
}
}
// MustTxn is like Txn except it panics if `ctx` is not transactional.
func MustTxn(ctx context.Context) Transaction {
if txn := Txn(ctx); txn != nil {
return txn
}
panic("not a transactional Spanner context")
}
// WithoutTxn returns a copy of the context without the transaction in it.
//
// This can be used to spawn separate independent transactions from within
// a transaction.
func WithoutTxn(ctx context.Context) context.Context {
if getTxnState(ctx) == nil {
return ctx
}
return setTxnState(ctx, nil)
}
// Defer schedules `cb` for execution when the current read-write transaction
// successfully lands.
//
// Intended for a best-effort non-transactional follow up to a successful
// transaction. Note that in presence of failures there's no guarantee the
// callback will be called. For example, the callback won't ever be called if
// the process crashes right after landing the transaction. Or if the
// transaction really landed, but ReadWriteTransaction finished with
// "deadline exceeded" (or some similar) error.
//
// Callbacks are executed sequentially in the reverse order they were deferred.
// They receive the non-transactional version of the context initially passed to
// ReadWriteTransaction.
//
// Panics if the given context is not transactional.
func Defer(ctx context.Context, cb func(context.Context)) {
state := getTxnState(ctx)
if state == nil || state.rw == nil {
panic("not a read-write Spanner transactional context")
}
state.deferCB(cb)
}
// ReadRow reads a single row from the database.
//
// It is a shortcut for MustTxn(ctx).ReadRow(ctx, ...). Panics if the context
// is not transactional.
func ReadRow(ctx context.Context, table string, key spanner.Key, columns []string) (*spanner.Row, error) {
return MustTxn(ctx).ReadRowWithOptions(ctx, table, key, columns, readOptionsFromContext(ctx))
}
// ReadRowWithOptions reads a single row from the database, and allows customizing
// options.
//
// It is a shortcut for MustTxn(ctx).ReadRowWithOptions(ctx, ...). Panics if
// the context is not transactional.
//
// It does not use the default RequestOptions from the ctx. Use opts to pass
// these explicitly.
func ReadRowWithOptions(ctx context.Context, table string, key spanner.Key, columns []string, opts *spanner.ReadOptions) (*spanner.Row, error) {
return MustTxn(ctx).ReadRowWithOptions(ctx, table, key, columns, opts)
}
// Read reads multiple rows from the database.
//
// It is a shortcut for MustTxn(ctx).Read(ctx, ...). Panics if the context
// is not transactional.
func Read(ctx context.Context, table string, keys spanner.KeySet, columns []string) *spanner.RowIterator {
return MustTxn(ctx).ReadWithOptions(ctx, table, keys, columns, readOptionsFromContext(ctx))
}
// ReadWithOptions reads multiple rows from the database, and allows customizing
// options.
//
// It is a shortcut for MustTxn(ctx).ReadWithOptions(ctx, ...). Panics if the
// context is not transactional.
//
// It does not use the default RequestOptions from the ctx. Use opts to pass
// these explicitly.
func ReadWithOptions(ctx context.Context, table string, keys spanner.KeySet, columns []string, opts *spanner.ReadOptions) *spanner.RowIterator {
return MustTxn(ctx).ReadWithOptions(ctx, table, keys, columns, opts)
}
// Query reads multiple rows returned by SQL statement.
//
// It is a shortcut for MustTxn(ctx).Query(ctx, ...). Panics if the context is
// not transactional.
func Query(ctx context.Context, statement spanner.Statement) *spanner.RowIterator {
return MustTxn(ctx).QueryWithOptions(ctx, statement, queryOptionsFromContext(ctx))
}
// BufferWrite adds a list of mutations to the set of updates that will be
// applied when the transaction is committed.
//
// It does not actually apply the write until the transaction is committed, so
// the operation does not block. The effects of the write won't be visible to
// any reads (including reads done in the same transaction) until the
// transaction commits.
//
// It is a wrapper over MustRW(ctx).BufferWrite(...). Panics if the context is
// not read-write transactional.
func BufferWrite(ctx context.Context, ms ...*spanner.Mutation) {
// BufferWrite just appends mutation to an internal buffer. It fails only if
// the transaction has already landed. We are OK to panic in this case:
// calling BufferWrite outside of a transaction is a programming error.
if err := MustRW(ctx).BufferWrite(ms); err != nil {
panic(err)
}
}
// Update executes a DML statement against the database. It returns the number
// of affected rows. Update returns an error if the statement is a query.
// However, the query is executed, and any data read will be validated upon
// commit.
//
// It is a shortcut for MustRW(ctx).Update(...). Panics if the context is not
// read-write transactional.
func Update(ctx context.Context, stmt spanner.Statement) (rowCount int64, err error) {
return MustRW(ctx).UpdateWithOptions(ctx, stmt, queryOptionsFromContext(ctx))
}
// UpdateWithOptions executes a DML statement against the database. It returns
// the number of affected rows. The sql query execution will be optimized based
// on the given query options.
//
// It is a shortcut for MustRW(ctx).Update(...). Panics if the context is not
// read-write transactional.
//
// It does not use the default RequestOptions from the ctx. Use opts to pass
// these explicitly.
func UpdateWithOptions(ctx context.Context, stmt spanner.Statement, opts spanner.QueryOptions) (rowCount int64, err error) {
return MustRW(ctx).UpdateWithOptions(ctx, stmt, opts)
}
// BatchWrite applies a list of mutation groups in a collection of efficient
// transactions.
//
// See https://pkg.go.dev/cloud.google.com/go/spanner#Client.BatchWrite for
// details.
//
// Must be used outside of any transactions since it launches transactions
// itself and nested transactions are not supported. Panics if the context is
// transactional.
func BatchWrite(ctx context.Context, mgs []*spanner.MutationGroup) *spanner.BatchWriteResponseIterator {
return BatchWriteWithOptions(ctx, mgs, batchWriteOptionsFromContext(ctx))
}
// BatchWriteWithOptions applies a list of mutation groups in a collection of
// efficient transactions.
//
// See https://pkg.go.dev/cloud.google.com/go/spanner#Client.BatchWrite for
// details.
//
// Must be used outside of any transactions since it launches transactions
// itself and nested transactions are not supported. Panics if the context is
// transactional.
func BatchWriteWithOptions(ctx context.Context, mgs []*spanner.MutationGroup, opts spanner.BatchWriteOptions) *spanner.BatchWriteResponseIterator {
if getTxnState(ctx) != nil {
panic("BatchWrite cannot be used in a transaction: it launches transactions itself")
}
return client(ctx).BatchWriteWithOptions(ctx, mgs, opts)
}
////////////////////////////////////////////////////////////////////////////////
var (
clientContextKey = "go.chromium.org/luci/server/span:client"
txnContextKey = "go.chromium.org/luci/server/span:txn"
requestOptionsContextKey = "go.chromium.org/luci/server/span:requestOptions"
)
// client returns a Spanner client installed in the context.
//
// Panics if it is not there.
//
// Intentionally private to force all callers to go through package's functions
// like ReadWriteTransaction, ReadOnlyTransaction, Single, etc. since they
// generally add additional functionality on top of the raw Spanner client that
// other LUCI packages assume to be present. Using the Spanner client directly
// may violate such assumptions leading to undefined behavior when multiple
// packages are used together.
func client(ctx context.Context) *spanner.Client {
cl, _ := ctx.Value(&clientContextKey).(*spanner.Client)
if cl == nil {
panic("no Spanner client in the context")
}
return cl
}
type txnState struct {
ro *spanner.ReadOnlyTransaction
rw *spanner.ReadWriteTransaction
m sync.Mutex
cbs []func(context.Context)
}
func (s *txnState) deferCB(cb func(context.Context)) {
s.m.Lock()
s.cbs = append(s.cbs, cb)
s.m.Unlock()
}
func (s *txnState) execCBs(ctx context.Context) {
// Note: execCBs happens after ReadWriteTransaction has finished. If it
// spawned any goroutines, they must have been finished already too (calling
// Defer from a goroutine that outlives a transaction is rightfully a race).
// Thus all writes to `s.cbs` are finished already and we also passed some
// synchronization barrier that waited for the goroutines to join. It's fine
// to avoid locking s.m in this case saving 200ns on hot code path.
for i := len(s.cbs) - 1; i >= 0; i-- {
s.cbs[i](ctx)
}
}
func setTxnState(ctx context.Context, s *txnState) context.Context {
return context.WithValue(ctx, &txnContextKey, s)
}
func getTxnState(ctx context.Context) *txnState {
s, _ := ctx.Value(&txnContextKey).(*txnState)
return s
}
func panicOnNestedRW(ctx context.Context) {
if RW(ctx) != nil {
panic("nested Spanner write transactions are not allowed")
}
}
func panicOnNestedRO(ctx context.Context) {
if getTxnState(ctx) != nil {
panic("nested Spanner read transactions are not allowed")
}
}
// RequestOptions holds options common to many Spanner requests.
//
// Used for setting default request options in the context via
// ModifyRequestOptions. See also spanner.ReadOptions also
// spanner.QueryOptions.
type RequestOptions struct {
Tag string
Priority sppb.RequestOptions_Priority
}
// ModifyRequestOptions returns a new Context that carries default Spanner
// request options.
//
// These request options will be used by all Spanner operations in this module
// if the underlying operation supports it, except the *WithOptions functions
// (where the caller provides options explicitly).
//
// The cb function will be called with the current defaults from the context,
// to allow for incremental updates.
func ModifyRequestOptions(ctx context.Context, cb func(*RequestOptions)) context.Context {
var next RequestOptions
if cur, ok := ctx.Value(&requestOptionsContextKey).(*RequestOptions); ok {
next = *cur
}
cb(&next)
return context.WithValue(ctx, &requestOptionsContextKey, &next)
}
func queryOptionsFromContext(ctx context.Context) spanner.QueryOptions {
opts, ok := ctx.Value(&requestOptionsContextKey).(*RequestOptions)
if !ok {
return spanner.QueryOptions{}
}
return spanner.QueryOptions{
Priority: opts.Priority,
RequestTag: opts.Tag,
}
}
func readOptionsFromContext(ctx context.Context) *spanner.ReadOptions {
opts, ok := ctx.Value(&requestOptionsContextKey).(*RequestOptions)
if !ok {
return &spanner.ReadOptions{}
}
return &spanner.ReadOptions{
Priority: opts.Priority,
RequestTag: opts.Tag,
}
}
func batchWriteOptionsFromContext(ctx context.Context) spanner.BatchWriteOptions {
opts, ok := ctx.Value(&requestOptionsContextKey).(*RequestOptions)
if !ok {
return spanner.BatchWriteOptions{}
}
return spanner.BatchWriteOptions{
Priority: opts.Priority,
TransactionTag: opts.Tag,
}
}