In version 7, the multicasting APIs were simplified to just a few functions: connectable, connect and share.
There is also shareReplay, which is a thin wrapper around the now highly configurable share operator.
Other APIs that relate to multicasting are now deprecated.
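The relationship between shareReplay and share can be sketched as follows. This is an illustration rather than the library's source: the share options shown are an assumption about what shareReplay(1) configures by default, and countedSource and collectTwice are hypothetical helpers introduced only for this example. A synchronous source is used so the result does not depend on timing.

```typescript
import { Observable, ReplaySubject, share, shareReplay } from 'rxjs';

// Hypothetical helper: a cold source that counts how often it is subscribed to.
function countedSource(counter: { subscriptions: number }): Observable<number> {
  return new Observable<number>((subscriber) => {
    counter.subscriptions++;
    subscriber.next(1);
    subscriber.next(2);
    subscriber.complete();
  });
}

// Hypothetical helper: subscribes twice in a row and records what each subscriber receives.
function collectTwice(shared$: Observable<number>): number[][] {
  const first: number[] = [];
  const second: number[] = [];
  shared$.subscribe((value) => first.push(value));
  shared$.subscribe((value) => second.push(value));
  return [first, second];
}

const replayCounter = { subscriptions: 0 };
const viaShareReplay = collectTwice(countedSource(replayCounter).pipe(shareReplay(1)));

// Assumed equivalent configuration of share (not copied from the library).
const shareCounter = { subscriptions: 0 };
const viaShare = collectTwice(
  countedSource(shareCounter).pipe(
    share({
      connector: () => new ReplaySubject<number>(1),
      resetOnError: true,
      resetOnComplete: false,
      resetOnRefCountZero: false,
    })
  )
);

console.log(viaShareReplay, viaShare);
```

In both cases the first subscriber receives every value, the second receives only the replayed last value, and the source is subscribed to once.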
Instead of creating a ConnectableObservable instance, call the connectable function to obtain a connectable observable.
import { ConnectableObservable, timer, Subject } from 'rxjs';
// deprecated
const tick$ = new ConnectableObservable(
  timer(1_000),
  () => new Subject());
tick$.connect();

import { connectable, timer, Subject } from 'rxjs';
// suggested refactor
const tick$ = connectable(timer(1_000), {
  connector: () => new Subject()
});
tick$.connect();
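As a minimal sketch of what connectable provides, the example below replaces timer with a synchronous source (an assumption made so the result is deterministic) and counts source subscriptions. The names numbers$, a and b are illustrative only.

```typescript
import { connectable, Observable, Subject } from 'rxjs';

// A cold source that records how many times it is subscribed to.
let sourceSubscriptions = 0;
const numbers$ = new Observable<number>((subscriber) => {
  sourceSubscriptions++;
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

const shared$ = connectable(numbers$, {
  connector: () => new Subject<number>(),
});

const a: number[] = [];
const b: number[] = [];
shared$.subscribe((value) => a.push(value));
shared$.subscribe((value) => b.push(value));

// Nothing has been emitted yet: the source is not subscribed to until connect() is called.
const receivedBeforeConnect = a.length + b.length;

// A single subscription to the source now feeds both subscribers.
shared$.connect();

console.log(receivedBeforeConnect, a, b, sourceSubscriptions);
```

Both subscribers receive 1, 2, 3 from one shared subscription to the source.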
Where the refCount method is called on a ConnectableObservable, the share operator can be used instead.
import { ConnectableObservable, timer, Subject } from 'rxjs';
// deprecated
const tick$ = new ConnectableObservable(
  timer(1_000),
  () => new Subject()
).refCount();

import { timer, share, Subject } from 'rxjs';
// suggested refactor
const tick$ = timer(1_000).pipe(
  share({ connector: () => new Subject() })
);
Where multicast is called with a subject factory, it can be replaced with connectable.
import { timer, multicast, Subject, ConnectableObservable } from 'rxjs';
// deprecated
const tick$ = timer(1_000).pipe(
  multicast(() => new Subject())
) as ConnectableObservable<number>;

import { connectable, timer, Subject } from 'rxjs';
// suggested refactor
const tick$ = connectable(timer(1_000), {
  connector: () => new Subject()
});
Where multicast is called with a subject instance, it can be replaced with connectable and a local subject instance.
import { timer, multicast, Subject, ConnectableObservable } from 'rxjs';
// deprecated
const tick$ = timer(1_000).pipe(
  multicast(new Subject())
) as ConnectableObservable<number>;

import { connectable, timer, Subject } from 'rxjs';
// suggested refactor
const tick$ = connectable(timer(1_000), {
  connector: () => new Subject(),
  resetOnDisconnect: false
});
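The effect of resetOnDisconnect can be sketched by comparing a late subscriber under both settings. The example assumes a synchronous source (of) in place of timer so that the first connection has already completed before the late subscriber arrives. lateSubscriberValues is a hypothetical helper.

```typescript
import { connectable, of, Subject } from 'rxjs';

// Hypothetical helper: connects once, then subscribes late and connects again.
function lateSubscriberValues(resetOnDisconnect: boolean): number[] {
  const shared$ = connectable(of(1, 2), {
    connector: () => new Subject<number>(),
    resetOnDisconnect,
  });

  // The first connection runs the source to completion with no subscribers.
  shared$.connect();

  const late: number[] = [];
  shared$.subscribe((value) => late.push(value));

  // Reconnect after the first connection has closed.
  shared$.connect();
  return late;
}

// A fresh subject is created after disconnect, so the late subscriber sees a new run.
const withReset = lateSubscriberValues(true);

// The single, already completed subject is reused, so the late subscriber sees nothing.
const withoutReset = lateSubscriberValues(false);

console.log(withReset, withoutReset); // [1, 2] []
```

Setting resetOnDisconnect to false mirrors passing a single subject instance to multicast: once that subject has completed, it is not replaced.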
Where multicast is used in conjunction with refCount, it can be replaced with share.
import { timer, multicast, Subject, refCount } from 'rxjs';
// deprecated
const tick$ = timer(1_000).pipe(
  multicast(() => new Subject()),
  refCount()
);

import { timer, share, Subject } from 'rxjs';
// suggested refactor
const tick$ = timer(1_000).pipe(
  share({ connector: () => new Subject() })
);
Where multicast is used with a selector, it can be replaced with connect.
import { timer, multicast, Subject, combineLatest } from 'rxjs';
// deprecated
const tick$ = timer(1_000).pipe(
  multicast(
    () => new Subject(),
    (source) => combineLatest([source, source])
  )
);

import { timer, connect, combineLatest, Subject } from 'rxjs';
// suggested refactor
const tick$ = timer(1_000).pipe(
  connect((source) => combineLatest([source, source]), {
    connector: () => new Subject()
  })
);
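To illustrate why a selector is useful, the sketch below contrasts using a cold source twice with sharing it through connect. It assumes a synchronous source and uses merge rather than combineLatest so the ordering is easy to follow. countedSource is a hypothetical helper.

```typescript
import { connect, map, merge, Observable } from 'rxjs';

// Hypothetical helper: a cold source that counts how often it is subscribed to.
function countedSource(counter: { subscriptions: number }): Observable<number> {
  return new Observable<number>((subscriber) => {
    counter.subscriptions++;
    subscriber.next(1);
    subscriber.next(2);
    subscriber.complete();
  });
}

// Without connect: each branch subscribes to the source separately.
const plainCounter = { subscriptions: 0 };
const plain$ = countedSource(plainCounter);
const withoutConnect: string[] = [];
merge(
  plain$.pipe(map((value) => `a${value}`)),
  plain$.pipe(map((value) => `b${value}`))
).subscribe((value) => withoutConnect.push(value));

// With connect: both branches are fed by one shared subscription.
const connectCounter = { subscriptions: 0 };
const withConnect: string[] = [];
countedSource(connectCounter)
  .pipe(
    connect((shared$) =>
      merge(
        shared$.pipe(map((value) => `a${value}`)),
        shared$.pipe(map((value) => `b${value}`))
      )
    )
  )
  .subscribe((value) => withConnect.push(value));

console.log(withoutConnect, plainCounter.subscriptions); // ['a1', 'a2', 'b1', 'b2'] 2
console.log(withConnect, connectCounter.subscriptions); // ['a1', 'b1', 'a2', 'b2'] 1
```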
If you're using publish to create a ConnectableObservable, you can use connectable instead.
import { timer, publish, ConnectableObservable } from 'rxjs';
// deprecated
const tick$ = timer(1_000).pipe(
  publish()
) as ConnectableObservable<number>;

import { connectable, timer, Subject } from 'rxjs';
// suggested refactor
const tick$ = connectable(timer(1_000), {
  connector: () => new Subject<number>(),
  resetOnDisconnect: false
});
And if refCount is being applied to the result of publish, you can use share to replace both.
import { timer, publish, refCount } from 'rxjs';
// deprecated
const tick$ = timer(1_000).pipe(
  publish(),
  refCount()
);

import { timer, share } from 'rxjs';
// suggested refactor
const tick$ = timer(1_000).pipe(
  share({
    resetOnError: false,
    resetOnComplete: false,
    resetOnRefCountZero: false
  })
);
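The three reset options are what distinguish this refactor from a bare share(). As a rough sketch, assuming a synchronous source so that it has completed before a second subscriber arrives, the difference looks like this. lateSubscriber is a hypothetical helper.

```typescript
import { MonoTypeOperatorFunction, Observable, share } from 'rxjs';

// Hypothetical helper: lets one subscriber run the source to completion,
// then records what a late subscriber receives.
function lateSubscriber(sharing: MonoTypeOperatorFunction<number>) {
  let subscriptions = 0;
  const shared$ = new Observable<number>((subscriber) => {
    subscriptions++;
    subscriber.next(1);
    subscriber.next(2);
    subscriber.complete();
  }).pipe(sharing);

  shared$.subscribe(); // the first subscriber drives the source to completion

  const late: number[] = [];
  shared$.subscribe((value) => late.push(value));
  return { late, subscriptions };
}

// Default share(): state is reset on completion, so the late subscriber restarts the source.
const withDefaults = lateSubscriber(share<number>());

// All resets disabled: the completed subject is kept and the source is not subscribed to again.
const withoutResets = lateSubscriber(
  share<number>({
    resetOnError: false,
    resetOnComplete: false,
    resetOnRefCountZero: false,
  })
);

console.log(withDefaults); // { late: [1, 2], subscriptions: 2 }
console.log(withoutResets); // { late: [], subscriptions: 1 }
```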
If publish is being called with a selector, you can use the connect operator instead.
import { timer, publish, combineLatest } from 'rxjs';
// deprecated
const tick$ = timer(1_000).pipe(
  publish((source) => combineLatest([source, source]))
);

import { timer, connect, combineLatest } from 'rxjs';
// suggested refactor
const tick$ = timer(1_000).pipe(
  connect((source) => combineLatest([source, source]))
);
If you're using publishBehavior to create a ConnectableObservable, you can use connectable and a BehaviorSubject instead.
import { timer, publishBehavior, ConnectableObservable } from 'rxjs';
// deprecated
const tick$ = timer(1_000).pipe(
  publishBehavior(0)
) as ConnectableObservable<number>;

import { connectable, timer, BehaviorSubject } from 'rxjs';
// suggested refactor
const tick$ = connectable(timer(1_000), {
  connector: () => new BehaviorSubject(0),
  resetOnDisconnect: false
});
And if refCount is being applied to the result of publishBehavior, you can use the share operator - with a BehaviorSubject connector - to replace both.
import { timer, publishBehavior, refCount } from 'rxjs';
// deprecated
const tick$ = timer(1_000).pipe(
  publishBehavior(0),
  refCount()
);

import { timer, share, BehaviorSubject } from 'rxjs';
// suggested refactor
const tick$ = timer(1_000).pipe(
  share({
    connector: () => new BehaviorSubject(0),
    resetOnError: false,
    resetOnComplete: false,
    resetOnRefCountZero: false
  })
);
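A short sketch of what the BehaviorSubject connector contributes: each subscriber first receives the current value, starting with the initial 0. The example assumes a manually driven Subject as the source, in place of timer, so the sequence is deterministic.

```typescript
import { BehaviorSubject, share, Subject } from 'rxjs';

// A manually driven source, used here instead of timer.
const source$ = new Subject<number>();

const shared$ = source$.pipe(
  share({
    connector: () => new BehaviorSubject(0),
    resetOnError: false,
    resetOnComplete: false,
    resetOnRefCountZero: false,
  })
);

const early: number[] = [];
shared$.subscribe((value) => early.push(value)); // receives the initial value 0

source$.next(1);

const late: number[] = [];
shared$.subscribe((value) => late.push(value)); // receives the current value 1

source$.next(2);

console.log(early, late); // [0, 1, 2] [1, 2]
```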
If you're using publishLast to create a ConnectableObservable, you can use connectable and an AsyncSubject instead.
import { timer, publishLast, ConnectableObservable } from 'rxjs';
// deprecated
const tick$ = timer(1_000).pipe(
  publishLast()
) as ConnectableObservable<number>;

import { connectable, timer, AsyncSubject } from 'rxjs';
// suggested refactor
const tick$ = connectable(timer(1_000), {
  connector: () => new AsyncSubject<number>(),
  resetOnDisconnect: false
});
And if refCount is being applied to the result of publishLast, you can use the share operator - with an AsyncSubject connector - to replace both.
import { timer, publishLast, refCount } from 'rxjs';
// deprecated
const tick$ = timer(1_000).pipe(
  publishLast(),
  refCount()
);

import { timer, share, AsyncSubject } from 'rxjs';
// suggested refactor
const tick$ = timer(1_000).pipe(
  share({
    connector: () => new AsyncSubject(),
    resetOnError: false,
    resetOnComplete: false,
    resetOnRefCountZero: false
  })
);
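As an illustration of the AsyncSubject connector, the sketch below uses a synchronous three-value source (an assumption, in place of timer) to show that subscribers receive only the final value, including a subscriber that arrives after completion.

```typescript
import { AsyncSubject, Observable, share } from 'rxjs';

// A cold source that records how many times it is subscribed to.
let sourceSubscriptions = 0;
const numbers$ = new Observable<number>((subscriber) => {
  sourceSubscriptions++;
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

const shared$ = numbers$.pipe(
  share({
    connector: () => new AsyncSubject<number>(),
    resetOnError: false,
    resetOnComplete: false,
    resetOnRefCountZero: false,
  })
);

// The first subscriber triggers the source; only the last value is delivered, on completion.
const first: number[] = [];
shared$.subscribe((value) => first.push(value));

// A late subscriber gets the stored last value without the source being run again.
const late: number[] = [];
shared$.subscribe((value) => late.push(value));

console.log(first, late, sourceSubscriptions); // [3] [3] 1
```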
If you're using publishReplay to create a ConnectableObservable, you can use connectable and a ReplaySubject instead.
import { timer, publishReplay, ConnectableObservable } from 'rxjs';
// deprecated
const tick$ = timer(1_000).pipe(
  publishReplay(1)
) as ConnectableObservable<number>;

import { connectable, timer, ReplaySubject } from 'rxjs';
// suggested refactor
const tick$ = connectable(timer(1_000), {
  connector: () => new ReplaySubject<number>(1),
  resetOnDisconnect: false
});
And if refCount is being applied to the result of publishReplay, you can use the share operator - with a ReplaySubject connector - to replace both.
import { timer, publishReplay, refCount } from 'rxjs';
// deprecated
const tick$ = timer(1_000).pipe(
  publishReplay(1),
  refCount()
);

import { timer, share, ReplaySubject } from 'rxjs';
// suggested refactor
const tick$ = timer(1_000).pipe(
  share({
    connector: () => new ReplaySubject(1),
    resetOnError: false,
    resetOnComplete: false,
    resetOnRefCountZero: false
  })
);
If publishReplay is being called with a selector, you can use the connect operator - with a ReplaySubject connector - instead.
import { timer, publishReplay, combineLatest } from 'rxjs';
// deprecated
const tick$ = timer(1_000).pipe(
  publishReplay(1, undefined, (source) => combineLatest([source, source]))
);

import { timer, connect, combineLatest, ReplaySubject } from 'rxjs';
// suggested refactor
const tick$ = timer(1_000).pipe(
  connect((source) => combineLatest([source, source]), {
    connector: () => new ReplaySubject(1)
  })
);
Instead of applying the refCount operator to the ConnectableObservable obtained from a multicast or publish operator, use the share operator to replace both.
The properties passed to share will depend upon the operators that are being replaced. The refactors for using refCount with multicast, publish, publishBehavior, publishLast and publishReplay are detailed above.