blob: 3069a8d6c072623f5ab78229fcf6d8a9dea08396 [file] [log] [blame]
// Copyright 2018 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "content/browser/devtools/devtools_stream_pipe.h"
#include "base/base64.h"
#include "base/containers/span.h"
#include "base/functional/bind.h"
#include "base/strings/string_util.h"
#include "base/strings/string_view_util.h"
#include "base/task/sequenced_task_runner.h"
namespace content {
// A single pending Read() call that has been queued until the pipe can
// satisfy it.
struct DevToolsStreamPipe::ReadRequest {
  ReadRequest(uint32_t size_limit, ReadCallback on_read)
      : max_size(size_limit), read_callback(std::move(on_read)) {}

  // A request is only meaningful with both a size and a callback.
  ReadRequest() = delete;

  // Upper bound on the number of bytes handed to |read_callback|;
  // OnPipeSignalled() never lets the buffered data grow past this.
  uint32_t max_size;

  // Invoked by DispatchResponse() with the data, the binary flag and the
  // stream status.
  ReadCallback read_callback;
};
// static
// Factory: builds a ref-counted DevToolsStreamPipe that reads from |pipe| and
// is registered with |context| (see the constructor).
scoped_refptr<DevToolsStreamPipe> DevToolsStreamPipe::Create(
    DevToolsIOContext* context,
    mojo::ScopedDataPipeConsumerHandle pipe) {
  scoped_refptr<DevToolsStreamPipe> stream(
      new DevToolsStreamPipe(context, std::move(pipe)));
  return stream;
}
// Binds the stream to the current default sequenced task runner, stores the
// result of Register(context) in |handle_|, takes ownership of |pipe| and
// starts watching it for "readable" and "peer closed" signals.
//
// The watcher uses MANUAL arming, so it is only armed through the explicit
// ArmOrNotify() calls made by Read() and OnPipeSignalled().
DevToolsStreamPipe::DevToolsStreamPipe(DevToolsIOContext* context,
                                       mojo::ScopedDataPipeConsumerHandle pipe)
    : DevToolsIOContext::Stream(base::SequencedTaskRunner::GetCurrentDefault()),
      handle_(Register(context)),
      pipe_(std::move(pipe)),
      pipe_watcher_(FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::MANUAL),
      last_status_(StatusSuccess) {
  const auto watched_signals =
      MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED;
  // NOTE(review): base::Unretained assumes the watcher never invokes the
  // callback after |this| is gone; |pipe_watcher_| is a member of this
  // object -- confirm it stops notifying on destruction.
  auto on_signalled = base::BindRepeating(&DevToolsStreamPipe::OnPipeSignalled,
                                          base::Unretained(this));
  const MojoResult watch_result = pipe_watcher_.Watch(
      pipe_.get(), watched_signals, MOJO_TRIGGER_CONDITION_SIGNALS_SATISFIED,
      std::move(on_signalled));
  DCHECK_EQ(MOJO_RESULT_OK, watch_result);
}
// Defaulted destructor: no explicit teardown is performed here; members are
// destroyed by their own destructors.
DevToolsStreamPipe::~DevToolsStreamPipe() = default;
// Always returns false: this stream does not support seeking. Read() DCHECKs
// that its |position| argument is -1.
bool DevToolsStreamPipe::SupportsSeek() const {
return false;
}
// Requests up to |max_size| bytes from the pipe.
//
// |position| must be -1 (DCHECKed); the stream is not seekable.
// |callback| receives the data, a flag telling whether the data was
// Base64-encoded, and the stream status.
//
// While the stream is healthy the request is queued and served by
// OnPipeSignalled(). Once EOF or a failure has been recorded, |callback| is
// run right away with an empty string and the recorded status.
//
// NOTE(review): |max_size| is a size_t here but ReadRequest stores it as
// uint32_t, so larger values are narrowed -- confirm callers never exceed
// that range.
void DevToolsStreamPipe::Read(off_t position,
                              size_t max_size,
                              ReadCallback callback) {
  DCHECK(position == -1);

  if (last_status_ == StatusSuccess) {
    // Only the first queued request needs to arm the watcher; for later ones
    // OnPipeSignalled() re-arms as long as requests remain.
    const bool queue_was_empty = read_requests_.empty();
    read_requests_.emplace(max_size, std::move(callback));
    if (queue_was_empty) {
      pipe_watcher_.ArmOrNotify();
    }
    return;
  }

  // The stream already ended: there is nothing left to wait for.
  DCHECK(read_requests_.empty());
  auto no_data = std::make_unique<std::string>();
  std::move(callback).Run(std::move(no_data), false, last_status_);
}
// Watcher callback: moves bytes from the pipe into |buffer_| and answers
// queued read requests.
//
// |result| is the watcher's result; anything other than MOJO_RESULT_OK ends
// the stream (EOF if |state| reports the peer as closed, failure otherwise).
// |state| is the signal state delivered together with the notification.
void DevToolsStreamPipe::OnPipeSignalled(
    MojoResult result,
    const mojo::HandleSignalsState& state) {
  DCHECK_EQ(StatusSuccess, last_status_);
  DCHECK(!read_requests_.empty());

  if (result != MOJO_RESULT_OK) {
    DispatchEOFOrError(state.peer_closed());
    return;
  }

  while (!read_requests_.empty()) {
    base::span<const uint8_t> available;
    const MojoResult begin_result =
        pipe_->BeginReadData(MOJO_READ_DATA_FLAG_NONE, available);
    if (begin_result == MOJO_RESULT_FAILED_PRECONDITION) {
      // Treated as end of stream; the peer is expected to be closed here.
      DCHECK(state.peer_closed());
      DispatchEOFOrError(state.peer_closed());
      return;
    }
    DCHECK_EQ(MOJO_RESULT_OK, begin_result);

    const size_t wanted = size_t{read_requests_.front().max_size};
    const size_t still_missing = wanted - buffer_.size();
    const size_t consumed = std::min(available.size(), still_missing);

    // With several requests pending, each one is answered with whatever is
    // available (capped at its size). A lone request is instead held back
    // until it can be filled completely or the stream ends, which is more
    // efficient and simpler for the client.
    const bool is_only_request = read_requests_.size() == 1ul;
    if (is_only_request) {
      buffer_.reserve(wanted);
    }

    buffer_.append(base::as_string_view(available.first(consumed)));
    pipe_->EndReadData(consumed);
    DCHECK_LE(buffer_.size(), wanted);

    // Keep accumulating for a lone request that is not yet full.
    if (is_only_request && buffer_.size() < wanted) {
      break;
    }

    DispatchResponse();

    // Everything handed out by this two-phase read was consumed; wait for the
    // next notification instead of reading again.
    if (consumed == available.size()) {
      break;
    }
  }

  // Re-arm only while somebody is still waiting for data.
  if (!read_requests_.empty()) {
    pipe_watcher_.ArmOrNotify();
  }
}
// Completes the oldest queued read request with the contents of |buffer_|.
//
// The buffered bytes are handed to the request's callback together with the
// current |last_status_|. Non-empty data that is not valid UTF-8 is
// Base64-encoded first, and the callback's binary flag is set accordingly.
// The request is removed from the queue after its callback has run.
void DevToolsStreamPipe::DispatchResponse() {
  auto data = std::make_unique<std::string>(std::move(buffer_));
  // A moved-from std::string is only guaranteed to be in a valid but
  // unspecified state. The rest of this class relies on |buffer_| being empty
  // after a dispatch (size arithmetic in OnPipeSignalled(), the emptiness
  // check in DispatchEOFOrError()), so reset it explicitly.
  buffer_.clear();

  const bool is_binary = !data->empty() && !base::IsStringUTF8(*data);
  if (is_binary) {
    *data = base::Base64Encode(*data);
  }
  std::move(read_requests_.front().read_callback)
      .Run(std::move(data), is_binary, last_status_);
  read_requests_.pop();
}
// Ends the stream and answers every queued request.
//
// |is_eof| selects the final status: StatusEOF when true, StatusFailure
// otherwise.
void DevToolsStreamPipe::DispatchEOFOrError(bool is_eof) {
  // To stay consistent with the other stream implementation, data and the
  // EOF/failure status are never delivered in the same response: buffered
  // bytes go out first under the still-current status, and the terminal
  // status is reported to the requests that follow.
  if (!buffer_.empty()) {
    DispatchResponse();
  }

  if (is_eof) {
    last_status_ = StatusEOF;
  } else {
    last_status_ = StatusFailure;
  }

  // Every remaining request now completes with the terminal status.
  while (!read_requests_.empty()) {
    DispatchResponse();
  }
}
} // namespace content