blob: 65830b5e6f1394d0ca7ab2d721746bd9b78f89c7 [file] [log] [blame]
// Copyright 2021 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package deadlineenforcer finalizes tasks with overdue deadlines.
package deadlineenforcer
import (
"context"
"time"
"cloud.google.com/go/spanner"
"google.golang.org/protobuf/types/known/timestamppb"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/tsmon/field"
"go.chromium.org/luci/common/tsmon/metric"
"go.chromium.org/luci/common/tsmon/types"
"go.chromium.org/luci/server"
"go.chromium.org/luci/server/span"
"go.chromium.org/luci/resultdb/internal/cron"
"go.chromium.org/luci/resultdb/internal/invocations"
"go.chromium.org/luci/resultdb/internal/spanutil"
"go.chromium.org/luci/resultdb/internal/tasks"
pb "go.chromium.org/luci/resultdb/proto/v1"
)
// maxInvocationsPerShardToEnforceAtOnce is the page size used when looking for
// overdue invocations in a shard: enforceOneShard passes it to enforce as the
// row limit of the query, and keeps requesting pages for as long as they come
// back full.
const maxInvocationsPerShardToEnforceAtOnce = 100
// Metrics reported by the deadline enforcer. Both carry a single string
// field, "realm".
var (
// timeOverdue tracks the delay between invocations expiring and being
// picked up by deadlineenforcer.
//
// It is a cumulative distribution whose values are in milliseconds.
timeOverdue = metric.NewCumulativeDistribution(
"resultdb/deadlineenforcer/delay",
"Delay between invocation expiration and forced finalization",
&types.MetricMetadata{Units: types.Milliseconds},
// NOTE(review): nil presumably selects tsmon's default bucketer — confirm.
nil,
field.String("realm"),
)
// overdueInvocationsFinalized counts invocations finalized by the
// deadlineenforcer service.
//
// It is a counter whose unit is "invocations".
overdueInvocationsFinalized = metric.NewCounter(
"resultdb/deadlineenforcer/finalized_invocations",
"Invocations finalized by deadline enforcer",
&types.MetricMetadata{Units: "invocations"},
field.String("realm"),
)
)
// Options are for configuring the deadline enforcer.
type Options struct {
// ForceCronInterval forces minimum interval in cron jobs.
// Useful in integration tests to reduce the test time.
//
// Only positive values take effect; for zero or negative values InitServer
// uses a minimum interval of one minute instead.
ForceCronInterval time.Duration
}
// InitServer registers the deadline enforcer as a background activity named
// "resultdb.deadlineenforcer" on srv.
//
// The enforcer runs with a minimum cron interval of one minute unless
// opts.ForceCronInterval is positive, in which case that value is used.
func InitServer(srv *server.Server, opts Options) {
	interval := opts.ForceCronInterval
	if interval <= 0 {
		interval = time.Minute
	}
	srv.RunInBackground("resultdb.deadlineenforcer", func(ctx context.Context) {
		run(ctx, interval)
	})
}
// run drives deadline enforcement for every shard of the database.
//
// It looks up the current maximum shard index, falling back to
// invocations.Shards - 1 when the lookup reports spanutil.ErrNoResults, and
// panics on any other lookup error. It then hands enforceOneShard to
// cron.Group, with one cron job per shard and minInterval as the minimum
// interval between runs.
//
// It blocks until context is canceled.
func run(ctx context.Context, minInterval time.Duration) {
	maxShard, err := invocations.CurrentMaxShard(ctx)
	if err == spanutil.ErrNoResults {
		// Nothing recorded yet: assume the full configured shard range.
		maxShard = invocations.Shards - 1
	} else if err != nil {
		panic(errors.Annotate(err, "failed to determine number of shards").Err())
	}

	// Shard indices are zero-based, hence maxShard+1 jobs.
	shardCount := maxShard + 1
	cron.Group(ctx, shardCount, minInterval, enforceOneShard)
}
// enforceOneShard repeatedly calls enforce on the given shard, one page of
// maxInvocationsPerShardToEnforceAtOnce rows at a time, until a page comes
// back not full or enforce fails.
//
// It returns the error from the failing enforce call, or nil once a page that
// is not full has been processed.
func enforceOneShard(ctx context.Context, shard int) error {
	const pageSize = maxInvocationsPerShardToEnforceAtOnce

	// A full page suggests more overdue invocations may be waiting, so keep
	// going; anything else means the shard is likely drained for now.
	for pageWasFull := true; pageWasFull; {
		processed, err := enforce(ctx, shard, pageSize)
		if err != nil {
			return err
		}
		pageWasFull = processed == pageSize
	}
	return nil
}
// enforce starts finalization of up to limit overdue invocations in the given
// shard.
//
// Inside a read-only transaction it queries the InvocationsByActiveDeadline
// index for invocations of the shard whose ActiveDeadline is not later than
// Spanner's CURRENT_TIMESTAMP(). For each returned row it runs a separate
// read-write transaction that re-reads the invocation state and, only if the
// invocation is still ACTIVE, calls tasks.StartInvocationFinalization.
//
// The finalized-invocations counter and the overdue-delay distribution are
// updated, keyed by realm, only for invocations whose finalization was
// actually started here. Invocations that turned out to be no longer ACTIVE
// are skipped and not counted.
//
// Returns the number of rows the query yielded (a row whose processing failed
// is included in the count) together with the error reported by
// spanutil.Query, if any.
func enforce(ctx context.Context, shard, limit int) (int, error) {
	st := spanner.NewStatement(`
SELECT InvocationId, ActiveDeadline, Realm
FROM Invocations@{FORCE_INDEX=InvocationsByActiveDeadline, spanner_emulator.disable_query_null_filtered_index_check=true}
WHERE ShardId = @shardId
AND ActiveDeadline <= CURRENT_TIMESTAMP()
LIMIT @limit
`)
	st.Params["shardId"] = shard
	st.Params["limit"] = limit

	rowCount := 0

	// NOTE(review): ctx is rebound to the read-only transaction context, and
	// the read-write transactions below are started from that same context.
	// Confirm that the span package permits this nesting.
	ctx, cancel := span.ReadOnlyTransaction(ctx)
	defer cancel()

	err := spanutil.Query(ctx, st, func(row *spanner.Row) error {
		rowCount++

		var id invocations.ID
		var ts *timestamppb.Timestamp
		var realm string
		if err := spanutil.FromSpanner(row, &id, &ts, &realm); err != nil {
			return err
		}

		// finalized records whether this enforcer is the one that started
		// finalization, so that metrics are not reported for invocations that
		// somebody else finalized first.
		finalized := false

		// TODO(crbug.com/1207606): Increase parallelism.
		_, err := span.ReadWriteTransaction(ctx, func(ctx context.Context) error {
			// Reset on every attempt in case the transaction body is re-run.
			finalized = false

			state, err := invocations.ReadState(ctx, id)
			if err != nil {
				return err
			}
			if state != pb.Invocation_ACTIVE {
				// Finalization already started (possible race with explicit
				// finalization). Do not start finalization again as doing
				// so would overwrite the existing FinalizeStartTime
				// and create an unnecessary task.
				return nil
			}
			tasks.StartInvocationFinalization(ctx, id, true)
			finalized = true
			return nil
		})
		if err != nil {
			return err
		}

		if finalized {
			overdueInvocationsFinalized.Add(ctx, 1, realm)
			timeOverdue.Add(ctx, float64(clock.Now(ctx).Sub(ts.AsTime()).Milliseconds()), realm)
		}
		return nil
	})
	return rowCount, err
}