// Copyright 2016 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package parallel

import (
	"context"

	"go.chromium.org/luci/common/errors"
)

// RunMulti initiates a nested RunMulti operation. It invokes an entry function,
// passing it a MultiRunner instance bound to the supplied constraints. Any
// nested parallel operations scheduled through that MultiRunner will not
// starve each other regardless of execution order.
//
// This is useful when sharing the same outer Runner constraints with multiple
// tiers of parallel operations. A naive approach would be to re-use a Runner's
// WorkC() or Run() functions, but this can result in deadlock if the outer
// functions consume all available resources running their inner payloads,
// forcing their inner payloads to block forever.
//
// The supplied Context will be monitored for cancellation. If the Context is
// canceled, new work dispatch will be inhibited. Any functions added to the
// work channel will not be executed, and RunMulti will treat them as if they
// ran and immediately returned the Context's Err() value.
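//
// An illustrative sketch of two tiers sharing one worker pool (ctx, shards,
// and processItem are assumed to exist in the caller's code; they are not
// part of this package):
//
//	err := RunMulti(ctx, 16, func(mr MultiRunner) error {
//		return mr.RunMulti(func(workC chan<- func() error) {
//			for _, shard := range shards {
//				shard := shard
//				workC <- func() error {
//					// The nested tier draws from the same 16 workers, but
//					// cannot starve (or be starved by) the outer tier.
//					return mr.RunMulti(func(itemC chan<- func() error) {
//						for _, item := range shard.Items {
//							item := item
//							itemC <- func() error { return processItem(item) }
//						}
//					})
//				}
//			}
//		})
//	})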
func RunMulti(c context.Context, workers int, fn func(MultiRunner) error) error {
	// Create a Runner to manage our goroutines. We will not set its Maximum,
	// since we will be metering that internally using our own semaphore.
	r := Runner{
		Sustained: workers,
	}
	defer r.Close()

	nrc := nestedRunnerContext{
		ctx:   c,
		workC: r.WorkC(),
	}
	if workers > 0 {
		// Create a semaphore with (workers-1) tokens. We subtract one because the
		// runner has an implicit token by virtue of running the work.
		nrc.sem = make(Semaphore, workers-1)
	}
	return fn(&nrc)
}

// MultiRunner can execute nested RunMulti against the same outer Runner.
type MultiRunner interface {
	// RunMulti runs the supplied generator, returning an errors.MultiError with
	// the task results.
	//
	// Since it blocks on the result, RunMulti is safe to chain with other
	// RunMulti operations without risk of deadlock, as the caller's blocking
	// counts as one of the run tokens.
	//
	// Note that there is no association between the MultiError's error order and
	// the generated task order.
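	//
	// An illustrative sketch of collecting per-task errors (fetch and urls are
	// assumed to be defined by the caller; they are not part of this package):
	//
	//	err := mr.RunMulti(func(workC chan<- func() error) {
	//		for _, u := range urls {
	//			u := u
	//			workC <- func() error { return fetch(u) }
	//		}
	//	})
	//	if merr, ok := err.(errors.MultiError); ok {
	//		for _, taskErr := range merr {
	//			// Handle each task's error; its position is unrelated to the
	//			// order in which tasks were generated.
	//			_ = taskErr
	//		}
	//	}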
	RunMulti(func(chan<- func() error)) error
}
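
// nestedRunnerContext is the MultiRunner implementation handed to RunMulti's
// entry function. It dispatches work to the outer Runner's work channel and
// meters additional concurrency through an optional Semaphore.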
type nestedRunnerContext struct {
	ctx   context.Context
	workC chan<- WorkItem
	sem   Semaphore
}
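
// RunMulti implements MultiRunner. It keeps at least one generated task
// eligible to run at all times and acquires a semaphore token for each
// additional concurrent task, so nested invocations cannot starve each other.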
func (nrc *nestedRunnerContext) RunMulti(gen func(chan<- func() error)) error {
	var (
		result    errors.MultiError
		doneC     = make(chan error)
		realWorkC = make(chan func() error)
	)
	defer close(doneC)

	// Call our task generator.
	go func() {
		defer close(realWorkC)
		gen(realWorkC)
	}()

	var (
		outstanding = 0
		contextErr  error

		// We will toggle these based on what we want to block on.
		activeWorkC     = realWorkC
		activeSem       Semaphore
		newWorkC        = activeWorkC
		pendingWorkItem func() error
	)
	// Main dispatch control loop. Our goal is to have at least one task executing
	// at any given time. If we want to execute more, we must acquire a token from
	// the main semaphore.
	for activeWorkC != nil || outstanding > 0 {
		// Track whether we have a semaphore token. If we aren't using a semaphore,
		// we have an implicit token (unthrottled).
		hasToken := (nrc.sem == nil)

		select {
		case workItem, ok := <-newWorkC:
			// Incoming task.
			switch {
			case !ok:
				// Clear activeWorkC, instructing our select loop to stop accepting new
				// requests.
				activeWorkC = nil

			case contextErr != nil:
				// Ignore this request and pretend that it returned the Context error.
				result = append(result, contextErr)

			default:
				// Enqueue this request for dispatch.
				pendingWorkItem = workItem
			}

		case err := <-doneC:
			// A dispatched task has finished.
			if err != nil {
				result = append(result, err)
			}

			// Reclaim the finished task's semaphore token, if it held one (every
			// outstanding task beyond the first holds a token).
			if outstanding > 1 {
				hasToken = true
			}
			outstanding--

		case <-nrc.ctx.Done():
			// Record our Context error. Future jobs will immediately fail with this
			// error.
			contextErr = nrc.ctx.Err()

		case activeSem <- SemaphoreToken{}:
			// We have a pending task, and we just acquired a semaphore token.
			hasToken = true
		}
		// If we have a pending task, maybe dispatch it.
		if pendingWorkItem != nil {
			// If we have no outstanding tasks, use "our" implicit token to dispatch
			// this one immediately.
			//
			// If we have a token, use it immediately for this task.
			if outstanding == 0 || hasToken {
				nrc.workC <- WorkItem{
					F:    pendingWorkItem,
					ErrC: doneC,
				}
				outstanding++
				pendingWorkItem = nil
				hasToken = false
			}
		}

		// If we still have a token at this point, release it.
		if hasToken {
			nrc.sem.Unlock()
		}

		// Toggle blocking criteria.
		if pendingWorkItem == nil {
			// We have no currently-pending task, so pull a new one.
			newWorkC = activeWorkC

			// Don't try to acquire a semaphore token anymore, since we have nothing
			// to dispatch with it at the moment.
			activeSem = nil
		} else {
			// We have a pending task, but didn't dispatch it, so we are blocking on
			// token acquisition.
			activeSem = nrc.sem

			// We only handle one pending task at a time, so don't accept any new
			// tasks until this one has been dispatched.
			newWorkC = nil
		}
	}

	// Choose our error response.
	if len(result) > 0 {
		return result
	}
	return nil
}