Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscriber refactor for version 8 #6817

Merged
merged 8 commits into from
Feb 21, 2022
5 changes: 2 additions & 3 deletions api_guard/dist/types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -652,17 +652,16 @@ export interface Subscribable<T> {
export declare function subscribeOn<T>(scheduler: SchedulerLike, delay?: number): MonoTypeOperatorFunction<T>;

export declare class Subscriber<T> extends Subscription implements Observer<T> {
protected destination: Subscriber<any> | Observer<any>;
protected destination: Subscriber<T> | Observer<T>;
protected isStopped: boolean;
constructor(destination?: Subscriber<any> | Observer<any>);
constructor(destination?: Subscriber<T> | Partial<Observer<T>> | ((value: T) => void) | null);
protected _complete(): void;
protected _error(err: any): void;
protected _next(value: T): void;
complete(): void;
error(err?: any): void;
next(value?: T): void;
unsubscribe(): void;
static create<T>(next?: (x?: T) => void, error?: (e?: any) => void, complete?: () => void): Subscriber<T>;
}

export declare class Subscription implements SubscriptionLike {
Expand Down
2 changes: 1 addition & 1 deletion integration/side-effects/snapshots/esm/ajax.js
Original file line number Diff line number Diff line change
@@ -1 +1 @@
const _bind = Function.prototype.bind;

2 changes: 0 additions & 2 deletions integration/side-effects/snapshots/esm/fetch.js
Original file line number Diff line number Diff line change
@@ -1,3 +1 @@
import "tslib";

const _bind = Function.prototype.bind;
2 changes: 0 additions & 2 deletions integration/side-effects/snapshots/esm/index.js
Original file line number Diff line number Diff line change
@@ -1,3 +1 @@
import "tslib";

const _bind = Function.prototype.bind;
2 changes: 0 additions & 2 deletions integration/side-effects/snapshots/esm/operators.js
Original file line number Diff line number Diff line change
@@ -1,3 +1 @@
import "tslib";

const _bind = Function.prototype.bind;
2 changes: 1 addition & 1 deletion integration/side-effects/snapshots/esm/testing.js
Original file line number Diff line number Diff line change
@@ -1 +1 @@
const _bind = Function.prototype.bind;

2 changes: 1 addition & 1 deletion integration/side-effects/snapshots/esm/websocket.js
Original file line number Diff line number Diff line change
@@ -1 +1 @@
const _bind = Function.prototype.bind;

2 changes: 0 additions & 2 deletions integration/side-effects/snapshots/esm5/ajax.js
Original file line number Diff line number Diff line change
@@ -1,3 +1 @@
import "tslib";

var _bind = Function.prototype.bind;
2 changes: 0 additions & 2 deletions integration/side-effects/snapshots/esm5/fetch.js
Original file line number Diff line number Diff line change
@@ -1,3 +1 @@
import "tslib";

var _bind = Function.prototype.bind;
2 changes: 0 additions & 2 deletions integration/side-effects/snapshots/esm5/index.js
Original file line number Diff line number Diff line change
@@ -1,3 +1 @@
import "tslib";

var _bind = Function.prototype.bind;
2 changes: 0 additions & 2 deletions integration/side-effects/snapshots/esm5/operators.js
Original file line number Diff line number Diff line change
@@ -1,3 +1 @@
import "tslib";

var _bind = Function.prototype.bind;
2 changes: 0 additions & 2 deletions integration/side-effects/snapshots/esm5/testing.js
Original file line number Diff line number Diff line change
@@ -1,3 +1 @@
import "tslib";

var _bind = Function.prototype.bind;
2 changes: 0 additions & 2 deletions integration/side-effects/snapshots/esm5/websocket.js
Original file line number Diff line number Diff line change
@@ -1,3 +1 @@
import "tslib";

var _bind = Function.prototype.bind;
25 changes: 12 additions & 13 deletions spec/Subscriber-spec.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
import { expect } from 'chai';
import { SafeSubscriber } from 'rxjs/internal/Subscriber';
import { Subscriber, Observable, config, of, Observer } from 'rxjs';
import { Subscriber, Observable, of, Observer } from 'rxjs';
import { asInteropSubscriber } from './helpers/interop-helper';
import { getRegisteredTeardowns } from './helpers/subscription';

/** @test {Subscriber} */
describe('SafeSubscriber', () => {
describe('Subscriber', () => {
it('should ignore next messages after unsubscription', () => {
let times = 0;

const sub = new SafeSubscriber({
const sub = new Subscriber({
next() { times += 1; }
});

Expand All @@ -25,7 +24,7 @@ describe('SafeSubscriber', () => {
let times = 0;
let errorCalled = false;

const sub = new SafeSubscriber({
const sub = new Subscriber({
next() { times += 1; },
error() { errorCalled = true; }
});
Expand All @@ -44,7 +43,7 @@ describe('SafeSubscriber', () => {
let times = 0;
let completeCalled = false;

const sub = new SafeSubscriber({
const sub = new Subscriber({
next() { times += 1; },
complete() { completeCalled = true; }
});
Expand All @@ -64,8 +63,8 @@ describe('SafeSubscriber', () => {
next: function () { /*noop*/ }
};

const sub1 = new SafeSubscriber(observer);
const sub2 = new SafeSubscriber(observer);
const sub1 = new Subscriber(observer);
const sub2 = new Subscriber(observer);

sub2.complete();

Expand All @@ -82,7 +81,7 @@ describe('SafeSubscriber', () => {
}
};

const sub1 = new SafeSubscriber(observer);
const sub1 = new Subscriber(observer);
sub1.complete();

expect(argument).to.have.lengthOf(0);
Expand All @@ -93,7 +92,7 @@ describe('SafeSubscriber', () => {
let subscriberUnsubscribed = false;
let subscriptionUnsubscribed = false;

const subscriber = new SafeSubscriber<void>();
const subscriber = new Subscriber();
subscriber.add(() => subscriberUnsubscribed = true);

const source = new Observable<void>(() => () => observableUnsubscribed = true);
Expand All @@ -108,7 +107,7 @@ describe('SafeSubscriber', () => {

it('should have idempotent unsubscription', () => {
let count = 0;
const subscriber = new SafeSubscriber();
const subscriber = new Subscriber();
subscriber.add(() => ++count);
expect(count).to.equal(0);

Expand All @@ -121,7 +120,7 @@ describe('SafeSubscriber', () => {

it('should close, unsubscribe, and unregister all teardowns after complete', () => {
let isUnsubscribed = false;
const subscriber = new SafeSubscriber();
const subscriber = new Subscriber();
subscriber.add(() => isUnsubscribed = true);
subscriber.complete();
expect(isUnsubscribed).to.be.true;
Expand All @@ -131,7 +130,7 @@ describe('SafeSubscriber', () => {

it('should close, unsubscribe, and unregister all teardowns after error', () => {
let isTornDown = false;
const subscriber = new SafeSubscriber({
const subscriber = new Subscriber({
error: () => {
// Mischief managed!
// Adding this handler here to prevent the call to error from
Expand Down
8 changes: 4 additions & 4 deletions spec/observables/generate-spec.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { TestScheduler } from 'rxjs/testing';
import { expect } from 'chai';
import { expectObservable } from '../helpers/marble-testing';
import { generate, Subscriber } from 'rxjs';
import { generate } from 'rxjs';
import { take } from 'rxjs/operators';
import { SafeSubscriber } from 'rxjs/internal/Subscriber';
import { Subscriber } from 'rxjs/internal/Subscriber';

declare const rxTestScheduler: TestScheduler;

Expand Down Expand Up @@ -56,8 +56,8 @@ describe('generate', () => {
it('should stop producing when unsubscribed', () => {
const source = generate(1, x => x < 4, x => x + 1);
let count = 0;
const subscriber = new SafeSubscriber<number>(
x => {
const subscriber = new Subscriber(
(x: number) => {
count++;
if (x == 2) {
subscriber.unsubscribe();
Expand Down
5 changes: 2 additions & 3 deletions spec/operators/repeatWhen-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { expect } from 'chai';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { repeatWhen, map, mergeMap, takeUntil, takeWhile, take } from 'rxjs/operators';
import { of, EMPTY, Observable, Subscriber } from 'rxjs';
import { SafeSubscriber } from 'rxjs/internal/Subscriber';

/** @test {repeatWhen} */
describe('repeatWhen operator', () => {
Expand Down Expand Up @@ -92,7 +91,7 @@ describe('repeatWhen operator', () => {
Observable.prototype.subscribe = function (...args: any[]): any {
let [subscriber] = args;
if (!(subscriber instanceof Subscriber)) {
subscriber = new SafeSubscriber(...args);
subscriber = new Subscriber(...args);
}
subscriber.error = function (err: any): void {
errors.push(err);
Expand All @@ -119,7 +118,7 @@ describe('repeatWhen operator', () => {
Observable.prototype.subscribe = function (...args: any[]): any {
let [subscriber] = args;
if (!(subscriber instanceof Subscriber)) {
subscriber = new SafeSubscriber(...args);
subscriber = new Subscriber(...args);
}
subscriber.error = function (err: any): void {
errors.push(err);
Expand Down
19 changes: 5 additions & 14 deletions src/internal/Observable.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import { Operator } from './Operator';
import { SafeSubscriber, Subscriber } from './Subscriber';
import { isSubscription, Subscription } from './Subscription';
import { Subscriber, isSubscriber } from './Subscriber';
import { Subscription } from './Subscription';
import { TeardownLogic, OperatorFunction, Subscribable, Observer } from './types';
import { observable as Symbol_observable } from './symbol/observable';
import { pipeFromArray } from './util/pipe';
import { isFunction } from './util/isFunction';

/**
* A representation of any set of values over any amount of time. This is the most basic building block
Expand Down Expand Up @@ -191,7 +190,7 @@ export class Observable<T> implements Subscribable<T> {
* @method subscribe
*/
subscribe(observerOrNext?: Partial<Observer<T>> | ((value: T) => void) | null): Subscription {
const subscriber = isSubscriber(observerOrNext) ? observerOrNext : new SafeSubscriber(observerOrNext);
const subscriber = isSubscriber(observerOrNext) ? observerOrNext : new Subscriber(observerOrNext);

const { operator, source } = this;
subscriber.add(
Expand Down Expand Up @@ -270,8 +269,8 @@ export class Observable<T> implements Subscribable<T> {
*/
forEach(next: (value: T) => void): Promise<void> {
return new Promise<void>((resolve, reject) => {
const subscriber = new SafeSubscriber<T>({
next: (value) => {
const subscriber = new Subscriber({
next: (value: T) => {
try {
next(value);
} catch (err) {
Expand Down Expand Up @@ -394,11 +393,3 @@ export class Observable<T> implements Subscribable<T> {
return pipeFromArray(operations)(this);
}
}

function isObserver<T>(value: any): value is Observer<T> {
return value && isFunction(value.next) && isFunction(value.error) && isFunction(value.complete);
}

function isSubscriber<T>(value: any): value is Subscriber<T> {
return (value && value instanceof Subscriber) || (isObserver(value) && isSubscription(value));
}
95 changes: 28 additions & 67 deletions src/internal/Subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,45 +18,31 @@ import { timeoutProvider } from './scheduler/timeoutProvider';
* @class Subscriber<T>
*/
export class Subscriber<T> extends Subscription implements Observer<T> {
/**
* A static factory for a Subscriber, given a (potentially partial) definition
* of an Observer.
* @param next The `next` callback of an Observer.
* @param error The `error` callback of an
* Observer.
* @param complete The `complete` callback of an
* Observer.
* @return A Subscriber wrapping the (partially defined)
* Observer represented by the given arguments.
* @nocollapse
* @deprecated Do not use. Will be removed in v8. There is no replacement for this
* method, and there is no reason to be creating instances of `Subscriber` directly.
* If you have a specific use case, please file an issue.
*/
static create<T>(next?: (x?: T) => void, error?: (e?: any) => void, complete?: () => void): Subscriber<T> {
return new SafeSubscriber({ next, error, complete });
}

/** @deprecated Internal implementation detail, do not use directly. Will be made internal in v8. */
protected isStopped: boolean = false;
/** @deprecated Internal implementation detail, do not use directly. Will be made internal in v8. */
protected destination: Subscriber<any> | Observer<any>; // this `any` is the escape hatch to erase extra type param (e.g. R)
protected destination: Subscriber<T> | Observer<T>;

/**
* @deprecated Internal implementation detail, do not use directly. Will be made internal in v8.
benlesh marked this conversation as resolved.
Show resolved Hide resolved
* There is no reason to directly create an instance of Subscriber. This type is exported for typings reasons.
* Creates an instance of an RxJS Subscriber. This is the workhorse of the library.
*
* If another instance of Subscriber is passed in, it will automatically wire up unsubscription
* between this instnace and the passed in instance.
*
* If a partial or full observer is passed in, it will be wrapped and appropriate safeguards will be applied.
*
* If a next-handler function is passed in, it will be wrapped and appropriate safeguards will be applied.
*
* @param destination A subscriber, partial observer, or function that receives the next value.
*/
constructor(destination?: Subscriber<any> | Observer<any>) {
constructor(destination?: Subscriber<T> | Partial<Observer<T>> | ((value: T) => void) | null) {
super();
if (destination) {
this.destination = destination;
// Automatically chain subscriptions together here.
// if destination is a Subscription, then it is a Subscriber.
if (isSubscription(destination)) {
destination.add(this);
}
} else {
this.destination = EMPTY_OBSERVER;
this.destination = isSubscriber(destination) ? destination : createSafeObserver(destination);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

important magic here!

If we have a Subscriber, then just use that, otherwise we don't know what it is, and we need to make a safe observer.

If you step through the commits, you find out that this is all SafeSubscriber was doing, which is why we removed it.


// Automatically chain subscriptions together here.
// if destination is a Subscription, then it is a Subscriber.
if (isSubscription(destination)) {
destination.add(this);
}
}

Expand Down Expand Up @@ -135,21 +121,6 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
}
}

/**
* This bind is captured here because we want to be able to have
* compatibility with monoid libraries that tend to use a method named
* `bind`. In particular, a library called Monio requires this.
*/
const _bind = Function.prototype.bind;

function bind<Fn extends (...args: any[]) => any>(fn: Fn, thisArg: any): Fn {
return _bind.call(fn, thisArg);
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The bind stuff can be removed! Yay.


/**
* Internal optimization only, DO NOT EXPOSE.
* @internal
*/
class ConsumerObserver<T> implements Observer<T> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is from #6815

constructor(private partialObserver: Partial<Observer<T>>) {}

Expand Down Expand Up @@ -189,26 +160,8 @@ class ConsumerObserver<T> implements Observer<T> {
}
}

export class SafeSubscriber<T> extends Subscriber<T> {
constructor(observerOrNext?: Partial<Observer<T>> | ((value: T) => void) | null) {
super();

let partialObserver: Partial<Observer<T>>;
if (isFunction(observerOrNext) || !observerOrNext) {
// The first argument is a function, not an observer. The next
// two arguments *could* be observers, or they could be empty.
partialObserver = {
next: observerOrNext ?? undefined,
};
} else {
// The "normal" path. Just use the partial observer directly.
partialObserver = observerOrNext;
}

// Wrap the partial observer to ensure it's a full observer, and
// make sure proper error handling is accounted for.
this.destination = new ConsumerObserver(partialObserver);
}
function createSafeObserver<T>(observerOrNext?: Partial<Observer<T>> | ((value: T) => void) | null): Observer<T> {
return new ConsumerObserver(!observerOrNext || isFunction(observerOrNext) ? { next: observerOrNext ?? undefined } : observerOrNext);
}

/**
Expand Down Expand Up @@ -242,3 +195,11 @@ export const EMPTY_OBSERVER: Readonly<Observer<any>> & { closed: true } = {
error: defaultErrorHandler,
complete: noop,
};

function isObserver<T>(value: any): value is Observer<T> {
return value && isFunction(value.next) && isFunction(value.error) && isFunction(value.complete);
}

export function isSubscriber<T>(value: any): value is Subscriber<T> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These were just moved.

return (value && value instanceof Subscriber) || (isObserver(value) && isSubscription(value));
}
6 changes: 3 additions & 3 deletions src/internal/firstValueFrom.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Observable } from './Observable';
import { EmptyError } from './util/EmptyError';
import { SafeSubscriber } from './Subscriber';
import { Subscriber } from './Subscriber';

export interface FirstValueFromConfig<T> {
defaultValue: T;
Expand Down Expand Up @@ -56,8 +56,8 @@ export function firstValueFrom<T>(source: Observable<T>): Promise<T>;
export function firstValueFrom<T, D>(source: Observable<T>, config?: FirstValueFromConfig<D>): Promise<T | D> {
const hasConfig = typeof config === 'object';
return new Promise<T | D>((resolve, reject) => {
const subscriber = new SafeSubscriber<T>({
next: (value) => {
const subscriber = new Subscriber({
next: (value: T) => {
resolve(value);
subscriber.unsubscribe();
},
Expand Down