blob: cc2787e4e97c7a5f22b1483ad5468cd33d5b4145 [file] [log] [blame]
// Copyright 2020 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package lessor defines common lessor interface.
package lessor
import (
"context"
"fmt"
"time"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/server/tq/internal/partition"
)
// WithLeaseCB executes with active lease on the provided SortedPartitions.
//
// SortedPartitions may be empty slice, meaning there were existing active
// leases cumulatively covering the entire desired partition.
// Context deadline is set before the lease expires.
type WithLeaseCB func(context.Context, partition.SortedPartitions)
// Lessor abstracts out different implementations aimed to prevent concurrent
// processing of the same range of Reminders.
//
// Lessors are used by the distributed sweep implementation.
type Lessor interface {
// WithLease acquires the lease and executes WithLeaseCB.
//
// The obtained lease duration may be shorter than requested.
// The obtained lease may be only for some parts of the desired Partition.
//
// The given `sectionID` identifies a transactionally updated object that
// actually stores records about the currently leased sub-partitions of
// `part`. Each such section is independent of another. In other words, if
// some range of keys is covered by two different sections, it may be leased
// to two different callers at the same time, there's no synchronization in
// such case.
WithLease(ctx context.Context, sectionID string, part *partition.Partition, dur time.Duration, cb WithLeaseCB) error
}
var lessors = map[string]func(ctx context.Context) (Lessor, error){}
// Register registers a lessor implementation.
//
// Preferably IDs should match the corresponding database.Database
// implementations, since by default if the TQ uses a database "<X>" it will use
// the lessor "<X>" as well. But there may be IDs that are no associated with
// any database implementation (e.g. a Redis-based lessor). Such lessors need
// an explicit opt-in to be used.
//
// Must be called during init time.
func Register(id string, factory func(ctx context.Context) (Lessor, error)) {
if lessors[id] != nil {
panic(fmt.Sprintf("lessor kind %q is already registered", id))
}
lessors[id] = factory
}
// Get returns a particular Lessor implementation given its ID.
func Get(ctx context.Context, id string) (Lessor, error) {
if factory := lessors[id]; factory != nil {
return factory(ctx)
}
return nil, errors.Reason("no lessor kind %q is registered in the process", id).Err()
}