# blob: e400eda7dd7b9f0f3a53dcc9fb8979ea47a08523
# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import datetime
import logging
import os
import urllib
from buildbot.changes import base
from buildbot.util import deferredLocked
from twisted.python import log
from twisted.internet import defer
from common.gerrit_agent import GerritAgent
from common.gerrit.time import ParseGerritTime
class GerritPoller(base.PollingChangeSource):
  """A poller which queries a gerrit server for new changes and patchsets."""

  # TODO(szager): Due to the way query continuation works in gerrit (using
  # the 'S=%d' URL parameter), there are two distinct error scenarios that
  # are currently unhandled:
  #
  #   - A new patch set is uploaded.
  #   - When the poller runs, the change is #11, meaning it doesn't come in
  #     the first batch of query results.
  #   - In between the first and second queries, another patch set is
  #     uploaded to the same change, bumping the change up to #1 in the list.
  #   - The second query skips ahead by 10, and never sees the change.
  #
  #   - A new patch set is uploaded.
  #   - When the poller runs, the change is #10, and appears in the first set
  #     of query results.
  #   - In between the first and second queries, some other change gets a new
  #     patch set and moves up to #1, bumping the current #10 to #11.
  #   - The second query skips 10, getting changes 11-20.  So, the change that
  #     was already processed is processed again.
  #
  # Both of these problems need the same solution: keep some state in poller of
  # 'patch sets already processed'; and relax the 'since' parameter to
  # processChanges so that it goes further back in time than the last polling
  # event (maybe pollInterval*3).

  # Category assigned to every buildbot change created by this poller.
  change_category = 'patchset-created'

  def __init__(self, gerrit_host, gerrit_projects=None, pollInterval=None,
               dry_run=None):
    """Constructs a new Gerrit poller.

    Args:
      gerrit_host: (str or GerritAgent) If supplied as a GerritAgent, the
          Gerrit Agent to use when polling; otherwise, the host parameter to
          use to construct the GerritAgent to poll through.
      gerrit_projects: (list) A list of project names (str) to poll. A single
          project name (str) is also accepted and wrapped in a list.
      pollInterval: (int or datetime.timedelta) The amount of time to wait in
          between polls.
      dry_run: (bool) If 'True', then polls will not actually be executed. If
          None, dry-run mode is enabled when 'POLLER_DRY_RUN' is set in the
          environment.
    """
    if isinstance(pollInterval, datetime.timedelta):
      pollInterval = pollInterval.total_seconds()
    if isinstance(gerrit_projects, basestring):
      gerrit_projects = [gerrit_projects]
    self.gerrit_projects = gerrit_projects
    # Only override the inherited pollInterval when one was actually given.
    if pollInterval:
      self.pollInterval = pollInterval
    # Serializes initLastTimeStamp() and poll() (see @deferredLocked below).
    self.initLock = defer.DeferredLock()
    # 'updated' time of the newest change seen so far; None until known.
    self.last_timestamp = None
    if dry_run is None:
      dry_run = 'POLLER_DRY_RUN' in os.environ
    self.dry_run = dry_run
    self.agent = gerrit_host
    if not isinstance(self.agent, GerritAgent):
      self.agent = GerritAgent(self.agent)

  def startService(self):
    """Fetches the baseline timestamp (unless dry-run) and starts polling."""
    if not self.dry_run:
      d = self.initLastTimeStamp()
      # Without an errback a failed request would only surface as an
      # 'Unhandled error in Deferred'. poll() recovers from a missing
      # baseline, so logging is sufficient here.
      d.addErrback(log.err, 'GerritPoller: Could not get the initial '
                   'timestamp from gerrit server.')
    base.PollingChangeSource.startService(self)

  @staticmethod
  def buildQuery(terms, operator=None):
    """Builds a Gerrit query from terms.

    This function will go away once the new GerritAgent lands.

    Args:
      terms: (list) Query terms (str). Terms that already contain a '+' are
          wrapped in parentheses so they stay grouped.
      operator: (str) Optional operator (e.g. 'OR') placed between terms. If
          omitted, terms are joined with a plain '+'.

    Returns the joined query string.
    """
    connective = ('+%s+' % operator) if operator else '+'
    terms_with_parens = [('(%s)' % t) if ('+' in t) else t
                         for t in terms]
    return connective.join(terms_with_parens)

  def getChangeQuery(self):  # pylint: disable=R0201
    """Returns the query selecting open changes in the polled projects."""
    # Fetch only open issues.
    terms = ['status:open']
    # Filter by projects.
    if self.gerrit_projects:
      project_terms = ['project:%s' % urllib.quote(p, safe='')
                       for p in self.gerrit_projects]
      terms.append(self.buildQuery(project_terms, 'OR'))
    return self.buildQuery(terms)

  def request(self, path, method='GET'):
    """Logs and issues a request through the Gerrit agent.

    Returns whatever self.agent.request() returns; callers in this class
    treat it as a Deferred.
    """
    log.msg('Gerrit request: %s' % path, logLevel=logging.DEBUG)
    return self.agent.request(method, path)

  def _fetchLastTimeStamp(self):
    """Records the 'updated' time of the newest matching change.

    Does not take initLock itself; callers must already hold it.

    Returns a Deferred that fires once self.last_timestamp has been set.
    """
    log.msg('GerritPoller: Getting latest timestamp from gerrit server.')
    path = '/changes/?q=%s&n=1' % self.getChangeQuery()
    d = self.request(path)
    def _get_timestamp(j):
      if not j:
        # No matching changes yet. Gerrit documents its timestamps as UTC and
        # this value is later compared against ParseGerritTime() results
        # (assumed to be naive UTC datetimes), so the fallback must be UTC
        # rather than local time.
        self.last_timestamp = datetime.datetime.utcnow()
      else:
        self.last_timestamp = ParseGerritTime(j[0]['updated'])
    d.addCallback(_get_timestamp)
    return d

  @deferredLocked('initLock')
  def initLastTimeStamp(self):
    """Initializes self.last_timestamp from the server, under initLock."""
    return self._fetchLastTimeStamp()

  def getChanges(self, skip=None):
    """Requests a batch of up to 10 changes matching the change query.

    Args:
      skip: (int) Optional number of results to skip (the 'S' parameter).
    """
    path = '/changes/?q=%s&n=10' % self.getChangeQuery()
    if skip:
      path += '&S=%d' % skip
    return self.request(path)

  def _is_interesting_message(self, message):  # pylint: disable=R0201
    """Returns True if the message text announces a new or edited patch set."""
    return any((check_str in message['message'])
               for check_str in (
                   'Uploaded patch set ',
                   'Published edit on patch set ',))

  def checkForNewPatchset(self, change, since):
    """Looks for a patch set message on |change| newer than |since|.

    Args:
      change: Change dict; only its '_number' is used to fetch details.
      since: (datetime) Messages dated at or before this time are ignored.

    Returns a Deferred firing with a (change_details, message) tuple for the
    most recent interesting message, or None if there is none.
    """
    o_params = '&'.join('o=%s' % x for x in (
        'MESSAGES', 'ALL_REVISIONS', 'ALL_COMMITS', 'ALL_FILES'))
    path = '/changes/%s?%s' % (change['_number'], o_params)
    d = self.request(path)
    def _parse_messages(j):
      if not j or 'messages' not in j:
        return None
      # Walk from the last message backwards and stop at the first one that
      # is not newer than |since|.
      for m in reversed(j['messages']):
        if ParseGerritTime(m['date']) <= since:
          break
        if self._is_interesting_message(m):
          return j, m
      return None
    d.addCallback(_parse_messages)
    return d

  def getChangeUrl(self, change):
    """Generates a URL for a Gerrit change."""
    # GerritAgent stores its URL as protocol and host.
    return '%s/#/c/%s' % (self.agent.base_url,
                          change['_number'])

  def getRepositoryUrl(self, change):
    """Generates a URL for a Gerrit repository containing a change"""
    return '%s/%s' % (self.agent.base_url,
                      change['project'])

  def addBuildbotChange(self, change, revision=None, additional_chdict=None):
    """Adds a buildbot change into the database.

    Args:
      change: ChangeInfo Gerrit object. Documentation:
          https://gerrit-review.googlesource.com/Documentation/rest-api-changes.html#change-info
      revision: the sha of the buildbot change revision to use. Defaults to the
          value of change['current_revision']
      additional_chdict: (dict) Optional overrides merged into the change
          dict. Its 'properties' entry, if any, is merged into the generated
          properties instead of replacing them. The dict is not modified.

    Returns the new buildbot change as Deferred.
    """
    revision = revision or change['current_revision']
    revision_details = change['revisions'][revision]
    commit = revision_details['commit']

    properties = {
        'event.change.number': change['_number'],
        'event.change.id': change['id'],
        'event.change.url': self.getChangeUrl(change),
    }
    if change['status'] == 'NEW':
      ref = revision_details.get('fetch', {}).get('http', {}).get('ref')
      if ref:
        properties['event.patchSet.ref'] = ref
    elif change['status'] in ('SUBMITTED', 'MERGED'):
      properties['event.refUpdate.newRev'] = revision

    chdict = {
        'author': '%s <%s>' % (
            commit['author']['name'], commit['author']['email']),
        'project': change['project'],
        'branch': change['branch'],
        'revision': revision,
        'comments': commit['subject'],
        'files': revision_details.get('files', {'UNKNOWN': None}).keys(),
        'category': self.change_category,
        'when_timestamp': ParseGerritTime(commit['committer']['date']),
        'revlink': self.getChangeUrl(change),
        'repository': self.getRepositoryUrl(change),
        'properties': properties,
    }

    # Factor in external 'chdict' overrides.
    if additional_chdict is not None:
      # Work on a shallow copy so the caller's dict keeps its 'properties'.
      overrides = dict(additional_chdict)
      properties.update(overrides.pop('properties', {}))
      chdict.update(overrides)
      chdict['properties'] = properties

    d = self.master.addChange(**chdict)
    # Report the change number: revision_details['_number'] is the patch set
    # number, which does not identify the change the message talks about.
    d.addErrback(log.err, 'GerritPoller: Could not add buildbot change for '
                 'gerrit change %s.' % change['_number'])
    return d

  @staticmethod
  def findRevisionShaForMessage(change, message):
    """Finds the sha of the revision a change message refers to.

    Args:
      change: Change dict with a 'revisions' mapping of sha to revision info.
      message: Change message dict, matched via its '_revision_number'.

    Returns the sha (str), or None (after logging a warning) if the message
    carries no revision number or no revision matches it.
    """
    def warn(text):
      log.msg('GerritPoller warning: %s. Change: %s, message: %s' %
              (text, change['id'], message['message']))

    revision_number = message.get('_revision_number')
    if revision_number is None:
      warn('A message doesn\'t have a _revision_number')
      return None
    for sha, revision in change['revisions'].iteritems():
      if revision['_number'] == revision_number:
        return sha
    warn('a revision wasn\'t found for message')
    return None

  def addChange(self, change, message):
    """Adds a buildbot change for the revision that |message| refers to.

    If no revision can be matched, addBuildbotChange() falls back to the
    change's current revision.
    """
    revision = self.findRevisionShaForMessage(change, message)
    return self.addBuildbotChange(change, revision)

  def processChanges(self, j, since, skip=0):
    """Processes one batch of query results and fetches the next if needed.

    Args:
      j: (list) Change dicts as returned by getChanges().
      since: (datetime) Changes updated at or before this time end the scan.
      skip: (int) Number of changes consumed by previous batches.

    Returns a Deferred for the processing of any follow-up batches. The
    per-change lookups are started but not waited on.
    """
    need_more = bool(j)
    for change in j:
      skip += 1
      tm = ParseGerritTime(change['updated'])
      if tm <= since:
        # This and everything after it in the batch is treated as already
        # seen, so stop here and do not request another batch.
        need_more = False
        break
      if self.gerrit_projects and change['project'] not in self.gerrit_projects:
        continue
      d = self.checkForNewPatchset(change, since)
      d.addCallback(lambda x: self.addChange(*x) if x else None)
      # This Deferred is not returned to anyone, so log failures here rather
      # than leaving them as unhandled errors.
      d.addErrback(log.err, 'GerritPoller: Could not process gerrit change '
                   '%s.' % change.get('_number'))
    if need_more and j[-1].get('_more_changes'):
      d = self.getChanges(skip=skip)
      d.addCallback(self.processChanges, since=since, skip=skip)
    else:
      d = defer.succeed(None)
    return d

  @deferredLocked('initLock')
  def poll(self):
    """Polls gerrit for changes updated since the last recorded timestamp."""
    if self.dry_run:
      return None
    if self.last_timestamp is None:
      # The initial timestamp fetch failed or has not run. There is no
      # baseline to compare against (comparing a datetime with None would
      # raise), so establish one now and pick up changes on the next poll.
      return self._fetchLastTimeStamp()

    log.msg('GerritPoller: getting latest changes...')
    since = self.last_timestamp
    d = self.getChanges()
    def _update_last_timestamp(j):
      # The first result's 'updated' time becomes the next baseline.
      if j:
        self.last_timestamp = ParseGerritTime(j[0]['updated'])
      return j
    d.addCallback(_update_last_timestamp)
    d.addCallback(self.processChanges, since=since)
    return d