fix(subscriber): don't unsubscribe self #4106

Merged
merged 20 commits into from Sep 25, 2018

Conversation

cartant
Collaborator

@cartant cartant commented Sep 6, 2018

Description:

This PR fixes a problem that was introduced in #3963 and was partially fixed in #4072.

The problem was introduced with this change - to Subscriber - in #3963 where a parent subscription is unsubscribed upon a complete notification:

complete(): void {
  if (!this.isStopped) {
    this.isStopped = true;
    this._complete();
+   this._unsubscribeParentSubscription();
  }
}

The parent subscription is added to the subscriber in Observable.subscribe:

const sink = toSubscriber(observerOrNext, error, complete);

if (operator) {
  operator.call(sink, this.source);
} else {
  sink._addParentTeardownLogic(
    this.source || (config.useDeprecatedSynchronousErrorHandling && !sink.syncErrorThrowable) ?
    this._subscribe(sink) :
    this._trySubscribe(sink)
  );
}

In numerous situations, the subscription passed to _addParentTeardownLogic will be the same subscriber. That is, it will be sink.

Adding sink to itself, as a parent, introduces a bug, as sink will be unsubscribed from within the complete method - which differs from the pre-#3963 mechanism.
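
To make the failure mode concrete, here is a minimal sketch. It uses simplified, hypothetical stand-ins (MiniSubscription and MiniSubscriber, with an addParentTeardown method standing in for _addParentTeardownLogic) rather than the real RxJS classes, and it assumes only what is described above: complete unsubscribes the parent, and the parent can be the subscriber itself.

```typescript
// Simplified, hypothetical stand-ins; not the actual RxJS classes.
class MiniSubscription {
  closed = false;
  private teardowns: MiniSubscription[] = [];

  add(child: MiniSubscription): void {
    if (child !== this) {
      this.teardowns.push(child);
    }
  }

  unsubscribe(): void {
    if (this.closed) { return; }
    this.closed = true;
    for (const teardown of this.teardowns) { teardown.unsubscribe(); }
  }
}

class MiniSubscriber extends MiniSubscription {
  isStopped = false;
  private parentTeardown: MiniSubscription | null = null;

  // Models _addParentTeardownLogic: remembers what to tear down on complete.
  addParentTeardown(parentTeardown: MiniSubscription): void {
    this.parentTeardown = parentTeardown;
  }

  complete(): void {
    if (!this.isStopped) {
      this.isStopped = true;
      // Models _unsubscribeParentSubscription().
      if (this.parentTeardown) { this.parentTeardown.unsubscribe(); }
    }
  }
}

// An inner subscription that should outlive completion of the source.
const inner = new MiniSubscription();
const sink = new MiniSubscriber();
sink.add(inner);

// The subscribe call handed back the subscriber itself, so sink becomes its own parent.
sink.addParentTeardown(sink);

sink.complete();
console.log(sink.closed, inner.closed); // true true
```

In this model, completing the source closes sink and everything added to it, including the inner subscription that was still expected to emit.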


What follows below is a description of the PR's original implementation. This implementation was changed substantially as the PR was discussed. For an explanation of the PR's final, merged changes, skip to my comment at the bottom of this conversation.


#4072 introduced a check to ensure that a specific subscriber isn't added to itself as a parent. However, it's still possible for a subscriber that is a child of the specific subscriber to be added as a parent. And doing so will see the specific subscriber's unsubscribe called within its complete method - which can effect unwanted behaviour. See the failing test that's included in this PR.

This PR solves the problem by adding a _keepAliveCount that's incremented before unsubscribing the parent subscription and is decremented afterwards. In Subscription.unsubscribe, if _keepAliveCount is greater than zero, the subscription is not unsubscribed.
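
As a rough illustration of that guard, the sketch below assumes the counter is a plain numeric field checked at the top of unsubscribe. The class and method names are hypothetical, the try/finally is a choice made for the sketch, and this is not the code from the PR.

```typescript
// Hypothetical sketch of the keep-alive guard; not the code from the PR.
class GuardedSubscription {
  closed = false;
  protected keepAliveCount = 0; // models _keepAliveCount
  private children: GuardedSubscription[] = [];

  add(child: GuardedSubscription): void {
    this.children.push(child);
  }

  unsubscribe(): void {
    // While the count is above zero, the request is ignored.
    if (this.closed || this.keepAliveCount > 0) { return; }
    this.closed = true;
    for (const child of this.children) { child.unsubscribe(); }
  }
}

class GuardedSubscriber extends GuardedSubscription {
  private parentTeardown: GuardedSubscription | null = null;

  setParentTeardown(parentTeardown: GuardedSubscription): void {
    this.parentTeardown = parentTeardown;
  }

  complete(): void {
    if (!this.parentTeardown) { return; }
    this.keepAliveCount++; // incremented before unsubscribing the parent
    try {
      this.parentTeardown.unsubscribe();
    } finally {
      this.keepAliveCount--; // decremented afterwards
    }
  }
}

const subscriber = new GuardedSubscriber();
const parentTeardown = new GuardedSubscription();
// In this model, the parent's teardown reaches back to the subscriber.
parentTeardown.add(subscriber);
subscriber.setParentTeardown(parentTeardown);

subscriber.complete();
console.log(parentTeardown.closed, subscriber.closed); // true false

subscriber.unsubscribe();
console.log(subscriber.closed); // true
```

The re-entrant unsubscribe call made during complete is ignored, while a later, explicit unsubscribe still takes effect.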

Related issue (if exists): #4095

@coveralls

coveralls commented Sep 6, 2018

Coverage Status

Coverage decreased (-0.003%) to 96.968% when pulling b817f53 on cartant:issue-4095 into 6feec15 on ReactiveX:master.

@benlesh
Member

benlesh commented Sep 13, 2018

Thank you for bringing this issue to my attention. Taking a step back, I think that _addParentTeardownLogic is probably a smell, and we likely just wanted to call this.add(trustedSubscriber) on this line.

Member

@benlesh benlesh left a comment

See my comment, and refer to our offline discussion.

@cartant
Collaborator Author

cartant commented Sep 14, 2018

@benlesh This PR is now a WIP, as I've made some changes to make things simpler, but the changes need to be applied to many of the operator-specific subscribers and I've only changed MergeMapSubscriber, so far.

I've removed all of the parent-teardown business and the _keepAliveCount.

The _addParentTeardownLogic function is still there - as a noop - as it's used in isTrustedSubscriber. I'm not sure what you'll want to do with _addParentTeardownLogic if it's no longer needed.

Anyway, the important parts are in MergeMapSubscriber's _innerSub function, where the inner subscription is added to the destination (which should always be a trusted subscriber). And in its _complete function, where this.unsubscribe() is called.

To me this is simpler than dealing with children, parents and destinations. In the original implementation - that preceded the PR that sought to address the not-unsubscribing-from-source problem - a subscription was added to its destination so that unsubscribing the destination unsubscribes the subscription. That behaviour is now restored. And because the subscription's destination is not added to the subscription, calling this.unsubscribe() in _complete does not unsubscribe the destination - or any inner observables that were added to it.
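
A minimal sketch of that wiring, using hypothetical simplified classes (Sub and OuterSub) rather than the real MergeMapSubscriber: the inner subscription is added to the destination, and the outer subscriber unsubscribes only itself on complete.

```typescript
// Hypothetical, simplified model of the wiring; not the RxJS source.
class Sub {
  closed = false;
  private children: Sub[] = [];

  add(child: Sub): void {
    this.children.push(child);
  }

  unsubscribe(): void {
    if (this.closed) { return; }
    this.closed = true;
    for (const child of this.children) { child.unsubscribe(); }
  }
}

class OuterSub extends Sub {
  constructor(private destination: Sub) {
    super();
    // The subscriber is added to its destination, so unsubscribing
    // the destination also unsubscribes this subscriber.
    destination.add(this);
  }

  // Models _innerSub: the inner subscription goes on the destination, not on `this`.
  innerSub(): Sub {
    const innerSubscription = new Sub();
    this.destination.add(innerSubscription);
    return innerSubscription;
  }

  // Models _complete: the destination is not a child of `this`, so it stays open.
  complete(): void {
    this.unsubscribe();
  }
}

const destination = new Sub();
const outer = new OuterSub(destination);
const innerSubscription = outer.innerSub();

outer.complete();
console.log(outer.closed, innerSubscription.closed, destination.closed); // true false false

destination.unsubscribe();
console.log(innerSubscription.closed); // true
```

The inner subscription survives completion of the outer subscriber, yet is still released when the destination is unsubscribed.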

If you think this approach is a better way to go, I can apply similar changes to the other subscribers. (It's not just the flattening subscribers. E.g. the subscribers for delay and delayWhen will need to be changed, too.)

We would need to decide the best way of adding subscriptions to the destination, as the type assertions are rather ugly:

const destination = this.destination as any as Subscription;

Member

@benlesh benlesh left a comment

Yes. This is precisely what I was suggesting when we discussed this yesterday.

import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';

declare const type: Function;
declare const asDiagram: Function;

/** @test {mergeMap} */
- describe('mergeMap', () => {
+ describe.only('mergeMap', () => {
Member

I know this is a WIP, but don't forget to remove this only.

@@ -140,7 +140,8 @@ export class MergeMapSubscriber<T, R> extends OuterSubscriber<T, R> {

private _innerSub(ish: ObservableInput<R>, value: T, index: number): void {
const innerSubscriber = new InnerSubscriber(this, undefined, undefined);
this.add(innerSubscriber);
const destination = this.destination as any as Subscription;
Member

destination is a Subscriber, which extends Subscription... I'm not sure why any casting is necessary here... 🤔

Collaborator Author

Nope, it's a PartialObserver:

protected destination: PartialObserver<any>;

Collaborator Author

Changing destination to be:

protected destination: PartialObserver<any> | Subscriber<any>;

means the type assertion becomes:

const destination = this.destination as Subscription;

@@ -158,16 +156,7 @@ export class Subscriber<T> extends Subscription implements Observer<T> {

/** @deprecated This is an internal implementation detail, do not use. */
_addParentTeardownLogic(parentTeardownLogic: TeardownLogic) {
Member

We can probably just remove _addParentTeardownLogic entirely.

  1. It's prefixed with _ which is a universal signal for "do not use".
  2. It's been marked as "deprecated do not use" since the moment it was added.

Collaborator Author

My concern is that the name has been used in the isTrustedSubscriber test and that's been released. I wasn't entirely sure what the effect of its removal in a new release would be. I suppose it means a trusted subscriber could end up being re-wrapped in a using-6.3.3-with-6.3.2 scenario.
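
To illustrate the concern, here is a sketch that assumes the released check is a duck-typed test for the method name. The looksTrusted function and both object shapes are hypothetical, and the real isTrustedSubscriber may check more than this.

```typescript
// Hypothetical stand-in for a duck-typed trust check; the real check may differ.
const TRUST_MARKER = '_addParentTeardownLogic';

function looksTrusted(candidate: object): boolean {
  return TRUST_MARKER in candidate;
}

// Subscriber shape from a release that still has the method (kept as a no-op).
const olderStyle = {
  next(_value: number): void {},
  _addParentTeardownLogic(): void {}
};

// Subscriber shape from a release in which the method has been removed.
const newerStyle = {
  next(_value: number): void {}
};

console.log(looksTrusted(olderStyle)); // true
console.log(looksTrusted(newerStyle)); // false, so it would be wrapped again
```

Under that assumption, a subscriber created by a newer copy of the library would not be recognised by an older copy's check, which is the re-wrapping scenario described above.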

@cartant
Collaborator Author

cartant commented Sep 14, 2018

@benlesh This should be done, now.

It's been rebased. The commits made subsequent to your comments are those that follow refactor(mergeMap): simplify.

@cartant cartant removed the PR: WIP label Sep 14, 2018
@benlesh
Member

benlesh commented Sep 19, 2018

@cartant I think we should schedule a time to walk through this one.

Remove the mapTo and the concat to reduce the number of subscribers to
make the test easier to reason with.
@benlesh benlesh merged commit e623ec6 into ReactiveX:master Sep 25, 2018
@cartant
Collaborator Author

cartant commented Oct 6, 2018

What follows are notes on the history of changes made in this PR.

#3963 (which was a refactoring of #2457) made changes to the way in which subscribers were wired up. The changes were made so that teardown logic for sources was called as soon as sources completed or errored.

Its changes were:

  • Instead of just adding a subscriber to its destination, it was added using a new method: _addParentTeardownLogic.
  • In Observable.prototype.subscribe, the result from _trySubscribe was also added using _addParentTeardownLogic.
  • When complete or error notifications occurred, subscribers would unsubscribe the teardown logic added via _addParentTeardownLogic.

The major problem with this approach was that a subscriber could be added to itself, as _trySubscribe can return the subscriber that's passed to it.

This PR added a test that included this code:

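import { Observable, asapScheduler, timer } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

// Assumed declaration; the test defines results outside this excerpt.
const results: (number | string)[] = [];

// Wraps a timer so that the subscriber passed in is subscribed directly to the source.
const wrapped = new Observable<number>(subscriber => {
  const subscription = timer(0, asapScheduler).subscribe(subscriber);
  return () => subscription.unsubscribe();
});
wrapped.pipe(
  mergeMap(() => timer(0, asapScheduler))
).subscribe({
  next(value) { results.push(value); },
  complete() { results.push('done'); }
});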

When run with the changes in #3963, the sequence of subscribers/subscriptions was:

construct Subscription@1
construct Subscriber@2
construct SafeSubscriber@3
construct MergeMapSubscriber@4
add MergeMapSubscriber@4 to Subscriber@2
construct AsapAction@5
add AsapAction@1 to MergeMapSubscriber@4
construct Subscription@6
add Subscription@6 to MergeMapSubscriber@4
next MergeMapSubscriber@4
construct InnerSubscriber@7
add InnerSubscriber@7 to MergeMapSubscriber@4 <--
construct AsapAction@8
add AsapAction@2 to InnerSubscriber@7
complete MergeMapSubscriber@4
unsubscribe Subscription@6
unsubscribe MergeMapSubscriber@4
unsubscribe AsapAction@1
unsubscribe InnerSubscriber@7
unsubscribe AsapAction@2

Note that the InnerSubscriber was added to the MergeMapSubscriber. That means that the InnerSubscriber was unsubscribed when the MergeMapSubscriber was unsubscribed. And that would happen when the source - the wrapped observable - completed.

This PR reverted the changes in #3963 and made the following changes to a number of operators:

  • Inner subscribers are now added to the destination subscriber.
  • Outer subscribers now unsubscribe upon complete and error, with their inner subscribers remaining subscribed due to their having been added to the destination.

With these changes, the test now passes and the sequence of subscribers/subscriptions is:

construct Subscription@1
construct Subscriber@2
construct SafeSubscriber@3
construct MergeMapSubscriber@4
add MergeMapSubscriber@4 to Subscriber@2
construct AsapAction@5
add AsapAction@1 to MergeMapSubscriber@4
construct Subscription@6
add Subscription@6 to MergeMapSubscriber@4
next MergeMapSubscriber@4
construct InnerSubscriber@7
add InnerSubscriber@7 to Subscriber@2 <--
construct AsapAction@8
add AsapAction@2 to InnerSubscriber@7
complete MergeMapSubscriber@4
unsubscribe MergeMapSubscriber@4
unsubscribe AsapAction@1
unsubscribe Subscription@6
next InnerSubscriber@7
next Subscriber@2
complete InnerSubscriber@7
complete Subscriber@2
unsubscribe SafeSubscriber@3
unsubscribe Subscriber@2
unsubscribe InnerSubscriber@7
unsubscribe AsapAction@2

Note that the InnerSubscriber is now added to the destination Subscriber and is therefore not unsubscribed upon completion of the source. And is able to next its value, etc.

The changes in this PR affected a number of behaviour tests in observeOn-spec.ts and in switch-spec.ts because adding the inner subscribers to the destination changed the subscription counts that those tests were checking.

@lock lock bot locked as resolved and limited conversation to collaborators Nov 6, 2018
@cartant cartant deleted the issue-4095 branch September 24, 2020 07:08
3 participants