From b998fa021f17d0af9953d8c0b6fbd58254d945a6 Mon Sep 17 00:00:00 2001 From: Eli Davidson Date: Fri, 10 Jul 2020 16:22:32 -0400 Subject: [PATCH 01/12] fix(publish,publishReplay): resolve sharing Subject change publish operator to use factory change publishReplay operator to not share ReplaySubject fixes issue #5411 --- spec/operators/publish-spec.ts | 26 ++++++++++++++++++++++++- spec/operators/publishReplay-spec.ts | 26 ++++++++++++++++++++++++- src/internal/operators/publish.ts | 2 +- src/internal/operators/publishReplay.ts | 5 +---- 4 files changed, 52 insertions(+), 7 deletions(-) diff --git a/spec/operators/publish-spec.ts b/spec/operators/publish-spec.ts index ad2d9445aa..f5c82f757f 100644 --- a/spec/operators/publish-spec.ts +++ b/spec/operators/publish-spec.ts @@ -1,7 +1,7 @@ import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; import { publish, zip, mergeMapTo, mergeMap, tap, refCount, retry, repeat } from 'rxjs/operators'; -import { ConnectableObservable, of, Subscription, Observable } from 'rxjs'; +import { ConnectableObservable, of, Subscription, Observable, pipe } from 'rxjs'; /** @test {publish} */ describe('publish operator', () => { @@ -337,4 +337,28 @@ describe('publish operator', () => { expect(subscriptions).to.equal(1); done(); }); + + it('should subscribe to its own source when using a shared pipeline', () => { + const source1 = cold('-1-2-3-4-5-|'); + const source1Subs = '^ !'; + const source2 = cold('-6-7-8-9-0-|'); + const source2Subs = '^ !'; + + const sharedPipeLine = pipe( + publish() + ); + + const published1 = source1.pipe(sharedPipeLine) as ConnectableObservable; + const published2 = source2.pipe(sharedPipeLine) as ConnectableObservable; + const expected1 = '-1-2-3-4-5-|'; + const expected2 = '-6-7-8-9-0-|'; + + expectObservable(published1).toBe(expected1); + expectSubscriptions(source1.subscriptions).toBe(source1Subs); + expectObservable(published2).toBe(expected2); + expectSubscriptions(source2.subscriptions).toBe(source2Subs); + + published1.connect(); + published2.connect(); + }); }); diff --git a/spec/operators/publishReplay-spec.ts b/spec/operators/publishReplay-spec.ts index d01fc00bda..4386ccc59b 100644 --- a/spec/operators/publishReplay-spec.ts +++ b/spec/operators/publishReplay-spec.ts @@ -1,6 +1,6 @@ import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { throwError, ConnectableObservable, EMPTY, NEVER, of, Observable, Subscription } from 'rxjs'; +import { throwError, ConnectableObservable, EMPTY, NEVER, of, Observable, Subscription, pipe } from 'rxjs'; import { publishReplay, mergeMapTo, tap, mergeMap, refCount, retry, repeat, map } from 'rxjs/operators'; /** @test {publishReplay} */ @@ -487,4 +487,28 @@ describe('publishReplay operator', () => { expectObservable(published).toBe(expected, undefined, error); expectSubscriptions(source.subscriptions).toBe(sourceSubs); }); + + it('should subscribe to its own source when using a shared pipeline', () => { + const source1 = cold('-1-2-3-4-5-|'); + const source2 = cold('-6-7-8-9-0-|'); + const expected1 = '-1-2-3-4-5-|'; + const expected2 = '-6-7-8-9-0-|'; + const source1Subs = '^ !'; + const source2Subs = '^ !'; + + const sharedPipeLine = pipe( + publishReplay(1) + ); + + const published1 = source1.pipe(sharedPipeLine) as ConnectableObservable; + const published2 = source2.pipe(sharedPipeLine) as ConnectableObservable; + + expectObservable(published1).toBe(expected1); + expectSubscriptions(source1.subscriptions).toBe(source1Subs); + expectObservable(published2).toBe(expected2); + expectSubscriptions(source2.subscriptions).toBe(source2Subs); + + published1.connect(); + published2.connect(); + }); }); diff --git a/src/internal/operators/publish.ts b/src/internal/operators/publish.ts index e1a1930e15..7de67d27aa 100644 --- a/src/internal/operators/publish.ts +++ b/src/internal/operators/publish.ts @@ -85,5 +85,5 @@ export function publish>(selector: (shared: Ob * Details: https://rxjs.dev/deprecations/multicasting */ export function publish(selector?: OperatorFunction): MonoTypeOperatorFunction | OperatorFunction { - return selector ? connect(selector) : multicast(new Subject()); + return selector ? (source) => connect(selector)(source) : (source) => multicast(new Subject())(source); } diff --git a/src/internal/operators/publishReplay.ts b/src/internal/operators/publishReplay.ts index f772fc35b8..47494e2a66 100644 --- a/src/internal/operators/publishReplay.ts +++ b/src/internal/operators/publishReplay.ts @@ -89,11 +89,8 @@ export function publishReplay( if (selectorOrScheduler && !isFunction(selectorOrScheduler)) { timestampProvider = selectorOrScheduler; } - const selector = isFunction(selectorOrScheduler) ? selectorOrScheduler : undefined; - const subject = new ReplaySubject(bufferSize, windowTime, timestampProvider); - // Note, we're passing `selector!` here, because at runtime, `undefined` is an acceptable argument // but it makes our TypeScript signature for `multicast` unhappy (as it should, because it's gross). - return (source: Observable) => multicast(subject, selector!)(source); + return (source: Observable) => multicast(new ReplaySubject(bufferSize, windowTime, timestampProvider), selector!)(source); } From f0e13c0cd8b9ced2fb2f87b98e0e4d3361f79e8f Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Sat, 15 May 2021 10:36:57 +1000 Subject: [PATCH 02/12] test: rearrange tests --- spec/operators/publish-spec.ts | 10 +++++----- spec/operators/publishReplay-spec.ts | 8 ++++---- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/spec/operators/publish-spec.ts b/spec/operators/publish-spec.ts index f5c82f757f..b405cb34ac 100644 --- a/spec/operators/publish-spec.ts +++ b/spec/operators/publish-spec.ts @@ -341,17 +341,17 @@ describe('publish operator', () => { it('should subscribe to its own source when using a shared pipeline', () => { const source1 = cold('-1-2-3-4-5-|'); const source1Subs = '^ !'; + const expected1 = '-1-2-3-4-5-|'; const source2 = cold('-6-7-8-9-0-|'); - const source2Subs = '^ !'; + const source2Subs = '^ !'; + const expected2 = '-6-7-8-9-0-|'; const sharedPipeLine = pipe( - publish() - ); + publish() + ); const published1 = source1.pipe(sharedPipeLine) as ConnectableObservable; const published2 = source2.pipe(sharedPipeLine) as ConnectableObservable; - const expected1 = '-1-2-3-4-5-|'; - const expected2 = '-6-7-8-9-0-|'; expectObservable(published1).toBe(expected1); expectSubscriptions(source1.subscriptions).toBe(source1Subs); diff --git a/spec/operators/publishReplay-spec.ts b/spec/operators/publishReplay-spec.ts index 4386ccc59b..594110e55d 100644 --- a/spec/operators/publishReplay-spec.ts +++ b/spec/operators/publishReplay-spec.ts @@ -490,15 +490,15 @@ describe('publishReplay operator', () => { it('should subscribe to its own source when using a shared pipeline', () => { const source1 = cold('-1-2-3-4-5-|'); - const source2 = cold('-6-7-8-9-0-|'); - const expected1 = '-1-2-3-4-5-|'; - const expected2 = '-6-7-8-9-0-|'; const source1Subs = '^ !'; + const expected1 = '-1-2-3-4-5-|'; + const source2 = cold('-6-7-8-9-0-|'); const source2Subs = '^ !'; + const expected2 = '-6-7-8-9-0-|'; const sharedPipeLine = pipe( publishReplay(1) - ); + ); const published1 = source1.pipe(sharedPipeLine) as ConnectableObservable; const published2 = source2.pipe(sharedPipeLine) as ConnectableObservable; From 47376dc2e6ef0d6a5bae018cb5014f598383c78e Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Sat, 15 May 2021 10:37:44 +1000 Subject: [PATCH 03/12] test: add failing ref transparency tests --- spec/operators/publishBehavior-spec.ts | 26 +++++++++++++++++++++++++- spec/operators/publishLast-spec.ts | 26 +++++++++++++++++++++++++- 2 files changed, 50 insertions(+), 2 deletions(-) diff --git a/spec/operators/publishBehavior-spec.ts b/spec/operators/publishBehavior-spec.ts index 06c56abafa..398337af4e 100644 --- a/spec/operators/publishBehavior-spec.ts +++ b/spec/operators/publishBehavior-spec.ts @@ -1,7 +1,7 @@ import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; import { publishBehavior, mergeMapTo, tap, mergeMap, refCount, retry, repeat } from 'rxjs/operators'; -import { ConnectableObservable, of, Subscription, Observable } from 'rxjs'; +import { ConnectableObservable, of, Subscription, Observable, pipe } from 'rxjs'; /** @test {publishBehavior} */ describe('publishBehavior operator', () => { @@ -344,4 +344,28 @@ describe('publishBehavior operator', () => { expect(results).to.deep.equal([]); done(); }); + + it('should subscribe to its own source when using a shared pipeline', () => { + const source1 = cold('-1-2-3-4-5-|'); + const source1Subs = '^ !'; + const expected1 = 'x1-2-3-4-5-|'; + const source2 = cold('-6-7-8-9-0-|'); + const source2Subs = '^ !'; + const expected2 = 'x6-7-8-9-0-|'; + + const sharedPipeLine = pipe( + publishBehavior('x') + ); + + const published1 = source1.pipe(sharedPipeLine) as ConnectableObservable; + const published2 = source2.pipe(sharedPipeLine) as ConnectableObservable; + + expectObservable(published1).toBe(expected1); + expectSubscriptions(source1.subscriptions).toBe(source1Subs); + expectObservable(published2).toBe(expected2); + expectSubscriptions(source2.subscriptions).toBe(source2Subs); + + published1.connect(); + published2.connect(); + }); }); diff --git a/spec/operators/publishLast-spec.ts b/spec/operators/publishLast-spec.ts index 2287a4b15f..03db9dd3de 100644 --- a/spec/operators/publishLast-spec.ts +++ b/spec/operators/publishLast-spec.ts @@ -1,7 +1,7 @@ import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; import { publishLast, mergeMapTo, tap, mergeMap, refCount, retry } from 'rxjs/operators'; -import { ConnectableObservable, of, Subscription, Observable } from 'rxjs'; +import { ConnectableObservable, of, Subscription, Observable, pipe } from 'rxjs'; /** @test {publishLast} */ describe('publishLast operator', () => { @@ -261,4 +261,28 @@ describe('publishLast operator', () => { expect(subscriptions).to.equal(1); done(); }); + + it('should subscribe to its own source when using a shared pipeline', () => { + const source1 = cold('-1-2-3-4-5-|'); + const source1Subs = '^ !'; + const expected1 = '-----------(5|)'; + const source2 = cold('-6-7-8-9-0-|'); + const source2Subs = '^ !'; + const expected2 = '-----------(0|)'; + + const sharedPipeLine = pipe( + publishLast() + ); + + const published1 = source1.pipe(sharedPipeLine) as ConnectableObservable; + const published2 = source2.pipe(sharedPipeLine) as ConnectableObservable; + + expectObservable(published1).toBe(expected1); + expectSubscriptions(source1.subscriptions).toBe(source1Subs); + expectObservable(published2).toBe(expected2); + expectSubscriptions(source2.subscriptions).toBe(source2Subs); + + published1.connect(); + published2.connect(); + }); }); From e0b4e57967909598d26b4a5ab5dc5ee747a801cc Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Sat, 15 May 2021 10:39:20 +1000 Subject: [PATCH 04/12] fix(publishBehavior): make ref transparent --- src/internal/operators/publishBehavior.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/internal/operators/publishBehavior.ts b/src/internal/operators/publishBehavior.ts index 87ff0ec8dc..d94589c09c 100644 --- a/src/internal/operators/publishBehavior.ts +++ b/src/internal/operators/publishBehavior.ts @@ -18,7 +18,9 @@ import { UnaryFunction } from '../types'; * Details: https://rxjs.dev/deprecations/multicasting */ export function publishBehavior(initialValue: T): UnaryFunction, ConnectableObservable> { - const subject = new BehaviorSubject(initialValue); // Note that this has *never* supported the selector function. - return (source) => new ConnectableObservable(source, () => subject); + return (source) => { + const subject = new BehaviorSubject(initialValue); + return new ConnectableObservable(source, () => subject); + }; } From 2ccae8cfb01be21bd72286db41a5812e1d1e1ee8 Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Sat, 15 May 2021 10:39:55 +1000 Subject: [PATCH 05/12] fix(publishLast): make ref transparent --- src/internal/operators/publishLast.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/internal/operators/publishLast.ts b/src/internal/operators/publishLast.ts index 13a6e57624..7cde7f50d5 100644 --- a/src/internal/operators/publishLast.ts +++ b/src/internal/operators/publishLast.ts @@ -67,7 +67,9 @@ import { UnaryFunction } from '../types'; * Details: https://rxjs.dev/deprecations/multicasting */ export function publishLast(): UnaryFunction, ConnectableObservable> { - const subject = new AsyncSubject(); // Note that this has *never* supported a selector function like `publish` and `publishReplay`. - return (source) => new ConnectableObservable(source, () => subject); + return (source) => { + const subject = new AsyncSubject(); + return new ConnectableObservable(source, () => subject); + }; } From a7879a117a661d8b4564702415028e9ba8c30a1a Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Tue, 18 May 2021 21:16:58 +1000 Subject: [PATCH 06/12] test: add failing ref transparency tests --- spec/operators/share-spec.ts | 23 ++++++++++++++++++++++- spec/operators/shareReplay-spec.ts | 23 ++++++++++++++++++++++- 2 files changed, 44 insertions(+), 2 deletions(-) diff --git a/spec/operators/share-spec.ts b/spec/operators/share-spec.ts index 2ea5132a4b..73cab855fb 100644 --- a/spec/operators/share-spec.ts +++ b/spec/operators/share-spec.ts @@ -1,6 +1,6 @@ /** @prettier */ import { expect } from 'chai'; -import { asapScheduler, concat, config, defer, EMPTY, NEVER, Observable, of, scheduled, Subject, throwError } from 'rxjs'; +import { asapScheduler, concat, config, defer, EMPTY, NEVER, Observable, of, scheduled, Subject, throwError, pipe } from 'rxjs'; import { map, mergeMap, @@ -619,6 +619,27 @@ describe('share', () => { expectSubscriptions(source.subscriptions).toBe(sourceSubs); }); }); + + it('should subscribe to its own source when using a shared pipeline', () => { + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const source1 = cold('-1-2-3-4-5-|'); + const source1Subs = ' ^----------!'; + const expected1 = ' -1-2-3-4-5-|'; + const source2 = cold('-6-7-8-9-0-|'); + const source2Subs = ' ^----------!'; + const expected2 = ' -6-7-8-9-0-|'; + + const sharedPipeLine = pipe(share({ resetOnRefCountZero })); + + const shared1 = source1.pipe(sharedPipeLine); + const shared2 = source2.pipe(sharedPipeLine); + + expectObservable(shared1).toBe(expected1); + expectSubscriptions(source1.subscriptions).toBe(source1Subs); + expectObservable(shared2).toBe(expected2); + expectSubscriptions(source2.subscriptions).toBe(source2Subs); + }); + }); }); } diff --git a/spec/operators/shareReplay-spec.ts b/spec/operators/shareReplay-spec.ts index e202aee89d..ebd10fa09d 100644 --- a/spec/operators/shareReplay-spec.ts +++ b/spec/operators/shareReplay-spec.ts @@ -3,7 +3,7 @@ import { expect } from 'chai'; import * as sinon from 'sinon'; import { shareReplay, mergeMapTo, retry, take } from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; -import { Observable, Operator, Observer, of, from, defer } from 'rxjs'; +import { Observable, Operator, Observer, of, from, defer, pipe } from 'rxjs'; import { observableMatcher } from '../helpers/observableMatcher'; /** @test {shareReplay} */ @@ -387,4 +387,25 @@ describe('shareReplay', () => { } else { console.warn(`No support for FinalizationRegistry in Node ${process.version}`); } + + it('should subscribe to its own source when using a shared pipeline', () => { + testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => { + const source1 = cold('-1-2-3-4-5-|'); + const source1Subs = ' ^----------!'; + const expected1 = ' -1-2-3-4-5-|'; + const source2 = cold('-6-7-8-9-0-|'); + const source2Subs = ' ^----------!'; + const expected2 = ' -6-7-8-9-0-|'; + + const sharedPipeLine = pipe(shareReplay({ refCount: false })); + + const shared1 = source1.pipe(sharedPipeLine); + const shared2 = source2.pipe(sharedPipeLine); + + expectObservable(shared1).toBe(expected1); + expectSubscriptions(source1.subscriptions).toBe(source1Subs); + expectObservable(shared2).toBe(expected2); + expectSubscriptions(source2.subscriptions).toBe(source2Subs); + }); + }); }); From 877b379b219cb882d9f76676c1e84e86f6ac9d0d Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Tue, 18 May 2021 21:27:41 +1000 Subject: [PATCH 07/12] fix(share): make ref transparent --- src/internal/operators/share.ts | 164 ++++++++++++++++---------------- 1 file changed, 83 insertions(+), 81 deletions(-) diff --git a/src/internal/operators/share.ts b/src/internal/operators/share.ts index 90f929d9aa..e4966a7087 100644 --- a/src/internal/operators/share.ts +++ b/src/internal/operators/share.ts @@ -134,90 +134,92 @@ export function share(options: ShareConfig): MonoTypeOperatorFunction; * @return A function that returns an Observable that mirrors the source. */ export function share(options: ShareConfig = {}): MonoTypeOperatorFunction { - const { connector = () => new Subject(), resetOnError = true, resetOnComplete = true, resetOnRefCountZero = true } = options; - - let connection: SafeSubscriber | null = null; - let resetConnection: Subscription | null = null; - let subject: SubjectLike | null = null; - let refCount = 0; - let hasCompleted = false; - let hasErrored = false; - - const cancelReset = () => { - resetConnection?.unsubscribe(); - resetConnection = null; - }; - // Used to reset the internal state to a "cold" - // state, as though it had never been subscribed to. - const reset = () => { - cancelReset(); - connection = subject = null; - hasCompleted = hasErrored = false; - }; - const resetAndUnsubscribe = () => { - // We need to capture the connection before - // we reset (if we need to reset). - const conn = connection; - reset(); - conn?.unsubscribe(); - }; - - return operate((source, subscriber) => { - refCount++; - if (!hasErrored && !hasCompleted) { + return (observable) => { + const { connector = () => new Subject(), resetOnError = true, resetOnComplete = true, resetOnRefCountZero = true } = options; + + let connection: SafeSubscriber | null = null; + let resetConnection: Subscription | null = null; + let subject: SubjectLike | null = null; + let refCount = 0; + let hasCompleted = false; + let hasErrored = false; + + const cancelReset = () => { + resetConnection?.unsubscribe(); + resetConnection = null; + }; + // Used to reset the internal state to a "cold" + // state, as though it had never been subscribed to. + const reset = () => { cancelReset(); - } - - // Create the subject if we don't have one yet. Grab a local reference to - // it as well, which avoids non-null assertations when using it and, if we - // connect to it now, then error/complete need a reference after it was - // reset. - const dest = (subject = subject ?? connector()); - - // Add the teardown directly to the subscriber - instead of returning it - - // so that the handling of the subscriber's unsubscription will be wired - // up _before_ the subscription to the source occurs. This is done so that - // the assignment to the source connection's `closed` property will be seen - // by synchronous firehose sources. - subscriber.add(() => { - refCount--; - - // If we're resetting on refCount === 0, and it's 0, we only want to do - // that on "unsubscribe", really. Resetting on error or completion is a different - // configuration. - if (refCount === 0 && !hasErrored && !hasCompleted) { - resetConnection = handleReset(resetAndUnsubscribe, resetOnRefCountZero); + connection = subject = null; + hasCompleted = hasErrored = false; + }; + const resetAndUnsubscribe = () => { + // We need to capture the connection before + // we reset (if we need to reset). + const conn = connection; + reset(); + conn?.unsubscribe(); + }; + + return operate((source, subscriber) => { + refCount++; + if (!hasErrored && !hasCompleted) { + cancelReset(); } - }); - - // The following line adds the subscription to the subscriber passed. - // Basically, `subscriber === dest.subscribe(subscriber)` is `true`. - dest.subscribe(subscriber); - - if (!connection) { - // We need to create a subscriber here - rather than pass an observer and - // assign the returned subscription to connection - because it's possible - // for reentrant subscriptions to the shared observable to occur and in - // those situations we want connection to be already-assigned so that we - // don't create another connection to the source. - connection = new SafeSubscriber({ - next: (value) => dest.next(value), - error: (err) => { - hasErrored = true; - cancelReset(); - resetConnection = handleReset(reset, resetOnError, err); - dest.error(err); - }, - complete: () => { - hasCompleted = true; - cancelReset(); - resetConnection = handleReset(reset, resetOnComplete); - dest.complete(); - }, + + // Create the subject if we don't have one yet. Grab a local reference to + // it as well, which avoids non-null assertations when using it and, if we + // connect to it now, then error/complete need a reference after it was + // reset. + const dest = (subject = subject ?? connector()); + + // Add the teardown directly to the subscriber - instead of returning it - + // so that the handling of the subscriber's unsubscription will be wired + // up _before_ the subscription to the source occurs. This is done so that + // the assignment to the source connection's `closed` property will be seen + // by synchronous firehose sources. + subscriber.add(() => { + refCount--; + + // If we're resetting on refCount === 0, and it's 0, we only want to do + // that on "unsubscribe", really. Resetting on error or completion is a different + // configuration. + if (refCount === 0 && !hasErrored && !hasCompleted) { + resetConnection = handleReset(resetAndUnsubscribe, resetOnRefCountZero); + } }); - from(source).subscribe(connection); - } - }); + + // The following line adds the subscription to the subscriber passed. + // Basically, `subscriber === dest.subscribe(subscriber)` is `true`. + dest.subscribe(subscriber); + + if (!connection) { + // We need to create a subscriber here - rather than pass an observer and + // assign the returned subscription to connection - because it's possible + // for reentrant subscriptions to the shared observable to occur and in + // those situations we want connection to be already-assigned so that we + // don't create another connection to the source. + connection = new SafeSubscriber({ + next: (value) => dest.next(value), + error: (err) => { + hasErrored = true; + cancelReset(); + resetConnection = handleReset(reset, resetOnError, err); + dest.error(err); + }, + complete: () => { + hasCompleted = true; + cancelReset(); + resetConnection = handleReset(reset, resetOnComplete); + dest.complete(); + }, + }); + from(source).subscribe(connection); + } + })(observable); + }; } function handleReset( From 854abf61704fccd23f5deedc2811963a343e5982 Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Wed, 19 May 2021 18:45:36 +1000 Subject: [PATCH 08/12] chore: add a comment --- src/internal/operators/share.ts | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/internal/operators/share.ts b/src/internal/operators/share.ts index e4966a7087..d4cadfa91c 100644 --- a/src/internal/operators/share.ts +++ b/src/internal/operators/share.ts @@ -134,7 +134,15 @@ export function share(options: ShareConfig): MonoTypeOperatorFunction; * @return A function that returns an Observable that mirrors the source. */ export function share(options: ShareConfig = {}): MonoTypeOperatorFunction { - return (observable) => { + // It's necessary to use a wrapper here, as the _operator_ must be + // referentially transparent. Otherwise, it cannot be used in calls to the + // static `pipe` function - to create a reusable pipeline. + // + // The _operator function_ - the function returned by the _operator_ - will + // not be referentially transparent - as it shares its source - but the + // _operator function_ is called when the complete pipeline is composed - not + // when the static `pipe` function is called. + return (wrapperSource) => { const { connector = () => new Subject(), resetOnError = true, resetOnComplete = true, resetOnRefCountZero = true } = options; let connection: SafeSubscriber | null = null; @@ -218,7 +226,7 @@ export function share(options: ShareConfig = {}): MonoTypeOperatorFunction }); from(source).subscribe(connection); } - })(observable); + })(wrapperSource); }; } From 17dc704a5eac6dd7a57ba1128a331ea3dd9f05c0 Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Wed, 19 May 2021 19:07:01 +1000 Subject: [PATCH 09/12] test: change descriptions and add comments --- spec/operators/publish-spec.ts | 5 ++++- spec/operators/publishBehavior-spec.ts | 5 ++++- spec/operators/publishLast-spec.ts | 5 ++++- spec/operators/publishReplay-spec.ts | 5 ++++- spec/operators/share-spec.ts | 5 ++++- spec/operators/shareReplay-spec.ts | 5 ++++- 6 files changed, 24 insertions(+), 6 deletions(-) diff --git a/spec/operators/publish-spec.ts b/spec/operators/publish-spec.ts index b405cb34ac..9cbf0a18ea 100644 --- a/spec/operators/publish-spec.ts +++ b/spec/operators/publish-spec.ts @@ -338,7 +338,7 @@ describe('publish operator', () => { done(); }); - it('should subscribe to its own source when using a shared pipeline', () => { + it('should be referentially-transparent', () => { const source1 = cold('-1-2-3-4-5-|'); const source1Subs = '^ !'; const expected1 = '-1-2-3-4-5-|'; @@ -346,10 +346,13 @@ describe('publish operator', () => { const source2Subs = '^ !'; const expected2 = '-6-7-8-9-0-|'; + // Calls to the _operator_ must be referentially-transparent. const sharedPipeLine = pipe( publish() ); + // The non-referentially-transparent publishing occurs within the _operator function_ + // returned by the _operator_ and that happens when the complete pipeline is composed. const published1 = source1.pipe(sharedPipeLine) as ConnectableObservable; const published2 = source2.pipe(sharedPipeLine) as ConnectableObservable; diff --git a/spec/operators/publishBehavior-spec.ts b/spec/operators/publishBehavior-spec.ts index 398337af4e..33cc9583ce 100644 --- a/spec/operators/publishBehavior-spec.ts +++ b/spec/operators/publishBehavior-spec.ts @@ -345,7 +345,7 @@ describe('publishBehavior operator', () => { done(); }); - it('should subscribe to its own source when using a shared pipeline', () => { + it('should be referentially-transparent', () => { const source1 = cold('-1-2-3-4-5-|'); const source1Subs = '^ !'; const expected1 = 'x1-2-3-4-5-|'; @@ -353,10 +353,13 @@ describe('publishBehavior operator', () => { const source2Subs = '^ !'; const expected2 = 'x6-7-8-9-0-|'; + // Calls to the _operator_ must be referentially-transparent. const sharedPipeLine = pipe( publishBehavior('x') ); + // The non-referentially-transparent publishing occurs within the _operator function_ + // returned by the _operator_ and that happens when the complete pipeline is composed. const published1 = source1.pipe(sharedPipeLine) as ConnectableObservable; const published2 = source2.pipe(sharedPipeLine) as ConnectableObservable; diff --git a/spec/operators/publishLast-spec.ts b/spec/operators/publishLast-spec.ts index 03db9dd3de..48031fdfbe 100644 --- a/spec/operators/publishLast-spec.ts +++ b/spec/operators/publishLast-spec.ts @@ -262,7 +262,7 @@ describe('publishLast operator', () => { done(); }); - it('should subscribe to its own source when using a shared pipeline', () => { + it('should be referentially-transparent', () => { const source1 = cold('-1-2-3-4-5-|'); const source1Subs = '^ !'; const expected1 = '-----------(5|)'; @@ -270,10 +270,13 @@ describe('publishLast operator', () => { const source2Subs = '^ !'; const expected2 = '-----------(0|)'; + // Calls to the _operator_ must be referentially-transparent. const sharedPipeLine = pipe( publishLast() ); + // The non-referentially-transparent publishing occurs within the _operator function_ + // returned by the _operator_ and that happens when the complete pipeline is composed. const published1 = source1.pipe(sharedPipeLine) as ConnectableObservable; const published2 = source2.pipe(sharedPipeLine) as ConnectableObservable; diff --git a/spec/operators/publishReplay-spec.ts b/spec/operators/publishReplay-spec.ts index 594110e55d..d3c978ac7c 100644 --- a/spec/operators/publishReplay-spec.ts +++ b/spec/operators/publishReplay-spec.ts @@ -488,7 +488,7 @@ describe('publishReplay operator', () => { expectSubscriptions(source.subscriptions).toBe(sourceSubs); }); - it('should subscribe to its own source when using a shared pipeline', () => { + it('should be referentially-transparent', () => { const source1 = cold('-1-2-3-4-5-|'); const source1Subs = '^ !'; const expected1 = '-1-2-3-4-5-|'; @@ -496,10 +496,13 @@ describe('publishReplay operator', () => { const source2Subs = '^ !'; const expected2 = '-6-7-8-9-0-|'; + // Calls to the _operator_ must be referentially-transparent. const sharedPipeLine = pipe( publishReplay(1) ); + // The non-referentially-transparent publishing occurs within the _operator function_ + // returned by the _operator_ and that happens when the complete pipeline is composed. const published1 = source1.pipe(sharedPipeLine) as ConnectableObservable; const published2 = source2.pipe(sharedPipeLine) as ConnectableObservable; diff --git a/spec/operators/share-spec.ts b/spec/operators/share-spec.ts index 73cab855fb..9b0560d3b9 100644 --- a/spec/operators/share-spec.ts +++ b/spec/operators/share-spec.ts @@ -620,7 +620,7 @@ describe('share', () => { }); }); - it('should subscribe to its own source when using a shared pipeline', () => { + it('should be referentially-transparent', () => { rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { const source1 = cold('-1-2-3-4-5-|'); const source1Subs = ' ^----------!'; @@ -629,8 +629,11 @@ describe('share', () => { const source2Subs = ' ^----------!'; const expected2 = ' -6-7-8-9-0-|'; + // Calls to the _operator_ must be referentially-transparent. const sharedPipeLine = pipe(share({ resetOnRefCountZero })); + // The non-referentially-transparent sharing occurs within the _operator function_ + // returned by the _operator_ and that happens when the complete pipeline is composed. const shared1 = source1.pipe(sharedPipeLine); const shared2 = source2.pipe(sharedPipeLine); diff --git a/spec/operators/shareReplay-spec.ts b/spec/operators/shareReplay-spec.ts index ebd10fa09d..904c6a0876 100644 --- a/spec/operators/shareReplay-spec.ts +++ b/spec/operators/shareReplay-spec.ts @@ -388,7 +388,7 @@ describe('shareReplay', () => { console.warn(`No support for FinalizationRegistry in Node ${process.version}`); } - it('should subscribe to its own source when using a shared pipeline', () => { + it('should be referentially-transparent', () => { testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => { const source1 = cold('-1-2-3-4-5-|'); const source1Subs = ' ^----------!'; @@ -397,8 +397,11 @@ describe('shareReplay', () => { const source2Subs = ' ^----------!'; const expected2 = ' -6-7-8-9-0-|'; + // Calls to the _operator_ must be referentially-transparent. const sharedPipeLine = pipe(shareReplay({ refCount: false })); + // The non-referentially-transparent sharing occurs within the _operator function_ + // returned by the _operator_ and that happens when the complete pipeline is composed. const shared1 = source1.pipe(sharedPipeLine); const shared2 = source2.pipe(sharedPipeLine); From 56919fa0246682e1abcae560027be99ad2a2014d Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Fri, 21 May 2021 08:29:33 +1000 Subject: [PATCH 10/12] refactor: destructure options outside of op func --- src/internal/operators/share.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/internal/operators/share.ts b/src/internal/operators/share.ts index d4cadfa91c..013dea8413 100644 --- a/src/internal/operators/share.ts +++ b/src/internal/operators/share.ts @@ -134,6 +134,7 @@ export function share(options: ShareConfig): MonoTypeOperatorFunction; * @return A function that returns an Observable that mirrors the source. */ export function share(options: ShareConfig = {}): MonoTypeOperatorFunction { + const { connector = () => new Subject(), resetOnError = true, resetOnComplete = true, resetOnRefCountZero = true } = options; // It's necessary to use a wrapper here, as the _operator_ must be // referentially transparent. Otherwise, it cannot be used in calls to the // static `pipe` function - to create a reusable pipeline. @@ -143,8 +144,6 @@ export function share(options: ShareConfig = {}): MonoTypeOperatorFunction // _operator function_ is called when the complete pipeline is composed - not // when the static `pipe` function is called. return (wrapperSource) => { - const { connector = () => new Subject(), resetOnError = true, resetOnComplete = true, resetOnRefCountZero = true } = options; - let connection: SafeSubscriber | null = null; let resetConnection: Subscription | null = null; let subject: SubjectLike | null = null; From a69cff9ad64bd68fccca743b01620681c2b3ab1c Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Fri, 21 May 2021 08:54:29 +1000 Subject: [PATCH 11/12] chore: use consistent terminology in comments --- src/internal/operators/share.ts | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/internal/operators/share.ts b/src/internal/operators/share.ts index 013dea8413..d91dc76b60 100644 --- a/src/internal/operators/share.ts +++ b/src/internal/operators/share.ts @@ -137,12 +137,13 @@ export function share(options: ShareConfig = {}): MonoTypeOperatorFunction const { connector = () => new Subject(), resetOnError = true, resetOnComplete = true, resetOnRefCountZero = true } = options; // It's necessary to use a wrapper here, as the _operator_ must be // referentially transparent. Otherwise, it cannot be used in calls to the - // static `pipe` function - to create a reusable pipeline. + // static `pipe` function - to create a partial pipeline. // // The _operator function_ - the function returned by the _operator_ - will // not be referentially transparent - as it shares its source - but the - // _operator function_ is called when the complete pipeline is composed - not - // when the static `pipe` function is called. + // _operator function_ is called when the complete pipeline is composed via a + // call to a source observable's `pipe` method - not when the static `pipe` + // function is called. return (wrapperSource) => { let connection: SafeSubscriber | null = null; let resetConnection: Subscription | null = null; From da2037ea90cca08a728b34b5915ba1c0ca685555 Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Fri, 21 May 2021 09:19:47 +1000 Subject: [PATCH 12/12] test: use consistent terminology --- spec/operators/publish-spec.ts | 6 +++--- spec/operators/publishBehavior-spec.ts | 6 +++--- spec/operators/publishLast-spec.ts | 6 +++--- spec/operators/publishReplay-spec.ts | 6 +++--- spec/operators/share-spec.ts | 6 +++--- spec/operators/shareReplay-spec.ts | 6 +++--- 6 files changed, 18 insertions(+), 18 deletions(-) diff --git a/spec/operators/publish-spec.ts b/spec/operators/publish-spec.ts index 9cbf0a18ea..31efba419f 100644 --- a/spec/operators/publish-spec.ts +++ b/spec/operators/publish-spec.ts @@ -347,14 +347,14 @@ describe('publish operator', () => { const expected2 = '-6-7-8-9-0-|'; // Calls to the _operator_ must be referentially-transparent. - const sharedPipeLine = pipe( + const partialPipeLine = pipe( publish() ); // The non-referentially-transparent publishing occurs within the _operator function_ // returned by the _operator_ and that happens when the complete pipeline is composed. - const published1 = source1.pipe(sharedPipeLine) as ConnectableObservable; - const published2 = source2.pipe(sharedPipeLine) as ConnectableObservable; + const published1 = source1.pipe(partialPipeLine) as ConnectableObservable; + const published2 = source2.pipe(partialPipeLine) as ConnectableObservable; expectObservable(published1).toBe(expected1); expectSubscriptions(source1.subscriptions).toBe(source1Subs); diff --git a/spec/operators/publishBehavior-spec.ts b/spec/operators/publishBehavior-spec.ts index 33cc9583ce..a6f6d066d5 100644 --- a/spec/operators/publishBehavior-spec.ts +++ b/spec/operators/publishBehavior-spec.ts @@ -354,14 +354,14 @@ describe('publishBehavior operator', () => { const expected2 = 'x6-7-8-9-0-|'; // Calls to the _operator_ must be referentially-transparent. - const sharedPipeLine = pipe( + const partialPipeLine = pipe( publishBehavior('x') ); // The non-referentially-transparent publishing occurs within the _operator function_ // returned by the _operator_ and that happens when the complete pipeline is composed. - const published1 = source1.pipe(sharedPipeLine) as ConnectableObservable; - const published2 = source2.pipe(sharedPipeLine) as ConnectableObservable; + const published1 = source1.pipe(partialPipeLine) as ConnectableObservable; + const published2 = source2.pipe(partialPipeLine) as ConnectableObservable; expectObservable(published1).toBe(expected1); expectSubscriptions(source1.subscriptions).toBe(source1Subs); diff --git a/spec/operators/publishLast-spec.ts b/spec/operators/publishLast-spec.ts index 48031fdfbe..88cc47b017 100644 --- a/spec/operators/publishLast-spec.ts +++ b/spec/operators/publishLast-spec.ts @@ -271,14 +271,14 @@ describe('publishLast operator', () => { const expected2 = '-----------(0|)'; // Calls to the _operator_ must be referentially-transparent. - const sharedPipeLine = pipe( + const partialPipeLine = pipe( publishLast() ); // The non-referentially-transparent publishing occurs within the _operator function_ // returned by the _operator_ and that happens when the complete pipeline is composed. - const published1 = source1.pipe(sharedPipeLine) as ConnectableObservable; - const published2 = source2.pipe(sharedPipeLine) as ConnectableObservable; + const published1 = source1.pipe(partialPipeLine) as ConnectableObservable; + const published2 = source2.pipe(partialPipeLine) as ConnectableObservable; expectObservable(published1).toBe(expected1); expectSubscriptions(source1.subscriptions).toBe(source1Subs); diff --git a/spec/operators/publishReplay-spec.ts b/spec/operators/publishReplay-spec.ts index d3c978ac7c..6c396c7c66 100644 --- a/spec/operators/publishReplay-spec.ts +++ b/spec/operators/publishReplay-spec.ts @@ -497,14 +497,14 @@ describe('publishReplay operator', () => { const expected2 = '-6-7-8-9-0-|'; // Calls to the _operator_ must be referentially-transparent. - const sharedPipeLine = pipe( + const partialPipeLine = pipe( publishReplay(1) ); // The non-referentially-transparent publishing occurs within the _operator function_ // returned by the _operator_ and that happens when the complete pipeline is composed. - const published1 = source1.pipe(sharedPipeLine) as ConnectableObservable; - const published2 = source2.pipe(sharedPipeLine) as ConnectableObservable; + const published1 = source1.pipe(partialPipeLine) as ConnectableObservable; + const published2 = source2.pipe(partialPipeLine) as ConnectableObservable; expectObservable(published1).toBe(expected1); expectSubscriptions(source1.subscriptions).toBe(source1Subs); diff --git a/spec/operators/share-spec.ts b/spec/operators/share-spec.ts index 9b0560d3b9..7ed7b17dba 100644 --- a/spec/operators/share-spec.ts +++ b/spec/operators/share-spec.ts @@ -630,12 +630,12 @@ describe('share', () => { const expected2 = ' -6-7-8-9-0-|'; // Calls to the _operator_ must be referentially-transparent. - const sharedPipeLine = pipe(share({ resetOnRefCountZero })); + const partialPipeLine = pipe(share({ resetOnRefCountZero })); // The non-referentially-transparent sharing occurs within the _operator function_ // returned by the _operator_ and that happens when the complete pipeline is composed. - const shared1 = source1.pipe(sharedPipeLine); - const shared2 = source2.pipe(sharedPipeLine); + const shared1 = source1.pipe(partialPipeLine); + const shared2 = source2.pipe(partialPipeLine); expectObservable(shared1).toBe(expected1); expectSubscriptions(source1.subscriptions).toBe(source1Subs); diff --git a/spec/operators/shareReplay-spec.ts b/spec/operators/shareReplay-spec.ts index 904c6a0876..0b336dac74 100644 --- a/spec/operators/shareReplay-spec.ts +++ b/spec/operators/shareReplay-spec.ts @@ -398,12 +398,12 @@ describe('shareReplay', () => { const expected2 = ' -6-7-8-9-0-|'; // Calls to the _operator_ must be referentially-transparent. - const sharedPipeLine = pipe(shareReplay({ refCount: false })); + const partialPipeLine = pipe(shareReplay({ refCount: false })); // The non-referentially-transparent sharing occurs within the _operator function_ // returned by the _operator_ and that happens when the complete pipeline is composed. - const shared1 = source1.pipe(sharedPipeLine); - const shared2 = source2.pipe(sharedPipeLine); + const shared1 = source1.pipe(partialPipeLine); + const shared2 = source2.pipe(partialPipeLine); expectObservable(shared1).toBe(expected1); expectSubscriptions(source1.subscriptions).toBe(source1Subs);