blob: 65a44e83cb3a8cf3f57eb9d133d527b6713f9b9b [file] [log] [blame]
// Copyright 2018 The Goma Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package remoteexec
import (
"context"
"time"
rpb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
bpb "google.golang.org/genproto/googleapis/bytestream"
lpb "google.golang.org/genproto/googleapis/longrunning"
spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"go.chromium.org/goma/server/log"
"go.chromium.org/goma/server/rpc"
)
// Client is a remoteexec API client to ClientConn.
// CallOptions will be added when calling RPC.
//
// prcred, _ := oauth.NewApplicationDefault(ctx,
// "https://www.googleapis.com/auth/cloud-build-service")
// conn, _ := grpc.DialContext(ctx, target,
// grpc.WithPerRPCCredentials(prcred),
// grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{})))
// client := &remoteexec.Client{conn}
type Client struct {
*grpc.ClientConn
CallOptions []grpc.CallOption
Retry rpc.Retry
}
func (c Client) callOptions(opts ...grpc.CallOption) []grpc.CallOption {
return append(append([]grpc.CallOption(nil), opts...), c.CallOptions...)
}
// Cache returns action cache client.
// https://github.com/bazelbuild/remote-apis/blob/c1c1ad2c97ed18943adb55f06657440daa60d833/build/bazel/remote/execution/v2/remote_execution.proto#L117
func (c Client) Cache() rpb.ActionCacheClient {
return c
}
// GetActionResult retrieves a cached execution result.
func (c Client) GetActionResult(ctx context.Context, req *rpb.GetActionResultRequest, opts ...grpc.CallOption) (*rpb.ActionResult, error) {
return rpb.NewActionCacheClient(c.ClientConn).GetActionResult(ctx, req, c.callOptions(opts...)...)
}
// UpdateActionResult uploads a new execution result.
func (c Client) UpdateActionResult(ctx context.Context, req *rpb.UpdateActionResultRequest, opts ...grpc.CallOption) (*rpb.ActionResult, error) {
return rpb.NewActionCacheClient(c.ClientConn).UpdateActionResult(ctx, req, c.callOptions(opts...)...)
}
// Exec returns execution client.
// https://github.com/bazelbuild/remote-apis/blob/c1c1ad2c97ed18943adb55f06657440daa60d833/build/bazel/remote/execution/v2/remote_execution.proto#L34
func (c Client) Exec() rpb.ExecutionClient {
return c
}
// Execute executes an action remotely.
func (c Client) Execute(ctx context.Context, req *rpb.ExecuteRequest, opts ...grpc.CallOption) (rpb.Execution_ExecuteClient, error) {
return rpb.NewExecutionClient(c.ClientConn).Execute(ctx, req, c.callOptions(opts...)...)
}
// WaitExecution waits for an execution operation to complete.
func (c Client) WaitExecution(ctx context.Context, req *rpb.WaitExecutionRequest, opts ...grpc.CallOption) (rpb.Execution_WaitExecutionClient, error) {
return rpb.NewExecutionClient(c.ClientConn).WaitExecution(ctx, req, c.callOptions(opts...)...)
}
// CAS returns content addressable storage client.
// https://github.com/bazelbuild/remote-apis/blob/c1c1ad2c97ed18943adb55f06657440daa60d833/build/bazel/remote/execution/v2/remote_execution.proto#L168
func (c Client) CAS() rpb.ContentAddressableStorageClient {
return c
}
// FindMissingBlobs determines if blobs are present in the CAS.
func (c Client) FindMissingBlobs(ctx context.Context, req *rpb.FindMissingBlobsRequest, opts ...grpc.CallOption) (*rpb.FindMissingBlobsResponse, error) {
return rpb.NewContentAddressableStorageClient(c.ClientConn).FindMissingBlobs(ctx, req, c.callOptions(opts...)...)
}
// BatchUpdateBlobs uploads many blobs at once.
func (c Client) BatchUpdateBlobs(ctx context.Context, req *rpb.BatchUpdateBlobsRequest, opts ...grpc.CallOption) (*rpb.BatchUpdateBlobsResponse, error) {
return rpb.NewContentAddressableStorageClient(c.ClientConn).BatchUpdateBlobs(ctx, req, c.callOptions(opts...)...)
}
// BatchReadBlobs downloads many blobs at once.
func (c Client) BatchReadBlobs(ctx context.Context, req *rpb.BatchReadBlobsRequest, opts ...grpc.CallOption) (*rpb.BatchReadBlobsResponse, error) {
return rpb.NewContentAddressableStorageClient(c.ClientConn).BatchReadBlobs(ctx, req, c.callOptions(opts...)...)
}
// GetTree fetches the entire directory tree rooted at a node.
func (c Client) GetTree(ctx context.Context, req *rpb.GetTreeRequest, opts ...grpc.CallOption) (rpb.ContentAddressableStorage_GetTreeClient, error) {
return rpb.NewContentAddressableStorageClient(c.ClientConn).GetTree(ctx, req, c.callOptions(opts...)...)
}
// ByteStream returns byte stream client.
// https://godoc.org/google.golang.org/genproto/googleapis/bytestream#ByteStreamClient
func (c Client) ByteStream() bpb.ByteStreamClient {
return c
}
// Read is used to retrieve the contents of a resource as a sequence of bytes.
func (c Client) Read(ctx context.Context, in *bpb.ReadRequest, opts ...grpc.CallOption) (bpb.ByteStream_ReadClient, error) {
return bpb.NewByteStreamClient(c.ClientConn).Read(ctx, in, c.callOptions(opts...)...)
}
// Write is used to send the contents of a resource as a sequence of bytes.
func (c Client) Write(ctx context.Context, opts ...grpc.CallOption) (bpb.ByteStream_WriteClient, error) {
return bpb.NewByteStreamClient(c.ClientConn).Write(ctx, c.callOptions(opts...)...)
}
// QueryWriteStatus is used to find the committed_size for a resource
// that is being written, which can be then be used as the write_offset
// for the next Write call.
func (c Client) QueryWriteStatus(ctx context.Context, in *bpb.QueryWriteStatusRequest, opts ...grpc.CallOption) (*bpb.QueryWriteStatusResponse, error) {
return bpb.NewByteStreamClient(c.ClientConn).QueryWriteStatus(ctx, in, c.callOptions(opts...)...)
}
// Capabilities returns capabilities client.
func (c Client) Capabilities() rpb.CapabilitiesClient {
return c
}
// GetCapabilities returns the server capabilities configuration.
func (c Client) GetCapabilities(ctx context.Context, req *rpb.GetCapabilitiesRequest, opts ...grpc.CallOption) (*rpb.ServerCapabilities, error) {
return rpb.NewCapabilitiesClient(c.ClientConn).GetCapabilities(ctx, req, c.callOptions(opts...)...)
}
func logOpMetadata(logger log.Logger, op *lpb.Operation) {
if op.GetMetadata() == nil {
logger.Infof("operation update: no metadata")
return
}
md := &rpb.ExecuteOperationMetadata{}
err := ptypes.UnmarshalAny(op.GetMetadata(), md)
if err != nil {
logger.Warnf("operation update: %s: metadata bad type %T: %v", op.GetName(), op.GetMetadata(), err)
return
}
logger.Infof("operation update: %s: %v", op.GetName(), md)
}
// ExecuteAndWait executes and action remotely and wait its response.
// it returns operation name, response and error.
func ExecuteAndWait(ctx context.Context, c Client, req *rpb.ExecuteRequest, opts ...grpc.CallOption) (string, *rpb.ExecuteResponse, error) {
logger := log.FromContext(ctx)
logger.Infof("execute action")
var opName string
var waitReq *rpb.WaitExecutionRequest
resp := &rpb.ExecuteResponse{}
type responseStream interface {
Recv() (*lpb.Operation, error)
}
pctx := ctx
err := c.Retry.Do(ctx, func() error {
ctx, cancel := context.WithTimeout(pctx, 1*time.Minute)
defer cancel()
var stream responseStream
var err error
if waitReq != nil {
stream, err = c.Exec().WaitExecution(ctx, waitReq, opts...)
} else {
recordRemoteExecStart(ctx)
stream, err = c.Exec().Execute(ctx, req, opts...)
}
if err != nil {
return grpc.Errorf(grpc.Code(err), "execute: %v", err)
}
for {
op, err := stream.Recv()
if err != nil {
// if not found, retry from execute
// otherwise, rerun from WaitExecution.
if status.Code(err) == codes.NotFound {
waitReq = nil
recordRemoteExecFinish(ctx)
return status.Errorf(codes.Unavailable, "operation stream lost: %v", err)
}
// record error details if any
// https://github.com/bazelbuild/remote-apis/blob/1aeb399731780e61dece542dc656da1775a20840/build/bazel/remote/execution/v2/remote_execution.proto#L102
st, ok := status.FromError(err)
if ok {
for _, dt := range st.Details() {
logger.Warnf("error details for %v: %v", err, dt)
}
}
return err
}
if opName == "" {
opName = op.GetName()
logger.Infof("operation starts: %s", opName)
}
if !op.GetDone() {
logOpMetadata(logger, op)
waitReq = &rpb.WaitExecutionRequest{
Name: opName,
}
continue
}
waitReq = nil
err = ptypes.UnmarshalAny(op.GetResponse(), resp)
if err != nil {
err = status.Errorf(codes.Internal, "op %s response bad type %T: %v", op.GetName(), op.GetResponse(), err)
logger.Errorf("%s", err)
return err
}
return erespErr(pctx, resp)
}
})
recordRemoteExecFinish(ctx)
if err == nil {
err = status.FromProto(resp.GetStatus()).Err()
}
return opName, resp, err
}
// erespErr returns codes.Unavailable if it has retriable failure result.
// returns nil otherwise (to terminates retrying, even if eresp contains
// error status).
func erespErr(ctx context.Context, eresp *rpb.ExecuteResponse) error {
logger := log.FromContext(ctx)
st := eresp.GetStatus()
// record details if any
// when OK, it has empty google.devtools.remotebuildbot.ComamndStatus
// {code=0, message=""}.
if codes.Code(st.GetCode()) != codes.OK && len(st.GetDetails()) > 0 {
logger.Warnf("error details for %v: %v", codes.Code(st.GetCode()), st.GetDetails())
}
// https://github.com/bazelbuild/remote-apis/blob/e7282cf0f0e16e7ba84209be5417279e6815bee7/build/bazel/remote/execution/v2/remote_execution.proto#L83
// FAILED_PRECONDITION:
// one or more errors occured in setting up the action
// requested, such as a missing input or command or
// no worker being available. The client may be able to
// fix the errors and retry.
// exec_server doesn't fix request, so don't retry.
// UNAVAILABLE:
// Due to transient condition, such as all workers being
// occupied (and the server does not support a queue), the
// action could not be started. The client should retry.
// RESOURCE_EXHAUSTED:
// There is insufficient quota of some resource to run the action.
// INTERNAL
// An internal error occurred in the execution engine or the worker.
// could be handled as unavailable error.
//
// Other error would be non retriable.
switch codes.Code(st.GetCode()) {
case codes.OK:
case codes.ResourceExhausted, codes.FailedPrecondition:
logger.Warnf("execute response: status=%s", st)
return status.FromProto(st).Err()
case codes.Internal:
logger.Warnf("execute response: status=%s", st)
fallthrough
case codes.Unavailable:
st = proto.Clone(st).(*spb.Status)
// codes.Unavailable, so that rpc.Retry will retry.
st.Code = int32(codes.Unavailable)
return status.FromProto(st).Err()
case codes.Aborted:
if ctx.Err() == nil {
// ctx is not cancelled, but returned
// code = Aborted, context canceled
// in this case, it would be retriable.
logger.Warnf("execute reponse: aborted %s, but ctx is still active", st)
st = proto.Clone(st).(*spb.Status)
// codes.Unavailable, so that rpc.Retry will retry.
st.Code = int32(codes.Unavailable)
return status.FromProto(st).Err()
}
fallthrough
default:
logger.Errorf("execute response: error %s", st)
}
return nil
}
func fixRBEInternalError(err error) error {
if status.Code(err) == codes.Internal {
return status.Errorf(codes.Unavailable, "%v", err)
}
return err
}