diff --git a/spec/operators/shareReplay-spec.ts b/spec/operators/shareReplay-spec.ts index 4b68b1c48b..52161ef120 100644 --- a/spec/operators/shareReplay-spec.ts +++ b/spec/operators/shareReplay-spec.ts @@ -2,7 +2,7 @@ import { expect } from 'chai'; import * as sinon from 'sinon'; import { shareReplay, mergeMapTo, retry, take } from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; -import { Observable, Operator, Observer, of, from, defer, pipe } from 'rxjs'; +import { Observable, Operator, Observer, of, from, defer, pipe, combineLatest, firstValueFrom, BehaviorSubject } from 'rxjs'; import { observableMatcher } from '../helpers/observableMatcher'; /** @test {shareReplay} */ @@ -274,6 +274,20 @@ describe('shareReplay', () => { expect(subscriptions).to.equal(1); }); + it('should only subscribe once each with multiple synchronous subscriptions and unsubscriptions ', async () => { + // This may seem very specific, but it's a regression test for https://github.com/ReactiveX/rxjs/issues/6760 + + let subscriptions = 0; + const source = defer(() => { + ++subscriptions; + // Needs to be an observable that doesn't complete + return new BehaviorSubject(1); + }).pipe(shareReplay({ bufferSize: 1, refCount: true })); + + await firstValueFrom(combineLatest([source, source])); + expect(subscriptions).to.equal(1); + }); + it('should default to refCount being false', () => { testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => { const source = cold('a-b-c-d-e-f-g-h-i-j'); diff --git a/src/internal/operators/share.ts b/src/internal/operators/share.ts index 3c7facbc81..82c17f98f4 100644 --- a/src/internal/operators/share.ts +++ b/src/internal/operators/share.ts @@ -211,7 +211,13 @@ export function share(options: ShareConfig = {}): MonoTypeOperatorFunction // Basically, `subscriber === dest.subscribe(subscriber)` is `true`. dest.subscribe(subscriber); - if (!connection) { + if ( + !connection && + // Check this shareReplay is still activate - it can be reset to 0 + // and be "unsubscribed" _before_ it actually subscribes. + // If we were to subscribe then, it'd leak and get stuck. + refCount > 0 + ) { // We need to create a subscriber here - rather than pass an observer and // assign the returned subscription to connection - because it's possible // for reentrant subscriptions to the shared observable to occur and in