From 169975c58ea0332fbfb7abd3143daf112f146620 Mon Sep 17 00:00:00 2001 From: Henry Date: Wed, 29 Jun 2022 17:37:11 +1000 Subject: [PATCH 1/2] fix(shareReplay): Prevent setup/reset race condition in shareReplay with refCount --- spec/operators/shareReplay-spec.ts | 16 +++++++++++++++- src/internal/operators/share.ts | 8 +++++++- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/spec/operators/shareReplay-spec.ts b/spec/operators/shareReplay-spec.ts index 4b68b1c48b..7220bbbe70 100644 --- a/spec/operators/shareReplay-spec.ts +++ b/spec/operators/shareReplay-spec.ts @@ -2,7 +2,7 @@ import { expect } from 'chai'; import * as sinon from 'sinon'; import { shareReplay, mergeMapTo, retry, take } from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; -import { Observable, Operator, Observer, of, from, defer, pipe } from 'rxjs'; +import { Observable, Operator, Observer, of, from, defer, pipe, combineLatest, firstValueFrom, BehaviorSubject } from 'rxjs'; import { observableMatcher } from '../helpers/observableMatcher'; /** @test {shareReplay} */ @@ -274,6 +274,20 @@ describe('shareReplay', () => { expect(subscriptions).to.equal(1); }); + it('should only subscribe once each with multiple synchronous subscriptions and unsubscriptions ', async () => { + // This may seem very specific, but it's a regression test for https://github.com/ReactiveX/rxjs/issues/6760 + + let subscriptions = 0; + const source = defer(() => { + ++subscriptions; + // Needs to be an observable that doesn't complete + return new BehaviorSubject(1); + }).pipe(shareReplay({ bufferSize: 1, refCount: true })); + + firstValueFrom(combineLatest([source, source])); + expect(subscriptions).to.equal(1); + }); + it('should default to refCount being false', () => { testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => { const source = cold('a-b-c-d-e-f-g-h-i-j'); diff --git a/src/internal/operators/share.ts b/src/internal/operators/share.ts index 3c7facbc81..82c17f98f4 100644 --- a/src/internal/operators/share.ts +++ b/src/internal/operators/share.ts @@ -211,7 +211,13 @@ export function share(options: ShareConfig = {}): MonoTypeOperatorFunction // Basically, `subscriber === dest.subscribe(subscriber)` is `true`. dest.subscribe(subscriber); - if (!connection) { + if ( + !connection && + // Check this shareReplay is still activate - it can be reset to 0 + // and be "unsubscribed" _before_ it actually subscribes. + // If we were to subscribe then, it'd leak and get stuck. + refCount > 0 + ) { // We need to create a subscriber here - rather than pass an observer and // assign the returned subscription to connection - because it's possible // for reentrant subscriptions to the shared observable to occur and in From 9c661e7d386f3ab26366684c83990f9958926511 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Fri, 8 Jul 2022 10:30:49 -0500 Subject: [PATCH 2/2] test: ensure we're awaiting the result of firstValueFrom --- spec/operators/shareReplay-spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/operators/shareReplay-spec.ts b/spec/operators/shareReplay-spec.ts index 7220bbbe70..52161ef120 100644 --- a/spec/operators/shareReplay-spec.ts +++ b/spec/operators/shareReplay-spec.ts @@ -284,7 +284,7 @@ describe('shareReplay', () => { return new BehaviorSubject(1); }).pipe(shareReplay({ bufferSize: 1, refCount: true })); - firstValueFrom(combineLatest([source, source])); + await firstValueFrom(combineLatest([source, source])); expect(subscriptions).to.equal(1); });