Skip to content

Commit

Permalink
fix(onErrorResumeNext): stop listening to a synchronous inner-obervab…
Browse files Browse the repository at this point in the history
…le when unsubscribed
  • Loading branch information
peaBerberian committed Aug 19, 2018
1 parent c4002f3 commit 1d14277
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 3 deletions.
29 changes: 27 additions & 2 deletions spec/operators/onErrorResumeNext-spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { expect } from 'chai';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { onErrorResumeNext } from 'rxjs/operators';
import { concat, throwError, of } from 'rxjs';
import { onErrorResumeNext, takeWhile } from 'rxjs/operators';
import { concat, defer, throwError, of } from 'rxjs';

declare function asDiagram(arg: string): Function;

Expand Down Expand Up @@ -104,6 +104,31 @@ describe('onErrorResumeNext operator', () => {
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should stop listening to a synchronous observable when unsubscribed', () => {
const sideEffects: number[] = [];
const synchronousObservable = concat(
defer(() => {
sideEffects.push(1);
return of(1);
}),
defer(() => {
sideEffects.push(2);
return of(2);
}),
defer(() => {
sideEffects.push(3);
return of(3);
})
);

throwError(new Error('Some error')).pipe(
onErrorResumeNext(synchronousObservable),
takeWhile((x) => x != 2) // unsubscribe at the second side-effect
).subscribe(() => { /* noop */ });

expect(sideEffects).to.deep.equal([1, 2]);
});

it('should work with promise', (done: MochaDone) => {
const expected = [1, 2];
const source = concat(of(1), throwError('meh'));
Expand Down
4 changes: 3 additions & 1 deletion src/internal/operators/onErrorResumeNext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,9 @@ class OnErrorResumeNextSubscriber<T, R> extends OuterSubscriber<T, R> {
private subscribeToNextSource(): void {
const next = this.nextSources.shift();
if (next) {
this.add(subscribeToResult(this, next));
const innerSubscriber = new InnerSubscriber(this, undefined, undefined);
this.add(innerSubscriber);
subscribeToResult(this, next, undefined, undefined, innerSubscriber);
} else {
this.destination.complete();
}
Expand Down

0 comments on commit 1d14277

Please sign in to comment.