blob: 707fba14451e5c246bde9f95c7f89375631d7c0d [file] [log] [blame]
// Copyright 2015 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "modules/fetch/ReadableStreamDataConsumerHandle.h"
#include "bindings/core/v8/ExceptionState.h"
#include "bindings/core/v8/ReadableStreamOperations.h"
#include "bindings/core/v8/ScriptFunction.h"
#include "bindings/core/v8/ScriptState.h"
#include "bindings/core/v8/ScriptValue.h"
#include "bindings/core/v8/V8BindingMacros.h"
#include "bindings/core/v8/V8IteratorResultValue.h"
#include "bindings/core/v8/V8RecursionScope.h"
#include "bindings/core/v8/V8Uint8Array.h"
#include "core/dom/DOMTypedArray.h"
#include "public/platform/Platform.h"
#include "public/platform/WebTaskRunner.h"
#include "public/platform/WebThread.h"
#include "public/platform/WebTraceLocation.h"
#include "wtf/Assertions.h"
#include "wtf/Functional.h"
#include "wtf/RefCounted.h"
#include "wtf/WeakPtr.h"
#include <algorithm>
#include <string.h>
#include <v8.h>
namespace blink {
using Result = WebDataConsumerHandle::Result;
using Flags = WebDataConsumerHandle::Flags;
// This context is not yet thread-safe.
class ReadableStreamDataConsumerHandle::ReadingContext final : public RefCounted<ReadingContext> {
WTF_MAKE_NONCOPYABLE(ReadingContext);
public:
class OnFulfilled final : public ScriptFunction {
public:
static v8::Local<v8::Function> createFunction(ScriptState* scriptState, WeakPtr<ReadingContext> context)
{
return (new OnFulfilled(scriptState, context))->bindToV8Function();
}
ScriptValue call(ScriptValue v) override
{
RefPtr<ReadingContext> readingContext(m_readingContext.get());
if (!readingContext)
return v;
bool done;
v8::Local<v8::Value> item = v.v8Value();
ASSERT(item->IsObject());
v8::Local<v8::Value> value = v8CallOrCrash(v8UnpackIteratorResult(v.scriptState(), item.As<v8::Object>(), &done));
if (done) {
readingContext->onReadDone();
return v;
}
if (!V8Uint8Array::hasInstance(value, v.isolate())) {
readingContext->onRejected();
return ScriptValue();
}
readingContext->onRead(V8Uint8Array::toImpl(value.As<v8::Object>()));
return v;
}
private:
OnFulfilled(ScriptState* scriptState, WeakPtr<ReadingContext> context)
: ScriptFunction(scriptState), m_readingContext(context) {}
WeakPtr<ReadingContext> m_readingContext;
};
class OnRejected final : public ScriptFunction {
public:
static v8::Local<v8::Function> createFunction(ScriptState* scriptState, WeakPtr<ReadingContext> context)
{
return (new OnRejected(scriptState, context))->bindToV8Function();
}
ScriptValue call(ScriptValue v) override
{
RefPtr<ReadingContext> readingContext(m_readingContext.get());
if (!readingContext)
return v;
readingContext->onRejected();
return v;
}
private:
OnRejected(ScriptState* scriptState, WeakPtr<ReadingContext> context)
: ScriptFunction(scriptState), m_readingContext(context) {}
WeakPtr<ReadingContext> m_readingContext;
};
class ReaderImpl final : public FetchDataConsumerHandle::Reader {
public:
ReaderImpl(PassRefPtr<ReadingContext> context, Client* client)
: m_readingContext(context)
{
m_readingContext->attachReader(client);
}
~ReaderImpl() override
{
m_readingContext->detachReader();
}
Result read(void* buffer, size_t size, Flags flags, size_t* readSize) override
{
*readSize = 0;
const void* src = nullptr;
size_t available;
Result r = beginRead(&src, flags, &available);
if (r != WebDataConsumerHandle::Ok)
return r;
*readSize = std::min(available, size);
memcpy(buffer, src, *readSize);
return endRead(*readSize);
}
Result beginRead(const void** buffer, Flags, size_t* available) override
{
return m_readingContext->beginRead(buffer, available);
}
Result endRead(size_t readSize) override
{
return m_readingContext->endRead(readSize);
}
private:
RefPtr<ReadingContext> m_readingContext;
};
static PassRefPtr<ReadingContext> create(ScriptState* scriptState, v8::Local<v8::Value> stream)
{
return adoptRef(new ReadingContext(scriptState, stream));
}
void attachReader(WebDataConsumerHandle::Client* client)
{
m_client = client;
notifyLater();
}
void detachReader()
{
m_client = nullptr;
}
Result beginRead(const void** buffer, size_t* available)
{
*buffer = nullptr;
*available = 0;
if (m_hasError)
return WebDataConsumerHandle::UnexpectedError;
if (m_isDone)
return WebDataConsumerHandle::Done;
if (m_pendingBuffer) {
ASSERT(m_pendingOffset < m_pendingBuffer->length());
*buffer = m_pendingBuffer->data() + m_pendingOffset;
*available = m_pendingBuffer->length() - m_pendingOffset;
return WebDataConsumerHandle::Ok;
}
ASSERT(!m_reader.isEmpty());
m_isInRecursion = true;
if (!m_isReading) {
m_isReading = true;
ScriptState::Scope scope(m_reader.scriptState());
V8RecursionScope recursionScope(m_reader.isolate());
ReadableStreamOperations::read(m_reader.scriptState(), m_reader.v8Value()).then(
OnFulfilled::createFunction(m_reader.scriptState(), m_weakPtrFactory.createWeakPtr()),
OnRejected::createFunction(m_reader.scriptState(), m_weakPtrFactory.createWeakPtr()));
// Note: Microtasks may run here.
}
m_isInRecursion = false;
return WebDataConsumerHandle::ShouldWait;
}
Result endRead(size_t readSize)
{
ASSERT(m_pendingBuffer);
ASSERT(m_pendingOffset + readSize <= m_pendingBuffer->length());
m_pendingOffset += readSize;
if (m_pendingOffset == m_pendingBuffer->length()) {
m_pendingBuffer = nullptr;
m_pendingOffset = 0;
}
return WebDataConsumerHandle::Ok;
}
void onRead(DOMUint8Array* buffer)
{
ASSERT(m_isReading);
ASSERT(buffer);
ASSERT(!m_pendingBuffer);
ASSERT(!m_pendingOffset);
m_isReading = false;
m_pendingBuffer = buffer;
notify();
}
void onReadDone()
{
ASSERT(m_isReading);
ASSERT(!m_pendingBuffer);
m_isReading = false;
m_isDone = true;
m_reader.clear();
notify();
}
void onRejected()
{
ASSERT(m_isReading);
ASSERT(!m_pendingBuffer);
m_hasError = true;
m_isReading = false;
m_reader.clear();
notify();
}
void notify()
{
if (!m_client)
return;
if (m_isInRecursion) {
notifyLater();
return;
}
m_client->didGetReadable();
}
void notifyLater()
{
ASSERT(m_client);
Platform::current()->currentThread()->taskRunner()->postTask(BLINK_FROM_HERE, bind(&ReadingContext::notify, PassRefPtr<ReadingContext>(this)));
}
private:
ReadingContext(ScriptState* scriptState, v8::Local<v8::Value> stream)
: m_client(nullptr)
, m_weakPtrFactory(this)
, m_pendingOffset(0)
, m_isReading(false)
, m_isDone(false)
, m_hasError(false)
, m_isInRecursion(false)
{
if (!ReadableStreamOperations::isLocked(scriptState, stream)) {
// Here the stream implementation must not throw an exception.
NonThrowableExceptionState es;
m_reader = ReadableStreamOperations::getReader(scriptState, stream, es);
}
m_hasError = m_reader.isEmpty();
}
// This ScriptValue is leaky because it stores a strong reference to a
// JavaScript object.
// TODO(yhirano): Fix it.
//
// Holding a ScriptValue here is safe in terms of cross-world wrapper
// leakage because we read only Uint8Array chunks from the reader.
ScriptValue m_reader;
WebDataConsumerHandle::Client* m_client;
RefPtr<DOMUint8Array> m_pendingBuffer;
WeakPtrFactory<ReadingContext> m_weakPtrFactory;
size_t m_pendingOffset;
bool m_isReading;
bool m_isDone;
bool m_hasError;
bool m_isInRecursion;
};
ReadableStreamDataConsumerHandle::ReadableStreamDataConsumerHandle(ScriptState* scriptState, v8::Local<v8::Value> stream)
: m_readingContext(ReadingContext::create(scriptState, stream))
{
}
ReadableStreamDataConsumerHandle::~ReadableStreamDataConsumerHandle() = default;
FetchDataConsumerHandle::Reader* ReadableStreamDataConsumerHandle::obtainReaderInternal(Client* client)
{
return new ReadingContext::ReaderImpl(m_readingContext, client);
}
} // namespace blink