| // Copyright 2021 The LUCI Authors. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package clpurger |
| |
| import ( |
| "context" |
| "fmt" |
| "strings" |
| "time" |
| |
| "google.golang.org/protobuf/proto" |
| |
| "go.chromium.org/luci/common/clock" |
| "go.chromium.org/luci/common/errors" |
| "go.chromium.org/luci/common/logging" |
| "go.chromium.org/luci/common/retry/transient" |
| "go.chromium.org/luci/gae/service/datastore" |
| "go.chromium.org/luci/server/tq" |
| |
| cfgpb "go.chromium.org/luci/cv/api/config/v2" |
| "go.chromium.org/luci/cv/internal/changelist" |
| "go.chromium.org/luci/cv/internal/common" |
| "go.chromium.org/luci/cv/internal/configs/prjcfg" |
| "go.chromium.org/luci/cv/internal/gerrit" |
| "go.chromium.org/luci/cv/internal/gerrit/cancel" |
| "go.chromium.org/luci/cv/internal/gerrit/trigger" |
| "go.chromium.org/luci/cv/internal/prjmanager" |
| "go.chromium.org/luci/cv/internal/prjmanager/prjpb" |
| "go.chromium.org/luci/cv/internal/run" |
| "go.chromium.org/luci/cv/internal/usertext" |
| ) |
| |
| // Purger purges CLs for Project Manager. |
| type Purger struct { |
| pmNotifier *prjmanager.Notifier |
| gFactory gerrit.Factory |
| clUpdater clUpdater |
| clMutator *changelist.Mutator |
| } |
| |
| // clUpdater is a subset of the *changelist.Updater which Purger needs. |
| type clUpdater interface { |
| Schedule(context.Context, *changelist.UpdateCLTask) error |
| } |
| |
| // New creates a Purger and registers it for handling tasks created by the given |
| // PM Notifier. |
| func New(n *prjmanager.Notifier, g gerrit.Factory, u clUpdater, clm *changelist.Mutator) *Purger { |
| p := &Purger{n, g, u, clm} |
| n.TasksBinding.PurgeProjectCL.AttachHandler( |
| func(ctx context.Context, payload proto.Message) error { |
| task := payload.(*prjpb.PurgeCLTask) |
| ctx = logging.SetFields(ctx, logging.Fields{ |
| "project": task.GetLuciProject(), |
| "cl": task.GetPurgingCl().GetClid(), |
| }) |
| err := p.PurgeCL(ctx, task) |
| return common.TQifyError(ctx, err) |
| }, |
| ) |
| return p |
| } |
| |
| // Schedule enqueues a task to purge a CL for immediate execution. |
| func (p *Purger) Schedule(ctx context.Context, t *prjpb.PurgeCLTask) error { |
| return p.pmNotifier.TasksBinding.TQDispatcher.AddTask(ctx, &tq.Task{ |
| Payload: t, |
| // No DeduplicationKey as these tasks are created transactionally by PM. |
| Title: fmt.Sprintf("%s/%d/%s", t.GetLuciProject(), t.GetPurgingCl().GetClid(), t.GetPurgingCl().GetOperationId()), |
| }) |
| } |
| |
| // PurgeCL purges a CL and notifies PM on success or failure. |
| func (p *Purger) PurgeCL(ctx context.Context, task *prjpb.PurgeCLTask) error { |
| now := clock.Now(ctx) |
| |
| if len(task.GetPurgeReasons()) == 0 { |
| return errors.Reason("no reasons given in %s", task).Err() |
| } |
| |
| d := task.GetPurgingCl().GetDeadline() |
| if d == nil { |
| return errors.Reason("no deadline given in %s", task).Err() |
| } |
| switch dt := d.AsTime(); { |
| case dt.Before(now): |
| logging.Warningf(ctx, "purging task running too late (deadline %s, now %s)", dt, now) |
| default: |
| dctx, cancel := clock.WithDeadline(ctx, dt) |
| defer cancel() |
| if err := p.purgeWithDeadline(dctx, task); err != nil { |
| return err |
| } |
| } |
| return p.pmNotifier.NotifyPurgeCompleted(ctx, task.GetLuciProject(), task.GetPurgingCl().GetOperationId()) |
| } |
| |
| func (p *Purger) purgeWithDeadline(ctx context.Context, task *prjpb.PurgeCLTask) error { |
| cl := &changelist.CL{ID: common.CLID(task.GetPurgingCl().GetClid())} |
| if err := datastore.Get(ctx, cl); err != nil { |
| return errors.Annotate(err, "failed to load %d", cl.ID).Tag(transient.Tag).Err() |
| } |
| |
| configGroups, err := loadConfigGroups(ctx, task) |
| if err != nil { |
| return nil |
| } |
| purgeTriggers, msg, err := triggersToPurge(ctx, configGroups[0].Content, cl, task) |
| switch { |
| case err != nil: |
| return errors.Annotate(err, "CL %d of project %q", cl.ID, task.GetLuciProject()).Err() |
| case purgeTriggers == nil: |
| return nil |
| } |
| |
| var whoms gerrit.Whoms |
| if cqMode := purgeTriggers.GetCqVoteTrigger().GetMode(); cqMode != "" { |
| whoms = append(whoms, run.Mode(cqMode).GerritNotifyTargets()...) |
| } |
| if nprMode := purgeTriggers.GetNewPatchsetRunTrigger().GetMode(); nprMode != "" { |
| whoms = append(whoms, run.Mode(nprMode).GerritNotifyTargets()...) |
| } |
| if len(whoms) == 0 { |
| panic(fmt.Errorf("expected the trigger(s) to purge to have a RunMode")) |
| } |
| |
| whoms.Dedupe() |
| logging.Debugf(ctx, "proceeding to purge CL due to\n%s", msg) |
| err = cancel.Cancel(ctx, cancel.Input{ |
| LUCIProject: task.GetLuciProject(), |
| CL: cl, |
| LeaseDuration: time.Minute, |
| Notify: whoms, |
| AddToAttentionSet: whoms, |
| AttentionReason: "CV can't start a new Run as requested", |
| Requester: "prjmanager/clpurger", |
| Triggers: purgeTriggers, |
| Message: msg, |
| RunCLExternalIDs: nil, // there is no Run. |
| ConfigGroups: configGroups, |
| GFactory: p.gFactory, |
| CLMutator: p.clMutator, |
| }) |
| |
| switch { |
| case err == nil: |
| logging.Debugf(ctx, "purging done") |
| case cancel.ErrPreconditionFailedTag.In(err): |
| logging.Debugf(ctx, "cancel is not necessary: %s", err) |
| case cancel.ErrPermanentTag.In(err): |
| logging.Errorf(ctx, "permanently failed to purge CL: %s", err) |
| default: |
| return errors.Annotate(err, "failed to purge CL %d of project %q", cl.ID, task.GetLuciProject()).Err() |
| } |
| |
| // Schedule a refresh of a CL. |
| // TODO(crbug.com/1284393): use Gerrit's consistency-on-demand when available. |
| return p.clUpdater.Schedule(ctx, &changelist.UpdateCLTask{ |
| LuciProject: task.GetLuciProject(), |
| ExternalId: string(cl.ExternalID), |
| Id: int64(cl.ID), |
| Requester: changelist.UpdateCLTask_CL_PURGER, |
| }) |
| } |
| |
| func triggersToPurge(ctx context.Context, cg *cfgpb.ConfigGroup, cl *changelist.CL, task *prjpb.PurgeCLTask) (*run.Triggers, string, error) { |
| if cl.Snapshot == nil { |
| logging.Warningf(ctx, "CL without Snapshot can't be purged\n%s", task) |
| return nil, "", nil |
| } |
| if p := cl.Snapshot.GetLuciProject(); p != task.GetLuciProject() { |
| logging.Warningf(ctx, "CL now belongs to different project %q", p) |
| return nil, "", nil |
| } |
| if cl.Snapshot.GetGerrit() == nil { |
| panic(fmt.Errorf("CL %d has non-Gerrit snapshot", cl.ID)) |
| } |
| ci := cl.Snapshot.GetGerrit().GetInfo() |
| currentTriggers := trigger.Find(&trigger.FindInput{ChangeInfo: ci, ConfigGroup: cg, TriggerNewPatchsetRunAfterPS: cl.TriggerNewPatchsetRunAfterPS}) |
| if currentTriggers == nil { |
| return nil, "", nil |
| } |
| |
| ret := &run.Triggers{} |
| var sb strings.Builder |
| for _, pr := range task.GetPurgeReasons() { |
| taskNPRTrigger := pr.GetTriggers().GetNewPatchsetRunTrigger() |
| taskCQVTrigger := pr.GetTriggers().GetCqVoteTrigger() |
| var clErrorMode run.Mode |
| switch { |
| case pr.GetAllActiveTriggers(): |
| ret = currentTriggers |
| switch { |
| // If multiple triggers are being purged, the mode of the CQ Vote |
| // trigger takes precedence for the purposes of formatting. |
| case ret.GetCqVoteTrigger() != nil: |
| clErrorMode = run.Mode(ret.GetCqVoteTrigger().GetMode()) |
| case ret.GetNewPatchsetRunTrigger() != nil: |
| clErrorMode = run.Mode(ret.GetNewPatchsetRunTrigger().GetMode()) |
| } |
| // If we are purging a specific trigger, only proceed if the trigger to |
| // purge has not been updated since the task was scheduled. |
| // Note that we can't entirely avoid races with users modifying the CL. |
| case taskNPRTrigger != nil && proto.Equal(currentTriggers.GetNewPatchsetRunTrigger().GetTime(), taskNPRTrigger.GetTime()): |
| ret.NewPatchsetRunTrigger = taskNPRTrigger |
| clErrorMode = run.Mode(taskNPRTrigger.GetMode()) |
| case taskCQVTrigger != nil && proto.Equal(currentTriggers.GetCqVoteTrigger().GetTime(), taskCQVTrigger.GetTime()): |
| ret.CqVoteTrigger = taskCQVTrigger |
| clErrorMode = run.Mode(taskCQVTrigger.GetMode()) |
| default: |
| continue |
| } |
| if sb.Len() > 0 { |
| sb.WriteRune('\n') |
| } |
| if err := usertext.FormatCLError(ctx, pr.GetClError(), cl, clErrorMode, &sb); err != nil { |
| return nil, "", err |
| } |
| } |
| if ret.CqVoteTrigger == nil && ret.NewPatchsetRunTrigger == nil { |
| return nil, "", nil |
| } |
| return ret, sb.String(), nil |
| } |
| |
| func loadConfigGroups(ctx context.Context, task *prjpb.PurgeCLTask) ([]*prjcfg.ConfigGroup, error) { |
| // There is usually exactly 1 config group. |
| res := make([]*prjcfg.ConfigGroup, len(task.GetConfigGroups())) |
| for i, id := range task.GetConfigGroups() { |
| cg, err := prjcfg.GetConfigGroup(ctx, task.GetLuciProject(), prjcfg.ConfigGroupID(id)) |
| if err != nil { |
| return nil, errors.Annotate(err, "failed to load a ConfigGroup").Tag(transient.Tag).Err() |
| } |
| res[i] = cg |
| } |
| return res, nil |
| } |