blob: 8ff0a921964aa418ad565a33c4dd6324e41069ab [file] [log] [blame]
// Copyright 2015 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package authdbimpl
import (
"context"
"fmt"
"io"
"net/http"
"net/url"
"google.golang.org/appengine"
"go.chromium.org/luci/gae/service/info"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/retry/transient"
"go.chromium.org/luci/server/auth/service"
"go.chromium.org/luci/server/router"
)
const (
pubSubPullURLPath = "/auth/pubsub/authdb:pull" // dev server only
pubSubPushURLPath = "/auth/pubsub/authdb:push"
)
// InstallHandlers installs PubSub related HTTP handlers.
func InstallHandlers(r *router.Router, base router.MiddlewareChain) {
if appengine.IsDevAppServer() {
r.GET(pubSubPullURLPath, base, pubSubPull)
}
r.POST(pubSubPushURLPath, base, pubSubPush)
}
// setupPubSub creates a subscription to AuthDB service notification stream.
func setupPubSub(ctx context.Context, baseURL, authServiceURL string) error {
pushURL := ""
if !info.IsDevAppServer(ctx) {
pushURL = baseURL + pubSubPushURLPath // push in prod, pull on dev server
}
service := getAuthService(ctx, authServiceURL)
return service.EnsureSubscription(ctx, subscriptionName(ctx, authServiceURL), pushURL)
}
// killPubSub removes PubSub subscription created with setupPubSub.
func killPubSub(ctx context.Context, authServiceURL string) error {
service := getAuthService(ctx, authServiceURL)
return service.DeleteSubscription(ctx, subscriptionName(ctx, authServiceURL))
}
// subscriptionName returns full PubSub subscription name for AuthDB
// change notifications stream from given auth service.
func subscriptionName(ctx context.Context, authServiceURL string) string {
subIDPrefix := "gae-v1"
if info.IsDevAppServer(ctx) {
subIDPrefix = "dev-app-server-v1"
}
serviceURL, err := url.Parse(authServiceURL)
if err != nil {
panic(err)
}
return fmt.Sprintf("projects/%s/subscriptions/%s+%s", info.AppID(ctx), subIDPrefix, serviceURL.Host)
}
// pubSubPull is HTTP handler that pulls PubSub messages from AuthDB change
// notification topic.
//
// Used only on dev server for manual testing. Prod services use push-based
// delivery.
func pubSubPull(c *router.Context) {
if !appengine.IsDevAppServer() {
replyError(c.Request.Context(), c.Writer, errors.New("not a dev server"))
return
}
processPubSubRequest(c.Request.Context(), c.Writer, c.Request, func(ctx context.Context, srv authService, serviceURL string) (*service.Notification, error) {
return srv.PullPubSub(ctx, subscriptionName(ctx, serviceURL))
})
}
// pubSubPush is HTTP handler that processes incoming PubSub push notifications.
//
// It uses the signature inside PubSub message body for authentication. Skips
// messages not signed by currently configured auth service.
func pubSubPush(c *router.Context) {
processPubSubRequest(c.Request.Context(), c.Writer, c.Request, func(ctx context.Context, srv authService, serviceURL string) (*service.Notification, error) {
body, err := io.ReadAll(c.Request.Body)
if err != nil {
return nil, err
}
return srv.ProcessPubSubPush(ctx, body)
})
}
type notifcationGetter func(context.Context, authService, string) (*service.Notification, error)
// processPubSubRequest is common wrapper for pubSubPull and pubSubPush.
//
// It implements most logic of notification handling. Calls supplied callback
// to actually get service.Notification, since this part is different from Pull
// and Push subscriptions.
func processPubSubRequest(ctx context.Context, rw http.ResponseWriter, r *http.Request, callback notifcationGetter) {
ctx = defaultNS(ctx)
info, err := GetLatestSnapshotInfo(ctx)
if err != nil {
replyError(ctx, rw, err)
return
}
if info == nil {
// Return HTTP 200 to avoid a redelivery.
replyOK(ctx, rw, "Auth Service URL is not configured, skipping the message")
return
}
srv := getAuthService(ctx, info.AuthServiceURL)
notify, err := callback(ctx, srv, info.AuthServiceURL)
if err != nil {
replyError(ctx, rw, err)
return
}
// notify may be nil if PubSub messages didn't pass authentication.
if notify == nil {
replyOK(ctx, rw, "No new valid AuthDB change notifications")
return
}
// Don't bother processing late messages (ack them though).
latest := info
if notify.Revision > info.Rev {
var err error
if latest, err = syncAuthDB(ctx); err != nil {
replyError(ctx, rw, err)
return
}
}
if err := notify.Acknowledge(ctx); err != nil {
replyError(ctx, rw, err)
return
}
replyOK(
ctx, rw, "Processed PubSub notification for rev %d: %d -> %d",
notify.Revision, info.Rev, latest.Rev)
}
// replyError sends HTTP 500 on transient errors, HTTP 400 on fatal ones.
func replyError(ctx context.Context, rw http.ResponseWriter, err error) {
logging.Errorf(ctx, "Error while processing PubSub notification - %s", err)
if transient.Tag.In(err) {
http.Error(rw, err.Error(), http.StatusInternalServerError)
} else {
http.Error(rw, err.Error(), http.StatusBadRequest)
}
}
// replyOK sends HTTP 200.
func replyOK(ctx context.Context, rw http.ResponseWriter, msg string, args ...any) {
logging.Infof(ctx, msg, args...)
rw.Write([]byte(fmt.Sprintf(msg, args...)))
}