//
// detail/impl/epoll_reactor.ipp
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2011 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#ifndef BOOST_ASIO_DETAIL_IMPL_EPOLL_REACTOR_IPP | |
#define BOOST_ASIO_DETAIL_IMPL_EPOLL_REACTOR_IPP | |
#if defined(_MSC_VER) && (_MSC_VER >= 1200) | |
# pragma once | |
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200) | |
#include <boost/asio/detail/config.hpp> | |
#if defined(BOOST_ASIO_HAS_EPOLL) | |
#include <cerrno>
#include <cstddef>
#include <fcntl.h>
#include <sys/epoll.h>
#include <boost/asio/detail/epoll_reactor.hpp>
#include <boost/asio/detail/throw_error.hpp>
#include <boost/asio/error.hpp>

#if defined(BOOST_ASIO_HAS_TIMERFD)
# include <sys/timerfd.h>
#endif // defined(BOOST_ASIO_HAS_TIMERFD)
#include <boost/asio/detail/push_options.hpp> | |
namespace boost { | |
namespace asio { | |
namespace detail { | |
// Construct the reactor for the given io_service.
//
// Creates the epoll descriptor (do_epoll_create() throws on failure), creates
// a timerfd when BOOST_ASIO_HAS_TIMERFD is defined, and registers both the
// interrupter and the timerfd with the epoll set. If the timerfd cannot be
// created timer_fd_ is -1 and run() falls back to computing an epoll_wait
// timeout from the timer queues.
epoll_reactor::epoll_reactor(boost::asio::io_service& io_service)
  : boost::asio::detail::service_base<epoll_reactor>(io_service),
    io_service_(use_service<io_service_impl>(io_service)),
    mutex_(),
    epoll_fd_(do_epoll_create()),
#if defined(BOOST_ASIO_HAS_TIMERFD)
    timer_fd_(timerfd_create(CLOCK_MONOTONIC, 0)),
#else // defined(BOOST_ASIO_HAS_TIMERFD)
    timer_fd_(-1),
#endif // defined(BOOST_ASIO_HAS_TIMERFD)
    interrupter_(),
    shutdown_(false)
{
  // Add the interrupter's descriptor to epoll. The registration is
  // edge-triggered, and the interrupter is signalled once here so that a
  // later EPOLL_CTL_MOD in interrupt() is enough to wake up epoll_wait.
  epoll_event ev = { 0, { 0 } };
  ev.events = EPOLLIN | EPOLLERR | EPOLLET;
  ev.data.ptr = &interrupter_;
  epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupter_.read_descriptor(), &ev);
  interrupter_.interrupt();

  // Add the timer descriptor to epoll.
  if (timer_fd_ != -1)
  {
    // The timerfd was created with no flags, so mark it close-on-exec to
    // avoid leaking it into exec'd child processes. This is best effort: a
    // failure here does not prevent the reactor from working.
    ::fcntl(timer_fd_, F_SETFD, FD_CLOEXEC);

    // The timer descriptor is level-triggered (no EPOLLET). Its address is
    // used as the tag that run() compares against.
    ev.events = EPOLLIN | EPOLLERR;
    ev.data.ptr = &timer_fd_;
    epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &ev);
  }
}
// Destructor. Closes the epoll descriptor and, when one is in use, the
// timer descriptor.
epoll_reactor::~epoll_reactor()
{
  ::close(epoll_fd_);

  // A value of -1 means that no timerfd is in use.
  const bool have_timer_fd = (timer_fd_ != -1);
  if (have_timer_fd)
  {
    ::close(timer_fd_);
  }
}
void epoll_reactor::shutdown_service() | |
{ | |
mutex::scoped_lock lock(mutex_); | |
shutdown_ = true; | |
lock.unlock(); | |
op_queue<operation> ops; | |
while (descriptor_state* state = registered_descriptors_.first()) | |
{ | |
for (int i = 0; i < max_ops; ++i) | |
ops.push(state->op_queue_[i]); | |
state->shutdown_ = true; | |
registered_descriptors_.free(state); | |
} | |
timer_queues_.get_all_timers(ops); | |
} | |
void epoll_reactor::init_task() | |
{ | |
io_service_.init_task(); | |
} | |
// Register a descriptor with the reactor.
//
// Allocates the per-descriptor state, stores it in descriptor_data, and adds
// the descriptor to the epoll set with an edge-triggered registration for
// all event types. Returns 0 on success, or the value of errno if epoll_ctl
// fails.
int epoll_reactor::register_descriptor(socket_type descriptor,
    epoll_reactor::per_descriptor_data& descriptor_data)
{
  {
    // The pool of descriptor states is guarded by its own mutex.
    mutex::scoped_lock lock(registered_descriptors_mutex_);
    descriptor_data = registered_descriptors_.alloc();
    descriptor_data->shutdown_ = false;
  }

  epoll_event ev = { 0, { 0 } };
  ev.data.ptr = descriptor_data;
  ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLOUT | EPOLLPRI | EPOLLET;

  const int result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev);
  return (result == 0) ? 0 : errno;
}
void epoll_reactor::start_op(int op_type, socket_type descriptor, | |
epoll_reactor::per_descriptor_data& descriptor_data, | |
reactor_op* op, bool allow_speculative) | |
{ | |
if (!descriptor_data) | |
{ | |
op->ec_ = boost::asio::error::bad_descriptor; | |
post_immediate_completion(op); | |
return; | |
} | |
mutex::scoped_lock descriptor_lock(descriptor_data->mutex_); | |
if (descriptor_data->shutdown_) | |
{ | |
post_immediate_completion(op); | |
return; | |
} | |
if (descriptor_data->op_queue_[op_type].empty()) | |
{ | |
if (allow_speculative | |
&& (op_type != read_op | |
|| descriptor_data->op_queue_[except_op].empty())) | |
{ | |
if (op->perform()) | |
{ | |
descriptor_lock.unlock(); | |
io_service_.post_immediate_completion(op); | |
return; | |
} | |
} | |
else | |
{ | |
epoll_event ev = { 0, { 0 } }; | |
ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | |
| EPOLLOUT | EPOLLPRI | EPOLLET; | |
ev.data.ptr = descriptor_data; | |
epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev); | |
} | |
} | |
descriptor_data->op_queue_[op_type].push(op); | |
io_service_.work_started(); | |
} | |
// Cancel all operations queued on a descriptor.
//
// Every queued operation is marked with operation_aborted, removed from its
// queue while the descriptor's lock is held, and then posted to the
// io_service as a deferred completion once the lock has been released. Does
// nothing if the descriptor has no per-descriptor state.
void epoll_reactor::cancel_ops(socket_type,
    epoll_reactor::per_descriptor_data& descriptor_data)
{
  if (descriptor_data)
  {
    op_queue<operation> cancelled;

    {
      mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

      for (int i = 0; i < max_ops; ++i)
      {
        for (reactor_op* op = descriptor_data->op_queue_[i].front();
            op; op = descriptor_data->op_queue_[i].front())
        {
          op->ec_ = boost::asio::error::operation_aborted;
          descriptor_data->op_queue_[i].pop();
          cancelled.push(op);
        }
      }
    }

    io_service_.post_deferred_completions(cancelled);
  }
}
// Abort all operations on a descriptor and release its per-descriptor state.
//
// Does nothing if there is no per-descriptor state or if the state has
// already been shut down. Otherwise every queued operation is marked with
// operation_aborted, the state is returned to the pool, descriptor_data is
// reset to 0, and the aborted operations are posted to the io_service after
// both locks have been released.
void epoll_reactor::close_descriptor(socket_type,
    epoll_reactor::per_descriptor_data& descriptor_data)
{
  if (!descriptor_data)
    return;

  // Lock order: the descriptor's own mutex first, then the pool's mutex.
  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
  mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);

  if (descriptor_data->shutdown_)
    return;

  // Remove the descriptor from the set of known descriptors. The descriptor
  // will be automatically removed from the epoll set when it is closed.
  op_queue<operation> aborted;
  for (int i = 0; i < max_ops; ++i)
  {
    for (reactor_op* op = descriptor_data->op_queue_[i].front();
        op; op = descriptor_data->op_queue_[i].front())
    {
      op->ec_ = boost::asio::error::operation_aborted;
      descriptor_data->op_queue_[i].pop();
      aborted.push(op);
    }
  }

  descriptor_data->shutdown_ = true;

  // The descriptor's lock is released before its state goes back to the pool.
  descriptor_lock.unlock();

  registered_descriptors_.free(descriptor_data);
  descriptor_data = 0;

  descriptors_lock.unlock();

  io_service_.post_deferred_completions(aborted);
}
void epoll_reactor::run(bool block, op_queue<operation>& ops) | |
{ | |
// Calculate a timeout only if timerfd is not used. | |
int timeout; | |
if (timer_fd_ != -1) | |
timeout = block ? -1 : 0; | |
else | |
{ | |
mutex::scoped_lock lock(mutex_); | |
timeout = block ? get_timeout() : 0; | |
} | |
// Block on the epoll descriptor. | |
epoll_event events[128]; | |
int num_events = epoll_wait(epoll_fd_, events, 128, timeout); | |
#if defined(BOOST_ASIO_HAS_TIMERFD) | |
bool check_timers = (timer_fd_ == -1); | |
#else // defined(BOOST_ASIO_HAS_TIMERFD) | |
bool check_timers = true; | |
#endif // defined(BOOST_ASIO_HAS_TIMERFD) | |
// Dispatch the waiting events. | |
for (int i = 0; i < num_events; ++i) | |
{ | |
void* ptr = events[i].data.ptr; | |
if (ptr == &interrupter_) | |
{ | |
// No need to reset the interrupter since we're leaving the descriptor | |
// in a ready-to-read state and relying on edge-triggered notifications | |
// to make it so that we only get woken up when the descriptor's epoll | |
// registration is updated. | |
#if defined(BOOST_ASIO_HAS_TIMERFD) | |
if (timer_fd_ == -1) | |
check_timers = true; | |
#else // defined(BOOST_ASIO_HAS_TIMERFD) | |
check_timers = true; | |
#endif // defined(BOOST_ASIO_HAS_TIMERFD) | |
} | |
#if defined(BOOST_ASIO_HAS_TIMERFD) | |
else if (ptr == &timer_fd_) | |
{ | |
check_timers = true; | |
} | |
#endif // defined(BOOST_ASIO_HAS_TIMERFD) | |
else | |
{ | |
descriptor_state* descriptor_data = static_cast<descriptor_state*>(ptr); | |
mutex::scoped_lock descriptor_lock(descriptor_data->mutex_); | |
// Exception operations must be processed first to ensure that any | |
// out-of-band data is read before normal data. | |
static const int flag[max_ops] = { EPOLLIN, EPOLLOUT, EPOLLPRI }; | |
for (int j = max_ops - 1; j >= 0; --j) | |
{ | |
if (events[i].events & (flag[j] | EPOLLERR | EPOLLHUP)) | |
{ | |
while (reactor_op* op = descriptor_data->op_queue_[j].front()) | |
{ | |
if (op->perform()) | |
{ | |
descriptor_data->op_queue_[j].pop(); | |
ops.push(op); | |
} | |
else | |
break; | |
} | |
} | |
} | |
} | |
} | |
if (check_timers) | |
{ | |
mutex::scoped_lock common_lock(mutex_); | |
timer_queues_.get_ready_timers(ops); | |
#if defined(BOOST_ASIO_HAS_TIMERFD) | |
if (timer_fd_ != -1) | |
{ | |
itimerspec new_timeout; | |
itimerspec old_timeout; | |
int flags = get_timeout(new_timeout); | |
timerfd_settime(timer_fd_, flags, &new_timeout, &old_timeout); | |
} | |
#endif // defined(BOOST_ASIO_HAS_TIMERFD) | |
} | |
} | |
void epoll_reactor::interrupt() | |
{ | |
epoll_event ev = { 0, { 0 } }; | |
ev.events = EPOLLIN | EPOLLERR | EPOLLET; | |
ev.data.ptr = &interrupter_; | |
epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, interrupter_.read_descriptor(), &ev); | |
} | |
int epoll_reactor::do_epoll_create() | |
{ | |
int fd = epoll_create(epoll_size); | |
if (fd == -1) | |
{ | |
boost::system::error_code ec(errno, | |
boost::asio::error::get_system_category()); | |
boost::asio::detail::throw_error(ec, "epoll"); | |
} | |
return fd; | |
} | |
// Add a timer queue to the set held by the reactor. The set is modified
// while holding the reactor's mutex.
void epoll_reactor::do_add_timer_queue(timer_queue_base& queue)
{
  timer_queue_base* const queue_ptr = &queue;

  mutex::scoped_lock lock(mutex_);
  timer_queues_.insert(queue_ptr);
}
// Remove a timer queue from the set held by the reactor. The set is modified
// while holding the reactor's mutex.
void epoll_reactor::do_remove_timer_queue(timer_queue_base& queue)
{
  timer_queue_base* const queue_ptr = &queue;

  mutex::scoped_lock lock(mutex_);
  timer_queues_.erase(queue_ptr);
}
void epoll_reactor::update_timeout() | |
{ | |
#if defined(BOOST_ASIO_HAS_TIMERFD) | |
if (timer_fd_ != -1) | |
{ | |
itimerspec new_timeout; | |
itimerspec old_timeout; | |
int flags = get_timeout(new_timeout); | |
timerfd_settime(timer_fd_, flags, &new_timeout, &old_timeout); | |
return; | |
} | |
#endif // defined(BOOST_ASIO_HAS_TIMERFD) | |
interrupt(); | |
} | |
int epoll_reactor::get_timeout() | |
{ | |
// By default we will wait no longer than 5 minutes. This will ensure that | |
// any changes to the system clock are detected after no longer than this. | |
return timer_queues_.wait_duration_msec(5 * 60 * 1000); | |
} | |
#if defined(BOOST_ASIO_HAS_TIMERFD) | |
int epoll_reactor::get_timeout(itimerspec& ts) | |
{ | |
ts.it_interval.tv_sec = 0; | |
ts.it_interval.tv_nsec = 0; | |
long usec = timer_queues_.wait_duration_usec(5 * 60 * 1000 * 1000); | |
ts.it_value.tv_sec = usec / 1000000; | |
ts.it_value.tv_nsec = usec ? (usec % 1000000) * 1000 : 1; | |
return usec ? 0 : TFD_TIMER_ABSTIME; | |
} | |
#endif // defined(BOOST_ASIO_HAS_TIMERFD) | |
} // namespace detail | |
} // namespace asio | |
} // namespace boost | |
#include <boost/asio/detail/pop_options.hpp> | |
#endif // defined(BOOST_ASIO_HAS_EPOLL) | |
#endif // BOOST_ASIO_DETAIL_IMPL_EPOLL_REACTOR_IPP |