Skip to content

Commit

Permalink
feat(sequenceEqual): compareTo should support ObservableInput (#7102)
Browse files Browse the repository at this point in the history
  • Loading branch information
jakovljevic-mladen committed Dec 15, 2022
1 parent 60d6c40 commit d501961
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 10 deletions.
6 changes: 6 additions & 0 deletions spec-dtslint/operators/sequenceEqual-spec.ts
Expand Up @@ -16,3 +16,9 @@ it('should enforce compareTo to be the same type of Observable', () => {
it('should infer correctly given comparator parameter', () => {
const a = of(1, 2, 3).pipe(sequenceEqual(of(1), (val1, val2) => val1 === val2)); // $ExpectType Observable<boolean>
});

it('should support Promises', () => {
of(1, 2, 3).pipe(sequenceEqual(Promise.resolve(1))); // $ExpectType Observable<boolean>
// Enforce the same types produced by Promise and source observable
of(1, 2, 3).pipe(sequenceEqual(Promise.resolve('foo'))); // $ExpectError
});
21 changes: 11 additions & 10 deletions src/internal/operators/sequenceEqual.ts
@@ -1,8 +1,7 @@
import { Observable } from '../Observable';

import { OperatorFunction } from '../types';
import { OperatorFunction, ObservableInput } from '../types';
import { operate } from '../util/lift';
import { createOperatorSubscriber } from './OperatorSubscriber';
import { innerFrom } from '../observable/innerFrom';

/**
* Compares all values of two observables in sequence using an optional comparator function
Expand All @@ -13,7 +12,8 @@ import { createOperatorSubscriber } from './OperatorSubscriber';
*
* ![](sequenceEqual.png)
*
* `sequenceEqual` subscribes to two observables and buffers incoming values from each observable. Whenever either
* `sequenceEqual` subscribes to source observable and `compareTo` `ObservableInput` (that internally
* gets converted to an observable) and buffers incoming values from each observable. Whenever either
* observable emits a value, the value is buffered and the buffers are shifted and compared from the bottom
* up; If any value pair doesn't match, the returned observable will emit `false` and complete. If one of the
* observables completes, the operator will wait for the other observable to complete; If the other
Expand Down Expand Up @@ -53,14 +53,15 @@ import { createOperatorSubscriber } from './OperatorSubscriber';
* @see {@link zip}
* @see {@link withLatestFrom}
*
* @param {Observable} compareTo The observable sequence to compare the source sequence to.
* @param {function} [comparator] An optional function to compare each value pair
* @param compareTo The `ObservableInput` sequence to compare the source sequence to.
* @param comparator An optional function to compare each value pair.
*
* @return A function that returns an Observable that emits a single boolean
* value representing whether or not the values emitted by the source
* Observable and provided Observable were equal in sequence.
* Observable and provided `ObservableInput` were equal in sequence.
*/
export function sequenceEqual<T>(
compareTo: Observable<T>,
compareTo: ObservableInput<T>,
comparator: (a: T, b: T) => boolean = (a, b) => a === b
): OperatorFunction<T, boolean> {
return operate((source, subscriber) => {
Expand Down Expand Up @@ -94,7 +95,7 @@ export function sequenceEqual<T>(
// at the appropriate time.
complete ? emit(false) : selfState.buffer.push(a);
} else {
// If the other stream *does* have values in it's buffer,
// If the other stream *does* have values in its buffer,
// pull the oldest one off so we can compare it to what we
// just got. If it wasn't a match, emit `false` and complete.
!comparator(a, buffer.shift()!) && emit(false);
Expand All @@ -119,7 +120,7 @@ export function sequenceEqual<T>(

// Subscribe to each source.
source.subscribe(createSubscriber(aState, bState));
compareTo.subscribe(createSubscriber(bState, aState));
innerFrom(compareTo).subscribe(createSubscriber(bState, aState));
});
}

Expand Down

0 comments on commit d501961

Please sign in to comment.