blob: 01a4daa36631f5fd1f8d4386ac2c338d090c196b [file] [log] [blame]
// Copyright 2016 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"fmt"
"time"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/data/rand/mathrand"
"go.chromium.org/luci/common/errors"
gcps "go.chromium.org/luci/common/gcloud/pubsub"
log "go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/proto/google"
"go.chromium.org/luci/common/retry"
"go.chromium.org/luci/common/retry/transient"
"go.chromium.org/luci/common/tsmon/distribution"
"go.chromium.org/luci/common/tsmon/field"
"go.chromium.org/luci/common/tsmon/metric"
"go.chromium.org/luci/common/tsmon/types"
"go.chromium.org/luci/grpc/grpcutil"
"go.chromium.org/luci/hardcoded/chromeinfra"
"go.chromium.org/luci/logdog/server/bundleServicesClient"
"go.chromium.org/luci/logdog/server/collector"
"go.chromium.org/luci/logdog/server/collector/coordinator"
"go.chromium.org/luci/logdog/server/service"
"cloud.google.com/go/pubsub"
)
// errInvalidConfig is the sentinel error returned when the configured Pub/Sub
// subscription cannot be confirmed or does not exist.
var errInvalidConfig = errors.New("invalid configuration")
// Metrics.

// tsPubsubCount counts the number of Pub/Sub messages processed by the
// Collector.
//
// The "result" field records the outcome of each message: "success",
// "failure", or "transient_failure".
var tsPubsubCount = metric.NewCounter(
	"logdog/collector/subscription/count",
	"The number of Pub/Sub messages pulled.",
	nil,
	field.String("result"),
)

// tsTaskProcessingTime is a cumulative distribution of how long a single
// subscription message takes to process, in milliseconds.
var tsTaskProcessingTime = metric.NewCumulativeDistribution(
	"logdog/collector/subscription/processing_time_ms",
	"Amount of time in milliseconds that a single Pub/Sub message takes to process.",
	&types.MetricMetadata{Units: types.Milliseconds},
	distribution.DefaultBucketer,
)
// application holds the Collector application state. It embeds
// service.Service, so the service's fields and methods (configuration,
// Coordinator client, storage, Run, ...) are available directly on it.
type application struct{ service.Service }
// runCollector is the Collector's main execution function.
//
// It checks that the service configuration carries both a Collector section
// and a Pub/Sub transport section, confirms that the configured subscription
// validates and exists, sets up intermediate storage plus a bundling, caching
// Coordinator client, and then pulls Pub/Sub messages, handing each one to
// processMessage. Transient Receive errors are retried with exponential
// backoff and no retry limit; the loop ends when the Context is cancelled
// (the registered shutdown function does exactly that) or when Receive
// returns an error that is not transient.
//
// It returns nil on a clean finish, errInvalidConfig when the subscription
// cannot be confirmed or does not exist, and the underlying error otherwise.
func (a *application) runCollector(c context.Context) error {
	collectorCfg := a.ServiceConfig.GetCollector()
	if collectorCfg == nil {
		return errors.New("no collector configuration")
	}

	pubsubCfg := a.ServiceConfig.GetTransport().GetPubsub()
	if pubsubCfg == nil {
		return errors.New("missing Pub/Sub configuration")
	}

	// The configured subscription must pass validation before it is used.
	subName := gcps.NewSubscription(pubsubCfg.Project, pubsubCfg.Subscription)
	err := subName.Validate()
	if err != nil {
		return fmt.Errorf("invalid Pub/Sub subscription %q: %v", subName, err)
	}

	// Obtain a Pub/Sub subscriber client from the service.
	client, err := a.Service.PubSubSubscriberClient(c, pubsubCfg.Project)
	if err != nil {
		log.Fields{
			"subscription": subName,
			log.ErrorKey:   err,
		}.Errorf(c, "Failed to create Pub/Sub client.")
		return err
	}

	// Confirm the subscription before doing any further setup.
	subscription := client.Subscription(pubsubCfg.Subscription)
	found, err := subscription.Exists(c)
	if err != nil {
		log.Fields{
			"subscription": subName,
			log.ErrorKey:   err,
		}.Errorf(c, "Could not confirm Pub/Sub subscription.")
		return errInvalidConfig
	}
	if !found {
		log.Fields{
			"subscription": subName,
		}.Errorf(c, "Subscription does not exist.")
		return errInvalidConfig
	}

	subscription.ReceiveSettings = pubsub.ReceiveSettings{
		MaxExtension:           24 * time.Hour,
		MaxOutstandingMessages: int(collectorCfg.MaxConcurrentMessages), // If < 1, default.
		MaxOutstandingBytes:    0,                                       // Default.
	}

	log.Fields{
		"subscription": subName,
	}.Infof(c, "Successfully validated Pub/Sub subscription.")

	storage, err := a.IntermediateStorage(c, true)
	if err != nil {
		return err
	}
	defer storage.Close()

	// Coordinator requests go through a client that bundles them together.
	bundler := &bundleServicesClient.Client{
		ServicesClient:       a.Coordinator,
		DelayThreshold:       time.Second,
		BundleCountThreshold: 100,
	}
	defer bundler.Flush()

	// Wrap the bundling client in a caching Coordinator interface and hand it
	// to the Collector service object.
	coord := coordinator.NewCoordinator(bundler)
	coord = coordinator.NewCache(
		coord,
		int(collectorCfg.StateCacheSize),
		google.DurationFromProto(collectorCfg.StateCacheExpiration),
	)

	coll := collector.Collector{
		Coordinator:       coord,
		Storage:           storage,
		MaxMessageWorkers: int(collectorCfg.MaxMessageWorkers),
	}
	defer coll.Close()

	// Application shutdown operates by cancelling the Context that the
	// receive loop runs under.
	c, cancel := context.WithCancel(c)
	defer cancel()
	a.SetShutdownFunc(cancel)

	// newBackoff produces an exponential backoff iterator with no retry limit.
	newBackoff := func() retry.Iterator {
		return &retry.ExponentialBackoff{
			Limited: retry.Limited{
				Delay:   200 * time.Millisecond,
				Retries: -1, // Unlimited.
			},
			MaxDelay:   10 * time.Second,
			Multiplier: 2,
		}
	}

	// handle processes one message and settles it with Pub/Sub.
	handle := func(msgCtx context.Context, msg *pubsub.Message) {
		msgCtx = log.SetField(msgCtx, "messageID", msg.ID)
		if !a.processMessage(msgCtx, &coll, msg) {
			// NACK the message. It will be redelivered and processed.
			msg.Nack()
			return
		}
		// ACK the message, removing it from Pub/Sub.
		msg.Ack()
	}

	// receive runs one Receive call; its error is passed through
	// grpcutil.WrapIfTransient so the retry filter below can recognize it.
	receive := func() error {
		return grpcutil.WrapIfTransient(subscription.Receive(c, handle))
	}

	// onRetry logs each transient failure along with the upcoming delay.
	onRetry := func(err error, d time.Duration) {
		log.Fields{
			"delay":      d,
			log.ErrorKey: err,
		}.Warningf(c, "Transient error during subscription Receive loop; retrying...")
	}

	// Execute the main subscription pull loop. It runs until the Context is
	// cancelled.
	if err := retry.Retry(c, transient.Only(newBackoff), receive, onRetry); err != nil {
		log.WithError(err).Errorf(c, "Failed during Pub/Sub Receive.")
		return err
	}

	log.Debugf(c, "Collector finished.")
	return nil
}
// processMessage hands a single Pub/Sub message to the Collector and reports
// whether the message should be ACK'd (deleted from Pub/Sub).
//
// It returns false only when processing failed with a transient error, so
// that the caller leaves the message for redelivery. It returns true when
// processing succeeded and also when it failed with a non-transient error.
//
// Every call records the elapsed processing time in tsTaskProcessingTime and
// increments tsPubsubCount with the outcome ("success", "failure" or
// "transient_failure").
func (a *application) processMessage(c context.Context, coll *collector.Collector, msg *pubsub.Message) bool {
	size := len(msg.Data)
	log.Fields{
		"size": size,
	}.Infof(c, "Received Pub/Sub Message.")

	began := clock.Now(c)
	err := coll.Process(c, msg.Data)
	elapsed := clock.Now(c).Sub(began)

	// The processing-time metric is expressed in milliseconds.
	tsTaskProcessingTime.Add(c, elapsed.Seconds()*1000)

	if transient.Tag.In(err) {
		// Transient failure: do not consume the message.
		log.Fields{
			log.ErrorKey: err,
			"duration":   elapsed,
		}.Warningf(c, "TRANSIENT error ingesting Pub/Sub message.")
		tsPubsubCount.Add(c, 1, "transient_failure")
		return false
	}

	if err != nil {
		// Non-transient failure: the message is consumed anyway.
		log.Fields{
			log.ErrorKey: err,
			"size":       size,
			"duration":   elapsed,
		}.Errorf(c, "Non-transient error ingesting Pub/Sub message; ACKing.")
		tsPubsubCount.Add(c, 1, "failure")
		return true
	}

	log.Fields{
		"size":     size,
		"duration": elapsed,
	}.Infof(c, "Message successfully processed; ACKing.")
	tsPubsubCount.Add(c, 1, "success")
	return true
}
// main is the program entry point. It calls mathrand.SeedRandomly, builds the
// "collector" application with the default Chrome infra auth options, and
// runs it with runCollector as the service body.
func main() {
	mathrand.SeedRandomly()

	var app application
	app.Name = "collector"
	app.DefaultAuthOptions = chromeinfra.DefaultAuthOptions()

	app.Run(context.Background(), app.runCollector)
}