| test(() => { |
| const source = new Observable(subscriber => { |
| subscriber.next(1); |
| subscriber.next(2); |
| subscriber.next(3); |
| subscriber.complete(); |
| }); |
| |
| let projectionCalls = 0; |
| |
| const results = []; |
| |
| const flattened = source.flatMap(value => { |
| projectionCalls++; |
| return new Observable((subscriber) => { |
| subscriber.next(value * 10); |
| subscriber.next(value * 100); |
| subscriber.complete(); |
| }); |
| }); |
| |
| assert_true(flattened instanceof Observable, "flatMap() returns an Observable"); |
| assert_equals(projectionCalls, 0, |
| "Projection is not called until subscription starts"); |
| |
| flattened.subscribe({ |
| next: v => results.push(v), |
| error: () => results.push("error"), |
| complete: () => results.push("complete"), |
| }); |
| |
| assert_equals(projectionCalls, 3, |
| "Mapper is called three times, once for each source Observable value"); |
| assert_array_equals(results, [10, 100, 20, 200, 30, 300, "complete"], |
| "flatMap() results are correct"); |
| }, "flatMap(): Flattens simple source Observable properly"); |
| |
| test(() => { |
| const error = new Error("error"); |
| const source = new Observable(subscriber => { |
| subscriber.next(1); |
| subscriber.next(2); |
| subscriber.error(error); |
| subscriber.next(3); |
| }); |
| |
| const flattened = source.flatMap(value => { |
| return new Observable(subscriber => { |
| subscriber.next(value * 10); |
| subscriber.next(value * 100); |
| subscriber.complete(); |
| }); |
| }); |
| |
| const results = []; |
| |
| flattened.subscribe({ |
| next: v => results.push(v), |
| error: e => results.push(e), |
| complete: () => results.push("complete"), |
| }); |
| |
| assert_array_equals(results, [10, 100, 20, 200, error], |
| "Source error is passed through to the flatMap() Observable"); |
| }, "flatMap(): Returned Observable passes through source Observable errors"); |
| |
| test(() => { |
| const results = []; |
| const error = new Error("error"); |
| const source = new Observable(subscriber => { |
| subscriber.next(1); |
| results.push(subscriber.active ? "active" : "inactive"); |
| subscriber.next(2); |
| results.push(subscriber.active ? "active" : "inactive"); |
| subscriber.next(3); |
| subscriber.complete(); |
| }); |
| |
| const flattened = source.flatMap((value) => { |
| return new Observable((subscriber) => { |
| subscriber.next(value * 10); |
| subscriber.next(value * 100); |
| if (value === 2) { |
| subscriber.error(error); |
| } else { |
| subscriber.complete(); |
| } |
| }); |
| }); |
| |
| flattened.subscribe({ |
| next: v => results.push(v), |
| error: e => results.push(e), |
| complete: () => results.push("complete"), |
| }); |
| |
| assert_array_equals(results, [10, 100, "active", 20, 200, error, "inactive"], |
| "Inner subscription error gets surfaced"); |
| }, "flatMap(): Outer Subscription synchronously becomes inactive when an " + |
| "'inner' Observable emits an error"); |
| |
| test(() => { |
| const results = []; |
| const error = new Error("error"); |
| const source = new Observable(subscriber => { |
| subscriber.next(1); |
| subscriber.next(2); |
| subscriber.next(3); |
| results.push(subscriber.active ? "active" : "inactive"); |
| subscriber.complete(); |
| }); |
| |
| const flattened = source.flatMap(value => { |
| if (value === 3) { |
| throw error; |
| } |
| return new Observable(subscriber => { |
| subscriber.next(value * 10); |
| subscriber.next(value * 100); |
| subscriber.complete(); |
| }); |
| }); |
| |
| flattened.subscribe({ |
| next: v => results.push(v), |
| error: e => results.push(e), |
| complete: () => results.push("complete"), |
| }); |
| |
| assert_array_equals(results, [10, 100, 20, 200, error, "inactive"], |
| "Inner subscriber thrown error gets surfaced"); |
| }, "flatMap(): Outer Subscription synchronously becomes inactive when an " + |
| "'inner' Observable throws an error"); |
| |
| test(() => { |
| const source = createTestSubject(); |
| const inner1 = createTestSubject(); |
| const inner2 = createTestSubject(); |
| |
| const flattened = source.flatMap(value => { |
| if (value === 1) { |
| return inner1; |
| } |
| |
| return inner2; |
| }); |
| |
| const results = []; |
| |
| flattened.subscribe({ |
| next: v => results.push(v), |
| error: e => results.push(e), |
| complete: () => results.push("complete"), |
| }); |
| |
| assert_array_equals(results, []); |
| |
| source.next(1); |
| assert_equals(inner1.subscriberCount(), 1, "inner1 gets subscribed to"); |
| |
| source.next(2); |
| assert_equals(inner2.subscriberCount(), 0, |
| "inner2 is queued, not subscribed to until inner1 completes"); |
| |
| assert_array_equals(results, []); |
| |
| inner1.next(100); |
| inner1.next(101); |
| |
| assert_array_equals(results, [100, 101]); |
| |
| inner1.complete(); |
| assert_equals(inner1.subscriberCount(), 0, |
| "inner1 becomes inactive once it completes"); |
| assert_equals(inner2.subscriberCount(), 1, |
| "inner2 gets un-queued and subscribed to once inner1 completes"); |
| |
| inner2.next(200); |
| inner2.next(201); |
| assert_array_equals(results, [100, 101, 200, 201]); |
| |
| inner2.complete(); |
| assert_equals(inner2.subscriberCount(), 0, |
| "inner2 becomes inactive once it completes"); |
| assert_equals(source.subscriberCount(), 1, |
| "source is not unsubscribed from yet, since it has not completed"); |
| assert_array_equals(results, [100, 101, 200, 201]); |
| |
| source.complete(); |
| assert_equals(source.subscriberCount(), 0, |
| "source unsubscribed from after it completes"); |
| |
| assert_array_equals(results, [100, 101, 200, 201, "complete"]); |
| }, "flatMap(): result Observable does not complete until source and inner " + |
| "Observables all complete"); |
| |
| test(() => { |
| const source = createTestSubject(); |
| const inner1 = createTestSubject(); |
| const inner2 = createTestSubject(); |
| |
| const flattened = source.flatMap(value => { |
| if (value === 1) { |
| return inner1; |
| } |
| |
| return inner2; |
| }); |
| |
| const results = []; |
| |
| flattened.subscribe({ |
| next: v => results.push(v), |
| error: e => results.push(e), |
| complete: () => results.push("complete"), |
| }); |
| |
| assert_array_equals(results, []); |
| |
| source.next(1); |
| source.next(2); |
| assert_equals(inner1.subscriberCount(), 1, "inner1 gets subscribed to"); |
| assert_equals(inner2.subscriberCount(), 0, |
| "inner2 is queued, not subscribed to until inner1 completes"); |
| |
| assert_array_equals(results, []); |
| |
| // Before `inner1` pushes any values, we first complete the source Observable. |
| // This will not fire completion of the Observable returned from `flatMap()`, |
| // because there are two values (corresponding to inner Observables) that are |
| // queued to the inner queue that need to be processed first. Once the last |
| // one of *those* completes (i.e., `inner2.complete()` further down), then the |
| // returned Observable can finally complete. |
| source.complete(); |
| assert_equals(source.subscriberCount(), 0, |
| "source becomes inactive once it completes"); |
| |
| inner1.next(100); |
| inner1.next(101); |
| |
| assert_array_equals(results, [100, 101]); |
| |
| inner1.complete(); |
| assert_array_equals(results, [100, 101], |
| "Outer completion not triggered after inner1 completes"); |
| assert_equals(inner2.subscriberCount(), 1, |
| "inner2 gets un-queued and subscribed after inner1 completes"); |
| |
| inner2.next(200); |
| inner2.next(201); |
| assert_array_equals(results, [100, 101, 200, 201]); |
| |
| inner2.complete(); |
| assert_equals(inner2.subscriberCount(), 0, |
| "inner2 becomes inactive once it completes"); |
| assert_array_equals(results, [100, 101, 200, 201, "complete"]); |
| }, "flatMap(): result Observable does not complete after source Observable " + |
| "completes while there are still queued inner Observables to process " + |
| "Observables all complete"); |
| |
| test(() => { |
| const source = createTestSubject(); |
| const inner = createTestSubject(); |
| const result = source.flatMap(() => inner); |
| |
| const ac = new AbortController(); |
| |
| result.subscribe({}, { signal: ac.signal, }); |
| |
| source.next(1); |
| |
| assert_equals(inner.subscriberCount(), 1, |
| "inner Observable subscribed to once source emits it"); |
| |
| ac.abort(); |
| |
| assert_equals(source.subscriberCount(), 0, |
| "source unsubscribed from, once outer signal is aborted"); |
| |
| assert_equals(inner.subscriberCount(), 0, |
| "inner Observable unsubscribed from once the outer Observable is " + |
| "subscribed from, as a result of the outer signal being aborted"); |
| }, "flatMap(): source and inner active Observables are both unsubscribed " + |
| "from once the outer subscription signal is aborted"); |
| |
| // A helper function to create an Observable that can be externally controlled |
| // and examined for testing purposes. |
| function createTestSubject() { |
| const subscribers = new Set(); |
| const subject = new Observable(subscriber => { |
| subscribers.add(subscriber); |
| subscriber.addTeardown(() => subscribers.delete(subscriber)); |
| }); |
| |
| subject.next = value => { |
| for (const subscriber of Array.from(subscribers)) { |
| subscriber.next(value); |
| } |
| }; |
| subject.error = error => { |
| for (const subscriber of Array.from(subscribers)) { |
| subscriber.error(error); |
| } |
| }; |
| subject.complete = () => { |
| for (const subscriber of Array.from(subscribers)) { |
| subscriber.complete(); |
| } |
| }; |
| subject.subscriberCount = () => { |
| return subscribers.size; |
| }; |
| |
| return subject; |
| } |