| """An event loop. |
| |
| This event loop should handle both asynchronous App Engine RPC objects |
| (specifically urlfetch, memcache and datastore RPC objects) and arbitrary |
| callback functions with an optional time delay. |
| |
| Normally, event loops are singleton objects, though there is no |
| enforcement of this requirement. |
| |
| The API here is inspired by Monocle. |
| """ |
| |
| import collections |
| import logging |
| import os |
| import time |
| |
| from .google_imports import apiproxy_rpc |
| from .google_imports import datastore_rpc |
| |
| from . import utils |
| |
| __all__ = ['EventLoop', |
| 'add_idle', 'queue_call', 'queue_rpc', |
| 'get_event_loop', |
| 'run', 'run0', 'run1', |
| ] |
| |
| _logging_debug = utils.logging_debug |
| |
| _IDLE = apiproxy_rpc.RPC.IDLE |
| _RUNNING = apiproxy_rpc.RPC.RUNNING |
| _FINISHING = apiproxy_rpc.RPC.FINISHING |
| |
| |
| class EventLoop(object): |
| """An event loop.""" |
| |
| def __init__(self): |
| """Constructor.""" |
| self.current = collections.deque() # FIFO list of (callback, args, kwds) |
| self.idlers = collections.deque() # Cyclic list of (callback, args, kwds) |
| self.inactive = 0 # How many idlers in a row were no-ops |
| self.queue = [] # Sorted list of (time, callback, args, kwds) |
| self.rpcs = {} # Map of rpc -> (callback, args, kwds) |
| |
| def clear(self): |
| """Remove all pending events without running any.""" |
| while self.current or self.idlers or self.queue or self.rpcs: |
| current = self.current |
| idlers = self.idlers |
| queue = self.queue |
| rpcs = self.rpcs |
| _logging_debug('Clearing stale EventLoop instance...') |
| if current: |
| _logging_debug(' current = %s', current) |
| if idlers: |
| _logging_debug(' idlers = %s', idlers) |
| if queue: |
| _logging_debug(' queue = %s', queue) |
| if rpcs: |
| _logging_debug(' rpcs = %s', rpcs) |
| self.__init__() |
| current.clear() |
| idlers.clear() |
| queue[:] = [] |
| rpcs.clear() |
| _logging_debug('Cleared') |
| |
| def insort_event_right(self, event, lo=0, hi=None): |
| """Insert event in queue, and keep it sorted assuming queue is sorted. |
| |
| If event is already in queue, insert it to the right of the rightmost |
| event (to keep FIFO order). |
| |
| Optional args lo (default 0) and hi (default len(a)) bound the |
| slice of a to be searched. |
| """ |
| |
| if lo < 0: |
| raise ValueError('lo must be non-negative') |
| if hi is None: |
| hi = len(self.queue) |
| while lo < hi: |
| mid = (lo + hi) // 2 |
| if event[0] < self.queue[mid][0]: hi = mid |
| else: lo = mid + 1 |
| self.queue.insert(lo, event) |
| |
| def queue_call(self, delay, callback, *args, **kwds): |
| """Schedule a function call at a specific time in the future.""" |
| if delay is None: |
| self.current.append((callback, args, kwds)) |
| return |
| if delay < 1e9: |
| when = delay + time.time() |
| else: |
| # Times over a billion seconds are assumed to be absolute. |
| when = delay |
| self.insort_event_right((when, callback, args, kwds)) |
| |
| def queue_rpc(self, rpc, callback=None, *args, **kwds): |
| """Schedule an RPC with an optional callback. |
| |
| The caller must have previously sent the call to the service. |
| The optional callback is called with the remaining arguments. |
| |
| NOTE: If the rpc is a MultiRpc, the callback will be called once |
| for each sub-RPC. TODO: Is this a good idea? |
| """ |
| if rpc is None: |
| return |
| if rpc.state not in (_RUNNING, _FINISHING): |
| raise RuntimeError('rpc must be sent to service before queueing') |
| if isinstance(rpc, datastore_rpc.MultiRpc): |
| rpcs = rpc.rpcs |
| if len(rpcs) > 1: |
| # Don't call the callback until all sub-rpcs have completed. |
| rpc.__done = False |
| def help_multi_rpc_along(r=rpc, c=callback, a=args, k=kwds): |
| if r.state == _FINISHING and not r.__done: |
| r.__done = True |
| c(*a, **k) |
| # TODO: And again, what about exceptions? |
| callback = help_multi_rpc_along |
| args = () |
| kwds = {} |
| else: |
| rpcs = [rpc] |
| for rpc in rpcs: |
| self.rpcs[rpc] = (callback, args, kwds) |
| |
| def add_idle(self, callback, *args, **kwds): |
| """Add an idle callback. |
| |
| An idle callback can return True, False or None. These mean: |
| |
| - None: remove the callback (don't reschedule) |
| - False: the callback did no work; reschedule later |
| - True: the callback did some work; reschedule soon |
| |
| If the callback raises an exception, the traceback is logged and |
| the callback is removed. |
| """ |
| self.idlers.append((callback, args, kwds)) |
| |
| def run_idle(self): |
| """Run one of the idle callbacks. |
| |
| Returns: |
| True if one was called, False if no idle callback was called. |
| """ |
| if not self.idlers or self.inactive >= len(self.idlers): |
| return False |
| idler = self.idlers.popleft() |
| callback, args, kwds = idler |
| _logging_debug('idler: %s', callback.__name__) |
| res = callback(*args, **kwds) |
| # See add_idle() for the meaning of the callback return value. |
| if res is not None: |
| if res: |
| self.inactive = 0 |
| else: |
| self.inactive += 1 |
| self.idlers.append(idler) |
| else: |
| _logging_debug('idler %s removed', callback.__name__) |
| return True |
| |
| def run0(self): |
| """Run one item (a callback or an RPC wait_any). |
| |
| Returns: |
| A time to sleep if something happened (may be 0); |
| None if all queues are empty. |
| """ |
| if self.current: |
| self.inactive = 0 |
| callback, args, kwds = self.current.popleft() |
| _logging_debug('nowevent: %s', callback.__name__) |
| callback(*args, **kwds) |
| return 0 |
| if self.run_idle(): |
| return 0 |
| delay = None |
| if self.queue: |
| delay = self.queue[0][0] - time.time() |
| if delay <= 0: |
| self.inactive = 0 |
| _, callback, args, kwds = self.queue.pop(0) |
| _logging_debug('event: %s', callback.__name__) |
| callback(*args, **kwds) |
| # TODO: What if it raises an exception? |
| return 0 |
| if self.rpcs: |
| self.inactive = 0 |
| rpc = datastore_rpc.MultiRpc.wait_any(self.rpcs) |
| if rpc is not None: |
| _logging_debug('rpc: %s.%s', rpc.service, rpc.method) |
| # Yes, wait_any() may return None even for a non-empty argument. |
| # But no, it won't ever return an RPC not in its argument. |
| if rpc not in self.rpcs: |
| raise RuntimeError('rpc %r was not given to wait_any as a choice %r' % |
| (rpc, self.rpcs)) |
| callback, args, kwds = self.rpcs[rpc] |
| del self.rpcs[rpc] |
| if callback is not None: |
| callback(*args, **kwds) |
| # TODO: Again, what about exceptions? |
| return 0 |
| return delay |
| |
| def run1(self): |
| """Run one item (a callback or an RPC wait_any) or sleep. |
| |
| Returns: |
| True if something happened; False if all queues are empty. |
| """ |
| delay = self.run0() |
| if delay is None: |
| return False |
| if delay > 0: |
| time.sleep(delay) |
| return True |
| |
| def run(self): |
| """Run until there's nothing left to do.""" |
| # TODO: A way to stop running before the queue is empty. |
| self.inactive = 0 |
| while True: |
| if not self.run1(): |
| break |
| |
| |
| class _State(utils.threading_local): |
| event_loop = None |
| |
| |
| _EVENT_LOOP_KEY = '__EVENT_LOOP__' |
| |
| _state = _State() |
| |
| |
| def get_event_loop(): |
| """Return a EventLoop instance. |
| |
| A new instance is created for each new HTTP request. We determine |
| that we're in a new request by inspecting os.environ, which is reset |
| at the start of each request. Also, each thread gets its own loop. |
| """ |
| ev = _state.event_loop |
| if not os.getenv(_EVENT_LOOP_KEY) and ev is not None: |
| ev.clear() |
| _state.event_loop = None |
| ev = None |
| if ev is None: |
| ev = EventLoop() |
| _state.event_loop = ev |
| os.environ[_EVENT_LOOP_KEY] = '1' |
| return ev |
| |
| |
| def queue_call(*args, **kwds): |
| ev = get_event_loop() |
| ev.queue_call(*args, **kwds) |
| |
| |
| def queue_rpc(rpc, callback=None, *args, **kwds): |
| ev = get_event_loop() |
| ev.queue_rpc(rpc, callback, *args, **kwds) |
| |
| |
| def add_idle(callback, *args, **kwds): |
| ev = get_event_loop() |
| ev.add_idle(callback, *args, **kwds) |
| |
| |
| def run(): |
| ev = get_event_loop() |
| ev.run() |
| |
| |
| def run1(): |
| ev = get_event_loop() |
| return ev.run1() |
| |
| |
| def run0(): |
| ev = get_event_loop() |
| return ev.run0() |