blob: 86ca7c8ceda658dfb3390fe2862c23f6dd4c4b84 [file] [log] [blame]
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "connections/implementation/bwu_manager.h"
#include <algorithm>
#include <memory>
#include "absl/functional/bind_front.h"
#include "absl/time/time.h"
#include "connections/implementation/bluetooth_bwu_handler.h"
#include "connections/implementation/bwu_handler.h"
#include "connections/implementation/offline_frames.h"
#include "connections/implementation/webrtc_bwu_handler.h"
#include "connections/implementation/wifi_lan_bwu_handler.h"
#include "internal/platform/byte_array.h"
#include "internal/platform/feature_flags.h"
#include "internal/platform/count_down_latch.h"
#include "proto/connections_enums.pb.h"
namespace location {
namespace nearby {
namespace connections {
using ::location::nearby::proto::connections::DisconnectionReason;
// Required for C++ 14 support in Chrome
constexpr absl::Duration BwuManager::kReadClientIntroductionFrameTimeout;
BwuManager::BwuManager(
Mediums& mediums, EndpointManager& endpoint_manager,
EndpointChannelManager& channel_manager,
absl::flat_hash_map<Medium, std::unique_ptr<BwuHandler>> handlers,
Config config)
: config_(config),
mediums_(&mediums),
endpoint_manager_(&endpoint_manager),
channel_manager_(&channel_manager) {
if (config_.bandwidth_upgrade_retry_delay == absl::ZeroDuration()) {
if (FeatureFlags::GetInstance().GetFlags().use_exp_backoff_in_bwu_retry) {
config_.bandwidth_upgrade_retry_delay =
FeatureFlags::GetInstance()
.GetFlags()
.bwu_retry_exp_backoff_initial_delay;
} else {
config_.bandwidth_upgrade_retry_delay = absl::Seconds(5);
}
}
if (config_.bandwidth_upgrade_retry_max_delay == absl::ZeroDuration()) {
if (FeatureFlags::GetInstance().GetFlags().use_exp_backoff_in_bwu_retry) {
config_.bandwidth_upgrade_retry_max_delay =
FeatureFlags::GetInstance()
.GetFlags()
.bwu_retry_exp_backoff_maximum_delay;
} else {
config_.bandwidth_upgrade_retry_max_delay = absl::Seconds(10);
}
}
if (config_.allow_upgrade_to.All(false)) {
config_.allow_upgrade_to.web_rtc = true;
config_.allow_upgrade_to.wifi_lan = true;
}
if (!handlers.empty()) {
handlers_ = std::move(handlers);
} else {
InitBwuHandlers();
}
// Register the offline frame processor.
endpoint_manager_->RegisterFrameProcessor(
V1Frame::BANDWIDTH_UPGRADE_NEGOTIATION, this);
}
BwuManager::~BwuManager() {
NEARBY_LOGS(INFO) << "BwuManager going down";
Shutdown();
}
void BwuManager::InitBwuHandlers() {
// Register the supported concrete BwuMedium implementations.
BwuHandler::BwuNotifications notifications{
.incoming_connection_cb =
absl::bind_front(&BwuManager::OnIncomingConnection, this),
};
if (config_.allow_upgrade_to.wifi_lan) {
handlers_.emplace(Medium::WIFI_LAN,
std::make_unique<WifiLanBwuHandler>(
*mediums_, *channel_manager_, notifications));
}
if (config_.allow_upgrade_to.web_rtc) {
handlers_.emplace(Medium::WEB_RTC,
std::make_unique<WebrtcBwuHandler>(
*mediums_, *channel_manager_, notifications));
}
if (config_.allow_upgrade_to.bluetooth) {
handlers_.emplace(Medium::BLUETOOTH,
std::make_unique<BluetoothBwuHandler>(
*mediums_, *channel_manager_, notifications));
}
}
void BwuManager::Shutdown() {
NEARBY_LOGS(INFO) << "Initiating shutdown of BwuManager.";
endpoint_manager_->UnregisterFrameProcessor(
V1Frame::BANDWIDTH_UPGRADE_NEGOTIATION, this);
// Stop all the ongoing Runnables (as gracefully as possible).
alarm_executor_.Shutdown();
serial_executor_.Shutdown();
// After worker threads are down we became exclusive owners of data and
// may access it from current thread.
for (auto& item : previous_endpoint_channels_) {
EndpointChannel* channel = item.second.get();
if (!channel) continue;
channel->Close(DisconnectionReason::SHUTDOWN);
}
CancelAllRetryUpgradeAlarms();
medium_ = Medium::UNKNOWN_MEDIUM;
for (auto& item : handlers_) {
BwuHandler& handler = *item.second;
handler.Revert();
}
handlers_.clear();
NEARBY_LOGS(INFO) << "BwuHandler has shut down.";
}
// This is the point on the Initiator side where the
// medium_ is set.
void BwuManager::InitiateBwuForEndpoint(ClientProxy* client,
const std::string& endpoint_id,
Medium new_medium) {
NEARBY_LOGS(INFO) << "InitiateBwuForEndpoint for endpoint " << endpoint_id
<< " with medium " << new_medium;
RunOnBwuManagerThread("bwu-init", [this, client, endpoint_id, new_medium]() {
Medium proposed_medium = ChooseBestUpgradeMedium(
client->GetUpgradeMediums(endpoint_id).GetMediums(true));
if (new_medium != Medium::UNKNOWN_MEDIUM) {
proposed_medium = new_medium;
}
auto* handler = SetCurrentBwuHandler(proposed_medium);
if (!handler) {
NEARBY_LOGS(ERROR)
<< "BwuManager cannot initiate bandwidth upgrade for endpoint "
<< endpoint_id
<< " because the current BandwidthUpgradeMedium cannot be deduced.";
return;
}
if (in_progress_upgrades_.contains(endpoint_id)) {
NEARBY_LOGS(INFO)
<< "BwuManager is ignoring bandwidth upgrade for endpoint "
<< endpoint_id
<< " because we're already upgrading bandwidth for that endpoint.";
return;
}
CancelRetryUpgradeAlarm(endpoint_id);
auto channel = channel_manager_->GetChannelForEndpoint(endpoint_id);
Medium channel_medium =
channel ? channel->GetMedium() : Medium::UNKNOWN_MEDIUM;
client->GetAnalyticsRecorder().OnBandwidthUpgradeStarted(
endpoint_id, channel_medium, medium_, proto::connections::INCOMING,
client->GetConnectionToken(endpoint_id));
if (channel == nullptr) {
NEARBY_LOGS(INFO)
<< "BwuManager couldn't complete the upgrade for endpoint "
<< endpoint_id
<< " because it couldn't find an existing EndpointChannel for it.";
client->GetAnalyticsRecorder().OnBandwidthUpgradeError(
endpoint_id, proto::connections::CHANNEL_ERROR,
proto::connections::NETWORK_AVAILABLE);
return;
}
// Ignore requests where the medium we're upgrading to is the medium we're
// already connected over. This can happen now that Bluetooth is both an
// advertising medium and a potential bandwidth upgrade, and will continue
// to be possible as we add other new advertising mediums like mDNS (WiFi
// LAN). Very specifically, this happens now when a device uses P2P_CLUSTER,
// connects over Bluetooth, and is not connected to LAN. Bluetooth is the
// best medium, and we attempt to upgrade from Bluetooth to Bluetooth.
// if (medium_ == channel->GetMedium()) {
NEARBY_LOGS(INFO) << "BwuManager ignoring the upgrade for endpoint "
<< endpoint_id
<< " because it is already connected over medium "
<< proto::connections::Medium_Name(medium_);
if (medium_ == channel->GetMedium()) {
return;
}
std::string service_id = client->GetServiceId();
ByteArray bytes = handler->InitializeUpgradedMediumForEndpoint(
client, service_id, endpoint_id);
// Because we grab the endpointChannel first thing, it is possible the
// endpointChannel is stale by the time we attempt to write over it.
if (bytes.Empty()) {
NEARBY_LOGS(ERROR)
<< "BwuManager couldn't complete the upgrade for endpoint "
<< endpoint_id << " to medium "
<< proto::connections::Medium_Name(medium_)
<< " because it failed to initialize the "
"BWU_NEGOTIATION.UPGRADE_PATH_AVAILABLE OfflineFrame.";
UpgradePathInfo info;
info.set_medium(parser::MediumToUpgradePathInfoMedium(medium_));
ProcessUpgradeFailureEvent(client, endpoint_id, info);
client->GetAnalyticsRecorder().OnBandwidthUpgradeError(
endpoint_id, proto::connections::RESULT_IO_ERROR,
proto::connections::NETWORK_AVAILABLE);
return;
}
if (!channel->Write(bytes).Ok()) {
NEARBY_LOGS(ERROR)
<< "BwuManager couldn't complete the upgrade for endpoint "
<< endpoint_id << " to medium "
<< proto::connections::Medium_Name(medium_)
<< " because it failed to write the "
"BWU_NEGOTIATION.UPGRADE_PATH_AVAILABLE OfflineFrame.";
return;
}
NEARBY_LOGS(INFO)
<< "BwuManager successfully wrote the "
"BWU_NEGOTIATION.UPGRADE_PATH_AVAILABLE OfflineFrame while "
"upgrading endpoint "
<< endpoint_id << " to medium"
<< proto::connections::Medium_Name(medium_);
in_progress_upgrades_.emplace(endpoint_id, client);
});
}
void BwuManager::OnIncomingFrame(OfflineFrame& frame,
const std::string& endpoint_id,
ClientProxy* client, Medium medium) {
NEARBY_LOGS(INFO) << "OnIncomingFrame for endpoint " << endpoint_id
<< " with medium "
<< proto::connections::Medium_Name(medium);
if (parser::GetFrameType(frame) != V1Frame::BANDWIDTH_UPGRADE_NEGOTIATION)
return;
auto bwu_frame = frame.v1().bandwidth_upgrade_negotiation();
if (FeatureFlags::GetInstance().GetFlags().enable_async_bandwidth_upgrade) {
RunOnBwuManagerThread(
"bwu-on-incoming-frame", [this, client, endpoint_id, bwu_frame]() {
OnBwuNegotiationFrame(client, bwu_frame, endpoint_id);
});
} else {
CountDownLatch latch(1);
RunOnBwuManagerThread("bwu-on-incoming-frame", [this, client, endpoint_id,
bwu_frame, &latch]() {
OnBwuNegotiationFrame(client, bwu_frame, endpoint_id);
latch.CountDown();
});
latch.Await();
}
}
void BwuManager::OnEndpointDisconnect(ClientProxy* client,
const std::string& endpoint_id,
CountDownLatch barrier) {
NEARBY_LOGS(INFO)
<< "BwuManager has processed endpoint disconnection for endpoint "
<< endpoint_id;
RunOnBwuManagerThread(
"bwu-on-endpoint-disconnect",
[this, client, endpoint_id, barrier]() mutable {
if (medium_ == Medium::UNKNOWN_MEDIUM) {
NEARBY_LOGS(INFO)
<< "BwuManager has processed endpoint disconnection for endpoint "
<< endpoint_id
<< " because there is no current BandwidthUpgradeMedium.";
barrier.CountDown();
return;
}
if (handler_) {
handler_->OnEndpointDisconnect(client, endpoint_id);
}
auto item = previous_endpoint_channels_.extract(endpoint_id);
if (!item.empty()) {
auto old_channel = item.mapped();
if (old_channel != nullptr) {
old_channel->Close(DisconnectionReason::SHUTDOWN);
}
}
in_progress_upgrades_.erase(endpoint_id);
retry_delays_.erase(endpoint_id);
CancelRetryUpgradeAlarm(endpoint_id);
successfully_upgraded_endpoints_.erase(endpoint_id);
// If this was our very last endpoint:
//
// a) revert all the changes for currentBwuMedium.
// b) reset currentBwuMedium.
if (channel_manager_->GetConnectedEndpointsCount() <= 1) {
Revert();
}
barrier.CountDown();
});
}
BwuHandler* BwuManager::SetCurrentBwuHandler(Medium medium) {
NEARBY_LOGS(INFO) << "SetCurrentBwuHandler to medium "
<< proto::connections::Medium_Name(medium);
handler_ = nullptr;
medium_ = medium;
if (medium != Medium::UNKNOWN_MEDIUM) {
auto item = handlers_.find(medium);
if (item != handlers_.end()) {
handler_ = item->second.get();
}
}
return handler_;
}
void BwuManager::Revert() {
NEARBY_LOGS(INFO) << "Revert reseting medium "
<< proto::connections::Medium_Name(medium_);
if (handler_) {
handler_->Revert();
handler_ = nullptr;
}
medium_ = Medium::UNKNOWN_MEDIUM;
}
void BwuManager::OnBwuNegotiationFrame(ClientProxy* client,
const BwuNegotiationFrame frame,
const string& endpoint_id) {
NEARBY_LOGS(INFO) << "BwuManager process incoming OfflineFrame for endpoint "
<< endpoint_id;
switch (frame.event_type()) {
case BwuNegotiationFrame::UPGRADE_PATH_AVAILABLE:
ProcessBwuPathAvailableEvent(client, endpoint_id,
frame.upgrade_path_info());
break;
case BwuNegotiationFrame::UPGRADE_FAILURE:
ProcessUpgradeFailureEvent(client, endpoint_id,
frame.upgrade_path_info());
break;
case BwuNegotiationFrame::LAST_WRITE_TO_PRIOR_CHANNEL:
ProcessLastWriteToPriorChannelEvent(client, endpoint_id);
break;
case BwuNegotiationFrame::SAFE_TO_CLOSE_PRIOR_CHANNEL:
ProcessSafeToClosePriorChannelEvent(client, endpoint_id);
break;
default:
NEARBY_LOGS(WARNING)
<< "BwuManager can't process unknown incoming OfflineFrame of type "
<< frame.event_type() << ", ignoring it.";
break;
}
}
void BwuManager::OnIncomingConnection(
ClientProxy* client,
std::unique_ptr<BwuHandler::IncomingSocketConnection> mutable_connection) {
NEARBY_LOGS(INFO) << "BwuManager process incoming connection service_id="
<< client->GetServiceId();
std::shared_ptr<BwuHandler::IncomingSocketConnection> connection(
mutable_connection.release());
RunOnBwuManagerThread(
"bwu-on-incoming-connection", [this, client, connection]() {
absl::Time connection_attempt_start_time =
SystemClock::ElapsedRealtime();
EndpointChannel* channel = connection->channel.get();
if (channel == nullptr) {
NEARBY_LOGS(ERROR)
<< "BwuManager failed to create new EndpointChannel for incoming "
"socket.";
connection->socket->Close();
AttemptToRecordBandwidthUpgradeErrorForUnknownEndpoint(
proto::connections::MEDIUM_ERROR,
proto::connections::SOCKET_CREATION);
return;
}
NEARBY_LOGS(VERBOSE)
<< "BwuManager successfully created new EndpointChannel for "
"incoming socket";
ClientIntroduction introduction;
if (!ReadClientIntroductionFrame(channel, introduction)) {
// This was never a fully EstablishedConnection, no need to provide a
// closure reason.
channel->Close();
NEARBY_LOGS(ERROR)
<< "BwuManager failed to read "
"BWU_NEGOTIATION.CLIENT_INTRODUCTION OfflineFrame from "
"newly-created EndpointChannel "
<< channel->GetName()
<< ", so the EndpointChannel was discarded.";
return;
}
if (!WriteClientIntroductionAckFrame(channel)) {
// This was never a fully EstablishedConnection, no need to provide a
// closure reason.
channel->Close();
return;
}
NEARBY_LOGS(VERBOSE) << "BwuManager successfully received "
"BWU_NEGOTIATION.CLIENT_INTRODUCTION "
"OfflineFrame on EndpointChannel "
<< channel->GetName();
const std::string& endpoint_id = introduction.endpoint_id();
ClientProxy* mapped_client;
const auto item = in_progress_upgrades_.find(endpoint_id);
if (item == in_progress_upgrades_.end()) return;
mapped_client = item->second;
CancelRetryUpgradeAlarm(endpoint_id);
if (mapped_client == nullptr) {
// This was never a fully EstablishedConnection, no need to provide a
// closure reason.
channel->Close();
return;
}
CHECK(client == mapped_client);
// The ConnectionAttempt has now succeeded, so record it as such.
std::unique_ptr<ConnectionAttemptMetadataParams>
connections_attempt_metadata_params;
if (channel != nullptr) {
connections_attempt_metadata_params =
client->GetAnalyticsRecorder()
.BuildConnectionAttemptMetadataParams(
channel->GetTechnology(), channel->GetBand(),
channel->GetFrequency(), channel->GetTryCount());
}
client->GetAnalyticsRecorder().OnIncomingConnectionAttempt(
proto::connections::UPGRADE, channel->GetMedium(),
proto::connections::RESULT_SUCCESS,
SystemClock::ElapsedRealtime() - connection_attempt_start_time,
client->GetConnectionToken(endpoint_id),
connections_attempt_metadata_params.get());
// Use the introductory client information sent over to run the upgrade
// protocol.
RunUpgradeProtocol(mapped_client, endpoint_id,
std::move(connection->channel));
});
}
void BwuManager::RunOnBwuManagerThread(const std::string& name,
Runnable runnable) {
serial_executor_.Execute(name, std::move(runnable));
}
void BwuManager::RunUpgradeProtocol(
ClientProxy* client, const std::string& endpoint_id,
std::unique_ptr<EndpointChannel> new_channel) {
NEARBY_LOGS(INFO) << "RunUpgradeProtocol new channel @" << new_channel.get()
<< " name: " << new_channel->GetName()
<< ", medium: " << new_channel->GetMedium();
// First, register this new EndpointChannel as *the* EndpointChannel to use
// for this endpoint here onwards. NOTE: We pause this new EndpointChannel
// until we've completely drained the old EndpointChannel to avoid out of
// order reads on the other side. This is a consequence of using the same
// UKEY2 context for both the previous and new EndpointChannels. UKEY2 uses
// sequence numbers for writes and reads, and simultaneously sending Payloads
// on the new channel and control messages on the old channel cause the other
// side to read messages out of sequence
new_channel->Pause();
auto old_channel = channel_manager_->GetChannelForEndpoint(endpoint_id);
if (!old_channel) {
NEARBY_LOGS(INFO)
<< "BwuManager didn't find a previous EndpointChannel for "
<< endpoint_id
<< " when registering the new EndpointChannel, short-circuiting the "
"upgrade protocol.";
client->GetAnalyticsRecorder().OnBandwidthUpgradeError(
endpoint_id, proto::connections::CHANNEL_ERROR,
proto::connections::PRIOR_ENDPOINT_CHANNEL);
return;
}
channel_manager_->ReplaceChannelForEndpoint(client, endpoint_id,
std::move(new_channel));
// Next, initiate a clean shutdown for the previous EndpointChannel used for
// this endpoint by telling the remote device that it will not receive any
// more writes over that EndpointChannel.
if (!old_channel->Write(parser::ForBwuLastWrite()).Ok()) {
NEARBY_LOGS(ERROR)
<< "BwuManager failed to write "
"BWU_NEGOTIATION.LAST_WRITE_TO_PRIOR_CHANNEL OfflineFrame to "
"endpoint "
<< endpoint_id << ", short-circuiting the upgrade protocol.";
client->GetAnalyticsRecorder().OnBandwidthUpgradeError(
endpoint_id, proto::connections::RESULT_IO_ERROR,
proto::connections::LAST_WRITE_TO_PRIOR_CHANNEL);
return;
}
NEARBY_LOGS(VERBOSE) << "BwuManager successfully wrote "
"BWU_NEGOTIATION.LAST_WRITE_TO_PRIOR_CHANNEL "
"OfflineFrame while upgrading endpoint "
<< endpoint_id;
// The remainder of this clean shutdown for the previous EndpointChannel will
// continue when we receive a corresponding
// BANDWIDTH_UPGRADE_NEGOTIATION.LAST_WRITE_TO_PRIOR_CHANNEL OfflineFrame from
// the remote device, so for now, just store that previous EndpointChannel.
previous_endpoint_channels_.emplace(endpoint_id, old_channel);
// If we already read LAST_WRITE on the old endpoint channel, then we can
// safely close it now.
auto item = successfully_upgraded_endpoints_.extract(endpoint_id);
if (!item.empty()) {
ProcessLastWriteToPriorChannelEvent(client, endpoint_id);
}
}
// Outgoing BWU session.
void BwuManager::ProcessBwuPathAvailableEvent(
ClientProxy* client, const string& endpoint_id,
const UpgradePathInfo& upgrade_path_info) {
NEARBY_LOGS(INFO) << "ProcessBwuPathAvailableEvent for endpoint "
<< endpoint_id << " medium "
<< parser::UpgradePathInfoMediumToMedium(
upgrade_path_info.medium());
if (in_progress_upgrades_.contains(endpoint_id)) {
NEARBY_LOGS(ERROR)
<< "BwuManager received a duplicate bandwidth upgrade for endpoint "
<< endpoint_id
<< ". We're out of sync with the remote device and cannot recover; "
"closing all channels.";
auto item = previous_endpoint_channels_.extract(endpoint_id);
if (!item.empty()) {
std::shared_ptr<EndpointChannel> previous_endpoint_channel =
item.mapped();
if (previous_endpoint_channel) {
previous_endpoint_channel->Close(DisconnectionReason::UNFINISHED);
}
}
std::shared_ptr<EndpointChannel> new_channel =
channel_manager_->GetChannelForEndpoint(endpoint_id);
if (new_channel) {
// The upgraded channel never finished upgrading, and therefore is still
// paused.
new_channel->Resume();
new_channel->Close(DisconnectionReason::UNFINISHED);
}
return;
}
Medium medium =
parser::UpgradePathInfoMediumToMedium(upgrade_path_info.medium());
if (medium_ == Medium::UNKNOWN_MEDIUM) {
SetCurrentBwuHandler(medium);
}
// Check for the correct medium so we don't process an incorrect OfflineFrame.
if (medium != medium_) {
NEARBY_LOGS(INFO) << "Medium not matching";
RunUpgradeFailedProtocol(client, endpoint_id, upgrade_path_info);
return;
}
client->GetAnalyticsRecorder().OnBandwidthUpgradeStarted(
endpoint_id, medium, medium_, proto::connections::OUTGOING,
client->GetConnectionToken(endpoint_id));
absl::Time connection_attempt_start_time = SystemClock::ElapsedRealtime();
auto channel = ProcessBwuPathAvailableEventInternal(client, endpoint_id,
upgrade_path_info);
proto::connections::ConnectionAttemptResult connection_attempt_result;
if (channel != nullptr) {
connection_attempt_result = proto::connections::RESULT_SUCCESS;
} else if (client->GetCancellationFlag(endpoint_id)->Cancelled()) {
connection_attempt_result = proto::connections::RESULT_CANCELLED;
client->GetAnalyticsRecorder().OnBandwidthUpgradeError(
endpoint_id, proto::connections::RESULT_REMOTE_ERROR,
proto::connections::UPGRADE_CANCEL);
} else {
connection_attempt_result = proto::connections::RESULT_ERROR;
}
std::unique_ptr<ConnectionAttemptMetadataParams>
connections_attempt_metadata_params;
if (channel != nullptr) {
connections_attempt_metadata_params =
client->GetAnalyticsRecorder().BuildConnectionAttemptMetadataParams(
channel->GetTechnology(), channel->GetBand(),
channel->GetFrequency(), channel->GetTryCount());
}
client->GetAnalyticsRecorder().OnOutgoingConnectionAttempt(
endpoint_id, proto::connections::UPGRADE, medium_,
connection_attempt_result,
SystemClock::ElapsedRealtime() - connection_attempt_start_time,
client->GetConnectionToken(endpoint_id),
connections_attempt_metadata_params.get());
if (channel == nullptr) {
NEARBY_LOGS(INFO) << "Failed to get new channel.";
RunUpgradeFailedProtocol(client, endpoint_id, upgrade_path_info);
return;
}
in_progress_upgrades_.emplace(endpoint_id, client);
RunUpgradeProtocol(client, endpoint_id, std::move(channel));
}
std::unique_ptr<EndpointChannel>
BwuManager::ProcessBwuPathAvailableEventInternal(
ClientProxy* client, const string& endpoint_id,
const UpgradePathInfo& upgrade_path_info) {
NEARBY_LOGS(INFO) << "ProcessBwuPathAvailableEventInternal for endpoint "
<< endpoint_id << " medium "
<< parser::UpgradePathInfoMediumToMedium(
upgrade_path_info.medium());
std::unique_ptr<EndpointChannel> channel =
handler_->CreateUpgradedEndpointChannel(client, client->GetServiceId(),
endpoint_id, upgrade_path_info);
if (!channel) {
NEARBY_LOGS(ERROR)
<< "BwuManager failed to create an endpoint channel to endpoint"
<< endpoint_id << ", aborting upgrade.";
client->GetAnalyticsRecorder().OnBandwidthUpgradeError(
endpoint_id, proto::connections::RESULT_IO_ERROR,
proto::connections::SOCKET_CREATION);
return nullptr;
}
// Write the requisite BANDWIDTH_UPGRADE_NEGOTIATION.CLIENT_INTRODUCTION as
// the first OfflineFrame on this new EndpointChannel.
if (!channel->Write(parser::ForBwuIntroduction(client->GetLocalEndpointId()))
.Ok()) {
// This was never a fully EstablishedConnection, no need to provide a
// closure reason.
channel->Close();
NEARBY_LOGS(ERROR)
<< "BwuManager failed to write BWU_NEGOTIATION.CLIENT_INTRODUCTION "
"OfflineFrame to newly-created EndpointChannel "
<< channel->GetName() << ", aborting upgrade.";
client->GetAnalyticsRecorder().OnBandwidthUpgradeError(
endpoint_id, proto::connections::RESULT_IO_ERROR,
proto::connections::CLIENT_INTRODUCTION);
return {};
}
if (upgrade_path_info.supports_client_introduction_ack()) {
if (!ReadClientIntroductionAckFrame(channel.get())) {
// This was never a fully EstablishedConnection, no need to provide a
// closure reason.
channel->Close();
NEARBY_LOGS(ERROR) << "BwuManager failed to read "
"BWU_NEGOTIATION.CLIENT_INTRODUCTION_ACK "
"OfflineFrame to newly-created EndpointChannel "
<< channel->GetName() << ", aborting upgrade.";
return {};
}
}
NEARBY_LOGS(INFO) << "BwuManager successfully wrote "
"BWU_NEGOTIATION.CLIENT_INTRODUCTION OfflineFrame to "
"newly-created EndpointChannel "
<< channel->GetName() << " while upgrading endpoint "
<< endpoint_id;
// Set the AnalyticsRecorder so that the future closure of this
// EndpointChannel will be recorded.
return channel;
}
void BwuManager::RunUpgradeFailedProtocol(
ClientProxy* client, const std::string& endpoint_id,
const UpgradePathInfo& upgrade_path_info) {
NEARBY_LOGS(INFO) << "RunUpgradeFailedProtocol for endpoint " << endpoint_id
<< " medium "
<< parser::UpgradePathInfoMediumToMedium(
upgrade_path_info.medium());
// We attempted to connect to the new medium that the remote device has set up
// for us but we failed. We need to let the remote device know so that they
// can pick another medium for us to try.
std::shared_ptr<EndpointChannel> channel =
channel_manager_->GetChannelForEndpoint(endpoint_id);
if (!channel) {
NEARBY_LOGS(ERROR)
<< "BwuManager didn't find a previous EndpointChannel for "
<< endpoint_id
<< " when sending an upgrade failure frame, short-circuiting the "
"upgrade protocol.";
client->GetAnalyticsRecorder().OnBandwidthUpgradeError(
endpoint_id, proto::connections::CHANNEL_ERROR,
proto::connections::NETWORK_AVAILABLE);
return;
}
// Report UPGRADE_FAILURE to the remote device.
if (!channel->Write(parser::ForBwuFailure(upgrade_path_info)).Ok()) {
channel->Close(DisconnectionReason::IO_ERROR);
NEARBY_LOGS(ERROR)
<< "BwuManager failed to write BWU_NEGOTIATION.UPGRADE_FAILURE "
"OfflineFrame to endpoint "
<< endpoint_id << ", short-circuiting the upgrade protocol.";
client->GetAnalyticsRecorder().OnBandwidthUpgradeError(
endpoint_id, proto::connections::RESULT_IO_ERROR,
proto::connections::NETWORK_AVAILABLE);
return;
}
// And lastly, clean up our currentBwuMedium since we failed to
// utilize it anyways.
if (medium_ != Medium::UNKNOWN_MEDIUM) {
Revert();
}
in_progress_upgrades_.erase(endpoint_id);
NEARBY_LOGS(INFO) << "BwuManager has informed endpoint " << endpoint_id
<< " that the bandwidth upgrade failed.";
}
bool BwuManager::ReadClientIntroductionFrame(EndpointChannel* channel,
ClientIntroduction& introduction) {
NEARBY_LOGS(INFO) << "ReadClientIntroductionFrame with channel name: "
<< channel->GetName()
<< ", medium: " << channel->GetMedium();
CancelableAlarm timeout_alarm(
"BwuManager::ReadClientIntroductionFrame",
[channel]() {
NEARBY_LOGS(ERROR) << "In BwuManager, failed to read the "
"ClientIntroductionFrame after "
<< absl::FormatDuration(
kReadClientIntroductionFrameTimeout)
<< ". Timing out and closing EndpointChannel "
<< channel->GetType();
channel->Close();
},
kReadClientIntroductionFrameTimeout, &alarm_executor_);
auto data = channel->Read();
timeout_alarm.Cancel();
if (!data.ok()) return false;
auto transfer(parser::FromBytes(data.result()));
if (!transfer.ok()) {
NEARBY_LOGS(ERROR) << "In ReadClientIntroductionFrame, attempted to read a "
"ClientIntroductionFrame from EndpointChannel "
<< channel->GetType()
<< " but was unable to obtain any OfflineFrame.";
return false;
}
OfflineFrame frame = transfer.result();
if (!frame.has_v1() || !frame.v1().has_bandwidth_upgrade_negotiation()) {
NEARBY_LOGS(ERROR)
<< "In ReadClientIntroductionFrame, expected a "
"BANDWIDTH_UPGRADE_NEGOTIATION v1 OfflineFrame but got a "
<< parser::GetFrameType(frame) << " frame instead.";
return false;
}
if (frame.v1().bandwidth_upgrade_negotiation().event_type() !=
BandwidthUpgradeNegotiationFrame::CLIENT_INTRODUCTION) {
NEARBY_LOGS(ERROR)
<< "In ReadClientIntroductionFrame, expected a CLIENT_INTRODUCTION "
"v1 OfflineFrame but got a BANDWIDTH_UPGRADE_NEGOTIATION frame "
"with eventType "
<< frame.v1().bandwidth_upgrade_negotiation().event_type()
<< " instead.";
return false;
}
const auto& frame_intro =
frame.v1().bandwidth_upgrade_negotiation().client_introduction();
introduction = frame_intro;
return true;
}
bool BwuManager::ReadClientIntroductionAckFrame(EndpointChannel* channel) {
NEARBY_LOGS(INFO) << "ReadClientIntroductionAckFrame with channel name: "
<< channel->GetName() << ", medium: "
<< proto::connections::Medium_Name(channel->GetMedium());
CancelableAlarm timeout_alarm(
"BwuManager::ReadClientIntroductionAckFrame",
[channel]() {
NEARBY_LOGS(ERROR)
<< "In BwuManager, failed to read the ClientIntroductionAckFrame "
"after "
<< absl::FormatDuration(kReadClientIntroductionFrameTimeout)
<< ". Timing out and closing EndpointChannel "
<< channel->GetType();
channel->Close();
},
kReadClientIntroductionFrameTimeout, &alarm_executor_);
auto data = channel->Read();
timeout_alarm.Cancel();
if (!data.ok()) return false;
auto transfer(parser::FromBytes(data.result()));
if (!transfer.ok()) return false;
OfflineFrame frame = transfer.result();
if (!frame.has_v1() || !frame.v1().has_bandwidth_upgrade_negotiation())
return false;
if (frame.v1().bandwidth_upgrade_negotiation().event_type() !=
BandwidthUpgradeNegotiationFrame::CLIENT_INTRODUCTION_ACK)
return false;
return true;
}
bool BwuManager::WriteClientIntroductionAckFrame(EndpointChannel* channel) {
NEARBY_LOGS(INFO) << "WriteClientIntroductionAckFrame channel name: "
<< channel->GetName()
<< ", medium: " << channel->GetMedium();
return channel->Write(parser::ForBwuIntroductionAck()).Ok();
}
void BwuManager::ProcessLastWriteToPriorChannelEvent(
ClientProxy* client, const std::string& endpoint_id) {
NEARBY_LOGS(INFO) << "ProcessLastWriteToPriorChannelEvent for endpoint "
<< endpoint_id;
// By this point in the upgrade protocol, there is the guarantee that both
// involved endpoints have registered a new EndpointChannel with the
// EndpointChannelManager as the official channel for communication; given
// the way communication is structured in the EndpointManager, this means
// that all new writes are happening over that new EndpointChannel, but
// reads are still happening over this prior EndpointChannel (to avoid data
// loss). But now that we've received this definitive final write over that
// prior EndpointChannel, we can let the remote device that they can safely
// close their end of this now-dormant EndpointChannel.
EndpointChannel* previous_endpoint_channel =
previous_endpoint_channels_[endpoint_id].get();
if (!previous_endpoint_channel) {
NEARBY_LOGS(ERROR)
<< "BwuManager received a BWU_NEGOTIATION.LAST_WRITE_TO_PRIOR_CHANNEL "
"OfflineFrame for unknown endpoint "
<< endpoint_id << ", can't complete the upgrade protocol.";
successfully_upgraded_endpoints_.emplace(endpoint_id);
return;
}
if (!previous_endpoint_channel->Write(parser::ForBwuSafeToClose()).Ok()) {
previous_endpoint_channel->Close(DisconnectionReason::IO_ERROR);
// Remove this prior EndpointChannel from previous_endpoint_channels to
// avoid leaks.
previous_endpoint_channels_.erase(endpoint_id);
NEARBY_LOGS(ERROR) << "BwuManager failed to write "
"BWU_NEGOTIATION.SAFE_TO_CLOSE_PRIOR_CHANNEL "
"OfflineFrame to endpoint "
<< endpoint_id
<< ", short-circuiting the upgrade protocol.";
client->GetAnalyticsRecorder().OnBandwidthUpgradeError(
endpoint_id, proto::connections::RESULT_IO_ERROR,
proto::connections::SAFE_TO_CLOSE_PRIOR_CHANNEL);
return;
}
NEARBY_LOGS(VERBOSE) << "BwuManager successfully wrote "
"BWU_NEGOTIATION.SAFE_TO_CLOSE_PRIOR_CHANNEL "
"OfflineFrame while trying to upgrade endpoint "
<< endpoint_id;
// The upgrade protocol's clean shutdown of the prior EndpointChannel will
// conclude when we receive a corresponding
// BANDWIDTH_UPGRADE_NEGOTIATION.SAFE_TO_CLOSE_PRIOR_CHANNEL OfflineFrame
// from the remote device.
}
void BwuManager::ProcessSafeToClosePriorChannelEvent(
ClientProxy* client, const std::string& endpoint_id) {
NEARBY_LOGS(INFO) << "ProcessSafeToClosePriorChannelEvent for endpoint "
<< endpoint_id;
// By this point in the upgrade protocol, there's no more writes happening
// over the prior EndpointChannel, and the remote device has given us the
// go-ahead to close this EndpointChannel [1], so we can safely close it
// (and depend on the EndpointManager querying the EndpointChannelManager to
// start reading from the new EndpointChannel).
//
// [1] Which also implies that they've received our
// BANDWIDTH_UPGRADE_NEGOTIATION.LAST_WRITE_TO_PRIOR_CHANNEL OfflineFrame),
// so there can be no data loss, regardless of whether the EndpointChannel
// allows reads of queued, unread data after the EndpointChannel has been
// closed from the other end (as is the case with conventional TCP sockets)
// or not (as is the case with Android's Bluetooth sockets, where closing
// instantly throws an IOException on the remote device).
auto item = previous_endpoint_channels_.extract(endpoint_id);
auto& previous_endpoint_channel = item.mapped();
if (previous_endpoint_channel == nullptr) {
NEARBY_LOGS(ERROR)
<< "BwuManager received a BWU_NEGOTIATION.SAFE_TO_CLOSE_PRIOR_CHANNEL "
"OfflineFrame for unknown endpoint "
<< endpoint_id << ", can't complete the upgrade protocol.";
return;
}
NEARBY_LOGS(INFO)
<< "BwuManager successfully received a "
<< "BWU_NEGOTIATION.SAFE_TO_CLOSE_PRIOR_CHANNEL OfflineFrame while "
<< "trying to upgrade endpoint " << endpoint_id;
// Each encrypted message includes the key to decrypt the next message. The
// disconnect message is optional and may not be received under normal
// circumstances so it is necessary to send it unencrypted. This way the
// serial crypto context does not increment here.
previous_endpoint_channel->DisableEncryption();
previous_endpoint_channel->Write(parser::ForDisconnection());
// Attempt to read the disconnect message from the previous channel. We don't
// care whether we successfully read it or whether we get an exception here.
// The idea is just to make sure the other side has had a chance to receive
// the full SAFE_TO_CLOSE_PRIOR_CHANNEL message before we actually close the
// channel. See b/172380349 for more context.
previous_endpoint_channel->Read();
previous_endpoint_channel->Close(DisconnectionReason::UPGRADED);
NEARBY_LOGS(VERBOSE)
<< "BwuManager cleanly shut down prior "
<< previous_endpoint_channel->GetType()
<< " EndpointChannel to conclude upgrade protocol for endpoint "
<< endpoint_id;
// Now the upgrade protocol has completed, record analytics for this new
// upgraded bandwidth connection...
client->GetAnalyticsRecorder().OnConnectionEstablished(
endpoint_id, medium_, client->GetConnectionToken(endpoint_id));
// ...and the success of the upgrade itself.
client->GetAnalyticsRecorder().OnBandwidthUpgradeSuccess(endpoint_id);
// Now that the old channel has been drained, we can unpause the new channel
std::shared_ptr<EndpointChannel> channel =
channel_manager_->GetChannelForEndpoint(endpoint_id);
if (!channel) {
NEARBY_LOGS(ERROR) << "BwuManager attempted to resume the current "
"EndpointChannel with endpoint "
<< endpoint_id << ", but none was found.";
return;
}
channel->Resume();
// Report the success to the client
client->OnBandwidthChanged(endpoint_id, channel->GetMedium());
in_progress_upgrades_.erase(endpoint_id);
}
void BwuManager::ProcessUpgradeFailureEvent(
ClientProxy* client, const std::string& endpoint_id,
const UpgradePathInfo& upgrade_info) {
NEARBY_LOGS(INFO) << "ProcessUpgradeFailureEvent for endpoint " << endpoint_id
<< " from medium: "
<< parser::UpgradePathInfoMediumToMedium(
upgrade_info.medium());
// The remote device failed to upgrade to the new medium we set up for them.
// That's alright! We'll just try the next available medium (if there is
// one).
in_progress_upgrades_.erase(endpoint_id);
// The first thing we have to do is to replace our
// currentBwuMedium with the next best upgrade medium we share
// with the remote device. The catch is that we can only do this if we only
// have one connected endpoint. Otherwise, we'll end up disrupting our other
// connected peers.
if (channel_manager_->GetConnectedEndpointsCount() > 1) {
// We can't change the currentBwuMedium, so there are no more
// upgrade attempts for this endpoint. Sorry.
NEARBY_LOGS(ERROR)
<< "BwuManager failed to attempt a new bandwidth upgrade for endpoint "
<< endpoint_id
<< " because we have other connected endpoints and can't try a new "
"upgrade medium.";
client->GetAnalyticsRecorder().OnBandwidthUpgradeError(
endpoint_id, proto::connections::CHANNEL_ERROR,
proto::connections::NETWORK_AVAILABLE);
return;
}
// Revert the existing upgrade medium for now.
if (medium_ != Medium::UNKNOWN_MEDIUM) {
Revert();
}
// Loop through the ordered list of upgrade mediums. One by one, remove the
// top element until we get to the medium we last attempted to upgrade to.
// The remainder of the list will contain the mediums we haven't attempted
// yet.
Medium last = parser::UpgradePathInfoMediumToMedium(upgrade_info.medium());
std::vector<Medium> all_possible_mediums =
client->GetUpgradeMediums(endpoint_id).GetMediums(true);
std::vector<Medium> untried_mediums(all_possible_mediums);
for (Medium medium : all_possible_mediums) {
untried_mediums.erase(untried_mediums.begin());
if (medium == last) {
break;
}
}
RetryUpgradeMediums(client, endpoint_id, untried_mediums);
}
void BwuManager::RetryUpgradeMediums(ClientProxy* client,
const std::string& endpoint_id,
std::vector<Medium> upgrade_mediums) {
Medium next_medium = ChooseBestUpgradeMedium(upgrade_mediums);
NEARBY_LOGS(INFO) << "RetryUpgradeMediums for endpoint " << endpoint_id
<< " after ChooseBestUpgradeMedium: " << next_medium;
// If current medium is not WiFi and we have not succeeded with upgrading
// yet, retry upgrade.
Medium current_medium = GetEndpointMedium(endpoint_id);
if (current_medium != Medium::WIFI_LAN &&
(next_medium == current_medium || next_medium == Medium::UNKNOWN_MEDIUM ||
upgrade_mediums.empty())) {
RetryUpgradesAfterDelay(client, endpoint_id);
return;
}
// Attempt to set the new upgrade medium.
if (!SetCurrentBwuHandler(next_medium)) {
NEARBY_LOGS(INFO)
<< "BwuManager failed to attempt a new bandwidth upgrade for endpoint "
<< endpoint_id
<< " because we couldn't set a new bandwidth upgrade medium.";
return;
}
// Now that we've successfully picked a new upgrade medium to try,
// re-initiate the bandwidth upgrade.
NEARBY_LOGS(INFO) << "BwuManager is attempting to upgrade endpoint "
<< endpoint_id
<< " again with a new bandwidth upgrade medium.";
InitiateBwuForEndpoint(client, endpoint_id, next_medium);
}
std::vector<Medium> BwuManager::StripOutUnavailableMediums(
const std::vector<Medium>& mediums) {
std::vector<Medium> available_mediums;
for (Medium m : mediums) {
bool available = false;
switch (m) {
case Medium::WIFI_LAN:
available = mediums_->GetWifiLan().IsAvailable();
break;
case Medium::WEB_RTC:
available = mediums_->GetWebRtc().IsAvailable();
break;
case Medium::BLUETOOTH:
available = mediums_->GetBluetoothClassic().IsAvailable();
break;
default:
break;
}
if (available) {
available_mediums.push_back(m);
}
}
return available_mediums;
}
// Returns the optimal medium supported by both devices.
// Each medium in the passed in list is checked for its availability with the
// medium_manager_ to ensure that the chosen upgrade medium is supported and
// available locally before continuing the upgrade. Once we pick a medium, all
// future connections will use it too. eg. If we chose Wifi LAN, we'll attempt
// to upgrade the 2nd, 3rd, etc remote endpoints with Wifi LAN even if they're
// on a different network (or had a better medium). This is a quick and easy
// way to prevent mediums, like Wifi Hotspot, from interfering with active
// connections (although it's suboptimal for bandwidth throughput). When all
// endpoints disconnect, we reset the bandwidth upgrade medium.
Medium BwuManager::ChooseBestUpgradeMedium(const std::vector<Medium>& mediums) {
auto available_mediums = StripOutUnavailableMediums(mediums);
if (medium_ == Medium::UNKNOWN_MEDIUM) {
if (!available_mediums.empty()) {
// Case 1: This is our first time upgrading, and we have at least one
// supported medium to choose from. Return the first medium in the list,
// since they are ordered by preference.
return available_mediums[0];
}
// Case 2: This is our first time upgrading, but there are no available
// upgrade mediums. Fall through to returning UNKNOWN_MEDIUM at the
// bottom.
NEARBY_LOGS(INFO)
<< "Current upgrade medium is unset, but there are no common supported "
"upgrade mediums.";
} else {
// Case 3: We have already upgraded, and there is a list of supported
// mediums to check against. Return the current upgrade medium if it's in
// the supported list.
if (std::find(available_mediums.begin(), available_mediums.end(),
medium_) != available_mediums.end()) {
return medium_;
}
// Case 4: We have already upgraded, but the current medium is not
// supported by the remote endpoint (it's not in the list, or the list is
// empty). Fall through and return Medium.UNKNOWN_MEDIUM because we cannot
// continue with the current upgrade medium, and we are not allowed to
// switch.
std::string mediums_string;
for (const auto& medium : available_mediums) {
absl::StrAppend(&mediums_string, proto::connections::Medium_Name(medium),
"; ");
}
NEARBY_LOGS(INFO)
<< "Current upgrade medium " << proto::connections::Medium_Name(medium_)
<< " is not supported by the remote endpoint (supported mediums: "
<< mediums_string << ")";
}
return Medium::UNKNOWN_MEDIUM;
}
void BwuManager::RetryUpgradesAfterDelay(ClientProxy* client,
const std::string& endpoint_id) {
absl::Duration delay = CalculateNextRetryDelay(endpoint_id);
CancelRetryUpgradeAlarm(endpoint_id);
CancelableAlarm alarm(
"BWU alarm",
[this, client, endpoint_id]() {
RunOnBwuManagerThread(
"bwu-retry-upgrade", [this, client, endpoint_id]() {
if (!client->IsConnectedToEndpoint(endpoint_id)) {
return;
}
RetryUpgradeMediums(
client, endpoint_id,
client->GetUpgradeMediums(endpoint_id).GetMediums(true));
});
},
delay, &alarm_executor_);
retry_upgrade_alarms_.emplace(endpoint_id,
std::make_pair(std::move(alarm), delay));
retry_delays_[endpoint_id] = delay;
NEARBY_LOGS(INFO) << "Retry bandwidth upgrade after "
<< absl::FormatDuration(delay);
}
void BwuManager::AttemptToRecordBandwidthUpgradeErrorForUnknownEndpoint(
proto::connections::BandwidthUpgradeResult result,
proto::connections::BandwidthUpgradeErrorStage error_stage) {
if (in_progress_upgrades_.size() == 1) {
auto it = in_progress_upgrades_.begin();
std::string endpoint_id = it->first;
ClientProxy* client = it->second;
// Note: Even though we know this is an error, we cannot clear state yet.
// We've sent the remote device the credentials they need and it's up to
// them if they want to repeatedly attempt to connect or if they want to
// give up and have us try a different medium. This isn't a decision we can
// make for them.
client->GetAnalyticsRecorder().OnBandwidthUpgradeError(endpoint_id, result,
error_stage);
NEARBY_LOGS(INFO) << "BwuManager got error "
<< proto::connections::BandwidthUpgradeResult_Name(result)
<< " at stage "
<< proto::connections::BandwidthUpgradeErrorStage_Name(
error_stage)
<< " when upgrading endpoint " << endpoint_id;
}
// Otherwise, we have no way of knowing which endpoint was trying to connect
// to us :(
NEARBY_LOGS(INFO) << "BwuManager got error "
<< proto::connections::BandwidthUpgradeResult_Name(result)
<< " at stage "
<< proto::connections::BandwidthUpgradeErrorStage_Name(
error_stage)
<< ", but we don't know which endpoint was trying to "
"connect to us, so skipping analytics for his error.";
}
absl::Duration BwuManager::CalculateNextRetryDelay(
const std::string& endpoint_id) {
auto item = retry_delays_.find(endpoint_id);
auto initial_delay = config_.bandwidth_upgrade_retry_delay;
if (item == retry_delays_.end()) return initial_delay;
/*
* Without use_exp_backoff_in_bwu_retry, bwu retry intervals for the same
* endpoint_id in seconds will be like: 5, 10, 10, 10...
* With use_exp_backoff_in_bwu_retry enabled, bwu retry intervals in seconds
* would be like: 3, 6, 12, 24, ... 300, 300, ...
*/
auto delay =
FeatureFlags::GetInstance().GetFlags().use_exp_backoff_in_bwu_retry
? item->second * 2
: item->second + initial_delay;
return std::min(delay, config_.bandwidth_upgrade_retry_max_delay);
}
void BwuManager::CancelRetryUpgradeAlarm(const std::string& endpoint_id) {
NEARBY_LOGS(INFO) << "CancelRetryUpgradeAlarm for endpoint " << endpoint_id;
auto item = retry_upgrade_alarms_.extract(endpoint_id);
if (item.empty()) return;
auto& pair = item.mapped();
pair.first.Cancel();
}
void BwuManager::CancelAllRetryUpgradeAlarms() {
NEARBY_LOGS(INFO) << "CancelAllRetryUpgradeAlarms invoked";
for (auto& item : retry_upgrade_alarms_) {
const std::string& endpoint_id = item.first;
CancelableAlarm& cancellable_alarm = item.second.first;
NEARBY_LOGS(INFO) << "CancelRetryUpgradeAlarm for endpoint " << endpoint_id;
cancellable_alarm.Cancel();
}
retry_upgrade_alarms_.clear();
retry_delays_.clear();
}
Medium BwuManager::GetEndpointMedium(const std::string& endpoint_id) {
auto channel = channel_manager_->GetChannelForEndpoint(endpoint_id);
return channel == nullptr ? Medium::UNKNOWN_MEDIUM : channel->GetMedium();
}
} // namespace connections
} // namespace nearby
} // namespace location