| // Copyright 2020 Google LLC |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // https://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| #include "core/internal/bwu_manager.h" |
| |
| #include <algorithm> |
| #include <memory> |
| |
| #include "core/internal/bluetooth_bwu_handler.h" |
| #include "core/internal/bwu_handler.h" |
| #include "core/internal/offline_frames.h" |
| #include "core/internal/webrtc_bwu_handler.h" |
| #include "core/internal/wifi_lan_bwu_handler.h" |
| #include "platform/base/byte_array.h" |
| #include "platform/public/count_down_latch.h" |
| #include "proto/connections_enums.pb.h" |
| #include "absl/functional/bind_front.h" |
| #include "absl/time/time.h" |
| |
| namespace location { |
| namespace nearby { |
| namespace connections { |
| |
| using ::location::nearby::proto::connections::ConnectionAttemptResult; |
| using ::location::nearby::proto::connections::DisconnectionReason; |
| |
| BwuManager::BwuManager( |
| Mediums& mediums, EndpointManager& endpoint_manager, |
| EndpointChannelManager& channel_manager, |
| absl::flat_hash_map<Medium, std::unique_ptr<BwuHandler>> handlers, |
| Config config) |
| : config_(config), |
| mediums_(&mediums), |
| endpoint_manager_(&endpoint_manager), |
| channel_manager_(&channel_manager) { |
| if (config_.bandwidth_upgrade_retry_delay == absl::ZeroDuration()) { |
| config_.bandwidth_upgrade_retry_delay = absl::Seconds(5); |
| } |
| if (config_.bandwidth_upgrade_retry_max_delay == absl::ZeroDuration()) { |
| config_.bandwidth_upgrade_retry_max_delay = absl::Seconds(10); |
| } |
| if (config_.allow_upgrade_to.All(false)) { |
| config_.allow_upgrade_to.web_rtc = true; |
| } |
| if (!handlers.empty()) { |
| handlers_ = std::move(handlers); |
| } else { |
| InitBwuHandlers(); |
| } |
| |
| // Register the offline frame processor. |
| endpoint_manager_->RegisterFrameProcessor( |
| V1Frame::BANDWIDTH_UPGRADE_NEGOTIATION, this); |
| } |
| |
| void BwuManager::InitBwuHandlers() { |
| // Register the supported concrete BwuMedium implementations. |
| BwuHandler::BwuNotifications notifications{ |
| .incoming_connection_cb = |
| absl::bind_front(&BwuManager::OnIncomingConnection, this), |
| }; |
| if (config_.allow_upgrade_to.wifi_lan) { |
| handlers_.emplace(Medium::WIFI_LAN, |
| std::make_unique<WifiLanBwuHandler>( |
| *mediums_, *channel_manager_, notifications)); |
| } |
| if (config_.allow_upgrade_to.web_rtc) { |
| handlers_.emplace(Medium::WEB_RTC, |
| std::make_unique<WebrtcBwuHandler>( |
| *mediums_, *channel_manager_, notifications)); |
| } |
| if (config_.allow_upgrade_to.bluetooth) { |
| handlers_.emplace(Medium::BLUETOOTH, |
| std::make_unique<BluetoothBwuHandler>( |
| *mediums_, *channel_manager_, notifications)); |
| } |
| } |
| |
| void BwuManager::Shutdown() { |
| NEARBY_LOG(INFO, "Initiating shutdown of BwuManager."); |
| |
| endpoint_manager_->UnregisterFrameProcessor( |
| V1Frame::BANDWIDTH_UPGRADE_NEGOTIATION, this); |
| |
| // Stop all the ongoing Runnables (as gracefully as possible). |
| alarm_executor_.Shutdown(); |
| serial_executor_.Shutdown(); |
| |
| // After worker threads are down we became exclusive owners of data and |
| // may access it from current thread. |
| for (auto& item : previous_endpoint_channels_) { |
| EndpointChannel* channel = item.second.get(); |
| if (!channel) continue; |
| channel->Close(DisconnectionReason::SHUTDOWN); |
| } |
| |
| CancelAllRetryUpgradeAlarms(); |
| medium_ = Medium::UNKNOWN_MEDIUM; |
| for (auto& item : handlers_) { |
| BwuHandler& handler = *item.second; |
| handler.Revert(); |
| } |
| handlers_.clear(); |
| |
| NEARBY_LOG(INFO, "BwuHandler has shut down."); |
| } |
| |
| // This is the point on the Initiator side where the |
| // medium_ is set. |
| void BwuManager::InitiateBwuForEndpoint(ClientProxy* client, |
| const std::string& endpoint_id, |
| Medium new_medium) { |
| RunOnBwuManagerThread([this, client, endpoint_id, new_medium]() { |
| Medium proposed_medium = ChooseBestUpgradeMedium( |
| client->GetUpgradeMediums(endpoint_id).GetMediums(true)); |
| if (new_medium != Medium::UNKNOWN_MEDIUM) { |
| proposed_medium = new_medium; |
| } |
| auto* handler = SetCurrentBwuHandler(proposed_medium); |
| |
| if (!handler) return; |
| |
| if (in_progress_upgrades_.contains(endpoint_id)) { |
| return; |
| } |
| |
| auto channel = channel_manager_->GetChannelForEndpoint(endpoint_id); |
| |
| if (channel == nullptr) { |
| return; |
| } |
| |
| // Ignore requests where the medium we're upgrading to is the medium we're |
| // already connected over. This can happen now that Bluetooth is both an |
| // advertising medium and a potential bandwidth upgrade, and will continue |
| // to be possible as we add other new advertising mediums like mDNS (WiFi |
| // LAN). Very specifically, this happens now when a device uses P2P_CLUSTER, |
| // connects over Bluetooth, and is not connected to LAN. Bluetooth is the |
| // best medium, and we attempt to upgrade from Bluetooth to Bluetooth. |
| if (medium_ == channel->GetMedium()) { |
| return; |
| } |
| |
| std::string service_id = client->GetServiceId(); |
| ByteArray bytes = handler->InitializeUpgradedMediumForEndpoint( |
| client, service_id, endpoint_id); |
| |
| // Because we grab the endpointChannel first thing, it is possible the |
| // endpointChannel is stale by the time we attempt to write over it. |
| if (bytes.Empty()) { |
| NEARBY_LOG(ERROR, |
| "Couldn't complete the upgrade for endpoint " |
| "%s to %d because it failed to initialize the " |
| "BWU_NEGOTIATION.UPGRADE_PATH_AVAILABLE OfflineFrame.", |
| endpoint_id.c_str(), medium_); |
| UpgradePathInfo info; |
| info.set_medium(parser::MediumToUpgradePathInfoMedium(medium_)); |
| |
| ProcessUpgradeFailureEvent(client, endpoint_id, info); |
| return; |
| } |
| if (!channel->Write(bytes).Ok()) { |
| NEARBY_LOG(ERROR, |
| "Couldn't complete the upgrade for endpoint %s to %d because " |
| "it failed to write the " |
| "BWU_NEGOTIATION.UPGRADE_PATH_AVAILABLE OfflineFrame.", |
| endpoint_id.c_str(), medium_); |
| return; |
| } |
| |
| NEARBY_LOG(INFO, |
| "Successfully wrote the BWU_NEGOTIATION.UPGRADE_PATH_AVAILABLE " |
| "OfflineFrame while upgrading endpoint %s to %d.", |
| endpoint_id.c_str(), medium_); |
| in_progress_upgrades_.emplace(endpoint_id, client); |
| }); |
| } |
| |
| void BwuManager::OnIncomingFrame(OfflineFrame& frame, |
| const std::string& endpoint_id, |
| ClientProxy* client, Medium medium) { |
| if (parser::GetFrameType(frame) != V1Frame::BANDWIDTH_UPGRADE_NEGOTIATION) |
| return; |
| auto bwu_frame = frame.v1().bandwidth_upgrade_negotiation(); |
| CountDownLatch latch(1); |
| RunOnBwuManagerThread([this, client, endpoint_id, &bwu_frame, &latch]() { |
| OnBwuNegotiationFrame(client, bwu_frame, endpoint_id); |
| latch.CountDown(); |
| }); |
| latch.Await(); |
| } |
| |
| void BwuManager::OnEndpointDisconnect(ClientProxy* client, |
| const std::string& endpoint_id, |
| CountDownLatch barrier) { |
| RunOnBwuManagerThread([this, client, endpoint_id, barrier]() mutable { |
| if (medium_ == Medium::UNKNOWN_MEDIUM) { |
| barrier.CountDown(); |
| return; |
| } |
| |
| if (handler_) { |
| handler_->OnEndpointDisconnect(client, endpoint_id); |
| } |
| |
| auto item = previous_endpoint_channels_.extract(endpoint_id); |
| |
| if (!item.empty()) { |
| auto old_channel = item.mapped(); |
| if (old_channel != nullptr) { |
| old_channel->Close(DisconnectionReason::SHUTDOWN); |
| } |
| } |
| in_progress_upgrades_.erase(endpoint_id); |
| CancelRetryUpgradeAlarm(endpoint_id); |
| |
| successfully_upgraded_endpoints_.erase(endpoint_id); |
| |
| // If this was our very last endpoint: |
| // |
| // a) revert all the changes for currentBwuMedium. |
| // b) reset currentBwuMedium. |
| if (channel_manager_->GetConnectedEndpointsCount() <= 1) { |
| Revert(); |
| } |
| barrier.CountDown(); |
| }); |
| } |
| |
| BwuHandler* BwuManager::SetCurrentBwuHandler(Medium medium) { |
| handler_ = nullptr; |
| medium_ = medium; |
| if (medium != Medium::UNKNOWN_MEDIUM) { |
| auto item = handlers_.find(medium); |
| if (item != handlers_.end()) { |
| handler_ = item->second.get(); |
| } |
| } |
| return handler_; |
| } |
| |
| void BwuManager::Revert() { |
| if (handler_) { |
| handler_->Revert(); |
| medium_ = Medium::UNKNOWN_MEDIUM; |
| handler_ = nullptr; |
| } |
| } |
| |
| void BwuManager::OnBwuNegotiationFrame(ClientProxy* client, |
| const BwuNegotiationFrame& frame, |
| const string& endpoint_id) { |
| switch (frame.event_type()) { |
| case BwuNegotiationFrame::UPGRADE_PATH_AVAILABLE: |
| ProcessBwuPathAvailableEvent(client, endpoint_id, |
| frame.upgrade_path_info()); |
| break; |
| case BwuNegotiationFrame::UPGRADE_FAILURE: |
| ProcessUpgradeFailureEvent(client, endpoint_id, |
| frame.upgrade_path_info()); |
| break; |
| case BwuNegotiationFrame::LAST_WRITE_TO_PRIOR_CHANNEL: |
| ProcessLastWriteToPriorChannelEvent(client, endpoint_id); |
| break; |
| case BwuNegotiationFrame::SAFE_TO_CLOSE_PRIOR_CHANNEL: |
| ProcessSafeToClosePriorChannelEvent(client, endpoint_id); |
| break; |
| default: |
| break; |
| } |
| } |
| |
| void BwuManager::OnIncomingConnection( |
| ClientProxy* client, |
| std::unique_ptr<BwuHandler::IncomingSocketConnection> mutable_connection) { |
| std::shared_ptr<BwuHandler::IncomingSocketConnection> connection( |
| mutable_connection.release()); |
| RunOnBwuManagerThread([this, client, connection]() { |
| EndpointChannel* channel = connection->channel.get(); |
| if (channel == nullptr) { |
| connection->socket->Close(); |
| return; |
| } |
| |
| ClientIntroduction introduction; |
| if (!ReadClientIntroductionFrame(channel, introduction)) { |
| // This was never a fully EstablishedConnection, no need to provide a |
| // closure reason. |
| channel->Close(); |
| return; |
| } |
| |
| if (!WriteClientIntroductionAckFrame(channel)) { |
| // This was never a fully EstablishedConnection, no need to provide a |
| // closure reason. |
| channel->Close(); |
| return; |
| } |
| |
| const std::string& endpoint_id = introduction.endpoint_id(); |
| auto item = in_progress_upgrades_.extract(endpoint_id); |
| if (item.empty()) return; |
| ClientProxy* mapped_client = item.mapped(); |
| CancelRetryUpgradeAlarm(endpoint_id); |
| if (mapped_client == nullptr) { |
| // This was never a fully EstablishedConnection, no need to provide a |
| // closure reason. |
| channel->Close(); |
| return; |
| } |
| |
| CHECK(client == mapped_client); |
| |
| // Use the introductory client information sent over to run the upgrade |
| // protocol. |
| RunUpgradeProtocol(mapped_client, endpoint_id, |
| std::move(connection->channel)); |
| }); |
| } |
| |
| void BwuManager::RunOnBwuManagerThread(Runnable runnable) { |
| serial_executor_.Execute(std::move(runnable)); |
| } |
| |
| void BwuManager::RunUpgradeProtocol( |
| ClientProxy* client, const std::string& endpoint_id, |
| std::unique_ptr<EndpointChannel> new_channel) { |
| // First, register this new EndpointChannel as *the* EndpointChannel to use |
| // for this endpoint here onwards. NOTE: We pause this new EndpointChannel |
| // until we've completely drained the old EndpointChannel to avoid out of |
| // order reads on the other side. This is a consequence of using the same |
| // UKEY2 context for both the previous and new EndpointChannels. UKEY2 uses |
| // sequence numbers for writes and reads, and simultaneously sending Payloads |
| // on the new channel and control messages on the old channel cause the other |
| // side to read messages out of sequence |
| new_channel->Pause(); |
| auto old_channel = channel_manager_->GetChannelForEndpoint(endpoint_id); |
| if (!old_channel) return; |
| channel_manager_->ReplaceChannelForEndpoint(client, endpoint_id, |
| std::move(new_channel)); |
| |
| // Next, initiate a clean shutdown for the previous EndpointChannel used for |
| // this endpoint by telling the remote device that it will not receive any |
| // more writes over that EndpointChannel. |
| if (!old_channel->Write(parser::ForBwuLastWrite()).Ok()) { |
| return; |
| } |
| |
| // The remainder of this clean shutdown for the previous EndpointChannel will |
| // continue when we receive a corresponding |
| // BANDWIDTH_UPGRADE_NEGOTIATION.LAST_WRITE_TO_PRIOR_CHANNEL OfflineFrame from |
| // the remote device, so for now, just store that previous EndpointChannel. |
| previous_endpoint_channels_.emplace(endpoint_id, old_channel); |
| |
| // If we already read LAST_WRITE on the old endpoint channel, then we can |
| // safely close it now. |
| auto item = successfully_upgraded_endpoints_.extract(endpoint_id); |
| if (!item.empty()) { |
| ProcessLastWriteToPriorChannelEvent(client, endpoint_id); |
| } |
| } |
| |
| // Outgoing BWU session. |
| void BwuManager::ProcessBwuPathAvailableEvent( |
| ClientProxy* client, const string& endpoint_id, |
| const UpgradePathInfo& upgrade_path_info) { |
| Medium medium = |
| parser::UpgradePathInfoMediumToMedium(upgrade_path_info.medium()); |
| if (medium_ == Medium::UNKNOWN_MEDIUM) { |
| SetCurrentBwuHandler(medium); |
| } |
| // Check for the correct medium so we don't process an incorrect OfflineFrame. |
| if (medium != medium_) { |
| RunUpgradeFailedProtocol(client, endpoint_id, upgrade_path_info); |
| return; |
| } |
| |
| auto channel = ProcessBwuPathAvailableEventInternal(client, endpoint_id, |
| upgrade_path_info); |
| ConnectionAttemptResult connectionAttemptResult; |
| if (channel != nullptr) { |
| connectionAttemptResult = ConnectionAttemptResult::RESULT_SUCCESS; |
| } else { |
| connectionAttemptResult = ConnectionAttemptResult::RESULT_ERROR; |
| } |
| |
| if (channel == nullptr) { |
| RunUpgradeFailedProtocol(client, endpoint_id, upgrade_path_info); |
| return; |
| } |
| |
| RunUpgradeProtocol(client, endpoint_id, std::move(channel)); |
| } |
| |
| std::unique_ptr<EndpointChannel> |
| BwuManager::ProcessBwuPathAvailableEventInternal( |
| ClientProxy* client, const string& endpoint_id, |
| const UpgradePathInfo& upgrade_path_info) { |
| std::unique_ptr<EndpointChannel> channel = |
| handler_->CreateUpgradedEndpointChannel(client, client->GetServiceId(), |
| endpoint_id, upgrade_path_info); |
| if (!channel) { |
| return nullptr; |
| } |
| |
| // Write the requisite BANDWIDTH_UPGRADE_NEGOTIATION.CLIENT_INTRODUCTION as |
| // the first OfflineFrame on this new EndpointChannel. |
| if (!channel->Write(parser::ForBwuIntroduction(client->GetLocalEndpointId())) |
| .Ok()) { |
| // This was never a fully EstablishedConnection, no need to provide a |
| // closure reason. |
| channel->Close(); |
| |
| NEARBY_LOG( |
| ERROR, |
| "Failed to write BWU_NEGOTIATION.CLIENT_INTRODUCTION OfflineFrame to " |
| "newly-created EndpointChannel %s, aborting upgrade.", |
| channel->GetName().c_str()); |
| |
| return {}; |
| } |
| |
| if (upgrade_path_info.supports_client_introduction_ack()) { |
| if (!ReadClientIntroductionAckFrame(channel.get())) { |
| // This was never a fully EstablishedConnection, no need to provide a |
| // closure reason. |
| channel->Close(); |
| |
| NEARBY_LOG( |
| ERROR, |
| "Failed to read BWU_NEGOTIATION.CLIENT_INTRODUCTION_ACK OfflineFrame " |
| "to newly-created EndpointChannel %s, aborting upgrade.", |
| channel->GetName().c_str()); |
| |
| return {}; |
| } |
| } |
| |
| NEARBY_LOG( |
| INFO, |
| "Successfully wrote BWU_NEGOTIATION.CLIENT_INTRODUCTION OfflineFrame to " |
| "newly-created EndpointChannel %s while upgrading endpoint %s.", |
| channel->GetName().c_str(), endpoint_id.c_str()); |
| |
| // Set the AnalyticsRecorder so that the future closure of this |
| // EndpointChannel will be recorded. |
| return channel; |
| } |
| |
| void BwuManager::RunUpgradeFailedProtocol( |
| ClientProxy* client, const std::string& endpoint_id, |
| const UpgradePathInfo& upgrade_path_info) { |
| // We attempted to connect to the new medium that the remote device has set up |
| // for us but we failed. We need to let the remote device know so that they |
| // can pick another medium for us to try. |
| std::shared_ptr<EndpointChannel> channel = |
| channel_manager_->GetChannelForEndpoint(endpoint_id); |
| if (!channel) { |
| NEARBY_LOG(ERROR, |
| "Couldn't find a previous EndpointChannel for %s " |
| "when sending an upgrade failure frame, short-circuiting the " |
| "upgrade protocol.", |
| endpoint_id.c_str()); |
| return; |
| } |
| |
| // Report UPGRADE_FAILURE to the remote device. |
| if (!channel->Write(parser::ForBwuFailure(upgrade_path_info)).Ok()) { |
| channel->Close(DisconnectionReason::IO_ERROR); |
| |
| NEARBY_LOG( |
| ERROR, |
| "Failed to write BANDWIDTH_UPGRADE_NEGOTIATION.UPGRADE_FAILURE " |
| "OfflineFrame to endpoint %s, short-circuiting the upgrade protocol.", |
| endpoint_id.c_str()); |
| return; |
| } |
| |
| // And lastly, clean up our currentBwuMedium since we failed to |
| // utilize it anyways. |
| if (medium_ != Medium::UNKNOWN_MEDIUM) { |
| Revert(); |
| } |
| } |
| |
| bool BwuManager::ReadClientIntroductionFrame(EndpointChannel* channel, |
| ClientIntroduction& introduction) { |
| auto data = channel->Read(); |
| if (!data.ok()) return false; |
| auto transfer(parser::FromBytes(data.result())); |
| if (!transfer.ok()) return false; |
| OfflineFrame frame = transfer.result(); |
| if (!frame.has_v1() || !frame.v1().has_bandwidth_upgrade_negotiation()) |
| return false; |
| if (frame.v1().bandwidth_upgrade_negotiation().event_type() != |
| BandwidthUpgradeNegotiationFrame::CLIENT_INTRODUCTION) |
| return false; |
| const auto& frame_intro = |
| frame.v1().bandwidth_upgrade_negotiation().client_introduction(); |
| introduction = frame_intro; |
| return true; |
| } |
| |
| bool BwuManager::ReadClientIntroductionAckFrame(EndpointChannel* channel) { |
| auto data = channel->Read(); |
| if (!data.ok()) return false; |
| auto transfer(parser::FromBytes(data.result())); |
| if (!transfer.ok()) return false; |
| OfflineFrame frame = transfer.result(); |
| if (!frame.has_v1() || !frame.v1().has_bandwidth_upgrade_negotiation()) |
| return false; |
| if (frame.v1().bandwidth_upgrade_negotiation().event_type() != |
| BandwidthUpgradeNegotiationFrame::CLIENT_INTRODUCTION_ACK) |
| return false; |
| return true; |
| } |
| |
| bool BwuManager::WriteClientIntroductionAckFrame(EndpointChannel* channel) { |
| return channel->Write(parser::ForBwuIntroductionAck()).Ok(); |
| } |
| |
| void BwuManager::ProcessLastWriteToPriorChannelEvent( |
| ClientProxy* client, const std::string& endpoint_id) { |
| // By this point in the upgrade protocol, there is the guarantee that both |
| // involved endpoints have registered a new EndpointChannel with the |
| // EndpointChannelManager as the official channel for communication; given |
| // the way communication is structured in the EndpointManager, this means |
| // that all new writes are happening over that new EndpointChannel, but |
| // reads are still happening over this prior EndpointChannel (to avoid data |
| // loss). But now that we've received this definitive final write over that |
| // prior EndpointChannel, we can let the remote device that they can safely |
| // close their end of this now-dormant EndpointChannel. |
| EndpointChannel* previous_endpoint_channel = |
| previous_endpoint_channels_[endpoint_id].get(); |
| if (!previous_endpoint_channel) { |
| NEARBY_LOG( |
| ERROR, |
| "Received a BWU_NEGOTIATION.LAST_WRITE_TO_PRIOR_CHANNEL OfflineFrame " |
| "for unknown endpoint %s, can't complete the upgrade protocol.", |
| endpoint_id.c_str()); |
| |
| successfully_upgraded_endpoints_.emplace(endpoint_id); |
| return; |
| } |
| |
| if (!previous_endpoint_channel->Write(parser::ForBwuSafeToClose()).Ok()) { |
| previous_endpoint_channel->Close(DisconnectionReason::IO_ERROR); |
| // Remove this prior EndpointChannel from previous_endpoint_channels to |
| // avoid leaks. |
| previous_endpoint_channels_.erase(endpoint_id); |
| |
| NEARBY_LOG( |
| ERROR, |
| "Failed to write BWU_NEGOTIATION.SAFE_TO_CLOSE_PRIOR_CHANNEL " |
| "OfflineFrame to endpoint %s, short-circuiting the upgrade protocol.", |
| endpoint_id.c_str()); |
| return; |
| } |
| |
| // The upgrade protocol's clean shutdown of the prior EndpointChannel will |
| // conclude when we receive a corresponding |
| // BANDWIDTH_UPGRADE_NEGOTIATION.SAFE_TO_CLOSE_PRIOR_CHANNEL OfflineFrame |
| // from the remote device. |
| } |
| |
| void BwuManager::ProcessSafeToClosePriorChannelEvent( |
| ClientProxy* client, const std::string& endpoint_id) { |
| // By this point in the upgrade protocol, there's no more writes happening |
| // over the prior EndpointChannel, and the remote device has given us the |
| // go-ahead to close this EndpointChannel [1], so we can safely close it |
| // (and depend on the EndpointManager querying the EndpointChannelManager to |
| // start reading from the new EndpointChannel). |
| // |
| // [1] Which also implies that they've received our |
| // BANDWIDTH_UPGRADE_NEGOTIATION.LAST_WRITE_TO_PRIOR_CHANNEL OfflineFrame), |
| // so there can be no data loss, regardless of whether the EndpointChannel |
| // allows reads of queued, unread data after the EndpointChannel has been |
| // closed from the other end (as is the case with conventional TCP sockets) |
| // or not (as is the case with Android's Bluetooth sockets, where closing |
| // instantly throws an IOException on the remote device). |
| auto item = previous_endpoint_channels_.extract(endpoint_id); |
| auto& previous_endpoint_channel = item.mapped(); |
| if (previous_endpoint_channel == nullptr) { |
| NEARBY_LOG( |
| ERROR, |
| "Received a BWU_NEGOTIATION.SAFE_TO_CLOSE_PRIOR_CHANNEL OfflineFrame " |
| "for unknown endpoint %s, can't complete the upgrade protocol.", |
| endpoint_id.c_str()); |
| return; |
| } |
| |
| NEARBY_LOG(INFO, |
| "BwuManager successfully received a " |
| "BWU_NEGOTIATION.SAFE_TO_CLOSE_PRIOR_CHANNEL OfflineFrame while " |
| "trying to upgrade endpoint %s.", |
| endpoint_id.c_str()); |
| |
| // Each encrypted message includes the key to decrypt the next message. The |
| // disconnect message is optional and may not be received under normal |
| // circumstances so it is necessary to send it unencrypted. This way the |
| // serial crypto context does not increment here. |
| previous_endpoint_channel->DisableEncryption(); |
| previous_endpoint_channel->Write(parser::ForDisconnection()); |
| |
| // TODO(b/172380349): Match the Java implementation with no sleep call |
| |
| // Wait for in-flight messages to reach their peers. |
| SystemClock::Sleep(absl::Seconds(1)); |
| previous_endpoint_channel->Close(DisconnectionReason::UPGRADED); |
| |
| // Now that the old channel has been drained, we can unpause the new channel |
| std::shared_ptr<EndpointChannel> channel = |
| channel_manager_->GetChannelForEndpoint(endpoint_id); |
| |
| if (!channel) { |
| NEARBY_LOG(ERROR, |
| "Attempted to resume the current EndpointChannel with endpoint " |
| "%s, but none was found", |
| endpoint_id.c_str()); |
| return; |
| } |
| |
| channel->Resume(); |
| |
| // Report the success to the client |
| client->OnBandwidthChanged(endpoint_id, channel->GetMedium()); |
| } |
| |
| void BwuManager::ProcessUpgradeFailureEvent( |
| ClientProxy* client, const std::string& endpoint_id, |
| const UpgradePathInfo& upgrade_info) { |
| // The remote device failed to upgrade to the new medium we set up for them. |
| // That's alright! We'll just try the next available medium (if there is |
| // one). |
| in_progress_upgrades_.erase(endpoint_id); |
| |
| // The first thing we have to do is to replace our |
| // currentBwuMedium with the next best upgrade medium we share |
| // with the remote device. The catch is that we can only do this if we only |
| // have one connected endpoint. Otherwise, we'll end up disrupting our other |
| // connected peers. |
| if (channel_manager_->GetConnectedEndpointsCount() > 1) { |
| // We can't change the currentBwuMedium, so there are no more |
| // upgrade attempts for this endpoint. Sorry. |
| NEARBY_LOG( |
| ERROR, |
| "Failed to attempt a new bandwidth upgrade for endpoint %s because we " |
| "have other connected endpoints and can't try a new upgrade medium.", |
| endpoint_id.c_str()); |
| return; |
| } |
| |
| // Revert the existing upgrade medium for now. |
| if (medium_ != Medium::UNKNOWN_MEDIUM) { |
| Revert(); |
| } |
| |
| // Loop through the ordered list of upgrade mediums. One by one, remove the |
| // top element until we get to the medium we last attempted to upgrade to. |
| // The remainder of the list will contain the mediums we haven't attempted |
| // yet. |
| Medium last = parser::UpgradePathInfoMediumToMedium(upgrade_info.medium()); |
| std::vector<Medium> all_possible_mediums = |
| client->GetUpgradeMediums(endpoint_id).GetMediums(true); |
| std::vector<Medium> untried_mediums(all_possible_mediums); |
| for (Medium medium : all_possible_mediums) { |
| untried_mediums.erase(untried_mediums.begin()); |
| if (medium == last) { |
| break; |
| } |
| } |
| |
| RetryUpgradeMediums(client, endpoint_id, untried_mediums); |
| } |
| |
| void BwuManager::RetryUpgradeMediums(ClientProxy* client, |
| const std::string& endpoint_id, |
| std::vector<Medium> upgrade_mediums) { |
| Medium next_medium = ChooseBestUpgradeMedium(upgrade_mediums); |
| |
| // If current medium is not WiFi and we have not succeeded with upgrading |
| // yet, retry upgrade. |
| Medium current_medium = GetEndpointMedium(endpoint_id); |
| if (current_medium != Medium::WIFI_LAN && |
| (next_medium == current_medium || next_medium == Medium::UNKNOWN_MEDIUM || |
| upgrade_mediums.empty())) { |
| RetryUpgradesAfterDelay(client, endpoint_id); |
| return; |
| } |
| |
| // Attempt to set the new upgrade medium. |
| if (!SetCurrentBwuHandler(next_medium)) { |
| NEARBY_LOG( |
| INFO, |
| "BwuManager failed to attempt a new bandwidth upgrade for endpoint %s " |
| "because we couldn't set a new bandwidth upgrade medium.", |
| endpoint_id.c_str()); |
| return; |
| } |
| |
| // Now that we've successfully picked a new upgrade medium to try, |
| // re-initiate the bandwidth upgrade. |
| NEARBY_LOG(INFO, |
| "BwuManager is attempting to upgrade endpoint %s again with a new " |
| " bandwidth upgrade medium.", |
| endpoint_id.c_str()); |
| InitiateBwuForEndpoint(client, endpoint_id); |
| } |
| |
| std::vector<Medium> BwuManager::StripOutUnavailableMediums( |
| const std::vector<Medium>& mediums) { |
| std::vector<Medium> available_mediums; |
| for (Medium m : mediums) { |
| bool available = false; |
| switch (m) { |
| case Medium::WIFI_LAN: |
| available = mediums_->GetWifiLan().IsAvailable(); |
| break; |
| case Medium::WEB_RTC: |
| available = mediums_->GetWebRtc().IsAvailable(); |
| break; |
| case Medium::BLUETOOTH: |
| available = mediums_->GetBluetoothClassic().IsAvailable(); |
| break; |
| default: |
| break; |
| } |
| if (available) { |
| available_mediums.push_back(m); |
| } |
| } |
| return available_mediums; |
| } |
| |
| // Returns the optimal medium supported by both devices. |
| // Each medium in the passed in list is checked for its availability with the |
| // medium_manager_ to ensure that the chosen upgrade medium is supported and |
| // available locally before continuing the upgrade. Once we pick a medium, all |
| // future connections will use it too. eg. If we chose Wifi LAN, we'll attempt |
| // to upgrade the 2nd, 3rd, etc remote endpoints with Wifi LAN even if they're |
| // on a different network (or had a better medium). This is a quick and easy |
| // way to prevent mediums, like Wifi Hotspot, from interfering with active |
| // connections (although it's suboptimal for bandwidth throughput). When all |
| // endpoints disconnect, we reset the bandwidth upgrade medium. |
| Medium BwuManager::ChooseBestUpgradeMedium(const std::vector<Medium>& mediums) { |
| auto available_mediums = StripOutUnavailableMediums(mediums); |
| if (medium_ == Medium::UNKNOWN_MEDIUM) { |
| if (!available_mediums.empty()) { |
| // Case 1: This is our first time upgrading, and we have at least one |
| // supported medium to choose from. Return the first medium in the list, |
| // since they are ordered by preference. |
| return available_mediums[0]; |
| } |
| // Case 2: This is our first time upgrading, but there are no available |
| // upgrade mediums. Fall through to returning UNKNOWN_MEDIUM at the |
| // bottom. |
| NEARBY_LOG( |
| INFO, |
| "Current upgrade medium is unset, but there are no common supported " |
| "upgrade mediums."); |
| } else { |
| // Case 3: We have already upgraded, and there is a list of supported |
| // mediums to check against. Return the current upgrade medium if it's in |
| // the supported list. |
| if (std::find(available_mediums.begin(), available_mediums.end(), |
| medium_) != available_mediums.end()) { |
| return medium_; |
| } |
| // Case 4: We have already upgraded, but the current medium is not |
| // supported by the remote endpoint (it's not in the list, or the list is |
| // empty). Fall through and return Medium.UNKNOWN_MEDIUM because we cannot |
| // continue with the current upgrade medium, and we are not allowed to |
| // switch. |
| NEARBY_LOG( |
| INFO, |
| "Current upgrade medium %d is not supported by the remote endpoint", |
| medium_); |
| } |
| |
| return Medium::UNKNOWN_MEDIUM; |
| } |
| |
| void BwuManager::RetryUpgradesAfterDelay(ClientProxy* client, |
| const std::string& endpoint_id) { |
| absl::Duration delay = CalculateNextRetryDelay(endpoint_id); |
| CancelRetryUpgradeAlarm(endpoint_id); |
| CancelableAlarm alarm( |
| "BWU alarm", |
| [this, client, endpoint_id]() { |
| RunOnBwuManagerThread([this, client, endpoint_id]() { |
| if (!client->IsConnectedToEndpoint(endpoint_id)) { |
| return; |
| } |
| RetryUpgradeMediums( |
| client, endpoint_id, |
| client->GetUpgradeMediums(endpoint_id).GetMediums(true)); |
| }); |
| }, |
| delay, &alarm_executor_); |
| |
| retry_upgrade_alarms_.emplace(endpoint_id, |
| std::make_pair(std::move(alarm), delay)); |
| NEARBY_LOGS(INFO) << "Retry bandwidth upgrade after " << delay; |
| } |
| |
| absl::Duration BwuManager::CalculateNextRetryDelay( |
| const std::string& endpoint_id) { |
| auto item = retry_upgrade_alarms_.find(endpoint_id); |
| auto initial_delay = config_.bandwidth_upgrade_retry_delay; |
| auto delay = item == retry_upgrade_alarms_.end() |
| ? initial_delay |
| : item->second.second + initial_delay; |
| return std::min(delay, config_.bandwidth_upgrade_retry_max_delay); |
| } |
| |
| void BwuManager::CancelRetryUpgradeAlarm(const std::string& endpoint_id) { |
| auto item = retry_upgrade_alarms_.extract(endpoint_id); |
| if (item.empty()) return; |
| auto& pair = item.mapped(); |
| pair.first.Cancel(); |
| } |
| |
| void BwuManager::CancelAllRetryUpgradeAlarms() { |
| for (const auto& item : retry_upgrade_alarms_) { |
| const std::string& endpoint_id = item.first; |
| CancelRetryUpgradeAlarm(endpoint_id); |
| } |
| } |
| |
| Medium BwuManager::GetEndpointMedium(const std::string& endpoint_id) { |
| auto channel = channel_manager_->GetChannelForEndpoint(endpoint_id); |
| return channel == nullptr ? Medium::UNKNOWN_MEDIUM : channel->GetMedium(); |
| } |
| |
| } // namespace connections |
| } // namespace nearby |
| } // namespace location |