Skip to content

Commit

Permalink
fix(share): Prevent setup/reset race condition in share with refCount (
Browse files Browse the repository at this point in the history
…#7005)

* fix(shareReplay): Prevent setup/reset race condition in shareReplay with refCount

* test: ensure we're awaiting the result of firstValueFrom

Co-authored-by: Ben Lesh <ben@benlesh.com>
  • Loading branch information
henry-alakazhang and benlesh committed Jul 8, 2022
1 parent 47fa8d5 commit 5d4c1d9
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 2 deletions.
16 changes: 15 additions & 1 deletion spec/operators/shareReplay-spec.ts
Expand Up @@ -2,7 +2,7 @@ import { expect } from 'chai';
import * as sinon from 'sinon';
import { shareReplay, mergeMapTo, retry, take } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { Observable, Operator, Observer, of, from, defer, pipe } from 'rxjs';
import { Observable, Operator, Observer, of, from, defer, pipe, combineLatest, firstValueFrom, BehaviorSubject } from 'rxjs';
import { observableMatcher } from '../helpers/observableMatcher';

/** @test {shareReplay} */
Expand Down Expand Up @@ -274,6 +274,20 @@ describe('shareReplay', () => {
expect(subscriptions).to.equal(1);
});

it('should only subscribe once each with multiple synchronous subscriptions and unsubscriptions ', async () => {
// This may seem very specific, but it's a regression test for https://github.com/ReactiveX/rxjs/issues/6760

let subscriptions = 0;
const source = defer(() => {
++subscriptions;
// Needs to be an observable that doesn't complete
return new BehaviorSubject(1);
}).pipe(shareReplay({ bufferSize: 1, refCount: true }));

await firstValueFrom(combineLatest([source, source]));
expect(subscriptions).to.equal(1);
});

it('should default to refCount being false', () => {
testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => {
const source = cold('a-b-c-d-e-f-g-h-i-j');
Expand Down
8 changes: 7 additions & 1 deletion src/internal/operators/share.ts
Expand Up @@ -211,7 +211,13 @@ export function share<T>(options: ShareConfig<T> = {}): MonoTypeOperatorFunction
// Basically, `subscriber === dest.subscribe(subscriber)` is `true`.
dest.subscribe(subscriber);

if (!connection) {
if (
!connection &&
// Check this shareReplay is still activate - it can be reset to 0
// and be "unsubscribed" _before_ it actually subscribes.
// If we were to subscribe then, it'd leak and get stuck.
refCount > 0
) {
// We need to create a subscriber here - rather than pass an observer and
// assign the returned subscription to connection - because it's possible
// for reentrant subscriptions to the shared observable to occur and in
Expand Down

0 comments on commit 5d4c1d9

Please sign in to comment.