blob: ce382622a369ab611951c6cd9b6911f695d4115d [file] [log] [blame]
// Copyright 2019 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package buildmerge implements the build.proto tracking and merging logic for
// luciexe host applications.
//
// You probably want to use `go.chromium.org/luci/luciexe/host` instead.
//
// This package is separate from luciexe/host to avoid unnecessary entanglement
// with butler/logdog; all the logic here is implemented to avoid:
//
// - interacting with the environment
// - interacting with butler/logdog (except by implementing callbacks for
// those, but only acting on simple datastructures/proto messages)
// - handling errors in any 'brutal' ways (all errors in this package are
// handled by reporting them directly in the data structures that this
// package manipulates).
//
// This is done to simplify testing (as much as it can be) by concentrating all
// the environment stuff into luciexe/host, and all the 'pure' functional stuff
// here (search "imperative shell, functional core").
package buildmerge
import (
"context"
"fmt"
"strings"
"sync"
"sync/atomic"
"github.com/golang/protobuf/ptypes"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
bbpb "go.chromium.org/luci/buildbucket/proto"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/data/stringset"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/proto/reflectutil"
"go.chromium.org/luci/common/sync/dispatcher"
"go.chromium.org/luci/common/sync/dispatcher/buffer"
"go.chromium.org/luci/logdog/api/logpb"
"go.chromium.org/luci/logdog/client/butler"
"go.chromium.org/luci/logdog/common/types"
"go.chromium.org/luci/luciexe"
)
// CalcURLFn is a stateless function which calculates the absolute url and
// viewUrl of a stream from a logdog namespace (with trailing slash) and a
// streamName.
//
// Within this file it is invoked in two shapes:
//   - (userNamespace, luciexe.BuildProtoStreamSuffix) to derive the URL of the
//     user's root build.proto stream, and
//   - ("", fullStreamName) to derive the key under which a newly registered
//     stream is tracked.
type CalcURLFn func(namespaceSlash, streamName types.StreamName) (url, viewUrl string)
// Agent holds all the logic around merging build.proto streams.
//
// Construct one with New; the zero value is not usable (its maps, channels and
// callbacks are all populated by New).
type Agent struct {
// MergedBuildC is the channel of all the merged builds generated by this
// Agent. It is the receive side of the unbuffered channel created in New.
//
// The rate at which Agent merges Builds is governed by the consumption of
// this channel; Consuming it slowly will have Agent merge less frequently,
// and consuming it rapidly will have Agent merge more frequently.
//
// The last build before the channel closes will always be the final state of
// all builds at the time this Agent was Close()'d.
MergedBuildC <-chan *bbpb.Build
// Wait on this channel for the Agent to drain. Will only drain after calling
// Close() at least once.
//
// New sets this to the DrainC of the internal merge dispatcher channel.
DrainC <-chan struct{}
// ctx is used to cancel in-progress sendMerge calls. It is also handed to
// every buildStateTracker created by onNewStream and used as the clock
// source in clockNow.
ctx context.Context
// mergedBuildC is the send side of MergedBuildC. It is closed by finalize.
mergedBuildC chan<- *bbpb.Build
// userNamespace is the logdog namespace (with a trailing slash) which we'll
// use to determine if a new stream is potentially monitored, or not.
userNamespace types.StreamName
// userRootURL is the full url ('logdog://.../stream/build.proto') of the
// user's "root" build.proto stream (i.e. the one emitted by the top level
// luciexe implementation).
//
// This is used as a key to start the merge process.
userRootURL string
// baseBuild is the Agent's private clone of the `base` Build given to New,
// with the Url/ViewUrl of its Output.Logs already absolutized. sendMerge
// shallow-copies it as the starting point of every merged build.
baseBuild *bbpb.Build
// statesMu covers `states`. It must be held when reading or writing to
// `states`, but doesn't need to be held while interacting with an individual
// *buildStateTracker obtained from the map.
statesMu sync.RWMutex
// states maps a stream URL (i.e. `logdog://.../stream/build.proto`) to the
// state tracker for that stream.
states map[string]*buildStateTracker
// mergeCh is used in production mode to send pings via informNewData
mergeCh dispatcher.Channel[struct{}]
// informNewData is used to 'ping' mergeCh; it's overwritten in tests.
informNewData func()
// done is an atomically-accessed boolean: 0 while the Agent is collecting
// data, 1 once Close has been called.
done int32
// calculateURLs is a function which can convert a logdog namespace and
// streamname into both the full 'Url' and 'ViewUrl' values for a Log message.
// This is used by the Agent itself when deriving keys for the `states`
// map, as well as for individual buildStateTracker objects to adjust their
// build's logs' URLs.
calculateURLs CalcURLFn
}
// New constructs an Agent which merges the build.proto streams registered
// under userNamespace onto a private copy of base.
//
// Args:
//   - ctx - used for logging, clock and cancelation. When canceled, the Agent
//     will cease sending updates on MergedBuildC, but you must still invoke
//     Agent.Close() in order to clean up all resources associated with the
//     Agent.
//   - userNamespace - The logdog namespace under which we should monitor
//     streams. It is normalized with AsNamespace before use.
//   - base - The "model" Build message that all generated builds should start
//     with. It is cloned, so the caller's message is not modified. Any
//     Output.Log's which have non-absolute URLs will have their Url and
//     ViewUrl absolutized relative to userNamespace using calculateURLs.
//   - calculateURLs - A function to calculate Log.Url and Log.ViewUrl values.
//     Should be a pure function.
//
// The following fields will be merged into `base` from the user controlled
// build.proto stream(s):
//
//	Steps
//	SummaryMarkdown
//	Status
//	StatusDetails
//	UpdateTime
//	Tags
//	EndTime
//	Output
//
// The frequency of updates from this Agent is governed by how quickly the
// caller consumes from Agent.MergedBuildC.
//
// An error is returned if any of base's output log URLs cannot be absolutized,
// or if the internal dispatcher channel cannot be created.
func New(ctx context.Context, userNamespace types.StreamName, base *bbpb.Build, calculateURLs CalcURLFn) (*Agent, error) {
	userNamespace = userNamespace.AsNamespace()

	// Only the Url half of the result is needed to key the root stream.
	rootURL, _ := calculateURLs(userNamespace, luciexe.BuildProtoStreamSuffix)

	// Absolutize log URLs on our own clone of the model build.
	var err error
	model := proto.Clone(base).(*bbpb.Build)
	for _, l := range model.GetOutput().GetLogs() {
		if l.Url, l.ViewUrl, err = absolutizeURLs(l.Url, l.ViewUrl, userNamespace, calculateURLs); err != nil {
			return nil, errors.Annotate(err, "build.output.logs[%q]", l.Name).Err()
		}
	}

	// One unbuffered channel; the Agent keeps the send side, callers get the
	// receive side.
	mergedC := make(chan *bbpb.Build)
	agent := &Agent{
		MergedBuildC: mergedC,

		ctx:           ctx,
		mergedBuildC:  mergedC,
		userNamespace: userNamespace,
		userRootURL:   rootURL,
		baseBuild:     model,
		states:        map[string]*buildStateTracker{},
		calculateURLs: calculateURLs,
	}

	// A single lease with single-item batches and drop-oldest behavior: pings
	// carry no payload, so a newer ping simply replaces an older pending one.
	mergeOpts := &dispatcher.Options[struct{}]{
		Buffer: buffer.Options{
			MaxLeases:     1,
			BatchItemsMax: 1,
			FullBehavior:  &buffer.DropOldestBatch{},
		},
		DropFn:    dispatcher.DropFnQuiet[struct{}],
		DrainedFn: agent.finalize,
	}
	agent.mergeCh, err = dispatcher.NewChannel[struct{}](ctx, mergeOpts, agent.sendMerge)
	if err != nil {
		// Creating a dispatcher with static config should never fail.
		return nil, err
	}

	agent.DrainC = agent.mergeCh.DrainC
	agent.informNewData = func() {
		// The value is irrelevant; the send itself is the signal.
		agent.mergeCh.C <- struct{}{}
	}
	return agent, nil
}
// Attach registers this Agent's onNewStream handler as a stream registration
// callback on the given Butler. It should be called once.
//
// This must be done before the butler receives any build.proto streams.
func (a *Agent) Attach(b *butler.Butler) {
	// NOTE(review): the meaning of the second argument is defined by butler
	// and is not visible here - confirm against the butler documentation.
	onStream := a.onNewStream
	b.AddStreamRegistrationCallback(onStream, true)
}
// validContentTypes is the set of stream content types which onNewStream
// accepts as carrying build.proto data (plain and zlib-compressed).
var validContentTypes = stringset.NewFromSlice([]string{
	luciexe.BuildProtoContentType,
	luciexe.BuildProtoZlibContentType,
}...)
// onNewStream is the stream registration callback installed by Attach.
//
// It classifies the newly registered stream by its stream type and content
// type:
//   - datagram stream with a valid build.proto content type: tracked, and its
//     data is routed to the tracker;
//   - exactly one of the two is valid, or the stream is named
//     ".../build.proto" under the user namespace: tracked with an error
//     describing the mismatch, but no data callback is returned;
//   - anything else: ignored.
//
// Returns nil when the stream is ignored or invalid, or when the Agent has
// already been closed.
func (a *Agent) onNewStream(desc *logpb.LogStreamDescriptor) butler.StreamChunkCallback {
	// NOTE(review): nothing here prevents Close from running between this
	// check and the registration below - confirm whether that window matters.
	if !a.collectingData() {
		return nil
	}

	streamName := types.StreamName(desc.Name)
	namespace, leaf := streamName.Split()

	isDatagram := desc.StreamType == logpb.StreamType_DATAGRAM
	isBuildContent := validContentTypes.Has(desc.ContentType)

	var (
		zlib      bool
		streamErr error
	)
	switch {
	case isDatagram && isBuildContent:
		zlib = desc.ContentType == luciexe.BuildProtoZlibContentType

	case isDatagram:
		// Right stream type, wrong content type.
		streamErr = errors.Reason("stream %q has content type %q, expected one of %v", desc.Name, desc.ContentType, validContentTypes.ToSortedSlice()).Err()

	case isBuildContent:
		// Right content type, wrong stream type.
		streamErr = errors.Reason("build proto stream %q has type %q, expected %q", desc.Name, desc.StreamType, logpb.StreamType_DATAGRAM).Err()

	case leaf == luciexe.BuildProtoStreamSuffix && strings.HasPrefix(desc.Name, string(a.userNamespace)):
		// Both are wrong, but the name says this was meant to be a build.proto
		// stream inside the namespace we monitor.
		streamErr = errors.Reason("build.proto stream %q has stream type %q and content type %q, expected %q and one of %v", desc.Name, desc.StreamType, desc.ContentType, logpb.StreamType_DATAGRAM, validContentTypes.ToSortedSlice()).Err()

	default:
		// neither a ".../build.proto" stream nor a stream with valid stream type
		// or content type.
		return nil
	}

	// The full stream name is already absolute, hence the empty namespace.
	key, _ := a.calculateURLs("", streamName)
	tracker := newBuildStateTracker(a.ctx, a, namespace, zlib, streamErr)

	a.statesMu.Lock()
	defer a.statesMu.Unlock()
	a.states[key] = tracker

	if streamErr != nil {
		// Invalid streams are tracked for their error only; their data is not
		// consumed.
		return nil
	}
	return tracker.handleNewData
}
// Close causes the Agent to stop collecting data, emit a final merged build,
// and then shut down all internal routines.
//
// Only the first call has any effect; subsequent calls return immediately.
func (a *Agent) Close() {
	// Flipping the flag stops onNewStream from accepting new trackers; whoever
	// observes the previous value 1 lost the race to close.
	if prev := atomic.SwapInt32(&a.done, 1); prev == 1 {
		return
	}

	// Deliberately two passes: first ask every tracker to close (which should
	// be quick and lets their final work proceed in parallel), then wait for
	// each one to drain so that its state is completely settled.
	trackers := a.snapStates()
	for _, tracker := range trackers {
		tracker.Close()
	}
	for _, tracker := range trackers {
		tracker.Drain()
	}

	// Ping the merge channel so the (now-final) states are merged one last
	// time...
	a.informNewData()

	// ...then shut it down so it will no longer accept new informNewData calls.
	a.mergeCh.Close()
}
// snapStates returns a shallow copy of the `states` map taken under the read
// lock, so that callers can iterate over the trackers without holding
// statesMu. The trackers themselves are shared, not copied.
func (a *Agent) snapStates() map[string]*buildStateTracker {
	a.statesMu.RLock()
	defer a.statesMu.RUnlock()

	snapshot := make(map[string]*buildStateTracker, len(a.states))
	for url, tracker := range a.states {
		snapshot[url] = tracker
	}
	return snapshot
}
// sendMerge is the send function of the merge dispatcher channel created in
// New. The batch content is ignored; each invocation:
//
//  1. snapshots the latest build of every tracked stream,
//  2. starts from a shallow copy of baseBuild with an empty step list,
//  3. walks the user's root stream, appending its steps and recursively
//     splicing in the steps of every stream referenced by a merge step, and
//  4. offers the result on MergedBuildC.
//
// If the Agent's context is done before the merged build is consumed, the
// Agent is closed instead. Always returns nil.
func (a *Agent) sendMerge(_ *buffer.Batch[struct{}]) error {
	trackers := a.snapStates()

	// Collect the current build of each stream, counting steps on the way so
	// the merged step slice can be allocated once.
	latest := make(map[string]*bbpb.Build, len(trackers))
	totalSteps := 0
	for url, tracker := range trackers {
		b := tracker.getLatestBuild()
		latest[url] = b
		totalSteps += len(b.GetSteps())
	}

	merged := reflectutil.ShallowCopy(a.baseBuild).(*bbpb.Build)
	merged.Steps = nil
	if totalSteps > 0 {
		merged.Steps = make([]*bbpb.Step, 0, totalSteps)
	}

	// insertSteps appends the steps of the stream at streamURL to merged.Steps
	// (prefixing their names with stepNS joined by "|") and recurses into merge
	// steps. It returns the stream's build, or nil if the stream is unknown or
	// has no build yet.
	//
	// NOTE(review): there is no cycle guard here; a merge step that refers back
	// to a stream already being expanded would recurse without bound unless
	// that is prevented elsewhere - confirm.
	var insertSteps func(stepNS []string, streamURL string, fromSubBuild bool) *bbpb.Build
	insertSteps = func(stepNS []string, streamURL string, fromSubBuild bool) *bbpb.Build {
		cur, found := latest[streamURL]
		if !found {
			return nil
		}

		nested := len(stepNS) > 0
		for _, step := range cur.GetSteps() {
			mergeInfo := step.GetMergeBuild()
			subStreamURL := mergeInfo.GetFromLogdogStream()
			isMergeStep := subStreamURL != ""

			// Any step which may be modified below, or which came from a
			// sub-build, is cloned before it is touched.
			if isMergeStep || nested || fromSubBuild {
				step = proto.Clone(step).(*bbpb.Step)
			}

			leafName := step.Name
			if nested {
				step.Name = strings.Join(append(stepNS, leafName), "|")
			}
			merged.Steps = append(merged.Steps, step)

			if !isMergeStep {
				continue
			}

			// In legacy global namespace mode the sub-build's steps are not
			// nested under this step's name.
			var childNS []string
			if !mergeInfo.LegacyGlobalNamespace {
				childNS = append(stepNS, leafName)
			}

			if sub := insertSteps(childNS, subStreamURL, true); sub != nil {
				updateStepFromBuild(step, sub)
				if mergeInfo.LegacyGlobalNamespace {
					updateBuildFromGlobalSubBuild(cur, sub)
				}
				continue
			}

			// Nothing to merge: record why in the step's summary, keeping any
			// summary the step already had.
			var summary strings.Builder
			if step.SummaryMarkdown != "" {
				summary.WriteString(step.SummaryMarkdown)
				summary.WriteString("\n\n")
			}
			if _, registered := latest[subStreamURL]; registered {
				fmt.Fprintf(&summary, "build.proto stream: %q is empty", subStreamURL)
			} else {
				fmt.Fprintf(&summary, "build.proto stream: %q is not registered", subStreamURL)
			}
			step.SummaryMarkdown = summary.String()
		}
		return cur
	}

	updateBaseFromUserBuild(merged, insertSteps(nil, a.userRootURL, false))

	// Block until the consumer takes the build, or give up on cancelation.
	select {
	case a.mergedBuildC <- merged:
	case <-a.ctx.Done():
		a.Close()
	}
	return nil
}
// finalize closes the send side of MergedBuildC. It is installed as the
// DrainedFn of the merge dispatcher channel in New.
func (a *Agent) finalize() {
	out := a.mergedBuildC
	close(out)
}
// collectingData reports whether the Agent is still accepting new streams,
// i.e. whether the atomic `done` flag is still unset.
func (a *Agent) collectingData() bool {
	closed := atomic.LoadInt32(&a.done) != 0
	return !closed
}
// clockNow returns the current time, as read from the clock attached to the
// Agent's context, as a protobuf timestamp. It is used for minting timestamps
// for buildStateTrackers.
//
// Panics if the current time cannot be converted to a protobuf timestamp.
func (a *Agent) clockNow() *timestamppb.Timestamp {
	now := clock.Now(a.ctx)
	ts, err := ptypes.TimestampProto(now)
	if err != nil {
		panic(err)
	}
	return ts
}