blob: 462da9c917fd3d909c70fd7b7bda963cad563622 [file] [log] [blame]
// Copyright 2015 The LUCI Authors. All rights reserved.
// Use of this source code is governed under the Apache License, Version 2.0
// that can be found in the LICENSE file.
package isolatedclient
import (
"bytes"
"encoding/base64"
"errors"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"strings"
"golang.org/x/net/context"
isolateservice "github.com/luci/luci-go/common/api/isolate/isolateservice/v1"
"github.com/luci/luci-go/common/isolated"
"github.com/luci/luci-go/common/lhttp"
"github.com/luci/luci-go/common/retry"
"github.com/luci/luci-go/common/runtime/tracer"
)
// DefaultNamespace is the namespace that should be used with the New function.
const DefaultNamespace = "default-gzip"
// Source is a generator method to return source data. A generated Source must
// be Closed before the generator is called again.
type Source func() (io.ReadCloser, error)
// NewBytesSource returns a Source implementation that reads from the supplied
// byte slice.
func NewBytesSource(d []byte) Source {
return func() (io.ReadCloser, error) {
return ioutil.NopCloser(bytes.NewReader(d)), nil
}
}
// PushState is per-item state passed from IsolateServer.Contains() to
// IsolateServer.Push().
//
// Its content is implementation specific.
type PushState struct {
status isolateservice.HandlersEndpointsV1PreuploadStatus
digest isolated.HexDigest
size int64
uploaded bool
finalized bool
}
// CloudStorage is the interface for clients to fetch from and push to GCS storage.
type CloudStorage interface {
// Fetch is a handler for retrieving specified content from GCS and storing
// the response in the provided destination buffer.
Fetch(context.Context, *Client, isolateservice.HandlersEndpointsV1RetrievedContent, io.WriteSeeker) error
// Push is a handler for pushing content from provided buffer to GCS.
Push(context.Context, *Client, isolateservice.HandlersEndpointsV1PreuploadStatus, Source) error
}
// Client is a client to an isolated server.
type Client struct {
// All the members are immutable.
retryFactory retry.Factory
url string
namespace string
authClient *http.Client // client that sends auth tokens
anonClient *http.Client // client that does NOT send auth tokens
gcsHandler CloudStorage // implements GCS fetch and push handlers
}
// New returns a new IsolateServer client.
//
// 'authClient' must implement authentication sufficient to talk to Isolate server
// (OAuth tokens with 'email' scope).
//
// 'anonClient' must be a functional http.Client.
//
// If either client is nil, it will use http.DefaultClient (which will not work
// on Classic AppEngine!).
//
// If you're unsure which namespace to use, use the DefaultNamespace constant.
//
// If gcs is nil, the defaultGCSHandler is used for fetching from and pushing to GCS.
func New(anonClient, authClient *http.Client, host, namespace string, rFn retry.Factory, gcs CloudStorage) *Client {
if anonClient == nil {
anonClient = http.DefaultClient
}
if authClient == nil {
authClient = http.DefaultClient
}
if gcs == nil {
gcs = defaultGCSHandler{}
}
i := &Client{
retryFactory: rFn,
url: strings.TrimRight(host, "/"),
namespace: namespace,
authClient: authClient,
anonClient: anonClient,
gcsHandler: gcs,
}
tracer.NewPID(i, "isolatedclient:"+i.url)
return i
}
// ServerCapabilities returns the server details.
func (i *Client) ServerCapabilities(c context.Context) (*isolateservice.HandlersEndpointsV1ServerDetails, error) {
out := &isolateservice.HandlersEndpointsV1ServerDetails{}
if err := i.postJSON(c, "/api/isolateservice/v1/server_details", nil, map[string]string{}, out); err != nil {
return nil, err
}
return out, nil
}
// Contains looks up cache presence on the server of multiple items.
//
// The returned list is in the same order as 'items', with entries nil for
// items that were present.
func (i *Client) Contains(c context.Context, items []*isolateservice.HandlersEndpointsV1Digest) (out []*PushState, err error) {
end := tracer.Span(i, "contains", tracer.Args{"number": len(items)})
defer func() { end(tracer.Args{"err": err}) }()
in := isolateservice.HandlersEndpointsV1DigestCollection{Items: items, Namespace: &isolateservice.HandlersEndpointsV1Namespace{}}
in.Namespace.Namespace = i.namespace
data := &isolateservice.HandlersEndpointsV1UrlCollection{}
if err = i.postJSON(c, "/api/isolateservice/v1/preupload", nil, in, data); err != nil {
return nil, err
}
out = make([]*PushState, len(items))
for _, e := range data.Items {
index := int(e.Index)
out[index] = &PushState{
status: *e,
digest: isolated.HexDigest(items[index].Digest),
size: items[index].Size,
}
}
return out, nil
}
// Push pushed a missing item, as reported by Contains(), to the server.
func (i *Client) Push(c context.Context, state *PushState, source Source) (err error) {
// This push operation may be a retry after failed finalization call below,
// no need to reupload contents in that case.
if !state.uploaded {
// PUT file to uploadURL.
if err = i.doPush(c, state, source); err != nil {
log.Printf("doPush(%s) failed: %s\n%#v", state.digest, err, state)
return
}
state.uploaded = true
}
// Optionally notify the server that it's done.
if state.status.GsUploadUrl != "" {
end := tracer.Span(i, "finalize", nil)
defer func() { end(tracer.Args{"err": err}) }()
// TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
// send it to isolated server. That way isolate server can verify that
// the data safely reached Google Storage (GS provides MD5 and CRC32C of
// stored files).
in := isolateservice.HandlersEndpointsV1FinalizeRequest{UploadTicket: state.status.UploadTicket}
headers := map[string]string{"Cache-Control": "public, max-age=31536000"}
if err = i.postJSON(c, "/api/isolateservice/v1/finalize_gs_upload", headers, in, nil); err != nil {
log.Printf("Push(%s) (finalize) failed: %s\n%#v", state.digest, err, state)
return
}
}
state.finalized = true
return
}
// Fetch downloads an item from the server.
func (i *Client) Fetch(c context.Context, item *isolateservice.HandlersEndpointsV1Digest, dest io.WriteSeeker) error {
// Perform initial request.
url := i.url + "/api/isolateservice/v1/retrieve"
in := &isolateservice.HandlersEndpointsV1RetrieveRequest{
Digest: item.Digest,
Namespace: &isolateservice.HandlersEndpointsV1Namespace{
DigestHash: "sha-1",
Namespace: i.namespace,
},
Offset: 0,
}
var out isolateservice.HandlersEndpointsV1RetrievedContent
if _, err := lhttp.PostJSON(c, i.retryFactory, i.authClient, url, nil, in, &out); err != nil {
return err
}
// Handle DB items.
if out.Content != "" {
decoded, err := base64.StdEncoding.DecodeString(out.Content)
if err != nil {
return err
}
decompressor, err := isolated.GetDecompressor(bytes.NewReader(decoded))
if err != nil {
return err
}
defer decompressor.Close()
_, err = io.Copy(dest, decompressor)
return err
}
// Handle GCS items.
return i.gcsHandler.Fetch(c, i, out, dest)
}
// postJSON does authenticated POST request.
func (i *Client) postJSON(c context.Context, resource string, headers map[string]string, in, out interface{}) error {
if len(resource) == 0 || resource[0] != '/' {
return errors.New("resource must start with '/'")
}
_, err := lhttp.PostJSON(c, i.retryFactory, i.authClient, i.url+resource, headers, in, out)
return err
}
func (i *Client) doPush(c context.Context, state *PushState, source Source) (err error) {
useDB := state.status.GsUploadUrl == ""
end := tracer.Span(i, "push", tracer.Args{"useDB": useDB, "size": state.size})
defer func() { end(tracer.Args{"err": err}) }()
if useDB {
var src io.ReadCloser
src, err = source()
if err != nil {
return err
}
defer src.Close()
err = i.doPushDB(c, state, src)
} else {
err = i.gcsHandler.Push(c, i, state.status, source)
}
if err != nil {
tracer.CounterAdd(i, "bytesUploaded", float64(state.size))
}
return err
}
func (i *Client) doPushDB(c context.Context, state *PushState, reader io.Reader) error {
buf := bytes.Buffer{}
compressor, err := isolated.GetCompressor(&buf)
if err != nil {
return err
}
if _, err := io.Copy(compressor, reader); err != nil {
return err
}
if err := compressor.Close(); err != nil {
return err
}
in := &isolateservice.HandlersEndpointsV1StorageRequest{
UploadTicket: state.status.UploadTicket,
Content: base64.StdEncoding.EncodeToString(buf.Bytes()),
}
return i.postJSON(c, "/api/isolateservice/v1/store_inline", nil, in, nil)
}
// defaultGCSHandler implements the default Fetch and Push handlers for
// interacting with GCS.
type defaultGCSHandler struct{}
// Fetch uses the provided HandlersEndpointsV1RetrievedContent response to
// download content from GCS to the provided dest.
func (gcs defaultGCSHandler) Fetch(c context.Context, i *Client, content isolateservice.HandlersEndpointsV1RetrievedContent, dest io.WriteSeeker) error {
rgen := func() (*http.Request, error) {
return http.NewRequest("GET", content.Url, nil)
}
handler := func(resp *http.Response) error {
defer resp.Body.Close()
decompressor, err := isolated.GetDecompressor(resp.Body)
if err != nil {
return err
}
defer decompressor.Close()
if _, err := dest.Seek(0, os.SEEK_SET); err != nil {
return err
}
_, err = io.Copy(dest, decompressor)
return err
}
_, err := lhttp.NewRequest(c, i.authClient, i.retryFactory, rgen, handler, nil)()
return err
}
// Push uploads content from the provided source to the GCS path specified in
// the HandlersEndpointsV1PreuploadStatus response.
func (gcs defaultGCSHandler) Push(c context.Context, i *Client, status isolateservice.HandlersEndpointsV1PreuploadStatus, source Source) error {
// GsUploadUrl is signed Google Storage URL that doesn't require additional
// authentication. In fact, using authClient causes HTTP 403 because
// authClient's tokens don't have Cloud Storage OAuth scope. Use anonymous
// client instead.
req := lhttp.NewRequest(c, i.anonClient, i.retryFactory, func() (*http.Request, error) {
src, err := source()
if err != nil {
return nil, err
}
request, err := http.NewRequest("PUT", status.GsUploadUrl, nil)
if err != nil {
src.Close()
return nil, err
}
request.Body = newCompressed(src)
request.Header.Set("Content-Type", "application/octet-stream")
return request, nil
}, func(resp *http.Response) error {
_, err4 := io.Copy(ioutil.Discard, resp.Body)
err5 := resp.Body.Close()
if err4 != nil {
return err4
}
return err5
}, nil)
_, err := req()
return err
}
// compressed is an io.ReadCloser that transparently compresses source data in
// a separate goroutine.
type compressed struct {
pr *io.PipeReader
src io.ReadCloser
}
func (c *compressed) Read(data []byte) (int, error) {
return c.pr.Read(data)
}
func (c *compressed) Close() error {
err := c.pr.Close()
if err1 := c.src.Close(); err == nil {
err = err1
}
return err
}
func newCompressed(src io.ReadCloser) io.ReadCloser {
pr, pw := io.Pipe()
go func() {
// The compressor itself is not thread safe.
compressor, err := isolated.GetCompressor(pw)
if err != nil {
pw.CloseWithError(err)
return
}
buf := make([]byte, 4096)
if _, err := io.CopyBuffer(compressor, src, buf); err != nil {
compressor.Close()
pw.CloseWithError(err)
return
}
pw.CloseWithError(compressor.Close())
}()
return &compressed{pr, src}
}