// Copyright 2020 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package poller
import (
"context"
"fmt"
"strings"
"time"
"google.golang.org/grpc/codes"
"google.golang.org/protobuf/types/known/timestamppb"
gerritutil "go.chromium.org/luci/common/api/gerrit"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
gerritpb "go.chromium.org/luci/common/proto/gerrit"
"go.chromium.org/luci/common/retry/transient"
"go.chromium.org/luci/common/sync/parallel"
"go.chromium.org/luci/gae/service/datastore"
"go.chromium.org/luci/grpc/grpcutil"
"go.chromium.org/luci/cv/internal/changelist"
"go.chromium.org/luci/cv/internal/common"
"go.chromium.org/luci/cv/internal/gerrit"
"go.chromium.org/luci/cv/internal/gerrit/updater"
)
const (
// fullPollInterval is the interval between full polls, each of which queries
// Gerrit for all changes relevant to CV as if from scratch.
fullPollInterval = 30 * time.Minute
// incrementalPollOverlap is the safety overlap of the Change.Updated time
// range between two successive polls.
//
// While this doesn't guarantee that CV won't miss changes between
// incremental polls, it should mitigate the most common causes:
// * time skew between CV and Gerrit clocks,
// * hopping between potentially out-of-sync Gerrit mirrors.
incrementalPollOverlap = time.Minute
// changesPerPoll is how many changes CV will process per poll.
//
// A value that's too low here will affect full polls first, since they have
// to (re-)process all interesting changes watched by a LUCI project.
//
// 10k is OK to fetch sequentially and keep in RAM without OOM-ing, and is
// currently enough for each of the LUCI projects.
//
// Higher values may need smarter full polling techniques.
changesPerPoll = 10000
// pageSize is how many changes to request in a single ListChangesRequest.
pageSize = 1000
// moreChangesTrustFactor controls when CV must not trust a false value of
// ListChangesResponse.MoreChanges.
//
// A value of 0.5 combined with a pageSize of 1000 means that CV will trust
// MoreChanges iff Gerrit returns <= 500 CLs.
//
// For more info, see corresponding field in
// https://godoc.org/go.chromium.org/luci/common/api/gerrit#PagingListChangesOptions
moreChangesTrustFactor = 0.5
)
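// A minimal sketch (illustration only, not used by the poller itself) of how
// pageSize and moreChangesTrustFactor combine, assuming the semantics
// documented in gerritutil.PagingListChangesOptions:
//
//	// A false MoreChanges is trusted iff the returned page is small enough.
//	trustThreshold := int(moreChangesTrustFactor * pageSize) // 500
//	trustFalseMoreChanges := len(pageChanges) <= trustThreshold
//
// Above that threshold, PagingListChanges fetches further pages anyway, up to
// changesPerPoll changes in total.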
// subpoll queries Gerrit and updates the state of an individual SubPoller.
func (p *Poller) subpoll(ctx context.Context, luciProject string, sp *SubPoller) error {
q := singleQuery{
luciProject: luciProject,
sp: sp,
p: p,
}
var err error
if q.client, err = gerrit.CurrentClient(ctx, sp.GetHost(), luciProject); err != nil {
return err
}
if sp.GetLastFullTime() == nil {
return q.full(ctx)
}
nextFullAt := sp.GetLastFullTime().AsTime().Add(fullPollInterval)
if clock.Now(ctx).Before(nextFullAt) {
return q.incremental(ctx)
}
return q.full(ctx)
}
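// An illustrative timeline of subpoll decisions (a sketch; the actual poll
// frequency is driven by the poller's task scheduling, which lives outside
// this file):
//
//	LastFullTime == nil        => full poll
//	now <  LastFullTime + 30m  => incremental poll
//	now >= LastFullTime + 30m  => full poll again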
type singleQuery struct {
luciProject string
pm PM
sp *SubPoller
p *Poller
client gerrit.QueryClient
}
func (q *singleQuery) full(ctx context.Context) error {
ctx = logging.SetFields(ctx, logging.Fields{
"luciProject": q.luciProject,
"poll": "full",
})
started := clock.Now(ctx)
after := started.Add(-common.MaxTriggerAge)
changes, err := q.fetch(ctx, after, buildQuery(q.sp, queryLimited))
// There can be partial results even if err != nil.
switch err2 := q.scheduleTasks(ctx, changes, true); {
case err != nil:
return err
case err2 != nil:
return err2
}
cur := uniqueSortedIDsOf(changes)
if diff := common.DifferenceSorted(q.sp.Changes, cur); len(diff) != 0 {
// The changes in `diff` no longer match the limited query, so they have
// probably been updated since the last full poll.
if err := q.p.scheduleRefreshTasks(ctx, q.luciProject, q.sp.GetHost(), diff); err != nil {
return err
}
}
q.sp.Changes = cur
q.sp.LastFullTime = timestamppb.New(started)
q.sp.LastIncrTime = nil
return nil
}
func (q *singleQuery) incremental(ctx context.Context) error {
ctx = logging.SetFields(ctx, logging.Fields{
"luciProject": q.luciProject,
"poll": "incremental",
})
started := clock.Now(ctx)
lastInc := q.sp.GetLastIncrTime()
if lastInc == nil {
if lastInc = q.sp.GetLastFullTime(); lastInc == nil {
panic("must have been a full poll")
}
}
after := lastInc.AsTime().Add(-incrementalPollOverlap)
// Unlike the full poll, query for all changes regardless of status or CQ
// vote. This ensures that CV quickly notices when a previously NEW & CQ-ed
// change has either its CQ vote removed OR its status changed (e.g.
// submitted or abandoned).
changes, err := q.fetch(ctx, after, buildQuery(q.sp, queryAll))
// There can be partial results even if err != nil.
switch err2 := q.scheduleTasks(ctx, changes, false); {
case err != nil:
return err
case err2 != nil:
return err2
}
q.sp.Changes = common.UnionSorted(q.sp.Changes, uniqueSortedIDsOf(changes))
q.sp.LastIncrTime = timestamppb.New(started)
return nil
}
func (q *singleQuery) fetch(ctx context.Context, after time.Time, query string) ([]*gerritpb.ChangeInfo, error) {
opts := gerritutil.PagingListChangesOptions{
Limit: changesPerPoll,
PageSize: pageSize,
MoreChangesTrustFactor: moreChangesTrustFactor,
UpdatedAfter: after,
}
req := gerritpb.ListChangesRequest{
Options: []gerritpb.QueryOption{
gerritpb.QueryOption_SKIP_MERGEABLE,
},
Query: query,
}
resp, err := gerritutil.PagingListChanges(ctx, q.client, &req, opts)
switch grpcutil.Code(err) {
case codes.OK:
if resp.GetMoreChanges() {
logging.Errorf(ctx, "Ignoring oldest changes because reached max (%d) allowed to process per poll", changesPerPoll)
}
return resp.GetChanges(), nil
// TODO(tandrii): handle 403 and 404 if CV lacks access to entire host.
default:
// NOTE: resp may be set if some changes were fetched successfully before a
// (typically transient) error occurred.
return resp.GetChanges(), gerrit.UnhandledError(ctx, err, "PagingListChanges failed")
}
}
// maxLoadCLBatchSize limits how many CL entities are loaded at once when
// notifying the PM.
const maxLoadCLBatchSize = 100
func (q *singleQuery) scheduleTasks(ctx context.Context, changes []*gerritpb.ChangeInfo, forceNotifyPM bool) error {
// TODO(tandrii): optimize by checking if CV is interested in the
// (host,project,ref) these changes come from before triggering tasks.
logging.Debugf(ctx, "scheduling %d CLUpdate tasks", len(changes))
var clids []common.CLID
if forceNotifyPM {
// Objective: make the PM aware of all these CLs.
// Optimization: avoid RefreshGerritCL with forceNotifyPM=true whenever
// possible, since it removes the ability to de-duplicate tasks.
var err error
eids := make([]changelist.ExternalID, len(changes))
for i, c := range changes {
if eids[i], err = changelist.GobID(q.sp.GetHost(), c.GetNumber()); err != nil {
return err
}
}
clids, err = changelist.Lookup(ctx, eids)
if err != nil {
return err
}
if err := q.p.notifyPMifKnown(ctx, q.luciProject, clids, maxLoadCLBatchSize); err != nil {
return err
}
}
errs := parallel.WorkPool(10, func(work chan<- func() error) {
for i, c := range changes {
payload := &updater.RefreshGerritCL{
LuciProject: q.luciProject,
Host: q.sp.GetHost(),
Change: c.GetNumber(),
UpdatedHint: c.GetUpdated(),
ForceNotifyPm: forceNotifyPM && (clids[i] == 0),
}
work <- func() error {
return q.p.clUpdater.Schedule(ctx, payload)
}
}
})
return common.MostSevereError(errs)
}
func (p *Poller) scheduleRefreshTasks(ctx context.Context, luciProject, host string, changes []int64) error {
logging.Debugf(ctx, "scheduling %d CLUpdate tasks for no longer matched CLs", len(changes))
var err error
eids := make([]changelist.ExternalID, len(changes))
for i, c := range changes {
if eids[i], err = changelist.GobID(host, c); err != nil {
return err
}
}
// Objective: make the PM aware of all these CLs.
// Optimization: avoid RefreshGerritCL with forceNotifyPM=true whenever
// possible, since it removes the ability to de-duplicate tasks.
// Since all no-longer-matched CLs are typically already stored in Datastore,
// get their internal CL IDs and notify the PM directly.
clids, err := changelist.Lookup(ctx, eids)
if err != nil {
return err
}
if err := p.notifyPMifKnown(ctx, luciProject, clids, maxLoadCLBatchSize); err != nil {
return err
}
errs := parallel.WorkPool(10, func(work chan<- func() error) {
for i, c := range changes {
payload := &updater.RefreshGerritCL{
LuciProject: luciProject,
Host: host,
Change: c,
ForceNotifyPm: clids[i] == 0, // notify iff the CL ID isn't yet known.
}
work <- func() error {
return p.clUpdater.Schedule(ctx, payload)
}
}
})
return common.MostSevereError(errs)
}
// notifyPMifKnown notifies the PM to update its CLs for each non-zero CLID.
//
// It obtains the EVersion of each CL before notifying the PM. Unfortunately,
// this loads a lot of information we don't need, such as the Snapshot, so
// CLs are loaded in batches to avoid a large memory footprint.
//
// In practice, most of these CLs will already be dscache-ed, so loading them
// is fast.
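// A usage sketch (hypothetical values; the non-zero CLIDs are assumed to
// exist in Datastore):
//
//	clids := []common.CLID{0, 11, 12, 13}
//	// CLID 0 is skipped; with maxBatchSize=2, CLs 11 and 12 are loaded and
//	// reported to the PM in one batch, then CL 13 in a final one.
//	err := p.notifyPMifKnown(ctx, "some-project", clids, 2)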
func (p *Poller) notifyPMifKnown(ctx context.Context, luciProject string, clids []common.CLID, maxBatchSize int) error {
cls := make([]*changelist.CL, 0, maxBatchSize)
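// flush loads the buffered CLs from Datastore (fetching, among other things,
// their current EVersions) and notifies the PM about them.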
flush := func() error {
if err := datastore.Get(ctx, cls); err != nil {
return errors.Annotate(common.MostSevereError(err), "failed to load CLs").Tag(transient.Tag).Err()
}
return p.pm.NotifyCLsUpdated(ctx, luciProject, cls)
}
for _, clid := range clids {
switch l := len(cls); {
case clid == 0: // skip CLs that aren't known to CV yet.
case l == maxBatchSize:
if err := flush(); err != nil {
return err
}
cls = cls[:0]
fallthrough // don't drop the current clid; buffer it for the next batch.
default:
cls = append(cls, &changelist.CL{ID: clid})
}
}
if len(cls) > 0 {
return flush()
}
return nil
}
type queryKind int
const (
queryLimited queryKind = iota
queryAll
)
// buildQuery returns the Gerrit query string for the given SubPoller.
//
// A queryLimited query, unlike queryAll, searches only for NEW CLs with a CQ
// vote.
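// For example (hypothetical project names), queryLimited over
// OrProjects=["infra/infra", "infra/luci"] produces
//
//	status:NEW label:Commit-Queue>0 (project:"infra/infra" OR project:"infra/luci")
//
// while queryAll over CommonProjectPrefix="chromium" produces
//
//	projects:"chromium"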
func buildQuery(sp *SubPoller, kind queryKind) string {
var buf strings.Builder
switch kind {
case queryLimited:
buf.WriteString("status:NEW ")
// TODO(tandrii): make label optional to support Tricium use-case.
buf.WriteString("label:Commit-Queue>0 ")
case queryAll:
default:
panic(fmt.Errorf("unknown queryKind %d", kind))
}
// TODO(crbug/1163177): specify `branch:` search term to restrict search to
// specific refs. This requires changing partitioning poller into subpollers,
// but will provide more targeted queries, reducing load on CV & Gerrit.
emitProjectValue := func(p string) {
// Even though it appears to work without quoting, the Gerrit docs say that
// project names containing / must be surrounded by "" or {}:
// https://gerrit-review.googlesource.com/Documentation/user-search.html#_argument_quoting
buf.WriteRune('"')
buf.WriteString(p)
buf.WriteRune('"')
}
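// For example, emitProjectValue("chromium/src") writes "chromium/src",
// quotes included, into the query.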
// One of .OrProjects or .CommonProjectPrefix must be set.
switch prs := sp.GetOrProjects(); len(prs) {
case 0:
if sp.GetCommonProjectPrefix() == "" {
panic("partitionConfig function should have ensured this")
}
// project*s* (plural) matches projects by name prefix.
buf.WriteString("projects:")
emitProjectValue(sp.GetCommonProjectPrefix())
case 1:
buf.WriteString("project:")
emitProjectValue(prs[0])
default:
buf.WriteRune('(')
for i, p := range prs {
if i > 0 {
buf.WriteString(" OR ")
}
buf.WriteString("project:")
emitProjectValue(p)
}
buf.WriteRune(')')
}
return buf.String()
}
func uniqueSortedIDsOf(changes []*gerritpb.ChangeInfo) []int64 {
if len(changes) == 0 {
return nil
}
out := make([]int64, len(changes))
for i, c := range changes {
out[i] = c.GetNumber()
}
return common.UniqueSorted(out)
}