blob: 4bd1c5263f48652cd10b035fe12acf8fa87ae678 [file] [log] [blame]
// Copyright 2019 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package recorder
import (
"context"
"time"
sppb "cloud.google.com/go/spanner/apiv1/spannerpb"
repb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
"google.golang.org/genproto/googleapis/bytestream"
"google.golang.org/grpc"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/grpc/prpc"
"go.chromium.org/luci/server"
"go.chromium.org/luci/resultdb/internal"
"go.chromium.org/luci/resultdb/internal/artifactcontent"
"go.chromium.org/luci/resultdb/internal/services/artifactexporter"
"go.chromium.org/luci/resultdb/internal/spanutil"
pb "go.chromium.org/luci/resultdb/proto/v1"
)
// recorderServer is the implementation of pb.RecorderServer.
//
// Errors it returns are not gRPC-native; wrap it in a DecoratedRecorder whose
// Postlude is internal.CommonPostlude.
type recorderServer struct {
	// Options is the embedded server configuration.
	*Options

	// casClient is the ContentAddressableStorageClient used for artifact
	// batch operations.
	casClient repb.ContentAddressableStorageClient

	// bqExportClient exports artifacts to BigQuery.
	bqExportClient BQExportClient
}
// Options holds the configuration of a recorder server.
type Options struct {
	// ExpectedResultsExpiration is the duration, counted from invocation
	// creation, after which expected test results are deleted.
	ExpectedResultsExpiration time.Duration

	// ArtifactRBEInstance names the RBE instance used for artifact storage,
	// for example "projects/luci-resultdb/instances/artifacts".
	ArtifactRBEInstance string

	// MaxArtifactContentStreamLength is the largest artifact content size
	// accepted via the streaming API.
	MaxArtifactContentStreamLength int64
}
// InitServer initializes a recorder server.
//
// It opens the RBE connection, creates the BigQuery export client, registers
// the Recorder service on srv, installs the Spanner defaults interceptor with
// high request priority, and installs the artifact creation HTTP handler.
//
// The RBE connection and the BigQuery export client are both registered for
// closing via srv.RegisterCleanup. If the BigQuery export client cannot be
// created, the already-open RBE connection is closed before returning.
//
// Returns an error if the RBE connection or the BigQuery export client cannot
// be created, or if installing the artifact creation handler fails.
func InitServer(srv *server.Server, opt Options) error {
	conn, err := artifactcontent.RBEConn(srv.Context)
	if err != nil {
		return err
	}
	// closeConn releases the RBE connection; a failure to close is only logged
	// because there is nothing else the caller could do about it.
	closeConn := func(ctx context.Context) {
		if err := conn.Close(); err != nil {
			logging.Errorf(ctx, "Cleaning up RBE connection: %s", err)
		}
	}

	bqClient, err := artifactexporter.NewClient(srv.Context, srv.Options.CloudProject)
	if err != nil {
		// No cleanup callback has been registered yet, so close the
		// connection here instead of leaking it.
		closeConn(srv.Context)
		return errors.Annotate(err, "create bq export client").Err()
	}

	srv.RegisterCleanup(closeConn)
	srv.RegisterCleanup(func(ctx context.Context) {
		if err := bqClient.Close(); err != nil {
			logging.Errorf(ctx, "Cleaning up BigQuery export client: %s", err)
		}
	})

	pb.RegisterRecorderServer(srv, NewRecorderServer(opt, repb.NewContentAddressableStorageClient(conn), bqClient))

	// TODO(crbug/1082369): Remove this workaround once field masks can be decoded.
	srv.ConfigurePRPC(func(p *prpc.Server) {
		p.HackFixFieldMasksForJSON = true
	})

	srv.RegisterUnaryServerInterceptors(spanutil.SpannerDefaultsInterceptor(sppb.RequestOptions_PRIORITY_HIGH))
	return installArtifactCreationHandler(srv, &opt, conn)
}
// NewRecorderServer creates a recorderServer from the given options and
// clients, and returns it wrapped in a pb.DecoratedRecorder that uses
// internal.CommonPostlude as its postlude.
//
// opts is received by value, so the server keeps a pointer to its own copy.
func NewRecorderServer(opts Options, casClient repb.ContentAddressableStorageClient, bqClient BQExportClient) *pb.DecoratedRecorder {
	impl := &recorderServer{
		Options:        &opts,
		casClient:      casClient,
		bqExportClient: bqClient,
	}
	decorated := &pb.DecoratedRecorder{
		Service:  impl,
		Postlude: internal.CommonPostlude,
	}
	return decorated
}
// installArtifactCreationHandler registers the artifact creation handler on
// srv's routes for PUT requests under "invocations/".
//
// The handler writes through a ByteStream client created on rbeConn. An error
// is returned when opt.ArtifactRBEInstance is empty.
func installArtifactCreationHandler(srv *server.Server, opt *Options, rbeConn *grpc.ClientConn) error {
	instance := opt.ArtifactRBEInstance
	if instance == "" {
		return errors.Reason("opt.ArtifactRBEInstance is required").Err()
	}

	byteStream := bytestream.NewByteStreamClient(rbeConn)
	// openWriter starts a new ByteStream write call for each invocation.
	openWriter := func(ctx context.Context) (bytestream.ByteStream_WriteClient, error) {
		return byteStream.Write(ctx)
	}

	handler := &artifactCreationHandler{
		RBEInstance:                    instance,
		NewCASWriter:                   openWriter,
		MaxArtifactContentStreamLength: opt.MaxArtifactContentStreamLength,
	}

	// A catch-all route is used because "github.com/julienschmidt/httprouter"
	// does not support routes over unescaped paths, which rules out more
	// specific routes: https://github.com/julienschmidt/httprouter/issues/208
	srv.Routes.PUT("invocations/*rest", nil, handler.Handle)
	return nil
}