blob: 93d247b57fe01231c339a9b67aa8d99e1b6f2f0a [file] [log] [blame]
/* Copyright 2018 Google Inc. All Rights Reserved. */
// Package cas manages content addressable storage.
package cas
import (
"context"
"errors"
"fmt"
"sort"
"sync"
"time"
rpb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
"go.opencensus.io/trace"
bpb "google.golang.org/genproto/googleapis/bytestream"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"go.chromium.org/goma/server/log"
"go.chromium.org/goma/server/remoteexec/datasource"
"go.chromium.org/goma/server/remoteexec/digest"
"go.chromium.org/goma/server/rpc"
)
const (
// DefaultBatchByteLimit is the bytes limit for cas BatchUpdateBlobs,
// used when the server does not report MaxBatchTotalSizeBytes.
DefaultBatchByteLimit = 4 * 1024 * 1024
// batchBlobLimit is the max number of blobs in one BatchUpdateBlobs call.
batchBlobLimit = 1000
)
// Client is a client of cas service.
type Client interface {
// CAS returns a client for the ContentAddressableStorage service.
CAS() rpb.ContentAddressableStorageClient
// ByteStream returns a client for the ByteStream service.
ByteStream() bpb.ByteStreamClient
}
// client implements Client on top of a single gRPC connection.
type client struct {
*grpc.ClientConn
}
// CAS returns a ContentAddressableStorage stub bound to the connection.
func (c client) CAS() rpb.ContentAddressableStorageClient {
return rpb.NewContentAddressableStorageClient(c.ClientConn)
}
// ByteStream returns a ByteStream stub bound to the connection.
func (c client) ByteStream() bpb.ByteStreamClient {
return bpb.NewByteStreamClient(c.ClientConn)
}
// NewClient wraps conn as a Client of the cas service.
func NewClient(conn *grpc.ClientConn) Client {
	return client{ClientConn: conn}
}
// CAS is content-addressable-storage synced between local and cas service.
type CAS struct {
Client
*digest.Store
CacheCapabilities *rpb.CacheCapabilities
}
// TODO: unit test
// Missing checks blobs in local exists in instance of cas service,
// and returns missing blobs.
func (c CAS) Missing(ctx context.Context, instance string, blobs []*rpb.Digest) ([]*rpb.Digest, error) {
	span := trace.FromContext(ctx)
	logger := log.FromContext(ctx)
	logger.Infof("check %d blobs in %s", len(blobs), instance)
	span.Annotatef(nil, "check %d blobs", len(blobs))
	resp, err := c.Client.CAS().FindMissingBlobs(ctx, &rpb.FindMissingBlobsRequest{
		InstanceName: instance,
		BlobDigests:  blobs,
	})
	if err != nil {
		// status.Code/status.Errorf replace the deprecated
		// grpc.Code/grpc.Errorf, preserving the original status code.
		return nil, status.Errorf(status.Code(err), "missing blobs: %v", err)
	}
	span.Annotatef(nil, "missings %d blobs", len(resp.MissingBlobDigests))
	logger.Infof("missings %v", resp.MissingBlobDigests)
	return resp.MissingBlobDigests, nil
}
var (
// errBlobNotInReq is reported when a requested digest is not present
// in the local digest.Store.
errBlobNotInReq = errors.New("blob not in request")
)
// MissingBlob is a missing blob.
type MissingBlob struct {
// Digest identifies the blob that could not be uploaded.
Digest *rpb.Digest
// Err is the reason the blob is missing (e.g. errBlobNotInReq,
// a read error, or a per-blob status from the server).
Err error
}
// MissingError is an error about missing content for blobs.
type MissingError struct {
Blobs []MissingBlob
}
// Error implements the error interface, summarizing the count only;
// inspect Blobs for per-blob details.
func (e MissingError) Error() string {
return fmt.Sprintf("missing %d blobs", len(e.Blobs))
}
// batchUpdateBlobsRequestPool pools scratch BatchUpdateBlobsRequests whose
// only purpose is measuring serialized proto sizes (see
// separateBlobsByByteLimit); pooling avoids reallocating the data buffer.
type batchUpdateBlobsRequestPool struct {
mu sync.Mutex
pool sync.Pool // New is protected by mu
byteLimit int64
}
var batchReqPool batchUpdateBlobsRequestPool
// Get gets dummy BatchUpdateBlobsRequest to check serialized size.
// maxSizeBytes is max size needed for data buffer.
// byteLimit is limit that RBE server sets for batch total size bytes.
// maxSizeBytes must not exceed byteLimit.
func (p *batchUpdateBlobsRequestPool) Get(instance string, maxSizeBytes, byteLimit int64) *rpb.BatchUpdateBlobsRequest {
p.mu.Lock()
// Track the largest byteLimit seen so newly allocated buffers are big
// enough for any caller.
if p.byteLimit < byteLimit {
p.byteLimit = byteLimit
}
// Lazily install New; p.pool.Get below runs under mu, so New may safely
// read p.byteLimit.
if p.pool.New == nil {
p.pool.New = func() interface{} {
return make([]byte, 0, p.byteLimit)
}
}
buf := p.pool.Get().([]byte)
// A pooled buffer may predate a byteLimit increase; replace it if too small.
if int64(cap(buf)) < maxSizeBytes {
buf = make([]byte, 0, maxSizeBytes)
}
p.mu.Unlock()
return &rpb.BatchUpdateBlobsRequest{
InstanceName: instance,
Requests: []*rpb.BatchUpdateBlobsRequest_Request{{Data: buf}},
}
}
// Put returns the data buffer of a request obtained from Get to the pool.
func (p *batchUpdateBlobsRequestPool) Put(req *rpb.BatchUpdateBlobsRequest) {
p.pool.Put(req.Requests[0].Data)
}
// separateBlobsByByteLimit sorts blobs by size (in place) and splits them
// into (small, large): small blobs fit in a single BatchUpdateBlobsRequest
// whose serialized size is under byteLimit; large blobs must be uploaded
// via bytestream instead.
func separateBlobsByByteLimit(blobs []*rpb.Digest, instance string, byteLimit int64) ([]*rpb.Digest, []*rpb.Digest) {
if len(blobs) == 0 {
return nil, nil
}
// Ascending size order makes the sort.Search predicate below monotonic.
sort.Slice(blobs, func(i, j int) bool {
return blobs[i].SizeBytes < blobs[j].SizeBytes
})
// Create dummy data to check protobuf size. To avoid redundant allocations, find the largest digest size.
// string/bytes will be [encoded <wire-type><tag>] [length] [content...]
// so no need to allocate more than byteLimit here.
// https://developers.google.com/protocol-buffers/docs/encoding#structure
// https://developers.google.com/protocol-buffers/docs/encoding#strings
maxSizeBytes := blobs[len(blobs)-1].SizeBytes
if maxSizeBytes >= byteLimit {
maxSizeBytes = byteLimit
}
dummyReq := batchReqPool.Get(instance, maxSizeBytes, byteLimit)
defer batchReqPool.Put(dummyReq)
// Binary-search for the first blob whose single-blob request would
// serialize to >= byteLimit; the predicate reuses dummyReq's buffer
// (resliced to each blob's size) to measure without new allocations.
i := sort.Search(len(blobs), func(i int) bool {
if blobs[i].SizeBytes >= byteLimit {
return true
}
dummyReq.Requests[0].Digest = blobs[i]
dummyReq.Requests[0].Data = dummyReq.Requests[0].Data[:blobs[i].SizeBytes]
return int64(proto.Size(dummyReq)) >= byteLimit
})
if i < len(blobs) {
return blobs[:i], blobs[i:]
}
// Every blob fits under the limit.
return blobs, nil
}
// lookupBlobsInStore reads the data for each blob digest out of store
// concurrently (bounded by sema) and returns one upload request per blob
// found, plus the blobs that could not be read (not in store, or read error).
func lookupBlobsInStore(ctx context.Context, blobs []*rpb.Digest, store *digest.Store, sema chan struct{}) ([]*rpb.BatchUpdateBlobsRequest_Request, []MissingBlob) {
span := trace.FromContext(ctx)
var wg sync.WaitGroup
// blobLookupResult holds the outcome for one blob; exactly one of
// err or req is expected to be set on success/failure.
type blobLookupResult struct {
err error
req *rpb.BatchUpdateBlobsRequest_Request
}
// One preallocated slot per blob, so goroutines write disjoint elements
// without additional locking.
results := make([]blobLookupResult, len(blobs))
for i := range blobs {
wg.Add(1)
// blob and result are passed as arguments so each goroutine gets its
// own values (required by pre-1.22 loop-variable semantics).
go func(blob *rpb.Digest, result *blobLookupResult) {
defer wg.Done()
// Acquire a slot in sema to bound concurrent reads.
sema <- struct{}{}
defer func() {
<-sema
}()
data, ok := store.Get(blob)
if !ok {
span.Annotatef(nil, "blob not found in cas: %v", blob)
result.err = errBlobNotInReq
return
}
b, err := datasource.ReadAll(ctx, data)
if err != nil {
span.Annotatef(nil, "blob data for %v: %v", blob, err)
result.err = err
return
}
// TODO: This is inefficient because we are reading all
// sources whether or not they are going to be returned, due to the
// size computation happening later. This might be okay as long as
// we are not reading too much extra data in one operation.
//
// We should instead return all blob requests for blobs < `byteLimit`,
// batched into multiple BatchUpdateBlobsRequests.
result.req = &rpb.BatchUpdateBlobsRequest_Request{
Digest: data.Digest(),
Data: b,
}
}(blobs[i], &results[i])
}
wg.Wait()
var reqs []*rpb.BatchUpdateBlobsRequest_Request
var missingBlobs []MissingBlob
logger := log.FromContext(ctx)
// Collect results in the original blob order.
for i, result := range results {
blob := blobs[i]
if result.err != nil {
missingBlobs = append(missingBlobs, MissingBlob{
Digest: blob,
Err: result.err,
})
continue
}
if result.req != nil {
reqs = append(reqs, result.req)
continue
}
// Should be unreachable: each goroutine sets err or req.
logger.Errorf("Lookup of blobs[%d]=%v yielded neither error nor request", i, blob)
}
return reqs, missingBlobs
}
// createBatchUpdateBlobsRequests packs blobReqs into BatchUpdateBlobsRequests
// for instance, keeping each request's serialized size under byteLimit
// (ignored if 0) and its blob count at most batchBlobLimit.
func createBatchUpdateBlobsRequests(blobReqs []*rpb.BatchUpdateBlobsRequest_Request, instance string, byteLimit int64) []*rpb.BatchUpdateBlobsRequest {
var batchReqs []*rpb.BatchUpdateBlobsRequest
// Base size of a request with no blobs; each batch starts from this.
batchReqNoReqsSize := int64(proto.Size(&rpb.BatchUpdateBlobsRequest{InstanceName: instance}))
size := batchReqNoReqsSize
// lastOffset is the index of the first blob not yet put in a batch.
lastOffset := 0
for i := range blobReqs {
// This code assumes that all blobs in `blobReqs`, when added as the only element of
// `batchReq.Requests`, will keep the marshaled proto size of `batchReq` < `byteLimit`.
// If `byteLimit` is 0, then it is ignored.
// Determine the extra proto size introduced by adding the current req.
size += int64(proto.Size(&rpb.BatchUpdateBlobsRequest{Requests: blobReqs[i : i+1]}))
// Add a new BatchUpdateBlobsRequest with blobs from the first blob after the
// previous BatchUpdateBlobsRequest up to and including the current blob, if:
// - this is the final blob
// - adding this blob reaches the blob count limit
// - adding the next blob pushes the size over the byte limit
// NOTE: case order matters. Switch case expressions are evaluated in
// order, so for the final blob the first case matches (falling through
// to the shared body) and the last case's blobReqs[i+1:i+2] slice —
// which would be out of range at i == len-1 — is never evaluated.
switch {
case i == len(blobReqs)-1:
fallthrough
case i+1 == lastOffset+batchBlobLimit:
fallthrough
case byteLimit > 0 && size+int64(proto.Size(&rpb.BatchUpdateBlobsRequest{Requests: blobReqs[i+1 : i+2]})) > byteLimit:
batchReqs = append(batchReqs, &rpb.BatchUpdateBlobsRequest{
InstanceName: instance,
Requests: blobReqs[lastOffset : i+1],
})
size = batchReqNoReqsSize
lastOffset = i + 1
}
}
return batchReqs
}
// Upload uploads blobs stored in Store to instance of cas service.
//
// Blobs small enough to fit under the batch byte limit are uploaded via
// BatchUpdateBlobs; larger blobs are uploaded individually via
// bytestream.Write. sema bounds concurrent local blob reads.
// Blobs that could not be uploaded (missing locally, or rejected by the
// server) are reported via a MissingError return value.
func (c CAS) Upload(ctx context.Context, instance string, sema chan struct{}, blobs ...*rpb.Digest) error {
	span := trace.FromContext(ctx)
	logger := log.FromContext(ctx)
	logger.Infof("upload blobs %v", blobs)
	// up to max_batch_total_size_bytes, use BatchUpdateBlobs.
	// more than this, use bytestream.Write.
	byteLimit := int64(DefaultBatchByteLimit)
	if c.CacheCapabilities != nil && c.CacheCapabilities.MaxBatchTotalSizeBytes > 0 {
		byteLimit = c.CacheCapabilities.MaxBatchTotalSizeBytes
	}
	smallBlobs, largeBlobs := separateBlobsByByteLimit(blobs, instance, byteLimit)
	logger.Infof("upload by batch %d out of %d", len(smallBlobs), len(blobs))
	blobReqs, missingBlobs := lookupBlobsInStore(ctx, smallBlobs, c.Store, sema)
	missing := MissingError{
		Blobs: missingBlobs,
	}
	batchReqs := createBatchUpdateBlobsRequests(blobReqs, instance, byteLimit)
	for _, batchReq := range batchReqs {
		t := time.Now()
		span.Annotatef(nil, "batch update %d blobs", len(batchReq.Requests))
		// TODO: should we report rpc error as missing input too?
		var batchResp *rpb.BatchUpdateBlobsResponse
		err := rpc.Retry{}.Do(ctx, func() error {
			var err error
			batchResp, err = c.Client.CAS().BatchUpdateBlobs(ctx, batchReq)
			return fixRBEInternalError(err)
		})
		if err != nil {
			if status.Code(err) == codes.ResourceExhausted {
				// gRPC returns ResourceExhausted if request message is larger than max.
				// Retrying the same oversized request would loop forever
				// (the previous code did exactly that), so fall back to
				// uploading these blobs with bytestream instead.
				logger.Warnf("upload by batch %d blobs: %v", len(batchReq.Requests), err)
				for _, req := range batchReq.Requests {
					largeBlobs = append(largeBlobs, req.Digest)
				}
				continue
			}
			return status.Errorf(status.Code(err), "batch update blobs: %v", err)
		}
		// Record per-blob failures reported inside an otherwise-successful
		// batch response.
		for _, res := range batchResp.Responses {
			st := status.FromProto(res.GetStatus())
			if st.Code() != codes.OK {
				span.Annotatef(nil, "batch update blob %v: %v", res.Digest, res.Status)
				missing.Blobs = append(missing.Blobs, MissingBlob{
					Digest: res.Digest,
					Err:    status.Errorf(st.Code(), "batch update blob: %v", res.Status),
				})
			}
		}
		logger.Infof("upload by batch %d blobs (missing:%d) in %s", len(batchReq.Requests), len(missing.Blobs), time.Since(t))
	}
	logger.Infof("upload by streaming from %d out of %d", len(largeBlobs), len(blobs))
	t := time.Now()
	for _, blob := range largeBlobs {
		data, ok := c.Store.Get(blob)
		if !ok {
			span.Annotatef(nil, "blob not found in cas: %v", blob)
			missing.Blobs = append(missing.Blobs, MissingBlob{
				Digest: blob,
				Err:    errBlobNotInReq,
			})
			continue
		}
		err := rpc.Retry{}.Do(ctx, func() error {
			rd, err := data.Open(ctx)
			if err != nil {
				span.Annotatef(nil, "upload open %v: %v", blob, err)
				return err
			}
			// Close on every path; close errors are intentionally ignored.
			defer rd.Close()
			return fixRBEInternalError(UploadDigest(ctx, c.Client.ByteStream(), instance, blob, rd))
		})
		if err != nil {
			logger.Errorf("upload streaming %s error: %v", blob, err)
			missing.Blobs = append(missing.Blobs, MissingBlob{
				Digest: blob,
				Err:    err,
			})
			continue
		}
	}
	// Report the number actually streamed (was len(blobs), which counted
	// batch-uploaded blobs too).
	logger.Infof("uploaded by streaming %d (missing:%d) in %s", len(largeBlobs), len(missing.Blobs), time.Since(t))
	if len(missing.Blobs) > 0 {
		return missing
	}
	return nil
}
// fixRBEInternalError rewrites a codes.Internal status into
// codes.Unavailable so callers (e.g. rpc.Retry) treat it as retryable.
// Any other error (including nil) is returned unchanged.
func fixRBEInternalError(err error) error {
	if status.Code(err) != codes.Internal {
		return err
	}
	return status.Errorf(codes.Unavailable, "%v", err)
}