| // Copyright 2021 The Chromium OS Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| // Package engine provides structs and functionality of recovery engine. |
| // For more details please read go/paris-recovery-engine. |
| package engine |
| |
| import ( |
| "context" |
| "fmt" |
| "io" |
| "strings" |
| "time" |
| |
| "go.chromium.org/luci/common/errors" |
| "go.chromium.org/luci/luciexe/build" |
| |
| "infra/cros/recovery/config" |
| "infra/cros/recovery/internal/execs" |
| "infra/cros/recovery/internal/log" |
| "infra/cros/recovery/logger" |
| "infra/cros/recovery/logger/metrics" |
| ) |
| |
| // recoveryEngine holds info required for running a recovery plan. |
| type recoveryEngine struct { |
| planName string |
| plan *config.Plan |
| args *execs.RunArgs |
| // Caches |
| actionResultsCache map[string]error |
| recoveryUsageCache map[recoveryUsageKey]error |
| } |
| |
| // Error tag to track error with request to start critical actions over. |
| var startOverTag = errors.BoolTag{Key: errors.NewTagKey("start-over")} |
| |
| // Run runs the recovery plan. |
| func Run(ctx context.Context, planName string, plan *config.Plan, args *execs.RunArgs) error { |
| r := &recoveryEngine{ |
| planName: planName, |
| plan: plan, |
| args: args, |
| } |
| r.initCache() |
| defer func() { r.close() }() |
| log.Debugf(ctx, "Received plan %s for %s", r.planName, r.args.ResourceName) |
| return r.runPlan(ctx) |
| } |
| |
| // close free up used resources. |
| func (r *recoveryEngine) close() { |
| if r.actionResultsCache != nil { |
| r.actionResultsCache = nil |
| } |
| if r.recoveryUsageCache != nil { |
| r.recoveryUsageCache = nil |
| } |
| } |
| |
| // runPlan executes recovery plan with critical-actions. |
| func (r *recoveryEngine) runPlan(ctx context.Context) (rErr error) { |
| log.Infof(ctx, "Plan %q: started", r.planName) |
| // The step and metrics need to know about error but if we need to stop from return then it is here. |
| forgiveError := false |
| defer func() { |
| if forgiveError { |
| log.Debugf(ctx, "Plan %q: forgiven error: %v", r.planName, rErr) |
| rErr = nil |
| } |
| }() |
| if r.args != nil { |
| // TODO: Generate metrics for plan closing. |
| if r.args.ShowSteps { |
| var step *build.Step |
| step, ctx = build.StartStep(ctx, fmt.Sprintf("Plan: %s", r.planName)) |
| if r.plan.GetAllowFail() { |
| step.Log("Allowed to fail!") |
| } |
| defer func() { step.End(rErr) }() |
| } |
| if i, ok := r.args.Logger.(logger.LogIndenter); ok { |
| i.Indent() |
| defer func() { i.Dedent() }() |
| } |
| } |
| var restartTally int64 |
| var forgivenFailureTally int64 |
| if r.args != nil && r.args.Metrics != nil { |
| action := &metrics.Action{} |
| closer, mErr := r.args.NewMetric( |
| ctx, |
| fmt.Sprintf("plan:%s", r.planName), |
| action, |
| ) |
| if mErr == nil && action != nil { |
| defer func() { |
| action.Observations = append( |
| action.Observations, |
| metrics.NewInt64Observation("restarts", restartTally), |
| metrics.NewInt64Observation("forgiven_failures", forgivenFailureTally), |
| ) |
| closer(ctx, rErr) |
| }() |
| } |
| } |
| |
| for { |
| if err := r.runCriticalActionAttempt(ctx, restartTally); err != nil { |
| if startOverTag.In(err) { |
| log.Infof(ctx, "Plan %q for %s: received request to start over.", r.planName, r.args.ResourceName) |
| r.resetCacheAfterSuccessfulRecoveryAction() |
| restartTally++ |
| continue |
| } |
| if r.plan.GetAllowFail() { |
| log.Debugf(ctx, "Plan %q for %s: failed with error: %s.", r.planName, r.args.ResourceName, err) |
| log.Infof(ctx, "Plan %q for %s: is allowed to fail, continue.", r.planName, r.args.ResourceName) |
| forgivenFailureTally++ |
| forgiveError = true |
| } |
| log.Infof(ctx, "Plan %q: fail", r.planName) |
| return errors.Annotate(err, "run plan %q", r.planName).Err() |
| } |
| break |
| } |
| log.Infof(ctx, "Plan %q: finished successfully.", r.planName) |
| log.Debugf(ctx, "Plan %q: recorded %d restarts during execution.", r.planName, restartTally) |
| log.Debugf(ctx, "Plan %q: recorded %d forgiven failures during execution.", r.planName, forgivenFailureTally) |
| return nil |
| } |
| |
| // runCriticalActionAttempt runs critical action of the plan with wrapper step to show plan restart attempts. |
| func (r *recoveryEngine) runCriticalActionAttempt(ctx context.Context, attempt int64) (err error) { |
| if r.args.ShowSteps { |
| var step *build.Step |
| stepName := fmt.Sprintf("First run of critical actions for %s", r.planName) |
| if attempt > 0 { |
| stepName = fmt.Sprintf("Attempt %d to run critical actions for %s", attempt, r.planName) |
| } |
| step, ctx = build.StartStep(ctx, stepName) |
| defer func() { step.End(err) }() |
| } |
| return r.runActions(ctx, r.plan.GetCriticalActions(), r.args.EnableRecovery, "Action") |
| } |
| |
| // runActions runs actions in order. |
| func (r *recoveryEngine) runActions(ctx context.Context, actions []string, enableRecovery bool, stepNamePrefix string) error { |
| for _, actionName := range actions { |
| if err := r.runAction(ctx, actionName, enableRecovery, stepNamePrefix); err != nil { |
| return errors.Annotate(err, "run actions").Err() |
| } |
| } |
| return nil |
| } |
| |
| // runAction runs single action. |
| // Execution steps: |
| // 1) Check action's result in cache. |
| // 2) Check if the action is applicable based on conditions. Skip if any fail. |
| // 3) Run dependencies of the action. Fail if any fails. |
| // 4) Run action exec function. Fail if any fail. |
| func (r *recoveryEngine) runAction(ctx context.Context, actionName string, enableRecovery bool, stepNamePrefix string) (rErr error) { |
| // The step and metrics need to know about error but if we need to stop from return then it is here. |
| forgiveError := false |
| defer func() { |
| if forgiveError { |
| log.Debugf(ctx, "Action %q: forgiven error: %v", actionName, rErr) |
| rErr = nil |
| } |
| }() |
| var step *build.Step |
| if r.args != nil { |
| action := &metrics.Action{} |
| if actionCloser := r.recordAction(ctx, actionName, action); actionCloser != nil { |
| defer actionCloser(rErr) |
| } |
| if r.args.ShowSteps { |
| stepName := fmt.Sprintf("%s: %s", stepNamePrefix, actionName) |
| step, ctx = build.StartStep(ctx, stepName) |
| defer func() { step.End(rErr) }() |
| stepLogCloser := log.AddStepLog(ctx, r.args.Logger, step, "execution details") |
| defer func() { stepLogCloser() }() |
| } |
| if i, ok := r.args.Logger.(logger.LogIndenter); ok { |
| i.Indent() |
| defer func() { i.Dedent() }() |
| } |
| } |
| defer func() { |
| if rErr != nil { |
| log.Debugf(ctx, "Action %q: finished with error %s.", actionName, rErr) |
| } else { |
| log.Debugf(ctx, "Action %q: finished.", actionName) |
| } |
| }() |
| a := r.getAction(actionName) |
| if len(a.GetDocs()) > 0 && step != nil { |
| docLog := step.Log("docs") |
| if _, err := io.WriteString(docLog, strings.Join(a.GetDocs(), "\n")); err != nil { |
| log.Debugf(ctx, "Fail write docs for %q, Error: %s", actionName, err) |
| } |
| } |
| if aErr, ok := r.actionResultFromCache(actionName); ok { |
| if aErr == nil { |
| log.Infof(ctx, "Action %q: pass (cached).", actionName) |
| // Return nil error so we can continue execution of next actions... |
| return nil |
| } |
| if a.GetAllowFailAfterRecovery() { |
| log.Infof(ctx, "Action %q: fail (cached). Error: %s", actionName, aErr) |
| log.Debugf(ctx, "Action %q: error ignored as action is allowed to fail.", actionName) |
| // Return error to report for step and metrics but stop from return to parent. |
| forgiveError = true |
| } |
| return errors.Annotate(aErr, "run action %q: (cached)", actionName).Err() |
| } |
| log.Infof(ctx, "Action %q: started.", actionName) |
| conditionName, err := r.runActionConditions(ctx, actionName) |
| if err != nil { |
| log.Infof(ctx, "Action %q: skipping, one of conditions %q failed.", actionName, conditionName) |
| if step != nil { |
| stepLog := step.Log("Skipped") |
| if _, err := io.WriteString(stepLog, fmt.Sprintf("The condition %q failed!", conditionName)); err != nil { |
| log.Debugf(ctx, "Fail to write reason why action skipped: %v.", err) |
| } |
| } |
| log.Debugf(ctx, "Action %q: conditions fail with %s", actionName, err) |
| // Return nil error so we can continue execution of next actions... |
| return nil |
| } |
| if err := r.runDependencies(ctx, actionName, enableRecovery); err != nil { |
| if startOverTag.In(err) { |
| return errors.Annotate(err, "run action %q", actionName).Err() |
| } |
| if a.GetAllowFailAfterRecovery() { |
| log.Infof(ctx, "Action %q: one of dependencies fail. Error: %s", actionName, err) |
| log.Debugf(ctx, "Action %q: error ignored as action is allowed to fail.", actionName) |
| // Return error to report for step and metrics but stop from return to parent. |
| forgiveError = true |
| } |
| return errors.Annotate(err, "run action %q", actionName).Err() |
| } |
| if err := r.runActionExec(ctx, actionName, enableRecovery); err != nil { |
| if startOverTag.In(err) { |
| return errors.Annotate(err, "run action %q", actionName).Err() |
| } |
| if a.GetAllowFailAfterRecovery() { |
| log.Infof(ctx, "Action %q: fail. Error: %s", actionName, err) |
| log.Debugf(ctx, "Action %q: error ignored as action is allowed to fail.", actionName) |
| // Return error to report for step and metrics but stop from return to parent. |
| forgiveError = true |
| } |
| return errors.Annotate(err, "run action %q", actionName).Err() |
| } |
| // Return nil error so we can continue execution of next actions... |
| log.Infof(ctx, "Action %q: finished successfully.", actionName) |
| return nil |
| } |
| |
| // runActionExec runs action's exec function and initiates recovery flow if exec fails. |
| // The recover flow start only recoveries is enabled. |
| func (r *recoveryEngine) runActionExec(ctx context.Context, actionName string, enableRecovery bool) error { |
| if err := r.runActionExecWithTimeout(ctx, actionName); err != nil { |
| a := r.getAction(actionName) |
| if enableRecovery && len(a.GetRecoveryActions()) > 0 { |
| log.Infof(ctx, "Action %q: starting recovery actions.", actionName) |
| log.Debugf(ctx, "Action %q: fail. Error: %s", actionName, err) |
| if rErr := r.runRecoveries(ctx, actionName); rErr != nil { |
| return errors.Annotate(rErr, "run action %q exec", actionName).Err() |
| } |
| log.Infof(ctx, "Run action %q exec: no recoveries left to try", actionName) |
| } |
| // Cache the action error only after running recoveries. |
| // If no recoveries were run, we still cache the action. |
| r.cacheActionResult(actionName, err) |
| return errors.Annotate(err, "run action %q exec", actionName).Err() |
| } |
| r.cacheActionResult(actionName, nil) |
| return nil |
| } |
| |
// defaultExecTimeout is the time limit applied to an action's exec function
// when the action does not specify its own timeout.
const defaultExecTimeout = time.Minute
| |
| func actionExecTimeout(a *config.Action) time.Duration { |
| if a.ExecTimeout != nil { |
| return a.ExecTimeout.AsDuration() |
| } |
| return defaultExecTimeout |
| } |
| |
| // runActionExecWithTimeout runs action's exec function with timeout. |
| func (r *recoveryEngine) runActionExecWithTimeout(ctx context.Context, actionName string) (rErr error) { |
| a := r.getAction(actionName) |
| if r.args != nil && r.args.ShowSteps { |
| var step *build.Step |
| step, ctx = build.StartStep(ctx, fmt.Sprintf("Execution: %q", actionName)) |
| defer func() { step.End(rErr) }() |
| stepLogCloser := log.AddStepLog(ctx, r.args.Logger, step, "execution details") |
| defer func() { stepLogCloser() }() |
| } |
| timeout := actionExecTimeout(a) |
| ctx, cancel := context.WithTimeout(ctx, timeout) |
| defer func() { cancel() }() |
| cw := make(chan error, 1) |
| go func() { |
| err := execs.Run(ctx, &execs.ExecInfo{ |
| RunArgs: r.args, |
| Name: a.ExecName, |
| ActionArgs: a.GetExecExtraArgs(), |
| ActionTimeout: timeout, |
| }) |
| cw <- err |
| }() |
| select { |
| case err := <-cw: |
| return errors.Annotate(err, "run exec %q with timeout %s", a.ExecName, timeout).Err() |
| case <-ctx.Done(): |
| log.Infof(ctx, "Run exec %q with timeout %s: excited timeout", a.ExecName, timeout) |
| return errors.Reason("run exec %q with timeout %s: excited timeout", a.ExecName, timeout).Err() |
| } |
| } |
| |
| // runActionConditions checks if action is applicable based on condition actions. |
| // If return err then not applicable, if nil then applicable. |
| func (r *recoveryEngine) runActionConditions(ctx context.Context, actionName string) (conditionName string, err error) { |
| a := r.getAction(actionName) |
| log.Debugf(ctx, "Action %q: starting running conditions...", actionName) |
| enableRecovery := false |
| for _, condition := range a.GetConditions() { |
| if err := r.runAction(ctx, condition, enableRecovery, "Condition"); err != nil { |
| log.Debugf(ctx, "Action %q: condition %q fails. Error: %s", actionName, condition, err) |
| return condition, errors.Annotate(err, "run conditions").Err() |
| } |
| } |
| log.Debugf(ctx, "Action %q: all conditions passed.", actionName) |
| return "", nil |
| } |
| |
| // runDependencies runs action's dependencies. |
| func (r *recoveryEngine) runDependencies(ctx context.Context, actionName string, enableRecovery bool) error { |
| a := r.getAction(actionName) |
| log.Debugf(ctx, "Action %q: starting running dependencies...", actionName) |
| for _, dependencyName := range a.GetDependencies() { |
| if err := r.runAction(ctx, dependencyName, enableRecovery, "Dependency"); err != nil { |
| log.Debugf(ctx, "Action %q: dependency %q fails. Errors: %s", actionName, dependencyName, err) |
| return errors.Annotate(err, "dependencies").Err() |
| } |
| } |
| log.Debugf(ctx, "Action %q: all dependencies passed.", actionName) |
| return nil |
| } |
| |
| // runRecoveries runs action's recoveries. |
| // Recovery actions are expected to fail. If recovery action fails then next will be attempted. |
| // Finishes with nil if no recovery action provided or nether succeeded. |
| // Finishes with start-over request if any recovery succeeded. |
| // Recovery action will skip if used before. |
| func (r *recoveryEngine) runRecoveries(ctx context.Context, actionName string) (rErr error) { |
| a := r.getAction(actionName) |
| for _, recoveryName := range a.GetRecoveryActions() { |
| if r.isRecoveryUsed(actionName, recoveryName) { |
| // Engine allows to use each recovery action only once in scope of the action. |
| log.Infof(ctx, "Recovery %q skipped as already used before for %q.", recoveryName, actionName) |
| continue |
| } |
| if err := r.runAction(ctx, recoveryName, false, "Recovery"); err != nil { |
| log.Infof(ctx, "Recovery %q: fail", recoveryName) |
| log.Debugf(ctx, "Recovery %q: fail. Error: %s", recoveryName, err) |
| r.registerRecoveryUsage(actionName, recoveryName, err) |
| continue |
| } |
| r.registerRecoveryUsage(actionName, recoveryName, nil) |
| log.Infof(ctx, "Recovery action %q: request to start-over.", recoveryName) |
| return errors.Reason("recovery %q: request to start over", recoveryName).Tag(startOverTag).Err() |
| } |
| return nil |
| } |
| |
| // getAction finds and provides action from the plan collection. |
| func (r *recoveryEngine) getAction(name string) *config.Action { |
| if a, ok := r.plan.Actions[name]; ok { |
| return a |
| } |
| // If we reach this place then we have issues with plan validation logic. |
| panic(fmt.Sprintf("action %q not found in the plan", name)) |
| } |
| |
| // initCache initializes cache on engine. |
| // The function extracted to supported testing. |
| func (r *recoveryEngine) initCache() { |
| r.actionResultsCache = make(map[string]error, len(r.plan.GetActions())) |
| r.recoveryUsageCache = make(map[recoveryUsageKey]error) |
| } |
| |
| // actionResultFromCache reads action's result from cache. |
| func (r *recoveryEngine) actionResultFromCache(actionName string) (err error, ok bool) { |
| err, ok = r.actionResultsCache[actionName] |
| return err, ok |
| } |
| |
| // cacheActionResult sets action's result to the cache. |
| func (r *recoveryEngine) cacheActionResult(actionName string, err error) { |
| switch r.getAction(actionName).GetRunControl() { |
| case config.RunControl_RERUN_AFTER_RECOVERY, config.RunControl_RUN_ONCE: |
| r.actionResultsCache[actionName] = err |
| case config.RunControl_ALWAYS_RUN: |
| // Do not cache the value |
| } |
| } |
| |
| // resetCacheAfterSuccessfulRecoveryAction resets cache for actions |
| // with run-control=RERUN_AFTER_RECOVERY. |
| func (r *recoveryEngine) resetCacheAfterSuccessfulRecoveryAction() { |
| for name, a := range r.plan.GetActions() { |
| if a.GetRunControl() == config.RunControl_RERUN_AFTER_RECOVERY { |
| delete(r.actionResultsCache, name) |
| } |
| } |
| } |
| |
| // isRecoveryUsed checks if recovery action is used in plan or action level scope. |
| func (r *recoveryEngine) isRecoveryUsed(actionName, recoveryName string) bool { |
| k := recoveryUsageKey{ |
| action: actionName, |
| recovery: recoveryName, |
| } |
| // If the recovery has been used in previous actions then it can be in |
| // the action result cache. |
| if err, ok := r.actionResultsCache[recoveryName]; ok { |
| r.recoveryUsageCache[k] = err |
| } |
| _, ok := r.recoveryUsageCache[k] |
| return ok |
| } |
| |
| // registerRecoveryUsage sets recovery action usage to the cache. |
| func (r *recoveryEngine) registerRecoveryUsage(actionName, recoveryName string, err error) { |
| r.recoveryUsageCache[recoveryUsageKey{ |
| action: actionName, |
| recovery: recoveryName, |
| }] = err |
| } |
| |
// recoveryUsageKey is the key of the recovery-usage cache: a pair of an
// action name and the name of a recovery action of that action.
type recoveryUsageKey struct {
	// action is the name of the action the recovery was used for.
	action string
	// recovery is the name of the recovery action.
	recovery string
}