blob: 09b613abf0ebad7244899222392ebe388b4e09a3 [file] [log] [blame]
// Copyright 2019 The LUCI Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package sink provides a server for aggregating test results and sending them
// to the ResultDB backend.
package sink
import (
pb ""
sinkpb ""
const (
	// AuthTokenKey is the key of the HTTP request header where the auth token
	// should be specified. An HTTP request must carry the auth token in that
	// header in the following format:
	//
	//	Authorization: ResultSink <auth_token>
	AuthTokenKey = "Authorization"

	// AuthTokenPrefix is embedded into the value of the Authorization HTTP
	// request header, ahead of the auth_token. For the details about the value
	// format of the Authorization HTTP request header, see the description of
	// AuthTokenKey.
	AuthTokenPrefix = "ResultSink"

	// maxWarmUpDuration is how long warmUp may wait until the Server is ready.
	//
	// Note that retry.Default stops the iteration ~3m24s after the first try.
	// Any value greater than 3m24s won't make sense.
	maxWarmUpDuration = 30 * time.Second

	// DefaultArtChannelMaxLeases is the default value of
	// ServerConfig.ArtChannelMaxLeases.
	DefaultArtChannelMaxLeases = 16

	// DefaultTestResultChannelMaxLeases is the default value of
	// ServerConfig.TestResultChannelMaxLeases.
	DefaultTestResultChannelMaxLeases = 4
)
var (
	// ErrCloseBeforeStart is the error reported when the server is asked to
	// stop before it was started. Both Close() and Shutdown() return it in
	// that situation.
	ErrCloseBeforeStart error = errors.Reason("the server is not started yet").Err()
)
// ServerConfig defines the parameters of the server.
type ServerConfig struct {
// Recorder is the gRPC client to the Recorder service exposed by ResultDB.
Recorder pb.RecorderClient
// ArtifactStreamClient is an HTTP client to be used for streaming artifacts larger than
// MaxBatchableArtifactSize.
ArtifactStreamClient *http.Client
// ArtifactStreamHost is the hostname of an ResultDB service instance to which
// artifacts are streamed.
ArtifactStreamHost string
// AuthToken is a secret token to expect from clients. If it is "" then it
// will be randomly generated in a secure way.
AuthToken string
// Address is the HTTP address to listen on.
// If empty, the server will use "localhost" with a random port available at
// the time of Server.Start call, and the generated address can be found at
// Server.Config().Address.
Address string
// Invocation is the name of the invocation that test results should append to.
Invocation string
// invocationID is a cached copy of the ID extracted from `Invocation`.
invocationID string
// UpdateToken is the token that allows writes to Invocation.
UpdateToken string
// TestIDPrefix will be prepended to the test_id of each TestResult.
TestIDPrefix string
// BaseVariant will be added to the variant of each TestResult. If there are duplicate
// keys, the variant value given by the test command always wins.
BaseVariant *pb.Variant
// TestLocationBase will be prepended to the Location.FileName of each TestResult.
TestLocationBase string
// ArtChannelMaxLeases specifies that the max lease of the Artifact upload channel.
ArtChannelMaxLeases uint
// TestResultChannelMaxLeases specifies that the max lease of the TestResult upload channel.
TestResultChannelMaxLeases uint
// BaseTags will be added to each TestResult in addition to the original tags that
// the tests were reported with.
BaseTags []*pb.StringPair
// CoerceNegativeDuration specifies whether ReportTestResults() should coerece
// the negative duration of a test result.
// If true, the API will coerce negative durations to 0.
// If false, the API will return an error for negative durations.
CoerceNegativeDuration bool
// LocationTags is a map from a directory to the tags of this directory.
// For each test with test location, look for the location's directory (or
// ancestor directory) in the map and append the directory's tags to the
// test results' tags.
LocationTags *sinkpb.LocationTags
// MaxBatchableArtifactSize is the maximum size of an artifact that can be uploaded
// in a batch.
// Artifacts smaller or equal to this size will be uploaded in a batch, whereas
// greater artifacts will be uploaded in a stream manner.
// Must be < 10MiB, and NewServer panics, otherwise.
MaxBatchableArtifactSize int64
// ExonerateUnexpectedPass is a flag to control if an unexpected pass should
// be exonerated.
ExonerateUnexpectedPass bool
// Validate validates all the config fields.
func (c *ServerConfig) Validate() error {
var err error
isErr := func(e error) bool {
err = e
return e != nil
switch {
case c.ArtifactStreamClient == nil:
return errors.Reason("ArtifactStreamClient: unspecified").Err()
case isErr(pbutil.ValidateStringPairs(c.BaseTags)):
return errors.Annotate(err, "BaseTags").Err()
case isErr(pbutil.ValidateVariant(c.BaseVariant)):
return errors.Annotate(err, "BaseVariant").Err()
case isErr(pbutil.ValidateInvocationName(c.Invocation)):
return errors.Annotate(err, "Invocation").Err()
case c.Recorder == nil:
return errors.Reason("Recorder: unspecified").Err()
case c.UpdateToken == "":
return errors.Reason("UpdateToken: unspecified").Err()
case c.TestIDPrefix != "" && isErr(pbutil.ValidateTestID(c.TestIDPrefix)):
return errors.Annotate(err, "TestIDPrefix").Err()
case c.TestLocationBase != "" && isErr(pbutil.ValidateFilePath(c.TestLocationBase)):
return errors.Annotate(err, "TestLocationBase").Err()
case c.MaxBatchableArtifactSize > 10*1024*1024:
return errors.Reason("MaxBatchableArtifactSize: %d is greater than 10MiB", c.MaxBatchableArtifactSize).Err()
return nil
// Server contains state relevant to the server itself.
// It should always be created by a call to NewServer.
// After a call to Serve(), Server will accept connections on its Port and
// gather test results to send to its Recorder.
type Server struct {
cfg ServerConfig
doneC chan struct{}
httpSrv http.Server
// 1 indicates that the server is starting or has started. 0, otherwise.
started int32
mu sync.Mutex // protects err
err error
// NewServer creates a Server value and populates optional values with defaults.
func NewServer(ctx context.Context, cfg ServerConfig) (*Server, error) {
if err := cfg.Validate(); err != nil {
return nil, errors.Annotate(err, "invalid ServerConfig").Err()
if cfg.AuthToken == "" {
tk, err := genAuthToken(ctx)
if err != nil {
return nil, errors.Annotate(err, "ServerConfig: failed to generate AuthToken").Err()
cfg.AuthToken = tk
if cfg.ArtChannelMaxLeases == 0 {
cfg.ArtChannelMaxLeases = DefaultArtChannelMaxLeases
if cfg.TestResultChannelMaxLeases == 0 {
cfg.TestResultChannelMaxLeases = DefaultTestResultChannelMaxLeases
if cfg.MaxBatchableArtifactSize == 0 {
cfg.MaxBatchableArtifactSize = 2 * 1024 * 1024
// extract the invocation ID from cfg.Invocation so that other modules don't need to
// parse the name to extract ID repeatedly.
cfg.invocationID, _ = pbutil.ParseInvocationName(cfg.Invocation)
s := &Server{
cfg: cfg,
doneC: make(chan struct{}),
return s, nil
// Done returns a channel that is closed when the server terminated and finished processing
// all the ongoing requests.
func (s *Server) Done() <-chan struct{} {
return s.doneC
// Config retrieves the ServerConfig of a previously created Server.
// Use this to retrieve the resolved values of unset optional fields in the
// original ServerConfig.
func (s *Server) Config() ServerConfig {
return s.cfg
// Err returns a server error, explaining the reason of the sink server closed.
// If Done is not yet closed, Err returns nil.
// If Done is closed, Err returns a non-nil error explaining why.
func (s *Server) Err() error {
return s.err
// Run starts a server and runs callback in a context where the server is running.
// The context passed to callback will be cancelled if the server has stopped due to
// critical errors or Close being invoked. The context also has the server's information
// exported into it. If callback finishes, Run will stop the server and return the error
// callback returned.
func Run(ctx context.Context, cfg ServerConfig, callback func(context.Context, ServerConfig) error) (err error) {
s, err := NewServer(ctx, cfg)
if err != nil {
return err
if err := s.Start(ctx); err != nil {
return err
defer func() {
// Run returns nil IFF both of the callback and Shutdown returned nil.
sErr := s.Shutdown(ctx)
if err == nil {
err = sErr
// ensure that the server is ready for serving traffic before executing the callback.
logging.Infof(ctx, "SinkServer: warm-up started")
if err = s.warmUp(ctx); err != nil {
logging.Errorf(ctx, "SinkServer: warm-up failed: %s", err)
return errors.Annotate(err, "warm-up").Err()
logging.Infof(ctx, "SinkServer: warm-up ended")
// It's necessary to create a new context with a new variable. If param `ctx` was
// re-used, the new context would be passed to Shutdown in the deferred function after
// being cancelled.
cbCtx, cancel := context.WithCancel(s.Export(ctx))
defer cancel()
go func() {
select {
case <-s.Done():
// cancel the callback context if the server terminates before callback
// finishes.
case <-ctx.Done():
return callback(cbCtx, cfg)
// Start runs the server.
// On success, Start will return nil, and a subsequent error can be obtained from Err.
func (s *Server) Start(ctx context.Context) error {
if !atomic.CompareAndSwapInt32(&s.started, 0, 1) {
return errors.Reason("cannot call Start twice").Err()
// create an HTTP server with a pRPC service.
routes := router.New()
s.httpSrv.Handler = routes
s.httpSrv.BaseContext = func(net.Listener) context.Context { return ctx }
ss, err := newSinkServer(ctx, s.cfg)
if err != nil {
return err
prpc := &prpc.Server{}
prpc.InstallHandlers(routes, nil)
sinkpb.RegisterSinkServer(prpc, ss)
addr := s.cfg.Address
if addr == "" {
// If the config is missing the address, choose a random available port.
addr = "localhost:0"
l, err := net.Listen("tcp", addr)
if err != nil {
s.err = err
return err
if s.cfg.Address == "" {
s.cfg.Address = fmt.Sprint("localhost:", l.Addr().(*net.TCPAddr).Port)
go func() {
defer func() {
// close SinkServer to complete all the outgoing requests before closing doneC
// to send a signal.
closeSinkServer(ctx, ss)
logging.Infof(ctx, "SinkServer: starting HTTP server...")
// No need to close the listener, because http.Serve always closes it on return.
err = s.httpSrv.Serve(l)
logging.Infof(ctx, "SinkServer: HTTP server stopped with %q", err)
// if the reason of the server stopped was due to s.Close or s.Shutdown invoked,
// then s.Err should return nil instead.
if err == http.ErrServerClosed {
err = nil
s.err = err
return nil
// Shutdown gracefully shuts down the server without interrupting any ongoing requests.
// Shutdown works by first closing the listener for incoming requests, and then waiting for
// all the ongoing requests to be processed and pending results to be uploaded. If
// the provided context expires before the shutdown is complete, Shutdown returns
// the context's error.
func (s *Server) Shutdown(ctx context.Context) (err error) {
logging.Infof(ctx, "SinkServer: shutdown started")
defer func() {
if err == nil {
logging.Infof(ctx, "SinkServer: shutdown completed successfully")
} else {
logging.Errorf(ctx, "SinkServer: shutdown failed: %s", err)
if atomic.LoadInt32(&s.started) == 0 {
// hasn't been started
return ErrCloseBeforeStart
if err = s.httpSrv.Shutdown(ctx); err != nil {
select {
case <-s.Done():
// Shutdown always returns nil as long as the server was closed and drained.
case <-ctx.Done():
err = ctx.Err()
// Close stops the server.
// Server processes sinkRequests asynchronously, and Close doesn't guarantee completion
// of all the ongoing requests. After Close returns, wait for Done to be closed to ensure
// that all the pending results have been uploaded.
// It's recommended to use Shutdown() instead of Close with Done.
func (s *Server) Close(ctx context.Context) (err error) {
logging.Infof(ctx, "SinkServer: close started")
defer logging.Infof(ctx, "SinkServer: close completed with %s", err)
if atomic.LoadInt32(&s.started) == 0 {
// hasn't been started
err = ErrCloseBeforeStart
return s.httpSrv.Close()
// Export exports lucictx.ResultSink derived from the server configuration into
// the context.
func (s *Server) Export(ctx context.Context) context.Context {
return lucictx.SetResultSink(ctx, &lucictx.ResultSink{
Address: s.cfg.Address,
AuthToken: s.cfg.AuthToken,
// genAuthToken generates and returns a random auth token.
func genAuthToken(ctx context.Context) (string, error) {
buf := make([]byte, 32)
if _, err := cryptorand.Read(ctx, buf); err != nil {
return "", err
return hex.EncodeToString(buf), nil
// warmUp repeatedly sends an empty ReportTestResults request to the SinkServer
// until it receives a successful response.
// This function is used to ensure that the SinkServer is ready for serving traffic.
// It returns to the caller when the context times out (the attempts are bounded by
// maxWarmUpDuration), when the retry.Default iterator gives up (after 10 retries,
// according to the original comment; not verifiable from this file), or when a
// non-error response was returned from the SinkServer.
func (s *Server) warmUp(ctx context.Context) error {
// The client talks to the server's own address over an insecure connection.
sinkClient := sinkpb.NewSinkPRPCClient(&prpc.Client{
Host: s.cfg.Address,
Options: &prpc.Options{Insecure: true},
// Attach the server's auth token to the outgoing request metadata.
ctx = metadata.AppendToOutgoingContext(ctx, AuthTokenKey, authTokenValue(s.cfg.AuthToken))
// Bound the total warm-up time.
ctx, cancel := context.WithTimeout(ctx, maxWarmUpDuration)
defer cancel()
return retry.Retry(ctx, retry.Default, func() error {
// NOTE(review): the call below ends with a trailing comma, so a further
// argument line appears to be missing from this copy of the file; confirm
// against the upstream source before changing this function.
_, err := sinkClient.ReportTestResults(ctx, &sinkpb.ReportTestResultsRequest{},
// connection errors will happen until the server is ready for serving.
return err
}, nil)