Skip to content

Commit

Permalink
feat(worker): support custom messages (#10293)
Browse files Browse the repository at this point in the history
  • Loading branch information
kunal-kushwaha committed Jul 21, 2020
1 parent 63ebaa6 commit 3150a81
Show file tree
Hide file tree
Showing 13 changed files with 276 additions and 34 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -2,6 +2,7 @@

### Features

- `[jest-worker]` Added support for workers to send custom messages to parent in jest-worker ([#10293](https://github.com/facebook/jest/pull/10293))
- `[pretty-format]` Added support for serializing custom elements (web components) ([#10217](https://github.com/facebook/jest/pull/10237))

### Fixes
Expand Down
85 changes: 58 additions & 27 deletions packages/jest-worker/src/Farm.ts
Expand Up @@ -9,8 +9,10 @@ import {
CHILD_MESSAGE_CALL,
ChildMessage,
FarmOptions,
OnCustomMessage,
OnEnd,
OnStart,
PromiseWithCustomMessage,
QueueChildMessage,
QueueItem,
WorkerInterface,
Expand Down Expand Up @@ -44,41 +46,64 @@ export default class Farm {
}
}

doWork(method: string, ...args: Array<any>): Promise<unknown> {
return new Promise((resolve, reject) => {
const computeWorkerKey = this._computeWorkerKey;
const request: ChildMessage = [CHILD_MESSAGE_CALL, false, method, args];
doWork(
method: string,
...args: Array<any>
): PromiseWithCustomMessage<unknown> {
const customMessageListeners = new Set<OnCustomMessage>();

let worker: WorkerInterface | null = null;
let hash: string | null = null;
const addCustomMessageListener = (listener: OnCustomMessage) => {
customMessageListeners.add(listener);
return () => {
customMessageListeners.delete(listener);
};
};

const onCustomMessage: OnCustomMessage = message => {
customMessageListeners.forEach(listener => listener(message));
};

if (computeWorkerKey) {
hash = computeWorkerKey.call(this, method, ...args);
worker = hash == null ? null : this._cacheKeys[hash];
}
const promise: PromiseWithCustomMessage<unknown> = new Promise(
(resolve, reject) => {
const computeWorkerKey = this._computeWorkerKey;
const request: ChildMessage = [CHILD_MESSAGE_CALL, false, method, args];

const onStart: OnStart = (worker: WorkerInterface) => {
if (hash != null) {
this._cacheKeys[hash] = worker;
let worker: WorkerInterface | null = null;
let hash: string | null = null;

if (computeWorkerKey) {
hash = computeWorkerKey.call(this, method, ...args);
worker = hash == null ? null : this._cacheKeys[hash];
}
};

const onEnd: OnEnd = (error: Error | null, result: unknown) => {
if (error) {
reject(error);
const onStart: OnStart = (worker: WorkerInterface) => {
if (hash != null) {
this._cacheKeys[hash] = worker;
}
};

const onEnd: OnEnd = (error: Error | null, result: unknown) => {
customMessageListeners.clear();
if (error) {
reject(error);
} else {
resolve(result);
}
};

const task = {onCustomMessage, onEnd, onStart, request};

if (worker) {
this._enqueue(task, worker.getWorkerId());
} else {
resolve(result);
this._push(task);
}
};
},
);

const task = {onEnd, onStart, request};
promise.UNSTABLE_onCustomMessage = addCustomMessageListener;

if (worker) {
this._enqueue(task, worker.getWorkerId());
} else {
this._push(task);
}
});
return promise;
}

private _getNextTask(workerId: number): QueueChildMessage | null {
Expand Down Expand Up @@ -114,7 +139,13 @@ export default class Farm {
task.request[1] = true;

this._lock(workerId);
this._callback(workerId, task.request, task.onStart, onEnd);
this._callback(
workerId,
task.request,
task.onStart,
onEnd,
task.onCustomMessage,
);

return this;
}
Expand Down
4 changes: 3 additions & 1 deletion packages/jest-worker/src/WorkerPool.ts
Expand Up @@ -9,6 +9,7 @@ import BaseWorkerPool from './base/BaseWorkerPool';

import type {
ChildMessage,
OnCustomMessage,
OnEnd,
OnStart,
WorkerInterface,
Expand All @@ -31,8 +32,9 @@ class WorkerPool extends BaseWorkerPool implements WorkerPoolInterface {
request: ChildMessage,
onStart: OnStart,
onEnd: OnEnd,
onCustomMessage: OnCustomMessage,
): void {
this.getWorkerById(workerId).send(request, onStart, onEnd);
this.getWorkerById(workerId).send(request, onStart, onEnd, onCustomMessage);
}

createWorker(workerOptions: WorkerOptions): WorkerInterface {
Expand Down
50 changes: 50 additions & 0 deletions packages/jest-worker/src/__tests__/Farm.test.js
Expand Up @@ -25,11 +25,16 @@ function workerReply(i, error, result) {
workerReplyEnd(i, error, result);
}

function workerReplyCustomMessage(i, message) {
mockWorkerCalls[i].onCustomMessage(message);
}

describe('Farm', () => {
beforeEach(() => {
mockWorkerCalls = [];
callback = jest.fn((...args) => {
mockWorkerCalls.push({
onCustomMessage: args[4],
onEnd: args[3],
onStart: args[2],
passed: args[1],
Expand All @@ -49,6 +54,7 @@ describe('Farm', () => {
[1, true, 'foo', [42]],
expect.any(Function),
expect.any(Function),
expect.any(Function),
);
});

Expand All @@ -67,27 +73,32 @@ describe('Farm', () => {
[1, true, 'foo', [42]],
expect.any(Function),
expect.any(Function),
expect.any(Function),
);
expect(callback).toHaveBeenNthCalledWith(
2,
1, // second worker
[1, true, 'foo1', [43]],
expect.any(Function),
expect.any(Function),

expect.any(Function),
);
expect(callback).toHaveBeenNthCalledWith(
3,
2, // third worker
[1, true, 'foo2', [44]],
expect.any(Function),
expect.any(Function),
expect.any(Function),
);
expect(callback).toHaveBeenNthCalledWith(
4,
3, // fourth worker
[1, true, 'foo3', [45]],
expect.any(Function),
expect.any(Function),
expect.any(Function),
);
});

Expand All @@ -110,6 +121,7 @@ describe('Farm', () => {
[1, true, 'foo', [42]],
expect.any(Function),
expect.any(Function),
expect.any(Function),
);
});

Expand Down Expand Up @@ -146,20 +158,23 @@ describe('Farm', () => {
[1, true, 'foo', [42]],
expect.any(Function),
expect.any(Function),
expect.any(Function),
);
expect(callback).toHaveBeenNthCalledWith(
2,
1, // second worker
[1, true, 'foo1', [43]],
expect.any(Function),
expect.any(Function),
expect.any(Function),
);
expect(callback).toHaveBeenNthCalledWith(
3,
0, // first worker again
[1, true, 'foo2', [44]],
expect.any(Function),
expect.any(Function),
expect.any(Function),
);
});

Expand Down Expand Up @@ -220,13 +235,15 @@ describe('Farm', () => {
[1, true, 'car', ['plane']],
expect.any(Function),
expect.any(Function),
expect.any(Function),
);
expect(callback).toHaveBeenNthCalledWith(
2,
0, // first worker
[1, true, 'foo', ['bar']],
expect.any(Function),
expect.any(Function),
expect.any(Function),
);
});

Expand Down Expand Up @@ -261,13 +278,15 @@ describe('Farm', () => {
[1, true, 'car', ['plane']],
expect.any(Function),
expect.any(Function),
expect.any(Function),
);
expect(callback).toHaveBeenNthCalledWith(
2,
0, // first worker
[1, true, 'foo', ['bar']],
expect.any(Function),
expect.any(Function),
expect.any(Function),
);
});

Expand Down Expand Up @@ -333,4 +352,35 @@ describe('Farm', () => {
await expect(p6).resolves.toBe('response-6');
await expect(p7).resolves.toBe('response-7');
});

it('can receive custom messages from workers', async () => {
expect.assertions(2);
const farm = new Farm(2, callback);

const p0 = farm.doWork('work-0');
const p1 = farm.doWork('work-1');

const unsubscribe = p0.UNSTABLE_onCustomMessage(message => {
expect(message).toEqual({key: 0, message: 'foo'});
});

p1.UNSTABLE_onCustomMessage(message => {
expect(message).toEqual({key: 1, message: 'bar'});
});

workerReplyStart(0);
workerReplyStart(1);
workerReplyCustomMessage(0, {key: 0, message: 'foo'});
workerReplyCustomMessage(1, {key: 1, message: 'bar'});

unsubscribe();
// This message will not received because the listener already
// unsubscribed.
workerReplyCustomMessage(0, {key: 0, message: 'baz'});

workerReply(0, null, 17);
workerReply(1, null, 17);
await p0;
await p1;
});
});
3 changes: 3 additions & 0 deletions packages/jest-worker/src/__tests__/WorkerPool.test.js
Expand Up @@ -71,6 +71,7 @@ describe('WorkerPool', () => {
{foo: 'bar'},
onStart,
onEnd,
undefined,
);
});

Expand Down Expand Up @@ -100,6 +101,7 @@ describe('WorkerPool', () => {
{foo: 'bar'},
onStart,
onEnd,
undefined,
);
});

Expand Down Expand Up @@ -128,6 +130,7 @@ describe('WorkerPool', () => {
{foo: 'bar'},
onStart,
onEnd,
undefined,
);
});
});
7 changes: 6 additions & 1 deletion packages/jest-worker/src/base/BaseWorkerPool.ts
Expand Up @@ -94,7 +94,12 @@ export default class BaseWorkerPool {
// We do not cache the request object here. If so, it would only be only
// processed by one of the workers, and we want them all to close.
const workerExitPromises = this._workers.map(async worker => {
worker.send([CHILD_MESSAGE_END, false], emptyMethod, emptyMethod);
worker.send(
[CHILD_MESSAGE_END, false],
emptyMethod,
emptyMethod,
emptyMethod,
);

// Schedule a force exit in case worker fails to exit gracefully so
// await worker.waitForExit() never takes longer than FORCE_EXIT_DELAY
Expand Down
4 changes: 4 additions & 0 deletions packages/jest-worker/src/index.ts
Expand Up @@ -11,9 +11,11 @@ import Farm from './Farm';
import type {
FarmOptions,
PoolExitResult,
PromiseWithCustomMessage,
WorkerPoolInterface,
WorkerPoolOptions,
} from './types';
export {default as messageParent} from './workers/messageParent';

function getExposedMethods(
workerPath: string,
Expand Down Expand Up @@ -146,3 +148,5 @@ export default class JestWorker {
return this._workerPool.end();
}
}

export type {PromiseWithCustomMessage};

0 comments on commit 3150a81

Please sign in to comment.