blob: 8b5994dad33ea7a245eb5e3cf275a1d9bb42bccf [file] [log] [blame]
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef CORE_INTERNAL_PAYLOAD_MANAGER_H_
#define CORE_INTERNAL_PAYLOAD_MANAGER_H_
#include <cstdint>
#include <memory>
#include <string>
#include <vector>
#include "absl/container/flat_hash_map.h"
#include "connections/implementation/client_proxy.h"
#include "connections/implementation/endpoint_manager.h"
#include "connections/implementation/internal_payload.h"
#include "connections/listeners.h"
#include "connections/payload.h"
#include "connections/status.h"
#include "internal/platform/byte_array.h"
#include "internal/platform/atomic_boolean.h"
#include "internal/platform/atomic_reference.h"
#include "internal/platform/count_down_latch.h"
#include "internal/platform/mutex.h"
namespace location {
namespace nearby {
namespace connections {
// Annotations for methods that need to run on PayloadStatusUpdateThread.
// Use only in PayloadManager
#define RUN_ON_PAYLOAD_STATUS_UPDATE_THREAD() \
ABSL_EXCLUSIVE_LOCKS_REQUIRED(payload_status_update_executor_)
class PayloadManager : public EndpointManager::FrameProcessor {
public:
using EndpointIds = std::vector<std::string>;
constexpr static const absl::Duration kWaitCloseTimeout =
absl::Milliseconds(5000);
explicit PayloadManager(EndpointManager& endpoint_manager);
~PayloadManager() override;
void SendPayload(ClientProxy* client, const EndpointIds& endpoint_ids,
Payload payload);
Status CancelPayload(ClientProxy* client, Payload::Id payload_id);
// @EndpointManagerReaderThread
void OnIncomingFrame(OfflineFrame& offline_frame,
const std::string& from_endpoint_id,
ClientProxy* to_client,
proto::connections::Medium current_medium) override;
// @EndpointManagerThread
void OnEndpointDisconnect(ClientProxy* client, const std::string& endpoint_id,
CountDownLatch barrier) override;
void DisconnectFromEndpointManager();
private:
// Information about an endpoint for a particular payload.
struct EndpointInfo {
// Status set for the endpoint out-of-band via a ControlMessage.
enum class Status {
kUnknown,
kAvailable,
kCanceled,
kError,
};
void SetStatusFromControlMessage(
const PayloadTransferFrame::ControlMessage& control_message);
static Status ControlMessageEventToEndpointInfoStatus(
PayloadTransferFrame::ControlMessage::EventType event);
std::string id;
AtomicReference<Status> status{Status::kUnknown};
std::int64_t offset = 0;
};
// Tracks state for an InternalPayload and the endpoints associated with it.
class PendingPayload {
public:
PendingPayload(std::unique_ptr<InternalPayload> internal_payload,
const EndpointIds& endpoint_ids, bool is_incoming);
PendingPayload(PendingPayload&&) = default;
PendingPayload& operator=(PendingPayload&&) = default;
~PendingPayload() { Close(); }
Payload::Id GetId() const;
InternalPayload* GetInternalPayload();
bool IsLocallyCanceled() const;
void MarkLocallyCanceled();
bool IsIncoming() const;
// Gets the EndpointInfo objects for the endpoints (still) associated with
// this payload.
std::vector<const EndpointInfo*> GetEndpoints() const
ABSL_LOCKS_EXCLUDED(mutex_);
// Returns the EndpointInfo for a given endpoint ID. Returns null if the
// endpoint is not associated with this payload.
EndpointInfo* GetEndpoint(const std::string& endpoint_id)
ABSL_LOCKS_EXCLUDED(mutex_);
// Removes the given endpoints, e.g. on error.
void RemoveEndpoints(const EndpointIds& endpoint_ids_to_remove)
ABSL_LOCKS_EXCLUDED(mutex_);
// Sets the status for a particular endpoint.
void SetEndpointStatusFromControlMessage(
const std::string& endpoint_id,
const PayloadTransferFrame::ControlMessage& control_message)
ABSL_LOCKS_EXCLUDED(mutex_);
// Sets the offset for a particular endpoint.
void SetOffsetForEndpoint(const std::string& endpoint_id,
std::int64_t offset) ABSL_LOCKS_EXCLUDED(mutex_);
// Closes internal_payload_ and triggers close_event_.
// Close is called when a pending peyload does not have associated
// endpoints.
void Close();
// Waits for close_event_ or for timeout to happen.
// Returns true, if event happened, false otherwise.
bool WaitForClose();
bool IsClosed();
private:
mutable Mutex mutex_;
bool is_incoming_;
AtomicBoolean is_locally_canceled_{false};
CountDownLatch close_event_{1};
std::unique_ptr<InternalPayload> internal_payload_;
absl::flat_hash_map<std::string, EndpointInfo> endpoints_
ABSL_GUARDED_BY(mutex_);
};
// Tracks and manages PendingPayload objects in a synchronized manner.
class PendingPayloads {
public:
PendingPayloads() = default;
~PendingPayloads() = default;
void StartTrackingPayload(Payload::Id payload_id,
std::unique_ptr<PendingPayload> pending_payload)
ABSL_LOCKS_EXCLUDED(mutex_);
std::unique_ptr<PendingPayload> StopTrackingPayload(Payload::Id payload_id)
ABSL_LOCKS_EXCLUDED(mutex_);
PendingPayload* GetPayload(Payload::Id payload_id) const
ABSL_LOCKS_EXCLUDED(mutex_);
std::vector<Payload::Id> GetAllPayloads() ABSL_LOCKS_EXCLUDED(mutex_);
private:
mutable Mutex mutex_;
absl::flat_hash_map<Payload::Id, std::unique_ptr<PendingPayload>>
pending_payloads_ ABSL_GUARDED_BY(mutex_);
};
using Endpoints = std::vector<const EndpointInfo*>;
static std::string ToString(const EndpointIds& endpoint_ids);
static std::string ToString(const Endpoints& endpoints);
static std::string ToString(Payload::Type type);
static std::string ToString(EndpointInfo::Status status);
// Splits the endpoints for this payload by availability.
// Returns a pair of lists of EndpointInfo*, with the first being the list of
// still-available endpoints, and the second for unavailable endpoints.
static std::pair<Endpoints, Endpoints> GetAvailableAndUnavailableEndpoints(
const PendingPayload& pending_payload);
// Converts list of EndpointInfo to list of Endpoint ids.
// Returns list of endpoint ids.
static EndpointIds EndpointsToEndpointIds(const Endpoints& endpoints);
bool SendPayloadLoop(ClientProxy* client, PendingPayload& pending_payload,
PayloadTransferFrame::PayloadHeader& payload_header,
std::int64_t& next_chunk_offset, size_t resume_offset);
void SendClientCallbacksForFinishedIncomingPayloadRunnable(
ClientProxy* client, const std::string& endpoint_id,
const PayloadTransferFrame::PayloadHeader& payload_header,
std::int64_t offset_bytes, proto::connections::PayloadStatus status);
// Converts the status of an endpoint that's been set out-of-band via a remote
// ControlMessage to the PayloadStatus for handling of that endpoint-payload
// pair.
static proto::connections::PayloadStatus EndpointInfoStatusToPayloadStatus(
EndpointInfo::Status status);
// Converts a ControlMessage::EventType for a particular payload to a
// PayloadStatus. Called when we've received a ControlMessage with this event
// from a remote endpoint; thus the PayloadStatuses are REMOTE_*.
static proto::connections::PayloadStatus ControlMessageEventToPayloadStatus(
PayloadTransferFrame::ControlMessage::EventType event);
static PayloadProgressInfo::Status PayloadStatusToTransferUpdateStatus(
proto::connections::PayloadStatus status);
int GetOptimalChunkSize(EndpointIds endpoint_ids);
PayloadTransferFrame::PayloadHeader CreatePayloadHeader(
const InternalPayload& payload, size_t offset);
PayloadTransferFrame::PayloadChunk CreatePayloadChunk(std::int64_t offset,
ByteArray body);
PendingPayload* CreateIncomingPayload(const PayloadTransferFrame& frame,
const std::string& endpoint_id)
ABSL_LOCKS_EXCLUDED(mutex_);
Payload::Id CreateOutgoingPayload(Payload payload,
const EndpointIds& endpoint_ids)
ABSL_LOCKS_EXCLUDED(mutex_);
void SendClientCallbacksForFinishedOutgoingPayload(
ClientProxy* client, const EndpointIds& finished_endpoint_ids,
const PayloadTransferFrame::PayloadHeader& payload_header,
std::int64_t num_bytes_successfully_transferred,
proto::connections::PayloadStatus status);
void SendClientCallbacksForFinishedIncomingPayload(
ClientProxy* client, const std::string& endpoint_id,
const PayloadTransferFrame::PayloadHeader& payload_header,
std::int64_t offset_bytes, proto::connections::PayloadStatus status);
void SendControlMessage(
const EndpointIds& endpoint_ids,
const PayloadTransferFrame::PayloadHeader& payload_header,
std::int64_t num_bytes_successfully_transferred,
PayloadTransferFrame::ControlMessage::EventType event_type);
// Handles a finished outgoing payload for the given endpointIds. All statuses
// except for SUCCESS are handled here.
void HandleFinishedOutgoingPayload(
ClientProxy* client, const EndpointIds& finished_endpoint_ids,
const PayloadTransferFrame::PayloadHeader& payload_header,
std::int64_t num_bytes_successfully_transferred,
proto::connections::PayloadStatus status =
proto::connections::PayloadStatus::UNKNOWN_PAYLOAD_STATUS);
void HandleFinishedIncomingPayload(
ClientProxy* client, const std::string& endpoint_id,
const PayloadTransferFrame::PayloadHeader& payload_header,
std::int64_t offset_bytes, proto::connections::PayloadStatus status);
void HandleSuccessfulOutgoingChunk(
ClientProxy* client, const std::string& endpoint_id,
const PayloadTransferFrame::PayloadHeader& payload_header,
std::int32_t payload_chunk_flags, std::int64_t payload_chunk_offset,
std::int64_t payload_chunk_body_size);
void HandleSuccessfulIncomingChunk(
ClientProxy* client, const std::string& endpoint_id,
const PayloadTransferFrame::PayloadHeader& payload_header,
std::int32_t payload_chunk_flags, std::int64_t payload_chunk_offset,
std::int64_t payload_chunk_body_size);
void ProcessDataPacket(ClientProxy* to_client,
const std::string& from_endpoint_id,
PayloadTransferFrame& payload_transfer_frame);
void ProcessControlPacket(ClientProxy* to_client,
const std::string& from_endpoint_id,
PayloadTransferFrame& payload_transfer_frame);
void NotifyClientOfIncomingPayloadProgressInfo(
ClientProxy* client, const std::string& endpoint_id,
const PayloadProgressInfo& payload_transfer_update)
RUN_ON_PAYLOAD_STATUS_UPDATE_THREAD();
SingleThreadExecutor* GetOutgoingPayloadExecutor(Payload::Type payload_type);
void RunOnStatusUpdateThread(const std::string& name,
std::function<void()> runnable);
bool NotifyShutdown() ABSL_LOCKS_EXCLUDED(mutex_);
void DestroyPendingPayload(Payload::Id payload_id)
ABSL_LOCKS_EXCLUDED(mutex_);
PendingPayload* GetPayload(Payload::Id payload_id) const
ABSL_LOCKS_EXCLUDED(mutex_);
void CancelAllPayloads() ABSL_LOCKS_EXCLUDED(mutex_);
void RecordPayloadStartedAnalytics(ClientProxy* client,
const EndpointIds& endpoint_ids,
std::int64_t payload_id,
Payload::Type payload_type,
std::int64_t offset,
std::int64_t total_size);
void RecordInvalidPayloadAnalytics(ClientProxy* client,
const EndpointIds& endpoint_ids,
std::int64_t payload_id,
Payload::Type payload_type,
std::int64_t offset,
std::int64_t total_size);
Payload::Type FramePayloadTypeToPayloadType(
PayloadTransferFrame::PayloadHeader::PayloadType type);
mutable Mutex mutex_;
AtomicBoolean shutdown_{false};
std::unique_ptr<CountDownLatch> shutdown_barrier_;
int send_payload_count_ = 0;
PendingPayloads pending_payloads_ ABSL_GUARDED_BY(mutex_);
SingleThreadExecutor bytes_payload_executor_;
SingleThreadExecutor file_payload_executor_;
SingleThreadExecutor stream_payload_executor_;
SingleThreadExecutor payload_status_update_executor_;
EndpointManager* endpoint_manager_;
};
} // namespace connections
} // namespace nearby
} // namespace location
#endif // CORE_INTERNAL_PAYLOAD_MANAGER_H_