package main
import (
bbpb ""
// BuildsClient is a trimmed version of `bbpb.BuildsClient` which only
// contains the required RPC method for bbagent.
// The live implementation automatically binds the "X-Build-Token" key with
// a token.
type BuildsClient interface {
UpdateBuild(ctx context.Context, in *bbpb.UpdateBuildRequest, opts ...grpc.CallOption) (*bbpb.Build, error)
var _ BuildsClient = dummyBBClient{}
type dummyBBClient struct{}
func (dummyBBClient) UpdateBuild(ctx context.Context, in *bbpb.UpdateBuildRequest, opts ...grpc.CallOption) (*bbpb.Build, error) {
return nil, nil
type liveBBClient struct {
tok string
c bbpb.BuildsClient
func (bb *liveBBClient) UpdateBuild(ctx context.Context, in *bbpb.UpdateBuildRequest, opts ...grpc.CallOption) (*bbpb.Build, error) {
ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs(buildbucket.BuildTokenHeader, bb.tok))
return bb.c.UpdateBuild(ctx, in, opts...)
// Reads the build secrets from the environment and constructs a BuildsClient
// which can be used to update the build state.
// retryEnabled allows us to switch retries for this client on and off
func newBuildsClient(ctx context.Context, infraOpts *bbpb.BuildInfra_Buildbucket, retryEnabled *bool) (BuildsClient, *bbpb.BuildSecrets, error) {
hostname := infraOpts.GetHostname()
if hostname == "" {
logging.Infof(ctx, "No buildbucket hostname set; making dummy buildbucket client.")
return dummyBBClient{}, &bbpb.BuildSecrets{BuildToken: "dummy token"}, nil
opts := prpc.DefaultOptions()
opts.Insecure = lhttp.IsLocalHost(hostname)
originalRetry := opts.Retry
opts.Retry = func() retry.Iterator {
if *retryEnabled {
return originalRetry()
return nil
prpcClient := &prpc.Client{
Host: hostname,
Options: opts,
secrets, err := readBuildSecrets(ctx)
if err != nil {
return nil, nil, err
// Use "system" account to call UpdateBuild RPCs.
sctx, err := lucictx.SwitchLocalAccount(ctx, "system")
if err != nil {
return nil, nil, errors.Annotate(err, "could not switch to 'system' account in LUCI_CONTEXT").Err()
prpcClient.C, err = auth.NewAuthenticator(sctx, auth.SilentLogin, auth.Options{
MonitorAs: "bbagent/buildbucket",
if err != nil {
return nil, nil, err
// TODO(iannucci): Exchange secret build token+nonce for a running build token
// here to confirm that:
// * We're the ONLY ones servicing this build (detect duplicate Swarming
// tasks). Failure to exchange the token would let us know that we got
// double-booked.
// * Auth is properly configured for buildbucket before we start running the
// user code.
return &liveBBClient{
}, secrets, nil
// options for the dispatcher.Channel
func channelOpts(ctx context.Context) (*dispatcher.Options, <-chan error) {
errorFn, errCh := dispatcher.ErrorFnReport(10, func(failedBatch *buffer.Batch, err error) bool {
return transient.Tag.In(err)
opts := &dispatcher.Options{
QPSLimit: rate.NewLimiter(1, 1),
Buffer: buffer.Options{
MaxLeases: 1,
BatchItemsMax: 1,
FullBehavior: &buffer.DropOldestBatch{MaxLiveItems: 1},
Retry: func() retry.Iterator {
return &retry.ExponentialBackoff{
Limited: retry.Limited{
Delay: 200 * time.Millisecond, // initial delay
Retries: -1,
MaxTotal: 5 * time.Minute,
Multiplier: 1.2,
MaxDelay: 30 * time.Second,
DropFn: dispatcher.DropFnSummarized(ctx, rate.NewLimiter(.1, 1)),
ErrorFn: errorFn,
return opts, errCh
func mkSendFn(ctx context.Context, client BuildsClient) dispatcher.SendFn {
return func(b *buffer.Batch) error {
var req *bbpb.UpdateBuildRequest
if b.Meta != nil {
req = b.Meta.(*bbpb.UpdateBuildRequest)
} else {
build := b.Data[0].Item.(*bbpb.Build)
req = &bbpb.UpdateBuildRequest{
Build: build,
UpdateMask: &field_mask.FieldMask{
Paths: []string{
if len(build.Tags) > 0 {
req.UpdateMask.Paths = append(req.UpdateMask.Paths, "build.tags")
b.Meta = req
b.Data[0].Item = nil
tctx, cancel := clock.WithTimeout(ctx, defaultUpdateBuildTimeout)
defer cancel()
_, err := client.UpdateBuild(tctx, req)
return err