| // Copyright 2016 The LUCI Authors. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package model |
| |
| import ( |
| "context" |
| "crypto/subtle" |
| "encoding/hex" |
| "errors" |
| "fmt" |
| "math" |
| "time" |
| |
| "google.golang.org/grpc/codes" |
| |
| "go.chromium.org/luci/common/clock" |
| "go.chromium.org/luci/common/data/rand/cryptorand" |
| "go.chromium.org/luci/common/logging" |
| google_pb "go.chromium.org/luci/common/proto/google" |
| dm "go.chromium.org/luci/dm/api/service/v1" |
| ds "go.chromium.org/luci/gae/service/datastore" |
| "go.chromium.org/luci/grpc/grpcutil" |
| ) |
| |
// ek is a shorthand for the logging field key under which errors are attached
// to structured log entries in this file.
const ek = logging.ErrorKey

// invertedHexUint32 is a uint32 whose datastore representation is the
// fixed-width hexadecimal rendering of its bitwise complement (see ToProperty
// and FromProperty). Because of the inversion, numerically larger values
// render as lexicographically smaller strings.
type invertedHexUint32 uint32

// invertedHexUint32RenderFmt is the fmt verb used both to render and to parse
// the stored form: 8 zero-padded, lowercase hex digits.
const invertedHexUint32RenderFmt = "%08x"

// Compile-time assertion that *invertedHexUint32 implements
// ds.PropertyConverter.
var _ ds.PropertyConverter = (*invertedHexUint32)(nil)
| |
| func (i *invertedHexUint32) ToProperty() (ret ds.Property, err error) { |
| err = ret.SetValue(fmt.Sprintf( |
| invertedHexUint32RenderFmt, (*i)^math.MaxUint32), ds.NoIndex) |
| return |
| } |
| |
| func (i *invertedHexUint32) FromProperty(p ds.Property) (err error) { |
| sVal, err := p.Project(ds.PTString) |
| if err != nil { |
| return |
| } |
| |
| tmp := uint32(0) |
| if _, err = fmt.Sscanf(sVal.(string), invertedHexUint32RenderFmt, &tmp); err != nil { |
| return |
| } |
| *i = invertedHexUint32(tmp ^ math.MaxUint32) |
| return |
| } |
| |
// Execution represents either an ongoing execution on the Quest's specified
// distributor, or is a placeholder for an already-completed Execution.
//
// It is stored as a datastore entity: ID is the entity id (kept in its
// inverted-hex form) and Attempt is the parent key.
type Execution struct {
	ID      invertedHexUint32 `gae:"$id"`
	Attempt *ds.Key           `gae:"$parent"`

	// Created is set once when the Execution is made; Modified is advanced on
	// every state change (see ModifyState).
	Created  time.Time
	Modified time.Time

	// DistributorConfigName is redundant with the Quest definition, but this
	// helps avoid extra unnecessary datastore round-trips to load the Quest.
	DistributorConfigName    string
	DistributorConfigVersion string
	DistributorToken         string

	// State is the current lifecycle state of this Execution.
	State dm.Execution_State

	// IsAbnormal is true iff State==ABNORMAL_FINISHED. Used for walk_graph.
	IsAbnormal bool

	// A lazily-updated boolean to reflect that this Execution is expired for
	// queries.
	IsExpired bool

	// Contains either data (State==FINISHED) or abnormal_finish (State==ABNORMAL_FINISHED)
	Result dm.Result `gae:",noindex"`

	// These are DM's internal mechanism for performing timeout actions on
	// Executions.
	//
	// The TimeTo* variables are copied from the quest description.
	//
	// The Timeout is only active when the Execution is in a non-terminal state.
	TimeToStart time.Duration `gae:",noindex"` // timeouts.start
	TimeToRun   time.Duration `gae:",noindex"` // timeouts.run
	TimeToStop  time.Duration `gae:",noindex"` // pollTimeout || timeouts.stop

	// Token is a randomized nonce that's used to verify that RPCs originate from
	// the expected client (the client that's currently running the Execution).
	// The Token has 2 modes.
	//
	// When the Execution is handed to the distributor, the Token is randomly
	// generated by DM and passed to the distributor. The State of the Execution
	// starts as SCHEDULING. This token may be used by the client to "activate"
	// the Execution with the ActivateExecution rpc. At that point, the client
	// provides a new random token, the Execution State moves from SCHEDULING to
	// RUNNING, and Token assumes the new value. As long as the Execution State is
	// RUNNING, the client may continue to use that new Token value to
	// authenticate other rpc's like AddDeps and FinishAttempt.
	//
	// As soon as the Execution is in the STOPPING, ABNORMAL_FINISHED or FINISHED
	// state, this will be nil'd out.
	Token []byte `gae:",noindex"`
}
| |
| // MakeExecution makes a new Execution in the SCHEDULING state, with a new |
| // random Token. |
| func MakeExecution(c context.Context, e *dm.Execution_ID, cfgName, cfgVers string) *Execution { |
| now := clock.Now(c).UTC() |
| ret := &Execution{ |
| ID: invertedHexUint32(e.Id), |
| Attempt: AttemptKeyFromID(c, e.AttemptID()), |
| |
| Created: now, |
| Modified: now, |
| |
| DistributorConfigName: cfgName, |
| DistributorConfigVersion: cfgVers, |
| |
| Token: MakeRandomToken(c, dm.MinimumActivationTokenLength), |
| } |
| return ret |
| } |
| |
| // ModifyState changes the current state of this Execution and updates its |
| // Modified timestamp. |
| func (e *Execution) ModifyState(c context.Context, newState dm.Execution_State) error { |
| if e.State == newState { |
| return nil |
| } |
| if err := e.State.Evolve(newState); err != nil { |
| return err |
| } |
| now := clock.Now(c).UTC() |
| if now.After(e.Modified) { |
| e.Modified = now |
| } else { |
| // Microsecond is the smallest granularity that datastore can store |
| // timestamps, so use that to disambiguate: the goal here is that any |
| // modification always increments the modified time, and never decrements |
| // it. |
| e.Modified = e.Modified.Add(time.Microsecond) |
| } |
| return nil |
| } |
| |
| // MakeRandomToken creates a cryptographically random byte slice of the |
| // specified length. It panics if the specified length cannot be read in full. |
| func MakeRandomToken(c context.Context, l uint32) []byte { |
| rtok := make([]byte, l) |
| if _, err := cryptorand.Read(c, rtok); err != nil { |
| panic(err) |
| } |
| return rtok |
| } |
| |
| // Revoke will clear the Token and Put this Execution to the datastore. This |
| // action requires the Execution to be in the RUNNING state, and causes it to |
| // enter the STOPPING state. |
| func (e *Execution) Revoke(c context.Context) error { |
| e.Token = nil |
| if err := e.ModifyState(c, dm.Execution_STOPPING); err != nil { |
| return err |
| } |
| return ds.Put(c, e) |
| } |
| |
| func loadExecution(c context.Context, eid *dm.Execution_ID) (a *Attempt, e *Execution, err error) { |
| a = &Attempt{ID: *eid.AttemptID()} |
| e = &Execution{ID: invertedHexUint32(eid.Id), Attempt: ds.KeyForObj(c, a)} |
| err = ds.Get(c, a, e) |
| |
| if err != nil { |
| err = grpcutil.Errf(codes.Internal, |
| "couldn't get attempt %v or its execution %d: %s", a.ID, e.ID, err) |
| return |
| } |
| |
| if a.CurExecution != uint32(e.ID) { |
| err = fmt.Errorf("verifying incorrect execution %d, expected %d", a.CurExecution, e.ID) |
| return |
| } |
| return |
| } |
| |
| func verifyExecutionAndCheckExTok(c context.Context, auth *dm.Execution_Auth) (a *Attempt, e *Execution, err error) { |
| a, e, err = loadExecution(c, auth.Id) |
| if err != nil { |
| return |
| } |
| |
| if a.State != dm.Attempt_EXECUTING { |
| err = errors.New("Attempt is not executing") |
| return |
| } |
| |
| if e.State != dm.Execution_RUNNING { |
| err = errors.New("Execution is not running") |
| return |
| } |
| |
| if subtle.ConstantTimeCompare(e.Token, auth.Token) != 1 { |
| err = fmt.Errorf("incorrect Token: %x", hex.EncodeToString(auth.Token)) |
| } |
| return |
| } |
| |
| func makeError(err error, msg string) error { |
| code := grpcutil.Code(err) |
| if code == codes.Unknown { |
| code = codes.PermissionDenied |
| } |
| return grpcutil.Errf(code, msg) |
| } |
| |
| // AuthenticateExecution verifies that the Attempt is executing, and that evkey |
| // matches the execution key of the current Execution for this Attempt. |
| // |
| // As a bonus, it will return the loaded Attempt and Execution. |
| func AuthenticateExecution(c context.Context, auth *dm.Execution_Auth) (a *Attempt, e *Execution, err error) { |
| a, e, err = verifyExecutionAndCheckExTok(c, auth) |
| if err != nil { |
| logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to verify execution") |
| err = makeError(err, "requires execution Auth") |
| } |
| return a, e, err |
| } |
| |
| // InvalidateExecution verifies that the execution key is valid, and then |
| // revokes the execution key. |
| // |
| // As a bonus, it will return the loaded Attempt and Execution. |
| func InvalidateExecution(c context.Context, auth *dm.Execution_Auth) (a *Attempt, e *Execution, err error) { |
| if a, e, err = verifyExecutionAndCheckExTok(c, auth); err != nil { |
| logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to verify execution") |
| err = makeError(err, "requires execution Auth") |
| return |
| } |
| |
| err = e.Revoke(c) |
| if err != nil { |
| logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to revoke execution") |
| err = makeError(err, "unable to invalidate Auth") |
| } |
| return |
| } |
| |
| func verifyExecutionAndActivate(c context.Context, auth *dm.Execution_Auth, actTok []byte) (a *Attempt, e *Execution, err error) { |
| a, e, err = loadExecution(c, auth.Id) |
| if err != nil { |
| return |
| } |
| |
| if a.State != dm.Attempt_EXECUTING { |
| err = errors.New("Attempt is in wrong state") |
| return |
| } |
| |
| switch e.State { |
| case dm.Execution_SCHEDULING: |
| if subtle.ConstantTimeCompare(e.Token, auth.Token) != 1 { |
| err = errors.New("incorrect ActivationToken") |
| return |
| } |
| |
| e.State.MustEvolve(dm.Execution_RUNNING) |
| e.Token = actTok |
| err = ds.Put(c, e) |
| logging.Infof(c, "activated execution %s: was SCHEDULING now RUNNING", auth.Id) |
| |
| case dm.Execution_RUNNING: |
| if subtle.ConstantTimeCompare(e.Token, actTok) != 1 { |
| err = errors.New("incorrect Token") |
| } |
| // either the Token matched, in which case this is simply a retry |
| // by the same client, so there's no error, or it's wrong which means it's |
| // a retry by a different client. |
| |
| logging.Infof(c, "already activated execution %s", auth.Id) |
| |
| default: |
| err = fmt.Errorf("Execution is in wrong state") |
| } |
| return |
| } |
| |
| // ActivateExecution validates that the execution is unactivated and that |
| // the activation token matches and then sets the token to the new |
| // value. |
| // |
| // It's OK to retry this. Subsequent invocations with the same Token |
| // will recognize this case and not return an error. |
| func ActivateExecution(c context.Context, auth *dm.Execution_Auth, actToken []byte) (a *Attempt, e *Execution, err error) { |
| a, e, err = verifyExecutionAndActivate(c, auth, actToken) |
| if err != nil { |
| logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to activate execution") |
| err = makeError(err, "failed to activate execution Auth") |
| } |
| return a, e, err |
| } |
| |
| // GetEID gets an Execution_ID for this Execution. It panics if the Execution |
| // is in an invalid state. |
| func (e *Execution) GetEID() *dm.Execution_ID { |
| aid := &dm.Attempt_ID{} |
| if e.ID == 0 { |
| panic("cannot create valid Execution_ID with 0-value ID field") |
| } |
| if err := aid.SetDMEncoded(e.Attempt.StringID()); err != nil { |
| panic(err) |
| } |
| return dm.NewExecutionID(aid.Quest, aid.Id, uint32(e.ID)) |
| } |
| |
| // ToProto returns a dm proto version of this Execution. |
| func (e *Execution) ToProto(includeID bool) *dm.Execution { |
| ret := &dm.Execution{Data: e.DataProto()} |
| if includeID { |
| ret.Id = e.GetEID() |
| } |
| return ret |
| } |
| |
| // DataProto returns an Execution.Data message for this Execution. |
| // |
| // This omits the DistributorInfo.Url portion, which must be filled in elsewhere for |
| // package cyclical import reasons. |
| func (e *Execution) DataProto() (ret *dm.Execution_Data) { |
| switch e.State { |
| case dm.Execution_SCHEDULING: |
| ret = dm.NewExecutionScheduling().Data |
| case dm.Execution_RUNNING: |
| ret = dm.NewExecutionRunning().Data |
| case dm.Execution_STOPPING: |
| ret = dm.NewExecutionStopping().Data |
| case dm.Execution_FINISHED: |
| ret = dm.NewExecutionFinished(e.Result.Data).Data |
| case dm.Execution_ABNORMAL_FINISHED: |
| ret = dm.NewExecutionAbnormalFinish(e.Result.AbnormalFinish).Data |
| default: |
| panic(fmt.Errorf("unknown Execution_State: %s", e.State)) |
| } |
| ret.Created = google_pb.NewTimestamp(e.Created) |
| ret.Modified = google_pb.NewTimestamp(e.Modified) |
| ret.DistributorInfo = &dm.Execution_Data_DistributorInfo{ |
| ConfigName: e.DistributorConfigName, |
| ConfigVersion: e.DistributorConfigVersion, |
| Token: e.DistributorToken, |
| } |
| return ret |
| } |