blob: 7325e70e6fbf426c7620cf3aa246677a27f91d9b [file] [log] [blame]
// Copyright 2015 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package taskqueue
import (
"time"
"golang.org/x/net/context"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/sync/parallel"
)
// Add adds the specified task(s) to the specified task queue.
//
// If only one task is provided its error will be returned directly. If more
// than one task is provided, an errors.MultiError will be returned in the
// event of an error, with a given error index corresponding to the error
// encountered when processing the task at that index.
//
// If the number of tasks is beyond the limits of the underlying implementation,
// splits the batch into multiple ones.
func Add(c context.Context, queueName string, tasks ...*Task) error {
return addRaw(Raw(c), queueName, tasks)
}
func makeBatches(tasks []*Task, limit int) [][]*Task {
if limit <= 0 {
return [][]*Task{tasks}
}
batches := make([][]*Task, 0, len(tasks)/limit+1)
for len(tasks) > 0 {
batch := tasks
if len(batch) > limit {
batch = batch[:limit]
}
batches = append(batches, batch)
tasks = tasks[len(batch):]
}
return batches
}
func addRaw(raw RawInterface, queueName string, tasks []*Task) error {
lme := errors.NewLazyMultiError(len(tasks))
err := parallel.FanOutIn(func(work chan<- func() error) {
offset := 0
for _, batch := range makeBatches(tasks, raw.Constraints().MaxAddSize) {
batch := batch
i := offset
offset += len(batch)
work <- func() error {
return raw.AddMulti(batch, queueName, func(t *Task, err error) {
if !lme.Assign(i, err) {
*tasks[i] = *t
}
i++
})
}
}
})
if err != nil {
return err
}
err = lme.Get()
if len(tasks) == 1 {
err = errors.SingleError(err)
}
return err
}
// Delete deletes a task from the task queue.
//
// If only one task is provided its error will be returned directly. If more
// than one task is provided, an errors.MultiError will be returned in the
// event of an error, with a given error index corresponding to the error
// encountered when processing the task at that index.
//
// If the number of tasks is beyond the limits of the underlying implementation,
// splits the batch into multiple ones.
func Delete(c context.Context, queueName string, tasks ...*Task) error {
raw := Raw(c)
lme := errors.NewLazyMultiError(len(tasks))
err := parallel.FanOutIn(func(work chan<- func() error) {
offset := 0
for _, batch := range makeBatches(tasks, raw.Constraints().MaxDeleteSize) {
batch := batch
localOffset := offset
offset += len(batch)
work <- func() error {
return raw.DeleteMulti(batch, queueName, func(i int, err error) {
lme.Assign(localOffset+i, err)
})
}
}
})
if err != nil {
return err
}
err = lme.Get()
if len(tasks) == 1 {
err = errors.SingleError(err)
}
return err
}
// NOTE(riannucci): Pull task queues API can be extended to support automatic
// lease management.
//
// The theory is that a good lease API might look like:
//
// func Lease(queueName, tag string, batchSize int, duration time.Time, cb func(*Task, error<-))
//
// Which blocks and calls cb for each task obtained. Lease would then do all
// necessary backoff negotiation with the backend. The callback could execute
// synchronously (stuffing an error into the chan or panicing if it fails), or
// asynchronously (dispatching a goroutine which will then populate the error
// channel if needed). If it operates asynchronously, it has the option of
// processing multiple work items at a time.
//
// Lease would also take care of calling ModifyLease as necessary to ensure
// that each call to cb would have 'duration' amount of time to work on the
// task, as well as releasing as many leased tasks as it can on a failure.
// Lease leases tasks from a queue.
//
// leaseTime has seconds precision. The number of tasks fetched will be at most
// maxTasks.
func Lease(c context.Context, maxTasks int, queueName string, leaseTime time.Duration) ([]*Task, error) {
return Raw(c).Lease(maxTasks, queueName, leaseTime)
}
// LeaseByTag leases tasks from a queue, grouped by tag.
//
// If tag is empty, then the returned tasks are grouped by the tag of the task
// with earliest ETA.
//
// leaseTime has seconds precision. The number of tasks fetched will be at most
// maxTasks.
func LeaseByTag(c context.Context, maxTasks int, queueName string, leaseTime time.Duration, tag string) ([]*Task, error) {
return Raw(c).LeaseByTag(maxTasks, queueName, leaseTime, tag)
}
// ModifyLease modifies the lease of a task.
//
// Used to request more processing time, or to abandon processing. leaseTime has
// seconds precision and must not be negative.
//
// On success, modifies task's ETA field in-place with updated lease expiration
// time.
func ModifyLease(c context.Context, task *Task, queueName string, leaseTime time.Duration) error {
return Raw(c).ModifyLease(task, queueName, leaseTime)
}
// Purge purges all tasks form the named queue.
func Purge(c context.Context, queueName string) error {
return Raw(c).Purge(queueName)
}
// Stats returns Statistics instances for each of the named task queues.
//
// If only one task is provided its error will be returned directly. If more
// than one task is provided, an errors.MultiError will be returned in the
// event of an error, with a given error index corresponding to the error
// encountered when processing the task at that index.
func Stats(c context.Context, queueNames ...string) ([]Statistics, error) {
ret := make([]Statistics, len(queueNames))
lme := errors.NewLazyMultiError(len(queueNames))
i := 0
err := Raw(c).Stats(queueNames, func(s *Statistics, err error) {
if !lme.Assign(i, err) {
ret[i] = *s
}
i++
})
if err == nil {
err = lme.Get()
if len(queueNames) == 1 {
err = errors.SingleError(err)
}
}
return ret, err
}
// GetTestable returns a Testable for the current task queue service in c, or
// nil if it does not offer one.
func GetTestable(c context.Context) Testable {
return Raw(c).GetTestable()
}