blob: 1fb873624cfdd3262906f401d2e9c51d6cd6d10c [file] [log] [blame]
// Copyright 2020 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sweep
import (
"context"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/server/tq/internal/db"
"go.chromium.org/luci/server/tq/internal/metrics"
"go.chromium.org/luci/server/tq/internal/partition"
"go.chromium.org/luci/server/tq/internal/reminder"
)
// ScanParams contains parameters for the Scan call.
type ScanParams struct {
DB db.DB // DB to use to fetch reminders
Partition *partition.Partition // the keyspace partition to scan
KeySpaceBytes int // length of the reminder keys (usually 16)
TasksPerScan int // caps maximum number of reminders to process
SecondaryScanShards int // caps the number of follow-up scans
Level int // recursion level (0 == the root task)
}
// Scan scans the given partition of the Reminders' keyspace.
//
// Returns a list of stale reminders which likely match crashed AddTask calls.
// The caller is expected to eventually execute corresponding Cloud Tasks
// calls and delete these reminders, lest they'll be rediscovered during the
// next scan.
//
// If unable to complete the scan of the given part of the keyspace and Level is
// less than 2, it intelligently partitions the not-yet-scanned keyspace into
// several partitions for the follow up and returns them as well.
//
// Logs errors inside, but doesn't return them.
func Scan(ctx context.Context, p *ScanParams) ([]*reminder.Reminder, partition.SortedPartitions) {
l, h := p.Partition.QueryBounds(p.KeySpaceBytes)
startedAt := clock.Now(ctx)
rs, err := p.DB.FetchRemindersMeta(ctx, l, h, p.TasksPerScan)
durMS := float64(clock.Now(ctx).Sub(startedAt).Milliseconds())
status := ""
needMoreScans := false
switch {
case len(rs) >= p.TasksPerScan:
if len(rs) > p.TasksPerScan {
logging.Errorf(ctx, "bug: %s.FetchRemindersMeta returned %d > limit %d",
p.DB.Kind(), len(rs), p.TasksPerScan)
}
status = "limit"
// There may be more items in the partition.
needMoreScans = true
case err == nil:
// Scan covered everything.
status = "OK"
case ctx.Err() == context.DeadlineExceeded && err != nil:
status = "timeout"
// Nothing fetched before timeout should not happen frequently.
// To avoid waiting until next SweepAll(), follow up with scans on
// sub-partitions.
needMoreScans = true
default:
status = "fail"
}
metrics.SweepFetchMetaDurationsMS.Add(ctx, durMS, status, p.Level, p.DB.Kind())
metrics.SweepFetchMetaReminders.Add(ctx, int64(len(rs)), status, p.Level, p.DB.Kind())
var scanParts partition.SortedPartitions
if needMoreScans {
if len(rs) == 0 {
// We timed out before fetching anything at all. Divide the initial range
// into smaller chunks.
scanParts = p.Partition.Split(p.SecondaryScanShards)
} else {
// We fetched something but then hit the limit or timed out. Divide
// the range after the last fetched Reminder.
scanParts = p.Partition.EducatedSplitAfter(
rs[len(rs)-1].ID,
len(rs),
// Aim to hit these many Reminders per follow up sweep task,
p.TasksPerScan,
// but create at most these many.
p.SecondaryScanShards,
)
}
}
// Keep only sufficiently old reminders.
filtered := filterOutTooFresh(ctx, rs, p.Level, p.DB.Kind())
if err != nil {
if len(filtered) == 0 && len(scanParts) == 0 {
logging.Errorf(ctx, "Scan failed without returning any results: %s", err)
return nil, nil
}
logging.Warningf(ctx, "Got %d reminders and %d follow-up ranges and then failed with: %s", len(filtered), len(scanParts), err)
} else if len(filtered) != 0 || len(scanParts) != 0 {
logging.Infof(ctx, "Got %d reminders and %d follow-up ranges", len(filtered), len(scanParts))
}
// Refuse to scan deeper than 2 levels.
if p.Level >= 2 && len(scanParts) != 0 {
logging.Errorf(ctx, "Refusing to recurse deeper, abandoning scans of %v", scanParts)
scanParts = nil
}
return filtered, scanParts
}
// filterOutTooFresh throws away reminders that are too fresh.
//
// There's a high chance they will be processed on AddTask happy path, we
// shouldn't interfere.
//
// Mutates & re-uses the given Reminders slice. Updates metricReminderAge based
// on all fetched reminders.
//
// `lvl` and `db` used for metrics only.
func filterOutTooFresh(ctx context.Context, reminders []*reminder.Reminder, lvl int, db string) []*reminder.Reminder {
now := clock.Now(ctx)
filtered := reminders[:0]
for _, r := range reminders {
staleness := now.Sub(r.FreshUntil)
metrics.ReminderStalenessMS.Add(ctx, float64(staleness.Milliseconds()), lvl, db)
if staleness >= 0 {
filtered = append(filtered, r)
}
}
return filtered
}