blob: 1f8a5c5e1dbbffa41b19b0e70f4faae20260be9b [file] [log] [blame]
// Copyright 2024 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package notifications contains the logic to send Swarming notifications.
package notifications
import (
"context"
"encoding/json"
"fmt"
"sync"
"cloud.google.com/go/pubsub"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"
bbpb "go.chromium.org/luci/buildbucket/proto"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/data/strpair"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/retry/transient"
"go.chromium.org/luci/gae/service/datastore"
"go.chromium.org/luci/grpc/grpcmon"
"go.chromium.org/luci/grpc/grpcutil"
"go.chromium.org/luci/server/auth"
"go.chromium.org/luci/server/tq"
"go.chromium.org/luci/swarming/server/metrics"
"go.chromium.org/luci/swarming/server/model"
"go.chromium.org/luci/swarming/server/notifications/taskspb"
"go.chromium.org/luci/swarming/server/util/taskbackendutil"
)
// PubSubNotifier publishes Swarming notifications to Cloud PubSub.
//
// It serves two kinds of task queue tasks (see RegisterTQTasks):
//   - task completion notifications sent to the topic named in each task;
//   - task updates sent to Buildbucket.
//
// Create it with NewPubSubNotifier and release it with Stop.
type PubSubNotifier struct {
	// A shared pubsub client, closed by Stop.
	client *pubsub.Client
	// lock guards both topics and stopped.
	lock sync.RWMutex
	// topics caches topic handles keyed by their full
	// "projects/<project>/topics/<topic>" name; populated lazily by getTopic.
	topics map[string]*pubsub.Topic
	// True if "Stop()" was called.
	stopped bool
	// cloudProject is the Cloud project passed to NewPubSubNotifier. It is
	// used as the target of Buildbucket task IDs ("swarming://<cloudProject>").
	cloudProject string
}
// PubSubNotification is the JSON body of the PubSub message Swarming
// publishes when a task completes.
//
// Alongside this body, the message attributes may carry "auth_token" when
// the user supplied one.
type PubSubNotification struct {
	// TaskID is the ID of the task the notification is about.
	TaskID string `json:"task_id"`
	// Userdata is copied verbatim from the notification task payload.
	Userdata string `json:"userdata"`
}
// NewPubSubNotifier initializes a PubSubNotifier.
//
// cloudProj is the Cloud project the shared pubsub client is created for. It
// is also used to build the "swarming://<cloudProj>" target of Buildbucket
// task updates.
//
// The client authenticates as the service itself (auth.AsSelf) with Cloud
// OAuth scopes. Call Stop to close the client once the notifier is no longer
// needed.
func NewPubSubNotifier(ctx context.Context, cloudProj string) (*PubSubNotifier, error) {
	creds, err := auth.GetPerRPCCredentials(ctx, auth.AsSelf, auth.WithScopes(auth.CloudOAuthScopes...))
	if err != nil {
		return nil, errors.Annotate(err, "failed to get AsSelf credentials").Err()
	}
	psClient, err := pubsub.NewClient(
		ctx, cloudProj,
		// Attach grpcmon and OpenTelemetry stats handlers to every RPC.
		option.WithGRPCDialOption(grpc.WithStatsHandler(&grpcmon.ClientRPCStatsMonitor{})),
		option.WithGRPCDialOption(grpc.WithStatsHandler(otelgrpc.NewClientHandler())),
		option.WithGRPCDialOption(grpc.WithPerRPCCredentials(creds)),
	)
	if err != nil {
		return nil, errors.Annotate(err, "failed to create a pubsub client for %s", cloudProj).Err()
	}
	return &PubSubNotifier{
		client:       psClient,
		cloudProject: cloudProj,
	}, nil
}
// Stop shuts the PubSubNotifier down: it calls Stop on every cached topic,
// drops the topic cache and closes the shared pubsub client.
//
// Calling Stop more than once is harmless; only the first call does any work.
// After Stop, getTopic refuses to create new topic handles.
func (ps *PubSubNotifier) Stop() {
	ps.lock.Lock()
	defer ps.lock.Unlock()

	if !ps.stopped {
		for name := range ps.topics {
			ps.topics[name].Stop()
		}
		ps.topics = nil
		// Best effort: we are shutting down and there is nobody to report a
		// close failure to.
		_ = ps.client.Close()
		ps.stopped = true
	}
}
// RegisterTQTasks registers this notifier's task queue handlers with disp:
//   - "pubsub-go" handles taskspb.PubSubNotifyTask (task completion messages);
//   - "buildbucket-notify-go" handles taskspb.BuildbucketNotifyTask
//     (task updates sent to Buildbucket).
//
// Tasks are actually submitted from the Python side.
func (ps *PubSubNotifier) RegisterTQTasks(disp *tq.Dispatcher) {
	notifyClass := tq.TaskClass{
		ID:        "pubsub-go",
		Kind:      tq.NonTransactional,
		Prototype: (*taskspb.PubSubNotifyTask)(nil),
		// Replaces the "pubsub" taskqueue in Py.
		Queue: "pubsub-go",
		Handler: func(ctx context.Context, msg proto.Message) error {
			return ps.handlePubSubNotifyTask(ctx, msg.(*taskspb.PubSubNotifyTask))
		},
	}
	bbClass := tq.TaskClass{
		ID:        "buildbucket-notify-go",
		Kind:      tq.NonTransactional,
		Prototype: (*taskspb.BuildbucketNotifyTask)(nil),
		// Replaces the "buildbucket-notify" taskqueue in Py.
		Queue: "buildbucket-notify-go",
		Handler: func(ctx context.Context, msg proto.Message) error {
			return ps.handleBBNotifyTask(ctx, msg.(*taskspb.BuildbucketNotifyTask))
		},
	}
	for _, cls := range []tq.TaskClass{notifyClass, bbClass} {
		disp.RegisterTaskClass(cls)
	}
}
// handlePubSubNotifyTask publishes a task completion notification to the
// topic named in t and then records the publish latency (measured from
// t.StartTime) in metrics.TaskStatusChangePubsubLatency.
//
// The message body is a JSON-encoded PubSubNotification; when t.AuthToken is
// set it is attached as the "auth_token" message attribute.
//
// For fatal errors (malformed topic name, unencodable message) a tq.Fatal tag
// is applied. For retryable errors a transient.Tag is applied. The latency
// metric is recorded whether or not the publish itself succeeded.
func (ps *PubSubNotifier) handlePubSubNotifyTask(ctx context.Context, t *taskspb.PubSubNotifyTask) error {
	cloudProj, topicID, err := parsePubSubTopicName(t.GetTopic())
	if err != nil {
		return tq.Fatal.Apply(err)
	}
	topic, err := ps.getTopic(cloudProj, topicID)
	if err != nil {
		return transient.Tag.Apply(err)
	}

	data, err := json.MarshalIndent(&PubSubNotification{
		TaskID:   t.TaskId,
		Userdata: t.Userdata,
	}, "", " ")
	if err != nil {
		return errors.Annotate(err, "cannot compose the pubsub msg").Tag(tq.Fatal).Err()
	}
	psMsg := &pubsub.Message{Data: data}
	if t.AuthToken != "" {
		psMsg.Attributes = map[string]string{"auth_token": t.AuthToken}
	}
	_, err = topic.Publish(ctx, psMsg).Get(ctx)

	// Publish to TsMon pubsub latency metric.
	now := clock.Now(ctx).UTC().UnixMilli()
	startTimeMilli := t.StartTime.AsTime().UnixMilli()
	latency := now - startTimeMilli
	if latency < 0 {
		logging.Warningf(ctx, "ts_mon_metric pubsub latency %dms (%d - %d) is negative. Setting latency to 0", latency, now, startTimeMilli)
		latency = 0
	}
	pool := strpair.ParseMap(t.Tags).Get("pool")
	httpCode := grpcutil.CodeStatus(status.Code(err))
	// Named taskStatus (not status) so it does not shadow the imported
	// grpc "status" package used on the line above.
	taskStatus := t.State.String()
	logging.Debugf(ctx, "Updating TsMon pubsub metric with latency: %dms, httpCode: %d, status: %s, pool: %s", latency, httpCode, taskStatus, pool)
	metrics.TaskStatusChangePubsubLatency.Add(ctx, float64(latency), pool, taskStatus, httpCode)

	// errors.Annotate on a nil error yields nil, so success returns nil here.
	return errors.Annotate(err, "failed to publish the msg to %s", t.GetTopic()).Tag(transient.Tag).Err()
}
// handleBBNotifyTask sends a pubsub update to Buildbucket about the task
// identified by t.TaskId and then records that update in the BuildTask entity.
//
// The update is skipped (nil is returned) when the BuildTask already recorded
// an update with UpdateID >= t.UpdateId, or already has state t.State.
//
// Malformed input and missing BuildTask/TaskResultSummary entities are tagged
// tq.Fatal, since retrying cannot fix them. Other failures are tagged
// transient.Tag so the task is retried.
func (ps *PubSubNotifier) handleBBNotifyTask(ctx context.Context, t *taskspb.BuildbucketNotifyTask) error {
	taskReq, err := model.TaskIDToRequestKey(ctx, t.GetTaskId())
	if err != nil {
		return tq.Fatal.Apply(err)
	}
	buildTask := &model.BuildTask{Key: model.BuildTaskKey(ctx, taskReq)}
	switch err := datastore.Get(ctx, buildTask); {
	case errors.Is(err, datastore.ErrNoSuchEntity):
		return errors.Annotate(err, "cannot find BuildTask").Tag(tq.Fatal).Err()
	case err != nil:
		return errors.Annotate(err, "failed to fetch BuildTask").Tag(transient.Tag).Err()
	}

	// Shouldn't make the update: it is stale or does not change the state.
	if buildTask.UpdateID >= t.UpdateId || buildTask.LatestTaskStatus == t.State {
		return nil
	}

	resultSummary := &model.TaskResultSummary{Key: model.TaskResultSummaryKey(ctx, taskReq)}
	switch err := datastore.Get(ctx, resultSummary); {
	case errors.Is(err, datastore.ErrNoSuchEntity):
		// Retrying will not make the entity appear; don't retry forever.
		return errors.Annotate(err, "cannot find TaskResultSummary").Tag(tq.Fatal).Err()
	case err != nil:
		return errors.Annotate(err, "failed to fetch TaskResultSummary").Tag(transient.Tag).Err()
	}

	// Construct bbpb.Task. Prefer the bot dimensions already recorded in the
	// BuildTask; fall back to the ones in the TaskResultSummary.
	botDims := buildTask.BotDimensions
	if len(botDims) == 0 {
		botDims = resultSummary.BotDimensions
	}
	bbTask := &bbpb.Task{
		Id: &bbpb.TaskID{
			Id:     model.RequestKeyToTaskID(taskReq, model.AsRequest),
			Target: fmt.Sprintf("swarming://%s", ps.cloudProject),
		},
		UpdateId: t.UpdateId,
		Details: &structpb.Struct{
			Fields: map[string]*structpb.Value{
				"bot_dimensions": {
					Kind: &structpb.Value_StructValue{
						StructValue: botDims.ToStructPB(),
					},
				},
			},
		},
	}
	taskbackendutil.SetBBStatus(t.State, resultSummary.Failure, bbTask)

	// Send the update msg via PubSub.
	cloudProj, topicID, err := parsePubSubTopicName(buildTask.PubSubTopic)
	if err != nil {
		return tq.Fatal.Apply(err)
	}
	topic, err := ps.getTopic(cloudProj, topicID)
	if err != nil {
		return transient.Tag.Apply(err)
	}
	payload, err := proto.Marshal(&bbpb.BuildTaskUpdate{
		BuildId: buildTask.BuildID,
		Task:    bbTask,
	})
	if err != nil {
		return errors.Annotate(err, "failed to marshal BuildTaskUpdate").Tag(tq.Fatal).Err()
	}
	if _, err := topic.Publish(ctx, &pubsub.Message{Data: payload}).Get(ctx); err != nil {
		return errors.Annotate(err, "failed to send buildbucket task update").Tag(transient.Tag).Err()
	}

	// Update the BuildTask entity in Datastore. Handlers for the same task can
	// overlap (e.g. retries or out-of-order delivery), so re-read the entity in
	// a transaction and only move UpdateID forward. A plain put of the copy
	// read above could overwrite a newer update recorded in the meantime.
	err = datastore.RunInTransaction(ctx, func(ctx context.Context) error {
		latest := &model.BuildTask{Key: buildTask.Key}
		if err := datastore.Get(ctx, latest); err != nil {
			return err
		}
		if latest.UpdateID >= t.UpdateId {
			return nil
		}
		latest.LatestTaskStatus = t.State
		latest.UpdateID = t.UpdateId
		if len(latest.BotDimensions) == 0 {
			latest.BotDimensions = botDims
		}
		return datastore.Put(ctx, latest)
	}, nil)
	return errors.Annotate(err, "failed to update BuildTask").Tag(transient.Tag).Err()
}
// getTopic returns the cached *pubsub.Topic handle for topicID in cloudProj,
// creating and caching a new handle on first use.
//
// It returns an error instead of creating a handle once Stop has been called.
func (ps *PubSubNotifier) getTopic(cloudProj, topicID string) (*pubsub.Topic, error) {
	name := fmt.Sprintf("projects/%s/topics/%s", cloudProj, topicID)

	// Fast path: most lookups hit the cache, which only needs the read lock.
	ps.lock.RLock()
	cached := ps.topics[name]
	ps.lock.RUnlock()
	if cached != nil {
		return cached, nil
	}

	// Slow path: take the write lock and look again, because another goroutine
	// may have inserted the handle between RUnlock and Lock.
	ps.lock.Lock()
	defer ps.lock.Unlock()
	if existing, found := ps.topics[name]; found {
		return existing, nil
	}
	if ps.stopped {
		return nil, errors.New("cannot create a topic; the PubSubNotifier has already stopped")
	}
	if ps.topics == nil {
		ps.topics = make(map[string]*pubsub.Topic)
	}
	created := ps.client.TopicInProject(topicID, cloudProj)
	ps.topics[name] = created
	return created, nil
}