# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Poller and support classes for git repositories with a gerrit interface.
This module can poll git repositories for commit activity without requiring a
local git checkout.
(Based largely on the structure of chrome.bugdroid.git_poller.)
"""
# TODO(mmoss): Refactor common methods with gitiles_poller.
import collections
import datetime
import json
import logging
import os
import sys
import infra.services.bugdroid.gob_helper as gob_helper
import infra.services.bugdroid.poller_handlers as poller_handlers
from infra.services.bugdroid.poll import Poller
DEFAULT_LOGGER = logging.getLogger(__name__)
DEFAULT_LOGGER.addHandler(logging.NullHandler())
class FileBitmap(object):
"""File-backed bitmap (automatically flushes each change)."""
def __init__(self, filepath, count):
self.count = count
# Make sure the file has at least as many bits as requested.
byte_count = (self.count + 8) / 8
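    # e.g. count=20: (20 + 8) / 8 = 3 bytes (24 bits). The +8 (rather than +7)
    # can over-allocate by one byte, which is harmless here.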
# Initialize the backing store if necessary.
if not os.path.exists(filepath):
DEFAULT_LOGGER.debug('Creating FileBitmap "%s" of %d bytes', filepath,
byte_count)
with open(filepath, 'wb') as fn:
fn.truncate(byte_count)
file_size = os.stat(filepath).st_size
if file_size != byte_count:
# TODO(mmoss): This might be destructive, so backup the existing file?
DEFAULT_LOGGER.info('Resizing "%s" from %d to %d bytes.', filepath,
file_size, byte_count)
with open(filepath, 'r+b') as fn:
fn.truncate(byte_count)
# Hold it open for editing.
self.bitmap_file = open(filepath, 'r+b')
self.bitmap_file.flush()
  def __del__(self):
    # Guard against __init__ having failed before the file was opened.
    if getattr(self, 'bitmap_file', None):
      self.bitmap_file.close()
def _GetFileByte(self, index):
"""Get the byte from the file containing the given bit."""
if index < 0 or index >= self.count:
raise IndexError('%d is outside range [0, %d]' % (index, self.count - 1))
byte_index = index / 8
bit_offset = index % 8
bit_mask = 1 << bit_offset
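    # e.g. index=13: byte_index=1, bit_offset=5, bit_mask=0b00100000 (32).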
self.bitmap_file.seek(byte_index)
return byte_index, bit_mask, ord(self.bitmap_file.read(1))
def CheckBit(self, index):
_, bit_mask, byte = self._GetFileByte(index)
return bool(byte & bit_mask)
def SetBit(self, index):
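    # Read-modify-write: fetch the byte containing the bit, OR the bit in,
    # write it back, and flush so the change is persisted immediately.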
byte_index, bit_mask, byte = self._GetFileByte(index)
byte = byte | bit_mask
self.bitmap_file.seek(byte_index)
self.bitmap_file.write(chr(byte))
self.bitmap_file.flush()
class GerritPoller(Poller):
"""Poller for monitoring commits to a gerrit host using the REST API.
Args:
host_url: The url of the gerrit host to poll.
poller_id: ID for the poller instance.
commits_since: datetime() of the oldest commits to look for.
"""
def __init__(self, host_url, poller_id,
commits_since=None, interval_in_minutes=3,
setup_refresh_interval_minutes=0, logger=None, run_once=False,
with_paths=True, with_diffs=False, datadir=None,
git_projects=None):
Poller.__init__(self, interval_in_minutes, setup_refresh_interval_minutes,
run_once)
self.logger = logger or DEFAULT_LOGGER
self.gerrit = gob_helper.GerritHelper(host_url, self.logger, git_projects)
self.poller_id = poller_id
self.handlers = []
self.with_paths = with_paths
self.with_diffs = with_diffs
self.__last_seen = None
self.last_seen_fn = os.path.join(datadir or '', '%s.txt' % self.poller_id)
# Initializes last_seen with |commits_since|, or stored value, or now().
self.last_seen = commits_since
self.logger.info('Polling changes since: %s', self.last_seen)
seen_bitmap_fn = os.path.join(datadir or '', '%s.seen' % self.poller_id)
    # TODO(mmoss): Hopefully 10,000,000 changes is enough for quite a while
    # (currently <300,000), and hopefully gerrit keeps numbering changes
    # sequentially, in small increments.
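    # At one bit per change number, 10,000,000 bits is ~1.2MB of backing file.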
self.seen_bitmap = FileBitmap(seen_bitmap_fn, 10000000)
@property
def last_seen(self):
return self.__last_seen
@last_seen.setter
def last_seen(self, timestamp):
# Initialize the stored value.
if not os.path.exists(self.last_seen_fn):
self.logger.debug('Creating "last seen" file: %s', self.last_seen_fn)
with open(self.last_seen_fn, 'w') as f:
f.write(timestamp or
self.gerrit.GenerateTimeStamp(datetime.datetime.utcnow()))
# Default to the stored value.
if not self.__last_seen:
with open(self.last_seen_fn, 'r') as f:
self.__last_seen = self.gerrit.ParseTimeStamp(f.read().strip())
# None is ignored, and can be used as a "default initializer".
if timestamp is None:
self.logger.debug('Ignoring None timestamp.')
return
# Convert the timestamp to a datetime() if it's not already.
try:
timestamp = self.gerrit.ParseTimeStamp(timestamp)
except ValueError:
self.logger.warning('Ignoring unsupported timestamp: %s', timestamp)
return
if timestamp == self.__last_seen:
return
# Looks like a good, new value, so store and set it.
with open(self.last_seen_fn, 'w') as f:
f.write(self.gerrit.GenerateTimeStamp(timestamp))
self.__last_seen = timestamp
def _ProcessGitLogEntry(self, log_entry):
if log_entry.ignored:
self.logger.debug('Not processing ignored commit %s.', log_entry.commit)
self.commits_metric.increment(
{'poller': 'gerrit', 'project': self.poller_id, 'status': 'ignored'})
return
for handler in self.handlers:
try:
handler.ProcessLogEntry(log_entry)
      except Exception:
        # Log it here so that we see where it's breaking.
        self.logger.exception('Uncaught Exception in %s', handler)
        self.commits_metric.increment(
            {'poller': 'gerrit', 'project': self.poller_id, 'status': 'error'})
        # Some handlers aren't that important, but others should always
        # succeed, and should abort processing if they don't.
        if handler.must_succeed:
          # Use a bare raise to preserve the original traceback.
          raise
self.logger.info('Handler is not fatal. Continuing.')
continue
else:
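      # This for/else body runs only when the handler loop finishes without a
      # fatal re-raise above, i.e. the entry was fully processed.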
self.commits_metric.increment(
{'poller': 'gerrit', 'project': self.poller_id,
'status': 'success'})
def add_handler(self, handler):
if not isinstance(handler, poller_handlers.BasePollerHandler):
      raise TypeError('%s doesn\'t accept handlers of type %s' %
                      (self.__class__.__name__, type(handler)))
self.handlers.append(handler)
def execute(self):
extra_fields = ['CURRENT_COMMIT', 'CURRENT_REVISION']
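    # CURRENT_FILES asks gerrit to also return the changed-file listing for
    # the current revision, presumably for handlers that need path info.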
if self.with_paths:
extra_fields.append('CURRENT_FILES')
# Set a hard limit on the number of commits to process, so it doesn't
# accidentally do something crazy like try to process all commits since the
# beginning of time.
max_changes = 1000
start_since = self.last_seen
# TODO(mmoss): Does this need with_diffs handling?
# Find commits since the last_seen time.
entries, more = self.gerrit.GetLogEntries(since=start_since,
limit=max_changes,
fields=extra_fields)
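    # |more| means gerrit had more results than |limit| allowed it to return.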
self.logger.debug('Found %s commits since %s.', len(entries), start_since)
if more:
self.logger.warning('Not processing all commits since %s '
'(limit: %d).', start_since, max_changes)
# Gerrit output is sorted by the last update time, most recent to oldest, so
# process in reverse order to make sure self.last_seen is always older than
# any unprocessed items.
for entry in reversed(entries):
if self.seen_bitmap.CheckBit(entry.number):
self.logger.debug('Rejecting %s (%s), which was previously seen.',
entry.commit, entry.number)
self.commits_metric.increment(
{'poller': 'gerrit', 'project': self.poller_id, 'status': 'seen'})
continue
# There seems to be some bug in gerrit's "since" param handling, where it
# sometimes returns changes older than "since" (.5 seconds or more too
# old), so manually verify and reject anything like that, otherwise
# bugdroid might repeatedly process the same commits.
if entry.update_datetime <= start_since:
        self.logger.debug('Rejecting %s, not newer than requested "since". '
                          '(%s <= %s)', entry.commit, entry.update_datetime,
                          start_since)
self.commits_metric.increment(
{'poller': 'gerrit', 'project': self.poller_id, 'status': 'old'})
continue
try:
self._ProcessGitLogEntry(entry)
except Exception:
self.logger.error('Aborting processing of commits.')
break
# Finished processing this commit. Mark it as seen and save its timestamp
# as the new query starting point.
self.seen_bitmap.SetBit(entry.number)
self.last_seen = entry.update_datetime