blob: 35272476fdf1c8e232d4b0fcc1a2c34be7993a65 [file] [log] [blame]
// Copyright 2020 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pbutil
import (
"context"
"github.com/golang/protobuf/proto"
"golang.org/x/sync/errgroup"
pb "go.chromium.org/luci/resultdb/proto/v1"
)
// Query queries for results continuously, sending individual items to
// dest channel until the paging query is exhausted or the context is canceled.
// A request must be *pb.QueryTestResultsRequest,
// *pb.QueryTestExonerationsRequest or *pb.QueryArtifactsRequest. Messages sent
// to dest are *pb.TestResult, *pb.TestExoneration or *pb.Artifact respectively.
//
// Does not return the next page token because ctx can be canceled in the middle
// of a page.
//
// If there are multiple requests in reqs, then runs them all concurrently and
// sends all of their results to dest.
// This is useful to query items of different types, e.g. test results and
// test exonerations.
// Does not limit concurrency.
//
// Panics if any request is of an unsupported type. All requests are validated
// before any query is started, so a panic never leaves queries for the other
// requests running in the background.
//
// Returns the first non-nil error reported by any of the queries, if any.
func Query(ctx context.Context, dest chan<- proto.Message, client pb.ResultDBClient, reqs ...proto.Message) error {
	// Check all request types up front, before any goroutine is launched.
	for _, req := range reqs {
		switch req.(type) {
		case *pb.QueryTestResultsRequest, *pb.QueryTestExonerationsRequest, *pb.QueryArtifactsRequest:
		default:
			panic("req must be *QueryTestResultsRequest, *QueryTestExonerationsRequest or *QueryArtifactsRequest")
		}
	}

	eg, ctx := errgroup.WithContext(ctx)
	for _, req := range reqs {
		// q is declared inside the loop body, so each goroutine captures its own
		// instance.
		q := &queryResults{
			client: client,
			// Make a copy because we will be modifying it.
			req:  proto.Clone(req),
			dest: dest,
		}
		eg.Go(func() error {
			return q.run(ctx)
		})
	}
	return eg.Wait()
}
// queryResults implements Query for one request.
// Query creates one instance per request and calls run on it.
type queryResults struct {
// client is the ResultDB client used to issue the paging RPCs.
client pb.ResultDBClient
// req is the query request. call overwrites its PageToken before every RPC,
// which is why Query stores a clone rather than the caller's message.
req proto.Message
// dest is the channel that run forwards the fetched items to.
dest chan<- proto.Message
}
// run pages through the results of q.req and forwards every item to q.dest.
//
// Returns nil only if the paging query finished without an error and run did
// not stop forwarding because of cancellation. If ctx is canceled while items
// are being forwarded, returns the error of the paging goroutine, or ctx.Err()
// if that goroutine had already finished successfully.
func (q *queryResults) run(ctx context.Context) error {
	// Prepare a channel for responses such that we can make an RPC as soon as we
	// started consuming the response, as opposed to after the response is
	// completely consumed.
	batchC := make(chan []proto.Message)
	// Buffered so that the paging goroutine can always deliver its result and
	// exit, regardless of when run gets around to reading it.
	errC := make(chan error, 1)
	go func() {
		defer close(batchC)
		errC <- q.queryResponses(ctx, batchC)
	}()

	// Forward items to q.dest.
	for batch := range batchC {
		for _, item := range batch {
			// Note: selecting on errC here would be a race because the batch
			// goroutine might have been already done, but we still did not send all
			// items to the caller.
			select {
			case <-ctx.Done():
				// Wait for the paging goroutine to exit and prefer its error.
				// It may have completed successfully before the cancellation;
				// in that case the remaining items of this batch are dropped,
				// so report the cancellation instead of a misleading nil.
				if err := <-errC; err != nil {
					return err
				}
				return ctx.Err()
			case q.dest <- item:
			}
		}
	}
	return <-errC
}
// queryResponses fetches pages one after another, starting from the page token
// already present in q.req, and hands each non-empty page to batchC.
//
// It stops and returns nil when a page comes back without a next page token.
// It also stops when a call fails or yields no items; in that case it returns
// ctx.Err() if the context is done, and the error of the call (possibly nil)
// otherwise. If ctx is done while a page is waiting to be received from
// batchC, it returns ctx.Err().
func (q *queryResults) queryResponses(ctx context.Context, batchC chan<- []proto.Message) error {
	// Start from whatever token the base request carries.
	pageToken := q.req.(interface{ GetPageToken() string }).GetPageToken()
	for {
		items, next, err := q.call(ctx, pageToken)
		switch {
		case err != nil, len(items) == 0:
			// A done context takes precedence over whatever the call reported.
			if ctxErr := ctx.Err(); ctxErr != nil {
				return ctxErr
			}
			return err
		}

		select {
		case <-ctx.Done():
			return ctx.Err()
		case batchC <- items:
		}

		if next == "" {
			return nil
		}
		pageToken = next
	}
}
// call issues a single RPC for q.req with its page token set to pageToken and
// converts the typed response into a slice of generic messages.
//
// The RPC used is chosen by the concrete type of q.req. If the RPC returns a
// nil response, batch is nil and nextPageToken is empty; otherwise they are
// taken from the response. err is whatever the RPC returned.
// Panics if q.req is of an unsupported type.
func (q *queryResults) call(ctx context.Context, pageToken string) (batch []proto.Message, nextPageToken string, err error) {
	switch req := q.req.(type) {
	case *pb.QueryTestResultsRequest:
		req.PageToken = pageToken
		res, rpcErr := q.client.QueryTestResults(ctx, req)
		if res == nil {
			return nil, "", rpcErr
		}
		items := make([]proto.Message, 0, len(res.TestResults))
		for _, tr := range res.TestResults {
			items = append(items, tr)
		}
		return items, res.NextPageToken, rpcErr

	case *pb.QueryTestExonerationsRequest:
		req.PageToken = pageToken
		res, rpcErr := q.client.QueryTestExonerations(ctx, req)
		if res == nil {
			return nil, "", rpcErr
		}
		items := make([]proto.Message, 0, len(res.TestExonerations))
		for _, te := range res.TestExonerations {
			items = append(items, te)
		}
		return items, res.NextPageToken, rpcErr

	case *pb.QueryArtifactsRequest:
		req.PageToken = pageToken
		res, rpcErr := q.client.QueryArtifacts(ctx, req)
		if res == nil {
			return nil, "", rpcErr
		}
		items := make([]proto.Message, 0, len(res.Artifacts))
		for _, a := range res.Artifacts {
			items = append(items, a)
		}
		return items, res.NextPageToken, rpcErr

	default:
		panic("impossible")
	}
}