// Copyright 2023 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package model
import (
"bytes"
"context"
"fmt"
"io"
"time"
"github.com/klauspost/compress/zlib"
"google.golang.org/protobuf/types/known/timestamppb"
"go.chromium.org/luci/auth/identity"
"go.chromium.org/luci/common/data/packedintset"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/gae/service/datastore"
apipb "go.chromium.org/luci/swarming/proto/api_v2"
"go.chromium.org/luci/swarming/server/acls"
)
const (
// ChunkSize is the size of each TaskOutputChunk.Chunk in uncompressed form.
//
// The rationale is that appending data to an entity requires reading it
// first, so it must not be too big. On the other hand, having thousands of
// small entities is pure overhead.
//
// Note that changing this value is a breaking change for existing logs.
ChunkSize = 100 * 1024
// MaxChunks sets a limit on how much of a task output to store.
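//
// With ChunkSize of 100 KiB, this caps the stored output of a single task at
// ChunkSize*MaxChunks = 100 MiB (uncompressed).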
MaxChunks = 1024
)
// TaskResultCommon contains properties that are common to both TaskRunResult
// and TaskResultSummary.
//
// It is not meant to be stored in the datastore on its own, only as an embedded
// struct inside TaskRunResult or TaskResultSummary.
type TaskResultCommon struct {
// State is the current state of the task.
//
// The index is used in task listing queries to filter by state.
State apipb.TaskState `gae:"state"`
// Modified is when this entity was created or last modified.
Modified time.Time `gae:"modified_ts,noindex"`
// BotVersion is a version of the bot code running the task, if already known.
BotVersion string `gae:"bot_version,noindex"`
// BotDimensions are bot dimensions at the moment the bot claimed the task.
BotDimensions BotDimensions `gae:"bot_dimensions"`
// BotIdleSince is when the bot running this task finished its previous task,
// if already known.
BotIdleSince datastore.Optional[time.Time, datastore.Unindexed] `gae:"bot_idle_since_ts"`
// BotLogsCloudProject is a GCP project where the bot uploads its logs.
BotLogsCloudProject string `gae:"bot_logs_cloud_project,noindex"`
// ServerVersions is a set of server version(s) that touched this entity.
ServerVersions []string `gae:"server_versions,noindex"`
// CurrentTaskSlice is an index of the active task slice in the TaskRequest.
//
// For TaskResultSummary it is the currently running slice (it changes over
// the lifetime of the task if the task has multiple slices). For TaskRunResult
// it is the index representing this concrete run.
CurrentTaskSlice int64 `gae:"current_task_slice,noindex"`
// Started is when the bot started executing the task.
//
// The index is used in task listing queries to order by this property.
Started datastore.Nullable[time.Time, datastore.Indexed] `gae:"started_ts"`
// Completed is when the task switched into a final state.
//
// This is either when the bot finished executing it, or when it expired or
// was canceled.
//
// The index is used in a bunch of places:
// 1. In task listing queries to order by this property.
// 2. In crashed bot detection to list still pending or running tasks.
// 3. In BQ export pagination.
Completed datastore.Nullable[time.Time, datastore.Indexed] `gae:"completed_ts"`
// Abandoned is set when a task had an internal failure, timed out or was
// killed by a client request.
//
// The index is used in task listing queries to order by this property.
Abandoned datastore.Nullable[time.Time, datastore.Indexed] `gae:"abandoned_ts"`
// DurationSecs is how long the task process was running.
//
// This is reported explicitly by the bot and it excludes all overhead.
DurationSecs datastore.Optional[float64, datastore.Unindexed] `gae:"durations"` // legacy plural property name
// ExitCode is the task process exit code for tasks in COMPLETED state.
ExitCode datastore.Optional[int64, datastore.Unindexed] `gae:"exit_codes"` // legacy plural property name
// Failure is true if the task finished with a non-zero process exit code.
//
// The index is used in task listing queries to filter by failure.
Failure bool `gae:"failure"`
// InternalFailure is true if the task failed due to an internal error.
InternalFailure bool `gae:"internal_failure,noindex"`
// StdoutChunks is the number of TaskOutputChunk entities with the output.
StdoutChunks int64 `gae:"stdout_chunks,noindex"`
// CASOutputRoot is the digest of the output root uploaded to RBE-CAS.
CASOutputRoot CASReference `gae:"cas_output_root,lsp,noindex"`
// CIPDPins is resolved versions of all the CIPD packages used in the task.
CIPDPins CIPDInput `gae:"cipd_pins,lsp,noindex"`
// MissingCIPD is the list of CIPD packages that were missing, for tasks in
// CLIENT_ERROR state.
MissingCIPD []CIPDPackage `gae:"missing_cipd,lsp,noindex"`
// MissingCAS is the list of CAS digests that were missing, for tasks in
// CLIENT_ERROR state.
MissingCAS []CASReference `gae:"missing_cas,lsp,noindex"`
// ResultDBInfo is ResultDB invocation info for this task.
//
// If this task was deduplicated, this contains invocation info from the
// previously run task this one was deduplicated from.
ResultDBInfo ResultDBInfo `gae:"resultdb_info,lsp,noindex"`
// LegacyOutputsRef is no longer used.
LegacyOutputsRef LegacyProperty `gae:"outputs_ref"`
}
// toProto populates as much of apipb.TaskResultResponse as it can.
//
// It is used as part of TaskRunResult.ToProto and TaskResultSummary.ToProto.
//
// Fields that still need to be populated by the caller:
// - BotId
// - CostSavedUsd
// - CostsUsd
// - CreatedTs
// - DedupedFrom
// - Name
// - PerformanceStats
// - RunId
// - Tags
// - TaskId
// - User
func (p *TaskResultCommon) toProto() *apipb.TaskResultResponse {
type timestamp interface {
IsSet() bool
Get() time.Time
}
timeToTimestampPB := func(t timestamp) *timestamppb.Timestamp {
if !t.IsSet() {
return nil
}
return timestamppb.New(t.Get())
}
var cipdPins *apipb.CipdPins
if pinsPb := p.CIPDPins.ToProto(); pinsPb != nil {
cipdPins = &apipb.CipdPins{
ClientPackage: pinsPb.GetClientPackage(),
Packages: pinsPb.GetPackages(),
}
}
var missingCAS []*apipb.CASReference
if len(p.MissingCAS) != 0 {
missingCAS = make([]*apipb.CASReference, 0, len(p.MissingCAS))
for _, cas := range p.MissingCAS {
missingCAS = append(missingCAS, cas.ToProto())
}
}
var missingCIPD []*apipb.CipdPackage
if len(p.MissingCIPD) != 0 {
missingCIPD = make([]*apipb.CipdPackage, 0, len(p.MissingCIPD))
for _, pkg := range p.MissingCIPD {
missingCIPD = append(missingCIPD, pkg.ToProto())
}
}
return &apipb.TaskResultResponse{
AbandonedTs: timeToTimestampPB(p.Abandoned),
BotDimensions: p.BotDimensions.ToProto(),
BotIdleSinceTs: timeToTimestampPB(p.BotIdleSince),
BotLogsCloudProject: p.BotLogsCloudProject,
BotVersion: p.BotVersion,
CasOutputRoot: p.CASOutputRoot.ToProto(),
CipdPins: cipdPins,
CompletedTs: timeToTimestampPB(p.Completed),
CurrentTaskSlice: int32(p.CurrentTaskSlice),
Duration: float32(p.DurationSecs.Get()),
ExitCode: p.ExitCode.Get(),
Failure: p.Failure,
InternalFailure: p.InternalFailure,
MissingCas: missingCAS,
MissingCipd: missingCIPD,
ModifiedTs: timestamppb.New(p.Modified),
ResultdbInfo: p.ResultDBInfo.ToProto(),
ServerVersions: p.ServerVersions,
StartedTs: timeToTimestampPB(p.Started),
State: p.State,
}
}
// ResultDBInfo contains the ResultDB invocation info of the task.
type ResultDBInfo struct {
// Hostname is the ResultDB service hostname e.g. "results.api.cr.dev".
Hostname string `gae:"hostname"`
// Invocation is the ResultDB invocation name for the task, if any.
Invocation string `gae:"invocation"`
}
// ToProto converts ResultDBInfo to an apipb.ResultDBInfo.
func (p *ResultDBInfo) ToProto() *apipb.ResultDBInfo {
if p.Hostname == "" && p.Invocation == "" {
return nil
}
return &apipb.ResultDBInfo{
Hostname: p.Hostname,
Invocation: p.Invocation,
}
}
// TaskResultSummary represents the overall result of a task.
//
// Parent is a TaskRequest. Key id is always 1.
//
// It is created (in PENDING state) as soon as the task is created and then
// mutated whenever the task changes its state. It is used by various task listing
// APIs.
type TaskResultSummary struct {
// TaskResultCommon embeds most of result properties.
TaskResultCommon
// Extra are entity properties that didn't match any declared ones below.
//
// Should normally be empty.
Extra datastore.PropertyMap `gae:"-,extra"`
// Key identifies the task, see TaskResultSummaryKey().
Key *datastore.Key `gae:"$key"`
// BotID is the ID of the bot that runs this task, if already known.
BotID datastore.Optional[string, datastore.Unindexed] `gae:"bot_id"`
// Created is a timestamp when the task was submitted.
//
// The index is used in task listing queries to order by this property.
//
// TODO(vadimsh): Is the index actually used? Entity keys encode the same
// information.
Created time.Time `gae:"created_ts"`
// Tags are copied from the corresponding TaskRequest entity.
//
// The index is used in global task listing queries to filter by tags.
Tags []string `gae:"tags"`
// RequestName is copied from the corresponding TaskRequest entity.
RequestName string `gae:"name,noindex"`
// RequestUser is copied from the corresponding TaskRequest entity.
RequestUser string `gae:"user,noindex"`
// RequestPriority is copied from the corresponding TaskRequest entity.
RequestPriority int64 `gae:"priority,noindex"`
// RequestAuthenticated is copied from the corresponding TaskRequest entity.
RequestAuthenticated identity.Identity `gae:"request_authenticated,noindex"`
// RequestRealm is copied from the corresponding TaskRequest entity.
RequestRealm string `gae:"request_realm,noindex"`
// RequestPool is a pool the task is targeting.
//
// Derived from dimensions in the corresponding TaskRequest entity.
RequestPool string `gae:"request_pool,noindex"`
// RequestBotID is a concrete bot the task is targeting (if any).
//
// Derived from dimensions in the corresponding TaskRequest entity.
RequestBotID string `gae:"request_bot_id,noindex"`
// PropertiesHash is used to find duplicate tasks.
//
// It is set to a hash of properties of the task slice only when these
// conditions are met:
// - TaskSlice.TaskProperties.Idempotent is true.
// - State is COMPLETED.
// - Failure is false (i.e. ExitCode is 0).
// - InternalFailure is false.
//
// The index is used to find an existing duplicate task when submitting a new
// task.
PropertiesHash datastore.Optional[[]byte, datastore.Indexed] `gae:"properties_hash"`
// TryNumber indirectly identifies if this task was deduped.
//
// This field is a leftover from when Swarming had internal retries and for
// that reason has weird semantics. See https://crbug.com/1065101.
//
// Possible values:
// null: if the task is still pending, has expired or was canceled.
// 1: if the task was assigned to a bot and either currently runs or has
// finished or crashed already.
// 0: if the task was deduped.
//
// The index is used in global task listing queries to find what tasks were
// deduped.
TryNumber datastore.Nullable[int64, datastore.Indexed] `gae:"try_number"`
// CostUSD is an approximate bot time cost spent executing this task.
CostUSD float64 `gae:"costs_usd,noindex"` // legacy plural property name
// CostSavedUSD is an approximate cost saved by deduping this task.
CostSavedUSD float64 `gae:"cost_saved_usd,noindex"`
// DedupedFrom is set if this task reused results of some previous task.
//
// See PropertiesHash for conditions when this is possible.
//
// DedupedFrom is a packed TaskRunResult key of the reused result. Note that
// there's no TaskRunResult representing this task, since it didn't run.
DedupedFrom string `gae:"deduped_from,noindex"`
// ExpirationDelay is a delay from TaskRequest.ExpirationTS to the actual
// expiry time.
//
// This is set during the expiration process if the last task slice expired by
// reaching its deadline. Unset if the last slice expired because there were
// no bots that could run it.
//
// Exclusively for monitoring.
ExpirationDelay datastore.Optional[float64, datastore.Unindexed] `gae:"expiration_delay"`
}
// ToProto converts the TaskResultSummary struct to an apipb.TaskResultResponse.
//
// Note: this function does not populate PerformanceStats, since that requires
// fetching a separate datastore entity. Use PerformanceStatsKey to locate and
// fetch it.
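//
// A sketch of fetching the stats separately (non-authoritative; assumes the
// entity has already been loaded as `res`):
//
//	if key := res.PerformanceStatsKey(ctx); key != nil {
//		stats := &PerformanceStats{Key: key}
//		if err := datastore.Get(ctx, stats); err != nil {
//			// Handle the error; ErrNoSuchEntity means no stats were reported.
//		}
//	}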
func (p *TaskResultSummary) ToProto() *apipb.TaskResultResponse {
pb := p.TaskResultCommon.toProto()
pb.BotId = p.BotID.Get()
pb.CostSavedUsd = float32(p.CostSavedUSD)
pb.CostsUsd = p.CostsUSD()
pb.CreatedTs = timestamppb.New(p.Created)
pb.DedupedFrom = p.DedupedFrom
pb.Name = p.RequestName
pb.PerformanceStats = nil // will have to be filled in later
pb.RunId = p.TaskRunID()
pb.Tags = p.Tags
pb.TaskId = RequestKeyToTaskID(p.TaskRequestKey(), AsRequest)
pb.User = p.RequestUser
return pb
}
// CostsUSD returns CostUSD wrapped in a []float32, or nil if the cost is zero.
func (p *TaskResultSummary) CostsUSD() []float32 {
if p.CostUSD == 0 {
return nil
}
return []float32{float32(p.CostUSD)}
}
// GetOutput returns the stdout content of the task in the byte range
// [offset, offset+length).
//
// A length of 0 means "everything from offset to the end of the output".
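//
// A usage sketch (non-authoritative; assumes `res` is a TaskResultSummary
// fetched from the datastore):
//
//	full, err := res.GetOutput(ctx, 0, 0)       // the entire output
//	page, err := res.GetOutput(ctx, 1024, 4096) // 1 KiB starting at offset 4 KiB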
func (p *TaskResultSummary) GetOutput(ctx context.Context, length, offset int64) ([]byte, error) {
if p.StdoutChunks == 0 {
return nil, nil
}
if length == 0 {
// Fetch the whole content
length = p.StdoutChunks*ChunkSize - offset
}
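// Figure out the range of chunk entities that covers [offset, offset+length).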
firstChunk := offset / ChunkSize
end := offset + length
lastChunk := (end + ChunkSize - 1) / ChunkSize
if lastChunk > p.StdoutChunks {
lastChunk = p.StdoutChunks
}
toGet := make([]*TaskOutputChunk, lastChunk-firstChunk)
for i := firstChunk; i < lastChunk; i++ {
toGet[i-firstChunk] = &TaskOutputChunk{Key: TaskOutputChunkKey(ctx, p.TaskRequestKey(), i)}
}
handleChunk := func(chunk []byte, e error) ([]byte, error) {
switch {
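// A missing chunk entity is substituted with ChunkSize NUL bytes so that
// offsets in the joined output stay correct.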
case errors.Is(e, datastore.ErrNoSuchEntity):
return bytes.Repeat([]byte{'\x00'}, ChunkSize), nil
case e != nil:
return nil, e
}
r, err := zlib.NewReader(bytes.NewReader(chunk))
if err != nil {
return nil, errors.Annotate(err, "failed to get zlib reader").Err()
}
decompressed, err := io.ReadAll(r)
if err != nil {
_ = r.Close()
return nil, errors.Annotate(err, "failed to read chunk").Err()
}
if err := r.Close(); err != nil {
return nil, errors.Annotate(err, "failed to close zlib reader").Err()
}
return decompressed, nil
}
var chunks [][]byte
err := datastore.Get(ctx, toGet)
for i := 0; i < len(toGet); i++ {
chunkErr := func(i int) error {
if merr, ok := err.(errors.MultiError); ok {
return merr[i]
}
return err
}
decmpChunk, err := handleChunk(toGet[i].Chunk, chunkErr(i))
if err != nil {
return nil, errors.Annotate(err, "failed to handle chunk").Err()
}
chunks = append(chunks, decmpChunk)
}
min := func(x, y int) int {
if x < y {
return x
}
return y
}
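// Join the fetched chunks and cut out the requested range, using offsets
// relative to the start of the first fetched chunk.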
toReturn := bytes.Join(chunks, nil)
startOffset := offset % ChunkSize
endOffset := min(int(end-(firstChunk*ChunkSize)), len(toReturn))
return toReturn[startOffset:endOffset], nil
}
// PerformanceStatsKey returns PerformanceStats entity key or nil.
//
// It returns a non-nil key if the performance stats entity can **potentially**
// exist (e.g. the task has finished running and reported its stats). It returns
// nil if the performance stats entity definitely doesn't exist.
//
// If the task was deduped, it returns the key of the performance stats of the
// original task that actually ran.
func (p *TaskResultSummary) PerformanceStatsKey(ctx context.Context) *datastore.Key {
switch {
case p.DedupedFrom != "":
reqKey, err := TaskIDToRequestKey(ctx, p.DedupedFrom)
if err != nil {
logging.Errorf(ctx, "Broken DedupedFrom %q in %s", p.DedupedFrom, p.Key)
return nil
}
return PerformanceStatsKey(ctx, reqKey)
case !ranAndDidNotCrash(p.State):
return nil
default:
return PerformanceStatsKey(ctx, p.TaskRequestKey())
}
}
// TaskAuthInfo returns the information about the task used in ACL checks.
func (p *TaskResultSummary) TaskAuthInfo() acls.TaskAuthInfo {
return acls.TaskAuthInfo{
TaskID: RequestKeyToTaskID(p.TaskRequestKey(), AsRequest),
Realm: p.RequestRealm,
Pool: p.RequestPool,
BotID: p.RequestBotID,
Submitter: p.RequestAuthenticated,
}
}
// TaskRequestKey returns the parent task request key or panics if it is unset.
func (p *TaskResultSummary) TaskRequestKey() *datastore.Key {
key := p.Key.Parent()
if key == nil || key.Kind() != "TaskRequest" {
panic(fmt.Sprintf("invalid TaskResultSummary key %q", p.Key))
}
return key
}
// TaskRunID returns the packed RunResult key for a swarming task.
func (p *TaskResultSummary) TaskRunID() string {
// Return an empty string if no attempt was made.
if !p.TryNumber.IsSet() {
return ""
}
return RequestKeyToTaskID(p.TaskRequestKey(), AsRunResult)
}
// TaskResultSummaryKey constructs a summary key given a task request key.
func TaskResultSummaryKey(ctx context.Context, taskReq *datastore.Key) *datastore.Key {
return datastore.NewKey(ctx, "TaskResultSummary", "", 1, taskReq)
}
// TaskRunResult contains result of an attempt to run a task on a bot.
//
// Parent is a TaskResultSummary. Key id is 1.
//
// Unlike TaskResultSummary it is created only when the task is assigned to a
// bot, i.e. existence of this entity means a bot claimed the task and started
// executing it.
type TaskRunResult struct {
// TaskResultCommon embeds most of result properties.
TaskResultCommon
// Extra are entity properties that didn't match any declared ones below.
//
// Should normally be empty.
Extra datastore.PropertyMap `gae:"-,extra"`
// Key identifies the task, see TaskRunResultKey().
Key *datastore.Key `gae:"$key"`
// BotID is the ID of the bot running this task attempt.
//
// The index is used in task listing queries to filter by a specific bot.
BotID string `gae:"bot_id"`
// CostUSD is an approximate bot time cost spent executing this task.
CostUSD float64 `gae:"cost_usd,noindex"`
// Killing is true if a user requested the task to be canceled while it is
// already running.
//
// Setting this field eventually results in the task moving to KILLED state.
// This transition also unsets Killing back to false.
Killing bool `gae:"killing,noindex"`
// DeadAfter specifies the time after which the bot executing this task
// is considered dead.
//
// It is set after every ping from the bot if the task is in RUNNING state
// and gets unset once the task terminates.
DeadAfter datastore.Optional[time.Time, datastore.Unindexed] `gae:"dead_after_ts"`
}
// TaskRunResultKey constructs a task run result key given a task request key.
func TaskRunResultKey(ctx context.Context, taskReq *datastore.Key) *datastore.Key {
return datastore.NewKey(ctx, "TaskRunResult", "", 1, TaskResultSummaryKey(ctx, taskReq))
}
// TaskRequestKey returns the parent task request key or panics if it is unset.
func (p *TaskRunResult) TaskRequestKey() *datastore.Key {
key := p.Key.Parent().Parent()
if key == nil || key.Kind() != "TaskRequest" {
panic(fmt.Sprintf("invalid TaskRunResult key %q", p.Key))
}
return key
}
// PerformanceStatsKey returns PerformanceStats entity key or nil.
//
// It returns a non-nil key if the performance stats entity can **potentially**
// exist (e.g. the task has finished running and reported its stats). It returns
// nil if the performance stats entity definitely doesn't exist.
func (p *TaskRunResult) PerformanceStatsKey(ctx context.Context) *datastore.Key {
if !ranAndDidNotCrash(p.State) {
return nil
}
return PerformanceStatsKey(ctx, p.TaskRequestKey())
}
// ToProto converts the TaskRunResult struct to an apipb.TaskResultResponse.
//
// Note: this function does not populate PerformanceStats, since that requires
// fetching a separate datastore entity. Use PerformanceStatsKey to locate and
// fetch it.
func (p *TaskRunResult) ToProto() *apipb.TaskResultResponse {
pb := p.TaskResultCommon.toProto()
pb.BotId = p.BotID
pb.CostSavedUsd = 0 // the task is running, it doesn't save any cost
pb.CostsUsd = []float32{float32(p.CostUSD)}
pb.CreatedTs = nil // TODO
pb.DedupedFrom = "" // the task is running, not deduped
pb.Name = "" // TODO
pb.PerformanceStats = nil // will need to be filled in later
pb.RunId = RequestKeyToTaskID(p.TaskRequestKey(), AsRunResult)
pb.Tags = nil // TODO
pb.TaskId = RequestKeyToTaskID(p.TaskRequestKey(), AsRequest)
pb.User = "" // TODO
return pb
}
// PerformanceStats contains various timing and performance information about
// the task as reported by the bot.
type PerformanceStats struct {
// Extra are entity properties that didn't match any declared ones below.
//
// Should normally be empty.
Extra datastore.PropertyMap `gae:"-,extra"`
// Key identifies the task, see PerformanceStatsKey.
Key *datastore.Key `gae:"$key"`
// BotOverheadSecs is the total overhead in seconds, summing overhead from all
// sections defined below.
BotOverheadSecs float64 `gae:"bot_overhead,noindex"`
// CacheTrim is stats of cache trimming before the dependency installations.
CacheTrim OperationStats `gae:"cache_trim,lsp,noindex"`
// PackageInstallation is stats of installing CIPD packages before the task.
PackageInstallation OperationStats `gae:"package_installation,lsp,noindex"`
// NamedCachesInstall is stats of named cache mounting before the task.
NamedCachesInstall OperationStats `gae:"named_caches_install,lsp,noindex"`
// NamedCachesUninstall is stats of named cache unmounting after the task.
NamedCachesUninstall OperationStats `gae:"named_caches_uninstall,lsp,noindex"`
// IsolatedDownload is stats of CAS dependencies download before the task.
IsolatedDownload CASOperationStats `gae:"isolated_download,lsp,noindex"`
// IsolatedUpload is stats of CAS uploading operation after the task.
IsolatedUpload CASOperationStats `gae:"isolated_upload,lsp,noindex"`
// Cleanup is stats of work directory cleanup operation after the task.
Cleanup OperationStats `gae:"cleanup,lsp,noindex"`
}
// ToProto converts the PerformanceStats struct to apipb.PerformanceStats.
func (p *PerformanceStats) ToProto() (*apipb.PerformanceStats, error) {
isolatedDownload, err := p.IsolatedDownload.ToProto()
if err != nil {
return nil, err
}
isolatedUpload, err := p.IsolatedUpload.ToProto()
if err != nil {
return nil, err
}
return &apipb.PerformanceStats{
BotOverhead: float32(p.BotOverheadSecs),
CacheTrim: p.CacheTrim.ToProto(),
Cleanup: p.Cleanup.ToProto(),
IsolatedDownload: isolatedDownload,
IsolatedUpload: isolatedUpload,
NamedCachesInstall: p.NamedCachesInstall.ToProto(),
NamedCachesUninstall: p.NamedCachesUninstall.ToProto(),
PackageInstallation: p.PackageInstallation.ToProto(),
}, nil
}
// PerformanceStatsKey builds a PerformanceStats key given a task request key.
func PerformanceStatsKey(ctx context.Context, taskReq *datastore.Key) *datastore.Key {
return datastore.NewKey(ctx, "PerformanceStats", "", 1, TaskRunResultKey(ctx, taskReq))
}
// OperationStats is performance stats of a particular operation.
//
// Stored as an unindexed subentity of the PerformanceStats entity.
type OperationStats struct {
// DurationSecs is how long the operation ran.
DurationSecs float64 `gae:"duration"`
}
// ToProto converts the OperationStats struct to apipb.OperationStats.
func (p *OperationStats) ToProto() *apipb.OperationStats {
if p.DurationSecs == 0 {
return nil
}
return &apipb.OperationStats{
Duration: float32(p.DurationSecs),
}
}
// CASOperationStats is performance stats of a CAS operation.
//
// Stored as an unindexed subentity of the PerformanceStats entity.
type CASOperationStats struct {
// DurationSecs is how long the operation ran.
DurationSecs float64 `gae:"duration"`
// InitialItems is the number of items in the cache before the operation.
InitialItems int64 `gae:"initial_number_items"`
// InitialSize is the total cache size before the operation.
InitialSize int64 `gae:"initial_size"`
// ItemsCold is a set of sizes of items that were downloaded or uploaded.
//
// It is encoded in a special way, see packedintset.Unpack.
ItemsCold []byte `gae:"items_cold"`
// ItemsHot is a set of sizes of items that were already present in the cache.
//
// It is encoded in a special way, see packedintset.Unpack.
ItemsHot []byte `gae:"items_hot"`
}
// IsEmpty is true if this struct is unpopulated.
func (p *CASOperationStats) IsEmpty() bool {
return p.DurationSecs == 0 && p.InitialItems == 0 && p.InitialSize == 0 &&
len(p.ItemsCold) == 0 && len(p.ItemsHot) == 0
}
// ToProto converts the CASOperationStats struct to apipb.CASOperationStats.
func (p *CASOperationStats) ToProto() (*apipb.CASOperationStats, error) {
if p.IsEmpty() {
return nil, nil
}
resp := &apipb.CASOperationStats{
Duration: float32(p.DurationSecs),
InitialNumberItems: int32(p.InitialItems),
InitialSize: p.InitialSize,
}
// TODO (crbug.com/329071986): Store precalculated sums in the stats. We only
// need to compute sum once since ItemsCold and ItemsHot never change once
// they are set and they generally can be pretty big, costing more compute
// power.
if p.ItemsCold != nil {
resp.ItemsCold = p.ItemsCold
itemsCold, err := packedintset.Unpack(p.ItemsCold)
if err != nil {
return nil, errors.Annotate(err, "unpacking cold items").Err()
}
resp.NumItemsCold = int64(len(itemsCold))
sum := int64(0)
for _, val := range itemsCold {
sum += val
}
resp.TotalBytesItemsCold = sum
}
if p.ItemsHot != nil {
resp.ItemsHot = p.ItemsHot
itemsHot, err := packedintset.Unpack(p.ItemsHot)
if err != nil {
return nil, errors.Annotate(err, "unpacking hot items").Err()
}
resp.NumItemsHot = int64(len(itemsHot))
sum := int64(0)
for _, val := range itemsHot {
sum += val
}
resp.TotalBytesItemsHot = sum
}
return resp, nil
}
// TaskOutputChunk represents a chunk of the task console output.
//
// The number of such chunks per task is tracked in TaskRunResult.StdoutChunks.
// Each entity holds a compressed segment of the task output. For all entities
// except the last one, the uncompressed size of the segment is ChunkSize. That
// way it is always possible to figure out what chunk to use for a given output
// offset.
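//
// For example, with ChunkSize of 100 KiB, output offset 250000 falls into
// chunk index 250000/ChunkSize == 2, stored under key id 3 (see
// TaskOutputChunkKey).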
type TaskOutputChunk struct {
// Extra are entity properties that didn't match any declared ones below.
//
// Should normally be empty.
Extra datastore.PropertyMap `gae:"-,extra"`
// Key identifies the task and the chunk index, see TaskOutputChunkKey.
Key *datastore.Key `gae:"$key"`
// Chunk is zlib-compressed chunk of the output.
Chunk []byte `gae:"chunk,noindex"`
// Gaps is a series of 2-integer pairs, each specifying a byte range of this
// chunk that is invalid. Normally it is empty. All values are offsets relative
// to the start of this chunk.
Gaps []int32 `gae:"gaps,noindex"`
}
// TaskOutputChunkKey builds a TaskOutputChunk key for the given chunk index.
func TaskOutputChunkKey(ctx context.Context, taskReq *datastore.Key, idx int64) *datastore.Key {
return datastore.NewKey(ctx, "TaskOutputChunk", "", idx+1,
datastore.NewKey(ctx, "TaskOutput", "", 1, TaskRunResultKey(ctx, taskReq)))
}
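// Taken together, the key constructors above imply the following entity group
// layout for a single task (key ids in parentheses):
//
//	TaskRequest
//	└── TaskResultSummary (1)
//	    └── TaskRunResult (1)
//	        ├── PerformanceStats (1)
//	        └── TaskOutput (1)
//	            └── TaskOutputChunk (idx+1)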
// TaskResultSummaryQuery prepares a query that fetches TaskResultSummary
// entities.
func TaskResultSummaryQuery() *datastore.Query {
return datastore.NewQuery("TaskResultSummary")
}
// TaskRunResultQuery prepares a query that fetches TaskRunResult entities.
func TaskRunResultQuery() *datastore.Query {
return datastore.NewQuery("TaskRunResult")
}
// FilterTasksByTags limits a TaskResultSummary query to return tasks matching
// the given tags filter.
//
// For complex filters this may split the query into multiple queries that need
// to run in parallel with their results merged. See SplitForQuery() in Filter
// for more details.
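//
// A sketch (non-authoritative; assumes `tags` is a Filter built elsewhere):
//
//	queries := FilterTasksByTags(TaskResultSummaryQuery(), SplitOptimally, tags)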
func FilterTasksByTags(q *datastore.Query, mode SplitMode, tags Filter) []*datastore.Query {
return tags.Apply(q, "tags", mode)
}
// FilterTasksByCreationTime limits a TaskResultSummary or TaskRunResult query
// to return tasks created within the given time range [start, end).
//
// Works only for queries ordered by key. Returns an error if the timestamps
// are outside of the expected range (e.g. before BeginningOfTheWorld).
//
// Zero time means no limit on the corresponding side of the range.
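//
// A sketch (non-authoritative; assumes `start` and `end` are valid task
// creation timestamps):
//
//	q, err := FilterTasksByCreationTime(ctx, TaskResultSummaryQuery(), start, end)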
func FilterTasksByCreationTime(ctx context.Context, q *datastore.Query, start, end time.Time) (*datastore.Query, error) {
if start.IsZero() && end.IsZero() {
return q, nil
}
// Need to use correct key constructors to limit the key range **precisely**
// to what is being asked. This is important for handling tasks that sit on
// the edge of the filtered range.
var keyCB func(context.Context, *datastore.Key) *datastore.Key
f, err := q.Finalize()
if err != nil {
panic(fmt.Sprintf("weird query: %v", q))
}
switch f.Kind() {
case "TaskResultSummary":
keyCB = TaskResultSummaryKey
case "TaskRunResult":
keyCB = TaskRunResultKey
default:
panic(fmt.Sprintf("FilterTasksByCreationTime is used with a query over kind %q", f.Kind()))
}
// Refuse a reversed time range since it makes queries fail with ErrNullQuery.
if !start.IsZero() && !end.IsZero() && start.After(end) {
return nil, errors.Reason("start time must be before the end time").Err()
}
// Keys are ordered by timestamp. Transform the provided timestamps into keys
// to filter entities by key. The inequalities are inverted because keys are
// in reverse chronological order.
if !start.IsZero() {
startReqKey, err := TimestampToRequestKey(ctx, start, 0)
if err != nil {
return nil, errors.Annotate(err, "invalid start time").Err()
}
q = q.Lte("__key__", keyCB(ctx, startReqKey))
}
if !end.IsZero() {
endReqKey, err := TimestampToRequestKey(ctx, end, 0)
if err != nil {
return nil, errors.Annotate(err, "invalid end time").Err()
}
q = q.Gt("__key__", keyCB(ctx, endReqKey))
}
return q, nil
}
// FilterTasksByTimestampField orders the query by a timestamp stored in the
// given field, filtering based on it as well.
//
// Tasks will be returned in the descending order (i.e. the most recent first).
//
// The filter range is [start, end). Zero time means no limit on the
// corresponding side of the range.
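//
// For example, to list tasks completed within the last hour (a sketch; `now`
// is assumed to be the current UTC time):
//
//	q := FilterTasksByTimestampField(TaskResultSummaryQuery(), "completed_ts", now.Add(-time.Hour), now)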
func FilterTasksByTimestampField(q *datastore.Query, field string, start, end time.Time) *datastore.Query {
q = q.Order("-" + field)
if !start.IsZero() {
q = q.Gte(field, start)
}
if !end.IsZero() {
q = q.Lt(field, end)
}
return q
}
// FilterTasksByState limits a TaskResultSummary query to return tasks in a
// particular state.
//
// For the StateQuery_QUERY_PENDING_RUNNING filter, depending on the given
// SplitMode, it may either split the query into multiple queries that need to
// run in parallel, or append an "IN" filter to the original query. For all
// other state filters it just adds simple "EQ" filters.
//
// Per current datastore limitations (see Filter.SplitForQuery), a query can
// have at most one "IN" filter. Thus, if FilterTasksByState returns a query
// with an "IN" filter, all queries built on top of it must not add any more
// "IN" filters. This is communicated by returning the SplitCompletely
// SplitMode, which should be applied to all subsequent splittings (if any).
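//
// A sketch of combining state and tags filters (non-authoritative; assumes
// `tags` is a Filter):
//
//	qs, mode := FilterTasksByState(TaskResultSummaryQuery(), apipb.StateQuery_QUERY_PENDING_RUNNING, SplitOptimally)
//	var all []*datastore.Query
//	for _, q := range qs {
//		all = append(all, FilterTasksByTags(q, mode, tags)...)
//	}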
func FilterTasksByState(q *datastore.Query, state apipb.StateQuery, splitMode SplitMode) ([]*datastore.Query, SplitMode) {
// Special case that requires merging results of multiple queries.
if state == apipb.StateQuery_QUERY_PENDING_RUNNING {
// Note that it is tempting to use `q.Lte("state", apipb.TaskState_PENDING)`
// since TaskState_RUNNING < TaskState_PENDING and there are no other states
// like that. But Datastore allows inequality filters on at most one property.
// Queries that use `q.Lte("state", ...)` won't be able to use an inequality
// filter for filtering by timestamp. So we are forced to use an
// `In("state", ...)` query (if supported) or merge two parallel queries (when
// IN queries are not supported, e.g. when running in dev mode). This is
// communicated by splitMode.
switch splitMode {
case SplitOptimally:
// It is OK to use an IN filter.
return []*datastore.Query{
q.In("state", apipb.TaskState_RUNNING, apipb.TaskState_PENDING),
}, SplitCompletely
case SplitCompletely:
// Asked to avoid IN filters. Run two queries.
return []*datastore.Query{
q.Eq("state", apipb.TaskState_RUNNING),
q.Eq("state", apipb.TaskState_PENDING),
}, SplitCompletely
default:
panic(fmt.Sprintf("unexpected SplitMode %d", splitMode))
}
}
switch state {
case apipb.StateQuery_QUERY_PENDING:
q = q.Eq("state", apipb.TaskState_PENDING)
case apipb.StateQuery_QUERY_RUNNING:
q = q.Eq("state", apipb.TaskState_RUNNING)
case apipb.StateQuery_QUERY_PENDING_RUNNING:
panic("already handled above")
case apipb.StateQuery_QUERY_COMPLETED:
q = q.Eq("state", apipb.TaskState_COMPLETED)
case apipb.StateQuery_QUERY_COMPLETED_SUCCESS:
q = q.Eq("state", apipb.TaskState_COMPLETED).Eq("failure", false)
case apipb.StateQuery_QUERY_COMPLETED_FAILURE:
q = q.Eq("state", apipb.TaskState_COMPLETED).Eq("failure", true)
case apipb.StateQuery_QUERY_EXPIRED:
q = q.Eq("state", apipb.TaskState_EXPIRED)
case apipb.StateQuery_QUERY_TIMED_OUT:
q = q.Eq("state", apipb.TaskState_TIMED_OUT)
case apipb.StateQuery_QUERY_BOT_DIED:
q = q.Eq("state", apipb.TaskState_BOT_DIED)
case apipb.StateQuery_QUERY_CANCELED:
q = q.Eq("state", apipb.TaskState_CANCELED)
case apipb.StateQuery_QUERY_DEDUPED:
q = q.Eq("state", apipb.TaskState_COMPLETED).Eq("try_number", 0)
case apipb.StateQuery_QUERY_KILLED:
q = q.Eq("state", apipb.TaskState_KILLED)
case apipb.StateQuery_QUERY_NO_RESOURCE:
q = q.Eq("state", apipb.TaskState_NO_RESOURCE)
case apipb.StateQuery_QUERY_CLIENT_ERROR:
q = q.Eq("state", apipb.TaskState_CLIENT_ERROR)
case apipb.StateQuery_QUERY_ALL:
// No restrictions.
default:
panic(fmt.Sprintf("unexpected StateQuery %q", state))
}
return []*datastore.Query{q}, splitMode
}