blob: 978b4a9de28928516b10cffb023e6a554f1ed032 [file] [log] [blame]
// Copyright 2024 The LUCI Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
// Package artifactexporter handles uploading artifacts to BigQuery.
// This is the replacement of the legacy artifact exporter in bqexp
package artifactexporter
import (
repb ""
bqpb ""
pb ""
// MaxShardContentSize is the maximum content size in BQ row.
// Artifacts content bigger than this size needs to be sharded.
// Leave 10 KB for other fields, the rest is content.
const MaxShardContentSize = bqutil.RowMaxBytes - 10*1024
// MaxRBECasBatchSize is the batch size limit when we read artifact content.
// TODO(nqmtuan): Call the Capacity API to find out the exact size limit for
// batch operations.
// For now, hardcode to be 2MB. It should be under the limit,
// since BatchUpdateBlobs in BatchCreateArtifacts can handle up to 10MB.
const MaxRBECasBatchSize = 2 * 1024 * 1024 // 2 MB
// ArtifactRequestOverhead is the overhead (in bytes) applying to each artifact
// when calculating the size.
const ArtifactRequestOverhead = 100
type Artifact struct {
InvocationID string
TestID string
ResultID string
ArtifactID string
ContentType string
Size int64
RBECASHash string
TestStatus pb.TestStatus
// ExportArtifactsTask describes how to route export artifact task.
var ExportArtifactsTask = tq.RegisterTaskClass(tq.TaskClass{
ID: "export-artifacts",
Prototype: &taskspb.ExportArtifacts{},
Kind: tq.Transactional,
Queue: "artifactexporter",
RoutingPrefix: "/internal/tasks/artifactexporter", // for routing to "artifactexporter" service
var (
ErrInvalidUTF8 = fmt.Errorf("invalid UTF-8 character")
var (
artifactExportCounter = metric.NewCounter(
"The number of artifacts rows to export to BigQuery, grouped by project and status.",
// The LUCI Project.
// The status of the export.
// Possible values:
// - "success": The export was successful.
// - "failure_input": There was an error with the input artifact
// (e.g. artifact contains invalid UTF-8 character).
// - "failure_bq": There was an error with BigQuery (e.g. throttling, load shedding),
// which made the artifact failed to export.
artifactContentCounter = metric.NewCounter(
"The number of artifacts for a particular content type.",
// The LUCI Project.
// The status of the export.
// Possible values: "text", "nontext", "empty".
// We record the group instead of the actual value to prevent
// the explosion in cardinality.
// Options is artifact exporter configuration.
type Options struct {
// ArtifactRBEInstance is the name of the RBE instance to use for artifact
// storage. Example: "projects/luci-resultdb/instances/artifacts".
ArtifactRBEInstance string
// BQExportClient is the interface for exporting artifacts.
type BQExportClient interface {
InsertArtifactRows(ctx context.Context, rows []*bqpb.TextArtifactRow) error
type artifactExporter struct {
rbecasInstance string
// bytestreamClient is the client to stream big artifacts from RBE-CAS.
bytestreamClient bytestream.ByteStreamClient
// casClient is used to read artifacts in batch.
casClient repb.ContentAddressableStorageClient
// Client to export to BigQuery.
bqExportClient BQExportClient
// InitServer initializes a artifactexporter server.
func InitServer(srv *server.Server, opts Options) error {
if opts.ArtifactRBEInstance == "" {
return errors.Reason("No rbe instance specified").Err()
conn, err := artifactcontent.RBEConn(srv.Context)
if err != nil {
return err
bqClient, err := NewClient(srv.Context, srv.Options.CloudProject)
if err != nil {
return errors.Annotate(err, "create bq export client").Err()
srv.RegisterCleanup(func(ctx context.Context) {
err := conn.Close()
if err != nil {
logging.Errorf(ctx, "Cleaning up artifact RBE connection: %s", err)
err = bqClient.Close()
if err != nil {
logging.Errorf(ctx, "Cleaning up BigQuery export client: %s", err)
ae := &artifactExporter{
rbecasInstance: opts.ArtifactRBEInstance,
bytestreamClient: bytestream.NewByteStreamClient(conn),
casClient: repb.NewContentAddressableStorageClient(conn),
bqExportClient: bqClient,
ExportArtifactsTask.AttachHandler(func(ctx context.Context, msg proto.Message) error {
task := msg.(*taskspb.ExportArtifacts)
return ae.exportArtifacts(ctx, invocations.ID(task.InvocationId))
return nil
// Schedule schedules tasks for all the finalized invocations.
func Schedule(ctx context.Context, invID invocations.ID) error {
return tq.AddTask(ctx, &tq.Task{
Title: string(invID),
Payload: &taskspb.ExportArtifacts{InvocationId: string(invID)},
// exportArtifact reads all text artifacts (including artifact content
// in RBE-CAS) for an invocation and exports to BigQuery.
func (ae *artifactExporter) exportArtifacts(ctx context.Context, invID invocations.ID) error {
logging.Infof(ctx, "Exporting artifacts for invocation ID %s", invID)
shouldUpload, err := shouldUploadToBQ(ctx)
if err != nil {
return errors.Annotate(err, "getting config").Err()
if !shouldUpload {
logging.Infof(ctx, "Uploading to BigQuery is disabled")
return nil
ctx, cancel := span.ReadOnlyTransaction(ctx)
defer cancel()
// Get the invocation.
inv, err := invocations.Read(ctx, invID)
if err != nil {
return errors.Annotate(err, "error reading exported invocation").Err()
if inv.State != pb.Invocation_FINALIZED {
return errors.Reason("invocation not finalized").Err()
// Query for artifacts.
project, _ := realms.Split(inv.Realm)
artifacts, err := ae.queryTextArtifacts(ctx, invID, project)
if err != nil {
return errors.Annotate(err, "query text artifacts").Err()
logging.Infof(ctx, "Found %d text artifacts for invocation %v", len(artifacts), invID)
percent, err := percentOfArtifactsToBQ(ctx)
if err != nil {
return errors.Annotate(err, "read percent from config").Err()
artifacts, err = throttleArtifactsForBQ(artifacts, percent)
if err != nil {
return errors.Annotate(err, "throttle artifacts for bq").Err()
logging.Infof(ctx, "Found %d text artifact after throttling", len(artifacts))
if len(artifacts) == 0 {
return nil
// Get artifact content.
// Channel to push the artifact rows.
// In theory, some artifact content may be too big to
// fit into memory (we allow artifacts upto 640MB [1]).
// We need to "break" them in to sizeable chunks.
// [1];l=307?q=max-artifact-content-stream-length
rowC := make(chan *bqpb.TextArtifactRow, 10)
err = parallel.FanOutIn(func(work chan<- func() error) {
// Download artifacts content and put the rows in a channel.
work <- func() error {
defer close(rowC)
err := ae.downloadMultipleArtifactContent(ctx, artifacts, inv, rowC, MaxShardContentSize, MaxRBECasBatchSize)
if err != nil {
return errors.Annotate(err, "download multiple artifact content").Err()
return nil
// Upload the rows to BQ.
work <- func() error {
err := ae.exportToBigQuery(ctx, rowC)
if err != nil {
return errors.Annotate(err, "export to bigquery").Err()
return nil
return err
// exportToBigQuery reads rows from channel rowC and export to BigQuery.
// It groups the rows into chunks and appends to the default stream.
// There is a caveat in using default stream.
// If the exporter failed in the middle (with some data already exported to BQ),
// and then retried, duplicated rows may be inserted into BQ.
// Another alternative is to use pending type [1] instead of default stream,
// which allows atomic commit operation. The problem is that it has the limit
// of 10k CreateWriteStream limit per hour, which is not enough for the number
// of invocations we have (1.5 mil/day).
// Another alternative is add a field in the Artifact spanner table to mark which
// rows have been exported.
// [1]
func (ae *artifactExporter) exportToBigQuery(ctx context.Context, rowC chan *bqpb.TextArtifactRow) error {
// rows contains the rows that will be sent to AppendRows.
rows := []*bqpb.TextArtifactRow{}
currentSize := 0
for row := range rowC {
// Group the rows together such that the total size of the batch
// does not exceed AppendRows max size.
// The bqutil package also does the batching, but we don't want to send
// a just slightly bigger group of row to it, to prevent a big batch and
// a tiny batch, so we make sure that the rows we send just fit in one batch.
rowSize := bqutil.RowSize(row)
// Exceed max batch size, send whatever we have.
if currentSize+rowSize > bqutil.RowMaxBytes {
err := ae.bqExportClient.InsertArtifactRows(ctx, rows)
if err != nil {
artifactExportCounter.Add(ctx, int64(len(rows)), row.Project, "failure_bq")
return errors.Annotate(err, "insert artifact rows").Err()
artifactExportCounter.Add(ctx, int64(len(rows)), row.Project, "success")
// Reset
rows = []*bqpb.TextArtifactRow{}
currentSize = 0
rows = append(rows, row)
currentSize += rowSize
// Upload the remaining rows.
if currentSize > 0 {
err := ae.bqExportClient.InsertArtifactRows(ctx, rows)
if err != nil {
artifactExportCounter.Add(ctx, int64(len(rows)), rows[0].Project, "failure_bq")
return errors.Annotate(err, "insert artifact rows").Err()
artifactExportCounter.Add(ctx, int64(len(rows)), rows[0].Project, "success")
return nil
// queryTextArtifacts queries all text artifacts contained in an invocation.
// The query also join with TestResult table for test status.
// The content of the artifact is not populated in this function,
// this will keep the size of the slice small enough to fit in memory.
func (ae *artifactExporter) queryTextArtifacts(ctx context.Context, invID invocations.ID, project string) ([]*Artifact, error) {
st := spanner.NewStatement(`
IFNULL(tr.Status, 0)
FROM Artifacts a
LEFT JOIN TestResults tr
ON a.InvocationId = tr.InvocationId
AND a.ParentId = CONCAT("tr/", tr.TestId , "/" , tr.ResultId)
WHERE a.InvocationId=@invID
ORDER BY tr.TestId, tr.ResultId, a.ArtifactId
st.Params = spanutil.ToSpannerMap(map[string]any{
"invID": invID,
results := []*Artifact{}
it := span.Query(ctx, st)
var b spanutil.Buffer
err := it.Do(func(r *spanner.Row) error {
a := &Artifact{InvocationID: string(invID)}
err := b.FromSpanner(r, &a.TestID, &a.ResultID, &a.ArtifactID, &a.ContentType, &a.Size, &a.RBECASHash, &a.TestStatus)
if err != nil {
return errors.Annotate(err, "read row").Err()
if a.ContentType == "" {
artifactContentCounter.Add(ctx, 1, project, "empty")
} else {
if pbutil.IsTextArtifact(a.ContentType) {
artifactContentCounter.Add(ctx, 1, project, "text")
results = append(results, a)
} else {
artifactContentCounter.Add(ctx, 1, project, "nontext")
return nil
if err != nil {
return nil, errors.Annotate(err, "query artifact").Err()
return results, nil
// downloadMutipleArtifactContent downloads multiple artifact content
// in parallel and creates TextArtifactRows for each rows.
// The rows will be pushed in rowC.
// Artifacts with content size smaller or equal than MaxRBECasBatchSize will be downloaded in batch.
// Artifacts with content size bigger than MaxRBECasBatchSize will be downloaded in streaming manner.
func (ae *artifactExporter) downloadMultipleArtifactContent(ctx context.Context, artifacts []*Artifact, inv *pb.Invocation, rowC chan *bqpb.TextArtifactRow, maxShardContentSize, maxRBECasBatchSize int) error {
project, _ := realms.Split(inv.Realm)
// Sort the artifacts by size, make it easier to do batching.
sort.Slice(artifacts, func(i, j int) bool {
return (artifacts[i].Size < artifacts[j].Size)
// Limit to 10 workers to avoid possible OOM.
return parallel.WorkPool(10, func(work chan<- func() error) {
// Smaller artifacts should be downloaded in batch.
currentBatchSize := 0
batch := []*Artifact{}
bigArtifactStartIndex := -1
for i, artifact := range artifacts {
if artifactSizeWithOverhead(artifact.Size) > maxRBECasBatchSize {
bigArtifactStartIndex = i
// Exceed batch limit, download whatever in batch.
if currentBatchSize+artifactSizeWithOverhead(artifact.Size) > maxRBECasBatchSize {
b := batch
work <- func() error {
err := ae.batchDownloadArtifacts(ctx, b, inv, rowC)
if err != nil {
return errors.Annotate(err, "batch download artifacts").Err()
return nil
// Reset
currentBatchSize = 0
batch = []*Artifact{}
batch = append(batch, artifact)
// Add ArtifactRequestOverhead bytes for the overhead of the request.
currentBatchSize += artifactSizeWithOverhead(artifact.Size)
// Download whatever left in batch.
if len(batch) > 0 {
b := batch
work <- func() error {
err := ae.batchDownloadArtifacts(ctx, b, inv, rowC)
if err != nil {
return errors.Annotate(err, "batch download artifacts").Err()
return nil
// Download the big artifacts in streaming manner.
if bigArtifactStartIndex > -1 {
for i := bigArtifactStartIndex; i < len(artifacts); i++ {
artifact := artifacts[i]
work <- func() error {
err := ae.streamArtifactContent(ctx, artifact, inv, rowC, maxShardContentSize)
if err != nil {
// We don't want to retry on this error. Just log a warning.
if errors.Is(err, ErrInvalidUTF8) {
"InvocationID": artifact.InvocationID,
"TestID": artifact.TestID,
"ResultID": artifact.ResultID,
"ArtifactID": artifact.ArtifactID,
}.Warningf(ctx, "Test result artifact has invalid UTF-8")
artifactExportCounter.Add(ctx, 1, project, "failure_input")
} else {
return errors.Annotate(err, "download artifact content inv_id = %q test id =%q result_id=%q artifact_id = %q", artifact.InvocationID, artifact.TestID, artifact.ResultID, artifact.ArtifactID).Err()
return nil
func artifactSizeWithOverhead(artifactSize int64) int {
return int(artifactSize) + ArtifactRequestOverhead
// batchDownloadArtifacts downloads artifact content from RBE-CAS in batch.
// It generates one or more TextArtifactRows for the content and push in rowC.
func (ae *artifactExporter) batchDownloadArtifacts(ctx context.Context, batch []*Artifact, inv *pb.Invocation, rowC chan *bqpb.TextArtifactRow) error {
logging.Infof(ctx, "batch download artifacts for %d artifacts", len(batch))
req := &repb.BatchReadBlobsRequest{InstanceName: ae.rbecasInstance}
for _, artifact := range batch {
req.Digests = append(req.Digests, &repb.Digest{
Hash: artifacts.TrimHashPrefix(artifact.RBECASHash),
SizeBytes: artifact.Size,
resp, err := ae.casClient.BatchReadBlobs(ctx, req)
if err != nil {
// The Invalid argument or ResourceExhausted suggest something is wrong with the
// input, so we should not retry in this case.
// ResourceExhausted may happen when we try to download more than the capacity.
if grpcutil.Code(err) == codes.InvalidArgument {
logging.Errorf(ctx, "BatchReadBlobs: invalid argument for invocation %q. Error: %v", inv.Name, err.Error())
return nil
if grpcutil.Code(err) == codes.ResourceExhausted {
logging.Errorf(ctx, "BatchReadBlobs: resource exhausted for invocation %q. Error: %v", inv.Name, err.Error())
return nil
return errors.Annotate(err, "batch read blobs").Err()
project, realm := realms.Split(inv.Realm)
for i, r := range resp.GetResponses() {
artifact := batch[i]
c := codes.Code(r.GetStatus().GetCode())
loggingFields := logging.Fields{
"InvocationID": artifact.InvocationID,
"TestID": artifact.TestID,
"ResultID": artifact.ResultID,
"ArtifactID": artifact.ArtifactID,
// It's no use to retry if we get those code.
// We'll just log the error.
if c == codes.InvalidArgument {
loggingFields.Warningf(ctx, "Invalid artifact")
artifactExportCounter.Add(ctx, 1, project, "failure_input")
if c == codes.NotFound {
loggingFields.Warningf(ctx, "Not found artifact")
artifactExportCounter.Add(ctx, 1, project, "failure_input")
// Perhaps something is wrong with RBE, we should retry.
if c != codes.OK {
loggingFields.Errorf(ctx, "Error downloading artifact. Code = %v message = %q", c, r.Status.GetMessage())
return errors.Reason("downloading artifact").Err()
// Check data, make sure it is of valid UTF-8 format.
if !utf8.Valid(r.Data) {
loggingFields.Warningf(ctx, "Invalid UTF-8 content")
artifactExportCounter.Add(ctx, 1, project, "failure_input")
// Everything is ok, send to rowC.
// This is guaranteed to be small artifact, so we need only 1 shard.
row := &bqpb.TextArtifactRow{
Project: project,
Realm: realm,
InvocationId: string(artifact.InvocationID),
TestId: artifact.TestID,
ResultId: artifact.ResultID,
ArtifactId: artifact.ArtifactID,
ContentType: artifact.ContentType,
Content: string(r.Data),
ArtifactContentSize: int32(artifact.Size),
ShardContentSize: int32(artifact.Size),
TestStatus: testStatusToString(artifact.TestStatus),
PartitionTime: timestamppb.New(inv.CreateTime.AsTime()),
ArtifactShard: fmt.Sprintf("%s:0", artifact.ArtifactID),
rowC <- row
return nil
// streamArtifactContent downloads artifact content from RBE-CAS in streaming manner.
// It generates one or more TextArtifactRows for the content and push in rowC.
func (ae *artifactExporter) streamArtifactContent(ctx context.Context, a *Artifact, inv *pb.Invocation, rowC chan *bqpb.TextArtifactRow, maxShardContentSize int) error {
ac := artifactcontent.Reader{
RBEInstance: ae.rbecasInstance,
Hash: a.RBECASHash,
Size: a.Size,
var str strings.Builder
shardID := 0
project, realm := realms.Split(inv.Realm)
input := func() *bqpb.TextArtifactRow {
return &bqpb.TextArtifactRow{
Project: project,
Realm: realm,
InvocationId: string(a.InvocationID),
TestId: a.TestID,
ResultId: a.ResultID,
ArtifactId: a.ArtifactID,
ShardId: int32(shardID),
ContentType: a.ContentType,
Content: str.String(),
ArtifactContentSize: int32(a.Size),
ShardContentSize: int32(str.Len()),
TestStatus: testStatusToString(a.TestStatus),
PartitionTime: timestamppb.New(inv.CreateTime.AsTime()),
ArtifactShard: fmt.Sprintf("%s:%d", a.ArtifactID, shardID),
// We don't populated numshards here because we don't know
// exactly how many shards we need until we finish scanning.
// Still we are not sure if this field is useful (e.g. from bigquery, we can
// just query for all artifacts and get the max shardID).
NumShards: 0,
err := ac.DownloadRBECASContent(ctx, ae.bytestreamClient, func(ctx context.Context, pr io.Reader) error {
sc := bufio.NewScanner(pr)
// Read by runes, so we will know if the input contains invalid Unicode
// character, so we can exit early.
// The invalid characters will cause proto.Marshal to fail for the row,
// so we don't support it.
for sc.Scan() {
// Detect invalid rune. Just stop.
r, _ := utf8.DecodeRune(sc.Bytes())
if r == utf8.RuneError {
return ErrInvalidUTF8
if str.Len()+len(sc.Bytes()) > maxShardContentSize {
select {
case <-ctx.Done():
return ctx.Err()
case rowC <- input():
if err := sc.Err(); err != nil {
return errors.Annotate(err, "scanner error").Err()
if str.Len() > 0 {
select {
case <-ctx.Done():
return ctx.Err()
case rowC <- input():
return nil
if err != nil {
return errors.Annotate(err, "read artifact content").Err()
return err
// throttleArtifactsForBQ limits the artifacts being to BigQuery based on percentage.
// It will allow us to roll out the feature slowly.
func throttleArtifactsForBQ(artifacts []*Artifact, percent int) ([]*Artifact, error) {
results := []*Artifact{}
for _, artifact := range artifacts {
hashStr := fmt.Sprintf("%s%s", artifact.TestID, artifact.ArtifactID)
hashVal := hash64([]byte(hashStr)) % 100
// This is complement with the throttle function in BatchCreateArtifact.
// For example, if percent = 2, only choose hashVal = 98, 99
// This allows smoother roll out of the service.
if (hashVal + uint64(percent)) >= 100 {
results = append(results, artifact)
return results, nil
// hash64 returns a hash value (uint64) for a given string.
func hash64(bt []byte) uint64 {
hasher := fnv.New64a()
return hasher.Sum64()
func testStatusToString(status pb.TestStatus) string {
if status == pb.TestStatus_STATUS_UNSPECIFIED {
return ""
return pb.TestStatus_name[int32(status)]
// shouldUploadToBQ returns true if we should upload artifacts to BigQuery.
func shouldUploadToBQ(ctx context.Context) (bool, error) {
cfg, err := config.GetServiceConfig(ctx)
if err != nil {
return false, errors.Annotate(err, "get service config").Err()
return cfg.GetBqArtifactExporterServiceConfig().GetEnabled(), nil
// percentOfArtifactsToBQ returns how many percents of artifact to be uploaded.
// Return value is an integer between [0, 100].
func percentOfArtifactsToBQ(ctx context.Context) (int, error) {
cfg, err := config.GetServiceConfig(ctx)
if err != nil {
return 0, errors.Annotate(err, "get service config").Err()
return int(cfg.GetBqArtifactExporterServiceConfig().GetExportPercent()), nil