blob: b4bf4cef40950dad2e88052a6a2bcca87659a5f4 [file] [log] [blame]
// Copyright 2017 The Goma Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
/*
Binary file_server provides goma file service via gRPC.
*/
package main
import (
"context"
"flag"
"fmt"
"cloud.google.com/go/storage"
"go.opencensus.io/trace"
k8sapi "golang.org/x/build/kubernetes/api"
"google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"go.chromium.org/goma/server/cache"
"go.chromium.org/goma/server/cache/gcs"
"go.chromium.org/goma/server/cache/redis"
"go.chromium.org/goma/server/file"
"go.chromium.org/goma/server/log"
"go.chromium.org/goma/server/profiler"
"go.chromium.org/goma/server/server"
"go.chromium.org/goma/server/server/healthz"
cachepb "go.chromium.org/goma/server/proto/cache"
pb "go.chromium.org/goma/server/proto/file"
)
var (
port = flag.Int("port", 5050, "rpc port")
mport = flag.Int("mport", 8081, "monitor port")
cacheAddr = flag.String("file-cache-addr", "", "cache server address")
bucket = flag.String("bucket", "", "backing store bucket")
traceProjectID = flag.String("trace-project-id", "", "project id for cloud tracing")
serviceAccountFile = flag.String("service-account-file", "", "service account json file")
)
type admissionController struct {
limit int64
}
func (a admissionController) AdmitPut(ctx context.Context, in *cachepb.PutReq) error {
logger := log.FromContext(ctx)
if a.limit <= 0 {
return nil
}
rss := server.ResidentMemorySize()
s := int64(len(in.Kv.Key) + len(in.Kv.Value))
if rss+2*s <= a.limit {
return nil
}
newRSS := server.GC(ctx)
if newRSS+2*s <= a.limit {
logger.Infof("GC reduced memory size to %d", newRSS)
return nil
}
// TODO: with retryinfo?
msg := fmt.Sprintf("memory size %d + req:%d > limit %d: gc->%d", rss, s, a.limit, newRSS)
healthz.SetUnhealthy(msg)
return status.Error(codes.ResourceExhausted, msg)
}
func main() {
flag.Parse()
ctx := context.Background()
profiler.Setup(ctx)
logger := log.FromContext(ctx)
defer logger.Sync()
err := server.Init(ctx, *traceProjectID, "file_server")
if err != nil {
logger.Fatal(err)
}
trace.ApplyConfig(trace.Config{
DefaultSampler: server.NewLimitedSampler(server.DefaultTraceFraction, server.DefaultTraceQPS),
})
s, err := server.NewGRPC(*port,
grpc.MaxSendMsgSize(file.DefaultMaxMsgSize),
grpc.MaxRecvMsgSize(file.DefaultMaxMsgSize))
if err != nil {
logger.Fatal(err)
}
var cclient cachepb.CacheServiceClient
addr, err := redis.AddrFromEnv()
switch {
case err == nil:
logger.Infof("redis enabled for gomafile: %s", addr)
c := redis.NewClient(ctx, addr, "gomafile:")
defer c.Close()
cclient = c
case *cacheAddr != "":
logger.Infof("use cache server: %s", *cacheAddr)
c := cache.NewClient(ctx, *cacheAddr,
append([]grpc.DialOption{
grpc.WithDefaultCallOptions(grpc.FailFast(false)),
}, server.DefaultDialOption()...)...)
defer c.Close()
cclient = c
case *bucket != "":
logger.Infof("use cloud storage bucket: %s", *bucket)
var opts []option.ClientOption
if *serviceAccountFile != "" {
opts = append(opts, option.WithServiceAccountFile(*serviceAccountFile))
}
gsclient, err := storage.NewClient(ctx, opts...)
if err != nil {
logger.Fatalf("storage client failed: %v", err)
}
defer gsclient.Close()
c := gcs.New(gsclient.Bucket(*bucket))
limit, err := server.MemoryLimit()
if err != nil {
logger.Errorf("unknown memory limit: %v", err)
} else {
margin := int64(2 * file.DefaultMaxMsgSize)
a := admissionController{
limit: limit - margin,
}
c.AdmissionController = a
limitq := k8sapi.NewQuantity(limit, k8sapi.BinarySI)
marginq := k8sapi.NewQuantity(margin, k8sapi.BinarySI)
logger.Infof("memory check threshold: limit:%s - mergin:%s = %d", limitq, marginq, a.limit)
}
cclient = cache.LocalClient{CacheServiceServer: c}
default:
logger.Fatal("no cache server")
}
fs := &file.Service{
Cache: cclient,
}
pb.RegisterFileServiceServer(s.Server, fs)
hs := server.NewHTTP(*mport, nil)
server.Run(ctx, s, hs)
}