From b998fa021f17d0af9953d8c0b6fbd58254d945a6 Mon Sep 17 00:00:00 2001 From: Eli Davidson Date: Fri, 10 Jul 2020 16:22:32 -0400 Subject: [PATCH] fix(publish,publishReplay): resolve sharing Subject change publish operator to use factory change publishReplay operator to not share ReplaySubject fixes issue #5411 --- spec/operators/publish-spec.ts | 26 ++++++++++++++++++++++++- spec/operators/publishReplay-spec.ts | 26 ++++++++++++++++++++++++- src/internal/operators/publish.ts | 2 +- src/internal/operators/publishReplay.ts | 5 +---- 4 files changed, 52 insertions(+), 7 deletions(-) diff --git a/spec/operators/publish-spec.ts b/spec/operators/publish-spec.ts index ad2d9445aa..f5c82f757f 100644 --- a/spec/operators/publish-spec.ts +++ b/spec/operators/publish-spec.ts @@ -1,7 +1,7 @@ import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; import { publish, zip, mergeMapTo, mergeMap, tap, refCount, retry, repeat } from 'rxjs/operators'; -import { ConnectableObservable, of, Subscription, Observable } from 'rxjs'; +import { ConnectableObservable, of, Subscription, Observable, pipe } from 'rxjs'; /** @test {publish} */ describe('publish operator', () => { @@ -337,4 +337,28 @@ describe('publish operator', () => { expect(subscriptions).to.equal(1); done(); }); + + it('should subscribe to its own source when using a shared pipeline', () => { + const source1 = cold('-1-2-3-4-5-|'); + const source1Subs = '^ !'; + const source2 = cold('-6-7-8-9-0-|'); + const source2Subs = '^ !'; + + const sharedPipeLine = pipe( + publish() + ); + + const published1 = source1.pipe(sharedPipeLine) as ConnectableObservable; + const published2 = source2.pipe(sharedPipeLine) as ConnectableObservable; + const expected1 = '-1-2-3-4-5-|'; + const expected2 = '-6-7-8-9-0-|'; + + expectObservable(published1).toBe(expected1); + expectSubscriptions(source1.subscriptions).toBe(source1Subs); + expectObservable(published2).toBe(expected2); + expectSubscriptions(source2.subscriptions).toBe(source2Subs); + + published1.connect(); + published2.connect(); + }); }); diff --git a/spec/operators/publishReplay-spec.ts b/spec/operators/publishReplay-spec.ts index d01fc00bda..4386ccc59b 100644 --- a/spec/operators/publishReplay-spec.ts +++ b/spec/operators/publishReplay-spec.ts @@ -1,6 +1,6 @@ import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { throwError, ConnectableObservable, EMPTY, NEVER, of, Observable, Subscription } from 'rxjs'; +import { throwError, ConnectableObservable, EMPTY, NEVER, of, Observable, Subscription, pipe } from 'rxjs'; import { publishReplay, mergeMapTo, tap, mergeMap, refCount, retry, repeat, map } from 'rxjs/operators'; /** @test {publishReplay} */ @@ -487,4 +487,28 @@ describe('publishReplay operator', () => { expectObservable(published).toBe(expected, undefined, error); expectSubscriptions(source.subscriptions).toBe(sourceSubs); }); + + it('should subscribe to its own source when using a shared pipeline', () => { + const source1 = cold('-1-2-3-4-5-|'); + const source2 = cold('-6-7-8-9-0-|'); + const expected1 = '-1-2-3-4-5-|'; + const expected2 = '-6-7-8-9-0-|'; + const source1Subs = '^ !'; + const source2Subs = '^ !'; + + const sharedPipeLine = pipe( + publishReplay(1) + ); + + const published1 = source1.pipe(sharedPipeLine) as ConnectableObservable; + const published2 = source2.pipe(sharedPipeLine) as ConnectableObservable; + + expectObservable(published1).toBe(expected1); + expectSubscriptions(source1.subscriptions).toBe(source1Subs); + expectObservable(published2).toBe(expected2); + expectSubscriptions(source2.subscriptions).toBe(source2Subs); + + published1.connect(); + published2.connect(); + }); }); diff --git a/src/internal/operators/publish.ts b/src/internal/operators/publish.ts index e1a1930e15..7de67d27aa 100644 --- a/src/internal/operators/publish.ts +++ b/src/internal/operators/publish.ts @@ -85,5 +85,5 @@ export function publish>(selector: (shared: Ob * Details: https://rxjs.dev/deprecations/multicasting */ export function publish(selector?: OperatorFunction): MonoTypeOperatorFunction | OperatorFunction { - return selector ? connect(selector) : multicast(new Subject()); + return selector ? (source) => connect(selector)(source) : (source) => multicast(new Subject())(source); } diff --git a/src/internal/operators/publishReplay.ts b/src/internal/operators/publishReplay.ts index f772fc35b8..47494e2a66 100644 --- a/src/internal/operators/publishReplay.ts +++ b/src/internal/operators/publishReplay.ts @@ -89,11 +89,8 @@ export function publishReplay( if (selectorOrScheduler && !isFunction(selectorOrScheduler)) { timestampProvider = selectorOrScheduler; } - const selector = isFunction(selectorOrScheduler) ? selectorOrScheduler : undefined; - const subject = new ReplaySubject(bufferSize, windowTime, timestampProvider); - // Note, we're passing `selector!` here, because at runtime, `undefined` is an acceptable argument // but it makes our TypeScript signature for `multicast` unhappy (as it should, because it's gross). - return (source: Observable) => multicast(subject, selector!)(source); + return (source: Observable) => multicast(new ReplaySubject(bufferSize, windowTime, timestampProvider), selector!)(source); }