// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "core/internal/payload_manager.h"
#include <algorithm>
#include <cinttypes>
#include <limits>
#include <memory>
#include <string>
#include <utility>
#include "core/internal/internal_payload_factory.h"
#include "platform/public/count_down_latch.h"
#include "platform/public/mutex_lock.h"
#include "platform/public/single_thread_executor.h"
#include "platform/public/system_clock.h"
#include "absl/memory/memory.h"
#include "absl/strings/str_cat.h"
#include "absl/time/time.h"
namespace location {
namespace nearby {
namespace connections {
// C++14 requires an out-of-line definition for odr-used constexpr members.
// TODO(apolyudov): remove when migration to C++17 is possible.
constexpr const absl::Duration PayloadManager::kWaitCloseTimeout;
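// Runs one iteration of the send loop for |pending_payload|: reports terminal
// status for endpoints that dropped out, detaches the next chunk, and sends it
// to every endpoint that is still available. Returns true while more chunks
// remain to be sent; false once the payload is finished, canceled, failed, or
// has no recipients left.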
bool PayloadManager::SendPayloadLoop(
ClientProxy* client, PendingPayload& pending_payload,
PayloadTransferFrame::PayloadHeader& payload_header,
std::int64_t& next_chunk_offset) {
// In lieu of C++17 structured bindings:
auto pair = GetAvailableAndUnavailableEndpoints(pending_payload);
const EndpointIds& available_endpoint_ids =
EndpointsToEndpointIds(pair.first);
const Endpoints& unavailable_endpoints = pair.second;
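// Under C++17 the pair could be unpacked directly with a structured binding
// (sketch):
//   auto [available, unavailable] =
//       GetAvailableAndUnavailableEndpoints(pending_payload);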
// First, handle any non-available endpoints.
for (const auto& endpoint : unavailable_endpoints) {
HandleFinishedOutgoingPayload(
client, {endpoint->id}, payload_header, next_chunk_offset,
EndpointInfoStatusToPayloadStatus(endpoint->status.Get()));
}
// Update the still-active recipients of this payload.
if (available_endpoint_ids.empty()) {
NEARBY_LOG(INFO,
"PayloadManager short-circuiting payload_id=%" PRIX64
" after sending %" PRIX64
" bytes because none of the endpoints are available anymore.",
pending_payload.GetInternalPayload()->GetId(),
next_chunk_offset);
return false;
}
// Check if the payload has been canceled by the client and, if so,
// notify the remaining recipients.
if (pending_payload.IsLocallyCanceled()) {
NEARBY_LOG(INFO,
"Aborting send of payload_id=%x" PRIx64 " at offset %x" PRIx64
" since it is marked canceled.",
static_cast<std::int64_t>(
pending_payload.GetInternalPayload()->GetId()),
next_chunk_offset);
HandleFinishedOutgoingPayload(
client, available_endpoint_ids, payload_header, next_chunk_offset,
proto::connections::PayloadStatus::LOCAL_CANCELLATION);
return false;
}
// Update the current offsets for all endpoints still active for this
// payload. For the sake of accuracy, we update the pending payload here
// because it's after all payload terminating events are handled, but
// right before we actually start detaching the next chunk.
for (const auto& endpoint_id : available_endpoint_ids) {
pending_payload.SetOffsetForEndpoint(endpoint_id, next_chunk_offset);
}
// This will block if there is no data to transfer.
// It will resume when new data arrives, or if Close() is called.
int chunk_size = GetOptimalChunkSize(available_endpoint_ids);
ByteArray next_chunk =
pending_payload.GetInternalPayload()->DetachNextChunk(chunk_size);
if (shutdown_.Get()) return false;
// Save chunk size. We'll need it after we move next_chunk.
auto next_chunk_size = next_chunk.size();
if (!next_chunk_size &&
pending_payload.GetInternalPayload()->GetTotalSize() > 0 &&
pending_payload.GetInternalPayload()->GetTotalSize() <
next_chunk_offset) {
NEARBY_LOG(INFO, "Payload xfer failed: payload_id=%x" PRIx64,
static_cast<std::int64_t>(
pending_payload.GetInternalPayload()->GetId()));
HandleFinishedOutgoingPayload(
client, available_endpoint_ids, payload_header, next_chunk_offset,
proto::connections::PayloadStatus::LOCAL_ERROR);
return false;
}
PayloadTransferFrame::PayloadChunk payload_chunk(
CreatePayloadChunk(next_chunk_offset, std::move(next_chunk)));
const EndpointIds& failed_endpoint_ids = endpoint_manager_->SendPayloadChunk(
payload_header, payload_chunk, available_endpoint_ids);
// Check whether at least one endpoint failed.
if (!failed_endpoint_ids.empty()) {
NEARBY_LOG(INFO,
"Payload xfer: endpoints failed: payload_id=%x" PRIx64
"; endpoint_ids={%s}",
static_cast<std::int64_t>(payload_header.id()),
ToString(failed_endpoint_ids).c_str());
HandleFinishedOutgoingPayload(
client, failed_endpoint_ids, payload_header, next_chunk_offset,
proto::connections::PayloadStatus::ENDPOINT_IO_ERROR);
}
// Check whether at least one endpoint succeeded -- if they all failed,
// we'll return to the top of the loop and bail out once
// available_endpoint_ids is re-synced and found to be empty.
if (failed_endpoint_ids.size() < available_endpoint_ids.size()) {
for (const auto& endpoint_id : available_endpoint_ids) {
if (std::find(failed_endpoint_ids.begin(), failed_endpoint_ids.end(),
endpoint_id) == failed_endpoint_ids.end()) {
HandleSuccessfulOutgoingChunk(
client, endpoint_id, payload_header, payload_chunk.flags(),
payload_chunk.offset(), payload_chunk.body().size());
}
}
NEARBY_LOG(VERBOSE,
"PayloadManager done sending chunk at offset %x" PRIx64
" of payload_id=%x" PRIx64,
next_chunk_offset,
static_cast<std::int64_t>(
pending_payload.GetInternalPayload()->GetId()));
next_chunk_offset += next_chunk_size;
if (!next_chunk_size) {
// That was the last chunk, we're outta here.
NEARBY_LOG(INFO,
"Payload xfer done: payload_id=%x" PRIx64 "; size=%" PRId64,
static_cast<std::int64_t>(
pending_payload.GetInternalPayload()->GetId()),
next_chunk_offset);
return false;
}
}
return true;
}
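// Partitions the endpoints registered for |pending_payload| into those still
// marked kAvailable and those in a terminal state (canceled, error, or
// unknown).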
std::pair<PayloadManager::Endpoints, PayloadManager::Endpoints>
PayloadManager::GetAvailableAndUnavailableEndpoints(
const PendingPayload& pending_payload) {
Endpoints available;
Endpoints unavailable;
for (auto* endpoint_info : pending_payload.GetEndpoints()) {
if (endpoint_info->status.Get() ==
PayloadManager::EndpointInfo::Status::kAvailable) {
available.push_back(endpoint_info);
} else {
unavailable.push_back(endpoint_info);
}
}
return std::make_pair(std::move(available), std::move(unavailable));
}
PayloadManager::EndpointIds PayloadManager::EndpointsToEndpointIds(
const Endpoints& endpoints) {
EndpointIds endpoint_ids;
endpoint_ids.reserve(endpoints.size());
for (const auto& item : endpoints) {
if (item) {
endpoint_ids.emplace_back(item->id);
}
}
return endpoint_ids;
}
std::string PayloadManager::ToString(const Endpoints& endpoints) {
std::string endpoints_string = absl::StrCat(endpoints.size(), ": ");
bool first = true;
for (const auto& item : endpoints) {
if (first) {
absl::StrAppend(&endpoints_string, item->id);
first = false;
} else {
absl::StrAppend(&endpoints_string, ", ", item->id);
}
}
return endpoints_string;
}
std::string PayloadManager::ToString(const EndpointIds& endpoint_ids) {
std::string endpoints_string = absl::StrCat(endpoint_ids.size(), ": ");
bool first = true;
for (const auto& id : endpoint_ids) {
if (first) {
absl::StrAppend(&endpoints_string, id);
first = false;
} else {
absl::StrAppend(&endpoints_string, ", ", id);
}
}
return endpoints_string;
}
// Creates and starts tracking a PendingPayload for this Payload.
Payload::Id PayloadManager::CreateOutgoingPayload(
Payload payload, const EndpointIds& endpoint_ids) {
auto internal_payload{CreateOutgoingInternalPayload(std::move(payload))};
Payload::Id payload_id = internal_payload->GetId();
NEARBY_LOG(INFO, "CreateOutgoingPayload: payload_id=%x" PRIx64,
static_cast<std::int64_t>(payload_id));
MutexLock lock(&mutex_);
pending_payloads_.StartTrackingPayload(
payload_id, absl::make_unique<PendingPayload>(std::move(internal_payload),
endpoint_ids,
/*is_incoming=*/false));
return payload_id;
}
PayloadManager::PayloadManager(EndpointManager& endpoint_manager)
: endpoint_manager_(&endpoint_manager) {
endpoint_manager_->RegisterFrameProcessor(V1Frame::PAYLOAD_TRANSFER, this);
}
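// A minimal usage sketch (error handling omitted; |client|, |payload|, and
// the endpoint id here are assumed placeholders):
//   PayloadManager manager(endpoint_manager);
//   manager.SendPayload(client, /*endpoint_ids=*/{"ABCD"},
//                       std::move(payload));
//   // ... later, if the client changes its mind:
//   manager.CancelPayload(client, payload_id);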
void PayloadManager::CancelAllPayloads() {
NEARBY_LOG(INFO, "PayloadManager: canceling payloads; self=%p", this);
{
MutexLock lock(&mutex_);
int pending_outgoing_payloads = 0;
for (const auto& pending_id : pending_payloads_.GetAllPayloads()) {
auto* pending = pending_payloads_.GetPayload(pending_id);
if (!pending->IsIncoming()) pending_outgoing_payloads++;
pending->MarkLocallyCanceled();
pending->Close(); // To unblock the sender thread, if there is no data.
}
if (pending_outgoing_payloads) {
shutdown_barrier_ =
absl::make_unique<CountDownLatch>(pending_outgoing_payloads);
}
}
if (shutdown_barrier_) {
NEARBY_LOG(INFO,
"PayloadManager: waiting for pending outgoing payloads; self=%p",
this);
shutdown_barrier_->Await();
}
}
void PayloadManager::DisconnectFromEndpointManager() {
if (shutdown_.Set(true)) return;
// Unregister ourselves from the FrameProcessors.
endpoint_manager_->UnregisterFrameProcessor(V1Frame::PAYLOAD_TRANSFER, this);
}
PayloadManager::~PayloadManager() {
NEARBY_LOG(INFO, "PayloadManager: going down; self=%p", this);
DisconnectFromEndpointManager();
CancelAllPayloads();
NEARBY_LOG(INFO, "PayloadManager: turn down payload executors; self=%p",
this);
bytes_payload_executor_.Shutdown();
stream_payload_executor_.Shutdown();
file_payload_executor_.Shutdown();
CountDownLatch stop_latch(1);
// Clear our tracked pending payloads.
RunOnStatusUpdateThread(
"~payload-manager",
[this, &stop_latch]() RUN_ON_PAYLOAD_STATUS_UPDATE_THREAD() {
NEARBY_LOG(INFO, "PayloadManager: stop tracking payloads; self=%p",
this);
MutexLock lock(&mutex_);
for (const auto& pending_id : pending_payloads_.GetAllPayloads()) {
pending_payloads_.StopTrackingPayload(pending_id);
}
stop_latch.CountDown();
});
stop_latch.Await();
NEARBY_LOG(INFO, "PayloadManager: turn down notification executor; self=%p",
this);
// Stop all the ongoing Runnables (as gracefully as possible).
payload_status_update_executor_.Shutdown();
NEARBY_LOG(INFO, "PayloadManager: down; self=%p", this);
}
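// Counts down the shutdown barrier for one completed outgoing payload.
// Returns true only if a shutdown is in progress and a barrier was armed.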
bool PayloadManager::NotifyShutdown() {
MutexLock lock(&mutex_);
if (!shutdown_.Get()) return false;
if (!shutdown_barrier_) return false;
NEARBY_LOG(INFO, "PayloadManager [shutdown mode]");
shutdown_barrier_->CountDown();
return true;
}
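// Entry point for outgoing transfers: registers |payload| as a pending
// payload and schedules the blocking send loop on the executor that
// serializes payloads of its type.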
void PayloadManager::SendPayload(ClientProxy* client,
const EndpointIds& endpoint_ids,
Payload payload) {
if (shutdown_.Get()) return;
NEARBY_LOG(INFO, "SendPayload: endpoint_ids={%s}",
ToString(endpoint_ids).c_str());
auto executor = GetOutgoingPayloadExecutor(payload.GetType());
// The |executor| will be null if the payload type is one we can't work
// with. This should never be reached, since ServiceControllerRouter has
// already checked that we can work with this Payload type.
if (!executor) {
NEARBY_LOG(INFO,
"PayloadManager failed to determine the right executor for "
"outgoing payload_id=%" PRIx64 ", payload_type=%d",
static_cast<std::int64_t>(payload.GetId()), payload.GetType());
return;
}
// Payloads are sent in FCFS order within each Payload type, blocking any
// other payload of the same type from even starting until this one is
// completely done. If we ever want to provide isolation across ClientProxy
// objects, this will need to be significantly re-architected.
Payload::Type payload_type = payload.GetType();
Payload::Id payload_id =
CreateOutgoingPayload(std::move(payload), endpoint_ids);
executor->Execute("send-payload", [this, client, endpoint_ids, payload_id,
payload_type]() {
if (shutdown_.Get()) return;
PendingPayload* pending_payload = GetPayload(payload_id);
if (!pending_payload) {
NEARBY_LOG(INFO,
"PayloadManager is no longer tracking outgoing payload_id=%" PRIx64
", payload_type=%d, aborting SendPayload().",
static_cast<std::int64_t>(payload_id), payload_type);
return;
}
auto* internal_payload = pending_payload->GetInternalPayload();
if (!internal_payload) return;
PayloadTransferFrame::PayloadHeader payload_header{
CreatePayloadHeader(*internal_payload)};
bool should_continue = true;
std::int64_t next_chunk_offset = 0;
while (should_continue && !shutdown_.Get()) {
should_continue = SendPayloadLoop(client, *pending_payload,
payload_header, next_chunk_offset);
}
RunOnStatusUpdateThread("destroy-payload",
[this, payload_id]()
RUN_ON_PAYLOAD_STATUS_UPDATE_THREAD() {
DestroyPendingPayload(payload_id);
});
});
NEARBY_LOG(INFO,
"PayloadManager: xfer scheduled: self=%p; payload_id=%x" PRIx64
", payload_type=%d",
this, static_cast<std::int64_t>(payload_id), payload_type);
}
PayloadManager::PendingPayload* PayloadManager::GetPayload(
Payload::Id payload_id) const {
MutexLock lock(&mutex_);
return pending_payloads_.GetPayload(payload_id);
}
Status PayloadManager::CancelPayload(ClientProxy* client,
Payload::Id payload_id) {
PendingPayload* canceled_payload = GetPayload(payload_id);
if (!canceled_payload) {
NEARBY_LOG(INFO,
"Client requested cancel for unknown payload_id=%x" PRIx64
", ignoring.",
static_cast<std::int64_t>(payload_id));
return {Status::kPayloadUnknown};
}
// Mark the payload as canceled.
canceled_payload->MarkLocallyCanceled();
NEARBY_LOG(INFO,
"Cancelling %s payload_id=%x" PRIx64 " at request of client.",
(canceled_payload->IsIncoming() ? "incoming" : "outgoing"),
static_cast<std::int64_t>(payload_id));
// Return SUCCESS immediately. Remaining cleanup and updates happen in
// SendPayload() or OnIncomingFrame().
return {Status::kSuccess};
}
// @EndpointManagerDataPool
void PayloadManager::OnIncomingFrame(
OfflineFrame& offline_frame, const std::string& from_endpoint_id,
ClientProxy* to_client, proto::connections::Medium current_medium) {
PayloadTransferFrame& frame =
*offline_frame.mutable_v1()->mutable_payload_transfer();
switch (frame.packet_type()) {
case PayloadTransferFrame::CONTROL:
NEARBY_LOG(
INFO,
"PayloadManager::OnIncomingFrame [CONTROL]: self=%p; endpoint_id=%s",
this, from_endpoint_id.c_str());
ProcessControlPacket(to_client, from_endpoint_id, frame);
break;
case PayloadTransferFrame::DATA:
ProcessDataPacket(to_client, from_endpoint_id, frame);
break;
default:
NEARBY_LOG(WARNING,
"PayloadManager: invalid frame; remote endpoint: self=%p; "
"endpoint_id=%s",
this, from_endpoint_id.c_str());
break;
}
}
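// Reports a kFailure progress update, at the last known offset, for every
// pending payload that |endpoint_id| was participating in, closes any payload
// left with no recipients, then counts down |barrier|.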
void PayloadManager::OnEndpointDisconnect(ClientProxy* client,
const std::string& endpoint_id,
CountDownLatch barrier) {
if (shutdown_.Get()) {
barrier.CountDown();
return;
}
RunOnStatusUpdateThread(
"payload-manager-on-disconnect",
[this, client, endpoint_id, barrier]()
RUN_ON_PAYLOAD_STATUS_UPDATE_THREAD() mutable {
// Iterate through all our payloads and look for payloads associated
// with this endpoint.
MutexLock lock(&mutex_);
for (const auto& payload_id : pending_payloads_.GetAllPayloads()) {
auto* pending_payload = pending_payloads_.GetPayload(payload_id);
if (!pending_payload) continue;
auto endpoint_info = pending_payload->GetEndpoint(endpoint_id);
if (!endpoint_info) continue;
std::int64_t endpoint_offset = endpoint_info->offset;
// Stop tracking the endpoint for this payload.
pending_payload->RemoveEndpoints({endpoint_id});
// |endpoint_info| is no longer valid after calling RemoveEndpoints().
endpoint_info = nullptr;
std::int64_t payload_total_size =
pending_payload->GetInternalPayload()->GetTotalSize();
// If no endpoints are left for this payload, close it.
if (pending_payload->GetEndpoints().empty()) {
pending_payload->Close();
}
// Create the payload transfer update.
PayloadProgressInfo update{payload_id,
PayloadProgressInfo::Status::kFailure,
payload_total_size, endpoint_offset};
// Send a client notification of a payload transfer failure.
client->OnPayloadProgress(endpoint_id, update);
}
barrier.CountDown();
});
}
proto::connections::PayloadStatus
PayloadManager::EndpointInfoStatusToPayloadStatus(EndpointInfo::Status status) {
switch (status) {
case EndpointInfo::Status::kCanceled:
return proto::connections::PayloadStatus::REMOTE_CANCELLATION;
case EndpointInfo::Status::kError:
return proto::connections::PayloadStatus::REMOTE_ERROR;
case EndpointInfo::Status::kAvailable:
return proto::connections::PayloadStatus::SUCCESS;
default:
NEARBY_LOG(
INFO,
"PayloadManager: Unknown PayloadStatus for EndpointInfo.Status=%d",
status);
return proto::connections::PayloadStatus::UNKNOWN_PAYLOAD_STATUS;
}
}
proto::connections::PayloadStatus
PayloadManager::ControlMessageEventToPayloadStatus(
PayloadTransferFrame::ControlMessage::EventType event) {
switch (event) {
case PayloadTransferFrame::ControlMessage::PAYLOAD_ERROR:
return proto::connections::PayloadStatus::REMOTE_ERROR;
case PayloadTransferFrame::ControlMessage::PAYLOAD_CANCELED:
return proto::connections::PayloadStatus::REMOTE_CANCELLATION;
default:
NEARBY_LOG(INFO, "PayloadManager: unknown event=%d", event);
return proto::connections::PayloadStatus::UNKNOWN_PAYLOAD_STATUS;
}
}
PayloadProgressInfo::Status PayloadManager::PayloadStatusToTransferUpdateStatus(
proto::connections::PayloadStatus status) {
switch (status) {
case proto::connections::LOCAL_CANCELLATION:
case proto::connections::REMOTE_CANCELLATION:
return PayloadProgressInfo::Status::kCanceled;
case proto::connections::SUCCESS:
return PayloadProgressInfo::Status::kSuccess;
default:
return PayloadProgressInfo::Status::kFailure;
}
}
SingleThreadExecutor* PayloadManager::GetOutgoingPayloadExecutor(
Payload::Type payload_type) {
switch (payload_type) {
case Payload::Type::kBytes:
return &bytes_payload_executor_;
case Payload::Type::kFile:
return &file_payload_executor_;
case Payload::Type::kStream:
return &stream_payload_executor_;
default:
return nullptr;
}
}
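// A chunk must fit into a single transmit packet on every recipient's
// medium, so the optimal chunk size is the minimum of the endpoints'
// maximum transmit packet sizes.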
int PayloadManager::GetOptimalChunkSize(EndpointIds endpoint_ids) {
int min_chunk_size = std::numeric_limits<int>::max();
for (const auto& endpoint_id : endpoint_ids) {
min_chunk_size = std::min(
min_chunk_size, endpoint_manager_->GetMaxTransmitPacketSize(endpoint_id));
}
return min_chunk_size;
}
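// Builds the header that accompanies every chunk of a payload: its id, type,
// and total size.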
PayloadTransferFrame::PayloadHeader PayloadManager::CreatePayloadHeader(
const InternalPayload& internal_payload) {
PayloadTransferFrame::PayloadHeader payload_header;
payload_header.set_id(internal_payload.GetId());
payload_header.set_type(internal_payload.GetType());
payload_header.set_total_size(internal_payload.GetTotalSize());
return payload_header;
}
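// Wraps |payload_chunk_body| into a wire-format chunk at
// |payload_chunk_offset|. An empty body marks the chunk as LAST_CHUNK, which
// is how the receiver detects the end of the payload.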
PayloadTransferFrame::PayloadChunk PayloadManager::CreatePayloadChunk(
std::int64_t payload_chunk_offset, ByteArray payload_chunk_body) {
PayloadTransferFrame::PayloadChunk payload_chunk;
payload_chunk.set_offset(payload_chunk_offset);
payload_chunk.set_flags(0);
if (!payload_chunk_body.Empty()) {
payload_chunk.set_body(std::string(std::move(payload_chunk_body)));
} else {
payload_chunk.set_flags(payload_chunk.flags() |
PayloadTransferFrame::PayloadChunk::LAST_CHUNK);
}
return payload_chunk;
}
PayloadManager::PendingPayload* PayloadManager::CreateIncomingPayload(
const PayloadTransferFrame& frame, const std::string& endpoint_id) {
auto internal_payload = CreateIncomingInternalPayload(frame);
if (!internal_payload) {
return nullptr;
}
Payload::Id payload_id = internal_payload->GetId();
NEARBY_LOG(INFO, "CreateIncomingPayload: payload_id=%x" PRIx64,
static_cast<std::int64_t>(payload_id));
MutexLock lock(&mutex_);
pending_payloads_.StartTrackingPayload(
payload_id,
absl::make_unique<PendingPayload>(std::move(internal_payload),
EndpointIds{endpoint_id}, true));
return pending_payloads_.GetPayload(payload_id);
}
void PayloadManager::SendClientCallbacksForFinishedOutgoingPayload(
ClientProxy* client, const EndpointIds& finished_endpoint_ids,
const PayloadTransferFrame::PayloadHeader& payload_header,
std::int64_t num_bytes_successfully_transferred,
proto::connections::PayloadStatus status) {
RunOnStatusUpdateThread(
"outgoing-payload-callbacks",
[this, client, finished_endpoint_ids, payload_header,
num_bytes_successfully_transferred,
status]() RUN_ON_PAYLOAD_STATUS_UPDATE_THREAD() {
// Make sure we're still tracking this payload.
PendingPayload* pending_payload = GetPayload(payload_header.id());
if (!pending_payload) {
return;
}
PayloadProgressInfo update{
payload_header.id(),
PayloadManager::PayloadStatusToTransferUpdateStatus(status),
payload_header.total_size(), num_bytes_successfully_transferred};
for (const auto& endpoint_id : finished_endpoint_ids) {
// Skip sending notifications if we have stopped tracking this
// endpoint.
if (!pending_payload->GetEndpoint(endpoint_id)) {
continue;
}
// Notify the client.
client->OnPayloadProgress(endpoint_id, update);
}
// Remove these endpoints from our tracking list for this payload.
pending_payload->RemoveEndpoints(finished_endpoint_ids);
// Close the payload if no endpoints remain.
if (pending_payload->GetEndpoints().empty()) {
pending_payload->Close();
}
});
}
void PayloadManager::SendClientCallbacksForFinishedIncomingPayload(
ClientProxy* client, const std::string& endpoint_id,
const PayloadTransferFrame::PayloadHeader& payload_header,
std::int64_t offset_bytes, proto::connections::PayloadStatus status) {
RunOnStatusUpdateThread(
"incoming-payload-callbacks",
[this, client, endpoint_id, payload_header, offset_bytes,
status]() RUN_ON_PAYLOAD_STATUS_UPDATE_THREAD() {
// Make sure we're still tracking this payload.
PendingPayload* pending_payload = GetPayload(payload_header.id());
if (!pending_payload) {
return;
}
// Unless we never started tracking this payload (meaning we failed to
// even create the InternalPayload), notify the client (and close it).
PayloadProgressInfo update{
payload_header.id(),
PayloadManager::PayloadStatusToTransferUpdateStatus(status),
payload_header.total_size(), offset_bytes};
NotifyClientOfIncomingPayloadProgressInfo(client, endpoint_id, update);
DestroyPendingPayload(payload_header.id());
});
}
void PayloadManager::SendControlMessage(
const EndpointIds& endpoint_ids,
const PayloadTransferFrame::PayloadHeader& payload_header,
std::int64_t num_bytes_successfully_transferred,
PayloadTransferFrame::ControlMessage::EventType event_type) {
PayloadTransferFrame::ControlMessage control_message;
control_message.set_event(event_type);
control_message.set_offset(num_bytes_successfully_transferred);
endpoint_manager_->SendControlMessage(payload_header, control_message,
endpoint_ids);
}
void PayloadManager::HandleFinishedOutgoingPayload(
ClientProxy* client, const EndpointIds& finished_endpoint_ids,
const PayloadTransferFrame::PayloadHeader& payload_header,
std::int64_t num_bytes_successfully_transferred,
proto::connections::PayloadStatus status) {
// This call will destroy a pending payload.
SendClientCallbacksForFinishedOutgoingPayload(
client, finished_endpoint_ids, payload_header,
num_bytes_successfully_transferred, status);
switch (status) {
case proto::connections::PayloadStatus::LOCAL_ERROR:
SendControlMessage(finished_endpoint_ids, payload_header,
num_bytes_successfully_transferred,
PayloadTransferFrame::ControlMessage::PAYLOAD_ERROR);
break;
case proto::connections::PayloadStatus::LOCAL_CANCELLATION:
NEARBY_LOG(
INFO, "Sending PAYLOAD_CANCEL to receiver side; payload_id=%x" PRIx64,
static_cast<std::int64_t>(payload_header.id()));
SendControlMessage(
finished_endpoint_ids, payload_header,
num_bytes_successfully_transferred,
PayloadTransferFrame::ControlMessage::PAYLOAD_CANCELED);
break;
case proto::connections::PayloadStatus::ENDPOINT_IO_ERROR:
// Unregister these endpoints, since we had an IO error on the physical
// connection.
for (const auto& endpoint_id : finished_endpoint_ids) {
endpoint_manager_->DiscardEndpoint(client, endpoint_id);
}
break;
case proto::connections::PayloadStatus::REMOTE_ERROR:
case proto::connections::PayloadStatus::REMOTE_CANCELLATION:
// No special handling needed for these.
break;
default:
NEARBY_LOG(INFO,
"PayloadManager: Unhandled finished outgoing payload with "
"payload_status=%d",
status);
break;
}
}
void PayloadManager::HandleFinishedIncomingPayload(
ClientProxy* client, const std::string& endpoint_id,
const PayloadTransferFrame::PayloadHeader& payload_header,
std::int64_t offset_bytes, proto::connections::PayloadStatus status) {
SendClientCallbacksForFinishedIncomingPayload(
client, endpoint_id, payload_header, offset_bytes, status);
switch (status) {
case proto::connections::PayloadStatus::LOCAL_ERROR:
SendControlMessage({endpoint_id}, payload_header, offset_bytes,
PayloadTransferFrame::ControlMessage::PAYLOAD_ERROR);
break;
case proto::connections::PayloadStatus::LOCAL_CANCELLATION:
SendControlMessage(
{endpoint_id}, payload_header, offset_bytes,
PayloadTransferFrame::ControlMessage::PAYLOAD_CANCELED);
break;
default:
NEARBY_LOG(INFO,
"Unhandled finished incoming payload_id=%x" PRIx64
" with payload_status=%d!",
static_cast<std::int64_t>(payload_header.id()), status);
break;
}
}
void PayloadManager::HandleSuccessfulOutgoingChunk(
ClientProxy* client, const std::string& endpoint_id,
const PayloadTransferFrame::PayloadHeader& payload_header,
std::int32_t payload_chunk_flags, std::int64_t payload_chunk_offset,
std::int64_t payload_chunk_body_size) {
RunOnStatusUpdateThread(
"outgoing-chunk-success",
[this, client, endpoint_id, payload_header, payload_chunk_flags,
payload_chunk_offset,
payload_chunk_body_size]() RUN_ON_PAYLOAD_STATUS_UPDATE_THREAD() {
// Make sure we're still tracking this payload and its associated
// endpoint.
PendingPayload* pending_payload = GetPayload(payload_header.id());
if (!pending_payload || !pending_payload->GetEndpoint(endpoint_id)) {
NEARBY_LOG(INFO,
"HandleSuccessfulOutgoingChunk: endpoint not found: "
"endpoint_id=%s",
endpoint_id.c_str());
return;
}
bool is_last_chunk =
(payload_chunk_flags &
PayloadTransferFrame::PayloadChunk::LAST_CHUNK) != 0;
PayloadProgressInfo update{
payload_header.id(),
is_last_chunk ? PayloadProgressInfo::Status::kSuccess
: PayloadProgressInfo::Status::kInProgress,
payload_header.total_size(),
is_last_chunk ? payload_chunk_offset
: payload_chunk_offset + payload_chunk_body_size};
// Notify the client.
client->OnPayloadProgress(endpoint_id, update);
if (is_last_chunk) {
// Stop tracking this endpoint.
pending_payload->RemoveEndpoints({endpoint_id});
// Close the payload if no endpoints remain.
if (pending_payload->GetEndpoints().empty()) {
pending_payload->Close();
}
}
});
}
// @PayloadManagerStatusUpdateThread
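// Stops tracking |payload_id| and closes the payload; for outgoing payloads,
// also signals the shutdown barrier via NotifyShutdown().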
void PayloadManager::DestroyPendingPayload(Payload::Id payload_id) {
bool is_incoming = false;
{
MutexLock lock(&mutex_);
auto pending = pending_payloads_.StopTrackingPayload(payload_id);
if (!pending) return;
is_incoming = pending->IsIncoming();
const char* direction = is_incoming ? "incoming" : "outgoing";
NEARBY_LOG(INFO,
"PayloadManager: destroying %s pending payload: "
"self=%p; payload_id=%x" PRIx64,
direction, this, static_cast<std::int64_t>(payload_id));
pending->Close();
pending.reset();
}
if (!is_incoming) NotifyShutdown();
}
void PayloadManager::HandleSuccessfulIncomingChunk(
ClientProxy* client, const std::string& endpoint_id,
const PayloadTransferFrame::PayloadHeader& payload_header,
std::int32_t payload_chunk_flags, std::int64_t payload_chunk_offset,
std::int64_t payload_chunk_body_size) {
RunOnStatusUpdateThread(
"incoming-chunk-success",
[this, client, endpoint_id, payload_header, payload_chunk_flags,
payload_chunk_offset,
payload_chunk_body_size]() RUN_ON_PAYLOAD_STATUS_UPDATE_THREAD() {
// Make sure we're still tracking this payload.
PendingPayload* pending_payload = GetPayload(payload_header.id());
if (!pending_payload) {
return;
}
bool is_last_chunk =
(payload_chunk_flags &
PayloadTransferFrame::PayloadChunk::LAST_CHUNK) != 0;
PayloadProgressInfo update{
payload_header.id(),
is_last_chunk ? PayloadProgressInfo::Status::kSuccess
: PayloadProgressInfo::Status::kInProgress,
payload_header.total_size(),
is_last_chunk ? payload_chunk_offset
: payload_chunk_offset + payload_chunk_body_size};
// Notify the client of this update.
NotifyClientOfIncomingPayloadProgressInfo(client, endpoint_id, update);
});
}
// @EndpointManagerDataPool
void PayloadManager::ProcessDataPacket(
ClientProxy* to_client, const std::string& from_endpoint_id,
PayloadTransferFrame& payload_transfer_frame) {
PayloadTransferFrame::PayloadHeader& payload_header =
*payload_transfer_frame.mutable_payload_header();
PayloadTransferFrame::PayloadChunk& payload_chunk =
*payload_transfer_frame.mutable_payload_chunk();
NEARBY_LOG(VERBOSE,
"PayloadManager got data OfflineFrame for payload_id=%x" PRIx64
" from endpoint_id=%s at offset %x" PRIx64,
static_cast<std::int64_t>(payload_header.id()),
from_endpoint_id.c_str(), payload_chunk.offset());
PendingPayload* pending_payload;
if (payload_chunk.offset() == 0) {
pending_payload =
CreateIncomingPayload(payload_transfer_frame, from_endpoint_id);
if (!pending_payload) {
NEARBY_LOG(WARNING,
"PayloadManager failed to create InternalPayload from "
"PayloadTransferFrame with ID %x" PRIx64
" and type %d, aborting receipt.",
static_cast<std::int64_t>(payload_header.id()),
payload_header.type());
// Send the error to the remote endpoint.
SendControlMessage({from_endpoint_id}, payload_header,
payload_chunk.offset(),
PayloadTransferFrame::ControlMessage::PAYLOAD_ERROR);
return;
}
// Also, let the client know of this new incoming payload.
RunOnStatusUpdateThread(
"process-data-packet",
[to_client, from_endpoint_id, pending_payload]()
RUN_ON_PAYLOAD_STATUS_UPDATE_THREAD() {
NEARBY_LOG(INFO,
"PayloadManager received new payload_id=%x" PRIx64
" from endpoint_id=%s",
static_cast<std::int64_t>(
pending_payload->GetInternalPayload()->GetId()),
from_endpoint_id.c_str());
to_client->OnPayload(
from_endpoint_id,
pending_payload->GetInternalPayload()->ReleasePayload());
});
} else {
pending_payload = GetPayload(payload_header.id());
if (!pending_payload) {
NEARBY_LOG(
WARNING,
"ProcessDataPacket: [missing] endpoint_id=%s; payload_id=%x" PRIx64,
from_endpoint_id.c_str(),
static_cast<std::int64_t>(payload_header.id()));
return;
}
}
if (pending_payload->IsLocallyCanceled()) {
// This incoming payload was canceled by the client. Drop this frame and do
// all the cleanup. See go/nc-cancel-payload
NEARBY_LOG(
INFO,
"ProcessDataPacket: [cancel] endpoint_id=%s; payload_id=%x" PRIx64,
from_endpoint_id.c_str(),
static_cast<std::int64_t>(pending_payload->GetId()));
HandleFinishedIncomingPayload(
to_client, from_endpoint_id, payload_header, payload_chunk.offset(),
proto::connections::PayloadStatus::LOCAL_CANCELLATION);
return;
}
// Update the offset for this payload. An endpoint disconnection might occur
// from another thread and we would need to know the current offset to report
// back to the client. For the sake of accuracy, we update the pending payload
// here because it's after all payload terminating events are handled, but
// right before we actually start attaching the next chunk.
pending_payload->SetOffsetForEndpoint(from_endpoint_id,
payload_chunk.offset());
// Save size of packet before we move it.
std::int64_t payload_body_size = payload_chunk.body().size();
if (pending_payload->GetInternalPayload()
->AttachNextChunk(ByteArray(std::move(*payload_chunk.mutable_body())))
.Raised()) {
NEARBY_LOG(
ERROR,
"ProcessDataPacket: [data: error] endpoint_id=%s; payload_id=%x" PRIx64,
from_endpoint_id.c_str(),
static_cast<std::int64_t>(pending_payload->GetId()));
HandleFinishedIncomingPayload(
to_client, from_endpoint_id, payload_header, payload_chunk.offset(),
proto::connections::PayloadStatus::LOCAL_ERROR);
return;
}
HandleSuccessfulIncomingChunk(to_client, from_endpoint_id, payload_header,
payload_chunk.flags(), payload_chunk.offset(),
payload_body_size);
}
// @EndpointManagerDataPool
void PayloadManager::ProcessControlPacket(
ClientProxy* to_client, const std::string& from_endpoint_id,
PayloadTransferFrame& payload_transfer_frame) {
const PayloadTransferFrame::PayloadHeader& payload_header =
payload_transfer_frame.payload_header();
const PayloadTransferFrame::ControlMessage& control_message =
payload_transfer_frame.control_message();
PendingPayload* pending_payload = GetPayload(payload_header.id());
if (!pending_payload) {
NEARBY_LOG(INFO,
"Got ControlMessage for unknown payload_id=%x" PRIx64
", ignoring: %d",
static_cast<std::int64_t>(payload_header.id()),
control_message.event());
return;
}
switch (control_message.event()) {
case PayloadTransferFrame::ControlMessage::PAYLOAD_CANCELED:
if (pending_payload->IsIncoming()) {
NEARBY_LOG(INFO,
"Incoming PAYLOAD_CANCELED: from endpoint_id=%s; self=%p",
from_endpoint_id.c_str(), this);
// No need to mark the pending payload as canceled, since this is a
// remote cancellation for an incoming payload -- we handle everything
// inline here.
HandleFinishedIncomingPayload(
to_client, from_endpoint_id, payload_header,
control_message.offset(),
ControlMessageEventToPayloadStatus(control_message.event()));
} else {
NEARBY_LOG(INFO,
"Outgoing PAYLOAD_CANCELED: from endpoint_id=%s; self=%p",
from_endpoint_id.c_str(), this);
// Mark the payload as canceled *for this endpoint*.
pending_payload->SetEndpointStatusFromControlMessage(from_endpoint_id,
control_message);
}
NEARBY_LOG(VERBOSE,
"Marked %s payload_id=" PRIx64
" as canceled at request of endpoint_id=%s.",
(pending_payload->IsIncoming() ? "incoming" : "outgoing"),
static_cast<std::int64_t>(
pending_payload->GetInternalPayload()->GetId()),
from_endpoint_id.c_str());
break;
case PayloadTransferFrame::ControlMessage::PAYLOAD_ERROR:
if (pending_payload->IsIncoming()) {
HandleFinishedIncomingPayload(
to_client, from_endpoint_id, payload_header,
control_message.offset(),
ControlMessageEventToPayloadStatus(control_message.event()));
} else {
pending_payload->SetEndpointStatusFromControlMessage(from_endpoint_id,
control_message);
}
break;
default:
NEARBY_LOG(INFO, "Unhandled control message %d for payload_id= %x" PRIx64,
control_message.event(),
static_cast<std::int64_t>(
pending_payload->GetInternalPayload()->GetId()));
break;
}
}
// @PayloadManagerStatusUpdateThread
void PayloadManager::NotifyClientOfIncomingPayloadProgressInfo(
ClientProxy* client, const std::string& endpoint_id,
const PayloadProgressInfo& payload_transfer_update) {
client->OnPayloadProgress(endpoint_id, payload_transfer_update);
}
///////////////////////////////// EndpointInfo /////////////////////////////////
PayloadManager::EndpointInfo::Status
PayloadManager::EndpointInfo::ControlMessageEventToEndpointInfoStatus(
PayloadTransferFrame::ControlMessage::EventType event) {
switch (event) {
case PayloadTransferFrame::ControlMessage::PAYLOAD_ERROR:
return Status::kError;
case PayloadTransferFrame::ControlMessage::PAYLOAD_CANCELED:
return Status::kCanceled;
default:
NEARBY_LOG(INFO,
"Unknown EndpointInfo.Status for ControlMessage.EventType %d!",
event);
return Status::kUnknown;
}
}
void PayloadManager::EndpointInfo::SetStatusFromControlMessage(
const PayloadTransferFrame::ControlMessage& control_message) {
status.Set(ControlMessageEventToEndpointInfoStatus(control_message.event()));
NEARBY_LOG(VERBOSE,
"Marked endpoint %s with status %d based on OOB ControlMessage",
id.c_str(), status.Get());
}
//////////////////////////////// PendingPayload ////////////////////////////////
PayloadManager::PendingPayload::PendingPayload(
std::unique_ptr<InternalPayload> internal_payload,
const EndpointIds& endpoint_ids, bool is_incoming)
: is_incoming_(is_incoming),
internal_payload_(std::move(internal_payload)) {
// Initially we mark all endpoints as available.
// Later on some may become canceled, some may experience data transfer
// failures. Any of these situations will cause endpoint to be marked as
// unavailable.
for (const auto& id : endpoint_ids) {
endpoints_.emplace(id, EndpointInfo{
.id = id,
.status{EndpointInfo::Status::kAvailable},
});
}
}
Payload::Id PayloadManager::PendingPayload::GetId() const {
return internal_payload_->GetId();
}
InternalPayload* PayloadManager::PendingPayload::GetInternalPayload() {
return internal_payload_.get();
}
bool PayloadManager::PendingPayload::IsLocallyCanceled() const {
return is_locally_canceled_.Get();
}
void PayloadManager::PendingPayload::MarkLocallyCanceled() {
is_locally_canceled_.Set(true);
}
bool PayloadManager::PendingPayload::IsIncoming() const { return is_incoming_; }
std::vector<const PayloadManager::EndpointInfo*>
PayloadManager::PendingPayload::GetEndpoints() const {
MutexLock lock(&mutex_);
std::vector<const EndpointInfo*> result;
for (const auto& item : endpoints_) {
result.push_back(&item.second);
}
return result;
}
PayloadManager::EndpointInfo* PayloadManager::PendingPayload::GetEndpoint(
const std::string& endpoint_id) {
MutexLock lock(&mutex_);
auto it = endpoints_.find(endpoint_id);
if (it == endpoints_.end()) {
return {};
}
return &it->second;
}
void PayloadManager::PendingPayload::RemoveEndpoints(
const EndpointIds& endpoint_ids) {
MutexLock lock(&mutex_);
for (const auto& id : endpoint_ids) {
endpoints_.erase(id);
}
}
void PayloadManager::PendingPayload::SetEndpointStatusFromControlMessage(
const std::string& endpoint_id,
const PayloadTransferFrame::ControlMessage& control_message) {
MutexLock lock(&mutex_);
auto item = endpoints_.find(endpoint_id);
if (item != endpoints_.end()) {
item->second.SetStatusFromControlMessage(control_message);
}
}
void PayloadManager::PendingPayload::SetOffsetForEndpoint(
const std::string& endpoint_id, std::int64_t offset) {
MutexLock lock(&mutex_);
auto item = endpoints_.find(endpoint_id);
if (item != endpoints_.end()) {
item->second.offset = offset;
}
}
void PayloadManager::PendingPayload::Close() {
if (internal_payload_) internal_payload_->Close();
close_event_.CountDown();
}
bool PayloadManager::PendingPayload::WaitForClose() {
return close_event_.Await(kWaitCloseTimeout).result();
}
bool PayloadManager::PendingPayload::IsClosed() {
return close_event_.Await(absl::ZeroDuration()).result();
}
void PayloadManager::RunOnStatusUpdateThread(const std::string& name,
std::function<void()> runnable) {
payload_status_update_executor_.Execute(name, std::move(runnable));
}
/////////////////////////////// PendingPayloads ///////////////////////////////
void PayloadManager::PendingPayloads::StartTrackingPayload(
Payload::Id payload_id, std::unique_ptr<PendingPayload> pending_payload) {
MutexLock lock(&mutex_);
// If the |payload_id| is being re-used, always prefer the newer payload.
auto it = pending_payloads_.find(payload_id);
if (it != pending_payloads_.end()) {
pending_payloads_.erase(payload_id);
}
auto pair = pending_payloads_.emplace(payload_id, std::move(pending_payload));
NEARBY_LOG(INFO, "StartTrackingPayload: payload_id=%x" PRIx64 "; inserted=%d",
static_cast<std::int64_t>(payload_id), pair.second);
}
std::unique_ptr<PayloadManager::PendingPayload>
PayloadManager::PendingPayloads::StopTrackingPayload(Payload::Id payload_id) {
MutexLock lock(&mutex_);
auto it = pending_payloads_.find(payload_id);
if (it == pending_payloads_.end()) return {};
auto item = pending_payloads_.extract(it);
return std::move(item.mapped());
}
PayloadManager::PendingPayload* PayloadManager::PendingPayloads::GetPayload(
Payload::Id payload_id) const {
MutexLock lock(&mutex_);
auto item = pending_payloads_.find(payload_id);
return item != pending_payloads_.end() ? item->second.get() : nullptr;
}
std::vector<Payload::Id> PayloadManager::PendingPayloads::GetAllPayloads() {
MutexLock lock(&mutex_);
std::vector<Payload::Id> result;
for (const auto& item : pending_payloads_) {
result.push_back(item.first);
}
return result;
}
} // namespace connections
} // namespace nearby
} // namespace location