blob: 10acb2a2dbb9cf3836b9c0c42b47ad03e37428a5 [file] [log] [blame]
// Copyright 2015 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package service
import (
"bytes"
"compress/zlib"
"context"
"crypto/sha256"
"encoding/base64"
"encoding/hex"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/golang/protobuf/proto"
"go.chromium.org/luci/server/auth/service/protocol"
"go.chromium.org/luci/server/auth/signing"
"go.chromium.org/luci/server/auth/signing/signingtest"
"go.chromium.org/luci/server/caching"
. "github.com/smartystreets/goconvey/convey"
. "go.chromium.org/luci/common/testing/assertions"
)
func TestPubSubWorkflow(t *testing.T) {
Convey("PubSub pull workflow works", t, func(c C) {
ctx := caching.WithEmptyProcessCache(context.Background())
fakeSigner := signingtest.NewSigner(nil)
certs, _ := fakeSigner.Certificates(ctx)
certsJSON, _ := json.Marshal(certs)
// Expected calls.
calls := []struct {
Method string
URL string
Code int
Response string
}{
// Probing for existing subscription -> tell it is not found.
{
"GET",
"/pubsub/projects/p1/subscriptions/sub",
404,
`{"error": {"code": 404}}`,
},
// Authorizing access to PubSub topic.
{
"POST",
"/auth_service/api/v1/authdb/subscription/authorization",
200,
`{"topic": "projects/p2/topics/topic"}`,
},
// Creating the subscription.
{
"PUT",
"/pubsub/projects/p1/subscriptions/sub",
200,
"",
},
// Probing for existing subscription again.
{
"GET",
"/pubsub/projects/p1/subscriptions/sub",
200,
`{"pushConfig": {"pushEndpoint": "http://blah"}}`,
},
// Changing push URL.
{
"POST",
"/pubsub/projects/p1/subscriptions/sub:modifyPushConfig",
200,
"",
},
// Pulling messages from it, all bad.
{
"POST",
"/pubsub/projects/p1/subscriptions/sub:pull",
200,
`{"receivedMessages": [
{
"ackId": "ack1",
"message": {"data": "broken"}
}
]}`,
},
// Fetching certificates from auth service to authenticate messages.
{
"GET",
"/auth/api/v1/server/certificates",
200,
string(certsJSON),
},
// Bad messages are removed from the queue by ack.
{
"POST",
"/pubsub/projects/p1/subscriptions/sub:acknowledge",
200,
"",
},
// Pulling messages from the subscription, again.
{
"POST",
"/pubsub/projects/p1/subscriptions/sub:pull",
200,
fmt.Sprintf(`{"receivedMessages": [
{
"ackId": "ack1",
"message": {"data": "broken"}
},
%s,
%s
]}`, fakePubSubMessage(ctx, "ack2", 122, fakeSigner), fakePubSubMessage(ctx, "ack2", 123, fakeSigner)),
},
// Acknowledging messages.
{
"POST",
"/pubsub/projects/p1/subscriptions/sub:acknowledge",
200,
"",
},
// Removing existing subscription.
{
"DELETE",
"/pubsub/projects/p1/subscriptions/sub",
200,
"{}",
},
// Removing already deleted subscription.
{
"DELETE",
"/pubsub/projects/p1/subscriptions/sub",
404,
`{"error": {"code": 404}}`,
},
}
counter := 0
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if counter >= len(calls) {
c.So(fmt.Sprintf("%s %s is unexpected", r.Method, r.URL.Path), ShouldBeNil)
}
call := &calls[counter]
c.So(r.Method, ShouldEqual, call.Method)
c.So(r.URL.Path, ShouldEqual, call.URL)
w.WriteHeader(call.Code)
if call.Response != "" {
w.Write([]byte(call.Response))
}
counter++
}))
defer ts.Close()
// Register.
srv := AuthService{
URL: ts.URL,
pubSubURLRoot: ts.URL + "/pubsub/",
}
So(srv.EnsureSubscription(ctx, "projects/p1/subscriptions/sub", "http://blah"), ShouldBeNil)
// Reregister with no push url. For code coverage.
So(srv.EnsureSubscription(ctx, "projects/p1/subscriptions/sub", ""), ShouldBeNil)
// First pull. No valid messages.
notify, err := srv.PullPubSub(ctx, "projects/p1/subscriptions/sub")
So(err, ShouldBeNil)
So(notify, ShouldBeNil)
// Second pull. Have something.
notify, err = srv.PullPubSub(ctx, "projects/p1/subscriptions/sub")
So(err, ShouldBeNil)
So(notify, ShouldNotBeNil)
So(notify.Revision, ShouldEqual, 123)
// Ack.
So(notify.Acknowledge(ctx), ShouldBeNil)
// Code coverage.
notify, err = srv.ProcessPubSubPush(ctx, []byte(fakePubSubMessage(ctx, "", 456, fakeSigner)))
So(err, ShouldBeNil)
So(notify, ShouldNotBeNil)
So(notify.Revision, ShouldEqual, 456)
// Killing existing subscription.
So(srv.DeleteSubscription(ctx, "projects/p1/subscriptions/sub"), ShouldBeNil)
// Killing already removed subscription.
So(srv.DeleteSubscription(ctx, "projects/p1/subscriptions/sub"), ShouldBeNil)
})
}
func TestGetSnapshot(t *testing.T) {
Convey("GetSnapshot works", t, func(c C) {
ctx := context.Background()
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
c.So(r.URL.Path, ShouldEqual, "/auth_service/api/v1/authdb/revisions/123")
body, digest := generateSnapshot(123)
w.Write([]byte(fmt.Sprintf(`{"snapshot": {
"auth_db_rev": 123,
"sha256": "%s",
"created_ts": 1446599918304238,
"deflated_body": "%s"
}}`, digest, body)))
}))
defer ts.Close()
srv := AuthService{
URL: ts.URL,
}
snap, err := srv.GetSnapshot(ctx, 123)
So(err, ShouldBeNil)
So(snap, ShouldResemble, &Snapshot{
AuthDB: &protocol.AuthDB{},
AuthServiceURL: ts.URL,
Rev: 123,
Created: time.Unix(0, 1446599918304238000),
})
})
}
func TestDeflateInflate(t *testing.T) {
Convey("Deflate then Inflate works", t, func() {
initial := &protocol.AuthDB{
OauthClientId: "abc",
OauthClientSecret: "def",
}
blob, err := DeflateAuthDB(initial)
So(err, ShouldBeNil)
inflated, err := InflateAuthDB(blob)
So(err, ShouldBeNil)
So(inflated, ShouldResembleProto, initial)
})
}
///
func generateSnapshot(rev int64) (body string, digest string) {
blob, err := proto.Marshal(&protocol.ReplicationPushRequest{
Revision: &protocol.AuthDBRevision{
AuthDbRev: rev,
PrimaryId: "primaryId",
ModifiedTs: 1446599918304238,
},
AuthDb: &protocol.AuthDB{},
})
if err != nil {
panic(err)
}
buf := bytes.Buffer{}
w := zlib.NewWriter(&buf)
_, err = w.Write(blob)
if err != nil {
panic(err)
}
w.Close()
body = base64.StdEncoding.EncodeToString(buf.Bytes())
hash := sha256.Sum256(blob)
digest = hex.EncodeToString(hash[:])
return
}
func fakePubSubMessage(c context.Context, ackID string, rev int64, signer signing.Signer) string {
msg := protocol.ChangeNotification{
Revision: &protocol.AuthDBRevision{
AuthDbRev: rev,
PrimaryId: "primaryId",
ModifiedTs: 1000,
},
}
blob, _ := proto.Marshal(&msg)
key, sig, _ := signer.SignBytes(c, blob)
ps := pubSubMessage{
AckID: ackID,
}
ps.Message.Data = base64.StdEncoding.EncodeToString(blob)
ps.Message.Attributes = map[string]string{
"X-AuthDB-SigKey-v1": key,
"X-AuthDB-SigVal-v1": base64.StdEncoding.EncodeToString(sig),
}
out, _ := json.Marshal(&ps)
return string(out)
}