Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✅(worker) Run tests against workers in ESM mode #4648

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
45 changes: 44 additions & 1 deletion packages/worker/src/internals/worker-pool/BasicPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import type {
WorkerToPoolMessage,
PoolToWorkerMessage,
} from './IWorkerPool.js';
import { writeFileSync } from 'fs';
import * as process from 'process';

/**
* Worker internal API
Expand Down Expand Up @@ -37,6 +39,7 @@ export class BasicPool<TSuccess, TPayload> implements IWorkerPool<TSuccess, TPay
onSuccess: OnSuccessCallback<TSuccess>;
onFailure: OnErrorCallback;
} | null = null;
writeFileSync('/workspaces/fast-check/debug.log', `[${process.pid}] BasicPool -> NEW WORKER\n`, { flag: 'a' });
const worker = new Worker(this.workerFileUrl, { workerData: { fastcheckWorker: true } });

let resolveOnline: () => void = () => undefined;
Expand All @@ -47,13 +50,19 @@ export class BasicPool<TSuccess, TPayload> implements IWorkerPool<TSuccess, TPay
});

worker.on('online', () => {
writeFileSync('/workspaces/fast-check/debug.log', `[${process.pid}] BasicPool -> WORKER is ONLINE\n`, {
flag: 'a',
});
// Emitted when the worker thread has started executing JavaScript code.
// More details at https://nodejs.org/api/worker_threads.html#event-online
ready = true;
resolveOnline();
});

worker.on('message', (data: WorkerToPoolMessage<TSuccess>): void => {
writeFileSync('/workspaces/fast-check/debug.log', `[${process.pid}] BasicPool -> WORKER sent MESSAGE\n`, {
flag: 'a',
});
// Emitted for any incoming message, containing the cloned input of port.postMessage().
// More details at https://nodejs.org/api/worker_threads.html#event-message
if (registration === null || data.runId !== registration.currentRunId) {
Expand All @@ -68,6 +77,9 @@ export class BasicPool<TSuccess, TPayload> implements IWorkerPool<TSuccess, TPay
});

worker.on('messageerror', (err: Error): void => {
writeFileSync('/workspaces/fast-check/debug.log', `[${process.pid}] BasicPool -> WORKER sent MESSAGE ERROR\n`, {
flag: 'a',
});
// Emitted when deserializing a message failed.
// More details at https://nodejs.org/api/worker_threads.html#event-messageerror
if (!ready) {
Expand All @@ -81,6 +93,9 @@ export class BasicPool<TSuccess, TPayload> implements IWorkerPool<TSuccess, TPay
});

worker.on('error', (err: Error): void => {
writeFileSync('/workspaces/fast-check/debug.log', `[${process.pid}] BasicPool -> WORKER error -> ${err}\n`, {
flag: 'a',
});
// Emitted if the worker thread throws an uncaught exception. In that case, the worker is terminated.
// More details at https://nodejs.org/api/worker_threads.html#event-error
faulty = true;
Expand All @@ -93,7 +108,24 @@ export class BasicPool<TSuccess, TPayload> implements IWorkerPool<TSuccess, TPay
registration = null;
});

worker.on('uncaughtException', (err): void => {
writeFileSync('/workspaces/fast-check/debug.log', `[${process.pid}] uncaughtException -> ${err}\n`, {
flag: 'a',
});
});

worker.on('unhandledRejection', (reason): void => {
writeFileSync('/workspaces/fast-check/debug.log', `[${process.pid}] unhandledRejection -> ${reason}\n`, {
flag: 'a',
});
});

worker.on('exit', (code: number): void => {
writeFileSync(
'/workspaces/fast-check/debug.log',
`[${process.pid}] BasicPool -> WORKER exited (code=${code})\n`,
{ flag: 'a' },
);
// Emitted once the worker has stopped. If the worker exited by calling process.exit(), the exitCode parameter is the passed exit code. If the worker was terminated, the exitCode parameter is 1.
// More details at https://nodejs.org/api/worker_threads.html#event-exit
faulty = true;
Expand Down Expand Up @@ -149,6 +181,17 @@ export class BasicPool<TSuccess, TPayload> implements IWorkerPool<TSuccess, TPay

public terminateAllWorkers(): Promise<void> {
const dropped = this.workers.splice(0, this.workers.length); // clear all workers
return Promise.all(dropped.map((w) => w.worker.terminate())).then(() => undefined);
writeFileSync(
'/workspaces/fast-check/debug.log',
`[${process.pid}] BasicPool -> TERMINATING ${dropped.length} workers\n`,
{
flag: 'a',
},
);
const p = Promise.all(dropped.map((w) => w.worker.terminate())).then(() => undefined);
writeFileSync('/workspaces/fast-check/debug.log', `[${process.pid}] BasicPool -> TERMINATION completed\n`, {
flag: 'a',
});
return p;
}
}
13 changes: 13 additions & 0 deletions packages/worker/src/internals/worker-pool/GlobalPool.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { writeFileSync } from 'fs';
import * as process from 'process';
import { BasicPool } from './BasicPool.js';
import type { IWorkerPool, PooledWorker } from './IWorkerPool.js';

Expand Down Expand Up @@ -37,6 +39,7 @@ export class GlobalPool<TSuccess, TPayload> implements IWorkerPool<TSuccess, TPa
}

spawnNewWorker(): Promise<PooledWorker<TSuccess, TPayload>> {
writeFileSync('/workspaces/fast-check/debug.log', `[${process.pid}] GlobalPool::spawnNewWorker\n`, { flag: 'a' });
cancelPendingTerminationIfAny(this.workerFileUrl);
return this.internalPool.spawnNewWorker();
}
Expand All @@ -47,10 +50,20 @@ export class GlobalPool<TSuccess, TPayload> implements IWorkerPool<TSuccess, TPa
}

terminateAllWorkers(): Promise<void> {
writeFileSync('/workspaces/fast-check/debug.log', `[${process.pid}] GlobalPool::terminateAllWorkers -> START\n`, {
flag: 'a',
});
cancelPendingTerminationIfAny(this.workerFileUrl);
pendingTerminationPerFile.set(
this.workerFileUrl.toString(),
setTimeout(() => {
writeFileSync(
'/workspaces/fast-check/debug.log',
`[${process.pid}] GlobalPool::terminateAllWorkers -> In timer\n`,
{
flag: 'a',
},
);
this.internalPool.terminateAllWorkers();
}, 0),
);
Expand Down
12 changes: 11 additions & 1 deletion packages/worker/src/internals/worker-pool/OneTimePool.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { writeFileSync } from 'fs';
import * as process from 'process';
import { BasicPool } from './BasicPool.js';
import type { IWorkerPool, PooledWorker } from './IWorkerPool.js';

Expand All @@ -17,6 +19,7 @@ export class OneTimePool<TSuccess, TPayload> implements IWorkerPool<TSuccess, TP
}

spawnNewWorker(): Promise<PooledWorker<TSuccess, TPayload>> {
writeFileSync('/workspaces/fast-check/debug.log', `[${process.pid}] OneTimePool::spawnNewWorker\n`, { flag: 'a' });
return this.internalPool.spawnNewWorker();
}

Expand All @@ -25,6 +28,13 @@ export class OneTimePool<TSuccess, TPayload> implements IWorkerPool<TSuccess, TP
}

terminateAllWorkers(): Promise<void> {
return this.internalPool.terminateAllWorkers();
writeFileSync('/workspaces/fast-check/debug.log', `[${process.pid}] OneTimePool::terminateAllWorkers -> START\n`, {
flag: 'a',
});
const p = this.internalPool.terminateAllWorkers();
writeFileSync('/workspaces/fast-check/debug.log', `[${process.pid}] OneTimePool::terminateAllWorkers -> END\n`, {
flag: 'a',
});
return p;
}
}
24 changes: 24 additions & 0 deletions packages/worker/src/internals/worker-runner/NoWorkerRunner.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,41 @@
import type { MessagePort } from 'node:worker_threads';
import type { MainThreadToWorkerMessage, WorkerToMainThreadMessage } from '../SharedTypes.js';
import { writeFileSync } from 'fs';
import * as process from 'process';

/**
* Setup the fallback worker listening to all predicates and rejecting any that has never been registered
* @param parentPort - the parent to listen to and sending us queries to execute
* @param registeredPredicates - list of all the predicates currently registered, can be updated after the call to runNoWorker
*/
export function runNoWorker(parentPort: MessagePort, registeredPredicates: Set<number>): void {
writeFileSync('/workspaces/fast-check/debug.log', `[${process.pid}] runNoWorker -> SETUP\n`, { flag: 'a' });
parentPort.on('message', (message: MainThreadToWorkerMessage<unknown>) => {
try {
writeFileSync(
'/workspaces/fast-check/debug.log',
`[${process.pid}] runNoWorker -> MESSAGE RECEIVED ${JSON.stringify(message)}\n`,
{
flag: 'a',
},
);
} catch (err) {
writeFileSync(
'/workspaces/fast-check/debug.log',
`[${process.pid}] runNoWorker -> MESSAGE RECEIVED {{{!!!}}}\n`,
{ flag: 'a' },
);
}
const { targetPredicateId, runId } = message;
if (registeredPredicates.has(targetPredicateId)) {
writeFileSync('/workspaces/fast-check/debug.log', `[${process.pid}] runNoWorker -> REGISTRATION CONFIRMED\n`, {
flag: 'a',
});
return;
}
writeFileSync('/workspaces/fast-check/debug.log', `[${process.pid}] runNoWorker -> REGISTRATION NOT-FOUND\n`, {
flag: 'a',
});
const errorMessage = `Unregistered predicate, got: ${targetPredicateId}, for registered: ${[
...registeredPredicates,
].join(', ')}`;
Expand Down
22 changes: 22 additions & 0 deletions packages/worker/src/internals/worker-runner/WorkerRunner.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import type { MessagePort } from 'node:worker_threads';
import type { MainThreadToWorkerMessage, PropertyPredicate, WorkerToMainThreadMessage } from '../SharedTypes.js';
import { writeFileSync } from 'fs';
import * as process from 'process';

/**
* Setup a worker listening to parentPort and able to run a single time for a given predicate
Expand All @@ -12,7 +14,21 @@ export function runWorker<Ts extends unknown[]>(
predicateId: number,
predicate: PropertyPredicate<Ts>,
): void {
writeFileSync('/workspaces/fast-check/debug.log', `[${process.pid}] runWorker -> SETUP\n`, { flag: 'a' });
parentPort.on('message', (message: MainThreadToWorkerMessage<Ts>) => {
try {
writeFileSync(
'/workspaces/fast-check/debug.log',
`[${process.pid}] runWorker -> MESSAGE RECEIVED ${JSON.stringify(message)}\n`,
{
flag: 'a',
},
);
} catch (err) {
writeFileSync('/workspaces/fast-check/debug.log', `[${process.pid}] runWorker -> MESSAGE RECEIVED {{{!!!}}}\n`, {
flag: 'a',
});
}
const { payload, targetPredicateId, runId } = message;
if (targetPredicateId !== predicateId) {
// The current predicate is not the one targeted by the received message
Expand All @@ -21,10 +37,16 @@ export function runWorker<Ts extends unknown[]>(
Promise.resolve(predicate(...payload)).then(
(output) => {
const message: WorkerToMainThreadMessage = { success: true, output, runId };
writeFileSync('/workspaces/fast-check/debug.log', `[${process.pid}] runWorker -> PREDICATE OUTPUT\n`, {
flag: 'a',
});
parentPort.postMessage(message);
},
(error) => {
const message: WorkerToMainThreadMessage = { success: false, error, runId };
writeFileSync('/workspaces/fast-check/debug.log', `[${process.pid}] runWorker -> PREDICATE ERROR\n`, {
flag: 'a',
});
parentPort.postMessage(message);
},
);
Expand Down
49 changes: 48 additions & 1 deletion packages/worker/src/main.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import { isMainThread, parentPort, workerData } from 'node:worker_threads';
import { isMainThread, parentPort, workerData, threadId } from 'node:worker_threads';

import { assert as fcAssert, type IAsyncProperty, type IProperty, type Parameters } from 'fast-check';
import { runWorker } from './internals/worker-runner/WorkerRunner.js';
import { runMainThread } from './internals/MainThreadRunner.js';
import { NoopWorkerProperty } from './internals/NoopWorkerProperty.js';
import type { PropertyArbitraries, PropertyPredicate, WorkerProperty } from './internals/SharedTypes.js';
import { runNoWorker } from './internals/worker-runner/NoWorkerRunner.js';
import { writeFileSync } from 'fs';
import * as process from 'process';

let lastPredicateId = 0;
const allKnownTerminateAllWorkersPerProperty = new Map<
Expand All @@ -15,8 +17,18 @@ const allKnownTerminateAllWorkersPerProperty = new Map<
async function clearAllWorkersFor(property: IAsyncProperty<unknown> | IProperty<unknown>): Promise<void> {
const terminateAllWorkers = allKnownTerminateAllWorkersPerProperty.get(property);
if (terminateAllWorkers === undefined) {
writeFileSync(
'/workspaces/fast-check/debug.log',
`[${process.pid}][${threadId}] No cleaning found for property, found ${allKnownTerminateAllWorkersPerProperty.size} other cleanings\n`,
{ flag: 'a' },
);
return;
}
writeFileSync(
'/workspaces/fast-check/debug.log',
`[${process.pid}][${threadId}] Executing cleaning for property, found ${allKnownTerminateAllWorkersPerProperty.size - 1} other cleanings\n`,
{ flag: 'a' },
);
await terminateAllWorkers();
}

Expand All @@ -38,9 +50,37 @@ export async function assert<Ts>(property: IAsyncProperty<Ts> | IProperty<Ts>, p
if (isMainThread) {
// Main thread code
try {
writeFileSync(
'/workspaces/fast-check/debug.log',
`[${process.pid}][${threadId}] worker::assert -> BEFORE fc.assert\n`,
{
flag: 'a',
},
);
await fcAssert(property, params);
writeFileSync(
'/workspaces/fast-check/debug.log',
`[${process.pid}][${threadId}] worker::assert -> AFTER fc.assert\n`,
{
flag: 'a',
},
);
} finally {
writeFileSync(
'/workspaces/fast-check/debug.log',
`[${process.pid}][${threadId}] worker::assert -> BEFORE cleaning for workers\n`,
{
flag: 'a',
},
);
await clearAllWorkersFor(property);
writeFileSync(
'/workspaces/fast-check/debug.log',
`[${process.pid}][${threadId}] worker::assert -> AFTER cleaning for workers\n`,
{
flag: 'a',
},
);
}
} else {
// Worker code
Expand Down Expand Up @@ -82,6 +122,13 @@ function workerProperty<Ts extends [unknown, ...unknown[]]>(
const isolationLevel = options.isolationLevel || 'file';
const arbitraries = args.slice(0, -1) as PropertyArbitraries<Ts>;
const { property, terminateAllWorkers } = runMainThread<Ts>(url, currentPredicateId, isolationLevel, arbitraries);
writeFileSync(
'/workspaces/fast-check/debug.log',
`[${process.pid}][${threadId}] Registering cleaning for property\n`,
{
flag: 'a',
},
);
allKnownTerminateAllWorkersPerProperty.set(property, terminateAllWorkers);
return property;
} else if (parentPort !== null && workerData.fastcheckWorker === true) {
Expand Down
18 changes: 0 additions & 18 deletions packages/worker/test/e2e/__properties__/blockEventLoop.cjs

This file was deleted.

15 changes: 15 additions & 0 deletions packages/worker/test/e2e/__properties__/blockEventLoop.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// @ts-check
import fc from 'fast-check';
import { propertyFor } from '@fast-check/worker';

const property = propertyFor(new URL(import.meta.url));

export const blockEventLoopProperty = property(
fc.integer({ min: -1000, max: 1000 }),
fc.integer({ min: -1000, max: 1000 }),
(from, to) => {
for (let i = from; i !== to; ++i) {
// Loop from "from" to "to" possibly NEVER ending
}
},
);
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
// @ts-check
/* eslint-disable @typescript-eslint/no-var-requires */
/* global __filename, exports, require, __dirname */
const { pathToFileURL } = require('node:url');
const fc = require('fast-check');
const { propertyFor } = require('@fast-check/worker');
const { writeFileSync, existsSync, rmSync } = require('fs');
const path = require('path');
import fc from 'fast-check';
import { propertyFor } from '@fast-check/worker';
import { writeFileSync, existsSync, rmSync } from 'node:fs';
import path from 'node:path';
import { fileURLToPath } from 'node:url';

const property = propertyFor(pathToFileURL(__filename));
const __dirname = fileURLToPath(new URL('.', import.meta.url));

const property = propertyFor(new URL(import.meta.url));

let index = 0;
function nextFilenameQuestion() {
Expand Down