blob: babbaa2bca9bd435efdb3fb4a489c9b86b704185 [file] [log] [blame]
// Copyright 2016 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"flag"
"time"
"cloud.google.com/go/pubsub"
"google.golang.org/api/option"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/retry"
"go.chromium.org/luci/common/retry/transient"
"go.chromium.org/luci/common/tsmon/distribution"
"go.chromium.org/luci/common/tsmon/field"
"go.chromium.org/luci/common/tsmon/metric"
"go.chromium.org/luci/common/tsmon/types"
"go.chromium.org/luci/server"
"go.chromium.org/luci/server/auth"
"go.chromium.org/luci/logdog/server/collector"
"go.chromium.org/luci/logdog/server/collector/coordinator"
"go.chromium.org/luci/logdog/server/service"
)
var (
// tsPubsubCount counts the number of Pub/Sub messages processed by the
// Archivist.
//
// Result tracks the outcome of each message, either "success", "failure", or
// "transient_failure".
tsPubsubCount = metric.NewCounter("logdog/collector/subscription/count",
"The number of Pub/Sub messages pulled.",
nil,
field.String("result"))
// tsTaskProcessingTime tracks the amount of time a single subscription
// message takes to process, in milliseconds.
tsTaskProcessingTime = metric.NewCumulativeDistribution("logdog/collector/subscription/processing_time_ms",
"Amount of time in milliseconds that a single Pub/Sub message takes to process.",
&types.MetricMetadata{Units: types.Milliseconds},
distribution.DefaultBucketer)
)
// runForever runs the collector loop until the context closes.
func runForever(ctx context.Context, coll *collector.Collector, sub *pubsub.Subscription) {
retryForever := func() retry.Iterator {
return &retry.ExponentialBackoff{
Limited: retry.Limited{
Delay: 200 * time.Millisecond,
Retries: -1, // Unlimited.
},
MaxDelay: 10 * time.Second,
Multiplier: 2,
}
}
retry.Retry(ctx, retryForever, func() error {
return sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
ctx = logging.SetField(ctx, "messageID", msg.ID)
if processMessage(ctx, coll, msg) {
// ACK the message, removing it from Pub/Sub.
msg.Ack()
} else {
// NACK the message. It will be redelivered and processed.
msg.Nack()
}
})
}, func(err error, d time.Duration) {
logging.Fields{
"error": err,
"delay": d,
}.Errorf(ctx, "Error during subscription Receive loop; retrying...")
})
}
// processMessage returns true if the message should be ACK'd (deleted from
// Pub/Sub) or false if the message should not be ACK'd.
func processMessage(ctx context.Context, coll *collector.Collector, msg *pubsub.Message) bool {
logging.Fields{
"size": len(msg.Data),
}.Infof(ctx, "Received Pub/Sub Message.")
startTime := clock.Now(ctx)
err := coll.Process(ctx, msg.Data)
duration := clock.Now(ctx).Sub(startTime)
// We track processing time in milliseconds.
tsTaskProcessingTime.Add(ctx, duration.Seconds()*1000)
switch {
case transient.Tag.In(err) || errors.Contains(err, context.Canceled):
// Do not consume
logging.Fields{
"error": err,
"duration": duration,
}.Warningf(ctx, "TRANSIENT error ingesting Pub/Sub message.")
tsPubsubCount.Add(ctx, 1, "transient_failure")
return false
case err == nil:
logging.Fields{
"size": len(msg.Data),
"duration": duration,
}.Infof(ctx, "Message successfully processed; ACKing.")
tsPubsubCount.Add(ctx, 1, "success")
return true
default:
logging.Fields{
"error": err,
"size": len(msg.Data),
"duration": duration,
}.Errorf(ctx, "Non-transient error ingesting Pub/Sub message; ACKing.")
tsPubsubCount.Add(ctx, 1, "failure")
return true
}
}
// pubSubClient returns an authenticated Google PubSub client instance.
func pubSubClient(ctx context.Context, cloudProject string) (*pubsub.Client, error) {
ts, err := auth.GetTokenSource(ctx, auth.AsSelf, auth.WithScopes(auth.CloudOAuthScopes...))
if err != nil {
return nil, errors.Annotate(err, "failed to get the token source").Err()
}
client, err := pubsub.NewClient(ctx, cloudProject, option.WithTokenSource(ts))
if err != nil {
return nil, errors.Annotate(err, "failed to create the PubSub client").Err()
}
return client, nil
}
// Entry point.
func main() {
flags := CommandLineFlags{}
flags.Register(flag.CommandLine)
cfg := service.MainCfg{BigTableAppProfile: "collector"}
service.Main(cfg, func(srv *server.Server, impl *service.Implementations) error {
if err := flags.Validate(); err != nil {
return err
}
// Initialize our Collector service object using a caching Coordinator
// interface.
coll := &collector.Collector{
Coordinator: coordinator.NewCache(
coordinator.NewCoordinator(impl.Coordinator),
flags.StateCacheSize,
flags.StateCacheExpiration,
),
Storage: impl.Storage,
MaxMessageWorkers: flags.MaxMessageWorkers,
}
srv.RegisterCleanup(func(context.Context) { coll.Close() })
// Initialize a Subscription object ready to pull messages.
psClient, err := pubSubClient(srv.Context, flags.PubSubProject)
if err != nil {
return err
}
psSub := psClient.Subscription(flags.PubSubSubscription)
psSub.ReceiveSettings = pubsub.ReceiveSettings{
MaxExtension: 24 * time.Hour,
MaxOutstandingMessages: flags.MaxConcurrentMessages, // If < 1, default.
MaxOutstandingBytes: 0, // Default.
}
// Run the collector loop until the server closes.
srv.RunInBackground("collector", func(ctx context.Context) {
runForever(ctx, coll, psSub)
})
return nil
})
}