blob: 6a21944583788d19896c7170e5fba73da0c7334b [file] [log] [blame]
// Copyright 2018 The Goma Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package remoteexec
import (
"context"
"fmt"
"io"
"io/ioutil"
"math/rand"
"strings"
"sync"
"testing"
"time"
rpb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"go.chromium.org/goma/server/cache"
"go.chromium.org/goma/server/file"
"go.chromium.org/goma/server/hash"
"go.chromium.org/goma/server/log"
gomapb "go.chromium.org/goma/server/proto/api"
cachepb "go.chromium.org/goma/server/proto/cache"
cpb "go.chromium.org/goma/server/proto/command"
fpb "go.chromium.org/goma/server/proto/file"
"go.chromium.org/goma/server/remoteexec/digest"
"go.chromium.org/goma/server/rpc/grpctest"
)
// fakeCluster represents fake goma cluster.
type fakeCluster struct {
rbe *fakeRBE
srv *grpc.Server
addr string
conn *grpc.ClientConn
stop func()
cache *cache.Cache
csrv *grpc.Server
cconn *grpc.ClientConn
cstop func()
gomafile *file.Service
fsrv *grpc.Server
fconn *grpc.ClientConn
fstop func()
cmdStorage fakeCmdStorage
redis fakeRedis
adapter Adapter
}
// setup sets up fake goma cluster with fake RBE instance.
func (f *fakeCluster) setup(ctx context.Context, instancePrefix string) error {
var err error
var defers []func()
defer func() {
for i := len(defers) - 1; i >= 0; i-- {
defers[i]()
}
}()
logger := log.FromContext(ctx)
// CAS BatchUpdateBlobs will accept at most max_batch_total_size_bytes.
// https://github.com/bazelbuild/remote-apis/blob/efd28d1832bd3ccddc3d2b29c341da1cce09c333/build/bazel/remote/execution/v2/remote_execution.proto#L1278
// TODO: set max msg size to match with max_batch_total_size_bytes
// in ServerCapabilities.CacheCapabitilies.
f.srv = grpc.NewServer()
registerFakeRBE(f.srv, f.rbe)
f.addr, f.stop, err = grpctest.StartServer(f.srv)
if err != nil {
return err
}
defers = append(defers, f.stop)
logger.Infof("f.conn = addr:%s", f.addr)
// TODO: make it secure and pass enduser credentials
f.conn, err = grpc.Dial(f.addr, grpc.WithInsecure())
if err != nil {
return err
}
defers = append(defers, func() { f.conn.Close() })
f.csrv = grpc.NewServer()
f.cache, err = cache.New(cache.Config{
MaxBytes: 1 * 1024 * 1024 * 1024,
})
if err != nil {
return err
}
cachepb.RegisterCacheServiceServer(f.csrv, f.cache)
var addr string
addr, f.cstop, err = grpctest.StartServer(f.csrv)
if err != nil {
return err
}
defers = append(defers, f.cstop)
logger.Infof("f.cconn = addr:%s", addr)
f.cconn, err = grpc.Dial(addr, grpc.WithInsecure())
if err != nil {
return err
}
defers = append(defers, func() { f.cconn.Close() })
f.fsrv = grpc.NewServer(grpc.MaxSendMsgSize(file.DefaultMaxMsgSize), grpc.MaxRecvMsgSize(file.DefaultMaxMsgSize))
f.gomafile = &file.Service{
Cache: cachepb.NewCacheServiceClient(f.cconn),
}
fpb.RegisterFileServiceServer(f.fsrv, f.gomafile)
addr, f.fstop, err = grpctest.StartServer(f.fsrv)
if err != nil {
return err
}
defers = append(defers, f.fstop)
logger.Infof("f.fconn = addr:%s", addr)
f.fconn, err = grpc.Dial(addr, grpc.WithInsecure())
if err != nil {
return err
}
defers = append(defers, func() { f.fconn.Close() })
f.adapter = Adapter{
InstancePrefix: instancePrefix,
ExecTimeout: 10 * time.Second,
Client: Client{ClientConn: f.conn},
GomaFile: fpb.NewFileServiceClient(f.fconn),
DigestCache: digest.NewCache(&f.redis, 1e6),
CmdStorage: &f.cmdStorage,
ToolDetails: &rpb.ToolDetails{},
FileLookupSema: make(chan struct{}, 2),
CASBlobLookupSema: make(chan struct{}, 2),
}
defers = nil
return nil
}
// teardown cleans up fake cluster.
func (f *fakeCluster) teardown() {
if f.fconn != nil {
f.fconn.Close()
}
if f.fstop != nil {
f.fstop()
}
if f.cconn != nil {
f.cconn.Close()
}
if f.cstop != nil {
f.cstop()
}
}
// fakeToolchain represents fake toolchain.
type fakeToolchain struct {
descs []*cpb.CmdDescriptor
RemoteexecPlatform *cpb.RemoteexecPlatform
}
// CommandSpec returns command spec for name and localPath.
func (f *fakeToolchain) CommandSpec(name, localPath string) *gomapb.CommandSpec {
for _, desc := range f.descs {
if desc.Selector.Name == name {
return &gomapb.CommandSpec{
Name: proto.String(desc.Selector.Name),
Version: proto.String(desc.Selector.Version),
Target: proto.String(desc.Selector.Target),
BinaryHash: []byte(desc.Selector.BinaryHash),
LocalCompilerPath: proto.String(localPath),
}
}
}
return &gomapb.CommandSpec{}
}
// newFakeClang creates new fake toolchain for clang version+target.
func newFakeClang(f *fakeCmdStorage, version, target string) *fakeToolchain {
clangFile := f.newFileSpec("bin/clang", true)
libFindBadConstructs := f.newFileSpec("lib/libFindBadConstructs.so", false)
return &fakeToolchain{
descs: []*cpb.CmdDescriptor{
{
Selector: &cpb.Selector{
Name: "clang",
Version: version,
Target: target,
BinaryHash: clangFile.Hash,
},
Setup: &cpb.CmdDescriptor_Setup{
CmdFile: clangFile,
PathType: cpb.CmdDescriptor_POSIX,
},
},
{
Selector: &cpb.Selector{
Name: "clang++",
Version: version,
Target: target,
BinaryHash: clangFile.Hash,
},
Setup: &cpb.CmdDescriptor_Setup{
CmdFile: clangFile,
PathType: cpb.CmdDescriptor_POSIX,
},
},
{
Selector: &cpb.Selector{
Name: "libFindBadConstructs.so",
BinaryHash: libFindBadConstructs.Hash,
},
Setup: &cpb.CmdDescriptor_Setup{
CmdFile: libFindBadConstructs,
PathType: cpb.CmdDescriptor_POSIX,
},
},
},
RemoteexecPlatform: &cpb.RemoteexecPlatform{
Properties: []*cpb.RemoteexecPlatform_Property{
{
Name: "container-image",
Value: "docker://grpc.io/goma-dev/container-image@sha256:xxxx",
},
},
},
}
}
// newFakeClangCL creates new fake toolchain for clang-cl.
func newFakeClangCL(f *fakeCmdStorage, version string) *fakeToolchain {
clangCLFile := f.newFileSpec("bin/clang-cl", true)
return &fakeToolchain{
descs: []*cpb.CmdDescriptor{
{
Selector: &cpb.Selector{
Name: "clang-cl",
Version: version,
Target: "x86_64-windows-msvc",
BinaryHash: clangCLFile.Hash,
},
Setup: &cpb.CmdDescriptor_Setup{
CmdFile: clangCLFile,
PathType: cpb.CmdDescriptor_POSIX,
},
Cross: &cpb.CmdDescriptor_Cross{
WindowsCross: true,
},
},
},
RemoteexecPlatform: &cpb.RemoteexecPlatform{
Properties: []*cpb.RemoteexecPlatform_Property{
{
Name: "container-image",
Value: "docker://grpc.io/goma-dev/container-image@sha256:yyy",
},
},
},
}
}
// pushToolchains push fake toolchain in fake cluster.
func (f *fakeCluster) pushToolchains(ctx context.Context, tc *fakeToolchain) error {
config := &cpb.ConfigResp{
VersionId: time.Now().String(),
}
for _, desc := range tc.descs {
config.Configs = append(config.Configs, &cpb.Config{
Target: &cpb.Target{
Addr: f.addr,
},
BuildInfo: &cpb.BuildInfo{},
CmdDescriptor: desc,
RemoteexecPlatform: tc.RemoteexecPlatform,
})
}
err := f.adapter.Inventory.Configure(ctx, config)
return err
}
// pushPlatform pushes a platform with dimensions.
func (f *fakeCluster) pushPlatform(ctx context.Context, containerImage string, dimensions []string) error {
config := &cpb.ConfigResp{
VersionId: time.Now().String(),
}
config.Configs = []*cpb.Config{
{
RemoteexecPlatform: &cpb.RemoteexecPlatform{
Properties: []*cpb.RemoteexecPlatform_Property{
{
Name: "container-image",
Value: containerImage,
},
},
},
Dimensions: dimensions,
},
}
err := f.adapter.Inventory.Configure(ctx, config)
return err
}
// fakeLocalFiles represents fake local files (in client side).
type fakeLocalFiles struct {
m map[string]string // path -> content
}
func randomSize() uint64 {
s := rand.Uint64() % (2 * 1024 * 1024)
if s == 0 {
s++
}
return s
}
func randomBigSize() uint64 {
return 2*1024*1024 + randomSize()
}
// Add adds new fake file named fname.
func (f *fakeLocalFiles) Add(fname string, size uint64) {
if f.m == nil {
f.m = make(map[string]string)
}
buf := make([]byte, size)
rand.Read(buf)
f.m[fname] = string(buf)
}
// Dup dups oldname as newname.
func (f *fakeLocalFiles) Dup(oldname, newname string) {
f.m[newname] = f.m[oldname]
}
// Open opens fake file.
func (f *fakeLocalFiles) Open(fname string) (io.Reader, error) {
data, ok := f.m[fname]
if !ok {
return nil, fmt.Errorf("%s not found", fname)
}
return strings.NewReader(data), nil
}
// mustFileHash returns SHA256 hash of file content.
func (f *fakeLocalFiles) mustFileHash(ctx context.Context, t *testing.T, fname string) string {
data, ok := f.m[fname]
if !ok {
t.Fatalf("%s not found", fname)
}
return hash.SHA256Content([]byte(data))
}
// mustDigest returns digest of file content.
func (f *fakeLocalFiles) mustDigest(ctx context.Context, t *testing.T, fname string) *rpb.Digest {
data, ok := f.m[fname]
if !ok {
t.Fatalf("%s not found", fname)
}
return &rpb.Digest{
Hash: hash.SHA256Content([]byte(data)),
SizeBytes: int64(len(data)),
}
}
// mustInput creates execreq input for fname (maybe relative) from fullpath.
// if fc is nil, content won't be uploaded and not set in input (i.e. hash only).
func (f *fakeLocalFiles) mustInput(ctx context.Context, t *testing.T, fc fpb.FileServiceClient, fullpath, fname string) *gomapb.ExecReq_Input {
data := f.m[fullpath]
input := &gomapb.ExecReq_Input{
Filename: proto.String(fname),
Content: &gomapb.FileBlob{
FileSize: proto.Int64(int64(len(data))),
},
}
err := file.FromReader(ctx, fc, strings.NewReader(data), input.Content)
if err != nil {
t.Fatalf("failed to read file %s: %v", fullpath, err)
}
hashKey, err := hash.SHA256Proto(input.Content)
if err != nil {
t.Fatalf("failed to compute sha256 %s: %v", fullpath, err)
}
input.HashKey = proto.String(hashKey)
if fc == nil {
input.Content = nil
}
return input
}
// fakeRedis represents cache client on fake memorystore.
type fakeRedis struct {
mu sync.Mutex
m map[string][]byte
}
func (f *fakeRedis) mustSet(ctx context.Context, t *testing.T, key string, d *rpb.Digest) {
f.mu.Lock()
defer f.mu.Unlock()
if f.m == nil {
f.m = make(map[string][]byte)
}
b, err := proto.Marshal(d)
if err != nil {
t.Fatal(err)
}
f.m[key] = b
}
func (f *fakeRedis) Get(ctx context.Context, req *cachepb.GetReq, opts ...grpc.CallOption) (*cachepb.GetResp, error) {
f.mu.Lock()
defer f.mu.Unlock()
b, ok := f.m[req.Key]
if !ok {
return nil, status.Errorf(codes.NotFound, "no digest cache for %s", req.Key)
}
return &cachepb.GetResp{
Kv: &cachepb.KV{
Key: req.Key,
Value: b,
},
}, nil
}
func (f *fakeRedis) Put(ctx context.Context, req *cachepb.PutReq, opts ...grpc.CallOption) (*cachepb.PutResp, error) {
f.mu.Lock()
defer f.mu.Unlock()
if f.m == nil {
f.m = make(map[string][]byte)
}
f.m[req.Kv.Key] = req.Kv.Value
return &cachepb.PutResp{}, nil
}
// fakeCmdStorage represents fake cmdstorage bucket.
type fakeCmdStorage struct {
m map[string]string // hash -> data
}
// Open opens cmd files identified by hash.
func (f *fakeCmdStorage) Open(ctx context.Context, hash string) (io.ReadCloser, error) {
v, ok := f.m[hash]
if !ok {
return nil, status.Errorf(codes.NotFound, "%s not found", hash)
}
return ioutil.NopCloser(strings.NewReader(v)), nil
}
// newFile stores a new file, and returns hash of it.
func (f *fakeCmdStorage) newFile(size int64) string {
buf := make([]byte, size)
rand.Read(buf)
h := hash.SHA256Content(buf)
if f.m == nil {
f.m = make(map[string]string)
}
f.m[h] = string(buf)
return h
}
// newFileSpec stores a new file and returns FileSpec of it.
func (f *fakeCmdStorage) newFileSpec(name string, isExecutable bool) *cpb.FileSpec {
size := int64(rand.Uint64() % (8 * 1024 * 1024))
h := f.newFile(size)
return &cpb.FileSpec{
Path: name,
Size: size,
Hash: h,
IsExecutable: isExecutable,
}
}