package updater
import (
gerritpb ""
// fetcher efficiently computes new snapshot by fetching data from Gerrit.
// It ensures each dependency is resolved to an existing CLID,
// creating CLs in datastore as needed. Schedules tasks to update
// dependencies but doesn't wait for them to complete.
// The prior Snapshot, if given, can reduce RPCs made to Gerrit.
type fetcher struct {
pm PM
rm RM
scheduleDepRefresh func(ctx context.Context, p *RefreshGerritCL) error
luciProject string
host string
change int64
updatedHint time.Time
forceNotifyPM bool
g gerrit.CLReaderClient
externalID changelist.ExternalID
priorCL *changelist.CL
toUpdate changelist.UpdateFields
// update refreshes the CL identified by this fetcher.
//
// It loads the prior CL from Datastore (by clidHint when it is non-zero,
// by external ID otherwise), decides whether a (re-)fetch from Gerrit is
// needed, and, unless there is nothing to update, saves the result via
// changelist.Update, notifying the Run Manager for each incomplete Run.
//
// Returns an error if clidHint is non-zero but no such CL exists.
//
// NOTE(review): this block looks truncated in this view -- closing braces,
// some switch arms and a few operands (the host argument of gobmap.Lookup,
// the callee of the bare `return, ...` statements, leading arguments of
// changelist.Update) are not visible. Confirm against the complete file.
func (f *fetcher) update(ctx context.Context, clidHint common.CLID) (err error) {
// Check if CL already exists in Datastore.
if clidHint != 0 {
f.priorCL = &changelist.CL{ID: clidHint}
err = datastore.Get(ctx, f.priorCL)
} else {
f.priorCL, err = f.externalID.Get(ctx)
// Decide whether and how to fetch, based on what was loaded above.
switch {
case err == datastore.ErrNoSuchEntity:
// A hint must refer to an existing CL; a lookup by external ID may miss.
if clidHint != 0 {
return errors.Reason("clidHint %d doesn't refer to an existing CL (%s)", clidHint, f).Err()
f.priorCL = nil
err = f.fetchNew(ctx)
case err != nil:
return err
case f.priorCL.Snapshot == nil:
// CL exists, but without snapshot, usually because it was created as
// dependency of another CL.
err = f.fetchNew(ctx)
case f.updatedHint.IsZero():
// Zero updatedHint means the caller requested a forced update.
logging.Debugf(ctx, "force updating %s", f)
case f.updatedHint.After(f.priorCL.Snapshot.GetExternalUpdateTime().AsTime()):
// Either force update or updatedHint is after the snapshot we already have.
// NOTE: ideally, we'd check whether the current project is watching the
// (host,repo,ref) which is stored in a snapshot. Unfortunately, ref is not
// immutable after a change creation, see Gerrit move API which is actually
// being used to transition to the main branch.
// Therefore, here, proceed to fetching snapshot.
err = f.fetchExisting(ctx)
// Snapshot considered up-to-date, check if we can skip updates.
ci := f.priorCL.Snapshot.GetGerrit().GetInfo()
var acfg *changelist.ApplicableConfig
switch acfg, err = gobmap.Lookup(ctx,, ci.GetProject(), ci.GetRef()); {
case err != nil:
return err
case !f.priorCL.ApplicableConfig.SemanticallyEqual(acfg):
// Only update CL.ApplicableConfig in datastore iff it's materially different,
// since timestamp is almost guaranteed to be newer.
f.toUpdate.ApplicableConfig = acfg
if acfg.HasOnlyProject(f.luciProject) && f.priorCL.Snapshot.GetLuciProject() != f.luciProject {
// Snapshot considered up-to-date, but fetched in the context of a wrong project.
// Must re-fetch. It's OK to re-use prior snapshot so long as read access
// to Gerrit is verified.
logging.Warningf(ctx, "%s switches from %q to %q LUCI project", f, f.priorCL.Snapshot.GetLuciProject(), f.luciProject)
err = f.fetchExisting(ctx)
} else {
// Can indeed skip the update.
// Handle the outcome of the fetch decision above.
switch {
case err != nil:
return err
case f.toUpdate.IsEmpty():
// Nothing to save. This is only legitimate for a pre-existing CL.
if f.priorCL == nil {
panic("update can be skipped iff priorCL is set")
if f.forceNotifyPM {
return, f.luciProject, f.priorCL.ID, f.priorCL.EVersion)
return nil
// Save the accumulated changes; the callback sends notifications about the
// saved CL.
return changelist.Update(
f.externalID, f.clidIfKnown(),
func(ctx context.Context, cl *changelist.CL) error {
eg, ectx := errgroup.WithContext(ctx)
eg.Go(func() error {
return, f.luciProject, cl.ID, cl.EVersion)
// Generally, a CL will have only one Run at a time. Hence, use
// unbounded parallelism here.
for _, rid := range cl.IncompleteRuns {
// Copy the loop variable for capture by the closure below.
rid := rid
eg.Go(func() error {
return f.rm.NotifyCLUpdated(ectx, rid, cl.ID, cl.EVersion)
return eg.Wait()
// fetchExisting efficiently fetches new snapshot from Gerrit,
// but it may re-use data from prior snapshot.
func (f *fetcher) fetchExisting(ctx context.Context) error {
// TODO(tandrii): actually do this efficiently.
return f.fetchNew(ctx)
// fetchNew efficiently fetches a new snapshot from Gerrit.
//
// It fetches ChangeInfo via fetchChangeInfo and, unless that fails or signals
// that no further fetching should proceed (nil ChangeInfo), starts a new
// Snapshot in f.toUpdate and completes it via fetchPostChangeInfo.
//
// NOTE(review): this block looks truncated in this view -- the query options
// passed to fetchChangeInfo and all closing delimiters are not visible.
func (f *fetcher) fetchNew(ctx context.Context) error {
ci, err := f.fetchChangeInfo(ctx,
// These are expensive to compute for Gerrit,
// CV should not do this needlessly.
// Avoid asking Gerrit to perform expensive operation.
// A nil ChangeInfo without an error means there is nothing more to fetch;
// in that case err is nil and so is the returned value.
if err != nil || ci == nil {
return err
// Record what is known so far; the rest is filled in by fetchPostChangeInfo.
f.toUpdate.Snapshot = &changelist.Snapshot{
LuciProject: f.luciProject,
ExternalUpdateTime: ci.GetUpdated(),
Kind: &changelist.Snapshot_Gerrit{
Gerrit: &changelist.Gerrit{
Info: ci,
return f.fetchPostChangeInfo(ctx, ci)
func (f *fetcher) fetchPostChangeInfo(ctx context.Context, ci *gerritpb.ChangeInfo) error {
min, cur, err := gerrit.EquivalentPatchsetRange(ci)
if err != nil {
return errors.Annotate(err, "failed to compute equivalent patchset range on %s", f).Err()
f.toUpdate.Snapshot.MinEquivalentPatchset = int32(min)
f.toUpdate.Snapshot.Patchset = int32(cur)
switch ci.GetStatus() {
case gerritpb.ChangeStatus_NEW:
// OK, proceed.
case gerritpb.ChangeStatus_ABANDONED, gerritpb.ChangeStatus_MERGED:
logging.Debugf(ctx, "%s is %s", f, ci.GetStatus())
return nil
logging.Warningf(ctx, "%s has unknown status %d %s", f, ci.GetStatus().Number(), ci.GetStatus().String())
return nil
if f.priorSnapshot().GetGerrit().GetInfo().GetCurrentRevision() == f.mustHaveCurrentRevision() {
// Re-use past results since CurrentRevision is the same.
f.toUpdate.Snapshot.GetGerrit().Files = f.priorSnapshot().GetGerrit().GetFiles()
f.toUpdate.Snapshot.GetGerrit().GitDeps = f.priorSnapshot().GetGerrit().GetGitDeps()
// NOTE: CQ-Depend deps are fixed per revision. Once soft deps are accepted
// via hashtags or topics, the re-use won't be possible.
f.toUpdate.Snapshot.GetGerrit().SoftDeps = f.priorSnapshot().GetGerrit().GetSoftDeps()
} else {
eg, ectx := errgroup.WithContext(ctx)
eg.Go(func() error { return f.fetchFiles(ectx) })
eg.Go(func() error { return f.fetchRelated(ectx) })
// Meanwhile, compute soft deps. Currently, it's cheap operation.
// In the future, it may require sending another RPC to Gerrit,
// e.g. to fetch related CLs by topic.
if err = f.setSoftDeps(); err != nil {
return err
if err = eg.Wait(); err != nil {
return err
// Always run resolveDeps regardless of re-use of GitDeps/SoftDeps.
// CV retention policy deletes CLs not modified for a long time,
// which in some very rare case may affect a dep of this CL.
// TODO(tandrii): remove such risk by force-updating dep CLs to prevent
// retention policy from wiping them out.
if err := f.resolveDeps(ctx); err != nil {
return err
return nil
// fetchChangeInfo fetches newest ChangeInfo from Gerrit.
//
//   - handles permission errors
//   - verifies fetched data isn't definitely stale.
//   - checks that current LUCI project is still watching the change.
//
// opts are passed through to the Gerrit GetChange request.
//
// Returns nil ChangeInfo if no further fetching should proceed.
//
// NOTE(review): this block looks truncated in this view -- closing
// delimiters, the call sites of setNoAccess and the host operands are not
// visible. Confirm against the complete file.
func (f *fetcher) fetchChangeInfo(ctx context.Context, opts ...gerritpb.QueryOption) (*gerritpb.ChangeInfo, error) {
// setNoAccess records in f.toUpdate that f.luciProject has no access to the
// CL as of now.
setNoAccess := func() {
f.toUpdate.AddDependentMeta = &changelist.DependentMeta{
ByProject: map[string]*changelist.DependentMeta_Meta{
f.luciProject: {
NoAccess: true,
UpdateTime: timestamppb.New(clock.Now(ctx)),
// Avoid querying Gerrit iff current project doesn't watch the given host,
// which should be treated as PermissionDenied.
switch watched, err := f.isHostWatched(ctx); {
case err != nil:
return nil, err
case !watched:
logging.Warningf(ctx, "Gerrit host %q is not watched by project %q [%s]",, f.luciProject, f)
return nil, nil
if err := f.ensureGerritClient(ctx); err != nil {
return nil, err
ci, err := f.g.GetChange(ctx, &gerritpb.GetChangeRequest{
Number: f.change,
Project: f.gerritProjectIfKnown(),
Options: opts,
// Dispatch on the gRPC status code of the GetChange call.
switch grpcutil.Code(err) {
case codes.OK:
if err := f.ensureNotStale(ctx, ci.GetUpdated()); err != nil {
return nil, err
case codes.NotFound, codes.PermissionDenied:
// Either no access OR CL was deleted.
return nil, nil
return nil, gerrit.UnhandledError(ctx, err, "failed to fetch %s", f)
// Compute which LUCI projects watch the change, based on the fetched
// Gerrit project and ref.
f.toUpdate.ApplicableConfig, err = gobmap.Lookup(ctx,, ci.GetProject(), ci.GetRef())
switch {
case err != nil:
return nil, err
case !f.toUpdate.ApplicableConfig.HasProject(f.luciProject):
logging.Warningf(ctx, "%s is not watched by the %q project", f, f.luciProject)
return nil, nil
return ci, nil
// fetchRelated fetches related changes and computes GerritGitDeps.
func (f *fetcher) fetchRelated(ctx context.Context) error {
if err := f.ensureGerritClient(ctx); err != nil {
return err
resp, err := f.g.GetRelatedChanges(ctx, &gerritpb.GetRelatedChangesRequest{
Number: f.change,
Project: f.gerritProjectIfKnown(),
RevisionId: f.mustHaveCurrentRevision(),
switch code := grpcutil.Code(err); code {
case codes.OK:
f.setGitDeps(ctx, resp.GetChanges())
return nil
case codes.PermissionDenied, codes.NotFound:
// Getting this right after successfully fetching ChangeInfo should
// typically be due to eventual consistency of Gerrit, and rarely due to
// change of ACLs. So, err transiently s.t. retry handles the same error
// when re-fetching ChangeInfo.
return errors.Annotate(err, "failed to fetch related changes for %s", f).Tag(transient.Tag).Err()
return gerrit.UnhandledError(ctx, err, "failed to fetch related changes for %s", f)
// setGitDeps sets GerritGitDeps based on list of related changes provided by
// Gerrit.GetRelatedChanges RPC.
//
// If GetRelatedChanges output is invalid, doesn't set GerritGitDep and adds an
// appropriate CLError to Snapshot.Errors.
//
// NOTE(review): this block looks truncated in this view -- closing braces,
// what appear to be early exits after the guard conditions, and the call that
// owns the dangling format string below are not visible. Confirm against the
// complete file.
func (f *fetcher) setGitDeps(ctx context.Context, related []*gerritpb.GetRelatedChangesResponse_ChangeAndCommit) {
// Gerrit does not provide API that returns just the changes which a given
// change depends on, but has the API call that returns the following changes:
// (1) those on which this change depends, transitively. Among these,
// some CLs may have been already merged.
// (2) this change itself, with its commit and parent(s) hashes
// (3) changes which depend on this change transitively
// We need (1).
if len(related) == 0 {
// Gerrit may not bother to return the CL itself if there are no related
// changes.
// Locate the entry (2) describing this change itself.
this, clErr := f.matchCurrentAmongRelated(ctx, related)
if clErr != nil {
f.toUpdate.Snapshot.Errors = append(f.toUpdate.Snapshot.Errors, clErr)
// Construct a map from revision to a list of changes that it represents.
// One may think that list is not necessary:
// two CLs with the same revision should (% sha1 collision) have equal
// commit messages, and hence Change-Id, so should be really the same CL.
// However, many Gerrit projects do not require Change-Id in commit message at
// upload time, instead generating new Change-Id on the fly.
byRevision := make(map[string][]*gerritpb.GetRelatedChangesResponse_ChangeAndCommit, len(related))
for _, r := range related {
rev := r.GetCommit().GetId()
byRevision[rev] = append(byRevision[rev], r)
thisParentsCount := f.countRelatedWhichAreParents(this, byRevision)
if thisParentsCount == 0 {
// Quick exit if there are no dependencies of this change (1), only changes
// depending on this change (3).
// Now starting from `this` change and following parents relation,
// find all issues that we can reach via breadth first traversal ordered by
// distance from this CL.
// Note that diamond-shaped child->[parent1, parent2]->grandparent are
// probably possible, so keeping track of visited commits is required.
// Furthermore, the same CL number may appear multiple times in the chain
// under different revisions (patchsets).
visitedRevs := stringset.New(len(related))
ordered := make([]*gerritpb.GetRelatedChangesResponse_ChangeAndCommit, 0, len(related))
curLevel := make([]*gerritpb.GetRelatedChangesResponse_ChangeAndCommit, 0, len(related))
nextLevel := make([]*gerritpb.GetRelatedChangesResponse_ChangeAndCommit, 1, len(related))
nextLevel[0] = this
for len(nextLevel) > 0 {
// Swap the two buffers: the level just discovered becomes current, and the
// old current buffer is emptied and re-used to collect the next level.
curLevel, nextLevel = nextLevel, curLevel[:0]
// For determinism of the output.
sort.SliceStable(curLevel, func(i, j int) bool {
return curLevel[i].GetNumber() < curLevel[j].GetNumber()
ordered = append(ordered, curLevel...)
for _, r := range curLevel {
for _, p := range r.GetCommit().GetParents() {
// prs are the related changes whose revision is this parent commit.
switch prs := byRevision[p.GetId()]; {
case len(prs) == 0:
case len(prs) > 1:
"Gerrit.GetRelatedChanges returned rev %q %d times for %s (ALL Related %s)",
p.GetId(), len(prs), f, related)
// Avoid borking. Take the first CL by number.
for i, x := range prs[1:] {
if prs[0].GetNumber() > x.GetNumber() {
prs[i+1], prs[0] = prs[0], prs[i+1]
// Enqueue the chosen change only if its revision hasn't been visited yet.
if visitedRevs.Add(prs[0].GetCommit().GetId()) {
nextLevel = append(nextLevel, prs[0])
deps := make([]*changelist.GerritGitDep, 0, len(ordered)-1)
// Specific revision doesn't matter, CV always looks at latest revision,
// but since the same CL may have >1 revision, the CL number may be added
// several times into `ordered`.
// TODO(tandrii): after CQDaemon is removed, consider paying attention to
// specific revision of the dependency to notice when parent dep has been
// substantially modified such that tryjobs of this change alone ought to be
// invalidated.
added := make(map[int64]bool, len(ordered))
// ordered[0] is this change itself, hence skipped.
for i, r := range ordered[1:] {
n := r.GetNumber()
if added[n] {
added[n] = true
deps = append(deps, &changelist.GerritGitDep{
Change: n,
// By construction of ordered, immediate dependencies must be located at
// ordered[1:1+thisParentsCount], but we are iterating over [1:] subslice.
Immediate: i < thisParentsCount,
f.toUpdate.Snapshot.GetGerrit().GitDeps = deps
// matchCurrentAmongRelated finds the entry for the current change
// (by change number) among the related changes returned by Gerrit.
//
// Unless the current change is matched exactly once, it logs the related
// output and returns a nil entry together with a CorruptGerritMetadata
// CLError. Otherwise, it returns the matched entry and a nil CLError.
//
// NOTE(review): this block looks truncated in this view -- closing braces,
// the statement updating `matched`, a bug link in the comment and in the
// message, and the host argument of fmt.Sprintf are not visible. Confirm
// against the complete file.
func (f *fetcher) matchCurrentAmongRelated(
ctx context.Context, related []*gerritpb.GetRelatedChangesResponse_ChangeAndCommit,
) (*gerritpb.GetRelatedChangesResponse_ChangeAndCommit, *changelist.CLError) {
var this *gerritpb.GetRelatedChangesResponse_ChangeAndCommit
matched := 0
for _, r := range related {
if r.GetNumber() == f.change {
this = r
if matched != 1 {
// Apparently in rare cases, Gerrit may get confused and substitute this CL
// for some other CL in the output.
msg := fmt.Sprintf(
("Gerrit related changes should return the %s/%d CL itself exactly once, but got %d." +
" Maybe is affecting you?"),, f.change, matched)
logging.Errorf(ctx, "%s Related output: %s", msg, related)
return nil, &changelist.CLError{
Kind: &changelist.CLError_CorruptGerritMetadata{
CorruptGerritMetadata: msg,
return this, nil
func (f *fetcher) countRelatedWhichAreParents(this *gerritpb.GetRelatedChangesResponse_ChangeAndCommit, byRevision map[string][]*gerritpb.GetRelatedChangesResponse_ChangeAndCommit) int {
cnt := 0
for _, p := range this.GetCommit().GetParents() {
// Not all parents may be represented by related CLs.
// OTOH, if there are several CLs matching parent revision,
// CV will choose just one.
if _, ok := byRevision[p.GetId()]; ok {
return cnt
// fetchFiles fetches files for the current revision of the new Snapshot.
func (f *fetcher) fetchFiles(ctx context.Context) error {
if err := f.ensureGerritClient(ctx); err != nil {
return err
resp, err := f.g.ListFiles(ctx, &gerritpb.ListFilesRequest{
Number: f.change,
Project: f.gerritProjectIfKnown(),
RevisionId: f.mustHaveCurrentRevision(),
// For CLs with >1 parent commit (aka merge commits), this relies on Gerrit
// ensuring that such a CL always has first parent from the target branch.
Parent: 1, // Request a diff against the first parent.
switch code := grpcutil.Code(err); code {
case codes.OK:
// Iterate files map and take keys only. CV treats all files "touched" in a
// Change to be interesting, including chmods. Skip special /COMMIT_MSG and
// /MERGE_LIST entries, which aren't files. For example output, see
fs := make([]string, 0, len(resp.GetFiles()))
for f := range resp.GetFiles() {
if !strings.HasPrefix(f, "/") {
fs = append(fs, f)
f.toUpdate.Snapshot.GetGerrit().Files = fs
return nil
case codes.PermissionDenied, codes.NotFound:
// Getting this right after successfully fetching ChangeInfo should
// typically be due to eventual consistency of Gerrit, and rarely due to
// change of ACLs. So, err transiently s.t. retry handles the same error
// when re-fetching ChangeInfo.
return errors.Annotate(err, "failed to fetch files for %s", f).Tag(transient.Tag).Err()
return gerrit.UnhandledError(ctx, err, "failed to fetch files for %s", f)
// setSoftDeps parses CL description and sets soft deps.
func (f *fetcher) setSoftDeps() error {
ci := f.toUpdate.Snapshot.GetGerrit().GetInfo()
msg := ci.GetRevisions()[ci.GetCurrentRevision()].GetCommit().GetMessage()
deps := cqdepend.Parse(msg)
if len(deps) == 0 {
return nil
// Given like "sub-review.x.y.z", compute "-review.x.y.z" suffix.
dot := strings.IndexRune(, '.')
if dot == -1 || !strings.HasSuffix([:dot], "-review") {
return errors.Reason("Host %s doesn't support Cq-Depend (%s)",, f).Err()
hostSuffix :=[dot-len("-review"):]
softDeps := make([]*changelist.GerritSoftDep, len(deps))
for i, d := range deps {
depHost :=
if d.Subdomain != "" {
depHost = d.Subdomain + hostSuffix
softDeps[i] = &changelist.GerritSoftDep{Host: depHost, Change: int64(d.Change)}
f.toUpdate.Snapshot.GetGerrit().SoftDeps = softDeps
return nil
// resolveDeps resolves to CLID and triggers tasks for each of the soft and GerritGit dep.
func (f *fetcher) resolveDeps(ctx context.Context) error {
eids, err := f.depsToExternalIDs()
if err != nil {
return err
lock := sync.Mutex{}
resolved := make([]*changelist.Dep, 0, len(eids))
addDep := func(depCL *changelist.CL, eid changelist.ExternalID, kind changelist.DepKind) error {
resolved = append(resolved, &changelist.Dep{Clid: int64(depCL.ID), Kind: kind})
switch yes, err := depCL.NeedsFetching(ctx, f.luciProject); {
case err != nil:
return err
case yes:
depHost, depChange, err := eid.ParseGobID()
if err != nil {
panic("impossible: by construction, all deps are Gerrit, too")
return f.scheduleDepRefresh(ctx, &RefreshGerritCL{
LuciProject: f.luciProject,
Host: depHost,
Change: depChange,
ClidHint: int64(depCL.ID),
return nil
// TODO(tandrii): optimize for the typical case where each dep is already known
// to CV by sending just 1 multi-Get against CLMap before doing parallel
// GetOrInsert calls.
errs := parallel.WorkPool(10, func(work chan<- func() error) {
for eid, kind := range eids {
eid, kind := eid, kind
work <- func() error {
depCL, err := eid.GetOrInsert(ctx, func(*changelist.CL) {
// TODO(tandrii): somehow record when CL was inserted,
// to put a boundary on how long ProjectManager should wait for
// dependency to be fetched.
if err != nil {
return err
return addDep(depCL, eid, kind)
if errs != nil {
// All errors must be transient. Return any one of them.
return errs.(errors.MultiError).First()
sort.Slice(resolved, func(i, j int) bool {
return resolved[i].GetClid() < resolved[j].GetClid()
f.toUpdate.Snapshot.Deps = resolved
return nil
func (f *fetcher) depsToExternalIDs() (map[changelist.ExternalID]changelist.DepKind, error) {
cqdeps := f.toUpdate.Snapshot.GetGerrit().GetSoftDeps()
gitdeps := f.toUpdate.Snapshot.GetGerrit().GetGitDeps()
// Git deps that are immediate parents of the current CL are HARD deps.
// Since arbitrary Cq-Depend deps may duplicate those of Git,
// avoid accidental downgrading from HARD to SOFT dep by processing Cq-Depend
// first and Git deps second.
eids := make(map[changelist.ExternalID]changelist.DepKind, len(cqdeps)+len(gitdeps))
for _, dep := range cqdeps {
eid, err := changelist.GobID(dep.Host, dep.Change)
if err != nil {
return nil, err
eids[eid] = changelist.DepKind_SOFT
for _, dep := range gitdeps {
eid, err := changelist.GobID(, dep.Change)
if err != nil {
return nil, err
kind := changelist.DepKind_SOFT
if dep.Immediate {
kind = changelist.DepKind_HARD
eids[eid] = kind
return eids, nil
// ensureNotStale returns error if given Gerrit updated timestamp is older than
// the updateHint or existing CL state.
func (f *fetcher) ensureNotStale(ctx context.Context, externalUpdateTime *timestamppb.Timestamp) error {
t := externalUpdateTime.AsTime()
storedTS := f.priorSnapshot().GetExternalUpdateTime()
switch {
case !f.updatedHint.IsZero() && f.updatedHint.After(t):
logging.Warningf(ctx, "Fetched last Gerrit update of %s, but %s expected", t, f.updatedHint)
case storedTS != nil && storedTS.AsTime().After(t):
logging.Warningf(ctx, "Fetched last Gerrit update of %s, but %s was already seen & stored", t, storedTS.AsTime())
return nil
return errStaleData
func (f *fetcher) ensureGerritClient(ctx context.Context) error {
if f.g != nil {
return nil
var err error
f.g, err = gerrit.CurrentClient(ctx,, f.luciProject)
return err
// Checks whether this LUCI project watches any repo on this Gerrit host.
func (f *fetcher) isHostWatched(ctx context.Context) (bool, error) {
meta, err := config.GetLatestMeta(ctx, f.luciProject)
if err != nil {
return false, err
cgs, err := meta.GetConfigGroups(ctx)
if err != nil {
return false, err
for _, cg := range cgs {
for _, g := range cg.Content.GetGerrit() {
if config.GerritHost(g) == {
return true, nil
return false, nil
func (f *fetcher) gerritProjectIfKnown() string {
if project := f.priorSnapshot().GetGerrit().GetInfo().GetProject(); project != "" {
return project
if project := f.toUpdate.Snapshot.GetGerrit().GetInfo().GetProject(); project != "" {
return project
return ""
func (f *fetcher) clidIfKnown() common.CLID {
if f.priorCL != nil {
return f.priorCL.ID
return 0
func (f *fetcher) priorSnapshot() *changelist.Snapshot {
if f.priorCL != nil {
return f.priorCL.Snapshot
return nil
func (f *fetcher) priorCLEversion() int {
if f.priorCL != nil {
return f.priorCL.EVersion
return 0
func (f *fetcher) mustHaveCurrentRevision() string {
switch ci := f.toUpdate.Snapshot.GetGerrit().GetInfo(); {
case ci == nil:
panic("ChangeInfo must be already fetched into toUpdate.Snapshot")
case ci.GetCurrentRevision() == "":
panic("ChangeInfo must have CurrentRevision populated.")
return ci.GetCurrentRevision()
// String is used for debug identification of a fetch in errors and logs.
func (f *fetcher) String() string {
if f.priorCL == nil {
return fmt.Sprintf("CL(%s/%d)",, f.change)
return fmt.Sprintf("CL(%s/%d [%d])",, f.change, f.priorCL.ID)