// Copyright 2019 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package protoutil

import (
	"container/heap"
	"context"
	"sync"

	"google.golang.org/protobuf/proto"

	"go.chromium.org/luci/common/data/stringset"

	pb "go.chromium.org/luci/buildbucket/proto"
)
// Search searches for builds continuously, sending findings to buildC
// until the search is exhausted or the context is canceled.
// The builds are ordered newest-to-oldest and deduplicated.
//
// If len(requests) > 1, the builds represent a union of the search requests.
//
// Search does not close buildC.
//
// Search does not return a next page token because ctx can be canceled in the
// middle of a page and because Search supports multiple requests.
func Search(ctx context.Context, buildC chan<- *pb.Build, client pb.BuildsClient, requests ...*pb.SearchBuildsRequest) error {
// Do not leave sub-goroutines running.
var wg sync.WaitGroup
defer wg.Wait()
// Ensure sub-goroutines end.
subCtx, cancelSearches := context.WithCancel(ctx)
defer cancelSearches()
// Start len(requests) concurrent searches.
allStreams := make([]searchStream, len(requests))
for i, req := range requests {
req := req
s := searchStream{
buildC: make(chan *pb.Build),
// Create a buffered channel because we will not necessarily
// read the error from all sub-goroutines.
errC: make(chan error, 1),
}
allStreams[i] = s
wg.Add(1)
go func() {
defer wg.Done()
s.errC <- searchBuilds(subCtx, s.buildC, client, req)
}()
}
// Collect initial stream heads and initialize a heap of active streams.
activeStreams := make(streamHeap, 0, len(allStreams))
for _, s := range allStreams {
select {
case s.head = <-s.buildC:
// This stream has some builds.
activeStreams = append(activeStreams, s)
case err := <-s.errC:
if err != nil {
return err
}
}
}
heap.Init(&activeStreams)
// Merge-sort streams.
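// Buildbucket build ids decrease as builds get newer, so repeatedly taking
// the stream head with the smallest id yields builds newest-to-oldest, as
// promised in the doc comment above.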
lastSentID := int64(-1)
for len(activeStreams) > 0 {
// Pop the build with the smallest id amongst active stream heads.
s := heap.Pop(&activeStreams).(searchStream)
// Send it to buildC, unless it is a duplicate.
if s.head.Id != lastSentID {
select {
case buildC <- s.head:
lastSentID = s.head.Id
case <-ctx.Done():
return ctx.Err()
}
}
// Get the next build from the same stream and record it as a new
// stream head.
select {
case s.head = <-s.buildC:
// This stream is still active.
heap.Push(&activeStreams, s)
case err := <-s.errC:
// This stream is done.
if err != nil {
return err
}
}
}
return nil
}
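
// collectSearchResults is an illustrative sketch, not part of the original
// API: it shows one way a caller might drive Search, by running it in a
// goroutine, closing buildC once it returns, and draining the channel. The
// helper name is made up for the example.
func collectSearchResults(ctx context.Context, client pb.BuildsClient, requests ...*pb.SearchBuildsRequest) ([]*pb.Build, error) {
	buildC := make(chan *pb.Build)
	errC := make(chan error, 1)
	go func() {
		// Search sends builds to buildC until the search is exhausted, an
		// error occurs or ctx is canceled; it never closes buildC itself.
		errC <- Search(ctx, buildC, client, requests...)
		close(buildC)
	}()
	var builds []*pb.Build
	for b := range buildC {
		builds = append(builds, b)
	}
	return builds, <-errC
}
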
// searchBuilds is like Search, but does not implement a union of search results.
// Sent builds have ids.
func searchBuilds(ctx context.Context, buildC chan<- *pb.Build, client pb.BuildsClient, req *pb.SearchBuildsRequest) error {
// Prepare a channel of responses, so that the next RPC is made as soon as we
// start consuming the current response, as opposed to after the response is
// completely consumed.
resC := make(chan *pb.SearchBuildsResponse)
// Buffer errC so that the goroutine below can always send its error and exit,
// even if we return early on ctx cancellation without reading it.
errC := make(chan error, 1)
go func() {
err := searchResponses(ctx, resC, client, req)
close(resC)
errC <- err
}()
// Forward builds.
for res := range resC {
for _, b := range res.Builds {
select {
case buildC <- b:
case <-ctx.Done():
// Note that even if ctx is done, errC is not guaranteed to hold a
// non-nil error, because the goroutine above might have exited
// successfully before the context was canceled.
return ctx.Err()
}
}
}
return <-errC
}
// searchResponses pages through search results and sends search responses to
// resC.
// Builds in resC have ids.
func searchResponses(ctx context.Context, resC chan<- *pb.SearchBuildsResponse, client pb.BuildsClient, req *pb.SearchBuildsRequest) error {
req = proto.Clone(req).(*pb.SearchBuildsRequest)
// Ensure next_page_token and build ids are requested even if the caller set an
// explicit field mask that omits them: pagination and the id-based merging in
// Search depend on these fields.
if paths := req.GetFields().GetPaths(); len(paths) > 0 {
pathSet := stringset.NewFromSlice(paths...)
pathSet.Add("next_page_token")
pathSet.Add("builds.*.id")
req.Fields.Paths = pathSet.ToSortedSlice()
}
// Page through results.
for {
res, err := client.SearchBuilds(ctx, req)
if err != nil {
return err
}
select {
case resC <- res:
case <-ctx.Done():
return ctx.Err()
}
if res.NextPageToken == "" || len(res.Builds) == 0 {
return nil
}
// Next page...
req.PageToken = res.NextPageToken
}
}
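// searchStream is the in-flight result stream of a single SearchBuildsRequest:
// buildC carries its builds, errC carries its final error, and head is the
// most recently received, not-yet-merged build.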
type searchStream struct {
buildC chan *pb.Build
errC chan error
head *pb.Build
}
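// streamHeap is a min-heap of active streams, ordered by the id of each
// stream's head build.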
type streamHeap []searchStream
// Len implements sort.Interface.
func (h streamHeap) Len() int { return len(h) }
// Less implements sort.Interface.
func (h streamHeap) Less(i, j int) bool {
return h[i].head.Id < h[j].head.Id
}
// Swap implements sort.Interface.
func (h streamHeap) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
}
// Push implements heap.Interface.
func (h *streamHeap) Push(x interface{}) {
// Push and Pop use pointer receivers because they modify the slice's length,
// not just its contents.
*h = append(*h, x.(searchStream))
}
// Pop implements heap.Interface.
func (h *streamHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}
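
// exampleUnionSearch is an illustrative sketch, not part of the original
// file: it performs a union search across two builders using the
// collectSearchResults helper above. The project, bucket and builder names
// are placeholders.
func exampleUnionSearch(ctx context.Context, client pb.BuildsClient) ([]*pb.Build, error) {
	linux := &pb.SearchBuildsRequest{
		Predicate: &pb.BuildPredicate{
			Builder: &pb.BuilderID{Project: "proj", Bucket: "ci", Builder: "linux"},
		},
	}
	mac := &pb.SearchBuildsRequest{
		Predicate: &pb.BuildPredicate{
			Builder: &pb.BuilderID{Project: "proj", Bucket: "ci", Builder: "mac"},
		},
	}
	// Builds from both requests arrive newest-to-oldest and deduplicated.
	return collectSearchResults(ctx, client, linux, mac)
}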