blob: 66da54cc1a7989c1d31085b8ca4a7c96e58cb797 [file] [log] [blame]
// Copyright 2020 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package history
import (
"context"
"time"
"github.com/golang/protobuf/ptypes/timestamp"
"golang.org/x/sync/errgroup"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/resultdb/internal/invocations"
"go.chromium.org/luci/resultdb/internal/testresults"
pb "go.chromium.org/luci/resultdb/proto/v1"
)
// nWorkers is the number of concurrent workers fetching results to serve
// a single request.
const nWorkers = 10
// workItem represents a task to get test results reachable from the given
// invocation.
type workItem struct {
inv invocations.ID
ip indexPoint
resultsC chan *pb.GetTestResultHistoryResponse_Entry
predicate *pb.TestResultPredicate
}
func (wi *workItem) fetchOneInv(ctx context.Context) error {
defer close(wi.resultsC)
reachableInvs, err := invocations.Reachable(ctx, invocations.NewIDSet(wi.inv))
if err != nil {
return err
}
eg, ctx := errgroup.WithContext(ctx)
defer eg.Wait()
for _, batch := range reachableInvs.Batches() {
batch := batch
eg.Go(func() error {
// TODO(crbug.com/1107678): Implement support for FieldMask to
// return only a subset of each result.
query := &testresults.Query{
InvocationIDs: batch,
Predicate: wi.predicate,
}
return query.Run(ctx, func(r *pb.TestResult) error {
entry := &pb.GetTestResultHistoryResponse_Entry{Result: r}
wi.ip.initEntry(entry)
select {
case <-ctx.Done():
return ctx.Err()
case wi.resultsC <- entry:
return nil
}
})
})
}
return eg.Wait()
}
// Query represents the details of the results history to be retrieved.
type Query struct {
Request *pb.GetTestResultHistoryRequest
workC chan *workItem
allResultsC chan *workItem
results []*pb.GetTestResultHistoryResponse_Entry
resultOffset int
nextPageToken string
deadline time.Time
ignoreDeadline bool
}
// Execute runs the query and returns the results in a slice, along with a token
// to get the next page of results (if any) and an error.
func (q *Query) Execute(ctx context.Context) ([]*pb.GetTestResultHistoryResponse_Entry, string, error) {
q.resultOffset = initPaging(q.Request)
q.results = make([]*pb.GetTestResultHistoryResponse_Entry, 0, q.Request.PageSize)
q.workC = make(chan *workItem, nWorkers)
q.allResultsC = make(chan *workItem, nWorkers)
ctx, cancelWorker := context.WithCancel(ctx)
defer cancelWorker()
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
defer close(q.allResultsC)
defer close(q.workC)
return errors.Annotate(q.dispatchWorkItems(ctx), "history.Query.dispatchWorkItems").Err()
})
eg.Go(func() error {
return errors.Annotate(q.fetchAll(ctx), "history.Query.fetchAll").Err()
})
eg.Go(func() error {
return errors.Annotate(q.collect(ctx), "history.Query.collect").Err()
})
err := eg.Wait()
if pageBreak.In(err) {
err = nil
}
return q.results, q.nextPageToken, err
}
// dispatchWorkItems creates a task for each indexed invocation in the
// requested range as well as a channel to collect each task's result.
func (q *Query) dispatchWorkItems(ctx context.Context) error {
predicate := &pb.TestResultPredicate{
Variant: q.Request.VariantPredicate,
TestIdRegexp: q.Request.TestIdRegexp,
}
return invocations.ByTimestamp(ctx, q.Request.Realm, q.Request.GetTimeRange(), func(inv invocations.ID, ts *timestamp.Timestamp) error {
wi := &workItem{
inv: inv,
ip: (*tsIndexPoint)(ts),
resultsC: make(chan *pb.GetTestResultHistoryResponse_Entry),
predicate: predicate,
}
select {
case q.workC <- wi:
case <-ctx.Done():
return ctx.Err()
}
select {
case q.allResultsC <- wi:
case <-ctx.Done():
return ctx.Err()
}
return nil
})
}
// collect gets results from individual tasks, saves it to the query's `results`
// slice.
// For each item, we compute its offset within the index point s.t.
// we can generate a page token for resuming the results at the next item.
// Also, note that this assumes that each indexed invocation will have a unique
// timestamp, and thus, at least for now, a single indexed invocation counts as
// an index point.
func (q *Query) collect(ctx context.Context) error {
entries := make([]*pb.GetTestResultHistoryResponse_Entry, 0)
for wi := range q.allResultsC {
entries = entries[:0]
for entry := range wi.resultsC {
entries = append(entries, entry)
}
if ctx.Err() != nil {
return ctx.Err()
}
sortEntries(entries)
for i, entry := range entries {
if i < q.resultOffset {
continue
}
q.resultOffset = 0
q.results = append(q.results, entry)
if len(q.results) == int(q.Request.PageSize) {
q.nextPageToken = pageToken(wi.ip, i+1)
return errors.Reason("page full").Tag(pageBreak).Err()
}
}
// If we are about to run out of time, this is a good breaking point.
if q.outOfTime(ctx) && len(q.results) > 0 {
q.nextPageToken = pageToken(wi.ip, len(entries))
return errors.Reason("out of time").Tag(pageBreak).Err()
}
}
return nil
}
// fetchAll starts workers that will fetch results for each invocation in
// q.workC and stream them over the task's channel.
func (q *Query) fetchAll(ctx context.Context) error {
eg, ctx := errgroup.WithContext(ctx)
for i := 0; i < nWorkers; i++ {
eg.Go(func() error {
for wi := range q.workC {
if err := wi.fetchOneInv(ctx); err != nil {
return err
}
}
return nil
})
}
return eg.Wait()
}
// outOfTime returns true if the context will expire in less than 5 seconds.
func (q *Query) outOfTime(ctx context.Context) bool {
if q.ignoreDeadline {
return false
}
dl, ok := ctx.Deadline()
return ok && clock.Until(ctx, dl) < 5*time.Second
}