// Copyright 2015 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package parallel

import (
	"context"
	"sort"
	"testing"

	"go.chromium.org/luci/common/errors"

	. "github.com/smartystreets/goconvey/convey"
)
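
// ExampleRunMulti is a minimal usage sketch of the call pattern the tests
// below exercise: RunMulti hands its callback a MultiRunner whose nested
// RunMulti calls share a single worker pool.
func ExampleRunMulti() {
	// Dispatch a handful of trivial work items through a nested dispatcher.
	_ = RunMulti(context.Background(), 4, func(mr MultiRunner) error {
		return mr.RunMulti(func(workC chan<- func() error) {
			for i := 0; i < 4; i++ {
				workC <- func() error { return nil }
			}
		})
	})
	// Output:
}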

func TestRunMulti(t *testing.T) {
	t.Parallel()

	Convey(`A RunMulti operation with two workers can be nested without deadlock.`, t, func() {
		const n = 2
		const inner = 128

		// This will hand out "n" tokens, then close "tokensOutC".
		tokenC := make(chan struct{})
		tokensOutC := make(chan struct{})
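		// Each worker first takes a token from "tokenC", then blocks on
		// "tokensOutC", which closes only after all "n" tokens have been
		// handed out; this forces all "n" top-level workers to run at once.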
		go func() {
			defer close(tokensOutC)
			for i := 0; i < n; i++ {
				tokenC <- struct{}{}
			}
		}()

		err := RunMulti(context.Background(), n, func(mr MultiRunner) error {
			return mr.RunMulti(func(workC chan<- func() error) {
				// Dispatch "n" top-level dispatchers and block until both
				// have executed. This consumes the total number of workers;
				// in a normal Runner, that would prevent the routines
				// dispatched by the top-level dispatchers from ever running.
				for i := 0; i < n; i++ {
					i := i
					workC <- func() error {
						// Take one token.
						<-tokenC
						// Wait until all of the tokens have been taken.
						<-tokensOutC

						// Dispatch a bunch of sub-work.
						return mr.RunMulti(func(workC chan<- func() error) {
							for j := 0; j < inner; j++ {
								index := (i * inner) + j
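								// numberError (an integer-based error type
								// defined elsewhere in this package) tags
								// each work item with its index so the
								// assertions below can check every item ran.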
								workC <- func() error { return numberError(index) }
							}
						})
					}
				}
			})
		})

		// Flatten our "n" top-level MultiErrors together.
		So(err, ShouldHaveSameTypeAs, (errors.MultiError)(nil))
		aggregateErr := make(errors.MultiError, 0, n*inner)
		for _, ierr := range err.(errors.MultiError) {
			So(ierr, ShouldHaveSameTypeAs, (errors.MultiError)(nil))
			aggregateErr = append(aggregateErr, ierr.(errors.MultiError)...)
		}
		So(aggregateErr, ShouldHaveLength, n*inner)

		// Make sure all of the error values that we expect are present.
		actual := make([]int, len(aggregateErr))
		expected := make([]int, len(aggregateErr))
		for i := 0; i < len(aggregateErr); i++ {
			actual[i] = int(aggregateErr[i].(numberError))
			expected[i] = i
		}
		sort.Ints(actual)
		So(actual, ShouldResemble, expected)
	})

	Convey(`A RunMulti operation will stop executing jobs if its Context is canceled.`, t, func() {
		const n = 128
		const cancelPoint = 16

		c, cancelFunc := context.WithCancel(context.Background())
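
		// Note: a single worker is used here, presumably so that work items
		// execute in dispatch order and the count of canceled items stays
		// within the narrow range asserted below.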
		err := RunMulti(c, 1, func(mr MultiRunner) error {
			return mr.RunMulti(func(workC chan<- func() error) {
				for i := 0; i < n; i++ {
					i := i
					if i == cancelPoint {
						// This and all future work should not be dispatched.
						// Our previous work item *may* execute depending on
						// whether it was dispatched before or after the
						// cancel was processed.
						cancelFunc()
					}
					workC <- func() error { return nil }
				}
			})
		})

		// We should have somewhere between (n-cancelPoint-1) and
		// (n-cancelPoint) context errors: every item dispatched after the
		// cancel fails with the Context's error, while the one item in flight
		// when the cancel landed may or may not have executed.
		So(err, ShouldHaveSameTypeAs, (errors.MultiError)(nil))
		So(len(err.(errors.MultiError)), ShouldBeBetweenOrEqual, n-cancelPoint-1, n-cancelPoint)
	})

	Convey(`A RunMulti operation with no worker limit will not be constrained.`, t, func() {
		const n = 128

		// This will hand out "n" tokens, then close "tokensOutC".
		tokenC := make(chan struct{})
		tokensOutC := make(chan struct{})
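		// As in the first test, every worker holds a token until "tokensOutC"
		// closes, so the run can only finish if all "n" workers execute
		// concurrently.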
		go func() {
			defer close(tokensOutC)
			for i := 0; i < n; i++ {
				tokenC <- struct{}{}
			}
		}()
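
		// A worker count of 0 imposes no limit on concurrently running
		// workers.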
		err := RunMulti(context.Background(), 0, func(mr MultiRunner) error {
			// This will deadlock if all "n" workers can't run simultaneously.
			return mr.RunMulti(func(workC chan<- func() error) {
				for i := 0; i < n; i++ {
					workC <- func() error {
						<-tokenC
						<-tokensOutC
						return nil
					}
				}
			})
		})

		So(err, ShouldBeNil)
	})
}