blob: 97bd04a9ba4dccc4890dbc42fc296862733742ef [file] [log] [blame]
// Copyright 2019 The Goma Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
/*
Binary remoteexec-proxy is a proxy server between Goma client and Remote Execution API.
*/
package main
import (
"context"
"crypto/tls"
"crypto/x509"
"flag"
"fmt"
"html/template"
"io/ioutil"
"net/http"
"os"
"os/user"
"path"
"path/filepath"
"strings"
"time"
"cloud.google.com/go/storage"
rpb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
"github.com/golang/protobuf/proto"
"go.opencensus.io/plugin/ocgrpc"
"go.opencensus.io/trace"
"google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"go.chromium.org/goma/server/auth"
"go.chromium.org/goma/server/auth/account"
"go.chromium.org/goma/server/auth/acl"
"go.chromium.org/goma/server/cache"
"go.chromium.org/goma/server/cache/gcs"
"go.chromium.org/goma/server/cache/redis"
"go.chromium.org/goma/server/file"
"go.chromium.org/goma/server/frontend"
"go.chromium.org/goma/server/httprpc"
execrpc "go.chromium.org/goma/server/httprpc/exec"
execlogrpc "go.chromium.org/goma/server/httprpc/execlog"
filerpc "go.chromium.org/goma/server/httprpc/file"
"go.chromium.org/goma/server/log"
"go.chromium.org/goma/server/profiler"
gomapb "go.chromium.org/goma/server/proto/api"
authpb "go.chromium.org/goma/server/proto/auth"
cachepb "go.chromium.org/goma/server/proto/cache"
cmdpb "go.chromium.org/goma/server/proto/command"
execpb "go.chromium.org/goma/server/proto/exec"
filepb "go.chromium.org/goma/server/proto/file"
"go.chromium.org/goma/server/remoteexec"
"go.chromium.org/goma/server/remoteexec/digest"
"go.chromium.org/goma/server/rpc"
"go.chromium.org/goma/server/server"
)
// Command-line flags of the proxy. Each flag's help string states its
// purpose; the pointers below are dereferenced throughout this file
// (main, defaultACL.Load, readConfigResp).
var (
// Listening port and remote execution endpoint selection.
port = flag.Int("port", 8090, "listening port (goma api endpoints)")
remoteexecAddr = flag.String("remoteexec-addr", "", "remoteexec API endpoint")
remoteInstanceName = flag.String("remote-instance-name", "", "remote instance name")
// Access control and credentials.
allowedUsers = flag.String("allowed-users", "", "comma separated list of allowed users. `*@domain` will match any user in domain. if empty, current user is allowed.")
serviceAccountJSON = flag.String("service-account-json", "", "service account json, used to talk to RBE and cloud storage (if --file-cache-bucket is used)")
platformContainerImage = flag.String("platform-container-image", "", "docker uri of platform container image")
// Transport security for the connection to the remote execution API.
insecureRemoteexec = flag.Bool("insecure-remoteexec", false, "insecure grpc for remoteexec API")
insecureSkipVerify = flag.Bool("insecure-skip-verify", false, "insecure skip verifying the server certificate")
additionalTLSCertificate = flag.String("additional-tls-certificate", "", "additional TLS root certificate for verifying the server certificate")
// Exec behavior, caches and inventory configuration.
execMaxRetryCount = flag.Int("exec-max-retry-count", 5, "max retry count for exec call. 0 is unlimited count, but bound to ctx timtout. Use small number for powerful clients to run local fallback quickly. Use large number for powerless clients to use remote more than local.")
fileCacheBucket = flag.String("file-cache-bucket", "", "file cache bucking store bucket")
execConfigFile = flag.String("exec-config-file", "", "exec inventory config file")
maxDigestCacheEntries = flag.Int("max-digest-cache-entries", 2e6, "maximum entries in in-memory digest cache")
// Tracing configuration.
traceProjectID = flag.String("trace-project-id", "", "project id for cloud tracing")
traceFraction = flag.Float64("trace-sampling-fraction", 1.0, "sampling fraction for stackdriver trace")
traceQPS = flag.Float64("trace-sampling-qps-limit", 1.0, "sampling qps limit for stackdriver trace")
// Redis connection pool limits; defaults come from the redis package.
redisMaxIdleConns = flag.Int("redis-max-idle-conns", redis.DefaultMaxIdleConns, "maximum number of idle connections to redis.")
redisMaxActiveConns = flag.Int("redis-max-active-conns", redis.DefaultMaxActiveConns, "maximum number of active connections to redis.")
)
// myEmail builds a "user@domain" identity for the account running the proxy.
//
// The user part is taken from $USER, falling back to the current OS user when
// that variable is empty. The domain part is the whitespace-trimmed content
// of /etc/mailname. If either lookup fails, the failure is reported through
// the context logger's Fatalf with a hint that --allowed-users is needed.
func myEmail(ctx context.Context) string {
	logger := log.FromContext(ctx)

	name := os.Getenv("USER")
	if name == "" {
		current, err := user.Current()
		if err != nil {
			logger.Fatalf("failed to get username: need --allowed-users: %v", err)
		}
		name = current.Username
	}

	mailname, err := ioutil.ReadFile("/etc/mailname")
	if err != nil {
		logger.Fatalf("failed to get email: need --allowed-users: %v", err)
	}
	domain := strings.TrimSpace(string(mailname))
	return fmt.Sprintf("%s@%s", name, domain)
}
// authClient exposes an in-process *auth.Service through a client-style
// method that also accepts gRPC call options.
type authClient struct {
	Service *auth.Service
}

// Auth hands req straight to the wrapped service. The call options are part
// of the signature only; they are not used.
func (ac authClient) Auth(ctx context.Context, req *authpb.AuthReq, opts ...grpc.CallOption) (*authpb.AuthResp, error) {
	svc := ac.Service
	return svc.Auth(ctx, req)
}
// fileClient exposes an in-process filepb.FileServiceServer through
// client-style methods that also accept gRPC call options.
type fileClient struct {
	Service filepb.FileServiceServer
}

// StoreFile passes req to the wrapped file service; opts are not used.
func (fc fileClient) StoreFile(ctx context.Context, req *gomapb.StoreFileReq, opts ...grpc.CallOption) (*gomapb.StoreFileResp, error) {
	svc := fc.Service
	return svc.StoreFile(ctx, req)
}

// LookupFile passes req to the wrapped file service; opts are not used.
func (fc fileClient) LookupFile(ctx context.Context, req *gomapb.LookupFileReq, opts ...grpc.CallOption) (*gomapb.LookupFileResp, error) {
	svc := fc.Service
	return svc.LookupFile(ctx, req)
}
// execlogService is a stateless exec-log endpoint that stores nothing.
type execlogService struct{}

// SaveLog ignores the request and always answers with an empty response
// and a nil error.
func (execlogService) SaveLog(ctx context.Context, req *gomapb.SaveLogReq) (*gomapb.SaveLogResp, error) {
	resp := new(gomapb.SaveLogResp)
	return resp, nil
}
// cacheClient exposes an in-process cachepb.CacheServiceServer through
// client-style methods that also accept gRPC call options.
type cacheClient struct {
	Service cachepb.CacheServiceServer
}

// Get passes req to the wrapped cache service; opts are not used.
func (cc cacheClient) Get(ctx context.Context, req *cachepb.GetReq, opts ...grpc.CallOption) (*cachepb.GetResp, error) {
	svc := cc.Service
	return svc.Get(ctx, req)
}

// Put passes req to the wrapped cache service; opts are not used.
func (cc cacheClient) Put(ctx context.Context, req *cachepb.PutReq, opts ...grpc.CallOption) (*cachepb.PutResp, error) {
	svc := cc.Service
	return svc.Put(ctx, req)
}
// gomaClientClientID is the value used as the Audience of the ACL group
// built by defaultACL.Load.
// NOTE(review): presumably the OAuth2 client ID of the Goma client — confirm.
const gomaClientClientID = "687418631491-r6m1c3pr0lth5atp4ie07f03ae8omefc.apps.googleusercontent.com"
// defaultACL is an ACL loader that produces a single fixed group from the
// allowed emails and domains it was constructed with.
type defaultACL struct {
	allowedUser    []string
	allowedDomains []string
}

// Load returns an ACL holding one group with Id "user" whose audience is
// gomaClientClientID and whose emails/domains are the configured allow
// lists. The group's service account name is "default" unless
// --service-account-json is set, in which case it is that file's base name
// with a trailing ".json" removed. The returned error is always nil.
func (d defaultACL) Load(ctx context.Context) (*authpb.ACL, error) {
	saName := "default"
	if jsonPath := *serviceAccountJSON; jsonPath != "" {
		saName = strings.TrimSuffix(filepath.Base(jsonPath), ".json")
	}

	userGroup := &authpb.Group{
		Id:             "user",
		Audience:       gomaClientClientID,
		Emails:         d.allowedUser,
		Domains:        d.allowedDomains,
		ServiceAccount: saName,
	}
	return &authpb.ACL{Groups: []*authpb.Group{userGroup}}, nil
}
// reExecServer serves Exec requests by delegating to a remoteexec.Adapter.
type reExecServer struct {
	re *remoteexec.Adapter
}

// Exec tags the context with an ID derived from the request's requester
// info, logs the call with that ID, and forwards the request to the adapter
// using the tagged context.
func (s reExecServer) Exec(ctx context.Context, req *gomapb.ExecReq) (*gomapb.ExecResp, error) {
	taggedCtx, reqID := rpc.TagID(ctx, req.GetRequesterInfo())
	log.FromContext(taggedCtx).Infof("call exec %s", reqID)
	return s.re.Exec(taggedCtx, req)
}
// reFileServer serves file requests by delegating to a wrapped
// filepb.FileServiceServer, logging each call with its request ID.
type reFileServer struct {
	s filepb.FileServiceServer
}

// StoreFile tags the context with an ID derived from the request's requester
// info, logs the call, and forwards it to the wrapped service.
func (fs reFileServer) StoreFile(ctx context.Context, req *gomapb.StoreFileReq) (*gomapb.StoreFileResp, error) {
	taggedCtx, reqID := rpc.TagID(ctx, req.GetRequesterInfo())
	log.FromContext(taggedCtx).Infof("call storefile %s", reqID)
	return fs.s.StoreFile(taggedCtx, req)
}

// LookupFile tags the context with an ID derived from the request's
// requester info, logs the call, and forwards it to the wrapped service.
func (fs reFileServer) LookupFile(ctx context.Context, req *gomapb.LookupFileReq) (*gomapb.LookupFileResp, error) {
	taggedCtx, reqID := rpc.TagID(ctx, req.GetRequesterInfo())
	log.FromContext(taggedCtx).Infof("call lookupfile %s", reqID)
	return fs.s.LookupFile(taggedCtx, req)
}
// localBackend supplies the HTTP handlers registered with the frontend,
// backed by in-process exec and file services and a shared authenticator.
type localBackend struct {
	ExecService execpb.ExecServiceServer
	FileService filepb.FileServiceServer
	Auth        httprpc.Auth
}

// Ping returns a handler that authenticates the request and replies "ok".
// When authentication fails it responds with 401 and logs the error.
func (b localBackend) Ping() http.Handler {
	ping := func(w http.ResponseWriter, req *http.Request) {
		reqCtx := req.Context()
		logger := log.FromContext(reqCtx)
		if _, err := b.Auth.Auth(reqCtx, req); err != nil {
			http.Error(w, fmt.Sprintf("auth failed: %v", err), http.StatusUnauthorized)
			logger.Errorf("ping unauthorized: %v", err)
			return
		}
		w.Header().Set("Accept-Encoding", "gzip, deflate")
		fmt.Fprintln(w, "ok")
	}
	return http.HandlerFunc(ping)
}

// Exec returns the authenticated exec handler with a 5 minute timeout.
func (b localBackend) Exec() http.Handler {
	timeout := httprpc.Timeout(5 * time.Minute)
	withAuth := httprpc.WithAuth(b.Auth)
	return execrpc.Handler(b.ExecService, timeout, withAuth)
}

// ByteStream is not offered by this backend; every request gets a 404.
func (b localBackend) ByteStream() http.Handler {
	notFound := http.HandlerFunc(http.NotFound)
	return notFound
}

// StoreFile returns the authenticated store-file handler with a 1 minute
// timeout.
func (b localBackend) StoreFile() http.Handler {
	timeout := httprpc.Timeout(time.Minute)
	withAuth := httprpc.WithAuth(b.Auth)
	return filerpc.StoreHandler(b.FileService, timeout, withAuth)
}

// LookupFile returns the authenticated lookup-file handler with a 1 minute
// timeout.
func (b localBackend) LookupFile() http.Handler {
	timeout := httprpc.Timeout(time.Minute)
	withAuth := httprpc.WithAuth(b.Auth)
	return filerpc.LookupHandler(b.FileService, timeout, withAuth)
}

// Execlog returns the authenticated exec-log handler with a 1 minute
// timeout, backed by execlogService.
func (b localBackend) Execlog() http.Handler {
	timeout := httprpc.Timeout(time.Minute)
	withAuth := httprpc.WithAuth(b.Auth)
	return execlogrpc.Handler(execlogService{}, timeout, withAuth)
}
// readConfigResp loads a text-format cmdpb.ConfigResp from fname.
//
// After parsing, every config entry has its target address overwritten with
// the --remoteexec-addr value (allocating Target when absent) and is given
// an empty BuildInfo when it has none. Read and parse errors are returned
// unchanged.
func readConfigResp(fname string) (*cmdpb.ConfigResp, error) {
	text, err := ioutil.ReadFile(fname)
	if err != nil {
		return nil, err
	}

	parsed := new(cmdpb.ConfigResp)
	if err := proto.UnmarshalText(string(text), parsed); err != nil {
		return nil, err
	}

	// Normalize each entry: point it at the configured endpoint and make
	// sure the optional sub-messages exist.
	for i := range parsed.Configs {
		cfg := parsed.Configs[i]
		if cfg.Target == nil {
			cfg.Target = &cmdpb.Target{}
		}
		cfg.Target.Addr = *remoteexecAddr
		if cfg.BuildInfo == nil {
			cfg.BuildInfo = &cmdpb.BuildInfo{}
		}
	}
	return parsed, nil
}
// main wires together and runs the remoteexec-proxy.
//
// In order, it:
//   - registers the per-span exec timeout flags and parses all flags;
//   - builds the allow lists from --allowed-users (defaulting to myEmail)
//     and the ACL checker / auth service;
//   - initializes the server and trace sampling;
//   - picks the file cache: a cloud storage bucket when --file-cache-bucket
//     is set, otherwise an in-process cache capped at 1 GiB;
//   - dials the remote execution API (TLS unless --insecure-remoteexec);
//   - creates the digest cache, backed by redis when redis.AddrFromEnv
//     succeeds;
//   - builds the remoteexec.Adapter and configures its inventory from
//     either a built-in config or --exec-config-file;
//   - registers the Goma API handlers, /healthz and an index page, then
//     serves HTTP on --port.
//
// Any setup failure is reported through the logger's Fatal/Fatalf.
func main() {
	// Per-span timeouts start from the package defaults and may be
	// overridden individually on the command line.
	spanTimeout := remoteexec.DefaultSpanTimeout
	flag.DurationVar(&spanTimeout.Inventory, "exec-inventory-timeout", spanTimeout.Inventory, "timeout of exec-inventory")
	flag.DurationVar(&spanTimeout.InputTree, "exec-input-tree-timeout", spanTimeout.InputTree, "timeout of exec-iput-tree")
	flag.DurationVar(&spanTimeout.Setup, "exec-setup-timeout", spanTimeout.Setup, "timeout of exec-setup")
	flag.DurationVar(&spanTimeout.CheckCache, "exec-check-cache-timeout", spanTimeout.CheckCache, "timeout of exec-check-cache")
	flag.DurationVar(&spanTimeout.CheckMissing, "exec-check-missing-timeout", spanTimeout.CheckMissing, "timeout of exec-check-missing")
	flag.DurationVar(&spanTimeout.UploadBlobs, "exec-upload-blobs-timeout", spanTimeout.UploadBlobs, "timeout of exec-upload-blobs")
	flag.DurationVar(&spanTimeout.Execute, "exec-execute-timeout", spanTimeout.Execute, "timeout of exec-execute")
	flag.DurationVar(&spanTimeout.Response, "exec-response-timeout", spanTimeout.Response, "timeout of exec-response")
	flag.Parse()

	ctx := context.Background()
	profiler.Setup(ctx)
	logger := log.FromContext(ctx)
	defer logger.Sync()

	// Split --allowed-users into exact emails and "*@domain" wildcards.
	if *allowedUsers == "" {
		*allowedUsers = myEmail(ctx)
	}
	var allowed []string
	var allowedDomains []string
	for _, u := range strings.Split(*allowedUsers, ",") {
		u = strings.TrimSpace(u)
		if strings.HasPrefix(u, "*@") {
			allowedDomains = append(allowedDomains, strings.TrimPrefix(u, "*@"))
		} else {
			allowed = append(allowed, u)
		}
	}
	logger.Infof("allow access for %q / domains %q", allowed, allowedDomains)

	err := server.Init(ctx, *traceProjectID, "remoteexec-proxy")
	if err != nil {
		logger.Fatal(err)
	}
	trace.ApplyConfig(trace.Config{
		DefaultSampler: server.NewLimitedSampler(*traceFraction, *traceQPS),
	})

	// The account pool looks for service account JSON files in saDir; it is
	// the directory of --service-account-json, or "/" when unset.
	saDir := "/"
	if *serviceAccountJSON != "" {
		logger.Infof("using service account: %s", *serviceAccountJSON)
		saDir = filepath.Dir(*serviceAccountJSON)
	} else {
		logger.Infof("using default service account")
	}

	aclCheck := acl.ACL{
		Loader: defaultACL{
			allowedUser:    allowed,
			allowedDomains: allowedDomains,
		},
		Checker: acl.Checker{
			Pool: account.JSONDir{
				Dir: saDir,
				Scopes: []string{
					"https://www.googleapis.com/auth/cloud-build-service",
				},
			},
		},
	}
	err = aclCheck.Update(ctx)
	if err != nil {
		logger.Fatal(err)
	}

	authService := &auth.Service{
		CheckToken: aclCheck.CheckToken,
	}

	// File cache backend: cloud storage bucket or in-process cache.
	var cclient cachepb.CacheServiceClient
	if *fileCacheBucket != "" {
		logger.Infof("use cloud storage bucket: %s", *fileCacheBucket)
		var opts []option.ClientOption
		if *serviceAccountJSON != "" {
			opts = append(opts, option.WithServiceAccountFile(*serviceAccountJSON))
		}
		gsclient, err := storage.NewClient(ctx, opts...)
		if err != nil {
			logger.Fatalf("storage client failed: %v", err)
		}
		defer gsclient.Close()
		cclient = cache.LocalClient{
			CacheServiceServer: gcs.New(gsclient.Bucket(*fileCacheBucket)),
		}
	} else {
		cacheService, err := cache.New(cache.Config{
			MaxBytes: 1 * 1024 * 1024 * 1024,
		})
		if err != nil {
			logger.Fatal(err)
		}
		cclient = cacheClient{
			Service: cacheService,
		}
	}

	fileServiceClient := fileClient{
		Service: &file.Service{
			Cache: cclient,
		},
	}

	// TLS roots: the system pool, optionally extended with one extra PEM file.
	certPool, err := x509.SystemCertPool()
	if err != nil {
		logger.Fatal(err)
	}
	if certPool == nil {
		logger.Fatal("got nil certPool")
	}
	if *additionalTLSCertificate != "" {
		caCert, err := ioutil.ReadFile(*additionalTLSCertificate)
		if err != nil {
			logger.Fatal(err)
		}
		if ok := certPool.AppendCertsFromPEM(caCert); !ok {
			// Fatalf, not Fatal: the message carries a format verb.
			logger.Fatalf("No certificates loaded from %s", *additionalTLSCertificate)
		}
	}
	tlsConfig := &tls.Config{
		InsecureSkipVerify: *insecureSkipVerify,
		RootCAs:            certPool,
	}

	opts := []grpc.DialOption{
		grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)),
		grpc.WithStatsHandler(&ocgrpc.ClientHandler{}),
	}
	if *insecureRemoteexec {
		// Replace the TLS credentials option with plaintext transport.
		opts[0] = grpc.WithInsecure()
		logger.Warnf("use insecrure remoteexec API")
	}
	reConn, err := grpc.DialContext(ctx, *remoteexecAddr, opts...)
	if err != nil {
		logger.Fatal(err)
	}
	defer reConn.Close()

	// Digest cache: redis-backed when an address is available from the
	// environment, otherwise created with a nil client.
	var digestCache remoteexec.DigestCache
	redisAddr, err := redis.AddrFromEnv()
	if err != nil {
		logger.Warnf("redis disabled for gomafile-digest: %v", err)
		digestCache = digest.NewCache(nil, *maxDigestCacheEntries)
	} else {
		logger.Infof("redis enabled for gomafile-digest: %v idle=%d active=%d", redisAddr, *redisMaxIdleConns, *redisMaxActiveConns)
		digestCache = digest.NewCache(redis.NewClient(ctx, redisAddr, redis.Opts{
			Prefix:         "gomafile-digest:",
			MaxIdleConns:   *redisMaxIdleConns,
			MaxActiveConns: *redisMaxActiveConns,
		}), *maxDigestCacheEntries)
	}

	// --remote-instance-name is split into a prefix (path.Dir) used by the
	// adapter and a basename (path.Base) used in the platform config below.
	re := &remoteexec.Adapter{
		InstancePrefix: path.Dir(*remoteInstanceName),
		ExecTimeout:    15 * time.Minute,
		SpanTimeout:    spanTimeout,
		Client: remoteexec.Client{
			ClientConn: reConn,
			Retry: rpc.Retry{
				MaxRetry: *execMaxRetryCount,
			},
		},
		InsecureClient: *insecureRemoteexec,
		GomaFile:       fileServiceClient,
		DigestCache:    digestCache,
		ToolDetails: &rpb.ToolDetails{
			ToolName:    "remoteexec_proxy",
			ToolVersion: "0.0.0-experimental",
		},
		FileLookupSema:    make(chan struct{}, 2),
		CASBlobLookupSema: make(chan struct{}, 20),
	}

	// Built-in single-config inventory; replaced entirely when
	// --exec-config-file is given.
	configResp := &cmdpb.ConfigResp{
		VersionId: time.Now().UTC().Format(time.RFC3339),
		Configs: []*cmdpb.Config{
			{
				Target: &cmdpb.Target{
					Addr: *remoteexecAddr,
				},
				BuildInfo: &cmdpb.BuildInfo{},
				Dimensions: []string{
					"os:linux",
				},
				RemoteexecPlatform: &cmdpb.RemoteexecPlatform{
					RbeInstanceBasename: path.Base(*remoteInstanceName),
					Properties: []*cmdpb.RemoteexecPlatform_Property{
						{
							Name:  "container-image",
							Value: *platformContainerImage,
						}, {
							Name:  "OSFamily",
							Value: "Linux",
						},
					},
				},
			},
		},
	}
	// TODO: document config example?
	if *execConfigFile != "" {
		c, err := readConfigResp(*execConfigFile)
		if err != nil {
			logger.Fatal(err)
		}
		configResp = c
	}
	err = re.Inventory.Configure(ctx, configResp)
	if err != nil {
		logger.Fatal(err)
	}

	mux := http.DefaultServeMux
	frontend.Register(mux, frontend.Frontend{
		Backend: localBackend{
			ExecService: reExecServer{re},
			FileService: reFileServer{fileServiceClient.Service},
			Auth: &auth.Auth{
				Client: authClient{Service: authService},
			},
		},
	})

	mux.Handle("/healthz", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		fmt.Fprintln(w, "ok")
	}))

	// Index page showing the effective configuration; html/template escapes
	// the interpolated values.
	tmpl := template.Must(template.New("index").Parse(`
<html>
<head>
<title>Goma remoteexec_proxy at {{.Port}}</title>
</head>
<body>
<h1>Goma remoteexec_proxy</h1>
<p><b>remoteexec-addr:</b> {{.RemoteexecAddr}}</p>
<p><b>remote-instance-name:</b> {{.RemoteInstanceName}}</p>
<p><b>allowed-users:</b> {{.AllowedUsers}}</p>
<p><b>service-account-json:</b> <a href="file://{{.ServiceAccountJSON}}">{{.ServiceAccountJSON}}</a></p>
<p><b>platform-container-image:</b> {{.PlatformContainerImage}}</p>
<p><b>redis:</b> {{.RedisAddr}}</p>
<p><b>file-cache-bucket:</b> {{.FileCacheBucket}}</p>
<p><b>config:</b>
<pre>{{.Config}}</pre>
<hr>
<p>
<a href="/debug/tracez">/debug/tracez</a> |
<a href="/debug/rpcz">/debug/rpcz</a> |
<a href="/healthz">/healthz - for health check</a>
</body>
</html>`))
	mux.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		err := tmpl.Execute(w, struct {
			Port                   int
			RemoteexecAddr         string
			RemoteInstanceName     string
			AllowedUsers           []string
			ServiceAccountJSON     string
			PlatformContainerImage string
			RedisAddr              string
			FileCacheBucket        string
			Config                 *cmdpb.ConfigResp
		}{
			Port:                   *port,
			RemoteexecAddr:         *remoteexecAddr,
			RemoteInstanceName:     *remoteInstanceName,
			AllowedUsers:           allowed,
			ServiceAccountJSON:     *serviceAccountJSON,
			PlatformContainerImage: *platformContainerImage,
			RedisAddr:              redisAddr,
			FileCacheBucket:        *fileCacheBucket,
			Config:                 configResp,
		})
		if err != nil {
			logger := log.FromContext(ctx)
			logger.Errorf("index template: %v", err)
		}
	}))

	hsMain := server.NewHTTP(*port, mux)
	server.Run(ctx, hsMain)
}