blob: 673977d651fed86c4e12c653b3ac4cfa204983f5 [file] [log] [blame]
// Copyright 2020 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package testvariants
import (
"context"
"fmt"
"strings"
"text/template"
"cloud.google.com/go/spanner"
"go.opentelemetry.io/otel/attribute"
"google.golang.org/genproto/protobuf/field_mask"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"
"go.chromium.org/luci/common/data/stringset"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/proto/mask"
"go.chromium.org/luci/resultdb/internal/exonerations"
"go.chromium.org/luci/resultdb/internal/invocations"
"go.chromium.org/luci/resultdb/internal/invocations/graph"
"go.chromium.org/luci/resultdb/internal/pagination"
"go.chromium.org/luci/resultdb/internal/spanutil"
"go.chromium.org/luci/resultdb/internal/testresults"
"go.chromium.org/luci/resultdb/internal/tracing"
"go.chromium.org/luci/resultdb/pbutil"
pb "go.chromium.org/luci/resultdb/proto/v1"
"go.chromium.org/luci/resultdb/rdbperms"
"go.chromium.org/luci/server/auth"
)
const (
// resultLimitMax is the maximum number of results that can be included in a
// test variant when querying test variants. The client may specify a lower
// limit. It is required to prevent client-caused OOMs.
resultLimitMax = 100
// resultLimitDefault is the number of results per test variant that
// AdjustResultLimit returns when the requested limit is not positive.
resultLimitDefault = 10
// DefaultResponseLimitBytes is the default soft limit on the number of bytes
// that should be returned by a Test Variants query.
DefaultResponseLimitBytes = 20 * 1000 * 1000 // 20 MB
)
// AllFields is a field mask that selects all TestVariant fields.
// QueryMask returns it when the supplied read mask has no paths, and
// Query.resultSelectColumns falls back to it when Query.Mask is empty.
var AllFields = mask.All(&pb.TestVariant{})
// QueryMask converts readMask into a mask.Mask over pb.TestVariant.
// When readMask is nil or lists no paths, the AllFields mask is returned
// instead so that every field is selected by default.
func QueryMask(readMask *field_mask.FieldMask) (*mask.Mask, error) {
	if paths := readMask.GetPaths(); len(paths) > 0 {
		return mask.FromFieldMask(readMask, &pb.TestVariant{}, false, false)
	}
	return AllFields, nil
}
// AdjustResultLimit takes the given requested resultLimit and adjusts as
// necessary.
func AdjustResultLimit(resultLimit int32) int {
switch {
case resultLimit >= resultLimitMax:
return resultLimitMax
case resultLimit > 0:
return int(resultLimit)
default:
return resultLimitDefault
}
}
// ValidateResultLimit reports whether resultLimit is acceptable.
// Zero and positive values are valid and yield nil; a negative value yields
// a non-nil error.
func ValidateResultLimit(resultLimit int32) error {
	if resultLimit >= 0 {
		return nil
	}
	return errors.Reason("negative").Err()
}
// AccessLevel is the level of access a caller has to the test result and
// test exoneration data of the invocations being queried.
type AccessLevel int
const (
// The access level is invalid; either an error occurred when checking the
// caller's access, or the caller does not have any kind of access to data in
// the realms of the invocations being queried.
// This is the zero value of AccessLevel.
AccessLevelInvalid AccessLevel = iota
// Limited access. The caller has access to metadata only, such as
// pass/fail status of tests, test IDs and sanitised failure reasons.
AccessLevelLimited
// The caller has access to all data in the realms of the invocations being
// queried.
AccessLevelUnrestricted
)
// Query specifies test variants to fetch.
type Query struct {
// ReachableInvocations describes the invocations being queried. It is used
// to look up the realm and sources of the invocation that a test result or
// test exoneration belongs to.
ReachableInvocations graph.ReachableInvocations
// Predicate optionally filters the test variants returned; its status
// selects which test variant statuses are queried.
Predicate *pb.TestVariantPredicate
// ResultLimit is a limit on the number of test results returned
// per test variant. Must be positive.
ResultLimit int
// PageSize is a limit on the number of test variants returned
// per page. The actual number of test variants returned may be
// less than this, or even zero. (It may be zero even if there are
// more test variants in later pages). Must be positive.
PageSize int
// ResponseLimitBytes is the soft limit on the number of bytes returned
// by a query. The soft limit is on JSON-serialised form of the
// result. A page will be truncated once it exceeds this value.
// Must be positive.
ResponseLimitBytes int
// Consists of test variant status, test id and variant hash.
PageToken string
// Mask is the field mask applied to each returned test variant. The test ID,
// variant hash and status are kept regardless of the mask (see trim).
Mask *mask.Mask
// TestIDs, if non-empty, restricts the query to these test IDs.
TestIDs []string
// The level of access the user has to test results and test exoneration data.
AccessLevel AccessLevel
decompressBuf []byte // buffer for decompressing blobs
params map[string]any // query parameters; populated by Fetch
}
// trim applies q.Mask to tv, with the exception that "test_id",
// "variant_hash" and "status" are always kept intact, because those fields
// are needed to generate page tokens.
//
// It returns an annotated error if the mask cannot be applied.
func (q *Query) trim(tv *pb.TestVariant) error {
	// Capture the pagination key before trimming: the mask may clear it.
	testID, variantHash, status := tv.TestId, tv.VariantHash, tv.Status

	if err := q.Mask.Trim(tv); err != nil {
		// Report the saved identifiers rather than re-reading tv, which Trim
		// has already been run against and may have modified.
		return errors.Annotate(err, "error trimming fields for test variant with ID: %s, variant hash: %s", testID, variantHash).Err()
	}

	tv.TestId, tv.VariantHash, tv.Status = testID, variantHash, status
	return nil
}
// tvResult matches the result STRUCT of a test variant from the query.
//
// Only the columns chosen by Query.resultSelectColumns are selected, so
// fields for unselected columns keep their zero value.
// NOTE(review): presumably the field names must match the selected column
// names for decoding to work — confirm against spanutil before renaming.
type tvResult struct {
InvocationID string
ResultID string
IsUnexpected spanner.NullBool
Status int64
StartTime spanner.NullTime
RunDurationUsec spanner.NullInt64
// SummaryHTML, FailureReason and Properties hold compressed blobs; they are
// decompressed in Query.toTestResultProto.
SummaryHTML []byte
FailureReason []byte
Tags []string
Properties []byte
SkipReason spanner.NullInt64
}
// resultSelectColumns lists, in sorted order, the TestResults columns that
// must be read to build `tvResult`s for the query's field mask.
//
// `InvocationId`, `ResultId` and `IsUnexpected` are always present. Every
// other column is added only if the mask does not exclude the test result
// field that is derived from it. An empty mask is treated as AllFields.
// Panics if the mask rejects one of the field paths.
func (q *Query) resultSelectColumns() []string {
	readMask := q.Mask
	if readMask.IsEmpty() {
		readMask = AllFields
	}

	// `InvocationId` and `ResultId` are required to construct
	// TestResult.Name, so they are selected unconditionally.
	columns := stringset.NewFromSlice("InvocationId", "ResultId", "IsUnexpected")

	// Optional columns, paired with the field path that requires them.
	optional := []struct {
		column string
		field  string
	}{
		{"Status", "results.*.result.status"},
		{"StartTime", "results.*.result.start_time"},
		{"RunDurationUsec", "results.*.result.duration"},
		{"SummaryHTML", "results.*.result.summary_html"},
		{"FailureReason", "results.*.result.failure_reason"},
		{"Tags", "results.*.result.tags"},
		{"Properties", "results.*.result.properties"},
		{"SkipReason", "results.*.result.skip_reason"},
	}
	for _, o := range optional {
		inclusion, err := readMask.Includes(o.field)
		if err != nil {
			panic(err)
		}
		if inclusion != mask.Exclude {
			columns.Add(o.column)
		}
	}
	return columns.ToSortedSlice()
}
// decompressText decompresses src and returns the result as a string.
// Empty input yields an empty string. The query's scratch buffer
// (q.decompressBuf) is reused for the decompression and updated with
// whatever spanutil.Decompress returns.
func (q *Query) decompressText(src []byte) (string, error) {
	if len(src) == 0 {
		return "", nil
	}
	buf, err := spanutil.Decompress(src, q.decompressBuf)
	q.decompressBuf = buf
	if err != nil {
		return "", err
	}
	return string(buf), nil
}
// decompressProto decompresses src and unmarshals the resulting bytes into
// dest. It does nothing and returns nil when src is empty. The query's
// scratch buffer (q.decompressBuf) is reused for the decompression.
func (q *Query) decompressProto(src []byte, dest proto.Message) error {
	if len(src) == 0 {
		return nil
	}
	buf, err := spanutil.Decompress(src, q.decompressBuf)
	q.decompressBuf = buf
	if err != nil {
		return err
	}
	return proto.Unmarshal(buf, dest)
}
// toTestResultProto converts a result row r of the test with the given
// testID into a pb.TestResult. Compressed columns (summary HTML, failure
// reason, properties) are decompressed along the way; an error is returned
// if any of them cannot be decompressed or unmarshalled.
func (q *Query) toTestResultProto(r *tvResult, testID string) (*pb.TestResult, error) {
	invID := invocations.IDFromRowID(r.InvocationID)
	tr := &pb.TestResult{
		Name:     pbutil.TestResultName(string(invID), testID, r.ResultID),
		ResultId: r.ResultID,
		Status:   pb.TestStatus(r.Status),
	}
	if r.StartTime.Valid {
		tr.StartTime = pbutil.MustTimestampProto(r.StartTime.Time)
	}
	testresults.PopulateExpectedField(tr, r.IsUnexpected)
	testresults.PopulateDurationField(tr, r.RunDurationUsec)
	testresults.PopulateSkipReasonField(tr, r.SkipReason)

	summary, err := q.decompressText(r.SummaryHTML)
	if err != nil {
		return nil, err
	}
	tr.SummaryHtml = summary

	// FailureReason stays nil when the column is empty, so that the JSON
	// form does not contain {"failureReason": {}}.
	if len(r.FailureReason) > 0 {
		reason := &pb.FailureReason{}
		if err := q.decompressProto(r.FailureReason, reason); err != nil {
			return nil, err
		}
		tr.FailureReason = reason
	}

	// Likewise, Properties stays nil when the column is empty, so that the
	// JSON form does not contain {"Properties": {}}.
	if len(r.Properties) > 0 {
		props := &structpb.Struct{}
		if err := q.decompressProto(r.Properties, props); err != nil {
			return nil, err
		}
		tr.Properties = props
	}

	// Tags are stored as strings; convert each one into a StringPair.
	tags := make([]*pb.StringPair, 0, len(r.Tags))
	for _, raw := range r.Tags {
		tags = append(tags, pbutil.StringPairFromStringUnvalidated(raw))
	}
	tr.Tags = tags

	return tr, nil
}
// populateSources records which sources were tested by tv. It sets
// tv.SourcesId and makes sure distinctSources has an entry under that ID.
// The sources are taken from the invocation of the first test result in
// tv, which is an *arbitrary* choice among the variant's results.
//
// A test variant without results is left untouched. If the invocation has
// no sources available, tv.SourcesId is set to the empty string. An error
// is returned if the result name cannot be parsed or refers to an
// invocation that is not in q.ReachableInvocations.
func (q *Query) populateSources(tv *pb.TestVariant, distinctSources map[string]*pb.Sources) error {
	if len(tv.Results) == 0 {
		return nil
	}

	// Any result will do; use the first one to find an invocation.
	firstResult := tv.Results[0].Result
	invID, _, _, err := pbutil.ParseTestResultName(firstResult.Name)
	if err != nil {
		return err
	}

	inv, found := q.ReachableInvocations.Invocations[invocations.ID(invID)]
	if !found {
		return errors.Reason("test result in response referenced an unreachable invocation").Err()
	}

	// Default to "no sources available" and overwrite if sources are known.
	sourcesID := ""
	if sources := q.ReachableInvocations.Sources[inv.SourceHash]; sources != nil {
		sourcesID = inv.SourceHash.String()
		distinctSources[sourcesID] = sources
	}
	tv.SourcesId = sourcesID
	return nil
}
// toLimitedData restricts tv, its test results and its test exonerations to
// the fields allowed for a caller who only holds listLimited permissions.
// A test result or test exoneration is left unrestricted when the caller has
// permission to get it in the realm of its immediate parent invocation.
//
// resultPerms and exonerationPerms cache, per realm, the outcome of the
// permission checks; missing entries are filled in as realms are seen.
//
// The test metadata is cleared when every test result had to be restricted.
// The variant definition is cleared when every test result and every test
// exoneration had to be restricted. In either case tv.IsMasked is set.
func (q *Query) toLimitedData(ctx context.Context, tv *pb.TestVariant,
	resultPerms map[string]bool, exonerationPerms map[string]bool) error {
	// Start pessimistic; any item the caller may fully read flips these off.
	maskMetadata, maskVariant := true, true

	for _, bundle := range tv.Results {
		result := bundle.Result
		invID, _, _, err := pbutil.ParseTestResultName(result.Name)
		if err != nil {
			return err
		}

		// The realm is that of the invocation directly containing the result.
		inv, found := q.ReachableInvocations.Invocations[invocations.ID(invID)]
		if !found {
			return fmt.Errorf("error finding realm: invocation %s not found", invID)
		}

		canGet, cached := resultPerms[inv.Realm]
		if !cached {
			// First time this realm is seen: check and remember the answer.
			canGet, err = auth.HasPermission(ctx,
				rdbperms.PermGetTestResult, inv.Realm, nil)
			if err != nil {
				return errors.Annotate(err,
					"error checking permission for test result data restriction").Err()
			}
			resultPerms[inv.Realm] = canGet
		}

		if canGet {
			maskMetadata = false
			maskVariant = false
			continue
		}
		// No get permission: reduce the test result to limited data.
		if err := testresults.ToLimitedData(ctx, result); err != nil {
			return err
		}
	}

	if maskMetadata {
		// Every test result was reduced to limited data, so the test metadata
		// must be hidden too.
		tv.TestMetadata = nil
		tv.IsMasked = true
	}

	for _, exoneration := range tv.Exonerations {
		invID, _, _, err := pbutil.ParseTestExonerationName(exoneration.Name)
		if err != nil {
			return err
		}

		// The realm is that of the invocation directly containing the
		// exoneration.
		inv, found := q.ReachableInvocations.Invocations[invocations.ID(invID)]
		if !found {
			return fmt.Errorf("error finding realm: invocation %s not found", invID)
		}

		canGet, cached := exonerationPerms[inv.Realm]
		if !cached {
			// First time this realm is seen: check and remember the answer.
			canGet, err = auth.HasPermission(ctx,
				rdbperms.PermGetTestExoneration, inv.Realm, nil)
			if err != nil {
				return errors.Annotate(err,
					"error checking permission for test exoneration data restriction").Err()
			}
			exonerationPerms[inv.Realm] = canGet
		}

		if canGet {
			maskVariant = false
			continue
		}
		// No get permission: reduce the test exoneration to limited data.
		if err := exonerations.ToLimitedData(ctx, exoneration); err != nil {
			return err
		}
	}

	if maskVariant {
		// Every test result and test exoneration was reduced to limited data,
		// so the variant definition must be hidden too.
		tv.Variant = nil
		tv.IsMasked = true
	}
	return nil
}
// queryTestVariantsWithUnexpectedResults runs the query for test variants
// that have at least one unexpected result and calls f for each test variant
// read, with its status, results and exonerations populated.
//
// At most q.PageSize test variants are read, each with at most
// q.ResultLimit results. Iteration stops at the first error, including an
// error returned by f. Panics if q.PageSize is negative or if the query
// yields a test variant whose status is EXPECTED.
func (q *Query) queryTestVariantsWithUnexpectedResults(ctx context.Context, f func(*pb.TestVariant) error) (err error) {
	ctx, ts := tracing.Start(ctx, "testvariants.Query.run",
		attribute.Int("cr.dev.invocations", len(q.ReachableInvocations.Invocations)),
	)
	defer func() { tracing.End(ts, err) }()

	if q.PageSize < 0 {
		panic("PageSize < 0")
	}

	// A status filter is only needed for a specific status; an unset status
	// and UNEXPECTED_MASK both use the unfiltered form of the query.
	wantStatus := q.Predicate.GetStatus()
	tmplInput := map[string]any{
		"ResultColumns": strings.Join(q.resultSelectColumns(), ", "),
		"HasTestIds":    len(q.TestIDs) > 0,
		"StatusFilter":  wantStatus != 0 && wantStatus != pb.TestVariantStatus_UNEXPECTED_MASK,
	}
	st, err := spanutil.GenerateStatement(testVariantsWithUnexpectedResultsSQLTmpl, tmplInput)
	if err != nil {
		return err
	}
	st.Params = q.params
	st.Params["limit"] = q.PageSize
	st.Params["testResultLimit"] = q.ResultLimit

	var b spanutil.Buffer
	return spanutil.Query(ctx, st, func(row *spanner.Row) error {
		var (
			tv               = &pb.TestVariant{}
			statusCode       int64
			rows             []*tvResult
			exonerationIDs   []string
			exonerationInvs  []string
			exonerationHTMLs [][]byte
			exonerationWhys  []int64
			tmd              spanutil.Compressed
		)
		if err := b.FromSpanner(row,
			&tv.TestId, &tv.VariantHash, &tv.Variant, &tmd, &statusCode, &rows,
			&exonerationIDs, &exonerationInvs, &exonerationHTMLs,
			&exonerationWhys); err != nil {
			return err
		}

		tv.Status = pb.TestVariantStatus(statusCode)
		if tv.Status == pb.TestVariantStatus_EXPECTED {
			panic("query of test variants with unexpected results returned a test variant with only expected results.")
		}

		if err := populateTestMetadata(tv, tmd); err != nil {
			return errors.Annotate(err, "error unmarshalling test_metadata for %s", tv.TestId).Err()
		}

		// Convert the result rows into result bundles.
		bundles := make([]*pb.TestResultBundle, 0, len(rows))
		for _, r := range rows {
			tr, err := q.toTestResultProto(r, tv.TestId)
			if err != nil {
				return err
			}
			bundles = append(bundles, &pb.TestResultBundle{Result: tr})
		}
		tv.Results = bundles

		// Convert the exonerations, if any. Due to query design, the four
		// exoneration arrays should all be the same length.
		if len(exonerationWhys) > 0 {
			tv.Exonerations = make([]*pb.TestExoneration, len(exonerationWhys))
			for i, exonerationID := range exonerationIDs {
				name := pbutil.TestExonerationName(
					string(invocations.IDFromRowID(exonerationInvs[i])),
					tv.TestId, exonerationID)
				html, err := q.decompressText(exonerationHTMLs[i])
				if err != nil {
					return err
				}
				tv.Exonerations[i] = &pb.TestExoneration{
					Name:            name,
					ExplanationHtml: html,
					Reason:          pb.ExonerationReason(exonerationWhys[i]),
				}
			}
		}
		return f(tv)
	})
}
// Page represents a page of test variants fetched
// in response to a query. It is the result type of Query.Fetch.
type Page struct {
// TestVariants are the test variants in the page.
TestVariants []*pb.TestVariant
// DistinctSources are the sources referenced by each test variant's
// SourcesId, stored by their ID. The ID itself should be treated
// as an opaque value; its generation is an implementation detail.
DistinctSources map[string]*pb.Sources
// NextPageToken is used to iterate to the next page of results.
// It is empty when there are no more pages.
NextPageToken string
}
// fetchTestVariantsWithUnexpectedResults returns one page of test variants
// that have unexpected results. Each variant has its sources resolved, is
// restricted to limited data unless the caller has unrestricted access, and
// has the field mask applied.
//
// The page is cut short once the estimated response size exceeds
// q.ResponseLimitBytes. If the page is not full and no specific status was
// requested, the next page token points at the start of the test variants
// with only expected results.
func (q *Query) fetchTestVariantsWithUnexpectedResults(ctx context.Context) (Page, error) {
	var (
		tvs          = make([]*pb.TestVariant, 0, q.PageSize)
		responseSize = 0

		// Per-realm caches of whether the caller may get test results
		// (resultPerms) or test exonerations (exonerationPerms). They are
		// filled in by Query.toLimitedData so that each realm/permission
		// combination is checked at most once.
		resultPerms      = make(map[string]bool)
		exonerationPerms = make(map[string]bool)

		// Sources referenced by the SourcesId of the returned test variants.
		distinctSources = make(map[string]*pb.Sources)
	)

	err := q.queryTestVariantsWithUnexpectedResults(ctx, func(tv *pb.TestVariant) error {
		// Resolve the code sources tested.
		if err := q.populateSources(tv, distinctSources); err != nil {
			return errors.Annotate(err, "resolving sources").Err()
		}
		// Restrict the data unless the caller may see everything.
		if q.AccessLevel != AccessLevelUnrestricted {
			if err := q.toLimitedData(ctx, tv, resultPerms, exonerationPerms); err != nil {
				return errors.Annotate(err, "applying limited access mask").Err()
			}
		}
		// Apply the field mask.
		if err := q.trim(tv); err != nil {
			return errors.Annotate(err, "applying field mask").Err()
		}
		tvs = append(tvs, tv)

		// Enforce the soft response size limit. The variant that crosses the
		// limit is still included in the page.
		responseSize += estimateTestVariantSize(tv)
		if responseSize > q.ResponseLimitBytes {
			return responseLimitReachedErr
		}
		return nil
	})
	limitReached := err == responseLimitReachedErr
	if err != nil && !limitReached {
		return Page{}, err
	}

	var nextPageToken string
	switch {
	case limitReached || len(tvs) == q.PageSize:
		// There could be more test variants with unexpected results; resume
		// after the last one returned.
		last := tvs[len(tvs)-1]
		nextPageToken = pagination.Token(last.Status.String(), last.TestId, last.VariantHash)
	case q.Predicate.GetStatus() != 0:
		// All test variants with unexpected results have been read and the
		// query asked for a specific status, so there is nothing left.
		nextPageToken = ""
	default:
		// All test variants with unexpected results have been read and no
		// status was requested: continue with the test variants that have
		// only expected results.
		nextPageToken = pagination.Token(pb.TestVariantStatus_EXPECTED.String(), "", "")
	}

	return Page{TestVariants: tvs, DistinctSources: distinctSources, NextPageToken: nextPageToken}, nil
}
// queryTestResults reads up to limit test results, calling f for each
// test result read. Test results are returned in test variant order.
// Within each test variant, unexpected results are returned first.
//
// f receives the test ID, variant hash, variant and compressed test
// metadata of the row, together with the decoded result. Iteration stops at
// the first error, including an error returned by f. An error is also
// returned if the SQL statement cannot be generated.
func (q *Query) queryTestResults(ctx context.Context, limit int, f func(testId, variantHash string, variant *pb.Variant, tmd spanutil.Compressed, tvr *tvResult) error) (err error) {
	ctx, ts := tracing.Start(ctx, "testvariants.Query.queryTestResults",
		attribute.Int("cr.dev.invocations", len(q.ReachableInvocations.Invocations)),
	)
	defer func() { tracing.End(ts, err) }()

	st, err := spanutil.GenerateStatement(allTestResultsSQLTmpl, map[string]any{
		"ResultColumns": strings.Join(q.resultSelectColumns(), ", "),
		"HasTestIds":    len(q.TestIDs) > 0,
	})
	if err != nil {
		// Do not run a statement that failed to generate.
		return err
	}
	st.Params = q.params
	st.Params["limit"] = limit

	var b spanutil.Buffer
	return spanutil.Query(ctx, st, func(row *spanner.Row) error {
		var testID string
		var variantHash string
		variant := &pb.Variant{}
		var tmd spanutil.Compressed
		var results []*tvResult
		if err := b.FromSpanner(row, &testID, &variantHash, &variant, &tmd, &results); err != nil {
			return err
		}
		// The query wraps the result STRUCT in a one-element array; fail with
		// an error rather than an index panic if that ever does not hold.
		if len(results) == 0 {
			return errors.Reason("test result row for test %q has no result struct", testID).Err()
		}
		return f(testID, variantHash, variant, tmd, results[0])
	})
}
// queryTestVariants queries a page of test variants, calling f for each
// test variant read. Unless this method returns an error, it is guaranteed
// f will be called at least once, or finished will be set to true.
// finished indicates all test results from the invocation have been read.
//
// Test results are read via queryTestResults and consecutive results with
// the same test ID and variant hash are grouped into one test variant.
//
// The number of test results provided per test variant is limited based
// on q.ResultLimit. Unexpected test results appear first in the
// list of test results provided on each test variant.
//
// The status of the test variant is not populated as exonerations are
// not read.
//
// f is called at most q.PageSize times. An error returned by f stops the
// iteration and is returned with finished set to false.
func (q *Query) queryTestVariants(ctx context.Context, f func(*pb.TestVariant) error) (finished bool, err error) {
// Number of the total test results returned by the query.
trCount := 0
// The test variant we're processing right now.
// It is yielded once a result for a different test variant is read, or
// after the query completes (see the end of this function).
var current *pb.TestVariant
// Query q.PageSize+1 test results, so that when every test result belongs
// to a different test variant, we will return q.PageSize test variants
// instead of q.PageSize-1 (a test variant is normally only yielded once
// the first result of the next test variant has been read).
pageSize := q.PageSize + 1
// Ensure we always make forward progress, by having enough test results
// to call the callback at least once.
if q.ResultLimit > pageSize {
pageSize = q.ResultLimit
}
// Number of test variants yielded so far. Used to ensure
// we never yield more test variants than the page size.
yieldCount := 0
yield := func(tv *pb.TestVariant) error {
// Counted even when the variant is dropped below, so that the final
// `finished` computation can tell a variant was not delivered.
yieldCount++
// Never yield more test variants to the caller than the requested
// query page size.
if yieldCount <= q.PageSize {
return f(tv)
}
return nil
}
err = q.queryTestResults(ctx, pageSize, func(testId, variantHash string, variant *pb.Variant, tmd spanutil.Compressed, tvr *tvResult) error {
trCount++
tr, err := q.toTestResultProto(tvr, testId)
if err != nil {
return err
}
var toYield *pb.TestVariant
if current != nil {
if current.TestId == testId && current.VariantHash == variantHash {
// Another result within the same test variant.
if len(current.Results) < q.ResultLimit {
// Only append if within result limit.
current.Results = append(current.Results, &pb.TestResultBundle{
Result: tr,
})
}
return nil
}
// Different TestId or VariantHash from current, so all test results of
// current have been processed.
toYield = current
}
// New test variant.
current = &pb.TestVariant{
TestId: testId,
VariantHash: variantHash,
Variant: variant,
Results: []*pb.TestResultBundle{
{
Result: tr,
},
},
}
if err := populateTestMetadata(current, tmd); err != nil {
return errors.Annotate(err, "error unmarshalling test_metadata for %s", current.TestId).Err()
}
// Yield the previous test variant only after the new one has been set up.
if toYield != nil {
return yield(toYield)
}
return nil
})
if err != nil {
return false, err
}
if current != nil {
// Yield current if:
// - We finished reading all test results for the queried invocations, OR
// - We have ResultLimit test results for the test variant.
// Otherwise current may be incomplete and is left for the next page.
if len(current.Results) == q.ResultLimit || trCount < pageSize {
err := yield(current)
if err != nil {
return false, err
}
}
}
// We have finished if we have run out of test results in the queried
// invocations, and we managed to deliver all those test variants to
// the caller.
finished = trCount < pageSize && yieldCount <= q.PageSize
return finished, nil
}
// responseLimitReachedErr is an error returned to avoid iterating over more
// results when the soft response size limit has been reached.
// It is a sentinel returned from the per-test-variant callbacks; the fetch
// functions compare against it and do not treat it as a failure.
var responseLimitReachedErr = errors.New("terminating iteration early as response size limit reached")
// fetchTestVariantsWithOnlyExpectedResults returns one page of test variants
// whose results are all expected. Test variants with any unexpected result
// are read but skipped. Each returned variant has its sources resolved, is
// restricted to limited data unless the caller has unrestricted access, and
// has the field mask applied.
//
// The page is cut short once the estimated response size exceeds
// q.ResponseLimitBytes. The next page token is empty once all test results
// have been read; otherwise it resumes after the last test variant seen.
func (q *Query) fetchTestVariantsWithOnlyExpectedResults(ctx context.Context) (Page, error) {
	var (
		tvs          = make([]*pb.TestVariant, 0, q.PageSize)
		responseSize = 0

		// Per-realm caches of whether the caller may get test results
		// (resultPerms) or test exonerations (exonerationPerms). They are
		// filled in by Query.toLimitedData so that each realm/permission
		// combination is checked at most once.
		resultPerms      = make(map[string]bool)
		exonerationPerms = make(map[string]bool)

		// Sources referenced by the SourcesId of the returned test variants.
		distinctSources = make(map[string]*pb.Sources)

		// Key of the last test variant handed to the callback, whether or
		// not it ended up in the page.
		lastTestID      string
		lastVariantHash string
	)

	finished, err := q.queryTestVariants(ctx, func(tv *pb.TestVariant) error {
		lastTestID, lastVariantHash = tv.TestId, tv.VariantHash

		// Unexpected results come first within a test variant, so the variant
		// has only expected results exactly when its first result is expected.
		if !tv.Results[0].Result.Expected {
			return nil
		}
		tv.Status = pb.TestVariantStatus_EXPECTED

		// Resolve the code sources tested.
		if err := q.populateSources(tv, distinctSources); err != nil {
			return errors.Annotate(err, "resolving sources").Err()
		}
		// Restrict the data unless the caller may see everything.
		if q.AccessLevel != AccessLevelUnrestricted {
			if err := q.toLimitedData(ctx, tv, resultPerms, exonerationPerms); err != nil {
				return err
			}
		}
		// Apply the field mask.
		if err := q.trim(tv); err != nil {
			return err
		}
		tvs = append(tvs, tv)

		// Enforce the soft response size limit. The variant that crosses the
		// limit is still included in the page.
		responseSize += estimateTestVariantSize(tv)
		if responseSize > q.ResponseLimitBytes {
			return responseLimitReachedErr
		}
		return nil
	})
	if err != nil && err != responseLimitReachedErr {
		return Page{}, err
	}

	nextPageToken := ""
	if !finished {
		nextPageToken = pagination.Token(pb.TestVariantStatus_EXPECTED.String(), lastTestID, lastVariantHash)
	}
	return Page{TestVariants: tvs, DistinctSources: distinctSources, NextPageToken: nextPageToken}, nil
}
// Fetch returns a page of test variants matching q.
// Returned test variants are ordered by test variant status, test ID and
// variant hash.
//
// Fetch builds the query parameters (q.params), decodes q.PageToken to find
// where to resume, and then reads either test variants with unexpected
// results or test variants with only expected results.
//
// Panics if q.PageSize, q.ResultLimit or q.ResponseLimitBytes is not
// positive. Returns an error if the page token is invalid.
func (q *Query) Fetch(ctx context.Context) (Page, error) {
	switch {
	case q.PageSize <= 0:
		panic("PageSize <= 0")
	case q.ResultLimit <= 0:
		panic("ResultLimit <= 0")
	case q.ResponseLimitBytes <= 0:
		panic("ResponseLimitBytes <= 0")
	}

	// UNEXPECTED_MASK is passed to the query as "no specific status".
	wantStatus := q.Predicate.GetStatus()
	statusParam := int(wantStatus)
	if wantStatus == pb.TestVariantStatus_UNEXPECTED_MASK {
		statusParam = 0
	}

	testResultInvs, err := q.ReachableInvocations.WithTestResultsIDSet()
	if err != nil {
		return Page{}, errors.Annotate(err, "error getting invocations with test results").Err()
	}
	exonerationInvs, err := q.ReachableInvocations.WithExonerationsIDSet()
	if err != nil {
		return Page{}, errors.Annotate(err, "error getting invocations with exonerations").Err()
	}

	q.params = map[string]any{
		"testResultInvIDs":      testResultInvs,
		"testExonerationInvIDs": exonerationInvs,
		"testIDs":               q.TestIDs,
		"skipStatus":            int(pb.TestStatus_SKIP),
		"unexpected":            int(pb.TestVariantStatus_UNEXPECTED),
		"unexpectedlySkipped":   int(pb.TestVariantStatus_UNEXPECTEDLY_SKIPPED),
		"flaky":                 int(pb.TestVariantStatus_FLAKY),
		"exonerated":            int(pb.TestVariantStatus_EXONERATED),
		"expected":              int(pb.TestVariantStatus_EXPECTED),
		"status":                statusParam,
	}

	// Decode the page token into the position to resume after. An empty
	// token means "start from the beginning".
	parts, err := pagination.ParseToken(q.PageToken)
	if err != nil {
		return Page{}, err
	}
	var (
		afterStatus      = 0
		afterTestID      = ""
		afterVariantHash = ""
		expectedOnly     = false
	)
	switch len(parts) {
	case 0:
		// No token: keep the defaults.
	case 3:
		code, known := pb.TestVariantStatus_value[parts[0]]
		if !known {
			return Page{}, pagination.InvalidToken(errors.Reason("unrecognized test variant status: %q", parts[0]).Err())
		}
		expectedOnly = pb.TestVariantStatus(code) == pb.TestVariantStatus_EXPECTED
		afterStatus = int(code)
		afterTestID, afterVariantHash = parts[1], parts[2]
	default:
		return Page{}, pagination.InvalidToken(errors.Reason("expected 3 components, got %q", parts).Err())
	}
	q.params["afterTvStatus"] = afterStatus
	q.params["afterTestId"] = afterTestID
	q.params["afterVariantHash"] = afterVariantHash

	// A request for EXPECTED test variants skips the unexpected ones
	// regardless of the page token.
	if wantStatus == pb.TestVariantStatus_EXPECTED {
		expectedOnly = true
	}

	if expectedOnly {
		return q.fetchTestVariantsWithOnlyExpectedResults(ctx)
	}
	return q.fetchTestVariantsWithUnexpectedResults(ctx)
}
// testVariantsWithUnexpectedResultsSQLTmpl is the SQL template used to query
// a page of test variants that have at least one unexpected result, together
// with their results and exonerations.
//
// Template inputs:
//   - ResultColumns: comma-separated TestResults columns to put in each
//     result STRUCT.
//   - HasTestIds: whether to restrict the output to @testIDs.
//   - StatusFilter: whether to restrict the output to test variants whose
//     status is @status.
//
// The pagination predicate is a disjunction, so it is wrapped in parentheses.
// Without them, AND binds tighter than OR and the @testIDs restriction would
// apply to the first disjunct only.
var testVariantsWithUnexpectedResultsSQLTmpl = template.Must(template.New("testVariantsWithUnexpectedResultsSQL").Parse(`
	@{USE_ADDITIONAL_PARALLELISM=TRUE}
	WITH unexpectedTestVariants AS (
		SELECT DISTINCT TestId, VariantHash
		FROM TestResults@{FORCE_INDEX=UnexpectedTestResults, spanner_emulator.disable_query_null_filtered_index_check=true}
		WHERE IsUnexpected AND InvocationId in UNNEST(@testResultInvIDs)
	),

	-- Get test variants and their results.
	-- Also count the number of unexpected results and total results for each test
	-- variant, which will be used to classify test variants.
	test_variants AS (
		SELECT
			TestId,
			VariantHash,
			ANY_VALUE(Variant) Variant,
			ANY_VALUE(TestMetadata) TestMetadata,
			COUNTIF(IsUnexpected) num_unexpected,
			COUNTIF(Status=@skipStatus) num_skipped,
			COUNT(TestId) num_total,
			ARRAY_AGG(STRUCT({{.ResultColumns}})) results,
		FROM unexpectedTestVariants vur
		JOIN@{FORCE_JOIN_ORDER=TRUE, JOIN_METHOD=HASH_JOIN} TestResults tr USING (TestId, VariantHash)
		WHERE InvocationId in UNNEST(@testResultInvIDs)
		GROUP BY TestId, VariantHash
	),

	exonerated AS (
		SELECT
			TestId,
			VariantHash,
			ARRAY_AGG(ExonerationId) ExonerationIDs,
			ARRAY_AGG(InvocationId) InvocationIDs,
			ARRAY_AGG(ExplanationHTML) ExonerationExplanationHTMLs,
			ARRAY_AGG(Reason) ExonerationReasons
		FROM TestExonerations
		WHERE InvocationId IN UNNEST(@testExonerationInvIDs)
		GROUP BY TestId, VariantHash
	),

	testVariantsWithUnexpectedResults AS (
		SELECT
			tv.TestId,
			tv.VariantHash,
			tv.Variant,
			tv.TestMetadata,
			CASE
				WHEN exonerated.TestId IS NOT NULL THEN @exonerated
				WHEN num_unexpected = 0 THEN @expected -- should never happen in this query
				WHEN num_skipped = num_unexpected AND num_skipped = num_total THEN @unexpectedlySkipped
				WHEN num_unexpected = num_total THEN @unexpected
				ELSE @flaky
			END TvStatus,
			ARRAY(
				SELECT AS STRUCT *
				FROM UNNEST(tv.results)
				LIMIT @testResultLimit) results,
			exonerated.ExonerationIDs,
			exonerated.InvocationIDs,
			exonerated.ExonerationExplanationHTMLs,
			exonerated.ExonerationReasons
		FROM test_variants tv
		LEFT JOIN exonerated USING(TestId, VariantHash)
		ORDER BY TvStatus, TestId, VariantHash
	)

	SELECT
		TestId,
		VariantHash,
		Variant,
		TestMetadata,
		TvStatus,
		results,
		ExonerationIDs,
		InvocationIDs,
		ExonerationExplanationHTMLs,
		ExonerationReasons
	FROM testVariantsWithUnexpectedResults
	WHERE
	{{if .HasTestIds}}
		TestId in UNNEST(@testIDs) AND
	{{end}}
	(
	{{if .StatusFilter}}
		(TvStatus = @status AND TestId > @afterTestId) OR
		(TvStatus = @status AND TestId = @afterTestId AND VariantHash > @afterVariantHash)
	{{else}}
		(TvStatus > @afterTvStatus) OR
		(TvStatus = @afterTvStatus AND TestId > @afterTestId) OR
		(TvStatus = @afterTvStatus AND TestId = @afterTestId AND VariantHash > @afterVariantHash)
	{{end}}
	)
	ORDER BY TvStatus, TestId, VariantHash
	LIMIT @limit
`))
// allTestResultsSQLTmpl is the SQL template used by Query.queryTestResults
// to read test results ordered by test ID and variant hash, with unexpected
// results first within each test variant.
//
// Template inputs:
//   - ResultColumns: comma-separated TestResults columns to put in the
//     result STRUCT.
//   - HasTestIds: whether to restrict the output to @testIDs.
//
// Reading resumes after the test variant identified by @afterTestId and
// @afterVariantHash, and at most @limit rows are returned.
var allTestResultsSQLTmpl = template.Must(template.New("allTestResultsSQL").Parse(`
@{USE_ADDITIONAL_PARALLELISM=TRUE}
SELECT
TestId,
VariantHash,
Variant,
TestMetadata,
-- Spanner doesn't support returning a struct as a column.
-- https://cloud.google.com/spanner/docs/reference/standard-sql/query-syntax#using_structs_with_select
-- Wrap it in an array instead.
ARRAY(SELECT STRUCT({{.ResultColumns}})) AS results,
FROM TestResults
WHERE InvocationId in UNNEST(@testResultInvIDs)
{{if .HasTestIds}}
AND TestId in UNNEST(@testIDs)
{{end}}
AND (
(TestId > @afterTestId) OR
(TestId = @afterTestId AND VariantHash > @afterVariantHash)
)
-- Within a test variant, return the unexpected results first.
ORDER BY TestId, VariantHash, IsUnexpected DESC
LIMIT @limit
`))
// populateTestMetadata unmarshals tmd into tv.TestMetadata.
// When tmd is empty, tv is left untouched and nil is returned.
func populateTestMetadata(tv *pb.TestVariant, tmd spanutil.Compressed) error {
	if len(tmd) == 0 {
		return nil
	}
	metadata := &pb.TestMetadata{}
	tv.TestMetadata = metadata
	return proto.Unmarshal(tmd, metadata)
}
// estimateTestVariantSize gives a rough size, in bytes, of tv as it would
// appear in a pRPC response (pRPC responses use JSON serialisation).
func estimateTestVariantSize(tv *pb.TestVariant) int {
	// Fixed allowance for the JSON grammar and the field names, added on
	// top of the combined size of the fields themselves.
	const overheadBytes = 1000
	return overheadBytes + proto.Size(tv)
}