Skip to content

Commit

Permalink
fix(rxjs): every now properly handles reentrant calls.
Browse files Browse the repository at this point in the history
Fixes #7425
  • Loading branch information
benlesh committed Jan 23, 2024
1 parent 83ffab1 commit c666574
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 15 deletions.
22 changes: 21 additions & 1 deletion packages/rxjs/spec/operators/every-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { expect } from 'chai';
import { every, mergeMap } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import type { Observer } from 'rxjs';
import { of, Observable } from 'rxjs';
import { of, Observable, Subject } from 'rxjs';
import { observableMatcher } from '../helpers/observableMatcher';

/** @test {every} */
Expand Down Expand Up @@ -301,4 +301,24 @@ describe('every', () => {
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});
});

it('should handle reentrancy properly', () => {
const subject = new Subject<number>();
const results: any[] = [];
let n = 0;

subject.pipe(every(() => false)).subscribe({
next: (result) => {
results.push(result);
if (n < 3) {
subject.next(n++);
}
},
complete: () => results.push('done'),
});

subject.next(n);

expect(results).to.deep.equal([false, 'done']);
});
});
33 changes: 19 additions & 14 deletions packages/rxjs/src/internal/operators/every.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,25 @@ export function every<T>(predicate: (value: T, index: number) => boolean): Opera
return (source) =>
new Observable((destination) => {
let index = 0;
source.subscribe(
operate({
destination,
next: (value) => {
if (!predicate(value, index++)) {
destination.next(false);
destination.complete();
}
},
complete: () => {
destination.next(true);

const subscriber = operate({
destination,
next: (value: T) => {
if (!predicate(value, index++)) {
// To prevent re-entrancy issues, we unsubscribe from the
// source as soon as possible. Because the `next` right below it
// could cause us to re-enter before we get to `complete()`.
subscriber.unsubscribe();
destination.next(false);
destination.complete();
},
})
);
}
},
complete: () => {
destination.next(true);
destination.complete();
},
});

source.subscribe(subscriber);
});
}

0 comments on commit c666574

Please sign in to comment.