blob: 273b2944d924605e80beb496dc6662388e52d360 [file] [log] [blame]
// Copyright 2020 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package recorder
import (
"context"
"crypto/sha256"
"encoding/hex"
"mime"
"time"
"cloud.google.com/go/spanner"
repb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/grpc/appstatus"
"go.chromium.org/luci/server/auth"
"go.chromium.org/luci/server/auth/realms"
"go.chromium.org/luci/server/span"
"go.chromium.org/luci/resultdb/bqutil"
"go.chromium.org/luci/resultdb/internal/artifacts"
"go.chromium.org/luci/resultdb/internal/config"
"go.chromium.org/luci/resultdb/internal/gsutil"
"go.chromium.org/luci/resultdb/internal/invocations"
"go.chromium.org/luci/resultdb/internal/spanutil"
"go.chromium.org/luci/resultdb/pbutil"
pb "go.chromium.org/luci/resultdb/proto/v1"
)
// TODO(crbug.com/1177213) - make this configurable.
const MaxBatchCreateArtifactSize = 10 * 1024 * 1024
// MaxShardContentSize is the maximum content size in BQ row.
// Artifacts content bigger than this size needs to be sharded.
// Leave 10 KB for other fields, the rest is content.
const MaxShardContentSize = bqutil.RowMaxBytes - 10*1024
// LookbackWindow is used when chunking. It specifies how many bytes we should
// look back to find new line/white space characters to split the chunks.
const LookbackWindow = 1024
type artifactCreationRequest struct {
testID string
resultID string
artifactID string
contentType string
// hash is a hash of the artifact data. It is not supplied or calculated for GCS artifacts.
hash string
// size is the size of the artifact data in bytes. In the case of a GCS artifact it is user-specified, optional and not verified.
size int64
// data is the artifact contents data that will be stored in RBE-CAS. If gcsURI is provided, this must be empty.
data []byte
// gcsURI is the location of the artifact content if it is stored in GCS. If this is provided, data must be empty.
gcsURI string
// status of the artifact's parent test result.
testStatus pb.TestStatus
}
type invocationInfo struct {
id string
realm string
createTime time.Time
}
// name returns the artifact name.
func (a *artifactCreationRequest) name(invID invocations.ID) string {
if a.testID == "" {
return pbutil.InvocationArtifactName(string(invID), a.artifactID)
}
return pbutil.TestResultArtifactName(string(invID), a.testID, a.resultID, a.artifactID)
}
// parentID returns the local parent ID of the artifact.
func (a *artifactCreationRequest) parentID() string {
return artifacts.ParentID(a.testID, a.resultID)
}
func parseCreateArtifactRequest(req *pb.CreateArtifactRequest) (invocations.ID, *artifactCreationRequest, error) {
if req.GetArtifact() == nil {
return "", nil, errors.Reason("artifact: unspecified").Err()
}
if err := pbutil.ValidateArtifactID(req.Artifact.ArtifactId); err != nil {
return "", nil, errors.Annotate(err, "artifact_id").Err()
}
if req.Artifact.ContentType != "" {
if _, _, err := mime.ParseMediaType(req.Artifact.ContentType); err != nil {
return "", nil, errors.Annotate(err, "content_type").Err()
}
}
// parent
if req.Parent == "" {
return "", nil, errors.Reason("parent: unspecified").Err()
}
invIDStr, testID, resultID, err := pbutil.ParseTestResultName(req.Parent)
if err != nil {
if invIDStr, err = pbutil.ParseInvocationName(req.Parent); err != nil {
return "", nil, errors.Reason("parent: neither valid invocation name nor valid test result name").Err()
}
}
if len(req.Artifact.Contents) != 0 && req.Artifact.GcsUri != "" {
return "", nil, errors.Reason("only one of contents and gcs_uri can be given").Err()
}
sizeBytes := int64(len(req.Artifact.Contents))
if sizeBytes != 0 && req.Artifact.SizeBytes != 0 && sizeBytes != req.Artifact.SizeBytes {
return "", nil, errors.Reason("sizeBytes and contents are specified but don't match").Err()
}
// If contents field is empty, try to set size from the request instead.
if sizeBytes == 0 {
if req.Artifact.SizeBytes != 0 {
sizeBytes = req.Artifact.SizeBytes
}
}
return invocations.ID(invIDStr), &artifactCreationRequest{
artifactID: req.Artifact.ArtifactId,
contentType: req.Artifact.ContentType,
data: req.Artifact.Contents,
size: sizeBytes,
testID: testID,
resultID: resultID,
gcsURI: req.Artifact.GcsUri,
testStatus: req.Artifact.TestStatus,
}, nil
}
// parseBatchCreateArtifactsRequest parses a batch request and returns
// artifactCreationRequests for each of the artifacts w/o hash computation.
// It returns an error, if
// - any of the artifact IDs or contentTypes are invalid,
// - the total size exceeds MaxBatchCreateArtifactSize, or
// - there are more than one invocations associated with the artifacts.
// - both data and a GCS URI are supplied
func parseBatchCreateArtifactsRequest(in *pb.BatchCreateArtifactsRequest) (invocations.ID, []*artifactCreationRequest, error) {
var tSize int64
var invID invocations.ID
if err := pbutil.ValidateBatchRequestCount(len(in.Requests)); err != nil {
return "", nil, err
}
arts := make([]*artifactCreationRequest, len(in.Requests))
for i, req := range in.Requests {
inv, art, err := parseCreateArtifactRequest(req)
if err != nil {
return "", nil, errors.Annotate(err, "requests[%d]", i).Err()
}
switch {
case invID == "":
invID = inv
case invID != inv:
return "", nil, errors.Reason("requests[%d]: only one invocation is allowed: %q, %q", i, invID, inv).Err()
}
// TODO(ddoman): limit the max request body size in prpc level.
tSize += art.size
if tSize > MaxBatchCreateArtifactSize {
return "", nil, errors.Reason("the total size of artifact contents exceeded %d", MaxBatchCreateArtifactSize).Err()
}
arts[i] = art
}
return invID, arts, nil
}
// findNewArtifacts returns a list of the artifacts that don't have states yet.
// If one exists w/ different hash/size, this returns an error.
func findNewArtifacts(ctx context.Context, invID invocations.ID, arts []*artifactCreationRequest) ([]*artifactCreationRequest, error) {
// artifacts are not expected to exist in most cases, and this map would likely
// be empty.
type state struct {
hash string
size int64
gcsURI string
}
var states map[string]state
ks := spanner.KeySets()
for _, a := range arts {
ks = spanner.KeySets(invID.Key(a.parentID(), a.artifactID), ks)
}
var b spanutil.Buffer
err := span.Read(ctx, "Artifacts", ks, []string{"ParentId", "ArtifactId", "RBECASHash", "Size", "GcsURI"}).Do(
func(row *spanner.Row) (err error) {
var pid, aid string
var hash string
var size = new(int64)
var gcsURI string
if err = b.FromSpanner(row, &pid, &aid, &hash, &size, &gcsURI); err != nil {
return
}
if states == nil {
states = make(map[string]state)
}
// treat non-existing size as 0.
if size == nil {
size = new(int64)
}
// The artifact exists.
states[invID.Key(pid, aid).String()] = state{hash, *size, gcsURI}
return
},
)
if err != nil {
return nil, appstatus.Errorf(codes.Internal, "%s", err)
}
newArts := make([]*artifactCreationRequest, 0, len(arts)-len(states))
for _, a := range arts {
// Save the hash, so that it can be reused in the post-verification
// after rbecase.UpdateBlob().
if a.gcsURI == "" && a.hash == "" {
h := sha256.Sum256(a.data)
a.hash = artifacts.AddHashPrefix(hex.EncodeToString(h[:]))
}
st, ok := states[invID.Key(a.parentID(), a.artifactID).String()]
if !ok {
newArts = append(newArts, a)
continue
}
if (a.gcsURI == "") != (st.gcsURI == "") {
// Can't change from GCS to non-GCS and vice-versa
return nil, appstatus.Errorf(codes.AlreadyExists, `%q: exists w/ different storage scheme`, a.name(invID))
}
if a.size != st.size {
return nil, appstatus.Errorf(codes.AlreadyExists, `%q: exists w/ different size: %d != %d`, a.name(invID), a.size, st.size)
}
if a.gcsURI != "" {
if a.gcsURI != st.gcsURI {
return nil, appstatus.Errorf(codes.AlreadyExists, `%q: exists w/ different GCS URI: %s != %s`, a.name(invID), a.gcsURI, st.gcsURI)
}
} else {
if a.hash != st.hash {
return nil, appstatus.Errorf(codes.AlreadyExists, `%q: exists w/ different hash`, a.name(invID))
}
}
}
return newArts, nil
}
// checkArtStates checks if the states of the associated invocation and artifacts are
// compatible with creation of the artifacts. On success, it returns a list of
// the artifactCreationRequests of which artifact don't have states in Spanner yet.
func checkArtStates(ctx context.Context, invID invocations.ID, arts []*artifactCreationRequest) (reqs []*artifactCreationRequest, invInfo *invocationInfo, err error) {
var invState pb.Invocation_State
var createTime time.Time
var realm string
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
return invocations.ReadColumns(ctx, invID, map[string]any{
"State": &invState, "Realm": &realm, "CreateTime": &createTime,
})
})
eg.Go(func() (err error) {
reqs, err = findNewArtifacts(ctx, invID, arts)
return
})
switch err := eg.Wait(); {
case err != nil:
return nil, nil, err
case invState != pb.Invocation_ACTIVE:
return nil, nil, appstatus.Errorf(codes.FailedPrecondition, "%s is not active", invID.Name())
}
return reqs, &invocationInfo{
id: string(invID),
realm: realm,
createTime: createTime,
}, nil
}
// createArtifactStates creates the states of given artifacts in Spanner.
func createArtifactStates(ctx context.Context, realm string, invID invocations.ID, arts []*artifactCreationRequest) error {
var noStateArts []*artifactCreationRequest
_, err := span.ReadWriteTransaction(ctx, func(ctx context.Context) (err error) {
// Verify all the states again.
noStateArts, _, err = checkArtStates(ctx, invID, arts)
if err != nil {
return err
}
if len(noStateArts) == 0 {
logging.Warningf(ctx, "The states of all the artifacts already exist.")
}
for _, a := range noStateArts {
span.BufferWrite(ctx, spanutil.InsertMap("Artifacts", map[string]any{
"InvocationId": invID,
"ParentId": a.parentID(),
"ArtifactId": a.artifactID,
"ContentType": a.contentType,
"Size": a.size,
"RBECASHash": a.hash,
"GcsURI": a.gcsURI,
}))
}
return nil
})
if err != nil {
return errors.Annotate(err, "failed to write artifact to Spanner").Err()
}
spanutil.IncRowCount(ctx, len(noStateArts), spanutil.Artifacts, spanutil.Inserted, realm)
return nil
}
func uploadArtifactBlobs(ctx context.Context, rbeIns string, casClient repb.ContentAddressableStorageClient, invID invocations.ID, arts []*artifactCreationRequest) error {
casReq := &repb.BatchUpdateBlobsRequest{InstanceName: rbeIns}
for _, a := range arts {
casReq.Requests = append(casReq.Requests, &repb.BatchUpdateBlobsRequest_Request{
Digest: &repb.Digest{Hash: artifacts.TrimHashPrefix(a.hash), SizeBytes: a.size},
Data: a.data,
})
}
resp, err := casClient.BatchUpdateBlobs(ctx, casReq, &grpc.MaxSendMsgSizeCallOption{MaxSendMsgSize: MaxBatchCreateArtifactSize})
if err != nil {
// If BatchUpdateBlobs() returns INVALID_ARGUMENT, it means that
// the total size of the artifact contents was bigger than the max size that
// BatchUpdateBlobs() can accept.
return errors.Annotate(err, "cas.BatchUpdateBlobs failed").Err()
}
for i, r := range resp.GetResponses() {
cd := codes.Code(r.Status.Code)
if cd != codes.OK {
// Each individual error can be due to resource exhausted or unmatched digest.
// If unmatched digest, this RPC has a bug and needs to be fixed.
// If resource exhausted, the RBE server quota needs to be adjusted.
//
// Either case, it's a server-error, and an internal error will be returned.
return errors.Reason("artifact %q: cas.BatchUpdateBlobs failed", arts[i].name(invID)).Err()
}
}
return nil
}
// allowedBucketsForUser returns the GCS buckets a user is allowed to reference by reading
// the project config.
// If no config exists for the user, an empty map will be returned, rather than an error.
func allowedBucketsForUser(ctx context.Context, project, user string) (allowedBuckets map[string]bool, err error) {
allowedBuckets = map[string]bool{}
// This is cached for 1 minute, so no need to re-optimize here.
cfg, err := config.Project(ctx, project)
if err != nil {
if errors.Is(err, config.ErrNotFoundProjectConfig) {
return allowedBuckets, nil
}
return nil, err
}
for _, list := range cfg.GcsAllowList {
for _, listUser := range list.Users {
if listUser == user {
for _, bucket := range list.Buckets {
allowedBuckets[bucket] = true
}
return allowedBuckets, nil
}
}
}
return allowedBuckets, nil
}
// BatchCreateArtifacts implements pb.RecorderServer.
// This functions uploads the artifacts to RBE-CAS.
// If the artifact is a text-based artifact, it will also get uploaded to BigQuery.
// We have a percentage control to determine how many percent of artifacts got
// uploaded to BigQuery.
func (s *recorderServer) BatchCreateArtifacts(ctx context.Context, in *pb.BatchCreateArtifactsRequest) (*pb.BatchCreateArtifactsResponse, error) {
token, err := extractUpdateToken(ctx)
if err != nil {
return nil, err
}
if len(in.Requests) == 0 {
logging.Debugf(ctx, "Received a BatchCreateArtifactsRequest with 0 requests; returning")
return &pb.BatchCreateArtifactsResponse{}, nil
}
invID, arts, err := parseBatchCreateArtifactsRequest(in)
if err != nil {
return nil, appstatus.BadRequest(err)
}
if err := validateInvocationToken(ctx, token, invID); err != nil {
return nil, appstatus.Errorf(codes.PermissionDenied, "invalid update token")
}
var artsToCreate []*artifactCreationRequest
var invInfo *invocationInfo
func() {
ctx, cancel := span.ReadOnlyTransaction(ctx)
defer cancel()
artsToCreate, invInfo, err = checkArtStates(ctx, invID, arts)
}()
if err != nil {
return nil, err
}
if len(artsToCreate) == 0 {
logging.Debugf(ctx, "Found no artifacts to create")
return &pb.BatchCreateArtifactsResponse{}, nil
}
realm := invInfo.realm
project, _ := realms.Split(realm)
user := auth.CurrentUser(ctx).Identity
var allowedBuckets map[string]bool = nil
artsToUpload := make([]*artifactCreationRequest, 0, len(artsToCreate))
for _, a := range artsToCreate {
// Only upload to RBE CAS the ones that are not in GCS
if a.gcsURI == "" {
artsToUpload = append(artsToUpload, a)
} else {
// Check this GCS reference is allowed by the project config.
// Delay construction of the checker (which may occasionally involve an RPC) until we know we
// actually need it.
if allowedBuckets == nil {
allowedBuckets, err = allowedBucketsForUser(ctx, project, string(user))
if err != nil {
return nil, errors.Annotate(err, "fetch allowed buckets for user %s", string(user)).Err()
}
}
bucket, _ := gsutil.Split(a.gcsURI)
if _, ok := allowedBuckets[bucket]; !ok {
return nil, appstatus.Errorf(codes.PermissionDenied, "the user %s does not have permission to reference GCS objects in bucket %s in project %s", string(user), bucket, project)
}
}
}
if err := uploadArtifactBlobs(ctx, s.ArtifactRBEInstance, s.casClient, invID, artsToUpload); err != nil {
return nil, err
}
if err := createArtifactStates(ctx, realm, invID, artsToCreate); err != nil {
return nil, err
}
// Return all the artifacts to indicate that they were created.
ret := &pb.BatchCreateArtifactsResponse{Artifacts: make([]*pb.Artifact, len(arts))}
for i, a := range arts {
ret.Artifacts[i] = &pb.Artifact{
Name: a.name(invID),
ArtifactId: a.artifactID,
ContentType: a.contentType,
SizeBytes: a.size,
}
}
return ret, nil
}