// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! This module contains a thread-local run-loop.
//!
//! The run-loop may have handles and handlers pre-registered
//! (and in fact, must) in order to keep running. The run-loop
//! executes until it has no more handles or handlers on itself,
//! or until it is told to quit via quit().
//!
//! The run-loop waits until some signals on some handle are satisfied,
//! at which point it wakes up and executes the appropriate handler
//! method. This handler method may then be used to further populate
//! or de-populate the run-loop.
//!
//! As of yet, the run-loop is NOT thread-safe. Although it is useful
//! to be able to register tasks or handles from one thread onto
//! another thread's run-loop, this is as-of-yet unsupported, and
//! Rust should complain loudly when you try to do any threading here.

use std::cell::RefCell;
use std::cmp::{PartialEq, Eq, PartialOrd, Ord, Ordering};
use std::collections::BinaryHeap;
use std::collections::HashMap;
use std::i64;
use std::u32;
use std::vec::Vec;

use system;
use system::{Handle, MOJO_INDEFINITE, MojoResult};
use system::core;
use system::wait_set;

/// Define the equivalent of MOJO_INDEFINITE for absolute deadlines.
///
/// An absolute deadline equal to this value means "no deadline":
/// `absolute_deadline` produces it for `MOJO_INDEFINITE` (and for relative
/// deadlines that cannot be represented), and `relative_deadline` maps it
/// back to `MOJO_INDEFINITE`.
const MOJO_INDEFINITE_ABSOLUTE: system::MojoTimeTicks = 0;

// TODO(mknyszek): The numbers below are arbitrary and come from the C++ bindings,
// and should probably be changed at some point

/// Initial size of the result buffer.
///
/// `RunLoop::run` allocates its wait-result buffer with this capacity.
const INITIAL_WAIT_SET_NUM_RESULTS: usize = 16;

/// Maximum size of the result buffer.
///
/// `RunLoop::wait` only attempts to grow the result buffer while its
/// capacity is below this value.
const MAXIMUM_WAIT_SET_NUM_RESULTS: usize = 256;

/// Thread-local data structure for keeping track of handles to wait on.
///
/// Every thread gets its own `RunLoop`, wrapped in a `RefCell` so that
/// `with_current` can hand out a mutable reference to it.
thread_local!(static TL_RUN_LOOP: RefCell<RunLoop<'static, 'static>> = RefCell::new(RunLoop::new()));

/// Opaque identifier for one handle/handler registration on a run-loop.
///
/// A token is only meaningful on the thread whose run-loop issued it; it
/// is the key under which the run-loop stores the handler.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Token(u64);

impl Token {
    /// Returns the raw 64-bit value wrapped by this token, which is the
    /// "cookie" handed to the wait set for this registration.
    fn as_cookie(&self) -> u64 {
        let Token(cookie) = *self;
        cookie
    }
}

/// Represents the possible error cases that may occur when waiting
/// on a handle in a RunLoop.
///
/// These values are delivered to `Handler::on_error`; the run-loop derives
/// them from the `MojoResult` reported for the handle by the wait set.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum WaitError {
    /// The handle has been closed or is otherwise no longer valid.
    ///
    /// Reported when the wait result is `MojoResult::Cancelled`.
    HandleClosed,

    /// The handle is currently busy in some transaction.
    ///
    /// Reported when the wait result is `MojoResult::Busy`.
    HandleBusy,

    /// It has been determined that the signals provided will never
    /// be satisfied for this handle.
    ///
    /// Reported when the wait result is `MojoResult::FailedPrecondition`.
    Unsatisfiable,
}

/// A trait which defines an interface to be a handler usable by
/// a RunLoop.
///
/// Every callback receives the run-loop itself, so a handler may register,
/// reregister or deregister entries, post tasks, or ask the loop to quit.
/// `token` is the token of the registration the callback is being invoked
/// for.
pub trait Handler {
    /// Called after a successful wait.
    ///
    /// Invoked when the wait set reports `MojoResult::Okay` for the
    /// registration identified by `token`.
    fn on_ready(&mut self, runloop: &mut RunLoop, token: Token);

    /// Called after the given deadline expires.
    ///
    /// Only invoked when the expired deadline is still the registration's
    /// current deadline; deadlines superseded by a reregistration are
    /// ignored.
    fn on_timeout(&mut self, runloop: &mut RunLoop, token: Token);

    /// Called when an unexpected error occurs.
    ///
    /// `error` describes why the wait on the handle failed.
    fn on_error(&mut self, runloop: &mut RunLoop, token: Token, error: WaitError);
}

/// A wrapper struct for carrying the handler as well as various information
/// about it.
///
/// One of these is stored in the run-loop's handler map per registered
/// token.
struct HandlerInfo<'h> {
    /// The handle for which we are waiting.
    ///
    /// We keep this handle around so that we may easily re-register.
    /// Only the raw handle value is stored here.
    handle: system::MojoHandle,

    /// The handler, boxed up.
    ///
    /// The handler is in an Option type because if it is currently being
    /// used in a callback, we must take ownership to avoid mutability
    /// cycles. The easiest way to do this is to take() from the Option then
    /// put it back.
    ///
    /// While the value is `None`, the handler is executing one of its
    /// callbacks.
    handler: Option<Box<Handler + 'h>>,

    /// An absolute deadline in terms of time ticks.
    ///
    /// This is the most recently updated deadline that
    /// we should be watching out for. All others for this
    /// token may be considered "stale".
    deadline: system::MojoTimeTicks,
}

impl<'h> HandlerInfo<'h> {
    /// Take the handler out of its Option type.
    pub fn take(&mut self) -> Option<Box<Handler + 'h>> {
        self.handler.take()
    }

    /// Put a new handler into the Option type.
    pub fn give(&mut self, handler: Box<Handler + 'h>) {
        self.handler = Some(handler);
    }

    /// Getter for the system::MojoHandle held inside.
    pub fn handle(&self) -> system::MojoHandle {
        self.handle
    }

    /// Getter for the current absolute deadline held inside.
    pub fn deadline(&self) -> system::MojoTimeTicks {
        self.deadline
    }

    /// Setter to update the current absolute deadline.
    pub fn set_deadline(&mut self, deadline: system::MojoTimeTicks) {
        self.deadline = deadline
    }
}

/// Pairs a posted task with the time at which it becomes runnable.
struct TaskInfo<'t> {
    /// The task itself, boxed so that tasks of different closure types can
    /// share one queue.
    closure: Box<FnMut(&mut RunLoop) + 't>,

    /// Absolute time, in time ticks, after which the task may be executed.
    ///
    /// The run-loop orders its task queue by this value.
    deadline: system::MojoTimeTicks,
}

impl<'t> TaskInfo<'t> {
    /// Runs the task once with access to `run_loop`, consuming the info
    /// object (and with it the closure).
    pub fn execute_task(self, run_loop: &mut RunLoop) {
        let TaskInfo { mut closure, .. } = self;
        (*closure)(run_loop);
    }

    /// Returns the absolute deadline of this task.
    pub fn deadline(&self) -> system::MojoTimeTicks {
        let TaskInfo { deadline, .. } = *self;
        deadline
    }
}

impl<'t> PartialEq for TaskInfo<'t> {
    /// Two tasks compare equal exactly when their deadlines are equal; the
    /// closures themselves are never compared.
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == Ordering::Equal
    }
}

impl<'t> Eq for TaskInfo<'t> {}

impl<'t> PartialOrd for TaskInfo<'t> {
    /// Always yields `Some`, delegating to the total order defined by
    /// `Ord::cmp` below (which is reversed, see there).
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<'t> Ord for TaskInfo<'t> {
    /// Orders tasks by deadline, *reversed*: the task with the smaller
    /// deadline is the greater value.
    ///
    /// The standard library's `BinaryHeap` is a max-heap, so reversing the
    /// comparison makes it hand out the soonest deadline first.
    fn cmp(&self, other: &Self) -> Ordering {
        self.deadline.cmp(&other.deadline).reverse()
    }
}

/// Entry of the run-loop's deadline queue: which registration a deadline
/// belongs to, and when that deadline is.
#[derive(Clone)]
struct DeadlineInfo {
    /// Key of the associated `HandlerInfo` in the run-loop's handler map.
    token: Token,

    /// The deadline as an absolute time, in time ticks.
    deadline: system::MojoTimeTicks,
}

impl DeadlineInfo {
    /// Borrows the token this deadline was recorded for.
    pub fn token(&self) -> &Token {
        let DeadlineInfo { ref token, .. } = *self;
        token
    }

    /// Returns the absolute deadline recorded in this entry.
    pub fn deadline(&self) -> system::MojoTimeTicks {
        let DeadlineInfo { deadline, .. } = *self;
        deadline
    }
}

impl PartialEq for DeadlineInfo {
    /// Two entries compare equal exactly when their deadlines are equal;
    /// the tokens are not taken into account.
    fn eq(&self, other: &DeadlineInfo) -> bool {
        self.cmp(other) == Ordering::Equal
    }
}

impl Eq for DeadlineInfo {}

impl PartialOrd for DeadlineInfo {
    /// Always yields `Some`, delegating to the total order defined by
    /// `Ord::cmp` below (which is reversed, see there).
    fn partial_cmp(&self, other: &DeadlineInfo) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for DeadlineInfo {
    /// Orders entries by deadline, *reversed*: the entry with the smaller
    /// deadline is the greater value.
    ///
    /// The standard library's `BinaryHeap` is a max-heap, so reversing the
    /// comparison makes it hand out the soonest deadline first.
    fn cmp(&self, other: &Self) -> Ordering {
        self.deadline.cmp(&other.deadline).reverse()
    }
}

/// Turns a relative mojo deadline ("this long from now") into an absolute
/// deadline expressed in time ticks.
///
/// `MOJO_INDEFINITE` maps to `MOJO_INDEFINITE_ABSOLUTE`. So does any
/// deadline that is too large to be represented: one that exceeds
/// `i64::MAX` on its own, or that would exceed it once added to the
/// current time.
fn absolute_deadline(deadline: system::MojoDeadline) -> system::MojoTimeTicks {
    let ceiling = i64::MAX as system::MojoDeadline;
    if deadline == MOJO_INDEFINITE || deadline > ceiling {
        return MOJO_INDEFINITE_ABSOLUTE;
    }
    // The clock is only consulted once we know the deadline itself fits.
    let now = core::get_time_ticks_now();
    let headroom = ceiling - (now as u64);
    if deadline > headroom {
        MOJO_INDEFINITE_ABSOLUTE
    } else {
        (deadline as system::MojoTimeTicks) + now
    }
}

/// Turns an absolute deadline back into a mojo deadline relative to `now`.
///
/// `MOJO_INDEFINITE_ABSOLUTE` maps to `MOJO_INDEFINITE`. A deadline that is
/// not later than `now` yields 0, i.e. it is rounded up to "now"; otherwise
/// the result is the number of ticks remaining until the deadline.
fn relative_deadline(deadline: system::MojoTimeTicks,
                     now: system::MojoTimeTicks)
                     -> system::MojoDeadline {
    if deadline == MOJO_INDEFINITE_ABSOLUTE {
        return MOJO_INDEFINITE;
    }
    if deadline > now {
        (deadline - now) as system::MojoDeadline
    } else {
        0
    }
}

/// This structure contains all information necessary to wait on handles
/// asynchronously.
///
/// Ultimately, it should only be a singleton living in
/// thread-local storage.
///
/// `'h` bounds the lifetime of registered handlers and `'t` that of posted
/// tasks.
pub struct RunLoop<'h, 't> {
    /// Running count of the next available token slot.
    ///
    /// Incremented (with wrap-around) every time a token is generated.
    token_count: u64,

    /// A map of handlers.
    ///
    /// TODO(mknyszek): Try out a Slab allocator instead of a hashmap.
    handlers: HashMap<Token, HandlerInfo<'h>>,

    /// A min-heap of delayed tasks in order to pull the soonest task to
    /// execute efficiently.
    tasks: BinaryHeap<TaskInfo<'t>>,

    /// A min-heap containing deadlines in order to pull out the soonest
    /// deadline efficiently.
    ///
    /// Warning: may contain "stale" deadlines which are not kept in the
    /// map!
    ///
    /// NOTE(review): indefinite deadlines are stored as
    /// `MOJO_INDEFINITE_ABSOLUTE` (0) and therefore sort *before* every
    /// finite deadline in this heap — confirm this is intended.
    deadlines: BinaryHeap<DeadlineInfo>,

    /// The Mojo structure keeping track of handles and signals.
    ///
    /// This structure must be kept in sync with handlers.
    handle_set: wait_set::WaitSet,

    /// A flag that tells the RunLoop whether it should quit.
    should_quit: bool,

    /// A flag that indicates whether the RunLoop is running or not.
    running: bool,
}

impl<'h, 't> RunLoop<'h, 't> {
    /// Builds an empty run-loop: no handlers, no tasks, no deadlines, and
    /// neither running nor asked to quit.
    ///
    /// # Panics
    ///
    /// Panics if `WaitSet::new` does not yield a wait set.
    pub fn new() -> RunLoop<'h, 't> {
        let fresh_set = wait_set::WaitSet::new(wsflags!(Create::None)).unwrap();
        RunLoop {
            handle_set: fresh_set,
            handlers: HashMap::new(),
            deadlines: BinaryHeap::new(),
            tasks: BinaryHeap::new(),
            token_count: 0,
            running: false,
            should_quit: false,
        }
    }

    /// Produces the next token for this run-loop.
    ///
    /// The internal counter is advanced by one (wrapping around on
    /// overflow) and the new counter value becomes the token.
    fn generate_token(&mut self) -> Token {
        let next = self.token_count.wrapping_add(1);
        self.token_count = next;
        Token(next)
    }

    /// Registers `handler` to be notified about `signals` on `handle`.
    ///
    /// `deadline` is relative to now; it is converted to an absolute
    /// deadline and queued so that the handler can be told about a timeout.
    /// The handle is added to the wait set under a freshly generated token,
    /// and that token is returned so the entry can later be reregistered or
    /// deregistered.
    ///
    /// NOTE(review): the outcome of adding the handle to the wait set is
    /// not inspected here.
    pub fn register<H>(&mut self,
                       handle: &Handle,
                       signals: system::HandleSignals,
                       deadline: system::MojoDeadline,
                       handler: H)
                       -> Token
        where H: Handler + 'h
    {
        let token = self.generate_token();
        let abs_deadline = absolute_deadline(deadline);
        self.handle_set.add(handle, signals, token.as_cookie(), wsflags!(Add::None));

        // Queue the deadline for this registration.
        let pending = DeadlineInfo {
            token: token.clone(),
            deadline: abs_deadline,
        };
        self.deadlines.push(pending);

        // Record the handler itself, keyed by the new token.
        debug_assert!(!self.handlers.contains_key(&token));
        let info = HandlerInfo {
            handle: handle.get_native_handle(),
            handler: Some(Box::new(handler)),
            deadline: abs_deadline,
        };
        self.handlers.insert(token.clone(), info);
        token
    }

    /// Replaces the signals and deadline of an already registered entry.
    ///
    /// The token stays valid and unchanged. Returns `true` when the entry
    /// was found and updated, `false` when `token` is unknown to this
    /// run-loop.
    pub fn reregister(&mut self,
                      token: &Token,
                      signals: system::HandleSignals,
                      deadline: system::MojoDeadline)
                      -> bool {
        let info = match self.handlers.get_mut(token) {
            Some(found) => found,
            None => return false,
        };
        let cookie = token.as_cookie();

        // Drop the old wait-set entry before adding the updated one.
        let _result = self.handle_set.remove(cookie);
        debug_assert_eq!(_result, MojoResult::Okay);

        // Recording the new deadline on the entry turns every deadline
        // queued earlier for this token into a "stale" one.
        let abs_deadline = absolute_deadline(deadline);
        info.set_deadline(abs_deadline);
        self.deadlines.push(DeadlineInfo {
            token: token.clone(),
            deadline: abs_deadline,
        });

        // The wait set's add method needs a handle object, but only the raw
        // handle value is stored. Wrap the raw value temporarily and
        // invalidate the wrapper right after use so that dropping it does
        // not close the handle.
        //
        // An invalid handle is acceptable here: the expectation is that the
        // handler simply ends up being called with an error.
        let mut borrowed = unsafe { system::acquire(info.handle()) };
        self.handle_set.add(&borrowed, signals, cookie, wsflags!(Add::None));
        unsafe {
            borrowed.invalidate();
        }
        true
    }

    /// Removes the entry identified by `token` from the run-loop.
    ///
    /// Returns `true` if the entry existed, `false` otherwise.
    ///
    /// Entries cannot be removed from the deadline heap, so any deadline
    /// queued for this token is left behind as "stale" and is dealt with
    /// when the next closest deadline is looked up.
    pub fn deregister(&mut self, token: Token) -> bool {
        // Keep the removed entry alive until the wait set has been updated;
        // it is dropped when this function returns.
        let _removed = match self.handlers.remove(&token) {
            Some(info) => info,
            None => return false,
        };
        let _result = self.handle_set.remove(token.as_cookie());
        debug_assert_eq!(_result, MojoResult::Okay);
        true
    }

    /// Schedules `task` to be run by the run-loop once `delay` time ticks
    /// have passed.
    ///
    /// Returns `Ok(())` when the task was queued, and `Err(())` when the
    /// delay is so large that the resulting absolute deadline would not fit
    /// into an `i64`.
    pub fn post_task<F>(&mut self, task: F, delay: system::MojoTimeTicks) -> Result<(), ()>
        where F: FnMut(&mut RunLoop) + 't
    {
        let now = core::get_time_ticks_now();
        // Largest delay that can still be added to `now` without overflow.
        let headroom = i64::MAX - now;
        if delay > headroom {
            return Err(());
        }
        let info = TaskInfo {
            closure: Box::new(task),
            deadline: now + delay,
        };
        self.tasks.push(info);
        Ok(())
    }

    /// Determines the absolute deadline the next wait should use.
    ///
    /// Entries at the top of the deadline heap whose token is no longer
    /// registered are popped and discarded. The first entry that belongs to
    /// a registered token is left in place; its deadline is returned unless
    /// the soonest task deadline is earlier, in which case that one is
    /// returned instead.
    ///
    /// Returns `MOJO_INDEFINITE_ABSOLUTE` when the deadline heap holds no
    /// entry for a registered token, regardless of pending tasks.
    ///
    /// NOTE(review): an indefinite handler deadline is stored as 0 and thus
    /// sits on top of the heap, where it hides both later finite deadlines
    /// and task deadlines — confirm whether that is intended.
    fn get_next_deadline(&mut self) -> system::MojoTimeTicks {
        debug_assert!(!self.handlers.is_empty());
        let task_deadline = self.tasks
            .peek()
            .map_or(MOJO_INDEFINITE_ABSOLUTE, |task| task.deadline());
        loop {
            let (registered, handler_deadline) = match self.deadlines.peek() {
                Some(entry) => (self.handlers.contains_key(entry.token()), entry.deadline()),
                None => return MOJO_INDEFINITE_ABSOLUTE,
            };
            if registered {
                let task_is_sooner = task_deadline != MOJO_INDEFINITE_ABSOLUTE &&
                                     task_deadline < handler_deadline;
                return if task_is_sooner {
                    task_deadline
                } else {
                    handler_deadline
                };
            }
            // The token was deregistered: discard the entry and look again.
            self.deadlines.pop();
        }
    }

    /// Temporarily checks a handler out of the map and passes it to
    /// `invoker`.
    ///
    /// `invoker` receives the run-loop, the boxed handler, a copy of the
    /// token, and the entry's current absolute deadline. Nothing happens if
    /// the token is not registered or if its handler is already checked out.
    /// After `invoker` returns, the handler is put back, unless the entry
    /// was removed in the meantime, in which case the handler is dropped.
    fn get_handler_with<F>(&mut self, token: &Token, invoker: F)
        where F: FnOnce(&mut Self,
                        &mut Box<Handler + 'h>,
                        Token,
                        system::MojoTimeTicks)
    {
        // The handler has to be moved out of the map before it is called.
        // Its callbacks receive `&mut RunLoop`, and handing out a reference
        // to the run-loop together with a reference into the run-loop's own
        // map would be rejected by the borrow checker, rightly so, because
        // register/deregister calls made from the callback may modify the
        // map underneath the handler. The C++ bindings and other Rust event
        // loops accept that hazard; this one does not.
        //
        // The price is that run() cannot be nested: while a handler is
        // checked out it cannot be called again from a nested loop.
        // Allowing nesting with the rule "the handler that called run() is
        // skipped" would be possible but unintuitive.
        let (mut handler, deadline) = {
            let info = match self.handlers.get_mut(token) {
                Some(info) => info,
                None => return,
            };
            let handler = match info.take() {
                Some(handler) => handler,
                None => return,
            };
            (handler, info.deadline())
        };
        // Let the caller-supplied closure drive the callbacks.
        invoker(self, &mut handler, token.clone(), deadline);
        // Check the handler back in if its entry still exists.
        if let Some(info) = self.handlers.get_mut(token) {
            info.give(handler);
        }
    }

    /// Dispatches every wait result to the handler registered under the
    /// result's cookie.
    ///
    /// A result of `Okay` leads to `on_ready`; `Cancelled`, `Busy` and
    /// `FailedPrecondition` lead to `on_error` with the matching
    /// `WaitError`. Handlers stay registered after being notified.
    /// Processing stops early as soon as a handler has asked the loop to
    /// quit.
    ///
    /// # Panics
    ///
    /// Panics if a result carries any other `MojoResult`.
    fn notify_of_results(&mut self, results: &Vec<system::WaitSetResult>) {
        debug_assert!(!self.handlers.is_empty());
        for entry in results {
            let cookie_token = Token(entry.cookie());
            self.get_handler_with(&cookie_token, move |run_loop, boxed, tok, _deadline| {
                // Translate the raw result first: `None` means success.
                let failure = match entry.result() {
                    MojoResult::Okay => None,
                    MojoResult::Cancelled => Some(WaitError::HandleClosed),
                    MojoResult::Busy => Some(WaitError::HandleBusy),
                    MojoResult::FailedPrecondition => Some(WaitError::Unsatisfiable),
                    other => panic!("Unexpected result received after waiting: {}", other),
                };
                let target = &mut **boxed;
                match failure {
                    None => target.on_ready(run_loop, tok),
                    Some(error) => target.on_error(run_loop, tok, error),
                }
            });
            // Any handler may have requested a quit; honour it right away
            // instead of working through the remaining results.
            if self.should_quit {
                return;
            }
        }
    }

    /// Since the deadline expired, we notify the relevant handle
    /// owners of the expiration by calling their given callbacks.
    ///
    /// We do NOT dequeue notified handles.
    fn notify_of_expired(&mut self, expired_deadline: system::MojoTimeTicks) {
        debug_assert!(!self.handlers.is_empty());
        let mut top = match self.deadlines.peek() {
            Some(info) => info.clone(),
            None => panic!("Should not be in notify_of_expired without at least one deadline!"),
        };
        while expired_deadline >= top.deadline() {
            let next_deadline = top.deadline();
            self.get_handler_with(top.token(),
                                  move |runloop, boxed_handler, token, expected_dl| {
                                      let handler = boxed_handler.as_mut();
                                      if next_deadline == expected_dl {
                                          handler.on_timeout(runloop, token);
                                      }
                                  });
            // In order to quit as soon as possible, we should check to quit after every
            // potential handler call, as any of them could have signaled to quit.
            if self.should_quit {
                break;
            }
            // Remove the deadline
            self.deadlines.pop();
            // Break if the next deadline has not yet expired.
            top = match self.deadlines.peek() {
                Some(info) => info.clone(),
                None => break,
            };
        }
    }

    /// Runs, in deadline order, every queued task whose deadline lies
    /// strictly before the current time, consuming each task as it runs.
    ///
    /// The current time is sampled once, before the first task. Tasks get
    /// access to the run-loop, so they may post further tasks or modify the
    /// loop in other ways; execution stops early once a task has asked the
    /// loop to quit.
    fn execute_ready_tasks(&mut self) {
        let now = core::get_time_ticks_now();
        loop {
            let is_ready = match self.tasks.peek() {
                Some(info) => info.deadline() < now,
                None => return,
            };
            if !is_ready {
                return;
            }
            let task = self.tasks.pop().expect("Sudden change to heap?");
            task.execute_task(self);
            if self.should_quit {
                return;
            }
        }
    }

    /// Blocks on handle_set.wait_on_set using the information contained
    /// within itself.
    ///
    /// Ready tasks are executed first. If the loop still has handlers and
    /// was not asked to quit, this method then blocks for only as long as
    /// the shortest deadline reported by `get_next_deadline`. It returns as
    /// soon as any one handle has its signals satisfied, fails to ever have
    /// its signals satisfied, or reaches its deadline, after notifying the
    /// affected handlers.
    ///
    /// `results_buffer` is scratch space for the wait results; it is left
    /// empty on return and may have been grown for the next call.
    ///
    /// # Panics
    ///
    /// Panics if the wait fails with anything other than
    /// `MojoResult::DeadlineExceeded`.
    fn wait(&mut self, results_buffer: &mut Vec<system::WaitSetResult>) {
        debug_assert!(!self.handlers.is_empty());
        self.execute_ready_tasks();
        // If after executing a task we quit or there are no handles,
        // we have no reason to continue.
        if self.handlers.is_empty() || self.should_quit {
            return;
        }
        let deadline = self.get_next_deadline();
        let until_deadline = relative_deadline(deadline, core::get_time_ticks_now());
        // Perform the wait
        match self.handle_set.wait_on_set(until_deadline, results_buffer) {
            Ok(max_results) => {
                self.notify_of_results(results_buffer);
                // Clear the buffer since we don't need the results anymore.
                // Helps prevent a copy if we resize the buffer.
                results_buffer.clear();
                // Increase the size of the buffer if there are more results
                // we could be holding.
                let capacity = results_buffer.capacity();
                if capacity < MAXIMUM_WAIT_SET_NUM_RESULTS && capacity < (max_results) as usize {
                    // The buffer is empty at this point, so `reserve(n)` only
                    // guarantees a capacity of at least `n`. Asking for the
                    // current capacity would therefore change nothing; ask
                    // for double, capped at the maximum.
                    let doubled = if capacity == 0 {
                        1
                    } else {
                        capacity.saturating_mul(2)
                    };
                    let target = if doubled > MAXIMUM_WAIT_SET_NUM_RESULTS {
                        MAXIMUM_WAIT_SET_NUM_RESULTS
                    } else {
                        doubled
                    };
                    results_buffer.reserve(target);
                }
            }
            Err(result) => {
                // A timeout is the only failure the loop knows how to handle.
                assert_eq!(result, MojoResult::DeadlineExceeded);
                self.notify_of_expired(deadline);
            }
        }
    }

    /// Runs the loop until a callback asks it to quit or until no handlers
    /// are registered any more.
    ///
    /// Any quit request made before this call is discarded.
    ///
    /// # Panics
    ///
    /// Panics if the run-loop is already running, i.e. on nested calls.
    pub fn run(&mut self) {
        // Running an already running loop is an error.
        if self.running {
            panic!("RunLoop is already running!");
        }
        self.should_quit = false;
        self.running = true;
        let mut results: Vec<system::WaitSetResult> =
            Vec::with_capacity(INITIAL_WAIT_SET_NUM_RESULTS);
        loop {
            // Keep going only while nobody asked to quit and there is still
            // something to wait on.
            if self.should_quit || self.handlers.is_empty() {
                break;
            }
            self.wait(&mut results);
        }
        self.running = false;
    }

    /// Set a flag to quit at the next available moment.
    ///
    /// The flag is checked after each handler or task callback and before
    /// each wait. Note that `run` clears the flag when it starts, so calling
    /// this before `run` has no effect.
    pub fn quit(&mut self) {
        self.should_quit = true;
    }
}

/// Gives `modifier` mutable access to the calling thread's run-loop.
///
/// The thread-local run-loop stays mutably borrowed for as long as
/// `modifier` runs.
///
/// # Panics
///
/// Panics if the thread-local run-loop is already borrowed, for example
/// when `with_current` is called again from inside `modifier`.
pub fn with_current<F>(modifier: F)
    where F: FnOnce(&mut RunLoop)
{
    TL_RUN_LOOP.with(|cell| {
        modifier(&mut *cell.borrow_mut());
    });
}
