blob: 77a8f53120155434b96bfa44c04bdf2d432863d4 [file] [log] [blame]
// Copyright 2020 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sink
import (
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/retry"
"go.chromium.org/luci/common/retry/transient"
)
// DefaultMaxInMemoryFileSize is the in-memory file size limit, in bytes (64 KiB), that
// applies when ArtifactUploader.MaxInMemoryFileSize is left at 0.
const DefaultMaxInMemoryFileSize = 64 << 10
// ArtifactUploader uploads artifacts to a ResultDB instance over HTTP.
//
// Upload sends contents that are already in memory, and UploadFromFile sends the
// contents of a file on disk.
type ArtifactUploader struct {
// Client is the HTTP client used for sending artifact upload requests to ResultDB.
Client *http.Client
// Host is the host of the ResultDB instance to upload artifacts to. Requests are
// sent to https://<Host>/<artifact name>.
Host string
// MaxInMemoryFileSize is the maximum size, in bytes, of an artifact file that
// UploadFromFile loads into memory.
//
// The contents of a file are read twice: once for calculating the hash and once
// for transmitting them over the network. If the file size is equal to or smaller
// than MaxInMemoryFileSize, the entire contents are loaded into a memory buffer
// first to minimize disk I/O; larger files are read directly from the file.
//
// If this field is 0, DefaultMaxInMemoryFileSize is used.
MaxInMemoryFileSize int64
}
// newRequest builds the HTTP PUT request that uploads the contents of input as the
// artifact called name.
//
// The request targets https://<u.Host>/<name>. It carries the SHA-256 digest of input
// in the Content-Hash header, updateToken in the Update-Token header and, if
// contentType is not empty, contentType in the Content-Type header.
//
// input is read to the end to compute the digest and is then rewound to its start, so
// that the HTTP client can read the same bytes again as the request body.
func (u *ArtifactUploader) newRequest(ctx context.Context, name, contentType string, input io.ReadSeeker, updateToken string) (*http.Request, error) {
	target := fmt.Sprintf("https://%s/%s", u.Host, name)
	req, err := http.NewRequestWithContext(ctx, http.MethodPut, target, input)
	if err != nil {
		return nil, errors.Annotate(err, "failed to create a request").Err()
	}

	hash, err := calculateHash(input)
	if err != nil {
		return nil, errors.Annotate(err, "failed to calculate hash").Err()
	}
	// Hashing consumed the stream; rewind it to the beginning so that the request body
	// can be read again by the HTTP client.
	if _, err := input.Seek(0, io.SeekStart); err != nil {
		return nil, errors.Annotate(err, "failed to reset the stream cursor").Err()
	}

	req.Header.Add("Content-Hash", hash)
	if contentType != "" {
		req.Header.Add("Content-Type", contentType)
	}
	req.Header.Add("Update-Token", updateToken)
	return req, nil
}
// Upload sends the given bytes to ResultDB as the contents of an artifact.
//
// `name` is the globally unique name of the artifact. It must follow the format
// described in the name property of message Artifact at
// https://chromium.googlesource.com/infra/luci/luci-go/+/master/resultdb/proto/v1/artifact.proto
//
// The go.chromium.org/luci/resultdb/pbutil package offers helpers that generate
// Artifact names for test-result-level and invocation-level artifacts.
// - https://pkg.go.dev/go.chromium.org/luci/resultdb/pbutil?tab=doc#InvocationArtifactName
// - https://pkg.go.dev/go.chromium.org/luci/resultdb/pbutil?tab=doc#TestResultArtifactName
func (u *ArtifactUploader) Upload(ctx context.Context, name, contentType string, contents []byte, updateToken string) error {
	body := bytes.NewReader(contents)
	req, err := u.newRequest(ctx, name, contentType, body, updateToken)
	if err == nil {
		err = u.send(req)
	}
	return err
}
// UploadFromFile sends the contents of the file at `path` to ResultDB as an artifact.
//
// `name` is the globally unique name of the artifact. It must follow the format
// described in the name property of message Artifact at
// https://chromium.googlesource.com/infra/luci/luci-go/+/master/resultdb/proto/v1/artifact.proto
//
// Files no larger than the in-memory limit (MaxInMemoryFileSize, or
// DefaultMaxInMemoryFileSize if that field is 0) are read into memory and handed to
// Upload. Larger files are read directly from the opened file.
func (u *ArtifactUploader) UploadFromFile(ctx context.Context, name, contentType, path, updateToken string) error {
	info, err := os.Stat(path)
	if err != nil {
		return errors.Annotate(err, "failed to query the file status").Err()
	}
	size := info.Size()

	// Resolve the in-memory threshold; 0 means "use the default".
	var threshold int64 = DefaultMaxInMemoryFileSize
	if u.MaxInMemoryFileSize != 0 {
		threshold = u.MaxInMemoryFileSize
	}

	// Small enough: load everything into memory and go through Upload.
	if size <= threshold {
		data, err := ioutil.ReadFile(path)
		if err != nil {
			return errors.Annotate(err, "failed to read file contents").Err()
		}
		return u.Upload(ctx, name, contentType, data, updateToken)
	}

	// Otherwise read the body directly from the file.
	file, err := os.Open(path)
	if err != nil {
		return errors.Annotate(err, "failed to open file").Err()
	}
	defer file.Close()

	req, err := u.newRequest(ctx, name, contentType, file, updateToken)
	if err != nil {
		return err
	}

	// The HTTP client may close the request body and ask GetBody for a fresh one (for
	// example on 3xx responses). Hiding Close behind NopCloser keeps the file open until
	// this function returns, so GetBody only has to move the cursor back to the start
	// instead of reopening the file.
	rewind := func() (io.ReadCloser, error) {
		if _, err := file.Seek(0, io.SeekStart); err != nil {
			return nil, errors.Annotate(err, "failed to reset the stream cursor").Err()
		}
		return ioutil.NopCloser(file), nil
	}
	req.Body = ioutil.NopCloser(req.Body)
	req.GetBody = rewind
	req.ContentLength = size
	return u.send(req)
}
// send executes req with u.Client and returns nil if ResultDB answers with
// StatusNoContent.
//
// Any other status code is turned into an error. Responses with StatusRequestTimeout,
// StatusTooManyRequests or a 5xx code are tagged as transient, and are retried
// according to retry.Default. Before each retry the request body is rewound through
// req.GetBody, if set, because the previous attempt has already consumed it.
//
// The response body of every attempt is drained and closed, so that the underlying
// connection can be reused.
func (u *ArtifactUploader) send(req *http.Request) error {
	firstAttempt := true
	return retry.Retry(req.Context(), transient.Only(retry.Default), func() error {
		// The previous attempt read the body to the end; get a fresh one so that the
		// retry does not send an empty payload.
		if !firstAttempt && req.GetBody != nil {
			body, err := req.GetBody()
			if err != nil {
				return errors.Annotate(err, "failed to rewind the request body").Err()
			}
			req.Body = body
		}
		firstAttempt = false

		resp, err := u.Client.Do(req)
		if err != nil {
			return errors.Annotate(err, "failed to send HTTP request").Err()
		}
		defer func() {
			// Best effort: the outcome is decided by the status code alone, so errors
			// from draining or closing the response body are deliberately ignored.
			_, _ = io.Copy(ioutil.Discard, resp.Body)
			_ = resp.Body.Close()
		}()

		code := resp.StatusCode
		// ResultDB returns StatusNoContent on success.
		if code == http.StatusNoContent {
			return nil
		}

		// Tag the error as a transient error, if retriable.
		hErr := errors.Reason("http request failed(%d): %s", resp.StatusCode, resp.Status)
		if code == http.StatusRequestTimeout || code == http.StatusTooManyRequests || code >= 500 {
			hErr = hErr.Tag(transient.Tag)
		}
		return hErr.Err()
	}, nil)
}
// calculateHash reads input until EOF and returns the SHA-256 digest of everything
// read, formatted as "sha256:" followed by the lowercase hex encoding of the digest.
//
// A read error from input is returned as is, together with an empty string.
func calculateHash(input io.Reader) (string, error) {
	const prefix = "sha256:"

	digest := sha256.New()
	_, err := io.Copy(digest, input)
	if err != nil {
		return "", err
	}

	sum := digest.Sum(nil)
	out := make([]byte, len(prefix)+hex.EncodedLen(len(sum)))
	copy(out, prefix)
	hex.Encode(out[len(prefix):], sum)
	return string(out), nil
}