// Copyright 2017 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package engine
import (
"bytes"
"context"
"fmt"
"math"
"strings"
"time"
"go.chromium.org/luci/gae/service/datastore"
"go.chromium.org/luci/auth/identity"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/data/rand/mathrand"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/retry/transient"
"go.chromium.org/luci/common/tsmon/distribution"
"go.chromium.org/luci/common/tsmon/field"
"go.chromium.org/luci/common/tsmon/metric"
"go.chromium.org/luci/common/tsmon/types"
"go.chromium.org/luci/scheduler/appengine/internal"
"go.chromium.org/luci/scheduler/appengine/task"
)
// errInvocationIDConflict is returned by generateInvocationID if the freshly
// generated ID turns out to be already taken.
var errInvocationIDConflict = errors.New("could not find available invocationID", transient.Tag)
const (
// debugLogSizeLimit is how many bytes the invocation debug log can be before
// it gets trimmed. See 'trimDebugLog'. The debug log isn't supposed to be
// huge.
debugLogSizeLimit = 200000
// debugLogTailLines is how many last log lines to keep when trimming the log.
debugLogTailLines = 100
)
// Jan 1 2015, in UTC.
var invocationIDEpoch time.Time
func init() {
var err error
invocationIDEpoch, err = time.Parse(time.RFC822, "01 Jan 15 00:00 UTC")
if err != nil {
panic(err)
}
}
var (
// distributionBucketerSecToDay covers the range from 1s to 1 day.
//
// 0.05037 is math.log10(math.exp(math.log(24*60*60)/98)), which means
// the last bucket will contain the overflow of everything >= 1 day (a rough
// arithmetic check follows this var block).
distributionBucketerSecToDay = distribution.GeometricBucketer(math.Pow(10.0, 0.05037), 100)
metricInvocationsDurations = metric.NewCumulativeDistribution(
"luci/scheduler/invocations/durations",
"Durations of completed invocations (sec).",
&types.MetricMetadata{Units: types.Seconds},
distributionBucketerSecToDay,
field.String("jobID"),
field.String("status"), // one of final statuses of task.Status enum.
)
)
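// A rough arithmetic check of the 0.05037 constant above (illustrative only,
// values are approximate):
//
//	base := math.Pow(10.0, 0.05037) // ≈ 1.123, the per-bucket growth factor
//	_ = math.Pow(base, 98)          // ≈ 86400 (i.e. 24*60*60) sec, so durations
//	                                // of a day or more fall into the last bucket.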
// generateInvocationID is called within a transaction to pick a new Invocation
// ID and ensure it isn't taken yet.
//
// This function essentially picks a root key for a new entity group, checking
// that it hasn't been taken yet.
//
// Format of the invocation ID:
// - highest order bit set to 0 to keep the value positive.
// - next 43 bits set to the negated time since invocationIDEpoch, in milliseconds.
// - next 16 bits are pseudo-random (generated by mathrand).
// - next 4 bits set to 0. They indicate ID format.
//
// Makes one attempt at allocating an ID. If it fails (should be extremely
// rare), the entire transaction should be retried. We do it to avoid
// unnecessarily hitting multiple entity groups from a single transaction.
//
// Returns only transient errors.
func generateInvocationID(c context.Context) (int64, error) {
// See http://play.golang.org/p/POpQzpT4Up.
invTs := int64(clock.Now(c).UTC().Sub(invocationIDEpoch) / time.Millisecond)
invTs = ^invTs & 8796093022207 // 0b111....1, 43 ones: keep the low 43 bits so the top bit stays 0 after the shift
invTs = invTs << 20
randSuffix := mathrand.Int63n(c, 65536)
invID := invTs | (randSuffix << 4)
exists, err := datastore.Exists(c, datastore.NewKey(c, "Invocation", "", invID, nil))
if err != nil {
return 0, transient.Tag.Apply(err)
}
if !exists.All() {
return invID, nil
}
return 0, errInvocationIDConflict
}
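// invocationIDTimestamp is an illustrative sketch (a hypothetical helper, not
// part of the engine's actual API) showing how the bit layout above can be
// reversed to recover an invocation's creation time from its ID.
func invocationIDTimestamp(invID int64) time.Time {
	invTs := (invID >> 20) & 8796093022207 // drop the 16 random bits and 4 format bits
	ms := ^invTs & 8796093022207           // undo the negation of the 43 timestamp bits
	return invocationIDEpoch.Add(time.Duration(ms) * time.Millisecond)
}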
// Invocation entity stores a single invocation of a job (with perhaps multiple
// attempts due to retries if the invocation fails to start).
//
// Root entity. The ID is generated based on time by the generateInvocationID()
// function.
type Invocation struct {
_kind string `gae:"$kind,Invocation"`
_extra datastore.PropertyMap `gae:"-,extra"`
// ID is identifier of this particular attempt to run a job.
ID int64 `gae:"$id"`
// JobID is '<ProjectID>/<JobName>' string of a parent job.
//
// Set when the invocation is created and never changes.
JobID string `gae:",noindex"`
// IndexedJobID is '<ProjectID>/<JobName>' string of a parent job, but it is
// set only for finished invocations.
//
// It is used to make the invocations appear in the listings of finished
// invocations.
//
// We can't use JobID field for this since the invocation launch procedure can
// potentially generate orphaned "garbage" invocations in some edge cases (if
// Invocation transaction lands, but separate Job transaction doesn't). They
// are harmless, but we don't want them to show up in listings.
IndexedJobID string
// RealmID is a global realm name (i.e. "<ProjectID>:...") the invocation
// belongs to.
//
// It is copied from the Job entity when the invocation is created. May be
// empty for old invocations.
RealmID string `gae:",noindex"`
// Started is time when this invocation was created.
Started time.Time `gae:",noindex"`
// Finished is time when this invocation transitioned to a terminal state.
Finished time.Time `gae:",noindex"`
// TriggeredBy is identity of whoever triggered the invocation, if it was
// triggered via a single trigger submitted by some external user (not by the
// service itself).
//
// Empty identity string if it was triggered by the service itself.
TriggeredBy identity.Identity
// PropertiesRaw is a blob with serialized task.Request.Properties supplied
// when the invocation was created.
//
// Task managers use it to prepare the parameters for tasks.
PropertiesRaw []byte `gae:",noindex"`
// Tags is a sorted list of indexed "key:value" pairs supplied via
// task.Request.Tags when the invocation was created.
//
// May be passed down the stack by task managers.
Tags []string
// IncomingTriggersRaw is a serialized list of triggers that the invocation
// consumed.
//
// They are popped from job's pending triggers set when the invocation
// starts.
//
// Use IncomingTriggers() function to grab them in deserialized form.
IncomingTriggersRaw []byte `gae:",noindex"`
// OutgoingTriggersRaw is a serialized list of triggers that the invocation
// produced.
//
// They are fanned out into pending trigger sets of corresponding triggered
// jobs (specified by TriggeredJobIDs).
//
// Use OutgoingTriggers() function to grab them in deserialized form.
OutgoingTriggersRaw []byte `gae:",noindex"`
// PendingTimersRaw is a serialized list of pending invocation timers.
//
// Timers are emitted by Controller's AddTimer call.
//
// Use PendingTimers() function to grab them in deserialized form.
PendingTimersRaw []byte `gae:",noindex"`
// Revision is the revision number of config.cfg at the time this invocation
// was created. For informational purposes.
Revision string `gae:",noindex"`
// RevisionURL is a URL to a human readable page with the config file at
// the appropriate revision. For informational purposes.
RevisionURL string `gae:",noindex"`
// Task is the job payload for this invocation in binary serialized form.
// For informational purposes. See Catalog.UnmarshalTask().
Task []byte `gae:",noindex"`
// TriggeredJobIDs is a list of jobIDs of jobs which this job triggers.
// The list is sorted and without duplicates.
TriggeredJobIDs []string `gae:",noindex"`
// DebugLog is short free form text log with debug messages.
DebugLog string `gae:",noindex"`
// RetryCount is 0 on a first attempt to launch the task. Increased with each
// retry. For informational purposes.
RetryCount int64 `gae:",noindex"`
// Status is current status of the invocation (e.g. "RUNNING"), see the enum.
Status task.Status
// ViewURL is optional URL to a human readable page with task status, e.g.
// Swarming task page. Populated by corresponding TaskManager.
ViewURL string `gae:",noindex"`
// TaskData is a storage where TaskManager can keep task-specific state
// between calls.
TaskData []byte `gae:",noindex"`
// MutationsCount is used for simple compare-and-swap transaction control.
//
// It is incremented on each change to the entity.
MutationsCount int64 `gae:",noindex"`
}
// isEqual returns true iff 'e' is equal to 'other'.
func (e *Invocation) isEqual(other *Invocation) bool {
return e == other || (e.ID == other.ID &&
e.MutationsCount == other.MutationsCount && // compare it first, it changes most often
e.JobID == other.JobID &&
e.IndexedJobID == other.IndexedJobID &&
e.Started.Equal(other.Started) &&
e.Finished.Equal(other.Finished) &&
e.TriggeredBy == other.TriggeredBy &&
bytes.Equal(e.PropertiesRaw, other.PropertiesRaw) &&
equalSortedLists(e.Tags, other.Tags) &&
bytes.Equal(e.IncomingTriggersRaw, other.IncomingTriggersRaw) &&
bytes.Equal(e.OutgoingTriggersRaw, other.OutgoingTriggersRaw) &&
bytes.Equal(e.PendingTimersRaw, other.PendingTimersRaw) &&
e.Revision == other.Revision &&
e.RevisionURL == other.RevisionURL &&
bytes.Equal(e.Task, other.Task) &&
equalSortedLists(e.TriggeredJobIDs, other.TriggeredJobIDs) &&
e.DebugLog == other.DebugLog &&
e.RetryCount == other.RetryCount &&
e.Status == other.Status &&
e.ViewURL == other.ViewURL &&
bytes.Equal(e.TaskData, other.TaskData))
}
// GetProjectID parses the ProjectID from the JobID and returns it.
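//
// For example, with a hypothetical JobID of "infra/daily-builder" (following
// the '<ProjectID>/<JobName>' format documented on the JobID field), this
// returns "infra":
//
//	inv := &Invocation{JobID: "infra/daily-builder"}
//	proj := inv.GetProjectID() // "infra"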
func (e *Invocation) GetProjectID() string {
parts := strings.Split(e.JobID, "/")
return parts[0]
}
// debugLog appends a line to DebugLog field.
func (e *Invocation) debugLog(c context.Context, format string, args ...interface{}) {
debugLog(c, &e.DebugLog, format, args...)
}
// trimDebugLog makes sure DebugLog field doesn't exceed limits.
//
// It cuts the middle of the log. We need to do this to keep the entity small
// enough to fit within the datastore entity size limit.
func (e *Invocation) trimDebugLog() {
if len(e.DebugLog) <= debugLogSizeLimit {
return
}
const cutMsg = "--- the log has been cut here ---"
giveUp := func() {
e.DebugLog = e.DebugLog[:debugLogSizeLimit-len(cutMsg)-2] + "\n" + cutMsg + "\n"
}
// We take last debugLogTailLines lines of log and move them "up", so that
// the total log size is less than debugLogSizeLimit. We then put a line with
// the message that some log lines have been cut. If these operations are not
// possible (e.g. we have some giant lines or something), we give up and just
// cut the end of the log.
// Find the debugLogTailLines-th "\n" from the end; e.DebugLog[tailStart:] is
// the log tail.
tailStart := len(e.DebugLog)
for i := 0; i < debugLogTailLines; i++ {
tailStart = strings.LastIndex(e.DebugLog[:tailStart-1], "\n")
if tailStart <= 0 {
giveUp()
return
}
}
tailStart++
// Figure out how many bytes of head we can keep to make trimmed log small
// enough.
tailLen := len(e.DebugLog) - tailStart + len(cutMsg) + 1
headSize := debugLogSizeLimit - tailLen
if headSize <= 0 {
giveUp()
return
}
// Find last "\n" in the head.
headEnd := strings.LastIndex(e.DebugLog[:headSize], "\n")
if headEnd <= 0 {
giveUp()
return
}
// We want to keep 50 lines of the head no matter what.
headLines := strings.Count(e.DebugLog[:headEnd], "\n")
if headLines < 50 {
giveUp()
return
}
// Remove duplicated 'cutMsg' lines. They may appear if 'debugLog' (followed
// by 'trimDebugLog') is called on an already trimmed log multiple times.
lines := strings.Split(e.DebugLog[:headEnd], "\n")
lines = append(lines, cutMsg)
lines = append(lines, strings.Split(e.DebugLog[tailStart:], "\n")...)
trimmed := make([]byte, 0, debugLogSizeLimit)
trimmed = append(trimmed, lines[0]...)
for i := 1; i < len(lines); i++ {
if !(lines[i-1] == cutMsg && lines[i] == cutMsg) {
trimmed = append(trimmed, '\n')
trimmed = append(trimmed, lines[i]...)
}
}
e.DebugLog = string(trimmed)
}
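// A hypothetical illustration of the trimming behavior above (not code from
// the engine itself): once DebugLog grows past debugLogSizeLimit, trimming
// keeps the head of the log, a single cut marker, and roughly the last
// debugLogTailLines lines.
//
//	inv := &Invocation{}
//	for i := 0; i < 30000; i++ {
//		inv.DebugLog += fmt.Sprintf("line %d\n", i)
//	}
//	inv.trimDebugLog()
//	// inv.DebugLog is now ~debugLogSizeLimit bytes: head lines, one
//	// "--- the log has been cut here ---" line, then the last ~100 lines.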
// IncomingTriggers is a list of triggers that the invocation consumed.
//
// It is deserialized on the fly from IncomingTriggersRaw.
func (e *Invocation) IncomingTriggers() ([]*internal.Trigger, error) {
return unmarshalTriggersList(e.IncomingTriggersRaw)
}
// OutgoingTriggers is a list of triggers that the invocation produced.
//
// It is deserialized on the fly from OutgoingTriggersRaw.
func (e *Invocation) OutgoingTriggers() ([]*internal.Trigger, error) {
return unmarshalTriggersList(e.OutgoingTriggersRaw)
}
// PendingTimers is a list of not-yet-consumed invocation timers.
//
// It is deserialized on the fly from PendingTimersRaw.
func (e *Invocation) PendingTimers() ([]*internal.Timer, error) {
return unmarshalTimersList(e.PendingTimersRaw)
}
// cleanupUnreferencedInvocations tries to delete given invocations.
//
// This is best-effort cleanup after failures. It logs errors but doesn't
// return them, since there's nothing the caller can do about them anyway.
//
// 'invs' is allowed to contain nils; they are skipped. May be called within a
// transaction; the deletion itself is done outside of it.
func cleanupUnreferencedInvocations(c context.Context, invs []*Invocation) {
keysToKill := make([]*datastore.Key, 0, len(invs))
for _, inv := range invs {
if inv != nil {
logging.Warningf(c, "Cleaning up inv %d of job %q", inv.ID, inv.JobID)
keysToKill = append(keysToKill, datastore.KeyForObj(c, inv))
}
}
if err := datastore.Delete(datastore.WithoutTransaction(c), keysToKill); err != nil {
logging.WithError(err).Warningf(c, "Invocation cleanup failed")
}
}
// reportCompletionMetrics reports invocation stats to monitoring.
//
// Should be called after the transaction that saves this invocation has completed.
func (e *Invocation) reportCompletionMetrics(c context.Context) {
if !e.Status.Final() || e.Finished.IsZero() {
panic(fmt.Errorf("reportCompletionMetrics on incomplete invocation: %v", e))
}
duration := e.Finished.Sub(e.Started)
metricInvocationsDurations.Add(c, duration.Seconds(), e.JobID, string(e.Status))
}