Milo: Move buildbucket pubsub sub from buildbucket project to milo project
This moves (for instance) the subscription from:
projects/cr-buildbucket/subscriptions/luci-milo
to:
projects/luci-milo/subscriptions/buildbucket
This removes a series of complex steps when setting up the pubsub pipeline.
Nothing else changes.
BUG=624960
Review-Url: https://codereview.chromium.org/2981683002
diff --git a/milo/common/pubsub.go b/milo/common/pubsub.go
index 56b79c2..56ec5c5 100644
--- a/milo/common/pubsub.go
+++ b/milo/common/pubsub.go
@@ -2,7 +2,6 @@
import (
"encoding/base64"
- "errors"
"fmt"
"net/url"
"strings"
@@ -12,11 +11,12 @@
"golang.org/x/net/context"
"github.com/luci/gae/service/info"
+ "github.com/luci/luci-go/common/errors"
"github.com/luci/luci-go/common/logging"
"github.com/luci/luci-go/milo/api/config"
)
-var pubSubClientKey = "stores a pubsubClient"
+var pubsubClientFactoryKey = "stores a pubsubClientFactory"
type PubSubMessage struct {
Attributes map[string]string `json:"attributes"`
@@ -42,25 +42,28 @@
type pubsubClient interface {
// getTopic returns the pubsub topic if it exists, a notExist error if
// it does not exist, or an error if there was an error.
- getTopic(string) (*pubsub.Topic, error)
+ getTopic(context.Context, string) (*pubsub.Topic, error)
// getSubscription returns the pubsub subscription if it exists,
// a notExist error if it does not exist, or an error if there was an error.
- getSubscription(string) (*pubsub.Subscription, error)
+ getSubscription(context.Context, string) (*pubsub.Subscription, error)
- createSubscription(string, pubsub.SubscriptionConfig) (
+ createSubscription(context.Context, string, pubsub.SubscriptionConfig) (
*pubsub.Subscription, error)
}
+// pubsubClientFactory is a stubbable factory that produces pubsubClients bound
+// to project IDs.
+type pubsubClientFactory func(context.Context, string) (pubsubClient, error)
+
// prodPubSubClient is a wrapper around the production pubsub client.
type prodPubSubClient struct {
- ctx context.Context
- client *pubsub.Client
+ *pubsub.Client
}
-func (pc *prodPubSubClient) getTopic(id string) (*pubsub.Topic, error) {
- topic := pc.client.Topic(id)
- exists, err := topic.Exists(pc.ctx)
+func (pc *prodPubSubClient) getTopic(c context.Context, id string) (*pubsub.Topic, error) {
+ topic := pc.Client.Topic(id)
+ exists, err := topic.Exists(c)
switch {
case err != nil:
return nil, err
@@ -70,9 +73,9 @@
return topic, nil
}
-func (pc *prodPubSubClient) getSubscription(id string) (*pubsub.Subscription, error) {
- sub := pc.client.Subscription(id)
- exists, err := sub.Exists(pc.ctx)
+func (pc *prodPubSubClient) getSubscription(c context.Context, id string) (*pubsub.Subscription, error) {
+ sub := pc.Client.Subscription(id)
+ exists, err := sub.Exists(c)
switch {
case err != nil:
return nil, err
@@ -82,85 +85,55 @@
return sub, nil
}
-func (pc *prodPubSubClient) createSubscription(id string, cfg pubsub.SubscriptionConfig) (
+func (pc *prodPubSubClient) createSubscription(
+ c context.Context, id string, cfg pubsub.SubscriptionConfig) (
*pubsub.Subscription, error) {
- return pc.client.CreateSubscription(pc.ctx, id, cfg)
+ return pc.Client.CreateSubscription(c, id, cfg)
}
-// getPubSubClient extracts a debug PubSub client out of the context.
-func getPubSubClient(c context.Context) (pubsubClient, error) {
- if client, ok := c.Value(&pubSubClientKey).(pubsubClient); ok {
- return client, nil
- }
- return nil, errors.New("no pubsub clients installed")
+func prodPubSubClientFactory(c context.Context, projectID string) (pubsubClient, error) {
+ cli, err := pubsub.NewClient(c, projectID)
+ return &prodPubSubClient{cli}, err
}
-// withClient returns a context with a pubsub client instantiated to the
-// given project ID
-func withClient(c context.Context, projectID string) (context.Context, error) {
- if projectID == "" {
- return nil, errors.New("missing buildbucket project")
- }
- client, err := pubsub.NewClient(c, projectID)
- if err != nil {
- return nil, err
- }
- return context.WithValue(c, &pubSubClientKey, &prodPubSubClient{c, client}), nil
+// withClientFactory returns a context with a given pubsub client factory.
+func withClientFactory(c context.Context, fac pubsubClientFactory) context.Context {
+ return context.WithValue(c, &pubsubClientFactoryKey, fac)
}
-func getTopic(c context.Context, id string) (*pubsub.Topic, error) {
- client, err := getPubSubClient(c)
- if err != nil {
- return nil, err
+func newPubSubClient(c context.Context, projectID string) (pubsubClient, error) {
+ if fac, ok := c.Value(&pubsubClientFactoryKey).(pubsubClientFactory); !ok {
+ panic("no pubsub client factory installed")
+ } else {
+ return fac(c, projectID)
}
- return client.getTopic(id)
-}
-
-func getSubscription(c context.Context, id string) (*pubsub.Subscription, error) {
- client, err := getPubSubClient(c)
- if err != nil {
- return nil, err
- }
- return client.getSubscription(id)
-}
-
-func createSubscription(c context.Context, id string, cfg pubsub.SubscriptionConfig) (
- *pubsub.Subscription, error) {
-
- client, err := getPubSubClient(c)
- if err != nil {
- return nil, err
- }
- return client.createSubscription(id, cfg)
}
// EnsurePubSubSubscribed makes sure the following subscriptions are in place:
// * buildbucket, via the settings.Buildbucket.Topic setting
func EnsurePubSubSubscribed(c context.Context, settings *config.Settings) error {
if settings.Buildbucket != nil {
- // Install the production pubsub client pointing to the buildbucket project
- // into the context.
- c, err := withClient(c, settings.Buildbucket.Project)
- if err != nil {
- return err
- }
+ c = withClientFactory(c, prodPubSubClientFactory)
return ensureBuildbucketSubscribed(c, settings.Buildbucket.Project)
}
// TODO(hinoka): Ensure buildbot subscribed.
return nil
}
-// ensureSubscribed is called by a cron job and ensures that the Milo
+// ensureBuildbucketSubscribedis called by a cron job and ensures that the Milo
// instance is properly subscribed to the buildbucket subscription endpoint.
func ensureBuildbucketSubscribed(c context.Context, projectID string) error {
topicID := "builds"
- // Check to see if the topic exists first.
- topic, err := getTopic(c, topicID)
+ // Check the buildbucket project to see if the topic exists first.
+ bbClient, err := newPubSubClient(c, projectID)
+ if err != nil {
+ return err
+ }
+ topic, err := bbClient.getTopic(c, topicID)
switch err {
case errNotExist:
- logging.WithError(err).Errorf(c, "%s does not exist", topicID)
- return err
+ return errors.Annotate(err, "%s does not exist", topicID).Err()
case nil:
// continue
default:
@@ -181,10 +154,11 @@
return err
}
// Now check to see if the subscription already exists.
- subID := info.AppID(c)
- // Get the pubsub module of our app. We do not want to use info.ModuleHostname()
- // because it returns a version pinned hostname instead of the default route.
- sub, err := getSubscription(c, subID)
+ miloClient, err := newPubSubClient(c, info.AppID(c))
+ if err != nil {
+ return err
+ }
+ sub, err := miloClient.getSubscription(c, "buildbucket")
switch err {
case errNotExist:
// continue
@@ -195,6 +169,8 @@
logging.WithError(err).Errorf(c, "could not check subscription %#v", sub)
return err
}
+ // Get the pubsub module of our app. We do not want to use info.ModuleHostname()
+ // because it returns a version pinned hostname instead of the default route.
pubsubModuleHost := "pubsub." + info.DefaultVersionHostname(c)
// No subscription exists, attach a new subscription to the existing topic.
@@ -208,22 +184,9 @@
PushConfig: pubsub.PushConfig{Endpoint: endpointURL.String()},
AckDeadline: time.Minute * 10,
}
- newSub, err := createSubscription(c, subID, subConfig)
+ newSub, err := miloClient.createSubscription(c, "buildbucket", subConfig)
if err != nil {
- if strings.Contains(err.Error(), "The supplied HTTP URL is not registered") {
- registerURL := "https://console.cloud.google.com/apis/credentials/domainverification?project=" + projectID
- verifyURL := "https://www.google.com/webmasters/verification/verification?hl=en-GB&siteUrl=http://" + pubsubModuleHost
- logging.WithError(err).Errorf(
- c, "The domain has to be verified and added.\n\n"+
- "1. Go to %s\n"+
- "2. Verify the domain\n"+
- "3. Go to %s\n"+
- "4. Add %s to allowed domains\n\n",
- verifyURL, registerURL, pubsubModuleHost)
- } else {
- logging.WithError(err).Errorf(c, "could not create subscription %#v", sub)
- }
- return err
+ return errors.Annotate(err, "could not create subscription %#v", sub).Err()
}
// Success!
logging.Infof(c, "successfully created subscription %#v", newSub)
diff --git a/milo/common/pubsub_test.go b/milo/common/pubsub_test.go
index 4ac7886..aa9ca3b 100644
--- a/milo/common/pubsub_test.go
+++ b/milo/common/pubsub_test.go
@@ -5,7 +5,6 @@
package common
import (
- "errors"
"fmt"
"testing"
@@ -13,6 +12,7 @@
"golang.org/x/net/context"
"github.com/luci/gae/impl/memory"
+ "github.com/luci/luci-go/common/errors"
"github.com/luci/luci-go/common/logging/gologger"
. "github.com/smartystreets/goconvey/convey"
@@ -26,7 +26,7 @@
}
// Topic returns an empty pubsub topic reference.
-func (client *testPubSubClient) getTopic(id string) (*pubsub.Topic, error) {
+func (client *testPubSubClient) getTopic(c context.Context, id string) (*pubsub.Topic, error) {
if err, ok := client.topics[id]; ok {
return &pubsub.Topic{}, err
}
@@ -34,7 +34,7 @@
}
// Subscription returns an empty subscription reference.
-func (client *testPubSubClient) getSubscription(id string) (
+func (client *testPubSubClient) getSubscription(c context.Context, id string) (
*pubsub.Subscription, error) {
if err, ok := client.subscriptions[id]; ok {
return &pubsub.Subscription{}, err
@@ -45,7 +45,7 @@
// CreateSubscription records that an attempt to create a subscription with
// an id, then returns an empty subscription.
func (client *testPubSubClient) createSubscription(
- id string, cfg pubsub.SubscriptionConfig) (
+ c context.Context, id string, cfg pubsub.SubscriptionConfig) (
*pubsub.Subscription, error) {
if err, ok := client.createdSubsErr[id]; ok {
@@ -55,59 +55,87 @@
panic(fmt.Errorf("test error: unknown created sub %s", id))
}
+type testFactory struct {
+ clients map[string]pubsubClient
+}
+
+// makeTestClientFactory returns a closed pubsubClientFactory.
+// Golang Protip: A bound method will not match the function type signature
+// of an unbound function, but a closed function will.
+func (fac *testFactory) makeTestClientFactory() pubsubClientFactory {
+ return func(c context.Context, projectID string) (pubsubClient, error) {
+ if cli, ok := fac.clients[projectID]; ok {
+ return cli, nil
+ }
+ return nil, fmt.Errorf("client for project %s does not exist", projectID)
+ }
+}
+
func TestPubSub(t *testing.T) {
t.Parallel()
Convey("Test Environment", t, func() {
c := memory.UseWithAppID(context.Background(), "dev~luci-milo")
c = gologger.StdConfig.Use(c)
- client := &testPubSubClient{
+ miloClient := &testPubSubClient{
topics: map[string]error{},
subscriptions: map[string]error{},
createdSubsErr: map[string]error{},
createdSubs: map[string]pubsub.SubscriptionConfig{}}
- c = context.WithValue(c, &pubSubClientKey, client)
+ bbClient := &testPubSubClient{
+ topics: map[string]error{},
+ subscriptions: map[string]error{},
+ createdSubsErr: map[string]error{},
+ createdSubs: map[string]pubsub.SubscriptionConfig{}}
+ fac := testFactory{
+ clients: map[string]pubsubClient{
+ "luci-milo": miloClient,
+ "buildbucket": bbClient,
+ },
+ }
+ c = context.WithValue(c, &pubsubClientFactoryKey, fac.makeTestClientFactory())
Convey("Buildbucket PubSub subscriber", func() {
- proj := "foo"
+ proj := "buildbucket"
Convey("Non-existant topic", func() {
- client.topics["builds"] = errNotExist
+ bbClient.topics["builds"] = errNotExist
err := ensureBuildbucketSubscribed(c, proj)
So(err.Error(), ShouldEndWith, "does not exist")
})
Convey("Permission denied", func() {
pErr := errors.New(
"something PermissionDenied something")
- client.topics["builds"] = pErr
+ bbClient.topics["builds"] = pErr
err := ensureBuildbucketSubscribed(c, proj)
So(err, ShouldEqual, pErr)
})
Convey("Normal error", func() {
pErr := errors.New("foobar")
- client.topics["builds"] = pErr
+ bbClient.topics["builds"] = pErr
err := ensureBuildbucketSubscribed(c, proj)
So(err, ShouldEqual, pErr)
})
- client.topics["builds"] = nil
+ bbClient.topics["builds"] = nil
Convey("Subscription exists", func() {
- client.subscriptions["luci-milo"] = nil
+ miloClient.subscriptions["buildbucket"] = nil
err := ensureBuildbucketSubscribed(c, proj)
So(err, ShouldBeNil)
- So(len(client.createdSubs), ShouldEqual, 0)
+ So(len(miloClient.createdSubs), ShouldEqual, 0)
+ So(len(bbClient.createdSubs), ShouldEqual, 0)
})
- client.subscriptions["luci-milo"] = errNotExist
+ miloClient.subscriptions["buildbucket"] = errNotExist
Convey("Not registered", func() {
errNotReg := errors.New("The supplied HTTP URL is not registered")
- client.createdSubsErr["luci-milo"] = errNotReg
+ miloClient.createdSubsErr["buildbucket"] = errNotReg
err := ensureBuildbucketSubscribed(c, proj)
- So(err, ShouldEqual, errNotReg)
+ So((err.(errors.Wrapped)).InnerError(), ShouldEqual, errNotReg)
})
Convey("Create subscription", func() {
- client.createdSubsErr["luci-milo"] = nil
+ miloClient.createdSubsErr["buildbucket"] = nil
err := ensureBuildbucketSubscribed(c, proj)
So(err, ShouldBeNil)
- So(len(client.createdSubs), ShouldEqual, 1)
- _, ok := client.createdSubs["luci-milo"]
+ So(len(miloClient.createdSubs), ShouldEqual, 1)
+ _, ok := miloClient.createdSubs["buildbucket"]
So(ok, ShouldEqual, true)
})
})