// Copyright 2016 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"time"
"github.com/golang/protobuf/ptypes"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/data/rand/mathrand"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/gcloud/gs"
"go.chromium.org/luci/common/logging"
log "go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/tsmon/distribution"
"go.chromium.org/luci/common/tsmon/field"
"go.chromium.org/luci/common/tsmon/metric"
montypes "go.chromium.org/luci/common/tsmon/types"
"go.chromium.org/luci/logdog/api/config/svcconfig"
logdog "go.chromium.org/luci/logdog/api/endpoints/coordinator/services/v1"
"go.chromium.org/luci/logdog/appengine/coordinator"
"go.chromium.org/luci/logdog/common/types"
"go.chromium.org/luci/logdog/server/archivist"
"go.chromium.org/luci/logdog/server/bundleServicesClient"
"go.chromium.org/luci/logdog/server/service"
"go.chromium.org/luci/hardcoded/chromeinfra"
)
var (
errInvalidConfig = errors.New("invalid configuration")
errNoWorkToDo = errors.New("no work to do")
// batchSize is the number of tasks to lease from TaskQueue per cycle.
// TaskQueue limits lease requests to 10 QPS, so the batch size must satisfy:
//   batchSize * 10 > (max expected stream creation QPS)
// As of 2019, the maximum stream creation rate is approximately 400 QPS.
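// With batchSize = 50, up to 50 * 10 = 500 tasks can be leased per second,
// comfortably above the ~400 QPS peak.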
// TODO(hinoka): Make these configurable in settings.
batchSize = int64(50)
// leaseTime is the amount of time to lease the batch of tasks for.
// We need:
// (time to process avg stream) * batchSize < leaseTime
// As of 2019, 90th percentile process time per stream is ~4s.
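// E.g. 4s * 50 tasks = 200s (~3.3 minutes), well within the 15 minute lease.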
leaseTime = 15 * time.Minute
// maxSleepTime is the maximum amount of time to sleep between consecutive errors, in seconds.
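// runForever doubles the sleep after each consecutive error (2s, 4s, 8s, ...),
// so the cap is reached after five failures in a row.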
maxSleepTime = 32
// tsTaskProcessingTime measures the amount of time spent processing a single
// task.
//
// The "consumed" field is true if the underlying task was consumed and
// false if it was not.
tsTaskProcessingTime = metric.NewCumulativeDistribution("logdog/archivist/task_processing_time_ms_ng",
"The amount of time (in milliseconds) that a single task takes to process in the new pipeline.",
&montypes.MetricMetadata{Units: montypes.Milliseconds},
distribution.DefaultBucketer,
field.Bool("consumed"))
tsLoopCycleTime = metric.NewCumulativeDistribution("logdog/archivist/loop_cycle_time_ms",
"The amount of time a single batch of leases takes to process.",
&montypes.MetricMetadata{Units: montypes.Milliseconds},
distribution.DefaultBucketer)
tsLeaseCount = metric.NewCounter("logdog/archivist/tasks_leased",
"Number of tasks leased.",
nil)
tsNackCount = metric.NewCounter("logdog/archivist/tasks_not_acked",
"Number of tasks leased but failed.",
nil)
tsAckCount = metric.NewCounter("logdog/archivist/tasks_acked",
"Number of tasks successfully completed and acked.",
nil)
)
// application is the Archivist application state.
type application struct {
service.Service
}
// runForever runs the archivist task loop until the Context is cancelled.
func runForever(c context.Context, ar archivist.Archivist) error {
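// sleepTime is the current error backoff in seconds: it is doubled after each
// failed cycle (up to maxSleepTime) and reset to 1 after a successful one.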
sleepTime := 1
client := ar.Service
// Forever loop.
for {
// Only exit if the context is cancelled (e.g. Ctrl+C).
if c.Err() != nil {
return c.Err()
}
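// Run each cycle in a closure so a single returned error can drive the
// backoff handling below.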
if err := func() error {
cycleStartTime := clock.Now(c)
leaseTimeProto := ptypes.DurationProto(leaseTime)
// Bound processing of this batch by the lease duration so we don't keep
// working on tasks whose lease has already expired.
nc, cancel := context.WithTimeout(c, leaseTime)
defer cancel()
tasks, err := client.LeaseArchiveTasks(c, &logdog.LeaseRequest{
MaxTasks: batchSize,
LeaseTime: leaseTimeProto,
})
if err != nil {
return err
}
logging.Fields{
"cycleStart": true,
"numLeased": len(tasks.Tasks),
"noWork": len(tasks.Tasks) == 0,
}.Infof(c, "Start of cycled, leased %d tasks", len(tasks.Tasks))
tsLeaseCount.Add(c, int64(len(tasks.Tasks)))
if len(tasks.Tasks) == 0 {
return errNoWorkToDo
}
merr := errors.NewLazyMultiError(len(tasks.Tasks))
ackTasks := make([]*logdog.ArchiveTask, 0, len(tasks.Tasks))
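// Process the leased tasks sequentially. Successfully archived tasks are
// collected for acking (deletion); failures are recorded in merr and left
// for the queue to re-lease once their leases expire.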
for i, task := range tasks.Tasks {
deleteTask := false
startTime := clock.Now(c)
if err := ar.ArchiveTask(nc, task); err != nil {
merr.Assign(i, err)
} else { // err == nil
ackTasks = append(ackTasks, task)
deleteTask = true
}
duration := clock.Now(c).Sub(startTime)
tsTaskProcessingTime.Add(c, float64(duration.Nanoseconds())/1000000, deleteTask)
}
tsNackCount.Add(c, int64(len(tasks.Tasks)-len(ackTasks)))
// Ack the successful tasks. We log the error here since there's nothing we can do.
if len(ackTasks) > 0 {
tsAckCount.Add(c, int64(len(ackTasks)))
c := log.SetFields(c, log.Fields{"Tasks": ackTasks})
if _, err := client.DeleteArchiveTasks(c, &logdog.DeleteRequest{Tasks: ackTasks}); err != nil {
log.WithError(err).Errorf(c, "error while acking tasks (%s)", ackTasks)
}
}
duration := clock.Now(c).Sub(cycleStartTime)
logging.Fields{
"cycleEnd": true,
"numLeased": len(tasks.Tasks),
"numCompleted": len(ackTasks),
"timeSpentSec": duration.Seconds(),
}.Infof(c, "Done archiving %d items (%d successful) took %.2fs", len(tasks.Tasks), len(ackTasks), duration.Seconds())
tsLoopCycleTime.Add(c, float64(duration.Nanoseconds())/1000000)
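// merr.Get() is nil only if every task succeeded; any per-task failure
// triggers the error backoff in the outer loop.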
return merr.Get()
}(); err != nil {
// Back off on errors.
sleepTime *= 2
if sleepTime > maxSleepTime {
sleepTime = maxSleepTime
}
switch err {
case errNoWorkToDo:
logging.Infof(c, "no work to do, sleeping for %d seconds", sleepTime)
default:
logging.WithError(err).Errorf(c, "got error in loop, sleeping for %d seconds", sleepTime)
}
clock.Sleep(c, time.Duration(sleepTime)*time.Second)
} else {
sleepTime = 1
}
}
}
// settingsUpdater refreshes batchSize and leaseTime from the datastore settings every five minutes.
func settingsUpdater(c context.Context) {
for {
set := coordinator.GetSettings(c)
logging.Debugf(c, "updating settings: %v", set)
if batchSize != set.ArchivistBatchSize {
batchSize = set.ArchivistBatchSize
}
if leaseTime != set.ArchivistLeaseTime {
leaseTime = set.ArchivistLeaseTime
}
clock.Sleep(c, 5*time.Minute)
}
}
// runArchivist is the main execution function.
func (a *application) runArchivist(c context.Context) error {
cfg := a.ServiceConfig()
coordCfg, acfg := cfg.GetCoordinator(), cfg.GetArchivist()
switch {
case coordCfg == nil:
return errors.New("missing required config: coordinator")
case acfg == nil:
return errors.New("missing required config: archivist")
case acfg.GsStagingBucket == "":
return errors.New("missing required config: archivist.gs_staging_bucket")
}
// Initialize our Storage.
//
// NOTE: We're requesting read/write access even though we only need read-only
// access because GKE doesn't understand the read-only scope:
// https://www.googleapis.com/auth/bigtable.readonly
st, err := a.IntermediateStorage(c, true)
if err != nil {
log.WithError(err).Errorf(c, "Failed to get storage instance.")
return err
}
defer st.Close()
// gsClientFactory is our project-scoped Google Storage client factory.
gsClientFactory := func(ctx context.Context, proj types.ProjectName) (gs.Client, error) {
gsClient, err := a.GSClient(ctx, proj)
if err != nil {
log.WithError(err).Errorf(c, "Failed to get Google Storage client.")
return nil, err
}
return gsClient, nil
}
// Initialize a Coordinator client that bundles requests together.
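// Calls are buffered and flushed in bundles: a bundle is sent once
// DelayThreshold has elapsed or BundleCountThreshold calls have accumulated,
// whichever comes first.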
coordClient := &bundleServicesClient.Client{
ServicesClient: a.Coordinator(),
DelayThreshold: time.Second,
BundleCountThreshold: 100,
}
defer coordClient.Flush()
ar := archivist.Archivist{
Service: coordClient,
SettingsLoader: a.GetSettingsLoader(acfg),
Storage: st,
GSClientFactory: gsClientFactory,
}
// Application shutdown will now operate by cancelling the Archivist's
// Context.
c, cancelFunc := context.WithCancel(c)
defer cancelFunc()
a.SetShutdownFunc(cancelFunc)
// Update our settings periodically.
go settingsUpdater(c)
return runForever(c, ar)
}
// GetSettingsLoader is an archivist.SettingsLoader implementation that merges
// global and project-specific settings.
//
// The resulting settings object will be verified by the Archivist.
func (a *application) GetSettingsLoader(acfg *svcconfig.Archivist) archivist.SettingsLoader {
serviceID := a.ServiceID()
return func(c context.Context, proj types.ProjectName) (*archivist.Settings, error) {
// Fold in our project-specific configuration, if valid.
pcfg, err := a.ProjectConfig(c, proj)
if err != nil {
log.Fields{
log.ErrorKey: err,
"project": proj,
}.Errorf(c, "Failed to fetch project configuration.")
return nil, err
}
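// indexParam returns the first positive value for the given archive index
// parameter, preferring the project-specific config over the service-wide
// archivist config, and 0 if neither sets it.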
indexParam := func(get func(ic *svcconfig.ArchiveIndexConfig) int32) int {
if ic := pcfg.ArchiveIndexConfig; ic != nil {
if v := get(ic); v > 0 {
return int(v)
}
}
if ic := acfg.ArchiveIndexConfig; ic != nil {
if v := get(ic); v > 0 {
return int(v)
}
}
return 0
}
// Load our base settings.
//
// Archival bases are:
// Staging: gs://<services:gs_staging_bucket>/<project-id>/...
// Archive: gs://<project:archive_gs_bucket>/<project-id>/...
st := archivist.Settings{
GSBase: gs.MakePath(pcfg.ArchiveGsBucket, "").Concat(serviceID),
GSStagingBase: gs.MakePath(acfg.GsStagingBucket, "").Concat(serviceID),
IndexStreamRange: indexParam(func(ic *svcconfig.ArchiveIndexConfig) int32 { return ic.StreamRange }),
IndexPrefixRange: indexParam(func(ic *svcconfig.ArchiveIndexConfig) int32 { return ic.PrefixRange }),
IndexByteRange: indexParam(func(ic *svcconfig.ArchiveIndexConfig) int32 { return ic.ByteRange }),
AlwaysRender: (acfg.RenderAllStreams || pcfg.RenderAllStreams),
}
// Fold project settings into loaded ones.
return &st, nil
}
}
// Entry point.
func main() {
mathrand.SeedRandomly()
a := application{
Service: service.Service{
Name: "archivist",
DefaultAuthOptions: chromeinfra.DefaultAuthOptions(),
},
}
a.Run(context.Background(), a.runArchivist)
}