# coding=utf8
# Copyright (c) 2012 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Commit queue manager class.
Security implications:
The following hypothesis are made:
- Commit queue:
- Impersonate the same svn credentials that the patchset owner.
- Can't impersonate a non committer.
- SVN will check the committer write access.
"""
import errno
import logging
import os
import socket
import ssl
import time
import traceback
import urllib2
import find_depot_tools # pylint: disable=W0611
import checkout
import git_cl
import patch
import subprocess2
import errors
import model
from verification import base
class PendingCommit(base.Verified):
"""Represents a pending commit that is being processed."""
  # Important since they tell whether we need to revalidate and send try jobs
  # again if any of these values changes.
issue = int
patchset = int
description = unicode
files = list
# Only a cache, these values can be regenerated.
owner = unicode
reviewers = list
base_url = unicode
messages = list
relpath = unicode
  # Only used after a patch was committed. Kept here for try job retries.
revision = (None, int, unicode)
def __init__(self, **kwargs):
super(PendingCommit, self).__init__(**kwargs)
for message in self.messages:
      # Save storage; no verifier really needs 'text', just 'approval'.
if 'text' in message:
del message['text']
def pending_name(self):
"""The name that should be used for try jobs.
It makes it possible to regenerate the try_jobs array if ever needed."""
return '%d-%d' % (self.issue, self.patchset)
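
  # A minimal sketch of the naming scheme (assuming, as the attribute
  # declarations above suggest, that unset typed fields get default values
  # from model.PersistentMixIn):
  #
  #   pending = PendingCommit(issue=1234, patchset=5678, messages=[])
  #   pending.pending_name()  # -> '1234-5678'
  #
  # Since both the issue and the patchset are encoded in the name, the
  # try_jobs array can be rebuilt from the names alone.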
def prepare_for_patch(self, context_obj):
self.revision = context_obj.checkout.prepare(self.revision)
# Verify revision consistency.
if not self.revision:
raise base.DiscardPending(
self, 'Internal error: failed to checkout. Please try again.')
def apply_patch(self, context_obj, prepare):
"""Applies the pending patch to the checkout and throws if it fails."""
try:
if prepare:
self.prepare_for_patch(context_obj)
patches = context_obj.rietveld.get_patch(self.issue, self.patchset)
if not patches:
raise base.DiscardPending(
self, 'No diff was found for this patchset.')
if self.relpath:
patches.set_relpath(self.relpath)
self.files = [p.filename for p in patches]
if not self.files:
raise base.DiscardPending(
self, 'No file was found in this patchset.')
context_obj.checkout.apply_patch(patches)
except (checkout.PatchApplicationFailed, patch.UnsupportedPatchFormat) as e:
raise base.DiscardPending(self, str(e))
except subprocess2.CalledProcessError as e:
out = 'Failed to apply the patch.'
if e.stdout:
out += '\n%s' % e.stdout
raise base.DiscardPending(self, out)
except (ssl.SSLError, urllib2.HTTPError, urllib2.URLError) as e:
raise base.DiscardPending(
self,
('Failed to request the patch to try. Please note that binary files '
'are still unsupported at the moment, this is being worked on.\n\n'
'Thanks for your patience.\n\n%s') % e)
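
  # Typical call pattern, as a minimal sketch; report_failure() is a
  # hypothetical helper, but e.pending and e.status are how DiscardPending is
  # consumed elsewhere in this file:
  #
  #   try:
  #     pending.apply_patch(context_obj, prepare=True)
  #   except base.DiscardPending as e:
  #     report_failure(e.pending, e.status)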
class PendingQueue(model.PersistentMixIn):
"""Represents the queue of pending commits being processed.
Each entry is keyed by the issue number as a string to be json-compatible.
There can only be one pending commit per issue and they are fine to be
processed out of order.
"""
pending_commits = dict
def add(self, item):
self.pending_commits[str(item.issue)] = item
def get(self, key):
return self.pending_commits[str(key)]
def iterate(self):
"""Returns the items sorted by issue id to ease testability."""
return sorted(self.pending_commits.itervalues(), key=lambda x: x.issue)
def remove(self, key):
self.pending_commits.pop(str(key), None)
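
# A minimal sketch of the keying behavior (assuming model.PersistentMixIn
# initializes 'pending_commits' to an empty dict, as the declaration above
# suggests); the issue number is made up:
#
#   queue = PendingQueue()
#   queue.add(PendingCommit(issue=31337, patchset=1, messages=[]))
#   queue.get(31337) is queue.get('31337')  # -> True, keys are str()'d
#   queue.remove('31337')  # tolerates unknown keys as well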
class PendingManager(object):
"""Fetch new issues from rietveld, pass the issues through all of verifiers
and then commit the patches with checkout.
"""
  FAILED_NO_MESSAGE = (
      'Commit queue patch verification failed without an error message.\n'
      'Something went wrong, probably a crash, a hiccup or simply\n'
      'the monkeys went out for dinner.\n'
      'Please email commit-bot@chromium.org with the CL url.')
  INTERNAL_EXCEPTION = (
      'Commit queue had an internal error.\n'
      'Something went really wrong, probably a crash, a hiccup or\n'
      'simply the monkeys went out for dinner.\n'
      'Please email commit-bot@chromium.org with the CL url.')
DESCRIPTION_UPDATED = (
'Commit queue rejected this change because the description was changed\n'
'between the time the change entered the commit queue and the time it\n'
'was ready to commit. You can safely check the commit box again.')
TRYING_PATCH = 'CQ is trying da patch. Follow status at\n'
# Maximum number of commits done in a burst.
MAX_COMMIT_BURST = 4
# Delay (secs) between commit bursts.
COMMIT_BURST_DELAY = 8*60
def __init__(self, context_obj, pre_patch_verifiers, verifiers,
project_name=''):
"""
Args:
pre_patch_verifiers: Verifiers objects that are run before applying the
patch.
verifiers: Verifiers object run after applying the patch.
"""
    if not (pre_patch_verifiers or verifiers):
      raise ValueError('at least one verifier should be defined (in project %s)'
          % project_name)
self.context = context_obj
self.pre_patch_verifiers = pre_patch_verifiers or []
self.verifiers = verifiers or []
self.all_verifiers = pre_patch_verifiers + verifiers
self.queue = PendingQueue()
# Keep the timestamps of the last few commits so that we can control the
# pace (burstiness) of commits.
self.recent_commit_timestamps = []
# Assert names are unique.
names = [x.name for x in pre_patch_verifiers + verifiers]
assert len(names) == len(set(names))
for verifier in self.pre_patch_verifiers:
assert not isinstance(verifier, base.VerifierCheckout)
def look_for_new_pending_commit(self):
"""Looks for new reviews on self.context.rietveld with c+ set.
Calls _new_pending_commit() on all new review found.
"""
try:
new_issues = self.context.rietveld.get_pending_issues()
except urllib2.URLError as e:
if 'timed out' in e.reason:
        # Handle timeouts gracefully. Log them and pretend there are no
        # pending issues. We'll retry on the next iteration.
        logging.warning('request to fetch pending issues timed out: %s' % e)
return
raise
    # If there is an issue in the queue that is not in new_issues, discard it.
for pending in self.queue.iterate():
      # Note that pending.issue is an int but self.queue.pending_commits keys
      # are str due to json support.
if pending.issue not in new_issues:
logging.info('Flushing issue %d' % pending.issue)
pending.get_state = lambda: base.IGNORED
self._discard_pending(pending, 'CQ bit was unchecked on CL. Ignoring.')
# Find new issues.
for issue_id in new_issues:
if str(issue_id) not in self.queue.pending_commits:
try:
issue_data = self.context.rietveld.get_issue_properties(
issue_id, True)
except urllib2.HTTPError as e:
if e.code in (500, 502, 503):
# Temporary AppEngine hiccup. Just log it and continue.
logging.warning('%s while accessing %s. Ignoring error.' % (
str(e), e.url))
continue
raise
except urllib2.URLError as e:
# Temporary AppEngine hiccup. Just log it and continue.
if 'timed out' in e.reason:
logging.warning(
'%s while accessing rietveld issue %s. Ignoring error.' % (
str(e), str(issue_id)))
continue
raise
except socket.error as e:
# Temporary AppEngine hiccup. Just log it and continue.
if e.errno == errno.ECONNRESET:
logging.warning(
'%s while accessing rietveld issue %s. Ignoring error.' % (
str(e), str(issue_id)))
continue
raise
except IOError as e:
# Temporary AppEngine hiccup. Just log it and continue.
if e.errno == 'socket error':
logging.warning(
'%s while accessing rietveld issue %s. Ignoring error.' % (
str(e), str(issue_id)))
continue
raise
# This assumption needs to hold.
assert issue_id == issue_data['issue']
if issue_data['patchsets'] and issue_data['commit']:
logging.info('Found new issue %d' % issue_id)
self.queue.add(
PendingCommit(
issue=issue_id,
owner=issue_data['owner_email'],
reviewers=issue_data['reviewers'],
patchset=issue_data['patchsets'][-1],
base_url=issue_data['base_url'],
description=issue_data['description'].replace('\r', ''),
messages=issue_data['messages']))
def process_new_pending_commit(self):
"""Starts verification on newly found pending commits."""
expected = set(i.name for i in self.all_verifiers)
for pending in self.queue.iterate():
try:
        # Take into account the case where a verifier was removed.
done = set(pending.verifications.keys())
missing = expected - done
if (not missing or pending.get_state() != base.PROCESSING):
continue
logging.info(
'Processing issue %s (%s, %d)' % (
pending.issue, missing, pending.get_state()))
self._verify_pending(pending)
except base.DiscardPending as e:
message = e.status
if not message:
message = 'process_new_pending_commit: ' + self.FAILED_NO_MESSAGE
self._discard_pending(e.pending, message)
def update_status(self):
"""Updates the status for each pending commit verifier."""
why_nots = dict((p.issue, p.why_not()) for p in self.queue.iterate())
for verifier in self.all_verifiers:
try:
verifier.update_status(self.queue.iterate())
except base.DiscardPending as e:
        # Not efficient: each discarded pending commit costs a full loop over
        # the queue.
message = e.status
if not message:
message = 'update_status: ' + self.FAILED_NO_MESSAGE
self._discard_pending(e.pending, message)
for pending in self.queue.iterate():
why_not = pending.why_not()
if why_nots[pending.issue] != why_not:
self.context.status.send(
pending,
{'verification': 'why not',
'payload': {'message': why_not}})
def scan_results(self):
"""Scans pending commits that can be committed or discarded."""
for pending in self.queue.iterate():
state = pending.get_state()
if state == base.FAILED:
message = pending.error_message()
if not message:
message = 'scan_results(FAILED): ' + self.FAILED_NO_MESSAGE
self._discard_pending(pending, message)
elif state == base.SUCCEEDED:
if self._throttle(pending):
continue
try:
          # Runs the checks. It'd be nice to run them before the postpone,
          # especially if the tree is closed for a long time, but that would
          # mean constantly fetching the rietveld status.
self._last_minute_checks(pending)
self.context.status.send(
pending,
{'verification': 'why not',
'payload': {'message': ''}})
self._commit_patch(pending)
except base.DiscardPending as e:
message = e.status
if not message:
message = 'scan_results(discard): ' + self.FAILED_NO_MESSAGE
self._discard_pending(e.pending, message)
except Exception as e:
message = 'scan_result(Exception): ' + self.INTERNAL_EXCEPTION
self._discard_pending(pending, message)
raise
      else:
        # When the state is IGNORED, we need to keep this issue so it's not
        # fetched again, but we can't discard it since we don't want to remove
        # the commit bit for another project hosted on the same code review
        # instance.
        assert state in (base.PROCESSING, base.IGNORED)
def _verify_pending(self, pending):
"""Initiates all the verifiers on a pending change."""
    # Do not apply the patch if not necessary. It will be applied at commit
    # time anyway, so if the patch doesn't apply, it'll be caught later.
if not self._pending_run_verifiers(pending, self.pre_patch_verifiers):
return
if self.verifiers:
pending.prepare_for_patch(self.context)
      # This CL is real business; alert the user that we're going to try their
      # patch. Note that this is done *after* syncing but *before* applying the
      # patch.
self.context.status.send(
pending,
{ 'verification': 'initial',
'payload': {'revision': pending.revision}})
self.context.rietveld.add_comment(
pending.issue,
self.TRYING_PATCH + '%s/%s/%d/%d\n' % (
self.context.status.url, pending.owner,
pending.issue, pending.patchset))
if self.verifiers:
pending.apply_patch(self.context, False)
previous_cwd = os.getcwd()
try:
os.chdir(self.context.checkout.project_path)
self._pending_run_verifiers(pending, self.verifiers)
finally:
os.chdir(previous_cwd)
# Send the initial 'why not' message.
if pending.why_not():
self.context.status.send(
pending,
{'verification': 'why not',
'payload': {'message': pending.why_not()}})
@classmethod
def _pending_run_verifiers(cls, pending, verifiers):
"""Runs verifiers on a pending change.
Returns True if all Verifiers were run.
"""
for verifier in verifiers:
assert verifier.name not in pending.verifications
verifier.verify(pending)
assert verifier.name in pending.verifications
if pending.get_state() == base.IGNORED:
assert pending.verifications[verifier.name].get_state() == base.IGNORED
        # Remove all the other verifiers; this one is kept in the 'datastore'
        # so this issue is not retried constantly.
for key in pending.verifications.keys():
if key != verifier.name:
del pending.verifications[key]
return False
if pending.get_state() == base.FAILED:
# Throw if it didn't pass, so the error message is not lost.
raise base.DiscardPending(
pending, pending.error_message() or cls.FAILED_NO_MESSAGE)
return True
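
  # The contract assumed by _pending_run_verifiers(), sketched with a
  # hypothetical verifier (make_succeeded_record() is a stand-in for however
  # the project builds a verification record; only the keying rules come from
  # this file): verify() must store exactly one entry under verifier.name in
  # pending.verifications, and an IGNORED result short-circuits the rest.
  #
  #   class AlwaysPass(object):  # in practice a verification.base subclass
  #     name = 'always_pass'
  #     def verify(self, pending):
  #       pending.verifications[self.name] = make_succeeded_record()
  #     def update_status(self, queue):
  #       pass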
def _last_minute_checks(self, pending):
"""Does last minute checks on Rietvld before committing a pending patch."""
pending_data = self.context.rietveld.get_issue_properties(
pending.issue, True)
if pending_data['commit'] != True:
raise base.DiscardPending(pending, None)
if pending_data['closed'] != False:
raise base.DiscardPending(pending, None)
if pending.description != pending_data['description'].replace('\r', ''):
raise base.DiscardPending(pending, self.DESCRIPTION_UPDATED)
commit_user = set([self.context.rietveld.email])
expected = set(pending.reviewers) - commit_user
actual = set(pending_data['reviewers']) - commit_user
    # Try to be nice: if there was a drive-by review and the new reviewer left
    # an LGTM, don't abort.
def is_approver(r):
return any(
m.get('approval') for m in pending_data['messages']
if m['sender'] == r)
drivers_by = [r for r in (actual - expected) if not is_approver(r)]
if drivers_by:
      # That annoying drive-by.
raise base.DiscardPending(
pending,
'List of reviewers changed. %s did a drive-by without LGTM\'ing!' %
','.join(drivers_by))
if pending.patchset != pending_data['patchsets'][-1]:
raise base.DiscardPending(pending,
'Commit queue failed due to new patchset.')
def _discard_pending(self, pending, message):
"""Discards a pending commit. Attach an optional message to the review."""
logging.debug('_discard_pending(%s, %s)', pending.issue, message)
msg = message or self.FAILED_NO_MESSAGE
try:
try:
if pending.get_state() != base.IGNORED:
self.context.rietveld.set_flag(
pending.issue, pending.patchset, 'commit', False)
except urllib2.HTTPError as e:
logging.error(
'Failed to set the flag to False for %s with message %s' % (
pending.pending_name(), msg))
traceback.print_stack()
logging.error(str(e))
errors.send_stack(e)
try:
self.context.rietveld.add_comment(pending.issue, msg)
except urllib2.HTTPError as e:
logging.error(
'Failed to add comment for %s with message %s' % (
pending.pending_name(), msg))
traceback.print_stack()
errors.send_stack(e)
self.context.status.send(
pending,
{ 'verification': 'abort',
'payload': {
'output': msg }})
finally:
# Most importantly, remove the PendingCommit from the queue.
self.queue.remove(pending.issue)
def _commit_patch(self, pending):
"""Commits the pending patch to the repository.
Do the checkout and applies the patch.
"""
try:
try:
# Make sure to apply on HEAD.
pending.revision = None
pending.apply_patch(self.context, True)
# Commit it.
commit_desc = git_cl.ChangeDescription(pending.description)
if (self.context.server_hooks_missing and
self.context.rietveld.email != pending.owner):
commit_desc.update_reviewers(pending.reviewers)
commit_desc.append_footer('Author: ' + pending.owner)
commit_desc.append_footer('Review URL: %s/%s' % (
self.context.rietveld.url,
pending.issue))
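        # When the server hooks are missing and the committer differs from the
        # owner, the committed description thus ends with footers like (values
        # illustrative):
        #   Author: author@chromium.org
        #   Review URL: https://codereview.chromium.org/1234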
pending.revision = self.context.checkout.commit(
commit_desc.description, pending.owner)
if not pending.revision:
raise base.DiscardPending(pending, 'Failed to commit patch.')
        # Record the commit time for commit throttling.
self.recent_commit_timestamps.append(time.time())
self.recent_commit_timestamps = (
self.recent_commit_timestamps[-(self.MAX_COMMIT_BURST + 1):])
viewvc_url = self.context.checkout.get_settings('VIEW_VC')
issue_desc = git_cl.ChangeDescription(pending.description)
msg = 'Committed: %s' % pending.revision
if viewvc_url:
viewvc_url = '%s%s' % (viewvc_url.rstrip('/'), pending.revision)
msg = 'Committed: %s' % viewvc_url
issue_desc.append_footer(msg)
# Update the CQ dashboard.
self.context.status.send(
pending,
{ 'verification': 'commit',
'payload': {
'revision': pending.revision,
'output': msg,
'url': viewvc_url}})
# Closes the issue on Rietveld.
# TODO(csharp): Retry if exceptions are encountered.
try:
self.context.rietveld.close_issue(pending.issue)
self.context.rietveld.update_description(
pending.issue, issue_desc.description)
self.context.rietveld.add_comment(
pending.issue, 'Change committed as %s' % pending.revision)
        except (urllib2.HTTPError, urllib2.URLError) as e:
          # Ignore AppEngine flakiness.
          logging.warning('Unable to fully close the issue: %s' % e)
# And finally remove the issue. If the close_issue() call above failed,
# it is possible the dashboard will be confused but it is harmless.
try:
self.queue.get(pending.issue)
except KeyError:
logging.error('Internal inconsistency for %d', pending.issue)
self.queue.remove(pending.issue)
except (
checkout.PatchApplicationFailed, patch.UnsupportedPatchFormat) as e:
raise base.DiscardPending(pending, str(e))
except subprocess2.CalledProcessError as e:
stdout = getattr(e, 'stdout', None)
out = 'Failed to apply the patch.'
if stdout:
out += '\n%s' % stdout
raise base.DiscardPending(pending, out)
except base.DiscardPending as e:
message = e.status
if not message:
message = '_commit_patch: ' + self.FAILED_NO_MESSAGE
self._discard_pending(e.pending, message)
def _throttle(self, pending):
"""Returns True if a commit should be delayed."""
if pending.postpone():
self.context.status.send(
pending,
{'verification': 'why not',
'payload': {
'message': pending.why_not()}})
return True
if not self.recent_commit_timestamps:
return False
cutoff = time.time() - self.COMMIT_BURST_DELAY
bursted = len([True for i in self.recent_commit_timestamps if i > cutoff])
if bursted >= self.MAX_COMMIT_BURST:
self.context.status.send(
pending,
{'verification': 'why not',
'payload': {
'message': ('Patch is ready to commit, but the CQ is delaying '
'it because CQ has already submitted %d patches in '
'the last %d seconds' %
(self.MAX_COMMIT_BURST, self.COMMIT_BURST_DELAY))}})
return True
return False
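
  # Worked example of the burst math above: with MAX_COMMIT_BURST = 4 and
  # COMMIT_BURST_DELAY = 8*60 = 480 seconds, a commit attempted while 4 (or
  # more) timestamps are newer than time.time() - 480 is delayed; once the
  # oldest of them ages past the cutoff, the commit goes through.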
def load(self, filename):
"""Loads the commit queue state from a JSON file."""
self.queue = model.load_from_json_file(filename)
def save(self, filename):
"""Save the commit queue state in a simple JSON file."""
model.save_to_json_file(filename, self.queue)
def close(self):
"""Close all the active pending manager items."""
self.context.status.close()