blob: 56b79c2b05902c61395af9853b59294f0865550e [file] [log] [blame]
package common
import (
"encoding/base64"
"errors"
"fmt"
"net/url"
"strings"
"time"
"cloud.google.com/go/pubsub"
"golang.org/x/net/context"
"github.com/luci/gae/service/info"
"github.com/luci/luci-go/common/logging"
"github.com/luci/luci-go/milo/api/config"
)
// pubSubClientKey identifies the pubsubClient stored in a context.Context.
// The address of this variable (&pubSubClientKey), not its string value, is
// what is passed as the key to context.WithValue and Context.Value; the string
// only serves as a human-readable description.
var pubSubClientKey = "stores a pubsubClient"
// PubSubMessage is the "message" object embedded in a PubSubSubscription.
// NOTE(review): presumably mirrors the JSON body Cloud Pub/Sub sends to push
// endpoints - confirm against the push handler that decodes it.
type PubSubMessage struct {
// Attributes holds the string key/value pairs found under "attributes".
Attributes map[string]string `json:"attributes"`
// Data is the payload as a string; PubSubSubscription.GetData treats it as
// standard base64 and decodes it.
Data string `json:"data"`
// MessageID is the value found under "message_id".
MessageID string `json:"message_id"`
}
// PubSubSubscription is the JSON envelope pairing a PubSubMessage with the
// name of the subscription it belongs to. Use GetData to obtain the decoded
// message payload.
type PubSubSubscription struct {
// Message is the wrapped message, read from the "message" key.
Message PubSubMessage `json:"message"`
// Subscription is the value found under the "subscription" key.
Subscription string `json:"subscription"`
}
// errNotExist is the sentinel error returned by the pubsubClient lookups when
// the requested topic or subscription does not exist. Callers in this file
// compare against it directly.
var errNotExist = errors.New("does not exist")
// GetData decodes the base64-encoded (standard alphabet) payload carried in
// m.Message.Data and returns the raw bytes, or the decoder's error if the
// payload is not valid base64.
func (m *PubSubSubscription) GetData() ([]byte, error) {
	encoded := m.Message.Data
	decoded, err := base64.StdEncoding.DecodeString(encoded)
	return decoded, err
}
// pubsubClient abstracts the subset of pubsub.Client functionality that Milo
// relies on. It is internal to this package and exists so that a different
// implementation can be substituted in tests.
type pubsubClient interface {
	// getTopic looks up the topic with the given ID. It returns the topic
	// when it exists, errNotExist when it does not, and any other error
	// encountered while checking.
	getTopic(id string) (*pubsub.Topic, error)

	// getSubscription looks up the subscription with the given ID. It returns
	// the subscription when it exists, errNotExist when it does not, and any
	// other error encountered while checking.
	getSubscription(id string) (*pubsub.Subscription, error)

	// createSubscription creates a subscription with the given ID and
	// configuration, returning the new subscription or an error.
	createSubscription(id string, cfg pubsub.SubscriptionConfig) (*pubsub.Subscription, error)
}
// prodPubSubClient is a wrapper around the production pubsub client.
// It implements pubsubClient by delegating to client, passing ctx to every
// call that needs a context.
// NOTE: withClient builds this struct with a positional literal, so the field
// order below must not change.
type prodPubSubClient struct {
// ctx is the context handed to every pubsub call made through this wrapper.
ctx context.Context
// client is the underlying Cloud Pub/Sub client.
client *pubsub.Client
}
// getTopic returns a handle to the topic named id after confirming that it
// exists. A failed existence check is returned unchanged; a missing topic
// yields errNotExist.
func (pc *prodPubSubClient) getTopic(id string) (*pubsub.Topic, error) {
	t := pc.client.Topic(id)
	found, err := t.Exists(pc.ctx)
	if err != nil {
		return nil, err
	}
	if !found {
		return nil, errNotExist
	}
	return t, nil
}
// getSubscription returns a handle to the subscription named id after
// confirming that it exists. A failed existence check is returned unchanged;
// a missing subscription yields errNotExist.
func (pc *prodPubSubClient) getSubscription(id string) (*pubsub.Subscription, error) {
	s := pc.client.Subscription(id)
	found, err := s.Exists(pc.ctx)
	if err != nil {
		return nil, err
	}
	if !found {
		return nil, errNotExist
	}
	return s, nil
}
// createSubscription creates the subscription named id with configuration cfg
// through the wrapped client, using the wrapper's stored context. The
// client's result is returned as is.
func (pc *prodPubSubClient) createSubscription(id string, cfg pubsub.SubscriptionConfig) (*pubsub.Subscription, error) {
	created, err := pc.client.CreateSubscription(pc.ctx, id, cfg)
	return created, err
}
// getPubSubClient returns the pubsubClient stored in c under
// &pubSubClientKey. It returns an error when no value is stored under that
// key or the stored value does not implement pubsubClient.
func getPubSubClient(c context.Context) (pubsubClient, error) {
	installed, ok := c.Value(&pubSubClientKey).(pubsubClient)
	if !ok {
		return nil, errors.New("no pubsub clients installed")
	}
	return installed, nil
}
// withClient creates a production pubsub client for projectID and returns a
// context derived from c that carries it under &pubSubClientKey.
//
// An empty projectID, or a failure to construct the client, results in a nil
// context and a non-nil error.
func withClient(c context.Context, projectID string) (context.Context, error) {
	if len(projectID) == 0 {
		return nil, errors.New("missing buildbucket project")
	}

	psClient, err := pubsub.NewClient(c, projectID)
	if err != nil {
		return nil, err
	}

	wrapper := &prodPubSubClient{ctx: c, client: psClient}
	return context.WithValue(c, &pubSubClientKey, wrapper), nil
}
// getTopic looks up the topic named id using the pubsub client installed in
// c. It fails if no client is installed; otherwise the client's result
// (including errNotExist for a missing topic) is returned unchanged.
func getTopic(c context.Context, id string) (*pubsub.Topic, error) {
	pc, err := getPubSubClient(c)
	if err != nil {
		return nil, err
	}
	topic, err := pc.getTopic(id)
	return topic, err
}
// getSubscription looks up the subscription named id using the pubsub client
// installed in c. It fails if no client is installed; otherwise the client's
// result (including errNotExist for a missing subscription) is returned
// unchanged.
func getSubscription(c context.Context, id string) (*pubsub.Subscription, error) {
	pc, err := getPubSubClient(c)
	if err != nil {
		return nil, err
	}
	sub, err := pc.getSubscription(id)
	return sub, err
}
// createSubscription creates the subscription named id with configuration cfg
// using the pubsub client installed in c. It fails if no client is installed;
// otherwise the client's result is returned unchanged.
func createSubscription(c context.Context, id string, cfg pubsub.SubscriptionConfig) (*pubsub.Subscription, error) {
	pc, err := getPubSubClient(c)
	if err != nil {
		return nil, err
	}
	created, err := pc.createSubscription(id, cfg)
	return created, err
}
// EnsurePubSubSubscribed makes sure the following subscriptions are in place:
// * buildbucket, in the project named by the settings.Buildbucket.Project
//   setting
//
// When settings.Buildbucket is nil nothing is done and nil is returned.
// Otherwise a production pubsub client for the buildbucket project is created,
// used to verify/create the subscription, and closed again before returning.
// The returned error is whatever client creation or the subscription check
// reported.
func EnsurePubSubSubscribed(c context.Context, settings *config.Settings) error {
	if settings.Buildbucket != nil {
		// Install the production pubsub client pointing to the buildbucket project
		// into the context.
		psCtx, err := withClient(c, settings.Buildbucket.Project)
		if err != nil {
			return err
		}
		// withClient creates a fresh pubsub.Client on every call and hands back
		// only the context, so fetch the wrapper from it and release the
		// client once we are done; otherwise each invocation leaks one.
		if pc, ok := psCtx.Value(&pubSubClientKey).(*prodPubSubClient); ok {
			defer func() {
				if cerr := pc.client.Close(); cerr != nil {
					// Best effort: a failed close must not mask the result of
					// the subscription check.
					logging.WithError(cerr).Warningf(psCtx, "could not close pubsub client")
				}
			}()
		}
		return ensureBuildbucketSubscribed(psCtx, settings.Buildbucket.Project)
	}
	// TODO(hinoka): Ensure buildbot subscribed.
	return nil
}
// ensureBuildbucketSubscribed is called by a cron job and ensures that the Milo
// instance is properly subscribed to the buildbucket subscription endpoint.
//
// Using the pubsub client installed in c, it:
//  1. checks that the "builds" topic exists;
//  2. checks whether a subscription named after the app ID already exists,
//     returning nil if so;
//  3. otherwise creates a push subscription on that topic targeting
//     https://pubsub.<default version hostname>/_ah/push-handlers/buildbucket.
//
// projectID is only used to build the Cloud Console URLs included in the
// troubleshooting log messages. Every failure is logged and then returned.
func ensureBuildbucketSubscribed(c context.Context, projectID string) error {
	topicID := "builds"
	// Check to see if the topic exists first.
	topic, err := getTopic(c, topicID)
	switch err {
	case errNotExist:
		logging.WithError(err).Errorf(c, "%s does not exist", topicID)
		return err
	case nil:
		// continue
	default:
		if strings.Contains(err.Error(), "PermissionDenied") {
			URL := "https://console.cloud.google.com/iam-admin/iam/project?project=" + projectID
			acct, serr := info.ServiceAccount(c)
			if serr != nil {
				acct = fmt.Sprintf("Unknown: %s", serr.Error())
			}
			// The documentation is incorrect. We need Editor permission because
			// the Subscriber permission does NOT permit attaching subscriptions to
			// topics or to view topics.
			logging.WithError(err).Errorf(
				c, "please go to %s and add %s as a Pub/Sub Editor", URL, acct)
		} else {
			// topic is nil whenever the lookup fails, so report the ID that
			// was requested instead of the (empty) handle.
			logging.WithError(err).Errorf(c, "could not check topic %q", topicID)
		}
		return err
	}

	// Now check to see if the subscription already exists.
	subID := info.AppID(c)
	sub, err := getSubscription(c, subID)
	switch err {
	case errNotExist:
		// continue
	case nil:
		logging.Infof(c, "subscription %#v exists, no need to update", sub)
		return nil
	default:
		// sub is nil whenever the lookup fails; log the requested ID.
		logging.WithError(err).Errorf(c, "could not check subscription %q", subID)
		return err
	}

	// Get the pubsub module of our app. We do not want to use info.ModuleHostname()
	// because it returns a version pinned hostname instead of the default route.
	pubsubModuleHost := "pubsub." + info.DefaultVersionHostname(c)

	// No subscription exists, attach a new subscription to the existing topic.
	endpointURL := url.URL{
		Scheme: "https",
		Host:   pubsubModuleHost,
		Path:   "/_ah/push-handlers/buildbucket",
	}
	subConfig := pubsub.SubscriptionConfig{
		Topic:       topic,
		PushConfig:  pubsub.PushConfig{Endpoint: endpointURL.String()},
		AckDeadline: time.Minute * 10,
	}
	newSub, err := createSubscription(c, subID, subConfig)
	if err != nil {
		if strings.Contains(err.Error(), "The supplied HTTP URL is not registered") {
			registerURL := "https://console.cloud.google.com/apis/credentials/domainverification?project=" + projectID
			verifyURL := "https://www.google.com/webmasters/verification/verification?hl=en-GB&siteUrl=http://" + pubsubModuleHost
			logging.WithError(err).Errorf(
				c, "The domain has to be verified and added.\n\n"+
					"1. Go to %s\n"+
					"2. Verify the domain\n"+
					"3. Go to %s\n"+
					"4. Add %s to allowed domains\n\n",
				verifyURL, registerURL, pubsubModuleHost)
		} else {
			// The earlier lookup returned errNotExist, so sub is nil here;
			// log the ID of the subscription we tried to create.
			logging.WithError(err).Errorf(c, "could not create subscription %q", subID)
		}
		return err
	}
	// Success!
	logging.Infof(c, "successfully created subscription %#v", newSub)
	return nil
}