blob: 9f962a78b67e8d6b0fbaf42e141713ead5e999f5 [file] [log] [blame]
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "connections/implementation/internal_payload_factory.h"
#include <cstdint>
#include <memory>
#include "absl/memory/memory.h"
#include "connections/implementation/offline_frames_validator.h"
#include "connections/payload.h"
#include "internal/platform/byte_array.h"
#include "internal/platform/condition_variable.h"
#include "internal/platform/exception.h"
#include "internal/platform/feature_flags.h"
#include "internal/platform/file.h"
#include "internal/platform/implementation/platform.h"
#include "internal/platform/implementation/shared/file.h"
#include "internal/platform/logging.h"
#include "internal/platform/mutex.h"
#include "internal/platform/os_name.h"
#include "internal/platform/pipe.h"
namespace location {
namespace nearby {
namespace connections {
namespace {
class BytesInternalPayload : public InternalPayload {
public:
explicit BytesInternalPayload(Payload payload)
: InternalPayload(std::move(payload)),
total_size_(payload_.AsBytes().size()),
detached_only_chunk_(false) {}
PayloadTransferFrame::PayloadHeader::PayloadType GetType() const override {
return PayloadTransferFrame::PayloadHeader::BYTES;
}
std::int64_t GetTotalSize() const override { return total_size_; }
// Relinquishes ownership of the payload_; retrieves and returns the stored
// ByteArray.
ByteArray DetachNextChunk(int chunk_size) override {
if (detached_only_chunk_) {
return {};
}
detached_only_chunk_ = true;
return std::move(payload_).AsBytes();
}
// Does nothing.
Exception AttachNextChunk(const ByteArray& chunk) override {
return {Exception::kSuccess};
}
ExceptionOr<size_t> SkipToOffset(size_t offset) override {
NEARBY_LOGS(WARNING) << "Bytes payload does not support offsets";
return {Exception::kIo};
}
private:
// We're caching the total size here because the backing payload will be
// moved to another owner during the lifetime of an incoming
// InternalPayload.
const std::int64_t total_size_;
bool detached_only_chunk_;
};
class OutgoingStreamInternalPayload : public InternalPayload {
public:
explicit OutgoingStreamInternalPayload(Payload payload)
: InternalPayload(std::move(payload)) {}
PayloadTransferFrame::PayloadHeader::PayloadType GetType() const override {
return PayloadTransferFrame::PayloadHeader::STREAM;
}
std::int64_t GetTotalSize() const override { return -1; }
ByteArray DetachNextChunk(int chunk_size) override {
InputStream* input_stream = payload_.AsStream();
if (!input_stream) return {};
ExceptionOr<ByteArray> bytes_read = input_stream->Read(chunk_size);
if (!bytes_read.ok()) {
input_stream->Close();
return {};
}
ByteArray scoped_bytes_read = std::move(bytes_read.result());
if (scoped_bytes_read.Empty()) {
NEARBY_LOGS(INFO) << "No more data for outgoing payload " << this
<< ", closing InputStream.";
input_stream->Close();
return {};
}
return scoped_bytes_read;
}
Exception AttachNextChunk(const ByteArray& chunk) override {
return {Exception::kIo};
}
ExceptionOr<size_t> SkipToOffset(size_t offset) override {
InputStream* stream = payload_.AsStream();
if (stream == nullptr) return {Exception::kIo};
ExceptionOr<size_t> real_offset = stream->Skip(offset);
if (real_offset.ok() && real_offset.GetResult() == offset) {
return real_offset;
}
// Close the outgoing stream on any error
stream->Close();
if (!real_offset.ok()) {
return real_offset;
}
NEARBY_LOGS(WARNING) << "Skip offset: " << real_offset.GetResult()
<< ", expected offset: " << offset << " for payload "
<< this;
return {Exception::kIo};
}
void Close() override {
// Ignore the potential Exception returned by close(), as a counterpart
// to Java's closeQuietly().
InputStream* stream = payload_.AsStream();
if (stream) stream->Close();
}
};
class IncomingStreamInternalPayload : public InternalPayload {
public:
IncomingStreamInternalPayload(Payload payload, OutputStream& output_stream)
: InternalPayload(std::move(payload)), output_stream_(&output_stream) {}
PayloadTransferFrame::PayloadHeader::PayloadType GetType() const override {
return PayloadTransferFrame::PayloadHeader::STREAM;
}
std::int64_t GetTotalSize() const override { return -1; }
ByteArray DetachNextChunk(int chunk_size) override { return {}; }
Exception AttachNextChunk(const ByteArray& chunk) override {
if (chunk.Empty()) {
NEARBY_LOGS(INFO) << "Received null last chunk for incoming payload "
<< this << ", closing OutputStream.";
output_stream_->Close();
return {Exception::kSuccess};
}
return output_stream_->Write(chunk);
}
ExceptionOr<size_t> SkipToOffset(size_t offset) override {
NEARBY_LOGS(WARNING) << "Cannot skip offset for an incoming Payload "
<< this;
return {Exception::kIo};
}
void Close() override { output_stream_->Close(); }
private:
OutputStream* output_stream_;
};
class OutgoingFileInternalPayload : public InternalPayload {
public:
explicit OutgoingFileInternalPayload(Payload payload)
: InternalPayload(std::move(payload)),
total_size_{payload_.AsFile()->GetTotalSize()} {}
PayloadTransferFrame::PayloadHeader::PayloadType GetType() const override {
return PayloadTransferFrame::PayloadHeader::FILE;
}
std::int64_t GetTotalSize() const override { return total_size_; }
ByteArray DetachNextChunk(int chunk_size) override {
InputFile* file = payload_.AsFile();
if (!file) return {};
ExceptionOr<ByteArray> bytes_read = file->Read(chunk_size);
if (!bytes_read.ok()) {
return {};
}
ByteArray bytes = std::move(bytes_read.result());
if (bytes.Empty()) {
// No more data for outgoing payload.
file->Close();
return {};
}
return bytes;
}
Exception AttachNextChunk(const ByteArray& chunk) override {
return {Exception::kIo};
}
ExceptionOr<size_t> SkipToOffset(size_t offset) override {
NEARBY_LOGS(INFO) << "SkipToOffset " << offset;
InputFile* file = payload_.AsFile();
if (!file) {
return {Exception::kIo};
}
ExceptionOr<size_t> real_offset = file->Skip(offset);
if (real_offset.ok() && real_offset.GetResult() == offset) {
return real_offset;
}
// Close the outgoing file on any error
file->Close();
if (!real_offset.ok()) {
return real_offset;
}
NEARBY_LOGS(WARNING) << "Skip offset: " << real_offset.GetResult()
<< ", expected offset: " << offset
<< " for file payload " << this;
return {Exception::kIo};
}
void Close() override {
InputFile* file = payload_.AsFile();
if (file) file->Close();
}
private:
std::int64_t total_size_;
};
class IncomingFileInternalPayload : public InternalPayload {
public:
IncomingFileInternalPayload(Payload payload, OutputFile output_file,
std::int64_t total_size)
: InternalPayload(std::move(payload)),
output_file_(std::move(output_file)),
total_size_(total_size) {}
PayloadTransferFrame::PayloadHeader::PayloadType GetType() const override {
return PayloadTransferFrame::PayloadHeader::FILE;
}
std::int64_t GetTotalSize() const override { return total_size_; }
ByteArray DetachNextChunk(int chunk_size) override { return {}; }
Exception AttachNextChunk(const ByteArray& chunk) override {
if (chunk.Empty()) {
// Received null last chunk for incoming payload.
output_file_.Close();
return {Exception::kSuccess};
}
return output_file_.Write(chunk);
}
ExceptionOr<size_t> SkipToOffset(size_t offset) override {
NEARBY_LOGS(WARNING) << "Cannot skip offset for an incoming file Payload "
<< this;
return {Exception::kIo};
}
void Close() override { output_file_.Close(); }
private:
OutputFile output_file_;
const std::int64_t total_size_;
};
} // namespace
using location::nearby::api::ImplementationPlatform;
using location::nearby::api::OSName;
std::unique_ptr<InternalPayload> CreateOutgoingInternalPayload(
Payload payload) {
switch (payload.GetType()) {
case PayloadType::kBytes:
return absl::make_unique<BytesInternalPayload>(std::move(payload));
case PayloadType::kFile: {
return absl::make_unique<OutgoingFileInternalPayload>(std::move(payload));
}
case PayloadType::kStream:
return absl::make_unique<OutgoingStreamInternalPayload>(
std::move(payload));
default:
DCHECK(false); // This should never happen.
return {};
}
}
std::string make_path(std::string& parent_folder, std::string& file_name) {
return api::ImplementationPlatform::GetDownloadPath(parent_folder, file_name);
}
std::string make_path(std::string& parent_folder, int64_t id) {
std::string file_name(std::to_string(id));
return api::ImplementationPlatform::GetDownloadPath(parent_folder, file_name);
}
std::unique_ptr<InternalPayload> CreateIncomingInternalPayload(
const PayloadTransferFrame& frame) {
if (frame.packet_type() != PayloadTransferFrame::DATA) {
return {};
}
const Payload::Id payload_id = frame.payload_header().id();
switch (frame.payload_header().type()) {
case PayloadTransferFrame::PayloadHeader::BYTES: {
return absl::make_unique<BytesInternalPayload>(
Payload(payload_id, ByteArray(frame.payload_chunk().body())));
}
case PayloadTransferFrame::PayloadHeader::STREAM: {
auto pipe = std::make_shared<Pipe>();
return absl::make_unique<IncomingStreamInternalPayload>(
Payload(payload_id,
[pipe]() -> InputStream& {
return pipe->GetInputStream(); // NOLINT
}),
pipe->GetOutputStream());
}
case PayloadTransferFrame::PayloadHeader::FILE: {
std::string file_path("");
int64_t total_size = 0;
if (frame.payload_header().has_parent_folder()) {
file_path = frame.payload_header().parent_folder();
}
if (frame.payload_header().has_file_name()) {
std::string file_name(frame.payload_header().file_name());
file_path = make_path(file_path, file_name);
}
if (frame.payload_header().has_total_size()) {
total_size = frame.payload_header().total_size();
}
// These are ordered, the output file must be created first otherwise
// there will be no input file to open.
// On Chrome the file path should be empty, so use the payload id.
if (ImplementationPlatform::GetCurrentOS() == OSName::kChromeOS) {
return absl::make_unique<IncomingFileInternalPayload>(
Payload(payload_id, InputFile(payload_id, total_size)),
OutputFile(payload_id), total_size);
} else {
return absl::make_unique<IncomingFileInternalPayload>(
Payload(payload_id, InputFile(file_path, total_size)),
OutputFile(file_path), total_size);
}
}
default:
DCHECK(false); // This should never happen.
return {};
}
}
} // namespace connections
} // namespace nearby
} // namespace location