| # Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| """Out of band HTTP push.""" |
| |
| import Queue |
| import json |
| import logging |
| import os |
| import threading |
| import time |
| import traceback |
| import urllib |
| |
| import errors |
| from verification import base |
| |
| |
class AsyncPushNoop(object):
    """Push sink that silently drops every event.

    It also provides _package(), the helper subclasses use to turn a pending
    commit plus an optional packet into the event dictionary they emit.
    """
    url = 'http://localhost'

    def close(self):
        """Does nothing; there is no resource to release."""
        return None

    def send(self, pending, packet):
        """Does nothing; the event is discarded."""
        return None

    @staticmethod
    def _package(pending, packet):
        """Builds the event dictionary describing |pending|.

        The dictionary carries the issue, owner and patchset attributes of
        |pending|, the current time.time() as 'timestamp', and a 'done' flag
        that is True unless pending.get_state() is base.PROCESSING or
        base.IGNORED. When |packet| is truthy its entries are merged on top
        and override the generated keys on collision.
        """
        unfinished_states = (base.PROCESSING, base.IGNORED)
        event = {
            'done': pending.get_state() not in unfinished_states,
            'issue': pending.issue,
            'owner': pending.owner,
            'patchset': pending.patchset,
            'timestamp': time.time(),
        }
        if not packet:
            return event
        event.update(packet)
        return event
| |
| |
class AsyncPushStore(AsyncPushNoop):
    """Keeps every event in memory and dumps them to workdir/events.json.

    The file is only written when close() is called. Thread-safe: the event
    list is only touched while holding self.lock.
    """

    def __init__(self):
        super(AsyncPushStore, self).__init__()
        self.lock = threading.Lock()
        self.queue = []

    def close(self):
        """Writes all recorded events as indented JSON to workdir/events.json."""
        events_path = os.path.join('workdir', 'events.json')
        with self.lock:
            with open(events_path, 'w') as events_file:
                json.dump(self.queue, events_file, indent=2)

    def send(self, pending, packet):
        """Records the packaged event for |pending| and |packet|."""
        with self.lock:
            # Packaging happens under the lock so that the timestamps follow
            # the order of the stored events.
            event = self._package(pending, packet)
            self.queue.append(event)
| |
| |
class AsyncPush(AsyncPushNoop):
    """Sends HTTP Post asynchronously to the tree status application.

    send() only queues the event; a daemon worker thread started by the
    constructor drains the queue and POSTs the events in small batches.
    This object is thread-safe.
    """
    # Unique marker queued by close() to ask the worker thread to stop.
    _TERMINATE = object()
    # Largest number of queue entries pulled into a single POST.
    _MAX_BATCH = 10

    def __init__(self, url, password, resource='/receiver'):
        """Starts the background worker thread.

        Args:
          url: base URL of the server; must be truthy.
          password: value sent as the 'password' form field; must be truthy.
          resource: path appended to |url| to form the POST target.
        """
        super(AsyncPush, self).__init__()
        assert url
        assert password
        self.url = url
        self.resource = resource
        self.password = password
        self.queue = Queue.Queue()
        self.thread = threading.Thread(target=self._worker_thread)
        self.thread.daemon = True
        self.thread.start()

    def close(self):
        """Asks the worker thread to stop and waits until it has exited."""
        self.queue.put(self._TERMINATE)
        self.thread.join()

    def send(self, pending, packet):
        """Queues a packet."""
        logging.debug('For issue %d, sending %s', pending.issue, packet)
        self.queue.put(self._package(pending, packet))

    def _get_items(self):
        """Waits for an item to be queued and returns it, followed by up to
        _MAX_BATCH - 1 further items if they are already queued.
        """
        items = [self.queue.get()]
        try:
            while len(items) < self._MAX_BATCH:
                items.append(self.queue.get_nowait())
        except Queue.Empty:
            pass
        return items

    def _worker_thread(self):
        """Sends the packets in a loop through HTTP POST.

        Runs until the _TERMINATE marker is dequeued. A batch that fails with
        IOError is put back on the queue and retried after a one second pause,
        unless the thread is terminating. Any other exception stops the thread
        after being printed and reported through errors.send_stack().
        """
        # NOTE(review): Python 2's urllib.urlopen() is documented as
        # urlopen(url, data, proxies), so this dictionary is received as the
        # proxy mapping and is not sent as request headers. It is left as is
        # because sending real headers needs another API with different
        # proxy and error behavior -- confirm the intent before changing.
        params = {
            'Content-type': 'application/x-www-form-urlencoded',
            'Accept': 'text/plain'
        }
        done = False
        try:
            while not done:
                batch = self._get_items()
                # Filter by identity and drop every marker, so a second
                # _TERMINATE in the same batch never reaches json.dumps().
                items = [item for item in batch if item is not self._TERMINATE]
                if len(items) != len(batch):
                    done = True
                    logging.debug('Worker thread exiting')
                if not items:
                    # Only the marker was dequeued; do not POST an empty batch.
                    continue
                url = self.url + self.resource
                logging.debug('Sending %d items to %s', len(items), url)
                try:
                    data = [('p', json.dumps(item)) for item in items]
                    data.append(('password', self.password))
                    response = urllib.urlopen(
                        url, urllib.urlencode(data), params)
                    try:
                        response.read()
                    finally:
                        # Release the connection even if reading fails.
                        response.close()
                except IOError as e:
                    logging.error(e)
                    for item in items:
                        self.queue.put(item)
                    if not done:
                        time.sleep(1)
                    # Don't retry if done.
        except Exception as e:
            traceback.print_exc()
            errors.send_stack(e)