blob: 3f6b28b96436df0f909b40fb14e688bbba090018 [file] [log] [blame]
// Copyright 2016 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package distributor
import (
"context"
"encoding/json"
"fmt"
"net/http"
"strconv"
"time"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/dm/api/service/v1"
"go.chromium.org/luci/server/router"
"go.chromium.org/luci/server/tokens"
"go.chromium.org/luci/tumble"
)
const notifyTopicSuffix = "dm-distributor-notify"
// PubsubReceiver is the HTTP handler that processes incoming pubsub events
// delivered to topics prepared with TaskDescription.PrepareTopic, and routes
// them to the appropriate distributor implementation's HandleNotification
// method.
//
// It requires that a Registry be installed in c via WithRegistry.
func PubsubReceiver(ctx *router.Context) {
c, rw, r := ctx.Context, ctx.Writer, ctx.Request
defer r.Body.Close()
type PubsubMessage struct {
Attributes map[string]string `json:"attributes"`
Data []byte `json:"data"`
MessageID string `json:"message_id"`
}
type PubsubPushMessage struct {
Message PubsubMessage `json:"message"`
Subscription string `json:"subscription"`
}
psm := &PubsubPushMessage{}
if err := json.NewDecoder(r.Body).Decode(psm); err != nil {
logging.WithError(err).Errorf(c, "Failed to parse pubsub message")
http.Error(rw, "Failed to parse pubsub message", http.StatusInternalServerError)
return
}
eid, cfgName, err := decodeAuthToken(c, psm.Message.Attributes["auth_token"])
if err != nil {
logging.WithError(err).Errorf(c, "bad auth_token")
// Acknowledge this message, since it'll never be valid.
rw.WriteHeader(http.StatusNoContent)
return
}
// remove "auth_token" from Attributes to avoid having it pass to the
// distributor.
delete(psm.Message.Attributes, "auth_token")
err = tumble.RunMutation(c, &NotifyExecution{
cfgName, &Notification{eid, psm.Message.Data, psm.Message.Attributes},
})
if err != nil {
// TODO(riannucci): distinguish between transient/non-transient failures.
logging.WithError(err).Errorf(c, "failed to NotifyExecution")
rw.WriteHeader(http.StatusInternalServerError)
return
}
rw.WriteHeader(http.StatusNoContent)
}
// pubsubAuthToken describes how to generate HMAC protected tokens used to
// authenticate PubSub messages.
var pubsubAuthToken = tokens.TokenKind{
Algo: tokens.TokenAlgoHmacSHA256,
Expiration: 48 * time.Hour,
SecretKey: "pubsub_auth_token",
Version: 1,
}
func encodeAuthToken(c context.Context, eid *dm.Execution_ID, cfgName string) (string, error) {
return pubsubAuthToken.Generate(c, nil, map[string]string{
"quest": eid.Quest,
"attempt": strconv.FormatUint(uint64(eid.Attempt), 10),
"execution": strconv.FormatUint(uint64(eid.Id), 10),
"cfgName": cfgName,
}, 0)
}
func decodeAuthToken(c context.Context, authToken string) (eid *dm.Execution_ID, cfgName string, err error) {
items, err := pubsubAuthToken.Validate(c, authToken, nil)
if err != nil {
return
}
quest, qok := items["quest"]
attempt, aok := items["attempt"]
execution, eok := items["execution"]
if !qok || !aok || !eok {
err = fmt.Errorf("missing keys: %v", items)
return
}
attemptNum, err := strconv.ParseUint(attempt, 10, 32)
if err != nil {
return
}
executionNum, err := strconv.ParseUint(execution, 10, 32)
if err != nil {
return
}
eid = dm.NewExecutionID(quest, uint32(attemptNum), uint32(executionNum))
cfgName, ok := items["cfgName"]
if !ok {
err = fmt.Errorf("missing config name")
}
return
}