blob: 31147c82b7fadbe2e63df63378656e074e6030f4 [file] [log] [blame]
// Copyright 2020 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package recorder
import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"hash"
"io"
"net/http"
"strconv"
"strings"
"cloud.google.com/go/spanner"
"github.com/google/uuid"
"go.opentelemetry.io/otel/attribute"
"google.golang.org/genproto/googleapis/bytestream"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"go.chromium.org/luci/common/data/rand/mathrand"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/sync/parallel"
"go.chromium.org/luci/grpc/appstatus"
"go.chromium.org/luci/grpc/grpcutil"
"go.chromium.org/luci/server/router"
"go.chromium.org/luci/server/span"
"go.chromium.org/luci/resultdb/internal/artifactcontent"
"go.chromium.org/luci/resultdb/internal/artifacts"
"go.chromium.org/luci/resultdb/internal/invocations"
"go.chromium.org/luci/resultdb/internal/spanutil"
"go.chromium.org/luci/resultdb/internal/tracing"
"go.chromium.org/luci/resultdb/pbutil"
pb "go.chromium.org/luci/resultdb/proto/v1"
)
// Names of the HTTP request headers read by the artifact creation handler.
const (
	// artifactContentHashHeaderKey names the header holding the expected
	// content hash, in the form "sha256:{hash}".
	artifactContentHashHeaderKey = "Content-Hash"
	// artifactContentSizeHeaderKey names the header holding the expected
	// content size, parsed as a base-10 integer.
	artifactContentSizeHeaderKey = "Content-Length"
	// artifactContentTypeHeaderKey names the optional header whose value is
	// stored as the artifact's content type.
	artifactContentTypeHeaderKey = "Content-Type"
	// updateTokenHeaderKey names the header holding the invocation's update
	// token.
	updateTokenHeaderKey = "Update-Token"
)
// artifactCreationHandler can handle artifact creation requests.
//
// Request:
//   - Router parameter "artifact" MUST be a valid artifact name.
//   - The request body MUST be the artifact contents.
//   - The request MUST include an Update-Token header with the value of
//     invocation's update token.
//   - The request MUST include a Content-Length header. It must be
//     <= MaxArtifactContentStreamLength.
//   - The request MUST include a Content-Hash header with value "sha256:{hash}"
//     where {hash} is a lower-case hex-encoded SHA256 hash of the artifact
//     contents.
//   - The request SHOULD have a Content-Type header.
type artifactCreationHandler struct {
	// RBEInstance is the full name of the RBE instance used for artifact storage.
	// Format: projects/{project}/instances/{instance}.
	RBEInstance string

	// NewCASWriter returns the ByteStream write client through which the
	// artifact contents are streamed to RBE-CAS. It is called once per upload.
	NewCASWriter func(context.Context) (bytestream.ByteStream_WriteClient, error)

	// MaxArtifactContentStreamLength is the largest accepted Content-Length
	// value (inclusive). Requests declaring a larger size are rejected with
	// InvalidArgument.
	MaxArtifactContentStreamLength int64

	// bufSize is the size of the buffer used to copy the request body to
	// RBE-CAS. Zero means the size is chosen automatically (at most 1 MiB).
	bufSize int
}
// Handle implements router.Handler.
//
// The request is processed by a fresh artifactCreator and its outcome is
// translated to an HTTP response:
//   - an error carrying an appstatus is logged as a warning and reported with
//     the HTTP status matching its gRPC code and the status message as body;
//   - any other error is logged as an error and reported as a generic 500;
//   - success is reported as 204 No Content.
func (h *artifactCreationHandler) Handle(c *router.Context) {
	creator := &artifactCreator{artifactCreationHandler: h}

	// Upload metrics are reported on the way out, using whatever size the
	// creator has recorded by then.
	metrics := artifactcontent.NewMetricsWriter(c)
	defer func() {
		metrics.Upload(c.Request.Context(), creator.size)
	}()

	err := creator.handle(c)
	st, tagged := appstatus.Get(err)

	if tagged {
		logging.Warningf(c.Request.Context(), "Responding with %s: %s", st.Code(), err)
		http.Error(c.Writer, st.Message(), grpcutil.CodeStatus(st.Code()))
		return
	}
	if err != nil {
		// Do not leak internal details to the client.
		logging.Errorf(c.Request.Context(), "Internal server error: %s", err)
		http.Error(c.Writer, "Internal server error", http.StatusInternalServerError)
		return
	}
	c.Writer.WriteHeader(http.StatusNoContent)
}
// artifactCreator handles one artifact creation request.
//
// Its fields are populated by parseRequest from the HTTP request and are then
// read by the remaining steps of handle.
type artifactCreator struct {
	*artifactCreationHandler

	// artifactName is the escaped URL path of the request without the leading
	// slash.
	artifactName string

	// invID, testID, resultID and artifactID are the components parsed out of
	// artifactName.
	invID      invocations.ID
	testID     string
	resultID   string
	artifactID string

	// localParentID is computed from testID and resultID by artifacts.ParentID.
	localParentID string

	// contentType is the value of the Content-Type header; may be empty.
	contentType string

	// hash is the value of the Content-Hash header, including its prefix.
	hash string

	// size is the value of the Content-Length header.
	size int64
}
// handle processes one artifact creation request end to end.
//
// Steps:
//  1. Parse and validate the request (parseRequest).
//  2. Check the Spanner state in a read-only transaction; if an identical
//     artifact already exists, return immediately without touching RBE-CAS.
//  3. Stream the request body to RBE-CAS through a digestVerifier and then
//     verify the digest of everything that was read.
//  4. Re-check the state and insert the Artifacts row in a read-write
//     transaction.
//
// The returned error may be annotated with appstatus. A nil error means the
// artifact exists with the requested contents, whether it was inserted by
// this call or was already present.
func (ac *artifactCreator) handle(c *router.Context) error {
	ctx := c.Request.Context()

	// Parse and validate the request.
	if err := ac.parseRequest(c); err != nil {
		return err
	}

	// Read and verify the current state.
	switch sameExists, err := ac.verifyStateBeforeWriting(ctx); {
	case err != nil:
		return err
	case sameExists:
		return nil
	}

	// Read the request body through a digest verifying proxy.
	// This is mandatory because RBE-CAS does not guarantee digest verification in
	// all cases.
	ver := &digestVerifier{
		r:            c.Request.Body,
		expectedHash: artifacts.TrimHashPrefix(ac.hash),
		expectedSize: ac.size,
		actualHash:   sha256.New(),
	}

	// Forward the request body to RBE-CAS.
	if err := ac.writeToCAS(ctx, ver); err != nil {
		return errors.Annotate(err, "failed to write to CAS").Err()
	}

	if err := ver.ReadVerify(ctx); err != nil {
		return err
	}

	// Record the artifact in Spanner.
	var (
		realm string
		// inserted reports whether the committed transaction attempt buffered
		// the row insertion, as opposed to finding an identical artifact.
		inserted bool
	)
	_, err := span.ReadWriteTransaction(ctx, func(ctx context.Context) (err error) {
		// The callback may run more than once; forget the outcome of any
		// previous attempt.
		inserted = false

		// Verify the state again.
		var sameExists bool
		realm, sameExists, err = ac.verifyState(ctx)
		switch {
		case err != nil:
			return err
		case sameExists:
			// Another request created the same artifact in the meantime.
			return nil
		}

		span.BufferWrite(ctx, spanutil.InsertMap("Artifacts", map[string]any{
			"InvocationId": ac.invID,
			"ParentId":     ac.localParentID,
			"ArtifactId":   ac.artifactID,
			"ContentType":  ac.contentType,
			"Size":         ac.size,
			"RBECASHash":   ac.hash,
		}))
		inserted = true
		return nil
	})
	if err != nil {
		return err
	}

	// Count the row only if this request actually inserted it; otherwise a
	// race between identical uploads would be counted as two insertions.
	if inserted {
		spanutil.IncRowCount(ctx, 1, spanutil.Artifacts, spanutil.Inserted, realm)
	}
	return nil
}
// writeToCAS writes contents in r to RBE-CAS.
// ac.hash and ac.size must match the contents.
//
// The contents are streamed in chunks over a ByteStream write client obtained
// from ac.NewCASWriter. The chunk buffer has ac.bufSize bytes if that is
// non-zero, and otherwise min(1 MiB, ac.size) bytes; in either case it is at
// least one byte long.
//
// Errors:
//   - an unexpected EOF while reading r is returned as an appstatus
//     bad-request error;
//   - an InvalidArgument response from RBE-CAS is returned as an appstatus
//     InvalidArgument error;
//   - other failures are returned as annotated internal errors.
func (ac *artifactCreator) writeToCAS(ctx context.Context, r io.Reader) (err error) {
	ctx, overallSpan := tracing.Start(ctx, "resultdb.writeToCAS")
	defer func() { tracing.End(overallSpan, err) }()

	// Protocol:
	// https://github.com/bazelbuild/remote-apis/blob/7802003e00901b4e740fe0ebec1243c221e02ae2/build/bazel/remote/execution/v2/remote_execution.proto#L193-L205
	// https://github.com/googleapis/googleapis/blob/c8e291e6a4d60771219205b653715d5aeec3e96b/google/bytestream/bytestream.proto#L55

	w, err := ac.NewCASWriter(ctx)
	if err != nil {
		return errors.Annotate(err, "failed to create a CAS writer").Err()
	}
	defer w.CloseSend()

	bufSize := ac.bufSize
	if bufSize == 0 {
		bufSize = 1024 * 1024
		// Compare as int64 so that ac.size is never truncated.
		if int64(bufSize) > ac.size {
			bufSize = int(ac.size)
		}
	}
	// Never read into an empty buffer: an io.Reader is allowed to return
	// (0, nil) for it indefinitely, which would make the loop below spin
	// without ever observing io.EOF.
	if bufSize < 1 {
		bufSize = 1
	}
	buf := make([]byte, bufSize)

	// Copy data from r to w using buffer buf.
	// Include the resource name only in the first request.
	first := true
	bytesSent := 0
	for {
		_, readSpan := tracing.Start(ctx, "resultdb.readChunk")
		n, err := r.Read(buf)
		if err != nil && err != io.EOF {
			tracing.End(readSpan, err)
			if err != io.ErrUnexpectedEOF {
				return errors.Annotate(err, "failed to read artifact contents").Err()
			}
			// The body ended prematurely: this is the client's fault.
			return appstatus.BadRequest(errors.Annotate(err, "failed to read artifact contents").Err())
		}
		tracing.End(readSpan, nil, attribute.Int("size", n))
		last := err == io.EOF

		// Prepare the request.
		// WriteRequest message: https://github.com/googleapis/googleapis/blob/c8e291e6a4d60771219205b653715d5aeec3e96b/google/bytestream/bytestream.proto#L128
		req := &bytestream.WriteRequest{
			Data:        buf[:n],
			FinishWrite: last,
			WriteOffset: int64(bytesSent),
		}

		// Include the resource name only in the first request.
		if first {
			first = false
			req.ResourceName = ac.genWriteResourceName(ctx)
		}

		// Send the request.
		_, writeSpan := tracing.Start(ctx, "resultdb.writeChunk",
			attribute.Int("size", n),
		)
		// Do not shadow err! It is checked below again.
		if err = w.Send(req); err != nil && err != io.EOF {
			tracing.End(writeSpan, err)
			return errors.Annotate(err, "failed to write data to RBE-CAS").Err()
		}
		tracing.End(writeSpan, nil)
		bytesSent += n
		if last || err == io.EOF {
			// Either this was the last chunk, or server closed the stream.
			break
		}
	}

	// Read and interpret the response.
	switch res, err := w.CloseAndRecv(); {
	case status.Code(err) == codes.InvalidArgument:
		logging.Warningf(ctx, "RBE-CAS responded with %s", err)
		return appstatus.Errorf(codes.InvalidArgument, "Content-Hash and/or Content-Length do not match the request body")
	case err != nil:
		return errors.Annotate(err, "failed to read RBE-CAS write response").Err()
	case res.CommittedSize == ac.size:
		return nil
	default:
		return errors.Reason("unexpected blob commit size %d, expected %d", res.CommittedSize, ac.size).Err()
	}
}
// genWriteResourceName generates a random resource name that can be used
// to write the blob to RBE-CAS.
//
// The name has the form
// "{RBEInstance}/uploads/{uuid}/blobs/{hash without prefix}/{size}", where
// the UUID is built from 16 bytes read from mathrand.
func (ac *artifactCreator) genWriteResourceName(ctx context.Context) string {
	var raw [16]byte
	if _, err := mathrand.Read(ctx, raw[:]); err != nil {
		panic(err)
	}
	uploadID := uuid.Must(uuid.FromBytes(raw[:]))
	digest := artifacts.TrimHashPrefix(ac.hash)
	return fmt.Sprintf("%s/uploads/%s/blobs/%s/%d", ac.RBEInstance, uploadID, digest, ac.size)
}
// parseRequest populates ac fields based on the HTTP request.
//
// It validates, in this order: the artifact name in the URL path, the
// Content-Hash header, the Content-Length header and the Update-Token header.
// The first failed check is reported as an appstatus error: InvalidArgument
// for the name, hash and size, Unauthenticated for a missing token and
// PermissionDenied for an invalid one. The Content-Type header is copied
// without validation.
func (ac *artifactCreator) parseRequest(c *router.Context) error {
	req := c.Request
	headers := req.Header

	// The artifact name is the URL path without the leading slash.
	// EscapedPath() is required instead of Path so that the encoding of the
	// test ID embedded in the name is preserved.
	ac.artifactName = strings.TrimPrefix(req.URL.EscapedPath(), "/")

	// Split the name into its components.
	var (
		rawInvID string
		err      error
	)
	rawInvID, ac.testID, ac.resultID, ac.artifactID, err = pbutil.ParseArtifactName(ac.artifactName)
	if err != nil {
		return appstatus.Errorf(codes.InvalidArgument, "bad artifact name: %s", err)
	}
	ac.invID = invocations.ID(rawInvID)
	ac.localParentID = artifacts.ParentID(ac.testID, ac.resultID)

	// Content hash: required and must have the expected shape.
	ac.hash = headers.Get(artifactContentHashHeaderKey)
	if ac.hash == "" {
		return appstatus.Errorf(codes.InvalidArgument, "%s header is missing", artifactContentHashHeaderKey)
	}
	if !artifacts.ContentHashRe.MatchString(ac.hash) {
		return appstatus.Errorf(codes.InvalidArgument, "%s header value does not match %s", artifactContentHashHeaderKey, artifacts.ContentHashRe)
	}

	// Content size: required, numeric and within the configured limit.
	rawSize := headers.Get(artifactContentSizeHeaderKey)
	if rawSize == "" {
		return appstatus.Errorf(codes.InvalidArgument, "%s header is missing", artifactContentSizeHeaderKey)
	}
	ac.size, err = strconv.ParseInt(rawSize, 10, 64)
	if err != nil {
		return appstatus.Errorf(codes.InvalidArgument, "%s header is malformed: %s", artifactContentSizeHeaderKey, err)
	}
	if ac.size < 0 || ac.size > ac.MaxArtifactContentStreamLength {
		return appstatus.Errorf(codes.InvalidArgument, "%s header must be a value between 0 and %d", artifactContentSizeHeaderKey, ac.MaxArtifactContentStreamLength)
	}

	// Update token: required and must be valid for the invocation.
	token := headers.Get(updateTokenHeaderKey)
	if token == "" {
		return appstatus.Errorf(codes.Unauthenticated, "%s header is missing", updateTokenHeaderKey)
	}
	if err := validateInvocationToken(req.Context(), token, ac.invID); err != nil {
		// The validation error is deliberately not exposed to the client.
		return appstatus.Errorf(codes.PermissionDenied, "invalid %s header value", updateTokenHeaderKey)
	}

	ac.contentType = headers.Get(artifactContentTypeHeaderKey)
	return nil
}
// verifyStateBeforeWriting runs verifyState inside a read-only Spanner
// transaction and discards the realm it returns. See verifyState for the
// meaning of the results.
func (ac *artifactCreator) verifyStateBeforeWriting(ctx context.Context) (sameAlreadyExists bool, err error) {
	roCtx, done := span.ReadOnlyTransaction(ctx)
	defer done()

	_, exists, verifyErr := ac.verifyState(roCtx)
	return exists, verifyErr
}
// verifyState checks if the Spanner state is compatible with creation of the
// artifact. If an identical artifact already exists, sameAlreadyExists is true.
//
// The invocation's State and Realm columns and the artifact's RBECASHash and
// Size columns are read concurrently. The results are interpreted in this
// order:
//   - a read error (other than the artifact row not being found) is returned
//     as is;
//   - a non-active invocation yields a FailedPrecondition appstatus error;
//   - an existing artifact with the same hash and size sets sameAlreadyExists;
//   - any other existing artifact yields an AlreadyExists appstatus error.
//
// ctx must carry a Spanner transaction.
func (ac *artifactCreator) verifyState(ctx context.Context) (realm string, sameAlreadyExists bool, err error) {
	var (
		state      pb.Invocation_State
		storedHash spanner.NullString
		storedSize spanner.NullInt64
		found      bool
	)

	readInvocation := func() error {
		return invocations.ReadColumns(ctx, ac.invID, map[string]any{
			"State": &state,
			"Realm": &realm,
		})
	}
	readArtifact := func() error {
		key := ac.invID.Key(ac.localParentID, ac.artifactID)
		readErr := spanutil.ReadRow(ctx, "Artifacts", key, map[string]any{
			"RBECASHash": &storedHash,
			"Size":       &storedSize,
		})
		found = readErr == nil
		if spanner.ErrCode(readErr) == codes.NotFound {
			// A missing row is the expected case for a new artifact.
			return nil
		}
		return readErr
	}

	// Read the state concurrently.
	err = parallel.FanOutIn(func(work chan<- func() error) {
		work <- readInvocation
		work <- readArtifact
	})
	if err != nil {
		return realm, false, err
	}

	// Interpret the state.
	if state != pb.Invocation_ACTIVE {
		return realm, false, appstatus.Errorf(codes.FailedPrecondition, "%s is not active", ac.invID.Name())
	}
	if storedHash.Valid && storedHash.StringVal == ac.hash && storedSize.Valid && storedSize.Int64 == ac.size {
		// The same artifact already exists.
		return realm, true, nil
	}
	if found {
		// A different artifact already exists.
		return realm, false, appstatus.Errorf(codes.AlreadyExists, "artifact %q already exists", ac.artifactName)
	}
	return realm, false, nil
}
// digestVerifier is an io.Reader that also verifies the digest.
//
// Every byte that passes through Read is counted in actualSize and fed into
// actualHash, so that the totals can be compared with the expected values
// once the underlying reader is exhausted.
type digestVerifier struct {
	// r is the underlying source of data.
	r io.Reader

	// expectedSize and expectedHash describe the digest the data must have.
	expectedSize int64
	expectedHash string

	// actualSize and actualHash accumulate the digest of the data read so far.
	actualSize int64
	actualHash hash.Hash
}

// digestVerifier must be usable wherever an io.Reader is expected.
var _ io.Reader = (*digestVerifier)(nil)

// Read implements io.Reader. It forwards the call to the underlying reader
// and accounts for the bytes that were returned, including those returned
// together with a non-nil error.
func (v *digestVerifier) Read(p []byte) (int, error) {
	read, readErr := v.r.Read(p)
	v.actualSize += int64(read)
	// hash.Hash.Write is documented to never return an error.
	_, _ = v.actualHash.Write(p[:read])
	return read, readErr
}
// ReadVerify reads through the rest of the v.r
// and returns a non-nil error if the content have unexpected hash or size.
// The error may be annotated with appstatus.
//
// The size is checked before the hash, and both mismatches are reported as
// InvalidArgument. An error from the underlying reader is returned as is.
func (v *digestVerifier) ReadVerify(ctx context.Context) (err error) {
	_, ts := tracing.Start(ctx, "resultdb.digestVerifier.ReadVerify")
	defer func() { tracing.End(ts, err) }()

	// Drain whatever has not been consumed yet, so that the accumulated size
	// and hash cover the entire content.
	if _, err = io.Copy(io.Discard, v); err != nil {
		return err
	}

	// Verify size.
	if got, want := v.actualSize, v.expectedSize; got != want {
		return appstatus.Errorf(
			codes.InvalidArgument,
			"Content-Length header value %d does not match the length of the request body, %d",
			want,
			got,
		)
	}

	// Verify hash.
	got := hex.EncodeToString(v.actualHash.Sum(nil))
	if want := v.expectedHash; got != want {
		return appstatus.Errorf(
			codes.InvalidArgument,
			`Content-Hash header value "%s" does not match the hash of the request body, "%s"`,
			artifacts.AddHashPrefix(want),
			artifacts.AddHashPrefix(got),
		)
	}
	return nil
}