| // Copyright 2020 Google LLC |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // https://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| #include "core/internal/payload_manager.h" |
| |
| #include <algorithm> |
| #include <cinttypes> |
| #include <limits> |
| #include <memory> |
| #include <string> |
| #include <utility> |
| |
| #include "core/internal/internal_payload_factory.h" |
| #include "platform/public/count_down_latch.h" |
| #include "platform/public/mutex_lock.h" |
| #include "platform/public/single_thread_executor.h" |
| #include "platform/public/system_clock.h" |
| #include "absl/memory/memory.h" |
| #include "absl/strings/str_cat.h" |
| #include "absl/time/time.h" |
| |
| namespace location { |
| namespace nearby { |
| namespace connections { |
| |
| // C++14 requires to declare this. |
| // TODO(apolyudov): remove when migration to c++17 is possible. |
| constexpr const absl::Duration PayloadManager::kWaitCloseTimeout; |
| |
| bool PayloadManager::SendPayloadLoop( |
| ClientProxy* client, PendingPayload& pending_payload, |
| PayloadTransferFrame::PayloadHeader& payload_header, |
| std::int64_t& next_chunk_offset) { |
| // in lieu of structured binding: |
| auto pair = GetAvailableAndUnavailableEndpoints(pending_payload); |
| const EndpointIds& available_endpoint_ids = |
| EndpointsToEndpointIds(pair.first); |
| const Endpoints& unavailable_endpoints = pair.second; |
| |
| // First, handle any non-available endpoints. |
| for (const auto& endpoint : unavailable_endpoints) { |
| HandleFinishedOutgoingPayload( |
| client, {endpoint->id}, payload_header, next_chunk_offset, |
| EndpointInfoStatusToPayloadStatus(endpoint->status.Get())); |
| } |
| |
| // Update the still-active recipients of this payload. |
| if (available_endpoint_ids.empty()) { |
| NEARBY_LOG(INFO, |
| "PayloadManager short-circuiting payload_id=%" PRIX64 |
| " after sending %" PRIX64 |
| " bytes because none of the endpoints are available anymore.", |
| pending_payload.GetInternalPayload()->GetId(), |
| next_chunk_offset); |
| return false; |
| } |
| |
| // Check if the payload has been cancelled by the client and, if so, |
| // notify the remaining recipients. |
| if (pending_payload.IsLocallyCanceled()) { |
| NEARBY_LOG(INFO, |
| "Aborting send of payload_id=%x" PRIx64 " at offset %x" PRIx64 |
| " since it is marked canceled.", |
| static_cast<std::int64_t>( |
| pending_payload.GetInternalPayload()->GetId()), |
| next_chunk_offset); |
| HandleFinishedOutgoingPayload( |
| client, available_endpoint_ids, payload_header, next_chunk_offset, |
| proto::connections::PayloadStatus::LOCAL_CANCELLATION); |
| return false; |
| } |
| |
| // Update the current offsets for all endpoints still active for this |
| // payload. For the sake of accuracy, we update the pending payload here |
| // because it's after all payload terminating events are handled, but |
| // right before we actually start detaching the next chunk. |
| for (const auto& endpoint_id : available_endpoint_ids) { |
| pending_payload.SetOffsetForEndpoint(endpoint_id, next_chunk_offset); |
| } |
| |
| // This will block if there is no data to transfer. |
| // It will resume when new data arrives, or if Close() is called. |
| int chunk_size = GetOptimalChunkSize(available_endpoint_ids); |
| ByteArray next_chunk = |
| pending_payload.GetInternalPayload()->DetachNextChunk(chunk_size); |
| if (shutdown_.Get()) return false; |
| // Save chunk size. We'll need it after we move next_chunk. |
| auto next_chunk_size = next_chunk.size(); |
| if (!next_chunk_size && |
| pending_payload.GetInternalPayload()->GetTotalSize() > 0 && |
| pending_payload.GetInternalPayload()->GetTotalSize() < |
| next_chunk_offset) { |
| NEARBY_LOG(INFO, "Payload xfer failed: payload_id=%x" PRIx64, |
| static_cast<std::int64_t>( |
| pending_payload.GetInternalPayload()->GetId())); |
| HandleFinishedOutgoingPayload( |
| client, available_endpoint_ids, payload_header, next_chunk_offset, |
| proto::connections::PayloadStatus::LOCAL_ERROR); |
| return false; |
| } |
| |
| PayloadTransferFrame::PayloadChunk payload_chunk( |
| CreatePayloadChunk(next_chunk_offset, std::move(next_chunk))); |
| const EndpointIds& failed_endpoint_ids = endpoint_manager_->SendPayloadChunk( |
| payload_header, payload_chunk, available_endpoint_ids); |
| // Check whether at least one endpoint failed. |
| if (!failed_endpoint_ids.empty()) { |
| NEARBY_LOG(INFO, |
| "Payload xfer: endpoints failed: payload_id=%x" PRIx64 |
| "; endpoint_ids={%s}", |
| static_cast<std::int64_t>(payload_header.id()), |
| ToString(failed_endpoint_ids).c_str()); |
| HandleFinishedOutgoingPayload( |
| client, failed_endpoint_ids, payload_header, next_chunk_offset, |
| proto::connections::PayloadStatus::ENDPOINT_IO_ERROR); |
| } |
| |
| // Check whether at least one endpoint succeeded -- if they all failed, |
| // we'll just go right back to the top of the loop and break out when |
| // availableEndpointIds is re-synced and found to be empty at that point. |
| if (failed_endpoint_ids.size() < available_endpoint_ids.size()) { |
| for (const auto& endpoint_id : available_endpoint_ids) { |
| if (std::find(failed_endpoint_ids.begin(), failed_endpoint_ids.end(), |
| endpoint_id) == failed_endpoint_ids.end()) { |
| HandleSuccessfulOutgoingChunk( |
| client, endpoint_id, payload_header, payload_chunk.flags(), |
| payload_chunk.offset(), payload_chunk.body().size()); |
| } |
| } |
| NEARBY_LOG(VERBOSE, |
| "PayloadManager done sending chunk at offset %x" PRIx64 |
| " of payload_id=%x" PRIx64, |
| next_chunk_offset, |
| static_cast<std::int64_t>( |
| pending_payload.GetInternalPayload()->GetId())); |
| next_chunk_offset += next_chunk_size; |
| |
| if (!next_chunk_size) { |
| // That was the last chunk, we're outta here. |
| NEARBY_LOG(INFO, |
| "Payload xfer done: payload_id=%x" PRIx64 "; size=%" PRId64, |
| static_cast<std::int64_t>( |
| pending_payload.GetInternalPayload()->GetId()), |
| next_chunk_offset); |
| return false; |
| } |
| } |
| |
| return true; |
| } |
| |
| std::pair<PayloadManager::Endpoints, PayloadManager::Endpoints> |
| PayloadManager::GetAvailableAndUnavailableEndpoints( |
| const PendingPayload& pending_payload) { |
| Endpoints available; |
| Endpoints unavailable; |
| for (auto* endpoint_info : pending_payload.GetEndpoints()) { |
| if (endpoint_info->status.Get() == |
| PayloadManager::EndpointInfo::Status::kAvailable) { |
| available.push_back(endpoint_info); |
| } else { |
| unavailable.push_back(endpoint_info); |
| } |
| } |
| return std::make_pair(std::move(available), std::move(unavailable)); |
| } |
| |
| PayloadManager::EndpointIds PayloadManager::EndpointsToEndpointIds( |
| const Endpoints& endpoints) { |
| EndpointIds endpoint_ids; |
| endpoint_ids.reserve(endpoints.size()); |
| for (const auto& item : endpoints) { |
| if (item) { |
| endpoint_ids.emplace_back(item->id); |
| } |
| } |
| return endpoint_ids; |
| } |
| |
| std::string PayloadManager::ToString(const Endpoints& endpoints) { |
| std::string endpoints_string = absl::StrCat(endpoints.size(), ": "); |
| bool first = true; |
| for (const auto& item : endpoints) { |
| if (first) { |
| absl::StrAppend(&endpoints_string, item->id); |
| first = false; |
| } else { |
| absl::StrAppend(&endpoints_string, ", ", item->id); |
| } |
| } |
| return endpoints_string; |
| } |
| |
| std::string PayloadManager::ToString(const EndpointIds& endpoint_ids) { |
| std::string endpoints_string = absl::StrCat(endpoint_ids.size(), ": "); |
| bool first = true; |
| for (const auto& id : endpoint_ids) { |
| if (first) { |
| absl::StrAppend(&endpoints_string, id); |
| first = false; |
| } else { |
| absl::StrAppend(&endpoints_string, ", ", id); |
| } |
| } |
| return endpoints_string; |
| } |
| |
| // Creates and starts tracking a PendingPayload for this Payload. |
| Payload::Id PayloadManager::CreateOutgoingPayload( |
| Payload payload, const EndpointIds& endpoint_ids) { |
| auto internal_payload{CreateOutgoingInternalPayload(std::move(payload))}; |
| Payload::Id payload_id = internal_payload->GetId(); |
| NEARBY_LOG(INFO, "CreateOutgoingPayload: payload_id=%x" PRIx64, |
| static_cast<std::int64_t>(payload_id)); |
| MutexLock lock(&mutex_); |
| pending_payloads_.StartTrackingPayload( |
| payload_id, absl::make_unique<PendingPayload>(std::move(internal_payload), |
| endpoint_ids, |
| /*is_incoming=*/false)); |
| |
| return payload_id; |
| } |
| |
| PayloadManager::PayloadManager(EndpointManager& endpoint_manager) |
| : endpoint_manager_(&endpoint_manager) { |
| endpoint_manager_->RegisterFrameProcessor(V1Frame::PAYLOAD_TRANSFER, this); |
| } |
| |
| void PayloadManager::CancelAllPayloads() { |
| NEARBY_LOG(INFO, "PayloadManager: canceling payloads; self=%p", this); |
| { |
| MutexLock lock(&mutex_); |
| int pending_outgoing_payloads = 0; |
| for (const auto& pending_id : pending_payloads_.GetAllPayloads()) { |
| auto* pending = pending_payloads_.GetPayload(pending_id); |
| if (!pending->IsIncoming()) pending_outgoing_payloads++; |
| pending->MarkLocallyCanceled(); |
| pending->Close(); // To unblock the sender thread, if there is no data. |
| } |
| if (pending_outgoing_payloads) { |
| shutdown_barrier_ = |
| absl::make_unique<CountDownLatch>(pending_outgoing_payloads); |
| } |
| } |
| |
| if (shutdown_barrier_) { |
| NEARBY_LOG(INFO, |
| "PayloadManager: waiting for pending outgoing payloads; self=%p", |
| this); |
| shutdown_barrier_->Await(); |
| } |
| } |
| |
| void PayloadManager::DisconnectFromEndpointManager() { |
| if (shutdown_.Set(true)) return; |
| // Unregister ourselves from the FrameProcessors. |
| endpoint_manager_->UnregisterFrameProcessor(V1Frame::PAYLOAD_TRANSFER, this); |
| } |
| |
| PayloadManager::~PayloadManager() { |
| NEARBY_LOG(INFO, "PayloadManager: going down; self=%p", this); |
| DisconnectFromEndpointManager(); |
| CancelAllPayloads(); |
| NEARBY_LOG(INFO, "PayloadManager: turn down payload executors; self=%p", |
| this); |
| bytes_payload_executor_.Shutdown(); |
| stream_payload_executor_.Shutdown(); |
| file_payload_executor_.Shutdown(); |
| |
| CountDownLatch stop_latch(1); |
| // Clear our tracked pending payloads. |
| RunOnStatusUpdateThread( |
| "~payload-manager", |
| [this, &stop_latch]() RUN_ON_PAYLOAD_STATUS_UPDATE_THREAD() { |
| NEARBY_LOG(INFO, "PayloadManager: stop tracking payloads; self=%p", |
| this); |
| MutexLock lock(&mutex_); |
| for (const auto& pending_id : pending_payloads_.GetAllPayloads()) { |
| pending_payloads_.StopTrackingPayload(pending_id); |
| } |
| stop_latch.CountDown(); |
| }); |
| stop_latch.Await(); |
| |
| NEARBY_LOG(INFO, "PayloadManager: turn down notification executor; self=%p", |
| this); |
| // Stop all the ongoing Runnables (as gracefully as possible). |
| payload_status_update_executor_.Shutdown(); |
| |
| NEARBY_LOG(INFO, "PayloadManager: down; self=%p", this); |
| } |
| |
| bool PayloadManager::NotifyShutdown() { |
| MutexLock lock(&mutex_); |
| if (!shutdown_.Get()) return false; |
| if (!shutdown_barrier_) return false; |
| NEARBY_LOG(INFO, "PayloadManager [shutdown mode]"); |
| shutdown_barrier_->CountDown(); |
| return true; |
| } |
| |
| void PayloadManager::SendPayload(ClientProxy* client, |
| const EndpointIds& endpoint_ids, |
| Payload payload) { |
| if (shutdown_.Get()) return; |
| NEARBY_LOG(INFO, "SendPayload: endpoint_ids={%s}", |
| ToString(endpoint_ids).c_str()); |
| auto executor = GetOutgoingPayloadExecutor(payload.GetType()); |
| // The |executor| will be null if the payload is of a type we cannot work |
| // with. This should never be reached since the ServiceControllerRouter has |
| // already checked whether or not we can work with this Payload type. |
| if (!executor) { |
| NEARBY_LOG(INFO, |
| "PayloadManager failed to determine the right executor for " |
| "outgoing payload_id=%" PRIx64 ", payload_type=%d", |
| static_cast<std::int64_t>(payload.GetId()), payload.GetType()); |
| return; |
| } |
| |
| // Each payload is sent in FCFS order within each Payload type, blocking any |
| // other payload of the same type from even starting until this one is |
| // completely done with. If we ever want to provide isolation across |
| // ClientProxy objects this will need to be significantly re-architected. |
| Payload::Type payload_type = payload.GetType(); |
| Payload::Id payload_id = |
| CreateOutgoingPayload(std::move(payload), endpoint_ids); |
| executor->Execute("send-payload", [this, client, endpoint_ids, payload_id, |
| payload_type]() { |
| if (shutdown_.Get()) return; |
| PendingPayload* pending_payload = GetPayload(payload_id); |
| if (!pending_payload) { |
| NEARBY_LOG(INFO, |
| "PayloadManager failed to create InternalPayload for outgoing " |
| "payload_id=%x" PRIx64 |
| ", payload_type=%d, aborting sendPayload().", |
| static_cast<std::int64_t>(payload_id), payload_type); |
| return; |
| } |
| auto* internal_payload = pending_payload->GetInternalPayload(); |
| if (!internal_payload) return; |
| PayloadTransferFrame::PayloadHeader payload_header{ |
| CreatePayloadHeader(*internal_payload)}; |
| bool should_continue = true; |
| std::int64_t next_chunk_offset = 0; |
| while (should_continue && !shutdown_.Get()) { |
| should_continue = SendPayloadLoop(client, *pending_payload, |
| payload_header, next_chunk_offset); |
| } |
| RunOnStatusUpdateThread("destroy-payload", |
| [this, payload_id]() |
| RUN_ON_PAYLOAD_STATUS_UPDATE_THREAD() { |
| DestroyPendingPayload(payload_id); |
| }); |
| }); |
| NEARBY_LOG(INFO, |
| "PayloadManager: xfer scheduled: self=%p; payload_id=%x" PRIx64 |
| ", payload_type=%d", |
| this, static_cast<std::int64_t>(payload_id), payload_type); |
| } |
| |
| PayloadManager::PendingPayload* PayloadManager::GetPayload( |
| Payload::Id payload_id) const { |
| MutexLock lock(&mutex_); |
| return pending_payloads_.GetPayload(payload_id); |
| } |
| |
| Status PayloadManager::CancelPayload(ClientProxy* client, |
| Payload::Id payload_id) { |
| PendingPayload* canceled_payload = GetPayload(payload_id); |
| if (!canceled_payload) { |
| NEARBY_LOG(INFO, |
| "Client requested cancel for unknown payload_id=%x" PRIx64 |
| ", ignoring.", |
| static_cast<std::int64_t>(payload_id)); |
| return {Status::kPayloadUnknown}; |
| } |
| |
| // Mark the payload as canceled. |
| canceled_payload->MarkLocallyCanceled(); |
| NEARBY_LOG(INFO, |
| "Cancelling %s payload_id=%x" PRIx64 " at request of client.", |
| (canceled_payload->IsIncoming() ? "incoming" : "outgoing"), |
| static_cast<std::int64_t>(payload_id)); |
| |
| // Return SUCCESS immediately. Remaining cleanup and updates will be sent in |
| // SendPayload() or OnIncomingFrame() |
| return {Status::kSuccess}; |
| } |
| |
| // @EndpointManagerDataPool |
| void PayloadManager::OnIncomingFrame( |
| OfflineFrame& offline_frame, const std::string& from_endpoint_id, |
| ClientProxy* to_client, proto::connections::Medium current_medium) { |
| PayloadTransferFrame& frame = |
| *offline_frame.mutable_v1()->mutable_payload_transfer(); |
| |
| switch (frame.packet_type()) { |
| case PayloadTransferFrame::CONTROL: |
| NEARBY_LOG( |
| INFO, |
| "PayloadManager::OnIncomingFrame [CONTROL]: self=%p; endpoint_id=%s", |
| this, from_endpoint_id.c_str()); |
| ProcessControlPacket(to_client, from_endpoint_id, frame); |
| break; |
| case PayloadTransferFrame::DATA: |
| ProcessDataPacket(to_client, from_endpoint_id, frame); |
| break; |
| default: |
| NEARBY_LOG(WARNING, |
| "PayloadManager: invalid frame; remote endpoint: self=%p; " |
| "endpoint_id=%s", |
| this, from_endpoint_id.c_str()); |
| break; |
| } |
| } |
| |
| void PayloadManager::OnEndpointDisconnect(ClientProxy* client, |
| const std::string& endpoint_id, |
| CountDownLatch barrier) { |
| if (shutdown_.Get()) { |
| barrier.CountDown(); |
| return; |
| } |
| RunOnStatusUpdateThread( |
| "payload-manager-on-disconnect", |
| [this, client, endpoint_id, barrier]() |
| RUN_ON_PAYLOAD_STATUS_UPDATE_THREAD() mutable { |
| // Iterate through all our payloads and look for payloads associated |
| // with this endpoint. |
| MutexLock lock(&mutex_); |
| for (const auto& payload_id : pending_payloads_.GetAllPayloads()) { |
| auto* pending_payload = pending_payloads_.GetPayload(payload_id); |
| if (!pending_payload) continue; |
| auto endpoint_info = pending_payload->GetEndpoint(endpoint_id); |
| if (!endpoint_info) continue; |
| std::int64_t endpoint_offset = endpoint_info->offset; |
| // Stop tracking the endpoint for this payload. |
| pending_payload->RemoveEndpoints({endpoint_id}); |
| // |endpoint_info| is longer valid after calling RemoveEndpoints. |
| endpoint_info = nullptr; |
| |
| std::int64_t payload_total_size = |
| pending_payload->GetInternalPayload()->GetTotalSize(); |
| |
| // If no endpoints are left for this payload, close it. |
| if (pending_payload->GetEndpoints().empty()) { |
| pending_payload->Close(); |
| } |
| |
| // Create the payload transfer update. |
| PayloadProgressInfo update{payload_id, |
| PayloadProgressInfo::Status::kFailure, |
| payload_total_size, endpoint_offset}; |
| |
| // Send a client notification of a payload transfer failure. |
| client->OnPayloadProgress(endpoint_id, update); |
| } |
| |
| barrier.CountDown(); |
| }); |
| } |
| |
| proto::connections::PayloadStatus |
| PayloadManager::EndpointInfoStatusToPayloadStatus(EndpointInfo::Status status) { |
| switch (status) { |
| case EndpointInfo::Status::kCanceled: |
| return proto::connections::PayloadStatus::REMOTE_CANCELLATION; |
| case EndpointInfo::Status::kError: |
| return proto::connections::PayloadStatus::REMOTE_ERROR; |
| case EndpointInfo::Status::kAvailable: |
| return proto::connections::PayloadStatus::SUCCESS; |
| default: |
| NEARBY_LOG( |
| INFO, |
| "PayloadManager: Unknown PayloadStatus for EndpointInfo.Status=%d", |
| status); |
| return proto::connections::PayloadStatus::UNKNOWN_PAYLOAD_STATUS; |
| } |
| } |
| |
| proto::connections::PayloadStatus |
| PayloadManager::ControlMessageEventToPayloadStatus( |
| PayloadTransferFrame::ControlMessage::EventType event) { |
| switch (event) { |
| case PayloadTransferFrame::ControlMessage::PAYLOAD_ERROR: |
| return proto::connections::PayloadStatus::REMOTE_ERROR; |
| case PayloadTransferFrame::ControlMessage::PAYLOAD_CANCELED: |
| return proto::connections::PayloadStatus::REMOTE_CANCELLATION; |
| default: |
| NEARBY_LOG(INFO, "PayloadManager: unknown event=%d", event); |
| return proto::connections::PayloadStatus::UNKNOWN_PAYLOAD_STATUS; |
| } |
| } |
| |
| PayloadProgressInfo::Status PayloadManager::PayloadStatusToTransferUpdateStatus( |
| proto::connections::PayloadStatus status) { |
| switch (status) { |
| case proto::connections::LOCAL_CANCELLATION: |
| case proto::connections::REMOTE_CANCELLATION: |
| return PayloadProgressInfo::Status::kCanceled; |
| case proto::connections::SUCCESS: |
| return PayloadProgressInfo::Status::kSuccess; |
| default: |
| return PayloadProgressInfo::Status::kFailure; |
| } |
| } |
| |
| SingleThreadExecutor* PayloadManager::GetOutgoingPayloadExecutor( |
| Payload::Type payload_type) { |
| switch (payload_type) { |
| case Payload::Type::kBytes: |
| return &bytes_payload_executor_; |
| case Payload::Type::kFile: |
| return &file_payload_executor_; |
| case Payload::Type::kStream: |
| return &stream_payload_executor_; |
| default: |
| return nullptr; |
| } |
| } |
| |
| int PayloadManager::GetOptimalChunkSize(EndpointIds endpoint_ids) { |
| int minChunkSize = std::numeric_limits<int>::max(); |
| for (const auto& endpoint_id : endpoint_ids) { |
| minChunkSize = std::min( |
| minChunkSize, endpoint_manager_->GetMaxTransmitPacketSize(endpoint_id)); |
| } |
| return minChunkSize; |
| } |
| |
| PayloadTransferFrame::PayloadHeader PayloadManager::CreatePayloadHeader( |
| const InternalPayload& internal_payload) { |
| PayloadTransferFrame::PayloadHeader payload_header; |
| |
| payload_header.set_id(internal_payload.GetId()); |
| payload_header.set_type(internal_payload.GetType()); |
| payload_header.set_total_size(internal_payload.GetTotalSize()); |
| |
| return payload_header; |
| } |
| |
| PayloadTransferFrame::PayloadChunk PayloadManager::CreatePayloadChunk( |
| std::int64_t payload_chunk_offset, ByteArray payload_chunk_body) { |
| PayloadTransferFrame::PayloadChunk payload_chunk; |
| |
| payload_chunk.set_offset(payload_chunk_offset); |
| payload_chunk.set_flags(0); |
| if (!payload_chunk_body.Empty()) { |
| payload_chunk.set_body(std::string(std::move(payload_chunk_body))); |
| } else { |
| payload_chunk.set_flags(payload_chunk.flags() | |
| PayloadTransferFrame::PayloadChunk::LAST_CHUNK); |
| } |
| |
| return payload_chunk; |
| } |
| |
| PayloadManager::PendingPayload* PayloadManager::CreateIncomingPayload( |
| const PayloadTransferFrame& frame, const std::string& endpoint_id) { |
| auto internal_payload = CreateIncomingInternalPayload(frame); |
| if (!internal_payload) { |
| return nullptr; |
| } |
| |
| Payload::Id payload_id = internal_payload->GetId(); |
| NEARBY_LOG(INFO, "CreateIncomingPayload: payload_id=%x" PRIx64, |
| static_cast<std::int64_t>(payload_id)); |
| MutexLock lock(&mutex_); |
| pending_payloads_.StartTrackingPayload( |
| payload_id, |
| absl::make_unique<PendingPayload>(std::move(internal_payload), |
| EndpointIds{endpoint_id}, true)); |
| |
| return pending_payloads_.GetPayload(payload_id); |
| } |
| |
| void PayloadManager::SendClientCallbacksForFinishedOutgoingPayload( |
| ClientProxy* client, const EndpointIds& finished_endpoint_ids, |
| const PayloadTransferFrame::PayloadHeader& payload_header, |
| std::int64_t num_bytes_successfully_transferred, |
| proto::connections::PayloadStatus status) { |
| RunOnStatusUpdateThread( |
| "outgoing-payload-callbacks", |
| [this, client, finished_endpoint_ids, payload_header, |
| num_bytes_successfully_transferred, |
| status]() RUN_ON_PAYLOAD_STATUS_UPDATE_THREAD() { |
| // Make sure we're still tracking this payload. |
| PendingPayload* pending_payload = GetPayload(payload_header.id()); |
| if (!pending_payload) { |
| return; |
| } |
| |
| PayloadProgressInfo update{ |
| payload_header.id(), |
| PayloadManager::PayloadStatusToTransferUpdateStatus(status), |
| payload_header.total_size(), num_bytes_successfully_transferred}; |
| for (const auto& endpoint_id : finished_endpoint_ids) { |
| // Skip sending notifications if we have stopped tracking this |
| // endpoint. |
| if (!pending_payload->GetEndpoint(endpoint_id)) { |
| continue; |
| } |
| |
| // Notify the client. |
| client->OnPayloadProgress(endpoint_id, update); |
| } |
| |
| // Remove these endpoints from our tracking list for this payload. |
| pending_payload->RemoveEndpoints(finished_endpoint_ids); |
| |
| // Close the payload if no endpoints remain. |
| if (pending_payload->GetEndpoints().empty()) { |
| pending_payload->Close(); |
| } |
| }); |
| } |
| |
| void PayloadManager::SendClientCallbacksForFinishedIncomingPayload( |
| ClientProxy* client, const std::string& endpoint_id, |
| const PayloadTransferFrame::PayloadHeader& payload_header, |
| std::int64_t offset_bytes, proto::connections::PayloadStatus status) { |
| RunOnStatusUpdateThread( |
| "incoming-payload-callbacks", |
| [this, client, endpoint_id, payload_header, offset_bytes, |
| status]() RUN_ON_PAYLOAD_STATUS_UPDATE_THREAD() { |
| // Make sure we're still tracking this payload. |
| PendingPayload* pending_payload = GetPayload(payload_header.id()); |
| if (!pending_payload) { |
| return; |
| } |
| |
| // Unless we never started tracking this payload (meaning we failed to |
| // even create the InternalPayload), notify the client (and close it). |
| PayloadProgressInfo update{ |
| payload_header.id(), |
| PayloadManager::PayloadStatusToTransferUpdateStatus(status), |
| payload_header.total_size(), offset_bytes}; |
| NotifyClientOfIncomingPayloadProgressInfo(client, endpoint_id, update); |
| DestroyPendingPayload(payload_header.id()); |
| }); |
| } |
| |
| void PayloadManager::SendControlMessage( |
| const EndpointIds& endpoint_ids, |
| const PayloadTransferFrame::PayloadHeader& payload_header, |
| std::int64_t num_bytes_successfully_transferred, |
| PayloadTransferFrame::ControlMessage::EventType event_type) { |
| PayloadTransferFrame::ControlMessage control_message; |
| control_message.set_event(event_type); |
| control_message.set_offset(num_bytes_successfully_transferred); |
| |
| endpoint_manager_->SendControlMessage(payload_header, control_message, |
| endpoint_ids); |
| } |
| |
| void PayloadManager::HandleFinishedOutgoingPayload( |
| ClientProxy* client, const EndpointIds& finished_endpoint_ids, |
| const PayloadTransferFrame::PayloadHeader& payload_header, |
| std::int64_t num_bytes_successfully_transferred, |
| proto::connections::PayloadStatus status) { |
| // This call will destroy a pending payload. |
| SendClientCallbacksForFinishedOutgoingPayload( |
| client, finished_endpoint_ids, payload_header, |
| num_bytes_successfully_transferred, status); |
| |
| switch (status) { |
| case proto::connections::PayloadStatus::LOCAL_ERROR: |
| SendControlMessage(finished_endpoint_ids, payload_header, |
| num_bytes_successfully_transferred, |
| PayloadTransferFrame::ControlMessage::PAYLOAD_ERROR); |
| break; |
| case proto::connections::PayloadStatus::LOCAL_CANCELLATION: |
| NEARBY_LOG( |
| INFO, "Sending PAYLOAD_CANCEL to receiver side; payload_id=%x" PRIx64, |
| static_cast<std::int64_t>(payload_header.id())); |
| SendControlMessage( |
| finished_endpoint_ids, payload_header, |
| num_bytes_successfully_transferred, |
| PayloadTransferFrame::ControlMessage::PAYLOAD_CANCELED); |
| break; |
| case proto::connections::PayloadStatus::ENDPOINT_IO_ERROR: |
| // Unregister these endpoints, since we had an IO error on the physical |
| // connection. |
| for (const auto& endpoint_id : finished_endpoint_ids) { |
| endpoint_manager_->DiscardEndpoint(client, endpoint_id); |
| } |
| break; |
| case proto::connections::PayloadStatus::REMOTE_ERROR: |
| case proto::connections::PayloadStatus::REMOTE_CANCELLATION: |
| // No special handling needed for these. |
| break; |
| default: |
| NEARBY_LOG(INFO, |
| "PayloadManager: Unhandled finished outgoing payload with " |
| "payload_status=%d", |
| status); |
| break; |
| } |
| } |
| |
| void PayloadManager::HandleFinishedIncomingPayload( |
| ClientProxy* client, const std::string& endpoint_id, |
| const PayloadTransferFrame::PayloadHeader& payload_header, |
| std::int64_t offset_bytes, proto::connections::PayloadStatus status) { |
| SendClientCallbacksForFinishedIncomingPayload( |
| client, endpoint_id, payload_header, offset_bytes, status); |
| |
| switch (status) { |
| case proto::connections::PayloadStatus::LOCAL_ERROR: |
| SendControlMessage({endpoint_id}, payload_header, offset_bytes, |
| PayloadTransferFrame::ControlMessage::PAYLOAD_ERROR); |
| break; |
| case proto::connections::PayloadStatus::LOCAL_CANCELLATION: |
| SendControlMessage( |
| {endpoint_id}, payload_header, offset_bytes, |
| PayloadTransferFrame::ControlMessage::PAYLOAD_CANCELED); |
| break; |
| default: |
| NEARBY_LOG(INFO, |
| "Unhandled finished incoming payload_id=%x" PRIx64 |
| " with payload_status=%d!", |
| static_cast<std::int64_t>(payload_header.id()), status); |
| break; |
| } |
| } |
| |
| void PayloadManager::HandleSuccessfulOutgoingChunk( |
| ClientProxy* client, const std::string& endpoint_id, |
| const PayloadTransferFrame::PayloadHeader& payload_header, |
| std::int32_t payload_chunk_flags, std::int64_t payload_chunk_offset, |
| std::int64_t payload_chunk_body_size) { |
| RunOnStatusUpdateThread( |
| "outgoing-chunk-success", |
| [this, client, endpoint_id, payload_header, payload_chunk_flags, |
| payload_chunk_offset, |
| payload_chunk_body_size]() RUN_ON_PAYLOAD_STATUS_UPDATE_THREAD() { |
| // Make sure we're still tracking this payload and its associated |
| // endpoint. |
| PendingPayload* pending_payload = GetPayload(payload_header.id()); |
| if (!pending_payload || !pending_payload->GetEndpoint(endpoint_id)) { |
| NEARBY_LOG(INFO, |
| "HandleSuccessfulOutgoingChunk: endpoint not found: " |
| "endpoint_id=%s", |
| endpoint_id.c_str()); |
| return; |
| } |
| |
| bool is_last_chunk = |
| (payload_chunk_flags & |
| PayloadTransferFrame::PayloadChunk::LAST_CHUNK) != 0; |
| PayloadProgressInfo update{ |
| payload_header.id(), |
| is_last_chunk ? PayloadProgressInfo::Status::kSuccess |
| : PayloadProgressInfo::Status::kInProgress, |
| payload_header.total_size(), |
| is_last_chunk ? payload_chunk_offset |
| : payload_chunk_offset + payload_chunk_body_size}; |
| |
| // Notify the client. |
| client->OnPayloadProgress(endpoint_id, update); |
| |
| if (is_last_chunk) { |
| // Stop tracking this endpoint. |
| pending_payload->RemoveEndpoints({endpoint_id}); |
| |
| // Close the payload if no endpoints remain. |
| if (pending_payload->GetEndpoints().empty()) { |
| pending_payload->Close(); |
| } |
| } |
| }); |
| } |
| |
| // @PayloadManagerStatusUpdateThread |
| void PayloadManager::DestroyPendingPayload(Payload::Id payload_id) { |
| bool is_incoming = false; |
| { |
| MutexLock lock(&mutex_); |
| auto pending = pending_payloads_.StopTrackingPayload(payload_id); |
| if (!pending) return; |
| is_incoming = pending->IsIncoming(); |
| const char* direction = is_incoming ? "incoming" : "outgoing"; |
| NEARBY_LOG(INFO, |
| "PayloadManager: destroying %s pending payload: " |
| "self=%p; payload_id=%x" PRIx64, |
| direction, this, static_cast<std::int64_t>(payload_id)); |
| pending->Close(); |
| pending.reset(); |
| } |
| if (!is_incoming) NotifyShutdown(); |
| } |
| |
| void PayloadManager::HandleSuccessfulIncomingChunk( |
| ClientProxy* client, const std::string& endpoint_id, |
| const PayloadTransferFrame::PayloadHeader& payload_header, |
| std::int32_t payload_chunk_flags, std::int64_t payload_chunk_offset, |
| std::int64_t payload_chunk_body_size) { |
| RunOnStatusUpdateThread( |
| "incoming-chunk-success", |
| [this, client, endpoint_id, payload_header, payload_chunk_flags, |
| payload_chunk_offset, |
| payload_chunk_body_size]() RUN_ON_PAYLOAD_STATUS_UPDATE_THREAD() { |
| // Make sure we're still tracking this payload. |
| PendingPayload* pending_payload = GetPayload(payload_header.id()); |
| if (!pending_payload) { |
| return; |
| } |
| |
| bool is_last_chunk = |
| (payload_chunk_flags & |
| PayloadTransferFrame::PayloadChunk::LAST_CHUNK) != 0; |
| PayloadProgressInfo update{ |
| payload_header.id(), |
| is_last_chunk ? PayloadProgressInfo::Status::kSuccess |
| : PayloadProgressInfo::Status::kInProgress, |
| payload_header.total_size(), |
| is_last_chunk ? payload_chunk_offset |
| : payload_chunk_offset + payload_chunk_body_size}; |
| |
| // Notify the client of this update. |
| NotifyClientOfIncomingPayloadProgressInfo(client, endpoint_id, update); |
| }); |
| } |
| |
| // @EndpointManagerDataPool |
| void PayloadManager::ProcessDataPacket( |
| ClientProxy* to_client, const std::string& from_endpoint_id, |
| PayloadTransferFrame& payload_transfer_frame) { |
| PayloadTransferFrame::PayloadHeader& payload_header = |
| *payload_transfer_frame.mutable_payload_header(); |
| PayloadTransferFrame::PayloadChunk& payload_chunk = |
| *payload_transfer_frame.mutable_payload_chunk(); |
| NEARBY_LOG(VERBOSE, |
| "PayloadManager got data OfflineFrame for payload_id=%x" PRIx64 |
| " from endpoint_id=%s at offset %x" PRIx64, |
| static_cast<std::int64_t>(payload_header.id()), |
| from_endpoint_id.c_str(), payload_chunk.offset()); |
| |
| PendingPayload* pending_payload; |
| if (payload_chunk.offset() == 0) { |
| pending_payload = |
| CreateIncomingPayload(payload_transfer_frame, from_endpoint_id); |
| if (!pending_payload) { |
| NEARBY_LOG(WARNING, |
| "PayloadManager failed to create InternalPayload from " |
| "PayloadTransferFrame with ID %x" PRIx64 |
| " and type %d, aborting receipt.", |
| static_cast<std::int64_t>(payload_header.id()), |
| payload_header.type()); |
| // Send the error to the remote endpoint. |
| SendControlMessage({from_endpoint_id}, payload_header, |
| payload_chunk.offset(), |
| PayloadTransferFrame::ControlMessage::PAYLOAD_ERROR); |
| return; |
| } |
| |
| // Also, let the client know of this new incoming payload. |
| RunOnStatusUpdateThread( |
| "process-data-packet", |
| [to_client, from_endpoint_id, pending_payload]() |
| RUN_ON_PAYLOAD_STATUS_UPDATE_THREAD() { |
| NEARBY_LOG(INFO, |
| "PayloadManager received new payload_id=%x" PRIx64 |
| " from endpoint_id=%s", |
| static_cast<std::int64_t>( |
| pending_payload->GetInternalPayload()->GetId()), |
| from_endpoint_id.c_str()); |
| to_client->OnPayload( |
| from_endpoint_id, |
| pending_payload->GetInternalPayload()->ReleasePayload()); |
| }); |
| } else { |
| pending_payload = GetPayload(payload_header.id()); |
| if (!pending_payload) { |
| NEARBY_LOG( |
| WARNING, |
| "ProcessDataPacket: [missing] endpoint_id=%s; payload_id=%x" PRIx64, |
| from_endpoint_id.c_str(), |
| static_cast<std::int64_t>(payload_header.id())); |
| return; |
| } |
| } |
| |
| if (pending_payload->IsLocallyCanceled()) { |
| // This incoming payload was canceled by the client. Drop this frame and do |
| // all the cleanup. See go/nc-cancel-payload |
| NEARBY_LOG( |
| INFO, |
| "ProcessDataPacket: [cancel] endpoint_id=%s; payload_id=%x" PRIx64, |
| from_endpoint_id.c_str(), |
| static_cast<std::int64_t>(pending_payload->GetId())); |
| HandleFinishedIncomingPayload( |
| to_client, from_endpoint_id, payload_header, payload_chunk.offset(), |
| proto::connections::PayloadStatus::LOCAL_CANCELLATION); |
| return; |
| } |
| |
| // Update the offset for this payload. An endpoint disconnection might occur |
| // from another thread and we would need to know the current offset to report |
| // back to the client. For the sake of accuracy, we update the pending payload |
| // here because it's after all payload terminating events are handled, but |
| // right before we actually start attaching the next chunk. |
| pending_payload->SetOffsetForEndpoint(from_endpoint_id, |
| payload_chunk.offset()); |
| |
| // Save size of packet before we move it. |
| std::int64_t payload_body_size = payload_chunk.body().size(); |
| if (pending_payload->GetInternalPayload() |
| ->AttachNextChunk(ByteArray(std::move(*payload_chunk.mutable_body()))) |
| .Raised()) { |
| NEARBY_LOG( |
| ERROR, |
| "ProcessDataPacket: [data: error] endpoint_id=%s; payload_id=%x" PRIx64, |
| from_endpoint_id.c_str(), |
| static_cast<std::int64_t>(pending_payload->GetId())); |
| HandleFinishedIncomingPayload( |
| to_client, from_endpoint_id, payload_header, payload_chunk.offset(), |
| proto::connections::PayloadStatus::LOCAL_ERROR); |
| return; |
| } |
| |
| HandleSuccessfulIncomingChunk(to_client, from_endpoint_id, payload_header, |
| payload_chunk.flags(), payload_chunk.offset(), |
| payload_body_size); |
| } |
| |
| // @EndpointManagerDataPool |
| void PayloadManager::ProcessControlPacket( |
| ClientProxy* to_client, const std::string& from_endpoint_id, |
| PayloadTransferFrame& payload_transfer_frame) { |
| const PayloadTransferFrame::PayloadHeader& payload_header = |
| payload_transfer_frame.payload_header(); |
| const PayloadTransferFrame::ControlMessage& control_message = |
| payload_transfer_frame.control_message(); |
| PendingPayload* pending_payload = GetPayload(payload_header.id()); |
| if (!pending_payload) { |
| NEARBY_LOG(INFO, |
| "Got ControlMessage for unknown payload_id=%x" PRIx64 |
| ", ignoring: %d", |
| static_cast<std::int64_t>(payload_header.id()), |
| control_message.event()); |
| return; |
| } |
| |
| switch (control_message.event()) { |
| case PayloadTransferFrame::ControlMessage::PAYLOAD_CANCELED: |
| if (pending_payload->IsIncoming()) { |
| NEARBY_LOG(INFO, |
| "Incoming PAYLOAD_CANCELED: from endpoint_id=%s; self=%p", |
| from_endpoint_id.c_str(), this); |
| // No need to mark the pending payload as cancelled, since this is a |
| // remote cancellation for an incoming payload -- we handle everything |
| // inline here. |
| HandleFinishedIncomingPayload( |
| to_client, from_endpoint_id, payload_header, |
| control_message.offset(), |
| ControlMessageEventToPayloadStatus(control_message.event())); |
| } else { |
| NEARBY_LOG(INFO, |
| "Outgoing PAYLOAD_CANCELED: from endpoint_id=%s; self=%p", |
| from_endpoint_id.c_str(), this); |
| // Mark the payload as canceled *for this endpoint*. |
| pending_payload->SetEndpointStatusFromControlMessage(from_endpoint_id, |
| control_message); |
| } |
| NEARBY_LOG(VERBOSE, |
| "Marked %s payload_id=" PRIx64 |
| " as canceled at request of endpoint_id=%s.", |
| (pending_payload->IsIncoming() ? "incoming" : "outgoing"), |
| static_cast<std::int64_t>( |
| pending_payload->GetInternalPayload()->GetId()), |
| from_endpoint_id.c_str()); |
| break; |
| case PayloadTransferFrame::ControlMessage::PAYLOAD_ERROR: |
| if (pending_payload->IsIncoming()) { |
| HandleFinishedIncomingPayload( |
| to_client, from_endpoint_id, payload_header, |
| control_message.offset(), |
| ControlMessageEventToPayloadStatus(control_message.event())); |
| } else { |
| pending_payload->SetEndpointStatusFromControlMessage(from_endpoint_id, |
| control_message); |
| } |
| break; |
| default: |
| NEARBY_LOG(INFO, "Unhandled control message %d for payload_id= %x" PRIx64, |
| control_message.event(), |
| static_cast<std::int64_t>( |
| pending_payload->GetInternalPayload()->GetId())); |
| break; |
| } |
| } |
| |
| // @PayloadManagerStatusUpdateThread |
| void PayloadManager::NotifyClientOfIncomingPayloadProgressInfo( |
| ClientProxy* client, const std::string& endpoint_id, |
| const PayloadProgressInfo& payload_transfer_update) { |
| client->OnPayloadProgress(endpoint_id, payload_transfer_update); |
| } |
| |
| ///////////////////////////////// EndpointInfo ///////////////////////////////// |
| |
| PayloadManager::EndpointInfo::Status |
| PayloadManager::EndpointInfo::ControlMessageEventToEndpointInfoStatus( |
| PayloadTransferFrame::ControlMessage::EventType event) { |
| switch (event) { |
| case PayloadTransferFrame::ControlMessage::PAYLOAD_ERROR: |
| return Status::kError; |
| case PayloadTransferFrame::ControlMessage::PAYLOAD_CANCELED: |
| return Status::kCanceled; |
| default: |
| NEARBY_LOG(INFO, |
| "Unknown EndpointInfo.Status for ControlMessage.EventType %d!", |
| event); |
| return Status::kUnknown; |
| } |
| } |
| |
| void PayloadManager::EndpointInfo::SetStatusFromControlMessage( |
| const PayloadTransferFrame::ControlMessage& control_message) { |
| status.Set(ControlMessageEventToEndpointInfoStatus(control_message.event())); |
| NEARBY_LOG(VERBOSE, |
| "Marked endpoint %s with status %d based on OOB ControlMessage", |
| id.c_str(), status.Get()); |
| } |
| |
| //////////////////////////////// PendingPayload //////////////////////////////// |
| |
| PayloadManager::PendingPayload::PendingPayload( |
| std::unique_ptr<InternalPayload> internal_payload, |
| const EndpointIds& endpoint_ids, bool is_incoming) |
| : is_incoming_(is_incoming), |
| internal_payload_(std::move(internal_payload)) { |
| // Initially we mark all endpoints as available. |
| // Later on some may become canceled, some may experience data transfer |
| // failures. Any of these situations will cause endpoint to be marked as |
| // unavailable. |
| for (const auto& id : endpoint_ids) { |
| endpoints_.emplace(id, EndpointInfo{ |
| .id = id, |
| .status{EndpointInfo::Status::kAvailable}, |
| }); |
| } |
| } |
| |
| Payload::Id PayloadManager::PendingPayload::GetId() const { |
| return internal_payload_->GetId(); |
| } |
| |
| InternalPayload* PayloadManager::PendingPayload::GetInternalPayload() { |
| return internal_payload_.get(); |
| } |
| |
| bool PayloadManager::PendingPayload::IsLocallyCanceled() const { |
| return is_locally_canceled_.Get(); |
| } |
| |
| void PayloadManager::PendingPayload::MarkLocallyCanceled() { |
| is_locally_canceled_.Set(true); |
| } |
| |
| bool PayloadManager::PendingPayload::IsIncoming() const { return is_incoming_; } |
| |
| std::vector<const PayloadManager::EndpointInfo*> |
| PayloadManager::PendingPayload::GetEndpoints() const { |
| MutexLock lock(&mutex_); |
| |
| std::vector<const EndpointInfo*> result; |
| for (const auto& item : endpoints_) { |
| result.push_back(&item.second); |
| } |
| return result; |
| } |
| |
| PayloadManager::EndpointInfo* PayloadManager::PendingPayload::GetEndpoint( |
| const std::string& endpoint_id) { |
| MutexLock lock(&mutex_); |
| |
| auto it = endpoints_.find(endpoint_id); |
| if (it == endpoints_.end()) { |
| return {}; |
| } |
| |
| return &it->second; |
| } |
| |
| void PayloadManager::PendingPayload::RemoveEndpoints( |
| const EndpointIds& endpoint_ids) { |
| MutexLock lock(&mutex_); |
| |
| for (const auto& id : endpoint_ids) { |
| endpoints_.erase(id); |
| } |
| } |
| |
| void PayloadManager::PendingPayload::SetEndpointStatusFromControlMessage( |
| const std::string& endpoint_id, |
| const PayloadTransferFrame::ControlMessage& control_message) { |
| MutexLock lock(&mutex_); |
| |
| auto item = endpoints_.find(endpoint_id); |
| if (item != endpoints_.end()) { |
| item->second.SetStatusFromControlMessage(control_message); |
| } |
| } |
| |
| void PayloadManager::PendingPayload::SetOffsetForEndpoint( |
| const std::string& endpoint_id, std::int64_t offset) { |
| MutexLock lock(&mutex_); |
| |
| auto item = endpoints_.find(endpoint_id); |
| if (item != endpoints_.end()) { |
| item->second.offset = offset; |
| } |
| } |
| |
| void PayloadManager::PendingPayload::Close() { |
| if (internal_payload_) internal_payload_->Close(); |
| close_event_.CountDown(); |
| } |
| |
| bool PayloadManager::PendingPayload::WaitForClose() { |
| return close_event_.Await(kWaitCloseTimeout).result(); |
| } |
| |
| bool PayloadManager::PendingPayload::IsClosed() { |
| return close_event_.Await(absl::ZeroDuration()).result(); |
| } |
| |
| void PayloadManager::RunOnStatusUpdateThread(const std::string& name, |
| std::function<void()> runnable) { |
| payload_status_update_executor_.Execute(name, std::move(runnable)); |
| } |
| |
| /////////////////////////////// PendingPayloads /////////////////////////////// |
| |
| void PayloadManager::PendingPayloads::StartTrackingPayload( |
| Payload::Id payload_id, std::unique_ptr<PendingPayload> pending_payload) { |
| MutexLock lock(&mutex_); |
| |
| // If the |payload_id| is being re-used, always prefer the newer payload. |
| auto it = pending_payloads_.find(payload_id); |
| if (it != pending_payloads_.end()) { |
| pending_payloads_.erase(payload_id); |
| } |
| auto pair = pending_payloads_.emplace(payload_id, std::move(pending_payload)); |
| NEARBY_LOG(INFO, "StartTrackingPayload: payload_id=%x" PRIx64 "; inserted=%d", |
| static_cast<std::int64_t>(payload_id), pair.second); |
| } |
| |
| std::unique_ptr<PayloadManager::PendingPayload> |
| PayloadManager::PendingPayloads::StopTrackingPayload(Payload::Id payload_id) { |
| MutexLock lock(&mutex_); |
| |
| auto it = pending_payloads_.find(payload_id); |
| if (it == pending_payloads_.end()) return {}; |
| |
| auto item = pending_payloads_.extract(it); |
| return std::move(item.mapped()); |
| } |
| |
| PayloadManager::PendingPayload* PayloadManager::PendingPayloads::GetPayload( |
| Payload::Id payload_id) const { |
| MutexLock lock(&mutex_); |
| |
| auto item = pending_payloads_.find(payload_id); |
| return item != pending_payloads_.end() ? item->second.get() : nullptr; |
| } |
| |
| std::vector<Payload::Id> PayloadManager::PendingPayloads::GetAllPayloads() { |
| MutexLock lock(&mutex_); |
| |
| std::vector<Payload::Id> result; |
| for (const auto& item : pending_payloads_) { |
| result.push_back(item.first); |
| } |
| return result; |
| } |
| |
| } // namespace connections |
| } // namespace nearby |
| } // namespace location |