diff --git a/spec-dtslint/operators/reduce-spec.ts b/spec-dtslint/operators/reduce-spec.ts index 5f31ed75ba..1897fadb58 100644 --- a/spec-dtslint/operators/reduce-spec.ts +++ b/spec-dtslint/operators/reduce-spec.ts @@ -1,4 +1,4 @@ -import { of, Observable } from 'rxjs'; +import { of, OperatorFunction } from 'rxjs'; import { reduce } from 'rxjs/operators'; it('should enforce parameter', () => { @@ -29,3 +29,29 @@ it('should accept seed parameter of a different type', () => { const bv: { [key: string]: string } = {}; const b = of(1, 2, 3).pipe(reduce((x, y, z) => ({ ...x, [y]: y.toString() }), bv)); // $ExpectType Observable<{ [key: string]: string; }> }); + +it('should act appropriately with no seed', () => { + // Because an observable of one value will just pass that value directly through the reducer, + // the below could be a number or a string. + const a = of(1, 2, 3).pipe(reduce((a: any, v) => '' + v)); // $ExpectType Observable + const b = of(1, 2, 3).pipe(reduce((a, v) => v)); // $ExpectType Observable + const c = of(1, 2, 3).pipe(reduce(() => {})); // $ExpectType Observable +}); + +it('should act appropriately with a seed', () => { + const a = of(1, 2, 3).pipe(reduce((a, v) => a + v, '')); // $ExpectType Observable + const b = of(1, 2, 3).pipe(reduce((a, v) => a + v, 0)); // $ExpectType Observable + const c = of(1, 2, 3).pipe(reduce((a, v) => a + 1, [])); // $ExpectError +}); + +it('should infer types properly from arguments', () => { + function toArrayReducer(arr: number[], item: number, index: number): number[] { + if (index === 0) { + return [item]; + } + arr.push(item); + return arr; + } + + const a = reduce(toArrayReducer, [] as number[]); // $ExpectType OperatorFunction +}); diff --git a/spec-dtslint/operators/scan-spec.ts b/spec-dtslint/operators/scan-spec.ts index f5e73bf114..553813f98f 100644 --- a/spec-dtslint/operators/scan-spec.ts +++ b/spec-dtslint/operators/scan-spec.ts @@ -29,3 +29,29 @@ it('should accept seed parameter of a different type', () => { const bv: { [key: string]: string } = {}; const b = of(1, 2, 3).pipe(scan((x, y, z) => ({ ...x, [y]: y.toString() }), bv)); // $ExpectType Observable<{ [key: string]: string; }> }); + +it('should act appropriately with no seed', () => { + // Because an observable of one value will just pass that value directly through the reducer, + // the below could be a number or a string. + const a = of(1, 2, 3).pipe(scan((a: any, v) => '' + v)); // $ExpectType Observable + const b = of(1, 2, 3).pipe(scan((a, v) => v)); // $ExpectType Observable + const c = of(1, 2, 3).pipe(scan(() => {})); // $ExpectType Observable +}); + +it('should act appropriately with a seed', () => { + const a = of(1, 2, 3).pipe(scan((a, v) => a + v, '')); // $ExpectType Observable + const b = of(1, 2, 3).pipe(scan((a, v) => a + v, 0)); // $ExpectType Observable + const c = of(1, 2, 3).pipe(scan((a, v) => a + 1, [])); // $ExpectError +}); + +it('should infer types properly from arguments', () => { + function toArrayReducer(arr: number[], item: number, index: number): number[] { + if (index === 0) { + return [item]; + } + arr.push(item); + return arr; + } + + const a = scan(toArrayReducer, [] as number[]); // $ExpectType OperatorFunction +}); diff --git a/spec/operators/reduce-spec.ts b/spec/operators/reduce-spec.ts index 4cbea42efc..1823202ef0 100644 --- a/spec/operators/reduce-spec.ts +++ b/spec/operators/reduce-spec.ts @@ -193,7 +193,7 @@ describe('reduce operator', () => { throw 'error'; }; - expectObservable(e1.pipe(reduce(reduceFunction, seed))).toBe(expected); + expectObservable(e1.pipe(reduce(reduceFunction, seed))).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(e1subs); }); @@ -289,105 +289,4 @@ describe('reduce operator', () => { expectObservable(e1.pipe(reduce(reduceFunction))).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(e1subs); }); - - type('should accept array typed reducers', () => { - let a: Observable<{ a: number; b: string }>; - a.pipe(reduce((acc, value) => acc.concat(value), [])); - }); - - type('should accept T typed reducers', () => { - let a: Observable<{ a: number; b: string }>; - const reduced = a.pipe(reduce((acc, value) => { - value.a = acc.a; - value.b = acc.b; - return acc; - })); - - reduced.subscribe(r => { - r.a.toExponential(); - r.b.toLowerCase(); - }); - }); - - type('should accept T typed reducers when T is an array', () => { - let a: Observable; - const reduced = a.pipe(reduce((acc, value) => { - return acc.concat(value); - }, [])); - - reduced.subscribe(rs => { - rs[0].toExponential(); - }); - }); - - type('should accept R typed reduces when R is an array of T', () => { - let a: Observable; - const reduced = a.pipe(reduce((acc, value) => { - acc.push(value); - return acc; - }, [])); - - reduced.subscribe(rs => { - rs[0].toExponential(); - }); - }); - - type('should accept R typed reducers when R is assignable to T', () => { - let a: Observable<{ a?: number; b?: string }>; - const reduced = a.pipe(reduce((acc, value) => { - value.a = acc.a; - value.b = acc.b; - return acc; - }, {} as { a?: number; b?: string })); - - reduced.subscribe(r => { - r.a.toExponential(); - r.b.toLowerCase(); - }); - }); - - type('should accept R typed reducers when R is not assignable to T', () => { - let a: Observable<{ a: number; b: string }>; - const seed = { - as: [1], - bs: ['a'] - }; - const reduced = a.pipe(reduce((acc, value: {a: number, b: string}) => { - acc.as.push(value.a); - acc.bs.push(value.b); - return acc; - }, seed)); - - reduced.subscribe(r => { - r.as[0].toExponential(); - r.bs[0].toLowerCase(); - }); - }); - - type('should accept R typed reducers and reduce to type R', () => { - let a: Observable<{ a: number; b: string }>; - const reduced = a.pipe(reduce<{ a?: number; b?: string }>((acc, value) => { - value.a = acc.a; - value.b = acc.b; - return acc; - }, {})); - - reduced.subscribe(r => { - r.a.toExponential(); - r.b.toLowerCase(); - }); - }); - - type('should accept array of R typed reducers and reduce to array of R', () => { - let a: Observable; - const reduced = a.pipe(reduce((acc, cur) => { - console.log(acc); - acc.push(cur.toString()); - return acc; - }, [] as string[])); - - reduced.subscribe(rs => { - rs[0].toLowerCase(); - }); - }); }); diff --git a/spec/operators/scan-spec.ts b/spec/operators/scan-spec.ts index 66f4e2d9eb..08b3d8d607 100644 --- a/spec/operators/scan-spec.ts +++ b/spec/operators/scan-spec.ts @@ -227,27 +227,4 @@ describe('scan operator', () => { expectObservable(scanObs).toBe(expected, values); expectSubscriptions(e1.subscriptions).toBe(e1subs); }); - - type('should accept array typed reducers', () => { - let a: Observable<{ a: number; b: string }>; - a.pipe(scan((acc, value) => acc.concat(value), [])); - }); - - type('should accept T typed reducers', () => { - let a: Observable<{ a?: number; b?: string }>; - a.pipe(scan((acc, value) => { - value.a = acc.a; - value.b = acc.b; - return acc; - }, {} as { a?: number; b?: string })); - }); - - type('should accept R typed reducers', () => { - let a: Observable<{ a: number; b: string }>; - a.pipe(scan<{ a?: number; b?: string }>((acc, value) => { - value.a = acc.a; - value.b = acc.b; - return acc; - }, {})); - }); }); diff --git a/src/internal/operators/reduce.ts b/src/internal/operators/reduce.ts index 6039fdb123..b044e9283f 100644 --- a/src/internal/operators/reduce.ts +++ b/src/internal/operators/reduce.ts @@ -2,13 +2,13 @@ import { Observable } from '../Observable'; import { scan } from './scan'; import { takeLast } from './takeLast'; import { defaultIfEmpty } from './defaultIfEmpty'; -import { OperatorFunction, MonoTypeOperatorFunction } from '../types'; +import { OperatorFunction } from '../types'; import { pipe } from '../util/pipe'; /* tslint:disable:max-line-length */ -export function reduce(accumulator: (acc: R, value: T, index: number) => R, seed: R): OperatorFunction; -export function reduce(accumulator: (acc: T, value: T, index: number) => T, seed?: T): MonoTypeOperatorFunction; -export function reduce(accumulator: (acc: R, value: T, index: number) => R): OperatorFunction; +export function reduce(accumulator: (acc: A|V, value: V, index: number) => A): OperatorFunction; +export function reduce(accumulator: (acc: A, value: V, index: number) => A, seed: A): OperatorFunction; +export function reduce(accumulator: (acc: A|S, value: V, index: number) => A, seed: S): OperatorFunction; /* tslint:enable:max-line-length */ /** @@ -54,28 +54,32 @@ export function reduce(accumulator: (acc: R, value: T, index: number) => R * @see {@link mergeScan} * @see {@link scan} * - * @param {function(acc: R, value: T, index: number): R} accumulator The accumulator function + * @param {function(acc: A, value: V, index: number): A} accumulator The accumulator function * called on each source value. - * @param {R} [seed] The initial accumulation value. - * @return {Observable} An Observable that emits a single value that is the + * @param {A} [seed] The initial accumulation value. + * @return {Observable} An Observable that emits a single value that is the * result of accumulating the values emitted by the source Observable. * @method reduce * @owner Observable */ -export function reduce(accumulator: (acc: T | R, value: T, index?: number) => T | R, seed?: T | R): OperatorFunction { +export function reduce(accumulator: (acc: V | A, value: V, index?: number) => A, seed?: any): OperatorFunction { // providing a seed of `undefined` *should* be valid and trigger // hasSeed! so don't use `seed !== undefined` checks! // For this reason, we have to check it here at the original call site // otherwise inside Operator/Subscriber we won't know if `undefined` // means they didn't provide anything or if they literally provided `undefined` if (arguments.length >= 2) { - return function reduceOperatorFunctionWithSeed(source: Observable): Observable { - return pipe(scan(accumulator, seed), takeLast(1), defaultIfEmpty(seed))(source); + return function reduceOperatorFunctionWithSeed(source: Observable): Observable { + return pipe( + scan(accumulator, seed), + takeLast(1), + defaultIfEmpty(seed), + )(source); }; } - return function reduceOperatorFunction(source: Observable): Observable { + return function reduceOperatorFunction(source: Observable): Observable { return pipe( - scan((acc, value, index) => accumulator(acc, value, index + 1)), + scan((acc, value, index) => accumulator(acc, value, index + 1)), takeLast(1), )(source); }; diff --git a/src/internal/operators/scan.ts b/src/internal/operators/scan.ts index a1eb511e40..6ef11d0cf8 100644 --- a/src/internal/operators/scan.ts +++ b/src/internal/operators/scan.ts @@ -1,12 +1,12 @@ import { Operator } from '../Operator'; import { Observable } from '../Observable'; import { Subscriber } from '../Subscriber'; -import { OperatorFunction, MonoTypeOperatorFunction } from '../types'; +import { OperatorFunction, TeardownLogic } from '../types'; /* tslint:disable:max-line-length */ -export function scan(accumulator: (acc: R, value: T, index: number) => R, seed: R): OperatorFunction; -export function scan(accumulator: (acc: T, value: T, index: number) => T, seed?: T): MonoTypeOperatorFunction; -export function scan(accumulator: (acc: R, value: T, index: number) => R): OperatorFunction; +export function scan(accumulator: (acc: A|V, value: V, index: number) => A): OperatorFunction; +export function scan(accumulator: (acc: A, value: V, index: number) => A, seed: A): OperatorFunction; +export function scan(accumulator: (acc: A|S, value: V, index: number) => A, seed: S): OperatorFunction; /* tslint:enable:max-line-length */ /** @@ -45,14 +45,14 @@ export function scan(accumulator: (acc: R, value: T, index: number) => R): * @see {@link mergeScan} * @see {@link reduce} * - * @param {function(acc: R, value: T, index: number): R} accumulator + * @param {function(acc: A, value: V, index: number): A} accumulator * The accumulator function called on each source value. - * @param {T|R} [seed] The initial accumulation value. - * @return {Observable} An observable of the accumulated values. + * @param {V|A} [seed] The initial accumulation value. + * @return {Observable} An observable of the accumulated values. * @method scan * @owner Observable */ -export function scan(accumulator: (acc: R, value: T, index: number) => R, seed?: T | R): OperatorFunction { +export function scan(accumulator: (acc: V|A|S, value: V, index: number) => A, seed?: S): OperatorFunction { let hasSeed = false; // providing a seed of `undefined` *should* be valid and trigger // hasSeed! so don't use `seed !== undefined` checks! @@ -63,15 +63,15 @@ export function scan(accumulator: (acc: R, value: T, index: number) => R, hasSeed = true; } - return function scanOperatorFunction(source: Observable): Observable { + return function scanOperatorFunction(source: Observable) { return source.lift(new ScanOperator(accumulator, seed, hasSeed)); }; } -class ScanOperator implements Operator { - constructor(private accumulator: (acc: R, value: T, index: number) => R, private seed?: T | R, private hasSeed: boolean = false) {} +class ScanOperator implements Operator { + constructor(private accumulator: (acc: V|A|S, value: V, index: number) => A, private seed?: S, private hasSeed: boolean = false) {} - call(subscriber: Subscriber, source: any): any { + call(subscriber: Subscriber, source: any): TeardownLogic { return source.subscribe(new ScanSubscriber(subscriber, this.accumulator, this.seed, this.hasSeed)); } } @@ -81,41 +81,31 @@ class ScanOperator implements Operator { * @ignore * @extends {Ignored} */ -class ScanSubscriber extends Subscriber { +class ScanSubscriber extends Subscriber { private index: number = 0; - get seed(): T | R { - return this._seed; - } - - set seed(value: T | R) { - this.hasSeed = true; - this._seed = value; - } - - constructor(destination: Subscriber, private accumulator: (acc: R, value: T, index: number) => R, private _seed: T | R, - private hasSeed: boolean) { + constructor(destination: Subscriber, private accumulator: (acc: V|A, value: V, index: number) => A, private _state: any, + private _hasState: boolean) { super(destination); } - protected _next(value: T): void { - if (!this.hasSeed) { - this.seed = value; - this.destination.next(value); + protected _next(value: V): void { + const { destination } = this; + if (!this._hasState) { + this._state = value; + this._hasState = true; + destination.next(value); } else { - return this._tryNext(value); - } - } - - private _tryNext(value: T): void { - const index = this.index++; - let result: any; - try { - result = this.accumulator(this.seed, value, index); - } catch (err) { - this.destination.error(err); + const index = this.index++; + let result: A; + try { + result = this.accumulator(this._state, value, index); + } catch (err) { + destination.error(err); + return; + } + this._state = result; + destination.next(result); } - this.seed = result; - this.destination.next(result); } } diff --git a/src/internal/operators/toArray.ts b/src/internal/operators/toArray.ts index 40d05601c3..0a5f59a434 100644 --- a/src/internal/operators/toArray.ts +++ b/src/internal/operators/toArray.ts @@ -1,7 +1,7 @@ import { reduce } from './reduce'; import { OperatorFunction } from '../types'; -function toArrayReducer(arr: T[], item: T, index: number) { +function toArrayReducer(arr: T[], item: T, index: number): T[] { if (index === 0) { return [item]; }