Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: AbortSignal support first value from last value from #6675

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
845 changes: 0 additions & 845 deletions api_guard/dist/types/index.d.ts

This file was deleted.

43 changes: 40 additions & 3 deletions spec/firstValueFrom-spec.ts
@@ -1,4 +1,5 @@
import { interval, firstValueFrom, EMPTY, EmptyError, throwError, of, Observable } from 'rxjs';
/** @prettier */
import { interval, firstValueFrom, EMPTY, EmptyError, throwError, of, Observable, NEVER, AbortError } from 'rxjs';
import { expect } from 'chai';
import { finalize } from 'rxjs/operators';

Expand Down Expand Up @@ -61,7 +62,7 @@ describe('firstValueFrom', () => {

it('should stop listening to a synchronous observable when resolved', async () => {
const sideEffects: number[] = [];
const synchronousObservable = new Observable<number>(subscriber => {
const synchronousObservable = new Observable<number>((subscriber) => {
// This will check to see if the subscriber was closed on each loop
// when the unsubscribe hits (from the `take`), it should be closed
for (let i = 0; !subscriber.closed && i < 10; i++) {
Expand All @@ -70,7 +71,43 @@ describe('firstValueFrom', () => {
}
});

const result = await firstValueFrom(synchronousObservable);
await firstValueFrom(synchronousObservable);
expect(sideEffects).to.deep.equal([0]);
});

if (typeof AbortController === 'function') {
it('should support abort signal', async () => {
const source = NEVER;
const ac = new AbortController();
const signal = ac.signal;
setTimeout(() => {
ac.abort();
});
let errorThrown: any;
try {
await firstValueFrom(source, { signal });
} catch (err) {
errorThrown = err;
}
expect(errorThrown).to.be.an.instanceOf(AbortError);
});

it('should support abort signal with a default value', async () => {
const source = NEVER;
const ac = new AbortController();
const signal = ac.signal;
setTimeout(() => {
ac.abort();
});
let errorThrown: any;
let result = 'not set';
try {
result = await firstValueFrom(source, { signal, defaultValue: 'bad' });
} catch (err) {
errorThrown = err;
}
expect(errorThrown).to.be.an.instanceOf(AbortError);
expect(result).to.equal('not set');
});
}
});
39 changes: 38 additions & 1 deletion spec/lastValueFrom-spec.ts
@@ -1,4 +1,4 @@
import { interval, lastValueFrom, EMPTY, EmptyError, throwError, of } from 'rxjs';
import { interval, lastValueFrom, EMPTY, EmptyError, throwError, of, AbortError, NEVER } from 'rxjs';
import { expect } from 'chai';
import { finalize, take } from 'rxjs/operators';

Expand Down Expand Up @@ -61,4 +61,41 @@ describe('lastValueFrom', () => {
expect(result).to.equal('bananas');
expect(finalized).to.be.true;
});


if (typeof AbortController === 'function') {
it('should support abort signal', async () => {
const source = NEVER;
const ac = new AbortController();
const signal = ac.signal;
setTimeout(() => {
ac.abort();
});
let errorThrown: any;
try {
await lastValueFrom(source, { signal });
} catch (err) {
errorThrown = err;
}
expect(errorThrown).to.be.an.instanceOf(AbortError);
});

it('should support abort signal with a default value', async () => {
const source = NEVER;
const ac = new AbortController();
const signal = ac.signal;
setTimeout(() => {
ac.abort();
});
let errorThrown: any;
let result = 'not set';
try {
result = await lastValueFrom(source, { signal, defaultValue: 'bad' });
} catch (err) {
errorThrown = err;
}
expect(errorThrown).to.be.an.instanceOf(AbortError);
expect(result).to.equal('not set');
});
}
});
1 change: 1 addition & 0 deletions src/index.ts
Expand Up @@ -53,6 +53,7 @@ export { firstValueFrom } from './internal/firstValueFrom';

/* Error types */
export { ArgumentOutOfRangeError } from './internal/util/ArgumentOutOfRangeError';
export { AbortError } from './internal/util/AbortError';
export { EmptyError } from './internal/util/EmptyError';
export { NotFoundError } from './internal/util/NotFoundError';
export { ObjectUnsubscribedError } from './internal/util/ObjectUnsubscribedError';
Expand Down
34 changes: 28 additions & 6 deletions src/internal/firstValueFrom.ts
@@ -1,12 +1,16 @@
import { Observable } from './Observable';
import { EmptyError } from './util/EmptyError';
import { SafeSubscriber } from './Subscriber';
import { AbortError } from './util/AbortError';
import { linkSignalToSubscription } from './util/linkSignalToSubscription';

export interface FirstValueFromConfig<T> {
defaultValue: T;
defaultValue?: T;
signal?: AbortSignal;
}

export function firstValueFrom<T, D>(source: Observable<T>, config: FirstValueFromConfig<D>): Promise<T | D>;
export function firstValueFrom<T, D>(source: Observable<T>, config: { defaultValue: D; signal?: AbortSignal }): Promise<T | D>;
export function firstValueFrom<T, D>(source: Observable<T>, config: { signal?: AbortSignal }): Promise<T>;
export function firstValueFrom<T>(source: Observable<T>): Promise<T>;

/**
Expand Down Expand Up @@ -54,22 +58,40 @@ export function firstValueFrom<T>(source: Observable<T>): Promise<T>;
* @param config a configuration object to define the `defaultValue` to use if the source completes without emitting a value
*/
export function firstValueFrom<T, D>(source: Observable<T>, config?: FirstValueFromConfig<D>): Promise<T | D> {
const hasConfig = typeof config === 'object';
return new Promise<T | D>((resolve, reject) => {
// This is creating our subscriber, which is also our subscription.
const subscriber = new SafeSubscriber<T>({
next: (value) => {
resolve(value);
// We have a value, unsubscribe as soon as we can and then emit.
subscriber.unsubscribe();
resolve(value);
},
error: reject,
complete: () => {
if (hasConfig) {
resolve(config!.defaultValue);
// We should never hit complete if we have a value! This is because we're unsubscribing
// as soon as we get a value in `next`. Therefore any call that lands here means the
// promised value never arrived.
if (config && 'defaultValue' in config) {
// If they gave use a default value it, resolve the promise with that.
resolve(config.defaultValue!);
} else {
// Otherwise, reject with an empty error because promises *must* resolve or reject.
// If we don't reject here, it will leave our promise hanging and any other promises
// that were built off of it will never resolve or reject, either.
reject(new EmptyError());
}
},
});
const signal = config?.signal;
if (signal) {
// The user provided an abort signal, wire it up.
linkSignalToSubscription(signal, subscriber, () => {
reject(new AbortError());
});
}

// Start our subscription. Notice we are not capturing the returned subscription
// because it's technically the same instance as the `subscriber` above.
source.subscribe(subscriber);
});
}
36 changes: 30 additions & 6 deletions src/internal/lastValueFrom.ts
@@ -1,11 +1,16 @@
import { Observable } from './Observable';
import { SafeSubscriber } from './Subscriber';
import { AbortError } from './util/AbortError';
import { EmptyError } from './util/EmptyError';
import { linkSignalToSubscription } from './util/linkSignalToSubscription';

export interface LastValueFromConfig<T> {
defaultValue: T;
defaultValue?: T;
signal?: AbortSignal;
}

export function lastValueFrom<T, D>(source: Observable<T>, config: LastValueFromConfig<D>): Promise<T | D>;
export function lastValueFrom<T, D>(source: Observable<T>, config: { defaultValue: D; signal?: AbortSignal }): Promise<T | D>;
export function lastValueFrom<T>(source: Observable<T>, config: { signal?: AbortSignal }): Promise<T>;
export function lastValueFrom<T>(source: Observable<T>): Promise<T>;

/**
Expand Down Expand Up @@ -53,25 +58,44 @@ export function lastValueFrom<T>(source: Observable<T>): Promise<T>;
* @param config a configuration object to define the `defaultValue` to use if the source completes without emitting a value
*/
export function lastValueFrom<T, D>(source: Observable<T>, config?: LastValueFromConfig<D>): Promise<T | D> {
const hasConfig = typeof config === 'object';
return new Promise<T | D>((resolve, reject) => {
// We must track if we have a value or not, because if
// we don't, then the promised value never arrived.
let _hasValue = false;
let _value: T;
source.subscribe({
const subscriber = new SafeSubscriber<T>({
next: (value) => {
// We have a value! The promise can resolve later.
_value = value;
_hasValue = true;
},
error: reject,
complete: () => {
if (_hasValue) {
// Happy path.
resolve(_value);
} else if (hasConfig) {
resolve(config!.defaultValue);
} else if (config && 'defaultValue' in config) {
// The observable was empty, but we have a default value we'd like to emit.
resolve(config.defaultValue!);
} else {
// if the observable is empty, and we don't have a default value, we'll reject with an EmptyError
// because promises _must_ resolve or reject. We cannot just leave this hanging.
reject(new EmptyError());
}
},
});

const signal = config?.signal;
if (signal) {
// The user provided an abort signal. Wire it up. The
// subscriber *is* the subscription.
linkSignalToSubscription(signal, subscriber, () => {
reject(new AbortError());
});
}

// Start the subscription. We are not keeping the subscription returned
// because it's technically the same instance as the subscriber.
source.subscribe(subscriber);
});
}
28 changes: 28 additions & 0 deletions src/internal/util/AbortError.ts
@@ -0,0 +1,28 @@
import { createErrorClass } from './createErrorClass';

export interface AbortError extends Error {}

export interface AbortErrorCtor {
/**
* @deprecated Internal implementation detail. Do not construct error instances.
* Cannot be tagged as internal: https://github.com/ReactiveX/rxjs/issues/6269
*/
Copy link
Contributor

@josepot josepot Nov 26, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that it should be tagged as "internal" because IMO this interface (and the AbortError class) shouldn't be publicly exported.

IMO if a user wants to check whether a Promise rejection was caused by an Abortion, the right way to check that would be: e instanceof Error && e.name === 'AbortError'.

What would happen, otherwise, if all libraries that want to support AbortSignal export their own version of AbortError? A user could accidentally import the AbortError instance from a different library and then trying to do e instanceof AbortError would return false for a rejection caused by an AbortSignal. That's why I think that in order to avoid that kind of ambiguity it's probably better if libraries that want to support AbortSignal do not expose their AbortError classes, and treat them as an internal implementation detail.

Although, I would be curios to know what @benjamingr thoughts are in this regard, because I'm sure that he must have already put some thought on what's the best way to detect rejections caused by an AbortSignal.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@josepot I wish I had a good answer but WHATWG are pretty much in "you shouldn't do that" camp and have recently made a change that means you can controller.abort(anyError) and then you are expected to throw signal.reason.

This is quite frustrating to me (possibly because I don't understand it well) but I don't have the capacity to engage with them and resolve it quite now.

Ref: whatwg/dom#1033

If this is important to RxJS - please also say it there.

Copy link
Contributor

@josepot josepot Nov 28, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@benjamingr thanks for your response! After I read all the comments in the issue that you shared, I ended up spending a lot of time trying to understand the history behind AbortController. Let's just say that I have a lot of thoughts and opinions about it 😅. However, in order to stay constructive, I will keep those thoughts to myself and I will try to propose a solution for RxJS that I think that should work for all users, despite the current (and hopefully temporary) misalignment between the DOM spec and the NodeJS API:

  • RxJS should keep its AbortError class as an internal implementation detail, since AFAIK doing e instanceof AbortError has never been the recommended way of identifying the AbortError exception.

  • When the onAbort event happens, then RxJS should first check whether signal.hasOwnProperty('reason'), and then it should reject using that reason if it exists, otherwise it should use a new instance of its internal AbortError as a fallback.

Rejecting abortions this way would guarantee that the RxJS implementation works for both kinds of ursers: DOM and NodeJS. Given the current state of things, I think that's the most sensible approach to stay compliant with the current misalignment.

Copy link

@ronag ronag Nov 29, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For reference nodejs/node#41008

new (): AbortError;
}

/**
* An error thrown when an abort signal is received and causes a promise to reject.
*
* @see {@link firstValueFrom}
* @see {@link lastValueFrom}
*
* @class AbortError
*/
export const AbortError: AbortErrorCtor = createErrorClass(
(_super) =>
function AbortErrorImpl(this: any) {
_super(this);
this.name = 'AbortError';
this.message = 'Aborted by AbortSignal';
}
);
12 changes: 12 additions & 0 deletions src/internal/util/linkSignalToSubscription.ts
@@ -0,0 +1,12 @@
import { Subscription } from '../Subscription';

export function linkSignalToSubscription(signal: AbortSignal, subscription: Subscription, onAbort: () => void) {
const handler = () => {
subscription.unsubscribe();
onAbort();
};
signal.addEventListener('abort', handler, { once: true });
subscription.add(() => {
signal.removeEventListener('abort', handler);
});
}