blob: 278594afb0ca8f93c7e394842f0a3576f0ade2ab [file] [log] [blame]
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "content/browser/download/download_file_impl.h"
#include <string>
#include <utility>
#include "base/bind.h"
#include "base/files/file_util.h"
#include "base/memory/ptr_util.h"
#include "base/strings/stringprintf.h"
#include "base/threading/sequenced_task_runner_handle.h"
#include "base/time/time.h"
#include "base/values.h"
#include "content/browser/byte_stream.h"
#include "content/browser/download/download_create_info.h"
#include "content/browser/download/download_destination_observer.h"
#include "content/browser/download/download_interrupt_reasons_impl.h"
#include "content/browser/download/download_stats.h"
#include "content/browser/download/download_utils.h"
#include "content/browser/download/parallel_download_utils.h"
#include "content/public/browser/browser_thread.h"
#include "content/public/common/content_features.h"
#include "crypto/secure_hash.h"
#include "crypto/sha2.h"
#include "mojo/public/c/system/types.h"
#include "net/base/io_buffer.h"
namespace content {
namespace {
const int kUpdatePeriodMs = 500;
const int kMaxTimeBlockingFileThreadMs = 1000;
// These constants control the default retry behavior for failing renames. Each
// retry is performed after a delay that is twice the previous delay. The
// initial delay is specified by kInitialRenameRetryDelayMs.
const int kInitialRenameRetryDelayMs = 200;
// Number of times a failing rename is retried before giving up.
const int kMaxRenameRetries = 3;
// Because DownloadSaveInfo::kLengthFullContent is 0, we should avoid using
// 0 for length if we found that a stream can no longer write any data.
const int kNoBytesToWrite = -1;
// Default content length when the potential file size is not yet determined.
const int kUnknownContentLength = -1;
// Data length to read from data pipe.
const int kBytesToRead = 4096;
} // namespace
DownloadFileImpl::SourceStream::SourceStream(
int64_t offset,
int64_t length,
std::unique_ptr<DownloadManager::InputStream> stream)
: offset_(offset),
length_(length),
bytes_written_(0),
finished_(false),
index_(0u),
stream_reader_(std::move(stream->stream_reader_)),
completion_status_(DOWNLOAD_INTERRUPT_REASON_NONE),
is_response_completed_(false),
stream_handle_(std::move(stream->stream_handle_)) {}
DownloadFileImpl::SourceStream::~SourceStream() = default;
void DownloadFileImpl::SourceStream::Initialize() {
if (stream_handle_.is_null())
return;
binding_ = std::make_unique<mojo::Binding<mojom::DownloadStreamClient>>(
this, std::move(stream_handle_->client_request));
binding_->set_connection_error_handler(base::Bind(
&DownloadFileImpl::SourceStream::OnStreamCompleted,
base::Unretained(this), mojom::NetworkRequestStatus::USER_CANCELED));
handle_watcher_ = std::make_unique<mojo::SimpleWatcher>(
FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC);
}
void DownloadFileImpl::SourceStream::OnStreamCompleted(
mojom::NetworkRequestStatus status) {
// This can be called before or after data pipe is completely drained. So we
// need to pass the |completion_status_| to DownloadFileImpl if the data pipe
// is already drained.
OnResponseCompleted(ConvertMojoNetworkRequestStatusToInterruptReason(status));
}
void DownloadFileImpl::SourceStream::OnResponseCompleted(
DownloadInterruptReason reason) {
is_response_completed_ = true;
completion_status_ = reason;
if (completion_callback_)
std::move(completion_callback_).Run(this);
}
void DownloadFileImpl::SourceStream::OnWriteBytesToDisk(int64_t bytes_write) {
bytes_written_ += bytes_write;
}
void DownloadFileImpl::SourceStream::TruncateLengthWithWrittenDataBlock(
int64_t offset,
int64_t bytes_written) {
DCHECK_GT(bytes_written, 0);
if (length_ == kNoBytesToWrite)
return;
if (offset <= offset_) {
if (offset + bytes_written > offset_) {
length_ = kNoBytesToWrite;
finished_ = true;
}
return;
}
if (length_ == DownloadSaveInfo::kLengthFullContent ||
length_ > offset - offset_) {
length_ = offset - offset_;
}
}
void DownloadFileImpl::SourceStream::RegisterDataReadyCallback(
const mojo::SimpleWatcher::ReadyCallback& callback) {
if (handle_watcher_) {
handle_watcher_->Watch(stream_handle_->stream.get(),
MOJO_HANDLE_SIGNAL_READABLE, callback);
} else if (stream_reader_) {
stream_reader_->RegisterCallback(base::Bind(callback, MOJO_RESULT_OK));
}
}
void DownloadFileImpl::SourceStream::ClearDataReadyCallback() {
if (handle_watcher_)
handle_watcher_->Cancel();
else if (stream_reader_)
stream_reader_->RegisterCallback(base::Closure());
}
DownloadInterruptReason DownloadFileImpl::SourceStream::GetCompletionStatus() {
if (stream_reader_)
return static_cast<DownloadInterruptReason>(stream_reader_->GetStatus());
return completion_status_;
}
void DownloadFileImpl::SourceStream::RegisterCompletionCallback(
DownloadFileImpl::SourceStream::CompletionCallback callback) {
completion_callback_ = std::move(callback);
}
DownloadFileImpl::SourceStream::StreamState
DownloadFileImpl::SourceStream::Read(scoped_refptr<net::IOBuffer>* data,
size_t* length) {
if (handle_watcher_) {
*length = kBytesToRead;
*data = new net::IOBuffer(kBytesToRead);
MojoResult mojo_result = stream_handle_->stream->ReadData(
(*data)->data(), (uint32_t*)length, MOJO_READ_DATA_FLAG_NONE);
// TODO(qinmin): figure out when COMPLETE should be returned.
switch (mojo_result) {
case MOJO_RESULT_OK:
return HAS_DATA;
case MOJO_RESULT_SHOULD_WAIT:
return EMPTY;
case MOJO_RESULT_FAILED_PRECONDITION:
if (is_response_completed_)
return COMPLETE;
stream_handle_->stream.reset();
ClearDataReadyCallback();
return WAIT_FOR_COMPLETION;
case MOJO_RESULT_INVALID_ARGUMENT:
case MOJO_RESULT_OUT_OF_RANGE:
case MOJO_RESULT_BUSY:
NOTREACHED();
return COMPLETE;
}
} else if (stream_reader_) {
ByteStreamReader::StreamState state = stream_reader_->Read(data, length);
switch (state) {
case ByteStreamReader::STREAM_EMPTY:
return EMPTY;
case ByteStreamReader::STREAM_HAS_DATA:
return HAS_DATA;
case ByteStreamReader::STREAM_COMPLETE:
return COMPLETE;
}
}
return COMPLETE;
}
DownloadFileImpl::DownloadFileImpl(
std::unique_ptr<DownloadSaveInfo> save_info,
const base::FilePath& default_download_directory,
std::unique_ptr<DownloadManager::InputStream> stream,
uint32_t download_id,
base::WeakPtr<DownloadDestinationObserver> observer)
: DownloadFileImpl(std::move(save_info),
default_download_directory,
download_id,
observer) {
source_streams_[save_info_->offset] = std::make_unique<SourceStream>(
save_info_->offset, save_info_->length, std::move(stream));
}
DownloadFileImpl::DownloadFileImpl(
std::unique_ptr<DownloadSaveInfo> save_info,
const base::FilePath& default_download_directory,
uint32_t download_id,
base::WeakPtr<DownloadDestinationObserver> observer)
: file_(download_id),
save_info_(std::move(save_info)),
default_download_directory_(default_download_directory),
potential_file_length_(kUnknownContentLength),
bytes_seen_(0),
num_active_streams_(0),
record_stream_bandwidth_(false),
bytes_seen_with_parallel_streams_(0),
bytes_seen_without_parallel_streams_(0),
is_paused_(false),
download_id_(download_id),
observer_(observer),
weak_factory_(this) {
TRACE_EVENT_INSTANT0("download", "DownloadFileCreated",
TRACE_EVENT_SCOPE_THREAD);
TRACE_EVENT_NESTABLE_ASYNC_BEGIN0("download", "DownloadFileActive",
download_id);
DETACH_FROM_SEQUENCE(sequence_checker_);
}
DownloadFileImpl::~DownloadFileImpl() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
TRACE_EVENT_NESTABLE_ASYNC_END0("download", "DownloadFileActive",
download_id_);
}
void DownloadFileImpl::Initialize(
const InitializeCallback& initialize_callback,
const CancelRequestCallback& cancel_request_callback,
const DownloadItem::ReceivedSlices& received_slices,
bool is_parallelizable) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
update_timer_.reset(new base::RepeatingTimer());
int64_t bytes_so_far = 0;
cancel_request_callback_ = cancel_request_callback;
received_slices_ = received_slices;
if (IsSparseFile()) {
for (const auto& received_slice : received_slices_) {
bytes_so_far += received_slice.received_bytes;
}
} else {
bytes_so_far = save_info_->offset;
}
DownloadInterruptReason result = file_.Initialize(
save_info_->file_path, default_download_directory_,
std::move(save_info_->file), bytes_so_far,
save_info_->hash_of_partial_file, std::move(save_info_->hash_state),
IsSparseFile());
if (result != DOWNLOAD_INTERRUPT_REASON_NONE) {
BrowserThread::PostTask(BrowserThread::UI, FROM_HERE,
base::BindOnce(initialize_callback, result));
return;
}
download_start_ = base::TimeTicks::Now();
last_update_time_ = download_start_;
record_stream_bandwidth_ = is_parallelizable;
// Primarily to make reset to zero in restart visible to owner.
SendUpdate();
BrowserThread::PostTask(
BrowserThread::UI, FROM_HERE,
base::BindOnce(initialize_callback, DOWNLOAD_INTERRUPT_REASON_NONE));
// Initial pull from the straw from all source streams.
for (auto& source_stream : source_streams_)
RegisterAndActivateStream(source_stream.second.get());
}
void DownloadFileImpl::AddInputStream(
std::unique_ptr<DownloadManager::InputStream> stream,
int64_t offset,
int64_t length) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
source_streams_[offset] =
std::make_unique<SourceStream>(offset, length, std::move(stream));
OnSourceStreamAdded(source_streams_[offset].get());
}
void DownloadFileImpl::OnResponseCompleted(int64_t offset,
DownloadInterruptReason status) {
auto iter = source_streams_.find(offset);
if (iter != source_streams_.end())
iter->second->OnResponseCompleted(status);
}
void DownloadFileImpl::OnSourceStreamAdded(SourceStream* source_stream) {
// There are writers at different offsets now, create the received slices
// vector if necessary.
if (received_slices_.empty() && TotalBytesReceived() > 0) {
size_t index = AddOrMergeReceivedSliceIntoSortedArray(
DownloadItem::ReceivedSlice(0, TotalBytesReceived()), received_slices_);
DCHECK_EQ(index, 0u);
}
// If the file is initialized, start to write data, or wait until file opened.
if (file_.in_progress())
RegisterAndActivateStream(source_stream);
}
DownloadInterruptReason DownloadFileImpl::WriteDataToFile(int64_t offset,
const char* data,
size_t data_len) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
WillWriteToDisk(data_len);
return file_.WriteDataToFile(offset, data, data_len);
}
bool DownloadFileImpl::CalculateBytesToWrite(SourceStream* source_stream,
size_t bytes_available_to_write,
size_t* bytes_to_write) {
if (source_stream->length() == kNoBytesToWrite) {
*bytes_to_write = 0;
return true;
}
// If a new slice finds that its target position has already been written,
// terminate the stream.
if (source_stream->bytes_written() == 0) {
for (const auto& received_slice : received_slices_) {
if (received_slice.offset <= source_stream->offset() &&
received_slice.offset + received_slice.received_bytes >
source_stream->offset()) {
*bytes_to_write = 0;
return true;
}
}
}
if (source_stream->length() != DownloadSaveInfo::kLengthFullContent &&
source_stream->bytes_written() +
static_cast<int64_t>(bytes_available_to_write) >
source_stream->length()) {
// Write a partial buffer as the incoming data exceeds the length limit.
*bytes_to_write = source_stream->length() - source_stream->bytes_written();
return true;
}
*bytes_to_write = bytes_available_to_write;
return false;
}
void DownloadFileImpl::RenameAndUniquify(
const base::FilePath& full_path,
const RenameCompletionCallback& callback) {
std::unique_ptr<RenameParameters> parameters(
new RenameParameters(UNIQUIFY, full_path, callback));
RenameWithRetryInternal(std::move(parameters));
}
void DownloadFileImpl::RenameAndAnnotate(
const base::FilePath& full_path,
const std::string& client_guid,
const GURL& source_url,
const GURL& referrer_url,
const RenameCompletionCallback& callback) {
std::unique_ptr<RenameParameters> parameters(new RenameParameters(
ANNOTATE_WITH_SOURCE_INFORMATION, full_path, callback));
parameters->client_guid = client_guid;
parameters->source_url = source_url;
parameters->referrer_url = referrer_url;
RenameWithRetryInternal(std::move(parameters));
}
base::TimeDelta DownloadFileImpl::GetRetryDelayForFailedRename(
int attempt_number) {
DCHECK_GE(attempt_number, 0);
// |delay| starts at kInitialRenameRetryDelayMs and increases by a factor of
// 2 at each subsequent retry. Assumes that |retries_left| starts at
// kMaxRenameRetries. Also assumes that kMaxRenameRetries is less than the
// number of bits in an int.
return base::TimeDelta::FromMilliseconds(kInitialRenameRetryDelayMs) *
(1 << attempt_number);
}
bool DownloadFileImpl::ShouldRetryFailedRename(DownloadInterruptReason reason) {
return reason == DOWNLOAD_INTERRUPT_REASON_FILE_TRANSIENT_ERROR;
}
DownloadInterruptReason DownloadFileImpl::HandleStreamCompletionStatus(
SourceStream* source_stream) {
DownloadInterruptReason reason = source_stream->GetCompletionStatus();
if (source_stream->length() == DownloadSaveInfo::kLengthFullContent &&
!received_slices_.empty() &&
(source_stream->offset() == received_slices_.back().offset +
received_slices_.back().received_bytes) &&
reason ==
DownloadInterruptReason::DOWNLOAD_INTERRUPT_REASON_SERVER_NO_RANGE) {
// We are probably reaching the end of the stream, don't treat this
// as an error.
return DOWNLOAD_INTERRUPT_REASON_NONE;
}
return reason;
}
void DownloadFileImpl::RenameWithRetryInternal(
std::unique_ptr<RenameParameters> parameters) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
base::FilePath new_path = parameters->new_path;
if ((parameters->option & UNIQUIFY) && new_path != file_.full_path()) {
int uniquifier =
base::GetUniquePathNumber(new_path, base::FilePath::StringType());
if (uniquifier > 0)
new_path = new_path.InsertBeforeExtensionASCII(
base::StringPrintf(" (%d)", uniquifier));
}
DownloadInterruptReason reason = file_.Rename(new_path);
// Attempt to retry the rename if possible. If the rename failed and the
// subsequent open also failed, then in_progress() would be false. We don't
// try to retry renames if the in_progress() was false to begin with since we
// have less assurance that the file at file_.full_path() was the one we were
// working with.
if (ShouldRetryFailedRename(reason) && file_.in_progress() &&
parameters->retries_left > 0) {
int attempt_number = kMaxRenameRetries - parameters->retries_left;
--parameters->retries_left;
if (parameters->time_of_first_failure.is_null())
parameters->time_of_first_failure = base::TimeTicks::Now();
base::SequencedTaskRunnerHandle::Get()->PostDelayedTask(
FROM_HERE,
base::BindOnce(&DownloadFileImpl::RenameWithRetryInternal,
weak_factory_.GetWeakPtr(),
base::Passed(std::move(parameters))),
GetRetryDelayForFailedRename(attempt_number));
return;
}
if (!parameters->time_of_first_failure.is_null())
RecordDownloadFileRenameResultAfterRetry(
base::TimeTicks::Now() - parameters->time_of_first_failure, reason);
if (reason == DOWNLOAD_INTERRUPT_REASON_NONE &&
(parameters->option & ANNOTATE_WITH_SOURCE_INFORMATION)) {
// Doing the annotation after the rename rather than before leaves
// a very small window during which the file has the final name but
// hasn't been marked with the Mark Of The Web. However, it allows
// anti-virus scanners on Windows to actually see the data
// (http://crbug.com/127999) under the correct name (which is information
// it uses).
reason = file_.AnnotateWithSourceInformation(parameters->client_guid,
parameters->source_url,
parameters->referrer_url);
}
if (reason != DOWNLOAD_INTERRUPT_REASON_NONE) {
// Make sure our information is updated, since we're about to
// error out.
SendUpdate();
// Null out callback so that we don't do any more stream processing.
// The request that writes to the pipe should be canceled after
// the download being interrupted.
for (auto& stream : source_streams_)
stream.second->ClearDataReadyCallback();
new_path.clear();
}
BrowserThread::PostTask(
BrowserThread::UI, FROM_HERE,
base::BindOnce(parameters->completion_callback, reason, new_path));
}
void DownloadFileImpl::Detach() {
file_.Detach();
}
void DownloadFileImpl::Cancel() {
file_.Cancel();
}
void DownloadFileImpl::SetPotentialFileLength(int64_t length) {
DCHECK(potential_file_length_ == length ||
potential_file_length_ == kUnknownContentLength)
<< "Potential file length changed, the download might have updated.";
if (length < potential_file_length_ ||
potential_file_length_ == kUnknownContentLength) {
potential_file_length_ = length;
}
// TODO(qinmin): interrupt the download if the received bytes are larger
// than content length limit.
LOG_IF(ERROR, TotalBytesReceived() > potential_file_length_)
<< "Received data is larger than the content length limit.";
}
const base::FilePath& DownloadFileImpl::FullPath() const {
return file_.full_path();
}
bool DownloadFileImpl::InProgress() const {
return file_.in_progress();
}
void DownloadFileImpl::Pause() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
is_paused_ = true;
record_stream_bandwidth_ = false;
}
void DownloadFileImpl::Resume() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(is_paused_);
is_paused_ = false;
if (!base::FeatureList::IsEnabled(features::kNetworkService))
return;
for (auto& stream : source_streams_) {
SourceStream* source_stream = stream.second.get();
if (!source_stream->is_finished()) {
StreamActive(source_stream, MOJO_RESULT_OK);
}
}
}
void DownloadFileImpl::StreamActive(SourceStream* source_stream,
MojoResult result) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (base::FeatureList::IsEnabled(features::kNetworkService) && is_paused_)
return;
base::TimeTicks start(base::TimeTicks::Now());
base::TimeTicks now;
scoped_refptr<net::IOBuffer> incoming_data;
size_t incoming_data_size = 0;
size_t total_incoming_data_size = 0;
size_t num_buffers = 0;
size_t bytes_to_write = 0;
bool should_terminate = false;
SourceStream::StreamState state(SourceStream::EMPTY);
DownloadInterruptReason reason = DOWNLOAD_INTERRUPT_REASON_NONE;
base::TimeDelta delta(
base::TimeDelta::FromMilliseconds(kMaxTimeBlockingFileThreadMs));
// Take care of any file local activity required.
do {
state = source_stream->Read(&incoming_data, &incoming_data_size);
switch (state) {
case SourceStream::EMPTY:
should_terminate = (source_stream->length() == kNoBytesToWrite);
break;
case SourceStream::HAS_DATA: {
++num_buffers;
base::TimeTicks write_start(base::TimeTicks::Now());
should_terminate = CalculateBytesToWrite(
source_stream, incoming_data_size, &bytes_to_write);
DCHECK_GE(incoming_data_size, bytes_to_write);
reason = WriteDataToFile(
source_stream->offset() + source_stream->bytes_written(),
incoming_data.get()->data(), bytes_to_write);
disk_writes_time_ += (base::TimeTicks::Now() - write_start);
bytes_seen_ += bytes_to_write;
total_incoming_data_size += bytes_to_write;
if (reason == DOWNLOAD_INTERRUPT_REASON_NONE) {
int64_t prev_bytes_written = source_stream->bytes_written();
source_stream->OnWriteBytesToDisk(bytes_to_write);
if (!IsSparseFile())
break;
// If the write operation creates a new slice, add it to the
// |received_slices_| and update all the entries in
// |source_streams_|.
if (bytes_to_write > 0 && prev_bytes_written == 0) {
AddNewSlice(source_stream->offset(), bytes_to_write);
} else {
received_slices_[source_stream->index()].received_bytes +=
bytes_to_write;
}
}
} break;
case SourceStream::WAIT_FOR_COMPLETION:
source_stream->RegisterCompletionCallback(base::BindOnce(
&DownloadFileImpl::OnStreamCompleted, weak_factory_.GetWeakPtr()));
break;
case SourceStream::COMPLETE:
break;
default:
NOTREACHED();
break;
}
now = base::TimeTicks::Now();
} while (state == SourceStream::HAS_DATA &&
reason == DOWNLOAD_INTERRUPT_REASON_NONE && now - start <= delta &&
!should_terminate);
// If we're stopping to yield the thread, post a task so we come back.
if (state == SourceStream::HAS_DATA && now - start > delta &&
!should_terminate) {
base::SequencedTaskRunnerHandle::Get()->PostTask(
FROM_HERE, base::BindOnce(&DownloadFileImpl::StreamActive,
weak_factory_.GetWeakPtr(), source_stream,
MOJO_RESULT_OK));
}
if (total_incoming_data_size)
RecordFileThreadReceiveBuffers(num_buffers);
RecordContiguousWriteTime(now - start);
if (state == SourceStream::COMPLETE)
OnStreamCompleted(source_stream);
else
NotifyObserver(source_stream, reason, state, should_terminate);
TRACE_EVENT_INSTANT2("download", "DownloadStreamDrained",
TRACE_EVENT_SCOPE_THREAD, "stream_size",
total_incoming_data_size, "num_buffers", num_buffers);
}
void DownloadFileImpl::OnStreamCompleted(SourceStream* source_stream) {
DownloadInterruptReason reason = HandleStreamCompletionStatus(source_stream);
SendUpdate();
NotifyObserver(source_stream, reason, SourceStream::COMPLETE, false);
}
void DownloadFileImpl::NotifyObserver(SourceStream* source_stream,
DownloadInterruptReason reason,
SourceStream::StreamState stream_state,
bool should_terminate) {
if (reason != DOWNLOAD_INTERRUPT_REASON_NONE) {
HandleStreamError(source_stream, reason);
} else if (stream_state == SourceStream::COMPLETE || should_terminate) {
// Signal successful completion or termination of the current stream.
source_stream->ClearDataReadyCallback();
source_stream->set_finished(true);
if (should_terminate)
CancelRequest(source_stream->offset());
if (source_stream->length() == DownloadSaveInfo::kLengthFullContent) {
SetPotentialFileLength(source_stream->offset() +
source_stream->bytes_written());
}
num_active_streams_--;
// Inform observers.
SendUpdate();
// All the stream reader are completed, shut down file IO processing.
if (IsDownloadCompleted()) {
RecordFileBandwidth(bytes_seen_, disk_writes_time_,
base::TimeTicks::Now() - download_start_);
if (record_stream_bandwidth_) {
RecordParallelizableDownloadStats(
bytes_seen_with_parallel_streams_,
download_time_with_parallel_streams_,
bytes_seen_without_parallel_streams_,
download_time_without_parallel_streams_, IsSparseFile());
}
weak_factory_.InvalidateWeakPtrs();
std::unique_ptr<crypto::SecureHash> hash_state = file_.Finish();
update_timer_.reset();
BrowserThread::PostTask(
BrowserThread::UI, FROM_HERE,
base::BindOnce(&DownloadDestinationObserver::DestinationCompleted,
observer_, TotalBytesReceived(),
base::Passed(&hash_state)));
}
}
}
void DownloadFileImpl::RegisterAndActivateStream(SourceStream* source_stream) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
source_stream->Initialize();
source_stream->RegisterDataReadyCallback(
base::Bind(&DownloadFileImpl::StreamActive, weak_factory_.GetWeakPtr(),
source_stream));
// Truncate |source_stream|'s length if necessary.
for (const auto& received_slice : received_slices_) {
source_stream->TruncateLengthWithWrittenDataBlock(
received_slice.offset, received_slice.received_bytes);
}
num_active_streams_++;
StreamActive(source_stream, MOJO_RESULT_OK);
}
int64_t DownloadFileImpl::TotalBytesReceived() const {
return file_.bytes_so_far();
}
void DownloadFileImpl::SendUpdate() {
// TODO(qinmin): For each active stream, add the slice it has written so
// far along with received_slices_.
BrowserThread::PostTask(
BrowserThread::UI, FROM_HERE,
base::BindOnce(&DownloadDestinationObserver::DestinationUpdate, observer_,
TotalBytesReceived(), rate_estimator_.GetCountPerSecond(),
received_slices_));
}
void DownloadFileImpl::WillWriteToDisk(size_t data_len) {
if (!update_timer_->IsRunning()) {
update_timer_->Start(FROM_HERE,
base::TimeDelta::FromMilliseconds(kUpdatePeriodMs),
this, &DownloadFileImpl::SendUpdate);
}
rate_estimator_.Increment(data_len);
base::TimeTicks now = base::TimeTicks::Now();
base::TimeDelta time_elapsed = (now - last_update_time_);
last_update_time_ = now;
if (num_active_streams_ > 1) {
download_time_with_parallel_streams_ += time_elapsed;
bytes_seen_with_parallel_streams_ += data_len;
} else {
download_time_without_parallel_streams_ += time_elapsed;
bytes_seen_without_parallel_streams_ += data_len;
}
}
void DownloadFileImpl::AddNewSlice(int64_t offset, int64_t length) {
size_t index = AddOrMergeReceivedSliceIntoSortedArray(
DownloadItem::ReceivedSlice(offset, length), received_slices_);
// Check if the slice is added as a new slice, or merged with an existing one.
bool slice_added = (offset == received_slices_[index].offset);
// Update the index of exising SourceStreams.
for (auto& stream : source_streams_) {
SourceStream* source_stream = stream.second.get();
if (source_stream->offset() > offset) {
if (slice_added && source_stream->bytes_written() > 0)
source_stream->set_index(source_stream->index() + 1);
} else if (source_stream->offset() == offset) {
source_stream->set_index(index);
} else {
source_stream->TruncateLengthWithWrittenDataBlock(offset, length);
}
}
}
bool DownloadFileImpl::IsDownloadCompleted() {
for (auto& stream : source_streams_) {
if (!stream.second->is_finished())
return false;
}
if (!IsSparseFile())
return true;
// Verify that all the file slices have been downloaded.
std::vector<DownloadItem::ReceivedSlice> slices_to_download =
FindSlicesToDownload(received_slices_);
if (slices_to_download.size() > 1) {
// If there are 1 or more holes in the file, download is not finished.
// Some streams might not have been added to |source_streams_| yet.
return false;
}
return TotalBytesReceived() == potential_file_length_;
}
void DownloadFileImpl::HandleStreamError(SourceStream* source_stream,
DownloadInterruptReason reason) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
source_stream->ClearDataReadyCallback();
source_stream->set_finished(true);
num_active_streams_--;
bool can_recover_from_error = (source_stream->length() == kNoBytesToWrite);
if (IsSparseFile() && !can_recover_from_error) {
// If a neighboring stream request is available, check if it can help
// download all the data left by |source stream| or has already done so. We
// want to avoid the situation that a server always fail additional requests
// from the client thus causing the initial request and the download going
// nowhere.
// TODO(qinmin): make all streams half open so that they can recover
// failures from their neighbors.
SourceStream* preceding_neighbor = FindPrecedingNeighbor(source_stream);
while (preceding_neighbor) {
int64_t upper_range = source_stream->offset() + source_stream->length();
if ((!preceding_neighbor->is_finished() &&
(preceding_neighbor->length() ==
DownloadSaveInfo::kLengthFullContent ||
preceding_neighbor->offset() + preceding_neighbor->length() >=
upper_range)) ||
(preceding_neighbor->offset() + preceding_neighbor->bytes_written() >=
upper_range)) {
can_recover_from_error = true;
break;
}
// If the neighbor cannot recover the error and it has already created
// a slice, just interrupt the download.
if (preceding_neighbor->bytes_written() > 0)
break;
preceding_neighbor = FindPrecedingNeighbor(preceding_neighbor);
}
if (can_recover_from_error) {
// Since the neighbor stream will download all data downloading from its
// offset to source_stream->offset(). Close all other streams in the
// middle.
for (auto& stream : source_streams_) {
if (stream.second->offset() < source_stream->offset() &&
stream.second->offset() > preceding_neighbor->offset()) {
DCHECK_EQ(stream.second->bytes_written(), 0);
stream.second->ClearDataReadyCallback();
stream.second->set_finished(true);
CancelRequest(stream.second->offset());
num_active_streams_--;
}
}
}
}
SendUpdate(); // Make info up to date before error.
if (!can_recover_from_error) {
// Error case for both upstream source and file write.
// Shut down processing and signal an error to our observer.
// Our observer will clean us up.
weak_factory_.InvalidateWeakPtrs();
std::unique_ptr<crypto::SecureHash> hash_state = file_.Finish();
BrowserThread::PostTask(
BrowserThread::UI, FROM_HERE,
base::BindOnce(&DownloadDestinationObserver::DestinationError,
observer_, reason, TotalBytesReceived(),
base::Passed(&hash_state)));
}
}
bool DownloadFileImpl::IsSparseFile() const {
return source_streams_.size() > 1 || !received_slices_.empty();
}
DownloadFileImpl::SourceStream* DownloadFileImpl::FindPrecedingNeighbor(
SourceStream* source_stream) {
int64_t max_preceding_offset = 0;
SourceStream* ret = nullptr;
for (auto& stream : source_streams_) {
int64_t offset = stream.second->offset();
if (offset < source_stream->offset() && offset >= max_preceding_offset) {
ret = stream.second.get();
max_preceding_offset = offset;
}
}
return ret;
}
void DownloadFileImpl::CancelRequest(int64_t offset) {
if (!cancel_request_callback_.is_null()) {
BrowserThread::PostTask(BrowserThread::UI, FROM_HERE,
base::BindOnce(cancel_request_callback_, offset));
}
}
void DownloadFileImpl::DebugStates() const {
DVLOG(1) << "### Debugging DownloadFile states:";
DVLOG(1) << "Total source stream count = " << source_streams_.size();
for (const auto& stream : source_streams_) {
DVLOG(1) << "Source stream, offset = " << stream.second->offset()
<< " , bytes_written = " << stream.second->bytes_written()
<< " , is_finished = " << stream.second->is_finished()
<< " , length = " << stream.second->length();
}
DebugSlicesInfo(received_slices_);
}
DownloadFileImpl::RenameParameters::RenameParameters(
RenameOption option,
const base::FilePath& new_path,
const RenameCompletionCallback& completion_callback)
: option(option),
new_path(new_path),
retries_left(kMaxRenameRetries),
completion_callback(completion_callback) {}
DownloadFileImpl::RenameParameters::~RenameParameters() {}
} // namespace content