Rust: Support delayed tasks in RunLoop
Adds the ability to schedule a delayed task in the RunLoop.
The task is inserted as a closure and as long as one handle
lives in the RunLoop it is guaranteed to run at the confirmed
deadline (with no handlers the RunLoop exits).
R=eholk@chromium.org
Review URL: https://codereview.chromium.org/2244463002 .
Cr-Mirrored-From: https://github.com/domokit/mojo
Cr-Mirrored-Commit: fefa014de0c9b55547ea19ed005b252a00dbe2e2
diff --git a/rust/src/bindings/run_loop.rs b/rust/src/bindings/run_loop.rs
index 1310910..754694e 100644
--- a/rust/src/bindings/run_loop.rs
+++ b/rust/src/bindings/run_loop.rs
@@ -13,6 +13,11 @@
//! at which point it wakes up and executes the appropriate handler
//! method. This handler method may then be used to further populate
//! or de-populate the run-loop.
+//!
+//! As of yet, the run-loop is NOT thread-safe. Although it is useful
+//! to be able to register tasks or handles from one thread onto
+//! another thread's run-loop, this is as-of-yet unsupported, and
+//! Rust should complain loudly when you try to do any threading here.
use std::cell::RefCell;
use std::cmp::{PartialEq, Eq, PartialOrd, Ord, Ordering};
@@ -40,7 +45,7 @@
const MAXIMUM_WAIT_SET_NUM_RESULTS: usize = 256;
/// Thread-local data structure for keeping track of handles to wait on.
-thread_local!(static TL_RUN_LOOP: RefCell<RunLoop<'static>> = RefCell::new(RunLoop::new()));
+thread_local!(static TL_RUN_LOOP: RefCell<RunLoop<'static, 'static>> = RefCell::new(RunLoop::new()));
/// Token representing handle/callback to wait on for this thread only. This
/// token only has meaning on the thread in which the handle was registered.
@@ -133,6 +138,62 @@
}
}
+/// A wrapper struct for carrying the task as well as some information about
+/// it.
+struct TaskInfo<'t> {
+ /// The task, boxed up.
+ closure: Box<FnMut(&mut RunLoop) + 't>,
+
+ /// An absolute deadline in terms of time ticks.
+ ///
+ /// This is the most recently updated deadline that
+ /// we should be watching out for. All others for this
+ /// token may be considered "stale".
+ deadline: system::MojoTimeTicks,
+}
+
+impl<'t> TaskInfo<'t> {
+ /// Executes the task within the info object, consuming it
+ /// in the process.
+ pub fn execute_task(mut self, run_loop: &mut RunLoop) {
+ (*self.closure)(run_loop);
+ }
+
+ /// Getter for the current absolute deadline held inside.
+ pub fn deadline(&self) -> system::MojoTimeTicks {
+ self.deadline
+ }
+}
+
+impl<'t> PartialEq for TaskInfo<'t> {
+ /// Equality for TaskInfo in terms of its deadline
+ fn eq(&self, other: &TaskInfo) -> bool {
+ self.deadline == other.deadline
+ }
+}
+
+impl<'t> Eq for TaskInfo<'t> {}
+
+impl<'t> PartialOrd for TaskInfo<'t> {
+ /// Partial comparison for TaskInfo in terms of its deadline
+ ///
+ /// Reverses the comparison because the Rust std library only
+ /// offers a max-heap, and we need a min-heap.
+ fn partial_cmp(&self, other: &TaskInfo) -> Option<Ordering> {
+ other.deadline.partial_cmp(&self.deadline)
+ }
+}
+
+impl<'t> Ord for TaskInfo<'t> {
+ /// Implement comparisons for Task Info.
+ ///
+ /// Reverses the comparison because the Rust std library only
+ /// offers a max-heap, and we need a min-heap.
+ fn cmp(&self, other: &Self) -> Ordering {
+ other.deadline.cmp(&self.deadline)
+ }
+}
+
/// Wrapper struct intended to be used in a priority queue
/// for efficiently retrieving the next closest deadline.
#[derive(Clone)]
@@ -224,7 +285,7 @@
///
/// Ultimately, it should only be a singleton living in
/// thread-local storage.
-pub struct RunLoop<'h> {
+pub struct RunLoop<'h, 't> {
/// Running count of the next available token slot.
token_count: u64,
@@ -233,6 +294,10 @@
/// TODO(mknyszek): Try out a Slab allocator instead of a hashmap.
handlers: HashMap<Token, HandlerInfo<'h>>,
+ /// A min-heap of delayed tasks in order to pull the soonest task to
+ /// execute efficiently.
+ tasks: BinaryHeap<TaskInfo<'t>>,
+
/// A min-heap containing deadlines in order to pull out the soonest
/// deadline efficiently.
///
@@ -252,12 +317,13 @@
running: bool,
}
-impl<'h> RunLoop<'h> {
+impl<'h, 't> RunLoop<'h, 't> {
/// Create a new RunLoop.
- pub fn new() -> RunLoop<'h> {
+ pub fn new() -> RunLoop<'h, 't> {
RunLoop {
token_count: 0,
handlers: HashMap::new(),
+ tasks: BinaryHeap::new(),
deadlines: BinaryHeap::new(),
handle_set: wait_set::WaitSet::new(wsflags!(Create::None)).unwrap(),
should_quit: false,
@@ -356,12 +422,34 @@
}
}
- /// Uses the binary heap to get the next closest deadline.
+ /// Adds a task to be run by the runloop after some delay.
+ ///
+ /// Returns a token if the delay is valid, otherwise returns None.
+ pub fn post_task<F>(&mut self, task: F, delay: system::MojoTimeTicks) -> Result<(), ()>
+ where F: FnMut(&mut RunLoop) + 't
+ {
+ let now = core::get_time_ticks_now();
+ if delay > i64::MAX - now {
+ return Err(());
+ }
+ let deadline = now + delay;
+ self.tasks.push(TaskInfo {
+ closure: Box::new(task),
+ deadline: deadline,
+ });
+ Ok(())
+ }
+
+ /// Uses the binary heaps to get the next closest deadline.
///
/// Removes stale deadline entries as they are found, but
/// does not otherwise modify the heap of deadlines.
fn get_next_deadline(&mut self) -> system::MojoTimeTicks {
debug_assert!(!self.handlers.is_empty());
+ let top_task_deadline = match self.tasks.peek() {
+ Some(info) => info.deadline(),
+ None => MOJO_INDEFINITE_ABSOLUTE,
+ };
let mut top = match self.deadlines.peek() {
Some(info) => info.clone(),
None => return MOJO_INDEFINITE_ABSOLUTE,
@@ -373,7 +461,12 @@
None => return MOJO_INDEFINITE_ABSOLUTE,
}
}
- top.deadline()
+ if top_task_deadline != MOJO_INDEFINITE_ABSOLUTE &&
+ top_task_deadline < top.deadline() {
+ top_task_deadline
+ } else {
+ top.deadline()
+ }
}
/// Gets a handler by token to be manipulated in a consistent environment.
@@ -483,6 +576,30 @@
}
}
+ /// Iterates through all tasks whose deadline has passed and executes
+ /// them, consuming their information object.
+ ///
+ /// These tasks all have access to the RunLoop so that they may reschedule
+ /// themselves or manipulate the RunLoop in some other way.
+ fn execute_ready_tasks(&mut self) {
+ let now = core::get_time_ticks_now();
+ let mut deadline = match self.tasks.peek() {
+ Some(info) => info.deadline(),
+ None => return,
+ };
+ while deadline < now {
+ let top = self.tasks.pop().expect("Sudden change to heap?");
+ top.execute_task(self);
+ if self.should_quit {
+ return;
+ }
+ deadline = match self.tasks.peek() {
+ Some(info) => info.deadline(),
+ None => return,
+ };
+ }
+ }
+
/// Blocks on handle_set.wait_on_set using the information contained
/// within itself.
///
@@ -492,6 +609,12 @@
/// signals satisfied, or reaches its deadline.
fn wait(&mut self, results_buffer: &mut Vec<system::WaitSetResult>) {
debug_assert!(!self.handlers.is_empty());
+ self.execute_ready_tasks();
+ // If after executing a task we quit or there are no handles,
+ // we have no reason to continue.
+ if self.handlers.is_empty() || self.should_quit {
+ return;
+ }
let deadline = self.get_next_deadline();
let until_deadline = relative_deadline(deadline, core::get_time_ticks_now());
// Perform the wait
diff --git a/rust/tests/run_loop.rs b/rust/tests/run_loop.rs
index 6b1f9d4..af159cb 100644
--- a/rust/tests/run_loop.rs
+++ b/rust/tests/run_loop.rs
@@ -19,6 +19,9 @@
use mojo::system::MOJO_INDEFINITE;
use mojo::system::message_pipe;
+use std::cell::Cell;
+use std::rc::Rc;
+
struct HandlerExpectReady {}
impl Handler for HandlerExpectReady {
@@ -174,6 +177,63 @@
}
}
+struct HandlerTasks {
+ count: Rc<Cell<u64>>,
+}
+
+impl Handler for HandlerTasks {
+ fn on_ready(&mut self, runloop: &mut RunLoop, token: Token) {
+ let r = self.count.clone();
+ let _ = runloop.post_task(move |_runloop| {
+ let val = (*r).get();
+ (*r).set(val+1);
+ }, 10);
+ if (*self.count).get() > 10 {
+ runloop.deregister(token);
+ }
+ }
+ fn on_timeout(&mut self, _runloop: &mut RunLoop, _token: Token) {
+ panic!("Timed-out when expected error");
+ }
+ fn on_error(&mut self, _runloop: &mut RunLoop, _token: Token, _error: WaitError) {
+ panic!("Error when expected ready");
+ }
+}
+
+struct NestedTasks {
+ count: Rc<Cell<u64>>,
+ quitter: bool,
+}
+
+impl Handler for NestedTasks {
+ fn on_ready(&mut self, runloop: &mut RunLoop, token: Token) {
+ let r = self.count.clone();
+ let quit = self.quitter;
+ let _ = runloop.post_task(move |runloop| {
+ let r2 = r.clone();
+ let tk = token.clone();
+ if (*r).get() < 10 {
+ let _ = runloop.post_task(move |_runloop| {
+ let val = (*r2).get();
+ (*r2).set(val+1);
+ }, 0);
+ } else {
+ if quit {
+ runloop.quit();
+ } else {
+ runloop.deregister(tk);
+ }
+ }
+ }, 0);
+ }
+ fn on_timeout(&mut self, _runloop: &mut RunLoop, _token: Token) {
+ panic!("Timed-out when expected error");
+ }
+ fn on_error(&mut self, _runloop: &mut RunLoop, _token: Token, _error: WaitError) {
+ panic!("Error when expected ready");
+ }
+}
+
tests! {
// Verifies that after adding and removing, we can run, exit and be
// left in a consistent state.
@@ -279,4 +339,48 @@
runloop.run();
});
}
+
+ // Tests adding a simple task that adds a handler.
+ fn simple_task() {
+ run_loop::with_current(|runloop| {
+ let _ = runloop.post_task(|runloop| {
+ let (_, endpt1) = message_pipe::create(mpflags!(Create::None)).unwrap();
+ let _ = runloop.register(&endpt1, signals!(Signals::Readable), 0, HandlerExpectError {});
+ }, 0);
+ runloop.run();
+ });
+ }
+
+ // Tests using a handler that adds a bunch of tasks.
+ fn handler_tasks() {
+ let (_endpt0, endpt1) = message_pipe::create(mpflags!(Create::None)).unwrap();
+ let r = Rc::new(Cell::new(0));
+ run_loop::with_current(|runloop| {
+ let _ = runloop.register(&endpt1, signals!(Signals::Writable), 0, HandlerTasks { count: r.clone() });
+ runloop.run();
+ assert!((*r).get() >= 11);
+ });
+ }
+
+ // Tests using a handler that adds a bunch of tasks.
+ fn nested_tasks() {
+ let (_endpt0, endpt1) = message_pipe::create(mpflags!(Create::None)).unwrap();
+ let r = Rc::new(Cell::new(0));
+ run_loop::with_current(|runloop| {
+ let _ = runloop.register(&endpt1, signals!(Signals::Writable), 0, NestedTasks { count: r.clone(), quitter: false });
+ runloop.run();
+ assert!((*r).get() >= 10);
+ });
+ }
+
+ // Tests using a handler that adds a bunch of tasks.
+ fn nested_tasks_quit() {
+ let (_endpt0, endpt1) = message_pipe::create(mpflags!(Create::None)).unwrap();
+ let r = Rc::new(Cell::new(0));
+ run_loop::with_current(|runloop| {
+ let _ = runloop.register(&endpt1, signals!(Signals::Writable), 0, NestedTasks { count: r.clone(), quitter: true });
+ runloop.run();
+ assert!((*r).get() >= 10);
+ });
+ }
}