| 'use strict'; |
| |
| const common = require('../common'); |
| const fixtures = require('../common/fixtures'); |
| const { |
| Readable, |
| } = require('stream'); |
| const assert = require('assert'); |
| const { setTimeout } = require('timers/promises'); |
| const { createReadStream } = require('fs'); |
| |
| function oneTo5() { |
| return Readable.from([1, 2, 3, 4, 5]); |
| } |
| |
| { |
| // flatMap works on synchronous streams with a synchronous mapper |
| (async () => { |
| assert.deepStrictEqual( |
| await oneTo5().flatMap((x) => [x + x]).toArray(), |
| [2, 4, 6, 8, 10] |
| ); |
| assert.deepStrictEqual( |
| await oneTo5().flatMap(() => []).toArray(), |
| [] |
| ); |
| assert.deepStrictEqual( |
| await oneTo5().flatMap((x) => [x, x]).toArray(), |
| [1, 1, 2, 2, 3, 3, 4, 4, 5, 5] |
| ); |
| })().then(common.mustCall()); |
| } |
| |
| |
| { |
| // flatMap works on sync/async streams with an asynchronous mapper |
| (async () => { |
| assert.deepStrictEqual( |
| await oneTo5().flatMap(async (x) => [x, x]).toArray(), |
| [1, 1, 2, 2, 3, 3, 4, 4, 5, 5] |
| ); |
| const asyncOneTo5 = oneTo5().map(async (x) => x); |
| assert.deepStrictEqual( |
| await asyncOneTo5.flatMap(async (x) => [x, x]).toArray(), |
| [1, 1, 2, 2, 3, 3, 4, 4, 5, 5] |
| ); |
| })().then(common.mustCall()); |
| } |
| { |
| // flatMap works on a stream where mapping returns a stream |
| (async () => { |
| const result = await oneTo5().flatMap(async (x) => { |
| return Readable.from([x, x]); |
| }).toArray(); |
| assert.deepStrictEqual(result, [1, 1, 2, 2, 3, 3, 4, 4, 5, 5]); |
| })().then(common.mustCall()); |
| // flatMap works on an objectMode stream where mappign returns a stream |
| (async () => { |
| const result = await oneTo5().flatMap(() => { |
| return createReadStream(fixtures.path('x.txt')); |
| }).toArray(); |
| // The resultant stream is in object mode so toArray shouldn't flatten |
| assert.strictEqual(result.length, 5); |
| assert.deepStrictEqual( |
| Buffer.concat(result).toString(), |
| 'xyz\n'.repeat(5) |
| ); |
| |
| })().then(common.mustCall()); |
| |
| } |
| |
| { |
| // Concurrency + AbortSignal |
| const ac = new AbortController(); |
| const stream = oneTo5().flatMap(common.mustNotCall(async (_, { signal }) => { |
| await setTimeout(100, { signal }); |
| }), { signal: ac.signal, concurrency: 2 }); |
| // pump |
| assert.rejects(async () => { |
| for await (const item of stream) { |
| // nope |
| console.log(item); |
| } |
| }, { |
| name: 'AbortError', |
| }).then(common.mustCall()); |
| |
| queueMicrotask(() => { |
| ac.abort(); |
| }); |
| } |
| |
| { |
| // Already aborted AbortSignal |
| const stream = oneTo5().flatMap(common.mustNotCall(async (_, { signal }) => { |
| await setTimeout(100, { signal }); |
| }), { signal: AbortSignal.abort() }); |
| // pump |
| assert.rejects(async () => { |
| for await (const item of stream) { |
| // nope |
| console.log(item); |
| } |
| }, { |
| name: 'AbortError', |
| }).then(common.mustCall()); |
| } |
| |
| { |
| // Error cases |
| assert.throws(() => Readable.from([1]).flatMap(1), /ERR_INVALID_ARG_TYPE/); |
| assert.throws(() => Readable.from([1]).flatMap((x) => x, { |
| concurrency: 'Foo' |
| }), /ERR_OUT_OF_RANGE/); |
| assert.throws(() => Readable.from([1]).flatMap((x) => x, 1), /ERR_INVALID_ARG_TYPE/); |
| assert.throws(() => Readable.from([1]).flatMap((x) => x, { signal: true }), /ERR_INVALID_ARG_TYPE/); |
| } |
| { |
| // Test result is a Readable |
| const stream = oneTo5().flatMap((x) => x); |
| assert.strictEqual(stream.readable, true); |
| } |
| { |
| const stream = oneTo5(); |
| Object.defineProperty(stream, 'map', { |
| value: common.mustNotCall(() => {}), |
| }); |
| // Check that map isn't getting called. |
| stream.flatMap(() => true); |
| } |