blob: 2e6d86137ff03add83e3389329e472467a6d7779 [file] [log] [blame]
# Copyright (c) 2012 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Out of band HTTP push."""
import Queue
import json
import logging
import os
import threading
import time
import traceback
import urllib
import errors
from verification import base
class AsyncPushNoop(object):
url = 'http://localhost'
def close(self):
pass
def send(self, pending, packet):
pass
@staticmethod
def _package(pending, packet):
data = {
'done': pending.get_state() not in (base.PROCESSING, base.IGNORED),
'issue': pending.issue,
'owner': pending.owner,
'patchset': pending.patchset,
'timestamp': time.time(),
}
if packet:
data.update(packet)
return data
class AsyncPushStore(AsyncPushNoop):
"""Saves all the events into workdir/events.json for later analysis.
Thread-safe.
"""
def __init__(self):
super(AsyncPushStore, self).__init__()
self.lock = threading.Lock()
self.queue = []
def close(self):
with self.lock:
with open(os.path.join('workdir', 'events.json'), 'w') as f:
json.dump(self.queue, f, indent=2)
def send(self, pending, packet):
with self.lock:
self.queue.append(self._package(pending, packet))
class AsyncPush(AsyncPushNoop):
"""Sends HTTP Post asynchronously to the tree status application.
This object uses a background worker thread, and is thread-safe.
"""
_TERMINATE = object()
def __init__(self, url, password, resource='/receiver'):
super(AsyncPush, self).__init__()
assert url
assert password
self.url = url
self.resource = resource
self.password = password
self.queue = Queue.Queue()
self.thread = threading.Thread(target=self._worker_thread)
self.thread.daemon = True
self.thread.start()
def close(self):
self.queue.put(self._TERMINATE)
self.thread.join()
def send(self, pending, packet):
"""Queues a packet."""
logging.debug('For issue %d, queuing for send: %s', pending.issue, packet)
self.queue.put(self._package(pending, packet))
def _get_items(self):
"""Waits for an item to be queued and returns up to 10 next items if queued
fast enough.
"""
items = [self.queue.get()]
try:
for _ in range(9):
items.append(self.queue.get_nowait())
except Queue.Empty:
pass
return items
def _worker_thread(self):
"""Sends the packets in a loop through HTTP POST."""
params = {
'Content-type': 'application/x-www-form-urlencoded',
'Accept': 'text/plain'
}
done = False
try:
while not done:
items = self._get_items()
if self._TERMINATE in items:
done = True
logging.debug('Worker thread exiting')
items.remove(self._TERMINATE)
url = self.url + self.resource
logging.debug('Sending %d items to %s: %r', len(items), url, items)
try:
data = [('p', json.dumps(item)) for item in items]
data.append(('password', self.password))
urllib.urlopen(url, urllib.urlencode(data), params).read()
except IOError as e:
logging.error(e)
for item in items:
self.queue.put(item)
if not done:
time.sleep(1)
# Don't retry if done.
except Exception as e:
traceback.print_exc()
errors.send_stack(e)