| // Copyright 2017 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| package gerrit |
| |
| import ( |
| "fmt" |
| "sort" |
| "strings" |
| "time" |
| |
| "github.com/golang/protobuf/proto" |
| ds "go.chromium.org/gae/service/datastore" |
| tq "go.chromium.org/gae/service/taskqueue" |
| "go.chromium.org/luci/auth/identity" |
| "go.chromium.org/luci/common/clock" |
| "go.chromium.org/luci/common/errors" |
| "go.chromium.org/luci/common/logging" |
| "go.chromium.org/luci/common/sync/parallel" |
| "go.chromium.org/luci/server/auth" |
| |
| gr "golang.org/x/build/gerrit" |
| "golang.org/x/net/context" |
| |
| "google.golang.org/appengine" |
| |
| "infra/tricium/api/admin/v1" |
| "infra/tricium/api/v1" |
| "infra/tricium/appengine/common" |
| "infra/tricium/appengine/common/config" |
| ) |
| |
// timeStampLayout is the timestamp format used by Gerrit, written with Go's
// reference date. It carries nine fractional-second digits and no zone.
// All timestamps are in UTC.
const timeStampLayout = "2006-01-02 15:04:05.000000000"
| |
| // Datastore schema diagram for tracked Gerrit projects and CLs: |
| // |
| // +------------------+ |
| // |GerritProject | |
| // |id=<host:project> | |
| // +---+--------------+ |
| // | |
| // +---+-----------+ |
| // |Change | |
| // |id=<change ID> | |
| // +---------------+ |
| |
// Project represents one Gerrit project, which corresponds to one repo.
//
// Mutable entity. This is used to track the last poll time.
// The kind is "GerritProject"; this is not to be confused with
// `GerritProject` from the Tricium config schema.
//
// NOTE(review): no `$kind` override is visible on this struct, while keys for
// child Change entities are built with kind "GerritProject" — confirm how the
// kind is applied.
type Project struct {
	// ID is the entity ID, of the form "<host>:<project>"
	// (see gerritProjectID).
	ID string `gae:"$id"`
	// Instance is the Gerrit host that is passed to change queries.
	Instance string
	// Project is the Gerrit project name that is passed to change queries.
	Project string
	// Timestamp of last successful poll. On the first poll this is set to the
	// current time; afterwards it is set to the update time of the first
	// change in the sorted poll result.
	LastPoll time.Time
}
| |
// Change represents one CL (one Gerrit change).
//
// Mutable entity. This is used to track the latest patchset.
// It's assumed that entities for inactive changes are cleaned up.
type Change struct {
	// ID is the Gerrit change ID as reported in the poll result.
	ID string `gae:"$id"`
	// Parent is the key of the "GerritProject" entity this change belongs to.
	Parent *ds.Key `gae:"$parent"`
	// LastRevision is the current revision of the change as of the last poll
	// in which the change was seen with a new revision.
	LastRevision string
}
| |
| // byUpdateTime sorts changes based on update timestamps. |
| type byUpdatedTime []gr.ChangeInfo |
| |
| func (c byUpdatedTime) Len() int { return len(c) } |
| func (c byUpdatedTime) Swap(i, j int) { c[i], c[j] = c[j], c[i] } |
| func (c byUpdatedTime) Less(i, j int) bool { return c[i].Updated.Time().Before(c[i].Updated.Time()) } |
| |
| // poll adds a task to poll Gerrit for each project. |
| func poll(c context.Context, cp config.ProviderAPI) error { |
| projects, err := cp.GetAllProjectConfigs(c) |
| if err != nil { |
| return errors.Annotate(err, "failed to get all service configs").Err() |
| } |
| |
| // Sort the names so that they are processed in a deterministic order. |
| names := make([]string, 0, len(projects)) |
| for name := range projects { |
| names = append(names, name) |
| } |
| sort.Strings(names) |
| |
| var tasks []*tq.Task |
| for _, name := range names { |
| task := tq.NewPOSTTask("/gerrit/internal/poll-project", nil) |
| bytes, err := proto.Marshal(&admin.PollProjectRequest{Project: name}) |
| if err != nil { |
| return errors.Annotate(err, "failed to marshal PollProjectRequest").Err() |
| } |
| task.Payload = bytes |
| tasks = append(tasks, task) |
| } |
| if err := tq.Add(c, common.PollProjectQueue, tasks...); err != nil { |
| return errors.Annotate(err, "failed to enqueue poll project requests").Err() |
| } |
| return nil |
| } |
| |
| // pollProject polls for new changes for all repos in one LUCI project. |
| func pollProject(c context.Context, name string, gerrit API, cp config.ProviderAPI) error { |
| // Separate repos in one project can be processed in parallel in one request. |
| pc, err := cp.GetProjectConfig(c, name) |
| if err != nil { |
| return errors.Annotate(err, "failed to get project config for %q", name).Err() |
| } |
| return parallel.FanOutIn(func(taskC chan<- func() error) { |
| for _, repo := range pc.Repos { |
| repo := repo // Make a separate variable for use in the closure below. |
| if repo.GetGerritProject() != nil { |
| taskC <- func() error { |
| return pollGerritProject(c, name, repo, gerrit) |
| } |
| } |
| } |
| }) |
| } |
| |
| // pollGerritProject polls for changes for one Gerrit project. |
| // |
| // One Gerrit project corresponds to one repo; however one LUCI project |
| // (with one Tricium project config) may include multiple Gerrit projects. |
| // |
| // This function could be run concurrently and two parallel runs may query |
| // Gerrit using the same last poll time. This scenario would lead to duplicate |
| // analyze tasks. This is OK because the analyze task queue handler will check |
| // for existing active runs for a change before moving tasks further along the |
| // pipeline. |
| // |
| // Each poll to a Gerrit host and project is logged with a timestamp and last |
| // seen revisions (within the same second). The timestamp of the most recent |
| // change in the last poll is used in the next poll, as the value of "after" |
| // in the query string. If no previous poll has been logged, then a time |
| // corresponding to zero is used (time.Time{}). |
| func pollGerritProject(c context.Context, luciProject string, repo *tricium.RepoDetails, gerrit API) error { |
| gerritProject := repo.GetGerritProject() |
| |
| // Get last poll data for the given host/project. |
| p := &Project{ID: gerritProjectID(gerritProject.Host, gerritProject.Project)} |
| if err := ds.Get(c, p); err != nil { |
| if err != ds.ErrNoSuchEntity { |
| return errors.Annotate(err, "failed to get Project entity").Err() |
| } |
| logging.Fields{ |
| "gerritProject": p.ID, |
| }.Infof(c, "Found no previous poll for gerrit project.") |
| err = nil |
| p.Instance = gerritProject.Host |
| p.Project = gerritProject.Project |
| } |
| |
| // If there is no previous poll, store current time and return. |
| if p.LastPoll.IsZero() { |
| logging.Infof(c, "No previous poll for %s/%s. Storing current timestamp and stopping.", |
| gerritProject.Host, gerritProject.Project) |
| p.ID = gerritProjectID(gerritProject.Host, gerritProject.Project) |
| p.Instance = gerritProject.Host |
| p.Project = gerritProject.Project |
| p.LastPoll = clock.Now(c).UTC() |
| logging.Debugf(c, "Storing project data: %+v", p) |
| if err := ds.Put(c, p); err != nil { |
| return errors.Annotate(err, "failed to store Project entity").Err() |
| } |
| return nil |
| } |
| |
| logging.Debugf(c, "Last poll for %q: %s", p.ID, p.LastPoll) |
| |
| // Query for changes updated since last poll. Even though Gerrit supports |
| // pagination, we only make one request for a list of changes to avoid using |
| // too much memory and time. During super-busy times, this may occasionally |
| // mean that some revisions are skipped. |
| // TODO(crbug.com/915842): We could add another task to the task queue to poll |
| // for the next page, to avoid skipping revisions. |
| changes, more, err := gerrit.QueryChanges(c, p.Instance, p.Project, p.LastPoll) |
| if err != nil { |
| return errors.Annotate(err, "failed to query for change").Err() |
| } |
| if more { |
| logging.Warningf(c, "There were changes beyond the limit %d. Some will be skipped.", maxChanges) |
| } |
| |
| changes = filterChangesWithFlags(changes) |
| |
| // No changes found. |
| if len(changes) == 0 { |
| logging.Infof(c, "Poll done for %q. No changes found.", luciProject) |
| return nil |
| } |
| |
| // Make sure changes are sorted (most recent change first). |
| // This is used to move the poll pointer forward and avoid polling for |
| // the same changes more than once. There may still be an overlap but |
| // the tracking of change state should be update between polls (and is |
| // also guarded by a transaction). |
| sort.Sort(sort.Reverse(byUpdatedTime(changes))) |
| |
| // Extract updates. |
| diff, uchanges, dchanges, err := extractUpdates(c, p, changes) |
| if err != nil { |
| return errors.Annotate(err, "failed to extract updates").Err() |
| } |
| |
| // Store updated tracking data. |
| if err := ds.RunInTransaction(c, func(c context.Context) error { |
| return parallel.FanOutIn(func(taskC chan<- func() error) { |
| // Update existing changes and add new ones. |
| taskC <- func() error { |
| if len(uchanges) == 0 { |
| return nil |
| } |
| return ds.Put(c, uchanges) |
| } |
| |
| // Delete removed changes. |
| taskC <- func() error { |
| if len(dchanges) == 0 { |
| return nil |
| } |
| if err := ds.Delete(c, dchanges); err != nil { |
| if me, ok := err.(appengine.MultiError); ok { |
| for _, merr := range me { |
| if merr != ds.ErrNoSuchEntity { |
| // Some error other than entity not found, report. |
| return err |
| } |
| } |
| } else { |
| return err |
| } |
| } |
| return nil |
| } |
| |
| // Update poll timestamp. |
| taskC <- func() error { |
| p.LastPoll = changes[0].Updated.Time() |
| if err := ds.Put(c, p); err != nil { |
| return errors.Annotate(err, "failed to update last poll timestamp").Err() |
| } |
| return nil |
| } |
| }) |
| }, &ds.TransactionOptions{XG: true}); err != nil { |
| return err |
| } |
| logging.Infof(c, "Poll done for %q. Processed %d change(s).", luciProject, len(changes)) |
| |
| // Convert diff to Analyze requests. |
| // |
| // Running after the transaction because each seen change will result in one |
| // enqueued task and there is a limit on the number of action in a transaction. |
| return enqueueAnalyzeRequests(c, luciProject, repo, diff) |
| } |
| |
| // filterChangesWithFlags filters out the changes where the current revision's footer |
| // contains the flag that indicates Tricium should be skipped. |
| func filterChangesWithFlags(changes []gr.ChangeInfo) []gr.ChangeInfo { |
| var filtered = changes[:0] |
| var falseValues = []string{"disable", "skip", "no", "none", "false"} |
| for _, change := range changes { |
| var flags map[string]string |
| |
| if change.Revisions[change.CurrentRevision].Commit != nil { |
| flags = extractFooterFlags(change.Revisions[change.CurrentRevision].Commit.Message) |
| } |
| value, _ := flags["tricium"] |
| |
| isFalse := false |
| for _, falseVal := range falseValues { |
| if value == falseVal { |
| isFalse = true |
| break |
| } |
| } |
| |
| if !isFalse { |
| filtered = append(filtered, change) |
| } |
| } |
| return filtered |
| } |
| |
| // extractUpdates extracts change updates, which determines what to analyze. |
| // |
| // Compares stored tracked changes with those found in the poll. Takes as |
| // input the list of changes found in the poll. |
| // |
| // Returns the Gerrit changes to analyze, as well as the Change entities |
| // to update (re-put) and remove. |
| func extractUpdates(c context.Context, p *Project, pollChanges []gr.ChangeInfo) ([]gr.ChangeInfo, []*Change, []*Change, error) { |
| var diff []gr.ChangeInfo |
| var uchanges []*Change |
| var dchanges []*Change |
| |
| // Get list of tracked changes. |
| pkey := ds.NewKey(c, "GerritProject", p.ID, 0, nil) |
| var trackedChanges []*Change |
| for _, change := range pollChanges { |
| trackedChanges = append(trackedChanges, &Change{ID: change.ID, Parent: pkey}) |
| } |
| if err := ds.Get(c, trackedChanges); err != nil { |
| if me, ok := err.(errors.MultiError); ok { |
| for _, merr := range me { |
| if merr != nil && merr != ds.ErrNoSuchEntity { |
| return diff, uchanges, dchanges, err |
| } |
| } |
| } else if err != ds.ErrNoSuchEntity { |
| logging.WithError(err).Errorf(c, "Getting tracked changes failed.") |
| return diff, uchanges, dchanges, err |
| } |
| } |
| |
| // Create a map of tracked changes stored in the datastore, |
| // so that tracked changes can be looked up by ID when iterating |
| // through changes from the poll. |
| t := map[string]Change{} |
| for _, change := range trackedChanges { |
| if change != nil { |
| t[change.ID] = *change |
| } |
| } |
| |
| // Compare polled changes to tracked changes, update tracking and add to the |
| // diff list when there is an updated revision change. |
| for _, change := range pollChanges { |
| tc, ok := t[change.ID] |
| switch { |
| // Untracked open change; start tracking and add to diff list. |
| case !ok && isOpen(change): |
| logging.Debugf(c, "Found untracked %s change (%s); tracking.", change.Status, change.ID) |
| tc.ID = change.ID |
| tc.LastRevision = change.CurrentRevision |
| uchanges = append(uchanges, &tc) |
| diff = append(diff, change) |
| // Untracked closed change; move on to the next change. |
| case !ok: |
| logging.Debugf(c, "Found untracked %s change (%s); leaving untracked.", change.Status, change.ID) |
| // Tracked closed change; stop tracking (clean up). |
| case !isOpen(change): |
| logging.Debugf(c, "Found tracked %s change (%s); removing.", change.Status, change.ID) |
| // Because we are only adding keys found by the query, |
| // we should not get any NoSuchEntity errors. |
| dchanges = append(dchanges, &tc) |
| // Open tracked changes with a new revision: update tracking and add |
| // to diff list. |
| case tc.LastRevision != change.CurrentRevision: |
| logging.Debugf(c, "Found tracked %s change (%s) with new revision; updating.", change.Status, change.ID) |
| tc.LastRevision = change.CurrentRevision |
| uchanges = append(uchanges, &tc) |
| diff = append(diff, change) |
| // Open tracked change with no new revision: Leave as is. |
| default: |
| logging.Debugf(c, "Found tracked %s change (%s) with no update; leaving as is.", change.Status, change.ID) |
| } |
| } |
| return diff, uchanges, dchanges, nil |
| } |
| |
| // isOpen checks whether a Gerrit CL is considered open. |
| // |
| // Status is one of "NEW", "MERGED", or "ABANDONED", where "NEW" indicates |
| // any open change. ChangeInfo schema: https://goo.gl/M8Csu6 |
| func isOpen(change gr.ChangeInfo) bool { |
| return change.Status == "NEW" |
| } |
| |
| // enqueueAnalyzeRequests enqueues Analyze requests for the provided Gerrit changes. |
| // |
| // Changes that shouldn't be analyzed will be skipped; this includes changes by |
| // owners that aren't whitelisted, as well as empty changes, such as those |
| // containing only deleted files. |
| func enqueueAnalyzeRequests(c context.Context, luciProject string, repo *tricium.RepoDetails, changes []gr.ChangeInfo) error { |
| if len(changes) == 0 { |
| return nil |
| } |
| logging.Infof(c, "Preparing to enqueue Analyze requests for %d changes for project %q.", |
| len(changes), luciProject) |
| gerritProject := repo.GetGerritProject() |
| changes = filterByWhitelist(c, repo.WhitelistedGroup, changes) |
| var tasks []*tq.Task |
| for _, change := range changes { |
| var files []*tricium.Data_File |
| curRev := change.Revisions[change.CurrentRevision] |
| for k, v := range curRev.Files { |
| status := statusFromCode(c, v.Status) |
| if status == tricium.Data_DELETED { |
| continue // Never consider deleted files; they don't exist after the patch. |
| } |
| files = append(files, &tricium.Data_File{ |
| Path: k, |
| Status: status, |
| IsBinary: v.Binary, |
| }) |
| } |
| if len(files) == 0 { |
| logging.Fields{ |
| "changeID": change.ID, |
| }.Infof(c, "Not making Analyze request for change; changes has no files.") |
| continue |
| } |
| // Sorting files according to their paths to account for random |
| // enumeration in go maps. This is to get consistent behavior for the |
| // same input. |
| sort.Slice(files, func(i, j int) bool { |
| return files[i].Path < files[j].Path |
| }) |
| req := &tricium.AnalyzeRequest{ |
| Project: luciProject, |
| Files: files, |
| Source: &tricium.AnalyzeRequest_GerritRevision{ |
| GerritRevision: &tricium.GerritRevision{ |
| Host: gerritProject.Host, |
| Project: gerritProject.Project, |
| Change: change.ID, |
| GitRef: curRev.Ref, |
| GitUrl: gerritProject.GitUrl, |
| }, |
| }, |
| } |
| b, err := proto.Marshal(req) |
| if err != nil { |
| return errors.Annotate(err, "failed to marshal Analyze request").Err() |
| } |
| t := tq.NewPOSTTask("/internal/analyze", nil) |
| t.Payload = b |
| logging.Debugf(c, "Created AnalyzeRequest: %v", req) |
| tasks = append(tasks, t) |
| } |
| if err := tq.Add(c, common.AnalyzeQueue, tasks...); err != nil { |
| return errors.Annotate(err, "failed to enqueue Analyze request").Err() |
| } |
| return nil |
| } |
| |
| // filterByWhitelist filters the list of changes to analyze based on |
| // whitelisted groups. |
| // |
| // A CL is analyzed if the owner (author) of the CL is in at least one of the |
| // the whitelist groups for this repo as specified in the project config. |
| // If there are no whitelisted groups, then there is no filtering. |
| func filterByWhitelist(c context.Context, whitelistedGroups []string, changes []gr.ChangeInfo) []gr.ChangeInfo { |
| if len(whitelistedGroups) == 0 { |
| return changes |
| } |
| |
| // The auth DB should be set in state by middleware. |
| state := auth.GetState(c) |
| if state == nil { |
| logging.Errorf(c, "failed to check auth, no State in context.") |
| return nil |
| } |
| authDB := state.DB() |
| if authDB == nil { |
| logging.Errorf(c, "Failed to check auth, nil auth DB in State.") |
| return nil |
| } |
| |
| // To avoid making multiple checks for the same CL owner, we keep track |
| // of owners we've already checked. |
| owners := map[string]bool{} |
| whitelistedChanges := make([]gr.ChangeInfo, 0, len(changes)) |
| for _, change := range changes { |
| email := change.Owner.Email |
| whitelisted, ok := owners[email] |
| if !ok { |
| // If we fail to check the whitelist for a user, we'll |
| // log an error and consider the owner not whitelisted. |
| owners[email] = false |
| ident, err := identity.MakeIdentity("user:" + email) |
| if err != nil { |
| logging.WithError(err).Errorf(c, "Failed to create identity for %q.", email) |
| continue |
| } |
| authOK, err := authDB.IsMember(c, ident, whitelistedGroups) |
| if err != nil { |
| logging.WithError(err).Errorf(c, "Failed to check auth for %q.", email) |
| } |
| whitelisted = authOK |
| if !whitelisted { |
| logging.Fields{ |
| "email": email, |
| "groups": whitelistedGroups, |
| }.Infof(c, "Owner not whitelisted; skipping Analyze.") |
| } |
| owners[email] = whitelisted |
| } |
| if whitelisted { |
| whitelistedChanges = append(whitelistedChanges, change) |
| } |
| } |
| return whitelistedChanges |
| } |
| |
// extractFooterFlags extracts the "Flag: Value" and "Flag=Value" footers from
// the CL description.
//
// Footers are the trailing run of lines that each contain a ':' or '='.
// Trailing whitespace and blank lines at the end of the message are ignored.
// Scanning runs upwards from the last line and stops at the first line that
// has neither separator.
//
// Returns a map of flag to its value, with both flag and value trimmed and
// converted to lower-case. If a flag occurs several times in the footer, the
// topmost occurrence wins. The map is empty when there are no footers.
func extractFooterFlags(commitMessage string) map[string]string {
	flags := map[string]string{}

	// Strip trailing whitespace first: a message ending in a newline would
	// otherwise produce an empty last line, which would end the scan before
	// any footer is seen.
	lines := strings.Split(strings.TrimRight(commitMessage, " \t\r\n"), "\n")

	// Going in reverse to stop as soon as a non flag line is reached.
	for i := len(lines) - 1; i >= 0; i-- {
		line := lines[i]

		// Look for the first : or = and split on it.
		splitPoint := strings.IndexAny(line, ":=")
		if splitPoint == -1 {
			break
		}
		flagName := strings.ToLower(strings.TrimSpace(line[:splitPoint]))
		value := strings.ToLower(strings.TrimSpace(line[splitPoint+1:]))
		flags[flagName] = value
	}

	return flags
}
| |
// gerritProjectID builds the ID under which information about a Gerrit host
// and project is stored: the host and the project joined by a colon.
func gerritProjectID(host, project string) string {
	// All operands are strings, so Sprint inserts no spaces between them.
	return fmt.Sprint(host, ":", project)
}
| |
| // statusFromCode returns a file status given a one character code. |
| // |
| // The input is one of the valid values for the status field in |
| // Gerrit FileInfo; see https://goo.gl/ABFHDg. |
| func statusFromCode(c context.Context, status string) tricium.Data_Status { |
| switch status { |
| case "", "M": |
| // An empty or missing Status field indicates "MODIFIED". |
| // Gerrit uses an empty status field for modified files. |
| return tricium.Data_MODIFIED |
| case "A": |
| return tricium.Data_ADDED |
| case "D": |
| return tricium.Data_DELETED |
| case "R": |
| return tricium.Data_RENAMED |
| case "C": |
| return tricium.Data_COPIED |
| case "W": |
| return tricium.Data_REWRITTEN |
| default: |
| logging.Warningf(c, "Received unrecognized status %q.", status) |
| return tricium.Data_MODIFIED |
| } |
| } |