Skip to content

Commit

Permalink
fix(share): propagate closed to firehose sources (#6370)
Browse files Browse the repository at this point in the history
* test: enable firehose test for share

* fix: add teardown directly to subscriber

* chore: add comment

* chore: remove firehose tests for deprecated ops

* refactor: move it up even further
  • Loading branch information
cartant committed May 6, 2021
1 parent abf2bc1 commit 2271a91
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 101 deletions.
43 changes: 0 additions & 43 deletions spec/operators/multicast-spec.ts
Expand Up @@ -821,47 +821,4 @@ describe('multicast', () => {
);
});
});

// TODO: fix firehose unsubscription
// AFAICT, it's not possible for multicast observables to support ASAP
// unsubscription from synchronous firehose sources. The problem is that the
// chaining of the closed 'signal' is broken by the subject. For example,
// here:
//
// https://github.com/ReactiveX/rxjs/blob/2d5e4d5bd7b684a912485e1c1583ba3d41c8308e/src/internal/operators/multicast.ts#L53
//
// The subject is passed to subscribe. However, in the subscribe
// implementation a SafeSubscriber is created with the subject as the
// observer:
//
// https://github.com/ReactiveX/rxjs/blob/2d5e4d5bd7b684a912485e1c1583ba3d41c8308e/src/internal/Observable.ts#L210
//
// That breaks the chaining of closed - i.e. even if the unsubscribe is
// called on the subject, closing it, the SafeSubscriber's closed property
// won't reflect that.
it.skip('should stop listening to a synchronous observable when unsubscribed', () => {
const sideEffects: number[] = [];
const synchronousObservable = new Observable<number>((subscriber) => {
// This will check to see if the subscriber was closed on each loop
// when the unsubscribe hits (from the `take`), it should be closed
for (let i = 0; !subscriber.closed && i < 10; i++) {
sideEffects.push(i);
subscriber.next(i);
}
});

synchronousObservable
.pipe(
multicast(
() => new Subject<number>(),
(source) => source
),
take(3)
)
.subscribe(() => {
/* noop */
});

expect(sideEffects).to.deep.equal([0, 1, 2]);
});
});
21 changes: 0 additions & 21 deletions spec/operators/refCount-spec.ts
Expand Up @@ -114,25 +114,4 @@ describe('refCount', () => {
expect(arr[0]).to.equal('the number one');
expect(arr[1]).to.equal('the number two');
});

// TODO: fix firehose unsubscription
it.skip('should stop listening to a synchronous observable when unsubscribed', () => {
const sideEffects: number[] = [];
const synchronousObservable = new Observable<number>(subscriber => {
// This will check to see if the subscriber was closed on each loop
// when the unsubscribe hits (from the `take`), it should be closed
for (let i = 0; !subscriber.closed && i < 10; i++) {
sideEffects.push(i);
subscriber.next(i);
}
});

synchronousObservable.pipe(
multicast(() => new Subject<number>()),
refCount(),
take(3),
).subscribe(() => { /* noop */ });

expect(sideEffects).to.deep.equal([0, 1, 2]);
});
});
3 changes: 1 addition & 2 deletions spec/operators/share-spec.ts
Expand Up @@ -427,8 +427,7 @@ describe('share', () => {
});
});

// TODO: fix firehose unsubscription
it.skip('should stop listening to a synchronous observable when unsubscribed', () => {
it('should stop listening to a synchronous observable when unsubscribed', () => {
const sideEffects: number[] = [];
const synchronousObservable = new Observable<number>((subscriber) => {
// This will check to see if the subscriber was closed on each loop
Expand Down
19 changes: 0 additions & 19 deletions spec/operators/shareReplay-spec.ts
Expand Up @@ -347,25 +347,6 @@ describe('shareReplay', () => {
});
});

// TODO: fix firehose unsubscription
it.skip('should stop listening to a synchronous observable when unsubscribed', () => {
const sideEffects: number[] = [];
const synchronousObservable = new Observable<number>((subscriber) => {
// This will check to see if the subscriber was closed on each loop
// when the unsubscribe hits (from the `take`), it should be closed
for (let i = 0; !subscriber.closed && i < 10; i++) {
sideEffects.push(i);
subscriber.next(i);
}
});

synchronousObservable.pipe(shareReplay(), take(3)).subscribe(() => {
/* noop */
});

expect(sideEffects).to.deep.equal([0, 1, 2]);
});

const FinalizationRegistry = (global as any).FinalizationRegistry;
if (FinalizationRegistry) {
it('should not leak the subscriber for sync sources', (done) => {
Expand Down
36 changes: 20 additions & 16 deletions src/internal/operators/share.ts
Expand Up @@ -112,6 +112,26 @@ export function share<T>(options?: ShareConfig<T>): OperatorFunction<T, T> {
// Create the subject if we don't have one yet.
subject = subject ?? connector();

// Add the teardown directly to the subscriber - instead of returning it -
// so that the handling of the subscriber's unsubscription will be wired
// up _before_ the subscription to the source occurs. This is done so that
// the assignment to the source connection's `closed` property will be seen
// by synchronous firehose sources.
subscriber.add(() => {
refCount--;

// If we're resetting on refCount === 0, and it's 0, we only want to do
// that on "unsubscribe", really. Resetting on error or completion is a different
// configuration.
if (resetOnRefCountZero && !refCount && !hasErrored && !hasCompleted) {
// We need to capture the connection before
// we reset (if we need to reset).
const conn = connection;
reset();
conn?.unsubscribe();
}
});

// The following line adds the subscription to the subscriber passed.
// Basically, `subscriber === subject.subscribe(subscriber)` is `true`.
subject.subscribe(subscriber);
Expand Down Expand Up @@ -147,21 +167,5 @@ export function share<T>(options?: ShareConfig<T>): OperatorFunction<T, T> {
});
from(source).subscribe(connection);
}

// This is also added to `subscriber`, technically.
return () => {
refCount--;

// If we're resetting on refCount === 0, and it's 0, we only want to do
// that on "unsubscribe", really. Resetting on error or completion is a different
// configuration.
if (resetOnRefCountZero && !refCount && !hasErrored && !hasCompleted) {
// We need to capture the connection before
// we reset (if we need to reset).
const conn = connection;
reset();
conn?.unsubscribe();
}
};
});
}

0 comments on commit 2271a91

Please sign in to comment.