blob: 301d3bd50b90881ccfc2c8b77136dc5a29cdff54 [file] [log] [blame]
// Copyright 2020 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package resultdb
import (
"context"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/grpc/appstatus"
"go.chromium.org/luci/server/auth"
"go.chromium.org/luci/server/span"
"go.chromium.org/luci/resultdb/internal/invocations"
"go.chromium.org/luci/resultdb/internal/testresults"
"go.chromium.org/luci/resultdb/pbutil"
pb "go.chromium.org/luci/resultdb/proto/v1"
)
const (
historyDefaultPageSize = 100
)
// GetTestResultHistory implements pb.ResultDBServer.
func (s *resultDBServer) GetTestResultHistory(ctx context.Context, in *pb.GetTestResultHistoryRequest) (*pb.GetTestResultHistoryResponse, error) {
if err := verifyGetTestResultHistoryPermission(ctx, in.GetRealm()); err != nil {
return nil, err
}
if err := validateGetTestResultHistoryRequest(in); err != nil {
return nil, appstatus.BadRequest(err)
}
if in.PageSize == 0 {
in.PageSize = historyDefaultPageSize
}
ctx, cancelTx := span.ReadOnlyTransaction(ctx)
defer cancelTx()
workerCtx, cancelWorker := context.WithCancel(ctx)
defer cancelWorker()
// Ignore the derived context as we have a single worker.
// The closure of the results channel is sufficient to know that the work
// is done.
eg, _ := errgroup.WithContext(workerCtx)
results := make(chan *pb.GetTestResultHistoryResponse_Entry)
eg.Go(func() error {
defer close(results)
// TODO(crbug.com/1074407): Instead of getting a single fixed-length
// page of results here, pass a callback to get as many results as needed.
idxInvs, err := invocations.ByTimestamp(workerCtx, in.Realm, in.GetTimeRange())
if err != nil {
return err
}
// TODO(crbug.com/1107678): Parallelize the following loop.
for _, idxInv := range idxInvs {
select {
case <-workerCtx.Done():
return workerCtx.Err()
default:
if err := matchingResultsInInvTree(workerCtx, in, idxInv, results); err != nil {
return err
}
}
}
return nil
})
ret := &pb.GetTestResultHistoryResponse{
Entries: make([]*pb.GetTestResultHistoryResponse_Entry, 0, in.PageSize),
}
// Collect results sent over the results channel until either:
// - The page of results is full,
// - The context is Done,
// - Or the worker is done with its work (and closes the resutls channel).
//
// TODO(crbug.com/1074407): Implement ordering of the results indexed
// together by (TestID, VariantHash) .
//
// NB: The results in the channel are already ordered by indexed timestamp.
//
// TODO(crbug.com/1074407): Implement paging support.
ResultsLoop:
for {
select {
case entry, ok := <-results:
if !ok {
break ResultsLoop
}
ret.Entries = append(ret.Entries, entry)
if len(ret.Entries) == int(in.PageSize) {
cancelWorker()
// Ignore the cancellation returned by the worker.
if err := eg.Wait(); err != nil && err != context.Canceled {
return ret, err
}
return ret, nil
}
case <-ctx.Done():
break ResultsLoop
}
}
return ret, eg.Wait()
}
// verifyGetTestResultHistoryPermission checks that the caller has permission to
// get test results from the specified realm.
func verifyGetTestResultHistoryPermission(ctx context.Context, realm string) error {
if realm == "" {
return appstatus.BadRequest(errors.Reason("realm is required").Err())
}
switch allowed, err := auth.HasPermission(ctx, permListTestResults, realm); {
case err != nil:
return err
case !allowed:
return appstatus.Errorf(codes.PermissionDenied, `caller does not have permission %s in realm %q`, permListTestResults, realm)
}
return nil
}
// validateGetTestResultHistoryRequest checks that the required fields are set,
// and that field values are valid.
func validateGetTestResultHistoryRequest(in *pb.GetTestResultHistoryRequest) error {
if in.GetRealm() == "" {
return errors.Reason("realm is required").Err()
}
// TODO(crbug.com/1107680): Add support for commit position ranges.
tr := in.GetTimeRange()
if tr == nil {
return errors.Reason("time_range must be specified").Err()
}
if in.GetPageSize() < 0 {
return errors.Reason("page_size, if specified, must be a positive integer").Err()
}
if in.GetVariantPredicate() != nil {
if err := pbutil.ValidateVariantPredicate(in.GetVariantPredicate()); err != nil {
return errors.Annotate(err, "variant_predicate").Err()
}
}
return nil
}
// matchingResultsInInvTree gets the matching results reachable from a given
// invocation, and streams them over the given channel.
func matchingResultsInInvTree(ctx context.Context, in *pb.GetTestResultHistoryRequest, idxInv *invocations.Historical, ret chan<- *pb.GetTestResultHistoryResponse_Entry) error {
reachableInvs, err := invocations.Reachable(ctx, invocations.NewIDSet(idxInv.ID))
if err != nil {
return err
}
eg, ctx := errgroup.WithContext(ctx)
for _, batch := range reachableInvs.Batches() {
batch := batch
eg.Go(func() error {
// TODO(crbug.com/1107678): Implement support for FieldMask to return
// only a subset of each result.
query := testresults.Query{
InvocationIDs: batch,
Predicate: &pb.TestResultPredicate{
Variant: in.VariantPredicate,
TestIdRegexp: in.TestIdRegexp,
},
}
return query.Run(ctx, func(r *pb.TestResult) error {
select {
case <-ctx.Done():
return ctx.Err()
case ret <- &pb.GetTestResultHistoryResponse_Entry{Result: r, InvocationTimestamp: idxInv.IndexTimestamp}:
return nil
}
})
})
}
return eg.Wait()
}