From 9c26c4095efd0693e1fc99dab55702c7ff41e239 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Wed, 28 Apr 2021 22:02:00 -0500 Subject: [PATCH] chore: rebased. There are still type issues --- spec/operators/groupBy-spec.ts | 4 +- src/internal/operators/groupBy.ts | 103 +++++++++++++++++++++--------- 2 files changed, 74 insertions(+), 33 deletions(-) diff --git a/spec/operators/groupBy-spec.ts b/spec/operators/groupBy-spec.ts index ce6c6b5246..d4f5768359 100644 --- a/spec/operators/groupBy-spec.ts +++ b/spec/operators/groupBy-spec.ts @@ -107,8 +107,8 @@ describe('groupBy operator', () => { of(1, 2, 3).pipe( groupBy({ - key: x => x % 2, - subject: () => new ReplaySubject(1) + key: (x) => x % 2, + subject: () => new ReplaySubject(1), }), // Ensure each inner group reaches the destination after the first event // has been next'd to the group diff --git a/src/internal/operators/groupBy.ts b/src/internal/operators/groupBy.ts index cc77f3d03c..393c3608fa 100644 --- a/src/internal/operators/groupBy.ts +++ b/src/internal/operators/groupBy.ts @@ -1,28 +1,52 @@ import { Observable } from '../Observable'; +import { innerFrom } from '../observable/from'; import { Subject } from '../Subject'; -import { Observer, OperatorFunction } from '../types'; +import { ObservableInput, Observer, OperatorFunction, SubjectLike } from '../types'; import { operate } from '../util/lift'; import { OperatorSubscriber } from './OperatorSubscriber'; +interface BasicGroupByOptions { + key: (value: T) => K; + element?: undefined; + duration?: (grouped: GroupedObservable) => ObservableInput; + connector?: () => SubjectLike; +} + +interface GroupByOptionsWithElement { + key: (value: T) => K; + element: (value: T) => E; + duration?: (grouped: GroupedObservable) => ObservableInput; + connector?: () => SubjectLike; +} + +export function groupBy(options: BasicGroupByOptions): OperatorFunction>; + +export function groupBy(options: GroupByOptionsWithElement): OperatorFunction>; + +export function groupBy(options: GroupByOptionsWithElement): OperatorFunction>; + export function groupBy( - keySelector: (value: T) => value is K + key: (value: T) => value is K ): OperatorFunction | GroupedObservable>>; -export function groupBy(keySelector: (value: T) => K): OperatorFunction>; + +export function groupBy(key: (value: T) => K): OperatorFunction>; + +/** + * @deprecated use the options parameter instead. + */ export function groupBy( - keySelector: (value: T) => K, - elementSelector: void, - durationSelector: (grouped: GroupedObservable) => Observable + key: (value: T) => K, + element: void, + duration: (grouped: GroupedObservable) => Observable ): OperatorFunction>; + +/** + * @deprecated use the options parameter instead. + */ export function groupBy( - keySelector: (value: T) => K, - elementSelector?: (value: T) => R, - durationSelector?: (grouped: GroupedObservable) => Observable -): OperatorFunction>; -export function groupBy( - keySelector: (value: T) => K, - elementSelector?: (value: T) => R, - durationSelector?: (grouped: GroupedObservable) => Observable, - subjectSelector?: () => Subject + key: (value: T) => K, + element?: (value: T) => R, + duration?: (grouped: GroupedObservable) => Observable ): OperatorFunction>; /** @@ -32,7 +56,7 @@ export function groupBy( * * ![](groupBy.png) * - * When the Observable emits an item, a key is computed for this item with the keySelector function. + * When the Observable emits an item, a key is computed for this item with the key function. * * If a {@link GroupedObservable} for this key exists, this {@link GroupedObservable} emits. Otherwise, a new * {@link GroupedObservable} for this key is created and emits. @@ -41,7 +65,7 @@ export function groupBy( * key is available as the `key` field of a {@link GroupedObservable} instance. * * The elements emitted by {@link GroupedObservable}s are by default the items emitted by the Observable, or elements - * returned by the elementSelector function. + * returned by the element function. * * ## Examples * @@ -101,28 +125,45 @@ export function groupBy( * // { id: 3, values: [ 'TSLint' ] } * ``` * - * @param {function(value: T): K} keySelector A function that extracts the key + * @param key A function that extracts the key * for each item. - * @param {function(value: T): R} [elementSelector] A function that extracts the + * @param element A function that extracts the * return element for each item. - * @param {function(grouped: GroupedObservable): Observable} [durationSelector] + * @param duration * A function that returns an Observable to determine how long each group should * exist. - * @param {function(): Subject} [subjectSelector] Factory function to create an + * @param connector Factory function to create an * intermediate Subject through which grouped elements are emitted. * @return A function that returns an Observable that emits GroupedObservables, * each of which corresponds to a unique key value and each of which emits * those items from the source Observable that share that key value. + * + * @deprecated Use the options parameter instead. */ export function groupBy( - keySelector: (value: T) => K, - elementSelector?: ((value: T) => R) | void, - durationSelector?: (grouped: GroupedObservable) => Observable, - subjectSelector?: () => Subject + key: (value: T) => K, + element?: (value: T) => R, + duration?: (grouped: GroupedObservable) => Observable, + connector?: () => Subject +): OperatorFunction>; + +// Impl +export function groupBy( + optionsOrKeySelector: BasicGroupByOptions | GroupByOptionsWithElement | ((value: T) => K), + element?: ((value: any) => any) | void, + duration?: (grouped: GroupedObservable) => ObservableInput, + connector?: () => SubjectLike ): OperatorFunction> { return operate((source, subscriber) => { + let keySelector: (value: T) => K; + if (typeof optionsOrKeySelector === 'function') { + keySelector = optionsOrKeySelector; + } else { + ({ key: keySelector, duration, element, connector } = optionsOrKeySelector); + } + // A lookup for the groups that we have so far. - const groups = new Map>(); + const groups = new Map>(); // Used for notifying all groups and the subscriber in the same way. const notify = (cb: (group: Observer) => void) => { @@ -153,7 +194,7 @@ export function groupBy( let group = groups.get(key); if (!group) { // Create our group subject - groups.set(key, (group = subjectSelector ? subjectSelector() : new Subject())); + groups.set(key, (group = connector ? connector() : new Subject())); // Emit the grouped observable. Note that we can't do a simple `asObservable()` here, // because the grouped observable has special semantics around reference counting @@ -161,7 +202,7 @@ export function groupBy( const grouped = createGroupedObservable(key, group); subscriber.next(grouped); - if (durationSelector) { + if (duration) { const durationSubscriber = new OperatorSubscriber( // Providing the group here ensures that it is disposed of -- via `unsubscribe` -- // wnen the duration subscription is torn down. That is important, because then @@ -185,12 +226,12 @@ export function groupBy( ); // Start our duration notifier. - groupBySourceSubscriber.add(durationSelector(grouped).subscribe(durationSubscriber)); + groupBySourceSubscriber.add(innerFrom(duration(grouped)).subscribe(durationSubscriber)); } } // Send the value to our group. - group.next(elementSelector ? elementSelector(value) : value); + group.next(element ? element(value) : value); } catch (err) { handleError(err); } @@ -214,7 +255,7 @@ export function groupBy( * @param key The key of the group * @param groupSubject The subject that fuels the group */ - function createGroupedObservable(key: K, groupSubject: Subject) { + function createGroupedObservable(key: K, groupSubject: SubjectLike) { const result: any = new Observable((groupSubscriber) => { groupBySourceSubscriber.activeGroups++; const innerSub = groupSubject.subscribe(groupSubscriber);