Skip to content

Commit

Permalink
fix(throttle): now properly trailing throttles for individual values
Browse files Browse the repository at this point in the history
BREAKING CHANGES: This changes the behavior of throttle, in particular
throttling with both leading and trailing behaviors set to true, to more
closely match the throttling behavior of lodash and other libraries.
Throttling now starts immediately after any emission from the
observable, and values will not be double emitted for both leading and
trailing values.

fixes ReactiveX#2859
  • Loading branch information
benlesh authored and micha149 committed Feb 27, 2018
1 parent 2f9efbf commit df689e5
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 67 deletions.
85 changes: 57 additions & 28 deletions spec/operators/throttle-spec.ts
Expand Up @@ -3,7 +3,13 @@ import * as Rx from '../../src/Rx';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';

declare const type;
declare function asDiagram(arg: string): Function;
declare const { asDiagram };
declare const hot: typeof marbleTestingSignature.hot;
declare const cold: typeof marbleTestingSignature.cold;
declare const expectObservable: typeof marbleTestingSignature.expectObservable;
declare const expectSubscriptions: typeof marbleTestingSignature.expectSubscriptions;
declare const time: typeof marbleTestingSignature.time;
declare const rxTestScheduler: typeof marbleTestingSignature.rxTestScheduler;

const Observable = Rx.Observable;

Expand Down Expand Up @@ -209,28 +215,24 @@ describe('Observable.prototype.throttle', () => {
});

it('should propagate error thrown from durationSelector function', () => {
const e1 = hot('abcdefabcdabcdefghabca| ');
const e1subs = '^ ! ';
const e2 = [cold('-----| '),
cold( '---| '),
cold( '-------| ')];
const e2subs = ['^ ! ',
' ^ ! '];
const expected = 'a-----a---# ';
const s1 = hot('--^--x--x--x--x--x--x--e--x--x--x--|');
const s1Subs = '^ !';
const n1 = cold( '----|');
const n1Subs = [' ^ ! ',
' ^ ! ',
' ^ ! '];
const exp = '---x-----x-----x-----(e#)';

let i = 0;
const result = e1.throttle(() => {
if (i === 2) {
throw 'error';
const result = s1.throttle(() => {
if (i++ === 3) {
throw new Error('lol');
}
return e2[i++];
return n1;
});

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
for (let j = 0; j < e2subs.length; j++) {
expectSubscriptions(e2[j].subscriptions).toBe(e2subs[j]);
}
expectObservable(result).toBe(exp, undefined, new Error('lol'));
expectSubscriptions(s1.subscriptions).toBe(s1Subs);
expectSubscriptions(n1.subscriptions).toBe(n1Subs);
});

it('should complete when source does not emit', () => {
Expand Down Expand Up @@ -353,20 +355,36 @@ describe('Observable.prototype.throttle', () => {

describe('throttle(fn, { leading: true, trailing: true })', () => {
asDiagram('throttle(fn, { leading: true, trailing: true })')('should immediately emit the first value in each time window', () => {
const e1 = hot('-a-xy-----b--x--cxxx--|');
const e1subs = '^ !';
const e2 = cold( '----| ');
const e2subs = [' ^ ! ',
' ^ ! ',
' ^ ! '];
const expected = '-a---y----b---x-c---x-|';
const e1 = hot('-a-xy-----b--x--cxxx------|');
const e1subs = '^ !';
const e2 = cold( '----| ');
const e2subs = [' ^ ! ',
' ^ ! ',
' ^ ! ',
' ^ ! ',
' ^ ! ',
' ^ !'];
const expected = '-a---y----b---x---x---x---|';

const result = e1.throttle(() => e2, { leading: true, trailing: true });

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should work for individual values', () => {
const s1 = hot('-^-x------------------|');
const s1Subs = '^ !';
const n1 = cold( '------------------------|');
const n1Subs = [' ^ !'];
const exp = '--x------------------|';

const result = s1.throttle(() => n1, { leading: true, trailing: true });
expectObservable(result).toBe(exp);
expectSubscriptions(s1.subscriptions).toBe(s1Subs);
expectSubscriptions(n1.subscriptions).toBe(n1Subs);
});
});

describe('throttle(fn, { leading: false, trailing: true })', () => {
Expand All @@ -375,15 +393,26 @@ describe('Observable.prototype.throttle', () => {
const e1subs = '^ !';
const e2 = cold( '----| ');
const e2subs = [' ^ ! ',
' ^ ! ',
' ^ ! ',
' ^ ! '];
const expected = '-----y--------x-----x-|';
' ^ ! ',
' ^ !'];
const expected = '-----y--------x---x---|';

const result = e1.throttle(() => e2, { leading: false, trailing: true });

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should work for individual values', () => {
const s1 = cold('---------x---------x--------x------|');
const n = cold('----(n|)');
const exp = '-------------x---------x--------x--|';

expectObservable(s1.throttle(() => n, { trailing: true }))
.toBe(exp);
});
});
});
75 changes: 36 additions & 39 deletions src/internal/operators/throttle.ts
Expand Up @@ -83,9 +83,9 @@ class ThrottleOperator<T> implements Operator<T, T> {
* @extends {Ignored}
*/
class ThrottleSubscriber<T, R> extends OuterSubscriber<T, R> {
private throttled: Subscription;
private _trailingValue: T;
private _hasTrailingValue = false;
private _throttled: Subscription;
private _sendValue: T;
private _hasValue = false;

constructor(protected destination: Subscriber<T>,
private durationSelector: (value: T) => SubscribableOrPromise<any>,
Expand All @@ -95,26 +95,35 @@ class ThrottleSubscriber<T, R> extends OuterSubscriber<T, R> {
}

protected _next(value: T): void {
if (this.throttled) {
if (this._trailing) {
this._hasTrailingValue = true;
this._trailingValue = value;
}
} else {
const duration = this.tryDurationSelector(value);
if (duration) {
this.add(this.throttled = subscribeToResult(this, duration));
}
this._hasValue = true;
this._sendValue = value;

if (!this._throttled) {
if (this._leading) {
this.destination.next(value);
if (this._trailing) {
this._hasTrailingValue = true;
this._trailingValue = value;
}
this.send();
} else {
this.throttle(value);
}
}
}

private send() {
const { _hasValue, _sendValue } = this;
if (_hasValue) {
this.destination.next(_sendValue);
this.throttle(_sendValue);
}
this._hasValue = false;
this._sendValue = null;
}

private throttle(value: T): void {
const duration = this.tryDurationSelector(value);
if (duration) {
this.add(this._throttled = subscribeToResult(this, duration));
}
}

private tryDurationSelector(value: T): SubscribableOrPromise<any> {
try {
return this.durationSelector(value);
Expand All @@ -124,37 +133,25 @@ class ThrottleSubscriber<T, R> extends OuterSubscriber<T, R> {
}
}

protected _unsubscribe() {
const { throttled, _trailingValue, _hasTrailingValue, _trailing } = this;

this._trailingValue = null;
this._hasTrailingValue = false;

if (throttled) {
this.remove(throttled);
this.throttled = null;
throttled.unsubscribe();
private throttlingDone() {
const { _throttled, _trailing } = this;
if (_throttled) {
_throttled.unsubscribe();
}
}
this._throttled = null;

private _sendTrailing() {
const { destination, throttled, _trailing, _trailingValue, _hasTrailingValue } = this;
if (throttled && _trailing && _hasTrailingValue) {
destination.next(_trailingValue);
this._trailingValue = null;
this._hasTrailingValue = false;
if (_trailing) {
this.send();
}
}

notifyNext(outerValue: T, innerValue: R,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, R>): void {
this._sendTrailing();
this._unsubscribe();
this.throttlingDone();
}

notifyComplete(): void {
this._sendTrailing();
this._unsubscribe();
this.throttlingDone();
}
}

0 comments on commit df689e5

Please sign in to comment.