blob: dd1a9305372e6800014e9c98c1de03eeaf1882c2 [file] [log] [blame]
// Copyright 2018 The LUCI Authors. All rights reserved.
// Use of this source code is governed under the Apache License, Version 2.0
// that can be found in the LICENSE file.
package main
import (
"bytes"
"context"
"fmt"
"regexp"
"strconv"
"sync"
"time"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
structpb "github.com/golang/protobuf/ptypes/struct"
"google.golang.org/genproto/protobuf/field_mask"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"go.chromium.org/luci/buildbucket"
buildbucketpb "go.chromium.org/luci/buildbucket/proto"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/logdog/common/types"
"go.chromium.org/luci/lucictx"
"go.chromium.org/luci/luciexe/legacy/annotee"
annopb "go.chromium.org/luci/luciexe/legacy/annotee/proto"
)
// buildUpdater implements an annotee callback for updated annotations
// and makes buildbucket.v2.Builds.UpdateBuild RPCs accordingly.
type buildUpdater struct {
annAddr *types.StreamAddr
buildID int64
buildToken string
client buildbucketpb.BuildsClient
// annotations contains latest state of the build in the form of
// binary serialized annopb.Step.
// Must not be closed.
annotations chan []byte
}
// Run calls client.UpdateBuild on new b.annotations.
// Logs transient errors and returns a fatal error, if any.
// Stops when done is closed or ctx is done.
func (b *buildUpdater) Run(ctx context.Context, done <-chan struct{}) error {
return b.run(ctx, done, b.updateBuildBytes)
}
func (b *buildUpdater) run(
ctx context.Context,
done <-chan struct{},
update func(ctx context.Context, annBytes []byte) error,
) error {
cond := sync.NewCond(&sync.Mutex{})
// protected by cond.L
var state struct {
latest []byte
latestVer int
done bool
}
// Listen to new requests.
go func() {
locked := func(f func()) {
cond.L.Lock()
f()
cond.L.Unlock()
cond.Signal()
}
for {
select {
case ann := <-b.annotations:
locked(func() {
state.latest = ann
state.latestVer++
})
case <-ctx.Done():
locked(func() { state.done = true })
case <-done:
locked(func() { state.done = true })
}
}
}()
// Send requests.
var sentVer int
// how long did we wait after most recent update call
var errSleep time.Duration
var lastRequestTime time.Time
for {
// Ensure at least 1s between calls.
if !lastRequestTime.IsZero() {
ellapsed := clock.Since(ctx, lastRequestTime)
if d := time.Second - ellapsed; d > 0 {
clock.Sleep(clock.Tag(ctx, "update-build-distance"), d)
}
}
// Wait for news.
cond.L.Lock()
if sentVer == state.latestVer && !state.done {
cond.Wait()
}
local := state
cond.L.Unlock()
var err error
if sentVer != local.latestVer {
lastRequestTime = clock.Now(ctx)
err = update(ctx, local.latest)
switch status.Code(errors.Unwrap(err)) {
case codes.OK:
errSleep = 0
sentVer = local.latestVer
case codes.InvalidArgument:
// This is fatal.
return err
default:
// Hope another future request will succeed.
// There is another final UpdateBuild call anyway.
logging.Errorf(ctx, "failed to update build: %s", err)
// Sleep.
if errSleep == 0 {
errSleep = time.Second
} else if errSleep < 16*time.Second {
errSleep *= 2
}
logging.Debugf(ctx, "will sleep for %s", errSleep)
clock.Sleep(clock.Tag(ctx, "update-build-error"), errSleep)
}
}
if local.done {
return err
}
}
}
// updateBuildBytes is a version of updateBuild that accepts raw annotation
// bytes.
func (b *buildUpdater) updateBuildBytes(ctx context.Context, annBytes []byte) error {
ann := &annopb.Step{}
if err := proto.Unmarshal(annBytes, ann); err != nil {
return errors.Annotate(err, "failed to parse annotation proto").Err()
}
req, err := b.ParseAnnotations(ctx, ann)
if err != nil {
return errors.Annotate(err, "failed to parse UpdateBuild request").Err()
}
req.Build.Status = buildbucketpb.Status_STARTED
req.UpdateMask.Paths = append(req.UpdateMask.Paths, "build.status")
return b.UpdateBuild(ctx, req)
}
// UpdateBuild updates a build on the buildbucket server.
// Includes a build token in the request.
func (b *buildUpdater) UpdateBuild(ctx context.Context, req *buildbucketpb.UpdateBuildRequest) error {
ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs(buildbucket.BuildTokenHeader, b.buildToken))
_, err := b.client.UpdateBuild(ctx, req)
return err
}
// ParseAnnotations converts an annotation proto to a UpdateBuildRequest that
// updates steps, output properties and output gitiles commit.
func (b *buildUpdater) ParseAnnotations(ctx context.Context, ann *annopb.Step) (*buildbucketpb.UpdateBuildRequest, error) {
updatePaths := []string{"build.steps", "build.output.properties"}
prefix, _ := b.annAddr.Path.Split()
fullPrefix := fmt.Sprintf("%s/%s", b.annAddr.Project, prefix)
steps, err := annotee.ConvertBuildSteps(ctx, ann.Substep, true, b.annAddr.Host, fullPrefix)
if err != nil {
return nil, errors.Annotate(err, "failed to parse steps from an annotation proto").Err()
}
props, err := annopb.ExtractProperties(ann)
if err != nil {
return nil, errors.Annotate(err, "failed to extract properties from an annotation proto").Err()
}
delete(props.Fields, "buildbucket")
delete(props.Fields, "$recipe_engine/buildbucket")
// Extract output commit
// The other side: https://cs.chromium.org/chromium/infra/recipes-py/recipe_modules/buildbucket/api.py?q=set_output_gitiles_commit
var outputCommit *buildbucketpb.GitilesCommit
const outputCommitProp = "$recipe_engine/buildbucket/output_gitiles_commit"
if f, ok := props.Fields[outputCommitProp]; ok {
dict := f.GetStructValue().GetFields()
outputCommit = &buildbucketpb.GitilesCommit{
Host: dict["host"].GetStringValue(),
Project: dict["project"].GetStringValue(),
Ref: dict["ref"].GetStringValue(),
Id: dict["id"].GetStringValue(),
Position: uint32(dict["position"].GetNumberValue()),
}
delete(props.Fields, outputCommitProp)
}
if outputCommit == nil {
if outputCommit, err = outputCommitFromLegacyProperties(props); err != nil {
logging.Warningf(ctx, "failed to parse output commit from legacy properties: %s", err)
}
}
if outputCommit != nil {
updatePaths = append(updatePaths, "build.output.gitiles_commit")
}
tags := parseAdditionalTags(props)
if len(tags) > 0 {
updatePaths = append(updatePaths, "build.tags")
}
return &buildbucketpb.UpdateBuildRequest{
Build: &buildbucketpb.Build{
Id: b.buildID,
Steps: steps,
Output: &buildbucketpb.Build_Output{
Properties: props,
GitilesCommit: outputCommit,
},
Tags: tags,
},
UpdateMask: &field_mask.FieldMask{Paths: updatePaths},
}, nil
}
// regular expressions for outputCommitFromLegacyProperties.
var (
commitPositionRe = regexp.MustCompile(`^(refs/[^@]+)@{#(\d+)}$`)
sha1HexRe = regexp.MustCompile(`^[0-9a-f]{40}$`)
refRe = regexp.MustCompile(`^refs/`)
)
// outputCommitFromLegacyProperties synthesizes an output commit from
// legacy got_revision and got_revision_cp properties.
func outputCommitFromLegacyProperties(props *structpb.Struct) (*buildbucketpb.GitilesCommit, error) {
gotRevision := props.Fields["got_revision"].GetStringValue()
if gotRevision == "" {
return nil, nil
}
// Retrieve gitiles host and project from build input, using buildbucket
// internal property.
buildStruct := props.Fields["$recipe_engine/buildbucket"].GetStructValue().GetFields()["build"].GetStructValue()
if buildStruct == nil {
return nil, fmt.Errorf("no buildbucket build property")
}
build := &buildbucketpb.Build{}
if err := structToMessage(buildStruct, build); err != nil {
return nil, err
}
ret := &buildbucketpb.GitilesCommit{}
switch {
case build.GetInput().GetGitilesCommit() != nil:
ic := build.GetInput().GetGitilesCommit()
ret.Host = ic.Host
ret.Project = ic.Project
ret.Ref = ic.Ref
case len(build.GetInput().GetGerritChanges()) == 1:
cl := build.GetInput().GetGerritChanges()[0]
ret.Host = cl.Host
ret.Project = cl.Project
}
if ret.Host == "" || ret.Project == "" {
return nil, fmt.Errorf("failed to determine gitiles host or project")
}
if ret.Ref == "" {
ret.Ref = "refs/heads/master"
}
cp := props.Fields["got_revision_cp"].GetStringValue()
// Parse got_revision.
switch {
case sha1HexRe.MatchString(gotRevision):
ret.Id = gotRevision
case refRe.MatchString(gotRevision):
if cp != "" {
return nil, errors.Reason("got_revision is a ref and got_revision_cp is provided; this is unexpected").Err()
}
ret.Ref = gotRevision
return ret, nil
default:
return nil, errors.Reason("unrecognized got_revision format: %q", gotRevision).Err()
}
// Parse commit position.
if cp != "" {
m := commitPositionRe.FindStringSubmatch(cp)
if m == nil {
return nil, errors.Reason("unexpected got_revision_cp format: %q", cp).Err()
}
ret.Ref = m[1]
pos, err := strconv.ParseUint(m[2], 10, 32)
if err != nil {
return nil, errors.Annotate(err, "malformed uint32 in got_revision_cp").Err()
}
ret.Position = uint32(pos)
}
return ret, nil
}
// structToMessage converts the struct to msg via JSONPB format.
// This is inefficient.
func structToMessage(s *structpb.Struct, msg proto.Message) error {
buf := &bytes.Buffer{}
if err := (&jsonpb.Marshaler{}).Marshal(buf, s); err != nil {
return err
}
return (&jsonpb.Unmarshaler{AllowUnknownFields: true}).Unmarshal(buf, msg)
}
// AnnotationUpdated is an annotee.Options.AnnotationUpdated callback
// that enqueues an UpdateBuild RPC.
// Assumes annBytes will stay unchanged.
func (b *buildUpdater) AnnotationUpdated(annBytes []byte) {
b.annotations <- annBytes
}
// readBuildSecrets populates c.buildSecrets from swarming secret bytes, if any.
func readBuildSecrets(ctx context.Context) (*buildbucketpb.BuildSecrets, error) {
secrets := &buildbucketpb.BuildSecrets{}
swarming := lucictx.GetSwarming(ctx)
if swarming != nil {
if err := proto.Unmarshal(swarming.SecretBytes, secrets); err != nil {
return nil, err
}
}
return secrets, nil
}
// parseAdditionalTags parses tags from output properties.
// The other side: https://cs.chromium.org/chromium/infra/recipes-py/recipe_modules/buildbucket/api.py?q=add_tags_to_current_build
func parseAdditionalTags(props *structpb.Struct) []*buildbucketpb.StringPair {
tags := []*buildbucketpb.StringPair{}
tagsProp := "$recipe_engine/buildbucket/runtime-tags"
v, ok := props.Fields[tagsProp]
if !ok {
return tags
}
dict := v.GetStructValue().GetFields()
for key, values := range dict {
for _, value := range values.GetListValue().GetValues() {
tags = append(tags, &buildbucketpb.StringPair{
Key: key,
Value: value.GetStringValue(),
})
}
}
delete(props.Fields, tagsProp)
return tags
}