blob: 9847161f757e32141c446f941880dc68532951f1 [file] [log] [blame]
// Copyright 2016 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package model
import (
"context"
"crypto/subtle"
"encoding/hex"
"errors"
"fmt"
"math"
"time"
"google.golang.org/grpc/codes"
ds "go.chromium.org/gae/service/datastore"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/data/rand/cryptorand"
"go.chromium.org/luci/common/logging"
google_pb "go.chromium.org/luci/common/proto/google"
dm "go.chromium.org/luci/dm/api/service/v1"
"go.chromium.org/luci/grpc/grpcutil"
)
const ek = logging.ErrorKey
type invertedHexUint32 uint32
const invertedHexUint32RenderFmt = "%08x"
var _ ds.PropertyConverter = (*invertedHexUint32)(nil)
func (i *invertedHexUint32) ToProperty() (ret ds.Property, err error) {
err = ret.SetValue(fmt.Sprintf(
invertedHexUint32RenderFmt, (*i)^math.MaxUint32), ds.NoIndex)
return
}
func (i *invertedHexUint32) FromProperty(p ds.Property) (err error) {
sVal, err := p.Project(ds.PTString)
if err != nil {
return
}
tmp := uint32(0)
if _, err = fmt.Sscanf(sVal.(string), invertedHexUint32RenderFmt, &tmp); err != nil {
return
}
*i = invertedHexUint32(tmp ^ math.MaxUint32)
return
}
// Execution represents either an ongoing execution on the Quest's specified
// distributor, or is a placeholder for an already-completed Execution.
type Execution struct {
ID invertedHexUint32 `gae:"$id"`
Attempt *ds.Key `gae:"$parent"`
Created time.Time
Modified time.Time
// DistributorConfigName is redundant with the Quest definition, but this
// helps avoid extra unnecessary datastore round-trips to load the Quest.
DistributorConfigName string
DistributorConfigVersion string
DistributorToken string
State dm.Execution_State
// IsAbnormal is true iff State==ABNORMAL_FINISHED. Used for walk_graph.
IsAbnormal bool
// A lazily-updated boolean to reflect that this Execution is expired for
// queries.
IsExpired bool
// Contains either data (State==FINISHED) or abnormal_finish (State==ABNORMAL_FINISHED)
Result dm.Result `gae:",noindex"`
// These are DM's internal mechanism for performing timeout actions on
// Executions.
//
// The TimeTo* variables are copied from the quest description.
//
// The Timeout is only active when the Execution is in a non-terminal state.
TimeToStart time.Duration `gae:",noindex"` // timeouts.start
TimeToRun time.Duration `gae:",noindex"` // timeouts.run
TimeToStop time.Duration `gae:",noindex"` // pollTimeout || timeouts.stop
// Token is a randomized nonce that's used to verify that RPCs verify from the
// expected client (the client that's currently running the Execution). The
// Token has 2 modes.
//
// When the Execution is handed to the distributor, the Token is randomly
// generated by DM and passed to the distributor. The State of the Execution
// starts as SCHEDULED. This token may be used by the client to "activate" the
// Execution with the ActivateExecution rpc. At that point, the client
// provides a new random token, the Execution State moves from SCHEDULED to
// RUNNING, and Token assumes the new value. As long as the Execution State is
// RUNNING, the client may continue to use that new Token value to
// authenticate other rpc's like AddDeps and FinishAttempt.
//
// As soon as the Execution is in the STOPPING, ABNORMAL_FINISHED or FINISHED
// state, this will be nil'd out.
Token []byte `gae:",noindex"`
}
// MakeExecution makes a new Execution in the SCHEDULING state, with a new
// random Token.
func MakeExecution(c context.Context, e *dm.Execution_ID, cfgName, cfgVers string) *Execution {
now := clock.Now(c).UTC()
ret := &Execution{
ID: invertedHexUint32(e.Id),
Attempt: AttemptKeyFromID(c, e.AttemptID()),
Created: now,
Modified: now,
DistributorConfigName: cfgName,
DistributorConfigVersion: cfgVers,
Token: MakeRandomToken(c, dm.MinimumActivationTokenLength),
}
return ret
}
// ModifyState changes the current state of this Execution and updates its
// Modified timestamp.
func (e *Execution) ModifyState(c context.Context, newState dm.Execution_State) error {
if e.State == newState {
return nil
}
if err := e.State.Evolve(newState); err != nil {
return err
}
now := clock.Now(c).UTC()
if now.After(e.Modified) {
e.Modified = now
} else {
// Microsecond is the smallest granularity that datastore can store
// timestamps, so use that to disambiguate: the goal here is that any
// modification always increments the modified time, and never decrements
// it.
e.Modified = e.Modified.Add(time.Microsecond)
}
return nil
}
// MakeRandomToken creates a cryptographically random byte slice of the
// specified length. It panics if the specified length cannot be read in full.
func MakeRandomToken(c context.Context, l uint32) []byte {
rtok := make([]byte, l)
if _, err := cryptorand.Read(c, rtok); err != nil {
panic(err)
}
return rtok
}
// Revoke will clear the Token and Put this Execution to the datastore. This
// action requires the Execution to be in the RUNNING state, and causes it to
// enter the STOPPING state.
func (e *Execution) Revoke(c context.Context) error {
e.Token = nil
if err := e.ModifyState(c, dm.Execution_STOPPING); err != nil {
return err
}
return ds.Put(c, e)
}
func loadExecution(c context.Context, eid *dm.Execution_ID) (a *Attempt, e *Execution, err error) {
a = &Attempt{ID: *eid.AttemptID()}
e = &Execution{ID: invertedHexUint32(eid.Id), Attempt: ds.KeyForObj(c, a)}
err = ds.Get(c, a, e)
if err != nil {
err = grpcutil.Errf(codes.Internal,
"couldn't get attempt %v or its execution %d: %s", a.ID, e.ID, err)
return
}
if a.CurExecution != uint32(e.ID) {
err = fmt.Errorf("verifying incorrect execution %d, expected %d", a.CurExecution, e.ID)
return
}
return
}
func verifyExecutionAndCheckExTok(c context.Context, auth *dm.Execution_Auth) (a *Attempt, e *Execution, err error) {
a, e, err = loadExecution(c, auth.Id)
if err != nil {
return
}
if a.State != dm.Attempt_EXECUTING {
err = errors.New("Attempt is not executing")
return
}
if e.State != dm.Execution_RUNNING {
err = errors.New("Execution is not running")
return
}
if subtle.ConstantTimeCompare(e.Token, auth.Token) != 1 {
err = fmt.Errorf("incorrect Token: %x", hex.EncodeToString(auth.Token))
}
return
}
func makeError(err error, msg string) error {
code := grpcutil.Code(err)
if code == codes.Unknown {
code = codes.PermissionDenied
}
return grpcutil.Errf(code, msg)
}
// AuthenticateExecution verifies that the Attempt is executing, and that evkey
// matches the execution key of the current Execution for this Attempt.
//
// As a bonus, it will return the loaded Attempt and Execution.
func AuthenticateExecution(c context.Context, auth *dm.Execution_Auth) (a *Attempt, e *Execution, err error) {
a, e, err = verifyExecutionAndCheckExTok(c, auth)
if err != nil {
logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to verify execution")
err = makeError(err, "requires execution Auth")
}
return a, e, err
}
// InvalidateExecution verifies that the execution key is valid, and then
// revokes the execution key.
//
// As a bonus, it will return the loaded Attempt and Execution.
func InvalidateExecution(c context.Context, auth *dm.Execution_Auth) (a *Attempt, e *Execution, err error) {
if a, e, err = verifyExecutionAndCheckExTok(c, auth); err != nil {
logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to verify execution")
err = makeError(err, "requires execution Auth")
return
}
err = e.Revoke(c)
if err != nil {
logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to revoke execution")
err = makeError(err, "unable to invalidate Auth")
}
return
}
func verifyExecutionAndActivate(c context.Context, auth *dm.Execution_Auth, actTok []byte) (a *Attempt, e *Execution, err error) {
a, e, err = loadExecution(c, auth.Id)
if err != nil {
return
}
if a.State != dm.Attempt_EXECUTING {
err = errors.New("Attempt is in wrong state")
return
}
switch e.State {
case dm.Execution_SCHEDULING:
if subtle.ConstantTimeCompare(e.Token, auth.Token) != 1 {
err = errors.New("incorrect ActivationToken")
return
}
e.State.MustEvolve(dm.Execution_RUNNING)
e.Token = actTok
err = ds.Put(c, e)
logging.Infof(c, "activated execution %s: was SCHEDULING now RUNNING", auth.Id)
case dm.Execution_RUNNING:
if subtle.ConstantTimeCompare(e.Token, actTok) != 1 {
err = errors.New("incorrect Token")
}
// either the Token matched, in which case this is simply a retry
// by the same client, so there's no error, or it's wrong which means it's
// a retry by a different client.
logging.Infof(c, "already activated execution %s", auth.Id)
default:
err = fmt.Errorf("Execution is in wrong state")
}
return
}
// ActivateExecution validates that the execution is unactivated and that
// the activation token matches and then sets the token to the new
// value.
//
// It's OK to retry this. Subsequent invocations with the same Token
// will recognize this case and not return an error.
func ActivateExecution(c context.Context, auth *dm.Execution_Auth, actToken []byte) (a *Attempt, e *Execution, err error) {
a, e, err = verifyExecutionAndActivate(c, auth, actToken)
if err != nil {
logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to activate execution")
err = makeError(err, "failed to activate execution Auth")
}
return a, e, err
}
// GetEID gets an Execution_ID for this Execution. It panics if the Execution
// is in an invalid state.
func (e *Execution) GetEID() *dm.Execution_ID {
aid := &dm.Attempt_ID{}
if e.ID == 0 {
panic("cannot create valid Execution_ID with 0-value ID field")
}
if err := aid.SetDMEncoded(e.Attempt.StringID()); err != nil {
panic(err)
}
return dm.NewExecutionID(aid.Quest, aid.Id, uint32(e.ID))
}
// ToProto returns a dm proto version of this Execution.
func (e *Execution) ToProto(includeID bool) *dm.Execution {
ret := &dm.Execution{Data: e.DataProto()}
if includeID {
ret.Id = e.GetEID()
}
return ret
}
// DataProto returns an Execution.Data message for this Execution.
//
// This omits the DistributorInfo.Url portion, which must be filled in elsewhere for
// package cyclical import reasons.
func (e *Execution) DataProto() (ret *dm.Execution_Data) {
switch e.State {
case dm.Execution_SCHEDULING:
ret = dm.NewExecutionScheduling().Data
case dm.Execution_RUNNING:
ret = dm.NewExecutionRunning().Data
case dm.Execution_STOPPING:
ret = dm.NewExecutionStopping().Data
case dm.Execution_FINISHED:
ret = dm.NewExecutionFinished(e.Result.Data).Data
case dm.Execution_ABNORMAL_FINISHED:
ret = dm.NewExecutionAbnormalFinish(e.Result.AbnormalFinish).Data
default:
panic(fmt.Errorf("unknown Execution_State: %s", e.State))
}
ret.Created = google_pb.NewTimestamp(e.Created)
ret.Modified = google_pb.NewTimestamp(e.Modified)
ret.DistributorInfo = &dm.Execution_Data_DistributorInfo{
ConfigName: e.DistributorConfigName,
ConfigVersion: e.DistributorConfigVersion,
Token: e.DistributorToken,
}
return ret
}