blob: 70a368f973ca04fabae894a879ae2f9a4647d147 [file] [log] [blame]
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "core/internal/service_controller_router.h"
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include "core/internal/client_proxy.h"
#include "core/listeners.h"
#include "core/options.h"
#include "core/params.h"
#include "core/payload.h"
#include "platform/base/feature_flags.h"
#include "platform/public/logging.h"
#include "absl/memory/memory.h"
#include "absl/time/clock.h"
namespace location {
namespace nearby {
namespace connections {
namespace {
// Length of a MAC address, which consists of 6 bytes uniquely identifying a
// hardware interface.
const std::size_t kMacAddressLength = 6u;
// Length used for an endpoint ID, which identifies a device discovery and
// associated connection request.
const std::size_t kEndpointIdLength = 4u;
// Maximum length for information describing an endpoint; this information is
// advertised by one device and can be used by the other device to identify the
// advertiser.
const std::size_t kMaxEndpointInfoLength = 131u;
} // namespace
ServiceControllerRouter::ServiceControllerRouter(
std::function<ServiceController*()> factory)
: service_controller_factory_(std::move(factory)) {
NEARBY_LOGS(INFO) << "ServiceControllerRouter going up.";
}
ServiceControllerRouter::~ServiceControllerRouter() {
NEARBY_LOGS(INFO) << "ServiceControllerRouter going down.";
if (service_controller_) {
service_controller_->Stop();
}
// And make sure that cleanup is the last thing we do.
serializer_.Shutdown();
}
void ServiceControllerRouter::StartAdvertising(
ClientProxy* client, absl::string_view service_id,
const ConnectionOptions& options, const ConnectionRequestInfo& info,
const ResultCallback& callback) {
RouteToServiceController(
"scr-start-advertising",
[this, client, service_id = std::string(service_id), options, info,
callback]() {
Status status =
AcquireServiceControllerForClient(client, options.strategy);
if (!status.Ok()) {
callback.result_cb(status);
return;
}
if (client->IsAdvertising()) {
callback.result_cb({Status::kAlreadyAdvertising});
return;
}
status = service_controller_->StartAdvertising(client, service_id,
options, info);
callback.result_cb(status);
});
}
void ServiceControllerRouter::StopAdvertising(ClientProxy* client,
const ResultCallback& callback) {
RouteToServiceController("scr-stop-advertising", [this, client, callback]() {
if (ClientHasAcquiredServiceController(client) && client->IsAdvertising()) {
service_controller_->StopAdvertising(client);
}
callback.result_cb({Status::kSuccess});
});
}
void ServiceControllerRouter::StartDiscovery(ClientProxy* client,
absl::string_view service_id,
const ConnectionOptions& options,
const DiscoveryListener& listener,
const ResultCallback& callback) {
RouteToServiceController("scr-start-discovery", [this, client,
service_id =
std::string(service_id),
options, listener,
callback]() {
Status status = AcquireServiceControllerForClient(client, options.strategy);
if (!status.Ok()) {
callback.result_cb(status);
return;
}
if (client->IsDiscovering()) {
callback.result_cb({Status::kAlreadyDiscovering});
return;
}
status = service_controller_->StartDiscovery(client, service_id, options,
listener);
callback.result_cb(status);
});
}
void ServiceControllerRouter::StopDiscovery(ClientProxy* client,
const ResultCallback& callback) {
RouteToServiceController("scr-stop-discovery", [this, client, callback]() {
if (ClientHasAcquiredServiceController(client) && client->IsDiscovering()) {
service_controller_->StopDiscovery(client);
}
callback.result_cb({Status::kSuccess});
});
}
void ServiceControllerRouter::InjectEndpoint(
ClientProxy* client, absl::string_view service_id,
const OutOfBandConnectionMetadata& metadata,
const ResultCallback& callback) {
RouteToServiceController("scr-inject-endpoint", [this, client,
service_id =
std::string(service_id),
metadata, callback]() {
// Currently, Bluetooth is the only supported medium for endpoint injection.
if (metadata.medium != Medium::BLUETOOTH ||
metadata.remote_bluetooth_mac_address.size() != kMacAddressLength) {
callback.result_cb({Status::kError});
return;
}
if (metadata.endpoint_id.size() != kEndpointIdLength) {
callback.result_cb({Status::kError});
return;
}
if (metadata.endpoint_info.Empty() ||
metadata.endpoint_info.size() > kMaxEndpointInfoLength) {
callback.result_cb({Status::kError});
return;
}
if (!ClientHasAcquiredServiceController(client) ||
!client->IsDiscovering()) {
callback.result_cb({Status::kOutOfOrderApiCall});
return;
}
service_controller_->InjectEndpoint(client, service_id, metadata);
callback.result_cb({Status::kSuccess});
});
}
void ServiceControllerRouter::RequestConnection(
ClientProxy* client, absl::string_view endpoint_id,
const ConnectionRequestInfo& info, const ConnectionOptions& options,
const ResultCallback& callback) {
// Cancellations can be fired from clients anytime, need to add the
// CancellationListener as soon as possible.
client->AddCancellationFlag(std::string(endpoint_id));
RouteToServiceController(
"scr-request-connection",
[this, client, endpoint_id = std::string(endpoint_id), info, options,
callback]() {
if (!ClientHasAcquiredServiceController(client)) {
callback.result_cb({Status::kOutOfOrderApiCall});
return;
}
if (client->HasPendingConnectionToEndpoint(endpoint_id) ||
client->IsConnectedToEndpoint(endpoint_id)) {
callback.result_cb({Status::kAlreadyConnectedToEndpoint});
return;
}
Status status = service_controller_->RequestConnection(
client, endpoint_id, info, options);
if (!status.Ok()) {
client->CancelEndpoint(endpoint_id);
}
callback.result_cb(status);
});
}
void ServiceControllerRouter::AcceptConnection(ClientProxy* client,
absl::string_view endpoint_id,
const PayloadListener& listener,
const ResultCallback& callback) {
RouteToServiceController("scr-accept-connection", [this, client,
endpoint_id = std::string(
endpoint_id),
listener, callback]() {
if (!ClientHasAcquiredServiceController(client)) {
callback.result_cb({Status::kOutOfOrderApiCall});
return;
}
if (client->IsConnectedToEndpoint(endpoint_id)) {
callback.result_cb({Status::kAlreadyConnectedToEndpoint});
return;
}
if (client->HasLocalEndpointResponded(endpoint_id)) {
NEARBY_LOGS(WARNING)
<< "Client " << client->GetClientId()
<< " invoked acceptConnectionRequest() after having already "
"accepted/rejected the connection to endpoint(id="
<< endpoint_id << ")";
callback.result_cb({Status::kOutOfOrderApiCall});
return;
}
callback.result_cb(
service_controller_->AcceptConnection(client, endpoint_id, listener));
});
}
void ServiceControllerRouter::RejectConnection(ClientProxy* client,
absl::string_view endpoint_id,
const ResultCallback& callback) {
client->CancelEndpoint(std::string(endpoint_id));
RouteToServiceController(
"scr-reject-connection",
[this, client, endpoint_id = std::string(endpoint_id), callback]() {
if (!ClientHasAcquiredServiceController(client)) {
callback.result_cb({Status::kOutOfOrderApiCall});
return;
}
if (client->IsConnectedToEndpoint(endpoint_id)) {
callback.result_cb({Status::kAlreadyConnectedToEndpoint});
return;
}
if (client->HasLocalEndpointResponded(endpoint_id)) {
NEARBY_LOGS(WARNING)
<< "Client " << client->GetClientId()
<< " invoked rejectConnectionRequest() after having already "
"accepted/rejected the connection to endpoint(id="
<< endpoint_id << ")";
callback.result_cb({Status::kOutOfOrderApiCall});
return;
}
callback.result_cb(
service_controller_->RejectConnection(client, endpoint_id));
});
}
void ServiceControllerRouter::InitiateBandwidthUpgrade(
ClientProxy* client, absl::string_view endpoint_id,
const ResultCallback& callback) {
RouteToServiceController(
"scr-init-bwu",
[this, client, endpoint_id = std::string(endpoint_id), callback]() {
if (!ClientHasAcquiredServiceController(client) ||
!client->IsConnectedToEndpoint(endpoint_id)) {
callback.result_cb({Status::kOutOfOrderApiCall});
return;
}
service_controller_->InitiateBandwidthUpgrade(client, endpoint_id);
// Operation is triggered; the caller can listen to
// ConnectionListener::OnBandwidthChanged() to determine its success.
callback.result_cb({Status::kSuccess});
});
}
void ServiceControllerRouter::SendPayload(
ClientProxy* client, absl::Span<const std::string> endpoint_ids,
Payload payload, const ResultCallback& callback) {
// Payload is a move-only type.
// We have to capture it by value inside the lambda, and pass it over to
// the executor as an std::function<void()> instance.
// Lambda must be copyable, in order ot satisfy std::function<> requirements.
// To make it so, we need Payload wrapped by a copyable wrapper.
// std::shared_ptr<> is used, because it is copyable.
auto shared_payload = std::make_shared<Payload>(std::move(payload));
const std::vector<std::string> endpoints =
std::vector<std::string>(endpoint_ids.begin(), endpoint_ids.end());
RouteToServiceController(
"scr-send-payload",
[this, client, shared_payload, endpoints, callback]() {
if (!ClientHasAcquiredServiceController(client)) {
callback.result_cb({Status::kOutOfOrderApiCall});
return;
}
if (!ClientHasConnectionToAtLeastOneEndpoint(client, endpoints)) {
callback.result_cb({Status::kEndpointUnknown});
return;
}
service_controller_->SendPayload(client, endpoints,
std::move(*shared_payload));
// At this point, we've queued up the send Payload request with the
// ServiceController; any further failures (e.g. one of the endpoints is
// unknown, goes away, or otherwise fails) will be returned to the
// client as a PayloadTransferUpdate.
callback.result_cb({Status::kSuccess});
});
}
void ServiceControllerRouter::CancelPayload(ClientProxy* client,
std::uint64_t payload_id,
const ResultCallback& callback) {
RouteToServiceController("scr-cancel-payload", [this, client, payload_id,
callback]() {
if (!ClientHasAcquiredServiceController(client)) {
callback.result_cb({Status::kOutOfOrderApiCall});
return;
}
callback.result_cb(service_controller_->CancelPayload(client, payload_id));
});
}
void ServiceControllerRouter::DisconnectFromEndpoint(
ClientProxy* client, absl::string_view endpoint_id,
const ResultCallback& callback) {
// Client can emit the cancellation at anytime, we need to execute the request
// without further posting it.
client->CancelEndpoint(std::string(endpoint_id));
RouteToServiceController(
"scr-disconnect-endpoint",
[this, client, endpoint_id = std::string(endpoint_id), callback]() {
if (ClientHasAcquiredServiceController(client)) {
if (!client->IsConnectedToEndpoint(endpoint_id) &&
!client->HasPendingConnectionToEndpoint(endpoint_id)) {
callback.result_cb({Status::kOutOfOrderApiCall});
return;
}
service_controller_->DisconnectFromEndpoint(client, endpoint_id);
callback.result_cb({Status::kSuccess});
}
});
}
void ServiceControllerRouter::StopAllEndpoints(ClientProxy* client,
const ResultCallback& callback) {
// Client can emit the cancellation at anytime, we need to execute the request
// without further posting it.
client->CancelAllEndpoints();
RouteToServiceController("scr-stop-all-endpoints", [this, client,
callback]() {
NEARBY_LOGS(INFO) << "Client " << client->GetClientId()
<< " has requested us to stop all endpoints. We will now "
"reset the client.";
if (ClientHasAcquiredServiceController(client)) {
DoneWithStrategySessionForClient(client);
}
callback.result_cb({Status::kSuccess});
});
}
void ServiceControllerRouter::ClientDisconnecting(
ClientProxy* client, const ResultCallback& callback) {
// Client can emit the cancellation at anytime, we need to execute the request
// without further posting it.
client->CancelAllEndpoints();
RouteToServiceController("scr-client-disconnecting", [this, client,
callback]() {
if (ClientHasAcquiredServiceController(client)) {
DoneWithStrategySessionForClient(client);
NEARBY_LOGS(INFO) << "Client " << client->GetClientId()
<< " has completed the client's connection.";
}
callback.result_cb({Status::kSuccess});
});
}
Status ServiceControllerRouter::AcquireServiceControllerForClient(
ClientProxy* client, Strategy strategy) {
if (current_strategy_.IsNone()) {
// Case 1: There is no existing Strategy at all.
// Set everything up for the first time.
Status status = UpdateCurrentServiceControllerAndStrategy(strategy);
if (!status.Ok()) {
return status;
}
clients_.insert(client);
return {Status::kSuccess};
} else if (strategy == current_strategy_) {
// Case 2: The existing Strategy matches.
// The new client just needs to be added to the set of clients using the
// current ServiceController.
clients_.insert(client);
return {Status::kSuccess};
} else {
// Case 3: The existing Strategy doesn't match.
// It's only safe for a client to cause a switch if it's the only client
// using the current ServiceController.
bool is_the_only_client_of_service_controller =
clients_.size() == 1 && ClientHasAcquiredServiceController(client);
if (!is_the_only_client_of_service_controller) {
NEARBY_LOGS(INFO) << "Client has already active strategy.";
return {Status::kAlreadyHaveActiveStrategy};
}
// If the client still has connected endpoints, they must disconnect before
// they can switch.
if (!client->GetConnectedEndpoints().empty()) {
NEARBY_LOGS(INFO) << "Client has connected endpoints.";
return {Status::kOutOfOrderApiCall};
}
// By this point, it's safe to switch the Strategy and ServiceController
// (and since it's the only client, there's no need to add it to the set of
// clients using the current ServiceController).
return UpdateCurrentServiceControllerAndStrategy(strategy);
}
}
bool ServiceControllerRouter::ClientHasAcquiredServiceController(
ClientProxy* client) const {
return clients_.contains(client);
}
void ServiceControllerRouter::ReleaseServiceControllerForClient(
ClientProxy* client) {
clients_.erase(client);
// service_controller_ won't be released here. Instead, in destructor.
service_controller_->Stop();
if (clients_.empty()) {
current_strategy_ = Strategy{};
}
}
/** Clean up all state for this client. The client is now free to switch
* strategies. */
void ServiceControllerRouter::DoneWithStrategySessionForClient(
ClientProxy* client) {
// Disconnect from all the connected endpoints tied to this clientProxy.
for (auto& endpoint_id : client->GetPendingConnectedEndpoints()) {
service_controller_->DisconnectFromEndpoint(client, endpoint_id);
}
for (auto& endpoint_id : client->GetConnectedEndpoints()) {
service_controller_->DisconnectFromEndpoint(client, endpoint_id);
}
// Stop any advertising and discovery that may be underway due to this
// clientProxy.
service_controller_->StopAdvertising(client);
service_controller_->StopDiscovery(client);
ReleaseServiceControllerForClient(client);
}
void ServiceControllerRouter::RouteToServiceController(const std::string& name,
Runnable runnable) {
serializer_.Execute(name, std::move(runnable));
}
bool ServiceControllerRouter::ClientHasConnectionToAtLeastOneEndpoint(
ClientProxy* client, const std::vector<std::string>& remote_endpoint_ids) {
for (auto& endpoint_id : remote_endpoint_ids) {
if (client->IsConnectedToEndpoint(endpoint_id)) {
return true;
}
}
return false;
}
Status ServiceControllerRouter::UpdateCurrentServiceControllerAndStrategy(
Strategy strategy) {
if (!strategy.IsValid()) {
NEARBY_LOGS(INFO) << "Strategy is not valid.";
return {Status::kError};
}
service_controller_.reset(service_controller_factory_());
current_strategy_ = strategy;
return {Status::kSuccess};
}
} // namespace connections
} // namespace nearby
} // namespace location