Skip to content

Commit

Permalink
feat(groupBy): Support named arguments, support ObservableInputs for …
Browse files Browse the repository at this point in the history
…duration selector

- Adds support for named arguments.
- Adds support for returning promises, et al, from the duration selector.

NOTES:

* There's a problem with type inferrence from the options `duration` selector that is noted in a `TODO` in the code here.
* The tests for `groupBy` appear to be EXTREMELY old and outdated, and I was unable to updated them easily to use run mode. We may have to rewrite them all at some point to use better techniques. The issue seems to be a rudementary means of testing the inner observables that is incompatible with run mode.
* Docs still need updated
* Older paths still need to be deprecated
* dtslint tests need to be added
  • Loading branch information
benlesh committed Aug 25, 2020
1 parent fd1e039 commit b21d811
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 72 deletions.
75 changes: 38 additions & 37 deletions spec/operators/groupBy-spec.ts
Expand Up @@ -42,7 +42,7 @@ describe('groupBy operator', () => {
];

of(1, 2, 3).pipe(
groupBy((x: number) => x % 2)
groupBy((x) => x % 2)
).subscribe((g: any) => {
const expectedGroup = expectedGroups.shift()!;
expect(g.key).to.equal(expectedGroup.key);
Expand All @@ -60,7 +60,7 @@ describe('groupBy operator', () => {
];

of(1, 2, 3).pipe(
groupBy((x: number) => x % 2, (x: number) => x + '!')
groupBy((x) => x % 2, (x) => x + '!')
).subscribe((g: any) => {
const expectedGroup = expectedGroups.shift()!;
expect(g.key).to.equal(expectedGroup.key);
Expand All @@ -82,21 +82,21 @@ describe('groupBy operator', () => {
const resultingGroups: { key: number, values: number [] }[] = [];

of(1, 2, 3, 4, 5, 6).pipe(
groupBy(
(x: number) => x % 2,
(x: number) => x,
(g: any) => g.pipe(skip(1)))
).subscribe((g: any) => {
let group = { key: g.key, values: [] as number[] };

g.subscribe((x: any) => {
group.values.push(x);
});
groupBy({
key: x => x % 2,
duration: g => g.pipe(skip(1))
})
).subscribe((g: any) => {
let group = { key: g.key, values: [] as number[] };

resultingGroups.push(group);
g.subscribe((x: any) => {
group.values.push(x);
});

expect(resultingGroups).to.deep.equal(expectedGroups);
resultingGroups.push(group);
});

expect(resultingGroups).to.deep.equal(expectedGroups);
});

it('should group values with a subject selector', (done: MochaDone) => {
Expand All @@ -106,7 +106,10 @@ describe('groupBy operator', () => {
];

of(1, 2, 3).pipe(
groupBy((x: number) => x % 2, null as any, null as any, () => new ReplaySubject(1)),
groupBy({
key: x => x % 2,
subject: () => new ReplaySubject(1)
}),
// Ensure each inner group reaches the destination after the first event
// has been next'd to the group
delay(5)
Expand Down Expand Up @@ -802,11 +805,12 @@ describe('groupBy operator', () => {
const expectedValues = { v: v, w: w, x: x, y: y, z: z };

const source = e1
.pipe(groupBy(
(val: string) => val.toLowerCase().trim(),
(val: string) => val,
(group: any) => group.pipe(skip(2))
));
.pipe(
groupBy({
key: val => val.toLowerCase().trim(),
duration: group => group.pipe(skip(2)),
})
);

expectObservable(source).toBe(expected, expectedValues);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
Expand Down Expand Up @@ -836,11 +840,10 @@ describe('groupBy operator', () => {
const expectedValues = { v: v, w: w, x: x };

const source = e1
.pipe(groupBy(
(val: string) => val.toLowerCase().trim(),
(val: string) => val,
(group: any) => group.pipe(skip(2))
));
.pipe(groupBy({
key: val => val.toLowerCase().trim(),
duration: group => group.pipe(skip(2))
}));

expectObservable(source, unsub).toBe(expected, expectedValues);
});
Expand Down Expand Up @@ -879,17 +882,16 @@ describe('groupBy operator', () => {
.unsubscribedFrame;

const source = e1.pipe(
groupBy(
(val: string) => val.toLowerCase().trim(),
(val: string) => val,
(group: any) => group.pipe(skip(2))
),
map((group: any) => {
groupBy({
key: val => val.toLowerCase().trim(),
duration: group => group.pipe(skip(2))
}),
map((group) => {
const arr: any[] = [];

const subscription = group.pipe(
phonyMarbelize()
).subscribe((value: any) => {
).subscribe((value) => {
arr.push(value);
});

Expand Down Expand Up @@ -923,11 +925,10 @@ describe('groupBy operator', () => {
.parseMarblesAsSubscriptions(sub)
.unsubscribedFrame;

obs.pipe(groupBy(
(val: string) => val,
(val: string) => val,
(group: any) => durations[group.key]
)).subscribe();
obs.pipe(groupBy({
key: (val) => val,
duration: (group) => durations[group.key]
})).subscribe();

rxTestScheduler.schedule(() => {
durations.forEach((d, i) => {
Expand Down
106 changes: 71 additions & 35 deletions src/internal/operators/groupBy.ts
@@ -1,18 +1,45 @@
/** @prettier */
import { Subscriber } from '../Subscriber';
import { Subscription } from '../Subscription';
import { Observable } from '../Observable';
import { Operator } from '../Operator';
import { Subject } from '../Subject';
import { OperatorFunction } from '../types';
import { OperatorFunction, ObservableInput } from '../types';
import { lift } from '../util/lift';
import { from } from '../observable/from';

/* tslint:disable:max-line-length */
export function groupBy<T, K extends T>(keySelector: (value: T) => value is K): OperatorFunction<T, GroupedObservable<true, K> | GroupedObservable<false, Exclude<T, K>>>;
// TODO: The types here aren't really inferring properly. There's an issue
// where the type of `grouped` in `duration` is `GroupedObservable<unknown, T>`,
// in all cases, and will not pick up the `K` value from `key`, where the
// return value will work fine.
export interface GroupByOptions<T, K> {
key: (value: T) => K;
duration?: (grouped: GroupedObservable<K, T>) => ObservableInput<any>;
subject?: () => Subject<T>;
}

export function groupBy<T, K>(options: GroupByOptions<T, K>): OperatorFunction<T, GroupedObservable<K, T>>;

export function groupBy<T, K extends T>(
keySelector: (value: T) => value is K
): OperatorFunction<T, GroupedObservable<true, K> | GroupedObservable<false, Exclude<T, K>>>;
export function groupBy<T, K>(keySelector: (value: T) => K): OperatorFunction<T, GroupedObservable<K, T>>;
export function groupBy<T, K>(keySelector: (value: T) => K, elementSelector: void, durationSelector: (grouped: GroupedObservable<K, T>) => Observable<any>): OperatorFunction<T, GroupedObservable<K, T>>;
export function groupBy<T, K, R>(keySelector: (value: T) => K, elementSelector?: (value: T) => R, durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>): OperatorFunction<T, GroupedObservable<K, R>>;
export function groupBy<T, K, R>(keySelector: (value: T) => K, elementSelector?: (value: T) => R, durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>, subjectSelector?: () => Subject<R>): OperatorFunction<T, GroupedObservable<K, R>>;
/* tslint:enable:max-line-length */
export function groupBy<T, K>(
keySelector: (value: T) => K,
elementSelector: void,
durationSelector: (grouped: GroupedObservable<K, T>) => Observable<any>
): OperatorFunction<T, GroupedObservable<K, T>>;
export function groupBy<T, K, R>(
keySelector: (value: T) => K,
elementSelector?: (value: T) => R,
durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>
): OperatorFunction<T, GroupedObservable<K, R>>;
export function groupBy<T, K, R>(
keySelector: (value: T) => K,
elementSelector?: (value: T) => R,
durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>,
subjectSelector?: () => Subject<R>
): OperatorFunction<T, GroupedObservable<K, R>>;

/**
* Groups the items emitted by an Observable according to a specified criterion,
Expand Down Expand Up @@ -105,12 +132,22 @@ export function groupBy<T, K, R>(keySelector: (value: T) => K, elementSelector?:
* value.
* @name groupBy
*/
export function groupBy<T, K, R>(keySelector: (value: T) => K,
elementSelector?: ((value: T) => R) | void,
durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>,
subjectSelector?: () => Subject<R>): OperatorFunction<T, GroupedObservable<K, R>> {
return (source: Observable<T>) =>
lift(source, new GroupByOperator(keySelector, elementSelector, durationSelector, subjectSelector));
export function groupBy<T, K, R>(
optionsOrKeySelector: GroupByOptions<T, K> | ((value: T) => K),
elementSelector?: ((value: T) => R) | void,
durationSelector?: (grouped: GroupedObservable<K, any>) => ObservableInput<any>,
subjectSelector?: () => Subject<any>
): OperatorFunction<T, GroupedObservable<K, R>> {
let keySelector: (value: T) => K;
if (optionsOrKeySelector && typeof optionsOrKeySelector === 'object') {
keySelector = optionsOrKeySelector.key;
durationSelector = optionsOrKeySelector.duration;
subjectSelector = optionsOrKeySelector.subject;
} else {
keySelector = optionsOrKeySelector;
}

return (source: Observable<T>) => lift(source, new GroupByOperator(keySelector, elementSelector, durationSelector, subjectSelector));
}

export interface RefCountSubscription {
Expand All @@ -121,16 +158,17 @@ export interface RefCountSubscription {
}

class GroupByOperator<T, K, R> implements Operator<T, GroupedObservable<K, R>> {
constructor(private keySelector: (value: T) => K,
private elementSelector?: ((value: T) => R) | void,
private durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>,
private subjectSelector?: () => Subject<R>) {
}
constructor(
private keySelector: (value: T) => K,
private elementSelector?: ((value: T) => R) | void,
private durationSelector?: (grouped: GroupedObservable<K, R>) => ObservableInput<any>,
private subjectSelector?: () => Subject<R>
) {}

call(subscriber: Subscriber<GroupedObservable<K, R>>, source: any): any {
return source.subscribe(new GroupBySubscriber(
subscriber, this.keySelector, this.elementSelector, this.durationSelector, this.subjectSelector
));
return source.subscribe(
new GroupBySubscriber(subscriber, this.keySelector, this.elementSelector, this.durationSelector, this.subjectSelector)
);
}
}

Expand All @@ -144,11 +182,13 @@ class GroupBySubscriber<T, K, R> extends Subscriber<T> implements RefCountSubscr
public attemptedToUnsubscribe: boolean = false;
public count: number = 0;

constructor(destination: Subscriber<GroupedObservable<K, R>>,
private keySelector: (value: T) => K,
private elementSelector?: ((value: T) => R) | void,
private durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>,
private subjectSelector?: () => Subject<R>) {
constructor(
destination: Subscriber<GroupedObservable<K, R>>,
private keySelector: (value: T) => K,
private elementSelector?: ((value: T) => R) | void,
private durationSelector?: (grouped: GroupedObservable<K, R>) => ObservableInput<any>,
private subjectSelector?: () => Subject<R>
) {
super(destination);
}

Expand Down Expand Up @@ -190,9 +230,9 @@ class GroupBySubscriber<T, K, R> extends Subscriber<T> implements RefCountSubscr
const groupedObservable = new GroupedObservable(key, group, this);
this.destination.next(groupedObservable);
if (this.durationSelector) {
let duration: any;
let duration: Observable<any>;
try {
duration = this.durationSelector(new GroupedObservable<K, R>(key, <Subject<R>>group));
duration = from(this.durationSelector(new GroupedObservable<K, R>(key, <Subject<R>>group)));
} catch (err) {
this.error(err);
return;
Expand Down Expand Up @@ -250,9 +290,7 @@ class GroupBySubscriber<T, K, R> extends Subscriber<T> implements RefCountSubscr
* @extends {Ignored}
*/
class GroupDurationSubscriber<K, T> extends Subscriber<T> {
constructor(private key: K,
group: Subject<T>,
private parent: GroupBySubscriber<any, K, T | any>) {
constructor(private key: K, group: Subject<T>, private parent: GroupBySubscriber<any, K, T | any>) {
super(group);
this.add(this._teardown);
}
Expand All @@ -267,7 +305,7 @@ class GroupDurationSubscriber<K, T> extends Subscriber<T> {
if (parent) {
parent.removeGroup(key);
}
}
};
}

/**
Expand All @@ -280,9 +318,7 @@ class GroupDurationSubscriber<K, T> extends Subscriber<T> {
*/
export class GroupedObservable<K, T> extends Observable<T> {
/** @deprecated Do not construct this type. Internal use only */
constructor(public key: K,
private groupSubject: Subject<T>,
private refCountSubscription?: RefCountSubscription) {
constructor(public readonly key: K, private groupSubject: Subject<T>, private refCountSubscription?: RefCountSubscription) {
super();
}

Expand Down

0 comments on commit b21d811

Please sign in to comment.