// Copyright (C) 2004-2006 The Trustees of Indiana University. | |
// Use, modification and distribution is subject to the Boost Software | |
// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at | |
// http://www.boost.org/LICENSE_1_0.txt) | |
// Authors: Douglas Gregor | |
// Andrew Lumsdaine | |
#include <boost/optional.hpp> | |
#include <cassert> | |
#include <boost/graph/parallel/algorithm.hpp> | |
#include <boost/graph/parallel/process_group.hpp> | |
#include <functional> | |
#include <algorithm> | |
#include <boost/graph/parallel/simple_trigger.hpp> | |
#ifndef BOOST_GRAPH_USE_MPI | |
#error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included" | |
#endif | |
namespace boost { namespace graph { namespace distributed { | |
template<BOOST_DISTRIBUTED_QUEUE_PARMS> | |
BOOST_DISTRIBUTED_QUEUE_TYPE:: | |
distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner, | |
const Buffer& buffer, bool polling) | |
: process_group(process_group, attach_distributed_object()), | |
owner(owner), | |
buffer(buffer), | |
polling(polling) | |
{ | |
if (!polling) | |
outgoing_buffers.reset( | |
new outgoing_buffers_t(num_processes(process_group))); | |
setup_triggers(); | |
} | |
template<BOOST_DISTRIBUTED_QUEUE_PARMS> | |
BOOST_DISTRIBUTED_QUEUE_TYPE:: | |
distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner, | |
const Buffer& buffer, const UnaryPredicate& pred, | |
bool polling) | |
: process_group(process_group, attach_distributed_object()), | |
owner(owner), | |
buffer(buffer), | |
pred(pred), | |
polling(polling) | |
{ | |
if (!polling) | |
outgoing_buffers.reset( | |
new outgoing_buffers_t(num_processes(process_group))); | |
setup_triggers(); | |
} | |
template<BOOST_DISTRIBUTED_QUEUE_PARMS> | |
BOOST_DISTRIBUTED_QUEUE_TYPE:: | |
distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner, | |
const UnaryPredicate& pred, bool polling) | |
: process_group(process_group, attach_distributed_object()), | |
owner(owner), | |
pred(pred), | |
polling(polling) | |
{ | |
if (!polling) | |
outgoing_buffers.reset( | |
new outgoing_buffers_t(num_processes(process_group))); | |
setup_triggers(); | |
} | |
template<BOOST_DISTRIBUTED_QUEUE_PARMS> | |
void | |
BOOST_DISTRIBUTED_QUEUE_TYPE::push(const value_type& x) | |
{ | |
typename ProcessGroup::process_id_type dest = get(owner, x); | |
if (outgoing_buffers) | |
outgoing_buffers->at(dest).push_back(x); | |
else if (dest == process_id(process_group)) | |
buffer.push(x); | |
else | |
send(process_group, get(owner, x), msg_push, x); | |
} | |
template<BOOST_DISTRIBUTED_QUEUE_PARMS> | |
bool | |
BOOST_DISTRIBUTED_QUEUE_TYPE::empty() const | |
{ | |
/* Processes will stay here until the buffer is nonempty or | |
synchronization with the other processes indicates that all local | |
buffers are empty (and no messages are in transit). | |
*/ | |
while (buffer.empty() && !do_synchronize()) ; | |
return buffer.empty(); | |
} | |
template<BOOST_DISTRIBUTED_QUEUE_PARMS> | |
typename BOOST_DISTRIBUTED_QUEUE_TYPE::size_type | |
BOOST_DISTRIBUTED_QUEUE_TYPE::size() const | |
{ | |
empty(); | |
return buffer.size(); | |
} | |
template<BOOST_DISTRIBUTED_QUEUE_PARMS> | |
void BOOST_DISTRIBUTED_QUEUE_TYPE::setup_triggers() | |
{ | |
using boost::graph::parallel::simple_trigger; | |
simple_trigger(process_group, msg_push, this, | |
&distributed_queue::handle_push); | |
simple_trigger(process_group, msg_multipush, this, | |
&distributed_queue::handle_multipush); | |
} | |
template<BOOST_DISTRIBUTED_QUEUE_PARMS> | |
void | |
BOOST_DISTRIBUTED_QUEUE_TYPE:: | |
handle_push(int /*source*/, int /*tag*/, const value_type& value, | |
trigger_receive_context) | |
{ | |
if (pred(value)) buffer.push(value); | |
} | |
template<BOOST_DISTRIBUTED_QUEUE_PARMS> | |
void | |
BOOST_DISTRIBUTED_QUEUE_TYPE:: | |
handle_multipush(int /*source*/, int /*tag*/, | |
const std::vector<value_type>& values, | |
trigger_receive_context) | |
{ | |
for (std::size_t i = 0; i < values.size(); ++i) | |
if (pred(values[i])) buffer.push(values[i]); | |
} | |
template<BOOST_DISTRIBUTED_QUEUE_PARMS> | |
bool | |
BOOST_DISTRIBUTED_QUEUE_TYPE::do_synchronize() const | |
{ | |
#ifdef PBGL_ACCOUNTING | |
++num_synchronizations; | |
#endif | |
using boost::parallel::all_reduce; | |
using std::swap; | |
typedef typename ProcessGroup::process_id_type process_id_type; | |
if (outgoing_buffers) { | |
// Transfer all of the push requests | |
process_id_type id = process_id(process_group); | |
process_id_type np = num_processes(process_group); | |
for (process_id_type dest = 0; dest < np; ++dest) { | |
outgoing_buffer_t& outgoing = outgoing_buffers->at(dest); | |
std::size_t size = outgoing.size(); | |
if (size != 0) { | |
if (dest != id) { | |
send(process_group, dest, msg_multipush, outgoing); | |
} else { | |
for (std::size_t i = 0; i < size; ++i) | |
buffer.push(outgoing[i]); | |
} | |
outgoing.clear(); | |
} | |
} | |
} | |
synchronize(process_group); | |
unsigned local_size = buffer.size(); | |
unsigned global_size = | |
all_reduce(process_group, local_size, std::plus<unsigned>()); | |
return global_size == 0; | |
} | |
} } } // end namespace boost::graph::distributed |