blob: 6b8c41a7fc2c73f4d845b516f6a45b80eed768bb [file] [log] [blame]
// Copyright 2017 The LUCI Authors. All rights reserved.
// Use of this source code is governed under the Apache License, Version 2.0
// that can be found in the LICENSE file.
package buildbucket
import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net/http"
	"strings"

	"golang.org/x/net/context"

	"github.com/luci/gae/service/datastore"
	bucketApi "github.com/luci/luci-go/common/api/buildbucket/buildbucket/v1"
	"github.com/luci/luci-go/common/clock"
	"github.com/luci/luci-go/common/errors"
	"github.com/luci/luci-go/common/logging"
	"github.com/luci/luci-go/common/retry/transient"
	"github.com/luci/luci-go/common/tsmon/field"
	"github.com/luci/luci-go/common/tsmon/metric"
	"github.com/luci/luci-go/milo/api/resp"
	"github.com/luci/luci-go/milo/buildsource/swarming"
	"github.com/luci/luci-go/milo/common"
	"github.com/luci/luci-go/milo/common/model"
	"github.com/luci/luci-go/server/router"
)
var (
	// buildCounter counts the buildbucket builds Milo receives over PubSub,
	// broken down by bucket, LUCI-ness, build status and what Milo did with
	// the build.
	buildCounter = metric.NewCounter(
		"luci/milo/buildbucket_pubsub/builds",
		"The number of buildbucket builds received by Milo from PubSub",
		nil,
		field.String("bucket"),
		// True for luci build, False for non-luci (ie buildbot) build.
		field.Bool("luci"),
		// Status can be "COMPLETED", "SCHEDULED", or "STARTED"
		field.String("status"),
		// Action is one of "Created" (new build entry stored), "Modified"
		// (existing build entry updated) or "Rejected" (build ignored or
		// could not be processed).
		field.String("action"))
)
// psMsg is the JSON payload carried in the data field of a buildbucket PubSub
// message: the build itself plus the hostname it was reported from.
type psMsg struct {
	// Build is the buildbucket build message.
	Build bucketApi.ApiCommonBuildMessage
	// Hostname identifies the buildbucket host that sent the build; it is
	// used to key the stored build entry.
	Hostname string
}
// Sentinel errors for build tags that are required but absent. They are not
// referenced in this part of the file; presumably they are used elsewhere in
// the package.
var (
	// errNoProject reports that a build carries no "project" tag.
	errNoProject = errors.New("project tag not found")
	// errNoLogLocation reports that a build carries no "log_location" tag.
	errNoLogLocation = errors.New("log_location tag not found")
)
// parameters is the subset of a build's parameters_json that Milo decodes;
// every other key in that JSON object is ignored.
type parameters struct {
	// BuilderName is the "builder_name" parameter of the build.
	BuilderName string `json:"builder_name"`
}
// isLUCI reports whether build belongs to a LUCI bucket. LUCI bucket names are
// assumed to start with "luci.", so the bucket name prefix is the only signal
// consulted.
func isLUCI(build *bucketApi.ApiCommonBuildMessage) bool {
	const luciBucketPrefix = "luci."
	bucket := build.Bucket
	return strings.HasPrefix(bucket, luciBucketPrefix)
}
// PubSubHandler is a webhook that stores the builds coming in from pubsub.
//
// It always answers with a bare status code. A transient processing error
// yields 500 (Internal Server Error) so that PubSub redelivers the message.
// Success and non-transient errors yield 200 (OK), because retrying would not
// help.
func PubSubHandler(ctx *router.Context) {
	c := ctx.Context
	err := pubSubHandlerImpl(c, ctx.Request)
	if err != nil {
		logging.WithError(err).Errorf(c, "error while updating buildbucket")
	}

	code := http.StatusOK
	if transient.Tag.In(err) {
		code = http.StatusInternalServerError
	}
	ctx.Writer.WriteHeader(code)
}
// maybeGetBuild loads the rendered build for a buildbucket build from the
// swarming task it runs on.
//
// It returns (nil, nil) for builds that are still SCHEDULED, since nothing can
// exist yet. It returns an error if the build's tags do not name both a
// swarming host and a swarming task.
func maybeGetBuild(
	c context.Context, build *bucketApi.ApiCommonBuildMessage) (*resp.MiloBuild, error) {
	if build.Status == "SCHEDULED" {
		return nil, nil
	}

	tags := ParseTags(build.Tags)
	swarmingHost, hasHost := tags["swarming_hostname"]
	if !hasHost {
		return nil, errors.New("no swarming hostname tag")
	}
	taskID, hasTask := tags["swarming_task_id"]
	if !hasTask {
		return nil, errors.New("no swarming task id")
	}

	svc, err := swarming.NewProdService(c, swarmingHost)
	if err != nil {
		return nil, err
	}
	loader := swarming.BuildLoader{}
	return loader.SwarmingBuildImpl(c, svc, taskID)
}
// processBuild adds or updates the buildEntry for build in datastore.
//
// For builds past the SCHEDULED state, the rendered build is fetched via
// maybeGetBuild. It is stored alongside the JSON-encoded buildbucket message.
//
// An existing entry keeps its stored creation time. A new entry gets the
// current time as both its creation and modification time, which lets callers
// tell a create (created == modified) from an update.
//
// The returned entry is nil whenever the error is non-nil.
func processBuild(
	c context.Context, host string, build *bucketApi.ApiCommonBuildMessage) (
	*buildEntry, error) {
	now := clock.Now(c).UTC()
	entry := buildEntry{key: buildEntryKey(host, build.Id)}

	// Load the existing entry, if any. Get must be given the destination.
	// Without it nothing is loaded, ErrNoSuchEntity is never reported, and the
	// Put below would overwrite a stored creation time with the zero time.
	switch err := datastore.Get(c, &entry); err {
	case datastore.ErrNoSuchEntity:
		logging.Infof(c, "%s does not exist, will create", entry.key)
		entry.created = now
	case nil:
		// The stored entry, including its creation time, has been loaded.
	default:
		return nil, err
	}

	// If the build is running, try to get the annotation data.
	respBuild, err := maybeGetBuild(c, build)
	if err != nil {
		return nil, err
	}
	entry.respBuild = respBuild
	entry.modified = now
	if entry.buildbucketData, err = json.Marshal(build); err != nil {
		return nil, err
	}
	if err := datastore.Put(c, &entry); err != nil {
		return nil, err
	}
	return &entry, nil
}
// saveBuildSummary derives a model.BuildSummary from a buildbucket build entry
// and writes it to datastore under key. Any summary already stored there is
// overwritten.
//
// builderName becomes part of the summary's BuilderID
// ("buildbucket/<bucket>/<builder>"). The entry's stored build must decode and
// carry a parseable status. Datastore write failures are tagged transient.
func saveBuildSummary(
	c context.Context, key *datastore.Key, builderName string,
	entry *buildEntry) error {
	build, err := entry.getBuild()
	if err != nil {
		return err
	}
	status, err := parseStatus(build)
	if err != nil {
		return err
	}

	// TODO(hinoka): Console related items.
	var bs model.BuildSummary
	bs.BuildKey = key
	bs.BuilderID = fmt.Sprintf("buildbucket/%s/%s", build.Bucket, builderName)
	bs.Created = parseTimestamp(build.CreatedTs)
	bs.Summary = model.Summary{
		Status: status,
		Start:  parseTimestamp(build.StartedTs),
	}

	// Fold in details from the rendered build, when it was available.
	if rendered := entry.respBuild; rendered != nil {
		if err := rendered.SummarizeTo(c, &bs); err != nil {
			return err
		}
	}

	logging.Debugf(c, "Created build summary: %#v", bs)
	// Make datastore flakes transient errors
	putErr := datastore.Put(c, &bs)
	return transient.Tag.Apply(putErr)
}
// handlePubSubBuild records a single buildbucket build received over PubSub.
//
// Outcomes:
//   - A build whose parameters_json cannot be parsed is rejected with a
//     non-transient error.
//   - A non-LUCI build is ignored and nil is returned.
//   - For a LUCI build, the buildEntry is created or updated; failures there
//     are tagged transient so PubSub retries the message. The build summary is
//     then saved.
//
// Each path adds one to buildCounter, with action "Rejected", "Created" or
// "Modified".
func handlePubSubBuild(c context.Context, data *psMsg) error {
	host := data.Hostname
	build := &data.Build
	luci := isLUCI(build)

	// We only care about the "builder_name" key from the parameters.
	p := parameters{}
	if err := json.Unmarshal([]byte(build.ParametersJson), &p); err != nil {
		err = errors.Annotate(
			err, "could not unmarshal build parameters %s", build.ParametersJson).Err()
		buildCounter.Add(c, 1, build.Bucket, luci, build.Status, "Rejected")
		// Permanent error, since this is probably a type of build we do not recognize.
		return err
	}
	// %v, not %s, for the build: it is a struct with non-string fields
	// (e.g. Id, CreatedTs) that %s would render as "%!s(...)" noise.
	logging.Debugf(c, "Received from %s: build %s/%s (%s)\n%v",
		host, build.Bucket, p.BuilderName, build.Status, build)

	if !luci {
		logging.Infof(c, "This is not a luci build, ignoring")
		buildCounter.Add(c, 1, build.Bucket, luci, build.Status, "Rejected")
		return nil
	}

	entry, err := processBuild(c, host, build)
	if err != nil {
		logging.WithError(err).Errorf(c, "failed to update build")
		buildCounter.Add(c, 1, build.Bucket, luci, build.Status, "Rejected")
		// Probably a datastore or network flake, make this into a transient error
		return transient.Tag.Apply(err)
	}

	// processBuild gives a brand-new entry identical created and modified times.
	action := "Created"
	if !entry.created.Equal(entry.modified) {
		action = "Modified"
	}
	buildCounter.Add(c, 1, build.Bucket, luci, build.Status, action)
	return saveBuildSummary(
		c, datastore.MakeKey(c, "buildEntry", entry.key), p.BuilderName, entry)
}
// pubSubHandlerImpl decodes a buildbucket PubSub push request and passes the
// build it carries to handlePubSubBuild.
//
// A transient error makes PubSubHandler respond 500 (Internal Server Error),
// so that PubSub redelivers the message. A nil or non-transient error results
// in 200 (OK).
func pubSubHandlerImpl(c context.Context, r *http.Request) error {
	defer r.Body.Close()

	// Buffer the body so it can be logged verbatim if it fails to decode.
	// Formatting r.Body itself would print the reader value, not the payload.
	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		logging.WithError(err).Errorf(c, "could not read request body")
		return transient.Tag.Apply(err)
	}

	msg := common.PubSubSubscription{}
	if err := json.Unmarshal(body, &msg); err != nil {
		logging.WithError(err).Errorf(c, "could not decode message:\n%s", body)
		// This might be a transient error, e.g. when the json format changes
		// and Milo isn't updated yet.
		return transient.Tag.Apply(err)
	}

	bData, err := msg.GetData()
	if err != nil {
		logging.WithError(err).Errorf(c, "could not parse pubsub message string")
		return err
	}

	var data psMsg
	if err := json.Unmarshal(bData, &data); err != nil {
		logging.WithError(err).Errorf(c, "could not parse pubsub message data")
		return err
	}
	return handlePubSubBuild(c, &data)
}