| // Copyright 2021 The LUCI Authors. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package changelist |
| |
| import ( |
| "context" |
| "fmt" |
| "sort" |
| "strconv" |
| "strings" |
| "sync" |
| "time" |
| |
| "google.golang.org/protobuf/proto" |
| |
| "go.chromium.org/luci/common/clock" |
| "go.chromium.org/luci/common/errors" |
| "go.chromium.org/luci/common/logging" |
| "go.chromium.org/luci/common/retry/transient" |
| "go.chromium.org/luci/common/sync/parallel" |
| "go.chromium.org/luci/gae/service/datastore" |
| "go.chromium.org/luci/server/tq" |
| |
| "go.chromium.org/luci/cv/internal/common" |
| "go.chromium.org/luci/cv/internal/gerrit" |
| "go.chromium.org/luci/cv/internal/metrics" |
| ) |
| |
| const ( |
| // BatchUpdateCLTaskClass is the Task Class ID of the BatchUpdateCLTask, |
| // which is enqueued only during a transaction. |
| BatchUpdateCLTaskClass = "batch-update-cl" |
| // UpdateCLTaskClass is the Task Class ID of the UpdateCLTask. |
| UpdateCLTaskClass = "update-cl" |
| |
| // blindRefreshInterval sets the interval between blind refreshes of a CL. |
| blindRefreshInterval = time.Minute |
| |
| // knownRefreshInterval sets the interval between refreshes of a CL when |
| // updatedHint is known. |
| knownRefreshInterval = 15 * time.Minute |
| |
| // autoRefreshAfter is the age after which a CL becomes worthy of a "blind" |
| // refresh. |
| // |
| // A "blind" refresh means the CL is already stored in Datastore and is up |
| // to date to the best of CV's knowledge. |
| autoRefreshAfter = 2 * time.Hour |
| ) |
| |
| // UpdaterBackend abstracts out fetching CL details from the code review backend. |
| type UpdaterBackend interface { |
| // Kind identifies the backend. |
| // |
| // It's also the first part of the CL's ExternalID, e.g. "gerrit". |
| // Must not contain a slash. |
| Kind() string |
| |
| // LookupApplicableConfig returns the latest ApplicableConfig for the previously |
| // saved CL. |
| // |
| // See CL.ApplicableConfig field doc for more details. Roughly, it finds which |
| // LUCI projects are configured to watch this CL. |
| // |
| // Updater calls LookupApplicableConfig() before Fetch() in order to avoid |
| // an unnecessary Fetch() call entirely, e.g. if the CL is up to date or if |
| // the CL is definitely not watched by a specific LUCI project. |
| // |
| // Returns non-nil ApplicableConfig normally. |
| // Returns nil ApplicableConfig if the previously saved CL state isn't |
| // sufficient to confidently determine the ApplicableConfig. |
| LookupApplicableConfig(ctx context.Context, saved *CL) (*ApplicableConfig, error) |
| |
| // Fetch fetches the CL in the context of a given project. |
| Fetch(ctx context.Context, input *FetchInput) (UpdateFields, error) |
| |
| // HasChanged decides whether the CL in the backend has changed compared to |
| // the existing snapshot in LUCI CV. |
| HasChanged(cvCurrent, backendCurrent *Snapshot) bool |
| |
| // TQErrorSpec allows customizing TQ-specific logging and error handling. |
| // |
| // For example, the Gerrit backend may wish to retry out-of-quota errors |
| // without logging a detailed stacktrace. |
| TQErrorSpec() common.TQIfy |
| } |
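| |
| // As an illustration only, a minimal UpdaterBackend might look like the |
| // sketch below. The "fakeBackend" type is hypothetical and not part of CV; |
| // real backends (e.g. the Gerrit one) live in their own packages. |
| // |
| //	type fakeBackend struct{} |
| // |
| //	func (fakeBackend) Kind() string { return "fake" } |
| // |
| //	func (fakeBackend) LookupApplicableConfig(ctx context.Context, saved *CL) (*ApplicableConfig, error) { |
| //		return nil, nil // insufficient saved state; force a Fetch. |
| //	} |
| // |
| //	func (fakeBackend) Fetch(ctx context.Context, in *FetchInput) (UpdateFields, error) { |
| //		// Talk to the code review system and return only the fields to update. |
| //		return UpdateFields{Snapshot: &Snapshot{ /* ... */ }}, nil |
| //	} |
| // |
| //	func (fakeBackend) HasChanged(cvCurrent, backendCurrent *Snapshot) bool { |
| //		return backendCurrent.GetExternalUpdateTime().AsTime().After( |
| //			cvCurrent.GetExternalUpdateTime().AsTime()) |
| //	} |
| // |
| //	func (fakeBackend) TQErrorSpec() common.TQIfy { return common.TQIfy{} } |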
| |
| // FetchInput is the input for UpdaterBackend.Fetch. |
| // |
| // It describes what to fetch along with the associated meta information. |
| type FetchInput struct { |
| // CL of the ChangeList to fetch a snapshot of. |
| // |
| // If CL.ID is 0, the CL entity doesn't exist in Datastore yet. |
| // CL.ExternalID is always set. |
| CL *CL |
| // Project is the LUCI project whose scoped account should be used to |
| // perform the fetch. |
| Project string |
| // UpdatedHint, if not zero time, is the backend-originating timestamp of |
| // the most recent CL update. CV sources it via e.g. polling or a PubSub |
| // subscription. It is useful to detect and work around the backend's |
| // eventual consistency. |
| UpdatedHint time.Time |
| // Requester identifies the scenario that issued this Fetch invocation. |
| Requester UpdateCLTask_Requester |
| Hint *UpdateCLTask_Hint |
| } |
| |
| // NewFetchInput returns FetchInput for a given CL and UpdateCLTask. |
| func NewFetchInput(cl *CL, task *UpdateCLTask) *FetchInput { |
| return &FetchInput{ |
| CL: cl, |
| Project: task.GetLuciProject(), |
| Hint: task.GetHint(), |
| Requester: task.GetRequester(), |
| } |
| } |
| |
| // UpdateFields defines what parts of CL to update. |
| // |
| // At least one field must be specified. |
| type UpdateFields struct { |
| // Snapshot overwrites existing CL snapshot if newer according to its |
| // .ExternalUpdateTime. |
| Snapshot *Snapshot |
| |
| // ApplicableConfig overwrites existing CL ApplicableConfig if semantically |
| // different from existing one. |
| ApplicableConfig *ApplicableConfig |
| |
| // AddDependentMeta adds or overwrites per-LUCI-project metadata in the CL's Access. |
| // Doesn't affect metadata stored for projects not referenced here. |
| AddDependentMeta *Access |
| |
| // DelAccess deletes Access records for the given projects. |
| DelAccess []string |
| } |
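| |
| // For illustration, a backend's Fetch typically populates only the parts |
| // that need updating. A hedged sketch (newSnapshot is hypothetical): |
| // |
| //	// Only the Snapshot changed; ApplicableConfig and Access are untouched. |
| //	return UpdateFields{Snapshot: newSnapshot}, nil |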
| |
| // IsEmpty returns true if no updates are necessary. |
| func (u UpdateFields) IsEmpty() bool { |
| return (u.Snapshot == nil && |
| u.ApplicableConfig == nil && |
| len(u.AddDependentMeta.GetByProject()) == 0 && |
| len(u.DelAccess) == 0) |
| } |
| |
| func (u UpdateFields) shouldUpdateSnapshot(cl *CL, backend UpdaterBackend) bool { |
| switch { |
| case u.Snapshot == nil: |
| return false |
| case cl.Snapshot == nil: |
| return true |
| case cl.Snapshot.GetOutdated() != nil: |
| return true |
| case cl.Snapshot.GetLuciProject() != u.Snapshot.GetLuciProject(): |
| return true |
| case backend.HasChanged(cl.Snapshot, u.Snapshot): |
| return true |
| default: |
| return false |
| } |
| } |
| |
| // Apply applies the UpdateFields to a given CL. |
| func (u UpdateFields) Apply(cl *CL, backend UpdaterBackend) (changed, changedSnapshot bool) { |
| if u.ApplicableConfig != nil && !cl.ApplicableConfig.SemanticallyEqual(u.ApplicableConfig) { |
| cl.ApplicableConfig = u.ApplicableConfig |
| changed = true |
| } |
| |
| if u.shouldUpdateSnapshot(cl, backend) { |
| cl.Snapshot = u.Snapshot |
| changed, changedSnapshot = true, true |
| } |
| |
| switch { |
| case u.AddDependentMeta == nil: |
| case cl.Access == nil || cl.Access.GetByProject() == nil: |
| cl.Access = u.AddDependentMeta |
| changed = true |
| default: |
| e := cl.Access.GetByProject() |
| for lProject, v := range u.AddDependentMeta.GetByProject() { |
| if v.GetNoAccessTime() == nil { |
| panic("NoAccessTime must be set") |
| } |
| old, exists := e[lProject] |
| if !exists || old.GetUpdateTime().AsTime().Before(v.GetUpdateTime().AsTime()) { |
| if old.GetNoAccessTime() != nil && old.GetNoAccessTime().AsTime().Before(v.GetNoAccessTime().AsTime()) { |
| v.NoAccessTime = old.NoAccessTime |
| } |
| e[lProject] = v |
| changed = true |
| } |
| } |
| } |
| |
| if len(u.DelAccess) > 0 && len(cl.Access.GetByProject()) > 0 { |
| for _, p := range u.DelAccess { |
| if _, exists := cl.Access.GetByProject()[p]; exists { |
| changed = true |
| delete(cl.Access.ByProject, p) |
| if len(cl.Access.GetByProject()) == 0 { |
| cl.Access = nil |
| break |
| } |
| } |
| } |
| } |
| |
| return |
| } |
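| |
| // As a hedged usage sketch, callers typically invoke Apply inside a Datastore |
| // transaction callback and stop the mutation when nothing changed, similar to |
| // what handleCLWithBackend does below (identifiers are illustrative): |
| // |
| //	changed, changedSnapshot := updateFields.Apply(latestCL, backend) |
| //	if !changed { |
| //		return ErrStopMutation |
| //	} |
| //	_ = changedSnapshot // later reported to monitoring |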
| |
| // Updater knows how to update CLs from relevant backend (e.g. Gerrit), |
| // notifying other CV parts as needed. |
| type Updater struct { |
| tqd *tq.Dispatcher |
| mutator *Mutator |
| |
| rwmutex sync.RWMutex // guards `backends` |
| backends map[string]UpdaterBackend |
| } |
| |
| // NewUpdater creates a new Updater. |
| // |
| // Starts without backends, but they ought to be added via RegisterBackend(). |
| func NewUpdater(tqd *tq.Dispatcher, m *Mutator) *Updater { |
| u := &Updater{ |
| tqd: tqd, |
| mutator: m, |
| backends: make(map[string]UpdaterBackend, 1), |
| } |
| tqd.RegisterTaskClass(tq.TaskClass{ |
| ID: BatchUpdateCLTaskClass, |
| Prototype: &BatchUpdateCLTask{}, |
| Queue: "update-cl", |
| Quiet: true, |
| QuietOnError: true, |
| Kind: tq.Transactional, |
| Handler: func(ctx context.Context, payload proto.Message) error { |
| t := payload.(*BatchUpdateCLTask) |
| err := u.handleBatch(ctx, t) |
| return common.TQifyError(ctx, err) |
| }, |
| }) |
| tqd.RegisterTaskClass(tq.TaskClass{ |
| ID: UpdateCLTaskClass, |
| Prototype: &UpdateCLTask{}, |
| Queue: "update-cl", |
| Quiet: true, |
| QuietOnError: true, |
| Kind: tq.FollowsContext, |
| Handler: func(ctx context.Context, payload proto.Message) error { |
| t := payload.(*UpdateCLTask) |
| // NOTE: unlike other TQ handler code in CV, the common.TQifyError call is |
| // made inside the handler to allow per-backend definition of which errors |
| // are retriable. |
| return u.handleCL(ctx, t) |
| }, |
| }) |
| return u |
| } |
| |
| // RegisterBackend registers a backend. |
| // |
| // Panics if a backend of the same kind is already registered, or if the |
| // kind contains '/'. |
| func (u *Updater) RegisterBackend(b UpdaterBackend) { |
| kind := b.Kind() |
| if strings.ContainsRune(kind, '/') { |
| panic(fmt.Errorf("backend %T of kind %q must not contain '/'", b, kind)) |
| } |
| u.rwmutex.Lock() |
| defer u.rwmutex.Unlock() |
| if _, exists := u.backends[kind]; exists { |
| panic(fmt.Errorf("backend %q is already registered", kind)) |
| } |
| u.backends[kind] = b |
| } |
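| |
| // Typical wiring, sketched for illustration (gerritBackend stands for |
| // whatever concrete UpdaterBackend implementation is used; its construction |
| // is not shown here): |
| // |
| //	updater := NewUpdater(tqDispatcher, mutator) |
| //	updater.RegisterBackend(gerritBackend) |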
| |
| // ScheduleBatch schedules update of several CLs. |
| // |
| // If called in a transaction, enqueues exactly one TQ task transactionally. |
| // This allows writing 1 Datastore entity during a transaction instead of N |
| // entities, as would happen if Schedule() were used for each CL. |
| // |
| // Otherwise, enqueues 1 TQ task per CL non-transactionally and in parallel. |
| func (u *Updater) ScheduleBatch(ctx context.Context, luciProject string, cls []*CL, requester UpdateCLTask_Requester) error { |
| tasks := make([]*UpdateCLTask, len(cls)) |
| for i, cl := range cls { |
| tasks[i] = &UpdateCLTask{ |
| LuciProject: luciProject, |
| ExternalId: string(cl.ExternalID), |
| Id: int64(cl.ID), |
| Requester: requester, |
| } |
| } |
| |
| switch { |
| case len(tasks) == 1: |
| // Optimization for the most frequent use-case of single-CL Runs. |
| return u.Schedule(ctx, tasks[0]) |
| case datastore.CurrentTransaction(ctx) == nil: |
| return u.handleBatch(ctx, &BatchUpdateCLTask{Tasks: tasks}) |
| default: |
| return u.tqd.AddTask(ctx, &tq.Task{ |
| Payload: &BatchUpdateCLTask{Tasks: tasks}, |
| Title: fmt.Sprintf("batch-%s-%d", luciProject, len(tasks)), |
| }) |
| } |
| } |
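| |
| // A hedged example of the transactional path (identifiers are illustrative; |
| // requester must be a concrete UpdateCLTask_Requester value): |
| // |
| //	err := datastore.RunInTransaction(ctx, func(ctx context.Context) error { |
| //		// ... mutate other entities ... |
| //		// Exactly one BatchUpdateCLTask is enqueued transactionally. |
| //		return updater.ScheduleBatch(ctx, "chromium", cls, requester) |
| //	}, nil) |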
| |
| // Schedule dispatches a TQ task. It should be used instead of calling |
| // tq.AddTask directly to allow for consistent de-duplication. |
| func (u *Updater) Schedule(ctx context.Context, payload *UpdateCLTask) error { |
| return u.ScheduleDelayed(ctx, payload, 0) |
| } |
| |
| // ScheduleDelayed is the same as Schedule but with a delay. |
| func (u *Updater) ScheduleDelayed(ctx context.Context, payload *UpdateCLTask, delay time.Duration) error { |
| task := &tq.Task{ |
| Payload: payload, |
| Delay: delay, |
| Title: makeTQTitleForHumans(payload), |
| } |
| if payload.Requester == UpdateCLTask_REQUESTER_CLASS_UNSPECIFIED { |
| panic(fmt.Errorf("BUG: UpdateCLTask.Requester unspecified: %s", payload)) |
| } |
| if datastore.CurrentTransaction(ctx) == nil { |
| task.DeduplicationKey = makeTaskDeduplicationKey(ctx, payload, delay) |
| } |
| return u.tqd.AddTask(ctx, task) |
| } |
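| |
| // A hedged usage sketch (values are illustrative): outside a transaction the |
| // task gets a deduplication key, so scheduling the same payload twice within |
| // one epoch window yields a single TQ task. |
| // |
| //	err := updater.Schedule(ctx, &UpdateCLTask{ |
| //		LuciProject: "chromium", |
| //		ExternalId:  "gerrit/chromium-review.googlesource.com/1111111", |
| //		Requester:   requester, // a concrete UpdateCLTask_Requester |
| //	}) |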
| |
| // ResolveAndScheduleDepsUpdate resolves deps, creating new CL entities as |
| // necessary, and schedules an update task for each dep which needs an update. |
| // |
| // It's meant to be used by the Updater backends. |
| // |
| // Returns a sorted slice of Deps by their CL ID, ready to be stored as |
| // CL.Snapshot.Deps. |
| func (u *Updater) ResolveAndScheduleDepsUpdate(ctx context.Context, luciProject string, deps map[ExternalID]DepKind, requester UpdateCLTask_Requester) ([]*Dep, error) { |
| // Optimize for the most frequent case whereby deps are already known to CV |
| // and were updated recently enough that no task scheduling is even necessary. |
| |
| // Batch-resolve external IDs to CLIDs, and load all existing CLs. |
| resolvingDeps, err := resolveDeps(ctx, luciProject, deps) |
| if err != nil { |
| return nil, err |
| } |
| // Identify indexes of deps which need to have an update task scheduled. |
| ret := make([]*Dep, len(deps)) |
| var toSchedule []int // indexes |
| for i, d := range resolvingDeps { |
| if d.ready { |
| ret[i] = d.resolvedDep |
| } else { |
| // Also covers the case of a dep not yet having a CL entity. |
| toSchedule = append(toSchedule, i) |
| } |
| } |
| if len(toSchedule) == 0 { |
| // Quick path exit. |
| return sortDeps(ret), nil |
| } |
| |
| errs := parallel.WorkPool(min(10, len(toSchedule)), func(work chan<- func() error) { |
| for _, i := range toSchedule { |
| i, d := i, resolvingDeps[i] |
| work <- func() error { |
| if err := d.createIfNotExists(ctx, u.mutator, luciProject); err != nil { |
| return err |
| } |
| if err := d.schedule(ctx, u, luciProject, requester); err != nil { |
| return err |
| } |
| ret[i] = d.resolvedDep |
| return nil |
| } |
| } |
| }) |
| if errs != nil { |
| return nil, common.MostSevereError(errs) |
| } |
| return sortDeps(ret), nil |
| } |
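| |
| // A hedged sketch of how a backend might use ResolveAndScheduleDepsUpdate |
| // while assembling a Snapshot (identifiers are illustrative; kind is some |
| // concrete DepKind value): |
| // |
| //	deps := map[ExternalID]DepKind{ |
| //		ExternalID("gerrit/chromium-review.googlesource.com/1111110"): kind, |
| //	} |
| //	resolved, err := u.ResolveAndScheduleDepsUpdate(ctx, luciProject, deps, requester) |
| //	if err != nil { |
| //		return UpdateFields{}, err |
| //	} |
| //	snapshot.Deps = resolved |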
| |
| /////////////////////////////////////////////////////////////////////////////// |
| // implementation details. |
| |
| func (u *Updater) handleBatch(ctx context.Context, batch *BatchUpdateCLTask) error { |
| total := len(batch.GetTasks()) |
| err := parallel.WorkPool(min(16, total), func(work chan<- func() error) { |
| for _, task := range batch.GetTasks() { |
| task := task |
| work <- func() error { return u.Schedule(ctx, task) } |
| } |
| }) |
| switch merrs, ok := err.(errors.MultiError); { |
| case err == nil: |
| return nil |
| case !ok: |
| return err |
| default: |
| failed, _ := merrs.Summary() |
| err = common.MostSevereError(merrs) |
| return errors.Annotate(err, "failed to schedule UpdateCLTask for %d out of %d CLs, keeping the most severe error", failed, total).Err() |
| } |
| } |
| |
| // TestingForceUpdate runs the CL Updater synchronously. |
| // |
| // For use in tests only. Production code should use Schedule() to benefit from |
| // task de-duplication. |
| // |
| // TODO(crbug/1284393): revisit the usefulness of the sync refresh after |
| // consistency-on-demand is provided by Gerrit. |
| func (u *Updater) TestingForceUpdate(ctx context.Context, task *UpdateCLTask) error { |
| return u.handleCL(ctx, task) |
| } |
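| |
| // In tests, a hedged sketch of forcing a synchronous refresh (identifiers are |
| // illustrative): |
| // |
| //	task := &UpdateCLTask{ |
| //		LuciProject: "chromium", |
| //		ExternalId:  string(eid), |
| //		Requester:   requester, |
| //	} |
| //	if err := updater.TestingForceUpdate(ctx, task); err != nil { |
| //		t.Fatal(err) |
| //	} |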
| |
| func (u *Updater) handleCL(ctx context.Context, task *UpdateCLTask) error { |
| cl, err := u.preload(ctx, task) |
| if err != nil { |
| return common.TQifyError(ctx, err) |
| } |
| // cl.ID == 0 means CL doesn't yet exist. |
| ctx = logging.SetFields(ctx, logging.Fields{ |
| "project": task.GetLuciProject(), |
| "id": cl.ID, |
| "eid": cl.ExternalID, |
| }) |
| |
| backend, err := u.backendFor(cl) |
| if err != nil { |
| return common.TQifyError(ctx, err) |
| } |
| |
| switch err := u.handleCLWithBackend(ctx, task, cl, backend); { |
| case err == errHackRetryForOutOfQuota: |
| return tq.Ignore.Apply(err) |
| case err != nil: |
| return backend.TQErrorSpec().Error(ctx, err) |
| } |
| return nil |
| } |
| |
| var errHackRetryForOutOfQuota = errors.New("hack retry for out of quota") |
| |
| func (u *Updater) handleCLWithBackend(ctx context.Context, task *UpdateCLTask, cl *CL, backend UpdaterBackend) error { |
| // Save ID and ExternalID before giving CL to backend to avoid accidental corruption. |
| id, eid := cl.ID, cl.ExternalID |
| skip, updateFields, err := u.trySkippingFetch(ctx, task, cl, backend) |
| var fetchDuration time.Duration |
| switch { |
| case err != nil: |
| return err |
| case !skip: |
| now := clock.Now(ctx) |
| updateFields, err = backend.Fetch(ctx, NewFetchInput(cl, task)) |
| fetchDuration = clock.Since(ctx, now) |
| switch { |
| case err != nil && errors.Unwrap(err) == gerrit.ErrOutOfQuota && task.GetLuciProject() == "chromeos": |
| // HACK: don't retry on an out-of-quota error; instead, schedule another |
| // task with a delay so that it will be deduplicated by Cloud Tasks with |
| // any subsequent tasks. |
| if scheduleErr := u.ScheduleDelayed(ctx, task, blindRefreshInterval); scheduleErr != nil { |
| return errors.Annotate(err, "%T.Fetch failed", backend).Err() |
| } |
| return errHackRetryForOutOfQuota |
| case err != nil: |
| return errors.Annotate(err, "%T.Fetch failed", backend).Err() |
| } |
| } |
| |
| if updateFields.IsEmpty() { |
| logging.Debugf(ctx, "No update is necessary") |
| return nil |
| } |
| |
| // Transactionally update the CL. |
| var changed, changedSnapshot bool |
| transClbk := func(latest *CL) error { |
| if changed, changedSnapshot = updateFields.Apply(latest, backend); !changed { |
| // Someone, possibly even us in case of Datastore transaction retry, has |
| // already updated this CL. |
| return ErrStopMutation |
| } |
| return nil |
| } |
| if cl.ID == 0 { |
| _, err = u.mutator.Upsert(ctx, task.GetLuciProject(), eid, transClbk) |
| } else { |
| _, err = u.mutator.Update(ctx, task.GetLuciProject(), id, transClbk) |
| } |
| |
| if err != nil { |
| return err |
| } |
| |
| switch { |
| case updateFields.Snapshot == nil: |
| // Skip reporting the fetch metrics: the fetch operation either failed or |
| // was skipped. |
| case skip: |
| // Fetch was not performed; skip reporting the metrics. |
| case changed: |
| // Report the latency metrics only if the fetch actually returned new |
| // data. If the data was the same as the existing snapshot, the fetch |
| // wasn't actually needed. |
| delay := clock.Now(ctx).Sub(updateFields.Snapshot.ExternalUpdateTime.AsTime()) |
| if delay < 0 { |
| logging.Errorf(ctx, "negative CL fetch duration (%d) detected", delay) |
| delay = 0 |
| } |
| metrics.Internal.CLIngestionLatency.Add( |
| ctx, delay.Seconds(), task.GetRequester().String(), task.GetIsForDep(), |
| task.GetLuciProject(), changedSnapshot) |
| metrics.Internal.CLIngestionLatencyWithoutFetch.Add( |
| ctx, (delay - fetchDuration).Seconds(), task.GetRequester().String(), |
| task.GetIsForDep(), task.GetLuciProject(), changedSnapshot) |
| fallthrough |
| default: |
| metrics.Internal.CLIngestionAttempted.Add( |
| ctx, 1, task.GetRequester().String(), changed, task.GetIsForDep(), |
| task.GetLuciProject(), changedSnapshot) |
| } |
| return nil |
| } |
| |
| // trySkippingFetch checks if a fetch from the backend can be skipped. |
| // |
| // Returns true if so. |
| // NOTE: UpdateFields may be set even if the fetch can be skipped, meaning |
| // the CL entity should still be updated in Datastore. |
| func (u *Updater) trySkippingFetch(ctx context.Context, task *UpdateCLTask, cl *CL, backend UpdaterBackend) (bool, UpdateFields, error) { |
| if cl.ID == 0 || cl.Snapshot == nil || cl.Snapshot.GetOutdated() != nil { |
| return false, UpdateFields{}, nil |
| } |
| |
| hintedTS := task.GetHint().GetExternalUpdateTime() |
| hintedRevID := task.GetHint().GetMetaRevId() |
| switch { |
| case hintedTS == nil && hintedRevID == "": |
| // Always fetch if there is no hint available. |
| return false, UpdateFields{}, nil |
| case hintedRevID != "" && hintedRevID != cl.Snapshot.GetGerrit().GetInfo().GetMetaRevId(): |
| // Always fetch if the hinted meta rev ID differs from the rev ID of the |
| // stored snapshot. If the fetched snapshot turns out to be older than the |
| // stored one, the Datastore entity update will be skipped. |
| return false, UpdateFields{}, nil |
| case hintedTS != nil && hintedTS.AsTime().After(cl.Snapshot.GetExternalUpdateTime().AsTime()): |
| // There is no confidence that the Snapshot is up to date, so proceed with |
| // the fetch anyway. |
| |
| // NOTE: it's tempting to check first whether the LUCI project is watching |
| // the CL given the existing Snapshot and skip the fetch if it's not the |
| // case. However, for Gerrit CLs, the ref is mutable after CL creation, and |
| // since the ref is used to determine whether the CL is being watched, we |
| // can't skip the fetch. For an example, see the Gerrit move API: |
| // https://gerrit-review.googlesource.com/Documentation/rest-api-changes.html#move-change |
| return false, UpdateFields{}, nil |
| } |
| |
| // CL Snapshot is up to date, but does it belong to the right LUCI project? |
| acfg, err := backend.LookupApplicableConfig(ctx, cl) |
| if err != nil { |
| err = errors.Annotate(err, "%T.LookupApplicableConfig failed", backend).Err() |
| return false, UpdateFields{}, err |
| } |
| if acfg == nil { |
| // Insufficient saved CL state; fetch before deciding whether the CL is watched. |
| return false, UpdateFields{}, nil |
| } |
| |
| // Update CL with the new set of watching projects if materially different, |
| // which should be saved to Datastore even if the fetch from Gerrit itself is |
| // skipped. |
| var toUpdate UpdateFields |
| if !cl.ApplicableConfig.SemanticallyEqual(acfg) { |
| toUpdate.ApplicableConfig = acfg |
| } |
| |
| if !acfg.HasProject(task.GetLuciProject()) { |
| // This project isn't watching the CL, so no need to fetch. |
| // |
| // NOTE: even if the Snapshot was fetched in the context of this project before, |
| // we don't have to erase the Snapshot from the CL immediately: the update |
| // in cl.ApplicableConfig suffices to ensure that CV won't be using the |
| // Snapshot. |
| return true, toUpdate, nil |
| } |
| |
| if !acfg.HasProject(cl.Snapshot.GetLuciProject()) { |
| // The Snapshot was previously fetched in the context of a project which is |
| // no longer watching the CL. |
| // |
| // This can happen in practice when, e.g., a newly created "chromium-mXXX" |
| // project starts watching a specific ref which was previously watched by |
| // the generic "chromium" project. A Snapshot of a CL on such a ref would |
| // have been fetched in the context of "chromium" first, and now it must be |
| // re-fetched under "chromium-mXXX" to verify that the new project hasn't |
| // lost access to the Gerrit CL. |
| logging.Warningf(ctx, "Detected switch from %q LUCI project", cl.Snapshot.GetLuciProject()) |
| return false, toUpdate, nil |
| } |
| |
| // At this point, these must be true: |
| // * the Snapshot is up to date to the best of CV's knowledge; |
| // * this project is watching the CL, but there may be other projects, too; |
| // * the Snapshot was created by a project still watching the CL, which may |
| // differ from this project. |
| if len(acfg.GetProjects()) >= 2 { |
| // When there are several watching projects, they shouldn't race |
| // re-fetching & saving the Snapshot. No new Runs are going to be started |
| // on such CLs anyway, so skip fetching a new snapshot. |
| return true, toUpdate, nil |
| } |
| |
| // There is just 1 project, so check the invariant. |
| if task.GetLuciProject() != cl.Snapshot.GetLuciProject() { |
| panic(fmt.Errorf("BUG: this project %q must have created the Snapshot, not %q", task.GetLuciProject(), cl.Snapshot.GetLuciProject())) |
| } |
| |
| if restriction := cl.Access.GetByProject()[task.GetLuciProject()]; restriction != nil { |
| // For example, Gerrit has responded HTTP 403/404 before. |
| // Must fetch again to verify if restriction still holds. |
| logging.Debugf(ctx, "Detected prior access restriction: %s", restriction) |
| return false, toUpdate, nil |
| } |
| |
| // Finally, do refresh if the CL entity is just really old and the meta rev |
| // id is unset. |
| switch { |
| case hintedRevID != "": |
| // The hinted meta rev ID matches the rev ID of the stored snapshot |
| // (checked above), so the fetch can be skipped regardless of age. |
| case clock.Since(ctx, cl.UpdateTime) > autoRefreshAfter: |
| // Strictly speaking, cl.UpdateTime isn't changed only on refresh, but |
| // also whenever a Run starts/ends. However, a Run usually starts right |
| // after a recent refresh, and the end of a Run is usually followed by |
| // a refresh. |
| return false, toUpdate, nil |
| } |
| |
| // OK, skip the fetch. |
| return true, toUpdate, nil |
| } |
| |
| func (*Updater) preload(ctx context.Context, task *UpdateCLTask) (*CL, error) { |
| if task.GetLuciProject() == "" { |
| return nil, errors.New("invalid task input: LUCI project must be given") |
| } |
| eid := ExternalID(task.GetExternalId()) |
| id := common.CLID(task.GetId()) |
| switch { |
| case id != 0: |
| cl := &CL{ID: common.CLID(id)} |
| switch err := datastore.Get(ctx, cl); { |
| case err == datastore.ErrNoSuchEntity: |
| return nil, errors.Annotate(err, "CL %d %q doesn't exist in Datastore", id, task.GetExternalId()).Err() |
| case err != nil: |
| return nil, errors.Annotate(err, "failed to load CL %d", id).Tag(transient.Tag).Err() |
| case eid != "" && eid != cl.ExternalID: |
| return nil, errors.Reason("invalid task input: CL %d actually has %q ExternalID, not %q", id, cl.ExternalID, eid).Err() |
| default: |
| return cl, nil |
| } |
| case eid == "": |
| return nil, errors.Reason("invalid task input: either internal ID or ExternalID must be given").Err() |
| default: |
| switch cl, err := eid.Load(ctx); { |
| case err != nil: |
| return nil, errors.Annotate(err, "failed to load CL %q", eid).Tag(transient.Tag).Err() |
| case cl == nil: |
| // New CL to be created. |
| return &CL{ |
| ExternalID: eid, |
| ID: 0, // will be populated later. |
| EVersion: 0, |
| }, nil |
| default: |
| return cl, nil |
| } |
| } |
| } |
| |
| func (u *Updater) backendFor(cl *CL) (UpdaterBackend, error) { |
| kind, err := cl.ExternalID.kind() |
| if err != nil { |
| return nil, err |
| } |
| u.rwmutex.RLock() |
| defer u.rwmutex.RUnlock() |
| if b, exists := u.backends[kind]; exists { |
| return b, nil |
| } |
| return nil, errors.Reason("%q backend is not supported", kind).Err() |
| } |
| |
| // makeTaskDeduplicationKey returns the TQ task deduplication key. |
| func makeTaskDeduplicationKey(ctx context.Context, t *UpdateCLTask, delay time.Duration) string { |
| var sb strings.Builder |
| sb.WriteString("v0") |
| sb.WriteRune('\n') |
| sb.WriteString(t.GetLuciProject()) |
| sb.WriteRune('\n') |
| |
| // Prefer ExternalID when both ID and ExternalID are known, since the most |
| // frequent use-case is an update via PubSub/polling, which specifies the |
| // ExternalID and may not have resolved it to an internal ID just yet. |
| uniqArg := t.GetExternalId() |
| if uniqArg == "" { |
| uniqArg = strconv.FormatInt(t.GetId(), 16) |
| } |
| sb.WriteString(uniqArg) |
| |
| // If the meta rev ID is set, dedup within a time window isn't necessary: |
| // 1) Gerrit guarantees one publish for each CL update event. |
| // 2) The number of redelivered messages should be low enough to ignore. |
| // 3) If the same message is redelivered multiple times, the backend |
| // will skip fetching the snapshot after the first message. |
| // 4) If there is concern that retries could quickly burn through Gerrit |
| // quota, the PubSub retry config should be tuned instead. |
| if revID := t.GetHint().GetMetaRevId(); revID != "" { |
| _, _ = fmt.Fprintf(&sb, "\n%s", revID) |
| return sb.String() |
| } |
| |
| // Dedup in the short term to avoid an excessive number of refreshes, but |
| // ensure that eventually calling Schedule with the same payload results in |
| // a new task. This is done by de-duping only within a single "epoch" |
| // window, which differs by CL to avoid a synchronized herd of requests |
| // hitting a backend (e.g. Gerrit). |
| // |
| // +----------------------------------------------------------------------+ |
| // | ... -> time goes forward -> .... | |
| // +----------------------------------------------------------------------+ |
| // | | |
| // | ... | epoch (N-1, CL-A) | epoch (N, CL-A) | epoch (N+1, CL-A) | ... | |
| // | | |
| // | ... | epoch (N-1, CL-B) | epoch (N, CL-B) | ... | |
| // +----------------------------------------------------------------------+ |
| // |
| // Furthermore, the de-dup window differs based on whether updatedHint is |
| // given or it's a blind refresh. |
| interval := blindRefreshInterval |
| if t.GetHint().GetExternalUpdateTime() != nil { |
| interval = knownRefreshInterval |
| } |
| epochOffset := common.DistributeOffset(interval, "update-cl", t.GetLuciProject(), uniqArg) |
| epochTS := clock.Now(ctx).Add(delay).Truncate(interval).Add(interval + epochOffset) |
| _, _ = fmt.Fprintf(&sb, "\n%x", epochTS.UnixNano()) |
| if h := t.GetHint().GetExternalUpdateTime(); h != nil { |
| _, _ = fmt.Fprintf(&sb, "\n%x", h.AsTime().UnixNano()) |
| } |
| return sb.String() |
| } |
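| |
| // For illustration, the resulting keys look roughly like the following (the |
| // values are made up and the epoch encoding is an implementation detail): |
| // |
| //	"v0\nchromium\ngerrit/chromium-review.googlesource.com/1111111\n<meta-rev-id>" |
| //	"v0\nchromium\ngerrit/chromium-review.googlesource.com/1111111\n<epoch-hex>" |
| //	"v0\nchromium\ngerrit/chromium-review.googlesource.com/1111111\n<epoch-hex>\n<hint-hex>" |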
| |
| // makeTQTitleForHumans makes a human-readable TQ task title. |
| // |
| // WARNING: do not use for anything else. Doesn't guarantee uniqueness. |
| // |
| // It will be visible in logs as the suffix of URL in Cloud Tasks console and |
| // in the GAE requests log. |
| // |
| // The primary purpose is to enable a quick search for a specific CL in the |
| // GAE request log alone, as opposed to searching through the much larger |
| // and separate stderr log of the process (which is where logging.Logf |
| // calls go). |
| // |
| // For example, |
| // |
| // "proj/gerrit/chromium/1111111/u2016-02-03T04:05:06Z/deadbeef" |
| // "proj/gerrit/chromium/1111111/u2016-02-03T04:05:06Z" |
| // "proj/gerrit/chromium/1111111/deadbeef" |
| func makeTQTitleForHumans(t *UpdateCLTask) string { |
| var sb strings.Builder |
| sb.WriteString(t.GetLuciProject()) |
| if id := t.GetId(); id != 0 { |
| _, _ = fmt.Fprintf(&sb, "/%d", id) |
| } |
| if eid := t.GetExternalId(); eid != "" { |
| sb.WriteRune('/') |
| // Reduce verbosity in the common case of Gerrit on googlesource. |
| // Although it's possible to delegate this to the backend, the additional |
| // boilerplate isn't yet justified. |
| if kind, err := ExternalID(eid).kind(); err == nil && kind == "gerrit" { |
| eid = strings.Replace(eid, "-review.googlesource.com/", "/", 1) |
| } |
| sb.WriteString(eid) |
| } |
| if hintedTS := t.GetHint().GetExternalUpdateTime(); hintedTS != nil { |
| sb.WriteString("/u") |
| sb.WriteString(hintedTS.AsTime().UTC().Format(time.RFC3339)) |
| } |
| if hintedRevID := t.GetHint().GetMetaRevId(); hintedRevID != "" { |
| sb.WriteString("/") |
| sb.WriteString(hintedRevID) |
| } |
| return sb.String() |
| } |
| |
| const maxDepsLoadingBatchSize = 100 |
| |
| func resolveDeps(ctx context.Context, luciProject string, deps map[ExternalID]DepKind) ([]resolvingDep, error) { |
| eids := make([]ExternalID, 0, len(deps)) |
| ret := make([]resolvingDep, 0, len(deps)) |
| for eid, kind := range deps { |
| eids = append(eids, eid) |
| ret = append(ret, resolvingDep{eid: eid, kind: kind}) |
| } |
| |
| ids, err := Lookup(ctx, eids) |
| if err != nil { |
| return nil, err |
| } |
| depCLs := make([]CL, 0, maxDepsLoadingBatchSize) |
| depCLIndices := make([]int, 0, maxDepsLoadingBatchSize) |
| for i, id := range ids { |
| if id > 0 { |
| cl := CL{ID: id} |
| depCLs = append(depCLs, cl) |
| depCLIndices = append(depCLIndices, i) |
| ret[i].resolvedDep = &Dep{Clid: int64(id), Kind: ret[i].kind} |
| } |
| if len(depCLs) == maxDepsLoadingBatchSize || (len(depCLs) > 0 && i == len(ids)-1) { |
| // Cut a batch if the max batch size is reached or this is the last id. |
| if err := datastore.Get(ctx, depCLs); err != nil { |
| // Mark the error as transient because by this time, all CLIDs should have |
| // corresponding CL entities in Datastore. |
| return nil, errors.Annotate(err, "failed to load %d CLs", len(depCLs)).Tag(transient.Tag).Err() |
| } |
| for j, depCL := range depCLs { |
| ret[depCLIndices[j]].ready = !depNeedsRefresh(ctx, depCL, luciProject) |
| } |
| depCLs = depCLs[:0] |
| depCLIndices = depCLIndices[:0] |
| } |
| } |
| return ret, nil |
| } |
| |
| // resolvingDep represents a dependency, known by its external ID, that is |
| // being resolved. |
| // |
| // Helper struct for Updater.ResolveAndScheduleDepsUpdate. |
| type resolvingDep struct { |
| eid ExternalID |
| kind DepKind |
| ready bool // true if already up to date and .resolvedDep is populated. |
| resolvedDep *Dep // if nil, use createIfNotExists() to populate |
| } |
| |
| func (d *resolvingDep) createIfNotExists(ctx context.Context, m *Mutator, luciProject string) error { |
| if d.resolvedDep != nil { |
| return nil // already exists |
| } |
| cl, err := m.Upsert(ctx, luciProject, d.eid, func(cl *CL) error { |
| // TODO: somehow record when CL was inserted to put a boundary on how long |
| // Project Manager should be waiting for the dep to be actually fetched & |
| // its entity updated in Datastore. |
| if cl.EVersion > 0 { |
| // If the CL already exists, we don't need to modify it (modulo the TODO above). |
| return ErrStopMutation |
| } |
| return nil |
| }) |
| if err != nil { |
| return err |
| } |
| d.resolvedDep = &Dep{Clid: int64(cl.ID), Kind: d.kind} |
| return nil |
| } |
| |
| func (d *resolvingDep) schedule(ctx context.Context, u *Updater, luciProject string, requester UpdateCLTask_Requester) error { |
| return u.Schedule(ctx, &UpdateCLTask{ |
| ExternalId: string(d.eid), |
| Id: d.resolvedDep.GetClid(), |
| LuciProject: luciProject, |
| Requester: requester, |
| IsForDep: true, |
| }) |
| } |
| |
| // sortDeps sorts the given slice by CLID in ascending order, in place, and returns it. |
| func sortDeps(deps []*Dep) []*Dep { |
| sort.Slice(deps, func(i, j int) bool { |
| return deps[i].GetClid() < deps[j].GetClid() |
| }) |
| return deps |
| } |
| |
| // depNeedsRefresh returns true if the dependency CL needs a refresh in the |
| // context of a specific LUCI project. |
| func depNeedsRefresh(ctx context.Context, dep CL, luciProject string) bool { |
| switch { |
| case dep.Snapshot == nil: |
| return true |
| case dep.Snapshot.GetOutdated() != nil: |
| return true |
| case dep.Snapshot.GetLuciProject() != luciProject: |
| return true |
| default: |
| return false |
| } |
| } |