Skip to content

Commit

Permalink
fix(publish,publishReplay) resolve sharing Subject
Browse files Browse the repository at this point in the history
change publish operator to use factory
change publishReplay operator to not share ReplaySubject
fixes issue ReactiveX#5411
  • Loading branch information
e-davidson committed Jul 10, 2020
1 parent 381aedb commit 16fe755
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 6 deletions.
26 changes: 25 additions & 1 deletion spec/operators/publish-spec.ts
@@ -1,7 +1,7 @@
import { expect } from 'chai';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { publish, zip, mergeMapTo, mergeMap, tap, refCount, retry, repeat } from 'rxjs/operators';
import { ConnectableObservable, of, Subscription, Observable } from 'rxjs';
import { ConnectableObservable, of, Subscription, Observable, pipe } from 'rxjs';

/** @test {publish} */
describe('publish operator', () => {
Expand Down Expand Up @@ -337,4 +337,28 @@ describe('publish operator', () => {
expect(subscriptions).to.equal(1);
done();
});

it('should subscribe to its own source when using a shared pipeline', () => {
const source1 = cold('-1-2-3-4-5-|');
const source1Subs = '^ !';
const source2 = cold('-6-7-8-9-0-|');
const source2Subs = '^ !';

const sharedPipeLine = pipe(
publish()
);

const published1 = source1.pipe(sharedPipeLine) as ConnectableObservable<any>;
const published2 = source2.pipe(sharedPipeLine) as ConnectableObservable<any>;
const expected1 = '-1-2-3-4-5-|';
const expected2 = '-6-7-8-9-0-|';

expectObservable(published1).toBe(expected1);
expectSubscriptions(source1.subscriptions).toBe(source1Subs);
expectObservable(published2).toBe(expected2);
expectSubscriptions(source2.subscriptions).toBe(source2Subs);

published1.connect();
published2.connect();
});
});
26 changes: 25 additions & 1 deletion spec/operators/publishReplay-spec.ts
@@ -1,6 +1,6 @@
import { expect } from 'chai';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { throwError, ConnectableObservable, EMPTY, NEVER, of, Observable, Subscription } from 'rxjs';
import { throwError, ConnectableObservable, EMPTY, NEVER, of, Observable, Subscription, pipe } from 'rxjs';
import { publishReplay, mergeMapTo, tap, mergeMap, refCount, retry, repeat, map } from 'rxjs/operators';

/** @test {publishReplay} */
Expand Down Expand Up @@ -488,4 +488,28 @@ describe('publishReplay operator', () => {
expectObservable(published).toBe(expected, undefined, error);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should subscribe to its own source when using a shared pipeline', () => {
const source1 = cold('-1-2-3-4-5-|');
const source2 = cold('-6-7-8-9-0-|');
const expected1 = '-1-2-3-4-5-|';
const expected2 = '-6-7-8-9-0-|';
const source1Subs = '^ !';
const source2Subs = '^ !';

const sharedPipeLine = pipe(
publishReplay(1)
);

const published1 = source1.pipe(sharedPipeLine) as ConnectableObservable<any>;
const published2 = source2.pipe(sharedPipeLine) as ConnectableObservable<any>;

expectObservable(published1).toBe(expected1);
expectSubscriptions(source1.subscriptions).toBe(source1Subs);
expectObservable(published2).toBe(expected2);
expectSubscriptions(source2.subscriptions).toBe(source2Subs);

published1.connect();
published2.connect();
});
});
4 changes: 2 additions & 2 deletions src/internal/operators/publish.ts
Expand Up @@ -59,6 +59,6 @@ export function publish<T>(selector: MonoTypeOperatorFunction<T>): MonoTypeOpera
*/
export function publish<T, R>(selector?: OperatorFunction<T, R>): MonoTypeOperatorFunction<T> | OperatorFunction<T, R> {
return selector ?
multicast(() => new Subject<T>(), selector) :
multicast(new Subject<T>());
(source: Observable<T>) => multicast(() => new Subject<T>(), selector)(source) :
(source: Observable<T>) => multicast(new Subject<T>())(source);
}
3 changes: 1 addition & 2 deletions src/internal/operators/publishReplay.ts
Expand Up @@ -19,7 +19,6 @@ export function publishReplay<T, R>(bufferSize?: number,
}

const selector = typeof selectorOrScheduler === 'function' ? selectorOrScheduler : undefined;
const subject = new ReplaySubject<T>(bufferSize, windowTime, scheduler);

return (source: Observable<T>) => multicast(() => subject, selector!)(source) as ConnectableObservable<R>;
return (source: Observable<T>) => multicast(new ReplaySubject<T>(bufferSize, windowTime, scheduler), selector!)(source) as ConnectableObservable<R>;
}

0 comments on commit 16fe755

Please sign in to comment.