blob: 107f4e380d098f493db1c7ca39a3260a0cb14a84 [file] [log] [blame]
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "modules/fetch/BodyStreamBuffer.h"
#include "bindings/core/v8/ScriptState.h"
#include "bindings/core/v8/V8HiddenValue.h"
#include "bindings/core/v8/WorkerOrWorkletScriptController.h"
#include "core/dom/DOMArrayBuffer.h"
#include "core/dom/DOMTypedArray.h"
#include "core/dom/ExceptionCode.h"
#include "core/streams/ReadableStreamController.h"
#include "core/streams/ReadableStreamOperations.h"
#include "core/workers/WorkerGlobalScope.h"
#include "modules/fetch/Body.h"
#include "modules/fetch/DataConsumerHandleUtil.h"
#include "modules/fetch/DataConsumerTee.h"
#include "modules/fetch/ReadableStreamDataConsumerHandle.h"
#include "platform/RuntimeEnabledFeatures.h"
#include "platform/blob/BlobData.h"
#include "platform/network/EncodedFormData.h"
namespace blink {
namespace {
bool isTerminating(ScriptState* scriptState)
{
ExecutionContext* executionContext = scriptState->getExecutionContext();
if (!executionContext)
return true;
if (!executionContext->isWorkerGlobalScope())
return false;
return toWorkerGlobalScope(executionContext)->scriptController()->isExecutionTerminating();
}
} // namespace
class BodyStreamBuffer::LoaderClient final : public GarbageCollectedFinalized<LoaderClient>, public ActiveDOMObject, public FetchDataLoader::Client {
WTF_MAKE_NONCOPYABLE(LoaderClient);
USING_GARBAGE_COLLECTED_MIXIN(LoaderClient);
public:
LoaderClient(ExecutionContext* executionContext, BodyStreamBuffer* buffer, FetchDataLoader::Client* client)
: ActiveDOMObject(executionContext)
, m_buffer(buffer)
, m_client(client)
{
suspendIfNeeded();
}
void didFetchDataLoadedBlobHandle(PassRefPtr<BlobDataHandle> blobDataHandle) override
{
m_buffer->endLoading();
m_client->didFetchDataLoadedBlobHandle(blobDataHandle);
}
void didFetchDataLoadedArrayBuffer(DOMArrayBuffer* arrayBuffer) override
{
m_buffer->endLoading();
m_client->didFetchDataLoadedArrayBuffer(arrayBuffer);
}
void didFetchDataLoadedString(const String& string) override
{
m_buffer->endLoading();
m_client->didFetchDataLoadedString(string);
}
void didFetchDataLoadedStream() override
{
m_buffer->endLoading();
m_client->didFetchDataLoadedStream();
}
void didFetchDataLoadFailed() override
{
m_buffer->endLoading();
m_client->didFetchDataLoadFailed();
}
DEFINE_INLINE_TRACE()
{
visitor->trace(m_buffer);
visitor->trace(m_client);
ActiveDOMObject::trace(visitor);
FetchDataLoader::Client::trace(visitor);
}
private:
void stop() override
{
m_buffer->stopLoading();
}
Member<BodyStreamBuffer> m_buffer;
Member<FetchDataLoader::Client> m_client;
};
BodyStreamBuffer::BodyStreamBuffer(ScriptState* scriptState, PassOwnPtr<FetchDataConsumerHandle> handle)
: UnderlyingSourceBase(scriptState)
, m_scriptState(scriptState)
, m_handle(std::move(handle))
, m_reader(m_handle->obtainReader(this))
, m_madeFromReadableStream(false)
{
if (RuntimeEnabledFeatures::responseBodyWithV8ExtraStreamEnabled()) {
ScriptState::Scope scope(scriptState);
if (isTerminating(scriptState))
return;
v8::Local<v8::Value> bodyValue = toV8(this, scriptState);
if (bodyValue.IsEmpty()) {
DCHECK(isTerminating(scriptState));
return;
}
ASSERT(bodyValue->IsObject());
v8::Local<v8::Object> body = bodyValue.As<v8::Object>();
ScriptValue readableStream = ReadableStreamOperations::createReadableStream(
scriptState, this, ReadableStreamOperations::createCountQueuingStrategy(scriptState, 0));
V8HiddenValue::setHiddenValue(scriptState, body, V8HiddenValue::internalBodyStream(scriptState->isolate()), readableStream.v8Value());
} else {
m_stream = new ReadableByteStream(this, new ReadableByteStream::StrictStrategy);
m_stream->didSourceStart();
}
}
BodyStreamBuffer::BodyStreamBuffer(ScriptState* scriptState, ScriptValue stream)
: UnderlyingSourceBase(scriptState)
, m_scriptState(scriptState)
, m_madeFromReadableStream(true)
{
ScriptState::Scope scope(scriptState);
DCHECK(RuntimeEnabledFeatures::responseBodyWithV8ExtraStreamEnabled());
DCHECK(ReadableStreamOperations::isReadableStream(scriptState, stream));
if (isTerminating(scriptState))
return;
v8::Local<v8::Value> bodyValue = toV8(this, scriptState);
if (bodyValue.IsEmpty()) {
DCHECK(isTerminating(scriptState));
return;
}
DCHECK(bodyValue->IsObject());
v8::Local<v8::Object> body = bodyValue.As<v8::Object>();
V8HiddenValue::setHiddenValue(scriptState, body, V8HiddenValue::internalBodyStream(scriptState->isolate()), stream.v8Value());
}
ScriptValue BodyStreamBuffer::stream()
{
ScriptState::Scope scope(m_scriptState.get());
if (RuntimeEnabledFeatures::responseBodyWithV8ExtraStreamEnabled()) {
if (isTerminating(m_scriptState.get()))
return ScriptValue();
v8::Local<v8::Value> bodyValue = toV8(this, m_scriptState.get());
if (bodyValue.IsEmpty()) {
DCHECK(isTerminating(m_scriptState.get()));
return ScriptValue();
}
ASSERT(bodyValue->IsObject());
v8::Local<v8::Object> body = bodyValue.As<v8::Object>();
return ScriptValue(m_scriptState.get(), V8HiddenValue::getHiddenValue(m_scriptState.get(), body, V8HiddenValue::internalBodyStream(m_scriptState->isolate())));
}
return ScriptValue(m_scriptState.get(), toV8(m_stream, m_scriptState.get()));
}
PassRefPtr<BlobDataHandle> BodyStreamBuffer::drainAsBlobDataHandle(FetchDataConsumerHandle::Reader::BlobSizePolicy policy)
{
ASSERT(!isStreamLocked());
ASSERT(!isStreamDisturbed());
if (isStreamClosed() || isStreamErrored())
return nullptr;
if (m_madeFromReadableStream)
return nullptr;
RefPtr<BlobDataHandle> blobDataHandle = m_reader->drainAsBlobDataHandle(policy);
if (blobDataHandle) {
closeAndLockAndDisturb();
return blobDataHandle.release();
}
return nullptr;
}
PassRefPtr<EncodedFormData> BodyStreamBuffer::drainAsFormData()
{
ASSERT(!isStreamLocked());
ASSERT(!isStreamDisturbed());
if (isStreamClosed() || isStreamErrored())
return nullptr;
if (m_madeFromReadableStream)
return nullptr;
RefPtr<EncodedFormData> formData = m_reader->drainAsFormData();
if (formData) {
closeAndLockAndDisturb();
return formData.release();
}
return nullptr;
}
void BodyStreamBuffer::startLoading(FetchDataLoader* loader, FetchDataLoader::Client* client)
{
ASSERT(!m_loader);
ASSERT(m_scriptState->contextIsValid());
OwnPtr<FetchDataConsumerHandle> handle = releaseHandle();
m_loader = loader;
loader->start(handle.get(), new LoaderClient(m_scriptState->getExecutionContext(), this, client));
}
void BodyStreamBuffer::tee(BodyStreamBuffer** branch1, BodyStreamBuffer** branch2)
{
DCHECK(!isStreamLocked());
DCHECK(!isStreamDisturbed());
*branch1 = nullptr;
*branch2 = nullptr;
if (m_madeFromReadableStream) {
ScriptState::Scope scope(m_scriptState.get());
ScriptValue stream1, stream2;
ReadableStreamOperations::tee(m_scriptState.get(), stream(), &stream1, &stream2);
*branch1 = new BodyStreamBuffer(m_scriptState.get(), stream1);
*branch2 = new BodyStreamBuffer(m_scriptState.get(), stream2);
return;
}
OwnPtr<FetchDataConsumerHandle> handle = releaseHandle();
OwnPtr<FetchDataConsumerHandle> handle1, handle2;
DataConsumerTee::create(m_scriptState->getExecutionContext(), std::move(handle), &handle1, &handle2);
*branch1 = new BodyStreamBuffer(m_scriptState.get(), std::move(handle1));
*branch2 = new BodyStreamBuffer(m_scriptState.get(), std::move(handle2));
}
void BodyStreamBuffer::pullSource()
{
ASSERT(!m_streamNeedsMore);
m_streamNeedsMore = true;
processData();
}
ScriptPromise BodyStreamBuffer::cancelSource(ScriptState* scriptState, ScriptValue)
{
ASSERT(scriptState == m_scriptState.get());
close();
return ScriptPromise::castUndefined(scriptState);
}
ScriptPromise BodyStreamBuffer::pull(ScriptState* scriptState)
{
ASSERT(!m_streamNeedsMore);
ASSERT(scriptState == m_scriptState.get());
m_streamNeedsMore = true;
processData();
return ScriptPromise::castUndefined(scriptState);
}
ScriptPromise BodyStreamBuffer::cancel(ScriptState* scriptState, ScriptValue reason)
{
ASSERT(scriptState == m_scriptState.get());
close();
return ScriptPromise::castUndefined(scriptState);
}
void BodyStreamBuffer::didGetReadable()
{
if (!m_reader)
return;
if (!m_streamNeedsMore) {
// Perform zero-length read to call close()/error() early.
size_t readSize;
WebDataConsumerHandle::Result result = m_reader->read(nullptr, 0, WebDataConsumerHandle::FlagNone, &readSize);
switch (result) {
case WebDataConsumerHandle::Ok:
case WebDataConsumerHandle::ShouldWait:
return;
case WebDataConsumerHandle::Done:
close();
return;
case WebDataConsumerHandle::Busy:
case WebDataConsumerHandle::ResourceExhausted:
case WebDataConsumerHandle::UnexpectedError:
error();
return;
}
return;
}
processData();
}
bool BodyStreamBuffer::hasPendingActivity() const
{
if (m_loader)
return true;
if (RuntimeEnabledFeatures::responseBodyWithV8ExtraStreamEnabled())
return UnderlyingSourceBase::hasPendingActivity();
return m_stream->stateInternal() == ReadableStream::Readable && m_stream->isLocked();
}
void BodyStreamBuffer::stop()
{
m_reader = nullptr;
m_handle = nullptr;
UnderlyingSourceBase::stop();
}
bool BodyStreamBuffer::isStreamReadable()
{
if (RuntimeEnabledFeatures::responseBodyWithV8ExtraStreamEnabled()) {
ScriptState::Scope scope(m_scriptState.get());
return ReadableStreamOperations::isReadable(m_scriptState.get(), stream());
}
return m_stream->stateInternal() == ReadableStream::Readable;
}
bool BodyStreamBuffer::isStreamClosed()
{
if (RuntimeEnabledFeatures::responseBodyWithV8ExtraStreamEnabled()) {
ScriptState::Scope scope(m_scriptState.get());
return ReadableStreamOperations::isClosed(m_scriptState.get(), stream());
}
return m_stream->stateInternal() == ReadableStream::Closed;
}
bool BodyStreamBuffer::isStreamErrored()
{
if (RuntimeEnabledFeatures::responseBodyWithV8ExtraStreamEnabled()) {
ScriptState::Scope scope(m_scriptState.get());
return ReadableStreamOperations::isErrored(m_scriptState.get(), stream());
}
return m_stream->stateInternal() == ReadableStream::Errored;
}
bool BodyStreamBuffer::isStreamLocked()
{
if (RuntimeEnabledFeatures::responseBodyWithV8ExtraStreamEnabled()) {
ScriptState::Scope scope(m_scriptState.get());
return ReadableStreamOperations::isLocked(m_scriptState.get(), stream());
}
return m_stream->isLocked();
}
bool BodyStreamBuffer::isStreamDisturbed()
{
if (RuntimeEnabledFeatures::responseBodyWithV8ExtraStreamEnabled()) {
ScriptState::Scope scope(m_scriptState.get());
return ReadableStreamOperations::isDisturbed(m_scriptState.get(), stream());
}
return m_stream->isDisturbed();
}
void BodyStreamBuffer::closeAndLockAndDisturb()
{
if (isStreamReadable()) {
// Note that the stream cannot be "draining", because it doesn't have
// the internal buffer.
close();
}
if (RuntimeEnabledFeatures::responseBodyWithV8ExtraStreamEnabled()) {
ScriptState::Scope scope(m_scriptState.get());
NonThrowableExceptionState exceptionState;
ScriptValue reader = ReadableStreamOperations::getReader(m_scriptState.get(), stream(), exceptionState);
ReadableStreamOperations::defaultReaderRead(m_scriptState.get(), reader);
} else {
NonThrowableExceptionState exceptionState;
m_stream->getBytesReader(m_scriptState->getExecutionContext(), exceptionState);
m_stream->setIsDisturbed();
}
}
void BodyStreamBuffer::close()
{
if (RuntimeEnabledFeatures::responseBodyWithV8ExtraStreamEnabled())
controller()->close();
else
m_stream->close();
m_reader = nullptr;
m_handle = nullptr;
}
void BodyStreamBuffer::error()
{
if (RuntimeEnabledFeatures::responseBodyWithV8ExtraStreamEnabled())
controller()->error(DOMException::create(NetworkError, "network error"));
else
m_stream->error(DOMException::create(NetworkError, "network error"));
m_reader = nullptr;
m_handle = nullptr;
}
void BodyStreamBuffer::processData()
{
ASSERT(m_reader);
while (m_streamNeedsMore) {
const void* buffer;
size_t available;
WebDataConsumerHandle::Result result = m_reader->beginRead(&buffer, WebDataConsumerHandle::FlagNone, &available);
switch (result) {
case WebDataConsumerHandle::Ok: {
DOMUint8Array* array = DOMUint8Array::create(static_cast<const unsigned char*>(buffer), available);
if (RuntimeEnabledFeatures::responseBodyWithV8ExtraStreamEnabled()) {
controller()->enqueue(array);
m_streamNeedsMore = controller()->desiredSize() > 0;
} else {
m_streamNeedsMore = m_stream->enqueue(array);
}
m_reader->endRead(available);
break;
}
case WebDataConsumerHandle::Done:
close();
return;
case WebDataConsumerHandle::ShouldWait:
return;
case WebDataConsumerHandle::Busy:
case WebDataConsumerHandle::ResourceExhausted:
case WebDataConsumerHandle::UnexpectedError:
error();
return;
}
}
}
void BodyStreamBuffer::endLoading()
{
ASSERT(m_loader);
m_loader = nullptr;
}
void BodyStreamBuffer::stopLoading()
{
if (!m_loader)
return;
m_loader->cancel();
m_loader = nullptr;
}
PassOwnPtr<FetchDataConsumerHandle> BodyStreamBuffer::releaseHandle()
{
DCHECK(!isStreamLocked());
DCHECK(!isStreamDisturbed());
if (m_madeFromReadableStream) {
ScriptState::Scope scope(m_scriptState.get());
// We need to have |reader| alive by some means (as written in
// ReadableStreamDataConsumerHandle). Based on the following facts
// - This function is used only from tee and startLoading.
// - This branch cannot be taken when called from tee.
// - startLoading makes hasPendingActivity return true while loading.
// , we don't need to keep the reader explicitly.
NonThrowableExceptionState exceptionState;
ScriptValue reader = ReadableStreamOperations::getReader(m_scriptState.get(), stream(), exceptionState);
return ReadableStreamDataConsumerHandle::create(m_scriptState.get(), reader);
}
// We need to call these before calling closeAndLockAndDisturb.
const bool isClosed = isStreamClosed();
const bool isErrored = isStreamErrored();
OwnPtr<FetchDataConsumerHandle> handle = std::move(m_handle);
closeAndLockAndDisturb();
if (isClosed) {
// Note that the stream cannot be "draining", because it doesn't have
// the internal buffer.
return createFetchDataConsumerHandleFromWebHandle(createDoneDataConsumerHandle());
}
if (isErrored)
return createFetchDataConsumerHandleFromWebHandle(createUnexpectedErrorDataConsumerHandle());
DCHECK(handle);
return handle;
}
} // namespace blink