| // Copyright 2015 The LUCI Authors. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package parallel |
| |
| import ( |
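	"fmt"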
| "sync/atomic" |
| "testing" |
| |
| . "github.com/smartystreets/goconvey/convey" |
| ) |
| |
| func TestBuffer(t *testing.T) { |
| t.Parallel() |
| |
| Convey(`A task Buffer`, t, func() { |
| b := &Buffer{} |
| defer func() { |
| if b != nil { |
| b.Close() |
| } |
| }() |
| |
| Convey(`With 10 maximum goroutines, will execute at most 10 simultaneous tasks.`, func() { |
| const iters = 1000 |
| b.Maximum = 10 |
| |
| called := int32(0) |
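			// countMaxGoroutines dispatches iters probe tasks through the
			// Buffer and reports the peak number running simultaneously.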
| max := countMaxGoroutines(iters, 10, func(f func() error) { |
| b.WorkC() <- WorkItem{F: func() error { |
| atomic.AddInt32(&called, 1) |
| return f() |
| }} |
| }) |
| |
| So(max, ShouldEqual, 10) |
			So(atomic.LoadInt32(&called), ShouldEqual, iters)
| }) |
| |
| Convey(`Will buffer tasks indefinitely.`, func() { |
| const iters = 1000 |
| b.Maximum = 10 |
| |
| errC := make([]<-chan error, iters) |
| unblockC := make(chan struct{}) |
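
			// Dispatch iters tasks via RunOne. Each blocks on unblockC, so at
			// most b.Maximum run at once; the remainder queue inside the
			// Buffer.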
| for i := range errC { |
				i := i // capture the loop variable for the closure (pre-Go 1.22 semantics)
| |
| errC[i] = b.RunOne(func() error { |
| <-unblockC |
| return numberError(i) |
| }) |
| } |
| |
			// At most b.Maximum of the tasks are running, each blocked on
			// unblockC; the rest are buffered. Unblock them all and read the
			// resulting errors.
| close(unblockC) |
| |
			// Each task returned a distinct numberError in [0, iters); record
			// which indices were seen.
			errs := make([]bool, iters)
| for _, c := range errC { |
| errs[int((<-c).(numberError))] = true |
| } |
| |
| tc := 0 |
| for _, v := range errs { |
| if v { |
| tc++ |
| } |
| } |
| So(tc, ShouldEqual, iters) |
| }) |
| |
| Convey(`Can buffer tasks faster than they are reaped via Run.`, func() { |
| const iters = 1000 |
| b.Maximum = 10 |
| |
| errC := b.Run(func(taskC chan<- func() error) { |
| for i := 0; i < iters; i++ { |
| i := i |
| taskC <- func() error { |
| return numberError(i) |
| } |
| } |
| }) |
| |
			// No errors have been reaped yet, so only b.Maximum tasks should have
			// been dispatched, and they will be blocked on sending to errC.
			// Receiving from errC unblocks them; confirm that every task's error
			// is collected.
| seen := make(map[int]struct{}, iters) |
| for err := range errC { |
| seen[int(err.(numberError))] = struct{}{} |
| } |
| So(len(seen), ShouldEqual, iters) |
| }) |
| |
		Convey(`Has proper task dispatch order.`, func() {
| const iters = 10 |
| |
			// Use a Maximum of 1 so that tasks execute serially; this makes the
			// dispatch order observable and avoids data races on the helpers
			// below.
| b.Maximum = 1 |
| |
| workerStarted := make(chan int, iters+3) // tasks push here; buffered to avoid the need to drain. |
| wait := make(chan struct{}) // first task will wait until it is closed. |
| gen := func(taskC chan<- func() error) { |
				// Start with 2 primer tasks to occupy the work channels so that
				// the order in which the buffer empties is deterministic.
| for i := -2; i < 0; i++ { |
| i := i |
| taskC <- func() error { |
| workerStarted <- i |
| <-wait |
| return nil |
| } |
| } |
				// Ensure one task has actually started executing. Note that it
				// is still running, since it is blocked on the `wait` channel.
| <-workerStarted |
| // Add `iters` tasks which should be executed in the right order. |
| for i := 0; i < iters; i++ { |
| i := i |
| taskC <- func() error { |
| workerStarted <- i |
| return numberError(i) |
| } |
| } |
| // Finally, add 1 more "sentinel" task ... |
| taskC <- func() error { |
| workerStarted <- -3 |
| <-wait |
| return nil |
| } |
				// ... at this point, since taskC is an unbuffered channel, we
				// are certain the Buffer has accepted all `iters` tasks.
				// Unblock the two primer tasks (the sentinel, which also waits,
				// is released whenever it runs).
| close(wait) |
| } |
| |
			// account reaps every error from errC and returns, in completion
			// order, the indices of the numbered tasks; primer and sentinel
			// tasks return nil and are skipped.
			account := func(errC <-chan error) []int {
| var order []int |
| for err := range errC { |
| if err != nil { |
| order = append(order, int(err.(numberError))) |
| } |
| } |
| return order |
| } |
| |
| Convey(`Is FIFO by default.`, func() { |
| So(account(b.Run(gen)), ShouldResemble, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) |
| }) |
| |
| Convey(`Will be LIFO if LIFO is set.`, func() { |
| b.SetFIFO(false) |
| So(account(b.Run(gen)), ShouldResemble, []int{9, 8, 7, 6, 5, 4, 3, 2, 1, 0}) |
| }) |
| }) |
| |
| Convey(`Will finish tasks if closed while some are pending.`, func() { |
| const iters = 1000 |
| b.Maximum = 10 |
| |
			// Buffer iters tasks. Each blocks until signaled. Since
			// iters > b.Maximum, some tasks are guaranteed to remain queued in
			// our Buffer which, in turn, ensures that when we Close the Buffer,
			// it will not be empty.
| workCount := int32(0) |
| unblockC := make(chan struct{}) |
| finishedC := make(chan struct{}) |
| for i := 0; i < iters; i++ { |
| b.WorkC() <- WorkItem{F: func() error { |
| <-unblockC |
| atomic.AddInt32(&workCount, 1) |
| return nil |
				}, After: func() {
					// After runs once the task has finished; signal the test
					// while a completion channel is still registered.
					if finishedC != nil {
| finishedC <- struct{}{} |
| } |
| }} |
| } |
| |
| // First stage: fully execute one of the tasks. |
| unblockC <- struct{}{} |
| <-finishedC |
| |
| // Second stage: Close our Buffer, then unblock the remainder. We |
| // synchronize this by waiting for Buffer's internal workC to close. This |
| // is okay, since we're not sending more work through it. |
| go func() { |
| <-b.workC |
| |
| finishedC = nil |
| close(unblockC) |
| }() |
| |
| b.Close() |
| b = nil // So our test doesn't double-close. |
| |
			So(atomic.LoadInt32(&workCount), ShouldEqual, iters)
| }) |
| }) |
| } |