blob: 51d98122ac2c5774fb3b392f109b3e5c3550f0fd [file] [log] [blame]
// Copyright 2023 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
import (
"context"
"math"
"net/http"
"net/url"
"sort"
"sync"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"golang.org/x/sync/errgroup"
"google.golang.org/genproto/protobuf/field_mask"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/encoding/gzip"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/retry/transient"
pb "go.chromium.org/luci/config_service/proto"
"go.chromium.org/luci/grpc/grpcmon"
"go.chromium.org/luci/grpc/grpcutil"
"go.chromium.org/luci/server/auth"
"go.chromium.org/luci/config"
)
// retryPolicy is the default gRPC service config (in JSON form) for this
// LUCI Config client. DefaultDialOptions passes it to
// grpc.WithDefaultServiceConfig.
//
// It targets every method of the "config.service.v2.Configs" service and sets
// a 60s timeout plus a retry policy of at most 5 attempts with a backoff that
// starts at 1s, is multiplied by 1.5 and is capped at 10s. Only the
// UNAVAILABLE, INTERNAL and UNKNOWN status codes are listed as retryable.
const retryPolicy = `{
"methodConfig": [{
"name": [{ "service": "config.service.v2.Configs" }],
"timeout": "60s",
"retryPolicy": {
"maxAttempts": 5,
"initialBackoff": "1s",
"maxBackoff": "10s",
"backoffMultiplier": 1.5,
"retryableStatusCodes": ["UNAVAILABLE", "INTERNAL", "UNKNOWN"]
}
}]
}`
// defaultUserAgent is the User-Agent value NewV2 passes to gRPC when
// V2Options.UserAgent is left empty.
const defaultUserAgent = "Config Go Client 1.0"
// V2Options are the options accepted by NewV2.
type V2Options struct {
// Host is the hostname of a LUCI Config service.
//
// Required: NewV2 returns an error if it is empty. NewV2 dials port 443 on
// this host.
Host string
// Creds is the credential to use when creating the grpc connection.
//
// If nil, NewV2 does not add a per-RPC credentials dial option.
Creds credentials.PerRPCCredentials
// UserAgent is the optional additional User-Agent fragment which will be
// appended to gRPC calls.
//
// If empty, defaultUserAgent is used.
UserAgent string
// DialOpts are the options to use to dial.
//
// If nil, DefaultDialOptions() are used.
DialOpts []grpc.DialOption
}
// DefaultDialOptions returns the gRPC dial options used by default to connect
// to Luci-config v2: TLS transport credentials, client RPC monitoring,
// OpenTelemetry unary/stream interceptors, the default retry policy and a
// receive message size limit of math.MaxInt32.
func DefaultDialOptions() []grpc.DialOption {
	// Luci-config V2 may reply with gzip-compressed messages. The grpc client
	// offers no way to check the size of the compressed payload alone; it also
	// enforces the limit on the decompressed message, which makes a fixed limit
	// hard to pick. Very large configs are handed over through GCS anyway, so
	// the received message size is effectively left unlimited.
	unlimitedRecv := grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32))

	opts := make([]grpc.DialOption, 0, 6)
	opts = append(opts,
		grpc.WithTransportCredentials(credentials.NewTLS(nil)),
		grpc.WithStatsHandler(&grpcmon.ClientRPCStatsMonitor{}),
		grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()),
		grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor()),
		grpc.WithDefaultServiceConfig(retryPolicy),
		unlimitedRecv,
	)
	return opts
}
// NewV2 returns an implementation of the config Interface which talks to the
// real Luci-config service v2.
//
// It dials opts.Host on port 443 using opts.DialOpts (or DefaultDialOptions()
// when nil), followed by a per-RPC credentials option if opts.Creds is set and
// a User-Agent option (opts.UserAgent, or defaultUserAgent when empty). The
// slice passed in opts.DialOpts is copied and never modified.
//
// Signed URL downloads use an HTTP client without extra authentication: if
// the context carries auth state, its transport comes from
// auth.GetRPCTransport(ctx, auth.NoAuth), otherwise http.DefaultTransport is
// used.
//
// An error is returned if opts.Host is empty, if dialing fails, or if the
// HTTP transport cannot be created. In the last case the gRPC connection that
// was already created is closed before returning.
func NewV2(ctx context.Context, opts V2Options) (config.Interface, error) {
	if opts.Host == "" {
		return nil, errors.New("host is not specified")
	}

	base := opts.DialOpts
	if base == nil {
		base = DefaultDialOptions()
	}
	// Build the final option list in a fresh slice. Appending directly to the
	// caller's slice could write into its backing array when it has spare
	// capacity and silently change options the caller still holds.
	dialOpts := make([]grpc.DialOption, 0, len(base)+2)
	dialOpts = append(dialOpts, base...)
	if opts.Creds != nil {
		dialOpts = append(dialOpts, grpc.WithPerRPCCredentials(opts.Creds))
	}
	userAgent := opts.UserAgent
	if userAgent == "" {
		userAgent = defaultUserAgent
	}
	dialOpts = append(dialOpts, grpc.WithUserAgent(userAgent))

	conn, err := grpc.DialContext(ctx, opts.Host+":443", dialOpts...)
	if err != nil {
		return nil, errors.Annotate(err, "cannot dial to %s", opts.Host).Err()
	}

	t := http.DefaultTransport
	if auth.GetState(ctx) != nil {
		t, err = auth.GetRPCTransport(ctx, auth.NoAuth)
		if err != nil {
			// Nobody else holds a reference to conn yet, so it must be closed
			// here or it would leak. The close error is deliberately dropped:
			// the transport error is the one worth reporting.
			_ = conn.Close()
			return nil, errors.Annotate(err, "failed to create a transport").Err()
		}
	}

	return &remoteV2Impl{
		conn:       conn,
		grpcClient: pb.NewConfigsClient(conn),
		httpClient: &http.Client{Transport: t},
	}, nil
}
// Compile-time check that *remoteV2Impl satisfies config.Interface.
var _ config.Interface = (*remoteV2Impl)(nil)
// remoteV2Impl implements config.Interface and will make gRPC calls to Config
// Service V2.
//
// Instances are created by NewV2. Methods that talk to the service first call
// checkInitialized and fail if grpcClient or httpClient is nil.
type remoteV2Impl struct {
// conn is the underlying gRPC connection; it is closed by Close.
conn *grpc.ClientConn
// grpcClient is the Configs service client built on top of conn.
grpcClient pb.ConfigsClient
// A http client with no additional authentication. Only used for downloading from signed urls.
httpClient *http.Client
}
// GetConfig fetches the config file at `path` in `configSet`.
//
// When metaOnly is true the request carries a field mask restricted to the
// metadata fields (config set, path, content hash, revision and URL). If the
// response contains a signed URL, the content is downloaded from it using the
// unauthenticated HTTP client; a failed download is tagged as transient.
// gRPC errors are translated by wrapGrpcErr.
func (r *remoteV2Impl) GetConfig(ctx context.Context, configSet config.Set, path string, metaOnly bool) (*config.Config, error) {
	if err := r.checkInitialized(); err != nil {
		return nil, err
	}

	var mask *field_mask.FieldMask
	if metaOnly {
		mask = &field_mask.FieldMask{
			Paths: []string{"config_set", "path", "content_sha256", "revision", "url"},
		}
	}

	resp, err := r.grpcClient.GetConfig(ctx, &pb.GetConfigRequest{
		ConfigSet: string(configSet),
		Path:      path,
		Fields:    mask,
	}, grpc.UseCompressor(gzip.Name))
	if err != nil {
		return nil, wrapGrpcErr(err)
	}

	out := toConfig(resp)
	signedURL := resp.GetSignedUrl()
	if signedURL == "" {
		// The content (if any) was delivered inline.
		return out, nil
	}

	body, err := config.DownloadConfigFromSignedURL(ctx, r.httpClient, signedURL)
	if err != nil {
		return nil, transient.Tag.Apply(err)
	}
	out.Content = string(body)
	return out, nil
}
// GetConfigs fetches the files of the config set `cfgSet`, keyed by path.
//
// The file list is obtained with one GetConfigSet call. If `filter` is
// non-nil, only paths for which it returns true are kept. When metaOnly is
// true, the returned entries carry metadata only. Otherwise the content of
// every kept file is fetched by its SHA256, with at most 8 fetches running
// concurrently.
//
// Returns (nil, nil) if the config set lists no files, and an error if the
// listed files do not all share one revision or if any fetch fails.
func (r *remoteV2Impl) GetConfigs(ctx context.Context, cfgSet config.Set, filter func(path string) bool, metaOnly bool) (map[string]config.Config, error) {
	if err := r.checkInitialized(); err != nil {
		return nil, err
	}

	// List the files of the config set along with their hashes.
	listing, err := r.grpcClient.GetConfigSet(ctx, &pb.GetConfigSetRequest{
		ConfigSet: string(cfgSet),
		Fields:    &field_mask.FieldMask{Paths: []string{"configs"}},
	})
	if err != nil {
		return nil, wrapGrpcErr(err)
	}
	files := listing.Configs

	// An edge case that should not happen in practice.
	if len(files) == 0 {
		return nil, nil
	}

	// Every listed file is expected to belong to the same revision.
	wantRev := files[0].Revision
	for _, f := range files {
		if f.Revision != wantRev {
			return nil, errors.Reason("internal error: the reply contains files from revisions %q and %q", f.Revision, wantRev).Err()
		}
	}

	// Run the list through the callback, reusing the backing array.
	selected := files
	if filter != nil {
		selected = files[:0]
		for _, f := range files {
			if filter(f.Path) {
				selected = append(selected, f)
			}
		}
	}

	result := make(map[string]config.Config, len(selected))

	// Metadata-only callers need no further RPCs.
	if metaOnly {
		for _, f := range selected {
			f.Content = nil // drop anything the server may have sent anyway
			result[f.Path] = *toConfig(f)
		}
		return result, nil
	}

	// Download file bodies in parallel, addressing them by SHA256.
	var mu sync.Mutex
	group, gctx := errgroup.WithContext(ctx)
	group.SetLimit(8)
	for _, f := range selected {
		f := f
		group.Go(func() error {
			resp, err := r.grpcClient.GetConfig(gctx, &pb.GetConfigRequest{
				ConfigSet:     string(cfgSet),
				ContentSha256: f.ContentSha256,
			}, grpc.UseCompressor(gzip.Name))
			if err != nil {
				wrapped := wrapGrpcErr(err)
				// A single missing file must never surface as ErrNoConfig:
				// callers would read that as the whole config set being gone.
				// It is not supposed to happen in the first place.
				if wrapped == config.ErrNoConfig {
					return errors.Reason("internal error: config %q at SHA256 %q is unexpectedly gone", f.Path, f.ContentSha256).Err()
				}
				return errors.Annotate(wrapped, "fetching %q at SHA256 %q", f.Path, f.ContentSha256).Err()
			}

			// Only the content of `resp` is used. Its metadata may describe a
			// different file or revision that happens to have the same SHA256,
			// so the metadata is taken from the listing entry instead.
			entry := toConfig(f)
			signedURL := resp.GetSignedUrl()
			if signedURL == "" {
				entry.Content = string(resp.GetRawContent())
			} else {
				blob, err := config.DownloadConfigFromSignedURL(gctx, r.httpClient, signedURL)
				if err != nil {
					return errors.Annotate(err, "fetching %q from signed URL", f.Path).Tag(transient.Tag).Err()
				}
				entry.Content = string(blob)
			}

			mu.Lock()
			result[entry.Path] = *entry
			mu.Unlock()
			return nil
		})
	}
	if err := group.Wait(); err != nil {
		return nil, err
	}
	return result, nil
}
// GetProjectConfigs fetches the config file at `path` via the
// GetProjectConfigs RPC and returns one entry per config in the response, in
// response order.
//
// When metaOnly is true the request carries a field mask restricted to the
// metadata fields. Entries whose response includes a signed URL have their
// content downloaded from it, with at most 8 downloads running concurrently;
// a failed download is tagged as transient.
func (r *remoteV2Impl) GetProjectConfigs(ctx context.Context, path string, metaOnly bool) ([]config.Config, error) {
	if err := r.checkInitialized(); err != nil {
		return nil, err
	}

	var mask *field_mask.FieldMask
	if metaOnly {
		mask = &field_mask.FieldMask{
			Paths: []string{"config_set", "path", "content_sha256", "revision", "url"},
		}
	}

	// Responses of this RPC tend to be larger than the others, so ask the
	// server to compress them to speed up the transfer.
	resp, err := r.grpcClient.GetProjectConfigs(ctx, &pb.GetProjectConfigsRequest{
		Path:   path,
		Fields: mask,
	}, grpc.UseCompressor(gzip.Name))
	if err != nil {
		return nil, wrapGrpcErr(err)
	}

	group, gctx := errgroup.WithContext(ctx)
	group.SetLimit(8)
	out := make([]config.Config, len(resp.Configs))
	for idx, pbCfg := range resp.Configs {
		out[idx] = *toConfig(pbCfg)
		signedURL := pbCfg.GetSignedUrl()
		if signedURL == "" {
			continue
		}
		// Each goroutine only touches its own element of `out`.
		slot := &out[idx]
		group.Go(func() error {
			body, err := config.DownloadConfigFromSignedURL(gctx, r.httpClient, signedURL)
			if err != nil {
				return errors.Annotate(err, "for file(%s) in config_set(%s)", slot.Path, slot.ConfigSet).Tag(transient.Tag).Err()
			}
			slot.Content = string(body)
			return nil
		})
	}
	if err := group.Wait(); err != nil {
		return nil, err
	}
	return out, nil
}
// GetProjects lists the config sets of the PROJECT domain and converts each
// one into a config.Project.
//
// The project ID (also used as the name) is derived from the config set name,
// the repo URL is parsed from the config set URL and the repo type is always
// config.GitilesRepo. An unparsable URL fails the whole call.
func (r *remoteV2Impl) GetProjects(ctx context.Context) ([]config.Project, error) {
	if err := r.checkInitialized(); err != nil {
		return nil, err
	}

	resp, err := r.grpcClient.ListConfigSets(ctx, &pb.ListConfigSetsRequest{
		Domain: pb.ListConfigSetsRequest_PROJECT,
	})
	if err != nil {
		return nil, wrapGrpcErr(err)
	}

	out := make([]config.Project, 0, len(resp.ConfigSets))
	for _, cs := range resp.ConfigSets {
		id := config.Set(cs.Name).Project()
		repoURL, err := url.Parse(cs.Url)
		if err != nil {
			return nil, errors.Annotate(err, "failed to parse repo url %s in project %s", cs.Url, id).Err()
		}
		out = append(out, config.Project{
			ID:       id,
			Name:     id,
			RepoURL:  repoURL,
			RepoType: config.GitilesRepo,
		})
	}
	return out, nil
}
// ListFiles returns the paths of all files in `configSet`, sorted in
// increasing order.
//
// gRPC errors are translated by wrapGrpcErr.
func (r *remoteV2Impl) ListFiles(ctx context.Context, configSet config.Set) ([]string, error) {
	if err := r.checkInitialized(); err != nil {
		return nil, err
	}

	// Only the list of configs is needed from the config set.
	req := &pb.GetConfigSetRequest{
		ConfigSet: string(configSet),
		Fields:    &field_mask.FieldMask{Paths: []string{"configs"}},
	}
	resp, err := r.grpcClient.GetConfigSet(ctx, req)
	if err != nil {
		return nil, wrapGrpcErr(err)
	}

	out := make([]string, 0, len(resp.Configs))
	for _, f := range resp.Configs {
		out = append(out, f.Path)
	}
	sort.Strings(out)
	return out, nil
}
// Close closes the underlying gRPC connection and returns its close error.
//
// It is a no-op returning nil when called on a nil receiver or when there is
// no connection.
func (r *remoteV2Impl) Close() error {
	if r != nil && r.conn != nil {
		return r.conn.Close()
	}
	return nil
}
// checkInitialized returns an error unless the receiver is non-nil and has
// both its gRPC client and its HTTP client set.
func (r *remoteV2Impl) checkInitialized() error {
	ready := r != nil && r.grpcClient != nil && r.httpClient != nil
	if !ready {
		return errors.New("The Luci-config client is not initialized")
	}
	return nil
}
// wrapGrpcErr translates an error returned by a gRPC call:
//   - a NotFound code becomes config.ErrNoConfig;
//   - a code that grpcutil.IsTransientCode reports as transient gets the
//     transient tag applied;
//   - anything else is returned unchanged.
func wrapGrpcErr(err error) error {
	code := grpcutil.Code(err)
	if code == codes.NotFound {
		return config.ErrNoConfig
	}
	if grpcutil.IsTransientCode(code) {
		return transient.Tag.Apply(err)
	}
	return err
}
// toConfig converts a Config proto message into a config.Config.
//
// The metadata fields are copied one to one and Content is set from the
// message's raw content. Content delivered through a signed URL is not
// resolved here.
func toConfig(configPb *pb.Config) *config.Config {
	meta := config.Meta{
		ConfigSet:   config.Set(configPb.ConfigSet),
		Path:        configPb.Path,
		ContentHash: configPb.ContentSha256,
		Revision:    configPb.Revision,
		ViewURL:     configPb.Url,
	}
	out := &config.Config{Meta: meta}
	out.Content = string(configPb.GetRawContent())
	return out
}