diff --git a/spec/operators/publish-spec.ts b/spec/operators/publish-spec.ts index ad2d9445aa..31efba419f 100644 --- a/spec/operators/publish-spec.ts +++ b/spec/operators/publish-spec.ts @@ -1,7 +1,7 @@ import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; import { publish, zip, mergeMapTo, mergeMap, tap, refCount, retry, repeat } from 'rxjs/operators'; -import { ConnectableObservable, of, Subscription, Observable } from 'rxjs'; +import { ConnectableObservable, of, Subscription, Observable, pipe } from 'rxjs'; /** @test {publish} */ describe('publish operator', () => { @@ -337,4 +337,31 @@ describe('publish operator', () => { expect(subscriptions).to.equal(1); done(); }); + + it('should be referentially-transparent', () => { + const source1 = cold('-1-2-3-4-5-|'); + const source1Subs = '^ !'; + const expected1 = '-1-2-3-4-5-|'; + const source2 = cold('-6-7-8-9-0-|'); + const source2Subs = '^ !'; + const expected2 = '-6-7-8-9-0-|'; + + // Calls to the _operator_ must be referentially-transparent. + const partialPipeLine = pipe( + publish() + ); + + // The non-referentially-transparent publishing occurs within the _operator function_ + // returned by the _operator_ and that happens when the complete pipeline is composed. + const published1 = source1.pipe(partialPipeLine) as ConnectableObservable; + const published2 = source2.pipe(partialPipeLine) as ConnectableObservable; + + expectObservable(published1).toBe(expected1); + expectSubscriptions(source1.subscriptions).toBe(source1Subs); + expectObservable(published2).toBe(expected2); + expectSubscriptions(source2.subscriptions).toBe(source2Subs); + + published1.connect(); + published2.connect(); + }); }); diff --git a/spec/operators/publishBehavior-spec.ts b/spec/operators/publishBehavior-spec.ts index 06c56abafa..a6f6d066d5 100644 --- a/spec/operators/publishBehavior-spec.ts +++ b/spec/operators/publishBehavior-spec.ts @@ -1,7 +1,7 @@ import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; import { publishBehavior, mergeMapTo, tap, mergeMap, refCount, retry, repeat } from 'rxjs/operators'; -import { ConnectableObservable, of, Subscription, Observable } from 'rxjs'; +import { ConnectableObservable, of, Subscription, Observable, pipe } from 'rxjs'; /** @test {publishBehavior} */ describe('publishBehavior operator', () => { @@ -344,4 +344,31 @@ describe('publishBehavior operator', () => { expect(results).to.deep.equal([]); done(); }); + + it('should be referentially-transparent', () => { + const source1 = cold('-1-2-3-4-5-|'); + const source1Subs = '^ !'; + const expected1 = 'x1-2-3-4-5-|'; + const source2 = cold('-6-7-8-9-0-|'); + const source2Subs = '^ !'; + const expected2 = 'x6-7-8-9-0-|'; + + // Calls to the _operator_ must be referentially-transparent. + const partialPipeLine = pipe( + publishBehavior('x') + ); + + // The non-referentially-transparent publishing occurs within the _operator function_ + // returned by the _operator_ and that happens when the complete pipeline is composed. + const published1 = source1.pipe(partialPipeLine) as ConnectableObservable; + const published2 = source2.pipe(partialPipeLine) as ConnectableObservable; + + expectObservable(published1).toBe(expected1); + expectSubscriptions(source1.subscriptions).toBe(source1Subs); + expectObservable(published2).toBe(expected2); + expectSubscriptions(source2.subscriptions).toBe(source2Subs); + + published1.connect(); + published2.connect(); + }); }); diff --git a/spec/operators/publishLast-spec.ts b/spec/operators/publishLast-spec.ts index 2287a4b15f..88cc47b017 100644 --- a/spec/operators/publishLast-spec.ts +++ b/spec/operators/publishLast-spec.ts @@ -1,7 +1,7 @@ import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; import { publishLast, mergeMapTo, tap, mergeMap, refCount, retry } from 'rxjs/operators'; -import { ConnectableObservable, of, Subscription, Observable } from 'rxjs'; +import { ConnectableObservable, of, Subscription, Observable, pipe } from 'rxjs'; /** @test {publishLast} */ describe('publishLast operator', () => { @@ -261,4 +261,31 @@ describe('publishLast operator', () => { expect(subscriptions).to.equal(1); done(); }); + + it('should be referentially-transparent', () => { + const source1 = cold('-1-2-3-4-5-|'); + const source1Subs = '^ !'; + const expected1 = '-----------(5|)'; + const source2 = cold('-6-7-8-9-0-|'); + const source2Subs = '^ !'; + const expected2 = '-----------(0|)'; + + // Calls to the _operator_ must be referentially-transparent. + const partialPipeLine = pipe( + publishLast() + ); + + // The non-referentially-transparent publishing occurs within the _operator function_ + // returned by the _operator_ and that happens when the complete pipeline is composed. + const published1 = source1.pipe(partialPipeLine) as ConnectableObservable; + const published2 = source2.pipe(partialPipeLine) as ConnectableObservable; + + expectObservable(published1).toBe(expected1); + expectSubscriptions(source1.subscriptions).toBe(source1Subs); + expectObservable(published2).toBe(expected2); + expectSubscriptions(source2.subscriptions).toBe(source2Subs); + + published1.connect(); + published2.connect(); + }); }); diff --git a/spec/operators/publishReplay-spec.ts b/spec/operators/publishReplay-spec.ts index d01fc00bda..6c396c7c66 100644 --- a/spec/operators/publishReplay-spec.ts +++ b/spec/operators/publishReplay-spec.ts @@ -1,6 +1,6 @@ import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { throwError, ConnectableObservable, EMPTY, NEVER, of, Observable, Subscription } from 'rxjs'; +import { throwError, ConnectableObservable, EMPTY, NEVER, of, Observable, Subscription, pipe } from 'rxjs'; import { publishReplay, mergeMapTo, tap, mergeMap, refCount, retry, repeat, map } from 'rxjs/operators'; /** @test {publishReplay} */ @@ -487,4 +487,31 @@ describe('publishReplay operator', () => { expectObservable(published).toBe(expected, undefined, error); expectSubscriptions(source.subscriptions).toBe(sourceSubs); }); + + it('should be referentially-transparent', () => { + const source1 = cold('-1-2-3-4-5-|'); + const source1Subs = '^ !'; + const expected1 = '-1-2-3-4-5-|'; + const source2 = cold('-6-7-8-9-0-|'); + const source2Subs = '^ !'; + const expected2 = '-6-7-8-9-0-|'; + + // Calls to the _operator_ must be referentially-transparent. + const partialPipeLine = pipe( + publishReplay(1) + ); + + // The non-referentially-transparent publishing occurs within the _operator function_ + // returned by the _operator_ and that happens when the complete pipeline is composed. + const published1 = source1.pipe(partialPipeLine) as ConnectableObservable; + const published2 = source2.pipe(partialPipeLine) as ConnectableObservable; + + expectObservable(published1).toBe(expected1); + expectSubscriptions(source1.subscriptions).toBe(source1Subs); + expectObservable(published2).toBe(expected2); + expectSubscriptions(source2.subscriptions).toBe(source2Subs); + + published1.connect(); + published2.connect(); + }); }); diff --git a/spec/operators/share-spec.ts b/spec/operators/share-spec.ts index 2ea5132a4b..7ed7b17dba 100644 --- a/spec/operators/share-spec.ts +++ b/spec/operators/share-spec.ts @@ -1,6 +1,6 @@ /** @prettier */ import { expect } from 'chai'; -import { asapScheduler, concat, config, defer, EMPTY, NEVER, Observable, of, scheduled, Subject, throwError } from 'rxjs'; +import { asapScheduler, concat, config, defer, EMPTY, NEVER, Observable, of, scheduled, Subject, throwError, pipe } from 'rxjs'; import { map, mergeMap, @@ -619,6 +619,30 @@ describe('share', () => { expectSubscriptions(source.subscriptions).toBe(sourceSubs); }); }); + + it('should be referentially-transparent', () => { + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const source1 = cold('-1-2-3-4-5-|'); + const source1Subs = ' ^----------!'; + const expected1 = ' -1-2-3-4-5-|'; + const source2 = cold('-6-7-8-9-0-|'); + const source2Subs = ' ^----------!'; + const expected2 = ' -6-7-8-9-0-|'; + + // Calls to the _operator_ must be referentially-transparent. + const partialPipeLine = pipe(share({ resetOnRefCountZero })); + + // The non-referentially-transparent sharing occurs within the _operator function_ + // returned by the _operator_ and that happens when the complete pipeline is composed. + const shared1 = source1.pipe(partialPipeLine); + const shared2 = source2.pipe(partialPipeLine); + + expectObservable(shared1).toBe(expected1); + expectSubscriptions(source1.subscriptions).toBe(source1Subs); + expectObservable(shared2).toBe(expected2); + expectSubscriptions(source2.subscriptions).toBe(source2Subs); + }); + }); }); } diff --git a/spec/operators/shareReplay-spec.ts b/spec/operators/shareReplay-spec.ts index e202aee89d..0b336dac74 100644 --- a/spec/operators/shareReplay-spec.ts +++ b/spec/operators/shareReplay-spec.ts @@ -3,7 +3,7 @@ import { expect } from 'chai'; import * as sinon from 'sinon'; import { shareReplay, mergeMapTo, retry, take } from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; -import { Observable, Operator, Observer, of, from, defer } from 'rxjs'; +import { Observable, Operator, Observer, of, from, defer, pipe } from 'rxjs'; import { observableMatcher } from '../helpers/observableMatcher'; /** @test {shareReplay} */ @@ -387,4 +387,28 @@ describe('shareReplay', () => { } else { console.warn(`No support for FinalizationRegistry in Node ${process.version}`); } + + it('should be referentially-transparent', () => { + testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => { + const source1 = cold('-1-2-3-4-5-|'); + const source1Subs = ' ^----------!'; + const expected1 = ' -1-2-3-4-5-|'; + const source2 = cold('-6-7-8-9-0-|'); + const source2Subs = ' ^----------!'; + const expected2 = ' -6-7-8-9-0-|'; + + // Calls to the _operator_ must be referentially-transparent. + const partialPipeLine = pipe(shareReplay({ refCount: false })); + + // The non-referentially-transparent sharing occurs within the _operator function_ + // returned by the _operator_ and that happens when the complete pipeline is composed. + const shared1 = source1.pipe(partialPipeLine); + const shared2 = source2.pipe(partialPipeLine); + + expectObservable(shared1).toBe(expected1); + expectSubscriptions(source1.subscriptions).toBe(source1Subs); + expectObservable(shared2).toBe(expected2); + expectSubscriptions(source2.subscriptions).toBe(source2Subs); + }); + }); }); diff --git a/src/internal/operators/publish.ts b/src/internal/operators/publish.ts index e1a1930e15..7de67d27aa 100644 --- a/src/internal/operators/publish.ts +++ b/src/internal/operators/publish.ts @@ -85,5 +85,5 @@ export function publish>(selector: (shared: Ob * Details: https://rxjs.dev/deprecations/multicasting */ export function publish(selector?: OperatorFunction): MonoTypeOperatorFunction | OperatorFunction { - return selector ? connect(selector) : multicast(new Subject()); + return selector ? (source) => connect(selector)(source) : (source) => multicast(new Subject())(source); } diff --git a/src/internal/operators/publishBehavior.ts b/src/internal/operators/publishBehavior.ts index 87ff0ec8dc..d94589c09c 100644 --- a/src/internal/operators/publishBehavior.ts +++ b/src/internal/operators/publishBehavior.ts @@ -18,7 +18,9 @@ import { UnaryFunction } from '../types'; * Details: https://rxjs.dev/deprecations/multicasting */ export function publishBehavior(initialValue: T): UnaryFunction, ConnectableObservable> { - const subject = new BehaviorSubject(initialValue); // Note that this has *never* supported the selector function. - return (source) => new ConnectableObservable(source, () => subject); + return (source) => { + const subject = new BehaviorSubject(initialValue); + return new ConnectableObservable(source, () => subject); + }; } diff --git a/src/internal/operators/publishLast.ts b/src/internal/operators/publishLast.ts index 13a6e57624..7cde7f50d5 100644 --- a/src/internal/operators/publishLast.ts +++ b/src/internal/operators/publishLast.ts @@ -67,7 +67,9 @@ import { UnaryFunction } from '../types'; * Details: https://rxjs.dev/deprecations/multicasting */ export function publishLast(): UnaryFunction, ConnectableObservable> { - const subject = new AsyncSubject(); // Note that this has *never* supported a selector function like `publish` and `publishReplay`. - return (source) => new ConnectableObservable(source, () => subject); + return (source) => { + const subject = new AsyncSubject(); + return new ConnectableObservable(source, () => subject); + }; } diff --git a/src/internal/operators/publishReplay.ts b/src/internal/operators/publishReplay.ts index f772fc35b8..47494e2a66 100644 --- a/src/internal/operators/publishReplay.ts +++ b/src/internal/operators/publishReplay.ts @@ -89,11 +89,8 @@ export function publishReplay( if (selectorOrScheduler && !isFunction(selectorOrScheduler)) { timestampProvider = selectorOrScheduler; } - const selector = isFunction(selectorOrScheduler) ? selectorOrScheduler : undefined; - const subject = new ReplaySubject(bufferSize, windowTime, timestampProvider); - // Note, we're passing `selector!` here, because at runtime, `undefined` is an acceptable argument // but it makes our TypeScript signature for `multicast` unhappy (as it should, because it's gross). - return (source: Observable) => multicast(subject, selector!)(source); + return (source: Observable) => multicast(new ReplaySubject(bufferSize, windowTime, timestampProvider), selector!)(source); } diff --git a/src/internal/operators/share.ts b/src/internal/operators/share.ts index 90f929d9aa..d91dc76b60 100644 --- a/src/internal/operators/share.ts +++ b/src/internal/operators/share.ts @@ -135,89 +135,99 @@ export function share(options: ShareConfig): MonoTypeOperatorFunction; */ export function share(options: ShareConfig = {}): MonoTypeOperatorFunction { const { connector = () => new Subject(), resetOnError = true, resetOnComplete = true, resetOnRefCountZero = true } = options; - - let connection: SafeSubscriber | null = null; - let resetConnection: Subscription | null = null; - let subject: SubjectLike | null = null; - let refCount = 0; - let hasCompleted = false; - let hasErrored = false; - - const cancelReset = () => { - resetConnection?.unsubscribe(); - resetConnection = null; - }; - // Used to reset the internal state to a "cold" - // state, as though it had never been subscribed to. - const reset = () => { - cancelReset(); - connection = subject = null; - hasCompleted = hasErrored = false; - }; - const resetAndUnsubscribe = () => { - // We need to capture the connection before - // we reset (if we need to reset). - const conn = connection; - reset(); - conn?.unsubscribe(); - }; - - return operate((source, subscriber) => { - refCount++; - if (!hasErrored && !hasCompleted) { + // It's necessary to use a wrapper here, as the _operator_ must be + // referentially transparent. Otherwise, it cannot be used in calls to the + // static `pipe` function - to create a partial pipeline. + // + // The _operator function_ - the function returned by the _operator_ - will + // not be referentially transparent - as it shares its source - but the + // _operator function_ is called when the complete pipeline is composed via a + // call to a source observable's `pipe` method - not when the static `pipe` + // function is called. + return (wrapperSource) => { + let connection: SafeSubscriber | null = null; + let resetConnection: Subscription | null = null; + let subject: SubjectLike | null = null; + let refCount = 0; + let hasCompleted = false; + let hasErrored = false; + + const cancelReset = () => { + resetConnection?.unsubscribe(); + resetConnection = null; + }; + // Used to reset the internal state to a "cold" + // state, as though it had never been subscribed to. + const reset = () => { cancelReset(); - } - - // Create the subject if we don't have one yet. Grab a local reference to - // it as well, which avoids non-null assertations when using it and, if we - // connect to it now, then error/complete need a reference after it was - // reset. - const dest = (subject = subject ?? connector()); - - // Add the teardown directly to the subscriber - instead of returning it - - // so that the handling of the subscriber's unsubscription will be wired - // up _before_ the subscription to the source occurs. This is done so that - // the assignment to the source connection's `closed` property will be seen - // by synchronous firehose sources. - subscriber.add(() => { - refCount--; - - // If we're resetting on refCount === 0, and it's 0, we only want to do - // that on "unsubscribe", really. Resetting on error or completion is a different - // configuration. - if (refCount === 0 && !hasErrored && !hasCompleted) { - resetConnection = handleReset(resetAndUnsubscribe, resetOnRefCountZero); + connection = subject = null; + hasCompleted = hasErrored = false; + }; + const resetAndUnsubscribe = () => { + // We need to capture the connection before + // we reset (if we need to reset). + const conn = connection; + reset(); + conn?.unsubscribe(); + }; + + return operate((source, subscriber) => { + refCount++; + if (!hasErrored && !hasCompleted) { + cancelReset(); } - }); - - // The following line adds the subscription to the subscriber passed. - // Basically, `subscriber === dest.subscribe(subscriber)` is `true`. - dest.subscribe(subscriber); - if (!connection) { - // We need to create a subscriber here - rather than pass an observer and - // assign the returned subscription to connection - because it's possible - // for reentrant subscriptions to the shared observable to occur and in - // those situations we want connection to be already-assigned so that we - // don't create another connection to the source. - connection = new SafeSubscriber({ - next: (value) => dest.next(value), - error: (err) => { - hasErrored = true; - cancelReset(); - resetConnection = handleReset(reset, resetOnError, err); - dest.error(err); - }, - complete: () => { - hasCompleted = true; - cancelReset(); - resetConnection = handleReset(reset, resetOnComplete); - dest.complete(); - }, + // Create the subject if we don't have one yet. Grab a local reference to + // it as well, which avoids non-null assertations when using it and, if we + // connect to it now, then error/complete need a reference after it was + // reset. + const dest = (subject = subject ?? connector()); + + // Add the teardown directly to the subscriber - instead of returning it - + // so that the handling of the subscriber's unsubscription will be wired + // up _before_ the subscription to the source occurs. This is done so that + // the assignment to the source connection's `closed` property will be seen + // by synchronous firehose sources. + subscriber.add(() => { + refCount--; + + // If we're resetting on refCount === 0, and it's 0, we only want to do + // that on "unsubscribe", really. Resetting on error or completion is a different + // configuration. + if (refCount === 0 && !hasErrored && !hasCompleted) { + resetConnection = handleReset(resetAndUnsubscribe, resetOnRefCountZero); + } }); - from(source).subscribe(connection); - } - }); + + // The following line adds the subscription to the subscriber passed. + // Basically, `subscriber === dest.subscribe(subscriber)` is `true`. + dest.subscribe(subscriber); + + if (!connection) { + // We need to create a subscriber here - rather than pass an observer and + // assign the returned subscription to connection - because it's possible + // for reentrant subscriptions to the shared observable to occur and in + // those situations we want connection to be already-assigned so that we + // don't create another connection to the source. + connection = new SafeSubscriber({ + next: (value) => dest.next(value), + error: (err) => { + hasErrored = true; + cancelReset(); + resetConnection = handleReset(reset, resetOnError, err); + dest.error(err); + }, + complete: () => { + hasCompleted = true; + cancelReset(); + resetConnection = handleReset(reset, resetOnComplete); + dest.complete(); + }, + }); + from(source).subscribe(connection); + } + })(wrapperSource); + }; } function handleReset(