From f31c3df01b524126ec8e7760e89395ea2fd3717d Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Wed, 5 Jun 2019 08:28:28 +1000 Subject: [PATCH] fix(race): ignore latter sources after first complete or error (#4809) * test(race): add failing tests * fix(race): ignore sources after first complete/error Closes #4808 BREAKING CHANGE: `race()` will no longer subscribe to subsequent observables if a provided source synchronously errors or completes. This means side effects that might have occurred during subscription in those rare cases will no longer occur. --- spec/operators/race-spec.ts | 24 +++++++++++++++++++++++- src/internal/observable/race.ts | 10 ++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/spec/operators/race-spec.ts b/spec/operators/race-spec.ts index de78a93ac3..bcbe0779ec 100644 --- a/spec/operators/race-spec.ts +++ b/spec/operators/race-spec.ts @@ -1,7 +1,7 @@ import { expect } from 'chai'; import * as sinon from 'sinon'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { NEVER, of, race as staticRace, timer, defer, Observable } from 'rxjs'; +import { EMPTY, NEVER, of, race as staticRace, timer, defer, Observable, throwError } from 'rxjs'; import { race, mergeMap, map, finalize, startWith } from 'rxjs/operators'; /** @test {race} */ @@ -184,6 +184,28 @@ describe('race operator', () => { expect(onSubscribe.called).to.be.false; }); + it('should ignore latter observables if a former one completes immediately', () => { + const onComplete = sinon.spy(); + const onSubscribe = sinon.spy(); + const e1 = EMPTY; // Wins the race + const e2 = defer(onSubscribe); // Should be ignored + + e1.pipe(race(e2)).subscribe({ complete: onComplete }); + expect(onComplete.calledWithExactly()).to.be.true; + expect(onSubscribe.called).to.be.false; + }); + + it('should ignore latter observables if a former one errors immediately', () => { + const onError = sinon.spy(); + const onSubscribe = sinon.spy(); + const e1 = throwError('kaboom'); // Wins the race + const e2 = defer(onSubscribe); // Should be ignored + + e1.pipe(race(e2)).subscribe({ error: onError }); + expect(onError.calledWithExactly('kaboom')).to.be.true; + expect(onSubscribe.called).to.be.false; + }); + it('should unsubscribe former observables if a latter one emits immediately', () => { const onNext = sinon.spy(); const onUnsubscribe = sinon.spy(); diff --git a/src/internal/observable/race.ts b/src/internal/observable/race.ts index e6e13ded30..6881e8b4ca 100644 --- a/src/internal/observable/race.ts +++ b/src/internal/observable/race.ts @@ -137,4 +137,14 @@ export class RaceSubscriber extends OuterSubscriber { this.destination.next(innerValue); } + + notifyComplete(innerSub: InnerSubscriber): void { + this.hasFirst = true; + super.notifyComplete(innerSub); + } + + notifyError(error: any, innerSub: InnerSubscriber): void { + this.hasFirst = true; + super.notifyError(error, innerSub); + } }