Skip to content

Commit

Permalink
cleanup(core): nx package should not depend on rxjs
Browse files Browse the repository at this point in the history
  • Loading branch information
vsavkin committed May 19, 2022
1 parent 80b4439 commit 7617df8
Show file tree
Hide file tree
Showing 9 changed files with 362 additions and 45 deletions.
4 changes: 2 additions & 2 deletions docs/generated/devkit/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -917,7 +917,7 @@ stored in the daemon process. To reset both run: `nx reset`.

### defaultTasksRunner

`Const` **defaultTasksRunner**(`tasks`, `options`, `context?`): `Observable`<`AffectedEvent`\> \| `Promise`<`Object`\>
`Const` **defaultTasksRunner**(`tasks`, `options`, `context?`): `any`

#### Parameters

Expand All @@ -933,7 +933,7 @@ stored in the daemon process. To reset both run: `nx reset`.

#### Returns

`Observable`<`AffectedEvent`\> \| `Promise`<`Object`\>
`any`

---

Expand Down
2 changes: 1 addition & 1 deletion docs/generated/packages/devkit.json

Large diffs are not rendered by default.

2 changes: 0 additions & 2 deletions packages/nx/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
"@swc-node/register": "^1.4.2",
"fast-glob": "3.2.7",
"jsonc-parser": "3.0.0",
"rxjs-for-await": "0.0.2",
"tsconfig-paths": "^3.9.0",
"v8-compile-cache": "2.3.0",
"@parcel/watcher": "2.0.4",
Expand All @@ -49,7 +48,6 @@
"ignore": "^5.0.4",
"npm-run-path": "^4.0.1",
"open": "^8.4.0",
"rxjs": "^6.5.4",
"semver": "7.3.4",
"tar-stream": "~2.2.0",
"tmp": "~0.2.1",
Expand Down
351 changes: 351 additions & 0 deletions packages/nx/src/adapter/rxjs-for-await.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,351 @@
import { Observable } from 'rxjs';

export class Deferred<T> {
resolve: (value: T | PromiseLike<T>) => void = null!;
reject: (reason?: any) => void = null!;
promise = new Promise<T>((a, b) => {
this.resolve = a;
this.reject = b;
});
}

const RESOLVED = Promise.resolve();

/**
* Will subscribe to the `source` observable provided,
*
* Allowing a `for await..of` loop to iterate over every
* value that the source emits.
*
* **WARNING**: If the async loop is slower than the observable
* producing values, the values will build up in a buffer
* and you could experience an out of memory error.
*
* This is a lossless subscription method. No value
* will be missed or duplicated.
*
* Example usage:
*
* ```ts
* async function test() {
* const source$ = getSomeObservable();
*
* for await(const value of eachValueFrom(source$)) {
* console.log(value);
* }
* }
* ```
*
* @param source the Observable source to await values from
*/
export async function* eachValueFrom<T>(
source: Observable<T>
): AsyncIterableIterator<T> {
const deferreds: Deferred<IteratorResult<T>>[] = [];
const values: T[] = [];
let hasError = false;
let error: any = null;
let completed = false;

const subs = source.subscribe({
next: (value) => {
if (deferreds.length > 0) {
deferreds.shift()!.resolve({ value, done: false });
} else {
values.push(value);
}
},
error: (err) => {
hasError = true;
error = err;
while (deferreds.length > 0) {
deferreds.shift()!.reject(err);
}
},
complete: () => {
completed = true;
while (deferreds.length > 0) {
deferreds.shift()!.resolve({ value: undefined, done: true });
}
},
});

try {
while (true) {
if (values.length > 0) {
yield values.shift()!;
} else if (completed) {
return;
} else if (hasError) {
throw error;
} else {
const d = new Deferred<IteratorResult<T>>();
deferreds.push(d);
const result = await d.promise;
if (result.done) {
return;
} else {
yield result.value;
}
}
}
} catch (err) {
throw err;
} finally {
subs.unsubscribe();
}
}

/**
* Will subscribe to the `source` observable provided
* and build the emitted values up in a buffer. Allowing
* `for await..of` loops to iterate and get the buffer
* on each loop.
*
* This is a lossless subscription method. No value
* will be missed or duplicated.
*
* Example usage:
*
* ```ts
* async function test() {
* const source$ = getSomeObservable();
*
* for await(const buffer of bufferedValuesFrom(source$)) {
* for (const value of buffer) {
* console.log(value);
* }
* }
* }
* ```
*
* @param source the Observable source to await values from
*/
export async function* bufferedValuesFrom<T>(source: Observable<T>) {
let deferred: Deferred<IteratorResult<T[]>> | null = null;
const buffer: T[] = [];
let hasError = false;
let error: any = null;
let completed = false;

const subs = source.subscribe({
next: (value) => {
if (deferred) {
deferred.resolve(
RESOLVED.then(() => {
const bufferCopy = buffer.slice();
buffer.length = 0;
return { value: bufferCopy, done: false };
})
);
deferred = null;
}
buffer.push(value);
},
error: (err) => {
hasError = true;
error = err;
if (deferred) {
deferred.reject(err);
deferred = null;
}
},
complete: () => {
completed = true;
if (deferred) {
deferred.resolve({ value: undefined, done: true });
deferred = null;
}
},
});

try {
while (true) {
if (buffer.length > 0) {
const bufferCopy = buffer.slice();
buffer.length = 0;
yield bufferCopy;
} else if (completed) {
return;
} else if (hasError) {
throw error;
} else {
deferred = new Deferred<IteratorResult<T[]>>();
const result = await deferred.promise;
if (result.done) {
return;
} else {
yield result.value;
}
}
}
} catch (err) {
throw err;
} finally {
subs.unsubscribe();
}
}

/**
* Will subscribe to the provided `source` observable,
* allowing `for await..of` loops to iterate and get the
* most recent value that was emitted. Will not iterate out
* the same emission twice.
*
* This is a lossy subscription method. Do not use if
* every value is important.
*
* Example usage:
*
* ```ts
* async function test() {
* const source$ = getSomeObservable();
*
* for await(const value of latestValueFrom(source$)) {
* console.log(value);
* }
* }
* ```
*
* @param source the Observable source to await values from
*/
export async function* latestValueFrom<T>(source: Observable<T>) {
let deferred: Deferred<IteratorResult<T>> | undefined = undefined;
let latestValue: T;
let hasLatestValue = false;
let hasError = false;
let error: any = null;
let completed = false;

const subs = source.subscribe({
next: (value) => {
hasLatestValue = true;
latestValue = value;
if (deferred) {
deferred.resolve(
RESOLVED.then(() => {
hasLatestValue = false;
return { value: latestValue, done: false };
})
);
}
},
error: (err) => {
hasError = true;
error = err;
if (deferred) {
deferred.reject(err);
}
},
complete: () => {
completed = true;
if (deferred) {
hasLatestValue = false;
deferred.resolve({ value: undefined, done: true });
}
},
});

try {
while (true) {
if (hasLatestValue) {
await RESOLVED;
const value = latestValue!;
hasLatestValue = false;
yield value;
} else if (completed) {
return;
} else if (hasError) {
throw error;
} else {
deferred = new Deferred<IteratorResult<T>>();
const result = await deferred.promise;
if (result.done) {
return;
} else {
yield result.value;
}
}
}
} catch (err) {
throw err;
} finally {
subs.unsubscribe();
}
}

/**
* Subscribes to the provided `source` observable and allows
* `for await..of` loops to iterate over it, such that
* all values are dropped until the iteration occurs, then
* the very next value that arrives is provided to the
* `for await` loop.
*
* This is a lossy subscription method. Do not use if
* every value is important.
*
* Example usage:
*
* ```ts
* async function test() {
* const source$ = getSomeObservable();
*
* for await(const value of nextValueFrom(source$)) {
* console.log(value);
* }
* }
* ```
*
* @param source the Observable source to await values from
*/
export async function* nextValueFrom<T>(
source: Observable<T>
): AsyncGenerator<T, void, void> {
let deferred: Deferred<IteratorResult<T>> | undefined = undefined;
let hasError = false;
let error: any = null;
let completed = false;

const subs = source.subscribe({
next: (value) => {
if (deferred) {
deferred.resolve({ value, done: false });
}
},
error: (err) => {
hasError = true;
error = err;
if (deferred) {
deferred.reject(err);
}
},
complete: () => {
completed = true;
if (deferred) {
deferred.resolve({ value: undefined, done: true });
}
},
});

try {
while (true) {
if (completed) {
return;
} else if (hasError) {
throw error;
} else {
deferred = new Deferred<IteratorResult<T>>();
const result = await deferred.promise;
if (result.done) {
return;
} else {
yield result.value;
}
}
}
} catch (err) {
throw err;
} finally {
subs.unsubscribe();
}
}
1 change: 0 additions & 1 deletion packages/nx/src/command-line/report.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ export const packagesWeCareAbout = [
'@nrwl/web',
'@nrwl/workspace',
'typescript',
'rxjs',
];

export const patternsWeIgnoreInCommunityReport: Array<string | RegExp> = [
Expand Down

1 comment on commit 7617df8

@vercel
Copy link

@vercel vercel bot commented on 7617df8 May 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

nx-dev – ./

nx-dev-nrwl.vercel.app
nx-five.vercel.app
nx-dev-git-master-nrwl.vercel.app
nx.dev

Please sign in to comment.