Skip to content

Commit

Permalink
chore: rebased. There are still type issues
Browse files Browse the repository at this point in the history
  • Loading branch information
benlesh committed Apr 29, 2021
1 parent f7db0dd commit 9c26c40
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 33 deletions.
4 changes: 2 additions & 2 deletions spec/operators/groupBy-spec.ts
Expand Up @@ -107,8 +107,8 @@ describe('groupBy operator', () => {

of(1, 2, 3).pipe(
groupBy({
key: x => x % 2,
subject: () => new ReplaySubject(1)
key: (x) => x % 2,
subject: () => new ReplaySubject(1),
}),
// Ensure each inner group reaches the destination after the first event
// has been next'd to the group
Expand Down
103 changes: 72 additions & 31 deletions src/internal/operators/groupBy.ts
@@ -1,28 +1,52 @@
import { Observable } from '../Observable';
import { innerFrom } from '../observable/from';
import { Subject } from '../Subject';
import { Observer, OperatorFunction } from '../types';
import { ObservableInput, Observer, OperatorFunction, SubjectLike } from '../types';
import { operate } from '../util/lift';
import { OperatorSubscriber } from './OperatorSubscriber';

interface BasicGroupByOptions<K, T> {
key: (value: T) => K;
element?: undefined;
duration?: (grouped: GroupedObservable<K, T>) => ObservableInput<any>;
connector?: () => SubjectLike<T>;
}

interface GroupByOptionsWithElement<K, E, T> {
key: (value: T) => K;
element: (value: T) => E;
duration?: (grouped: GroupedObservable<K, E>) => ObservableInput<any>;
connector?: () => SubjectLike<E>;
}

export function groupBy<T, K>(options: BasicGroupByOptions<K, T>): OperatorFunction<T, GroupedObservable<K, T>>;

export function groupBy<T, K, E>(options: GroupByOptionsWithElement<K, E, T>): OperatorFunction<T, GroupedObservable<K, E>>;

export function groupBy<T, K, E>(options: GroupByOptionsWithElement<K, E, T>): OperatorFunction<T, GroupedObservable<K, E>>;

export function groupBy<T, K extends T>(
keySelector: (value: T) => value is K
key: (value: T) => value is K
): OperatorFunction<T, GroupedObservable<true, K> | GroupedObservable<false, Exclude<T, K>>>;
export function groupBy<T, K>(keySelector: (value: T) => K): OperatorFunction<T, GroupedObservable<K, T>>;

export function groupBy<T, K>(key: (value: T) => K): OperatorFunction<T, GroupedObservable<K, T>>;

/**
* @deprecated use the options parameter instead.
*/
export function groupBy<T, K>(
keySelector: (value: T) => K,
elementSelector: void,
durationSelector: (grouped: GroupedObservable<K, T>) => Observable<any>
key: (value: T) => K,
element: void,
duration: (grouped: GroupedObservable<K, T>) => Observable<any>
): OperatorFunction<T, GroupedObservable<K, T>>;

/**
* @deprecated use the options parameter instead.
*/
export function groupBy<T, K, R>(
keySelector: (value: T) => K,
elementSelector?: (value: T) => R,
durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>
): OperatorFunction<T, GroupedObservable<K, R>>;
export function groupBy<T, K, R>(
keySelector: (value: T) => K,
elementSelector?: (value: T) => R,
durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>,
subjectSelector?: () => Subject<R>
key: (value: T) => K,
element?: (value: T) => R,
duration?: (grouped: GroupedObservable<K, R>) => Observable<any>
): OperatorFunction<T, GroupedObservable<K, R>>;

/**
Expand All @@ -32,7 +56,7 @@ export function groupBy<T, K, R>(
*
* ![](groupBy.png)
*
* When the Observable emits an item, a key is computed for this item with the keySelector function.
* When the Observable emits an item, a key is computed for this item with the key function.
*
* If a {@link GroupedObservable} for this key exists, this {@link GroupedObservable} emits. Otherwise, a new
* {@link GroupedObservable} for this key is created and emits.
Expand All @@ -41,7 +65,7 @@ export function groupBy<T, K, R>(
* key is available as the `key` field of a {@link GroupedObservable} instance.
*
* The elements emitted by {@link GroupedObservable}s are by default the items emitted by the Observable, or elements
* returned by the elementSelector function.
* returned by the element function.
*
* ## Examples
*
Expand Down Expand Up @@ -101,28 +125,45 @@ export function groupBy<T, K, R>(
* // { id: 3, values: [ 'TSLint' ] }
* ```
*
* @param {function(value: T): K} keySelector A function that extracts the key
* @param key A function that extracts the key
* for each item.
* @param {function(value: T): R} [elementSelector] A function that extracts the
* @param element A function that extracts the
* return element for each item.
* @param {function(grouped: GroupedObservable<K,R>): Observable<any>} [durationSelector]
* @param duration
* A function that returns an Observable to determine how long each group should
* exist.
* @param {function(): Subject<R>} [subjectSelector] Factory function to create an
* @param connector Factory function to create an
* intermediate Subject through which grouped elements are emitted.
* @return A function that returns an Observable that emits GroupedObservables,
* each of which corresponds to a unique key value and each of which emits
* those items from the source Observable that share that key value.
*
* @deprecated Use the options parameter instead.
*/
export function groupBy<T, K, R>(
keySelector: (value: T) => K,
elementSelector?: ((value: T) => R) | void,
durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>,
subjectSelector?: () => Subject<R>
key: (value: T) => K,
element?: (value: T) => R,
duration?: (grouped: GroupedObservable<K, R>) => Observable<any>,
connector?: () => Subject<R>
): OperatorFunction<T, GroupedObservable<K, R>>;

// Impl
export function groupBy<T, K, R>(
optionsOrKeySelector: BasicGroupByOptions<K, T> | GroupByOptionsWithElement<K, R, T> | ((value: T) => K),
element?: ((value: any) => any) | void,
duration?: (grouped: GroupedObservable<any, any>) => ObservableInput<any>,
connector?: () => SubjectLike<any>
): OperatorFunction<T, GroupedObservable<K, R>> {
return operate((source, subscriber) => {
let keySelector: (value: T) => K;
if (typeof optionsOrKeySelector === 'function') {
keySelector = optionsOrKeySelector;
} else {
({ key: keySelector, duration, element, connector } = optionsOrKeySelector);
}

// A lookup for the groups that we have so far.
const groups = new Map<K, Subject<any>>();
const groups = new Map<K, SubjectLike<any>>();

// Used for notifying all groups and the subscriber in the same way.
const notify = (cb: (group: Observer<any>) => void) => {
Expand Down Expand Up @@ -153,15 +194,15 @@ export function groupBy<T, K, R>(
let group = groups.get(key);
if (!group) {
// Create our group subject
groups.set(key, (group = subjectSelector ? subjectSelector() : new Subject<any>()));
groups.set(key, (group = connector ? connector() : new Subject<any>()));

// Emit the grouped observable. Note that we can't do a simple `asObservable()` here,
// because the grouped observable has special semantics around reference counting
// to ensure we don't sever our connection to the source prematurely.
const grouped = createGroupedObservable(key, group);
subscriber.next(grouped);

if (durationSelector) {
if (duration) {
const durationSubscriber = new OperatorSubscriber(
// Providing the group here ensures that it is disposed of -- via `unsubscribe` --
// wnen the duration subscription is torn down. That is important, because then
Expand All @@ -185,12 +226,12 @@ export function groupBy<T, K, R>(
);

// Start our duration notifier.
groupBySourceSubscriber.add(durationSelector(grouped).subscribe(durationSubscriber));
groupBySourceSubscriber.add(innerFrom(duration(grouped)).subscribe(durationSubscriber));
}
}

// Send the value to our group.
group.next(elementSelector ? elementSelector(value) : value);
group.next(element ? element(value) : value);
} catch (err) {
handleError(err);
}
Expand All @@ -214,7 +255,7 @@ export function groupBy<T, K, R>(
* @param key The key of the group
* @param groupSubject The subject that fuels the group
*/
function createGroupedObservable(key: K, groupSubject: Subject<any>) {
function createGroupedObservable(key: K, groupSubject: SubjectLike<any>) {
const result: any = new Observable<T>((groupSubscriber) => {
groupBySourceSubscriber.activeGroups++;
const innerSub = groupSubject.subscribe(groupSubscriber);
Expand Down

0 comments on commit 9c26c40

Please sign in to comment.