blob: a939090368e03130031dece57b39e15b3920caa7 [file] [log] [blame]
// Copyright 2020 The LUCI Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package cli
import (
pb ""
sinkpb ""
var matchInvalidInvocationIDChars = regexp.MustCompile(`[^a-z0-9_\-:.]`)
const (
// reservePeriodSecs is how many seconds should be reserved for `rdb stream` to
// complete (out of a grace period), the rest can be given to the payload.
reservePeriodSecs = 3
// MustReturnInvURL returns a string of the Invocation URL.
func MustReturnInvURL(rdbHost, invName string) string {
invID, err := pbutil.ParseInvocationName(invName)
if err != nil {
miloHost := chromeinfra.MiloDevHost
if rdbHost == chromeinfra.ResultDBHost {
miloHost = chromeinfra.MiloHost
return fmt.Sprintf("https://%s/ui/inv/%s", miloHost, invID)
func cmdStream(p Params) *subcommands.Command {
return &subcommands.Command{
UsageLine: `stream [flags] TEST_CMD [TEST_ARG]...`,
ShortDesc: "Run a given test command and upload the results to ResultDB",
// TODO( add a link to ResultSink protocol doc
LongDesc: text.Doc(`
Run a given test command, continuously collect the results over IPC, and
upload them to ResultDB. Either use the current invocation from
LUCI_CONTEXT or create/finalize a new one. Example:
rdb stream -new -realm chromium:public ./out/chrome/test/browser_tests
CommandRun: func() subcommands.CommandRun {
r := &streamRun{
vars: make(map[string]string),
tags: make(strpair.Map),
r.Flags.BoolVar(&r.isNew, "new", false, text.Doc(`
If true, create and use a new invocation for the test command.
If false, use the current invocation, set in LUCI_CONTEXT.
r.Flags.BoolVar(&r.isIncluded, "include", false, text.Doc(`
If true with -new, the new invocation will be included
in the current invocation, set in LUCI_CONTEXT.
r.Flags.StringVar(&r.realm, "realm", "", text.Doc(`
Realm to create the new invocation in. Required if -new is set,
ignored otherwise.
e.g. "chromium:public"
r.Flags.StringVar(&r.testIDPrefix, "test-id-prefix", "", text.Doc(`
Prefix to prepend to the test ID of every test result.
r.Flags.Var(flag.StringMap(r.vars), "var", text.Doc(`
Variant to add to every test result in "key:value" format.
If the test command adds a variant with the same key, the value given by
this flag will get overridden.
r.Flags.UintVar(&r.artChannelMaxLeases, "max-concurrent-artifact-uploads",
sink.DefaultArtChannelMaxLeases, text.Doc(`
The maximum number of goroutines uploading artifacts.
r.Flags.UintVar(&r.trChannelMaxLeases, "max-concurrent-test-result-uploads",
sink.DefaultTestResultChannelMaxLeases, text.Doc(`
The maximum number of goroutines uploading test results.
r.Flags.StringVar(&r.testTestLocationBase, "test-location-base", "", text.Doc(`
File base to prepend to the test location file name, if the file name is a relative path.
It must start with "//".
r.Flags.Var(flag.StringPairs(r.tags), "tag", text.Doc(`
Tag to add to every test result in "key:value" format.
A key can be repeated.
r.Flags.BoolVar(&r.coerceNegativeDuration, "coerce-negative-duration",
false, text.Doc(`
If true, all negative durations will be coerced to 0.
If false, test results with negative durations will be rejected.
r.Flags.StringVar(&r.locTagsFile, "location-tags-file", "", text.Doc(`
Path to the file that contains test location tags in JSON format. See
r.Flags.BoolVar(&r.exonerateUnexpectedPass, "exonerate-unexpected-pass",
false, text.Doc(`
If true, any unexpected pass result will be exonerated.
return r
type streamRun struct {
// flags
isNew bool
isIncluded bool
realm string
testIDPrefix string
testTestLocationBase string
vars map[string]string
artChannelMaxLeases uint
trChannelMaxLeases uint
tags strpair.Map
coerceNegativeDuration bool
locTagsFile string
exonerateUnexpectedPass bool
// TODO(ddoman): add flags
// - invocation-tag
// - log-file
invocation *lucictx.ResultDBInvocation
func (r *streamRun) validate(ctx context.Context, args []string) (err error) {
if len(args) == 0 {
return errors.Reason("missing a test command to run").Err()
if err := pbutil.ValidateVariant(&pb.Variant{Def: r.vars}); err != nil {
return errors.Annotate(err, "invalid variant").Err()
if r.realm != "" {
if err := realms.ValidateRealmName(r.realm, realms.GlobalScope); err != nil {
return errors.Annotate(err, "invalid realm").Err()
return nil
func (r *streamRun) Run(a subcommands.Application, args []string, env subcommands.Env) (ret int) {
ctx := cli.GetContext(a, r, env)
if err := r.validate(ctx, args); err != nil {
return r.done(err)
loginMode := auth.OptionalLogin
// login is required only if it creates a new invocation.
if r.isNew {
if r.realm == "" {
return r.done(errors.Reason("-realm is required for new invocations").Err())
loginMode = auth.SilentLogin
if err := r.initClients(ctx, loginMode); err != nil {
return r.done(err)
// if -new is passed, create a new invocation. If not, use the existing one set in
// lucictx.
switch {
case r.isNew:
if r.isIncluded && r.resultdbCtx == nil {
return r.done(errors.Reason("missing an invocation in LUCI_CONTEXT, but -include was given").Err())
newInv, err := r.createInvocation(ctx, r.realm)
if err != nil {
return r.done(err)
fmt.Fprintf(os.Stderr, "rdb-stream: created invocation - %s\n", MustReturnInvURL(, newInv.Name))
if r.isIncluded {
curInv := r.resultdbCtx.CurrentInvocation
if err := r.includeInvocation(ctx, curInv, &newInv); err != nil {
if ferr := r.finalizeInvocation(ctx); ferr != nil {
logging.Errorf(ctx, "failed to finalize the invocation: %s", ferr)
return r.done(err)
fmt.Fprintf(os.Stderr, "rdb-stream: included %q in %q\n", newInv.Name, curInv.Name)
// Update lucictx with the new invocation.
r.invocation = &newInv
ctx = lucictx.SetResultDB(ctx, &lucictx.ResultDB{
CurrentInvocation: r.invocation,
case r.isIncluded:
return r.done(errors.Reason("-new is required for -include").Err())
case r.resultdbCtx == nil:
return r.done(errors.Reason("missing an invocation in LUCI_CONTEXT; use -new to create a new one").Err())
if err := r.validateCurrentInvocation(); err != nil {
return r.done(err)
r.invocation = r.resultdbCtx.CurrentInvocation
defer func() {
// Finalize the invocation if it was created by -new.
if r.isNew {
if err := r.finalizeInvocation(ctx); err != nil {
logging.Errorf(ctx, "failed to finalize the invocation: %s", err)
ret = r.done(err)
fmt.Fprintf(os.Stderr, "rdb-stream: finalized invocation - %s\n", MustReturnInvURL(, r.invocation.Name))
err := r.runTestCmd(ctx, args)
ec, ok := exitcode.Get(err)
if !ok {
logging.Errorf(ctx, "rdb-stream: failed to run the test command: %s", err)
return r.done(err)
logging.Infof(ctx, "rdb-stream: exiting with %d", ec)
return ec
func (r *streamRun) runTestCmd(ctx context.Context, args []string) error {
cmdCtx, cancelCmd := lucictx.TrackSoftDeadline(ctx, reservePeriodSecs*time.Second)
defer cancelCmd()
cmd := exec.CommandContext(cmdCtx, args[0], args[1:]...)
cmd.Stdin = os.Stdin
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
// Interrupt the subprocess if rdb-stream is interrupted or the deadline
// approaches.
// If it does not finish before the grace period expires, it will be
// SIGKILLed by the the expiration of cmdCtx.
go func() {
evt := <-lucictx.SoftDeadlineDone(cmdCtx)
if cmd.ProcessState == nil || cmd.ProcessState.Exited() {
// No process is running. Do nothing.
switch evt {
case lucictx.ClosureEvent:
// This should almost never happen, if cmdCtx is already done
// (as implied by this event), then the process would have been
// killed, and we would have exited above.
// It is possible that the process is currently being killed,
// so do nothing.
logging.Infof(ctx, "Caught %s", evt.String())
if err := terminate(ctx, cmd); err != nil {
logging.Warningf(ctx, "Could not terminate subprocess (%s), cancelling its context", err)
logging.Infof(ctx, "Sent termination signal to subprocess, it has ~%s to terminate", lucictx.GetDeadline(cmdCtx).GracePeriodDuration())
locationTags, err := r.getLocationTags(ctx)
if err != nil {
return errors.Annotate(err, "get location tags").Err()
// TODO(ddoman): send the logs of SinkServer to --log-file
cfg := sink.ServerConfig{
ArtChannelMaxLeases: r.artChannelMaxLeases,
ArtifactStreamClient: r.http,
Recorder: r.recorder,
TestResultChannelMaxLeases: r.trChannelMaxLeases,
Invocation: r.invocation.Name,
UpdateToken: r.invocation.UpdateToken,
BaseTags: pbutil.FromStrpairMap(r.tags),
BaseVariant: &pb.Variant{Def: r.vars},
CoerceNegativeDuration: r.coerceNegativeDuration,
LocationTags: locationTags,
TestLocationBase: r.testTestLocationBase,
TestIDPrefix: r.testIDPrefix,
ExonerateUnexpectedPass: r.exonerateUnexpectedPass,
return sink.Run(ctx, cfg, func(ctx context.Context, cfg sink.ServerConfig) error {
exported, err := lucictx.Export(ctx)
if err != nil {
return err
defer func() {
logging.Infof(ctx, "rdb-stream: the test process terminated")
logging.Infof(ctx, "rdb-stream: starting the test command - %q", cmd.Args)
if err := cmd.Start(); err != nil {
return errors.Annotate(err, "cmd.start").Err()
return cmd.Wait()
func (r *streamRun) getLocationTags(ctx context.Context) (*sinkpb.LocationTags, error) {
if r.locTagsFile == "" {
return nil, nil
f, err := ioutil.ReadFile(r.locTagsFile)
switch {
case os.IsNotExist(err):
logging.Warningf(ctx, "rdb-stream: %s doesn not exist", r.locTagsFile)
return nil, nil
case err != nil:
return nil, err
locationTags := &sinkpb.LocationTags{}
if err = protojson.Unmarshal(f, locationTags); err != nil {
return nil, err
return locationTags, nil
func (r *streamRun) createInvocation(ctx context.Context, realm string) (ret lucictx.ResultDBInvocation, err error) {
invID, err := GenInvID(ctx)
if err != nil {
md := metadata.MD{}
resp, err := r.recorder.CreateInvocation(ctx, &pb.CreateInvocationRequest{
InvocationId: invID,
Invocation: &pb.Invocation{
Realm: realm,
}, grpc.Header(&md))
if err != nil {
err = errors.Annotate(err, "failed to create an invocation").Err()
tks := md.Get(recorder.UpdateTokenMetadataKey)
if len(tks) == 0 {
err = errors.Reason("Missing header: update-token").Err()
ret = lucictx.ResultDBInvocation{Name: resp.Name, UpdateToken: tks[0]}
func (r *streamRun) includeInvocation(ctx context.Context, parent, child *lucictx.ResultDBInvocation) error {
ctx = metadata.AppendToOutgoingContext(ctx, recorder.UpdateTokenMetadataKey, parent.UpdateToken)
_, err := r.recorder.UpdateIncludedInvocations(ctx, &pb.UpdateIncludedInvocationsRequest{
IncludingInvocation: parent.Name,
AddInvocations: []string{child.Name},
return err
// finalizeInvocation finalizes the invocation.
func (r *streamRun) finalizeInvocation(ctx context.Context) error {
ctx = metadata.AppendToOutgoingContext(ctx, recorder.UpdateTokenMetadataKey, r.invocation.UpdateToken)
_, err := r.recorder.FinalizeInvocation(ctx, &pb.FinalizeInvocationRequest{
Name: r.invocation.Name,
return err
// GenInvID generates an invocation ID, made of the username, the current timestamp
// in a human-friendly format, and a random suffix.
// This can be used to generate a random invocation ID, but the creator and creation time
// can be easily found.
func GenInvID(ctx context.Context) (string, error) {
whoami, err := user.Current()
if err != nil {
return "", err
bytes := make([]byte, 8)
if _, err := mathrand.Read(ctx, bytes); err != nil {
return "", err
username := strings.ToLower(whoami.Username)
username = matchInvalidInvocationIDChars.ReplaceAllString(username, "")
suffix := strings.ToLower(fmt.Sprintf(
"%s-%s", time.Now().UTC().Format("2006-01-02-15-04-00"),
// Note: cannot use base64 because not all of its characters are allowed
// in invocation IDs.
// An invocation ID can contain up to 100 ascii characters that conform to the regex,
return fmt.Sprintf("u-%.*s-%s", 100-len(suffix), username, suffix), nil