blob: 484db2da05f52f7abbdf713d7d89d41d2b6e8e8a [file] [log] [blame]
// Copyright 2021 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bblistener
import (
"bytes"
"compress/zlib"
"context"
"encoding/json"
"fmt"
"io"
"regexp"
"sync/atomic"
"time"
"cloud.google.com/go/pubsub"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/durationpb"
buildbucketpb "go.chromium.org/luci/buildbucket/proto"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/retry/transient"
"go.chromium.org/luci/common/sync/parallel"
"go.chromium.org/luci/server/tq"
"go.chromium.org/luci/cv/internal/buildbucket"
"go.chromium.org/luci/cv/internal/common"
"go.chromium.org/luci/cv/internal/tryjob"
)
const (
// NumConcurrentListeners defines the number of Buildbucket Pub/Sub
// listeners that run concurrently.
//
// Increase this value if the notification processing speed can't keep up
// with the incoming speed.
NumConcurrentListeners = 5
// SubscriptionID is the default subscription ID for listening to
// Buildbucket build updates.
SubscriptionID = "buildbucket-builds"
// ListenDuration is how long each listener will running for.
//
// This should be in sync with the interval of the cron job that kicks the
// listener to ensure continuous processing of Buildbucket Pub/Sub events.
ListenDuration = 5 * time.Minute
)
// topicNameRegexp - the Cloud project Pub/Sub topic name regex expression.
var topicNameRegexp = regexp.MustCompile(`^projects/(.*)/topics/(.*)$`)
// This interface encapsulate the communication with tryjob component.
type tryjobNotifier interface {
ScheduleUpdate(context.Context, common.TryjobID, tryjob.ExternalID) error
}
type tryjobUpdater interface {
// Update updates the Tryjob entity associated with the given `eid`.
//
// `data` should contain the latest information of the Tryjob from the
// Tryjob backend system (e.g. Build proto from Buildbucket pubsub).
//
// No-op if the Tryjob data stored in CV appears to be newer than the provided
// data (e.g. has newer Tryjob.Result.UpdateTime)
Update(ctx context.Context, eid tryjob.ExternalID, data any) error
}
// Register registers tasks for listener and returns a function to kick off
// `NumConcurrentListeners` listeners.
func Register(tqd *tq.Dispatcher, projectID string, tjNotifier tryjobNotifier, tjUpdater tryjobUpdater) func(context.Context) error {
_ = tqd.RegisterTaskClass(tq.TaskClass{
ID: "listen-bb-pubsub",
Prototype: &ListenBBPubsubTask{},
Queue: "listen-bb-pubsub",
Kind: tq.NonTransactional,
Quiet: true,
QuietOnError: true,
Handler: func(ctx context.Context, payload proto.Message) error {
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return err
}
defer func() {
if err := client.Close(); err != nil {
logging.Errorf(ctx, "failed to close PubSub client: %s", err)
}
}()
sub := client.Subscription(SubscriptionID)
subConfig, err := sub.Config(ctx)
if err != nil {
return errors.Annotate(err, "failed to get configuration for the subscription %s", sub.String()).Err()
}
subscribedProj, err := extractTopicProject(subConfig.Topic.String())
if err != nil {
return errors.Annotate(err, "for subscription %s", sub.String()).Err()
}
l := &listener{
bbHost: fmt.Sprintf("%s.appspot.com", subscribedProj),
subscription: sub,
tjNotifier: tjNotifier,
tjUpdater: tjUpdater,
}
defer l.reportStats(ctx)
duration := payload.(*ListenBBPubsubTask).GetDuration().AsDuration()
if duration == 0 {
duration = ListenDuration
}
cctx, cancel := clock.WithTimeout(ctx, duration)
defer cancel()
if err := l.start(cctx); err != nil {
// Never retry the tasks because the listener will be started
// periodically by the Cron.
return common.TQIfy{NeverRetry: true}.Error(ctx, err)
}
return nil
},
})
return func(ctx context.Context) error {
return parallel.FanOutIn(func(workCh chan<- func() error) {
for i := 0; i < NumConcurrentListeners; i++ {
i := i
workCh <- func() error {
return tqd.AddTask(ctx, &tq.Task{
Title: fmt.Sprintf("listener-%d", i),
Payload: &ListenBBPubsubTask{
Duration: durationpb.New(ListenDuration),
},
})
}
}
})
}
}
// StartListenerForTest starts a buildbucket listener for testing purpose.
//
// Returns a callback function to stop the listener.
func StartListenerForTest(ctx context.Context, sub *pubsub.Subscription, tjNotifier tryjobNotifier) func() {
cctx, cancel := context.WithCancel(ctx)
go func() {
l := &listener{
subscription: sub,
tjNotifier: tjNotifier,
}
if err := l.start(cctx); err != nil {
logging.Errorf(ctx, "encounter error in buildbucket listener: %s", err)
}
}()
return cancel
}
type listener struct {
bbHost string // Buildbucket host that the subscription subscribes to. e.g. cr-buildbucket.appspot.com
subscription *pubsub.Subscription
tjNotifier tryjobNotifier
tjUpdater tryjobUpdater
stats listenerStats
processedCh chan string // for testing only
}
type listenerStats struct {
totalProcessedCount, transientErrCount, permanentErrCount int64
}
func (l *listener) start(ctx context.Context) error {
return l.subscription.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
switch err := l.processMsg(ctx, msg); {
case err == nil:
msg.Ack()
case transient.Tag.In(err):
logging.Warningf(ctx, "encounter transient error when processing buildbucket pubsub message %q; Reason: %s", string(msg.Data), err)
msg.Nack()
atomic.AddInt64(&l.stats.totalProcessedCount, 1)
default:
logging.Errorf(ctx, "encounter non-transient error when processing buildbucket pubsub message: %q; Reason: %s", string(msg.Data), err)
// Dismiss non-transient failure.
msg.Ack()
atomic.AddInt64(&l.stats.permanentErrCount, 1)
}
atomic.AddInt64(&l.stats.transientErrCount, 1)
if l.processedCh != nil {
l.processedCh <- msg.ID
}
})
}
func (l *listener) processMsg(ctx context.Context, msg *pubsub.Message) error {
var isV2Msg bool
var buildID int64
var hostname string
var build *buildbucketpb.Build
var err error
if v, ok := msg.Attributes["version"]; ok && v == "v2" {
isV2Msg = true
if build, err = parseV2Data(msg); err != nil {
return err
}
hostname, buildID = build.GetInfra().GetBuildbucket().GetHostname(), build.GetId()
} else {
// TODO(crbug.com/1406393): delete it once the migration is done. And the
// above pre-declared variables can also be deleted to make code more clean.
parsedMsg, err := parseV1Data(ctx, msg.Data)
if err != nil {
return err
}
hostname, buildID = parsedMsg.Hostname, parsedMsg.Build.ID
}
if hostname == "" {
logging.Warningf(ctx, "received pubsub message with empty hostname for build %d, using the computed one %s", build.GetId(), l.bbHost)
hostname = l.bbHost
}
eid, err := tryjob.BuildbucketID(hostname, buildID)
if err != nil {
return err
}
switch ids, err := tryjob.Resolve(ctx, eid); {
case err != nil:
return err
case len(ids) != 1:
panic(fmt.Errorf("impossible; requested to resolve 1 external ID %s, got %d", eid, len(ids)))
case ids[0] != 0:
// Build that is tracked by LUCI CV.
if !isV2Msg {
return l.tjNotifier.ScheduleUpdate(ctx, ids[0], eid)
}
// TODO(crbug.com/1406393): remove the debugging once the migration is done.
logging.Debugf(ctx, "builds_v2 pubsub listener: updating tryjob %s", eid)
return l.tjUpdater.Update(ctx, eid, build)
}
return nil
}
// parseV2Data parses Buildbucket new `builds_v2` pubsub message data.
func parseV2Data(msg *pubsub.Message) (*buildbucketpb.Build, error) {
buildsV2Msg := &buildbucketpb.BuildsV2PubSub{}
if err := (protojson.UnmarshalOptions{DiscardUnknown: true}).Unmarshal(msg.Data, buildsV2Msg); err != nil {
return nil, errors.Annotate(err, "failed to unmarshal pubsub message into BuildsV2PubSub proto").Err()
}
largeFieldsData, err := zlibDecompress(buildsV2Msg.BuildLargeFields)
if err != nil {
return nil, errors.Annotate(err, "failed to decompress build_large_fields for build %d", buildsV2Msg.Build.GetId()).Err()
}
largeFields := &buildbucketpb.Build{}
if err := (proto.UnmarshalOptions{DiscardUnknown: true}).Unmarshal(largeFieldsData, largeFields); err != nil {
return nil, errors.Annotate(err, "failed to unmarshal build_large_fields for build %d", buildsV2Msg.Build.GetId()).Err()
}
proto.Merge(buildsV2Msg.Build, largeFields)
return buildsV2Msg.Build, nil
}
// parseV1Data extracts the relevant information from Buildbucket old `builds`
// topic Pub/Sub message data.
func parseV1Data(ctx context.Context, data []byte) (buildbucket.PubsubMessage, error) {
message := buildbucket.PubsubMessage{}
// Extra fields that are not in the struct are ignored by json.Unmarshal.
if err := json.Unmarshal(data, &message); err != nil {
return buildbucket.PubsubMessage{}, errors.Annotate(err, "while unmarshalling build pubsub message").Err()
}
if message.Hostname == "" || message.Build.ID == 0 {
return buildbucket.PubsubMessage{}, errors.Reason("missing build details in pubsub message: %s", data).Err()
}
return message, nil
}
func (l *listener) reportStats(ctx context.Context) {
logging.Infof(ctx, "processed %d buildbucket pubsub messages in total. %d of them have transient failure. %d of them have non-transient failure",
l.stats.totalProcessedCount,
l.stats.transientErrCount,
l.stats.permanentErrCount)
// TODO(yiwzhang): send tsmon metrics. Especially for non-transient count to
// to alert on.
}
// zlibDecompress decompresses data using zlib.
func zlibDecompress(compressed []byte) ([]byte, error) {
r, err := zlib.NewReader(bytes.NewReader(compressed))
if err != nil {
return nil, err
}
defer func() { _ = r.Close() }()
return io.ReadAll(r)
}
// extractTopicProject extracts the subscribed project name from the given subscription.
func extractTopicProject(topic string) (string, error) {
matches := topicNameRegexp.FindStringSubmatch(topic)
if len(matches) != 3 {
return "", errors.Reason("topic %s doesn't match %q", topic, topicNameRegexp.String()).Err()
}
return matches[1], nil
}