| const { Readable, Writable, Transform, getStreamError, isStreamx, isDisturbed } = require('streamx') |
| const tee = require('teex') |
| |
| const readableKind = Symbol.for('bare.stream.readable.kind') |
| const writableKind = Symbol.for('bare.stream.writable.kind') |
| const transformKind = Symbol.for('bare.stream.transform.kind') |
| |
| // https://streams.spec.whatwg.org/#readablestreamdefaultreader |
| exports.ReadableStreamDefaultReader = class ReadableStreamDefaultReader { |
| constructor(stream) { |
| this._stream = stream |
| this._stream._stream.once('close', onclose).once('error', onerror) |
| |
| const closed = Promise.withResolvers() |
| |
| // Avoid unhandled exceptions |
| closed.promise.catch(noop) |
| |
| this._closed = closed |
| |
| function onclose() { |
| closed.resolve() |
| } |
| |
| function onerror(err) { |
| closed.reject(err) |
| } |
| } |
| |
| get closed() { |
| return this._closed.promise |
| } |
| |
| read() { |
| const stream = this._stream._stream |
| |
| return new Promise((resolve, reject) => { |
| const err = getStreamError(stream) |
| |
| if (err) return reject(err) |
| |
| if (stream.destroyed) { |
| return resolve({ value: undefined, done: true }) |
| } |
| |
| const value = stream.read() |
| |
| if (value !== null) { |
| return resolve({ value, done: false }) |
| } |
| |
| stream.once('readable', onreadable).once('close', onclose).once('error', onerror) |
| |
| function onreadable() { |
| const value = stream.read() |
| |
| ondone(null, value === null ? { value: undefined, done: true } : { value, done: false }) |
| } |
| |
| function onclose() { |
| ondone(null, { value: undefined, done: true }) |
| } |
| |
| function onerror(err) { |
| ondone(err, null) |
| } |
| |
| function ondone(err, value) { |
| stream.off('readable', onreadable).off('close', onclose).off('error', onerror) |
| |
| if (err) reject(err) |
| else resolve(value) |
| } |
| }) |
| } |
| |
| releaseLock() { |
| this._closed.reject(new TypeError('Reader was released')) |
| this._stream._releaseLock() |
| this._stream = null |
| } |
| |
| cancel(reason = new TypeError('Stream was cancelled')) { |
| const stream = this._stream._stream |
| |
| if (stream.destroyed) return Promise.resolve() |
| |
| return new Promise((resolve) => |
| stream.once('close', resolve).once('error', noop).destroy(reason) |
| ) |
| } |
| } |
| |
| // https://streams.spec.whatwg.org/#readablestreamdefaultcontroller |
| exports.ReadableStreamDefaultController = class ReadableStreamDefaultController { |
| constructor(stream) { |
| this._stream = stream |
| } |
| |
| get desiredSize() { |
| const stream = this._stream._stream |
| |
| return stream._readableState.highWaterMark - stream._readableState.buffered |
| } |
| |
| enqueue(data) { |
| this._stream._stream.push(data) |
| } |
| |
| close() { |
| this._stream._stream.push(null) |
| } |
| |
| error(err) { |
| this._stream._stream.destroy(err) |
| } |
| } |
| |
| // https://streams.spec.whatwg.org/#readablestream |
| class ReadableStream { |
| static get [readableKind]() { |
| return 0 // Compatibility version |
| } |
| |
| static from(iterable) { |
| return new ReadableStream(Readable.from(iterable)) |
| } |
| |
| constructor(underlyingSource = {}, queuingStrategy) { |
| if (isStreamx(underlyingSource)) { |
| this._stream = underlyingSource |
| } else { |
| if (queuingStrategy === undefined) { |
| queuingStrategy = new exports.CountQueuingStrategy() |
| } |
| |
| const { start, pull, cancel } = underlyingSource |
| const { highWaterMark = 1, size = defaultSize } = queuingStrategy |
| |
| this._stream = new Readable({ highWaterMark, byteLength: size }) |
| |
| const controller = new exports.ReadableStreamDefaultController(this) |
| |
| try { |
| let starting = Promise.resolve() |
| |
| if (start) starting = forwardError(start.call(this, controller), controller) |
| |
| if (pull) { |
| this._stream._read = this._read.bind(this, starting, pull.bind(this, controller)) |
| } |
| |
| if (cancel) { |
| this._stream.once('error', cancel.bind(this)) |
| } |
| } catch (err) { |
| controller.error(err) |
| } |
| } |
| |
| this._reader = null |
| } |
| |
| get [readableKind]() { |
| return ReadableStream[readableKind] |
| } |
| |
| get locked() { |
| return this._reader !== null |
| } |
| |
| getReader() { |
| if (this.locked) throw new TypeError('Stream is locked') |
| |
| this._reader = new exports.ReadableStreamDefaultReader(this) |
| |
| return this._reader |
| } |
| |
| cancel(reason = new TypeError('Stream was cancelled')) { |
| const stream = this._stream |
| |
| if (stream.destroyed) return Promise.resolve() |
| |
| if (this.locked) return Promise.reject(new TypeError('Stream is locked')) |
| |
| return new Promise((resolve) => |
| stream.once('close', resolve).once('error', noop).destroy(reason) |
| ) |
| } |
| |
| tee() { |
| const [a, b] = tee(this._stream) |
| |
| return [new ReadableStream(a), new ReadableStream(b)] |
| } |
| |
| pipeTo(destination) { |
| return new Promise((resolve, reject) => |
| this._stream.pipe(destination._stream, (err) => { |
| err ? reject(err) : resolve() |
| }) |
| ) |
| } |
| |
| [Symbol.asyncIterator]() { |
| return this._stream[Symbol.asyncIterator]() |
| } |
| |
| _releaseLock() { |
| this._reader = null |
| } |
| |
| async _read(starting, pull, cb) { |
| await starting |
| |
| let err = null |
| try { |
| await pull() |
| } catch (e) { |
| err = e |
| } |
| cb(err) |
| } |
| } |
| |
| function defaultSize() { |
| return 1 |
| } |
| |
| exports.ReadableStream = ReadableStream |
| |
| // https://streams.spec.whatwg.org/#countqueuingstrategy |
| exports.CountQueuingStrategy = class CountQueuingStrategy { |
| constructor(opts = {}) { |
| const { highWaterMark = 1 } = opts |
| |
| this.highWaterMark = highWaterMark |
| } |
| |
| size(chunk) { |
| return 1 |
| } |
| } |
| |
| // https://streams.spec.whatwg.org/#bytelengthqueuingstrategy |
| exports.ByteLengthQueuingStrategy = class ByteLengthQueuingStrategy { |
| constructor(opts = {}) { |
| const { highWaterMark = 16384 } = opts |
| |
| this.highWaterMark = highWaterMark |
| } |
| |
| size(chunk) { |
| return chunk.byteLength |
| } |
| } |
| |
| exports.isReadableStream = function isReadableStream(value) { |
| if (value instanceof ReadableStream) return true |
| |
| return ( |
| typeof value === 'object' && |
| value !== null && |
| value[readableKind] === ReadableStream[readableKind] |
| ) |
| } |
| |
| // https://streams.spec.whatwg.org/#readablestream-errored |
| exports.isReadableStreamErrored = function isReadableStreamErrored(stream) { |
| return getStreamError(stream._stream) !== null |
| } |
| |
| // https://streams.spec.whatwg.org/#is-readable-stream-disturbed |
| exports.isReadableStreamDisturbed = function isReadableStreamDisturbed(stream) { |
| return isDisturbed(stream._stream) |
| } |
| |
| // https://streams.spec.whatwg.org/#writablestreamdefaultwriter |
| exports.WritableStreamDefaultWriter = class WritableStreamDefaultWriter { |
| constructor(stream) { |
| this._stream = stream |
| this._stream._stream.once('close', onclose).once('error', onerror) |
| |
| const closed = Promise.withResolvers() |
| |
| // Avoid unhandled exceptions |
| closed.promise.catch(noop) |
| |
| this._closed = closed |
| |
| function onclose() { |
| closed.resolve() |
| } |
| |
| function onerror(err) { |
| closed.reject(err) |
| } |
| } |
| |
| get desiredSize() { |
| const stream = this._stream._stream |
| |
| return stream._writableState.highWaterMark - stream._writableState.buffered |
| } |
| |
| get closed() { |
| return this._closed.promise |
| } |
| |
| get ready() { |
| const stream = this._stream._stream |
| |
| if (getStreamError(stream)) return Promise.reject() |
| |
| return Writable.drained(stream).then() |
| } |
| |
| async write(chunk) { |
| const stream = this._stream._stream |
| |
| let err = getStreamError(stream) |
| if (err) return Promise.reject(err) |
| |
| stream.write(chunk) |
| |
| await Writable.drained(stream) |
| |
| err = getStreamError(stream) |
| if (err) return Promise.reject(err) |
| } |
| |
| releaseLock() { |
| this._closed.reject(new TypeError('Writer was released')) |
| this._stream._releaseLock() |
| this._stream = null |
| } |
| |
| close() { |
| const stream = this._stream._stream |
| |
| if (stream.destroyed) return Promise.resolve() |
| |
| return new Promise((resolve) => stream.once('close', resolve).end()) |
| } |
| |
| abort(reason = new TypeError('Stream was aborted')) { |
| const stream = this._stream._stream |
| |
| if (stream.destroyed) return Promise.resolve() |
| |
| return new Promise((resolve) => stream.once('close', resolve).destroy(reason)) |
| } |
| } |
| |
| // https://streams.spec.whatwg.org/#writablestreamdefaultcontroller |
| exports.WritableStreamDefaultController = class WritableStreamDefaultController { |
| constructor(stream) { |
| this._stream = stream |
| } |
| |
| error(err) { |
| this._stream._stream.destroy(err) |
| } |
| } |
| |
| // https://streams.spec.whatwg.org/#writablestream |
| class WritableStream { |
| static get [writableKind]() { |
| return 0 // Compatibility version |
| } |
| |
| constructor(underlyingSink = {}, queuingStrategy = {}) { |
| if (isStreamx(underlyingSink)) { |
| this._stream = underlyingSink |
| } else { |
| if (queuingStrategy === undefined) { |
| queuingStrategy = new exports.CountQueuingStrategy() |
| } |
| |
| const { start, write, close, abort } = underlyingSink |
| const { highWaterMark = 1, size = defaultSize } = queuingStrategy |
| |
| this._stream = new Writable({ highWaterMark, byteLength: size }) |
| |
| const controller = new exports.WritableStreamDefaultController(this) |
| |
| this._controller = controller |
| |
| try { |
| let starting = Promise.resolve() |
| |
| if (start) starting = forwardError(start.call(this, controller), controller) |
| |
| if (write) { |
| this._stream._write = this._write.bind(this, starting, write.bind(this)) |
| } |
| |
| if (close) { |
| this._stream._destroy = this._destroy.bind(this, close.call(this)) |
| } |
| |
| if (abort) { |
| this._stream.once('error', abort.bind(this)) |
| } |
| } catch (err) { |
| controller.error(err) |
| } |
| } |
| |
| this._writer = null |
| } |
| |
| get [writableKind]() { |
| return WritableStream[writableKind] |
| } |
| |
| get locked() { |
| return this._writer !== null |
| } |
| |
| getWriter() { |
| if (this.locked) throw new TypeError('Stream is locked') |
| |
| this._writer = new exports.WritableStreamDefaultWriter(this) |
| |
| return this._writer |
| } |
| |
| abort(reason = new TypeError('Stream was aborted')) { |
| if (this._stream.destroyed) return Promise.resolve() |
| |
| if (this.locked) return Promise.reject(new TypeError('Stream is locked')) |
| |
| return new Promise((resolve) => this._stream.once('close', resolve).destroy(reason)) |
| } |
| |
| close() { |
| if (this._stream.destroyed) return Promise.resolve() |
| |
| if (this.locked) return Promise.reject(new TypeError('Stream is locked')) |
| |
| return new Promise((resolve) => this._stream.once('close', resolve).end()) |
| } |
| |
| _releaseLock() { |
| this._writer = null |
| } |
| |
| async _write(starting, write, data, cb) { |
| await starting |
| |
| let err = null |
| try { |
| await write(data, this._controller) |
| } catch (e) { |
| err = e |
| } |
| cb(err) |
| } |
| |
| async _destroy(closing, cb) { |
| let err = null |
| try { |
| await closing |
| } catch (e) { |
| err = e |
| } |
| cb(err) |
| } |
| } |
| |
| exports.WritableStream = WritableStream |
| |
| exports.isWritableStream = function isWritableStream(value) { |
| if (value instanceof WritableStream) return true |
| |
| return ( |
| typeof value === 'object' && |
| value !== null && |
| value[writableKind] === WritableStream[writableKind] |
| ) |
| } |
| |
| // https://streams.spec.whatwg.org/#transformstreamdefaultcontroller |
| exports.TransformStreamDefaultController = class TransformStreamDefaultController { |
| constructor(stream) { |
| this._stream = stream |
| } |
| |
| get desiredSize() { |
| const stream = this._stream._stream |
| |
| return stream._readableState.highWaterMark - stream._readableState.buffered |
| } |
| |
| enqueue(data) { |
| this._stream._stream.push(data) |
| } |
| |
| error(err) { |
| this._stream._stream.destroy(err) |
| } |
| |
| terminate() { |
| const stream = this._stream._stream |
| |
| stream.push(null) |
| stream.destroy(new TypeError('Stream has been terminated')) |
| } |
| } |
| |
| // https://streams.spec.whatwg.org/#transformstream |
| class TransformStream { |
| static get [transformKind]() { |
| return 0 // Compatibility version |
| } |
| |
| constructor(transformer = {}, writableStrategy = {}, readableStrategy = {}) { |
| if (isStreamx(transformer)) { |
| this._stream = transformer |
| } else { |
| const { start, transform, flush } = transformer |
| |
| this._stream = new Transform({ ...writableStrategy, ...readableStrategy }) |
| |
| const controller = new exports.TransformStreamDefaultController(this) |
| |
| this._controller = controller |
| |
| try { |
| let starting = Promise.resolve() |
| |
| if (start) starting = forwardError(start.call(this, controller), controller) |
| |
| if (transform) { |
| this._stream._transform = this._transform.bind(this, starting, transform.bind(this)) |
| } |
| |
| if (flush) { |
| this._stream._flush = this._flush.bind(this, flush.call(this, this._controller)) |
| } |
| } catch (err) { |
| controller.error(err) |
| } |
| } |
| |
| this._writable = new WritableStream(this._stream) |
| this._readable = new ReadableStream(this._stream) |
| } |
| |
| get [transformKind]() { |
| return TransformStream[transformKind] |
| } |
| |
| get writable() { |
| return this._writable |
| } |
| |
| get readable() { |
| return this._readable |
| } |
| |
| async _transform(starting, transform, data, cb) { |
| await starting |
| |
| let err = null |
| try { |
| await transform(data, this._controller) |
| } catch (e) { |
| err = e |
| } |
| cb(err) |
| } |
| |
| async _flush(flush, cb) { |
| let err = null |
| try { |
| await flush |
| } catch (e) { |
| err = e |
| } |
| cb(err) |
| } |
| } |
| |
| exports.TransformStream = TransformStream |
| |
| exports.isTransformStream = function isTransformStream(value) { |
| if (value instanceof TransformStream) return true |
| |
| return ( |
| typeof value === 'object' && |
| value !== null && |
| value[transformKind] === TransformStream[transformKind] |
| ) |
| } |
| |
| async function forwardError(promise, controller) { |
| try { |
| await promise |
| } catch (err) { |
| controller.error(err) |
| } |
| } |
| |
| function noop() {} |