blob: 86cd1ab26e9b6980ca4567ffc3e385957e2f2918 [file] [log] [blame]
// Copyright 2020 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package build
import (
"context"
"fmt"
"io"
"strings"
"sync"
"google.golang.org/protobuf/types/known/timestamppb"
bbpb "go.chromium.org/luci/buildbucket/proto"
"go.chromium.org/luci/buildbucket/protoutil"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/logging/gologger"
"go.chromium.org/luci/common/system/environ"
"go.chromium.org/luci/logdog/api/logpb"
"go.chromium.org/luci/logdog/client/butlerlib/streamclient"
"go.chromium.org/luci/logdog/client/butlerlib/streamproto"
ldTypes "go.chromium.org/luci/logdog/common/types"
"go.chromium.org/luci/luciexe"
)
// Step represents the state of a single step.
//
// This is properly initialized by the StartStep and ScheduleStep functions;
// the zero value is not usable (for example, logClosers must be a non-nil map
// before addLog stores into it).
//
// Every Step must be finished with End, which records the final status, runs
// all registered log closers, closes the "log" stream and cancels ctx.
type Step struct {
	// NOTE: I think it should be possible to remove `ctx` and `state` from Step
	// here, relying on passing `ctx` to a few more methods.
	//
	// State is needed because of Step.mutate - the Step needs a link back to the
	// whole build state to allow it to notify that a change to an inner proto
	// message (stepPb) has occurred and so the Build should now be published.

	// ctx is the step's own context, derived in ScheduleStep with
	// context.WithCancel. When a log sink is present, its `logging` output is
	// wired to this step's "log" stream.
	ctx context.Context
	// ctxCloser cancels ctx; End calls it as its very last action.
	ctxCloser func()
	// state is the owning build state. It may be nil, in which case logsink()
	// reports that there is no log sink.
	state *State

	// duplicated from stepPb.Name at construction time to avoid need for locks.
	// Read-only.
	name string

	// stepPbMu guards stepPb; mutate holds it while running its callback.
	stepPbMu sync.Mutex
	stepPb   *bbpb.Step

	// logNamespace and logSuffix are assigned by State.registerStep. Together
	// they form the step's logdog prefix ("<logNamespace>/<logSuffix>", or just
	// logSuffix when logNamespace is empty); logSuffix alone prefixes the
	// step's relative log stream names ("<logSuffix>/log/<N>").
	logNamespace string
	logSuffix    string
	// logNames deduplicates the user-facing names of this step's logs.
	logNames nameTracker
	// logClosers maps a log's relative logdog stream name to its closer. End
	// runs every closer under mutate and then sets the map to nil.
	logClosers map[string]func() error
	// loggingStream is the raw stream behind the "log" log. It is only set when
	// a log sink exists; End closes it after logging the final status.
	loggingStream io.Closer
}

// Compile-time check that *Step implements Loggable.
var _ Loggable = (*Step)(nil)
// wasWritten wraps an io.Writer and remembers whether Write has ever been
// called on it, independent of the byte count or error that call produced.
//
// ScheduleStep wraps a step's `log` stream in one of these so that the log's
// link can be dropped from the step if nothing was ever logged to it.
type wasWritten struct {
	w       io.Writer // destination that every Write is forwarded to
	written bool      // flipped to true by the first Write; never reset
}

var _ io.Writer = (*wasWritten)(nil)

// Write records that the wrapper has been used, then hands b to the
// underlying writer and returns whatever that writer returns.
func (ww *wasWritten) Write(b []byte) (int, error) {
	ww.written = true
	n, err := ww.w.Write(b)
	return n, err
}

// wasWritten reports whether Write has been called at least once.
func (ww *wasWritten) wasWritten() bool {
	return ww.written
}
// StartStep adds a new step to the build and immediately puts it in the
// "STARTED" status, recording its StartTime.
//
// The new step becomes a child of the current step recorded in `ctx`; when
// that is nil, a top-level step is created instead.
//
// `name` must not contain `|`: that character is reserved as the delimiter of
// the step hierarchy, and passing it makes this function panic. Repeated step
// names are made unique by appending " (N)" to the 2nd, 3rd, etc. duplicate.
//
// The returned context differs from `ctx` as follows:
//
//  1. The returned *Step is recorded in it as the current step, so calling
//     the package-level StartStep/ScheduleStep with it creates sub-steps of
//     this one.
//  2. When the build has a log sink, its `environ.FromCtx` environment holds a
//     step-unique $LOGDOG_NAMESPACE. Subprocesses you launch should use this
//     environment so that any logdog streams they open are namespaced
//     correctly; `go.chromium.org/luci/luciexe/build/exec` does this for you.
//  3. When the build has a log sink, `go.chromium.org/luci/common/logging`
//     writes to a new step log stream named "log".
//
// Every Step MUST be finished with Step.End. To have errors and panics mapped
// to the correct visual representation, End it like this:
//
//	var err error
//	step, ctx := build.StartStep(ctx, "Step name")
//	defer func() { step.End(err) }()
//
//	err = opThatErrsOrPanics(ctx)
//
// NOTE: This does NOT `recover()` a panic; the program still crashes as it
// normally would. Prefer conventional Go error handling and control flow.
func StartStep(ctx context.Context, name string) (*Step, context.Context) {
	parent := getCurrentStep(ctx)
	return parent.StartStep(ctx, name)
}
// ScheduleStep is the SCHEDULED counterpart of StartStep: the new step is
// created in the SCHEDULED status and its StartTime is left unset.
//
// Naming, nesting and the changes made to the returned context are the same
// as for StartStep. The step moves to STARTED as soon as any other method is
// called on it, a sub-Step is created under it, or Step.Start() is called
// explicitly.
//
// NOTE(review): with a log sink, the step's "log" stream is registered through
// Step.mutate, which appears to move the step to STARTED right away - confirm.
func ScheduleStep(ctx context.Context, name string) (*Step, context.Context) {
	current := getCurrentStep(ctx)
	return current.ScheduleStep(ctx, name)
}
// StartStep creates a child step of `s` called `name` and starts it right away.
//
// It is the method form of the package-level [StartStep]: the parent is `s`
// itself rather than the current step stored in `ctx`. A nil `s` creates a
// top-level step. Everything the package-level function documents about the
// returned context applies here as well.
func (s *Step) StartStep(ctx context.Context, name string) (*Step, context.Context) {
	child, childCtx := s.ScheduleStep(ctx, name)
	child.Start()
	return child, childCtx
}
// ScheduleStep will create a child step of this one with `name` in the SCHEDULED
// status.
//
// This behaves identically to the package level [ScheduleStep], except that the
// 'current step' is `s` and is not pulled from `ctx`. This includes all
// documented behaviors around changes to the returned context.
//
// `s` may be nil: the step is then created at the top level, using the State
// stored in `ctx`. Otherwise s.Start() is called first and the new step's name
// is prefixed with "<s.name>|".
//
// Panics if `name` contains the reserved character `|`.
//
// NOTE(review): when the State has a log sink, the step's "log" stream is
// registered through addLog, which runs inside mutate; mutate moves a
// SCHEDULED step to STARTED. That appears to contradict "leaves the new step
// in the SCHEDULED status" for real builds - confirm whether it is intended.
func (s *Step) ScheduleStep(ctx context.Context, name string) (*Step, context.Context) {
	if strings.Contains(name, "|") {
		panic(errors.Reason("step name %q contains reserved character `|`", name).Err())
	}

	// We avoid looking in context for the state if we can help it - however
	// calling ScheduleStep on nil is allowed when making a top-level step.
	var state *State
	candidateName := name
	if s == nil {
		state = getState(ctx)
	} else {
		// Start the parent before deriving the cancelable context below: Start
		// may panic (mutate panics on an already-ended step), and doing it first
		// means such a panic cannot strand the child's CancelFunc.
		s.Start()
		state = s.state
		candidateName = fmt.Sprintf("%s|%s", s.name, name)
	}

	ctx, ctxCloser := context.WithCancel(ctx)
	ret := &Step{
		ctx:        ctx,
		ctxCloser:  ctxCloser,
		state:      state,
		logClosers: map[string]func() error{},
	}
	ret.stepPb, ret.logNamespace, ret.logSuffix = ret.state.registerStep(&bbpb.Step{
		Name:   candidateName,
		Status: bbpb.Status_SCHEDULED,
	})
	// Cache the registered name (presumably disambiguated with " (N)" by
	// registerStep) so it can be read without taking stepPbMu.
	ret.name = ret.stepPb.Name

	if ls := ret.logsink(); ls == nil {
		// No log sink: announce the step through the `logging` package instead.
		ctx = logging.SetField(ctx, "build.step", ret.stepPb.Name)
		logging.Infof(ctx, "set status: %s", ret.stepPb.Status)
	} else {
		ret.addLog("log", func(name string, relLdName ldTypes.StreamName) func() error {
			stream, err := ls.NewStream(ret.ctx, relLdName)
			if err != nil {
				panic(err)
			}
			// wasWritten allows us to remove the log link if the log wasn't used at
			// all.
			ww := &wasWritten{w: stream}
			// TODO(iannucci): figure out how to preserve log format from context?
			// This rebinds the enclosing function's ctx; addLog calls us synchronously.
			ctx = (&gologger.LoggerConfig{Out: ww}).Use(ctx)
			// we track this in ret.loggingStream so don't have addLog track it.
			ret.loggingStream = stream
			// The returned closer is stored in logClosers and run by End under
			// mutate; it only elides the unused log link, End closes the stream.
			return func() error {
				if !ww.wasWritten() {
					if ret.stepPb.Logs[0].Name != "log" {
						// NOTE: We know that the first log is always "log" because it was
						// added above in this function before the user had the opportunity
						// to add any additional logs.
						panic("impossible: first log in step is not called `log`")
					}
					ret.stepPb.Logs = ret.stepPb.Logs[1:]
				}
				return nil
			}
		})

		// Each step gets its own logdog namespace "step/X/u". Any subprocesses
		// running within this ctx SHOULD use environ.FromCtx to pick this up.
		logPrefix := ret.logSuffix
		if ret.logNamespace != "" {
			logPrefix = fmt.Sprintf("%s/%s", ret.logNamespace, ret.logSuffix)
		}
		env := environ.FromCtx(ctx)
		env.Set(luciexe.LogdogNamespaceEnv, logPrefix+"/u")
		ctx = env.SetInCtx(ctx)
	}

	ret.ctx = ctx
	return ret, setCurrentStep(ctx, ret)
}
// End sets the step's final status, according to `err` (See ExtractStatus).
//
// End will also be able to set INFRA_FAILURE status and log additional
// information if the program is panic'ing.
//
// End'ing a Step will Cancel the context associated with this step (returned
// from StartStep or ScheduleStep).
//
// End must be invoked like:
//
//	var err error
//	step, ctx := build.StartStep(ctx, ...) // or build.ScheduleStep
//	defer func() { step.End(err) }()
//
//	err = opThatErrsOrPanics()
//
// NOTE: A panic will still crash the program as usual. This does NOT
// `recover()` the panic. Please use conventional Go error handling and control
// flow mechanisms.
//
// In order, End:
//  1. Under mutate, computes the final status (and a message) with
//     computePanicStatus, sets EndTime, runs every registered log closer
//     (closer errors are only logged as warnings) and clears the closer map.
//  2. Calls logStatus if there is no log sink, or if the final status is not
//     SUCCESS.
//  3. Closes the step's "log" stream, if one was opened.
//  4. Cancels the step's context.
//
// Because mutate panics on an ended step, calling End a second time panics.
func (s *Step) End(err error) {
	var message string
	s.mutate(func() bool {
		s.stepPb.Status, message = computePanicStatus(err)
		s.stepPb.EndTime = timestamppb.New(clock.Now(s.ctx))
		// NOTE(review): mutate moves a SCHEDULED step to STARTED (setting
		// StartTime) before running this callback, so StartTime appears to be
		// always non-nil here; a never-started step therefore gets StartTime
		// from mutate's clock read rather than StartTime == EndTime.
		if s.stepPb.StartTime == nil {
			// In case the user scheduled the step, but never Start'd it.
			s.stepPb.StartTime = s.stepPb.EndTime
		}
		// NOTE(review): with a log sink, s.ctx's logger writes to this step's
		// "log" stream. The "log" closer drops that log's link if nothing was
		// written yet, so warnings emitted here (or by logStatus below, if it
		// logs via s.ctx) may land in a stream that is no longer linked.
		for logName, closer := range s.logClosers {
			if err := closer(); err != nil {
				logging.Warningf(s.ctx, "error closing log %q: %s", logName, err)
			}
		}
		s.logClosers = nil
		return true
	})
	// stepPb is immutable after mutate ends, so we should be fine to access it
	// outside the locks.
	if s.logsink() == nil || s.stepPb.Status != bbpb.Status_SUCCESS {
		// If we're panicking, we need to log. In a situation where we have a log
		// sink (i.e. a real build), all other information is already reflected via
		// the Build message itself.
		logStatus(s.ctx, s.stepPb.Status, message, s.stepPb.SummaryMarkdown)
	}
	// Closed only after the status has been logged through s.ctx. The Close
	// error is ignored; presumably there is nowhere useful left to report it,
	// since s.ctx's logger writes to this very stream.
	if s.loggingStream != nil {
		s.loggingStream.Close()
	}
	s.ctxCloser()
}
// addLog registers a new log on this step and opens its backing stream.
//
// Everything happens inside mutate, so this also moves a SCHEDULED step to
// STARTED. addLog:
//   - deduplicates the user-provided `name` via s.logNames;
//   - derives the stream name "<logSuffix>/log/<index>", where index is the
//     number of logs already attached to the step;
//   - appends a Log{Name, Url} entry to stepPb.Logs;
//   - calls openStream(dedupedName, relLdName) and, if it returns a non-nil
//     closer, stores it in s.logClosers (keyed by stream name) for End.
//
// relLdName is relative to this process' LOGDOG_NAMESPACE and is suitable for
// use with s.state.logsink.
//
// Returns the Log entry that was appended to the step.
func (s *Step) addLog(name string, openStream func(dedupedName string, relLdName ldTypes.StreamName) func() error) *bbpb.Log {
	var ref *bbpb.Log
	s.mutate(func() bool {
		deduped := s.logNames.resolveName(name)
		streamName := fmt.Sprintf("%s/log/%d", s.logSuffix, len(s.stepPb.Logs))

		ref = &bbpb.Log{Name: deduped, Url: streamName}
		s.stepPb.Logs = append(s.stepPb.Logs, ref)

		closer := openStream(deduped, ldTypes.StreamName(streamName))
		if closer != nil {
			s.logClosers[streamName] = closer
		}
		return true
	})
	return ref
}
// Log creates a new step-level line-oriented text log stream with the given
// name. Returns a Log value which can be written to directly, but also
// provides additional information about the log itself.
//
// `name` is deduplicated against the step's other logs. Like other Step
// methods, this moves a SCHEDULED step to STARTED.
//
// Without a log sink, a TEXT stream is written through makeLoggingWriter, and
// any other stream type is dropped with a warning (a nopStream is used).
//
// The stream will close when the step is End'd.
func (s *Step) Log(name string, opts ...streamclient.Option) *Log {
	ls := s.logsink()
	// ret is assigned by openStream, which addLog invokes synchronously.
	var ret io.WriteCloser
	var openStream func(string, ldTypes.StreamName) func() error
	if ls == nil {
		openStream = func(name string, relLdName ldTypes.StreamName) func() error {
			// NOTE(review): the RenderOptions error is discarded here.
			if desc, _ := streamclient.RenderOptions(opts...); desc.Type != streamproto.StreamType(logpb.StreamType_TEXT) {
				// logpb.StreamType cast is necessary or .String() doesn't work.
				typ := logpb.StreamType(desc.Type)
				logging.Warningf(s.ctx, "dropping %s log %q", typ, name)
				ret = nopStream{}
			} else {
				ret = makeLoggingWriter(s.ctx, name)
			}
			return ret.Close
		}
	} else {
		openStream = func(name string, relLdName ldTypes.StreamName) func() error {
			var err error
			ret, err = ls.NewStream(s.ctx, relLdName, opts...)
			if err != nil {
				panic(err)
			}
			return ret.Close
		}
	}

	var infra *bbpb.BuildInfra_LogDog
	if s.state != nil {
		// Fetch the build once instead of twice.
		if build := s.state.Build(); build != nil {
			infra = build.GetInfra().GetLogdog()
		}
	}

	// Register the log (which assigns ret) as its own statement. The Go spec
	// leaves unspecified whether a plain variable operand such as `ret` in a
	// composite literal is read before or after a function call in the same
	// literal, so `Writer: ret, ref: s.addLog(...)` could capture a nil Writer.
	ref := s.addLog(name, openStream)
	return &Log{
		Writer:    ret,
		ref:       ref,
		namespace: ldTypes.StreamName(s.logNamespace).AsNamespace(),
		infra:     infra,
	}
}
// LogDatagram opens a step-level datagram log called `name` (deduplicated
// against the step's other logs) and returns a writer for it. Each call to
// WriteDatagram produces exactly one datagram message in the stream.
//
// Like other Step methods, this moves a SCHEDULED step to STARTED. Without a
// log sink, a warning is logged and a nopDatagramStream is returned instead.
//
// The stream is closed when the step is End'd.
func (s *Step) LogDatagram(name string, opts ...streamclient.Option) streamclient.DatagramWriter {
	var stream streamclient.DatagramStream
	ls := s.logsink()

	// addLog calls this synchronously, so `stream` is set once addLog returns.
	s.addLog(name, func(dedupedName string, relLdName ldTypes.StreamName) func() error {
		if ls == nil {
			logging.Warningf(s.ctx, "dropping DATAGRAM log %q", dedupedName)
			stream = nopDatagramStream{}
			return stream.Close
		}
		var err error
		if stream, err = ls.NewDatagramStream(s.ctx, relLdName, opts...); err != nil {
			panic(err)
		}
		return stream.Close
	})
	return stream
}
// logsink returns the logdog stream client of the State this step belongs to.
//
// The result is nil when the step has no State, or when that State has no log
// sink; callers treat nil as "not running in a real build".
func (s *Step) logsink() *streamclient.Client {
	if st := s.state; st != nil {
		return st.logsink
	}
	return nil
}
// mutate runs cb with exclusive read/write access to s.stepPb.
//
// The whole operation happens inside s.state.excludeCopy with stepPbMu held.
// Before cb is invoked:
//   - if the step has already ended, mutate panics;
//   - if the step is still SCHEDULED, it is moved to STARTED and its StartTime
//     is set to the current clock time; without a log sink the transition is
//     also logged.
//
// cb may be nil. Otherwise it reports whether it changed stepPb. The value
// handed back to excludeCopy (presumably "the build should be republished") is
// true if either cb reported a change or the auto-start happened.
func (s *Step) mutate(cb func() bool) {
	s.state.excludeCopy(func() bool {
		s.stepPbMu.Lock()
		defer s.stepPbMu.Unlock()

		pb := s.stepPb
		if protoutil.IsEnded(pb.Status) {
			panic(errors.Reason("cannot mutate ended step %q", pb.Name).Err())
		}

		autoStarted := pb.Status == bbpb.Status_SCHEDULED
		if autoStarted {
			pb.Status = bbpb.Status_STARTED
			pb.StartTime = timestamppb.New(clock.Now(s.ctx))
			if s.logsink() == nil {
				logging.Infof(s.ctx, "set status: %s", bbpb.Status_STARTED)
			}
		}

		if cb == nil {
			return autoStarted
		}
		// cb is always invoked, even when the auto-start already counts as a change.
		return cb() || autoStarted
	})
}