blob: 42daf774b0eb670f5b893a122d4771e968bf200c [file] [log] [blame]
// Copyright 2015 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package parallel
import (
"errors"
"fmt"
"sync/atomic"
"testing"
. "github.com/smartystreets/goconvey/convey"
. "go.chromium.org/luci/common/testing/assertions"
)
type numberError int
func (e numberError) Error() string {
return fmt.Sprintf("#%d", e)
}
func TestRunner(t *testing.T) {
t.Parallel()
Convey("When using a Runner directly", t, func() {
r := &Runner{}
defer func() {
if r != nil {
r.Close()
}
}()
Convey(`Can schedule individual tasks.`, func() {
const iters = 100
ac := int32(0)
resultC := make(chan int)
// Dispatch iters tasks.
for i := 0; i < iters; i++ {
i := i
errC := r.RunOne(func() error {
atomic.AddInt32(&ac, int32(i))
return numberError(i)
})
// Reap errC.
go func() {
resultC <- int((<-errC).(numberError))
}()
}
// Reap the results and compare.
result := 0
for i := 0; i < iters; i++ {
result += <-resultC
}
So(result, ShouldEqual, atomic.LoadInt32(&ac))
})
Convey(`Can use WorkC directly in a bidirectional select loop.`, func() {
// Generate a function that writes a value to "outC".
outC := make(chan int)
valueWriter := func(v int) func() error {
return func() error {
outC <- v
return nil
}
}
// We will repeatedly dispatch tasks and reap their results in the same
// select loop.
//
// remaining is the total number of tasks to dispatch. dispatch controls
// the number of simultaneous tasks that we're willing to send at a time.
// count is the number of tasks that have completed.
remaining := 1000
count := 0
dispatch := 10
wc := r.WorkC()
for count < 1000 {
select {
case wc <- WorkItem{F: valueWriter(1)}:
dispatch--
remaining--
break
case v := <-outC:
count += v
dispatch++
}
if dispatch == 0 || remaining == 0 {
// Stop writing work items until we reap some results.
wc = nil
} else {
// We have results, start writing work items again.
wc = r.WorkC()
}
}
So(count, ShouldEqual, 1000)
})
Convey(`A WorkItem's After method can recover from a panic.`, func() {
testErr := errors.New("test error")
var err error
r.WorkC() <- WorkItem{
F: func() error {
panic(testErr)
},
After: func() {
if r := recover(); r != nil {
err = r.(error)
}
},
}
r.Close()
r = nil // Do we don't close it in a defer.
So(err, ShouldEqual, testErr)
})
Convey("Ignore consumes the errors and blocks", func() {
count := new(int32)
Ignore(r.Run(func(ch chan<- func() error) {
for i := 0; i < 100; i++ {
ch <- func() error {
atomic.AddInt32(count, 1)
return fmt.Errorf("whaaattt")
}
}
}))
So(*count, ShouldEqual, 100)
})
Convey("Must panics on the first error", func() {
r.Maximum = 1
count := new(int32)
// Reap errC at the end, since Must will panic without consuming its
// contents.
var errC <-chan error
defer func() {
if errC != nil {
for range errC {
}
}
}()
So(func() {
errC = r.Run(func(ch chan<- func() error) {
for i := 0; i < 100; i++ {
i := i
ch <- func() error {
atomic.AddInt32(count, 1)
return fmt.Errorf("whaaattt: %d", i)
}
}
})
Must(errC)
}, ShouldPanicLike, "whaaattt: 0")
// Either:
// * the panic happened and we load count before ch is unblocked
// * the panic happened then ch(1) pushes and runs, then we load count
// So count will either be 1 or 2, but never more or less.
So(atomic.LoadInt32(count), ShouldBeBetweenOrEqual, 1, 2)
})
})
}