blob: c0bae426a8992090a79203a4ab9eb736cc0f2209 [file] [log] [blame]
// Copyright (c) 2011 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//
// TODO(satorux):
// - Handle "disconnected" signal.
// - Collect metrics (ex. # of method calls, method call time, etc.)
#include "dbus/bus.h"
#include "base/bind.h"
#include "base/logging.h"
#include "base/message_loop.h"
#include "base/stl_util.h"
#include "base/threading/thread.h"
#include "base/threading/thread_restrictions.h"
#include "dbus/exported_object.h"
#include "dbus/object_proxy.h"
#include "dbus/scoped_dbus_error.h"
namespace dbus {
namespace {
// The class is used for watching the file descriptor used for D-Bus
// communication.
class Watch : public base::MessagePumpLibevent::Watcher {
public:
Watch(DBusWatch* watch)
: raw_watch_(watch) {
dbus_watch_set_data(raw_watch_, this, NULL);
}
~Watch() {
dbus_watch_set_data(raw_watch_, NULL, NULL);
}
// Returns true if the underlying file descriptor is ready to be watched.
bool IsReadyToBeWatched() {
return dbus_watch_get_enabled(raw_watch_);
}
// Starts watching the underlying file descriptor.
void StartWatching() {
const int file_descriptor = dbus_watch_get_unix_fd(raw_watch_);
const int flags = dbus_watch_get_flags(raw_watch_);
MessageLoopForIO::Mode mode = MessageLoopForIO::WATCH_READ;
if ((flags & DBUS_WATCH_READABLE) && (flags & DBUS_WATCH_WRITABLE))
mode = MessageLoopForIO::WATCH_READ_WRITE;
else if (flags & DBUS_WATCH_READABLE)
mode = MessageLoopForIO::WATCH_READ;
else if (flags & DBUS_WATCH_WRITABLE)
mode = MessageLoopForIO::WATCH_WRITE;
else
NOTREACHED();
const bool persistent = true; // Watch persistently.
const bool success = MessageLoopForIO::current()->WatchFileDescriptor(
file_descriptor,
persistent,
mode,
&file_descriptor_watcher_,
this);
CHECK(success) << "Unable to allocate memory";
}
// Stops watching the underlying file descriptor.
void StopWatching() {
file_descriptor_watcher_.StopWatchingFileDescriptor();
}
private:
// Implement MessagePumpLibevent::Watcher.
virtual void OnFileCanReadWithoutBlocking(int file_descriptor) {
const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_READABLE);
CHECK(success) << "Unable to allocate memory";
}
// Implement MessagePumpLibevent::Watcher.
virtual void OnFileCanWriteWithoutBlocking(int file_descriptor) {
const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_WRITABLE);
CHECK(success) << "Unable to allocate memory";
}
DBusWatch* raw_watch_;
base::MessagePumpLibevent::FileDescriptorWatcher file_descriptor_watcher_;
};
// The class is used for monitoring the timeout used for D-Bus method
// calls.
//
// Unlike Watch, Timeout is a ref counted object, to ensure that |this| of
// the object is is alive when HandleTimeout() is called. It's unlikely
// but it may be possible that HandleTimeout() is called after
// Bus::OnRemoveTimeout(). That's why we don't simply delete the object in
// Bus::OnRemoveTimeout().
class Timeout : public base::RefCountedThreadSafe<Timeout> {
public:
Timeout(DBusTimeout* timeout)
: raw_timeout_(timeout),
monitoring_is_active_(false),
is_completed(false) {
dbus_timeout_set_data(raw_timeout_, this, NULL);
AddRef(); // Balanced on Complete().
}
// Returns true if the timeout is ready to be monitored.
bool IsReadyToBeMonitored() {
return dbus_timeout_get_enabled(raw_timeout_);
}
// Starts monitoring the timeout.
void StartMonitoring(dbus::Bus* bus) {
bus->PostDelayedTaskToDBusThread(FROM_HERE,
base::Bind(&Timeout::HandleTimeout,
this),
GetIntervalInMs());
monitoring_is_active_ = true;
}
// Stops monitoring the timeout.
void StopMonitoring() {
// We cannot take back the delayed task we posted in
// StartMonitoring(), so we just mark the monitoring is inactive now.
monitoring_is_active_ = false;
}
// Returns the interval in milliseconds.
int GetIntervalInMs() {
return dbus_timeout_get_interval(raw_timeout_);
}
// Cleans up the raw_timeout and marks that timeout is completed.
// See the class comment above for why we are doing this.
void Complete() {
dbus_timeout_set_data(raw_timeout_, NULL, NULL);
is_completed = true;
Release();
}
private:
friend class base::RefCountedThreadSafe<Timeout>;
~Timeout() {
}
// Handles the timeout.
void HandleTimeout() {
// If the timeout is marked completed, we should do nothing. This can
// occur if this function is called after Bus::OnRemoveTimeout().
if (is_completed)
return;
// Skip if monitoring is cancled.
if (!monitoring_is_active_)
return;
const bool success = dbus_timeout_handle(raw_timeout_);
CHECK(success) << "Unable to allocate memory";
}
DBusTimeout* raw_timeout_;
bool monitoring_is_active_;
bool is_completed;
};
} // namespace
Bus::Options::Options()
: bus_type(SESSION),
connection_type(PRIVATE),
dbus_thread(NULL) {
}
Bus::Options::~Options() {
}
Bus::Bus(const Options& options)
: bus_type_(options.bus_type),
connection_type_(options.connection_type),
dbus_thread_(options.dbus_thread),
connection_(NULL),
origin_loop_(MessageLoop::current()),
origin_thread_id_(base::PlatformThread::CurrentId()),
dbus_thread_id_(base::kInvalidThreadId),
async_operations_are_set_up_(false),
num_pending_watches_(0),
num_pending_timeouts_(0) {
if (dbus_thread_) {
dbus_thread_id_ = dbus_thread_->thread_id();
DCHECK(dbus_thread_->IsRunning())
<< "The D-Bus thread should be running";
DCHECK_EQ(MessageLoop::TYPE_IO,
dbus_thread_->message_loop()->type())
<< "The D-Bus thread should have an MessageLoopForIO attached";
}
// This is safe to call multiple times.
dbus_threads_init_default();
}
Bus::~Bus() {
DCHECK(!connection_);
DCHECK(owned_service_names_.empty());
DCHECK(match_rules_added_.empty());
DCHECK(filter_functions_added_.empty());
DCHECK(registered_object_paths_.empty());
DCHECK_EQ(0, num_pending_watches_);
DCHECK_EQ(0, num_pending_timeouts_);
}
ObjectProxy* Bus::GetObjectProxy(const std::string& service_name,
const std::string& object_path) {
AssertOnOriginThread();
scoped_refptr<ObjectProxy> object_proxy =
new ObjectProxy(this, service_name, object_path);
object_proxies_.push_back(object_proxy);
return object_proxy;
}
ExportedObject* Bus::GetExportedObject(const std::string& service_name,
const std::string& object_path) {
AssertOnOriginThread();
scoped_refptr<ExportedObject> exported_object =
new ExportedObject(this, service_name, object_path);
exported_objects_.push_back(exported_object);
return exported_object;
}
bool Bus::Connect() {
// dbus_bus_get_private() and dbus_bus_get() are blocking calls.
AssertOnDBusThread();
// Check if it's already initialized.
if (connection_)
return true;
ScopedDBusError error;
const DBusBusType dbus_bus_type = static_cast<DBusBusType>(bus_type_);
if (connection_type_ == PRIVATE) {
connection_ = dbus_bus_get_private(dbus_bus_type, error.get());
} else {
connection_ = dbus_bus_get(dbus_bus_type, error.get());
}
if (!connection_) {
LOG(ERROR) << "Failed to connect to the bus: "
<< (dbus_error_is_set(error.get()) ? error.message() : "");
return false;
}
// We shouldn't exit on the disconnected signal.
dbus_connection_set_exit_on_disconnect(connection_, false);
return true;
}
void Bus::ShutdownAndBlock() {
AssertOnDBusThread();
// Unregister the exported objects.
for (size_t i = 0; i < exported_objects_.size(); ++i) {
exported_objects_[i]->Unregister();
}
// Release all service names.
for (std::set<std::string>::iterator iter = owned_service_names_.begin();
iter != owned_service_names_.end();) {
// This is a bit tricky but we should increment the iter here as
// ReleaseOwnership() may remove |service_name| from the set.
const std::string& service_name = *iter++;
ReleaseOwnership(service_name);
}
if (!owned_service_names_.empty()) {
LOG(ERROR) << "Failed to release all service names. # of services left: "
<< owned_service_names_.size();
}
// Detach from the remote objects.
for (size_t i = 0; i < object_proxies_.size(); ++i) {
object_proxies_[i]->Detach();
}
// Private connection should be closed.
if (connection_ && connection_type_ == PRIVATE) {
dbus_connection_close(connection_);
}
// dbus_connection_close() won't unref.
dbus_connection_unref(connection_);
connection_ = NULL;
}
void Bus::Shutdown(OnShutdownCallback callback) {
AssertOnOriginThread();
PostTaskToDBusThread(FROM_HERE, base::Bind(&Bus::ShutdownInternal,
this,
callback));
}
bool Bus::RequestOwnership(const std::string& service_name) {
DCHECK(connection_);
// dbus_bus_request_name() is a blocking call.
AssertOnDBusThread();
// Check if we already own the service name.
if (owned_service_names_.find(service_name) != owned_service_names_.end()) {
return true;
}
ScopedDBusError error;
const int result = dbus_bus_request_name(connection_,
service_name.c_str(),
DBUS_NAME_FLAG_DO_NOT_QUEUE,
error.get());
if (result != DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER) {
LOG(ERROR) << "Failed to get the onwership of " << service_name << ": "
<< (dbus_error_is_set(error.get()) ? error.message() : "");
return false;
}
owned_service_names_.insert(service_name);
return true;
}
bool Bus::ReleaseOwnership(const std::string& service_name) {
DCHECK(connection_);
// dbus_bus_request_name() is a blocking call.
AssertOnDBusThread();
// Check if we already own the service name.
std::set<std::string>::iterator found =
owned_service_names_.find(service_name);
if (found == owned_service_names_.end()) {
LOG(ERROR) << service_name << " is not owned by the bus";
return false;
}
ScopedDBusError error;
const int result = dbus_bus_release_name(connection_, service_name.c_str(),
error.get());
if (result == DBUS_RELEASE_NAME_REPLY_RELEASED) {
owned_service_names_.erase(found);
return true;
} else {
LOG(ERROR) << "Failed to release the onwership of " << service_name << ": "
<< (error.is_set() ? error.message() : "");
return false;
}
}
bool Bus::SetUpAsyncOperations() {
DCHECK(connection_);
AssertOnDBusThread();
if (async_operations_are_set_up_)
return true;
// Process all the incoming data if any, so that OnDispatchStatus() will
// be called when the incoming data is ready.
ProcessAllIncomingDataIfAny();
bool success = dbus_connection_set_watch_functions(connection_,
&Bus::OnAddWatchThunk,
&Bus::OnRemoveWatchThunk,
&Bus::OnToggleWatchThunk,
this,
NULL);
CHECK(success) << "Unable to allocate memory";
// TODO(satorux): Timeout is not yet implemented.
success = dbus_connection_set_timeout_functions(connection_,
&Bus::OnAddTimeoutThunk,
&Bus::OnRemoveTimeoutThunk,
&Bus::OnToggleTimeoutThunk,
this,
NULL);
CHECK(success) << "Unable to allocate memory";
dbus_connection_set_dispatch_status_function(
connection_,
&Bus::OnDispatchStatusChangedThunk,
this,
NULL);
async_operations_are_set_up_ = true;
return true;
}
DBusMessage* Bus::SendWithReplyAndBlock(DBusMessage* request,
int timeout_ms,
DBusError* error) {
DCHECK(connection_);
AssertOnDBusThread();
return dbus_connection_send_with_reply_and_block(
connection_, request, timeout_ms, error);
}
void Bus::SendWithReply(DBusMessage* request,
DBusPendingCall** pending_call,
int timeout_ms) {
DCHECK(connection_);
AssertOnDBusThread();
const bool success = dbus_connection_send_with_reply(
connection_, request, pending_call, timeout_ms);
CHECK(success) << "Unable to allocate memory";
}
void Bus::Send(DBusMessage* request, uint32* serial) {
DCHECK(connection_);
AssertOnDBusThread();
const bool success = dbus_connection_send(connection_, request, serial);
CHECK(success) << "Unable to allocate memory";
}
void Bus::AddFilterFunction(DBusHandleMessageFunction filter_function,
void* user_data) {
DCHECK(connection_);
AssertOnDBusThread();
if (filter_functions_added_.find(filter_function) !=
filter_functions_added_.end()) {
LOG(ERROR) << "Filter function already exists: " << filter_function;
return;
}
const bool success = dbus_connection_add_filter(
connection_, filter_function, user_data, NULL);
CHECK(success) << "Unable to allocate memory";
filter_functions_added_.insert(filter_function);
}
void Bus::RemoveFilterFunction(DBusHandleMessageFunction filter_function,
void* user_data) {
DCHECK(connection_);
AssertOnDBusThread();
if (filter_functions_added_.find(filter_function) ==
filter_functions_added_.end()) {
LOG(ERROR) << "Requested to remove an unknown filter function: "
<< filter_function;
return;
}
dbus_connection_remove_filter(connection_, filter_function, user_data);
filter_functions_added_.erase(filter_function);
}
void Bus::AddMatch(const std::string& match_rule, DBusError* error) {
DCHECK(connection_);
AssertOnDBusThread();
if (match_rules_added_.find(match_rule) != match_rules_added_.end()) {
LOG(ERROR) << "Match rule already exists: " << match_rule;
return;
}
dbus_bus_add_match(connection_, match_rule.c_str(), error);
match_rules_added_.insert(match_rule);
}
void Bus::RemoveMatch(const std::string& match_rule, DBusError* error) {
DCHECK(connection_);
AssertOnDBusThread();
if (match_rules_added_.find(match_rule) == match_rules_added_.end()) {
LOG(ERROR) << "Requested to remove an unknown match rule: " << match_rule;
return;
}
dbus_bus_remove_match(connection_, match_rule.c_str(), error);
match_rules_added_.erase(match_rule);
}
bool Bus::TryRegisterObjectPath(const std::string& object_path,
const DBusObjectPathVTable* vtable,
void* user_data,
DBusError* error) {
DCHECK(connection_);
AssertOnDBusThread();
DCHECK(registered_object_paths_.find(object_path) ==
registered_object_paths_.end())
<< "Object path already registered: " << object_path;
const bool success = dbus_connection_try_register_object_path(
connection_,
object_path.c_str(),
vtable,
user_data,
error);
if (success)
registered_object_paths_.insert(object_path);
return success;
}
void Bus::UnregisterObjectPath(const std::string& object_path) {
DCHECK(connection_);
AssertOnDBusThread();
DCHECK(registered_object_paths_.find(object_path) !=
registered_object_paths_.end())
<< "Requested to unregister an unknown object path: " << object_path;
const bool success = dbus_connection_unregister_object_path(
connection_,
object_path.c_str());
CHECK(success) << "Unable to allocate memory";
registered_object_paths_.erase(object_path);
}
void Bus::ShutdownInternal(OnShutdownCallback callback) {
AssertOnDBusThread();
ShutdownAndBlock();
PostTaskToOriginThread(FROM_HERE, callback);
}
void Bus::ProcessAllIncomingDataIfAny() {
AssertOnDBusThread();
// As mentioned at the class comment in .h file, connection_ can be NULL.
if (!connection_ || !dbus_connection_get_is_connected(connection_))
return;
if (dbus_connection_get_dispatch_status(connection_) ==
DBUS_DISPATCH_DATA_REMAINS) {
while (dbus_connection_dispatch(connection_) ==
DBUS_DISPATCH_DATA_REMAINS);
}
}
void Bus::PostTaskToOriginThread(const tracked_objects::Location& from_here,
const base::Closure& task) {
origin_loop_->PostTask(from_here, task);
}
void Bus::PostTaskToDBusThread(const tracked_objects::Location& from_here,
const base::Closure& task) {
if (dbus_thread_)
dbus_thread_->message_loop()->PostTask(from_here, task);
else
origin_loop_->PostTask(from_here, task);
}
void Bus::PostDelayedTaskToDBusThread(
const tracked_objects::Location& from_here,
const base::Closure& task,
int delay_ms) {
if (dbus_thread_)
dbus_thread_->message_loop()->PostDelayedTask(from_here, task, delay_ms);
else
origin_loop_->PostDelayedTask(from_here, task, delay_ms);
}
bool Bus::HasDBusThread() {
return dbus_thread_ != NULL;
}
void Bus::AssertOnOriginThread() {
DCHECK_EQ(origin_thread_id_, base::PlatformThread::CurrentId());
}
void Bus::AssertOnDBusThread() {
base::ThreadRestrictions::AssertIOAllowed();
if (dbus_thread_) {
DCHECK_EQ(dbus_thread_id_, base::PlatformThread::CurrentId());
} else {
AssertOnOriginThread();
}
}
dbus_bool_t Bus::OnAddWatch(DBusWatch* raw_watch) {
AssertOnDBusThread();
// watch will be deleted when raw_watch is removed in OnRemoveWatch().
Watch* watch = new Watch(raw_watch);
if (watch->IsReadyToBeWatched()) {
watch->StartWatching();
}
++num_pending_watches_;
return true;
}
void Bus::OnRemoveWatch(DBusWatch* raw_watch) {
AssertOnDBusThread();
Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch));
delete watch;
--num_pending_watches_;
}
void Bus::OnToggleWatch(DBusWatch* raw_watch) {
AssertOnDBusThread();
Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch));
if (watch->IsReadyToBeWatched()) {
watch->StartWatching();
} else {
// It's safe to call this if StartWatching() wasn't called, per
// message_pump_libevent.h.
watch->StopWatching();
}
}
dbus_bool_t Bus::OnAddTimeout(DBusTimeout* raw_timeout) {
AssertOnDBusThread();
// timeout will be deleted when raw_timeout is removed in
// OnRemoveTimeoutThunk().
Timeout* timeout = new Timeout(raw_timeout);
if (timeout->IsReadyToBeMonitored()) {
timeout->StartMonitoring(this);
}
++num_pending_timeouts_;
return true;
}
void Bus::OnRemoveTimeout(DBusTimeout* raw_timeout) {
AssertOnDBusThread();
Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout));
timeout->Complete();
--num_pending_timeouts_;
}
void Bus::OnToggleTimeout(DBusTimeout* raw_timeout) {
AssertOnDBusThread();
Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout));
if (timeout->IsReadyToBeMonitored()) {
timeout->StartMonitoring(this);
} else {
timeout->StopMonitoring();
}
}
void Bus::OnDispatchStatusChanged(DBusConnection* connection,
DBusDispatchStatus status) {
DCHECK_EQ(connection, connection_);
AssertOnDBusThread();
if (!dbus_connection_get_is_connected(connection))
return;
// We cannot call ProcessAllIncomingDataIfAny() here, as calling
// dbus_connection_dispatch() inside DBusDispatchStatusFunction is
// prohibited by the D-Bus library. Hence, we post a task here instead.
// See comments for dbus_connection_set_dispatch_status_function().
PostTaskToDBusThread(FROM_HERE,
base::Bind(&Bus::ProcessAllIncomingDataIfAny,
this));
}
dbus_bool_t Bus::OnAddWatchThunk(DBusWatch* raw_watch, void* data) {
Bus* self = static_cast<Bus*>(data);
return self->OnAddWatch(raw_watch);
}
void Bus::OnRemoveWatchThunk(DBusWatch* raw_watch, void* data) {
Bus* self = static_cast<Bus*>(data);
return self->OnRemoveWatch(raw_watch);
}
void Bus::OnToggleWatchThunk(DBusWatch* raw_watch, void* data) {
Bus* self = static_cast<Bus*>(data);
return self->OnToggleWatch(raw_watch);
}
dbus_bool_t Bus::OnAddTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
Bus* self = static_cast<Bus*>(data);
return self->OnAddTimeout(raw_timeout);
}
void Bus::OnRemoveTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
Bus* self = static_cast<Bus*>(data);
return self->OnRemoveTimeout(raw_timeout);
}
void Bus::OnToggleTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
Bus* self = static_cast<Bus*>(data);
return self->OnToggleTimeout(raw_timeout);
}
void Bus::OnDispatchStatusChangedThunk(DBusConnection* connection,
DBusDispatchStatus status,
void* data) {
Bus* self = static_cast<Bus*>(data);
return self->OnDispatchStatusChanged(connection, status);
}
} // namespace dbus