blob: a502f328ffe29f0e88633e227dd7c8df90cbc2d8 [file] [log] [blame]
// Copyright 2018 The Goma Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package remoteexec
import (
"context"
"io"
"path"
"github.com/google/uuid"
"go.opencensus.io/trace"
pb "google.golang.org/genproto/googleapis/bytestream"
)
// ByteStream is a proxy that reads/writes data to/from a server, as defined in
// googleapis/bytestream. In the context of the exec server, it accesses a resource on the
// RBE server, specifically a resource under an RBE instance.
//
// Its methods prefix InstanceName onto the resource name of each incoming
// request before forwarding the request through Adapter.
type ByteStream struct {
// Adapter provides an interface to the RBE API. The methods of ByteStream
// use it to derive the outgoing context and to obtain the backend client.
Adapter *Adapter
// InstanceName is the name of the RBE instance, e.g.
// "projects/$PROJECT/instances/default_instance". It is joined in front of
// every resource name sent to the backend.
InstanceName string
}
// Read proxies bytestream Read stream.
//
// The incoming request is re-issued against the backend with its resource
// name prefixed by bs.InstanceName; the read offset and limit are passed
// through unchanged. Every response received from the backend is relayed to
// s until the backend signals end of stream (io.EOF), at which point Read
// returns nil. Any other receive error, or any error sending to s, is
// returned as is.
func (bs *ByteStream) Read(req *pb.ReadRequest, s pb.ByteStream_ReadServer) error {
	ctx, span := trace.StartSpan(s.Context(), "go.chromium.org/goma/server/remoteexec.Adapter.Read")
	defer span.End()
	ctx = bs.Adapter.outgoingContext(ctx, nil)

	backend := bs.Adapter.client(ctx).ByteStream()
	stream, err := backend.Read(ctx, &pb.ReadRequest{
		ResourceName: path.Join(bs.InstanceName, req.ResourceName),
		ReadOffset:   req.ReadOffset,
		ReadLimit:    req.ReadLimit,
	})
	if err != nil {
		return err
	}

	// Relay chunks until the backend closes the stream or something fails.
	for {
		chunk, recvErr := stream.Recv()
		switch {
		case recvErr == io.EOF:
			return nil
		case recvErr != nil:
			return recvErr
		}
		if sendErr := s.Send(chunk); sendErr != nil {
			return sendErr
		}
	}
}
// Write proxies bytestream Write stream.
//
// Each request received from s is forwarded to the backend with its resource
// name rewritten to "<InstanceName>/uploads/<uuid>/<req.ResourceName>", where
// the uuid is generated once per call. When the caller finishes sending
// (s.Recv returns io.EOF), the backend stream is closed and the backend's
// response is sent back through s.
//
// It returns the first error encountered while opening the backend stream,
// receiving from s, forwarding to the backend, closing the backend stream,
// or replying to s.
func (bs *ByteStream) Write(s pb.ByteStream_WriteServer) error {
	ctx := s.Context()
	ctx, span := trace.StartSpan(ctx, "go.chromium.org/goma/server/remoteexec.Adapter.Write")
	defer span.End()
	ctx = bs.Adapter.outgoingContext(ctx, nil)
	// One upload ID per call; named so that it does not shadow the uuid package.
	uploadID := uuid.New().String()
	wr, err := bs.Adapter.client(ctx).ByteStream().Write(ctx)
	if err != nil {
		return err
	}
	for {
		req, err := s.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			// Close the backend stream on a best-effort basis, but report the
			// receive failure: returning nil here would tell the caller that
			// an aborted upload succeeded.
			wr.CloseAndRecv()
			return err
		}
		err = wr.Send(&pb.WriteRequest{
			ResourceName: path.Join(bs.InstanceName, "uploads", uploadID, req.ResourceName),
			WriteOffset:  req.WriteOffset,
			FinishWrite:  req.FinishWrite,
			Data:         req.Data,
		})
		if err != nil {
			// Best-effort close; the send error is the one worth reporting.
			wr.CloseAndRecv()
			return err
		}
	}
	resp, err := wr.CloseAndRecv()
	if err != nil {
		return err
	}
	return s.SendAndClose(resp)
}
// QueryWriteStatus proxies bytestream QueryWriteStatus call.
//
// The request is forwarded to the backend with its resource name prefixed by
// bs.InstanceName, and the backend's response and error are returned
// unchanged.
func (bs *ByteStream) QueryWriteStatus(ctx context.Context, req *pb.QueryWriteStatusRequest) (*pb.QueryWriteStatusResponse, error) {
	ctx, span := trace.StartSpan(ctx, "go.chromium.org/goma/server/remoteexec.Adapter.QueryWriteStatus")
	defer span.End()
	ctx = bs.Adapter.outgoingContext(ctx, nil)

	backend := bs.Adapter.client(ctx).ByteStream()
	resourceName := path.Join(bs.InstanceName, req.ResourceName)
	return backend.QueryWriteStatus(ctx, &pb.QueryWriteStatusRequest{
		ResourceName: resourceName,
	})
}