blob: cafce4170f1cc906c719b95c6039a3a261af6e7a [file] [log] [blame]
// Copyright 2017 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// Implementation of TransformStream for Blink. See
// https://streams.spec.whatwg.org/#ts. The implementation closely follows the
// standard, except where required for performance or integration with Blink.
// In particular, classes, methods and abstract operations are implemented in
// the same order as in the standard, to simplify side-by-side reading.
(function(global, binding, v8) {
'use strict';
// Private symbols. These correspond to the internal slots in the standard.
// "[[X]]" in the standard is spelt _X in this implementation.
const _backpressure = v8.createPrivateSymbol('[[backpressure]]');
const _backpressureChangePromise =
v8.createPrivateSymbol('[[backpressureChangePromise]]');
const _readable = v8.createPrivateSymbol('[[readable]]');
const _transformStreamController =
v8.createPrivateSymbol('[[transformStreamController]]');
const _writable = v8.createPrivateSymbol('[[writable]]');
const _controlledTransformStream =
v8.createPrivateSymbol('[[controlledTransformStream]]');
// Unlike the version in the standard, the controller is passed to this.
const _flushAlgorithm = v8.createPrivateSymbol('[[flushAlgorithm]]');
// Unlike the version in the standard, the controller is passed in as the
// second argument.
const _transformAlgorithm = v8.createPrivateSymbol('[[transformAlgorithm]]');
// Javascript functions. It is important to use these copies, as the ones on
// the global object may have been overwritten. See "V8 Extras Design Doc",
// section "Security Considerations".
// https://docs.google.com/document/d/1AT5-T0aHGp7Lt29vPWFr2-qG8r3l9CByyvKwEuA8Ec0/edit#heading=h.9yixony1a18r
const ObjectCreate = global.Object.create;
const TypeError = global.TypeError;
const RangeError = global.RangeError;
const Promise = global.Promise;
const thenPromise = v8.uncurryThis(Promise.prototype.then);
const Promise_resolve = Promise.resolve.bind(Promise);
const Promise_reject = Promise.reject.bind(Promise);
// From CommonOperations.js
const {
hasOwnPropertyNoThrow,
resolvePromise,
CreateAlgorithmFromUnderlyingMethod,
CallOrNoop1,
MakeSizeAlgorithmFromSizeFunction,
PromiseCall2,
ValidateAndNormalizeHighWaterMark
} = binding.streamOperations;
// User-visible strings.
const streamErrors = binding.streamErrors;
const errStreamTerminated = 'The transform stream has been terminated';
let useCounted = false;
class TransformStream {
constructor(transformer = {},
writableStrategy = {}, readableStrategy = {}) {
if (!useCounted) {
binding.countUse('TransformStreamConstructor');
useCounted = true;
}
const writableSizeFunction = writableStrategy.size;
let writableHighWaterMark = writableStrategy.highWaterMark;
const readableSizeFunction = readableStrategy.size;
let readableHighWaterMark = readableStrategy.highWaterMark;
// readable and writableType are extension points for future byte streams.
const writableType = transformer.writableType;
if (writableType !== undefined) {
throw new RangeError(streamErrors.invalidType);
}
const writableSizeAlgorithm =
MakeSizeAlgorithmFromSizeFunction(writableSizeFunction);
if (writableHighWaterMark === undefined) {
writableHighWaterMark = 1;
}
writableHighWaterMark =
ValidateAndNormalizeHighWaterMark(writableHighWaterMark);
const readableType = transformer.readableType;
if (readableType !== undefined) {
throw new RangeError(streamErrors.invalidType);
}
const readableSizeAlgorithm =
MakeSizeAlgorithmFromSizeFunction(readableSizeFunction);
if (readableHighWaterMark === undefined) {
readableHighWaterMark = 0;
}
readableHighWaterMark =
ValidateAndNormalizeHighWaterMark(readableHighWaterMark);
const startPromise = v8.createPromise();
InitializeTransformStream(
this, startPromise, writableHighWaterMark, writableSizeAlgorithm,
readableHighWaterMark, readableSizeAlgorithm);
SetUpTransformStreamDefaultControllerFromTransformer(this, transformer);
const startResult = CallOrNoop1(
transformer, 'start', this[_transformStreamController],
'transformer.start');
resolvePromise(startPromise, startResult);
}
get readable() {
if (!IsTransformStream(this)) {
throw new TypeError(streamErrors.illegalInvocation);
}
return this[_readable];
}
get writable() {
if (!IsTransformStream(this)) {
throw new TypeError(streamErrors.illegalInvocation);
}
return this[_writable];
}
}
const TransformStream_prototype = TransformStream.prototype;
// The controller is passed to |transformAlgorithm| and |flushAlgorithm|,
// unlike in the standard.
function CreateTransformStream(
startAlgorithm, transformAlgorithm, flushAlgorithm, writableHighWaterMark,
writableSizeAlgorithm, readableHighWaterMark, readableSizeAlgorithm) {
if (writableHighWaterMark === undefined) {
writableHighWaterMark = 1;
}
if (writableSizeAlgorithm === undefined) {
writableSizeAlgorithm = () => 1;
}
if (readableHighWaterMark === undefined) {
readableHighWaterMark = 0;
}
if (readableSizeAlgorithm === undefined) {
readableSizeAlgorithm = () => 1;
}
// assert(
// typeof writableHighWaterMark === 'number' &&
// writableHighWaterMark >= 0,
// '! IsNonNegativeNumber(_writableHighWaterMark_) is *true*');
// assert(
// typeof readableHighWaterMark === 'number' &&
// readableHighWaterMark >= 0,
// '! IsNonNegativeNumber(_readableHighWaterMark_) is true');
const stream = ObjectCreate(TransformStream_prototype);
const startPromise = v8.createPromise();
InitializeTransformStream(
stream, startPromise, writableHighWaterMark, writableSizeAlgorithm,
readableHighWaterMark, readableSizeAlgorithm);
const controller = ObjectCreate(TransformStreamDefaultController_prototype);
SetUpTransformStreamDefaultController(
stream, controller, transformAlgorithm, flushAlgorithm);
const startResult = startAlgorithm();
resolvePromise(startPromise, startResult);
return stream;
}
function InitializeTransformStream(
stream, startPromise, writableHighWaterMark, writableSizeAlgorithm,
readableHighWaterMark, readableSizeAlgorithm) {
const startAlgorithm = () => startPromise;
const writeAlgorithm = chunk =>
TransformStreamDefaultSinkWriteAlgorithm(stream, chunk);
const abortAlgorithm = reason =>
TransformStreamDefaultSinkAbortAlgorithm(stream, reason);
const closeAlgorithm = () =>
TransformStreamDefaultSinkCloseAlgorithm(stream);
stream[_writable] = binding.CreateWritableStream(
startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm,
writableHighWaterMark, writableSizeAlgorithm);
const pullAlgorithm = () =>
TransformStreamDefaultSourcePullAlgorithm(stream);
const cancelAlgorithm = reason => {
TransformStreamErrorWritableAndUnblockWrite(stream, reason);
return Promise_resolve(undefined);
};
stream[_readable] = binding.CreateReadableStream(
startAlgorithm, pullAlgorithm, cancelAlgorithm, readableHighWaterMark,
readableSizeAlgorithm, false);
stream[_backpressure] = undefined;
stream[_backpressureChangePromise] = undefined;
TransformStreamSetBackpressure(stream, true);
stream[_transformStreamController] = undefined;
}
function IsTransformStream(x) {
return hasOwnPropertyNoThrow(x, _transformStreamController);
}
function TransformStreamError(stream, e) {
const readable = stream[_readable];
// TODO(ricea): Remove this conditional once ReadableStream is updated.
if (binding.IsReadableStreamReadable(readable)) {
binding.ReadableStreamDefaultControllerError(
binding.getReadableStreamController(readable), e);
}
TransformStreamErrorWritableAndUnblockWrite(stream, e);
}
function TransformStreamErrorWritableAndUnblockWrite(stream, e) {
TransformStreamDefaultControllerClearAlgorithms(
stream[_transformStreamController]);
binding.WritableStreamDefaultControllerErrorIfNeeded(
binding.getWritableStreamController(stream[_writable]), e);
if (stream[_backpressure]) {
TransformStreamSetBackpressure(stream, false);
}
}
function TransformStreamSetBackpressure(stream, backpressure) {
// assert(
// stream[_backpressure] !== backpressure,
// 'stream.[[backpressure]] is not backpressure');
if (stream[_backpressureChangePromise] !== undefined) {
resolvePromise(stream[_backpressureChangePromise], undefined);
}
stream[_backpressureChangePromise] = v8.createPromise();
stream[_backpressure] = backpressure;
}
class TransformStreamDefaultController {
constructor() {
throw new TypeError(streamErrors.illegalConstructor);
}
get desiredSize() {
if (!IsTransformStreamDefaultController(this)) {
throw new TypeError(streamErrors.illegalInvocation);
}
const readableController = binding.getReadableStreamController(
this[_controlledTransformStream][_readable]);
return binding.ReadableStreamDefaultControllerGetDesiredSize(
readableController);
}
enqueue(chunk) {
if (!IsTransformStreamDefaultController(this)) {
throw new TypeError(streamErrors.illegalInvocation);
}
TransformStreamDefaultControllerEnqueue(this, chunk);
}
error(reason) {
if (!IsTransformStreamDefaultController(this)) {
throw new TypeError(streamErrors.illegalInvocation);
}
TransformStreamDefaultControllerError(this, reason);
}
terminate() {
if (!IsTransformStreamDefaultController(this)) {
throw new TypeError(streamErrors.illegalInvocation);
}
TransformStreamDefaultControllerTerminate(this);
}
}
const TransformStreamDefaultController_prototype =
TransformStreamDefaultController.prototype;
function IsTransformStreamDefaultController(x) {
return hasOwnPropertyNoThrow(x, _controlledTransformStream);
}
function SetUpTransformStreamDefaultController(
stream, controller, transformAlgorithm, flushAlgorithm) {
// assert(
// IsTransformStream(stream) === true,
// '! IsTransformStream(_stream_) is *true*');
// assert(
// stream[_transformStreamController] === undefined,
// '_stream_.[[transformStreamController]] is *undefined*');
controller[_controlledTransformStream] = stream;
stream[_transformStreamController] = controller;
controller[_transformAlgorithm] = transformAlgorithm;
controller[_flushAlgorithm] = flushAlgorithm;
}
function SetUpTransformStreamDefaultControllerFromTransformer(
stream, transformer) {
// assert(transformer !== undefined, '_transformer_ is not *undefined*');
const controller = ObjectCreate(TransformStreamDefaultController_prototype);
let transformAlgorithm;
const transformMethod = transformer.transform;
if (transformMethod !== undefined) {
if (typeof transformMethod !== 'function') {
throw new TypeError('transformer.transform is not a function');
}
transformAlgorithm = chunk =>
PromiseCall2(transformMethod, transformer, chunk, controller);
} else {
transformAlgorithm = chunk => {
try {
TransformStreamDefaultControllerEnqueue(controller, chunk);
return Promise_resolve();
} catch (resultValue) {
return Promise_reject(resultValue);
}
};
}
const flushAlgorithm = CreateAlgorithmFromUnderlyingMethod(
transformer, 'flush', 1, 'transformer.flush');
SetUpTransformStreamDefaultController(
stream, controller, transformAlgorithm, flushAlgorithm);
}
function TransformStreamDefaultControllerClearAlgorithms(controller) {
controller[_transformAlgorithm] = undefined;
controller[_flushAlgorithm] = undefined;
}
function TransformStreamDefaultControllerEnqueue(controller, chunk) {
const stream = controller[_controlledTransformStream];
const readableController =
binding.getReadableStreamController(stream[_readable]);
if (!binding.ReadableStreamDefaultControllerCanCloseOrEnqueue(
readableController)) {
throw binding.getReadableStreamEnqueueError(stream[_readable]);
}
try {
binding.ReadableStreamDefaultControllerEnqueue(readableController, chunk);
} catch (e) {
TransformStreamErrorWritableAndUnblockWrite(stream, e);
throw binding.getReadableStreamStoredError(stream[_readable]);
}
const backpressure = binding.ReadableStreamDefaultControllerHasBackpressure(
readableController);
if (backpressure !== stream[_backpressure]) {
// assert(backpressure, 'backpressure is true');
TransformStreamSetBackpressure(stream, true);
}
}
function TransformStreamDefaultControllerError(controller, e) {
TransformStreamError(controller[_controlledTransformStream], e);
}
function TransformStreamDefaultControllerPerformTransform(controller, chunk) {
const transformPromise = controller[_transformAlgorithm](chunk, controller);
return thenPromise(transformPromise, undefined, r => {
TransformStreamError(controller[_controlledTransformStream], r);
throw r;
});
}
function TransformStreamDefaultControllerTerminate(controller) {
const stream = controller[_controlledTransformStream];
const readableController =
binding.getReadableStreamController(stream[_readable]);
if (binding.ReadableStreamDefaultControllerCanCloseOrEnqueue(
readableController)) {
binding.ReadableStreamDefaultControllerClose(readableController);
}
const error = new TypeError(errStreamTerminated);
TransformStreamErrorWritableAndUnblockWrite(stream, error);
}
function TransformStreamDefaultSinkWriteAlgorithm(stream, chunk) {
// assert(
// binding.isWritableStreamWritable(stream[_writable]),
// `stream.[[writable]][[state]] is "writable"`);
const controller = stream[_transformStreamController];
if (stream[_backpressure]) {
const backpressureChangePromise = stream[_backpressureChangePromise];
// assert(
// backpressureChangePromise !== undefined,
// `backpressureChangePromise is not undefined`);
return thenPromise(backpressureChangePromise, () => {
const writable = stream[_writable];
if (binding.isWritableStreamErroring(writable)) {
throw binding.getWritableStreamStoredError(writable);
}
// assert(binding.isWritableStreamWritable(writable),
// `state is "writable"`);
return TransformStreamDefaultControllerPerformTransform(controller,
chunk);
});
}
return TransformStreamDefaultControllerPerformTransform(controller, chunk);
}
function TransformStreamDefaultSinkAbortAlgorithm(stream, reason) {
TransformStreamError(stream, reason);
return Promise_resolve();
}
function TransformStreamDefaultSinkCloseAlgorithm(stream) {
const readable = stream[_readable];
const controller = stream[_transformStreamController];
const flushPromise = controller[_flushAlgorithm](controller);
TransformStreamDefaultControllerClearAlgorithms(controller);
return thenPromise(
flushPromise,
() => {
if (binding.IsReadableStreamErrored(readable)) {
throw binding.getReadableStreamStoredError(readable);
}
const readableController =
binding.getReadableStreamController(readable);
if (binding.ReadableStreamDefaultControllerCanCloseOrEnqueue(
readableController)) {
binding.ReadableStreamDefaultControllerClose(readableController);
}
},
r => {
TransformStreamError(stream, r);
throw binding.getReadableStreamStoredError(readable);
});
}
function TransformStreamDefaultSourcePullAlgorithm(stream) {
// assert(stream[_backpressure], 'stream.[[backpressure]] is true');
// assert(
// stream[_backpressureChangePromise] !== undefined,
// 'stream.[[backpressureChangePromise]] is not undefined');
TransformStreamSetBackpressure(stream, false);
return stream[_backpressureChangePromise];
}
// A wrapper for CreateTransformStream() with only the arguments that
// blink::TransformStream needs. |transformAlgorithm| and |flushAlgorithm| are
// passed the controller, unlike in the standard.
function createTransformStreamSimple(transformAlgorithm, flushAlgorithm) {
return CreateTransformStream(() => Promise_resolve(),
transformAlgorithm, flushAlgorithm);
}
function createTransformStream(
transformer, writableStrategy, readableStrategy) {
if (transformer === undefined) {
transformer = ObjectCreate(null);
}
if (writableStrategy === undefined) {
writableStrategy = ObjectCreate(null);
}
if (readableStrategy === undefined) {
readableStrategy = ObjectCreate(null);
}
return new TransformStream(transformer, writableStrategy, readableStrategy);
}
function getTransformStreamReadable(stream) {
return stream[_readable];
}
function getTransformStreamWritable(stream) {
return stream[_writable];
}
//
// Exports to Blink
//
Object.assign(binding, {
createTransformStreamSimple,
createTransformStream,
TransformStreamDefaultControllerEnqueue,
getTransformStreamReadable,
getTransformStreamWritable
});
});