| # coding=utf8 |
| # Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| """Commit queue manager class. |
| |
| Security implications: |
| |
The following hypotheses are made:
- Commit queue:
  - Impersonates the same svn credentials as the patchset owner.
  - Cannot impersonate a non-committer.
  - SVN will check the committer's write access.
| """ |
| |
| import errno |
| import logging |
| import os |
| import socket |
| import ssl |
| import time |
| import traceback |
| import urllib2 |
| |
| import find_depot_tools # pylint: disable=W0611 |
| import checkout |
| import git_cl |
| import patch |
| import subprocess2 |
| |
| import errors |
| import model |
| from verification import base |
| |
| |
class PendingCommit(base.Verified):
  """Represents a pending commit that is being processed.

  The class-level attributes below declare the expected type of each instance
  field; presumably they are consumed by the persistence layer
  (base.Verified / model.PersistentMixIn) for (de)serialization — confirm
  against model.py.
  """
  # Important since they tell if we need to revalidate and send try jobs
  # again or not if any of these value changes.
  issue = int
  patchset = int
  description = unicode
  files = list
  # Only a cache, these values can be regenerated.
  owner = unicode
  reviewers = list
  base_url = unicode
  messages = list
  relpath = unicode
  # Only used after a patch was committed. Keeping here for try job retries.
  # May be None (not committed yet), an int (svn revision) or unicode.
  revision = (None, int, unicode)

  def __init__(self, **kwargs):
    super(PendingCommit, self).__init__(**kwargs)
    for message in self.messages:
      # Save storage, no verifier really needs 'text', just 'approval'.
      if 'text' in message:
        del message['text']

  def pending_name(self):
    """The name that should be used for try jobs.

    It makes it possible to regenerate the try_jobs array if ever needed."""
    return '%d-%d' % (self.issue, self.patchset)

  def prepare_for_patch(self, context_obj):
    """Syncs the checkout and records the revision it was synced to.

    Raises:
      base.DiscardPending if the checkout could not be prepared.
    """
    self.revision = context_obj.checkout.prepare(self.revision)
    # Verify revision consistency.
    if not self.revision:
      raise base.DiscardPending(
          self, 'Internal error: failed to checkout. Please try again.')

  def apply_patch(self, context_obj, prepare):
    """Applies the pending patch to the checkout and throws if it fails.

    Args:
      context_obj: object exposing .rietveld and .checkout accessors.
      prepare: when True, syncs the checkout first via prepare_for_patch().

    Raises:
      base.DiscardPending on any failure, carrying a user-visible message so
      the caller can reject the CL on the review.
    """
    try:
      if prepare:
        self.prepare_for_patch(context_obj)
      patches = context_obj.rietveld.get_patch(self.issue, self.patchset)
      if not patches:
        raise base.DiscardPending(
            self, 'No diff was found for this patchset.')
      if self.relpath:
        patches.set_relpath(self.relpath)
      self.files = [p.filename for p in patches]
      if not self.files:
        raise base.DiscardPending(
            self, 'No file was found in this patchset.')
      context_obj.checkout.apply_patch(patches)
    except (checkout.PatchApplicationFailed, patch.UnsupportedPatchFormat) as e:
      raise base.DiscardPending(self, str(e))
    except subprocess2.CalledProcessError as e:
      # Include the subprocess output in the rejection message when present.
      out = 'Failed to apply the patch.'
      if e.stdout:
        out += '\n%s' % e.stdout
      raise base.DiscardPending(self, out)
    except (ssl.SSLError, urllib2.HTTPError, urllib2.URLError) as e:
      # Network flakiness while fetching the patch from rietveld.
      raise base.DiscardPending(
          self,
          ('Failed to request the patch to try. Please note that binary files '
           'are still unsupported at the moment, this is being worked on.\n\n'
           'Thanks for your patience.\n\n%s') % e)
| |
| |
class PendingQueue(model.PersistentMixIn):
  """Holds the set of pending commits currently being processed.

  Entries are keyed by the issue number serialized to a string so the
  container stays json-compatible. At most one pending commit exists per
  issue and issues may be processed in any order.
  """
  pending_commits = dict

  def add(self, item):
    """Registers (or replaces) the pending commit for its issue."""
    self.pending_commits[str(item.issue)] = item

  def get(self, key):
    """Looks up a pending commit by issue id; raises KeyError when absent."""
    return self.pending_commits[str(key)]

  def iterate(self):
    """Returns the pending commits ordered by issue id to ease testability."""
    def by_issue(pending):
      return pending.issue
    return sorted(self.pending_commits.itervalues(), key=by_issue)

  def remove(self, key):
    """Deletes the entry for the given issue id, ignoring unknown ids."""
    key_str = str(key)
    if key_str in self.pending_commits:
      del self.pending_commits[key_str]
| |
| |
class PendingManager(object):
  """Fetch new issues from rietveld, pass the issues through all of verifiers
  and then commit the patches with checkout.
  """
  # Rejection message used when a CL fails but no verifier left an error.
  FAILED_NO_MESSAGE = (
      'Commit queue patch verification failed without an error message.\n'
      'Something went wrong, probably a crash, a hickup or simply\n'
      'the monkeys went out for dinner.\n'
      'Please email commit-bot@chromium.org with the CL url.')
  # Rejection message used when an unexpected exception aborts the commit.
  INTERNAL_EXCEPTION = (
      'Commit queue had an internal error.\n'
      'Something went really wrong, probably a crash, a hickup or\n'
      'simply the monkeys went out for dinner.\n'
      'Please email commit-bot@chromium.org with the CL url.')
  # Rejection message used when the CL description changed while queued.
  DESCRIPTION_UPDATED = (
      'Commit queue rejected this change because the description was changed\n'
      'between the time the change entered the commit queue and the time it\n'
      'was ready to commit. You can safely check the commit box again.')
  TRYING_PATCH = 'CQ is trying da patch. Follow status at\n'
  # Maximum number of commits done in a burst.
  MAX_COMMIT_BURST = 4
  # Delay (secs) between commit bursts.
  COMMIT_BURST_DELAY = 8*60

  def __init__(self, context_obj, pre_patch_verifiers, verifiers,
               project_name=''):
    """
    Args:
      pre_patch_verifiers: Verifiers objects that are run before applying the
                           patch.
      verifiers: Verifiers object run after applying the patch.

    Raises:
      ValueError if no verifier at all was provided.
    """
    if not(len(pre_patch_verifiers) or len(verifiers)):
      raise ValueError('at least one verifier should be defined (in project %s)'
                       % project_name)

    self.context = context_obj
    self.pre_patch_verifiers = pre_patch_verifiers or []
    self.verifiers = verifiers or []
    self.all_verifiers = pre_patch_verifiers + verifiers
    self.queue = PendingQueue()
    # Keep the timestamps of the last few commits so that we can control the
    # pace (burstiness) of commits.
    self.recent_commit_timestamps = []
    # Assert names are unique.
    names = [x.name for x in pre_patch_verifiers + verifiers]
    assert len(names) == len(set(names))
    for verifier in self.pre_patch_verifiers:
      # Checkout-based verifiers require an applied patch, so they cannot run
      # in the pre-patch phase.
      assert not isinstance(verifier, base.VerifierCheckout)

  def look_for_new_pending_commit(self):
    """Looks for new reviews on self.context.rietveld with the commit bit set.

    Enqueues a PendingCommit for each new issue found, and flushes queued
    issues whose commit bit was unchecked since the last poll.
    """
    try:
      new_issues = self.context.rietveld.get_pending_issues()
    except urllib2.URLError as e:
      # NOTE(review): e.reason may not be a string in python2; the 'in' test
      # assumes it is — confirm.
      if 'timed out' in e.reason:
        # Handle timeouts gracefully. Log them and pretend there are no
        # pending issues. We'll retry on the next iteration.
        logging.warn('request to fetch pending issues timed out: %s' % e)
        return

      raise

    # If there is an issue in processed_issues that is not in new_issues,
    # discard it.
    for pending in self.queue.iterate():
      # Note that pending.issue is a int but self.queue.pending_commits keys
      # are str due to json support.
      if pending.issue not in new_issues:
        logging.info('Flushing issue %d' % pending.issue)
        self.context.status.send(
            pending,
            { 'verification': 'abort',
              'payload': {
                'output': 'CQ bit was unchecked on CL. Ignoring.' }})
        # Make the pending commit look IGNORED so _discard_pending() does not
        # flip the commit bit again (it was already unchecked by the user).
        pending.get_state = lambda: base.IGNORED
        self._discard_pending(pending, None)

    # Find new issues.
    for issue_id in new_issues:
      if str(issue_id) not in self.queue.pending_commits:
        try:
          issue_data = self.context.rietveld.get_issue_properties(
              issue_id, True)
        except urllib2.HTTPError as e:
          if e.code in (500, 502, 503):
            # Temporary AppEngine hiccup. Just log it and continue.
            logging.warning('%s while accessing %s. Ignoring error.' % (
                str(e), e.url))
            continue
          raise
        except urllib2.URLError as e:
          # Temporary AppEngine hiccup. Just log it and continue.
          if 'timed out' in e.reason:
            logging.warning(
                '%s while accessing rietveld issue %s. Ignoring error.' % (
                    str(e), str(issue_id)))
            continue
          raise
        except socket.error as e:
          # Temporary AppEngine hiccup. Just log it and continue.
          if e.errno == errno.ECONNRESET:
            logging.warning(
                '%s while accessing rietveld issue %s. Ignoring error.' % (
                    str(e), str(issue_id)))
            continue
          raise
        except IOError as e:
          # Temporary AppEngine hiccup. Just log it and continue.
          # NOTE(review): comparing errno to the string 'socket error' looks
          # odd but matches how some python2 urllib errors are raised —
          # confirm before changing.
          if e.errno == 'socket error':
            logging.warning(
                '%s while accessing rietveld issue %s. Ignoring error.' % (
                    str(e), str(issue_id)))
            continue
          raise
        # This assumption needs to hold.
        assert issue_id == issue_data['issue']
        if issue_data['patchsets'] and issue_data['commit']:
          logging.info('Found new issue %d' % issue_id)
          self.queue.add(
              PendingCommit(
                  issue=issue_id,
                  owner=issue_data['owner_email'],
                  reviewers=issue_data['reviewers'],
                  patchset=issue_data['patchsets'][-1],
                  base_url=issue_data['base_url'],
                  description=issue_data['description'].replace('\r', ''),
                  messages=issue_data['messages']))

  def process_new_pending_commit(self):
    """Starts verification on newly found pending commits."""
    expected = set(i.name for i in self.all_verifiers)
    for pending in self.queue.iterate():
      try:
        # Take in account the case where a verifier was removed.
        done = set(pending.verifications.keys())
        missing = expected - done
        if (not missing or pending.get_state() != base.PROCESSING):
          continue
        logging.info(
            'Processing issue %s (%s, %d)' % (
                pending.issue, missing, pending.get_state()))
        self._verify_pending(pending)
      except base.DiscardPending as e:
        self._discard_pending(e.pending, e.status)

  def update_status(self):
    """Updates the status for each pending commit verifier."""
    # Snapshot the 'why not' state first so only actual changes are broadcast
    # after the verifiers have run.
    why_nots = dict((p.issue, p.why_not()) for p in self.queue.iterate())

    for verifier in self.all_verifiers:
      try:
        verifier.update_status(self.queue.iterate())
      except base.DiscardPending as e:
        # It's not efficient since it takes a full loop for each pending
        # commit to discard.
        self._discard_pending(e.pending, e.status)

    for pending in self.queue.iterate():
      why_not = pending.why_not()
      if why_nots[pending.issue] != why_not:
        self.context.status.send(
            pending,
            {'verification': 'why not',
             'payload': {'message': why_not}})


  def scan_results(self):
    """Scans pending commits that can be committed or discarded."""
    for pending in self.queue.iterate():
      state = pending.get_state()
      if state == base.FAILED:
        self._discard_pending(
            pending, pending.error_message() or self.FAILED_NO_MESSAGE)
      elif state == base.SUCCEEDED:
        if self._throttle(pending):
          continue
        try:
          # Runs checks. It'd be nice to run the test before the postpone,
          # especially if the tree is closed for a long moment but at the same
          # time it would keep fetching the rietveld status constantly.
          self._last_minute_checks(pending)
          # Clear any leftover 'why not' message before committing.
          self.context.status.send(
              pending,
              {'verification': 'why not',
               'payload': {'message': ''}})

          self._commit_patch(pending)
        except base.DiscardPending as e:
          self._discard_pending(e.pending, e.status)
        except Exception as e:
          # Tell the user something went wrong, then re-raise so the crash is
          # not silently swallowed.
          self._discard_pending(pending, self.INTERNAL_EXCEPTION)
          raise
      else:
        # When state is IGNORED, we need to keep this issue so it's not fetched
        # another time but we can't discard it since we don't want to remove the
        # commit bit for another project hosted on the same code review
        # instance.
        assert state in (base.PROCESSING, base.IGNORED)

  def _verify_pending(self, pending):
    """Initiates all the verifiers on a pending change."""
    # Do not apply the patch if not necessary. It will be applied at commit
    # time anyway so if the patch doesn't apply, it'll be caught later.
    if not self._pending_run_verifiers(pending, self.pre_patch_verifiers):
      return

    if self.verifiers:
      pending.prepare_for_patch(self.context)

    # This CL is real business, alert the user that we're going to try his
    # patch. Note that this is done *after* syncing but *before* applying the
    # patch.
    self.context.status.send(
        pending,
        { 'verification': 'initial',
          'payload': {'revision': pending.revision}})
    self.context.rietveld.add_comment(
        pending.issue,
        self.TRYING_PATCH + '%s/%s/%d/%d\n' % (
            self.context.status.url, pending.owner,
            pending.issue, pending.patchset))

    if self.verifiers:
      pending.apply_patch(self.context, False)
      # Run the checkout-based verifiers from inside the checkout directory,
      # restoring the previous cwd afterwards.
      previous_cwd = os.getcwd()
      try:
        os.chdir(self.context.checkout.project_path)
        self._pending_run_verifiers(pending, self.verifiers)
      finally:
        os.chdir(previous_cwd)

    # Send the initial 'why not' message.
    if pending.why_not():
      self.context.status.send(
          pending,
          {'verification': 'why not',
           'payload': {'message': pending.why_not()}})

  @classmethod
  def _pending_run_verifiers(cls, pending, verifiers):
    """Runs verifiers on a pending change.

    Returns True if all Verifiers were run.
    """
    for verifier in verifiers:
      assert verifier.name not in pending.verifications
      verifier.verify(pending)
      assert verifier.name in pending.verifications
      if pending.get_state() == base.IGNORED:
        assert pending.verifications[verifier.name].get_state() == base.IGNORED
        # Remove all the other verifiers since we need to keep it in the
        # 'datastore' to not retry this issue constantly.
        # In python2, .keys() returns a copy, so deleting while iterating is
        # safe here.
        for key in pending.verifications.keys():
          if key != verifier.name:
            del pending.verifications[key]
        return False
      if pending.get_state() == base.FAILED:
        # Throw if it didn't pass, so the error message is not lost.
        raise base.DiscardPending(
            pending, pending.error_message() or cls.FAILED_NO_MESSAGE)
    return True

  def _last_minute_checks(self, pending):
    """Does last minute checks on Rietveld before committing a pending patch.

    Raises:
      base.DiscardPending when the CL is no longer in a committable state.
    """
    pending_data = self.context.rietveld.get_issue_properties(
        pending.issue, True)
    if pending_data['commit'] != True:
      raise base.DiscardPending(pending, None)
    if pending_data['closed'] != False:
      raise base.DiscardPending(pending, None)
    if pending.description != pending_data['description'].replace('\r', ''):
      raise base.DiscardPending(pending, self.DESCRIPTION_UPDATED)
    commit_user = set([self.context.rietveld.email])
    expected = set(pending.reviewers) - commit_user
    actual = set(pending_data['reviewers']) - commit_user
    # Try to be nice, if there was a drive-by review and the new reviewer left
    # a lgtm, don't abort.
    def is_approver(r):
      # True when reviewer |r| sent at least one approval message.
      return any(
          m.get('approval') for m in pending_data['messages']
          if m['sender'] == r)
    drivers_by = [r for r in (actual - expected) if not is_approver(r)]
    if drivers_by:
      # That annoying driver-by.
      raise base.DiscardPending(
          pending,
          'List of reviewers changed. %s did a drive-by without LGTM\'ing!' %
          ','.join(drivers_by))
    if pending.patchset != pending_data['patchsets'][-1]:
      raise base.DiscardPending(pending,
          'Commit queue failed due to new patchset.')

  def _discard_pending(self, pending, message):
    """Discards a pending commit. Attach an optional message to the review."""
    logging.debug('_discard_pending(%s, %s)', pending.issue, message)
    try:
      try:
        if pending.get_state() != base.IGNORED:
          # Uncheck the commit bit so the CL is not picked up again.
          self.context.rietveld.set_flag(
              pending.issue, pending.patchset, 'commit', False)
      except urllib2.HTTPError as e:
        logging.error(
            'Failed to set the flag to False for %s with message %s' % (
                pending.pending_name(), message))
        traceback.print_stack()
        logging.error(str(e))
        errors.send_stack(e)
      if message:
        try:
          self.context.rietveld.add_comment(pending.issue, message)
        except urllib2.HTTPError as e:
          logging.error(
              'Failed to add comment for %s with message %s' % (
                  pending.pending_name(), message))
          traceback.print_stack()
          errors.send_stack(e)
        self.context.status.send(
            pending,
            { 'verification': 'abort',
              'payload': {
                'output': message }})
    finally:
      # Most importantly, remove the PendingCommit from the queue.
      self.queue.remove(pending.issue)

  def _commit_patch(self, pending):
    """Commits the pending patch to the repository.

    Do the checkout and applies the patch.
    """
    try:
      try:
        # Make sure to apply on HEAD.
        pending.revision = None
        pending.apply_patch(self.context, True)
        # Commit it.
        commit_desc = git_cl.ChangeDescription(pending.description)
        if (self.context.server_hooks_missing and
            self.context.rietveld.email != pending.owner):
          # No server-side hooks to record metadata; embed the reviewers and
          # author in the commit message instead.
          commit_desc.update_reviewers(pending.reviewers)
          commit_desc.append_footer('Author: ' + pending.owner)
        commit_desc.append_footer('Review URL: %s/%s' % (
            self.context.rietveld.url,
            pending.issue))
        pending.revision = self.context.checkout.commit(
            commit_desc.description, pending.owner)
        if not pending.revision:
          raise base.DiscardPending(pending, 'Failed to commit patch.')

        # Note that the commit succeeded for commit throttling.
        self.recent_commit_timestamps.append(time.time())
        self.recent_commit_timestamps = (
            self.recent_commit_timestamps[-(self.MAX_COMMIT_BURST + 1):])

        viewvc_url = self.context.checkout.get_settings('VIEW_VC')
        issue_desc = git_cl.ChangeDescription(pending.description)
        msg = 'Committed: %s' % pending.revision
        if viewvc_url:
          # NOTE(review): the revision is appended with no separator;
          # presumably VIEW_VC ends with a path or query prefix — confirm.
          viewvc_url = '%s%s' % (viewvc_url.rstrip('/'), pending.revision)
          msg = 'Committed: %s' % viewvc_url
        issue_desc.append_footer(msg)

        # Update the CQ dashboard.
        self.context.status.send(
            pending,
            { 'verification': 'commit',
              'payload': {
                'revision': pending.revision,
                'output': msg,
                'url': viewvc_url}})

        # Closes the issue on Rietveld.
        # TODO(csharp): Retry if exceptions are encountered.
        try:
          self.context.rietveld.close_issue(pending.issue)
          self.context.rietveld.update_description(
              pending.issue, issue_desc.description)
          self.context.rietveld.add_comment(
              pending.issue, 'Change committed as %s' % pending.revision)
        except (urllib2.HTTPError, urllib2.URLError) as e:
          # Ignore AppEngine flakiness.
          logging.warning('Unable to fully close the issue')
        # And finally remove the issue. If the close_issue() call above failed,
        # it is possible the dashboard will be confused but it is harmless.
        try:
          self.queue.get(pending.issue)
        except KeyError:
          logging.error('Internal inconsistency for %d', pending.issue)
        self.queue.remove(pending.issue)
      except (
          checkout.PatchApplicationFailed, patch.UnsupportedPatchFormat) as e:
        raise base.DiscardPending(pending, str(e))
      except subprocess2.CalledProcessError as e:
        stdout = getattr(e, 'stdout', None)
        out = 'Failed to apply the patch.'
        if stdout:
          out += '\n%s' % stdout
        raise base.DiscardPending(pending, out)
    except base.DiscardPending as e:
      self._discard_pending(e.pending, e.status)

  def _throttle(self, pending):
    """Returns True if a commit should be delayed."""
    if pending.postpone():
      self.context.status.send(
          pending,
          {'verification': 'why not',
           'payload': {
             'message': pending.why_not()}})
      return True
    if not self.recent_commit_timestamps:
      return False
    cutoff = time.time() - self.COMMIT_BURST_DELAY
    # Count the commits that landed inside the burst window.
    bursted = len([True for i in self.recent_commit_timestamps if i > cutoff])

    if bursted >= self.MAX_COMMIT_BURST:
      self.context.status.send(
          pending,
          {'verification': 'why not',
           'payload': {
             'message': ('Patch is ready to commit, but the CQ is delaying '
                         'it because CQ has already submitted %d patches in '
                         'the last %d seconds' %
                         (self.MAX_COMMIT_BURST, self.COMMIT_BURST_DELAY))}})
      return True

    return False

  def load(self, filename):
    """Loads the commit queue state from a JSON file."""
    self.queue = model.load_from_json_file(filename)

  def save(self, filename):
    """Save the commit queue state in a simple JSON file."""
    model.save_to_json_file(filename, self.queue)

  def close(self):
    """Close all the active pending manager items."""
    self.context.status.close()