blob: b82e3f812695584df40ec41c1e611291dfbf9250 [file] [log] [blame]
// Copyright 2016 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package buildbot
import (
"bytes"
"compress/gzip"
"compress/zlib"
"encoding/json"
"fmt"
"net/http"
"strings"
"time"
"github.com/luci/gae/service/datastore"
"github.com/luci/luci-go/common/clock"
"github.com/luci/luci-go/common/iotools"
"github.com/luci/luci-go/common/logging"
"github.com/luci/luci-go/milo/common"
"github.com/luci/luci-go/milo/common/model"
"github.com/luci/luci-go/server/router"
"golang.org/x/net/context"
"github.com/luci/luci-go/common/tsmon/field"
"github.com/luci/luci-go/common/tsmon/metric"
)
var (
// Metrics

// buildCounter counts build-level events. Besides builds received from
// PubSub, this file also increments it when a build is expired while
// processing a master json.
buildCounter = metric.NewCounter(
"luci/milo/buildbot_pubsub/builds",
"The number of buildbot builds received by Milo from PubSub",
nil,
field.Bool("internal"),
field.String("master"),
field.String("builder"),
field.Bool("finished"),
// Status values recorded in this file: "New", "Replaced", "Rejected" and
// "Expired".
field.String("status"))
// masterCounter counts master jsons processed. The "master" field is
// recorded with a "master." prefix in front of the master name.
masterCounter = metric.NewCounter(
"luci/milo/buildbot_pubsub/masters",
"The number of buildbot master jsons received by Milo from PubSub",
nil,
field.Bool("internal"),
field.String("master"),
// Status can be one of 2 options. "success", "failure".
field.String("status"))
)
// buildMasterMsg is the JSON payload carried by a PubSub message once it has
// been decompressed. Master may be nil when the message carries no master
// json.
type buildMasterMsg struct {
Master *buildbotMaster `json:"master"`
Builds []*buildbotBuild `json:"builds"`
}
// buildbotMasterEntry is a container for a marshaled and packed buildbot
// master json.
type buildbotMasterEntry struct {
// Name of the buildbot master.
Name string `gae:"$id"`
// Internal records whether this master is internal (as opposed to public).
Internal bool
// Data is the json serialized and gzipped blob of the master data.
Data []byte `gae:",noindex"`
// Modified is when this entry was last modified.
Modified time.Time
}
// buildbotMasterPublic is a struct that exists for public buildbot masters, and
// not for internal masters. It's used for ACL checks.
type buildbotMasterPublic struct {
// Name of the buildbot master.
Name string `gae:"$id"`
}
// putDSMasterJSON stores master in the datastore as a gzipped JSON blob wrapped
// in a buildbotMasterEntry.
//
// Before encoding, the comments, files and properties of every change in the
// builders' pending build states are cleared in place (master is mutated).
//
// For an internal master the buildbotMasterPublic marker is deleted before
// anything is written; for a public master the marker is written together with
// the entry.
//
// It returns the first error hit while deleting the marker, encoding or
// compressing the master, or writing to the datastore.
func putDSMasterJSON(
	c context.Context, master *buildbotMaster, internal bool) error {
	for _, builder := range master.Builders {
		// Trim out extra info in the "Changes" portion of the pending build state,
		// we don't actually need comments, files, and properties
		for _, pbs := range builder.PendingBuildStates {
			for i := range pbs.Source.Changes {
				pbs.Source.Changes[i].Comments = ""
				pbs.Source.Changes[i].Files = nil
				pbs.Source.Changes[i].Properties = nil
			}
		}
	}
	entry := buildbotMasterEntry{
		Name:     master.Name,
		Internal: internal,
		Modified: clock.Now(c).UTC(),
	}
	toPut := []interface{}{&entry}
	publicTag := &buildbotMasterPublic{master.Name}
	if internal {
		// do the deletion immediately so that the 'public' bit is removed from
		// datastore before any internal details are actually written to datastore.
		if err := datastore.Delete(c, publicTag); err != nil && err != datastore.ErrNoSuchEntity {
			return err
		}
	} else {
		toPut = append(toPut, publicTag)
	}
	gzbs := bytes.Buffer{}
	gsw := gzip.NewWriter(&gzbs)
	// cw sits in front of the gzip writer, so cw.Count is the uncompressed size.
	cw := iotools.CountingWriter{Writer: gsw}
	e := json.NewEncoder(&cw)
	if err := e.Encode(master); err != nil {
		return err
	}
	// Close flushes any buffered compressed data and writes the gzip trailer;
	// a failure here means the blob is incomplete and must not be stored.
	if err := gsw.Close(); err != nil {
		return err
	}
	entry.Data = gzbs.Bytes()
	logging.Debugf(c, "Length of json data: %d", cw.Count)
	logging.Debugf(c, "Length of gzipped data: %d", len(entry.Data))
	return datastore.Put(c, toPut)
}
// unmarshal decodes a zlib-compressed JSON byte stream into a list of buildbot
// builds and an optional master.
//
// An empty msg yields no builds, a nil master and a nil error. When a master
// is present, the running builds embedded in each of its slaves are moved out:
// each one is stamped with the master's name and appended to the returned
// builds, its number is recorded in the slave's RunningbuildsMap under its
// builder name, and the slave's Runningbuilds list is cleared.
//
// Decompression and JSON errors are logged and returned.
func unmarshal(
	c context.Context, msg []byte) ([]*buildbotBuild, *buildbotMaster, error) {
	bm := buildMasterMsg{}
	if len(msg) == 0 {
		return bm.Builds, bm.Master, nil
	}
	reader, err := zlib.NewReader(bytes.NewReader(msg))
	if err != nil {
		logging.WithError(err).Errorf(c, "zlib decompression error")
		return nil, nil, err
	}
	defer reader.Close()
	d := json.NewDecoder(reader)
	if err = d.Decode(&bm); err != nil {
		logging.WithError(err).Errorf(c, "could not unmarshal message")
		return nil, nil, err
	}
	// Extract the builds out of master and append it onto builds.
	if bm.Master != nil {
		for _, slave := range bm.Master.Slaves {
			if slave.RunningbuildsMap == nil {
				slave.RunningbuildsMap = map[string][]int{}
			}
			for _, build := range slave.Runningbuilds {
				// A JSON null in the list decodes to a nil pointer; skip it
				// rather than panic on malformed input.
				if build == nil {
					continue
				}
				build.Master = bm.Master.Name
				bm.Builds = append(bm.Builds, build)
				slave.RunningbuildsMap[build.Buildername] = append(
					slave.RunningbuildsMap[build.Buildername], build.Number)
			}
			slave.Runningbuilds = nil
		}
	}
	return bm.Builds, bm.Master, nil
}
// getOSInfo fetches the os family and version of the slave the build was
// running on from the master json on a best-effort basis.
//
// m doubles as a caller-owned cache: when m.Name is empty the master entry for
// b.Master is loaded from the datastore and decoded into m, otherwise m is
// used as is. Every failure is logged and results in empty strings.
//
// The OS details are read from the slave's Host text, which is parsed as
// newline-separated "key: value" pairs; the keys looked up are "os family"
// and "os version".
func getOSInfo(c context.Context, b *buildbotBuild, m *buildbotMaster) (
	family, version string) {
	// Fetch the master info from datastore if not provided.
	if m.Name == "" {
		logging.Infof(c, "Fetching info for master %s", b.Master)
		entry := buildbotMasterEntry{Name: b.Master}
		if err := datastore.Get(c, &entry); err != nil {
			logging.WithError(err).Errorf(
				c, "Encountered error while fetching entry for %s", b.Master)
			return "", ""
		}
		// Check access before decoding into m. Decoding first would fill the
		// caller's cache with the internal master, and later calls (which skip
		// this branch once m.Name is set) would bypass the check.
		if entry.Internal && !b.Internal {
			logging.Errorf(c, "Build references an internal master, but build is not internal.")
			return "", ""
		}
		if err := decodeMasterEntry(c, &entry, m); err != nil {
			logging.WithError(err).Warningf(
				c, "Failed to decode master information for OS info on master %s", b.Master)
			return "", ""
		}
	}
	s, ok := m.Slaves[b.Slave]
	if !ok {
		logging.Warningf(c, "Could not find slave %s in master %s", b.Slave, b.Master)
		return "", ""
	}
	hostInfo := map[string]string{}
	for _, v := range strings.Split(s.Host, "\n") {
		// Lines without a colon carry no key/value pair and are ignored.
		if info := strings.SplitN(v, ":", 2); len(info) == 2 {
			hostInfo[info[0]] = strings.TrimSpace(info[1])
		}
	}
	// Extract OS and OS Family; a missing key yields the empty string.
	return hostInfo["os family"], hostInfo["os version"]
}
// expireBuild marks a build as finished and expired, then saves it.
//
// The finish time is the build's TimeStamp when set, otherwise the current
// time from the context clock. The build's results are set to 4 (Exception),
// its current step is cleared, and "Build expired on Milo" is appended to its
// text. The error from the datastore write is returned.
func expireBuild(c context.Context, b *buildbotBuild) error {
	finished := float64(clock.Now(c).Unix())
	if b.TimeStamp != nil {
		finished = float64(*b.TimeStamp)
	}
	results := 4 // Exception
	// The finish time lives at index 1. Pad the slice so that a build stored
	// with fewer than two entries does not cause an out-of-range panic.
	for len(b.Times) < 2 {
		b.Times = append(b.Times, nil)
	}
	b.Times[1] = &finished
	b.Finished = true
	b.Results = &results
	b.Currentstep = nil
	b.Text = append(b.Text, "Build expired on Milo")
	return datastore.Put(c, b)
}
// saveBuildSummary renders b, summarizes the rendered build into a
// model.BuildSummary keyed to b, and writes that summary to the datastore.
//
// It returns the summarization error if there is one, otherwise the result of
// the datastore write.
func saveBuildSummary(c context.Context, b *buildbotBuild) error {
	rendered := renderBuild(c, b)

	summary := model.BuildSummary{
		BuildKey:  datastore.KeyForObj(c, b),
		BuilderID: fmt.Sprintf("buildbot/%s/%s", b.Master, b.Buildername),
	}

	err := rendered.SummarizeTo(c, &summary)
	if err != nil {
		return err
	}
	return datastore.Put(c, &summary)
}
// doMaster saves a master json and then expires stale builds of that master.
//
// After the master is stored, every unfinished build recorded for the master
// is loaded. A build is expired when its builder is no longer part of the
// master, or when it is absent from the builder's current builds and either
// has no timestamp or its timestamp is more than 20 minutes old.
//
// It returns 0 on success and http.StatusInternalServerError when saving the
// master, querying the builds or expiring a build fails.
func doMaster(c context.Context, master *buildbotMaster, internal bool) int {
	// Store the master json into the datastore.
	saveErr := putDSMasterJSON(c, master, internal)
	metricName := fmt.Sprintf("master.%s", master.Name)
	if saveErr != nil {
		logging.WithError(saveErr).Errorf(
			c, "Could not save master in datastore %s", saveErr)
		masterCounter.Add(c, 1, internal, metricName, "failure")
		// This is transient, we do want PubSub to retry.
		return http.StatusInternalServerError
	}
	masterCounter.Add(c, 1, internal, metricName, "success")

	// Load the builds datastore still considers running for this master.
	query := datastore.NewQuery("buildbotBuild").
		Eq("finished", false).
		Eq("master", master.Name)
	unfinished := make([]*buildbotBuild, 0)
	if err := getBuildQueryBatcher(c).GetAll(c, query, &unfinished); err != nil {
		logging.WithError(err).Errorf(c, "Could not load current builds from master %s",
			master.Name)
		return http.StatusInternalServerError
	}

	for _, build := range unfinished {
		builder, known := master.Builders[build.Buildername]
		if !known {
			// The builder is gone from the master, so the build cannot be running.
			buildCounter.Add(
				c, 1, internal, build.Master, build.Buildername, build.Finished, "Expired")
			logging.Infof(c, "Expiring %s/%s/%d due to builder being removed",
				master.Name, build.Buildername, build.Number)
			if err := expireBuild(c, build); err != nil {
				logging.WithError(err).Errorf(c, "Could not expire build")
				return http.StatusInternalServerError
			}
			continue
		}

		stillCurrent := false
		for _, number := range builder.CurrentBuilds {
			if number == build.Number {
				stillCurrent = true
				break
			}
		}
		if stillCurrent {
			continue
		}

		// Give the build 20 minutes since its last update before expiring it.
		now := int(clock.Now(c).Unix())
		if build.TimeStamp != nil && *build.TimeStamp+20*60 >= now {
			continue
		}
		buildCounter.Add(
			c, 1, internal, build.Master, build.Buildername, build.Finished, "Expired")
		logging.Infof(c, "Expiring %s/%s/%d due to build not current",
			master.Name, build.Buildername, build.Number)
		if err := expireBuild(c, build); err != nil {
			logging.WithError(err).Errorf(c, "Could not expire build")
			return http.StatusInternalServerError
		}
	}
	return 0
}
// PubSubHandler is a webhook that stores the builds coming in from pubsub.
// It replies with the status code computed by pubSubHandlerImpl.
func PubSubHandler(ctx *router.Context) {
	ctx.Writer.WriteHeader(pubSubHandlerImpl(ctx.Context, ctx.Request))
}
// pubSubHandlerImpl is the actual implementation of the pubsub handler.
// It returns a status code: StatusOK (200) for okay (ACK implied, don't
// retry). Anything else will signal to pubsub to retry.
//
// The request body is decoded as a PubSub push message. The subscription name
// decides whether the payload is public or internal; an unknown subscription
// is answered with 500 so that the message is retried. Payloads that cannot
// be decoded are acknowledged with 200 because retrying cannot help.
//
// Each build in the payload is stored on its own, followed by its build
// summary, unless datastore already holds a finished build with the same
// master, builder and number. When the payload carries a master json it is
// handed to doMaster last.
func pubSubHandlerImpl(c context.Context, r *http.Request) int {
	msg := common.PubSubSubscription{}
	now := int(clock.Now(c).Unix())
	defer r.Body.Close()
	dec := json.NewDecoder(r.Body)
	if err := dec.Decode(&msg); err != nil {
		logging.WithError(err).Errorf(
			c, "Could not decode message.  %s", err)
		return http.StatusOK // This is a hard failure, we don't want PubSub to retry.
	}
	internal := true
	// Get the name of the subscription on luci-config
	settings := common.GetSettings(c)
	switch msg.Subscription {
	case settings.Buildbot.PublicSubscription:
		internal = false
	case settings.Buildbot.InternalSubscription:
		// internal = true, but that's already set.
	default:
		logging.Errorf(
			c, "Subscription name %s does not match %s or %s",
			msg.Subscription, settings.Buildbot.PublicSubscription,
			settings.Buildbot.InternalSubscription)
		// This is a configuration error. Tell PubSub to retry until we fix our
		// configs.
		return http.StatusInternalServerError
	}
	logging.Infof(
		c, "Message ID \"%s\" from subscription %s is %d bytes long",
		msg.Message.MessageID, msg.Subscription, r.ContentLength)
	bbMsg, err := msg.GetData()
	if err != nil {
		logging.WithError(err).Errorf(c, "Could not base64 decode message %s", err)
		return http.StatusOK
	}
	builds, master, err := unmarshal(c, bbMsg)
	if err != nil {
		logging.WithError(err).Errorf(c, "Could not unmarshal message %s", err)
		return http.StatusOK
	}
	logging.Infof(c, "There are %d builds", len(builds))
	if master != nil {
		logging.Infof(c, "The master name is %s", master.Name)
	} else {
		logging.Infof(c, "No master in this message")
	}
	// This is used to cache the master used for extracting OS information.
	cachedMaster := buildbotMaster{}
	// Do not use PutMulti because we might hit the 1MB limit.
	for _, build := range builds {
		// A JSON null in the builds list decodes to a nil pointer. Treat it
		// like any other malformed message instead of panicking.
		if build == nil {
			logging.Errorf(c, "Invalid message, null build")
			return http.StatusOK
		}
		if build.Master == "" {
			logging.Errorf(c, "Invalid message, missing master name")
			return http.StatusOK
		}
		existingBuild := &buildbotBuild{
			Master:      build.Master,
			Buildername: build.Buildername,
			Number:      build.Number,
		}
		buildExists := false
		// Any Get error, not only "no such entity", is treated as the build not
		// existing yet.
		if err := datastore.Get(c, existingBuild); err == nil {
			if existingBuild.Finished {
				// Never replace a completed build.
				buildCounter.Add(
					c, 1, internal, build.Master, build.Buildername, false, "Rejected")
				continue
			}
			buildExists = true
		}
		// Also set the finished, timestamp, and internal bit.
		build.Finished = false
		if build.TimeStamp == nil {
			build.TimeStamp = &now
		}
		// A non-nil second entry in Times marks the build as finished.
		if len(build.Times) == 2 && build.Times[1] != nil {
			build.Finished = true
			logging.Infof(
				c, "Recording finished build %s/%s/%d", build.Master,
				build.Buildername, build.Number)
		}
		build.Internal = internal
		// Try to get the OS information on a best-effort basis.  This assumes that all
		// builds come from one master.
		build.OSFamily, build.OSVersion = getOSInfo(c, build, &cachedMaster)
		err = datastore.Put(c, build)
		if err != nil {
			if _, ok := err.(errTooBig); ok {
				// This will never work, we don't want PubSub to retry.
				logging.WithError(err).Errorf(
					c, "Could not save build to datastore, failing permanently")
				return http.StatusOK
			}
			// This is transient, we do want PubSub to retry.
			logging.WithError(err).Errorf(c, "Could not save build in datastore")
			return http.StatusInternalServerError
		}
		err = saveBuildSummary(c, build)
		if err != nil {
			logging.WithError(err).Errorf(c, "could not save build summary into datastore")
			return http.StatusInternalServerError
		}
		// Record the subscription's internal bit, matching how doMaster
		// reports expired builds on the same metric.
		if buildExists {
			buildCounter.Add(
				c, 1, internal, build.Master, build.Buildername, build.Finished, "Replaced")
		} else {
			buildCounter.Add(
				c, 1, internal, build.Master, build.Buildername, build.Finished, "New")
		}
	}
	if master != nil {
		// doMaster returns 0 on success; any other value is the status to report.
		code := doMaster(c, master, internal)
		if code != 0 {
			return code
		}
	}
	return http.StatusOK
}