Instalog: BufferEventStream doesn't time out in time when filtering

Instalog has flow policy to filter events in buffer.  If a output plugin
doesn't accept a event, the BufferEventStream will ignore it
automatically.  However, if buffer has lots of event which is not for
the output plugin, the BufferEventStream won't time out in time.

BUG=b:109905899
TEST=manually test in local device

Change-Id: Iccc9aad211fec47ecc80f9e591097303f30f5d90
Reviewed-on: https://chromium-review.googlesource.com/1141547
Commit-Ready: Chun-Tsen Kuo <chuntsen@chromium.org>
Tested-by: Chun-Tsen Kuo <chuntsen@chromium.org>
Reviewed-by: Ting Shen <phoenixshen@chromium.org>
diff --git a/py/instalog/datatypes.py b/py/instalog/datatypes.py
index 26dffd6..3e9ff6f 100644
--- a/py/instalog/datatypes.py
+++ b/py/instalog/datatypes.py
@@ -243,20 +243,23 @@
     """The total number of events retrieved so far."""
     return self._count
 
-  def Next(self):
+  def Next(self, timeout=1):
     """Gets the next available event from the buffer.
 
     Just like a normal Python iterable, should raise StopIteration when no
     more events are available.  However, in the case that the plugin has been
     paused, a WaitException will be raised.
 
+    Args:
+      timeout: Seconds to wait for retrieving next event.
+
     Returns:
-      None if no more events are currently available.
+      None if timeout or no more events are currently available.
 
     Raises:
       WaitException if the plugin has been paused.
     """
-    ret = self._plugin_api.EventStreamNext(self._plugin, self)
+    ret = self._plugin_api.EventStreamNext(self._plugin, self, timeout)
     if ret is not None:
       self._count += 1
     return ret
@@ -347,7 +350,8 @@
       # Try getting the next event.  If the plugin is in a waiting state,
       # stop iteration immediately.
       try:
-        ret = self.event_stream.Next()
+        remaining_time = self._start + self.timeout - time_utils.MonotonicTime()
+        ret = self.event_stream.Next(timeout=remaining_time)
       except plugin_base.WaitException:
         raise StopIteration
 
diff --git a/py/instalog/datatypes_unittest.py b/py/instalog/datatypes_unittest.py
index 85d544f..3b0a93e 100755
--- a/py/instalog/datatypes_unittest.py
+++ b/py/instalog/datatypes_unittest.py
@@ -86,8 +86,8 @@
     del plugin
     return False
 
-  def EventStreamNext(self, plugin, event_stream):
-    del plugin, event_stream
+  def EventStreamNext(self, plugin, event_stream, timeout=1):
+    del plugin, event_stream, timeout
     if self._expired:
       raise plugin_base.EventStreamExpired
     if self._buffer_queue.empty():
@@ -346,8 +346,8 @@
   def testBlockUntilWaitException(self):
     """Tests that iterator aborts before its timeout on WaitException."""
     wait_exception_begin = time_utils.MonotonicTime() + 1
-    def DelayedWaitException(plugin, event_stream):
-      del plugin, event_stream
+    def DelayedWaitException(plugin, event_stream, timeout):
+      del plugin, event_stream, timeout
       if time_utils.MonotonicTime() > wait_exception_begin:
         raise plugin_base.WaitException
       else:
@@ -365,8 +365,8 @@
   def testBlockUntilCountFulfilled(self):
     """Tests that an iterator ends when its count is fulfilled."""
     wait_event_begin = time_utils.MonotonicTime() + 1
-    def DelayedEvent(plugin, event_stream):
-      del plugin, event_stream
+    def DelayedEvent(plugin, event_stream, timeout):
+      del plugin, event_stream, timeout
       if time_utils.MonotonicTime() > wait_event_begin:
         return 'delayed_event'
       else:
diff --git a/py/instalog/plugin_base.py b/py/instalog/plugin_base.py
index a9cbb5f..4705184 100644
--- a/py/instalog/plugin_base.py
+++ b/py/instalog/plugin_base.py
@@ -82,7 +82,7 @@
     """See OutputPlugin.NewStream."""
     raise NotImplementedError
 
-  def EventStreamNext(self, plugin, plugin_stream):
+  def EventStreamNext(self, plugin, plugin_stream, timeout):
     """See BufferEventStream.Next."""
     raise NotImplementedError
 
diff --git a/py/instalog/plugin_sandbox.py b/py/instalog/plugin_sandbox.py
index 1e484b7..b5c699d 100644
--- a/py/instalog/plugin_sandbox.py
+++ b/py/instalog/plugin_sandbox.py
@@ -609,13 +609,13 @@
     self._event_stream_map[plugin_stream] = buffer_stream
     return plugin_stream
 
-  def EventStreamNext(self, plugin, plugin_stream):
+  def EventStreamNext(self, plugin, plugin_stream, timeout=1):
     """See PluginAPI.EventStreamNext."""
     self._AskGatekeeper(plugin, self._GATEKEEPER_ALLOW_UP)
     self.debug('EventStreamNext called with state=%s', self._state)
     if plugin_stream not in self._event_stream_map:
       raise plugin_base.UnexpectedAccess
-    ret = self._NextMatchingEvent(plugin_stream)
+    ret = self._NextMatchingEvent(plugin_stream, timeout)
     if ret:
       # TODO(kitching): Relocate the ProcessStage annotation into Core.
       process_stage = datatypes.ProcessStage(
@@ -627,17 +627,27 @@
       ret.AppendStage(process_stage)
     return ret
 
-  def _NextMatchingEvent(self, plugin_stream):
+  def _NextMatchingEvent(self, plugin_stream, timeout):
     """Retrieves the next event matching the plugin's FlowPolicy.
 
-    Returns None if no events are available.
+    Args:
+      plugin_stream: A stream of events for an output plugin to process.
+      timeout: Seconds to wait for retrieving next event.
+
+    Returns:
+      None if timeout or no events are available.
     """
-    while True:
-      ret = self._event_stream_map[plugin_stream].Next()
-      if ret is None:
-        return None
-      if self._policy.MatchEvent(ret):
-        return ret
+    try:
+      def CheckEvent(event):
+        return event is None or self._policy.MatchEvent(event)
+
+      return sync_utils.PollForCondition(
+          poll_method=self._event_stream_map[plugin_stream].Next,
+          condition_method=CheckEvent,
+          timeout_secs=timeout,
+          poll_interval_secs=0)
+    except type_utils.TimeoutError:
+      return None
 
   def EventStreamCommit(self, plugin, plugin_stream):
     """See PluginAPI.EventStreamCommit."""