blob: 462457aa5c345db232fe0010b48e33d813c2ff06 [file] [log] [blame]
// Copyright 2020 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sweep
import (
"context"
"sync/atomic"
"time"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/sync/dispatcher"
"go.chromium.org/luci/common/sync/dispatcher/buffer"
"go.chromium.org/luci/server/tq/internal"
"go.chromium.org/luci/server/tq/internal/db"
"go.chromium.org/luci/server/tq/internal/reminder"
)
// BatchProcessor handles reminders in batches.
//
// Reminders are fed in via Enqueue, grouped into batches by an internal
// dispatcher.Channel, and submitted via the configured Submitter. The
// exported fields must be populated before calling Start.
type BatchProcessor struct {
	Context           context.Context    // the context to use for processing
	DB                db.DB              // DB to use to fetch reminders from
	Submitter         internal.Submitter // knows how to submit tasks
	BatchSize         int                // max size of a single reminder batch
	ConcurrentBatches int                // how many concurrent batches to process
	ch                dispatcher.Channel // feeds batches to processBatch; set by Start
	processed         int32              // total reminders successfully processed; updated atomically
}
// Start launches background processor goroutines.
//
// Returns an error if the processor's configuration produces invalid
// dispatcher options.
func (p *BatchProcessor) Start() error {
	ch, err := dispatcher.NewChannel(
		p.Context,
		&dispatcher.Options{
			Buffer: buffer.Options{
				MaxLeases: p.ConcurrentBatches,
				BatchSize: p.BatchSize,
				// Don't wait longer than this for a partial batch to fill up.
				BatchDuration: 10 * time.Millisecond,
				// Once every worker has a full batch queued up, make Enqueue
				// block instead of accepting more items.
				FullBehavior: &buffer.BlockNewItems{
					MaxItems: p.ConcurrentBatches * p.BatchSize,
				},
			},
		},
		p.processBatch,
	)
	if err != nil {
		return errors.Annotate(err, "invalid sweeper configuration").Err()
	}
	p.ch = ch
	return nil
}
// Stop waits until all enqueued reminders are processed and then stops the
// processor.
//
// Returns the total number of successfully processed reminders.
func (p *BatchProcessor) Stop() int {
	p.ch.Close()
	<-p.ch.DrainC
	// All workers are done now, but use an atomic load anyway for consistency
	// with how processBatch updates the counter.
	total := atomic.LoadInt32(&p.processed)
	return int(total)
}
// Enqueue adds reminders to the to-be-processed queue.
//
// Must be called only between Start and Stop. Drops reminders on the floor if
// the context is canceled.
func (p *BatchProcessor) Enqueue(ctx context.Context, r []*reminder.Reminder) {
	done := ctx.Done()
	for _, rem := range r {
		select {
		case <-done:
			return // the context is canceled, give up on the rest
		case p.ch.C <- rem:
		}
	}
}
// processBatch is invoked (possibly concurrently) by the dispatcher channel
// to handle a single batch of reminders.
//
// Logs errors inside, doesn't return them.
func (p *BatchProcessor) processBatch(data *buffer.Batch) error {
	reminders := make([]*reminder.Reminder, 0, len(data.Data))
	for _, item := range data.Data {
		reminders = append(reminders, item.(*reminder.Reminder))
	}
	count, err := internal.SubmitBatch(p.Context, p.Submitter, p.DB, reminders)
	if err != nil {
		logging.Errorf(p.Context, "Processed only %d/%d reminders: %s", count, len(reminders), err)
	}
	atomic.AddInt32(&p.processed, int32(count))
	return nil
}