Skip to content

Commit

Permalink
fix(Observable): Fix Observable.subscribe to add operator TeardownLog…
Browse files Browse the repository at this point in the history
…ic to returned Subscription. (#4434)
  • Loading branch information
PSanetra authored and benlesh committed Jan 30, 2019
1 parent 9b4f358 commit f28955f
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 12 deletions.
21 changes: 21 additions & 0 deletions spec/Observable-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import { Observer, TeardownLogic } from '../src/internal/types';
import { cold, expectObservable, expectSubscriptions } from './helpers/marble-testing';
import { map } from '../src/internal/operators/map';
import { noop } from '../src/internal/util/noop';
import { NEVER } from '../src/internal/observable/never';
import { Subscriber } from '../src/internal/Subscriber';
import { Operator } from '../src/internal/Operator';

declare const asDiagram: any, rxTestScheduler: any;
const Observable = Rx.Observable;
Expand Down Expand Up @@ -697,6 +700,24 @@ describe('Observable.lift', () => {
}
}

it('should return Observable which calls TeardownLogic of operator on unsubscription', (done) => {

const myOperator: Operator<any, any> = {
call: (subscriber: Subscriber<any>, source: any) => {
const subscription = source.subscribe((x: any) => subscriber.next(x));
return () => {
subscription.unsubscribe();
done();
};
}
};

NEVER.lift(myOperator)
.subscribe()
.unsubscribe();

});

it('should be overrideable in a custom Observable type that composes', (done) => {
const result = new MyCustomObservable<number>((observer) => {
observer.next(1);
Expand Down
2 changes: 1 addition & 1 deletion src/internal/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ export class Observable<T> implements Subscribable<T> {
const sink = toSubscriber(observerOrNext, error, complete);

if (operator) {
operator.call(sink, this.source);
sink.add(operator.call(sink, this.source));
} else {
sink.add(
this.source || (config.useDeprecatedSynchronousErrorHandling && !sink.syncErrorThrowable) ?
Expand Down
29 changes: 18 additions & 11 deletions src/internal/Subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,14 +164,15 @@ export class Subscription implements SubscriptionLike {
}
}

// Optimize for the common case when adding the first subscription.
const subscriptions = this._subscriptions;
if (subscriptions) {
subscriptions.push(subscription);
} else {
this._subscriptions = [subscription];
if (subscription._addParent(this)) {
// Optimize for the common case when adding the first subscription.
const subscriptions = this._subscriptions;
if (subscriptions) {
subscriptions.push(subscription);
} else {
this._subscriptions = [subscription];
}
}
subscription._addParent(this);

return subscription;
}
Expand All @@ -193,20 +194,26 @@ export class Subscription implements SubscriptionLike {
}

/** @internal */
private _addParent(parent: Subscription) {
private _addParent(parent: Subscription): boolean {
let { _parent, _parents } = this;
if (!_parent || _parent === parent) {
// If we don't have a parent, or the new parent is the same as the
// current parent, then set this._parent to the new parent.
if (_parent === parent) {
// If the new parent is the same as the current parent, then do nothing.
return false;
} else if (!_parent) {
// If we don't have a parent, then set this._parent to the new parent.
this._parent = parent;
return true;
} else if (!_parents) {
// If there's already one parent, but not multiple, allocate an Array to
// store the rest of the parent Subscriptions.
this._parents = [parent];
return true;
} else if (_parents.indexOf(parent) === -1) {
// Only add the new parent to the _parents list if it's not already there.
_parents.push(parent);
return true;
}
return false;
}
}

Expand Down

0 comments on commit f28955f

Please sign in to comment.