blob: a310dd257b3fd1b5a408afa2a12c1e1521acdf1f [file] [log] [blame]
// Copyright 2022 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package runs
import (
"context"
"time"
"cloud.google.com/go/spanner"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/server/span"
"go.chromium.org/luci/analysis/internal/clustering/rules"
"go.chromium.org/luci/analysis/internal/clustering/shards"
"go.chromium.org/luci/analysis/internal/config"
spanutil "go.chromium.org/luci/analysis/internal/span"
"go.chromium.org/luci/analysis/pbutil"
)
// ReclusteringRun contains the details of a runs used to re-cluster
// test results.
type ReclusteringRun struct {
// The LUCI Project for which this rule is defined.
Project string
// The attempt. This is the time the orchestrator run ends.
AttemptTimestamp time.Time
// The minimum algorithms version this reclustering run is trying
// to achieve. Chunks with an AlgorithmsVersion less than this
// value are eligible to be re-clustered.
AlgorithmsVersion int64
// The minimum config version the reclustering run is trying to achieve.
// Chunks with a ConfigVersion less than this value are eligible to be
// re-clustered.
ConfigVersion time.Time
// The minimum rules version the reclustering run is trying to achieve.
// Chunks with a RulesVersion less than this value are eligible to be
// re-clustered.
RulesVersion time.Time
// The number of shards created for this run (for this LUCI project).
ShardCount int64
// The number of shards that have reported progress (at least once).
// When this is equal to ShardCount, readers can have confidence Progress
// is a reasonable reflection of the progress made reclustering
// this project. Until then, it is a loose lower-bound.
ShardsReported int64
// The progress. This is a value between 0 and 1000*ShardCount.
Progress int64
}
// NotFound is the error returned by Read if the row could not be found.
var NotFound = errors.New("reclustering run row not found")
// StartingEpoch is the earliest valid run attempt time.
var StartingEpoch = shards.StartingEpoch
// MaxAttemptTimestamp can be passed to any Read....() method to
// return data up to the last attempt.
var MaxAttemptTimestamp = time.Date(9999, 12, 31, 23, 59, 0, 0, time.UTC)
// Read reads the run with the given attempt timestamp in the given LUCI
// project. If the row does not exist, the error NotFound is returned.
func Read(ctx context.Context, projectID string, attemptTimestamp time.Time) (*ReclusteringRun, error) {
whereClause := `AttemptTimestamp = @attemptTimestamp`
params := map[string]any{
"attemptTimestamp": attemptTimestamp,
}
r, err := readLastWhere(ctx, projectID, whereClause, params)
if err != nil {
return nil, errors.Annotate(err, "query run").Err()
}
if r == nil {
return nil, NotFound
}
return r, nil
}
// ReadLastUpTo reads the last run in the given LUCI project up to
// the given attempt timestamp. If no row exists,
// a fake run is returned with the following details:
// - Project matching the requested Project ID.
// - AttemptTimestamp of StartingEpoch.
// - AlgorithmsVersion of 1.
// - ConfigVersion of clusteringcfg.StartingEpoch.
// - RulesVersion of rules.StartingEpoch.
// - ShardCount and ShardsReported of 1.
// - Progress of 1000.
func ReadLastUpTo(ctx context.Context, projectID string, upToAttemptTimestamp time.Time) (*ReclusteringRun, error) {
whereClause := `AttemptTimestamp <= @upToAttemptTimestamp`
params := map[string]any{
"upToAttemptTimestamp": upToAttemptTimestamp,
}
r, err := readLastWhere(ctx, projectID, whereClause, params)
if err != nil {
return nil, errors.Annotate(err, "query last run").Err()
}
if r == nil {
r = fakeLastRow(projectID)
}
return r, nil
}
// ReadLastWithProgress reads the last run with progress in the given LUCI
// project up to the given attempt timestamp.
//
// If no row exists, a fake row is returned; see ReadLast for details.
func ReadLastWithProgressUpTo(ctx context.Context, projectID string, upToAttemptTimestamp time.Time) (*ReclusteringRun, error) {
whereClause := `ShardsReported = ShardCount AND AttemptTimestamp <= @upToAttemptTimestamp`
params := map[string]any{
"upToAttemptTimestamp": upToAttemptTimestamp,
}
r, err := readLastWhere(ctx, projectID, whereClause, params)
if err != nil {
return nil, errors.Annotate(err, "query last run with progress up to").Err()
}
if r == nil {
r = fakeLastRow(projectID)
}
return r, nil
}
// ReadLastCompleteUpTo reads the last run that completed in the given LUCI
// project up to the given attempt timestamp.
// If no row exists, a fake row is returned; see ReadLast for details.
func ReadLastCompleteUpTo(ctx context.Context, projectID string, upToAttemptTimestamp time.Time) (*ReclusteringRun, error) {
whereClause := `Progress = (ShardCount * 1000) AND AttemptTimestamp <= @upToAttemptTimestamp`
params := map[string]any{
"upToAttemptTimestamp": upToAttemptTimestamp,
}
r, err := readLastWhere(ctx, projectID, whereClause, params)
if err != nil {
return nil, errors.Annotate(err, "query last run up to").Err()
}
if r == nil {
r = fakeLastRow(projectID)
}
return r, nil
}
func fakeLastRow(projectID string) *ReclusteringRun {
return &ReclusteringRun{
Project: projectID,
AttemptTimestamp: StartingEpoch,
AlgorithmsVersion: 1,
ConfigVersion: config.StartingEpoch,
RulesVersion: rules.StartingEpoch,
ShardCount: 1,
ShardsReported: 1,
Progress: 1000,
}
}
// readLastWhere reads the last run matching the given where clause,
// substituting params for any SQL parameters used in that clause.
func readLastWhere(ctx context.Context, projectID string, whereClause string, params map[string]any) (*ReclusteringRun, error) {
stmt := spanner.NewStatement(`
SELECT
AttemptTimestamp, ConfigVersion, RulesVersion,
AlgorithmsVersion, ShardCount, ShardsReported, Progress
FROM ReclusteringRuns
WHERE Project = @projectID AND (` + whereClause + `)
ORDER BY AttemptTimestamp DESC
LIMIT 1
`)
for k, v := range params {
stmt.Params[k] = v
}
stmt.Params["projectID"] = projectID
it := span.Query(ctx, stmt)
rs := []*ReclusteringRun{}
err := it.Do(func(r *spanner.Row) error {
var attemptTimestamp, rulesVersion, configVersion time.Time
var algorithmsVersion, shardCount, shardsReported, progress int64
err := r.Columns(
&attemptTimestamp, &configVersion, &rulesVersion,
&algorithmsVersion, &shardCount, &shardsReported, &progress,
)
if err != nil {
return errors.Annotate(err, "read run row").Err()
}
run := &ReclusteringRun{
Project: projectID,
AttemptTimestamp: attemptTimestamp,
AlgorithmsVersion: algorithmsVersion,
ConfigVersion: configVersion,
RulesVersion: rulesVersion,
ShardCount: shardCount,
ShardsReported: shardsReported,
Progress: progress,
}
rs = append(rs, run)
return nil
})
if len(rs) > 0 {
return rs[0], err
}
return nil, err
}
// Create inserts a new reclustering run.
func Create(ctx context.Context, r *ReclusteringRun) error {
if err := validateRun(r); err != nil {
return err
}
ms := spanutil.InsertMap("ReclusteringRuns", map[string]any{
"Project": r.Project,
"AttemptTimestamp": r.AttemptTimestamp,
"AlgorithmsVersion": r.AlgorithmsVersion,
"ConfigVersion": r.ConfigVersion,
"RulesVersion": r.RulesVersion,
"ShardCount": r.ShardCount,
"ShardsReported": r.ShardsReported,
"Progress": r.Progress,
})
span.BufferWrite(ctx, ms)
return nil
}
func validateRun(r *ReclusteringRun) error {
if err := pbutil.ValidateProject(r.Project); err != nil {
return errors.Annotate(err, "project").Err()
}
switch {
case r.AttemptTimestamp.Before(StartingEpoch):
return errors.New("attempt timestamp must be valid")
case r.AlgorithmsVersion <= 0:
return errors.New("algorithms version must be valid")
case r.ConfigVersion.Before(config.StartingEpoch):
return errors.New("config version must be valid")
case r.RulesVersion.Before(rules.StartingEpoch):
return errors.New("rules version must be valid")
case r.ShardCount <= 0:
return errors.New("shard count must be valid")
case r.ShardsReported < 0 || r.ShardsReported > r.ShardCount:
return errors.New("shards reported must be valid")
case r.Progress < 0 || r.Progress > (r.ShardCount*1000):
return errors.New("progress must be valid")
}
return nil
}
// UpdateProgress sets the progress of a particular run.
func UpdateProgress(ctx context.Context, projectID string, attemptTimestamp time.Time, shardsReported, progress int64) error {
ms := spanutil.UpdateMap("ReclusteringRuns", map[string]any{
"Project": projectID,
"AttemptTimestamp": attemptTimestamp,
"ShardsReported": shardsReported,
"Progress": progress,
})
span.BufferWrite(ctx, ms)
return nil
}