| // Copyright 2018 The Goma Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| package remoteexec |
| |
| import ( |
| "bufio" |
| "bytes" |
| "context" |
| "fmt" |
| "io" |
| "os" |
| "sync" |
| "time" |
| |
| rpb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2" |
| |
| "go.opencensus.io/trace" |
| "golang.org/x/sync/errgroup" |
| bpb "google.golang.org/genproto/googleapis/bytestream" |
| "google.golang.org/grpc/codes" |
| "google.golang.org/grpc/status" |
| "google.golang.org/protobuf/proto" |
| |
| "go.chromium.org/goma/server/file" |
| "go.chromium.org/goma/server/log" |
| gomapb "go.chromium.org/goma/server/proto/api" |
| fpb "go.chromium.org/goma/server/proto/file" |
| "go.chromium.org/goma/server/remoteexec/cas" |
| "go.chromium.org/goma/server/remoteexec/datasource" |
| "go.chromium.org/goma/server/remoteexec/digest" |
| "go.chromium.org/goma/server/rpc" |
| ) |
| |
| // gomaOutput handles goma output files. |
| type gomaOutput struct { |
| gomaResp *gomapb.ExecResp |
| bs bpb.ByteStreamClient |
| instance string |
| gomaFile fpb.FileServiceClient |
| } |
| |
// outputTimeout returns how long a transfer of size bytes may take, assuming
// a throughput of at least 4MB/s. The result is never shorter than 3 seconds.
func outputTimeout(size int64) time.Duration {
	const (
		floor       = 3 * time.Second
		bytesPerSec = 4 * 1024 * 1024
	)
	seconds := float64(size) / bytesPerSec
	estimate := time.Duration(int64(seconds * 1e9))
	if estimate >= floor {
		return estimate
	}
	return floor
}
| |
| func retryCAS(ctx context.Context, timeout time.Duration, f func(ctx context.Context) error) error { |
| n := 0 |
| return rpc.Retry{}.Do(ctx, func() error { |
| n++ |
| timeout *= time.Duration(n) |
| ctx, cancel := context.WithTimeout(ctx, timeout) |
| defer cancel() |
| err := f(ctx) |
| return fixRBEInternalError(err) |
| }) |
| } |
| |
| func (g gomaOutput) stdoutData(ctx context.Context, eresp *rpb.ExecuteResponse) error { |
| if len(eresp.Result.StdoutRaw) > 0 { |
| g.gomaResp.Result.StdoutBuffer = eresp.Result.StdoutRaw |
| return nil |
| } |
| if eresp.Result.StdoutDigest == nil { |
| return nil |
| } |
| var buf bytes.Buffer |
| err := retryCAS(ctx, outputTimeout(eresp.Result.StdoutDigest.SizeBytes), func(ctx context.Context) error { |
| return cas.DownloadDigest(ctx, g.bs, &buf, g.instance, eresp.Result.StdoutDigest) |
| }) |
| if err != nil { |
| logger := log.FromContext(ctx) |
| logger.Errorf("failed to fetch stdout %v: %v", eresp.Result.StdoutDigest, err) |
| if status.Code(err) == codes.Unauthenticated { |
| return err |
| } |
| g.gomaResp.ErrorMessage = append(g.gomaResp.ErrorMessage, fmt.Sprintf("failed to fetch stdout %v: %s", eresp.Result.StdoutDigest, status.Code(err))) |
| return nil |
| } |
| g.gomaResp.Result.StdoutBuffer = buf.Bytes() |
| return nil |
| } |
| |
| func (g gomaOutput) stderrData(ctx context.Context, eresp *rpb.ExecuteResponse) error { |
| if len(eresp.Result.StderrRaw) > 0 { |
| g.gomaResp.Result.StderrBuffer = eresp.Result.StderrRaw |
| return nil |
| } |
| if eresp.Result.StderrDigest == nil { |
| return nil |
| } |
| var buf bytes.Buffer |
| err := retryCAS(ctx, outputTimeout(eresp.Result.StderrDigest.SizeBytes), func(ctx context.Context) error { |
| return cas.DownloadDigest(ctx, g.bs, &buf, g.instance, eresp.Result.StderrDigest) |
| }) |
| if err != nil { |
| logger := log.FromContext(ctx) |
| logger.Errorf("failed to fetch stderr %v: %v", eresp.Result.StdoutDigest, err) |
| if status.Code(err) == codes.Unauthenticated { |
| return err |
| } |
| g.gomaResp.ErrorMessage = append(g.gomaResp.ErrorMessage, fmt.Sprintf("failed to fetch stderr %v: %s", eresp.Result.StderrDigest, status.Code(err))) |
| return nil |
| } |
| g.gomaResp.Result.StderrBuffer = buf.Bytes() |
| return nil |
| } |
| |
| func (g gomaOutput) outputFileHelper(ctx context.Context, fname string, output *rpb.OutputFile) (*gomapb.ExecResult_Output, error) { |
| var blob *gomapb.FileBlob |
| err := retryCAS(ctx, outputTimeout(output.GetDigest().GetSizeBytes()), func(ctx context.Context) error { |
| var err error |
| blob, err = g.toFileBlob(ctx, output) |
| return err |
| }) |
| if err != nil { |
| logger := log.FromContext(ctx) |
| switch status.Code(err) { |
| case codes.Unavailable, codes.Canceled, codes.Aborted: |
| logger.Warnf("goma blob for %s: %v", output.Path, err) |
| default: |
| logger.Errorf("goma blob for %s: %v", output.Path, err) |
| } |
| return nil, status.Errorf(status.Code(err), "goma blob for %s: %v", output.Path, status.Code(err)) |
| } |
| return &gomapb.ExecResult_Output{ |
| Filename: proto.String(fname), |
| Blob: blob, |
| IsExecutable: proto.Bool(output.IsExecutable), |
| }, nil |
| } |
| |
| func (g gomaOutput) outputFile(ctx context.Context, fname string, output *rpb.OutputFile) error { |
| result, err := g.outputFileHelper(ctx, fname, output) |
| if err != nil { |
| if status.Code(err) == codes.Unauthenticated { |
| return err |
| } |
| g.gomaResp.ErrorMessage = append(g.gomaResp.ErrorMessage, err.Error()) |
| return nil |
| } |
| g.gomaResp.Result.Output = append(g.gomaResp.Result.Output, result) |
| return nil |
| } |
| |
| func (g gomaOutput) outputFilesConcurrent(ctx context.Context, outputs []*rpb.OutputFile, sema chan struct{}) error { |
| var wg sync.WaitGroup |
| results := make([]*gomapb.ExecResult_Output, len(outputs)) |
| errs := make([]error, len(outputs)) |
| |
| for i := range outputs { |
| wg.Add(1) |
| go func(i int) { |
| defer wg.Done() |
| sema <- struct{}{} |
| defer func() { |
| <-sema |
| }() |
| |
| output := outputs[i] |
| results[i], errs[i] = g.outputFileHelper(ctx, output.Path, output) |
| }(i) |
| } |
| wg.Wait() |
| |
| for _, result := range results { |
| if result != nil { |
| g.gomaResp.Result.Output = append(g.gomaResp.Result.Output, result) |
| } |
| } |
| var rerr error |
| for _, err := range errs { |
| if err != nil { |
| if status.Code(err) == codes.Unauthenticated { |
| rerr = err |
| } |
| g.gomaResp.ErrorMessage = append(g.gomaResp.ErrorMessage, err.Error()) |
| } |
| } |
| return rerr |
| } |
| |
| func toChunkedFileBlob(ctx context.Context, rd io.Reader, size int64, fs fpb.FileServiceClient) (*gomapb.FileBlob, error) { |
| const bufsize = file.LargeFileThreshold |
| in := bufio.NewReaderSize(rd, bufsize) |
| blob := &gomapb.FileBlob{ |
| BlobType: gomapb.FileBlob_FILE_META.Enum(), |
| FileSize: proto.Int64(size), |
| } |
| buf := make([]byte, bufsize) |
| var offset int64 |
| eof := false |
| for offset < size && !eof { |
| remain := size - offset |
| if remain < bufsize { |
| buf = buf[:remain] |
| } |
| n, err := io.ReadFull(in, buf) |
| if err != nil && err != io.EOF { |
| return nil, err |
| } |
| eof = err == io.EOF |
| var resp *gomapb.StoreFileResp |
| err = rpc.Retry{}.Do(ctx, func() error { |
| resp, err = fs.StoreFile(ctx, &gomapb.StoreFileReq{ |
| Blob: []*gomapb.FileBlob{ |
| { |
| BlobType: gomapb.FileBlob_FILE_CHUNK.Enum(), |
| Offset: proto.Int64(offset), |
| Content: buf[:n], |
| FileSize: proto.Int64(size), |
| }, |
| }, |
| RequesterInfo: requesterInfo(ctx), |
| }) |
| return err |
| }) |
| if err != nil { |
| return nil, status.Errorf(status.Code(err), "store blob failed offset=%d: %v", offset, err) |
| } |
| for _, hashKey := range resp.HashKey { |
| if hashKey == "" { |
| return nil, fmt.Errorf("store blob failed offset=%d", offset) |
| } |
| blob.HashKey = append(blob.HashKey, hashKey) |
| } |
| offset += int64(n) |
| } |
| if size != offset { |
| return nil, fmt.Errorf("size mismatch: digest size=%d, store size=%d", size, offset) |
| } |
| // EOF is only returned when there is no more input available at the beginning of a read, |
| // not at the end of the final non-empty read. |
| n, err := in.Read(make([]byte, 1)) |
| if n != 0 { |
| return nil, fmt.Errorf("more bytes were read past end: %d", n) |
| } |
| if err != io.EOF { |
| return nil, status.Errorf(status.Code(err), "could not confirm EOF: %v", err) |
| } |
| return blob, nil |
| } |
| |
| func (g gomaOutput) toFileBlob(ctx context.Context, output *rpb.OutputFile) (*gomapb.FileBlob, error) { |
| ctx, span := trace.StartSpan(ctx, "go.chromium.org/goma/server/remoteexec.gomaOutput.toFileBlob") |
| defer span.End() |
| |
| logger := log.FromContext(ctx) |
| |
| if output.Digest.SizeBytes <= file.LargeFileThreshold { |
| // for single FileBlob. |
| var buf bytes.Buffer |
| err := cas.DownloadDigest(ctx, g.bs, &buf, g.instance, output.Digest) |
| if err != nil { |
| return nil, err |
| } |
| return &gomapb.FileBlob{ |
| BlobType: gomapb.FileBlob_FILE.Enum(), |
| Content: buf.Bytes(), |
| FileSize: proto.Int64(output.Digest.SizeBytes), |
| }, nil |
| } |
| |
| casErrCh := make(chan error, 1) |
| rd, wr, err := os.Pipe() |
| if err != nil { |
| return nil, err |
| } |
| defer rd.Close() |
| go func() { |
| err := cas.DownloadDigest(ctx, g.bs, wr, g.instance, output.Digest) |
| if err != nil { |
| switch status.Code(err) { |
| case codes.Unavailable, codes.Canceled, codes.Aborted: |
| logger.Warnf("cas download error %s: %v", output.Digest, err) |
| default: |
| logger.Errorf("cas download error %s: %v", output.Digest, err) |
| } |
| } |
| wr.Close() |
| casErrCh <- err |
| }() |
| |
| blob, err := toChunkedFileBlob(ctx, rd, output.Digest.SizeBytes, g.gomaFile) |
| // prefer cas err for Unauthenticated error. |
| // http://b/181914314 |
| if casErr := <-casErrCh; casErr != nil { |
| return nil, casErr |
| } |
| if err != nil { |
| return nil, status.Errorf(status.Code(err), "failed to convert output:{%v} to chunked FileBlob: %v", output, err) |
| } |
| return blob, nil |
| } |
| |
| func traverseTree(ctx context.Context, filepath clientFilePath, dname string, dir *rpb.Directory, ds *digest.Store) []*rpb.OutputFile { |
| logger := log.FromContext(ctx) |
| var result []*rpb.OutputFile |
| for _, f := range dir.Files { |
| fname := filepath.Join(dname, f.Name) |
| result = append(result, &rpb.OutputFile{ |
| Path: fname, |
| Digest: f.Digest, |
| IsExecutable: f.IsExecutable, |
| }) |
| } |
| for _, d := range dir.Directories { |
| subdname := filepath.Join(dname, d.Name) |
| db, found := ds.Get(d.Digest) |
| if !found { |
| logger.Errorf("no dir for %s %s", subdname, d.Digest) |
| continue |
| } |
| subdir := &rpb.Directory{} |
| err := datasource.ReadProto(ctx, db, subdir) |
| if err != nil { |
| logger.Errorf("bad dir proto for %s %s: %v", subdname, d.Digest, err) |
| continue |
| } |
| result = append(result, traverseTree(ctx, filepath, subdname, subdir, ds)...) |
| } |
| if len(dir.Symlinks) > 0 { |
| logger.Errorf("symlinks exists in output dir %s: %q", dname, dir.Symlinks) |
| } |
| return result |
| } |
| |
| func (g gomaOutput) outputDirectory(ctx context.Context, filepath clientFilePath, dname string, output *rpb.OutputDirectory, sema chan struct{}) error { |
| logger := log.FromContext(ctx) |
| if output.TreeDigest == nil { |
| logger.Warnf("no tree digest in %s", dname) |
| return nil |
| } |
| var buf bytes.Buffer |
| err := retryCAS(ctx, outputTimeout(output.TreeDigest.SizeBytes), func(ctx context.Context) error { |
| return cas.DownloadDigest(ctx, g.bs, &buf, g.instance, output.TreeDigest) |
| }) |
| if err != nil { |
| logger.Errorf("failed to download tree %s: %v", dname, err) |
| if status.Code(err) == codes.Unauthenticated { |
| return err |
| } |
| g.gomaResp.ErrorMessage = append(g.gomaResp.ErrorMessage, fmt.Sprintf("failed to download tree %s: %s", dname, status.Code(err))) |
| return nil |
| } |
| tree := &rpb.Tree{} |
| err = proto.Unmarshal(buf.Bytes(), tree) |
| if err != nil { |
| logger.Errorf("failed to unmarshal tree data %s: %v", dname, err) |
| g.gomaResp.ErrorMessage = append(g.gomaResp.ErrorMessage, fmt.Sprintf("failed to unmarshal tree data %s: %v", dname, err)) |
| return nil |
| } |
| |
| ds := digest.NewStore() |
| for _, c := range tree.Children { |
| d, err := digest.Proto(c) |
| if err != nil { |
| logger.Errorf("digest for children %s: %v", c, err) |
| continue |
| } |
| ds.Set(d) |
| } |
| outputFiles := traverseTree(ctx, filepath, dname, tree.Root, ds) |
| return g.outputFilesConcurrent(ctx, outputFiles, sema) |
| } |
| |
| // reduceRespSize attempts to reduce the encoded size of `g.gomaResp` to under `byteLimit`. |
| // It replaces all file blobs with blob_type=FILE (embedded data) with blob_type=FILE_REF |
| // (`content` in FileServer, `blob_type`=FILE). With each replaced blob, the response loses |
| // `sizeof(content)` bytes and gains `sizeof(hash_key)` bytes. This will result in a net |
| // reduction in encoded size as long as `sizeof(content)` > `sizeof(hash_key)`. |
| // |
| // See description of FileBlob for more information: |
| // https://chromium.googlesource.com/infra/goma/client/+/bd9711495c9357eead845f0ae2d4eef92494c6d5/lib/goma_data.proto#17 |
| func (g gomaOutput) reduceRespSize(ctx context.Context, byteLimit int, sema chan struct{}) error { |
| origSize := proto.Size(g.gomaResp) |
| if origSize <= byteLimit { |
| return nil |
| } |
| |
| toStoredFileBlob := func(ctx context.Context, input []byte, fs fpb.FileServiceClient) (*gomapb.FileBlob, error) { |
| size := int64(len(input)) |
| blob := &gomapb.FileBlob{ |
| BlobType: gomapb.FileBlob_FILE_REF.Enum(), |
| FileSize: proto.Int64(size), |
| } |
| var resp *gomapb.StoreFileResp |
| var err error |
| err = rpc.Retry{}.Do(ctx, func() error { |
| blob := &gomapb.FileBlob{ |
| BlobType: gomapb.FileBlob_FILE.Enum(), |
| Content: input, |
| FileSize: proto.Int64(size), |
| } |
| resp, err = fs.StoreFile(ctx, &gomapb.StoreFileReq{ |
| Blob: []*gomapb.FileBlob{blob}, |
| RequesterInfo: requesterInfo(ctx), |
| }) |
| return err |
| }) |
| if err != nil { |
| return nil, status.Errorf(status.Code(err), "store blob failed: %v", err) |
| } |
| if len(resp.HashKey) != 1 { |
| return nil, fmt.Errorf("store blob got len(resp.HashKey)=%d, want=1", len(resp.HashKey)) |
| } |
| if resp.HashKey[0] == "" { |
| return nil, fmt.Errorf("store blob failed with empty hash key") |
| } |
| blob.HashKey = resp.HashKey |
| return blob, nil |
| } |
| |
| output := g.gomaResp.Result.Output |
| eg, ctx := errgroup.WithContext(ctx) |
| // For simplicity, store all blobs in FileServer rather than worrying about which ones to |
| // store. |
| // TODO: We can optimize this later. |
| for _, out := range output { |
| out := out |
| blob := out.Blob |
| if blob.GetBlobType() != gomapb.FileBlob_FILE { |
| continue |
| } |
| eg.Go(func() error { |
| sema <- struct{}{} |
| defer func() { |
| <-sema |
| }() |
| |
| newBlob, err := toStoredFileBlob(ctx, blob.Content, g.gomaFile) |
| if err != nil { |
| return err |
| } |
| out.Blob = newBlob |
| return nil |
| }) |
| } |
| |
| if err := eg.Wait(); err != nil { |
| return err |
| } |
| |
| // The result could still be too big if there are many FILE_REF blobs. |
| newSize := proto.Size(g.gomaResp) |
| if newSize > byteLimit { |
| return fmt.Errorf("new resp size: got=%d, want<=%d", newSize, byteLimit) |
| } |
| return nil |
| } |