| // META: global=worker,jsshell |
| // META: script=../resources/recording-streams.js |
| // META: script=../resources/test-utils.js |
| 'use strict'; |
| |
| const error1 = new Error('error1 message'); |
| error1.name = 'error1'; |
| |
| promise_test(() => { |
| const ts = recordingTransformStream(); |
| const writer = ts.writable.getWriter(); |
| // This call never resolves. |
| writer.write('a'); |
| return flushAsyncEvents().then(() => { |
| assert_array_equals(ts.events, [], 'transform should not be called'); |
| }); |
| }, 'backpressure allows no transforms with a default identity transform and no reader'); |
| |
| promise_test(() => { |
| const ts = recordingTransformStream({}, undefined, { highWaterMark: 1 }); |
| const writer = ts.writable.getWriter(); |
| // This call to write() resolves asynchronously. |
| writer.write('a'); |
| // This call to write() waits for backpressure that is never relieved and never calls transform(). |
| writer.write('b'); |
| return flushAsyncEvents().then(() => { |
| assert_array_equals(ts.events, ['transform', 'a'], 'transform should be called once'); |
| }); |
| }, 'backpressure only allows one transform() with a identity transform with a readable HWM of 1 and no reader'); |
| |
| promise_test(() => { |
| // Without a transform() implementation, recordingTransformStream() never enqueues anything. |
| const ts = recordingTransformStream({ |
| transform() { |
| // Discard all chunks. As a result, the readable side is never full enough to exert backpressure and transform() |
| // keeps being called. |
| } |
| }, undefined, { highWaterMark: 1 }); |
| const writer = ts.writable.getWriter(); |
| const writePromises = []; |
| for (let i = 0; i < 4; ++i) { |
| writePromises.push(writer.write(i)); |
| } |
| return Promise.all(writePromises).then(() => { |
| assert_array_equals(ts.events, ['transform', 0, 'transform', 1, 'transform', 2, 'transform', 3], |
| 'all 4 events should be transformed'); |
| }); |
| }, 'transform() should keep being called as long as there is no backpressure'); |
| |
| promise_test(() => { |
| const ts = new TransformStream({}, undefined, { highWaterMark: 1 }); |
| const writer = ts.writable.getWriter(); |
| const reader = ts.readable.getReader(); |
| const events = []; |
| const writerPromises = [ |
| writer.write('a').then(() => events.push('a')), |
| writer.write('b').then(() => events.push('b')), |
| writer.close().then(() => events.push('closed'))]; |
| return delay(0).then(() => { |
| assert_array_equals(events, ['a'], 'the first write should have resolved'); |
| return reader.read(); |
| }).then(({ value, done }) => { |
| assert_false(done, 'done should not be true'); |
| assert_equals('a', value, 'value should be "a"'); |
| return delay(0); |
| }).then(() => { |
| assert_array_equals(events, ['a', 'b', 'closed'], 'both writes and close() should have resolved'); |
| return reader.read(); |
| }).then(({ value, done }) => { |
| assert_false(done, 'done should still not be true'); |
| assert_equals('b', value, 'value should be "b"'); |
| return reader.read(); |
| }).then(({ done }) => { |
| assert_true(done, 'done should be true'); |
| return writerPromises; |
| }); |
| }, 'writes should resolve as soon as transform completes'); |
| |
| promise_test(() => { |
| const ts = new TransformStream(undefined, undefined, { highWaterMark: 0 }); |
| const writer = ts.writable.getWriter(); |
| const reader = ts.readable.getReader(); |
| const readPromise = reader.read(); |
| writer.write('a'); |
| return readPromise.then(({ value, done }) => { |
| assert_false(done, 'not done'); |
| assert_equals(value, 'a', 'value should be "a"'); |
| }); |
| }, 'calling pull() before the first write() with backpressure should work'); |
| |
| promise_test(() => { |
| let reader; |
| const ts = recordingTransformStream({ |
| transform(chunk, controller) { |
| controller.enqueue(chunk); |
| return reader.read(); |
| } |
| }, undefined, { highWaterMark: 1 }); |
| const writer = ts.writable.getWriter(); |
| reader = ts.readable.getReader(); |
| return writer.write('a'); |
| }, 'transform() should be able to read the chunk it just enqueued'); |
| |
| promise_test(() => { |
| let resolveTransform; |
| const transformPromise = new Promise(resolve => { |
| resolveTransform = resolve; |
| }); |
| const ts = recordingTransformStream({ |
| transform() { |
| return transformPromise; |
| } |
| }, undefined, new CountQueuingStrategy({ highWaterMark: Infinity })); |
| const writer = ts.writable.getWriter(); |
| assert_equals(writer.desiredSize, 1, 'desiredSize should be 1'); |
| return delay(0).then(() => { |
| writer.write('a'); |
| assert_array_equals(ts.events, ['transform', 'a']); |
| assert_equals(writer.desiredSize, 0, 'desiredSize should be 0'); |
| return flushAsyncEvents(); |
| }).then(() => { |
| assert_equals(writer.desiredSize, 0, 'desiredSize should still be 0'); |
| resolveTransform(); |
| return delay(0); |
| }).then(() => { |
| assert_equals(writer.desiredSize, 1, 'desiredSize should be 1'); |
| }); |
| }, 'blocking transform() should cause backpressure'); |
| |
| promise_test(t => { |
| const ts = new TransformStream(); |
| ts.readable.cancel(error1); |
| return promise_rejects(t, error1, ts.writable.getWriter().closed, 'closed should reject'); |
| }, 'writer.closed should resolve after readable is canceled during start'); |
| |
| promise_test(t => { |
| const ts = new TransformStream({}, undefined, { highWaterMark: 0 }); |
| return delay(0).then(() => { |
| ts.readable.cancel(error1); |
| return promise_rejects(t, error1, ts.writable.getWriter().closed, 'closed should reject'); |
| }); |
| }, 'writer.closed should resolve after readable is canceled with backpressure'); |
| |
| promise_test(t => { |
| const ts = new TransformStream({}, undefined, { highWaterMark: 1 }); |
| return delay(0).then(() => { |
| ts.readable.cancel(error1); |
| return promise_rejects(t, error1, ts.writable.getWriter().closed, 'closed should reject'); |
| }); |
| }, 'writer.closed should resolve after readable is canceled with no backpressure'); |
| |
| promise_test(() => { |
| const ts = new TransformStream({}, undefined, { highWaterMark: 1 }); |
| const writer = ts.writable.getWriter(); |
| return delay(0).then(() => { |
| const writePromise = writer.write('a'); |
| ts.readable.cancel(error1); |
| return writePromise; |
| }); |
| }, 'cancelling the readable should cause a pending write to resolve'); |
| |
| promise_test(t => { |
| const rs = new ReadableStream(); |
| const ts = new TransformStream(); |
| const pipePromise = rs.pipeTo(ts.writable); |
| ts.readable.cancel(error1); |
| return promise_rejects(t, error1, pipePromise, 'promise returned from pipeTo() should be rejected'); |
| }, 'cancelling the readable side of a TransformStream should abort an empty pipe'); |
| |
| promise_test(t => { |
| const rs = new ReadableStream(); |
| const ts = new TransformStream(); |
| const pipePromise = rs.pipeTo(ts.writable); |
| return delay(0).then(() => { |
| ts.readable.cancel(error1); |
| return promise_rejects(t, error1, pipePromise, 'promise returned from pipeTo() should be rejected'); |
| }); |
| }, 'cancelling the readable side of a TransformStream should abort an empty pipe after startup'); |
| |
| promise_test(t => { |
| const rs = new ReadableStream({ |
| start(controller) { |
| controller.enqueue('a'); |
| controller.enqueue('b'); |
| controller.enqueue('c'); |
| } |
| }); |
| const ts = new TransformStream(); |
| const pipePromise = rs.pipeTo(ts.writable); |
| // Allow data to flow into the pipe. |
| return delay(0).then(() => { |
| ts.readable.cancel(error1); |
| return promise_rejects(t, error1, pipePromise, 'promise returned from pipeTo() should be rejected'); |
| }); |
| }, 'cancelling the readable side of a TransformStream should abort a full pipe'); |