| // Copyright 2020 The LUCI Authors. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package impl |
| |
| import ( |
| "context" |
| "fmt" |
| "strings" |
| "time" |
| |
| "google.golang.org/protobuf/encoding/protojson" |
| "google.golang.org/protobuf/proto" |
| |
| "go.chromium.org/luci/common/clock" |
| "go.chromium.org/luci/common/errors" |
| "go.chromium.org/luci/common/logging" |
| "go.chromium.org/luci/common/retry/transient" |
| "go.chromium.org/luci/gae/service/datastore" |
| |
| "go.chromium.org/luci/cv/internal/changelist" |
| "go.chromium.org/luci/cv/internal/common" |
| "go.chromium.org/luci/cv/internal/common/bq" |
| "go.chromium.org/luci/cv/internal/common/eventbox" |
| "go.chromium.org/luci/cv/internal/common/lease" |
| "go.chromium.org/luci/cv/internal/common/tree" |
| "go.chromium.org/luci/cv/internal/gerrit" |
| "go.chromium.org/luci/cv/internal/gerrit/cancel" |
| "go.chromium.org/luci/cv/internal/gerrit/updater" |
| "go.chromium.org/luci/cv/internal/prjmanager" |
| "go.chromium.org/luci/cv/internal/run" |
| runbq "go.chromium.org/luci/cv/internal/run/bq" |
| "go.chromium.org/luci/cv/internal/run/eventpb" |
| "go.chromium.org/luci/cv/internal/run/impl/handler" |
| "go.chromium.org/luci/cv/internal/run/impl/state" |
| "go.chromium.org/luci/cv/internal/run/pubsub" |
| ) |
| |
// maxEventsPerBatch limits the number of incoming events the RM will process
// at once.
//
// This shouldn't be hit under normal operation. The value is chosen such that
// the RM can read this many events and make some progress within 1 minute.
| const maxEventsPerBatch = 10000 |
| |
| // RunManager manages Runs. |
| // |
// It decides on the starting, cancellation, submission, etc. of Runs.
| type RunManager struct { |
| runNotifier *run.Notifier |
| pmNotifier *prjmanager.Notifier |
| handler handler.Handler |
| } |
| |
| // New constructs a new RunManager instance. |
| func New( |
| n *run.Notifier, |
| pm *prjmanager.Notifier, |
| clm *changelist.Mutator, |
| u *updater.Updater, |
| g gerrit.Factory, |
| tc tree.Client, |
| bqc bq.Client, |
| ) *RunManager { |
| rm := &RunManager{n, pm, &handler.Impl{ |
| PM: pm, |
| RM: n, |
| CLUpdater: u, |
| CLMutator: clm, |
| BQExporter: runbq.NewExporter(n.TasksBinding.TQDispatcher, bqc), |
| GFactory: g, |
| TreeClient: tc, |
| Publisher: pubsub.NewPublisher(n.TasksBinding.TQDispatcher), |
| }} |
| n.TasksBinding.ManageRun.AttachHandler( |
| func(ctx context.Context, payload proto.Message) error { |
| task := payload.(*eventpb.ManageRunTask) |
| ctx = logging.SetField(ctx, "run", task.GetRunId()) |
| err := rm.manageRun(ctx, common.RunID(task.GetRunId())) |
| // TODO(tandrii/yiwzhang): avoid retries iff we know a new task was |
| // already scheduled for the next second. |
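			// Convert err into a TQ verdict: errors in KnownRetry
			// trigger a retry, while errors tagged with any of
			// KnownIgnoreTags are swallowed so the task isn't
			// retried pointlessly.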
| return common.TQIfy{ |
| KnownRetry: []error{ |
| handler.ErrTransientSubmissionFailure, |
| }, |
| KnownIgnoreTags: []errors.BoolTag{ |
| cancel.ErrPreconditionFailedTag, |
| common.DSContentionTag, |
| ignoreErrTag, |
| }, |
| }.Error(ctx, err) |
| }, |
| ) |
| return rm |
| } |
| |
// ignoreErrTag tags errors that the Run Manager intentionally ignores, so
// that the TQ task is not retried for them.
var ignoreErrTag = errors.BoolTag{
| Key: errors.NewTagKey("intentionally ignored rm error"), |
| } |
| |
// pokeInterval is how long the Run Manager waits before poking itself again
// when no event is scheduled sooner.
var pokeInterval = 5 * time.Minute
| |
// fakeHandlerKey is the context key used to substitute the event handler,
// e.g. with a fake in tests.
var fakeHandlerKey = "Fake Run Events Handler"
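
// A minimal sketch of how a handler might be injected via the context, e.g.
// from a test (fakeHandler is hypothetical; any handler.Handler
// implementation works):
//
//	ctx = context.WithValue(ctx, &fakeHandlerKey, handler.Handler(&fakeHandler{}))
//
// manageRun below will then route all events to it instead of handler.Impl.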
| |
// manageRun processes all pending events for a single Run via eventbox and
// then executes the resulting post-processing functions.
func (rm *RunManager) manageRun(ctx context.Context, runID common.RunID) error {
| proc := &runProcessor{ |
| runID: runID, |
| runNotifier: rm.runNotifier, |
| pmNotifier: rm.pmNotifier, |
| handler: rm.handler, |
| } |
| if h, ok := ctx.Value(&fakeHandlerKey).(handler.Handler); ok { |
| proc.handler = h |
| } |
| recipient := run.EventboxRecipient(ctx, runID) |
| postProcessFns, processErr := eventbox.ProcessBatch(ctx, recipient, proc, maxEventsPerBatch) |
| if processErr != nil { |
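		// Special case: another worker currently holds the lease on a
		// resource this Run needs (e.g. an ongoing submission).
		// Schedule a revisit at the lease expiry time instead of
		// failing the task.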
| if alreadyInLeaseErr, ok := lease.IsAlreadyInLeaseErr(processErr); ok { |
| expireTime := alreadyInLeaseErr.ExpireTime.UTC() |
| if err := rm.runNotifier.Invoke(ctx, runID, expireTime); err != nil { |
| return err |
| } |
| logging.Infof(ctx, "failed to acquire the lease for %q, revisit at %s", alreadyInLeaseErr.ResourceID, expireTime) |
| return ignoreErrTag.Apply(processErr) |
| } |
| return processErr |
| } |
| |
| for _, postProcessFn := range postProcessFns { |
| if err := postProcessFn(ctx); err != nil { |
| return err |
| } |
| } |
| return nil |
| } |
| |
| // runProcessor implements eventbox.Processor. |
| type runProcessor struct { |
| runID common.RunID |
| |
| runNotifier *run.Notifier |
| pmNotifier *prjmanager.Notifier |
| |
| handler handler.Handler |
| } |
| |
| var _ eventbox.Processor = (*runProcessor)(nil) |
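
// For orientation, the eventbox flow over a Processor is roughly the
// following (a sketch based on the contracts documented on the methods below,
// not the actual eventbox code; stateAfterTransitions is illustrative):
//
//	st, ev, _ := p.LoadState(ctx)                     // before the transaction
//	transitions, garbage, _ := p.PrepareMutation(ctx, events, st)
//	// ... then, inside a transaction:
//	cur, _ := p.FetchEVersion(ctx)
//	if cur != ev {
//		// The Run was mutated concurrently; abort and try again.
//	}
//	_ = p.SaveState(ctx, stateAfterTransitions, ev+1) // ev+1: the incremented EVersion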
| |
| // LoadState is called to load the state before a transaction. |
| func (rp *runProcessor) LoadState(ctx context.Context) (eventbox.State, eventbox.EVersion, error) { |
| r := run.Run{ID: rp.runID} |
| switch err := datastore.Get(ctx, &r); { |
| case err == datastore.ErrNoSuchEntity: |
| err = errors.Reason("CRITICAL: requested run entity %q is missing in datastore.", rp.runID).Err() |
| common.LogError(ctx, err) |
| panic(err) |
| case err != nil: |
| return nil, 0, errors.Annotate(err, "failed to get Run %q", rp.runID).Tag(transient.Tag).Err() |
| } |
| rs := &state.RunState{Run: r} |
| return rs, eventbox.EVersion(r.EVersion), nil |
| } |
| |
| // PrepareMutation is called before a transaction to compute transitions based on a |
| // batch of events. |
| // |
// All actions that must be done atomically with updating the state must be
// encapsulated inside the Transition.SideEffectFn callback.
| func (rp *runProcessor) PrepareMutation(ctx context.Context, events eventbox.Events, s eventbox.State) ([]eventbox.Transition, eventbox.Events, error) { |
| tr := &triageResult{} |
| var eventLog strings.Builder |
| eventLog.WriteString(fmt.Sprintf("Received %d events: ", len(events))) |
| for _, e := range events { |
| eventLog.WriteRune('\n') |
| eventLog.WriteString(" * ") |
| tr.triage(ctx, e, &eventLog) |
| } |
	logging.Debugf(ctx, "%s", eventLog.String())
| ts, err := rp.processTriageResults(ctx, tr, s.(*state.RunState)) |
| return ts, tr.garbage, err |
| } |
| |
| // FetchEVersion is called at the beginning of a transaction. |
| // |
// The returned EVersion is compared against the one associated with the state
// loaded via LoadState. If they differ, the transaction is aborted and the
// new state isn't saved.
| func (rp *runProcessor) FetchEVersion(ctx context.Context) (eventbox.EVersion, error) { |
| r := &run.Run{ID: rp.runID} |
| if err := datastore.Get(ctx, r); err != nil { |
| return 0, errors.Annotate(err, "failed to get %q", rp.runID).Tag(transient.Tag).Err() |
| } |
| return eventbox.EVersion(r.EVersion), nil |
| } |
| |
| // SaveState is called in a transaction to save the state if it has changed. |
| // |
// The passed EVersion is the incremented value of the EVersion that
// LoadState returned earlier.
| func (rp *runProcessor) SaveState(ctx context.Context, st eventbox.State, ev eventbox.EVersion) error { |
| rs := st.(*state.RunState) |
| r := rs.Run |
| r.EVersion = int(ev) |
| r.UpdateTime = datastore.RoundTime(clock.Now(ctx).UTC()) |
| if err := datastore.Put(ctx, &r); err != nil { |
| return errors.Annotate(err, "failed to put Run %q", r.ID).Tag(transient.Tag).Err() |
| } |
| if len(rs.LogEntries) > 0 { |
| l := run.RunLog{ |
| ID: int64(ev), |
| Run: datastore.MakeKey(ctx, run.RunKind, string(r.ID)), |
| Entries: &run.LogEntries{Entries: rs.LogEntries}, |
| } |
| if err := datastore.Put(ctx, &l); err != nil { |
| return errors.Annotate(err, "failed to put RunLog %q", r.ID).Tag(transient.Tag).Err() |
| } |
| } |
| return nil |
| } |
| |
// triageResult is the result of triaging the incoming events.
| type triageResult struct { |
| startEvents eventbox.Events |
| cancelEvents eventbox.Events |
| pokeEvents eventbox.Events |
| newConfigEvents struct { |
| events eventbox.Events |
| hash string |
| eversion int64 |
| } |
| clUpdatedEvents struct { |
| events eventbox.Events |
| cls common.CLIDs |
| } |
| readyForSubmissionEvents eventbox.Events |
| clSubmittedEvents struct { |
| events eventbox.Events |
| cls common.CLIDs |
| } |
| submissionCompletedEvent struct { |
| event eventbox.Event |
| sc *eventpb.SubmissionCompleted |
| } |
| cqdVerificationCompletedEvents eventbox.Events |
| cqdTryjobsUpdated eventbox.Events |
| nextReadyEventTime time.Time |
| // These events can be deleted even before the transaction starts. |
| garbage eventbox.Events |
| } |
| |
// triage deserializes one event, appends a short description of it to
// eventLog, and sorts it into the appropriate triageResult bucket. Events
// that aren't ready to be processed yet only advance nextReadyEventTime.
func (tr *triageResult) triage(ctx context.Context, item eventbox.Event, eventLog *strings.Builder) {
| e := &eventpb.Event{} |
| if err := proto.Unmarshal(item.Value, e); err != nil { |
		// This is a bug in the code or data corruption;
		// there is no way to recover automatically.
| logging.Errorf(ctx, "CRITICAL: failed to deserialize event %q: %s", item.ID, err) |
| panic(err) |
| } |
| eventLog.WriteString(fmt.Sprintf("%T: ", e.GetEvent())) |
	// Use compact JSON to keep the log short.
| eventLog.WriteString((protojson.MarshalOptions{Multiline: false}).Format(e)) |
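	// Events scheduled for the future are left in the eventbox untouched;
	// they only advance nextReadyEventTime so that enqueueNextPoke can wake
	// the Run Manager once the earliest of them becomes ready.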
| if pa := e.GetProcessAfter().AsTime(); pa.After(clock.Now(ctx)) { |
| if tr.nextReadyEventTime.IsZero() || pa.Before(tr.nextReadyEventTime) { |
| tr.nextReadyEventTime = pa |
| } |
| return |
| } |
| switch e.GetEvent().(type) { |
| case *eventpb.Event_Start: |
| tr.startEvents = append(tr.startEvents, item) |
| case *eventpb.Event_Cancel: |
| tr.cancelEvents = append(tr.cancelEvents, item) |
| case *eventpb.Event_Poke: |
| tr.pokeEvents = append(tr.pokeEvents, item) |
| case *eventpb.Event_NewConfig: |
| // Record all events but only the latest config hash. |
| tr.newConfigEvents.events = append(tr.newConfigEvents.events, item) |
| if ev := e.GetNewConfig().GetEversion(); ev > tr.newConfigEvents.eversion { |
| tr.newConfigEvents.eversion = ev |
| tr.newConfigEvents.hash = e.GetNewConfig().GetHash() |
| } |
| case *eventpb.Event_ClUpdated: |
| tr.clUpdatedEvents.events = append(tr.clUpdatedEvents.events, item) |
| tr.clUpdatedEvents.cls = append(tr.clUpdatedEvents.cls, common.CLID(e.GetClUpdated().GetClid())) |
| case *eventpb.Event_ClsUpdated: |
| tr.clUpdatedEvents.events = append(tr.clUpdatedEvents.events, item) |
| for _, one := range e.GetClsUpdated().GetEvents() { |
| tr.clUpdatedEvents.cls = append(tr.clUpdatedEvents.cls, common.CLID(one.GetClid())) |
| } |
| case *eventpb.Event_ReadyForSubmission: |
| tr.readyForSubmissionEvents = append(tr.readyForSubmissionEvents, item) |
| case *eventpb.Event_ClSubmitted: |
| tr.clSubmittedEvents.events = append(tr.clSubmittedEvents.events, item) |
| tr.clSubmittedEvents.cls = append(tr.clSubmittedEvents.cls, common.CLID(e.GetClSubmitted().GetClid())) |
| case *eventpb.Event_SubmissionCompleted: |
| if tr.submissionCompletedEvent.sc != nil { |
| panic("received more than 1 SubmissionCompleted result") |
| } |
| tr.submissionCompletedEvent.event = item |
| tr.submissionCompletedEvent.sc = e.GetSubmissionCompleted() |
| case *eventpb.Event_CqdVerificationCompleted: |
| tr.cqdVerificationCompletedEvents = append(tr.cqdVerificationCompletedEvents, item) |
| case *eventpb.Event_CqdTryjobsUpdated: |
| tr.cqdTryjobsUpdated = append(tr.cqdTryjobsUpdated, item) |
| case *eventpb.Event_CqdFinished: |
| // TODO(crbug/1227523): remove this after all such events are wiped out |
| // from datastore. |
| tr.garbage = append(tr.garbage, item) |
| default: |
| panic(fmt.Errorf("unknown event: %T [id=%q]", e.GetEvent(), item.ID)) |
| } |
| } |
| |
// processTriageResults dispatches the triaged events to the corresponding
// handler methods and accumulates the resulting state transitions.
func (rp *runProcessor) processTriageResults(ctx context.Context, tr *triageResult, rs *state.RunState) ([]eventbox.Transition, error) {
	startingState := rs
| var transitions []eventbox.Transition |
| |
| switch { |
| case len(tr.cancelEvents) > 0: |
| res, err := rp.handler.Cancel(ctx, rs) |
| if err != nil { |
| return nil, err |
| } |
		// Consume all the start events here as well because it is
		// possible for the Run Manager to receive start and cancel
		// events at the same time. For example, a user requests to
		// start a Run and cancels it almost immediately, but the gap
		// is still long enough for the Project Manager to create this
		// Run in CV. In that case, the Run Manager should move this
		// Run directly to the cancelled state.
| events := append(tr.cancelEvents, tr.startEvents...) |
| rs, transitions = applyResult(res, events, transitions) |
| case len(tr.startEvents) > 0: |
| res, err := rp.handler.Start(ctx, rs) |
| if err != nil { |
| return nil, err |
| } |
| rs, transitions = applyResult(res, tr.startEvents, transitions) |
| } |
| |
| if len(tr.newConfigEvents.events) > 0 { |
| res, err := rp.handler.UpdateConfig(ctx, rs, tr.newConfigEvents.hash) |
| if err != nil { |
| return nil, err |
| } |
| rs, transitions = applyResult(res, tr.newConfigEvents.events, transitions) |
| } |
| if len(tr.clUpdatedEvents.events) > 0 { |
| res, err := rp.handler.OnCLsUpdated(ctx, rs, tr.clUpdatedEvents.cls) |
| if err != nil { |
| return nil, err |
| } |
| rs, transitions = applyResult(res, tr.clUpdatedEvents.events, transitions) |
| } |
| |
| if len(tr.cqdTryjobsUpdated) > 0 { |
| res, err := rp.handler.OnCQDTryjobsUpdated(ctx, rs) |
| if err != nil { |
| return nil, err |
| } |
| rs, transitions = applyResult(res, tr.cqdTryjobsUpdated, transitions) |
| } |
| if len(tr.cqdVerificationCompletedEvents) > 0 { |
| res, err := rp.handler.OnCQDVerificationCompleted(ctx, rs) |
| if err != nil { |
| return nil, err |
| } |
| rs, transitions = applyResult(res, tr.cqdVerificationCompletedEvents, transitions) |
| } |
| if len(tr.clSubmittedEvents.events) > 0 { |
| res, err := rp.handler.OnCLSubmitted(ctx, rs, tr.clSubmittedEvents.cls) |
| if err != nil { |
| return nil, err |
| } |
| rs, transitions = applyResult(res, tr.clSubmittedEvents.events, transitions) |
| } |
| if sc := tr.submissionCompletedEvent.sc; sc != nil { |
| res, err := rp.handler.OnSubmissionCompleted(ctx, rs, sc) |
| if err != nil { |
| return nil, err |
| } |
| rs, transitions = applyResult(res, eventbox.Events{tr.submissionCompletedEvent.event}, transitions) |
| } |
| if len(tr.readyForSubmissionEvents) > 0 { |
| res, err := rp.handler.OnReadyForSubmission(ctx, rs) |
| if err != nil { |
| return nil, err |
| } |
| rs, transitions = applyResult(res, tr.readyForSubmissionEvents, transitions) |
| } |
| |
| if len(tr.pokeEvents) > 0 { |
| res, err := rp.handler.Poke(ctx, rs) |
| if err != nil { |
| return nil, err |
| } |
| rs, transitions = applyResult(res, tr.pokeEvents, transitions) |
| } |
	// Submission runs as a PostProcessFn after event handling and state
	// transitions are done. It is possible that the submission never
	// reports its result back to the RM (e.g. the app crashes in the
	// middle, the task times out, etc.). In that case, when the task
	// retries, CV should try to resume the submission, or fail it if the
	// deadline has been exceeded, even though no event was received.
	// Therefore, always run TryResumeSubmission at the end, regardless.
| res, err := rp.handler.TryResumeSubmission(ctx, rs) |
| if err != nil { |
| return nil, err |
| } |
| _, transitions = applyResult(res, nil, transitions) |
| |
	if err := rp.enqueueNextPoke(ctx, startingState.Run.Status, tr.nextReadyEventTime); err != nil {
| return nil, err |
| } |
| return transitions, nil |
| } |
| |
// applyResult converts a handler.Result into an eventbox.Transition that
// consumes the given events, appends it to transitions, and returns the new
// state to feed into the next handler.
func applyResult(res *handler.Result, events eventbox.Events, transitions []eventbox.Transition) (*state.RunState, []eventbox.Transition) {
| t := eventbox.Transition{ |
| TransitionTo: res.State, |
| SideEffectFn: res.SideEffectFn, |
| PostProcessFn: res.PostProcessFn, |
| } |
| if !res.PreserveEvents { |
| t.Events = events |
| } |
| return res.State, append(transitions, t) |
| } |
| |
// enqueueNextPoke schedules the next wake-up of the Run Manager based on the
// Run's status at the start of processing and the earliest not-yet-ready
// event.
func (rp *runProcessor) enqueueNextPoke(ctx context.Context, startingStatus run.Status, nextReadyEventTime time.Time) error {
| switch now := clock.Now(ctx); { |
| case run.IsEnded(startingStatus): |
		// Do not enqueue the next poke if the Run was already ended at
		// the beginning of this state transition. The status from
		// before the transition is used (rather than the one after)
		// because CV may fail to save the new state, in which case the
		// recursive poke is needed to unblock the Run.
| return nil |
| case nextReadyEventTime.IsZero(): |
| return rp.runNotifier.PokeAfter(ctx, rp.runID, pokeInterval) |
| case now.After(nextReadyEventTime): |
		// It is possible that by this time the next ready event is
		// already overdue; invoke the Run Manager immediately.
| return rp.runNotifier.Invoke(ctx, rp.runID, time.Time{}) |
| case nextReadyEventTime.Before(now.Add(pokeInterval)): |
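		// The next event becomes ready sooner than the regular poke
		// would fire; schedule an invocation for exactly that time.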
| return rp.runNotifier.Invoke(ctx, rp.runID, nextReadyEventTime) |
| default: |
| return rp.runNotifier.PokeAfter(ctx, rp.runID, pokeInterval) |
| } |
| } |