blob: 3813871e39e8a1e4f0022718a9582dfd08b42410 [file] [log] [blame]
// Copyright 2015 The LUCI Authors. All rights reserved.
// Use of this source code is governed under the Apache License, Version 2.0
// that can be found in the LICENSE file.
// Package isolatedfake implements an in-process fake Isolated server for
// integration testing.
package isolatedfake
import (
"bytes"
"encoding/base64"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strings"
"sync"
isolateservice "github.com/luci/luci-go/common/api/isolate/isolateservice/v1"
"github.com/luci/luci-go/common/isolated"
)
const contentType = "application/json; charset=utf-8"
type jsonAPI func(r *http.Request) interface{}
type failure interface {
Fail(err error)
}
// handlerJSON converts a jsonAPI http handler to a proper http.Handler.
func handlerJSON(f failure, handler jsonAPI) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
//log.Printf("%s", r.URL)
if r.Header.Get("Content-Type") != contentType {
f.Fail(fmt.Errorf("invalid content type: %s", r.Header.Get("Content-Type")))
return
}
defer r.Body.Close()
out := handler(r)
w.Header().Set("Content-Type", contentType)
j := json.NewEncoder(w)
if err := j.Encode(out); err != nil {
f.Fail(err)
}
})
}
// IsolatedFake is a functional fake in-memory isolated server.
type IsolatedFake interface {
http.Handler
// Contents returns all the uncompressed data on the fake isolated server.
Contents() map[isolated.HexDigest][]byte
// Inject adds uncompressed data in the fake isolated server.
Inject(data []byte)
Error() error
}
type isolatedFake struct {
mux *http.ServeMux
lock sync.Mutex
err error
contents map[isolated.HexDigest][]byte
staging map[isolated.HexDigest][]byte // Uploaded to GCS but not yet finalized.
}
// New create a HTTP router that implements an isolated server.
func New() IsolatedFake {
server := &isolatedFake{
mux: http.NewServeMux(),
contents: map[isolated.HexDigest][]byte{},
staging: map[isolated.HexDigest][]byte{},
}
server.handleJSON("/api/isolateservice/v1/server_details", server.serverDetails)
server.handleJSON("/api/isolateservice/v1/preupload", server.preupload)
server.handleJSON("/api/isolateservice/v1/finalize_gs_upload", server.finalizeGSUpload)
server.handleJSON("/api/isolateservice/v1/store_inline", server.storeInline)
server.mux.HandleFunc("/fake/cloudstorage", server.fakeCloudStorage)
// Fail on anything else.
server.mux.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
server.Fail(fmt.Errorf("unknwown endpoint %s", req.URL))
})
return server
}
// Private details.
func (server *isolatedFake) ServeHTTP(w http.ResponseWriter, r *http.Request) {
server.mux.ServeHTTP(w, r)
}
func (server *isolatedFake) Contents() map[isolated.HexDigest][]byte {
server.lock.Lock()
defer server.lock.Unlock()
out := map[isolated.HexDigest][]byte{}
for k, v := range server.contents {
out[k] = v
}
return out
}
func (server *isolatedFake) Inject(data []byte) {
h := isolated.HashBytes(data)
server.lock.Lock()
defer server.lock.Unlock()
server.contents[h] = data
}
func (server *isolatedFake) Fail(err error) {
server.lock.Lock()
defer server.lock.Unlock()
server.failLocked(err)
}
func (server *isolatedFake) Error() error {
server.lock.Lock()
defer server.lock.Unlock()
return server.err
}
func (server *isolatedFake) failLocked(err error) {
if server.err == nil {
server.err = err
}
}
func (server *isolatedFake) handleJSON(path string, handler jsonAPI) {
server.mux.Handle(path, handlerJSON(server, handler))
}
func (server *isolatedFake) serverDetails(r *http.Request) interface{} {
content, err := ioutil.ReadAll(r.Body)
if err != nil {
server.Fail(err)
}
if string(content) != "{}" {
server.Fail(fmt.Errorf("unexpected content %#v", string(content)))
}
return map[string]string{"server_version": "v1"}
}
func (server *isolatedFake) preupload(r *http.Request) interface{} {
data := &isolateservice.HandlersEndpointsV1DigestCollection{}
if err := json.NewDecoder(r.Body).Decode(data); err != nil {
server.Fail(err)
}
if data.Namespace == nil || data.Namespace.Namespace != "default-gzip" {
server.Fail(fmt.Errorf("unexpected namespace %#v", data.Namespace.Namespace))
}
out := &isolateservice.HandlersEndpointsV1UrlCollection{}
server.lock.Lock()
defer server.lock.Unlock()
for i, d := range data.Items {
if _, ok := server.contents[isolated.HexDigest(d.Digest)]; !ok {
// Simulate a write to Cloud Storage for larger writes.
ticket := "ticket:" + string(d.Digest)
s := &isolateservice.HandlersEndpointsV1PreuploadStatus{
Index: int64(i),
UploadTicket: ticket,
}
if d.Size > 1024 {
v := url.Values{}
v.Add("digest", string(d.Digest))
u := &url.URL{Scheme: "http", Host: r.Host, Path: "/fake/cloudstorage", RawQuery: v.Encode()}
s.GsUploadUrl = u.String()
//log.Printf("%s", s.GsUploadUrl)
}
out.Items = append(out.Items, s)
}
}
return out
}
func (server *isolatedFake) fakeCloudStorage(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
if r.Header.Get("Content-Type") != "application/octet-stream" {
w.WriteHeader(400)
server.Fail(fmt.Errorf("invalid content type: %s", r.Header.Get("Content-Type")))
return
}
if r.Method != "PUT" {
w.WriteHeader(405)
server.Fail(fmt.Errorf("invalid method: %s", r.Method))
return
}
decompressor, err := isolated.GetDecompressor(r.Body)
if err != nil {
w.WriteHeader(500)
server.Fail(err)
return
}
defer decompressor.Close()
raw, err := ioutil.ReadAll(decompressor)
if err != nil {
w.WriteHeader(500)
server.Fail(err)
return
}
digest := isolated.HexDigest(r.URL.Query().Get("digest"))
if digest != isolated.HashBytes(raw) {
w.WriteHeader(400)
server.Fail(fmt.Errorf("invalid digest %#v", digest))
return
}
server.lock.Lock()
defer server.lock.Unlock()
server.staging[digest] = raw
w.WriteHeader(200)
}
func (server *isolatedFake) finalizeGSUpload(r *http.Request) interface{} {
data := &isolateservice.HandlersEndpointsV1FinalizeRequest{}
if err := json.NewDecoder(r.Body).Decode(data); err != nil {
server.Fail(err)
return map[string]string{"err": err.Error()}
}
prefix := "ticket:"
if !strings.HasPrefix(data.UploadTicket, prefix) {
err := fmt.Errorf("unexpected ticket %#v", data.UploadTicket)
server.Fail(err)
return map[string]string{"err": err.Error()}
}
digest := isolated.HexDigest(data.UploadTicket[len(prefix):])
if !digest.Validate() {
err := fmt.Errorf("invalid digest %#v", digest)
server.Fail(err)
return map[string]string{"err": err.Error()}
}
server.lock.Lock()
defer server.lock.Unlock()
if _, ok := server.staging[digest]; !ok {
err := fmt.Errorf("finalizing non uploaded file")
server.failLocked(err)
return map[string]string{"err": err.Error()}
}
server.contents[digest] = server.staging[digest]
delete(server.staging, digest)
return map[string]string{"ok": "true"}
}
func (server *isolatedFake) storeInline(r *http.Request) interface{} {
data := &isolateservice.HandlersEndpointsV1StorageRequest{}
if err := json.NewDecoder(r.Body).Decode(data); err != nil {
server.Fail(err)
return map[string]string{"err": err.Error()}
}
prefix := "ticket:"
if !strings.HasPrefix(data.UploadTicket, prefix) {
err := fmt.Errorf("unexpected ticket %#v", data.UploadTicket)
server.Fail(err)
return map[string]string{"err": err.Error()}
}
digest := isolated.HexDigest(data.UploadTicket[len(prefix):])
if !digest.Validate() {
err := fmt.Errorf("invalid digest %#v", digest)
server.Fail(err)
return map[string]string{"err": err.Error()}
}
blob, err := base64.StdEncoding.DecodeString(data.Content)
if err != nil {
server.Fail(err)
return map[string]string{"err": err.Error()}
}
decompressor, err := isolated.GetDecompressor(bytes.NewReader(blob))
if err != nil {
server.Fail(err)
return map[string]string{"err": err.Error()}
}
defer decompressor.Close()
raw, err := ioutil.ReadAll(decompressor)
if err != nil {
server.Fail(err)
return map[string]string{"err": err.Error()}
}
if digest != isolated.HashBytes(raw) {
err := fmt.Errorf("invalid digest %#v", digest)
server.Fail(err)
return map[string]string{"err": err.Error()}
}
server.lock.Lock()
defer server.lock.Unlock()
server.contents[digest] = raw
//log.Printf(" storing %s = %d bytes", digest, len(raw))
return map[string]string{"ok": "true"}
}