Instalog: BufferEventStream doesn't time out in time when filtering
Instalog has flow policy to filter events in buffer. If a output plugin
doesn't accept a event, the BufferEventStream will ignore it
automatically. However, if buffer has lots of event which is not for
the output plugin, the BufferEventStream won't time out in time.
BUG=b:109905899
TEST=manually test in local device
Change-Id: Iccc9aad211fec47ecc80f9e591097303f30f5d90
Reviewed-on: https://chromium-review.googlesource.com/1141547
Commit-Ready: Chun-Tsen Kuo <chuntsen@chromium.org>
Tested-by: Chun-Tsen Kuo <chuntsen@chromium.org>
Reviewed-by: Ting Shen <phoenixshen@chromium.org>
diff --git a/py/instalog/datatypes.py b/py/instalog/datatypes.py
index 26dffd6..3e9ff6f 100644
--- a/py/instalog/datatypes.py
+++ b/py/instalog/datatypes.py
@@ -243,20 +243,23 @@
"""The total number of events retrieved so far."""
return self._count
- def Next(self):
+ def Next(self, timeout=1):
"""Gets the next available event from the buffer.
Just like a normal Python iterable, should raise StopIteration when no
more events are available. However, in the case that the plugin has been
paused, a WaitException will be raised.
+ Args:
+ timeout: Seconds to wait for retrieving next event.
+
Returns:
- None if no more events are currently available.
+ None if timeout or no more events are currently available.
Raises:
WaitException if the plugin has been paused.
"""
- ret = self._plugin_api.EventStreamNext(self._plugin, self)
+ ret = self._plugin_api.EventStreamNext(self._plugin, self, timeout)
if ret is not None:
self._count += 1
return ret
@@ -347,7 +350,8 @@
# Try getting the next event. If the plugin is in a waiting state,
# stop iteration immediately.
try:
- ret = self.event_stream.Next()
+ remaining_time = self._start + self.timeout - time_utils.MonotonicTime()
+ ret = self.event_stream.Next(timeout=remaining_time)
except plugin_base.WaitException:
raise StopIteration
diff --git a/py/instalog/datatypes_unittest.py b/py/instalog/datatypes_unittest.py
index 85d544f..3b0a93e 100755
--- a/py/instalog/datatypes_unittest.py
+++ b/py/instalog/datatypes_unittest.py
@@ -86,8 +86,8 @@
del plugin
return False
- def EventStreamNext(self, plugin, event_stream):
- del plugin, event_stream
+ def EventStreamNext(self, plugin, event_stream, timeout=1):
+ del plugin, event_stream, timeout
if self._expired:
raise plugin_base.EventStreamExpired
if self._buffer_queue.empty():
@@ -346,8 +346,8 @@
def testBlockUntilWaitException(self):
"""Tests that iterator aborts before its timeout on WaitException."""
wait_exception_begin = time_utils.MonotonicTime() + 1
- def DelayedWaitException(plugin, event_stream):
- del plugin, event_stream
+ def DelayedWaitException(plugin, event_stream, timeout):
+ del plugin, event_stream, timeout
if time_utils.MonotonicTime() > wait_exception_begin:
raise plugin_base.WaitException
else:
@@ -365,8 +365,8 @@
def testBlockUntilCountFulfilled(self):
"""Tests that an iterator ends when its count is fulfilled."""
wait_event_begin = time_utils.MonotonicTime() + 1
- def DelayedEvent(plugin, event_stream):
- del plugin, event_stream
+ def DelayedEvent(plugin, event_stream, timeout):
+ del plugin, event_stream, timeout
if time_utils.MonotonicTime() > wait_event_begin:
return 'delayed_event'
else:
diff --git a/py/instalog/plugin_base.py b/py/instalog/plugin_base.py
index a9cbb5f..4705184 100644
--- a/py/instalog/plugin_base.py
+++ b/py/instalog/plugin_base.py
@@ -82,7 +82,7 @@
"""See OutputPlugin.NewStream."""
raise NotImplementedError
- def EventStreamNext(self, plugin, plugin_stream):
+ def EventStreamNext(self, plugin, plugin_stream, timeout):
"""See BufferEventStream.Next."""
raise NotImplementedError
diff --git a/py/instalog/plugin_sandbox.py b/py/instalog/plugin_sandbox.py
index 1e484b7..b5c699d 100644
--- a/py/instalog/plugin_sandbox.py
+++ b/py/instalog/plugin_sandbox.py
@@ -609,13 +609,13 @@
self._event_stream_map[plugin_stream] = buffer_stream
return plugin_stream
- def EventStreamNext(self, plugin, plugin_stream):
+ def EventStreamNext(self, plugin, plugin_stream, timeout=1):
"""See PluginAPI.EventStreamNext."""
self._AskGatekeeper(plugin, self._GATEKEEPER_ALLOW_UP)
self.debug('EventStreamNext called with state=%s', self._state)
if plugin_stream not in self._event_stream_map:
raise plugin_base.UnexpectedAccess
- ret = self._NextMatchingEvent(plugin_stream)
+ ret = self._NextMatchingEvent(plugin_stream, timeout)
if ret:
# TODO(kitching): Relocate the ProcessStage annotation into Core.
process_stage = datatypes.ProcessStage(
@@ -627,17 +627,27 @@
ret.AppendStage(process_stage)
return ret
- def _NextMatchingEvent(self, plugin_stream):
+ def _NextMatchingEvent(self, plugin_stream, timeout):
"""Retrieves the next event matching the plugin's FlowPolicy.
- Returns None if no events are available.
+ Args:
+ plugin_stream: A stream of events for an output plugin to process.
+ timeout: Seconds to wait for retrieving next event.
+
+ Returns:
+ None if timeout or no events are available.
"""
- while True:
- ret = self._event_stream_map[plugin_stream].Next()
- if ret is None:
- return None
- if self._policy.MatchEvent(ret):
- return ret
+ try:
+ def CheckEvent(event):
+ return event is None or self._policy.MatchEvent(event)
+
+ return sync_utils.PollForCondition(
+ poll_method=self._event_stream_map[plugin_stream].Next,
+ condition_method=CheckEvent,
+ timeout_secs=timeout,
+ poll_interval_secs=0)
+ except type_utils.TimeoutError:
+ return None
def EventStreamCommit(self, plugin, plugin_stream):
"""See PluginAPI.EventStreamCommit."""