blob: 76bf05cab745aa92b49a9ac2415809e1af54c2c3 [file] [log] [blame]
// Copyright 2015 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package isolatedclient
import (
	"bufio"
	"bytes"
	"context"
	"crypto"
	"crypto/md5"
	"encoding/base64"
	"fmt"
	"hash"
	"io"
	"io/ioutil"
	"log"
	"net"
	"net/http"
	"strings"

	isolateservice "go.chromium.org/luci/common/api/isolate/isolateservice/v1"
	"go.chromium.org/luci/common/errors"
	"go.chromium.org/luci/common/isolated"
	"go.chromium.org/luci/common/lhttp"
	"go.chromium.org/luci/common/retry"
	"go.chromium.org/luci/common/retry/transient"
)
// DefaultNamespace is the namespace NewClient uses unless it is overridden
// with the WithNamespace option.
const DefaultNamespace = "default-gzip"
// Source is a generator that hands out a reader over the source data each
// time it is called. The reader returned by one call must be Closed before the
// generator is called again.
type Source func() (io.ReadCloser, error)

// NewBytesSource returns a Source that serves the contents of d.
//
// Every call of the returned Source yields a new reader positioned at the
// start of d and never fails. The slice is wrapped, not copied.
func NewBytesSource(d []byte) Source {
	open := func() (io.ReadCloser, error) {
		reader := bytes.NewReader(d)
		return ioutil.NopCloser(reader), nil
	}
	return open
}
// PushState is per-item state passed from Client.Contains() to
// Client.Push().
//
// Its content is implementation specific.
type PushState struct {
// status is the preupload entry the server returned for this item.
status isolateservice.HandlersEndpointsV1PreuploadStatus
// digest and size are copied from the item that was passed to Contains.
digest isolated.HexDigest
size int64
// uploaded is set by Push once the content upload succeeded, so that a
// retried Push does not upload the content again.
uploaded bool
// finalized is set by Push once the whole push, including the optional
// finalize call, completed.
finalized bool
}
// CloudStorage is the interface for clients to fetch from and push to GCS storage.
//
// NewClient installs a default implementation; WithGCSHandler replaces it.
type CloudStorage interface {
// Fetch is a handler for retrieving specified content from GCS and storing
// the response in the provided destination writer.
Fetch(context.Context, *Client, isolateservice.HandlersEndpointsV1RetrievedContent, io.Writer) error
// Push is a handler for pushing content produced by the provided Source to
// GCS.
Push(context.Context, *Client, isolateservice.HandlersEndpointsV1PreuploadStatus, Source) error
}
// Client is a client to an isolated server.
//
// Instances are created with NewClient and customized through Option values.
type Client struct {
// All the members are immutable.
// retryFactory is passed to lhttp by postJSON and by the default GCS handler.
retryFactory retry.Factory
// url is the host given to NewClient with trailing slashes removed.
url string
// If you're unsure which namespace to use, use the DefaultNamespace constant.
namespace string
// The hashing algorithm used depends on the namespace.
h crypto.Hash
authClient *http.Client // client that sends auth tokens
anonClient *http.Client // client that does NOT send auth tokens
gcsHandler CloudStorage // implements GCS fetch and push handlers
// userAgent, when non-empty, is sent as the User-Agent header by postJSON.
userAgent string
}
// Option customizes a Client. Options are applied by NewClient, in the order
// given, on top of the defaults.
type Option func(*Client)
// WithNamespace returns an Option that sets the namespace the client works
// in. NewClient picks the hashing algorithm from the namespace after all
// options have been applied.
func WithNamespace(namespace string) Option {
	return func(client *Client) {
		client.namespace = namespace
	}
}
// WithAuthClient returns an Option that installs the HTTP client used for
// requests to the Isolate server. It must carry authentication sufficient to
// talk to the server (OAuth tokens with 'email' scope).
func WithAuthClient(c *http.Client) Option {
	return func(client *Client) {
		client.authClient = c
	}
}
// WithAnonymousClient returns an Option that installs the HTTP client that
// does not send auth tokens; it is the client used by the GCS handler.
func WithAnonymousClient(c *http.Client) Option {
	return func(client *Client) {
		client.anonClient = c
	}
}
// WithRetryFactory returns an Option that sets the retry factory handed to
// lhttp for the client's HTTP requests.
func WithRetryFactory(rFn retry.Factory) Option {
	return func(client *Client) {
		client.retryFactory = rFn
	}
}
// WithGCSHandler returns an Option that replaces the default CloudStorage
// implementation used to fetch from and push to GCS.
func WithGCSHandler(gcs CloudStorage) Option {
	return func(client *Client) {
		client.gcsHandler = gcs
	}
}
// WithUserAgent returns an Option that sets the User-Agent header value sent
// with the client's JSON requests to the Isolate server. An empty value leaves
// the header unset.
func WithUserAgent(userAgent string) Option {
	return func(client *Client) {
		client.userAgent = userAgent
	}
}
// NewClient returns a new IsolateServer client for host.
//
// Trailing slashes are stripped from host. The client starts out with
// DefaultNamespace, http.DefaultClient for both authenticated and anonymous
// requests, and the default GCS handler; opts are then applied in order. The
// hashing algorithm is chosen from the final namespace.
func NewClient(host string, opts ...Option) *Client {
	client := &Client{
		url:        strings.TrimRight(host, "/"),
		namespace:  DefaultNamespace,
		authClient: http.DefaultClient,
		anonClient: http.DefaultClient,
		gcsHandler: defaultGCSHandler{},
	}
	for idx := range opts {
		opts[idx](client)
	}
	// Options may have changed the namespace, so resolve the hash last.
	client.h = isolated.GetHash(client.namespace)
	return client
}
// Hash returns the hashing algorithm used for this client.
//
// NewClient selects it from the client's namespace.
func (i *Client) Hash() crypto.Hash {
return i.h
}
// ServerCapabilities queries the server_details endpoint and returns the
// server details, or the error from the request.
func (i *Client) ServerCapabilities(c context.Context) (*isolateservice.HandlersEndpointsV1ServerDetails, error) {
	var details isolateservice.HandlersEndpointsV1ServerDetails
	err := i.postJSON(c, "/_ah/api/isolateservice/v1/server_details", nil, map[string]string{}, &details)
	if err != nil {
		return nil, err
	}
	return &details, nil
}
// Contains looks up cache presence on the server of multiple items.
//
// The returned list has the same length and order as 'items', with entries nil
// for items that were present. Every non-nil entry is the state to hand to
// Push for the corresponding item.
//
// An error is returned when the request fails, or when the server response
// holds a null entry or an index that does not refer to an element of 'items'.
func (i *Client) Contains(c context.Context, items []*isolateservice.HandlersEndpointsV1Digest) (out []*PushState, err error) {
	in := isolateservice.HandlersEndpointsV1DigestCollection{Items: items, Namespace: &isolateservice.HandlersEndpointsV1Namespace{}}
	in.Namespace.Namespace = i.namespace
	data := &isolateservice.HandlersEndpointsV1UrlCollection{}
	if err = i.postJSON(c, "/_ah/api/isolateservice/v1/preupload", nil, in, data); err != nil {
		return nil, err
	}
	out = make([]*PushState, len(items))
	for pos, e := range data.Items {
		// The index comes from the server; validate it instead of letting a
		// malformed response panic the client.
		if e == nil {
			return nil, fmt.Errorf("preupload response entry %d is null", pos)
		}
		index := int(e.Index)
		if index < 0 || index >= len(items) {
			return nil, fmt.Errorf("preupload response index %d is out of range for %d items", e.Index, len(items))
		}
		out[index] = &PushState{
			status: *e,
			digest: isolated.HexDigest(items[index].Digest),
			size:   items[index].Size,
		}
	}
	return out, nil
}
// Push pushes a missing item, as reported by Contains(), to the server.
//
// The content is uploaded unless state records that a previous call already
// uploaded it. When the item went to Google Storage, the server is then told
// to finalize the upload. state.uploaded and state.finalized are updated as
// each step succeeds, so Push can be called again after a failure.
func (i *Client) Push(c context.Context, state *PushState, source Source) (err error) {
	// A previous attempt may have uploaded the content and failed only in the
	// finalize step below; skip the upload in that case.
	if !state.uploaded {
		err = i.doPush(c, state, source)
		if err != nil {
			log.Printf("doPush(%s) failed: %s\n%#v", state.digest, err, state)
			return err
		}
		state.uploaded = true
	}

	// Only Google Storage uploads need the finalize notification.
	if viaGS := state.status.GsUploadUrl != ""; viaGS {
		// TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
		// send it to isolated server. That way isolate server can verify that
		// the data safely reached Google Storage (GS provides MD5 and CRC32C of
		// stored files).
		in := isolateservice.HandlersEndpointsV1FinalizeRequest{UploadTicket: state.status.UploadTicket}
		headers := map[string]string{"Cache-Control": "public, max-age=31536000"}
		err = i.postJSON(c, "/_ah/api/isolateservice/v1/finalize_gs_upload", headers, in, nil)
		if err != nil {
			log.Printf("Push(%s) (finalize) failed: %s\n%#v", state.digest, err, state)
			return err
		}
	}

	state.finalized = true
	return nil
}
// Fetch downloads the item identified by digest from the server and writes
// its decompressed content to dest.
//
// Content returned inline by the server is base64-decoded and decompressed
// here; otherwise the download is delegated to the GCS handler.
func (i *Client) Fetch(c context.Context, digest isolated.HexDigest, dest io.Writer) error {
	// Ask the server where the content lives.
	var out isolateservice.HandlersEndpointsV1RetrievedContent
	in := &isolateservice.HandlersEndpointsV1RetrieveRequest{
		Digest:    string(digest),
		Namespace: &isolateservice.HandlersEndpointsV1Namespace{Namespace: i.namespace},
		Offset:    0,
	}
	if err := i.postJSON(c, "/_ah/api/isolateservice/v1/retrieve", nil, in, &out); err != nil {
		return err
	}

	// No inline content: the item is stored in GCS.
	if out.Content == "" {
		return i.gcsHandler.Fetch(c, i, out, dest)
	}

	// Inline (DB) item.
	decoded, err := base64.StdEncoding.DecodeString(out.Content)
	if err != nil {
		return err
	}
	decompressor, err := isolated.GetDecompressor(i.namespace, bytes.NewReader(decoded))
	if err != nil {
		return errors.Annotate(err, "GetDecompressor failed").Err()
	}
	defer decompressor.Close()
	if _, err = io.Copy(dest, decompressor); err != nil {
		return errors.Annotate(err, "io.Copy failed").Tag(transient.Tag).Err()
	}
	return nil
}
// postJSON sends an authenticated JSON POST to resource on the server.
//
// resource must start with '/'. When the client has a user agent configured,
// it is sent as the User-Agent header; the caller's headers map is copied
// rather than modified.
func (i *Client) postJSON(c context.Context, resource string, headers map[string]string, in, out interface{}) error {
	if resource == "" || resource[0] != '/' {
		return errors.Reason("resource must start with '/'").Err()
	}
	if ua := i.userAgent; ua != "" {
		// Work on a copy so the caller's map is left untouched.
		withUA := make(map[string]string, len(headers)+1)
		for name, value := range headers {
			withUA[name] = value
		}
		withUA["User-Agent"] = ua
		headers = withUA
	}
	_, err := lhttp.PostJSON(c, i.retryFactory, i.authClient, i.url+resource, headers, in, out)
	return err
}
// doPush uploads the content for state. Items with a Google Storage upload
// URL are handed to the GCS handler; all others are stored inline through the
// server.
func (i *Client) doPush(c context.Context, state *PushState, source Source) (err error) {
	if state.status.GsUploadUrl != "" {
		// Storage is deferred to Google Cloud Storage.
		return i.gcsHandler.Push(c, i, state.status, source)
	}

	// Fast inline storage.
	src, err := source()
	if err != nil {
		return err
	}
	defer src.Close()
	return i.doPushDB(c, state, src)
}
// doPushDB compresses everything read from reader with the namespace's
// compressor and posts it, base64-encoded, to the store_inline endpoint
// together with the item's upload ticket.
func (i *Client) doPushDB(c context.Context, state *PushState, reader io.Reader) error {
	var packed bytes.Buffer
	compressor, err := isolated.GetCompressor(i.namespace, &packed)
	if err != nil {
		return err
	}
	if _, err = io.Copy(compressor, reader); err != nil {
		return err
	}
	// Closing flushes the remaining compressed data into the buffer.
	if err = compressor.Close(); err != nil {
		return err
	}
	encoded := base64.StdEncoding.EncodeToString(packed.Bytes())
	in := &isolateservice.HandlersEndpointsV1StorageRequest{
		UploadTicket: state.status.UploadTicket,
		Content:      encoded,
	}
	return i.postJSON(c, "/_ah/api/isolateservice/v1/store_inline", nil, in, nil)
}
// defaultGCSHandler implements the default Fetch and Push handlers for
// interacting with GCS. NewClient installs it unless WithGCSHandler is used.
type defaultGCSHandler struct{}
// Fetch downloads the content at the URL carried by the
// HandlersEndpointsV1RetrievedContent response, using the anonymous HTTP
// client, and writes the decompressed data to dest.
func (gcs defaultGCSHandler) Fetch(c context.Context, i *Client, content isolateservice.HandlersEndpointsV1RetrievedContent, dest io.Writer) error {
	newRequest := func() (*http.Request, error) {
		return http.NewRequest("GET", content.Url, nil)
	}
	onResponse := func(resp *http.Response) error {
		defer resp.Body.Close()
		decompressor, err := isolated.GetDecompressor(i.namespace, resp.Body)
		if err != nil {
			annotated := errors.Annotate(err, "GCS GetDecompressor failed")
			// Only network-level failures are marked as transient here.
			if _, isNetErr := err.(*net.OpError); isNetErr {
				annotated.Tag(transient.Tag)
			}
			return annotated.Err()
		}
		defer decompressor.Close()
		if _, err = io.Copy(dest, decompressor); err != nil {
			return errors.Annotate(err, "GCS io.Copy failed").Tag(transient.Tag).Err()
		}
		return nil
	}
	_, err := lhttp.NewRequest(c, i.anonClient, i.retryFactory, newRequest, onResponse, nil)()
	return err
}
// Push uploads content from the provided source to the GCS path specified in
// the HandlersEndpointsV1PreuploadStatus response.
//
// The content is compressed while it is streamed. After the upload, the MD5
// of the compressed bytes is compared against the X-Goog-Hash response
// header(s); a mismatch is reported as a transient error.
func (gcs defaultGCSHandler) Push(ctx context.Context, i *Client, status isolateservice.HandlersEndpointsV1PreuploadStatus, source Source) error {
	// GsUploadUrl is signed Google Storage URL that doesn't require additional
	// authentication. In fact, using authClient causes HTTP 403 because
	// authClient's tokens don't have Cloud Storage OAuth scope. Use anonymous
	// client instead.

	// c is the request body built by the request generator; the response
	// handler reads its MD5 checksum.
	var c *compressed
	req := lhttp.NewRequest(ctx, i.anonClient, i.retryFactory, func() (*http.Request, error) {
		src, err := source()
		if err != nil {
			return nil, err
		}
		request, err := http.NewRequest("PUT", status.GsUploadUrl, nil)
		if err != nil {
			src.Close()
			return nil, err
		}
		c = newCompressed(i.namespace, src)
		request.Body = c
		request.Header.Set("Content-Type", "application/octet-stream")
		return request, nil
	}, func(resp *http.Response) error {
		// Drain and close the body before anything else.
		_, drainErr := io.Copy(ioutil.Discard, resp.Body)
		closeErr := resp.Body.Close()
		if drainErr != nil {
			return drainErr
		}
		base64MD5 := "md5=" + base64.StdEncoding.EncodeToString(c.h.Sum(nil))
		xGoogHash := resp.Header["X-Goog-Hash"]
		if !xGoogHashContains(xGoogHash, base64MD5) {
			return errors.Reason("hash mismatch, x-goog-hash='%s' md5 hash='%s'",
				xGoogHash, base64MD5).Tag(transient.Tag).Err()
		}
		return closeErr
	}, nil)
	_, err := req()
	return err
}

// xGoogHashContains reports whether want is one of the comma-separated
// entries in any of the given X-Goog-Hash header values. Whitespace around an
// entry is ignored, so "crc32c=..., md5=..." matches as well as the form
// without spaces.
func xGoogHashContains(headerValues []string, want string) bool {
	for _, line := range headerValues {
		for _, entry := range strings.Split(line, ",") {
			if strings.TrimSpace(entry) == want {
				return true
			}
		}
	}
	return false
}
// compressed is an io.ReadCloser that transparently compresses source data in
// a separate goroutine. Instances are created by newCompressed.
type compressed struct {
// pr is the read side of the pipe the compressing goroutine writes into.
pr *io.PipeReader
// src is the uncompressed input; it is closed by Close.
src io.ReadCloser
// Hold md5 checksum of the compressed data written to the pipe.
h hash.Hash
}
// Read reads compressed data from the read side of the pipe into data.
func (c *compressed) Read(data []byte) (int, error) {
return c.pr.Read(data)
}
// Close closes the read side of the pipe and then the underlying source.
// Both are always closed; the pipe's error takes precedence over the
// source's.
func (c *compressed) Close() error {
	pipeErr := c.pr.Close()
	srcErr := c.src.Close()
	if pipeErr != nil {
		return pipeErr
	}
	return srcErr
}
// newCompressed creates a pipeline to compress a file into a ReadCloser via:
//
//	src (file as ReadCloser) -> compressor (via io.CopyBuffer) -> bufio.Writer
//	    -> io.Pipe Writer side -> io.Pipe Reader side
//	     \
//	      \-> hash.Hash writer for md5 hash checksum
//
// The compressor is chosen by namespace. The work runs in a goroutine started
// here; any failure is delivered to the reader through the pipe. The goroutine
// does not close src; compressed.Close does.
func newCompressed(namespace string, src io.ReadCloser) *compressed {
pr, pw := io.Pipe()
h := md5.New()
go func() {
// Memory is cheap, we never want this pipeline to stall.
const outBufSize = 1024 * 1024
// We write into a buffer so that the Reader can read larger chunks of
// compressed data out of the compressor instead of being constrained to
// the compressed form of 4096 bytes on each read. Everything flushed from
// the buffer goes to both the pipe and the md5 hash.
bufWriter := bufio.NewWriterSize(io.MultiWriter(pw, h), outBufSize)
// The compressor itself is not thread safe.
compressor, err := isolated.GetCompressor(namespace, bufWriter)
if err != nil {
pw.CloseWithError(err)
return
}
// Make this 3x bigger than the output buffer since it is uncompressed.
buf := make([]byte, outBufSize*3)
if _, err := io.CopyBuffer(compressor, src, buf); err != nil {
// Release the compressor, then report the copy error to the reader.
compressor.Close()
pw.CloseWithError(err)
return
}
// compressor needs to be closed first to flush the rest of the data
// into the bufio.Writer
if err := compressor.Close(); err != nil {
pw.CloseWithError(err)
return
}
// Flush the buffered tail; a nil error here makes the reader see EOF.
pw.CloseWithError(bufWriter.Flush())
}()
return &compressed{pr, src, h}
}