| // Because we test that the global error handler is called at various times. |
| setup({allow_uncaught_exception: true}); |
| |
| test(() => { |
| assert_implements(self.Observable, "The Observable interface is not implemented"); |
| |
| assert_true( |
| typeof Observable === "function", |
| "Observable constructor is defined" |
| ); |
| |
| assert_throws_js(TypeError, () => { new Observable(); }); |
| }, "Observable constructor"); |
| |
| test(() => { |
| let initializerCalled = false; |
| const source = new Observable(() => { |
| initializerCalled = true; |
| }); |
| |
| assert_false( |
| initializerCalled, |
| "initializer should not be called by construction" |
| ); |
| source.subscribe(); |
| assert_true(initializerCalled, "initializer should be called by subscribe"); |
| }, "subscribe() can be called with no arguments"); |
| |
| test(() => { |
| assert_implements(self.Subscriber, "The Subscriber interface is not implemented"); |
| assert_true( |
| typeof Subscriber === "function", |
| "Subscriber interface is defined as a function" |
| ); |
| |
| assert_throws_js(TypeError, () => { new Subscriber(); }); |
| |
| let initializerCalled = false; |
| new Observable(subscriber => { |
| assert_not_equals(subscriber, undefined, "A Subscriber must be passed into the subscribe callback"); |
| assert_implements(subscriber.next, "A Subscriber object must have a next() method"); |
| assert_implements(subscriber.complete, "A Subscriber object must have a complete() method"); |
| assert_implements(subscriber.error, "A Subscriber object must have an error() method"); |
| initializerCalled = true; |
| }).subscribe(); |
| assert_true(initializerCalled, "initializer should be called by subscribe"); |
| }, "Subscriber interface is not constructible"); |
| |
| test(() => { |
| let initializerCalled = false; |
| const results = []; |
| |
| const source = new Observable((subscriber) => { |
| initializerCalled = true; |
| subscriber.next(1); |
| subscriber.next(2); |
| subscriber.next(3); |
| }); |
| |
| assert_false( |
| initializerCalled, |
| "initializer should not be called by construction" |
| ); |
| |
| source.subscribe(x => results.push(x)); |
| |
| assert_true(initializerCalled, "initializer should be called by subscribe"); |
| assert_array_equals( |
| results, |
| [1, 2, 3], |
| "should emit values synchronously, but not complete" |
| ); |
| }, "Subscribe with just a function as the next handler"); |
| |
| test(() => { |
| let initializerCalled = false; |
| const results = []; |
| |
| const source = new Observable((subscriber) => { |
| initializerCalled = true; |
| subscriber.next(1); |
| subscriber.next(2); |
| subscriber.next(3); |
| subscriber.complete(); |
| }); |
| |
| assert_false( |
| initializerCalled, |
| "initializer should not be called by construction" |
| ); |
| |
| source.subscribe({ |
| next: (x) => results.push(x), |
| error: () => assert_unreached("error should not be called"), |
| complete: () => results.push("complete"), |
| }); |
| |
| assert_true(initializerCalled, "initializer should be called by subscribe"); |
| assert_array_equals( |
| results, |
| [1, 2, 3, "complete"], |
| "should emit values synchronously" |
| ); |
| }, "Observable constructor calls initializer on subscribe"); |
| |
| test(() => { |
| const error = new Error("error"); |
| const results = []; |
| |
| const source = new Observable((subscriber) => { |
| subscriber.next(1); |
| subscriber.next(2); |
| subscriber.error(error); |
| }); |
| |
| source.subscribe({ |
| next: (x) => results.push(x), |
| error: (e) => results.push(e), |
| complete: () => assert_unreached("complete should not be called"), |
| }); |
| |
| assert_array_equals( |
| results, |
| [1, 2, error], |
| "should emit error synchronously" |
| ); |
| }, "Observable error path called synchronously"); |
| |
| test(() => { |
| let subscriber; |
| new Observable(s => { subscriber = s }).subscribe(); |
| const {next, complete, error} = subscriber; |
| assert_throws_js(TypeError, () => next(1)); |
| assert_throws_js(TypeError, () => complete()); |
| assert_throws_js(TypeError, () => error(1)); |
| }, "Subscriber must have receiver"); |
| |
| test(() => { |
| let subscriber; |
| new Observable(s => { subscriber = s }).subscribe(); |
| assert_throws_js(TypeError, () => subscriber.next()); |
| assert_throws_js(TypeError, () => subscriber.error()); |
| }, "Subscriber next & error must recieve argument"); |
| |
| test(() => { |
| let subscriber; |
| new Observable(s => { subscriber = s }).subscribe(); |
| assert_true(subscriber.active); |
| assert_false(subscriber.signal.aborted); |
| subscriber.complete(); |
| assert_false(subscriber.active); |
| assert_true(subscriber.signal.aborted); |
| }, "Subscriber complete() will set active to false, and abort signal"); |
| |
| test(() => { |
| let subscriber; |
| new Observable(s => { subscriber = s }).subscribe(); |
| assert_true(subscriber.active); |
| subscriber.active = false; |
| assert_true(subscriber.active); |
| }, "Subscriber active is readonly"); |
| |
| test(() => { |
| let subscriber; |
| new Observable(s => { subscriber = s }).subscribe(); |
| assert_false(subscriber.signal.aborted); |
| const oldSignal = subscriber.signal; |
| const newSignal = AbortSignal.abort(); |
| subscriber.signal = newSignal; |
| assert_false(subscriber.signal.aborted); |
| assert_equals(subscriber.signal, oldSignal, "signal did not change"); |
| }, "Subscriber signal is readonly"); |
| |
| test(() => { |
| const error = new Error("error"); |
| const results = []; |
| let errorReported = null; |
| let innerSubscriber = null; |
| let subscriptionActivityInFinallyAfterThrow; |
| let subscriptionActivityInErrorHandlerAfterThrow; |
| |
| self.addEventListener("error", e => errorReported = e, {once: true}); |
| |
| const source = new Observable((subscriber) => { |
| innerSubscriber = subscriber; |
| subscriber.next(1); |
| try { |
| throw error; |
| } finally { |
| subscriptionActivityInFinallyAfterThrow = subscriber.active; |
| } |
| }); |
| |
| source.subscribe({ |
| next: (x) => results.push(x), |
| error: (e) => { |
| subscriptionActivityInErrorHandlerAfterThrow = innerSubscriber.active; |
| results.push(e); |
| }, |
| complete: () => assert_unreached("complete should not be called"), |
| }); |
| |
| assert_equals(errorReported, null, "The global error handler should not be " + |
| "invoked when the subscribe callback throws an error and the " + |
| "subscriber has given an error handler"); |
| assert_true(subscriptionActivityInFinallyAfterThrow, "Subscriber is " + |
| "considered active in finally block before error handler is invoked"); |
| assert_false(subscriptionActivityInErrorHandlerAfterThrow, "Subscriber is " + |
| "considered inactive in error handler block after thrown error"); |
| assert_array_equals( |
| results, |
| [1, error], |
| "should emit values and the thrown error synchronously" |
| ); |
| }, "Observable should error if initializer throws"); |
| |
| test(t => { |
| let innerSubscriber = null; |
| let activeBeforeComplete = false; |
| let activeAfterComplete = false; |
| let activeDuringComplete = false; |
| let abortedBeforeComplete = false; |
| let abortedDuringComplete = false; |
| let abortedAfterComplete = false; |
| |
| const source = new Observable((subscriber) => { |
| innerSubscriber = subscriber; |
| activeBeforeComplete = subscriber.active; |
| abortedBeforeComplete = subscriber.signal.aborted; |
| |
| subscriber.complete(); |
| activeAfterComplete = subscriber.active; |
| abortedAfterComplete = subscriber.signal.aborted; |
| }); |
| |
| source.subscribe({ |
| complete: () => { |
| activeDuringComplete = innerSubscriber.active; |
| abortedDuringComplete = innerSubscriber.signal.aborted; |
| } |
| }); |
| assert_true(activeBeforeComplete, "Subscription is active before complete"); |
| assert_false(abortedBeforeComplete, "Subscription is not aborted before complete"); |
| assert_false(activeDuringComplete, |
| "Subscription becomes inactive during Subscriber#complete(), just " + |
| "before Observer#complete() callback is invoked"); |
| assert_true(abortedDuringComplete, |
| "Subscription's signal is aborted during Subscriber#complete(), just " + |
| "before Observer#complete() callback is invoked"); |
| assert_false(activeAfterComplete, "Subscription is not active after complete"); |
| assert_true(abortedAfterComplete, "Subscription is aborted after complete"); |
| }, "Subscription is inactive after complete()"); |
| |
| test(t => { |
| let innerSubscriber = null; |
| let activeBeforeError = false; |
| let activeAfterError = false; |
| let activeDuringError = false; |
| let abortedBeforeError = false; |
| let abortedDuringError = false; |
| let abortedAfterError = false; |
| |
| const error = new Error("error"); |
| const source = new Observable((subscriber) => { |
| innerSubscriber = subscriber; |
| activeBeforeError = subscriber.active; |
| abortedBeforeError = subscriber.signal.aborted; |
| |
| subscriber.error(error); |
| activeAfterError = subscriber.active; |
| abortedAfterError = subscriber.signal.aborted; |
| }); |
| |
| source.subscribe({ |
| error: () => { |
| activeDuringError = innerSubscriber.active; |
| abortedDuringError = innerSubscriber.signal.aborted; |
| } |
| }); |
| assert_true(activeBeforeError, "Subscription is active before error"); |
| assert_false(abortedBeforeError, "Subscription is not aborted before error"); |
| assert_false(activeDuringError, |
| "Subscription becomes inactive during Subscriber#error(), just " + |
| "before Observer#error() callback is invoked"); |
| assert_true(abortedDuringError, |
| "Subscription's signal is aborted during Subscriber#error(), just " + |
| "before Observer#error() callback is invoked"); |
| assert_false(activeAfterError, "Subscription is not active after error"); |
| assert_true(abortedAfterError, "Subscription is not aborted after error"); |
| }, "Subscription is inactive after error()"); |
| |
| test(t => { |
| let innerSubscriber; |
| let initialActivity; |
| let initialSignalAborted; |
| |
| const source = new Observable((subscriber) => { |
| innerSubscriber = subscriber; |
| initialActivity = subscriber.active; |
| initialSignalAborted = subscriber.signal.aborted; |
| }); |
| |
| source.subscribe({}, {signal: AbortSignal.abort('Initially aborted')}); |
| assert_false(initialActivity); |
| assert_true(initialSignalAborted); |
| assert_equals(innerSubscriber.signal.reason, 'Initially aborted'); |
| }, "Subscription is inactive when aborted signal is passed in"); |
| |
| test(() => { |
| let outerSubscriber = null; |
| |
| const source = new Observable(subscriber => outerSubscriber = subscriber); |
| |
| const controller = new AbortController(); |
| source.subscribe({}, {signal: controller.signal}); |
| |
| assert_not_equals(controller.signal, outerSubscriber.signal); |
| }, "Subscriber#signal is not the same AbortSignal as the one passed into `subscribe()`"); |
| |
| test(() => { |
| const results = []; |
| |
| const source = new Observable((subscriber) => { |
| subscriber.next(1); |
| subscriber.next(2); |
| subscriber.complete(); |
| subscriber.next(3); |
| }); |
| |
| source.subscribe({ |
| next: (x) => results.push(x), |
| error: () => assert_unreached("error should not be called"), |
| complete: () => results.push("complete"), |
| }); |
| |
| assert_array_equals( |
| results, |
| [1, 2, "complete"], |
| "should emit values synchronously, but not nexted values after complete" |
| ); |
| }, "Subscription does not emit values after completion"); |
| |
| test(() => { |
| const error = new Error("error"); |
| const results = []; |
| |
| const source = new Observable((subscriber) => { |
| subscriber.next(1); |
| subscriber.next(2); |
| subscriber.error(error); |
| subscriber.next(3); |
| }); |
| |
| source.subscribe({ |
| next: (x) => results.push(x), |
| error: (e) => results.push(e), |
| complete: () => assert_unreached("complete should not be called"), |
| }); |
| |
| assert_array_equals( |
| results, |
| [1, 2, error], |
| "should emit values synchronously, but not nexted values after error" |
| ); |
| }, "Subscription does not emit values after error"); |
| |
| test(() => { |
| const error = new Error("error"); |
| const results = []; |
| |
| const source = new Observable((subscriber) => { |
| subscriber.next(1); |
| subscriber.next(2); |
| subscriber.error(error); |
| assert_false(subscriber.active, "subscriber is closed after error"); |
| subscriber.next(3); |
| subscriber.complete(); |
| }); |
| |
| source.subscribe({ |
| next: (x) => results.push(x), |
| error: (error) => results.push(error), |
| complete: () => assert_unreached("complete should not be called"), |
| }); |
| |
| assert_array_equals(results, [1, 2, error], "should emit synchronously"); |
| }, "Completing or nexting a subscriber after an error does nothing"); |
| |
| test(() => { |
| const error = new Error("custom error"); |
| let errorReported = null; |
| |
| self.addEventListener("error", e => errorReported = e, { once: true }); |
| |
| const source = new Observable((subscriber) => { |
| subscriber.error(error); |
| }); |
| |
| // No error handler provided... |
| source.subscribe({ |
| next: () => assert_unreached("next should not be called"), |
| complete: () => assert_unreached("complete should not be called"), |
| }); |
| |
| // ... still the exception is reported to the global. |
| assert_true(errorReported !== null, "Exception was reported to global"); |
| assert_true(errorReported.message.includes("custom error"), "Error message matches"); |
| assert_greater_than(errorReported.lineno, 0, "Error lineno is greater than 0"); |
| assert_greater_than(errorReported.colno, 0, "Error lineno is greater than 0"); |
| assert_equals(errorReported.error, error, "Error object is equivalent"); |
| }, "Errors pushed to the subscriber that are not handled by the subscription " + |
| "are reported to the global"); |
| |
| test(() => { |
| const error = new Error("custom error"); |
| let errorReported = null; |
| |
| self.addEventListener("error", e => errorReported = e, { once: true }); |
| |
| const source = new Observable((subscriber) => { |
| throw error; |
| }); |
| |
| // No error handler provided... |
| source.subscribe({ |
| next: () => assert_unreached("next should not be called"), |
| complete: () => assert_unreached("complete should not be called"), |
| }); |
| |
| // ... still the exception is reported to the global. |
| assert_true(errorReported !== null, "Exception was reported to global"); |
| assert_true(errorReported.message.includes("custom error"), "Error message matches"); |
| assert_greater_than(errorReported.lineno, 0, "Error lineno is greater than 0"); |
| assert_greater_than(errorReported.colno, 0, "Error lineno is greater than 0"); |
| assert_equals(errorReported.error, error, "Error object is equivalent"); |
| }, "Errors thrown in the initializer that are not handled by the " + |
| "subscription are reported to the global"); |
| |
| test(() => { |
| const error = new Error("custom error"); |
| const results = []; |
| let errorReported = null; |
| |
| self.addEventListener("error", e => errorReported = e, { once: true }); |
| |
| const source = new Observable((subscriber) => { |
| subscriber.next(1); |
| subscriber.next(2); |
| subscriber.complete(); |
| subscriber.error(error); |
| }); |
| |
| source.subscribe({ |
| next: (x) => results.push(x), |
| error: () => assert_unreached("error should not be called"), |
| complete: () => results.push("complete"), |
| }); |
| |
| assert_array_equals( |
| results, |
| [1, 2, "complete"], |
| "should emit values synchronously, but not error values after complete" |
| ); |
| |
| // Error reporting still happens even after the subscription is closed. |
| assert_true(errorReported !== null, "Exception was reported to global"); |
| assert_true(errorReported.message.includes("custom error"),"Error message matches"); |
| assert_greater_than(errorReported.lineno, 0, "Error lineno is greater than 0"); |
| assert_greater_than(errorReported.colno, 0, "Error lineno is greater than 0"); |
| assert_equals(errorReported.error, error, "Error object is equivalent"); |
| }, "Subscription reports errors that are pushed after subscriber is closed " + |
| "by completion"); |
| |
| test(t => { |
| const error = new Error("custom error"); |
| const results = []; |
| let errorReported = null; |
| |
| self.addEventListener("error", e => errorReported = e, { once: true }); |
| |
| const source = new Observable((subscriber) => { |
| subscriber.next(1); |
| subscriber.next(2); |
| subscriber.complete(); |
| throw error; |
| }); |
| |
| source.subscribe({ |
| next: (x) => results.push(x), |
| error: () => assert_unreached("error should not be called"), |
| complete: () => results.push("complete"), |
| }); |
| |
| assert_array_equals(results, [1, 2, "complete"], |
| "should emit values synchronously, but not error after complete" |
| ); |
| |
| assert_true(errorReported !== null, "Exception was reported to global"); |
| assert_true(errorReported.message.includes("custom error"), "Error message matches"); |
| assert_greater_than(errorReported.lineno, 0, "Error lineno is greater than 0"); |
| assert_greater_than(errorReported.colno, 0, "Error lineno is greater than 0"); |
| assert_equals(errorReported.error, error, "Error object is equivalent"); |
| }, "Errors thrown by initializer function after subscriber is closed by " + |
| "completion are reported"); |
| |
| test(() => { |
| const error1 = new Error("error 1"); |
| const error2 = new Error("error 2"); |
| const results = []; |
| let errorReported = null; |
| |
| self.addEventListener("error", e => errorReported = e, { once: true }); |
| |
| const source = new Observable((subscriber) => { |
| subscriber.next(1); |
| subscriber.next(2); |
| subscriber.error(error1); |
| throw error2; |
| }); |
| |
| source.subscribe({ |
| next: (x) => results.push(x), |
| error: (error) => results.push(error), |
| complete: () => assert_unreached("complete should not be called"), |
| }); |
| |
| assert_array_equals( |
| results, |
| [1, 2, error1], |
| "should emit values synchronously, but not nexted values after error" |
| ); |
| |
| assert_true(errorReported !== null, "Exception was reported to global"); |
| assert_true(errorReported.message.includes("error 2"), "Error message matches"); |
| assert_greater_than(errorReported.lineno, 0, "Error lineno is greater than 0"); |
| assert_greater_than(errorReported.colno, 0, "Error lineno is greater than 0"); |
| assert_equals(errorReported.error, error2, "Error object is equivalent"); |
| }, "Errors thrown by initializer function after subscriber is closed by " + |
| "error are reported"); |
| |
| test(() => { |
| const error1 = new Error("error 1"); |
| const error2 = new Error("error 2"); |
| const results = []; |
| let errorReported = null; |
| |
| self.addEventListener("error", e => errorReported = e, { once: true }); |
| |
| const source = new Observable((subscriber) => { |
| subscriber.next(1); |
| subscriber.next(2); |
| subscriber.error(error1); |
| subscriber.error(error2); |
| }); |
| |
| source.subscribe({ |
| next: (x) => results.push(x), |
| error: (error) => results.push(error), |
| complete: () => assert_unreached("complete should not be called"), |
| }); |
| |
| assert_array_equals( |
| results, |
| [1, 2, error1], |
| "should emit values synchronously, but not nexted values after error" |
| ); |
| |
| assert_true(errorReported !== null, "Exception was reported to global"); |
| assert_true(errorReported.message.includes("error 2"), "Error message matches"); |
| assert_greater_than(errorReported.lineno, 0, "Error lineno is greater than 0"); |
| assert_greater_than(errorReported.colno, 0, "Error lineno is greater than 0"); |
| assert_equals(errorReported.error, error2, "Error object is equivalent"); |
| }, "Errors pushed by initializer function after subscriber is closed by " + |
| "error are reported"); |
| |
| test(() => { |
| const results = []; |
| const target = new EventTarget(); |
| |
| const source = new Observable((subscriber) => { |
| target.addEventListener('custom event', e => { |
| subscriber.next(1); |
| subscriber.complete(); |
| subscriber.error('not a real error'); |
| }); |
| }); |
| |
| source.subscribe({ |
| next: (x) => results.push(x), |
| error: (error) => results.push(error), |
| complete: () => { |
| results.push('complete'), |
| // Re-entrantly tries to invoke `complete()`. However, this function must |
| // only ever run once. |
| target.dispatchEvent(new Event('custom event')); |
| }, |
| }); |
| |
| target.dispatchEvent(new Event('custom event')); |
| |
| assert_array_equals( |
| results, |
| [1, 'complete'], |
| "complete() can only be called once, and cannot invoke other Observer methods" |
| ); |
| }, "Subscriber#complete() cannot re-entrantly invoke itself"); |
| |
| test(() => { |
| const results = []; |
| const target = new EventTarget(); |
| |
| const source = new Observable((subscriber) => { |
| target.addEventListener('custom event', e => { |
| subscriber.next(1); |
| subscriber.error('not a real error'); |
| subscriber.complete(); |
| }); |
| }); |
| |
| source.subscribe({ |
| next: (x) => results.push(x), |
| error: (error) => { |
| results.push('error'), |
| // Re-entrantly tries to invoke `error()`. However, this function must |
| // only ever run once. |
| target.dispatchEvent(new Event('custom event')); |
| }, |
| complete: () => results.push('complete'), |
| }); |
| |
| target.dispatchEvent(new Event('custom event')); |
| |
| assert_array_equals( |
| results, |
| [1, 'error'], |
| "error() can only be called once, and cannot invoke other Observer methods" |
| ); |
| }, "Subscriber#error() cannot re-entrantly invoke itself"); |
| |
| test(() => { |
| const results = []; |
| let innerSubscriber = null; |
| let activeDuringTeardown1 = null; |
| let abortedDuringTeardown1 = null; |
| let activeDuringTeardown2 = null; |
| let abortedDuringTeardown2 = null; |
| |
| const source = new Observable((subscriber) => { |
| assert_true(subscriber.active); |
| assert_false(subscriber.signal.aborted); |
| results.push('subscribe() callback'); |
| innerSubscriber = subscriber; |
| |
| subscriber.signal.addEventListener('abort', () => { |
| assert_false(subscriber.active); |
| assert_true(subscriber.signal.aborted); |
| results.push('inner abort handler'); |
| subscriber.next('next from inner abort handler'); |
| subscriber.complete(); |
| }); |
| |
| subscriber.addTeardown(() => { |
| activeDuringTeardown1 = subscriber.active; |
| abortedDuringTeardown1 = subscriber.signal.aborted; |
| results.push('teardown 1'); |
| }); |
| |
| subscriber.addTeardown(() => { |
| activeDuringTeardown2 = subscriber.active; |
| abortedDuringTeardown2 = subscriber.signal.aborted; |
| results.push('teardown 2'); |
| }); |
| }); |
| |
| const ac = new AbortController(); |
| source.subscribe({ |
| // This should never get called. If it is, the array assertion below will fail. |
| next: (x) => results.push(x), |
| complete: () => results.push('complete()') |
| }, {signal: ac.signal}); |
| |
| ac.signal.addEventListener('abort', () => { |
| results.push('outer abort handler'); |
| assert_true(ac.signal.aborted); |
| assert_false(innerSubscriber.signal.aborted); |
| }); |
| |
| assert_array_equals(results, ['subscribe() callback']); |
| ac.abort(); |
| results.push('abort() returned'); |
| // The reason the "inner" abort event handler is invoked first is because the |
| // "inner" AbortSignal is not a dependent signal (that would ordinarily get |
| // aborted after the parent, aka "outer" signal, is completely finished being |
| // aborted). Instead, the order of operations looks like this: |
| // 1. "Outer" signal begins to be aborted |
| // 2. Its abort algorithms [1] run [2]; the internal abort algorithm here is |
| // the "inner" Subscriber's "Close a subscription" [0]. |
| // a. This signals abort on the "inner" Subscriber's signal, firing the |
| // abort event |
| // b. Then, the "inner" Subscriber's teardowns run. |
| // 3. Once the "outer" signal's abort algorithms are finished, the abort |
| // event is fired [3], triggering the outer abort handler. |
| // |
| // [0]: https://wicg.github.io/observable/#close-a-subscription |
| // [1]: https://dom.spec.whatwg.org/#abortsignal-abort-algorithms |
| // [2]: https://dom.spec.whatwg.org/#ref-for-abortsignal-abort-algorithms%E2%91%A2:~:text=For%20each%20algorithm%20of%20signal%E2%80%99s%20abort%20algorithms%3A%20run%20algorithm |
| // [3]: https://dom.spec.whatwg.org/#abortsignal-signal-abort:~:text=Fire%20an%20event%20named%20abort%20at%20signal |
| assert_array_equals(results, [ |
| 'subscribe() callback', 'inner abort handler', 'teardown 2', 'teardown 1', |
| 'outer abort handler', 'abort() returned', |
| ]); |
| assert_false(activeDuringTeardown1, 'should not be active during teardown callback 1'); |
| assert_false(activeDuringTeardown2, 'should not be active during teardown callback 2'); |
| assert_true(abortedDuringTeardown1, 'should be aborted during teardown callback 1'); |
| assert_true(abortedDuringTeardown2, 'should be aborted during teardown callback 2'); |
| }, "Unsubscription lifecycle"); |
| |
| // In the usual consumer-initiated unsubscription case, when the AbortController |
| // is aborted after subscription, teardowns run from upstream->downstream. This |
| // is because for a given Subscriber, when a downstream signal is aborted |
| // (`ac.signal` in this case), the "Close" algorithm prompts the Subscriber to |
| // first abort *its* own signal (the one accessible via `Subscriber#signal`) and |
| // then run its teardowns. |
| // |
| // This means upstream Subscribers get the first opportunity their teardowns |
| // before the control flow is returned to downstream Subscribers to run *their* |
| // teardowns (after they abort their internal signal). |
| test(() => { |
| const results = []; |
| const upstream = new Observable(subscriber => { |
| subscriber.signal.addEventListener('abort', |
| e => results.push('upstream abort handler'), {once: true}); |
| subscriber.addTeardown( |
| () => results.push(`upstream teardown. reason: ${subscriber.signal.reason}`)); |
| }); |
| const middle = new Observable(subscriber => { |
| subscriber.signal.addEventListener('abort', |
| e => results.push('middle abort handler'), {once: true}); |
| subscriber.addTeardown( |
| () => results.push(`middle teardown. reason: ${subscriber.signal.reason}`)); |
| upstream.subscribe({}, {signal: subscriber.signal}); |
| }); |
| const downstream = new Observable(subscriber => { |
| subscriber.signal.addEventListener('abort', |
| e => results.push('downstream abort handler'), {once: true}); |
| subscriber.addTeardown( |
| () => results.push(`downstream teardown. reason: ${subscriber.signal.reason}`)); |
| middle.subscribe({}, {signal: subscriber.signal}); |
| }); |
| |
| const ac = new AbortController(); |
| downstream.subscribe({}, {signal: ac.signal}); |
| ac.abort('Abort!'); |
| assert_array_equals(results, [ |
| 'upstream abort handler', |
| 'upstream teardown. reason: Abort!', |
| 'middle abort handler', |
| 'middle teardown. reason: Abort!', |
| 'downstream abort handler', |
| 'downstream teardown. reason: Abort!', |
| ]); |
| }, "Teardowns are called in upstream->downstream order on " + |
| "consumer-initiated unsubscription"); |
| |
| // This test is like the above, but asserts the exact opposite order of |
| // teardowns. This is because, since the Subscriber's signal is aborted |
| // immediately upon construction, `addTeardown()` runs teardowns synchronously |
| // in subscriber-order, which goes from downstream->upstream. |
| test(() => { |
| const results = []; |
| const upstream = new Observable(subscriber => { |
| subscriber.addTeardown( |
| () => results.push(`upstream teardown. reason: ${subscriber.signal.reason}`)); |
| }); |
| const middle = new Observable(subscriber => { |
| subscriber.addTeardown( |
| () => results.push(`middle teardown. reason: ${subscriber.signal.reason}`)); |
| upstream.subscribe({}, {signal: subscriber.signal}); |
| }); |
| const downstream = new Observable(subscriber => { |
| subscriber.addTeardown( |
| () => results.push(`downstream teardown. reason: ${subscriber.signal.reason}`)); |
| middle.subscribe({}, {signal: subscriber.signal}); |
| }); |
| |
| downstream.subscribe({}, {signal: AbortSignal.abort('Initial abort')}); |
| assert_array_equals(results, [ |
| "downstream teardown. reason: Initial abort", |
| "middle teardown. reason: Initial abort", |
| "upstream teardown. reason: Initial abort", |
| ]); |
| }, "Teardowns are called in downstream->upstream order on " + |
| "consumer-initiated unsubscription with pre-aborted Signal"); |
| |
| // Producer-initiated unsubscription test, capturing the ordering of abort events and teardowns. |
| test(() => { |
| const results = []; |
| |
| const source = new Observable(subscriber => { |
| subscriber.addTeardown(() => results.push('source teardown')); |
| subscriber.signal.addEventListener('abort', |
| e => results.push('source abort event')); |
| }); |
| |
| const middle = new Observable(subscriber => { |
| subscriber.addTeardown(() => results.push('middle teardown')); |
| subscriber.signal.addEventListener('abort', |
| e => results.push('middle abort event')); |
| |
| source.subscribe(() => {}, {signal: subscriber.signal}); |
| }); |
| |
| let innerSubscriber = null; |
| const downstream = new Observable(subscriber => { |
| innerSubscriber = subscriber; |
| subscriber.addTeardown(() => results.push('downstream teardown')); |
| subscriber.signal.addEventListener('abort', |
| e => results.push('downstream abort event')); |
| |
| middle.subscribe(() => {}, {signal: subscriber.signal}); |
| }); |
| |
| downstream.subscribe(); |
| |
| // Trigger a producer-initiated unsubscription from the most-downstream Observable. |
| innerSubscriber.complete(); |
| |
| assert_array_equals(results, [ |
| 'source abort event', |
| 'source teardown', |
| 'middle abort event', |
| 'middle teardown', |
| 'downstream abort event', |
| 'downstream teardown', |
| ]); |
| }, "Producer-initiated unsubscription in a downstream Observable fires abort " + |
| "events before each teardown, in downstream->upstream order"); |
| |
| test(t => { |
| let innerSubscriber = null; |
| const source = new Observable(subscriber => { |
| innerSubscriber = subscriber; |
| subscriber.error('calling error()'); |
| }); |
| |
| source.subscribe(); |
| assert_equals(innerSubscriber.signal.reason, "calling error()", |
| "Reason is set correctly"); |
| }, "Subscriber#error() value is stored as Subscriber's AbortSignal's reason"); |
| |
| test(t => { |
| const source = new Observable((subscriber) => { |
| let n = 0; |
| while (!subscriber.signal.aborted) { |
| assert_true(subscriber.active); |
| subscriber.next(n++); |
| if (n > 3) { |
| assert_unreached("The subscriber should be closed by now"); |
| } |
| } |
| assert_false(subscriber.active); |
| }); |
| |
| const ac = new AbortController(); |
| const results = []; |
| |
| source.subscribe({ |
| next: (x) => { |
| results.push(x); |
| if (x === 2) { |
| ac.abort(); |
| } |
| }, |
| error: () => results.push('error'), |
| complete: () => results.push('complete') |
| }, {signal: ac.signal}); |
| |
| assert_array_equals( |
| results, |
| [0, 1, 2], |
| "should emit values synchronously before abort" |
| ); |
| }, "Aborting a subscription should stop emitting values"); |
| |
| test(() => { |
| const error = new Error("custom error"); |
| let errorReported = null; |
| |
| self.addEventListener("error", e => errorReported = e, { once: true }); |
| |
| const source = new Observable(() => { |
| throw error; |
| }); |
| |
| try { |
| source.subscribe(); |
| } catch { |
| assert_unreached("subscriber() never throws an error"); |
| } |
| |
| assert_true(errorReported !== null, "Exception was reported to global"); |
| assert_true(errorReported.message.includes("custom error"), "Error message matches"); |
| assert_greater_than(errorReported.lineno, 0, "Error lineno is greater than 0"); |
| assert_greater_than(errorReported.colno, 0, "Error lineno is greater than 0"); |
| assert_equals(errorReported.error, error, "Error object is equivalent"); |
| }, "Calling subscribe should never throw an error synchronously, initializer throws error"); |
| |
| test(() => { |
| const error = new Error("custom error"); |
| let errorReported = null; |
| |
| self.addEventListener("error", e => errorReported = e, { once: true }); |
| |
| const source = new Observable((subscriber) => { |
| subscriber.error(error); |
| }); |
| |
| try { |
| source.subscribe(); |
| } catch { |
| assert_unreached("subscriber() never throws an error"); |
| } |
| |
| assert_true(errorReported !== null, "Exception was reported to global"); |
| assert_true(errorReported.message.includes("custom error"), "Error message matches"); |
| assert_greater_than(errorReported.lineno, 0, "Error lineno is greater than 0"); |
| assert_greater_than(errorReported.colno, 0, "Error lineno is greater than 0"); |
| assert_equals(errorReported.error, error, "Error object is equivalent"); |
| }, "Calling subscribe should never throw an error synchronously, subscriber pushes error"); |
| |
| test(() => { |
| let addTeardownCalled = false; |
| let activeDuringTeardown; |
| |
| const source = new Observable((subscriber) => { |
| subscriber.addTeardown(() => { |
| addTeardownCalled = true; |
| activeDuringTeardown = subscriber.active; |
| }); |
| }); |
| |
| const ac = new AbortController(); |
| source.subscribe({}, {signal: ac.signal}); |
| |
| assert_false(addTeardownCalled, "Teardown is not be called upon subscription"); |
| ac.abort(); |
| assert_true(addTeardownCalled, "Teardown is called when subscription is aborted"); |
| assert_false(activeDuringTeardown, "Teardown observers inactive subscription"); |
| }, "Teardown should be called when subscription is aborted"); |
| |
| test(() => { |
| const addTeardownsCalled = []; |
| // This is used to snapshot `addTeardownsCalled` from within the subscribe |
| // callback, for assertion/comparison later. |
| let teardownsSnapshot = []; |
| const results = []; |
| |
| const source = new Observable((subscriber) => { |
| subscriber.addTeardown(() => addTeardownsCalled.push("teardown 1")); |
| subscriber.addTeardown(() => addTeardownsCalled.push("teardown 2")); |
| |
| subscriber.next(1); |
| subscriber.next(2); |
| subscriber.next(3); |
| subscriber.complete(); |
| |
| // We don't run the actual `assert_array_equals` here because if it fails, |
| // it won't be properly caught. This is because assertion failures throw an |
| // error, and in subscriber callback, thrown errors result in |
| // `window.onerror` handlers being called, which this test file doesn't |
| // record as an error (see the first line of this file). |
| teardownsSnapshot = addTeardownsCalled; |
| }); |
| |
| source.subscribe({ |
| next: (x) => results.push(x), |
| error: () => results.push("unreached"), |
| complete: () => results.push("complete"), |
| }); |
| |
| assert_array_equals( |
| results, |
| [1, 2, 3, "complete"], |
| "should emit values and complete synchronously" |
| ); |
| |
| assert_array_equals(teardownsSnapshot, addTeardownsCalled); |
| assert_array_equals(addTeardownsCalled, ["teardown 2", "teardown 1"], |
| "Teardowns called in LIFO order synchronously after complete()"); |
| }, "Teardowns should be called when subscription is closed by completion"); |
| |
| test(() => { |
| const addTeardownsCalled = []; |
| let teardownsSnapshot = []; |
| const error = new Error("error"); |
| const results = []; |
| |
| const source = new Observable((subscriber) => { |
| subscriber.addTeardown(() => addTeardownsCalled.push("teardown 1")); |
| subscriber.addTeardown(() => addTeardownsCalled.push("teardown 2")); |
| |
| subscriber.next(1); |
| subscriber.next(2); |
| subscriber.next(3); |
| subscriber.error(error); |
| |
| teardownsSnapshot = addTeardownsCalled; |
| }); |
| |
| source.subscribe({ |
| next: (x) => results.push(x), |
| error: (error) => results.push(error), |
| complete: () => assert_unreached("complete should not be called"), |
| }); |
| |
| assert_array_equals( |
| results, |
| [1, 2, 3, error], |
| "should emit values and error synchronously" |
| ); |
| |
| assert_array_equals(teardownsSnapshot, addTeardownsCalled); |
| assert_array_equals(addTeardownsCalled, ["teardown 2", "teardown 1"], |
| "Teardowns called in LIFO order synchronously after error()"); |
| }, "Teardowns should be called when subscription is closed by subscriber pushing an error"); |
| |
| test(() => { |
| const addTeardownsCalled = []; |
| const error = new Error("error"); |
| const results = []; |
| |
| const source = new Observable((subscriber) => { |
| subscriber.addTeardown(() => addTeardownsCalled.push("teardown 1")); |
| subscriber.addTeardown(() => addTeardownsCalled.push("teardown 2")); |
| |
| subscriber.next(1); |
| subscriber.next(2); |
| subscriber.next(3); |
| throw error; |
| }); |
| |
| source.subscribe({ |
| next: (x) => results.push(x), |
| error: (error) => results.push(error), |
| complete: () => assert_unreached("complete should not be called"), |
| }); |
| |
| assert_array_equals( |
| results, |
| [1, 2, 3, error], |
| "should emit values and error synchronously" |
| ); |
| |
| assert_array_equals(addTeardownsCalled, ["teardown 2", "teardown 1"], |
| "Teardowns called in LIFO order synchronously after thrown error"); |
| }, "Teardowns should be called when subscription is closed by subscriber throwing error"); |
| |
| test(() => { |
| const addTeardownsCalled = []; |
| const results = []; |
| let firstTeardownInvokedSynchronously = false; |
| let secondTeardownInvokedSynchronously = false; |
| |
| const source = new Observable((subscriber) => { |
| subscriber.addTeardown(() => addTeardownsCalled.push("teardown 1")); |
| if (addTeardownsCalled.length === 1) { |
| firstTeardownInvokedSynchronously = true; |
| } |
| subscriber.addTeardown(() => addTeardownsCalled.push("teardown 2")); |
| if (addTeardownsCalled.length === 2) { |
| secondTeardownInvokedSynchronously = true; |
| } |
| |
| subscriber.next(1); |
| subscriber.next(2); |
| subscriber.next(3); |
| subscriber.complete(); |
| }); |
| |
| const ac = new AbortController(); |
| ac.abort(); |
| source.subscribe({ |
| next: (x) => results.push(x), |
| error: (error) => results.push(error), |
| complete: () => results.push('complete') |
| }, {signal: ac.signal}); |
| |
| assert_array_equals(results, []); |
| assert_true(firstTeardownInvokedSynchronously, "First teardown callback is invoked during addTeardown()"); |
| assert_true(secondTeardownInvokedSynchronously, "Second teardown callback is invoked during addTeardown()"); |
| assert_array_equals(addTeardownsCalled, ["teardown 1", "teardown 2"], |
| "Teardowns called synchronously upon addition end up in FIFO order"); |
| }, "Teardowns should be called synchronously during addTeardown() if the subscription is inactive"); |
| |
| test(() => { |
| const results = []; |
| let producerInvocations = 0; |
| let teardownInvocations = 0; |
| |
| const source = new Observable((subscriber) => { |
| producerInvocations++; |
| results.push('producer invoked'); |
| subscriber.addTeardown(() => { |
| teardownInvocations++; |
| results.push('teardown invoked'); |
| }); |
| }); |
| |
| const ac1 = new AbortController(); |
| const ac2 = new AbortController(); |
| |
| // First subscription. |
| source.subscribe({}, {signal: ac1.signal}); |
| assert_equals(producerInvocations, 1, |
| "Producer is invoked once for first subscription"); |
| |
| // Second subscription should reuse the same producer. |
| source.subscribe({}, {signal: ac2.signal}); |
| assert_equals(producerInvocations, 1, |
| "Producer should not be invoked again for second subscription"); |
| |
| // First unsubscribe. |
| ac1.abort(); |
| assert_equals(teardownInvocations, 0, |
| "Teardown not run when first subscriber unsubscribes"); |
| |
| // Second unsubscribe. |
| ac2.abort(); |
| assert_equals(teardownInvocations, 1, |
| "Teardown should run after last subscriber unsubscribes"); |
| |
| assert_array_equals(results, ['producer invoked', 'teardown invoked']); |
| }, "Multiple subscriptions share the same producer and teardown runs only " + |
| "after last subscription abort"); |
| |
| test(() => { |
| const results = []; |
| let activeSubscriber = null; |
| |
| const source = new Observable(subscriber => { |
| activeSubscriber = subscriber; |
| results.push('producer start'); |
| subscriber.addTeardown(() => results.push('teardown')); |
| }); |
| |
| // First subscription. |
| const ac1 = new AbortController(); |
| source.subscribe({}, {signal: ac1.signal}); |
| assert_array_equals(results, ['producer start']); |
| |
| // Second subscription. |
| const ac2 = new AbortController(); |
| source.subscribe({}, {signal: ac2.signal}); |
| |
| // Complete the subscription. |
| activeSubscriber.complete(); |
| assert_array_equals(results, ['producer start', 'teardown']); |
| |
| // Additional subscription after complete. |
| const ac3 = new AbortController(); |
| source.subscribe({}, {signal: ac3.signal}); |
| |
| assert_array_equals(results, ['producer start', 'teardown', 'producer start']); |
| }, "New subscription after complete creates new producer"); |
| |
| test(() => { |
| const results = []; |
| let producerInvocations = 0; |
| |
| const source = new Observable(subscriber => { |
| producerInvocations++; |
| results.push('producer start'); |
| subscriber.addTeardown(() => results.push('teardown')); |
| }); |
| |
| // Create 3 subscriptions. |
| const ac1 = new AbortController(); |
| const ac2 = new AbortController(); |
| const ac3 = new AbortController(); |
| source.subscribe({}, {signal: ac1.signal}); |
| source.subscribe({}, {signal: ac2.signal}); |
| source.subscribe({}, {signal: ac3.signal}); |
| |
| assert_equals(producerInvocations, 1, "Producer should be invoked once"); |
| |
| // Unsubscribe in a different order. |
| ac2.abort(); |
| results.push('after first abort'); |
| ac1.abort(); |
| results.push('after second abort'); |
| ac3.abort(); |
| results.push('after final abort'); |
| |
| assert_array_equals(results, [ |
| 'producer start', |
| 'after first abort', |
| 'after second abort', |
| 'teardown', |
| 'after final abort' |
| ]); |
| }, "Teardown runs after last unsubscribe regardless of unsubscription order"); |
| |
| test(() => { |
| const results = []; |
| const source = new Observable(subscriber => { |
| subscriber.next(1); |
| subscriber.next(2); |
| subscriber.complete(); |
| }); |
| |
| source.subscribe(v => { |
| results.push(`${v}-first-sub`); |
| if (v === 1) { |
| // This new subscription adds a new internal observer to the subscriber's |
| // internal observer list, but it does not get the value `1` that the |
| // subscriber is currently pushing. This is because the Subscriber |
| // iterates over a snapshot of its internal observers to push values to. |
| // The first value that this new subscription will see is `2`. |
| // |
| // See https://github.com/WICG/observable/pull/214. |
| source.subscribe(v => results.push(`${v}-second-sub`)); |
| } |
| }); |
| |
| assert_array_equals(results, [ |
| "1-first-sub", |
| |
| "2-first-sub", |
| "2-second-sub", |
| ]); |
| }, "Subscriber iterates over a snapshot of its internal observers"); |