blob: 3d58fbfd78ff9d97762ba697b97dfe433f37e741 [file] [log] [blame]
// Copyright 2018 The Goma Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package remoteexec
import (
"context"
"time"
rpb "go.chromium.org/goma/server/proto/remote-apis/build/bazel/remote/execution/v2"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
bpb "google.golang.org/genproto/googleapis/bytestream"
lpb "google.golang.org/genproto/googleapis/longrunning"
spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"go.chromium.org/goma/server/log"
"go.chromium.org/goma/server/rpc"
)
var (
	// numRunningOperations is the measure behind the running-operations
	// view; ExecuteAndWait records +1 / -1 deltas against it.
	numRunningOperations = stats.Int64(
		"go.chromium.org/goma/server/remoteexec.running-operations",
		"Number of current running exec operations",
		stats.UnitDimensionless)

	// DefaultViews are the views this package provides for its measures.
	// The deltas recorded on numRunningOperations are aggregated with
	// view.Sum.
	DefaultViews = []*view.View{
		{
			Description: "Number of current running exec operations",
			Measure:     numRunningOperations,
			Aggregation: view.Sum(),
		},
	}
)
// Client is a remoteexec API client to ClientConn.
// CallOptions will be added when calling RPC.
//
//	prcred, _ := oauth.NewApplicationDefault(ctx,
//		"https://www.googleapis.com/auth/cloud-build-service")
//	conn, _ := grpc.DialContext(ctx, target,
//		grpc.WithPerRPCCredentials(prcred),
//		grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{})))
//	client := &remoteexec.Client{ClientConn: conn}
type Client struct {
	// ClientConn is the gRPC connection every RPC of this client is
	// issued on.
	*grpc.ClientConn
	// CallOptions are appended after the per-call options of every RPC
	// issued through this client.
	CallOptions []grpc.CallOption
}
// callOptions merges the per-call opts with the client-wide c.CallOptions,
// per-call options first. The result never aliases either input slice.
func (c Client) callOptions(opts ...grpc.CallOption) []grpc.CallOption {
	var merged []grpc.CallOption
	merged = append(merged, opts...)
	merged = append(merged, c.CallOptions...)
	return merged
}
// Cache exposes c through the action cache client interface.
// https://github.com/bazelbuild/remote-apis/blob/c1c1ad2c97ed18943adb55f06657440daa60d833/build/bazel/remote/execution/v2/remote_execution.proto#L117
func (c Client) Cache() rpb.ActionCacheClient {
	var cache rpb.ActionCacheClient = c
	return cache
}
// GetActionResult retrieves a cached execution result.
// The RPC is issued on c.ClientConn with opts followed by c.CallOptions.
func (c Client) GetActionResult(ctx context.Context, req *rpb.GetActionResultRequest, opts ...grpc.CallOption) (*rpb.ActionResult, error) {
	cache := rpb.NewActionCacheClient(c.ClientConn)
	callOpts := c.callOptions(opts...)
	return cache.GetActionResult(ctx, req, callOpts...)
}
// UpdateActionResult uploads a new execution result.
// The RPC is issued on c.ClientConn with opts followed by c.CallOptions.
func (c Client) UpdateActionResult(ctx context.Context, req *rpb.UpdateActionResultRequest, opts ...grpc.CallOption) (*rpb.ActionResult, error) {
	cache := rpb.NewActionCacheClient(c.ClientConn)
	callOpts := c.callOptions(opts...)
	return cache.UpdateActionResult(ctx, req, callOpts...)
}
// Exec exposes c through the execution client interface.
// https://github.com/bazelbuild/remote-apis/blob/c1c1ad2c97ed18943adb55f06657440daa60d833/build/bazel/remote/execution/v2/remote_execution.proto#L34
func (c Client) Exec() rpb.ExecutionClient {
	var exec rpb.ExecutionClient = c
	return exec
}
// Execute executes an action remotely and returns the stream of operation
// updates. The RPC is issued on c.ClientConn with opts followed by
// c.CallOptions.
func (c Client) Execute(ctx context.Context, req *rpb.ExecuteRequest, opts ...grpc.CallOption) (rpb.Execution_ExecuteClient, error) {
	exec := rpb.NewExecutionClient(c.ClientConn)
	callOpts := c.callOptions(opts...)
	return exec.Execute(ctx, req, callOpts...)
}
// WaitExecution waits for an execution operation to complete and returns
// the stream of operation updates. The RPC is issued on c.ClientConn with
// opts followed by c.CallOptions.
func (c Client) WaitExecution(ctx context.Context, req *rpb.WaitExecutionRequest, opts ...grpc.CallOption) (rpb.Execution_WaitExecutionClient, error) {
	exec := rpb.NewExecutionClient(c.ClientConn)
	callOpts := c.callOptions(opts...)
	return exec.WaitExecution(ctx, req, callOpts...)
}
// CAS exposes c through the content addressable storage client interface.
// https://github.com/bazelbuild/remote-apis/blob/c1c1ad2c97ed18943adb55f06657440daa60d833/build/bazel/remote/execution/v2/remote_execution.proto#L168
func (c Client) CAS() rpb.ContentAddressableStorageClient {
	var cas rpb.ContentAddressableStorageClient = c
	return cas
}
// FindMissingBlobs determines if blobs are present in the CAS.
// The RPC is issued on c.ClientConn with opts followed by c.CallOptions.
func (c Client) FindMissingBlobs(ctx context.Context, req *rpb.FindMissingBlobsRequest, opts ...grpc.CallOption) (*rpb.FindMissingBlobsResponse, error) {
	cas := rpb.NewContentAddressableStorageClient(c.ClientConn)
	callOpts := c.callOptions(opts...)
	return cas.FindMissingBlobs(ctx, req, callOpts...)
}
// BatchUpdateBlobs uploads many blobs at once.
// The RPC is issued on c.ClientConn with opts followed by c.CallOptions.
func (c Client) BatchUpdateBlobs(ctx context.Context, req *rpb.BatchUpdateBlobsRequest, opts ...grpc.CallOption) (*rpb.BatchUpdateBlobsResponse, error) {
	cas := rpb.NewContentAddressableStorageClient(c.ClientConn)
	callOpts := c.callOptions(opts...)
	return cas.BatchUpdateBlobs(ctx, req, callOpts...)
}
// BatchReadBlobs downloads many blobs at once.
// The RPC is issued on c.ClientConn with opts followed by c.CallOptions.
func (c Client) BatchReadBlobs(ctx context.Context, req *rpb.BatchReadBlobsRequest, opts ...grpc.CallOption) (*rpb.BatchReadBlobsResponse, error) {
	cas := rpb.NewContentAddressableStorageClient(c.ClientConn)
	callOpts := c.callOptions(opts...)
	return cas.BatchReadBlobs(ctx, req, callOpts...)
}
// GetTree fetches the entire directory tree rooted at a node and returns
// the response stream. The RPC is issued on c.ClientConn with opts followed
// by c.CallOptions.
func (c Client) GetTree(ctx context.Context, req *rpb.GetTreeRequest, opts ...grpc.CallOption) (rpb.ContentAddressableStorage_GetTreeClient, error) {
	cas := rpb.NewContentAddressableStorageClient(c.ClientConn)
	callOpts := c.callOptions(opts...)
	return cas.GetTree(ctx, req, callOpts...)
}
// ByteStream exposes c through the byte stream client interface.
// https://godoc.org/google.golang.org/genproto/googleapis/bytestream#ByteStreamClient
func (c Client) ByteStream() bpb.ByteStreamClient {
	var bs bpb.ByteStreamClient = c
	return bs
}
// Read is used to retrieve the contents of a resource as a sequence of
// bytes. The RPC is issued on c.ClientConn with opts followed by
// c.CallOptions.
func (c Client) Read(ctx context.Context, in *bpb.ReadRequest, opts ...grpc.CallOption) (bpb.ByteStream_ReadClient, error) {
	bs := bpb.NewByteStreamClient(c.ClientConn)
	callOpts := c.callOptions(opts...)
	return bs.Read(ctx, in, callOpts...)
}
// Write is used to send the contents of a resource as a sequence of bytes.
// The RPC is issued on c.ClientConn with opts followed by c.CallOptions.
func (c Client) Write(ctx context.Context, opts ...grpc.CallOption) (bpb.ByteStream_WriteClient, error) {
	bs := bpb.NewByteStreamClient(c.ClientConn)
	callOpts := c.callOptions(opts...)
	return bs.Write(ctx, callOpts...)
}
// QueryWriteStatus is used to find the committed_size for a resource
// that is being written, which can then be used as the write_offset
// for the next Write call.
// The RPC is issued on c.ClientConn with opts followed by c.CallOptions.
func (c Client) QueryWriteStatus(ctx context.Context, in *bpb.QueryWriteStatusRequest, opts ...grpc.CallOption) (*bpb.QueryWriteStatusResponse, error) {
	bs := bpb.NewByteStreamClient(c.ClientConn)
	callOpts := c.callOptions(opts...)
	return bs.QueryWriteStatus(ctx, in, callOpts...)
}
// Capabilities exposes c through the capabilities client interface.
func (c Client) Capabilities() rpb.CapabilitiesClient {
	var caps rpb.CapabilitiesClient = c
	return caps
}
// GetCapabilities returns the server capabilities configuration.
// The RPC is issued on c.ClientConn with opts followed by c.CallOptions.
func (c Client) GetCapabilities(ctx context.Context, req *rpb.GetCapabilitiesRequest, opts ...grpc.CallOption) (*rpb.ServerCapabilities, error) {
	caps := rpb.NewCapabilitiesClient(c.ClientConn)
	callOpts := c.callOptions(opts...)
	return caps.GetCapabilities(ctx, req, callOpts...)
}
// ExecuteAndWait executes an action remotely and waits for its response.
// It returns the operation name, the response and an error.
//
// The call is driven by rpc.Retry. Each attempt runs under its own
// 1 minute timeout and either starts the action with Execute or, when a
// previous attempt already saw a not-yet-done operation, re-attaches to it
// with WaitExecution. If the operation is reported as NotFound, or the
// response carries a retriable status (see erespErr), the next attempt
// starts over from Execute.
//
// The returned name is the most recent non-empty operation name seen on a
// stream; it is empty if no operation update was ever received. The
// returned response is non-nil even when err is non-nil.
func ExecuteAndWait(ctx context.Context, c Client, req *rpb.ExecuteRequest, opts ...grpc.CallOption) (string, *rpb.ExecuteResponse, error) {
	logger := log.FromContext(ctx)
	logger.Infof("execute action")

	// running tells whether this call is currently counted in
	// numRunningOperations. Guarding both helpers with it keeps the +1/-1
	// records balanced however many times Execute is re-issued, and also
	// when no attempt ever reaches Execute.
	// NOTE(review): assumes rpc.Retry invokes the function sequentially
	// on the calling goroutine — confirm.
	running := false
	recordStart := func() {
		if running {
			return
		}
		running = true
		stats.Record(ctx, numRunningOperations.M(1))
	}
	recordFinish := func() {
		if !running {
			return
		}
		running = false
		stats.Record(ctx, numRunningOperations.M(-1))
	}
	defer recordFinish()

	var opName string
	var waitReq *rpb.WaitExecutionRequest
	resp := &rpb.ExecuteResponse{}
	// responseStream is the part shared by the Execute and WaitExecution
	// streams.
	type responseStream interface {
		Recv() (*lpb.Operation, error)
	}
	err := rpc.Retry{}.Do(ctx, func() error {
		// per-attempt deadline; shadows the caller's ctx inside the attempt.
		ctx, cancel := context.WithTimeout(ctx, 1*time.Minute)
		defer cancel()

		var stream responseStream
		var err error
		if waitReq != nil {
			stream, err = c.Exec().WaitExecution(ctx, waitReq, opts...)
		} else {
			recordStart()
			stream, err = c.Exec().Execute(ctx, req, opts...)
		}
		if err != nil {
			return status.Errorf(status.Code(err), "execute: %v", err)
		}
		for {
			op, err := stream.Recv()
			if err != nil {
				// if not found, retry from execute
				// otherwise, rerun from WaitExecution.
				if status.Code(err) == codes.NotFound {
					waitReq = nil
					recordFinish()
					return status.Errorf(codes.Unavailable, "operation stream lost: %v", err)
				}
				return err
			}
			// Follow the name reported by the stream: after a
			// re-Execute the server may hand out a new operation, and
			// both WaitExecution and the returned name must refer to
			// it rather than to the lost one.
			if name := op.GetName(); name != "" && name != opName {
				opName = name
				logger.Infof("operation starts: %s", opName)
			}
			if !op.GetDone() {
				md := &rpb.ExecuteOperationMetadata{}
				if err := ptypes.UnmarshalAny(op.GetMetadata(), md); err != nil {
					logger.Warnf("operation update: %s: metadata bad type %T: %v", op.GetName(), op.GetMetadata(), err)
				} else {
					logger.Infof("operation update: %s: %v", op.GetName(), md)
				}
				// remember the operation so that a broken stream is
				// resumed with WaitExecution instead of a new Execute.
				waitReq = &rpb.WaitExecutionRequest{
					Name: opName,
				}
				continue
			}
			// operation is done; any further attempt starts from Execute.
			waitReq = nil
			err = ptypes.UnmarshalAny(op.GetResponse(), resp)
			if err != nil {
				err = status.Errorf(codes.Internal, "op %s response bad type %T: %v", op.GetName(), op.GetResponse(), err)
				logger.Errorf("%s", err)
				return err
			}
			return erespErr(resp)
		}
	})
	return opName, resp, err
}
// erespErr maps the status carried by eresp to a retry decision.
// For a retriable failure it returns the status rewritten to
// codes.Unavailable; for anything else it returns nil, which terminates
// retrying even if eresp contains an error status.
func erespErr(eresp *rpb.ExecuteResponse) error {
	orig := eresp.GetStatus()

	// The retriable set follows the bazel remote retrier.
	// https://github.com/bazelbuild/bazel/blob/master/src/main/java/com/google/devtools/build/lib/remote/RemoteRetrier.java
	// also see https://cloud.google.com/pubsub/docs/reference/error-codes
	//
	// codes.Unauthenticated is not retried: the end user's access token
	// would be expired and we have no refresh token, so a retry would get
	// unauthenticated again.
	//
	// codes.DeadlineExceeded might fail again soon, but if the server side
	// set a shorter deadline and the caller has more time left in its
	// context, a retry would succeed.
	switch codes.Code(orig.GetCode()) {
	case codes.Unknown, codes.DeadlineExceeded, codes.Aborted, codes.Internal, codes.Unavailable, codes.ResourceExhausted:
		// retriable; handled below.
	default:
		return nil
	}

	// Work on a copy so that the status inside eresp stays untouched, and
	// report codes.Unavailable so that rpc.Retry will retry.
	retriable := proto.Clone(orig).(*spb.Status)
	retriable.Code = int32(codes.Unavailable)
	return status.FromProto(retriable).Err()
}