blob: 754694ed3d2d4d94def3b4a39c1bc255375ef53e [file] [log] [blame]
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! This module contains a thread-local run-loop.
//!
//! The run-loop may have handles and handlers pre-registers
//! (and in fact, must) in order to keep running. The run-loop
//! executes until it has no more handles or handlers on itself,
//! or until it is told to quit via stop().
//!
//! The run-loop waits until some signals on some handle is satisfied,
//! at which point it wakes up and executes the appropriate handler
//! method. This handler method may then be used to further populate
//! or de-populate the run-loop.
//!
//! As of yet, the run-loop is NOT thread-safe. Although it is useful
//! to be able to register tasks or handles from one thread onto
//! another thread's run-loop, this is as-of-yet unsupported, and
//! Rust should complain loudly when you try to do any threading here.
use std::cell::RefCell;
use std::cmp::{PartialEq, Eq, PartialOrd, Ord, Ordering};
use std::collections::BinaryHeap;
use std::collections::HashMap;
use std::i64;
use std::u32;
use std::vec::Vec;
use system;
use system::{Handle, MOJO_INDEFINITE, MojoResult};
use system::core;
use system::wait_set;
/// Define the equivalent of MOJO_INDEFINITE for absolute deadlines
const MOJO_INDEFINITE_ABSOLUTE: system::MojoTimeTicks = 0;
// TODO(mknyszek): The numbers below are arbitrary and come from the C++ bindings,
// and should probably be changed at some point
/// Initial size of the result buffer.
const INITIAL_WAIT_SET_NUM_RESULTS: usize = 16;
/// Maximum size of the result buffer.
const MAXIMUM_WAIT_SET_NUM_RESULTS: usize = 256;
/// Thread-local data structure for keeping track of handles to wait on.
thread_local!(static TL_RUN_LOOP: RefCell<RunLoop<'static, 'static>> = RefCell::new(RunLoop::new()));
/// Token representing handle/callback to wait on for this thread only. This
/// token only has meaning on the thread in which the handle was registered.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Token(u64);
impl Token {
/// Get the wait token's "cookie" form, suitable for use in a wait set.
fn as_cookie(&self) -> u64 {
self.0
}
}
/// Represents the possible error cases that may occur when waiting
/// on a handle in a RunLoop.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum WaitError {
/// The handle has been closed or is otherwise no longer valid.
HandleClosed,
/// The handle is currently busy in some transaction.
HandleBusy,
/// It has been determined that the signals provided will never
/// be satisfied for this handle.
Unsatisfiable,
}
/// A trait which defines an interface to be a handler usable by
/// a RunLoop.
pub trait Handler {
/// Called after a successful wait.
fn on_ready(&mut self, runloop: &mut RunLoop, token: Token);
/// Called after the given deadline expires.
fn on_timeout(&mut self, runloop: &mut RunLoop, token: Token);
/// Called when an unexpected error occurs.
fn on_error(&mut self, runloop: &mut RunLoop, token: Token, error: WaitError);
}
/// A wrapper struct for carrying the handler as well as various information
/// about it.
struct HandlerInfo<'h> {
/// The handle for which we are waiting.
///
/// We keep this handle around so that we may easily re-register.
handle: system::MojoHandle,
/// The handler, boxed up.
///
/// The handler is in an Option type because if it is currently being
/// used in a callback, we must take ownership to avoid mutability
/// cycles. The easiest way to do this is to take() from the Option then
/// put it back.
handler: Option<Box<Handler + 'h>>,
/// An absolute deadline in terms of time ticks.
///
/// This is the most recently updated deadline that
/// we should be watching out for. All others for this
/// token may be considered "stale".
deadline: system::MojoTimeTicks,
}
impl<'h> HandlerInfo<'h> {
/// Take the handler out of its Option type.
pub fn take(&mut self) -> Option<Box<Handler + 'h>> {
self.handler.take()
}
/// Put a new handler into the Option type.
pub fn give(&mut self, handler: Box<Handler + 'h>) {
self.handler = Some(handler);
}
/// Getter for the system::MojoHandle held inside.
pub fn handle(&self) -> system::MojoHandle {
self.handle
}
/// Getter for the current absolute deadline held inside.
pub fn deadline(&self) -> system::MojoTimeTicks {
self.deadline
}
/// Setter to update the current absolute deadline.
pub fn set_deadline(&mut self, deadline: system::MojoTimeTicks) {
self.deadline = deadline
}
}
/// A wrapper struct for carrying the task as well as some information about
/// it.
struct TaskInfo<'t> {
/// The task, boxed up.
closure: Box<FnMut(&mut RunLoop) + 't>,
/// An absolute deadline in terms of time ticks.
///
/// This is the most recently updated deadline that
/// we should be watching out for. All others for this
/// token may be considered "stale".
deadline: system::MojoTimeTicks,
}
impl<'t> TaskInfo<'t> {
/// Executes the task within the info object, consuming it
/// in the process.
pub fn execute_task(mut self, run_loop: &mut RunLoop) {
(*self.closure)(run_loop);
}
/// Getter for the current absolute deadline held inside.
pub fn deadline(&self) -> system::MojoTimeTicks {
self.deadline
}
}
impl<'t> PartialEq for TaskInfo<'t> {
/// Equality for TaskInfo in terms of its deadline
fn eq(&self, other: &TaskInfo) -> bool {
self.deadline == other.deadline
}
}
impl<'t> Eq for TaskInfo<'t> {}
impl<'t> PartialOrd for TaskInfo<'t> {
/// Partial comparison for TaskInfo in terms of its deadline
///
/// Reverses the comparison because the Rust std library only
/// offers a max-heap, and we need a min-heap.
fn partial_cmp(&self, other: &TaskInfo) -> Option<Ordering> {
other.deadline.partial_cmp(&self.deadline)
}
}
impl<'t> Ord for TaskInfo<'t> {
/// Implement comparisons for Task Info.
///
/// Reverses the comparison because the Rust std library only
/// offers a max-heap, and we need a min-heap.
fn cmp(&self, other: &Self) -> Ordering {
other.deadline.cmp(&self.deadline)
}
}
/// Wrapper struct intended to be used in a priority queue
/// for efficiently retrieving the next closest deadline.
#[derive(Clone)]
struct DeadlineInfo {
/// The ID of the associated Handler struct in the RunLoop's
/// hash map.
token: Token,
/// An absolute deadline in terms of time ticks.
deadline: system::MojoTimeTicks,
}
impl DeadlineInfo {
/// Getter for an immutable borrow for the token inside.
pub fn token(&self) -> &Token {
&self.token
}
/// Getter for the absolute deadline inside.
pub fn deadline(&self) -> system::MojoTimeTicks {
self.deadline
}
}
impl PartialEq for DeadlineInfo {
/// Equality for DeadlineInfo in terms of its deadline
fn eq(&self, other: &DeadlineInfo) -> bool {
self.deadline == other.deadline
}
}
impl Eq for DeadlineInfo {}
impl PartialOrd for DeadlineInfo {
/// Partial comparison for DeadlineInfo in terms of its deadline
///
/// Reverses the comparison because the Rust std library only
/// offers a max-heap, and we need a min-heap.
fn partial_cmp(&self, other: &DeadlineInfo) -> Option<Ordering> {
other.deadline.partial_cmp(&self.deadline)
}
}
impl Ord for DeadlineInfo {
/// Implement comparisons for Deadline Info.
///
/// Reverses the comparison because the Rust std library only
/// offers a max-heap, and we need a min-heap.
fn cmp(&self, other: &Self) -> Ordering {
other.deadline.cmp(&self.deadline)
}
}
/// Convert a mojo deadline (which is a relative deadline to "now") to
/// an absolute deadline based on time ticks.
fn absolute_deadline(deadline: system::MojoDeadline) -> system::MojoTimeTicks {
if deadline == MOJO_INDEFINITE {
return MOJO_INDEFINITE_ABSOLUTE;
}
let mut converted = MOJO_INDEFINITE_ABSOLUTE;
let max_time_ticks = i64::MAX as system::MojoDeadline;
if deadline <= max_time_ticks {
let now = core::get_time_ticks_now();
if deadline <= (max_time_ticks - (now as u64)) {
converted = (deadline as system::MojoTimeTicks) + now
}
}
converted
}
/// Convert an absolute deadline to a mojo deadline which is relative to some
/// notion of "now".
///
/// If the deadline is earlier than "now", this routine rounds up to "now".
fn relative_deadline(deadline: system::MojoTimeTicks,
now: system::MojoTimeTicks)
-> system::MojoDeadline {
if deadline == MOJO_INDEFINITE_ABSOLUTE {
MOJO_INDEFINITE
} else if now >= deadline {
0
} else {
(deadline - now) as system::MojoDeadline
}
}
/// This structure contains all information necessary to wait on handles
/// asynchronously.
///
/// Ultimately, it should only be a singleton living in
/// thread-local storage.
pub struct RunLoop<'h, 't> {
/// Running count of the next available token slot.
token_count: u64,
/// A map of handlers.
///
/// TODO(mknyszek): Try out a Slab allocator instead of a hashmap.
handlers: HashMap<Token, HandlerInfo<'h>>,
/// A min-heap of delayed tasks in order to pull the soonest task to
/// execute efficiently.
tasks: BinaryHeap<TaskInfo<'t>>,
/// A min-heap containing deadlines in order to pull out the soonest
/// deadline efficiently.
///
/// Warning: may contain "stale" deadlines which are not kept in the
/// map!
deadlines: BinaryHeap<DeadlineInfo>,
/// The Mojo structure keeping track of handles and signals.
///
/// This structure must be kept in sync with handlers.
handle_set: wait_set::WaitSet,
/// A flag that tells the RunLoop whether it should quit.
should_quit: bool,
/// A flag that indicates whether the RunLoop is running or now
running: bool,
}
impl<'h, 't> RunLoop<'h, 't> {
/// Create a new RunLoop.
pub fn new() -> RunLoop<'h, 't> {
RunLoop {
token_count: 0,
handlers: HashMap::new(),
tasks: BinaryHeap::new(),
deadlines: BinaryHeap::new(),
handle_set: wait_set::WaitSet::new(wsflags!(Create::None)).unwrap(),
should_quit: false,
running: false,
}
}
/// Generate a new Token for this RunLoop
fn generate_token(&mut self) -> Token {
self.token_count = self.token_count.wrapping_add(1);
Token(self.token_count)
}
/// Adds a new entry to the runloop queue.
pub fn register<H>(&mut self,
handle: &Handle,
signals: system::HandleSignals,
deadline: system::MojoDeadline,
handler: H)
-> Token
where H: Handler + 'h
{
let token = self.generate_token();
let abs_deadline = absolute_deadline(deadline);
self.handle_set.add(handle, signals, token.as_cookie(), wsflags!(Add::None));
self.deadlines.push(DeadlineInfo {
token: token.clone(),
deadline: abs_deadline,
});
debug_assert!(!self.handlers.contains_key(&token));
self.handlers.insert(token.clone(),
HandlerInfo {
handle: handle.get_native_handle(),
handler: Some(Box::new(handler)),
deadline: abs_deadline,
});
token
}
/// Updates the signals and deadline of an existing handler in the
/// runloop via token. The token remains unchanged and valid.
///
/// Returns true on a successful update and false if the token was not
/// found.
pub fn reregister(&mut self,
token: &Token,
signals: system::HandleSignals,
deadline: system::MojoDeadline)
-> bool {
match self.handlers.get_mut(&token) {
Some(info) => {
let _result = self.handle_set.remove(token.as_cookie());
debug_assert_eq!(_result, MojoResult::Okay);
let abs_deadline = absolute_deadline(deadline);
// Update what deadline we should be looking for, rendering
// all previously set deadlines "stale".
info.set_deadline(abs_deadline);
// Add a new deadline
self.deadlines.push(DeadlineInfo {
token: token.clone(),
deadline: abs_deadline,
});
// Acquire the raw handle held by the HandlerInfo in order to
// call the wait_set's add method. Invalidate it immediately after
// in order to prevent the handle from being closed.
//
// It's perfectly okay for the handle to be invalid, so although this
// is all unsafe, the whole system should just call the handler with an
// error.
let mut dummy = unsafe { system::acquire(info.handle()) };
self.handle_set.add(&dummy, signals, token.as_cookie(), wsflags!(Add::None));
unsafe {
dummy.invalidate();
}
true
}
None => false,
}
}
/// Removes an entry from the runloop.
///
/// Since we cannot remove from the deadlines heap, we just leave the deadline
/// in there as "stale", and we handle those when trying to find the next closest
/// deadline.
pub fn deregister(&mut self, token: Token) -> bool {
match self.handlers.remove(&token) {
Some(_) => {
let _result = self.handle_set.remove(token.as_cookie());
debug_assert_eq!(_result, MojoResult::Okay);
true
}
None => false,
}
}
/// Adds a task to be run by the runloop after some delay.
///
/// Returns a token if the delay is valid, otherwise returns None.
pub fn post_task<F>(&mut self, task: F, delay: system::MojoTimeTicks) -> Result<(), ()>
where F: FnMut(&mut RunLoop) + 't
{
let now = core::get_time_ticks_now();
if delay > i64::MAX - now {
return Err(());
}
let deadline = now + delay;
self.tasks.push(TaskInfo {
closure: Box::new(task),
deadline: deadline,
});
Ok(())
}
/// Uses the binary heaps to get the next closest deadline.
///
/// Removes stale deadline entries as they are found, but
/// does not otherwise modify the heap of deadlines.
fn get_next_deadline(&mut self) -> system::MojoTimeTicks {
debug_assert!(!self.handlers.is_empty());
let top_task_deadline = match self.tasks.peek() {
Some(info) => info.deadline(),
None => MOJO_INDEFINITE_ABSOLUTE,
};
let mut top = match self.deadlines.peek() {
Some(info) => info.clone(),
None => return MOJO_INDEFINITE_ABSOLUTE,
};
while !self.handlers.contains_key(top.token()) {
self.deadlines.pop();
top = match self.deadlines.peek() {
Some(info) => info.clone(),
None => return MOJO_INDEFINITE_ABSOLUTE,
}
}
if top_task_deadline != MOJO_INDEFINITE_ABSOLUTE &&
top_task_deadline < top.deadline() {
top_task_deadline
} else {
top.deadline()
}
}
/// Gets a handler by token to be manipulated in a consistent environment.
///
/// This method provides a method of accessing a handler in order to manipulate
/// it in a manner that avoids a borrow cycle, that is, it take()s the handler
/// out of the HashMap, and returns it when manipulation has completed.
fn get_handler_with<F>(&mut self, token: &Token, invoker: F)
where F: FnOnce(&mut Self,
&mut Box<Handler + 'h>,
Token,
system::MojoTimeTicks)
{
// Logic for pulling out the handler as well as its current deadline.
//
// Unfortunately, pulling out the handler value here and "onto the stack"
// (it probably won't actually end up on the stack thanks to optimizations)
// is necessary since otherwise the borrow checker complains that we pass
// a mutable reference to the RunLoop and the handler (as &mut self) to
// the callbacks at the same time. This is understandably unsafe since
// modifying the hashmap with register and deregister can invalidate the
// reference to self in the callback. In the C++ bindings and in other Rust
// event loops this is exactly what happens, but I avoided this. The downside
// is that we can no longer nest event loop run() calls. Once we put a handler
// onto the stack here, we can no longer call its callback in a nested manner
// from the RunLoop. I could just enable nesting with this one restriction, that
// the handler calling run() will always be ignored, but this is unintuitive.
let (mut handler, deadline) = match self.handlers.get_mut(&token) {
Some(ref_info) => {
(match ref_info.take() {
Some(handler) => handler,
None => return,
},
ref_info.deadline())
}
None => return,
};
// Call the closure that will invoke the callbacks.
invoker(self, &mut handler, token.clone(), deadline);
// Restore the handler to its location in the HashMap
if let Some(ref_info) = self.handlers.get_mut(&token) {
ref_info.give(handler);
}
}
/// For all the results we received, we notify the respective handle
/// owners of the results by calling their given callbacks.
///
/// We do NOT dequeue notified handles.
fn notify_of_results(&mut self, results: &Vec<system::WaitSetResult>) {
debug_assert!(!self.handlers.is_empty());
for wsr in results.iter() {
let token = Token(wsr.cookie());
self.get_handler_with(&token, move |runloop, boxed_handler, token, _dl| {
let handler = boxed_handler.as_mut();
match wsr.result() {
MojoResult::Okay => handler.on_ready(runloop, token),
MojoResult::Cancelled => {
handler.on_error(runloop, token, WaitError::HandleClosed)
}
MojoResult::Busy => handler.on_error(runloop, token, WaitError::HandleBusy),
MojoResult::FailedPrecondition => {
handler.on_error(runloop, token, WaitError::Unsatisfiable)
}
other => panic!("Unexpected result received after waiting: {}", other),
}
});
// In order to quit as soon as possible, we should check to quit after every
// potential handler call, as any of them could have signaled to quit.
if self.should_quit {
break;
}
}
}
/// Since the deadline expired, we notify the relevant handle
/// owners of the expiration by calling their given callbacks.
///
/// We do NOT dequeue notified handles.
fn notify_of_expired(&mut self, expired_deadline: system::MojoTimeTicks) {
debug_assert!(!self.handlers.is_empty());
let mut top = match self.deadlines.peek() {
Some(info) => info.clone(),
None => panic!("Should not be in notify_of_expired without at least one deadline!"),
};
while expired_deadline >= top.deadline() {
let next_deadline = top.deadline();
self.get_handler_with(top.token(),
move |runloop, boxed_handler, token, expected_dl| {
let handler = boxed_handler.as_mut();
if next_deadline == expected_dl {
handler.on_timeout(runloop, token);
}
});
// In order to quit as soon as possible, we should check to quit after every
// potential handler call, as any of them could have signaled to quit.
if self.should_quit {
break;
}
// Remove the deadline
self.deadlines.pop();
// Break if the next deadline has not yet expired.
top = match self.deadlines.peek() {
Some(info) => info.clone(),
None => break,
};
}
}
/// Iterates through all tasks whose deadline has passed and executes
/// them, consuming their information object.
///
/// These tasks all have access to the RunLoop so that they may reschedule
/// themselves or manipulate the RunLoop in some other way.
fn execute_ready_tasks(&mut self) {
let now = core::get_time_ticks_now();
let mut deadline = match self.tasks.peek() {
Some(info) => info.deadline(),
None => return,
};
while deadline < now {
let top = self.tasks.pop().expect("Sudden change to heap?");
top.execute_task(self);
if self.should_quit {
return;
}
deadline = match self.tasks.peek() {
Some(info) => info.deadline(),
None => return,
};
}
}
/// Blocks on handle_set.wait_on_set using the information contained
/// within itself.
///
/// This method blocks for only as long as the shortest deadline among all
/// handles this thread has registered. This method returns immediately as
/// soon as any one handle has its signals satisfied, fails to ever have its
/// signals satisfied, or reaches its deadline.
fn wait(&mut self, results_buffer: &mut Vec<system::WaitSetResult>) {
debug_assert!(!self.handlers.is_empty());
self.execute_ready_tasks();
// If after executing a task we quit or there are no handles,
// we have no reason to continue.
if self.handlers.is_empty() || self.should_quit {
return;
}
let deadline = self.get_next_deadline();
let until_deadline = relative_deadline(deadline, core::get_time_ticks_now());
// Perform the wait
match self.handle_set.wait_on_set(until_deadline, results_buffer) {
Ok(max_results) => {
self.notify_of_results(results_buffer);
// Clear the buffer since we don't need the results anymore.
// Helps prevent a copy if we resize the buffer.
results_buffer.clear();
// Increase the size of the buffer if there are more results
// we could be holding.
let capacity = results_buffer.capacity();
if capacity < MAXIMUM_WAIT_SET_NUM_RESULTS && capacity < (max_results) as usize {
results_buffer.reserve(capacity);
}
}
Err(result) => {
assert_eq!(result, MojoResult::DeadlineExceeded);
self.notify_of_expired(deadline);
}
}
}
/// Loop forever until a callback tells us to quit.
pub fn run(&mut self) {
// It's an error it already be running...
if self.running {
panic!("RunLoop is already running!");
}
self.running = true;
self.should_quit = false;
let mut buffer: Vec<system::WaitSetResult> =
Vec::with_capacity(INITIAL_WAIT_SET_NUM_RESULTS);
// Loop while we haven't been signaled to quit, and there's something to wait on.
while !self.should_quit && !self.handlers.is_empty() {
self.wait(&mut buffer)
}
self.running = false;
}
/// Set a flag to quit at the next available moment.
pub fn quit(&mut self) {
self.should_quit = true;
}
}
/// Provides a scope to modify the current thread's runloop.
pub fn with_current<F>(modifier: F)
where F: FnOnce(&mut RunLoop)
{
TL_RUN_LOOP.with(|ref_runloop| {
let mut runloop = ref_runloop.borrow_mut();
modifier(&mut *runloop);
});
}