Skip to content

Commit

Permalink
fix(scan/reduce): Typings correct for mixed seed/value types (#4858)
Browse files Browse the repository at this point in the history
- Adds dtslint tests to cover various mixtures of seeds, accumulator results, and value types
- Refactors scan a little bit, as types needed to be updated in the implementation
- Resolves a performance issue where scan was calling next on the destination Subscriber when it had already errored
  • Loading branch information
benlesh committed Aug 26, 2019
1 parent 259853e commit b89ebe5
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 180 deletions.
28 changes: 27 additions & 1 deletion spec-dtslint/operators/reduce-spec.ts
@@ -1,4 +1,4 @@
import { of, Observable } from 'rxjs';
import { of, OperatorFunction } from 'rxjs';
import { reduce } from 'rxjs/operators';

it('should enforce parameter', () => {
Expand Down Expand Up @@ -29,3 +29,29 @@ it('should accept seed parameter of a different type', () => {
const bv: { [key: string]: string } = {};
const b = of(1, 2, 3).pipe(reduce((x, y, z) => ({ ...x, [y]: y.toString() }), bv)); // $ExpectType Observable<{ [key: string]: string; }>
});

it('should act appropriately with no seed', () => {
// Because an observable of one value will just pass that value directly through the reducer,
// the below could be a number or a string.
const a = of(1, 2, 3).pipe(reduce((a: any, v) => '' + v)); // $ExpectType Observable<string | number>
const b = of(1, 2, 3).pipe(reduce((a, v) => v)); // $ExpectType Observable<number>
const c = of(1, 2, 3).pipe(reduce(() => {})); // $ExpectType Observable<number | void>
});

it('should act appropriately with a seed', () => {
const a = of(1, 2, 3).pipe(reduce((a, v) => a + v, '')); // $ExpectType Observable<string>
const b = of(1, 2, 3).pipe(reduce((a, v) => a + v, 0)); // $ExpectType Observable<number>
const c = of(1, 2, 3).pipe(reduce((a, v) => a + 1, [])); // $ExpectError
});

it('should infer types properly from arguments', () => {
function toArrayReducer(arr: number[], item: number, index: number): number[] {
if (index === 0) {
return [item];
}
arr.push(item);
return arr;
}

const a = reduce(toArrayReducer, [] as number[]); // $ExpectType OperatorFunction<number, number[]>
});
26 changes: 26 additions & 0 deletions spec-dtslint/operators/scan-spec.ts
Expand Up @@ -29,3 +29,29 @@ it('should accept seed parameter of a different type', () => {
const bv: { [key: string]: string } = {};
const b = of(1, 2, 3).pipe(scan((x, y, z) => ({ ...x, [y]: y.toString() }), bv)); // $ExpectType Observable<{ [key: string]: string; }>
});

it('should act appropriately with no seed', () => {
// Because an observable of one value will just pass that value directly through the reducer,
// the below could be a number or a string.
const a = of(1, 2, 3).pipe(scan((a: any, v) => '' + v)); // $ExpectType Observable<string | number>
const b = of(1, 2, 3).pipe(scan((a, v) => v)); // $ExpectType Observable<number>
const c = of(1, 2, 3).pipe(scan(() => {})); // $ExpectType Observable<number | void>
});

it('should act appropriately with a seed', () => {
const a = of(1, 2, 3).pipe(scan((a, v) => a + v, '')); // $ExpectType Observable<string>
const b = of(1, 2, 3).pipe(scan((a, v) => a + v, 0)); // $ExpectType Observable<number>
const c = of(1, 2, 3).pipe(scan((a, v) => a + 1, [])); // $ExpectError
});

it('should infer types properly from arguments', () => {
function toArrayReducer(arr: number[], item: number, index: number): number[] {
if (index === 0) {
return [item];
}
arr.push(item);
return arr;
}

const a = scan(toArrayReducer, [] as number[]); // $ExpectType OperatorFunction<number, number[]>
});
103 changes: 1 addition & 102 deletions spec/operators/reduce-spec.ts
Expand Up @@ -193,7 +193,7 @@ describe('reduce operator', () => {
throw 'error';
};

expectObservable(e1.pipe(reduce<string>(reduceFunction, seed))).toBe(expected);
expectObservable(e1.pipe(reduce(reduceFunction, seed))).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

Expand Down Expand Up @@ -289,105 +289,4 @@ describe('reduce operator', () => {
expectObservable(e1.pipe(reduce(reduceFunction))).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

type('should accept array typed reducers', () => {
let a: Observable<{ a: number; b: string }>;
a.pipe(reduce((acc, value) => acc.concat(value), []));
});

type('should accept T typed reducers', () => {
let a: Observable<{ a: number; b: string }>;
const reduced = a.pipe(reduce((acc, value) => {
value.a = acc.a;
value.b = acc.b;
return acc;
}));

reduced.subscribe(r => {
r.a.toExponential();
r.b.toLowerCase();
});
});

type('should accept T typed reducers when T is an array', () => {
let a: Observable<number[]>;
const reduced = a.pipe(reduce((acc, value) => {
return acc.concat(value);
}, []));

reduced.subscribe(rs => {
rs[0].toExponential();
});
});

type('should accept R typed reduces when R is an array of T', () => {
let a: Observable<number>;
const reduced = a.pipe(reduce((acc, value) => {
acc.push(value);
return acc;
}, []));

reduced.subscribe(rs => {
rs[0].toExponential();
});
});

type('should accept R typed reducers when R is assignable to T', () => {
let a: Observable<{ a?: number; b?: string }>;
const reduced = a.pipe(reduce((acc, value) => {
value.a = acc.a;
value.b = acc.b;
return acc;
}, {} as { a?: number; b?: string }));

reduced.subscribe(r => {
r.a.toExponential();
r.b.toLowerCase();
});
});

type('should accept R typed reducers when R is not assignable to T', () => {
let a: Observable<{ a: number; b: string }>;
const seed = {
as: [1],
bs: ['a']
};
const reduced = a.pipe(reduce((acc, value: {a: number, b: string}) => {
acc.as.push(value.a);
acc.bs.push(value.b);
return acc;
}, seed));

reduced.subscribe(r => {
r.as[0].toExponential();
r.bs[0].toLowerCase();
});
});

type('should accept R typed reducers and reduce to type R', () => {
let a: Observable<{ a: number; b: string }>;
const reduced = a.pipe(reduce<{ a?: number; b?: string }>((acc, value) => {
value.a = acc.a;
value.b = acc.b;
return acc;
}, {}));

reduced.subscribe(r => {
r.a.toExponential();
r.b.toLowerCase();
});
});

type('should accept array of R typed reducers and reduce to array of R', () => {
let a: Observable<number>;
const reduced = a.pipe(reduce<number, string[]>((acc, cur) => {
console.log(acc);
acc.push(cur.toString());
return acc;
}, [] as string[]));

reduced.subscribe(rs => {
rs[0].toLowerCase();
});
});
});
23 changes: 0 additions & 23 deletions spec/operators/scan-spec.ts
Expand Up @@ -227,27 +227,4 @@ describe('scan operator', () => {
expectObservable(scanObs).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

type('should accept array typed reducers', () => {
let a: Observable<{ a: number; b: string }>;
a.pipe(scan((acc, value) => acc.concat(value), []));
});

type('should accept T typed reducers', () => {
let a: Observable<{ a?: number; b?: string }>;
a.pipe(scan((acc, value) => {
value.a = acc.a;
value.b = acc.b;
return acc;
}, {} as { a?: number; b?: string }));
});

type('should accept R typed reducers', () => {
let a: Observable<{ a: number; b: string }>;
a.pipe(scan<{ a?: number; b?: string }>((acc, value) => {
value.a = acc.a;
value.b = acc.b;
return acc;
}, {}));
});
});
28 changes: 16 additions & 12 deletions src/internal/operators/reduce.ts
Expand Up @@ -2,13 +2,13 @@ import { Observable } from '../Observable';
import { scan } from './scan';
import { takeLast } from './takeLast';
import { defaultIfEmpty } from './defaultIfEmpty';
import { OperatorFunction, MonoTypeOperatorFunction } from '../types';
import { OperatorFunction } from '../types';
import { pipe } from '../util/pipe';

/* tslint:disable:max-line-length */
export function reduce<T, R>(accumulator: (acc: R, value: T, index: number) => R, seed: R): OperatorFunction<T, R>;
export function reduce<T>(accumulator: (acc: T, value: T, index: number) => T, seed?: T): MonoTypeOperatorFunction<T>;
export function reduce<T, R>(accumulator: (acc: R, value: T, index: number) => R): OperatorFunction<T, R>;
export function reduce<V, A = V>(accumulator: (acc: A|V, value: V, index: number) => A): OperatorFunction<V, V|A>;
export function reduce<V, A>(accumulator: (acc: A, value: V, index: number) => A, seed: A): OperatorFunction<V, A>;
export function reduce<V, A, S = A>(accumulator: (acc: A|S, value: V, index: number) => A, seed: S): OperatorFunction<V, A>;
/* tslint:enable:max-line-length */

/**
Expand Down Expand Up @@ -54,28 +54,32 @@ export function reduce<T, R>(accumulator: (acc: R, value: T, index: number) => R
* @see {@link mergeScan}
* @see {@link scan}
*
* @param {function(acc: R, value: T, index: number): R} accumulator The accumulator function
* @param {function(acc: A, value: V, index: number): A} accumulator The accumulator function
* called on each source value.
* @param {R} [seed] The initial accumulation value.
* @return {Observable<R>} An Observable that emits a single value that is the
* @param {A} [seed] The initial accumulation value.
* @return {Observable<A>} An Observable that emits a single value that is the
* result of accumulating the values emitted by the source Observable.
* @method reduce
* @owner Observable
*/
export function reduce<T, R>(accumulator: (acc: T | R, value: T, index?: number) => T | R, seed?: T | R): OperatorFunction<T, T | R> {
export function reduce<V, A>(accumulator: (acc: V | A, value: V, index?: number) => A, seed?: any): OperatorFunction<V, V | A> {
// providing a seed of `undefined` *should* be valid and trigger
// hasSeed! so don't use `seed !== undefined` checks!
// For this reason, we have to check it here at the original call site
// otherwise inside Operator/Subscriber we won't know if `undefined`
// means they didn't provide anything or if they literally provided `undefined`
if (arguments.length >= 2) {
return function reduceOperatorFunctionWithSeed(source: Observable<T>): Observable<T | R> {
return pipe(scan(accumulator, seed), takeLast(1), defaultIfEmpty(seed))(source);
return function reduceOperatorFunctionWithSeed(source: Observable<V>): Observable<V | A> {
return pipe(
scan(accumulator, seed),
takeLast(1),
defaultIfEmpty(seed),
)(source);
};
}
return function reduceOperatorFunction(source: Observable<T>): Observable<T | R> {
return function reduceOperatorFunction(source: Observable<V>): Observable<V | A> {
return pipe(
scan<T, T | R>((acc, value, index) => accumulator(acc, value, index + 1)),
scan<V, V | A>((acc, value, index) => accumulator(acc, value, index + 1)),
takeLast(1),
)(source);
};
Expand Down
72 changes: 31 additions & 41 deletions src/internal/operators/scan.ts
@@ -1,12 +1,12 @@
import { Operator } from '../Operator';
import { Observable } from '../Observable';
import { Subscriber } from '../Subscriber';
import { OperatorFunction, MonoTypeOperatorFunction } from '../types';
import { OperatorFunction, TeardownLogic } from '../types';

/* tslint:disable:max-line-length */
export function scan<T, R>(accumulator: (acc: R, value: T, index: number) => R, seed: R): OperatorFunction<T, R>;
export function scan<T>(accumulator: (acc: T, value: T, index: number) => T, seed?: T): MonoTypeOperatorFunction<T>;
export function scan<T, R>(accumulator: (acc: R, value: T, index: number) => R): OperatorFunction<T, R>;
export function scan<V, A = V>(accumulator: (acc: A|V, value: V, index: number) => A): OperatorFunction<V, V|A>;
export function scan<V, A>(accumulator: (acc: A, value: V, index: number) => A, seed: A): OperatorFunction<V, A>;
export function scan<V, A, S>(accumulator: (acc: A|S, value: V, index: number) => A, seed: S): OperatorFunction<V, A>;
/* tslint:enable:max-line-length */

/**
Expand Down Expand Up @@ -45,14 +45,14 @@ export function scan<T, R>(accumulator: (acc: R, value: T, index: number) => R):
* @see {@link mergeScan}
* @see {@link reduce}
*
* @param {function(acc: R, value: T, index: number): R} accumulator
* @param {function(acc: A, value: V, index: number): A} accumulator
* The accumulator function called on each source value.
* @param {T|R} [seed] The initial accumulation value.
* @return {Observable<R>} An observable of the accumulated values.
* @param {V|A} [seed] The initial accumulation value.
* @return {Observable<A>} An observable of the accumulated values.
* @method scan
* @owner Observable
*/
export function scan<T, R>(accumulator: (acc: R, value: T, index: number) => R, seed?: T | R): OperatorFunction<T, R> {
export function scan<V, A, S>(accumulator: (acc: V|A|S, value: V, index: number) => A, seed?: S): OperatorFunction<V, V|A> {
let hasSeed = false;
// providing a seed of `undefined` *should* be valid and trigger
// hasSeed! so don't use `seed !== undefined` checks!
Expand All @@ -63,15 +63,15 @@ export function scan<T, R>(accumulator: (acc: R, value: T, index: number) => R,
hasSeed = true;
}

return function scanOperatorFunction(source: Observable<T>): Observable<R> {
return function scanOperatorFunction(source: Observable<V>) {
return source.lift(new ScanOperator(accumulator, seed, hasSeed));
};
}

class ScanOperator<T, R> implements Operator<T, R> {
constructor(private accumulator: (acc: R, value: T, index: number) => R, private seed?: T | R, private hasSeed: boolean = false) {}
class ScanOperator<V, A, S> implements Operator<V, A> {
constructor(private accumulator: (acc: V|A|S, value: V, index: number) => A, private seed?: S, private hasSeed: boolean = false) {}

call(subscriber: Subscriber<R>, source: any): any {
call(subscriber: Subscriber<A>, source: any): TeardownLogic {
return source.subscribe(new ScanSubscriber(subscriber, this.accumulator, this.seed, this.hasSeed));
}
}
Expand All @@ -81,41 +81,31 @@ class ScanOperator<T, R> implements Operator<T, R> {
* @ignore
* @extends {Ignored}
*/
class ScanSubscriber<T, R> extends Subscriber<T> {
class ScanSubscriber<V, A> extends Subscriber<V> {
private index: number = 0;

get seed(): T | R {
return this._seed;
}

set seed(value: T | R) {
this.hasSeed = true;
this._seed = value;
}

constructor(destination: Subscriber<R>, private accumulator: (acc: R, value: T, index: number) => R, private _seed: T | R,
private hasSeed: boolean) {
constructor(destination: Subscriber<A>, private accumulator: (acc: V|A, value: V, index: number) => A, private _state: any,
private _hasState: boolean) {
super(destination);
}

protected _next(value: T): void {
if (!this.hasSeed) {
this.seed = value;
this.destination.next(value);
protected _next(value: V): void {
const { destination } = this;
if (!this._hasState) {
this._state = value;
this._hasState = true;
destination.next(value);
} else {
return this._tryNext(value);
}
}

private _tryNext(value: T): void {
const index = this.index++;
let result: any;
try {
result = this.accumulator(<R>this.seed, value, index);
} catch (err) {
this.destination.error(err);
const index = this.index++;
let result: A;
try {
result = this.accumulator(this._state, value, index);
} catch (err) {
destination.error(err);
return;
}
this._state = result;
destination.next(result);
}
this.seed = result;
this.destination.next(result);
}
}
2 changes: 1 addition & 1 deletion src/internal/operators/toArray.ts
@@ -1,7 +1,7 @@
import { reduce } from './reduce';
import { OperatorFunction } from '../types';

function toArrayReducer<T>(arr: T[], item: T, index: number) {
function toArrayReducer<T>(arr: T[], item: T, index: number): T[] {
if (index === 0) {
return [item];
}
Expand Down

0 comments on commit b89ebe5

Please sign in to comment.