diff --git a/spec/operators/publish-spec.ts b/spec/operators/publish-spec.ts index ad2d9445aa..f5c82f757f 100644 --- a/spec/operators/publish-spec.ts +++ b/spec/operators/publish-spec.ts @@ -1,7 +1,7 @@ import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; import { publish, zip, mergeMapTo, mergeMap, tap, refCount, retry, repeat } from 'rxjs/operators'; -import { ConnectableObservable, of, Subscription, Observable } from 'rxjs'; +import { ConnectableObservable, of, Subscription, Observable, pipe } from 'rxjs'; /** @test {publish} */ describe('publish operator', () => { @@ -337,4 +337,28 @@ describe('publish operator', () => { expect(subscriptions).to.equal(1); done(); }); + + it('should subscribe to its own source when using a shared pipeline', () => { + const source1 = cold('-1-2-3-4-5-|'); + const source1Subs = '^ !'; + const source2 = cold('-6-7-8-9-0-|'); + const source2Subs = '^ !'; + + const sharedPipeLine = pipe( + publish() + ); + + const published1 = source1.pipe(sharedPipeLine) as ConnectableObservable; + const published2 = source2.pipe(sharedPipeLine) as ConnectableObservable; + const expected1 = '-1-2-3-4-5-|'; + const expected2 = '-6-7-8-9-0-|'; + + expectObservable(published1).toBe(expected1); + expectSubscriptions(source1.subscriptions).toBe(source1Subs); + expectObservable(published2).toBe(expected2); + expectSubscriptions(source2.subscriptions).toBe(source2Subs); + + published1.connect(); + published2.connect(); + }); }); diff --git a/spec/operators/publishReplay-spec.ts b/spec/operators/publishReplay-spec.ts index f7e10f548e..1c9b52196b 100644 --- a/spec/operators/publishReplay-spec.ts +++ b/spec/operators/publishReplay-spec.ts @@ -1,6 +1,6 @@ import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { throwError, ConnectableObservable, EMPTY, NEVER, of, Observable, Subscription } from 'rxjs'; +import { throwError, ConnectableObservable, EMPTY, NEVER, of, Observable, Subscription, pipe } from 'rxjs'; import { publishReplay, mergeMapTo, tap, mergeMap, refCount, retry, repeat, map } from 'rxjs/operators'; /** @test {publishReplay} */ @@ -488,4 +488,28 @@ describe('publishReplay operator', () => { expectObservable(published).toBe(expected, undefined, error); expectSubscriptions(source.subscriptions).toBe(sourceSubs); }); + + it('should subscribe to its own source when using a shared pipeline', () => { + const source1 = cold('-1-2-3-4-5-|'); + const source2 = cold('-6-7-8-9-0-|'); + const expected1 = '-1-2-3-4-5-|'; + const expected2 = '-6-7-8-9-0-|'; + const source1Subs = '^ !'; + const source2Subs = '^ !'; + + const sharedPipeLine = pipe( + publishReplay(1) + ); + + const published1 = source1.pipe(sharedPipeLine) as ConnectableObservable; + const published2 = source2.pipe(sharedPipeLine) as ConnectableObservable; + + expectObservable(published1).toBe(expected1); + expectSubscriptions(source1.subscriptions).toBe(source1Subs); + expectObservable(published2).toBe(expected2); + expectSubscriptions(source2.subscriptions).toBe(source2Subs); + + published1.connect(); + published2.connect(); + }); }); diff --git a/src/internal/operators/publish.ts b/src/internal/operators/publish.ts index 26e6351515..151f76919d 100644 --- a/src/internal/operators/publish.ts +++ b/src/internal/operators/publish.ts @@ -59,6 +59,6 @@ export function publish(selector: MonoTypeOperatorFunction): MonoTypeOpera */ export function publish(selector?: OperatorFunction): MonoTypeOperatorFunction | OperatorFunction { return selector ? - multicast(() => new Subject(), selector) : - multicast(new Subject()); + (source: Observable) => multicast(() => new Subject(), selector)(source) : + (source: Observable) => multicast(new Subject())(source); } diff --git a/src/internal/operators/publishReplay.ts b/src/internal/operators/publishReplay.ts index e5cd19228b..9d9cb7d955 100644 --- a/src/internal/operators/publishReplay.ts +++ b/src/internal/operators/publishReplay.ts @@ -19,7 +19,6 @@ export function publishReplay(bufferSize?: number, } const selector = typeof selectorOrScheduler === 'function' ? selectorOrScheduler : undefined; - const subject = new ReplaySubject(bufferSize, windowTime, scheduler); - return (source: Observable) => multicast(() => subject, selector!)(source) as ConnectableObservable; + return (source: Observable) => multicast(new ReplaySubject(bufferSize, windowTime, scheduler), selector!)(source) as ConnectableObservable; }