blob: 02fe9858160e6a071c83e8dbd18a89d465ef8265 [file] [log] [blame]
// Copyright 2019 The LUCI Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
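// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.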
// See the License for the specific language governing permissions and
// limitations under the License.
package common
import (
// pubsubClientFactoryKey identifies the pubsubClientFactory stored in a
// context. The address of this variable (not its string value) is what is
// passed to context.WithValue and Context.Value as the key.
var pubsubClientFactoryKey = "stores a pubsubClientFactory"
// PubSubMessage is the JSON form of a single pubsub message.
//
// Attributes is decoded from the "attributes" field, Data from "data" and
// MessageID from "message_id". Data carries the payload base64-encoded; use
// PubSubSubscription.GetData to obtain the decoded bytes.
//
// NOTE(review): presumably this mirrors the message object of a Cloud Pub/Sub
// push request body - confirm against the push handler.
type PubSubMessage struct {
	Attributes map[string]interface{} `json:"attributes"`
	Data       string                 `json:"data"`
	MessageID  string                 `json:"message_id"`
}
type PubSubSubscription struct {
Message PubSubMessage `json:"message"`
Subscription string `json:"subscription"`
// errNotExist is the sentinel error returned by the pubsub client lookups in
// this file when the requested topic or subscription does not exist.
var errNotExist = errors.New("does not exist")
// GetData returns the expanded form of Data (decoded from base64).
func (m *PubSubSubscription) GetData() ([]byte, error) {
return base64.StdEncoding.DecodeString(m.Message.Data)
// pubsubClient is an interface representing a pubsub.Client containing only
// the functions that Milo calls. Internal use only, can be swapped
// out for testing.
type pubsubClient interface {
// close releases any resources held by the client, such as memory and
// goroutines.
// Uses the context for logging.
// getTopic returns the pubsub topic if it exists, a notExist error if
// it does not exist, or an error if there was an error.
getTopic(context.Context, string) (*pubsub.Topic, error)
// getSubscription returns the pubsub subscription if it exists,
// a notExist error if it does not exist, or an error if there was an error.
getSubscription(context.Context, string) (*pubsub.Subscription, error)
createSubscription(context.Context, string, pubsub.SubscriptionConfig) (
*pubsub.Subscription, error)
// pubsubClientFactory is a stubbable factory that produces pubsubClients bound
// to project IDs.
//
// It is called with a context and a project ID string and returns the client
// for that project, or an error if the client could not be created.
type pubsubClientFactory func(context.Context, string) (pubsubClient, error)
// prodPubSubClient is a wrapper around the production pubsub client.
type prodPubSubClient struct {
func (pc *prodPubSubClient) close(c context.Context) {
if err := pc.Close(); err != nil {
logging.WithError(err).Errorf(c, "Failed to close PubSub client")
func (pc *prodPubSubClient) getTopic(c context.Context, id string) (*pubsub.Topic, error) {
topic := pc.Client.Topic(id)
exists, err := topic.Exists(c)
switch {
case err != nil:
return nil, err
case !exists:
return nil, errNotExist
return topic, nil
func (pc *prodPubSubClient) getSubscription(c context.Context, id string) (*pubsub.Subscription, error) {
sub := pc.Client.Subscription(id)
exists, err := sub.Exists(c)
switch {
case err != nil:
return nil, err
case !exists:
return nil, errNotExist
return sub, nil
func (pc *prodPubSubClient) createSubscription(
c context.Context, id string, cfg pubsub.SubscriptionConfig) (
*pubsub.Subscription, error) {
return pc.Client.CreateSubscription(c, id, cfg)
func prodPubSubClientFactory(c context.Context, projectID string) (pubsubClient, error) {
cli, err := pubsub.NewClient(c, projectID)
return &prodPubSubClient{cli}, err
// withClientFactory returns a context with a given pubsub client factory.
func withClientFactory(c context.Context, fac pubsubClientFactory) context.Context {
return context.WithValue(c, &pubsubClientFactoryKey, fac)
func newPubSubClient(c context.Context, projectID string) (pubsubClient, error) {
if fac, ok := c.Value(&pubsubClientFactoryKey).(pubsubClientFactory); !ok {
panic("no pubsub client factory installed")
} else {
return fac(c, projectID)
// EnsurePubSubSubscribed makes sure the following subscriptions are in place:
// * buildbucket, via the settings.Buildbucket.Topic setting
func EnsurePubSubSubscribed(c context.Context, settings *config.Settings) error {
if settings.Buildbucket != nil {
c = withClientFactory(c, prodPubSubClientFactory)
return ensureBuildbucketSubscribed(c, settings.Buildbucket.Project)
return nil
// ensureBuildbucketSubscribed is called by a cron job and ensures that the Milo
// instance is properly subscribed to the buildbucket subscription endpoint.
func ensureBuildbucketSubscribed(c context.Context, projectID string) error {
topicID := "builds"
// Check the buildbucket project to see if the topic exists first.
bbClient, err := newPubSubClient(c, projectID)
if err != nil {
return err
defer bbClient.close(c)
topic, err := bbClient.getTopic(c, topicID)
switch err {
case errNotExist:
return errors.Annotate(err, "%s does not exist", topicID).Err()
case nil:
// continue
if strings.Contains(err.Error(), "PermissionDenied") {
URL := "" + projectID
acct, serr := info.ServiceAccount(c)
if serr != nil {
acct = fmt.Sprintf("Unknown: %s", serr.Error())
// The documentation is incorrect. We need Editor permission because
// the Subscriber permission does NOT permit attaching subscriptions to
// topics or to view topics.
c, "please go to %s and add %s as a Pub/Sub Editor", URL, acct)
} else {
logging.WithError(err).Errorf(c, "could not check topic %#v", topic)
return err
// Now check to see if the subscription already exists.
miloClient, err := newPubSubClient(c, info.AppID(c))
if err != nil {
return err
defer miloClient.close(c)
sub, err := miloClient.getSubscription(c, "buildbucket")
switch err {
case errNotExist:
// continue
case nil:
logging.Infof(c, "subscription %#v exists, no need to update", sub)
return nil
logging.WithError(err).Errorf(c, "could not check subscription %#v", sub)
return err
// Get the pubsub module of our app. We do not want to use info.ModuleHostname()
// because it returns a version pinned hostname instead of the default route.
pubsubModuleHost := "pubsub." + info.DefaultVersionHostname(c)
// No subscription exists, attach a new subscription to the existing topic.
endpointURL := url.URL{
Scheme: "https",
Host: pubsubModuleHost,
Path: "/_ah/push-handlers/buildbucket",
subConfig := pubsub.SubscriptionConfig{
Topic: topic,
PushConfig: pubsub.PushConfig{Endpoint: endpointURL.String()},
AckDeadline: time.Minute * 10,
newSub, err := miloClient.createSubscription(c, "buildbucket", subConfig)
if err != nil {
return errors.Annotate(err, "could not create subscription %#v", sub).Err()
// Success!
logging.Infof(c, "successfully created subscription %#v", newSub)
return nil