blob: 49ba7464e4372350590a15165526bc2ab7b6beb0 [file] [log] [blame]
// Copyright 2020 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sink
import (
"context"
"fmt"
"mime"
"net/http"
"os"
"strconv"
"sync"
"sync/atomic"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/sync/dispatcher"
"go.chromium.org/luci/common/sync/dispatcher/buffer"
"go.chromium.org/luci/resultdb/pbutil"
pb "go.chromium.org/luci/resultdb/proto/v1"
sinkpb "go.chromium.org/luci/resultdb/sink/proto/v1"
)
type uploadTask struct {
art *sinkpb.Artifact
artName string
size int64 // content size
testStatus pb.TestStatus
}
// newUploadTask constructs an uploadTask for the artifact.
//
// If FilePath is set on the artifact, this calls os.Stat to obtain the file information,
// and may return an error if the Stat call fails. e.g., permission denied, not found.
// It also returns an error if the artifact file path is a directory.
func newUploadTask(name string, art *sinkpb.Artifact, testStatus pb.TestStatus) (*uploadTask, error) {
ret := &uploadTask{
art: art,
artName: name,
size: int64(len(art.GetContents())),
testStatus: testStatus,
}
// Find and save the content size on uploadTask creation, so that the task scheduling
// and processing logic can use the size information w/o issuing system calls.
if fp := art.GetFilePath(); fp != "" {
st, err := os.Stat(fp)
switch {
case err != nil:
return nil, errors.Annotate(err, "querying file info").Err()
case st.Mode().IsRegular():
// break
// Return a more human friendly error than 1000....0.
case st.IsDir():
return nil, errors.Reason("%q is a directory", fp).Err()
default:
return nil, errors.Reason("%q is not a regular file: %s", fp, strconv.FormatInt(int64(st.Mode()), 2)).Err()
}
ret.size = st.Size()
}
return ret, nil
}
// CreateRequest returns a CreateArtifactRequest for the upload task.
//
// Note that this will open and read content from the file, the artifact is set with
// Artifact_FilePath. Save the returned request to avoid unnecessary I/Os,
// if necessary.
func (t *uploadTask) CreateRequest() (*pb.CreateArtifactRequest, error) {
invID, tID, rID, aID, err := pbutil.ParseArtifactName(t.artName)
req := &pb.CreateArtifactRequest{
Artifact: &pb.Artifact{
ArtifactId: aID,
ContentType: t.art.GetContentType(),
SizeBytes: t.size,
Contents: t.art.GetContents(),
GcsUri: t.art.GetGcsUri(),
TestStatus: t.testStatus,
},
}
// parent
switch {
case err != nil:
// This should not happen.
// uploadTask should be created with validated artifacts only.
panic(fmt.Sprintf("invalid uploadTask.artName %q: %s", t.artName, err))
case tID == "":
// Invocation-level artifact
req.Parent = pbutil.InvocationName(invID)
default:
req.Parent = pbutil.TestResultName(invID, tID, rID)
}
// contents
if fp := t.art.GetFilePath(); fp != "" {
if req.Artifact.Contents, err = os.ReadFile(fp); err != nil {
return nil, err
}
}
// Update the content type if it is missing
req.Artifact.ContentType = artifactContentType(req.Artifact.ContentType, req.Artifact.Contents)
// Perform size check only for non gcs artifact.
if req.Artifact.GcsUri == "" {
// If the size of the read content is different to what stat claimed initially, then
// return an error, so that the batching logic can be kept simple. Test frameworks
// should send finalized artifacts only.
if int64(len(req.Artifact.Contents)) != t.size {
return nil, errors.Reason(
"the size of the artifact contents changed from %d to %d",
t.size, len(req.Artifact.Contents)).Err()
}
}
return req, nil
}
type artifactChannel struct {
// batchChannel uploads artifacts via pb.BatchCreateArtifacts().
//
// This batches input artifacts and uploads them all at once.
// This is suitable for uploading a large number of small artifacts.
//
// The downside of this channel is that there is a limit on the maximum size of
// an artifact that can be included in a batch. Use streamChannel for artifacts
// greater than ServerConfig.MaxBatchableArtifactSize.
batchChannel dispatcher.Channel[any]
// streamChannel uploads artifacts in a streaming manner via HTTP.
//
// This is suitable for uploading large files, but with limited parallelism.
// Use batchChannel, if possible.
streamChannel dispatcher.Channel[any]
// wgActive indicates if there are active goroutines invoking reportTestResults.
//
// reportTestResults can be invoked by multiple goroutines in parallel. wgActive is used
// to ensure that all active goroutines finish enqueuing messages to the channel before
// closeAndDrain closes and drains the channel.
wgActive sync.WaitGroup
// 1 indicates that artifactChannel started the process of closing and draining
// the channel. 0, otherwise.
closed int32
cfg *ServerConfig
}
func newArtifactChannel(ctx context.Context, cfg *ServerConfig) *artifactChannel {
var err error
c := &artifactChannel{cfg: cfg}
au := artifactUploader{
MaxBatchable: cfg.MaxBatchableArtifactSize,
Recorder: cfg.Recorder,
StreamClient: cfg.ArtifactStreamClient,
StreamHost: cfg.ArtifactStreamHost,
}
// batchChannel
bcOpts := &dispatcher.Options{
Buffer: buffer.Options{
// BatchCreateArtifactRequest can include up to 500 requests and at most 10MiB
// of artifact contents. uploadTaskSlicer slices tasks, as the number of size
// limits apply.
//
// It's recommended to keep BatchItemsMax >= 500 to increase the chance of
// BatchCreateArtifactRequest to contain 500 artifacts.
//
// Depending on the estimated pattern of artifact size distribution, consider
// to tune ServerConfig.MaxBatchableArtifactSize and BatchDuration to find
// the optimal point between artifact upload latency and throughput.
//
// For more details, visit
// https://godoc.org/go.chromium.org/luci/resultdb/proto/v1#BatchCreateArtifactsRequest
BatchItemsMax: 500,
MaxLeases: int(cfg.ArtChannelMaxLeases),
FullBehavior: &buffer.BlockNewItems{MaxItems: 8000},
},
}
c.batchChannel, err = dispatcher.NewChannel[any](ctx, bcOpts, func(b *buffer.Batch) error {
return errors.Annotate(au.BatchUpload(ctx, b), "BatchUpload").Err()
})
if err != nil {
panic(fmt.Sprintf("failed to create batch channel for artifacts: %s", err))
}
// streamChannel
stOpts := &dispatcher.Options{
Buffer: buffer.Options{
// BatchItemsMax MUST be 1.
BatchItemsMax: 1,
MaxLeases: int(cfg.ArtChannelMaxLeases),
FullBehavior: &buffer.BlockNewItems{MaxItems: 4000},
},
}
c.streamChannel, err = dispatcher.NewChannel[any](ctx, stOpts, func(b *buffer.Batch) error {
return errors.Annotate(
au.StreamUpload(ctx, b.Data[0].Item.(*uploadTask), cfg.UpdateToken),
"StreamUpload").Err()
})
if err != nil {
panic(fmt.Sprintf("failed to create stream channel for artifacts: %s", err))
}
return c
}
func (c *artifactChannel) closeAndDrain(ctx context.Context) {
// mark the channel as closed, so that schedule() won't accept new tasks.
if !atomic.CompareAndSwapInt32(&c.closed, 0, 1) {
return
}
// wait for all the active sessions to finish enquing tests results to the channel
c.wgActive.Wait()
var draining sync.WaitGroup
draining.Add(2)
go func() {
defer draining.Done()
c.batchChannel.CloseAndDrain(ctx)
}()
go func() {
defer draining.Done()
c.streamChannel.CloseAndDrain(ctx)
}()
draining.Wait()
}
func (c *artifactChannel) schedule(tasks ...*uploadTask) {
c.wgActive.Add(1)
defer c.wgActive.Done()
// if the channel already has been closed, drop the test results.
if atomic.LoadInt32(&c.closed) == 1 {
return
}
for _, task := range tasks {
if task.size > c.cfg.MaxBatchableArtifactSize {
c.streamChannel.C <- task
} else {
c.batchChannel.C <- task
}
}
}
// artifactContentType gets the MIME media type by looking at the content.
// It considers at most the first 512 bytes of content.
// If the contentType is already present, it returns that instead.
func artifactContentType(contentType string, contents []byte) string {
if len(contentType) != 0 || len(contents) == 0 {
return contentType
}
mediaType, _, err := mime.ParseMediaType(http.DetectContentType(contents))
if err != nil {
return ""
}
return mediaType
}