blob: cafae45916e9afbbfe7b8d901c961d6fc19bc898 [file] [log] [blame]
// Copyright 2019 The LUCI Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
// Package buildmerge implements the build.proto tracking and merging logic for
// luciexe host applications.
// You probably want to use `luciexe/host` instead.
// This package is separate from luciexe/host to avoid unnecessary entanglement
// with butler/logdog; all the logic here is implemented to avoid:
// * interacting with the environment
// * interacting with butler/logdog (except by implementing callbacks for
// those, but only acting on simple datastructures/proto messages)
// * handling errors in any 'brutal' ways (all errors in this package are
// handled by reporting them directly in the data structures that this
// package manipulates).
// This is done to simplify testing (as much as it can be) by concentrating all
// the environment stuff into luciexe/host, and all the 'pure' functional stuff
// here (search "imperative shell, functional core").
package buildmerge
import (
bbpb ""
// CalcURLFn is a stateless function which calculates the absolute url and
// viewUrl for a stream, given a logdog namespace (with trailing slash) and a
// streamName.
//
// Within this file it is invoked in two shapes: with the user namespace plus a
// namespace-relative stream name (see New), and with an empty namespace plus a
// full stream name (see onNewStream). Implementations are expected to produce
// matching URLs for both shapes, since the results are used as keys of the
// same map.
type CalcURLFn func(namespaceSlash, streamName types.StreamName) (url, viewUrl string)
// Agent holds all the logic around merging build.proto streams.
//
// An Agent is created with New, hooked up to a Butler with Attach, observed
// through MergedBuildC and shut down with Close.
type Agent struct {
// MergedBuildC is the channel of all the merged builds generated by this
// Agent.
//
// The rate at which Agent merges Builds is governed by the consumption of
// this channel; Consuming it slowly will have Agent merge less frequently,
// and consuming it rapidly will have Agent merge more frequently.
//
// The last build before the channel closes will always be the final state of
// all builds at the time this Agent was Close()'d.
MergedBuildC <-chan *bbpb.Build
// Wait on this channel for the Agent to drain. Will only drain after calling
// Close() at least once.
//
// New sets this to mergeCh.DrainC.
DrainC <-chan struct{}
// used to cancel in-progress sendMerge calls. It is also handed to the
// dispatcher channel and to every buildStateTracker, and is the clock source
// for clockNow.
ctx context.Context
// mergedBuildC is the send side of MergedBuildC
mergedBuildC chan<- *bbpb.Build
// userNamespace is the logdog namespace (with a trailing slash) which we'll
// use to determine if a new stream is potentially monitored, or not.
userNamespace types.StreamName
// userRootURL is the full url ('logdog://.../stream/build.proto') of the
// user's "root" build.proto stream (i.e. the one emitted by the top level
// luciexe implementation).
//
// This is used as a key to start the merge process.
userRootURL string
// baseBuild is the Agent's private clone of the `base` Build given to New.
// Every merged build produced by sendMerge starts from a shallow copy of it.
baseBuild *bbpb.Build
// statesMu covers `states`. It must be held when reading or writing to
// `states`, but doesn't need to be held while interacting with an individual
// *buildState obtained from the map.
statesMu sync.RWMutex
// states maps a stream URL (i.e. `logdog://.../stream/build.proto`) to the
// state tracker for that stream.
states map[string]*buildStateTracker
// mergeCh is used in production mode to send pings via informNewData
mergeCh dispatcher.Channel
// informNewData is used to 'ping' mergeCh; it's overwritten in tests.
informNewData func()
// done is an atomically-accessed boolean. Close swaps it to 1;
// collectingData reports whether it is still 0.
done int32
// calculateURLs is a function which can convert a logdog namespace and
// streamname into both the full 'Url' and 'ViewUrl' values for a Log message.
//
// This is used by the Agent itself when deriving keys for the `states`
// map, as well as for individual buildState objects to adjust their build's
// logs' URLs.
calculateURLs CalcURLFn
// New returns a new Agent.
//
// Args:
// * ctx - used for logging, clock and cancelation. When canceled, the Agent
// will cease sending updates on MergedBuildC, but you must still invoke
// Agent.Close() in order to clean up all resources associated with the
// Agent.
// * userNamespace - The logdog namespace (with a trailing slash) under which
// we should monitor streams. It is normalized with AsNamespace() before
// use.
// * base - The "model" Build message that all generated builds should start
// with. All build proto streams will be merged onto a copy of this message.
// Any Output.Log's which have non-absolute URLs will have their Url and
// ViewUrl absolutized relative to userNamespace using calculateURLs.
// The message is cloned first, so the caller's `base` is not modified.
// * calculateURLs - A function to calculate Log.Url and Log.ViewUrl values.
// Should be a pure function.
//
// The following fields will be merged into `base` from the user controlled
// build.proto stream(s):
// Steps
// SummaryMarkdown
// Status
// StatusDetails
// UpdateTime
// Tags
// EndTime
// Output
//
// The frequency of updates from this Agent is governed by how quickly the
// caller consumes from Agent.MergedBuildC.
//
// New panics if the internal dispatcher channel cannot be created.
func New(ctx context.Context, userNamespace types.StreamName, base *bbpb.Build, calculateURLs CalcURLFn) *Agent {
userNamespace = userNamespace.AsNamespace()
// Unbuffered: the same channel is exposed receive-only as MergedBuildC and
// kept send-only as mergedBuildC.
ch := make(chan *bbpb.Build)
// Only the url is needed here (it becomes the root key into `states`); the
// viewUrl result is discarded.
userRootURL, _ := calculateURLs(userNamespace, luciexe.BuildProtoStreamSuffix)
ret := &Agent{
ctx: ctx,
MergedBuildC: ch,
mergedBuildC: ch,
states: map[string]*buildStateTracker{},
calculateURLs: calculateURLs,
userNamespace: userNamespace,
userRootURL: userRootURL,
baseBuild: proto.Clone(base).(*bbpb.Build),
// Absolutize logs on the cloned base build: a Url that parses cleanly and has
// no scheme is treated as a stream name relative to userNamespace. Urls which
// fail to parse, or which already carry a scheme, are left as they are.
for _, log := range ret.baseBuild.GetOutput().GetLogs() {
u, err := url.Parse(log.Url)
if err == nil && u.Scheme == "" {
log.Url, log.ViewUrl = calculateURLs(
userNamespace, types.StreamName(log.Url))
var err error
// The dispatcher channel invokes ret.sendMerge for batches and ret.finalize
// once drained.
// NOTE(review): the options below look like "no QPS limit, one send at a
// time, single-item batches, drop the oldest pending ping when full" —
// confirm against the dispatcher/buffer package documentation.
ret.mergeCh, err = dispatcher.NewChannel(ctx, &dispatcher.Options{
QPSLimit: rate.NewLimiter(rate.Inf, 1),
Buffer: buffer.Options{
MaxLeases: 1,
BatchSize: 1,
FullBehavior: &buffer.DropOldestBatch{},
DropFn: dispatcher.DropFnQuiet,
DrainedFn: ret.finalize,
}, ret.sendMerge)
if err != nil {
panic(err) // creating dispatcher with static config should never fail
// Default ping implementation: push a placeholder item into the dispatcher
// channel. sendMerge ignores the batch it is given.
ret.informNewData = func() {
ret.mergeCh.C <- nil // content doesn't matter
ret.DrainC = ret.mergeCh.DrainC
return ret
// Attach should be called once to attach this to a Butler.
//
// It registers the Agent's onNewStream method as a stream registration
// callback on b.
//
// This must be done before the butler receives any build.proto streams.
func (a *Agent) Attach(b *butler.Butler) {
// NOTE(review): the meaning of the second argument (`true`) is defined by
// butler.AddStreamRegistrationCallback — confirm against its documentation.
b.AddStreamRegistrationCallback(a.onNewStream, true)
// onNewStream is the stream registration callback installed by Attach.
//
// It returns nil (no chunk callback) when the Agent is no longer collecting
// data, when the stream's name does not start with userNamespace, or when the
// stream's base name is not luciexe.BuildProtoStreamSuffix.
//
// Otherwise it creates a buildStateTracker for the stream, stores it in
// `states` keyed by the stream's url, and returns the tracker's handleNewData
// method. A bad content type or stream type does not reject the stream here;
// the resulting error is passed to newBuildStateTracker instead.
func (a *Agent) onNewStream(desc *logpb.LogStreamDescriptor) butler.StreamChunkCallback {
if !a.collectingData() {
return nil
namespace, base := types.StreamName(desc.Name).Split()
if !strings.HasPrefix(desc.Name, string(a.userNamespace)) || base != luciexe.BuildProtoStreamSuffix {
// outside our monitored prefix, or not a ".../build.proto" stream
return nil
var err error
zlib := false
switch desc.ContentType {
case luciexe.BuildProtoContentType:
case luciexe.BuildProtoZlibContentType:
zlib = true
// presumably the fallback for any other content type — TODO confirm the case
// label this assignment belongs to; none is visible in this view.
err = errors.Reason("stream %q has content type %q, expected %q", desc.Name, desc.ContentType, luciexe.BuildProtoContentType).Err()
// Only report a stream type problem if there is no content type error
// already.
if err == nil && desc.StreamType != logpb.StreamType_DATAGRAM {
err = errors.Reason("stream %q has type %q, expected %q", desc.Name, desc.StreamType, logpb.StreamType_DATAGRAM).Err()
// desc.Name is already the full stream name, so the namespace argument is
// empty. Only the url is used (as the `states` key).
url, _ := a.calculateURLs("", types.StreamName(desc.Name))
bState := newBuildStateTracker(a.ctx, a, namespace, zlib, err)
// NOTE(review): the acquisition matching this Unlock is not visible in this
// view — confirm statesMu is locked before this point.
defer a.statesMu.Unlock()
a.states[url] = bState
return bState.handleNewData
// Close causes the Agent to stop collecting data, emit a final merged build,
// and then shut down all internal routines.
//
// The `done` flag is swapped to 1 atomically, so collectingData reports false
// from this point on, and a Close call which observes the flag already set can
// detect that it is not the first one.
func (a *Agent) Close() {
// stops accepting new trackers
if atomic.SwapInt32(&a.done, 1) == 1 {
// NOTE(review): the body of this branch is not visible in this view;
// presumably it returns early because Close already ran — confirm.
// close all states' and process their final work items. Closure should be
// very quick and will activate all final processing in parallel. GetFinal
// ensures that the state is completely settled.
states := a.snapStates()
// NOTE(review): the bodies of the two loops below are not visible in this
// view; per the comment above, the first presumably closes each tracker and
// the second waits for each to settle — confirm.
for _, t := range states {
for _, t := range states {
// tells our merge Channel to process all the current (now-final) states one
// last time.
// shut down the mergeCh so it will no longer accept new informNewData calls.
// snapStates returns a snapshot of a.states: a freshly allocated map holding
// the same keys and the same *buildStateTracker pointers. The trackers
// themselves are not copied.
//
// NOTE(review): the Agent.statesMu contract requires the lock to be held while
// reading `states`; no acquisition is visible in this view — confirm.
func (a *Agent) snapStates() map[string]*buildStateTracker {
trackers := make(map[string]*buildStateTracker, len(a.states))
for k, v := range a.states {
trackers[k] = v
return trackers
// sendMerge is the send function given to the dispatcher channel in New. The
// batch argument is ignored; each invocation produces one merged Build.
//
// It snapshots the current trackers, keeps the latest state of every tracker
// whose state is not marked invalid, and then walks the streams starting at
// userRootURL. Steps are appended to a shallow copy of baseBuild; for a merge
// step (luciexe.IsMergeStep) the walk recurses into the stream named by the
// step's first log Url, with the step's name added to the name prefix.
// Prefixed step names are joined with "|".
//
// The merged build is sent on mergedBuildC unless a.ctx is done first. The
// visible return value is always nil.
func (a *Agent) sendMerge(_ *buffer.Batch) error {
trackers := a.snapStates()
// First pass: collect the usable states and count their steps, so that
// base.Steps can be allocated once with enough capacity.
validStates := make(map[string]*buildState, len(trackers))
stepCount := 0
for k, v := range trackers {
if state := v.getLatest(); !state.invalid {
// NOTE(review): the argument of len( is truncated in this view; presumably
// it is the number of steps in this state's build — confirm.
stepCount += len(
validStates[k] = state
base := *a.baseBuild // shallow copy; we only assign to top level fields
base.Steps = nil
if stepCount > 0 {
base.Steps = make([]*bbpb.Step, 0, stepCount)
// insertSteps is declared before it is assigned so that the closure can call
// itself recursively. stepNS is the list of enclosing merge step names;
// streamURL is the `validStates` key of the stream to insert.
var insertSteps func(stepNS []string, streamURL string) *bbpb.Build
insertSteps = func(stepNS []string, streamURL string) *bbpb.Build {
state, ok := validStates[streamURL]
if !ok {
// No valid state is known for this stream.
return nil
// NOTE(review): the range expression is truncated in this view; presumably
// it iterates over the steps of this state's build — confirm.
for _, step := range {
isMergeStep := luciexe.IsMergeStep(step)
// The step is copied only in the cases where it is changed below: when it is
// renamed with a namespace prefix, or when it is a merge step passed to
// updateStepFromBuild.
if isMergeStep || len(stepNS) > 0 {
// make a shallow copy, much cheaper than proto.Clone
stepVal := *step
step = &stepVal
// Remember the un-prefixed name; it is the namespace component handed to the
// recursive call for this step's sub-build.
baseName := step.Name
if len(stepNS) > 0 {
step.Name = strings.Join(append(stepNS, step.Name), "|")
base.Steps = append(base.Steps, step)
if isMergeStep {
// The sub-build's stream is looked up by the Url of the merge step's first
// log.
subBuild := insertSteps(append(stepNS, baseName), step.Logs[0].Url)
updateStepFromBuild(step, subBuild)
// Start the walk at the root stream with an empty step namespace.
updateBaseFromUserBuild(&base, insertSteps(nil, a.userRootURL))
// Block until the consumer takes the build or the context is canceled.
select {
case a.mergedBuildC <- &base:
case <-a.ctx.Done():
return nil
// finalize is registered in New as the DrainedFn of the dispatcher channel
// options.
// NOTE(review): the body is not visible in this view; see the full file for
// what it releases.
func (a *Agent) finalize() {
// collectingData reports whether the Agent is still accepting new streams,
// i.e. whether the atomically-read `done` flag is still 0. Close swaps the
// flag to 1.
func (a *Agent) collectingData() bool {
return atomic.LoadInt32(&a.done) == 0
// clockNow returns the current time, as reported by the clock carried in
// a.ctx, converted to a protobuf Timestamp.
//
// Used for minting protobuf timestamps for buildStateTrackers
func (a *Agent) clockNow() *timestamp.Timestamp {
ret, err := ptypes.TimestampProto(clock.Now(a.ctx))
// NOTE(review): the body of this error branch is not visible in this view —
// confirm how a conversion failure is handled.
if err != nil {
return ret