blob: 323abad82638198233f9cb9bc33b8d192d4a9f04 [file] [log] [blame]
// Copyright 2015 The LUCI Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package authdbimpl
import (
const (
pubSubPullURLPath = "/auth/pubsub/authdb:pull" // dev server only
pubSubPushURLPath = "/auth/pubsub/authdb:push"
// InstallHandlers installs PubSub related HTTP handlers.
func InstallHandlers(r *router.Router, base router.MiddlewareChain) {
if appengine.IsDevAppServer() {
r.GET(pubSubPullURLPath, base, pubSubPull)
r.POST(pubSubPushURLPath, base, pubSubPush)
// setupPubSub creates a subscription to AuthDB service notification stream.
func setupPubSub(ctx context.Context, baseURL, authServiceURL string) error {
pushURL := ""
if !info.IsDevAppServer(ctx) {
pushURL = baseURL + pubSubPushURLPath // push in prod, pull on dev server
service := getAuthService(ctx, authServiceURL)
return service.EnsureSubscription(ctx, subscriptionName(ctx, authServiceURL), pushURL)
// killPubSub removes PubSub subscription created with setupPubSub.
func killPubSub(ctx context.Context, authServiceURL string) error {
service := getAuthService(ctx, authServiceURL)
return service.DeleteSubscription(ctx, subscriptionName(ctx, authServiceURL))
// subscriptionName returns full PubSub subscription name for AuthDB
// change notifications stream from given auth service.
func subscriptionName(ctx context.Context, authServiceURL string) string {
subIDPrefix := "gae-v1"
if info.IsDevAppServer(ctx) {
subIDPrefix = "dev-app-server-v1"
serviceURL, err := url.Parse(authServiceURL)
if err != nil {
return fmt.Sprintf("projects/%s/subscriptions/%s+%s", info.AppID(ctx), subIDPrefix, serviceURL.Host)
// pubSubPull is HTTP handler that pulls PubSub messages from AuthDB change
// notification topic.
// Used only on dev server for manual testing. Prod services use push-based
// delivery.
func pubSubPull(c *router.Context) {
if !appengine.IsDevAppServer() {
replyError(c.Context, c.Writer, errors.New("not a dev server"))
processPubSubRequest(c.Context, c.Writer, c.Request, func(ctx context.Context, srv authService, serviceURL string) (*service.Notification, error) {
return srv.PullPubSub(ctx, subscriptionName(ctx, serviceURL))
// pubSubPush is HTTP handler that processes incoming PubSub push notifications.
// It uses the signature inside PubSub message body for authentication. Skips
// messages not signed by currently configured auth service.
func pubSubPush(c *router.Context) {
processPubSubRequest(c.Context, c.Writer, c.Request, func(ctx context.Context, srv authService, serviceURL string) (*service.Notification, error) {
body, err := ioutil.ReadAll(c.Request.Body)
if err != nil {
return nil, err
return srv.ProcessPubSubPush(ctx, body)
type notifcationGetter func(context.Context, authService, string) (*service.Notification, error)
// processPubSubRequest is common wrapper for pubSubPull and pubSubPush.
// It implements most logic of notification handling. Calls supplied callback
// to actually get service.Notification, since this part is different from Pull
// and Push subscriptions.
func processPubSubRequest(ctx context.Context, rw http.ResponseWriter, r *http.Request, callback notifcationGetter) {
ctx = defaultNS(ctx)
info, err := GetLatestSnapshotInfo(ctx)
if err != nil {
replyError(ctx, rw, err)
if info == nil {
// Return HTTP 200 to avoid a redelivery.
replyOK(ctx, rw, "Auth Service URL is not configured, skipping the message")
srv := getAuthService(ctx, info.AuthServiceURL)
notify, err := callback(ctx, srv, info.AuthServiceURL)
if err != nil {
replyError(ctx, rw, err)
// notify may be nil if PubSub messages didn't pass authentication.
if notify == nil {
replyOK(ctx, rw, "No new valid AuthDB change notifications")
// Don't bother processing late messages (ack them though).
latest := info
if notify.Revision > info.Rev {
var err error
if latest, err = syncAuthDB(ctx); err != nil {
replyError(ctx, rw, err)
if err := notify.Acknowledge(ctx); err != nil {
replyError(ctx, rw, err)
ctx, rw, "Processed PubSub notification for rev %d: %d -> %d",
notify.Revision, info.Rev, latest.Rev)
// replyError sends HTTP 500 on transient errors, HTTP 400 on fatal ones.
func replyError(ctx context.Context, rw http.ResponseWriter, err error) {
logging.Errorf(ctx, "Error while processing PubSub notification - %s", err)
if transient.Tag.In(err) {
http.Error(rw, err.Error(), http.StatusInternalServerError)
} else {
http.Error(rw, err.Error(), http.StatusBadRequest)
// replyOK sends HTTP 200.
func replyOK(ctx context.Context, rw http.ResponseWriter, msg string, args ...interface{}) {
logging.Infof(ctx, msg, args...)
rw.Write([]byte(fmt.Sprintf(msg, args...)))