blob: 7d710d88c88bf491ebb7d6a10091d35680b9ed33 [file] [log] [blame]
// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "chrome/browser/chromeos/drive/drive_file_stream_reader.h"
#include <stddef.h>
#include <algorithm>
#include <cstring>
#include <memory>
#include <utility>
#include "base/bind.h"
#include "base/logging.h"
#include "base/sequenced_task_runner.h"
#include "base/task/post_task.h"
#include "components/drive/chromeos/file_system_interface.h"
#include "components/drive/drive.pb.h"
#include "components/drive/local_file_reader.h"
#include "content/public/browser/browser_task_traits.h"
#include "content/public/browser/browser_thread.h"
#include "google_apis/drive/task_util.h"
#include "net/base/io_buffer.h"
#include "net/base/net_errors.h"
#include "net/http/http_byte_range.h"
using content::BrowserThread;
namespace drive {
namespace {
// Converts FileError code to net::Error code.
int FileErrorToNetError(FileError error) {
return net::FileErrorToNetError(FileErrorToBaseFileError(error));
}
// Computes the concrete |start| offset and the |length| of |range| in a file
// of |total| size.
//
// This is a thin wrapper of HttpByteRange::ComputeBounds, extended to allow
// an empty range at the end of the file, like "Range: bytes 0-" on a zero byte
// file. This is for convenience in unifying implementation with the seek
// operation of stream reader. HTTP doesn't allow such ranges but we want to
// treat such seeking as valid.
bool ComputeConcretePosition(net::HttpByteRange range,
int64_t total,
int64_t* start,
int64_t* length) {
// The special case when empty range in the end of the file is selected.
if (range.HasFirstBytePosition() && range.first_byte_position() == total) {
*start = range.first_byte_position();
*length = 0;
return true;
}
// Otherwise forward to HttpByteRange::ComputeBounds.
if (!range.ComputeBounds(total))
return false;
*start = range.first_byte_position();
*length = range.last_byte_position() - range.first_byte_position() + 1;
return true;
}
} // namespace
namespace internal {
namespace {
// Copies the content in |pending_data| into |buffer| at most
// |buffer_length| bytes, and erases the copied data from
// |pending_data|. Returns the number of copied bytes.
int ReadInternal(std::vector<std::unique_ptr<std::string>>* pending_data,
net::IOBuffer* buffer,
int buffer_length) {
size_t index = 0;
int offset = 0;
for (; index < pending_data->size() && offset < buffer_length; ++index) {
const std::string& chunk = *(*pending_data)[index];
DCHECK(!chunk.empty());
size_t bytes_to_read = std::min(
chunk.size(), static_cast<size_t>(buffer_length - offset));
std::memmove(buffer->data() + offset, chunk.data(), bytes_to_read);
offset += bytes_to_read;
if (bytes_to_read < chunk.size()) {
// The chunk still has some remaining data.
// So remove leading (copied) bytes, and quit the loop so that
// the remaining data won't be deleted in the following erase().
(*pending_data)[index]->erase(0, bytes_to_read);
break;
}
}
// Consume the copied data.
pending_data->erase(pending_data->begin(), pending_data->begin() + index);
return offset;
}
} // namespace
LocalReaderProxy::LocalReaderProxy(
std::unique_ptr<util::LocalFileReader> file_reader,
int64_t length)
: file_reader_(std::move(file_reader)),
remaining_length_(length),
weak_ptr_factory_(this) {
DCHECK(file_reader_);
}
LocalReaderProxy::~LocalReaderProxy() = default;
int LocalReaderProxy::Read(net::IOBuffer* buffer,
int buffer_length,
net::CompletionOnceCallback callback) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
DCHECK(file_reader_);
if (buffer_length > remaining_length_) {
// Here, narrowing is safe.
buffer_length = static_cast<int>(remaining_length_);
}
if (!buffer_length)
return 0;
file_reader_->Read(
buffer, buffer_length,
base::BindOnce(&LocalReaderProxy::OnReadCompleted,
weak_ptr_factory_.GetWeakPtr(), std::move(callback)));
return net::ERR_IO_PENDING;
}
void LocalReaderProxy::OnGetContent(std::unique_ptr<std::string> data) {
// This method should never be called, because no data should be received
// from the network during the reading of local-cache file.
NOTREACHED();
}
void LocalReaderProxy::OnCompleted(FileError error) {
// If this method is called, no network error should be happened.
DCHECK_EQ(FILE_ERROR_OK, error);
}
void LocalReaderProxy::OnReadCompleted(net::CompletionOnceCallback callback,
int read_result) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
DCHECK(file_reader_);
if (read_result >= 0) {
// |read_result| bytes data is read.
DCHECK_LE(read_result, remaining_length_);
remaining_length_ -= read_result;
} else {
// An error occurs. Close the |file_reader_|.
file_reader_.reset();
}
std::move(callback).Run(read_result);
}
NetworkReaderProxy::NetworkReaderProxy(int64_t offset,
int64_t content_length,
int64_t full_content_length,
const base::Closure& job_canceller)
: remaining_offset_(offset),
remaining_content_length_(content_length),
is_full_download_(offset + content_length == full_content_length),
error_code_(net::OK),
buffer_length_(0),
job_canceller_(job_canceller) {}
NetworkReaderProxy::~NetworkReaderProxy() {
if (!job_canceller_.is_null()) {
job_canceller_.Run();
}
}
int NetworkReaderProxy::Read(net::IOBuffer* buffer,
int buffer_length,
net::CompletionOnceCallback callback) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
// Check if there is no pending Read operation.
DCHECK(!buffer_.get());
DCHECK_EQ(buffer_length_, 0);
DCHECK(callback_.is_null());
// Validate the arguments.
DCHECK(buffer);
DCHECK_GT(buffer_length, 0);
DCHECK(callback);
if (error_code_ != net::OK) {
// An error is already found. Return it immediately.
return error_code_;
}
if (remaining_content_length_ == 0) {
// If no more data, return immediately.
return 0;
}
if (buffer_length > remaining_content_length_) {
// Here, narrowing cast should be safe.
buffer_length = static_cast<int>(remaining_content_length_);
}
if (pending_data_.empty()) {
// No data is available. Keep the arguments, and return pending status.
buffer_ = buffer;
buffer_length_ = buffer_length;
callback_ = std::move(callback);
return net::ERR_IO_PENDING;
}
int result = ReadInternal(&pending_data_, buffer, buffer_length);
remaining_content_length_ -= result;
DCHECK_GE(remaining_content_length_, 0);
// Although OnCompleted() should reset |job_canceller_| when download is done,
// due to timing issues the ReaderProxy instance may be destructed before the
// notification. To fix the case we reset here earlier.
if (is_full_download_ && remaining_content_length_ == 0)
job_canceller_.Reset();
return result;
}
void NetworkReaderProxy::OnGetContent(std::unique_ptr<std::string> data) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
DCHECK(data && !data->empty());
if (remaining_offset_ >= static_cast<int64_t>(data->length())) {
// Skip unneeded leading data.
remaining_offset_ -= data->length();
return;
}
if (remaining_offset_ > 0) {
// Erase unnecessary leading bytes.
data->erase(0, static_cast<size_t>(remaining_offset_));
remaining_offset_ = 0;
}
pending_data_.push_back(std::move(data));
if (!buffer_.get()) {
// No pending Read operation.
return;
}
int result = ReadInternal(&pending_data_, buffer_.get(), buffer_length_);
remaining_content_length_ -= result;
DCHECK_GE(remaining_content_length_, 0);
if (is_full_download_ && remaining_content_length_ == 0)
job_canceller_.Reset();
buffer_ = nullptr;
buffer_length_ = 0;
DCHECK(!callback_.is_null());
std::move(callback_).Run(result);
}
void NetworkReaderProxy::OnCompleted(FileError error) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
// The downloading is completed, so we do not need to cancel the job
// in the destructor.
job_canceller_.Reset();
if (error == FILE_ERROR_OK) {
return;
}
error_code_ = FileErrorToNetError(error);
pending_data_.clear();
if (callback_.is_null()) {
// No pending Read operation.
return;
}
buffer_ = nullptr;
buffer_length_ = 0;
std::move(callback_).Run(error_code_);
}
} // namespace internal
namespace {
// Calls FileSystemInterface::GetFileContent if the file system
// is available. If not, the |completion_callback| is invoked with
// FILE_ERROR_FAILED.
base::Closure GetFileContentOnUIThread(
const DriveFileStreamReader::FileSystemGetter& file_system_getter,
const base::FilePath& drive_file_path,
GetFileContentInitializedCallback initialized_callback,
const google_apis::GetContentCallback& get_content_callback,
const FileOperationCallback& completion_callback) {
DCHECK_CURRENTLY_ON(BrowserThread::UI);
FileSystemInterface* file_system = file_system_getter.Run();
if (!file_system) {
completion_callback.Run(FILE_ERROR_FAILED);
return base::Closure();
}
return google_apis::CreateRelayCallback(file_system->GetFileContent(
drive_file_path, std::move(initialized_callback), get_content_callback,
completion_callback));
}
// Helper to run FileSystemInterface::GetFileContent on UI thread.
void GetFileContent(
const DriveFileStreamReader::FileSystemGetter& file_system_getter,
const base::FilePath& drive_file_path,
GetFileContentInitializedCallback initialized_callback,
const google_apis::GetContentCallback& get_content_callback,
const FileOperationCallback& completion_callback,
const base::Callback<void(const base::Closure&)>& reply_callback) {
DCHECK_CURRENTLY_ON(BrowserThread::IO);
base::PostTaskWithTraitsAndReplyWithResult(
FROM_HERE, {BrowserThread::UI},
base::Bind(&GetFileContentOnUIThread, file_system_getter, drive_file_path,
base::Passed(google_apis::CreateRelayCallback(
std::move(initialized_callback))),
google_apis::CreateRelayCallback(get_content_callback),
google_apis::CreateRelayCallback(completion_callback)),
reply_callback);
}
} // namespace
DriveFileStreamReader::DriveFileStreamReader(
const FileSystemGetter& file_system_getter,
base::SequencedTaskRunner* file_task_runner)
: file_system_getter_(file_system_getter),
file_task_runner_(file_task_runner),
weak_ptr_factory_(this) {
}
DriveFileStreamReader::~DriveFileStreamReader() = default;
bool DriveFileStreamReader::IsInitialized() const {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
return reader_proxy_.get() != nullptr;
}
void DriveFileStreamReader::Initialize(
const base::FilePath& drive_file_path,
const net::HttpByteRange& byte_range,
InitializeCompletionOnceCallback callback) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
DCHECK(callback);
init_callback_ = std::move(callback);
GetFileContent(
file_system_getter_, drive_file_path,
base::Bind(
&DriveFileStreamReader ::InitializeAfterGetFileContentInitialized,
weak_ptr_factory_.GetWeakPtr(), byte_range),
base::Bind(&DriveFileStreamReader::OnGetContent,
weak_ptr_factory_.GetWeakPtr()),
base::Bind(&DriveFileStreamReader::OnGetFileContentCompletion,
weak_ptr_factory_.GetWeakPtr()),
base::Bind(&DriveFileStreamReader::StoreCancelDownloadClosure,
weak_ptr_factory_.GetWeakPtr()));
}
int DriveFileStreamReader::Read(net::IOBuffer* buffer,
int buffer_length,
net::CompletionOnceCallback callback) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
DCHECK(reader_proxy_);
DCHECK(buffer);
DCHECK(callback);
return reader_proxy_->Read(buffer, buffer_length, std::move(callback));
}
void DriveFileStreamReader::StoreCancelDownloadClosure(
const base::Closure& cancel_download_closure) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
cancel_download_closure_ = cancel_download_closure;
}
void DriveFileStreamReader::InitializeAfterGetFileContentInitialized(
const net::HttpByteRange& byte_range,
FileError error,
const base::FilePath& local_cache_file_path,
std::unique_ptr<ResourceEntry> entry) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
// StoreCancelDownloadClosure() should be called before this function.
DCHECK(!cancel_download_closure_.is_null());
if (error != FILE_ERROR_OK) {
std::move(init_callback_)
.Run(FileErrorToNetError(error), std::unique_ptr<ResourceEntry>());
return;
}
DCHECK(entry);
int64_t range_start = 0, range_length = 0;
if (!ComputeConcretePosition(byte_range, entry->file_info().size(),
&range_start, &range_length)) {
// If |byte_range| is invalid (e.g. out of bounds), return with an error.
// At the same time, we cancel the in-flight downloading operation if
// needed and and invalidate weak pointers so that we won't
// receive unwanted callbacks.
cancel_download_closure_.Run();
weak_ptr_factory_.InvalidateWeakPtrs();
std::move(init_callback_)
.Run(net::ERR_REQUEST_RANGE_NOT_SATISFIABLE,
std::unique_ptr<ResourceEntry>());
return;
}
if (local_cache_file_path.empty()) {
// The file is not cached, and being downloaded.
reader_proxy_ = std::make_unique<internal::NetworkReaderProxy>(
range_start, range_length, entry->file_info().size(),
cancel_download_closure_);
std::move(init_callback_).Run(net::OK, std::move(entry));
return;
}
// Otherwise, open the stream for file.
std::unique_ptr<util::LocalFileReader> file_reader(
new util::LocalFileReader(file_task_runner_.get()));
util::LocalFileReader* file_reader_ptr = file_reader.get();
file_reader_ptr->Open(
local_cache_file_path,
range_start,
base::Bind(
&DriveFileStreamReader::InitializeAfterLocalFileOpen,
weak_ptr_factory_.GetWeakPtr(),
range_length,
base::Passed(&entry),
base::Passed(&file_reader)));
}
void DriveFileStreamReader::InitializeAfterLocalFileOpen(
int64_t length,
std::unique_ptr<ResourceEntry> entry,
std::unique_ptr<util::LocalFileReader> file_reader,
int open_result) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
if (open_result != net::OK) {
std::move(init_callback_)
.Run(net::ERR_FAILED, std::unique_ptr<ResourceEntry>());
return;
}
reader_proxy_ = std::make_unique<internal::LocalReaderProxy>(
std::move(file_reader), length);
std::move(init_callback_).Run(net::OK, std::move(entry));
}
void DriveFileStreamReader::OnGetContent(
google_apis::DriveApiErrorCode error_code,
std::unique_ptr<std::string> data) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
DCHECK(reader_proxy_);
reader_proxy_->OnGetContent(std::move(data));
}
void DriveFileStreamReader::OnGetFileContentCompletion(
FileError error) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
if (reader_proxy_) {
// If the proxy object available, send the error to it.
reader_proxy_->OnCompleted(error);
} else {
// Here the proxy object is not yet available.
// There are two cases. 1) Some error happens during the initialization.
// 2) the cache file is found, but the proxy object is not *yet*
// initialized because the file is being opened.
// We are interested in 1) only. The callback for 2) will be called
// after opening the file is completed.
// Note: due to the same reason, LocalReaderProxy::OnCompleted may
// or may not be called. This is timing issue, and it is difficult to avoid
// unfortunately.
if (error != FILE_ERROR_OK) {
std::move(init_callback_)
.Run(FileErrorToNetError(error), std::unique_ptr<ResourceEntry>());
}
}
}
} // namespace drive