blob: e075e06e76ea341706a278831acecf1276e64f21 [file] [log] [blame]
// Copyright 2019 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"time"
"golang.org/x/time/rate"
"google.golang.org/genproto/protobuf/field_mask"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"go.chromium.org/luci/auth"
"go.chromium.org/luci/buildbucket"
bbpb "go.chromium.org/luci/buildbucket/proto"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/lhttp"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/retry"
"go.chromium.org/luci/common/retry/transient"
"go.chromium.org/luci/common/sync/dispatcher"
"go.chromium.org/luci/common/sync/dispatcher/buffer"
"go.chromium.org/luci/grpc/prpc"
"go.chromium.org/luci/lucictx"
)
// BuildsClient is a trimmed version of `bbpb.BuildsClient` which contains
// only the RPC method that bbagent requires.
//
// The live implementation automatically binds the "X-Build-Token" key with
// the build token on every call, so callers never pass the token themselves.
type BuildsClient interface {
// UpdateBuild has the same signature as the method of the same name on
// `bbpb.BuildsClient`.
UpdateBuild(ctx context.Context, in *bbpb.UpdateBuildRequest, opts ...grpc.CallOption) (*bbpb.Build, error)
}
// dummyBBClient is a BuildsClient that performs no RPCs. newBuildsClient hands
// it out when no buildbucket hostname is configured.
type dummyBBClient struct{}

// Compile-time proof that dummyBBClient satisfies BuildsClient.
var _ BuildsClient = dummyBBClient{}

// UpdateBuild ignores its arguments and reports success with a nil Build.
func (dummyBBClient) UpdateBuild(ctx context.Context, in *bbpb.UpdateBuildRequest, opts ...grpc.CallOption) (*bbpb.Build, error) {
	var noBuild *bbpb.Build
	return noBuild, nil
}
// liveBBClient is the BuildsClient implementation that wraps a real
// bbpb.BuildsClient and attaches a build token to every call.
type liveBBClient struct {
	// tok is sent as the value of the buildbucket.BuildTokenHeader metadata key.
	tok string
	// c is the underlying client the call is forwarded to.
	c bbpb.BuildsClient
}

// UpdateBuild forwards the request to the wrapped client with the build token
// attached as outgoing gRPC metadata.
//
// The metadata is installed with metadata.NewOutgoingContext, which (per the
// grpc metadata package docs) attaches a fresh metadata set rather than merging
// with outgoing metadata that may already be present on ctx.
func (lc *liveBBClient) UpdateBuild(ctx context.Context, in *bbpb.UpdateBuildRequest, opts ...grpc.CallOption) (*bbpb.Build, error) {
	tokenMD := metadata.Pairs(buildbucket.BuildTokenHeader, lc.tok)
	authedCtx := metadata.NewOutgoingContext(ctx, tokenMD)
	return lc.c.UpdateBuild(authedCtx, in, opts...)
}
// newBuildsClient reads the build secrets (via readBuildSecrets) and constructs
// a BuildsClient which can be used to update the build state.
//
// If infraOpts carries no hostname, no secrets are read and no RPC client is
// built: a no-op dummyBBClient is returned together with placeholder secrets
// whose BuildToken is "dummy token".
//
// Otherwise the returned client is a pRPC client for that host. It is marked
// insecure when lhttp.IsLocalHost reports the host as local, authenticates with
// the "system" account from LUCI_CONTEXT, and attaches secrets.BuildToken to
// every UpdateBuild call.
//
// retryEnabled allows the caller to switch retries for this client on and off
// after construction: it is consulted every time the client's Retry factory is
// invoked. When it points at false the factory returns a nil iterator instead
// of the default one. A nil retryEnabled leaves the default pRPC retry policy
// always enabled.
//
// NOTE(review): *retryEnabled is read without synchronization; confirm callers
// do not flip it concurrently with in-flight RPCs.
//
// On failure the client and secrets are both nil and the error is annotated
// with the step that failed.
func newBuildsClient(ctx context.Context, infraOpts *bbpb.BuildInfra_Buildbucket, retryEnabled *bool) (BuildsClient, *bbpb.BuildSecrets, error) {
	hostname := infraOpts.GetHostname()
	if hostname == "" {
		logging.Infof(ctx, "No buildbucket hostname set; making dummy buildbucket client.")
		return dummyBBClient{}, &bbpb.BuildSecrets{BuildToken: "dummy token"}, nil
	}

	opts := prpc.DefaultOptions()
	opts.Insecure = lhttp.IsLocalHost(hostname)
	originalRetry := opts.Retry
	opts.Retry = func() retry.Iterator {
		// Guard against a nil switch: dereferencing it here would panic deep
		// inside an RPC rather than at construction time.
		if retryEnabled == nil || *retryEnabled {
			return originalRetry()
		}
		return nil
	}
	prpcClient := &prpc.Client{
		Host:    hostname,
		Options: opts,
	}

	secrets, err := readBuildSecrets(ctx)
	if err != nil {
		return nil, nil, errors.Annotate(err, "could not read build secrets").Err()
	}

	// Use "system" account to call UpdateBuild RPCs.
	sctx, err := lucictx.SwitchLocalAccount(ctx, "system")
	if err != nil {
		return nil, nil, errors.Annotate(err, "could not switch to 'system' account in LUCI_CONTEXT").Err()
	}
	prpcClient.C, err = auth.NewAuthenticator(sctx, auth.SilentLogin, auth.Options{
		MonitorAs: "bbagent/buildbucket",
	}).Client()
	if err != nil {
		return nil, nil, errors.Annotate(err, "could not create authenticated client for buildbucket").Err()
	}

	// TODO(iannucci): Exchange secret build token+nonce for a running build token
	// here to confirm that:
	//   * We're the ONLY ones servicing this build (detect duplicate Swarming
	//     tasks). Failure to exchange the token would let us know that we got
	//     double-booked.
	//   * Auth is properly configured for buildbucket before we start running the
	//     user code.

	return &liveBBClient{
		tok: secrets.BuildToken,
		c:   bbpb.NewBuildsPRPCClient(prpcClient),
	}, secrets, nil
}
// channelOpts returns the options for the dispatcher.Channel, plus the error
// channel produced by dispatcher.ErrorFnReport.
//
// The configuration keeps one lease and one single-item batch at a time, and
// uses buffer.DropOldestBatch with MaxLiveItems of 1 when the buffer is full.
// Sends are limited by rate.NewLimiter(1, 1) and drop reports by
// rate.NewLimiter(.1, 1).
func channelOpts(ctx context.Context) (*dispatcher.Options, <-chan error) {
	// The callback's result is true only for errors carrying the transient tag.
	onlyTransient := func(failedBatch *buffer.Batch, err error) bool {
		return transient.Tag.In(err)
	}
	errorFn, errCh := dispatcher.ErrorFnReport(10, onlyTransient)

	// Exponential backoff starting at 200ms, growing by 1.2x up to 30s per
	// attempt, with a 5 minute MaxTotal and Retries set to -1.
	newBackoff := func() retry.Iterator {
		limits := retry.Limited{
			Delay:    200 * time.Millisecond, // initial delay
			Retries:  -1,
			MaxTotal: 5 * time.Minute,
		}
		return &retry.ExponentialBackoff{
			Limited:    limits,
			Multiplier: 1.2,
			MaxDelay:   30 * time.Second,
		}
	}

	bufOpts := buffer.Options{
		MaxLeases:     1,
		BatchItemsMax: 1,
		FullBehavior:  &buffer.DropOldestBatch{MaxLiveItems: 1},
		Retry:         newBackoff,
	}

	return &dispatcher.Options{
		QPSLimit: rate.NewLimiter(1, 1),
		Buffer:   bufOpts,
		DropFn:   dispatcher.DropFnSummarized(ctx, rate.NewLimiter(.1, 1)),
		ErrorFn:  errorFn,
	}, errCh
}
// mkSendFn returns the dispatcher.SendFn which pushes a batch to buildbucket
// through client.UpdateBuild.
//
// The first time a batch is seen (Meta is nil), its single *bbpb.Build item is
// wrapped in an UpdateBuildRequest whose update mask covers steps, output and
// summary_markdown, plus tags when the build has any. The request is cached in
// the batch's Meta and the item slot is cleared, so later calls for the same
// batch reuse the cached request.
//
// Each call to UpdateBuild runs under ctx bounded by defaultUpdateBuildTimeout.
// The RPC's error is returned as-is.
func mkSendFn(ctx context.Context, client BuildsClient) dispatcher.SendFn {
	return func(batch *buffer.Batch) error {
		if batch.Meta == nil {
			build := batch.Data[0].Item.(*bbpb.Build)

			paths := []string{
				"build.steps",
				"build.output",
				"build.summary_markdown",
			}
			if len(build.Tags) > 0 {
				paths = append(paths, "build.tags")
			}

			batch.Meta = &bbpb.UpdateBuildRequest{
				Build:      build,
				UpdateMask: &field_mask.FieldMask{Paths: paths},
			}
			// The build is now reachable through Meta; drop the duplicate
			// reference held by the item slot.
			batch.Data[0].Item = nil
		}
		req := batch.Meta.(*bbpb.UpdateBuildRequest)

		callCtx, done := clock.WithTimeout(ctx, defaultUpdateBuildTimeout)
		defer done()

		_, err := client.UpdateBuild(callCtx, req)
		return err
	}
}