Rust: Support delayed tasks in RunLoop

Adds the ability to schedule a delayed task in the RunLoop.
The task is inserted as a closure and as long as one handle
lives in the RunLoop it is guaranteed to run at the confirmed
deadline (with no handlers the RunLoop exits).

R=eholk@chromium.org

Review URL: https://codereview.chromium.org/2244463002 .

Cr-Mirrored-From: https://github.com/domokit/mojo
Cr-Mirrored-Commit: fefa014de0c9b55547ea19ed005b252a00dbe2e2
diff --git a/rust/src/bindings/run_loop.rs b/rust/src/bindings/run_loop.rs
index 1310910..754694e 100644
--- a/rust/src/bindings/run_loop.rs
+++ b/rust/src/bindings/run_loop.rs
@@ -13,6 +13,11 @@
 //! at which point it wakes up and executes the appropriate handler
 //! method. This handler method may then be used to further populate
 //! or de-populate the run-loop.
+//!
+//! As of yet, the run-loop is NOT thread-safe. Although it is useful
+//! to be able to register tasks or handles from one thread onto
+//! another thread's run-loop, this is as-of-yet unsupported, and
+//! Rust should complain loudly when you try to do any threading here.
 
 use std::cell::RefCell;
 use std::cmp::{PartialEq, Eq, PartialOrd, Ord, Ordering};
@@ -40,7 +45,7 @@
 const MAXIMUM_WAIT_SET_NUM_RESULTS: usize = 256;
 
 /// Thread-local data structure for keeping track of handles to wait on.
-thread_local!(static TL_RUN_LOOP: RefCell<RunLoop<'static>> = RefCell::new(RunLoop::new()));
+thread_local!(static TL_RUN_LOOP: RefCell<RunLoop<'static, 'static>> = RefCell::new(RunLoop::new()));
 
 /// Token representing handle/callback to wait on for this thread only. This
 /// token only has meaning on the thread in which the handle was registered.
@@ -133,6 +138,62 @@
     }
 }
 
+/// A wrapper struct for carrying the task as well as some information about
+/// it.
+struct TaskInfo<'t> {
+    /// The task, boxed up.
+    closure: Box<FnMut(&mut RunLoop) + 't>,
+
+    /// An absolute deadline in terms of time ticks.
+    ///
+    /// This is the most recently updated deadline that
+    /// we should be watching out for. All others for this
+    /// token may be considered "stale".
+    deadline: system::MojoTimeTicks,
+}
+
+impl<'t> TaskInfo<'t> {
+    /// Executes the task within the info object, consuming it
+    /// in the process.
+    pub fn execute_task(mut self, run_loop: &mut RunLoop) {
+        (*self.closure)(run_loop);
+    }
+
+    /// Getter for the current absolute deadline held inside.
+    pub fn deadline(&self) -> system::MojoTimeTicks {
+        self.deadline
+    }
+}
+
+impl<'t> PartialEq for TaskInfo<'t> {
+    /// Equality for TaskInfo in terms of its deadline
+    fn eq(&self, other: &TaskInfo) -> bool {
+        self.deadline == other.deadline
+    }
+}
+
+impl<'t> Eq for TaskInfo<'t> {}
+
+impl<'t> PartialOrd for TaskInfo<'t> {
+    /// Partial comparison for TaskInfo in terms of its deadline
+    ///
+    /// Reverses the comparison because the Rust std library only
+    /// offers a max-heap, and we need a min-heap.
+    fn partial_cmp(&self, other: &TaskInfo) -> Option<Ordering> {
+        other.deadline.partial_cmp(&self.deadline)
+    }
+}
+
+impl<'t> Ord for TaskInfo<'t> {
+    /// Implement comparisons for Task Info.
+    ///
+    /// Reverses the comparison because the Rust std library only
+    /// offers a max-heap, and we need a min-heap.
+    fn cmp(&self, other: &Self) -> Ordering {
+        other.deadline.cmp(&self.deadline)
+    }
+}
+
 /// Wrapper struct intended to be used in a priority queue
 /// for efficiently retrieving the next closest deadline.
 #[derive(Clone)]
@@ -224,7 +285,7 @@
 ///
 /// Ultimately, it should only be a singleton living in
 /// thread-local storage.
-pub struct RunLoop<'h> {
+pub struct RunLoop<'h, 't> {
     /// Running count of the next available token slot.
     token_count: u64,
 
@@ -233,6 +294,10 @@
     /// TODO(mknyszek): Try out a Slab allocator instead of a hashmap.
     handlers: HashMap<Token, HandlerInfo<'h>>,
 
+    /// A min-heap of delayed tasks in order to pull the soonest task to
+    /// execute efficiently.
+    tasks: BinaryHeap<TaskInfo<'t>>,
+
     /// A min-heap containing deadlines in order to pull out the soonest
     /// deadline efficiently.
     ///
@@ -252,12 +317,13 @@
     running: bool,
 }
 
-impl<'h> RunLoop<'h> {
+impl<'h, 't> RunLoop<'h, 't> {
     /// Create a new RunLoop.
-    pub fn new() -> RunLoop<'h> {
+    pub fn new() -> RunLoop<'h, 't> {
         RunLoop {
             token_count: 0,
             handlers: HashMap::new(),
+            tasks: BinaryHeap::new(),
             deadlines: BinaryHeap::new(),
             handle_set: wait_set::WaitSet::new(wsflags!(Create::None)).unwrap(),
             should_quit: false,
@@ -356,12 +422,34 @@
         }
     }
 
-    /// Uses the binary heap to get the next closest deadline.
+    /// Adds a task to be run by the runloop after some delay.
+    ///
+    /// Returns a token if the delay is valid, otherwise returns None.
+    pub fn post_task<F>(&mut self, task: F, delay: system::MojoTimeTicks) -> Result<(), ()>
+        where F: FnMut(&mut RunLoop) + 't
+    {
+        let now = core::get_time_ticks_now();
+        if delay > i64::MAX - now {
+            return Err(());
+        }
+        let deadline = now + delay;
+        self.tasks.push(TaskInfo {
+            closure: Box::new(task),
+            deadline: deadline,
+        });
+        Ok(())
+    }
+
+    /// Uses the binary heaps to get the next closest deadline.
     ///
     /// Removes stale deadline entries as they are found, but
     /// does not otherwise modify the heap of deadlines.
     fn get_next_deadline(&mut self) -> system::MojoTimeTicks {
         debug_assert!(!self.handlers.is_empty());
+        let top_task_deadline = match self.tasks.peek() {
+            Some(info) => info.deadline(),
+            None => MOJO_INDEFINITE_ABSOLUTE,
+        };
         let mut top = match self.deadlines.peek() {
             Some(info) => info.clone(),
             None => return MOJO_INDEFINITE_ABSOLUTE,
@@ -373,7 +461,12 @@
                 None => return MOJO_INDEFINITE_ABSOLUTE,
             }
         }
-        top.deadline()
+        if top_task_deadline != MOJO_INDEFINITE_ABSOLUTE &&
+           top_task_deadline < top.deadline() {
+            top_task_deadline
+        } else {
+            top.deadline()
+        }
     }
 
     /// Gets a handler by token to be manipulated in a consistent environment.
@@ -483,6 +576,30 @@
         }
     }
 
+    /// Iterates through all tasks whose deadline has passed and executes
+    /// them, consuming their information object.
+    ///
+    /// These tasks all have access to the RunLoop so that they may reschedule
+    /// themselves or manipulate the RunLoop in some other way.
+    fn execute_ready_tasks(&mut self) {
+        let now = core::get_time_ticks_now();
+        let mut deadline = match self.tasks.peek() {
+            Some(info) => info.deadline(),
+            None => return,
+        };
+        while deadline < now {
+            let top = self.tasks.pop().expect("Sudden change to heap?");
+            top.execute_task(self);
+            if self.should_quit {
+                return;
+            }
+            deadline = match self.tasks.peek() {
+                Some(info) => info.deadline(),
+                None => return,
+            };
+        }
+    }
+
     /// Blocks on handle_set.wait_on_set using the information contained
     /// within itself.
     ///
@@ -492,6 +609,12 @@
     /// signals satisfied, or reaches its deadline.
     fn wait(&mut self, results_buffer: &mut Vec<system::WaitSetResult>) {
         debug_assert!(!self.handlers.is_empty());
+        self.execute_ready_tasks();
+        // If after executing a task we quit or there are no handles,
+        // we have no reason to continue.
+        if self.handlers.is_empty() || self.should_quit {
+            return;
+        }
         let deadline = self.get_next_deadline();
         let until_deadline = relative_deadline(deadline, core::get_time_ticks_now());
         // Perform the wait
diff --git a/rust/tests/run_loop.rs b/rust/tests/run_loop.rs
index 6b1f9d4..af159cb 100644
--- a/rust/tests/run_loop.rs
+++ b/rust/tests/run_loop.rs
@@ -19,6 +19,9 @@
 use mojo::system::MOJO_INDEFINITE;
 use mojo::system::message_pipe;
 
+use std::cell::Cell;
+use std::rc::Rc;
+
 struct HandlerExpectReady {}
 
 impl Handler for HandlerExpectReady {
@@ -174,6 +177,63 @@
     }
 }
 
+struct HandlerTasks {
+    count: Rc<Cell<u64>>,
+}
+
+impl Handler for HandlerTasks {
+    fn on_ready(&mut self, runloop: &mut RunLoop, token: Token) {
+        let r = self.count.clone();
+        let _ = runloop.post_task(move |_runloop| {
+            let val = (*r).get();
+            (*r).set(val+1);
+        }, 10);
+        if (*self.count).get() > 10 {
+            runloop.deregister(token);
+        }
+    }
+    fn on_timeout(&mut self, _runloop: &mut RunLoop, _token: Token) {
+        panic!("Timed-out when expected error");
+    }
+    fn on_error(&mut self, _runloop: &mut RunLoop, _token: Token, _error: WaitError) {
+        panic!("Error when expected ready");
+    }
+}
+
+struct NestedTasks {
+    count: Rc<Cell<u64>>,
+    quitter: bool,
+}
+
+impl Handler for NestedTasks {
+    fn on_ready(&mut self, runloop: &mut RunLoop, token: Token) {
+        let r = self.count.clone();
+        let quit = self.quitter;
+        let _ = runloop.post_task(move |runloop| {
+            let r2 = r.clone();
+            let tk = token.clone();
+            if (*r).get() < 10 {
+                let _ = runloop.post_task(move |_runloop| {
+                    let val = (*r2).get();
+                    (*r2).set(val+1);
+                }, 0);
+            } else {
+                if quit {
+                    runloop.quit();
+                } else {
+                    runloop.deregister(tk);
+                }
+            }
+        }, 0);
+    }
+    fn on_timeout(&mut self, _runloop: &mut RunLoop, _token: Token) {
+        panic!("Timed-out when expected error");
+    }
+    fn on_error(&mut self, _runloop: &mut RunLoop, _token: Token, _error: WaitError) {
+        panic!("Error when expected ready");
+    }
+}
+
 tests! {
     // Verifies that after adding and removing, we can run, exit and be
     // left in a consistent state.
@@ -279,4 +339,48 @@
             runloop.run();
         });
     }
+
+    // Tests adding a simple task that adds a handler.
+    fn simple_task() {
+        run_loop::with_current(|runloop| {
+            let _ = runloop.post_task(|runloop| {
+                let (_, endpt1) = message_pipe::create(mpflags!(Create::None)).unwrap();
+                let _ = runloop.register(&endpt1, signals!(Signals::Readable), 0, HandlerExpectError {});
+            }, 0);
+            runloop.run();
+        });
+    }
+
+    // Tests using a handler that adds a bunch of tasks.
+    fn handler_tasks() {
+        let (_endpt0, endpt1) = message_pipe::create(mpflags!(Create::None)).unwrap();
+        let r = Rc::new(Cell::new(0));
+        run_loop::with_current(|runloop| {
+            let _ = runloop.register(&endpt1, signals!(Signals::Writable), 0, HandlerTasks { count: r.clone() });
+            runloop.run();
+            assert!((*r).get() >= 11);
+        });
+    }
+
+    // Tests using a handler that adds a bunch of tasks.
+    fn nested_tasks() {
+        let (_endpt0, endpt1) = message_pipe::create(mpflags!(Create::None)).unwrap();
+        let r = Rc::new(Cell::new(0));
+        run_loop::with_current(|runloop| {
+            let _ = runloop.register(&endpt1, signals!(Signals::Writable), 0, NestedTasks { count: r.clone(), quitter: false });
+            runloop.run();
+            assert!((*r).get() >= 10);
+        });
+    }
+
+    // Tests using a handler that adds a bunch of tasks.
+    fn nested_tasks_quit() {
+        let (_endpt0, endpt1) = message_pipe::create(mpflags!(Create::None)).unwrap();
+        let r = Rc::new(Cell::new(0));
+        run_loop::with_current(|runloop| {
+            let _ = runloop.register(&endpt1, signals!(Signals::Writable), 0, NestedTasks { count: r.clone(), quitter: true });
+            runloop.run();
+            assert!((*r).get() >= 10);
+        });
+    }
 }