Skip to content

Commit

Permalink
fix(race): ignore latter sources after first complete or error (#4809)
Browse files Browse the repository at this point in the history
* test(race): add failing tests

* fix(race): ignore sources after first complete/error

Closes #4808

BREAKING CHANGE: `race()` will no longer subscribe to subsequent observables if a provided source synchronously errors or completes. This means side effects that might have occurred during subscription in those rare cases will no longer occur.
  • Loading branch information
cartant authored and benlesh committed Jun 4, 2019
1 parent 362d1d4 commit f31c3df
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 1 deletion.
24 changes: 23 additions & 1 deletion spec/operators/race-spec.ts
@@ -1,7 +1,7 @@
import { expect } from 'chai';
import * as sinon from 'sinon';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { NEVER, of, race as staticRace, timer, defer, Observable } from 'rxjs';
import { EMPTY, NEVER, of, race as staticRace, timer, defer, Observable, throwError } from 'rxjs';
import { race, mergeMap, map, finalize, startWith } from 'rxjs/operators';

/** @test {race} */
Expand Down Expand Up @@ -184,6 +184,28 @@ describe('race operator', () => {
expect(onSubscribe.called).to.be.false;
});

it('should ignore latter observables if a former one completes immediately', () => {
const onComplete = sinon.spy();
const onSubscribe = sinon.spy();
const e1 = EMPTY; // Wins the race
const e2 = defer(onSubscribe); // Should be ignored

e1.pipe(race(e2)).subscribe({ complete: onComplete });
expect(onComplete.calledWithExactly()).to.be.true;
expect(onSubscribe.called).to.be.false;
});

it('should ignore latter observables if a former one errors immediately', () => {
const onError = sinon.spy();
const onSubscribe = sinon.spy();
const e1 = throwError('kaboom'); // Wins the race
const e2 = defer(onSubscribe); // Should be ignored

e1.pipe(race(e2)).subscribe({ error: onError });
expect(onError.calledWithExactly('kaboom')).to.be.true;
expect(onSubscribe.called).to.be.false;
});

it('should unsubscribe former observables if a latter one emits immediately', () => {
const onNext = sinon.spy();
const onUnsubscribe = sinon.spy();
Expand Down
10 changes: 10 additions & 0 deletions src/internal/observable/race.ts
Expand Up @@ -137,4 +137,14 @@ export class RaceSubscriber<T> extends OuterSubscriber<T, T> {

this.destination.next(innerValue);
}

notifyComplete(innerSub: InnerSubscriber<T, T>): void {
this.hasFirst = true;
super.notifyComplete(innerSub);
}

notifyError(error: any, innerSub: InnerSubscriber<T, T>): void {
this.hasFirst = true;
super.notifyError(error, innerSub);
}
}

0 comments on commit f31c3df

Please sign in to comment.