Skip to content

TypeScript library that contains handy rxjs 6 operators.

License

Notifications You must be signed in to change notification settings

reactgular/observables

Repository files navigation

Build Status Coverage Status npm version

What is Observables?

Observables is a small Rxjs 6 library that contains handy operators and utilities.

Why use this rxjs library?

This library contains operators and utilities that solve some very common problems that I face with Rxjs. Here is a quick list of features that I use most often in projects.

Installation

To get started, install the package from npm.

npm install @reactgular/observables

This package requires the Rxjs 6 as a peer dependency.

npm install rxjs

Usage

Operators and utilities are imported from the package path @reactgular/observables.

For example;

import {Observable} from 'rxjs';
import {windowResize, distinctStringify} from '@reactgular/observables';

function conditionalResize(cond$: Observable<boolean>): Observable<{}> {
    return windowResize(250).pipe(
      enabledWhen(cond$)
    );
}

Operators

Here is a list of observable operators that you can use from this library.

Operators Operators Operators Operators Operators Operators
after before beforeError counter disabledWhen distinctArray
distinctDeepEqual distinctStringify enabledWhen falsy historyBuffer ifOp
loadFirst mapFirst mapLast negate pluckDistinct scanLatestFrom
truthy withMergeMap withSwitchMap

Utilities

Here is a list of utility functions that you can use from this library.

Operators Operators Operators Operators Operators Operators
combineEarliest mergeChain mergeDelayError mergeTrim roundRobin switchChain
toObservable windowResize

Operators List

after

Emits the value that came after the value that passed the provided condition.

This operator has the following limitations:

  • This operator will never emit if the observable only emits one or fewer values.
  • This operator will never emit the first value.
  • If no values pass the provided condition, then nothing is emitted.
after<T>(cond: (current: T, next: T) => boolean): MonoTypeOperatorFunction<T>

Example:

of('starting', 'started', 'error', 'restarting').pipe(
    after(v => v === 'error')
).subscribe(v => console.log(v)); // prints "restarting"

[source] [tests] [up]


before

Emits the value that came before the value that passed the provided condition.

This operator has the following limitations:

  • This operator will never emit if the observable only emits one or fewer values.
  • This operator will never emit the last value.
  • If no values pass the provided condition, then nothing is emitted.
before<T>(cond: (current: T, prev: T) => boolean): MonoTypeOperatorFunction<T>

Example:

of('starting', 'started', 'error', 'restarting').pipe(
    before(v => v === 'error')
).subscribe(v => console.log(v)); // prints "started"

[source] [tests] [up]


beforeError

Emits an array of values that came before an error and silences the error. You can specify how many values to emit upon an error (the default is 1). The emitted array contains the most recent value first followed by older values.

This is a good operator for debugging to see what values preceded an error.

Example:

of('starting','started','restarting').pipe(
    map(n => {
        if(n === 'restarting') { throw new Error() }
        return n;
    }),
    beforeError()
}).subscribe(v => console.log(v)); // prints ["started"]

[source] [tests] [up]


counter

Increments a counter for each emitted value.

counter<T>(): OperatorFunction<T, [number, T]>

Example:

of('a', 'b', 'c', 'd').pipe(
    counter()
).subscribe(v => console.log(v));
// [1, 'a']
// [2, 'b']
// [3, 'c']
// [4, 'd']    

[source] [tests] [up]


disabledWhen

The inner observable can emit a truthy value to stop the emitting of values from the outer observable, and emit a falsy to resume emitting values.

Does not emit any values until the inner observable emits a falsy value.

disabledWhen<T>(disabled$: Observable<boolean>): MonoTypeOperatorFunction<T>

[source] [tests] [up]


distinctArray

Only emits when an array contains different values than the last and ignores the order of those values. The array must contain sortable values otherwise the results are unpredictable.

This operator sorts each array value before comparison.

distinctArray<T>(): MonoTypeOperatorFunction<T[]>

Example:

of([1,2,3], [3,2,1], [1, 3, 2], [4, 5, 6], [1, 2, 3]).pipe(
    distinctArray()
).subscribe(v => console.log(v));
// prints
// [1,2,3]
// [4,5,6]
// [1,2,3]

[source] [tests] [up]


distinctDeepEqual

Only emits when the current value is deeply different than the last. Two values that have different references, but contain the same properties will be compared to be the same. This is the same for arrays, nested objects, dates and regular expressions.

distinctDeepEqual<T>(): MonoTypeOperatorFunction<T>

Example:

of([1,2],[2,1],{a:1, b:1},{b:1, a:1}).pipe(
    distinctDeepEqual()
).subscribe(v => console.log(v));
// prints
// [1,2]
// {a:1, b:1}

[source] [tests] [up]


distinctStringify

Emits all items from the source Observable that are distinct by comparison using JSON.stringify() on each value.

Arrays with same values in different orders will be seen as different, and the same for objects with properties in different order.

distinctStringify<T>(): MonoTypeOperatorFunction<T>

Example:

of([1,2,3], [1,2,3], [3,2,1], {a: 1}, {a: 1}, {a: 1, b: 1}, {b: 1, a: 1}, "one", "one", "two")
    .pipe(distinctStringify())
    .subscribe(v => console.log(v));

// [1,2,3]
// [3,2,1]
// {a: 1}
// {a: 1, b: 1}
// {b: 1, a: 1}
// "one"
// "two"

[source] [tests] [up]


enabledWhen

The inner observable can emit a falsy value to stop the emitting of values from the outer observable, and emit a truthy to resume emitting values.

Does not emit any values until the inner observable emits a truthy value.

enabledWhen<T>(enabled: Observable<boolean>): MonoTypeOperatorFunction<T>

[source] [tests] [up]


falsy

Emits only falsy values. Performs a filter(v => !v) operator internally.

falsy<T>(): MonoTypeOperatorFunction<T>

Example:

of(0, "Hello", false, [1,2], "")
    .pipe(falsy(), toArray())
    .subscribe(v => console.log(v)); // prints [0, false, ""]

[source] [tests] [up]


historyBuffer

Emits an array that starts with the current value followed by previous values. Pass a count number to limit the length of the array, otherwise the array will continue to grow in length until the observable completes.

historyBuffer<T>(count?: number): OperatorFunction<T, T[]>

Example:

of(1,2,3,4,5).pipe(
   bufferHistory(3)
).subscribe(v => console.log(v));
// [1]
// [2,1]
// [3,2,1]
// [4,3,2]
// [5,4,3]

[source] [tests] [up]


ifOp

Apply an operator based on a condition. This operator only adds another operator when the condition is true. When the condition is false the source observable is not modified.

ifOp<T, R>(cond: boolean, operator: OperatorFunction<T, R>): OperatorFunction<T, T | R>

Examples:

Creates an observable of Window resize events with optional debouncing.

windowResize(debounce?: number) {
   return fromEvent(window, 'resize').pipe(
      ifOp(Boolean(debounce), debounceTime(debounce))
   );
}

If you are looking to apply two different operators based upon a conditional if/else, then you can use a simple ?: condition in the pipe() chain.

function switchOrMerge(cond: boolean): Observable<number> {
    const projector = (value) => of(value).pipe(startWith(99));
    return of(1,2,3).pipe(
       cond ? switchMap(projector) : mergeMap(projector)
    );
}

[source] [tests] [up]


loadFirst

Emits objects that describe the loading of data from a remote resource (like making a HTTP request). The objects contain the status property which can be either "start", "value" or "error", and a value property which holds the first data emitted by the outer observable.

This operator only reads the first value from the outer observable, and then completes.

There is always a start object emitted first followed by either a value or error object. The error object can be a thrown error or the result of the outer observable completing without any results.

export interface LoadFirst<T> {
    status: string;
    value: T | undefined;
}

loadFirst<T, S, E>(start?: S, empty?: E): OperatorFunction<T, LoadFirst<T | S | E>>

Example:

of("Hello World").pipe(
    loadFirst()
).subscribe(v => console.log(v));
// prints 
// {state: "start", value: undefined}
// {state: "value", value: "Hello World"}

You can use this operator to make loading indicators for Angular components.

Example:

import {loadFirst, LoadFirst} from '@reactgular/observable/operators';

@Component({
    selector: 'example',
    template: `
        <ng-container *ngIf="load$ | async as load" [ngSwitch]="load.status">
            <div *ngSwitchCase='"start"'>
                Please wait while loading...
            </div>
            <div *ngSwitchCase='"value"'>
                {{load.value}}
            </div>
            <div *ngSwitchCase='"error"'>
                There was an error loading data...
            </div>
        </ng-container>`
})
export class ExampleComponent implements OnInit {
    public load$: Observable<LoadFirst<any>>;
   
    public constructor(private http: HttpClient) { }
   
    public ngOnInit() {
        this.data$ = this.http
            .get('https://example.com/api')
            .pipe(loadFirst());    
    }
}

[source] [tests] [up]


mapFirst

Applies a given project function to the first value emitted by the source Observables, and emits the resulting value. Only the first value is projected and subsequent values are emitted without projection.

This operator is an alias for doing map((value, indx) => indx === 0 ? project(value) : value)

Example:

of(1,2,3,4).pipe(
    mapFirst(v => v * 100)
).subscribe(v => console.log(v)); 
// 100
// 2
// 3
// 4

[source] [tests] [up]


mapLast

Applies a given project function to the last value emitted by the source Observables, and emits the resulting value. Only the last value is projected and previous values are emitted without projection. This operator uses pairwise() internally and emits each value only when a next value is emitted or the source observable completes.

If you use both mapFirst() and mapLast() on an observable that emits only a single value and completes, then both operators will project on the same value.

This operator has the following limitations:

  • Each emitted value is the previous value from the source observable, and the last value is flushed out when the source completes.
  • Projects the last value even if the observable emits only a single value and completes.
mapLast<T, R>(project: (value: T) => R): OperatorFunction<T, T | R>

Example:

of(1,2,3).pipe(
    mapLast(v => v + 1000)
).subscribe(v => console.log(v));
// 1
// 2
// 1003

[source] [tests] [up]


negate

Maps truthy values to false, and falsy values to true. Performs a map(v => !v) internally.

negate<T>(): OperatorFunction<T, boolean>

Example:

of(0, "Hello", false, [1,2,3], "").pipe(
    negate(),
    toArray()
).subscribe(v => console.log(v));
// prints [true, false, true, false, true]

[source] [tests] [up]


pluckDistinct

Maps each source value (an object) to its specified nested property, and only emits distinct changes. It is the same as applying a pluck() followed by a distinctUntilChanged().

pluckDistinct<T, R>(...properties: string[]): OperatorFunction<T, R>

Example:

from([
    {name: 'John Smith'},
    {name: 'John Smith'},
    {name: 'Jane Doe'},
    {name: 'Jane Doe'}
]).pipe(
    pluckDistinct('name'),
    toArray()
).subscribe(v => console.log(v)); // prints ['John Smith', 'Jane Doe']

[source] [tests] [up]


scanLatestFrom

Applies an accumulator function over the source Observable, and returns each intermediate result. The seed value is the latest value from the second observable. If the source observable emits multiple values before the second observable emits a value, then the latest from both observables will be used instead. Accumulated values are discarded when the second observable emits a seed value.

Accumulated values are discarded when the second observable emits a seed value, and a new value is calculate using the accumulator function.

Accumulator function parameters:

  • acc is the accumulated value and is either the latest value from the second observable or the previous value from the accumulator.
  • value is the value from the source observable.
  • index is the offset number from the source observable.
  • reset is true when the acc parameter has been reset by the second observable emitting a value.
scanLatestFrom<T, A, R>(accumulator: (acc: A | R, value: T, index: number, reset: boolean) => R, latest: Observable<A>): OperatorFunction<T, R>

[source] [tests] [up]


truthy

Emits only truthy values. This operator is an alias for filter(v => Boolean(v)), but most people write filter(Boolean) because it's shorter. The problem with using filter(Boolean) is that the observable type can change to Boolean by TypeScript. So using truthy() is a shorter alias for the longer form that persists the generic type.

truthy<T>(): MonoTypeOperatorFunction<T>

Example:

of(0, false, [1,2,3], "Hello", "", {}).pipe(
    truthy(),
    toArray()
).subscribe(v => console.log(v));
// prints [[1,2,3], "Hello", {}]

[source] [tests] [up]


withMergeMap

Applies a mergeMap to the outer observable, and maps the inner observable to an array that contains the value of both the outer and inner observables as Observable<[outer, inner]>.

withMergeMap<T, R>(inner: (x: T) => Observable<R>): OperatorFunction<T, [T, R]>

Example:

of('A', 'B', 'C').pipe(
    withMergeMap(() => of('1'))
).subscribe(v => console.log(v));
// ['A', '1']
// ['B', '1']
// ['C', '1']

[source] [tests] [up]


withSwitchMap

Applies a switchMap to the outer observable, and maps the inner observable to an array that contains the value of both the outer and inner observables as Observable<[outer, inner]>.

withSwitchMap<T, R>(inner: (x: T) => Observable<R>): OperatorFunction<T, [T, R]>

Example:

of('A', 'B', 'C').pipe(
    withSwitchMap(() => of('1'))
).subscribe(v => console.log(v));
// ['A', '1']
// ['B', '1']
// ['C', '1']

[source] [tests] [up]


Utilities List

combineEarliest

Unlike combineLatest() which does not emit a value until all observables emits at least one value. The combineEarliest() emits immediately upon the first observable that emits a value substituting a value (defaults to undefined) for any awaiting values from the other observables.

combineEarliest<O extends Observable<any>, S, R>(observables: O[], substitute?: S): Observable<R>

Example:

combineEarliest([
    interval(1000),
    of('A').pipe(delay(1000)),
    of('B').pipe(delay(2000))
]).pipe(take(3)).subscribe(v => console.log(v));

// [0, undefined, undefined]
// [1, 'A', undefined]
// [2, 'A', 'B']

[source] [tests] [up]


mergeChain

When the source observable emits a value it is passed to the next switchTo function which returns another observable, and the value from that observable is passed onto the next switchTo function. It creates a new observable that emits an array of all values emitted from chained observables.

Uses mergeMap() internally to chain the functions together.

mergeChain<T, R>(source: Observable<T>, ...mergeTo: Array<(...values: any[]) => Observable<any>>): Observable<R>

Example:

mergeChain(
    store.select('company'),
    (company) => store.selectPriceChanges(company.id),
    (price, company) => store.selectPriceUpdates(price.id)
).subscribe(([changes, price, company]) => console.log(changes, price, company));

[source] [tests] [up]


mergeDelayError

Creates an output observable which concurrently emits all values from every given input observable, but delays any thrown errors until all observables have completed, and throws the first error.

All observables must complete before any awaiting error are thrown.

mergeDelayError<T>(...observables: Observable<T>[]): Observable<T>

Example:

mergeDelayError(
    of(1,2,3),
    throwError('ERROR')
).subscribe(
    v => console.log(v),
    err => console.error(err)
);
// prints
// 1
// 2
// 3
// ERROR

[source] [tests] [up]


mergeTrim

Creates an output observable which concurrently emits all values from every given input observable until any observable completes.

mergeTrim<T>(...observables: Observable<T>[]): Observable<T>

[source] [tests] [up]


roundRobin

Creates an output observable which emits values from each observable in a round robin sequence. Where the first observable must emit a value, before the next observable emits a value and starts over after all observables have emitted a value.

function roundRobin<T>(...observables: Observable<T>[]): Observable<T>

[source] [tests] [up]


switchChain

When the source observable emits a value it is passed to the next switchTo function which returns another observable, and the value from that observable is passed onto the next switchTo function. It creates a new observable that emits an array of all values emitted from chained observables.

Uses switchMap() internally to chain the functions together.

switchChain<T, R>(source: Observable<T>, ...switchTo: Array<(...values: any[]) => Observable<any>>): Observable<R>

Example:

switchChain(
    http.get('/user'),
    (user) => http.get(`/projects/${user.projectId}`),
    (project, user) => http.get(`/company/${project.companyId}`),
    (company, project, user) => http.get(`/brand/${company.brandId}`)
).subscribe(([brand, company, project, user]) => console.log(brand, company, project, user));

[source] [tests] [up]


toObservable

Converts the parameter to an observable, or returns the value if already an observable.

toObservable<T>(value: T | Observable<T>): Observable<T>

Example:

An example where an array of values is converted into an array of observables.

const values = [100, of(200), 300];
forkJoin(values.map(toObservable))
    .subscribe(v => console.log(v));
// prints [100, 200, 300]

[source] [tests] [up]


windowResize

Emits changes in the window size with optional debounce time.

windowResize(debounce?: number, wnd?: Window): Observable<{ innerWidth: number, innerHeight: number }>

Example:

Creates an observable of the window aspect ratio.

const aspect$ = windowResize(250).pipe(
   map(({innerWidth, innerHeight}) => innerWidth / innerHeight)
);

[source] [tests] [up]