blob: 04fc9e7b15f71b9512b08ce94486855c2948622d [file] [log] [blame]
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "third_party/blink/renderer/modules/direct_sockets/tcp_writable_stream_wrapper.h"
#include "third_party/blink/renderer/bindings/core/v8/array_buffer_or_array_buffer_view.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_throw_dom_exception.h"
#include "third_party/blink/renderer/core/streams/underlying_sink_base.h"
#include "third_party/blink/renderer/core/streams/writable_stream.h"
#include "third_party/blink/renderer/platform/bindings/exception_code.h"
#include "third_party/blink/renderer/platform/bindings/exception_state.h"
#include "third_party/blink/renderer/platform/bindings/script_state.h"
#include "third_party/blink/renderer/platform/heap/persistent.h"
#include "third_party/blink/renderer/platform/heap/visitor.h"
#include "third_party/blink/renderer/platform/wtf/functional.h"
namespace blink {
// An implementation of UnderlyingSinkBase that forwards all operations to the
// TCPWritableStreamWrapper object that created it.
class TCPWritableStreamWrapper::UnderlyingSink final
: public UnderlyingSinkBase {
public:
explicit UnderlyingSink(TCPWritableStreamWrapper* tcp_writable_stream_wrapper)
: tcp_writable_stream_wrapper_(tcp_writable_stream_wrapper) {}
// Implementation of UnderlyingSinkBase
ScriptPromise start(ScriptState* script_state,
WritableStreamDefaultController* controller,
ExceptionState&) override {
DVLOG(1) << "TCPWritableStreamWrapper::UnderlyinkSink::start() "
"tcp_writable_stream_wrapper_="
<< tcp_writable_stream_wrapper_;
tcp_writable_stream_wrapper_->controller_ = controller;
return ScriptPromise::CastUndefined(script_state);
}
ScriptPromise write(ScriptState* script_state,
ScriptValue chunk,
WritableStreamDefaultController*,
ExceptionState& exception_state) override {
DVLOG(1) << "TCPWritableStreamWrapper::UnderlyingSink::write() "
"tcp_writable_stream_wrapper_="
<< tcp_writable_stream_wrapper_;
return tcp_writable_stream_wrapper_->SinkWrite(script_state, chunk,
exception_state);
}
ScriptPromise close(ScriptState* script_state, ExceptionState&) override {
DVLOG(1) << "TCPWritableStreamWrapper::UnderlingSink::close() "
"tcp_writable_stream_wrapper_="
<< tcp_writable_stream_wrapper_;
DCHECK(!tcp_writable_stream_wrapper_->write_promise_resolver_);
tcp_writable_stream_wrapper_->AbortAndReset();
return ScriptPromise::CastUndefined(script_state);
}
ScriptPromise abort(ScriptState* script_state,
ScriptValue reason,
ExceptionState& exception_state) override {
DVLOG(1) << "TCPWritableStreamWrapper::UnderlyingSink::abort() "
"tcp_writable_stream_wrapper_="
<< tcp_writable_stream_wrapper_;
return close(script_state, exception_state);
}
void Trace(Visitor* visitor) const override {
visitor->Trace(tcp_writable_stream_wrapper_);
UnderlyingSinkBase::Trace(visitor);
}
private:
const Member<TCPWritableStreamWrapper> tcp_writable_stream_wrapper_;
};
TCPWritableStreamWrapper::CachedDataBuffer::CachedDataBuffer(
v8::Isolate* isolate,
const uint8_t* data,
size_t length)
: isolate_(isolate), length_(length) {
// We use the BufferPartition() allocator here to allow big enough
// allocations, and to do proper accounting of the used memory. If
// BufferPartition() will ever not be able to provide big enough allocations,
// e.g. because bigger ArrayBuffers get supported, then we have to switch to
// another allocator, e.g. the ArrayBuffer allocator.
buffer_ = std::unique_ptr<uint8_t[], OnFree>(
reinterpret_cast<uint8_t*>(WTF::Partitions::BufferPartition()->Alloc(
length, "TCPWritableStreamWrapper")));
memcpy(buffer_.get(), data, length);
isolate_->AdjustAmountOfExternalAllocatedMemory(static_cast<int64_t>(length));
}
TCPWritableStreamWrapper::CachedDataBuffer::~CachedDataBuffer() {
isolate_->AdjustAmountOfExternalAllocatedMemory(
-static_cast<int64_t>(length_));
}
TCPWritableStreamWrapper::TCPWritableStreamWrapper(
ScriptState* script_state,
base::OnceClosure on_abort,
mojo::ScopedDataPipeProducerHandle handle)
: script_state_(script_state),
on_abort_(std::move(on_abort)),
data_pipe_(std::move(handle)),
write_watcher_(FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::MANUAL),
close_watcher_(FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC) {
write_watcher_.Watch(
data_pipe_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
MOJO_TRIGGER_CONDITION_SIGNALS_SATISFIED,
WTF::BindRepeating(&TCPWritableStreamWrapper::OnHandleReady,
WrapWeakPersistent(this)));
close_watcher_.Watch(
data_pipe_.get(), MOJO_HANDLE_SIGNAL_PEER_CLOSED,
MOJO_TRIGGER_CONDITION_SIGNALS_SATISFIED,
WTF::BindRepeating(&TCPWritableStreamWrapper::OnPeerClosed,
WrapWeakPersistent(this)));
ScriptState::Scope scope(script_state_);
// Set the CountQueueingStrategy's high water mark as 1 to make the logic of
// |WriteOrCacheData| much simpler
writable_ = WritableStream::CreateWithCountQueueingStrategy(
script_state_, MakeGarbageCollected<UnderlyingSink>(this), 1);
}
TCPWritableStreamWrapper::~TCPWritableStreamWrapper() = default;
void TCPWritableStreamWrapper::Reset() {
DVLOG(1) << "TCPWritableStreamWrapper::Reset() this=" << this;
// We no longer need to call |on_abort_|.
on_abort_.Reset();
ErrorStreamAbortAndReset();
}
void TCPWritableStreamWrapper::Trace(Visitor* visitor) const {
visitor->Trace(script_state_);
visitor->Trace(writable_);
visitor->Trace(controller_);
visitor->Trace(write_promise_resolver_);
}
void TCPWritableStreamWrapper::OnHandleReady(MojoResult result,
const mojo::HandleSignalsState&) {
DVLOG(1) << "TCPWritableStreamWrapper::OnHandleReady() this=" << this
<< " result=" << result;
switch (result) {
case MOJO_RESULT_OK:
WriteCachedData();
break;
case MOJO_RESULT_FAILED_PRECONDITION:
// Will be handled by |close_watcher_|.
break;
default:
NOTREACHED();
}
}
void TCPWritableStreamWrapper::OnPeerClosed(MojoResult result,
const mojo::HandleSignalsState&) {
DVLOG(1) << "TCPWritableStreamWrapper::OnPeerClosed() this=" << this
<< " result=" << result;
DCHECK_EQ(result, MOJO_RESULT_OK);
DCHECK_EQ(state_, State::kOpen);
state_ = State::kClosed;
ErrorStreamAbortAndReset();
}
ScriptPromise TCPWritableStreamWrapper::SinkWrite(
ScriptState* script_state,
ScriptValue chunk,
ExceptionState& exception_state) {
DVLOG(1) << "TCPWritableStreamWrapper::SinkWrite() this=" << this;
// There can only be one call to write() in progress at a time.
DCHECK(!write_promise_resolver_);
DCHECK_EQ(0u, offset_);
if (!data_pipe_) {
exception_state.ThrowDOMException(
DOMExceptionCode::kNetworkError,
"The underlying data pipe was disconnected.");
return ScriptPromise();
}
auto* isolate = script_state->GetIsolate();
ArrayBufferOrArrayBufferView buffer_source;
V8ArrayBufferOrArrayBufferView::ToImpl(
isolate, chunk.V8Value(), buffer_source,
UnionTypeConversionMode::kNotNullable, exception_state);
if (exception_state.HadException())
return ScriptPromise();
DCHECK(!buffer_source.IsNull());
// |data| will not be valid when this function returns.
base::span<const uint8_t> data;
if (buffer_source.IsArrayBuffer()) {
const auto* array_buffer = buffer_source.GetAsArrayBuffer();
data = base::make_span(static_cast<const uint8_t*>(array_buffer->Data()),
array_buffer->ByteLength());
} else {
DCHECK(buffer_source.IsArrayBufferView());
const auto* array_buffer_view = buffer_source.GetAsArrayBufferView().View();
data = base::make_span(
static_cast<const uint8_t*>(array_buffer_view->BaseAddress()),
array_buffer_view->byteLength());
}
return WriteOrCacheData(script_state, data);
}
// Attempt to write |data|. Cache anything that could not be written
// synchronously. Arrange for the cached data to be written asynchronously.
ScriptPromise TCPWritableStreamWrapper::WriteOrCacheData(
ScriptState* script_state,
base::span<const uint8_t> data) {
DVLOG(1) << "TCPWritableStreamWrapper::WriteOrCacheData() this=" << this
<< " data=(" << data.data() << ", " << data.size() << ")";
size_t written = WriteDataSynchronously(data);
if (written == data.size())
return ScriptPromise::CastUndefined(script_state);
DCHECK_LT(written, data.size());
if (!data_pipe_) {
return ScriptPromise::Reject(script_state, CreateAbortException());
}
DCHECK(!cached_data_);
cached_data_ = std::make_unique<CachedDataBuffer>(
script_state->GetIsolate(), data.data() + written, data.size() - written);
DCHECK_EQ(offset_, 0u);
write_watcher_.ArmOrNotify();
write_promise_resolver_ =
MakeGarbageCollected<ScriptPromiseResolver>(script_state);
return write_promise_resolver_->Promise();
}
// Write data previously cached. Arrange for any remaining data to be sent
// asynchronously. Fulfill |write_promise_resolver_| once all data has been
// written.
void TCPWritableStreamWrapper::WriteCachedData() {
DVLOG(1) << "TCPWritableStreamWrapper::WriteCachedData() this=" << this;
auto data = base::make_span(static_cast<uint8_t*>(cached_data_->data()),
cached_data_->length())
.subspan(offset_);
size_t written = WriteDataSynchronously(data);
if (written == data.size()) {
cached_data_.reset();
offset_ = 0;
write_promise_resolver_->Resolve();
write_promise_resolver_ = nullptr;
return;
}
if (!data_pipe_) {
cached_data_.reset();
offset_ = 0;
return;
}
offset_ += written;
write_watcher_.ArmOrNotify();
}
// Write as much of |data| as can be written synchronously. Return the number of
// bytes written. May close |data_pipe_| as a side-effect on error.
size_t TCPWritableStreamWrapper::WriteDataSynchronously(
base::span<const uint8_t> data) {
DVLOG(1) << "TCPWritableStreamWrapper::WriteDataSynchronously() this=" << this
<< " data=(" << data.data() << ", " << data.size() << ")";
DCHECK(data_pipe_);
// This use of saturated cast means that we will fallback to asynchronous
// sending if |data| is larger than 4GB. In practice we'd never be able to
// send 4GB synchronously anyway.
uint32_t num_bytes = base::saturated_cast<uint32_t>(data.size());
MojoResult result =
data_pipe_->WriteData(data.data(), &num_bytes, MOJO_WRITE_DATA_FLAG_NONE);
switch (result) {
case MOJO_RESULT_OK:
case MOJO_RESULT_SHOULD_WAIT:
return num_bytes;
case MOJO_RESULT_FAILED_PRECONDITION:
// Will be handled by |close_watcher_|.
return 0;
default:
NOTREACHED();
return 0;
}
}
ScriptValue TCPWritableStreamWrapper::CreateAbortException() {
DVLOG(1) << "TCPWritableStreamWrapper::CreateAbortException() this=" << this;
DOMExceptionCode code = DOMExceptionCode::kNetworkError;
String message = "The stream was aborted by the remote.";
return ScriptValue(script_state_->GetIsolate(),
V8ThrowDOMException::CreateOrEmpty(
script_state_->GetIsolate(), code, message));
}
void TCPWritableStreamWrapper::ErrorStreamAbortAndReset() {
DVLOG(1) << "TCPWritableStreamWrapper::ErrorStreamAbortAndReset() this="
<< this;
if (script_state_->ContextIsValid()) {
ScriptState::Scope scope(script_state_);
ScriptValue exception = CreateAbortException();
if (write_promise_resolver_) {
write_promise_resolver_->Reject(exception);
write_promise_resolver_ = nullptr;
} else if (controller_) {
controller_->error(script_state_, exception);
}
}
controller_ = nullptr;
AbortAndReset();
}
void TCPWritableStreamWrapper::AbortAndReset() {
DVLOG(1) << "TCPWritableStreamWrapper::AbortAndReset() this=" << this;
state_ = State::kAborted;
if (on_abort_) {
std::move(on_abort_).Run();
}
ResetPipe();
}
void TCPWritableStreamWrapper::ResetPipe() {
DVLOG(1) << "TCPWritableStreamWrapper::ResetPipe() this=" << this;
write_watcher_.Cancel();
close_watcher_.Cancel();
data_pipe_.reset();
if (cached_data_)
cached_data_.reset();
}
void TCPWritableStreamWrapper::Dispose() {
DVLOG(1) << "TCPWritableStreamWrapper::Dispose() this=" << this;
ResetPipe();
}
} // namespace blink