blob: 693abc80dd74fb43e25490311a25e169b4a53176 [file] [log] [blame]
// Copyright 2019 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package host
import (
"bytes"
"compress/zlib"
"context"
"fmt"
"os"
"path/filepath"
"strings"
"google.golang.org/protobuf/proto"
bbpb "go.chromium.org/luci/buildbucket/proto"
"go.chromium.org/luci/logdog/client/butler"
"go.chromium.org/luci/logdog/client/butlerlib/bootstrap"
"go.chromium.org/luci/logdog/client/butlerlib/streamclient"
"go.chromium.org/luci/logdog/common/types"
"go.chromium.org/luci/luciexe"
"go.chromium.org/luci/luciexe/host/buildmerge"
)
// spy represents an active Spy on a Butler.
//
// Its job is to interpret all of the build.proto streams within the Butler into
// a single, merged, 'build.proto' stream on the Butler. All merged protos are
// also delivered to the MergedBuildC channel, which the owner of this spy
// MUST drain as quickly as possible.
//
// If a protocol violation occurs within the Butler run, the spy will mark the
// merged build as INFRA_FAILURE status, and report the error in the build's
// SummaryMarkdown.
//
// A spy is created by spyOn; all of its fields are populated there.
type spy struct {
	// MergedBuildC is the channel which sends EVERY merged Build message which
	// this spy produces.
	//
	// MergedBuildC will close when the spy is done processing ALL data.
	//
	// The owner of the spy MUST drain this channel as quickly as possible, or
	// it will block the merge build process.
	MergedBuildC <-chan *bbpb.Build

	// DrainC is the channel to wait on for the spy to drain. It will only
	// drain after Close() has been called at least once.
	DrainC <-chan struct{}

	// Close makes the spy stop processing data, and will cause MergedBuildC to
	// close.
	//
	// Safe to call more than once.
	Close func()

	// UserNamespace is the namespace under which all user build.proto streams
	// are expected.
	//
	// spyOn sets this to the AsNamespace() form of "$LOGDOG_NAMESPACE/u"
	// (presumably "$LOGDOG_NAMESPACE/u/", i.e. with a trailing slash —
	// confirm against types.StreamName.AsNamespace).
	UserNamespace types.StreamName
}
// spyOn installs a Build spy on the Butler.
//
// Monitors '$LOGDOG_NAMESPACE/u/build.proto' datagram stream for Build
// messages, merges them according to the luciexe protocol, and exports the
// merged Build messages to '$LOGDOG_NAMESPACE/build.proto' as well as
// spy.MergedBuildC.
//
// The spy should be Close()'d once the caller is no longer interested in
// receiving merged builds.
//
// Environment: Observes logdog environment variables to determine base values
// for Build.Log.Url and Build.Log.ViewUrl. Accordingly, this relies on the
// Butler's environment already having been exported.
//
// Side-effects:
//   - Opens the "$LOGDOG_NAMESPACE/build.proto" datagram stream in Butler to
//     output merged Build messages.
//   - Exports LOGDOG_NAMESPACE="$LOGDOG_NAMESPACE/u" to the environment.
//
// Returns an error if the environment variable cannot be exported or if the
// build merger cannot be created. In the latter case LOGDOG_NAMESPACE has
// already been rewritten and is not restored.
func spyOn(ctx context.Context, b *butler.Butler, base *bbpb.Build) (*spy, error) {
	curNamespace := types.StreamName(os.Getenv(luciexe.LogdogNamespaceEnv))

	// The loopback client is rooted at the *current* namespace; the merged
	// build.proto stream is opened through it (see teeLogdog below).
	ldClient := streamclient.NewLoopback(b, curNamespace)

	// curNamespace is "$LOGDOG_NAMESPACE"
	// userNamespace is "$LOGDOG_NAMESPACE/u"
	// The namespace (AsNamespace) form of userNamespace is what ends up in
	// spy.UserNamespace.
	userNamespace := curNamespace.AsNamespace() + "u"
	if err := os.Setenv(luciexe.LogdogNamespaceEnv, string(userNamespace)); err != nil {
		// This function already reports failures via its error result, so
		// surface the problem to the caller instead of crashing the process.
		return nil, fmt.Errorf("exporting %s=%q: %w", luciexe.LogdogNamespaceEnv, userNamespace, err)
	}

	builds, err := buildmerge.New(ctx, userNamespace, base, mkURLCalcFn())
	if err != nil {
		return nil, err
	}

	// Start forwarding merged builds before attaching to the Butler, matching
	// the original ordering of these two steps.
	fwdChan := teeLogdog(ctx, builds.MergedBuildC, ldClient)
	builds.Attach(b)

	return &spy{
		MergedBuildC:  fwdChan,
		DrainC:        builds.DrainC,
		Close:         builds.Close,
		UserNamespace: types.StreamName(userNamespace).AsNamespace(),
	}, nil
}
// teeLogdog tees Build messages to a new "build.proto" datagram stream on the
// given logdog client.
//
// Every Build received on `in` is delivered to two destinations: the returned
// channel, and the datagram stream (as a serialized, zlib-compressed
// datagram). The two deliveries for a given Build happen concurrently, and the
// next Build is not read from `in` until both have completed, so the consumer
// of the returned channel must keep draining it.
//
// Once `in` is closed, the datagram stream is closed and then the returned
// channel is closed.
//
// Any failure to open, write to, or close the stream, or to serialize or
// compress a Build, results in a panic.
func teeLogdog(ctx context.Context, in <-chan *bbpb.Build, ldClient *streamclient.Client) <-chan *bbpb.Build {
	stream, err := ldClient.NewDatagramStream(
		ctx, luciexe.BuildProtoStreamSuffix,
		streamclient.WithContentType(luciexe.BuildProtoZlibContentType))
	if err != nil {
		panic(err)
	}

	tee := make(chan *bbpb.Build)

	go func() {
		// Deferred calls run last-in-first-out: the stream is closed first,
		// then the output channel.
		defer close(tee)
		defer func() {
			if closeErr := stream.Close(); closeErr != nil {
				panic(closeErr)
			}
		}()

		// The buffer and the compressor are reused for every message so their
		// storage is not reallocated each round. Both are reset before each
		// use, so every datagram is a self-contained zlib stream.
		var compressed bytes.Buffer
		zw := zlib.NewWriter(&compressed)

		// pack serializes msg and returns its zlib-compressed bytes. The
		// result aliases `compressed` and is only valid until the next call.
		pack := func(msg *bbpb.Build) []byte {
			raw, err := proto.Marshal(msg)
			if err != nil {
				panic(err)
			}
			compressed.Reset()
			zw.Reset(&compressed)
			if _, err := zw.Write(raw); err != nil {
				panic(err)
			}
			if err := zw.Close(); err != nil {
				panic(err)
			}
			return compressed.Bytes()
		}

		forwarded := make(chan struct{})
		for msg := range in {
			// Hand the message to the channel consumer in the background so
			// that a slow reader does not delay the logdog write (and vice
			// versa).
			go func(m *bbpb.Build) {
				tee <- m
				forwarded <- struct{}{}
			}(msg)

			if err := stream.WriteDatagram(pack(msg)); err != nil {
				panic(err)
			}

			// Do not move on until the consumer has taken this message; this
			// keeps the output channel in the same order as `in`.
			<-forwarded
		}
	}()

	return tee
}
// mkURLCalcFn returns the buildmerge.CalcURLFn used to compute the
// (url, viewURL) pair for a stream, given its namespace and stream name.
//
// The logdog environment variables are read once, when mkURLCalcFn is called;
// the returned function does not consult the environment again.
//
// If the coordinator host starts with "file://", both results are the host
// (with a trailing "/" appended if it lacks one) followed by the full stream
// name, with path separators adjusted via the path/filepath package.
//
// Otherwise url is "logdog://HOST/PROJECT/PREFIX/+/NAME" and viewURL is
// "https://HOST/logs/PROJECT/PREFIX/+/NAME", where NAME is namespace+stream
// name.
func mkURLCalcFn() buildmerge.CalcURLFn {
	// TODO(iannucci): This sort of coupling with the environment variables and
	// their interpretation is pretty bad. This should be fixed so that URL
	// generation is an RPC to Butler instead of string assembly by the user.
	coordinator := os.Getenv(bootstrap.EnvCoordinatorHost)

	if !strings.HasPrefix(coordinator, "file://") {
		project := os.Getenv(bootstrap.EnvStreamProject)
		prefix := os.Getenv(bootstrap.EnvStreamPrefix)

		logdogBase := fmt.Sprintf("logdog://%s/%s/%s/+/", coordinator, project, prefix)
		viewerBase := fmt.Sprintf("https://%s/logs/%s/%s/+/", coordinator, project, prefix)

		return func(ns, streamName types.StreamName) (string, string) {
			name := string(ns + streamName)
			return logdogBase + name, viewerBase + name
		}
	}

	// Local ("file://") coordinator: make sure the root ends in exactly the
	// separator we are about to append names to.
	root := strings.TrimSuffix(coordinator, "/") + "/"
	localRoot := filepath.FromSlash(root)

	return func(ns, streamName types.StreamName) (string, string) {
		name := string(ns + streamName)

		// TODO(iannucci): actually implement strict types.StreamName -> (url,
		// filesystem) mapping. Currently ':' is a permitted character, which is
		// not legal on Windows file systems. Fortunately stream names must begin
		// with an alnum character, so "." and ".." are illegal stream names.
		//
		// NOTE(review): url gets the OS-specific form of the name (FromSlash)
		// on a slash-separated root, while viewURL gets the slash form
		// (ToSlash) on an OS-specific root. This only matters where the OS
		// separator is not "/"; confirm the pairing is intended.
		return root + filepath.FromSlash(name), localRoot + filepath.ToSlash(name)
	}
}