Skip to content

Commit

Permalink
feat(groupBy): Support named arguments, support ObservableInputs for …
Browse files Browse the repository at this point in the history
…duration selector (#5679)

* feat(groupBy): Support named arguments, support ObservableInputs for duration selector

- Adds support for named arguments.
- Adds support for returning promises, et al, from the duration selector.

NOTES:

* The tests for `groupBy` appear to be EXTREMELY old and outdated, and I was unable to updated them easily to use run mode. We may have to rewrite them all at some point to use better techniques. The issue seems to be a rudementary means of testing the inner observables that is incompatible with run mode.
* Docs still need updated
* Older paths still need to be deprecated
* dtslint tests need to be added

* chore: rebased. There are still type issues

* refactor: change to fn, options pattern

* chore: remove duplicate typing
  • Loading branch information
benlesh committed May 20, 2021
1 parent 190e4d0 commit 7a99397
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 72 deletions.
12 changes: 7 additions & 5 deletions api_guard/dist/types/operators/index.d.ts
Expand Up @@ -120,11 +120,13 @@ export declare function first<T, D = T>(predicate: (value: T, index: number, sou

export declare const flatMap: typeof mergeMap;

export declare function groupBy<T, K extends T>(keySelector: (value: T) => value is K): OperatorFunction<T, GroupedObservable<true, K> | GroupedObservable<false, Exclude<T, K>>>;
export declare function groupBy<T, K>(keySelector: (value: T) => K): OperatorFunction<T, GroupedObservable<K, T>>;
export declare function groupBy<T, K>(keySelector: (value: T) => K, elementSelector: void, durationSelector: (grouped: GroupedObservable<K, T>) => Observable<any>): OperatorFunction<T, GroupedObservable<K, T>>;
export declare function groupBy<T, K, R>(keySelector: (value: T) => K, elementSelector?: (value: T) => R, durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>): OperatorFunction<T, GroupedObservable<K, R>>;
export declare function groupBy<T, K, R>(keySelector: (value: T) => K, elementSelector?: (value: T) => R, durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>, subjectSelector?: () => Subject<R>): OperatorFunction<T, GroupedObservable<K, R>>;
export declare function groupBy<T, K>(key: (value: T) => K, options: BasicGroupByOptions<K, T>): OperatorFunction<T, GroupedObservable<K, T>>;
export declare function groupBy<T, K, E>(key: (value: T) => K, options: GroupByOptionsWithElement<K, E, T>): OperatorFunction<T, GroupedObservable<K, E>>;
export declare function groupBy<T, K extends T>(key: (value: T) => value is K): OperatorFunction<T, GroupedObservable<true, K> | GroupedObservable<false, Exclude<T, K>>>;
export declare function groupBy<T, K>(key: (value: T) => K): OperatorFunction<T, GroupedObservable<K, T>>;
export declare function groupBy<T, K>(key: (value: T) => K, element: void, duration: (grouped: GroupedObservable<K, T>) => Observable<any>): OperatorFunction<T, GroupedObservable<K, T>>;
export declare function groupBy<T, K, R>(key: (value: T) => K, element?: (value: T) => R, duration?: (grouped: GroupedObservable<K, R>) => Observable<any>): OperatorFunction<T, GroupedObservable<K, R>>;
export declare function groupBy<T, K, R>(key: (value: T) => K, element?: (value: T) => R, duration?: (grouped: GroupedObservable<K, R>) => Observable<any>, connector?: () => Subject<R>): OperatorFunction<T, GroupedObservable<K, R>>;

export declare function ignoreElements(): OperatorFunction<any, never>;

Expand Down
69 changes: 32 additions & 37 deletions spec/operators/groupBy-spec.ts
Expand Up @@ -42,7 +42,7 @@ describe('groupBy operator', () => {
];

of(1, 2, 3).pipe(
groupBy((x: number) => x % 2)
groupBy((x) => x % 2)
).subscribe((g: any) => {
const expectedGroup = expectedGroups.shift()!;
expect(g.key).to.equal(expectedGroup.key);
Expand All @@ -60,7 +60,7 @@ describe('groupBy operator', () => {
];

of(1, 2, 3).pipe(
groupBy((x: number) => x % 2, (x: number) => x + '!')
groupBy((x) => x % 2, (x) => x + '!')
).subscribe((g: any) => {
const expectedGroup = expectedGroups.shift()!;
expect(g.key).to.equal(expectedGroup.key);
Expand All @@ -82,21 +82,20 @@ describe('groupBy operator', () => {
const resultingGroups: { key: number, values: number [] }[] = [];

of(1, 2, 3, 4, 5, 6).pipe(
groupBy(
(x: number) => x % 2,
(x: number) => x,
(g: any) => g.pipe(skip(1)))
).subscribe((g: any) => {
let group = { key: g.key, values: [] as number[] };

g.subscribe((x: any) => {
group.values.push(x);
});
groupBy(x => x % 2, {
duration: g => g.pipe(skip(1))
})
).subscribe((g: any) => {
let group = { key: g.key, values: [] as number[] };

resultingGroups.push(group);
g.subscribe((x: any) => {
group.values.push(x);
});

expect(resultingGroups).to.deep.equal(expectedGroups);
resultingGroups.push(group);
});

expect(resultingGroups).to.deep.equal(expectedGroups);
});

it('should group values with a subject selector', (done) => {
Expand All @@ -106,7 +105,9 @@ describe('groupBy operator', () => {
];

of(1, 2, 3).pipe(
groupBy((x: number) => x % 2, null as any, null as any, () => new ReplaySubject(1)),
groupBy(x => x % 2, {
connector: () => new ReplaySubject(1),
}),
// Ensure each inner group reaches the destination after the first event
// has been next'd to the group
delay(5)
Expand Down Expand Up @@ -802,11 +803,11 @@ describe('groupBy operator', () => {
const expectedValues = { v: v, w: w, x: x, y: y, z: z };

const source = e1
.pipe(groupBy(
(val: string) => val.toLowerCase().trim(),
(val: string) => val,
(group: any) => group.pipe(skip(2))
));
.pipe(
groupBy(val => val.toLowerCase().trim(), {
duration: group => group.pipe(skip(2)),
})
);

expectObservable(source).toBe(expected, expectedValues);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
Expand Down Expand Up @@ -836,11 +837,9 @@ describe('groupBy operator', () => {
const expectedValues = { v: v, w: w, x: x };

const source = e1
.pipe(groupBy(
(val: string) => val.toLowerCase().trim(),
(val: string) => val,
(group: any) => group.pipe(skip(2))
));
.pipe(groupBy(val => val.toLowerCase().trim(), {
duration: group => group.pipe(skip(2))
}));

expectObservable(source, unsub).toBe(expected, expectedValues);
});
Expand Down Expand Up @@ -879,17 +878,15 @@ describe('groupBy operator', () => {
.unsubscribedFrame;

const source = e1.pipe(
groupBy(
(val: string) => val.toLowerCase().trim(),
(val: string) => val,
(group: any) => group.pipe(skip(2))
),
map((group: any) => {
groupBy(val => val.toLowerCase().trim(), {
duration: group => group.pipe(skip(2))
}),
map((group) => {
const arr: any[] = [];

const subscription = group.pipe(
phonyMarbelize()
).subscribe((value: any) => {
).subscribe((value) => {
arr.push(value);
});

Expand Down Expand Up @@ -923,11 +920,9 @@ describe('groupBy operator', () => {
.parseMarblesAsSubscriptions(sub)
.unsubscribedFrame;

obs.pipe(groupBy(
(val: string) => val,
(val: string) => val,
(group: any) => durations[group.key]
)).subscribe();
obs.pipe(groupBy((val) => val, {
duration: (group) => durations[Number(group.key)]
})).subscribe();

rxTestScheduler.schedule(() => {
durations.forEach((d, i) => {
Expand Down
100 changes: 70 additions & 30 deletions src/internal/operators/groupBy.ts
@@ -1,28 +1,51 @@
import { Observable } from '../Observable';
import { innerFrom } from '../observable/from';
import { Subject } from '../Subject';
import { Observer, OperatorFunction } from '../types';
import { ObservableInput, Observer, OperatorFunction, SubjectLike } from '../types';
import { operate } from '../util/lift';
import { OperatorSubscriber } from './OperatorSubscriber';

interface BasicGroupByOptions<K, T> {
element?: undefined;
duration?: (grouped: GroupedObservable<K, T>) => ObservableInput<any>;
connector?: () => SubjectLike<T>;
}

interface GroupByOptionsWithElement<K, E, T> {
element: (value: T) => E;
duration?: (grouped: GroupedObservable<K, E>) => ObservableInput<any>;
connector?: () => SubjectLike<E>;
}

export function groupBy<T, K>(key: (value: T) => K, options: BasicGroupByOptions<K, T>): OperatorFunction<T, GroupedObservable<K, T>>;

export function groupBy<T, K, E>(
key: (value: T) => K,
options: GroupByOptionsWithElement<K, E, T>
): OperatorFunction<T, GroupedObservable<K, E>>;

export function groupBy<T, K extends T>(
keySelector: (value: T) => value is K
key: (value: T) => value is K
): OperatorFunction<T, GroupedObservable<true, K> | GroupedObservable<false, Exclude<T, K>>>;
export function groupBy<T, K>(keySelector: (value: T) => K): OperatorFunction<T, GroupedObservable<K, T>>;

export function groupBy<T, K>(key: (value: T) => K): OperatorFunction<T, GroupedObservable<K, T>>;

/**
* @deprecated use the options parameter instead.
*/
export function groupBy<T, K>(
keySelector: (value: T) => K,
elementSelector: void,
durationSelector: (grouped: GroupedObservable<K, T>) => Observable<any>
key: (value: T) => K,
element: void,
duration: (grouped: GroupedObservable<K, T>) => Observable<any>
): OperatorFunction<T, GroupedObservable<K, T>>;

/**
* @deprecated use the options parameter instead.
*/
export function groupBy<T, K, R>(
keySelector: (value: T) => K,
elementSelector?: (value: T) => R,
durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>
): OperatorFunction<T, GroupedObservable<K, R>>;
export function groupBy<T, K, R>(
keySelector: (value: T) => K,
elementSelector?: (value: T) => R,
durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>,
subjectSelector?: () => Subject<R>
key: (value: T) => K,
element?: (value: T) => R,
duration?: (grouped: GroupedObservable<K, R>) => Observable<any>
): OperatorFunction<T, GroupedObservable<K, R>>;

/**
Expand All @@ -32,7 +55,7 @@ export function groupBy<T, K, R>(
*
* ![](groupBy.png)
*
* When the Observable emits an item, a key is computed for this item with the keySelector function.
* When the Observable emits an item, a key is computed for this item with the key function.
*
* If a {@link GroupedObservable} for this key exists, this {@link GroupedObservable} emits. Otherwise, a new
* {@link GroupedObservable} for this key is created and emits.
Expand All @@ -41,7 +64,7 @@ export function groupBy<T, K, R>(
* key is available as the `key` field of a {@link GroupedObservable} instance.
*
* The elements emitted by {@link GroupedObservable}s are by default the items emitted by the Observable, or elements
* returned by the elementSelector function.
* returned by the element function.
*
* ## Examples
*
Expand Down Expand Up @@ -101,28 +124,45 @@ export function groupBy<T, K, R>(
* // { id: 3, values: [ 'TSLint' ] }
* ```
*
* @param {function(value: T): K} keySelector A function that extracts the key
* @param key A function that extracts the key
* for each item.
* @param {function(value: T): R} [elementSelector] A function that extracts the
* @param element A function that extracts the
* return element for each item.
* @param {function(grouped: GroupedObservable<K,R>): Observable<any>} [durationSelector]
* @param duration
* A function that returns an Observable to determine how long each group should
* exist.
* @param {function(): Subject<R>} [subjectSelector] Factory function to create an
* @param connector Factory function to create an
* intermediate Subject through which grouped elements are emitted.
* @return A function that returns an Observable that emits GroupedObservables,
* each of which corresponds to a unique key value and each of which emits
* those items from the source Observable that share that key value.
*
* @deprecated Use the options parameter instead.
*/
export function groupBy<T, K, R>(
key: (value: T) => K,
element?: (value: T) => R,
duration?: (grouped: GroupedObservable<K, R>) => Observable<any>,
connector?: () => Subject<R>
): OperatorFunction<T, GroupedObservable<K, R>>;

// Impl
export function groupBy<T, K, R>(
keySelector: (value: T) => K,
elementSelector?: ((value: T) => R) | void,
durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>,
subjectSelector?: () => Subject<R>
elementOrOptions?: ((value: any) => any) | void | BasicGroupByOptions<K, T> | GroupByOptionsWithElement<K, R, T>,
duration?: (grouped: GroupedObservable<any, any>) => ObservableInput<any>,
connector?: () => SubjectLike<any>
): OperatorFunction<T, GroupedObservable<K, R>> {
return operate((source, subscriber) => {
let element: ((value: any) => any) | void;
if (!elementOrOptions || typeof elementOrOptions === 'function') {
element = elementOrOptions;
} else {
({ duration, element, connector } = elementOrOptions);
}

// A lookup for the groups that we have so far.
const groups = new Map<K, Subject<any>>();
const groups = new Map<K, SubjectLike<any>>();

// Used for notifying all groups and the subscriber in the same way.
const notify = (cb: (group: Observer<any>) => void) => {
Expand Down Expand Up @@ -153,15 +193,15 @@ export function groupBy<T, K, R>(
let group = groups.get(key);
if (!group) {
// Create our group subject
groups.set(key, (group = subjectSelector ? subjectSelector() : new Subject<any>()));
groups.set(key, (group = connector ? connector() : new Subject<any>()));

// Emit the grouped observable. Note that we can't do a simple `asObservable()` here,
// because the grouped observable has special semantics around reference counting
// to ensure we don't sever our connection to the source prematurely.
const grouped = createGroupedObservable(key, group);
subscriber.next(grouped);

if (durationSelector) {
if (duration) {
const durationSubscriber = new OperatorSubscriber(
// Providing the group here ensures that it is disposed of -- via `unsubscribe` --
// wnen the duration subscription is torn down. That is important, because then
Expand All @@ -185,12 +225,12 @@ export function groupBy<T, K, R>(
);

// Start our duration notifier.
groupBySourceSubscriber.add(durationSelector(grouped).subscribe(durationSubscriber));
groupBySourceSubscriber.add(innerFrom(duration(grouped)).subscribe(durationSubscriber));
}
}

// Send the value to our group.
group.next(elementSelector ? elementSelector(value) : value);
group.next(element ? element(value) : value);
} catch (err) {
handleError(err);
}
Expand All @@ -214,7 +254,7 @@ export function groupBy<T, K, R>(
* @param key The key of the group
* @param groupSubject The subject that fuels the group
*/
function createGroupedObservable(key: K, groupSubject: Subject<any>) {
function createGroupedObservable(key: K, groupSubject: SubjectLike<any>) {
const result: any = new Observable<T>((groupSubscriber) => {
groupBySourceSubscriber.activeGroups++;
const innerSub = groupSubject.subscribe(groupSubscriber);
Expand Down

0 comments on commit 7a99397

Please sign in to comment.