blob: 1f5962c65696ac5c14d1541b264e9c09695ade39 [file] [log] [blame]
// Copyright 2016 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"flag"
"time"
cloudlogging "cloud.google.com/go/logging"
"google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/durationpb"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/gcloud/gs"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/retry"
"go.chromium.org/luci/common/retry/transient"
"go.chromium.org/luci/common/sync/dispatcher"
"go.chromium.org/luci/common/sync/dispatcher/buffer"
"go.chromium.org/luci/common/tsmon/distribution"
"go.chromium.org/luci/common/tsmon/field"
"go.chromium.org/luci/common/tsmon/metric"
"go.chromium.org/luci/common/tsmon/types"
"go.chromium.org/luci/grpc/grpcmon"
"go.chromium.org/luci/server"
"go.chromium.org/luci/server/auth"
logdog "go.chromium.org/luci/logdog/api/endpoints/coordinator/services/v1"
"go.chromium.org/luci/logdog/server/archivist"
"go.chromium.org/luci/logdog/server/service"
)
var (
	// leaseRetryParams builds the retry iterator used around the task-lease
	// RPC: exponential backoff starting at one second, multiplied by 1.25 per
	// attempt and capped at ten minutes. Retries is -1; runForever treats this
	// policy as never giving up.
	leaseRetryParams = func() retry.Iterator {
		return &retry.ExponentialBackoff{
			Multiplier: 1.25,
			MaxDelay:   10 * time.Minute,
			Limited: retry.Limited{
				Retries: -1,
				Delay:   time.Second,
			},
		}
	}

	// ackChannelOptions configures the dispatcher channel that batches task
	// ACKs: up to 500 items or ten minutes of age per batch, ten leases, and a
	// blocking buffer sized for ten full batches. Each batch gets at most ten
	// retries with the same backoff curve as leaseRetryParams.
	ackChannelOptions = &dispatcher.Options{
		Buffer: buffer.Options{
			BatchItemsMax: 500,
			BatchAgeMax:   10 * time.Minute,
			MaxLeases:     10,
			Retry: func() retry.Iterator {
				return &retry.ExponentialBackoff{
					Multiplier: 1.25,
					MaxDelay:   10 * time.Minute,
					Limited: retry.Limited{
						Retries: 10,
						Delay:   time.Second,
					},
				}
			},
			FullBehavior: &buffer.BlockNewItems{
				MaxItems: 10 * 500,
			},
		},
	}

	// mkJobChannelOptions returns the options for the job dispatcher channel:
	// single-item batches, maxWorkers leases, and a blocking buffer sized from
	// maxWorkers.
	mkJobChannelOptions = func(maxWorkers int) *dispatcher.Options {
		// Currently (2020Q2) it takes ~4s on average to process a task, and
		// ~60s to lease a new batch. We never want to starve our job workers
		// here, so we need at least 60s worth of tasks.
		minQueued := (60. / 4.) * float64(maxWorkers)

		opts := &dispatcher.Options{}
		opts.Buffer.MaxLeases = maxWorkers
		opts.Buffer.BatchItemsMax = 1
		opts.Buffer.FullBehavior = &buffer.BlockNewItems{
			// 20% margin on top of the minimum, just to be safe.
			MaxItems: int(minQueued * 1.2),
		}
		return opts
	}

	// maxSleepTime is the upper bound, in seconds, on the idle back-off that
	// runForever applies when a lease comes back with no tasks.
	maxSleepTime = 32

	// tsTaskProcessingTime is a distribution of how long a single task takes
	// to process, in milliseconds.
	//
	// Its "consumed" field is true when the underlying task was consumed and
	// false when it was not.
	tsTaskProcessingTime = metric.NewCumulativeDistribution(
		"logdog/archivist/task_processing_time_ms_ng",
		"The amount of time (in milliseconds) that a single task takes to process in the new pipeline.",
		&types.MetricMetadata{Units: types.Milliseconds},
		distribution.DefaultBucketer,
		field.Bool("consumed"),
	)

	// tsLoopCycleTime is a distribution of the time between consecutive
	// completed lease calls in the main loop, in milliseconds.
	tsLoopCycleTime = metric.NewCumulativeDistribution(
		"logdog/archivist/loop_cycle_time_ms",
		"The amount of time a single batch of leases takes to process.",
		&types.MetricMetadata{Units: types.Milliseconds},
		distribution.DefaultBucketer,
	)

	// tsLeaseCount counts tasks handed out by the lease RPC.
	tsLeaseCount = metric.NewCounter(
		"logdog/archivist/tasks_leased",
		"Number of tasks leased.",
		nil,
	)

	// tsNackCount counts leased tasks whose archival returned an error.
	tsNackCount = metric.NewCounter(
		"logdog/archivist/tasks_not_acked",
		"Number of tasks leased but failed.",
		nil,
	)

	// tsAckCount counts tasks that were archived and then deleted (ACKed).
	tsAckCount = metric.NewCounter(
		"logdog/archivist/tasks_acked",
		"Number of tasks successfully completed and acked.",
		nil,
	)
)
// runForever runs the archivist loop until ctx is cancelled.
//
// Two dispatcher channels are created:
//
//   - ackChan batches tasks that were archived successfully and deletes them
//     through ar.Service.DeleteArchiveTasks, bumping tsAckCount on success.
//   - jobChan runs ar.ArchiveTask on one task per batch, bounded by the task's
//     lease deadline, and forwards successes to ackChan. Failures are logged
//     and counted in tsNackCount; they are not retried here.
//
// The calling goroutine then repeatedly leases tasks through
// ar.Service.LeaseArchiveTasks and feeds them into jobChan, backing off (up to
// maxSleepTime seconds) while there is nothing to lease. On return the job
// channel is drained first and the ACK channel second.
//
// It panics if either channel cannot be constructed, or if the lease retry
// loop stops while ctx is still live.
func runForever(ctx context.Context, ar *archivist.Archivist, flags *CommandLineFlags) {
	// archiveJob pairs a leased task with the moment its lease expires.
	type archiveJob struct {
		deadline time.Time
		task     *logdog.ArchiveTask
	}

	ackChan, err := dispatcher.NewChannel(ctx, ackChannelOptions, func(batch *buffer.Batch) error {
		var req *logdog.DeleteRequest
		if batch.Meta != nil {
			// The request was already built by an earlier invocation for this
			// batch (presumably a retry); reuse it.
			req = batch.Meta.(*logdog.DeleteRequest)
		} else {
			tasks := make([]*logdog.ArchiveTask, len(batch.Data))
			for i, datum := range batch.Data {
				tasks[i] = datum.Item.(*logdog.ArchiveTask)
				// The task now lives in the request; drop the batch's own
				// reference to it.
				batch.Data[i].Item = nil
			}
			req = &logdog.DeleteRequest{Tasks: tasks}
			batch.Meta = req
		}
		_, err := ar.Service.DeleteArchiveTasks(ctx, req)
		if err == nil {
			tsAckCount.Add(ctx, int64(len(req.Tasks)))
		}
		return transient.Tag.Apply(err)
	})
	if err != nil {
		panic(err) // only occurs if Options is invalid
	}
	defer func() {
		logging.Infof(ctx, "draining ACK channel")
		ackChan.CloseAndDrain(ctx)
		logging.Infof(ctx, "ACK channel drained")
	}()

	jobChanOpts := mkJobChannelOptions(flags.MaxConcurrentTasks)
	jobChan, err := dispatcher.NewChannel(ctx, jobChanOpts, func(data *buffer.Batch) error {
		// BatchItemsMax is 1 for this channel, so only the first item is read.
		job := data.Data[0].Item.(*archiveJob)

		// Never work on a task past the end of its lease.
		nc, cancel := context.WithDeadline(ctx, job.deadline)
		defer cancel()

		nc = logging.SetFields(nc, logging.Fields{
			"project": job.task.Project,
			"id":      job.task.Id,
		})

		startTime := clock.Now(nc)
		err := ar.ArchiveTask(nc, job.task)
		duration := clock.Now(nc).Sub(startTime)

		tsTaskProcessingTime.Add(ctx, float64(duration.Nanoseconds())/1000000, err == nil)

		if err == nil {
			select {
			case ackChan.C <- job.task:
			case <-ctx.Done():
				logging.Errorf(nc, "Failed to ACK task %v due to context: %s", job.task, ctx.Err())
			}
		} else {
			tsNackCount.Add(ctx, 1)
			logging.Errorf(nc, "Failed to archive task %v: %s", job.task, err)
		}

		// Always report success to the dispatcher: a failed task is only
		// logged and counted above.
		return nil
	})
	if err != nil {
		panic(err) // only occurs if Options is invalid
	}
	// Registered after the ACK drain, so it runs first: jobs may still push
	// into ackChan while they finish.
	defer func() {
		logging.Infof(ctx, "Job channel draining")
		jobChan.CloseAndDrain(ctx)
		logging.Infof(ctx, "Job channel drained")
	}()

	// now we spin forever, pushing items into jobChan.
	sleepTime := 1
	var previousCycle time.Time
	for ctx.Err() == nil {
		var tasks *logdog.LeaseResponse
		var deadline time.Time

		err := retry.Retry(ctx, leaseRetryParams, func() (err error) {
			logging.Infof(ctx, "Leasing max %d tasks for %s", flags.LeaseBatchSize, flags.LeaseTime)
			// Taken before the RPC is issued, so the local deadline is never
			// later than the lease end computed from this clock.
			deadline = clock.Now(ctx).Add(flags.LeaseTime)
			tasks, err = ar.Service.LeaseArchiveTasks(ctx, &logdog.LeaseRequest{
				MaxTasks:  int64(flags.LeaseBatchSize),
				LeaseTime: durationpb.New(flags.LeaseTime),
			})
			return
		}, retry.LogCallback(ctx, "LeaseArchiveTasks"))
		if ctx.Err() != nil {
			logging.Infof(ctx, "lease thread got context err in RPC: %s", ctx.Err())
			break
		}
		if err != nil {
			panic("impossible: infinite retry stopped: " + err.Error())
		}

		now := clock.Now(ctx)
		if !previousCycle.IsZero() {
			// Convert to float before dividing so the sub-millisecond part is
			// kept, matching tsTaskProcessingTime above.
			tsLoopCycleTime.Add(ctx, float64(now.Sub(previousCycle).Nanoseconds())/1000000)
		}
		previousCycle = now

		if len(tasks.Tasks) == 0 {
			// Nothing to do: double the idle sleep, capped at maxSleepTime.
			sleepTime *= 2
			if sleepTime > maxSleepTime {
				sleepTime = maxSleepTime
			}
			logging.Infof(ctx, "no work to do, sleeping for %d seconds", sleepTime)
			clock.Sleep(ctx, time.Duration(sleepTime)*time.Second)
			continue
		}

		tsLeaseCount.Add(ctx, int64(len(tasks.Tasks)))
		sleepTime = 1

	pushLoop:
		for _, task := range tasks.Tasks {
			select {
			case jobChan.C <- &archiveJob{deadline: deadline, task: task}:
			case <-ctx.Done():
				logging.Infof(ctx, "lease thread got context err in jobChan push: %s", ctx.Err())
				// A bare break would only leave the select; the label stops
				// pushing the remaining tasks once ctx is done.
				break pushLoop
			}
		}
	}
}
// googleStorageClient builds a Google Storage client that authenticates as
// the service's own account with the Cloud OAuth scopes.
//
// luciProject is currently unused (see the TODO below). Any failure is
// returned as an annotated error together with a nil client.
func googleStorageClient(ctx context.Context, luciProject string) (gs.Client, error) {
	// TODO(vadimsh): Switch to AsProject + WithProject(project) once
	// we are ready to roll out project scoped service accounts in Logdog.
	logging.Debugf(ctx, "Creating new GCS client")

	scopes := auth.WithScopes(auth.CloudOAuthScopes...)
	transport, err := auth.GetRPCTransport(ctx, auth.AsSelf, scopes)
	if err != nil {
		return nil, errors.Annotate(err, "failed to get the authenticating transport").Err()
	}

	gsClient, err := gs.NewProdClient(ctx, transport)
	if err != nil {
		return nil, errors.Annotate(err, "failed to create Google Storage client").Err()
	}
	return gsClient, nil
}
// cloudLoggingClient returns an authenticated Cloud Logging client instance.
//
// The client writes to cloudProject and authenticates every RPC with
// project-scoped credentials for luciProject (auth.AsProject) carrying the
// Cloud OAuth scopes. gRPC client calls are instrumented through grpcmon.
//
// On any failure it returns a nil client and an annotated error.
func cloudLoggingClient(ctx context.Context, luciProject, cloudProject string) (archivist.CLClient, error) {
	cred, err := auth.GetPerRPCCredentials(
		ctx, auth.AsProject,
		auth.WithScopes(auth.CloudOAuthScopes...),
		auth.WithProject(luciProject),
	)
	if err != nil {
		return nil, errors.Annotate(err, "failed to get per RPC credentials").Err()
	}

	client, err := cloudlogging.NewClient(
		ctx, cloudProject,
		option.WithGRPCDialOption(grpc.WithPerRPCCredentials(cred)),
		option.WithGRPCDialOption(grpcmon.WithClientRPCStatsMonitor()),
	)
	if err != nil {
		// Return an untyped nil rather than forwarding NewClient's results
		// directly: its nil *Client would otherwise be wrapped into a non-nil
		// archivist.CLClient value (assuming CLClient is an interface).
		return nil, errors.Annotate(err, "failed to create Cloud Logging client").Err()
	}
	return client, nil
}
// main is the program entry point: it registers the archivist command-line
// flags and hands control to service.Main, which invokes the setup callback
// with the server and the service implementations.
func main() {
	flags := DefaultCommandLineFlags()
	flags.Register(flag.CommandLine)

	// setup validates the flags, assembles the Archivist and schedules its
	// loop as a background activity of the server.
	setup := func(srv *server.Server, impl *service.Implementations) error {
		if err := flags.Validate(); err != nil {
			return err
		}

		loader := GetSettingsLoader(srv.Options.CloudProject, &flags)
		ar := &archivist.Archivist{
			Service:         impl.Coordinator,
			SettingsLoader:  loader,
			Storage:         impl.Storage,
			GSClientFactory: googleStorageClient,
			CLClientFactory: cloudLoggingClient,
		}

		// The loop runs until the context passed by the server is done.
		srv.RunInBackground("archivist", func(ctx context.Context) {
			runForever(ctx, ar, &flags)
		})
		return nil
	}

	service.Main(service.MainCfg{BigTableAppProfile: "archivist"}, setup)
}