Skip to content

Commit

Permalink
feat(takeWhile): add an inclusive option to the operator which caus…
Browse files Browse the repository at this point in the history
…es to emit final value (#4115)

By default, the value that causes the predicate to return `false` is not emitted.

This change adds an `inclusive` option which changes the behavior of the operator to include the final value which made the predicate return `false`.
Typical use case: Polling an API until the response contains a certain value, and then doing something with the response.
  • Loading branch information
idosela authored and benlesh committed Jan 30, 2019
1 parent 573adca commit 6e7f407
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 8 deletions.
8 changes: 8 additions & 0 deletions spec-dtslint/operators/takeWhile-spec.ts
Expand Up @@ -5,6 +5,14 @@ it('should support a user-defined type guard', () => {
const o = of('foo').pipe(takeWhile((s): s is 'foo' => true)); // $ExpectType Observable<"foo">
});

it('should support a user-defined type guard with inclusive option', () => {
const o = of('foo').pipe(takeWhile((s): s is 'foo' => true, false)); // $ExpectType Observable<"foo">
});

it('should support a predicate', () => {
const o = of('foo').pipe(takeWhile(s => true)); // $ExpectType Observable<string>
});

it('should support a predicate with inclusive option', () => {
const o = of('foo').pipe(takeWhile(s => true, true)); // $ExpectType Observable<string>
});
15 changes: 15 additions & 0 deletions spec/operators/takeWhile-spec.ts
Expand Up @@ -67,6 +67,21 @@ describe('takeWhile operator', () => {
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should take all elements up to and including the element that made ' +
'the predicate return false', () => {
const e1 = hot('--a-^-b--c--d--e--|');
const e1subs = '^ ! ';
const expected = '--b--c--(d|) ';

function predicate(value: string) {
return value !== 'd';
}
const inclusive = true;

expectObservable(e1.pipe(takeWhile(predicate, inclusive))).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should take elements with predicate when source does not complete', () => {
const e1 = hot('--a-^-b--c--d--e--');
const e1subs = '^ ';
Expand Down
29 changes: 21 additions & 8 deletions src/internal/operators/takeWhile.ts
Expand Up @@ -4,7 +4,8 @@ import { Subscriber } from '../Subscriber';
import { OperatorFunction, MonoTypeOperatorFunction, TeardownLogic } from '../types';

export function takeWhile<T, S extends T>(predicate: (value: T, index: number) => value is S): OperatorFunction<T, S>;
export function takeWhile<T>(predicate: (value: T, index: number) => boolean): MonoTypeOperatorFunction<T>;
export function takeWhile<T, S extends T>(predicate: (value: T, index: number) => value is S, inclusive: false): OperatorFunction<T, S>;
export function takeWhile<T>(predicate: (value: T, index: number) => boolean, inclusive?: boolean): MonoTypeOperatorFunction<T>;

/**
* Emits values emitted by the source Observable so long as each value satisfies
Expand Down Expand Up @@ -42,22 +43,29 @@ export function takeWhile<T>(predicate: (value: T, index: number) => boolean): M
* @param {function(value: T, index: number): boolean} predicate A function that
* evaluates a value emitted by the source Observable and returns a boolean.
* Also takes the (zero-based) index as the second argument.
* @param {boolean} inclusive When set to `true` the value that caused
* `predicate` to return `false` will also be emitted.
* @return {Observable<T>} An Observable that emits the values from the source
* Observable so long as each value satisfies the condition defined by the
* `predicate`, then completes.
* @method takeWhile
* @owner Observable
*/
export function takeWhile<T>(predicate: (value: T, index: number) => boolean): MonoTypeOperatorFunction<T> {
return (source: Observable<T>) => source.lift(new TakeWhileOperator(predicate));
export function takeWhile<T>(
predicate: (value: T, index: number) => boolean,
inclusive = false): MonoTypeOperatorFunction<T> {
return (source: Observable<T>) =>
source.lift(new TakeWhileOperator(predicate, inclusive));
}

class TakeWhileOperator<T> implements Operator<T, T> {
constructor(private predicate: (value: T, index: number) => boolean) {
}
constructor(
private predicate: (value: T, index: number) => boolean,
private inclusive: boolean) {}

call(subscriber: Subscriber<T>, source: any): TeardownLogic {
return source.subscribe(new TakeWhileSubscriber(subscriber, this.predicate));
return source.subscribe(
new TakeWhileSubscriber(subscriber, this.predicate, this.inclusive));
}
}

Expand All @@ -69,8 +77,10 @@ class TakeWhileOperator<T> implements Operator<T, T> {
class TakeWhileSubscriber<T> extends Subscriber<T> {
private index: number = 0;

constructor(destination: Subscriber<T>,
private predicate: (value: T, index: number) => boolean) {
constructor(
destination: Subscriber<T>,
private predicate: (value: T, index: number) => boolean,
private inclusive: boolean) {
super(destination);
}

Expand All @@ -91,6 +101,9 @@ class TakeWhileSubscriber<T> extends Subscriber<T> {
if (Boolean(predicateResult)) {
destination.next(value);
} else {
if (this.inclusive) {
destination.next(value);
}
destination.complete();
}
}
Expand Down

0 comments on commit 6e7f407

Please sign in to comment.