blob: f8b11ff80fbd707e1c9dfbf24f5895eb4eef8e1f [file] [log] [blame]
// Copyright 2019 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package recorder
import (
"context"
"time"
"cloud.google.com/go/spanner"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/data/stringset"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/grpc/appstatus"
"go.chromium.org/luci/server/span"
"go.chromium.org/luci/resultdb/internal/invocations"
"go.chromium.org/luci/resultdb/internal/resultcount"
"go.chromium.org/luci/resultdb/internal/spanutil"
"go.chromium.org/luci/resultdb/pbutil"
pb "go.chromium.org/luci/resultdb/proto/v1"
)
func emptyOrEqual(name, actual, expected string) error {
switch actual {
case "", expected:
return nil
}
return errors.Reason("%s must be either empty or equal to %q, but %q", name, expected, actual).Err()
}
func validateBatchCreateTestResultsRequest(req *pb.BatchCreateTestResultsRequest, now time.Time) error {
if err := pbutil.ValidateInvocationName(req.Invocation); err != nil {
return errors.Annotate(err, "invocation").Err()
}
if err := pbutil.ValidateRequestID(req.RequestId); err != nil {
return errors.Annotate(err, "request_id").Err()
}
if err := pbutil.ValidateBatchRequestCount(len(req.Requests)); err != nil {
return err
}
type Key struct {
testID string
resultID string
}
keySet := map[Key]struct{}{}
for i, r := range req.Requests {
if err := emptyOrEqual("invocation", r.Invocation, req.Invocation); err != nil {
return errors.Annotate(err, "requests: %d", i).Err()
}
if err := emptyOrEqual("request_id", r.RequestId, req.RequestId); err != nil {
return errors.Annotate(err, "requests: %d", i).Err()
}
if err := pbutil.ValidateTestResult(now, r.TestResult); err != nil {
return errors.Annotate(err, "requests: %d: test_result", i).Err()
}
key := Key{
testID: r.TestResult.TestId,
resultID: r.TestResult.ResultId,
}
if _, ok := keySet[key]; ok {
// Duplicated results.
return errors.Reason("duplicate test results in request: testID %q, resultID %q", key.testID, key.resultID).Err()
}
keySet[key] = struct{}{}
}
return nil
}
// BatchCreateTestResults implements pb.RecorderServer.
func (s *recorderServer) BatchCreateTestResults(ctx context.Context, in *pb.BatchCreateTestResultsRequest) (*pb.BatchCreateTestResultsResponse, error) {
now := clock.Now(ctx).UTC()
if err := validateBatchCreateTestResultsRequest(in, now); err != nil {
return nil, appstatus.BadRequest(err)
}
invID := invocations.MustParseName(in.Invocation)
ret := &pb.BatchCreateTestResultsResponse{
TestResults: make([]*pb.TestResult, len(in.Requests)),
}
ms := make([]*spanner.Mutation, len(in.Requests))
var commonPrefix string
varUnion := stringset.New(0)
for i, r := range in.Requests {
ret.TestResults[i], ms[i] = insertTestResult(ctx, invID, in.RequestId, r.TestResult)
if i == 0 {
commonPrefix = r.TestResult.TestId
} else {
commonPrefix = longestCommonPrefix(commonPrefix, r.TestResult.TestId)
}
varUnion.AddAll(pbutil.VariantToStrings(r.TestResult.GetVariant()))
}
var realm string
err := mutateInvocation(ctx, invID, func(ctx context.Context) error {
span.BufferWrite(ctx, ms...)
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() (err error) {
var invCommonTestIdPrefix spanner.NullString
var invVars []string
if err = invocations.ReadColumns(ctx, invID, map[string]any{
"Realm": &realm,
"CommonTestIDPrefix": &invCommonTestIdPrefix,
"TestResultVariantUnion": &invVars,
}); err != nil {
return
}
newPrefix := commonPrefix
if !invCommonTestIdPrefix.IsNull() {
newPrefix = longestCommonPrefix(invCommonTestIdPrefix.String(), commonPrefix)
}
varUnion.AddAll(invVars)
if invCommonTestIdPrefix.String() != newPrefix || varUnion.Len() > len(invVars) {
span.BufferWrite(ctx, spanutil.UpdateMap("Invocations", map[string]any{
"InvocationId": invID,
"CommonTestIDPrefix": newPrefix,
"TestResultVariantUnion": varUnion.ToSortedSlice(),
}))
}
return
})
eg.Go(func() error {
return resultcount.IncrementTestResultCount(ctx, invID, int64(len(in.Requests)))
})
return eg.Wait()
})
if err != nil {
return nil, err
}
spanutil.IncRowCount(ctx, len(in.Requests), spanutil.TestResults, spanutil.Inserted, realm)
return ret, nil
}
func insertTestResult(ctx context.Context, invID invocations.ID, requestID string, body *pb.TestResult) (*pb.TestResult, *spanner.Mutation) {
// create a copy of the input message with the OUTPUT_ONLY field(s) to be used in
// the response
ret := proto.Clone(body).(*pb.TestResult)
ret.Name = pbutil.TestResultName(string(invID), ret.TestId, ret.ResultId)
ret.VariantHash = pbutil.VariantHash(ret.Variant)
// handle values for nullable columns
var runDuration spanner.NullInt64
if ret.Duration != nil {
runDuration.Int64 = pbutil.MustDuration(ret.Duration).Microseconds()
runDuration.Valid = true
}
row := map[string]any{
"InvocationId": invID,
"TestId": ret.TestId,
"ResultId": ret.ResultId,
"Variant": ret.Variant,
"VariantHash": ret.VariantHash,
"CommitTimestamp": spanner.CommitTimestamp,
"IsUnexpected": spanner.NullBool{Bool: true, Valid: !body.Expected},
"Status": ret.Status,
"SummaryHTML": spanutil.Compressed(ret.SummaryHtml),
"StartTime": ret.StartTime,
"RunDurationUsec": runDuration,
"Tags": ret.Tags,
}
if ret.SkipReason != pb.SkipReason_SKIP_REASON_UNSPECIFIED {
// Unspecified is mapped to NULL, so only write if we have some other value.
row["SkipReason"] = ret.SkipReason
}
if ret.TestMetadata != nil {
row["TestMetadata"] = spanutil.Compressed(pbutil.MustMarshal(ret.TestMetadata))
}
if ret.FailureReason != nil {
row["FailureReason"] = spanutil.Compressed(pbutil.MustMarshal(ret.FailureReason))
}
if ret.Properties != nil {
row["Properties"] = spanutil.Compressed(pbutil.MustMarshal(ret.Properties))
}
mutation := spanner.InsertOrUpdateMap("TestResults", spanutil.ToSpannerMap(row))
return ret, mutation
}
func longestCommonPrefix(str1, str2 string) string {
for i := 0; i < len(str1) && i < len(str2); i++ {
if str1[i] != str2[i] {
return str1[:i]
}
}
if len(str1) <= len(str2) {
return str1
}
return str2
}