| // Copyright 2019 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "chromecast/external_mojo/external_service_support/external_connector_impl.h" |
| |
| #include <utility> |
| |
| #include "base/bind.h" |
| #include "base/location.h" |
| #include "base/logging.h" |
| #include "base/memory/ref_counted.h" |
| #include "base/memory/scoped_refptr.h" |
| #include "base/synchronization/lock.h" |
| #include "base/thread_annotations.h" |
| #include "base/threading/sequenced_task_runner_handle.h" |
| #include "base/time/time.h" |
| #include "chromecast/external_mojo/broker_service/broker_service.h" |
| #include "chromecast/external_mojo/external_service_support/external_service.h" |
| #include "mojo/public/cpp/bindings/pending_remote.h" |
| #include "mojo/public/cpp/platform/named_platform_channel.h" |
| #include "mojo/public/cpp/platform/platform_channel_endpoint.h" |
| #include "mojo/public/cpp/system/invitation.h" |
| #include "services/service_manager/public/cpp/connector.h" |
| |
| namespace chromecast { |
| namespace external_service_support { |
| |
| namespace { |
| constexpr base::TimeDelta kConnectRetryDelay = |
| base::TimeDelta::FromMilliseconds(500); |
| } // namespace |
| |
| // Since we are only allowed to make a single underlying connection to the |
| // broker, we share the underlying connection between all ExternalConnector |
| // instances. The ExternalConnectors use clones of the underlying connection. |
| // |
| // Since connection error callbacks are called in some arbitrary order, we need |
| // to be careful to handle disconnection correctly. Each underlying connection |
| // has a unique token (int64_t) associated with it, which is propagated to all |
| // clones. If any clone receives a disconnect callback, it tries to reconnect by |
| // calling ConnectClone(), passing in the previous token (associated with the |
| // broken connection). BrokerConnection then attempts to reconnect the |
| // underlying connection if the broken connection token matches the token for |
| // the current connection; if it doesn't match, the connection was already |
| // recreated, so nothing needs to be done. |
| class ExternalConnectorImpl::BrokerConnection |
| : public base::RefCountedThreadSafe<BrokerConnection> { |
| public: |
| explicit BrokerConnection(std::string broker_path) |
| : broker_path_(std::move(broker_path)), |
| task_runner_(base::SequencedTaskRunnerHandle::Get()) { |
| Connect(); |
| } |
| |
| int64_t ConnectClone( |
| int64_t dead_connection_token, |
| mojo::PendingReceiver<external_mojo::mojom::ExternalConnector> receiver) { |
| int64_t token; |
| { |
| base::AutoLock lock(lock_); |
| if (dead_connection_token == connection_token_) { |
| task_runner_->PostTask( |
| FROM_HERE, base::BindOnce(&BrokerConnection::Connect, this)); |
| ++connection_token_; |
| } |
| token = connection_token_; |
| } |
| task_runner_->PostTask(FROM_HERE, |
| base::BindOnce(&BrokerConnection::AttachClone, this, |
| std::move(receiver))); |
| return token; |
| } |
| |
| private: |
| friend class base::RefCountedThreadSafe<BrokerConnection>; |
| ~BrokerConnection() = default; |
| |
| void Connect() { |
| DCHECK(task_runner_->RunsTasksInCurrentSequence()); |
| connector_.reset(); |
| pending_receiver_ = connector_.BindNewPipeAndPassReceiver(); |
| AttemptBrokerConnection(); |
| } |
| |
| void AttachClone( |
| mojo::PendingReceiver<external_mojo::mojom::ExternalConnector> receiver) { |
| DCHECK(task_runner_->RunsTasksInCurrentSequence()); |
| connector_->Clone(std::move(receiver)); |
| } |
| |
| void AttemptBrokerConnection() { |
| mojo::PlatformChannelEndpoint endpoint = |
| mojo::NamedPlatformChannel::ConnectToServer(broker_path_); |
| if (!endpoint.is_valid()) { |
| task_runner_->PostDelayedTask( |
| FROM_HERE, |
| base::BindOnce(&BrokerConnection::AttemptBrokerConnection, |
| weak_factory_.GetWeakPtr()), |
| kConnectRetryDelay); |
| return; |
| } |
| |
| auto invitation = mojo::IncomingInvitation::Accept(std::move(endpoint)); |
| auto remote_pipe = invitation.ExtractMessagePipe(0); |
| if (!remote_pipe) { |
| LOG(ERROR) << "Invalid message pipe"; |
| task_runner_->PostDelayedTask( |
| FROM_HERE, |
| base::BindOnce(&BrokerConnection::AttemptBrokerConnection, |
| weak_factory_.GetWeakPtr()), |
| kConnectRetryDelay); |
| return; |
| } |
| |
| mojo::FuseMessagePipes(pending_receiver_.PassPipe(), |
| std::move(remote_pipe)); |
| } |
| |
| const std::string broker_path_; |
| const scoped_refptr<base::SequencedTaskRunner> task_runner_; |
| |
| mojo::Remote<external_mojo::mojom::ExternalConnector> connector_; |
| mojo::PendingReceiver<external_mojo::mojom::ExternalConnector> |
| pending_receiver_; |
| |
| base::Lock lock_; |
| int64_t connection_token_ GUARDED_BY(lock_) = 1; |
| |
| base::WeakPtrFactory<BrokerConnection> weak_factory_{this}; |
| }; |
| |
| // static |
| void ExternalConnector::Connect( |
| const std::string& broker_path, |
| base::OnceCallback<void(std::unique_ptr<ExternalConnector>)> callback) { |
| DCHECK(callback); |
| std::move(callback).Run(Create(broker_path)); |
| } |
| |
| // static |
| std::unique_ptr<ExternalConnector> ExternalConnector::Create( |
| const std::string& broker_path) { |
| return std::make_unique<ExternalConnectorImpl>(broker_path); |
| } |
| |
| // static |
| std::unique_ptr<ExternalConnector> ExternalConnector::Create( |
| mojo::PendingRemote<external_mojo::mojom::ExternalConnector> remote) { |
| return std::make_unique<ExternalConnectorImpl>(std::move(remote)); |
| } |
| |
| // static |
| std::unique_ptr<ExternalConnector> ExternalConnector::Create( |
| service_manager::Connector* connector) { |
| mojo::PendingRemote<external_mojo::mojom::ExternalConnector> pending_remote; |
| connector->BindInterface(external_mojo::BrokerService::kServiceName, |
| pending_remote.InitWithNewPipeAndPassReceiver()); |
| return std::make_unique<ExternalConnectorImpl>(std::move(pending_remote)); |
| } |
| |
| ExternalConnectorImpl::ExternalConnectorImpl(const std::string& broker_path) |
| : broker_connection_(base::MakeRefCounted<BrokerConnection>(broker_path)) { |
| DETACH_FROM_SEQUENCE(sequence_checker_); |
| Connect(); |
| } |
| |
| ExternalConnectorImpl::ExternalConnectorImpl( |
| scoped_refptr<BrokerConnection> broker_connection) |
| : broker_connection_(broker_connection) { |
| DETACH_FROM_SEQUENCE(sequence_checker_); |
| Connect(); |
| } |
| |
| ExternalConnectorImpl::ExternalConnectorImpl( |
| mojo::PendingRemote<external_mojo::mojom::ExternalConnector> pending_remote) |
| : pending_remote_(std::move(pending_remote)) { |
| DETACH_FROM_SEQUENCE(sequence_checker_); |
| } |
| |
| ExternalConnectorImpl::~ExternalConnectorImpl() = default; |
| |
| base::CallbackListSubscription |
| ExternalConnectorImpl::AddConnectionErrorCallback( |
| base::RepeatingClosure callback) { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| return error_closures_.Add(std::move(callback)); |
| } |
| |
| void ExternalConnectorImpl::RegisterService(const std::string& service_name, |
| ExternalService* service) { |
| RegisterService(service_name, service->GetReceiver()); |
| } |
| |
| void ExternalConnectorImpl::RegisterService( |
| const std::string& service_name, |
| mojo::PendingRemote<external_mojo::mojom::ExternalService> service_remote) { |
| BindConnectorIfNecessary(); |
| auto service_instance_info = |
| chromecast::external_mojo::mojom::ServiceInstanceInfo::New( |
| service_name, std::move(service_remote)); |
| std::vector<chromecast::external_mojo::mojom::ServiceInstanceInfoPtr> v; |
| v.emplace_back(std::move(service_instance_info)); |
| connector_->RegisterServiceInstances(std::move(v)); |
| } |
| |
| void ExternalConnectorImpl::RegisterServices( |
| const std::vector<std::string>& service_names, |
| const std::vector<ExternalService*>& services) { |
| CHECK(service_names.size() == services.size()); |
| std::vector<chromecast::external_mojo::mojom::ServiceInstanceInfoPtr> |
| service_instances_info; |
| service_instances_info.reserve(services.size()); |
| for (size_t i = 0; i < services.size(); ++i) { |
| service_instances_info.emplace_back( |
| chromecast::external_mojo::mojom::ServiceInstanceInfo::New( |
| service_names[i], services[i]->GetReceiver())); |
| } |
| |
| RegisterServices(std::move(service_instances_info)); |
| } |
| |
| void ExternalConnectorImpl::RegisterServices( |
| std::vector<chromecast::external_mojo::mojom::ServiceInstanceInfoPtr> |
| service_instances_info) { |
| BindConnectorIfNecessary(); |
| connector_->RegisterServiceInstances(std::move(service_instances_info)); |
| } |
| |
| void ExternalConnectorImpl::QueryServiceList( |
| base::OnceCallback<void( |
| std::vector<chromecast::external_mojo::mojom::ExternalServiceInfoPtr>)> |
| callback) { |
| BindConnectorIfNecessary(); |
| connector_->QueryServiceList(std::move(callback)); |
| } |
| |
| void ExternalConnectorImpl::BindInterface( |
| const std::string& service_name, |
| const std::string& interface_name, |
| mojo::ScopedMessagePipeHandle interface_pipe, |
| bool async) { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| if (!async) { |
| BindInterfaceImmediately(service_name, interface_name, |
| std::move(interface_pipe)); |
| return; |
| } |
| base::SequencedTaskRunnerHandle::Get()->PostTask( |
| FROM_HERE, |
| base::BindOnce(&ExternalConnectorImpl::BindInterfaceImmediately, |
| weak_factory_.GetWeakPtr(), service_name, interface_name, |
| std::move(interface_pipe))); |
| } |
| |
| void ExternalConnectorImpl::BindInterfaceImmediately( |
| const std::string& service_name, |
| const std::string& interface_name, |
| mojo::ScopedMessagePipeHandle interface_pipe) { |
| BindConnectorIfNecessary(); |
| connector_->BindInterface(service_name, interface_name, |
| std::move(interface_pipe)); |
| } |
| |
| std::unique_ptr<ExternalConnector> ExternalConnectorImpl::Clone() { |
| if (broker_connection_) { |
| return std::make_unique<ExternalConnectorImpl>(broker_connection_); |
| } |
| BindConnectorIfNecessary(); |
| mojo::PendingRemote<external_mojo::mojom::ExternalConnector> remote; |
| connector_->Clone(remote.InitWithNewPipeAndPassReceiver()); |
| return std::make_unique<ExternalConnectorImpl>(std::move(remote)); |
| } |
| |
| void ExternalConnectorImpl::SendChromiumConnectorRequest( |
| mojo::ScopedMessagePipeHandle request) { |
| BindConnectorIfNecessary(); |
| connector_->BindChromiumConnector(std::move(request)); |
| } |
| |
| void ExternalConnectorImpl::Connect() { |
| DCHECK(broker_connection_); |
| connection_token_ = broker_connection_->ConnectClone( |
| connection_token_, pending_remote_.InitWithNewPipeAndPassReceiver()); |
| } |
| |
| void ExternalConnectorImpl::OnMojoDisconnect() { |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| connector_.reset(); |
| pending_remote_.reset(); |
| if (broker_connection_) { |
| Connect(); |
| BindConnectorIfNecessary(); |
| } |
| error_closures_.Notify(); |
| } |
| |
| void ExternalConnectorImpl::BindConnectorIfNecessary() { |
| // Bind the message pipe and SequenceChecker to the current thread the first |
| // time it is used to connect. |
| DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); |
| if (connector_.is_bound()) { |
| return; |
| } |
| |
| DCHECK(pending_remote_.is_valid()); |
| |
| connector_.Bind(std::move(pending_remote_)); |
| connector_.set_disconnect_handler(base::BindOnce( |
| &ExternalConnectorImpl::OnMojoDisconnect, base::Unretained(this))); |
| } |
| |
| } // namespace external_service_support |
| } // namespace chromecast |