import time
import uuid
from threading import RLock, Timer
| |
| |
# Event type identifiers.
# NOTE(review): presumably passed as ``event_type`` to
# EventDispatcher.dispatch_event -- confirm against the callers.
STATUS_EVENT = "status"
RESUME_EVENT = "resume"
TEST_COMPLETED_EVENT = "test_completed"

# NOTE(review): DEVICES looks like the dispatcher token under which the
# device events below are published -- verify against the callers.
DEVICES = "devices"
DEVICE_ADDED_EVENT = "device_added"
DEVICE_REMOVED_EVENT = "device_removed"
| |
class EventDispatcher(object):
    """Fan events out to listeners and keep a short-lived replay cache.

    Events are grouped by a dispatcher token. Every dispatched event gets
    a per-token sequence number (the first one is 0) and is cached for
    ``event_cache_duration`` milliseconds, so a listener that reports the
    number of the last event it saw can be sent the events it missed.

    Cache expiry runs on a ``threading.Timer`` thread, therefore every
    access to the internal dictionaries is serialised with a re-entrant
    lock. Listener callbacks are always invoked outside that lock.
    """

    def __init__(self, event_cache_duration):
        """Create a dispatcher.

        :param event_cache_duration: how long a dispatched event stays in
            the replay cache, in milliseconds.
        """
        self._listeners = {}        # dispatcher token -> list of listeners
        self._events = {}           # dispatcher token -> list of cached events
        self._current_events = {}   # dispatcher token -> last event number
        self._cache_duration = event_cache_duration
        self._cache_timeout = None  # pending expiry Timer, if any
        self._lock = RLock()        # guards all of the state above

    def add_event_listener(self, listener, last_event_number=None):
        """Register ``listener`` or replay the events it has missed.

        If ``last_event_number`` is given and newer events for the
        listener's ``dispatcher_token`` are still cached, those events are
        sent to the listener in a single message and the listener is NOT
        registered. Otherwise the listener is registered and receives a
        fresh token in ``listener.token``.

        :param listener: object exposing ``dispatcher_token`` and
            ``send_message(events)``.
        :param last_event_number: number of the last event the listener
            has already seen, or ``None`` to skip the replay.
        :returns: the new listener token, or ``None`` when missed events
            were replayed instead.
        """
        token = listener.dispatcher_token

        with self._lock:
            missed = []
            if last_event_number is not None:
                newest = self._current_events.get(token)
                if newest is not None and newest > last_event_number:
                    missed = self._get_diff_events(token, last_event_number)

            if not missed:
                self._listeners.setdefault(token, []).append(listener)
                listener.token = str(uuid.uuid1())
                return listener.token

        # Deliver outside the lock so a slow or re-entrant listener cannot
        # stall dispatching or cache expiry.
        listener.send_message(missed)
        return None

    def remove_event_listener(self, listener_token):
        """Unregister the listener that was handed ``listener_token``.

        Does nothing when the token is ``None`` or unknown.
        """
        if listener_token is None:
            return

        with self._lock:
            for dispatcher_token, listeners in self._listeners.items():
                for index, listener in enumerate(listeners):
                    if listener.token != listener_token:
                        continue
                    # Delete by position so that an equal-but-different
                    # listener object is never removed by mistake.
                    del listeners[index]
                    if not listeners:
                        del self._listeners[dispatcher_token]
                    # Returning right away keeps the dict mutation above
                    # safe with respect to the surrounding iteration.
                    return

    def dispatch_event(self, dispatcher_token, event_type, data=None):
        """Cache an event and send it to every listener of the token.

        :param dispatcher_token: group the event belongs to.
        :param event_type: value stored under the event's ``"type"`` key.
        :param data: optional payload stored under ``"data"``.
        """
        with self._lock:
            self._current_events.setdefault(dispatcher_token, -1)
            self._events.setdefault(dispatcher_token, [])

            cached = self._add_to_cache(dispatcher_token, event_type, data)
            self._set_cache_timer()

            # Snapshot: a listener may unregister itself while handling
            # the message, which would otherwise make the loop below skip
            # the listener that follows it.
            listeners = list(self._listeners.get(dispatcher_token, ()))

        if not listeners:
            return

        event = {
            "type": event_type,
            "data": data,
            "number": cached["number"]
        }

        for listener in listeners:
            listener.send_message([event])

    def _get_diff_events(self, dispatcher_token, last_event_number):
        """Return cached events of the token newer than ``last_event_number``.

        The returned dictionaries are copies without the internal
        ``"expiration_date"`` field.
        """
        with self._lock:
            return [
                {
                    "type": event["type"],
                    "data": event["data"],
                    "number": event["number"]
                }
                for event in self._events.get(dispatcher_token, ())
                if event["number"] > last_event_number
            ]

    def _set_cache_timer(self):
        """Arm a timer for the next cache expiry unless one is pending."""
        with self._lock:
            if self._cache_timeout is not None:
                return

            events = self._read_cached_events()
            if not events:
                return

            next_expiry = min(event["expiration_date"] for event in events)
            # expiration_date is in milliseconds, Timer wants seconds.
            delay = max(next_expiry / 1000.0 - time.time(), 0)

            timer = Timer(delay, self._handle_cache_timeout)
            # Housekeeping must not keep the interpreter alive on exit.
            timer.daemon = True
            self._cache_timeout = timer
            timer.start()

    def _handle_cache_timeout(self):
        """Timer callback: purge expired events and re-arm the timer."""
        with self._lock:
            self._delete_expired_events()
            self._cache_timeout = None
            self._set_cache_timer()

    def _delete_expired_events(self):
        """Drop every cached event whose expiration date has been reached."""
        now = int(time.time() * 1000)

        with self._lock:
            for dispatcher_token in list(self._events):
                cached_events = self._events[dispatcher_token]
                # "<= now" counts as expired: with a strict comparison a
                # timer firing in the very millisecond of the expiry would
                # delete nothing and re-arm itself with a zero delay.
                cached_events[:] = [
                    event for event in cached_events
                    if event["expiration_date"] > now
                ]
                if not cached_events:
                    del self._events[dispatcher_token]

    def _add_to_cache(self, dispatcher_token, event_type, data):
        """Number the event, append it to the token's cache and return it.

        Expects ``dispatcher_token`` to be present in both
        ``_current_events`` and ``_events`` (``dispatch_event`` ensures it).
        """
        with self._lock:
            self._current_events[dispatcher_token] += 1
            event = {
                "type": event_type,
                "data": data,
                "number": self._current_events[dispatcher_token],
                "expiration_date":
                    int(time.time() * 1000) + self._cache_duration
            }
            self._events[dispatcher_token].append(event)
            return event

    def _remove_from_cache(self, event):
        """Remove exactly this event object (identity match) from the cache."""
        with self._lock:
            for dispatcher_token, cached_events in self._events.items():
                for index, cached_event in enumerate(cached_events):
                    if cached_event is not event:
                        continue
                    del cached_events[index]
                    if not cached_events:
                        del self._events[dispatcher_token]
                    return

    def _read_cached_events(self):
        """Return a new flat list with the cached events of all tokens."""
        with self._lock:
            events = []
            for cached_events in self._events.values():
                events.extend(cached_events)
            return events