Milo: Move buildbucket pubsub sub from buildbucket project to milo project

This moves (for instance) the subscription from:
projects/cr-buildbucket/subscriptions/luci-milo
to:
projects/luci-milo/subscriptions/buildbucket

This removes a series of complex steps when setting up the pubsub pipeline.

Nothing else changes.

BUG=624960

Review-Url: https://codereview.chromium.org/2981683002
diff --git a/milo/common/pubsub.go b/milo/common/pubsub.go
index 56b79c2..56ec5c5 100644
--- a/milo/common/pubsub.go
+++ b/milo/common/pubsub.go
@@ -2,7 +2,6 @@
 
 import (
 	"encoding/base64"
-	"errors"
 	"fmt"
 	"net/url"
 	"strings"
@@ -12,11 +11,12 @@
 	"golang.org/x/net/context"
 
 	"github.com/luci/gae/service/info"
+	"github.com/luci/luci-go/common/errors"
 	"github.com/luci/luci-go/common/logging"
 	"github.com/luci/luci-go/milo/api/config"
 )
 
-var pubSubClientKey = "stores a pubsubClient"
+var pubsubClientFactoryKey = "stores a pubsubClientFactory"
 
 type PubSubMessage struct {
 	Attributes map[string]string `json:"attributes"`
@@ -42,25 +42,28 @@
 type pubsubClient interface {
 	// getTopic returns the pubsub topic if it exists, a notExist error if
 	// it does not exist, or an error if there was an error.
-	getTopic(string) (*pubsub.Topic, error)
+	getTopic(context.Context, string) (*pubsub.Topic, error)
 
 	// getSubscription returns the pubsub subscription if it exists,
 	// a notExist error if it does not exist, or an error if there was an error.
-	getSubscription(string) (*pubsub.Subscription, error)
+	getSubscription(context.Context, string) (*pubsub.Subscription, error)
 
-	createSubscription(string, pubsub.SubscriptionConfig) (
+	createSubscription(context.Context, string, pubsub.SubscriptionConfig) (
 		*pubsub.Subscription, error)
 }
 
+// pubsubClientFactory is a stubbable factory that produces pubsubClients bound
+// to project IDs.
+type pubsubClientFactory func(context.Context, string) (pubsubClient, error)
+
 // prodPubSubClient is a wrapper around the production pubsub client.
 type prodPubSubClient struct {
-	ctx    context.Context
-	client *pubsub.Client
+	*pubsub.Client
 }
 
-func (pc *prodPubSubClient) getTopic(id string) (*pubsub.Topic, error) {
-	topic := pc.client.Topic(id)
-	exists, err := topic.Exists(pc.ctx)
+func (pc *prodPubSubClient) getTopic(c context.Context, id string) (*pubsub.Topic, error) {
+	topic := pc.Client.Topic(id)
+	exists, err := topic.Exists(c)
 	switch {
 	case err != nil:
 		return nil, err
@@ -70,9 +73,9 @@
 	return topic, nil
 }
 
-func (pc *prodPubSubClient) getSubscription(id string) (*pubsub.Subscription, error) {
-	sub := pc.client.Subscription(id)
-	exists, err := sub.Exists(pc.ctx)
+func (pc *prodPubSubClient) getSubscription(c context.Context, id string) (*pubsub.Subscription, error) {
+	sub := pc.Client.Subscription(id)
+	exists, err := sub.Exists(c)
 	switch {
 	case err != nil:
 		return nil, err
@@ -82,85 +85,55 @@
 	return sub, nil
 }
 
-func (pc *prodPubSubClient) createSubscription(id string, cfg pubsub.SubscriptionConfig) (
+func (pc *prodPubSubClient) createSubscription(
+	c context.Context, id string, cfg pubsub.SubscriptionConfig) (
 	*pubsub.Subscription, error) {
 
-	return pc.client.CreateSubscription(pc.ctx, id, cfg)
+	return pc.Client.CreateSubscription(c, id, cfg)
 }
 
-// getPubSubClient extracts a debug PubSub client out of the context.
-func getPubSubClient(c context.Context) (pubsubClient, error) {
-	if client, ok := c.Value(&pubSubClientKey).(pubsubClient); ok {
-		return client, nil
-	}
-	return nil, errors.New("no pubsub clients installed")
+func prodPubSubClientFactory(c context.Context, projectID string) (pubsubClient, error) {
+	cli, err := pubsub.NewClient(c, projectID)
+	return &prodPubSubClient{cli}, err
 }
 
-// withClient returns a context with a pubsub client instantiated to the
-// given project ID
-func withClient(c context.Context, projectID string) (context.Context, error) {
-	if projectID == "" {
-		return nil, errors.New("missing buildbucket project")
-	}
-	client, err := pubsub.NewClient(c, projectID)
-	if err != nil {
-		return nil, err
-	}
-	return context.WithValue(c, &pubSubClientKey, &prodPubSubClient{c, client}), nil
+// withClientFactory returns a context with a given pubsub client factory.
+func withClientFactory(c context.Context, fac pubsubClientFactory) context.Context {
+	return context.WithValue(c, &pubsubClientFactoryKey, fac)
 }
 
-func getTopic(c context.Context, id string) (*pubsub.Topic, error) {
-	client, err := getPubSubClient(c)
-	if err != nil {
-		return nil, err
+func newPubSubClient(c context.Context, projectID string) (pubsubClient, error) {
+	if fac, ok := c.Value(&pubsubClientFactoryKey).(pubsubClientFactory); !ok {
+		panic("no pubsub client factory installed")
+	} else {
+		return fac(c, projectID)
 	}
-	return client.getTopic(id)
-}
-
-func getSubscription(c context.Context, id string) (*pubsub.Subscription, error) {
-	client, err := getPubSubClient(c)
-	if err != nil {
-		return nil, err
-	}
-	return client.getSubscription(id)
-}
-
-func createSubscription(c context.Context, id string, cfg pubsub.SubscriptionConfig) (
-	*pubsub.Subscription, error) {
-
-	client, err := getPubSubClient(c)
-	if err != nil {
-		return nil, err
-	}
-	return client.createSubscription(id, cfg)
 }
 
 // EnsurePubSubSubscribed makes sure the following subscriptions are in place:
 // * buildbucket, via the settings.Buildbucket.Topic setting
 func EnsurePubSubSubscribed(c context.Context, settings *config.Settings) error {
 	if settings.Buildbucket != nil {
-		// Install the production pubsub client pointing to the buildbucket project
-		// into the context.
-		c, err := withClient(c, settings.Buildbucket.Project)
-		if err != nil {
-			return err
-		}
+		c = withClientFactory(c, prodPubSubClientFactory)
 		return ensureBuildbucketSubscribed(c, settings.Buildbucket.Project)
 	}
 	// TODO(hinoka): Ensure buildbot subscribed.
 	return nil
 }
 
-// ensureSubscribed is called by a cron job and ensures that the Milo
+// ensureBuildbucketSubscribedis called by a cron job and ensures that the Milo
 // instance is properly subscribed to the buildbucket subscription endpoint.
 func ensureBuildbucketSubscribed(c context.Context, projectID string) error {
 	topicID := "builds"
-	// Check to see if the topic exists first.
-	topic, err := getTopic(c, topicID)
+	// Check the buildbucket project to see if the topic exists first.
+	bbClient, err := newPubSubClient(c, projectID)
+	if err != nil {
+		return err
+	}
+	topic, err := bbClient.getTopic(c, topicID)
 	switch err {
 	case errNotExist:
-		logging.WithError(err).Errorf(c, "%s does not exist", topicID)
-		return err
+		return errors.Annotate(err, "%s does not exist", topicID).Err()
 	case nil:
 		// continue
 	default:
@@ -181,10 +154,11 @@
 		return err
 	}
 	// Now check to see if the subscription already exists.
-	subID := info.AppID(c)
-	// Get the pubsub module of our app.  We do not want to use info.ModuleHostname()
-	// because it returns a version pinned hostname instead of the default route.
-	sub, err := getSubscription(c, subID)
+	miloClient, err := newPubSubClient(c, info.AppID(c))
+	if err != nil {
+		return err
+	}
+	sub, err := miloClient.getSubscription(c, "buildbucket")
 	switch err {
 	case errNotExist:
 		// continue
@@ -195,6 +169,8 @@
 		logging.WithError(err).Errorf(c, "could not check subscription %#v", sub)
 		return err
 	}
+	// Get the pubsub module of our app.  We do not want to use info.ModuleHostname()
+	// because it returns a version pinned hostname instead of the default route.
 	pubsubModuleHost := "pubsub." + info.DefaultVersionHostname(c)
 
 	// No subscription exists, attach a new subscription to the existing topic.
@@ -208,22 +184,9 @@
 		PushConfig:  pubsub.PushConfig{Endpoint: endpointURL.String()},
 		AckDeadline: time.Minute * 10,
 	}
-	newSub, err := createSubscription(c, subID, subConfig)
+	newSub, err := miloClient.createSubscription(c, "buildbucket", subConfig)
 	if err != nil {
-		if strings.Contains(err.Error(), "The supplied HTTP URL is not registered") {
-			registerURL := "https://console.cloud.google.com/apis/credentials/domainverification?project=" + projectID
-			verifyURL := "https://www.google.com/webmasters/verification/verification?hl=en-GB&siteUrl=http://" + pubsubModuleHost
-			logging.WithError(err).Errorf(
-				c, "The domain has to be verified and added.\n\n"+
-					"1. Go to %s\n"+
-					"2. Verify the domain\n"+
-					"3. Go to %s\n"+
-					"4. Add %s to allowed domains\n\n",
-				verifyURL, registerURL, pubsubModuleHost)
-		} else {
-			logging.WithError(err).Errorf(c, "could not create subscription %#v", sub)
-		}
-		return err
+		return errors.Annotate(err, "could not create subscription %#v", sub).Err()
 	}
 	// Success!
 	logging.Infof(c, "successfully created subscription %#v", newSub)
diff --git a/milo/common/pubsub_test.go b/milo/common/pubsub_test.go
index 4ac7886..aa9ca3b 100644
--- a/milo/common/pubsub_test.go
+++ b/milo/common/pubsub_test.go
@@ -5,7 +5,6 @@
 package common
 
 import (
-	"errors"
 	"fmt"
 	"testing"
 
@@ -13,6 +12,7 @@
 	"golang.org/x/net/context"
 
 	"github.com/luci/gae/impl/memory"
+	"github.com/luci/luci-go/common/errors"
 	"github.com/luci/luci-go/common/logging/gologger"
 
 	. "github.com/smartystreets/goconvey/convey"
@@ -26,7 +26,7 @@
 }
 
 // Topic returns an empty pubsub topic reference.
-func (client *testPubSubClient) getTopic(id string) (*pubsub.Topic, error) {
+func (client *testPubSubClient) getTopic(c context.Context, id string) (*pubsub.Topic, error) {
 	if err, ok := client.topics[id]; ok {
 		return &pubsub.Topic{}, err
 	}
@@ -34,7 +34,7 @@
 }
 
 // Subscription returns an empty subscription reference.
-func (client *testPubSubClient) getSubscription(id string) (
+func (client *testPubSubClient) getSubscription(c context.Context, id string) (
 	*pubsub.Subscription, error) {
 	if err, ok := client.subscriptions[id]; ok {
 		return &pubsub.Subscription{}, err
@@ -45,7 +45,7 @@
 // CreateSubscription records that an attempt to create a subscription with
 // an id, then returns an empty subscription.
 func (client *testPubSubClient) createSubscription(
-	id string, cfg pubsub.SubscriptionConfig) (
+	c context.Context, id string, cfg pubsub.SubscriptionConfig) (
 	*pubsub.Subscription, error) {
 
 	if err, ok := client.createdSubsErr[id]; ok {
@@ -55,59 +55,87 @@
 	panic(fmt.Errorf("test error: unknown created sub %s", id))
 }
 
+type testFactory struct {
+	clients map[string]pubsubClient
+}
+
+// makeTestClientFactory returns a closed pubsubClientFactory.
+// Golang Protip: A bound method will not match the function type signature
+// of an unbound function, but a closed function will.
+func (fac *testFactory) makeTestClientFactory() pubsubClientFactory {
+	return func(c context.Context, projectID string) (pubsubClient, error) {
+		if cli, ok := fac.clients[projectID]; ok {
+			return cli, nil
+		}
+		return nil, fmt.Errorf("client for project %s does not exist", projectID)
+	}
+}
+
 func TestPubSub(t *testing.T) {
 	t.Parallel()
 
 	Convey("Test Environment", t, func() {
 		c := memory.UseWithAppID(context.Background(), "dev~luci-milo")
 		c = gologger.StdConfig.Use(c)
-		client := &testPubSubClient{
+		miloClient := &testPubSubClient{
 			topics:         map[string]error{},
 			subscriptions:  map[string]error{},
 			createdSubsErr: map[string]error{},
 			createdSubs:    map[string]pubsub.SubscriptionConfig{}}
-		c = context.WithValue(c, &pubSubClientKey, client)
+		bbClient := &testPubSubClient{
+			topics:         map[string]error{},
+			subscriptions:  map[string]error{},
+			createdSubsErr: map[string]error{},
+			createdSubs:    map[string]pubsub.SubscriptionConfig{}}
+		fac := testFactory{
+			clients: map[string]pubsubClient{
+				"luci-milo":   miloClient,
+				"buildbucket": bbClient,
+			},
+		}
+		c = context.WithValue(c, &pubsubClientFactoryKey, fac.makeTestClientFactory())
 
 		Convey("Buildbucket PubSub subscriber", func() {
-			proj := "foo"
+			proj := "buildbucket"
 			Convey("Non-existant topic", func() {
-				client.topics["builds"] = errNotExist
+				bbClient.topics["builds"] = errNotExist
 				err := ensureBuildbucketSubscribed(c, proj)
 				So(err.Error(), ShouldEndWith, "does not exist")
 			})
 			Convey("Permission denied", func() {
 				pErr := errors.New(
 					"something PermissionDenied something")
-				client.topics["builds"] = pErr
+				bbClient.topics["builds"] = pErr
 				err := ensureBuildbucketSubscribed(c, proj)
 				So(err, ShouldEqual, pErr)
 			})
 			Convey("Normal error", func() {
 				pErr := errors.New("foobar")
-				client.topics["builds"] = pErr
+				bbClient.topics["builds"] = pErr
 				err := ensureBuildbucketSubscribed(c, proj)
 				So(err, ShouldEqual, pErr)
 			})
-			client.topics["builds"] = nil
+			bbClient.topics["builds"] = nil
 			Convey("Subscription exists", func() {
-				client.subscriptions["luci-milo"] = nil
+				miloClient.subscriptions["buildbucket"] = nil
 				err := ensureBuildbucketSubscribed(c, proj)
 				So(err, ShouldBeNil)
-				So(len(client.createdSubs), ShouldEqual, 0)
+				So(len(miloClient.createdSubs), ShouldEqual, 0)
+				So(len(bbClient.createdSubs), ShouldEqual, 0)
 			})
-			client.subscriptions["luci-milo"] = errNotExist
+			miloClient.subscriptions["buildbucket"] = errNotExist
 			Convey("Not registered", func() {
 				errNotReg := errors.New("The supplied HTTP URL is not registered")
-				client.createdSubsErr["luci-milo"] = errNotReg
+				miloClient.createdSubsErr["buildbucket"] = errNotReg
 				err := ensureBuildbucketSubscribed(c, proj)
-				So(err, ShouldEqual, errNotReg)
+				So((err.(errors.Wrapped)).InnerError(), ShouldEqual, errNotReg)
 			})
 			Convey("Create subscription", func() {
-				client.createdSubsErr["luci-milo"] = nil
+				miloClient.createdSubsErr["buildbucket"] = nil
 				err := ensureBuildbucketSubscribed(c, proj)
 				So(err, ShouldBeNil)
-				So(len(client.createdSubs), ShouldEqual, 1)
-				_, ok := client.createdSubs["luci-milo"]
+				So(len(miloClient.createdSubs), ShouldEqual, 1)
+				_, ok := miloClient.createdSubs["buildbucket"]
 				So(ok, ShouldEqual, true)
 			})
 		})