blob: c369d14bf00fc70b8994581a912e8193df55a6e0 [file] [log] [blame]
// Copyright 2020 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package workset contains a synchronized work queue implementation used by
// inproc sweeper.
package workset
import (
"context"
"math/rand"
"sync"
)
// Item is one item that when processed may result in more items.
type Item any
// WorkSet represents an unordered collection of some work items.
//
// Each item, when processed, may produce more follow up work items.
type WorkSet struct {
m sync.Mutex
r *rand.Rand
work []Item // available queued work
inflight int // how many workers are busy now
waiting chan struct{} // signaled to wake up all waiters
}
// New creates a new WorkSet and seeds it with the initial set of work items.
//
// If `r` is non-nil the workset will use it to randomize which items are picked
// up by Pop. Otherwise it will use the global math/rand source.
func New(work []Item, r *rand.Rand) *WorkSet {
return &WorkSet{r: r, work: append([]Item(nil), work...)}
}
// Pop blocks until there's some work item available and then pops it.
//
// Once it happens, it also increments "somebody is working" counter. The worker
// should eventually call `done(<more work items>)` or `done(nil)` to indicate
// that it has finished processing the item.
//
// If there's no available work items and no one is working (and thus no new
// items may ever appear), returns nil. Returns nil as well if the context is
// canceled.
func (ws *WorkSet) Pop(ctx context.Context) (item Item, done func([]Item)) {
ws.m.Lock()
defer ws.m.Unlock() // we'll also be unlocking it in the middle of the loop
// If there's no work available, but someone may produce it, block. Note that
// this is essentially a conditional variable (with broadcast) implemented on
// top of channels for compatibility with ctx.Done().
for len(ws.work) == 0 && ws.inflight > 0 && ctx.Err() == nil {
if ws.waiting == nil {
ws.waiting = make(chan struct{})
}
waiting := ws.waiting // will be closed in done(...) to wake everyone
// Release the lock while we are waiting, to let others reach this stage
// as well.
ws.m.Unlock()
select {
case <-waiting:
case <-ctx.Done():
}
ws.m.Lock()
}
wl := len(ws.work)
if wl == 0 {
return nil, nil // either the workset is empty or the context is done
}
// Pop a random work item by swapping it with tail and popping tail.
var idx int
if ws.r != nil {
idx = ws.r.Intn(wl)
} else {
idx = rand.Intn(wl)
}
ws.work[idx], ws.work[wl-1] = ws.work[wl-1], ws.work[idx]
ws.work, item = ws.work[:wl-1], ws.work[wl-1]
ws.inflight++
called := false
return item, func(more []Item) {
ws.m.Lock()
defer ws.m.Unlock()
if called {
panic("`done` callback called twice")
}
called = true
ws.inflight--
// Enqueue all follow up work.
ws.work = append(ws.work, more...)
// If conditional variable's condition is signaled, wake everyone up.
if ws.waiting != nil && (len(ws.work) != 0 || ws.inflight == 0) {
close(ws.waiting)
ws.waiting = nil
}
}
}