// blob: 2bf878b7c9cfa670fc67c28264406c5972e32e1d [file] [log] [blame]
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "components/download/public/common/download_file_impl.h"
#include <algorithm>
#include <string>
#include <utility>
#include "base/bind.h"
#include "base/files/file_util.h"
#include "base/strings/stringprintf.h"
#include "base/threading/sequenced_task_runner_handle.h"
#include "base/threading/thread_task_runner_handle.h"
#include "base/time/time.h"
#include "base/timer/timer.h"
#include "base/values.h"
#include "build/build_config.h"
#include "components/download/internal/common/parallel_download_utils.h"
#include "components/download/public/common/download_create_info.h"
#include "components/download/public/common/download_destination_observer.h"
#include "components/download/public/common/download_features.h"
#include "components/download/public/common/download_interrupt_reasons_utils.h"
#include "components/download/public/common/download_stats.h"
#include "crypto/secure_hash.h"
#include "crypto/sha2.h"
#include "mojo/public/c/system/types.h"
#include "net/base/io_buffer.h"
#include "services/network/public/cpp/features.h"
#include "services/service_manager/public/cpp/connector.h"
#if defined(OS_ANDROID)
#include "base/android/content_uri_utils.h"
#include "components/download/internal/common/android/download_collection_bridge.h"
#endif // defined(OS_ANDROID)
namespace download {
namespace {

// Period, in milliseconds, of the repeating timer that sends progress updates
// once data starts being written to disk.
constexpr int kUpdatePeriodMs = 500;

// Longest time, in milliseconds, that one pass over a stream may keep reading
// and writing before it yields and reschedules itself.
constexpr int kMaxTimeBlockingFileThreadMs = 1000;

// Retry policy for failing renames: the first retry waits
// kInitialRenameRetryDelayMs milliseconds and each later retry waits twice as
// long as the one before it.
constexpr int kInitialRenameRetryDelayMs = 200;

// How many times a failing rename is retried before giving up.
constexpr int kMaxRenameRetries = 3;

// Length assigned to a stream that can no longer write any data.
// DownloadSaveInfo::kLengthFullContent is 0, so 0 must not be used for this.
constexpr int kNoBytesToWrite = -1;

// Placeholder content length used while the potential file size is not yet
// determined.
constexpr int kUnknownContentLength = -1;

}  // namespace
// Wraps |stream| as the data source for the byte range that begins at
// |offset| and spans |length| bytes. |starting_file_write_offset| is the
// position at which this stream starts writing to the file; it may not be
// smaller than |offset|, and |offset| may not be negative. All progress
// counters start at zero and the stream starts out unfinished.
DownloadFileImpl::SourceStream::SourceStream(
    int64_t offset,
    int64_t length,
    int64_t starting_file_write_offset,
    std::unique_ptr<InputStream> stream)
    : offset_(offset),
      length_(length),
      starting_file_write_offset_(starting_file_write_offset),
      bytes_read_(0),
      bytes_written_(0),
      finished_(false),
      index_(0u),
      input_stream_(std::move(stream)) {
  CHECK_LE(offset_, starting_file_write_offset_);
  CHECK_GE(offset_, 0);
  // A positive length has to cover at least the bytes that lie between the
  // stream offset and the file write position.
  DCHECK(length_ <= 0 || length_ >= starting_file_write_offset_ - offset_)
      << "Not enough for content validation. offset = " << offset_
      << ", length = " << length_
      << " , starting_file_write_offset = " << starting_file_write_offset_
      << ".";
}
// Defaulted destructor, defined out of line.
DownloadFileImpl::SourceStream::~SourceStream() = default;
// Forwards initialization to the wrapped input stream.
void DownloadFileImpl::SourceStream::Initialize() {
input_stream_->Initialize();
}
// Accumulates progress for this stream: |bytes_read| bytes were taken from
// the stream and |bytes_written| of them were written. The number of bytes
// written can never exceed the number of bytes read.
void DownloadFileImpl::SourceStream::OnBytesConsumed(int64_t bytes_read,
                                                     int64_t bytes_written) {
  CHECK_GE(bytes_read, bytes_written);
  bytes_written_ = bytes_written_ + bytes_written;
  bytes_read_ = bytes_read_ + bytes_read;
}
// Adjusts this stream's length given that |bytes_written| bytes already exist
// in the file starting at |received_slice_offset|.
//  - A stream that already has nothing to write is left untouched.
//  - If the block starts at or before this stream's file write position, the
//    stream is marked finished (and gets kNoBytesToWrite as length) when the
//    block extends past the write position and no validation bytes remain.
//  - Otherwise the block starts after the write position, and the stream's
//    length is capped so that it ends where the block begins.
void DownloadFileImpl::SourceStream::TruncateLengthWithWrittenDataBlock(
    int64_t received_slice_offset,
    int64_t bytes_written) {
  DCHECK_GT(bytes_written, 0);
  if (length_ == kNoBytesToWrite)
    return;

  const bool block_starts_after_write_position =
      received_slice_offset > starting_file_write_offset_;
  if (!block_starts_after_write_position) {
    const bool block_covers_write_position =
        received_slice_offset + bytes_written > starting_file_write_offset_;
    // Only finish the stream once validation has completed.
    if (block_covers_write_position && GetRemainingBytesToValidate() == 0) {
      length_ = kNoBytesToWrite;
      finished_ = true;
    }
    return;
  }

  // Both distances are measured from the start of the stream.
  const int64_t distance_to_block = received_slice_offset - offset_;
  const int64_t distance_to_write_position =
      starting_file_write_offset_ - offset_;
  const bool reads_full_content =
      length_ == DownloadSaveInfo::kLengthFullContent;
  if (reads_full_content || (length_ > distance_to_block &&
                             length_ > distance_to_write_position)) {
    // Stream length should always include the validation data, unless the
    // response is too short.
    length_ = std::max(distance_to_block, distance_to_write_position);
  }
}
// Passes |callback| on to the wrapped input stream's
// RegisterDataReadyCallback().
void DownloadFileImpl::SourceStream::RegisterDataReadyCallback(
const mojo::SimpleWatcher::ReadyCallback& callback) {
input_stream_->RegisterDataReadyCallback(callback);
}
// Forwards to the wrapped input stream's ClearDataReadyCallback().
void DownloadFileImpl::SourceStream::ClearDataReadyCallback() {
input_stream_->ClearDataReadyCallback();
}
// Returns the completion status reported by the wrapped input stream.
DownloadInterruptReason DownloadFileImpl::SourceStream::GetCompletionStatus()
const {
return input_stream_->GetCompletionStatus();
}
// Registers |callback| with the wrapped input stream as its completion
// callback. This SourceStream is bound (unretained) as the callback's
// argument, so the receiver learns which stream completed.
void DownloadFileImpl::SourceStream::RegisterCompletionCallback(
    DownloadFileImpl::SourceStream::CompletionCallback callback) {
  auto completion_for_this_stream =
      base::BindOnce(std::move(callback), base::Unretained(this));
  input_stream_->RegisterCompletionCallback(
      std::move(completion_for_this_stream));
}
// Forwards the read to the wrapped input stream, passing |data| and |length|
// through unchanged, and returns the stream state it reports.
InputStream::StreamState DownloadFileImpl::SourceStream::Read(
scoped_refptr<net::IOBuffer>* data,
size_t* length) {
return input_stream_->Read(data, length);
}
// Returns how many bytes between the stream offset and the file write
// position have not been read yet, or 0 once the stream has read past the
// file write position.
size_t DownloadFileImpl::SourceStream::GetRemainingBytesToValidate() {
  const int64_t validation_span = starting_file_write_offset_ - offset_;
  const int64_t outstanding = validation_span - bytes_read_;
  if (outstanding < 0)
    return 0;
  return static_cast<size_t>(outstanding);
}
// Creates a download file for |download_id| that reports to |observer|.
// |stream| becomes the first source stream, registered under the offset
// recorded in |save_info|. The object captures the current thread's task
// runner as its main task runner and detaches its sequence checker, so the
// remaining methods may run on a different sequence than the constructor.
DownloadFileImpl::DownloadFileImpl(
    std::unique_ptr<DownloadSaveInfo> save_info,
    const base::FilePath& default_download_directory,
    std::unique_ptr<InputStream> stream,
    uint32_t download_id,
    base::WeakPtr<DownloadDestinationObserver> observer)
    : file_(download_id),
      save_info_(std::move(save_info)),
      default_download_directory_(default_download_directory),
      potential_file_length_(kUnknownContentLength),
      bytes_seen_(0),
      num_active_streams_(0),
      record_stream_bandwidth_(false),
      bytes_seen_with_parallel_streams_(0),
      bytes_seen_without_parallel_streams_(0),
      is_paused_(false),
      download_id_(download_id),
      main_task_runner_(base::ThreadTaskRunnerHandle::Get()),
      observer_(observer) {
  TRACE_EVENT_INSTANT0("download", "DownloadFileCreated",
                       TRACE_EVENT_SCOPE_THREAD);
  TRACE_EVENT_NESTABLE_ASYNC_BEGIN0("download", "DownloadFileActive",
                                    download_id);

  // Build the initial stream first, then file it under its offset.
  const int64_t first_stream_offset = save_info_->offset;
  auto first_stream = std::make_unique<SourceStream>(
      first_stream_offset, save_info_->length,
      save_info_->GetStartingFileWriteOffset(), std::move(stream));
  source_streams_[first_stream_offset] = std::move(first_stream);

  DETACH_FROM_SEQUENCE(sequence_checker_);
}
// Must run on the sequence the object was used on. Closes the
// "DownloadFileActive" async trace event opened by the constructor.
DownloadFileImpl::~DownloadFileImpl() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
TRACE_EVENT_NESTABLE_ASYNC_END0("download", "DownloadFileActive",
download_id_);
}
// Opens the underlying file and, on success, starts reading from every
// registered source stream.
//
// |initialize_callback| is always posted to the main task runner with the
// result of opening the file and the wasted-byte count filled in by
// file_.Initialize(). |cancel_request_callback| and |received_slices| are
// stored for later use. |is_parallelizable| decides whether stream bandwidth
// stats are recorded.
void DownloadFileImpl::Initialize(
    InitializeCallback initialize_callback,
    const CancelRequestCallback& cancel_request_callback,
    const DownloadItem::ReceivedSlices& received_slices,
    bool is_parallelizable) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);

  update_timer_ = std::make_unique<base::RepeatingTimer>();
  cancel_request_callback_ = cancel_request_callback;
  received_slices_ = received_slices;

  // If the last slice is finished, then we know the actual content size.
  if (!received_slices_.empty()) {
    const auto& last_slice = received_slices_.back();
    if (last_slice.finished)
      SetPotentialFileLength(last_slice.offset + last_slice.received_bytes);
  }

  // A sparse file starts with the sum of its slices; a contiguous one starts
  // at the file write offset recorded in the save info.
  const bool is_sparse = IsSparseFile();
  int64_t initial_bytes = 0;
  if (is_sparse) {
    for (const auto& slice : received_slices_)
      initial_bytes += slice.received_bytes;
  } else {
    initial_bytes = save_info_->GetStartingFileWriteOffset();
  }

  int64_t bytes_wasted = 0;
  const DownloadInterruptReason open_result = file_.Initialize(
      save_info_->file_path, default_download_directory_,
      std::move(save_info_->file), initial_bytes,
      save_info_->hash_of_partial_file, std::move(save_info_->hash_state),
      is_sparse, &bytes_wasted);
  if (open_result != DOWNLOAD_INTERRUPT_REASON_NONE) {
    main_task_runner_->PostTask(
        FROM_HERE, base::BindOnce(std::move(initialize_callback), open_result,
                                  bytes_wasted));
    return;
  }

  const base::TimeTicks start_time = base::TimeTicks::Now();
  download_start_ = start_time;
  last_update_time_ = start_time;
  record_stream_bandwidth_ = is_parallelizable;

  // Primarily to make reset to zero in restart visible to owner.
  SendUpdate();

  main_task_runner_->PostTask(
      FROM_HERE, base::BindOnce(std::move(initialize_callback),
                                DOWNLOAD_INTERRUPT_REASON_NONE, bytes_wasted));

  // Initial pull from the straw from all source streams.
  for (auto& entry : source_streams_)
    RegisterAndActivateStream(entry.second.get());
}
void DownloadFileImpl::AddInputStream(std::unique_ptr<InputStream> stream,
int64_t offset,
int64_t length) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
// UI thread may not be notified about completion and detach download file,
// clear up the network request.
if (IsDownloadCompleted()) {
CancelRequest(offset);
return;
}
DCHECK(source_streams_.find(offset) == source_streams_.end());
source_streams_[offset] =
std::make_unique<SourceStream>(offset, length, offset, std::move(stream));
OnSourceStreamAdded(source_streams_[offset].get());
}
// Follow-up work after |source_stream| was inserted into |source_streams_|:
// seeds |received_slices_| with the bytes received so far when needed, and
// starts the stream right away if the file is already in progress.
void DownloadFileImpl::OnSourceStreamAdded(SourceStream* source_stream) {
  // There are writers at different offsets now, create the received slices
  // vector if necessary.
  const int64_t bytes_received = TotalBytesReceived();
  if (received_slices_.empty() && bytes_received > 0) {
    size_t index = AddOrMergeReceivedSliceIntoSortedArray(
        DownloadItem::ReceivedSlice(0, bytes_received), received_slices_);
    DCHECK_EQ(index, 0u);
  }

  // A stream added before the file is opened is left alone here.
  if (!file_.in_progress())
    return;
  RegisterAndActivateStream(source_stream);
}
// Handles one buffer of stream data positioned at file |offset|. The first
// |bytes_to_validate| bytes of |data| are compared against the file content;
// the following |bytes_to_write| bytes are written right after them.
// Returns DOWNLOAD_INTERRUPT_REASON_FILE_HASH_MISMATCH when validation fails,
// DOWNLOAD_INTERRUPT_REASON_NONE when there is nothing to write, and
// otherwise the result of the write.
DownloadInterruptReason DownloadFileImpl::ValidateAndWriteDataToFile(
    int64_t offset,
    const char* data,
    size_t bytes_to_validate,
    size_t bytes_to_write) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);

  // Check if some of the data is for validation purpose.
  const bool has_validation_data = bytes_to_validate > 0;
  if (has_validation_data &&
      !file_.ValidateDataInFile(offset, data, bytes_to_validate)) {
    return DOWNLOAD_INTERRUPT_REASON_FILE_HASH_MISMATCH;
  }

  // Nothing to write: report success so the caller reads the next chunk.
  if (bytes_to_write == 0)
    return DOWNLOAD_INTERRUPT_REASON_NONE;

  // Write the remaining data to disk, right behind the validated prefix.
  WillWriteToDisk(bytes_to_write);
  const auto write_offset = offset + bytes_to_validate;
  const char* const write_begin = data + bytes_to_validate;
  return file_.WriteDataToFile(write_offset, write_begin, bytes_to_write);
}
// Splits |bytes_available_to_write| bytes just read from |source_stream| into
// a validation part (|*bytes_to_validate|) and a part to write to the file
// (|*bytes_to_write|). Returns true when the stream should be terminated:
// it has nothing left to write, it reached its length limit, or the position
// it would start writing at is already covered by a received slice.
bool DownloadFileImpl::CalculateBytesToWrite(SourceStream* source_stream,
                                             size_t bytes_available_to_write,
                                             size_t* bytes_to_validate,
                                             size_t* bytes_to_write) {
  *bytes_to_validate = 0;
  if (source_stream->length() == kNoBytesToWrite) {
    *bytes_to_write = 0;
    return true;
  }

  // Validation bytes come first; whatever is left is a candidate for writing.
  *bytes_to_validate = std::min(source_stream->GetRemainingBytesToValidate(),
                                bytes_available_to_write);
  *bytes_to_write = bytes_available_to_write - *bytes_to_validate;

  const int64_t stream_length = source_stream->length();
  const bool has_length_limit =
      stream_length != DownloadSaveInfo::kLengthFullContent;
  if (has_length_limit &&
      source_stream->bytes_read() +
              static_cast<int64_t>(bytes_available_to_write) >
          stream_length) {
    // Total bytes to consume is capped by the length of the stream.
    const int64_t bytes_to_consume =
        stream_length - source_stream->bytes_read();
    // The validation data should always be streamed.
    DCHECK_GE(bytes_to_consume, static_cast<int64_t>(*bytes_to_validate));
    *bytes_to_write = bytes_to_consume - *bytes_to_validate;
    return true;
  }

  // Only a stream that has not written anything yet, and is about to, needs
  // the overlap check below.
  if (source_stream->bytes_written() != 0 || *bytes_to_write == 0)
    return false;

  // If a new slice finds that its target position has already been written,
  // terminate the stream.
  const int64_t write_position = source_stream->starting_file_write_offset();
  const bool position_already_written = std::any_of(
      received_slices_.begin(), received_slices_.end(),
      [write_position](const DownloadItem::ReceivedSlice& slice) {
        return slice.offset <= write_position &&
               slice.offset + slice.received_bytes > write_position;
      });
  if (position_already_written)
    *bytes_to_write = 0;
  return position_already_written;
}
// Starts a rename to |full_path| with the UNIQUIFY option. |callback| is
// carried in the rename parameters and receives the outcome.
void DownloadFileImpl::RenameAndUniquify(
    const base::FilePath& full_path,
    const RenameCompletionCallback& callback) {
  auto rename_request =
      std::make_unique<RenameParameters>(UNIQUIFY, full_path, callback);
  RenameWithRetryInternal(std::move(rename_request));
}
// Starts a rename to |full_path| with the ANNOTATE_WITH_SOURCE_INFORMATION
// option. |client_guid|, |source_url|, |referrer_url| and |connector| are
// stored in the rename parameters for the annotation step; |callback|
// receives the outcome.
void DownloadFileImpl::RenameAndAnnotate(
    const base::FilePath& full_path,
    const std::string& client_guid,
    const GURL& source_url,
    const GURL& referrer_url,
    std::unique_ptr<service_manager::Connector> connector,
    const RenameCompletionCallback& callback) {
  auto rename_request = std::make_unique<RenameParameters>(
      ANNOTATE_WITH_SOURCE_INFORMATION, full_path, callback);
  rename_request->connector = std::move(connector);
  rename_request->referrer_url = referrer_url;
  rename_request->source_url = source_url;
  rename_request->client_guid = client_guid;
  RenameWithRetryInternal(std::move(rename_request));
}
#if defined(OS_ANDROID)
// Renames the file to a content URI and reports the result through
// OnRenameComplete(). |current_path| is reused when it is a content URI that
// still exists; otherwise a new intermediate URI is created from
// |original_url|, |referrer_url|, |file_name| and |mime_type|. The rename is
// reported as DOWNLOAD_INTERRUPT_REASON_FILE_FAILED when no content URI is
// available. Also refreshes |display_name_|, falling back to |file_name|.
void DownloadFileImpl::RenameToIntermediateUri(
    const GURL& original_url,
    const GURL& referrer_url,
    const base::FilePath& file_name,
    const std::string& mime_type,
    const base::FilePath& current_path,
    const RenameCompletionCallback& callback) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);

  // Create new content URI if |current_path| is not content URI
  // or if it is already deleted.
  base::FilePath content_path;
  if (current_path.IsContentUri() && base::ContentUriExists(current_path)) {
    content_path = current_path;
  } else {
    content_path = DownloadCollectionBridge::CreateIntermediateUriForPublish(
        original_url, referrer_url, file_name, mime_type);
  }

  DownloadInterruptReason rename_result = DOWNLOAD_INTERRUPT_REASON_FILE_FAILED;
  if (!content_path.empty()) {
    rename_result = file_.Rename(content_path);
    display_name_ = DownloadCollectionBridge::GetDisplayName(content_path);
  }

  if (display_name_.empty())
    display_name_ = file_name;

  OnRenameComplete(content_path, callback, rename_result);
}
void DownloadFileImpl::PublishDownload(
const RenameCompletionCallback& callback) {
DownloadInterruptReason reason = file_.PublishDownload();
OnRenameComplete(file_.full_path(), callback, reason);
}
// Returns a copy of the display name recorded by RenameToIntermediateUri().
base::FilePath DownloadFileImpl::GetDisplayName() {
return display_name_;
}
#endif // defined(OS_ANDROID)
// Returns how long to wait before retry number |attempt_number| (zero-based)
// of a failed rename: kInitialRenameRetryDelayMs milliseconds for attempt 0,
// doubling with every further attempt. |attempt_number| must be non-negative
// and smaller than the number of bits in an int.
base::TimeDelta DownloadFileImpl::GetRetryDelayForFailedRename(
    int attempt_number) {
  DCHECK_GE(attempt_number, 0);
  const int backoff_factor = 1 << attempt_number;
  const base::TimeDelta initial_delay =
      base::TimeDelta::FromMilliseconds(kInitialRenameRetryDelayMs);
  return initial_delay * backoff_factor;
}
// A failed rename is retried only when it failed with
// DOWNLOAD_INTERRUPT_REASON_FILE_TRANSIENT_ERROR.
bool DownloadFileImpl::ShouldRetryFailedRename(DownloadInterruptReason reason) {
return reason == DOWNLOAD_INTERRUPT_REASON_FILE_TRANSIENT_ERROR;
}
// Returns the completion status of |source_stream|, with one exception:
// DOWNLOAD_INTERRUPT_REASON_SERVER_NO_RANGE is reported as
// DOWNLOAD_INTERRUPT_REASON_NONE when the stream reads the full content and
// starts writing exactly where the last received slice ends.
DownloadInterruptReason DownloadFileImpl::HandleStreamCompletionStatus(
    SourceStream* source_stream) {
  const DownloadInterruptReason status = source_stream->GetCompletionStatus();
  if (status != DOWNLOAD_INTERRUPT_REASON_SERVER_NO_RANGE)
    return status;

  if (source_stream->length() != DownloadSaveInfo::kLengthFullContent ||
      received_slices_.empty()) {
    return status;
  }

  const auto& last_slice = received_slices_.back();
  const int64_t end_of_received_data =
      last_slice.offset + last_slice.received_bytes;
  if (source_stream->starting_file_write_offset() == end_of_received_data) {
    // We are probably reaching the end of the stream, don't treat this
    // as an error.
    return DOWNLOAD_INTERRUPT_REASON_NONE;
  }
  return status;
}
void DownloadFileImpl::RenameWithRetryInternal(
std::unique_ptr<RenameParameters> parameters) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
base::FilePath new_path = parameters->new_path;
if ((parameters->option & UNIQUIFY) && new_path != file_.full_path()) {
new_path = base::GetUniquePath(new_path);
}
DownloadInterruptReason reason = file_.Rename(new_path);
// Attempt to retry the rename if possible. If the rename failed and the
// subsequent open also failed, then in_progress() would be false. We don't
// try to retry renames if the in_progress() was false to begin with since we
// have less assurance that the file at file_.full_path() was the one we were
// working with.
if (ShouldRetryFailedRename(reason) && file_.in_progress() &&
parameters->retries_left > 0) {
int attempt_number = kMaxRenameRetries - parameters->retries_left;
--parameters->retries_left;
if (parameters->time_of_first_failure.is_null())
parameters->time_of_first_failure = base::TimeTicks::Now();
base::SequencedTaskRunnerHandle::Get()->PostDelayedTask(
FROM_HERE,
base::BindOnce(&DownloadFileImpl::RenameWithRetryInternal,
weak_factory_.GetWeakPtr(), std::move(parameters)),
GetRetryDelayForFailedRename(attempt_number));
return;
}
if (reason == DOWNLOAD_INTERRUPT_REASON_NONE &&
(parameters->option & ANNOTATE_WITH_SOURCE_INFORMATION)) {
// Doing the annotation after the rename rather than before leaves
// a very small window during which the file has the final name but
// hasn't been marked with the Mark Of The Web. However, it allows
// anti-virus scanners on Windows to actually see the data
// (http://crbug.com/127999) under the correct name (which is information
// it uses).
//
// If concurrent downloads with the same target path are allowed, an
// asynchronous quarantine file may cause a file to be stamped with
// incorrect mark-of-the-web data. Therefore, fall back to non-service
// QuarantineFile when kPreventDownloadsWithSamePath is disabled.
if (base::FeatureList::IsEnabled(
download::features::kPreventDownloadsWithSamePath)) {
file_.AnnotateWithSourceInformation(
parameters->client_guid, parameters->source_url,
parameters->referrer_url, std::move(parameters->connector),
base::BindOnce(&DownloadFileImpl::OnRenameComplete,
weak_factory_.GetWeakPtr(), new_path,
parameters->completion_callback));
return;
}
reason = file_.AnnotateWithSourceInformationSync(parameters->client_guid,
parameters->source_url,
parameters->referrer_url);
}
OnRenameComplete(new_path, parameters->completion_callback, reason);
}
// Reports the outcome of a rename to |callback| on the main task runner.
// The callback receives |reason| and, on success, |new_path|; on failure it
// receives an empty path. A failure additionally sends a progress update and
// clears the data-ready callback of every source stream.
void DownloadFileImpl::OnRenameComplete(
    const base::FilePath& new_path,
    const RenameCompletionCallback& callback,
    DownloadInterruptReason reason) {
  const bool rename_succeeded = reason == DOWNLOAD_INTERRUPT_REASON_NONE;

  base::FilePath reported_path;
  if (rename_succeeded) {
    reported_path = new_path;
  } else {
    // Make sure our information is updated, since we're about to
    // error out.
    SendUpdate();

    // Null out callback so that we don't do any more stream processing.
    // The request that writes to the pipe should be canceled after
    // the download being interrupted.
    for (auto& entry : source_streams_)
      entry.second->ClearDataReadyCallback();
  }

  main_task_runner_->PostTask(
      FROM_HERE, base::BindOnce(callback, reason, std::move(reported_path)));
}
// Forwards to the underlying file's Detach().
void DownloadFileImpl::Detach() {
file_.Detach();
}
// Forwards to the underlying file's Cancel().
void DownloadFileImpl::Cancel() {
file_.Cancel();
}
// Records |length| as the potential file length when no length is known yet
// or when |length| is smaller than the recorded one. Logs an error if more
// bytes have been received than the recorded length allows.
void DownloadFileImpl::SetPotentialFileLength(int64_t length) {
  DCHECK(potential_file_length_ == length ||
         potential_file_length_ == kUnknownContentLength)
      << "Potential file length changed, the download might have updated.";

  const bool length_unknown = potential_file_length_ == kUnknownContentLength;
  if (length_unknown || length < potential_file_length_)
    potential_file_length_ = length;

  // TODO(qinmin): interrupt the download if the received bytes are larger
  // than content length limit.
  LOG_IF(ERROR, TotalBytesReceived() > potential_file_length_)
      << "Received data is larger than the content length limit.";
}
// Returns the full path reported by the underlying file.
const base::FilePath& DownloadFileImpl::FullPath() const {
return file_.full_path();
}
// Returns whether the underlying file reports itself as in progress.
bool DownloadFileImpl::InProgress() const {
return file_.in_progress();
}
// Marks the download as paused, which makes StreamActive() return without
// reading. Stream bandwidth recording is turned off and is not turned back
// on by Resume().
void DownloadFileImpl::Pause() {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  record_stream_bandwidth_ = false;
  is_paused_ = true;
}
void DownloadFileImpl::Resume() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(is_paused_);
is_paused_ = false;
for (auto& stream : source_streams_) {
SourceStream* source_stream = stream.second.get();
if (!source_stream->is_finished()) {
StreamActive(source_stream, MOJO_RESULT_OK);
}
}
}
// Drains |source_stream|: repeatedly reads a buffer from the stream and
// validates/writes it to the file. The loop ends when the stream stops
// reporting data, a write or validation fails, the stream has to be
// terminated, or kMaxTimeBlockingFileThreadMs milliseconds have elapsed.
// Does nothing while the download is paused. |result| is not used.
//
// After the loop:
//  - a pass that ran out of time without failing reschedules itself;
//  - a COMPLETE stream is handed to OnStreamCompleted();
//  - every other outcome is handed to NotifyObserver().
void DownloadFileImpl::StreamActive(SourceStream* source_stream,
                                    MojoResult result) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  if (is_paused_)
    return;

  base::TimeTicks start(base::TimeTicks::Now());
  base::TimeTicks now;
  scoped_refptr<net::IOBuffer> incoming_data;
  size_t incoming_data_size = 0;
  size_t total_incoming_data_size = 0;
  size_t num_buffers = 0;
  size_t bytes_to_validate = 0;
  size_t bytes_to_write = 0;
  bool should_terminate = false;
  InputStream::StreamState state(InputStream::EMPTY);
  DownloadInterruptReason reason = DOWNLOAD_INTERRUPT_REASON_NONE;
  base::TimeDelta delta(
      base::TimeDelta::FromMilliseconds(kMaxTimeBlockingFileThreadMs));

  // Take care of any file local activity required.
  do {
    state = source_stream->Read(&incoming_data, &incoming_data_size);
    switch (state) {
      case InputStream::EMPTY:
        // An empty stream that has nothing left to write can be terminated.
        should_terminate = (source_stream->length() == kNoBytesToWrite);
        break;
      case InputStream::HAS_DATA: {
        ++num_buffers;
        should_terminate =
            CalculateBytesToWrite(source_stream, incoming_data_size,
                                  &bytes_to_validate, &bytes_to_write);
        DCHECK_GE(incoming_data_size, bytes_to_write);
        // The buffer belongs at the stream offset plus everything this
        // stream has read so far.
        reason = ValidateAndWriteDataToFile(
            source_stream->offset() + source_stream->bytes_read(),
            incoming_data->data(), bytes_to_validate, bytes_to_write);
        bytes_seen_ += bytes_to_write;
        total_incoming_data_size += incoming_data_size;
        if (reason == DOWNLOAD_INTERRUPT_REASON_NONE) {
          int64_t prev_bytes_written = source_stream->bytes_written();
          source_stream->OnBytesConsumed(incoming_data_size, bytes_to_write);
          if (!IsSparseFile())
            break;
          // If the write operation creates a new slice, add it to the
          // |received_slices_| and update all the entries in
          // |source_streams_|.
          if (bytes_to_write > 0 && prev_bytes_written == 0) {
            AddNewSlice(source_stream->starting_file_write_offset(),
                        bytes_to_write);
          } else {
            received_slices_[source_stream->index()].received_bytes +=
                bytes_to_write;
          }
        }
      } break;
      case InputStream::WAIT_FOR_COMPLETION:
        source_stream->RegisterCompletionCallback(base::BindOnce(
            &DownloadFileImpl::OnStreamCompleted, weak_factory_.GetWeakPtr()));
        break;
      case InputStream::COMPLETE:
        break;
      default:
        NOTREACHED();
        break;
    }
    now = base::TimeTicks::Now();
  } while (state == InputStream::HAS_DATA &&
           reason == DOWNLOAD_INTERRUPT_REASON_NONE && now - start <= delta &&
           !should_terminate);

  // If we're stopping to yield the thread, post a task so we come back.
  // Never reschedule after a failure: the error handling below marks the
  // stream as finished, and when the error is recoverable the weak pointers
  // stay valid, so a rescheduled pass would keep draining a finished stream
  // and could report its error a second time.
  if (state == InputStream::HAS_DATA &&
      reason == DOWNLOAD_INTERRUPT_REASON_NONE && now - start > delta &&
      !should_terminate) {
    base::SequencedTaskRunnerHandle::Get()->PostTask(
        FROM_HERE, base::BindOnce(&DownloadFileImpl::StreamActive,
                                  weak_factory_.GetWeakPtr(), source_stream,
                                  MOJO_RESULT_OK));
  }

  if (state == InputStream::COMPLETE)
    OnStreamCompleted(source_stream);
  else
    NotifyObserver(source_stream, reason, state, should_terminate);

  TRACE_EVENT_INSTANT2("download", "DownloadStreamDrained",
                       TRACE_EVENT_SCOPE_THREAD, "stream_size",
                       total_incoming_data_size, "num_buffers", num_buffers);
}
// Called once |source_stream| has completed: resolves its completion status,
// sends a progress update and then notifies the observer logic with the
// COMPLETE state.
void DownloadFileImpl::OnStreamCompleted(SourceStream* source_stream) {
  const DownloadInterruptReason completion_reason =
      HandleStreamCompletionStatus(source_stream);
  SendUpdate();
  NotifyObserver(source_stream, completion_reason, InputStream::COMPLETE,
                 /*should_terminate=*/false);
}
// Reacts to the outcome of a pass over |source_stream|.
//  - A non-NONE |reason| is delegated to HandleStreamError().
//  - A stream that is COMPLETE or has to be terminated is marked finished.
//    Termination also cancels its request. A stream reading the full content
//    marks its slice finished and fixes the potential file length.
//  - Once the whole download is complete, stats are recorded, the file is
//    finished and DestinationCompleted is posted to the observer.
//  - Any other state is ignored.
void DownloadFileImpl::NotifyObserver(SourceStream* source_stream,
                                      DownloadInterruptReason reason,
                                      InputStream::StreamState stream_state,
                                      bool should_terminate) {
  if (reason != DOWNLOAD_INTERRUPT_REASON_NONE) {
    HandleStreamError(source_stream, reason);
    return;
  }

  const bool stream_is_done =
      stream_state == InputStream::COMPLETE || should_terminate;
  if (!stream_is_done)
    return;

  // Signal successful completion or termination of the current stream.
  source_stream->ClearDataReadyCallback();
  source_stream->set_finished(true);
  if (should_terminate)
    CancelRequest(source_stream->offset());

  if (source_stream->length() == DownloadSaveInfo::kLengthFullContent) {
    // Mark received slice as finished.
    if (IsSparseFile() && source_stream->bytes_written() > 0) {
      DCHECK_GT(received_slices_.size(), source_stream->index())
          << "Received slice index out of bound!";
      received_slices_[source_stream->index()].finished = true;
    }
    SetPotentialFileLength(source_stream->offset() +
                           source_stream->bytes_read());
  }

  --num_active_streams_;

  // Inform observers.
  SendUpdate();

  if (!IsDownloadCompleted())
    return;

  // All the stream reader are completed, shut down file IO processing.
  RecordFileBandwidth(bytes_seen_, base::TimeTicks::Now() - download_start_);
  if (record_stream_bandwidth_) {
    RecordParallelizableDownloadStats(
        bytes_seen_with_parallel_streams_, download_time_with_parallel_streams_,
        bytes_seen_without_parallel_streams_,
        download_time_without_parallel_streams_, IsSparseFile());
  }
  weak_factory_.InvalidateWeakPtrs();
  std::unique_ptr<crypto::SecureHash> final_hash_state = file_.Finish();
  update_timer_.reset();
  main_task_runner_->PostTask(
      FROM_HERE,
      base::BindOnce(&DownloadDestinationObserver::DestinationCompleted,
                     observer_, TotalBytesReceived(),
                     std::move(final_hash_state)));
}
// Prepares |source_stream| for reading and performs the first read pass:
// initializes it, hooks StreamActive() up as its data-ready callback, lets
// every already received slice truncate its length, counts it as active and
// finally calls StreamActive() directly.
void DownloadFileImpl::RegisterAndActivateStream(SourceStream* source_stream) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);

  source_stream->Initialize();
  auto on_data_ready =
      base::Bind(&DownloadFileImpl::StreamActive, weak_factory_.GetWeakPtr(),
                 source_stream);
  source_stream->RegisterDataReadyCallback(on_data_ready);

  // Truncate |source_stream|'s length if necessary.
  for (const auto& slice : received_slices_) {
    source_stream->TruncateLengthWithWrittenDataBlock(slice.offset,
                                                      slice.received_bytes);
  }

  ++num_active_streams_;
  StreamActive(source_stream, MOJO_RESULT_OK);
}
// Returns the byte count reported by the underlying file's bytes_so_far().
int64_t DownloadFileImpl::TotalBytesReceived() const {
return file_.bytes_so_far();
}
// Posts DestinationUpdate to the observer on the main task runner, carrying
// the bytes received so far, the current rate estimate and a copy of the
// received slices.
void DownloadFileImpl::SendUpdate() {
  // TODO(qinmin): For each active stream, add the slice it has written so
  // far along with received_slices_.
  const int64_t bytes_so_far = TotalBytesReceived();
  const auto bytes_per_second = rate_estimator_.GetCountPerSecond();
  main_task_runner_->PostTask(
      FROM_HERE,
      base::BindOnce(&DownloadDestinationObserver::DestinationUpdate, observer_,
                     bytes_so_far, bytes_per_second, received_slices_));
}
void DownloadFileImpl::WillWriteToDisk(size_t data_len) {
if (!update_timer_->IsRunning()) {
update_timer_->Start(FROM_HERE,
base::TimeDelta::FromMilliseconds(kUpdatePeriodMs),
this, &DownloadFileImpl::SendUpdate);
}
rate_estimator_.Increment(data_len);
base::TimeTicks now = base::TimeTicks::Now();
base::TimeDelta time_elapsed = (now - last_update_time_);
last_update_time_ = now;
if (num_active_streams_ > 1) {
download_time_with_parallel_streams_ += time_elapsed;
bytes_seen_with_parallel_streams_ += data_len;
} else {
download_time_without_parallel_streams_ += time_elapsed;
bytes_seen_without_parallel_streams_ += data_len;
}
}
void DownloadFileImpl::AddNewSlice(int64_t offset, int64_t length) {
size_t index = AddOrMergeReceivedSliceIntoSortedArray(
DownloadItem::ReceivedSlice(offset, length), received_slices_);
// Check if the slice is added as a new slice, or merged with an existing one.
bool slice_added = (offset == received_slices_[index].offset);
// Update the index of exising SourceStreams.
for (auto& stream : source_streams_) {
SourceStream* source_stream = stream.second.get();
if (source_stream->starting_file_write_offset() > offset) {
if (slice_added && source_stream->bytes_written() > 0)
source_stream->set_index(source_stream->index() + 1);
} else if (source_stream->starting_file_write_offset() == offset) {
source_stream->set_index(index);
} else {
source_stream->TruncateLengthWithWrittenDataBlock(offset, length);
}
}
}
// Returns true when the download has nothing left to fetch. Every source
// stream has to be finished. A sparse file must additionally have at most
// one slice left to download and must have received exactly the potential
// file length.
bool DownloadFileImpl::IsDownloadCompleted() {
  const bool all_streams_finished = std::all_of(
      source_streams_.begin(), source_streams_.end(),
      [](const auto& entry) { return entry.second->is_finished(); });
  if (!all_streams_finished)
    return false;

  if (!IsSparseFile())
    return true;

  // Verify that all the file slices have been downloaded.
  // If there are 1 or more holes in the file, download is not finished.
  // Some streams might not have been added to |source_streams_| yet.
  if (FindSlicesToDownload(received_slices_).size() > 1)
    return false;

  return TotalBytesReceived() == potential_file_length_;
}
// Handles a failure of |source_stream| with |reason|. The stream is marked
// finished and no longer counted as active. Unless the failure is a hash
// mismatch, the download can still continue when the stream has nothing to
// write or when a preceding stream can recover the error. Otherwise the file
// is finished and DestinationError is posted to the observer.
void DownloadFileImpl::HandleStreamError(SourceStream* source_stream,
                                         DownloadInterruptReason reason) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  source_stream->ClearDataReadyCallback();
  source_stream->set_finished(true);
  --num_active_streams_;

  const bool is_hash_mismatch =
      reason == DOWNLOAD_INTERRUPT_REASON_FILE_HASH_MISMATCH;

  // If previous stream has already written data at the starting offset of
  // the error stream. The download can complete.
  bool recoverable =
      !is_hash_mismatch && source_stream->length() == kNoBytesToWrite;

  // See if the previous stream can download the full content.
  // If the current stream has written some data, length of all preceding
  // streams will be truncated.
  if (!is_hash_mismatch && !recoverable && IsSparseFile()) {
    for (SourceStream* neighbor = FindPrecedingNeighbor(source_stream);
         neighbor != nullptr; neighbor = FindPrecedingNeighbor(neighbor)) {
      if (CanRecoverFromError(source_stream, neighbor)) {
        recoverable = true;
        break;
      }
      // If the neighbor cannot recover the error and it has already created
      // a slice, just interrupt the download.
      if (neighbor->bytes_written() > 0)
        break;
    }
  }

  SendUpdate();  // Make info up to date before error.

  if (recoverable)
    return;

  // Error case for both upstream source and file write.
  // Shut down processing and signal an error to our observer.
  // Our observer will clean us up.
  weak_factory_.InvalidateWeakPtrs();
  std::unique_ptr<crypto::SecureHash> final_hash_state = file_.Finish();
  main_task_runner_->PostTask(
      FROM_HERE,
      base::BindOnce(&DownloadDestinationObserver::DestinationError, observer_,
                     reason, TotalBytesReceived(),
                     std::move(final_hash_state)));
}
// The file counts as sparse as soon as any received slice is recorded or
// more than one source stream exists.
bool DownloadFileImpl::IsSparseFile() const {
  if (!received_slices_.empty())
    return true;
  return source_streams_.size() > 1;
}
// Returns the source stream whose file write offset is the largest one that
// is still smaller than that of |source_stream|, or nullptr when no stream
// writes before it.
DownloadFileImpl::SourceStream* DownloadFileImpl::FindPrecedingNeighbor(
    SourceStream* source_stream) {
  const int64_t upper_bound = source_stream->starting_file_write_offset();
  int64_t best_offset = 0;
  SourceStream* closest = nullptr;
  for (auto& entry : source_streams_) {
    SourceStream* const candidate = entry.second.get();
    const int64_t candidate_offset = candidate->starting_file_write_offset();
    if (candidate_offset >= upper_bound || candidate_offset < best_offset)
      continue;
    closest = candidate;
    best_offset = candidate_offset;
  }
  return closest;
}
// Posts the stored cancel-request callback for |offset| to the main task
// runner. Does nothing when no callback has been stored.
void DownloadFileImpl::CancelRequest(int64_t offset) {
  if (cancel_request_callback_.is_null())
    return;
  main_task_runner_->PostTask(
      FROM_HERE, base::BindOnce(cancel_request_callback_, offset));
}
void DownloadFileImpl::DebugStates() const {
DVLOG(1) << "### Debugging DownloadFile states:";
DVLOG(1) << "Total source stream count = " << source_streams_.size();
for (const auto& stream : source_streams_) {
DVLOG(1) << "Source stream, offset = " << stream.second->offset()
<< " , bytes_read = " << stream.second->bytes_read()
<< " , starting_file_write_offset = "
<< stream.second->starting_file_write_offset()
<< " , bytes_written = " << stream.second->bytes_written()
<< " , is_finished = " << stream.second->is_finished()
<< " , length = " << stream.second->length()
<< ", index = " << stream.second->index();
}
DebugSlicesInfo(received_slices_);
}
// Captures the inputs of one rename request: the rename |option|, the target
// |new_path| and the |completion_callback| to report to. The retry budget
// starts at kMaxRenameRetries.
DownloadFileImpl::RenameParameters::RenameParameters(
RenameOption option,
const base::FilePath& new_path,
const RenameCompletionCallback& completion_callback)
: option(option),
new_path(new_path),
retries_left(kMaxRenameRetries),
completion_callback(completion_callback) {}
// Nothing to release beyond what the members clean up themselves.
DownloadFileImpl::RenameParameters::~RenameParameters() = default;
} // namespace download