blob: 14adb045938f87bdac9b01523c2206128a8da102 [file] [log] [blame]
// Copyright 2020 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package casclient provides a remote-apis-sdks client with LUCI integration.
package casclient
import (
"context"
"fmt"
"net"
"runtime"
"strings"
"time"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/cas"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/client"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/contextmd"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"go.chromium.org/luci/auth"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/hardcoded/chromeinfra"
)
// AddrProd is the "host:port" address of the production (PROD) RBE-CAS
// service.
const AddrProd = "remotebuildexecution.googleapis.com:443"
// New returns a LUCI-auth-configured cas.Client for RBE-CAS.
//
// If addr (a "host:port" string) refers to the local machine, New connects to
// a local fake CAS server (see go.chromium.org/luci/tools/cmd/fakecas) without
// transport security and uses the fixed instance name "instance". Passing a
// non-empty instance together with a local addr is an error. Otherwise the
// connection to addr uses per-RPC credentials built from opts for instance,
// acting as the project's read-only or read-write CAS account depending on
// readOnly.
//
// The client is created with DefaultConfig(). If creating the client fails
// after the gRPC connection was dialed, that connection is closed before
// returning the error.
func New(ctx context.Context, addr string, instance string, opts auth.Options, readOnly bool) (*cas.Client, error) {
	var dialParams client.DialParams
	useLocal, err := isLocalAddr(addr)
	if err != nil {
		return nil, errors.Annotate(err, "invalid addr").Err()
	}
	if useLocal {
		// Connect to local fake CAS server.
		// See also go.chromium.org/luci/tools/cmd/fakecas
		if instance != "" {
			return nil, errors.Reason("do not specify instance with local address").Err()
		}
		instance = "instance"
		dialParams = client.DialParams{
			Service:    addr,
			NoSecurity: true,
		}
	} else {
		creds, err := perRPCCreds(ctx, instance, opts, readOnly)
		if err != nil {
			return nil, err
		}
		dialParams = client.DialParams{
			Service:              addr,
			UseExternalAuthToken: true,
			ExternalPerRPCCreds:  &client.PerRPCCreds{Creds: creds},
		}
	}
	grpcOpts, _, err := client.OptsFromParams(ctx, dialParams)
	if err != nil {
		return nil, errors.Annotate(err, "failed to get grpc opts").Err()
	}
	conn, err := grpc.Dial(dialParams.Service, grpcOpts...)
	if err != nil {
		return nil, errors.Annotate(err, "failed to dial RBE").Err()
	}
	cl, err := cas.NewClientWithConfig(ctx, conn, instance, DefaultConfig())
	if err != nil {
		// conn is not handed to the caller on this path, so close it here to
		// avoid leaking it. The close error is ignored on purpose: the client
		// creation error is the one worth reporting.
		_ = conn.Close()
		return nil, errors.Annotate(err, "failed to create client").Err()
	}
	return cl, nil
}
// DefaultConfig returns the CAS client configuration used by New: the
// remote-apis-sdks defaults (cas.DefaultClientConfig) with a few overrides.
func DefaultConfig() cas.ClientConfig {
	conf := cas.DefaultClientConfig()

	// A threshold of 0 means every bytestream transfer is compressed.
	conf.CompressedBytestreamThreshold = 0

	// Files smaller than 10 MiB are not read twice.
	conf.SmallFileThreshold = 10 << 20

	// The default 1 minute timeout seems too short for some kinds of uploads.
	// Options() extends the legacy client's Write timeout the same way.
	conf.ByteStreamWrite.Timeout = 2 * time.Minute

	// BUG(crbug.com/349853790): uploads ran into timeouts, so this is
	// speculatively extended.
	conf.BatchUpdateBlobs.Timeout = 2 * time.Minute

	return conf
}
// perRPCCreds returns gRPC per-RPC credentials for accessing the CAS instance
// named instance. The instance is expected to look like
// "projects/<project>/instances/<name>"; only its second "/"-separated segment
// (the project) is used.
//
// The credentials act as the "cas-read-only" (when readOnly) or
// "cas-read-write" service account of <project>, via the LUCI realm
// "@internal:<project>/<role>", with the cloud-platform OAuth scope. Projects
// whose name ends in "-dev" or "-staging" use the dev token server. opts is
// received by value, so these adjustments do not affect the caller's copy.
//
// An error is returned when instance has no project segment. Previously this
// case panicked with an index out of range. An error is also returned when the
// authenticator cannot produce credentials.
func perRPCCreds(ctx context.Context, instance string, opts auth.Options, readOnly bool) (credentials.PerRPCCredentials, error) {
	parts := strings.Split(instance, "/")
	if len(parts) < 2 || parts[1] == "" {
		return nil, errors.Reason("invalid instance %q: expected \"projects/<project>/instances/<name>\"", instance).Err()
	}
	project := parts[1]

	var role string
	if readOnly {
		role = "cas-read-only"
	} else {
		role = "cas-read-write"
	}

	// Construct auth.Options.
	opts.ActAsServiceAccount = fmt.Sprintf("%s@%s.iam.gserviceaccount.com", role, project)
	opts.ActViaLUCIRealm = fmt.Sprintf("@internal:%s/%s", project, role)
	opts.Scopes = []string{"https://www.googleapis.com/auth/cloud-platform"}
	if strings.HasSuffix(project, "-dev") || strings.HasSuffix(project, "-staging") {
		// use dev token server for dev/staging projects.
		opts.TokenServerHost = chromeinfra.TokenServerDevHost
	}
	creds, err := auth.NewAuthenticator(ctx, auth.SilentLogin, opts).PerRPCCredentials()
	if err != nil {
		return nil, errors.Annotate(err, "failed to get PerRPCCredentials").Err()
	}
	return creds, nil
}
// NewLegacy returns a LUCI-auth-configured legacy client.Client for RBE.
// In general, New is preferred.
//
// If addr (a "host:port" string) refers to the local machine, NewLegacy
// connects to a local fake CAS server (see go.chromium.org/luci/tools/cmd/fakecas)
// without transport security and uses the fixed instance name "instance".
// A non-empty instance is logged as a warning and ignored. Otherwise it dials
// addr with per-RPC credentials built from opts for instance (acting as the
// read-only or read-write CAS account depending on readOnly) and applies the
// options returned by Options().
//
// TODO(crbug.com/1225524): remove this.
func NewLegacy(ctx context.Context, addr string, instance string, opts auth.Options, readOnly bool) (*client.Client, error) {
	useLocal, err := isLocalAddr(addr)
	if err != nil {
		return nil, errors.Annotate(err, "invalid addr").Err()
	}
	if useLocal {
		// Connect to local fake CAS server.
		// See also go.chromium.org/luci/tools/cmd/fakecas
		if instance != "" {
			logging.Warningf(ctx, "instance %q is given, but will be ignored.", instance)
		}
		dialParams := client.DialParams{
			Service:    addr,
			NoSecurity: true,
		}
		cl, err := client.NewClient(ctx, "instance", dialParams)
		if err != nil {
			return nil, errors.Annotate(err, "failed to create client").Err()
		}
		return cl, nil
	}
	creds, err := perRPCCreds(ctx, instance, opts, readOnly)
	if err != nil {
		return nil, err
	}
	dialParams := client.DialParams{
		// Dial the requested addr, as New does, rather than a fixed endpoint.
		// This ensures non-local, non-production addresses are honored.
		Service:              addr,
		UseExternalAuthToken: true,
		ExternalPerRPCCreds:  &client.PerRPCCreds{Creds: creds},
	}
	cl, err := client.NewClient(ctx, instance, dialParams, Options()...)
	if err != nil {
		logging.Errorf(ctx, "failed to create casclient: %+v", err)
		return nil, errors.Annotate(err, "failed to create client").Err()
	}
	return cl, nil
}
// Options returns the remote-apis-sdks client options that NewLegacy applies
// when talking to a non-local CAS server.
func Options() []client.Opt {
	// Work on a private copy of the SDK defaults so the overrides below leave
	// client.DefaultRPCTimeouts untouched.
	timeouts := make(client.RPCTimeouts, len(client.DefaultRPCTimeouts))
	for rpc, d := range client.DefaultRPCTimeouts {
		timeouts[rpc] = d
	}

	// Writes can sometimes be quite slow, so extend Write beyond the default.
	// The timeout applies to a single file chunk only, so it cannot be too low
	// even for large files.
	timeouts["Write"] = 2 * time.Minute

	// BUG(crbug.com/349853790): uploads ran into timeouts, so this is
	// speculatively extended.
	timeouts["BatchUpdateBlobs"] = 2 * time.Minute

	// GetCapabilities is suspected to sometimes exceed its default 5 second
	// timeout. As the very first call, it also has to open the connection and
	// refresh auth tokens.
	timeouts["GetCapabilities"] = 30 * time.Second

	var concurrency int
	switch runtime.GOOS {
	case "windows":
		// One worker per CPU gives better file write performance on Windows
		// (http://b/171672371#comment6).
		concurrency = runtime.NumCPU()
	default:
		concurrency = 2 * runtime.NumCPU()
	}

	return []client.Opt{
		client.CASConcurrency(concurrency),
		client.UtilizeLocality(true),
		&client.TreeSymlinkOpts{
			// Upload symlinks as symlinks...
			Preserved: true,
			// ...and include their target files in the CAS archive...
			FollowsTarget: true,
			// ...except when the target lies outside the root directory, in
			// which case the target file is uploaded in place of the symlink.
			MaterializeOutsideExecRoot: true,
		},
		timeouts,
		// Written files and directories get owner-only permissions.
		client.DirMode(0700),
		client.ExecutableMode(0700),
		client.RegularMode(0600),
		client.CompressedBytestreamThreshold(0),
	}
}
// ContextWithMetadata attaches RBE request metadata whose tool name is
// toolName to ctx via contextmd.WithMetadata and returns the resulting
// context. The metadata read back from that context is logged at Info level.
// On error, a nil context is returned together with the annotated error.
func ContextWithMetadata(ctx context.Context, toolName string) (context.Context, error) {
	mdCtx, err := contextmd.WithMetadata(ctx, &contextmd.Metadata{ToolName: toolName})
	if err != nil {
		return nil, errors.Annotate(err, "failed to attach metadata").Err()
	}

	md, err := contextmd.ExtractMetadata(mdCtx)
	if err != nil {
		return nil, errors.Annotate(err, "failed to extract metadata").Err()
	}
	logging.Infof(mdCtx, "context metadata: %#+v", *md)

	return mdCtx, nil
}
// isLocalAddr reports whether addr, a "host:port" string, refers to the local
// machine. That is the case when the host part is empty or when the address
// resolved by net.ResolveTCPAddr is a loopback IP. Host names may require a
// DNS lookup. Any resolution error (for example a missing port) is returned
// unchanged.
func isLocalAddr(addr string) (bool, error) {
	resolved, err := net.ResolveTCPAddr("tcp", addr)
	switch {
	case err != nil:
		return false, err
	case resolved.IP == nil:
		// An empty host (e.g. ":8980") leaves IP unset; treat it as local.
		return true, nil
	default:
		return resolved.IP.IsLoopback(), nil
	}
}