| // Copyright 2020 Google LLC |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // https://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| #include "connections/implementation/endpoint_channel_manager.h" |
| |
| #include <memory> |
| #include <string> |
| #include <utility> |
| |
| #include "absl/time/time.h" |
| #include "connections/implementation/proto/offline_wire_formats.pb.h" |
| #include "connections/implementation/offline_frames.h" |
| #include "internal/platform/feature_flags.h" |
| #include "internal/platform/logging.h" |
| #include "internal/platform/mutex.h" |
| #include "internal/platform/mutex_lock.h" |
| #include "internal/platform/system_clock.h" |
| |
| namespace location { |
| namespace nearby { |
| namespace connections { |
| |
| namespace { |
| const absl::Duration kDataTransferDelay = absl::Milliseconds(500); |
| } |
| |
| EndpointChannelManager::~EndpointChannelManager() { |
| NEARBY_LOG(INFO, "Initiating shutdown of EndpointChannelManager."); |
| MutexLock lock(&mutex_); |
| channel_state_.DestroyAll(); |
| NEARBY_LOG(INFO, "EndpointChannelManager has shut down."); |
| } |
| |
| void EndpointChannelManager::RegisterChannelForEndpoint( |
| ClientProxy* client, const std::string& endpoint_id, |
| std::unique_ptr<EndpointChannel> channel) { |
| MutexLock lock(&mutex_); |
| |
| NEARBY_LOGS(INFO) << "EndpointChannelManager registered channel of type " |
| << channel->GetType() << " to endpoint " << endpoint_id; |
| SetActiveEndpointChannel(client, endpoint_id, std::move(channel)); |
| |
| NEARBY_LOG(INFO, "Registered channel: id=%s", endpoint_id.c_str()); |
| } |
| |
| void EndpointChannelManager::ReplaceChannelForEndpoint( |
| ClientProxy* client, const std::string& endpoint_id, |
| std::unique_ptr<EndpointChannel> channel) { |
| MutexLock lock(&mutex_); |
| |
| auto* endpoint = channel_state_.LookupEndpointData(endpoint_id); |
| if (endpoint != nullptr && endpoint->channel == nullptr) { |
| NEARBY_LOGS(INFO) << "EndpointChannelManager is missing channel while " |
| "trying to update: endpoint " |
| << endpoint_id; |
| } |
| |
| SetActiveEndpointChannel(client, endpoint_id, std::move(channel)); |
| } |
| |
| bool EndpointChannelManager::EncryptChannelForEndpoint( |
| const std::string& endpoint_id, |
| std::unique_ptr<EncryptionContext> context) { |
| MutexLock lock(&mutex_); |
| |
| channel_state_.UpdateEncryptionContextForEndpoint(endpoint_id, |
| std::move(context)); |
| auto* endpoint = channel_state_.LookupEndpointData(endpoint_id); |
| return channel_state_.EncryptChannel(endpoint); |
| } |
| |
| std::shared_ptr<EndpointChannel> EndpointChannelManager::GetChannelForEndpoint( |
| const std::string& endpoint_id) { |
| MutexLock lock(&mutex_); |
| |
| auto* endpoint = channel_state_.LookupEndpointData(endpoint_id); |
| if (endpoint == nullptr) { |
| NEARBY_LOGS(INFO) << "No channel info for endpoint " << endpoint_id; |
| return {}; |
| } |
| |
| return endpoint->channel; |
| } |
| |
| void EndpointChannelManager::SetActiveEndpointChannel( |
| ClientProxy* client, const std::string& endpoint_id, |
| std::unique_ptr<EndpointChannel> channel) { |
| // Update the channel first, then encrypt this new channel, if |
| // crypto context is present. |
| channel->SetAnalyticsRecorder(&client->GetAnalyticsRecorder(), endpoint_id); |
| channel_state_.UpdateChannelForEndpoint(endpoint_id, std::move(channel)); |
| |
| auto* endpoint = channel_state_.LookupEndpointData(endpoint_id); |
| if (endpoint->IsEncrypted()) channel_state_.EncryptChannel(endpoint); |
| } |
| |
| int EndpointChannelManager::GetConnectedEndpointsCount() const { |
| MutexLock lock(&mutex_); |
| return channel_state_.GetConnectedEndpointsCount(); |
| } |
| |
| ///////////////////////////////// ChannelState ///////////////////////////////// |
| |
| // endpoint - channel endpoint to encrypt |
| bool EndpointChannelManager::ChannelState::EncryptChannel( |
| EndpointChannelManager::ChannelState::EndpointData* endpoint) { |
| if (endpoint != nullptr && endpoint->channel != nullptr && |
| endpoint->context != nullptr) { |
| endpoint->channel->EnableEncryption(endpoint->context); |
| return true; |
| } |
| return false; |
| } |
| |
| EndpointChannelManager::ChannelState::EndpointData* |
| EndpointChannelManager::ChannelState::LookupEndpointData( |
| const std::string& endpoint_id) { |
| auto item = endpoints_.find(endpoint_id); |
| return item != endpoints_.end() ? &item->second : nullptr; |
| } |
| |
| void EndpointChannelManager::ChannelState::UpdateChannelForEndpoint( |
| const std::string& endpoint_id, std::unique_ptr<EndpointChannel> channel) { |
| // Create EndpointData instance, if necessary, and populate channel. |
| endpoints_[endpoint_id].channel = std::move(channel); |
| } |
| |
| void EndpointChannelManager::ChannelState::UpdateEncryptionContextForEndpoint( |
| const std::string& endpoint_id, |
| std::unique_ptr<EncryptionContext> context) { |
| // Create EndpointData instance, if necessary, and populate crypto context. |
| endpoints_[endpoint_id].context = std::move(context); |
| } |
| |
| bool EndpointChannelManager::ChannelState::RemoveEndpoint( |
| const std::string& endpoint_id, |
| proto::connections::DisconnectionReason reason) { |
| auto item = endpoints_.find(endpoint_id); |
| if (item == endpoints_.end()) return false; |
| item->second.disconnect_reason = reason; |
| auto channel = item->second.channel; |
| if (channel) { |
| // If the channel was paused (i.e. during a bandwidth upgrade negotiation) |
| // we resume to ensure the thread won't hang when trying to write to it. |
| channel->Resume(); |
| |
| channel->Write(parser::ForDisconnection()); |
| NEARBY_LOGS(INFO) |
| << "EndpointChannelManager reported the disconnection to endpoint " |
| << endpoint_id; |
| SystemClock::Sleep(kDataTransferDelay); |
| } |
| endpoints_.erase(item); |
| return true; |
| } |
| |
| bool EndpointChannelManager::UnregisterChannelForEndpoint( |
| const std::string& endpoint_id) { |
| MutexLock lock(&mutex_); |
| |
| if (!channel_state_.RemoveEndpoint( |
| endpoint_id, |
| proto::connections::DisconnectionReason::LOCAL_DISCONNECTION)) { |
| return false; |
| } |
| |
| NEARBY_LOGS(INFO) |
| << "EndpointChannelManager unregistered channel for endpoint " |
| << endpoint_id; |
| |
| return true; |
| } |
| |
| } // namespace connections |
| } // namespace nearby |
| } // namespace location |