Skip to content

Commit

Permalink
Fix queue management in jest-worker (#7934)
Browse files Browse the repository at this point in the history
  • Loading branch information
mjesun committed Feb 19, 2019
1 parent 2f3d557 commit 7cc0771
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 40 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Expand Up @@ -14,7 +14,8 @@
- `[jest-changed-files]` Improve default file selection for Mercurial repos ([#7880](https://github.com/facebook/jest/pull/7880))
- `[jest-validate]` Fix validating async functions ([#7894](https://github.com/facebook/jest/issues/7894))
- `[jest-circus]` Fix bug with test.only ([#7888](https://github.com/facebook/jest/pull/7888))
- `[jest-transform]` Normalize config and remove unecessary checks, convert `TestUtils.js` to TypeScript ([#7801](https://github.com/facebook/jest/pull/7801)
- `[jest-transform]` Normalize config and remove unecessary checks, convert `TestUtils.js` to TypeScript ([#7801](https://github.com/facebook/jest/pull/7801))
- `[jest-worker]` Fix `jest-worker` when using pre-allocated jobs ([#7934](https://github.com/facebook/jest/pull/7934))

### Chore & Maintenance

Expand Down
55 changes: 30 additions & 25 deletions packages/jest-worker/src/Farm.ts
Expand Up @@ -9,6 +9,7 @@ import {
ChildMessage,
FarmOptions,
QueueChildMessage,
QueueItem,
WorkerInterface,
OnStart,
OnEnd,
Expand All @@ -19,24 +20,25 @@ export default class Farm {
private _computeWorkerKey: FarmOptions['computeWorkerKey'];
private _cacheKeys: {[key: string]: WorkerInterface};
private _callback: Function;
private _last: Array<QueueChildMessage>;
private _last: Array<QueueItem>;
private _locks: Array<boolean>;
private _numOfWorkers: number;
private _offset: number;
private _queue: Array<QueueChildMessage | null>;
private _queue: Array<QueueItem | null>;

constructor(
numOfWorkers: number,
callback: Function,
computeWorkerKey?: FarmOptions['computeWorkerKey'],
) {
this._callback = callback;
this._numOfWorkers = numOfWorkers;
this._cacheKeys = Object.create(null);
this._queue = [];
this._callback = callback;
this._last = [];
this._locks = [];
this._numOfWorkers = numOfWorkers;
this._offset = 0;
this._queue = [];

if (computeWorkerKey) {
this._computeWorkerKey = computeWorkerKey;
}
Expand Down Expand Up @@ -70,6 +72,7 @@ export default class Farm {
};

const task = {onEnd, onStart, request};

if (worker) {
this._enqueue(task, worker.getWorkerId());
} else {
Expand All @@ -78,80 +81,82 @@ export default class Farm {
});
}

private _getNextJob(workerId: number): QueueChildMessage | null {
private _getNextTask(workerId: number): QueueChildMessage | null {
let queueHead = this._queue[workerId];

while (queueHead && queueHead.request[1]) {
while (queueHead && queueHead.task.request[1]) {
queueHead = queueHead.next || null;
}

this._queue[workerId] = queueHead;

return queueHead;
return queueHead && queueHead.task;
}

private _process(workerId: number): Farm {
if (this.isLocked(workerId)) {
if (this._isLocked(workerId)) {
return this;
}

const job = this._getNextJob(workerId);
const task = this._getNextTask(workerId);

if (!job) {
if (!task) {
return this;
}

const onEnd = (error: Error | null, result: unknown) => {
job.onEnd(error, result);
this.unlock(workerId);
task.onEnd(error, result);

this._unlock(workerId);
this._process(workerId);
};

this.lock(workerId);
task.request[1] = true;

this._callback(workerId, job.request, job.onStart, onEnd);

job.request[1] = true;
this._lock(workerId);
this._callback(workerId, task.request, task.onStart, onEnd);

return this;
}

private _enqueue(task: QueueChildMessage, workerId: number): Farm {
const item = {next: null, task};

if (task.request[1]) {
return this;
}

if (this._queue[workerId]) {
this._last[workerId].next = task;
this._last[workerId].next = item;
} else {
this._queue[workerId] = task;
this._queue[workerId] = item;
}

this._last[workerId] = task;
this._last[workerId] = item;
this._process(workerId);

return this;
}

private _push(task: QueueChildMessage): Farm {
for (let i = 0; i < this._numOfWorkers; i++) {
const workerIdx = (this._offset + i) % this._numOfWorkers;
this._enqueue(task, workerIdx);
this._enqueue(task, (this._offset + i) % this._numOfWorkers);
}

this._offset++;

return this;
}

lock(workerId: number): void {
private _lock(workerId: number): void {
this._locks[workerId] = true;
}

unlock(workerId: number): void {
private _unlock(workerId: number): void {
this._locks[workerId] = false;
}

isLocked(workerId: number): boolean {
private _isLocked(workerId: number): boolean {
return this._locks[workerId];
}
}
90 changes: 77 additions & 13 deletions packages/jest-worker/src/__tests__/Farm.test.js
Expand Up @@ -200,18 +200,19 @@ describe('Farm', () => {
workerReply(0, null, 17);
await p0;

// Note that the stickiness is not created by the method name or the arguments
// it is solely controlled by the provided "computeWorkerKey" method, which in
// the test example always returns the same key, so all calls should be
// redirected to worker 1 (which is the one that resolved the first call).
// Note that the stickiness is not created by the method name or the
// arguments it is solely controlled by the provided "computeWorkerKey"
// method, which in the test example always returns the same key, so all
// calls should be redirected to worker 1 (which is the one that resolved
// the first call).
const p1 = farm.doWork('foo', 'bar');
workerReply(1, null, 17);
await p1;

// The first time, a call with a "1234567890abcdef" hash had never been done
// earlier ("foo" call), so it got queued to all workers. Later, since the one
// that resolved the call was the one in position 1, all subsequent calls are
// only redirected to that worker.
// The first time, a call with a "1234567890abcdef" hash had never been
// done earlier ("foo" call), so it got queued to all workers. Later, since
// the one that resolved the call was the one in position 1, all subsequent
// calls are only redirected to that worker.
expect(callback).toHaveBeenCalledTimes(2); // Only "foo".
expect(callback).toHaveBeenNthCalledWith(
1,
Expand Down Expand Up @@ -248,11 +249,11 @@ describe('Farm', () => {
workerReply(1, null, 17);
await p1;

// Both requests are send to the same worker
// The first time, a call with a "1234567890abcdef" hash had never been done
// earlier ("foo" call), so it got queued to all workers. Later, since the one
// that resolved the call was the one in position 1, all subsequent calls are
// only redirected to that worker.
// Both requests are send to the same worker. The first time, a call with
// a "1234567890abcdef" hash had never been done earlier ("foo" call), so
// it got queued to all workers. Later, since the one that resolved the
// call was the one in position 1, all subsequent calls are only redirected
// to that worker.
expect(callback).toHaveBeenCalledTimes(2);
expect(callback).toHaveBeenNthCalledWith(
1,
Expand All @@ -269,4 +270,67 @@ describe('Farm', () => {
expect.any(Function),
);
});

it('checks that locking works, and jobs are never lost', async () => {
const hash = jest
.fn()
// This will go to both queues, but picked by the first worker.
.mockReturnValueOnce(0)
// This will go to both queues too, but picked by the second worker.
.mockReturnValueOnce(1)
// This will go to worker 0, now only assigned to it.
.mockReturnValueOnce(0)
// This will go to worker 1, now only assigned to it.
.mockReturnValueOnce(1)
// This will go to both queues too, but will wait, since workers are busy.
.mockReturnValueOnce(2)
// This will only go to the first queue.
.mockReturnValueOnce(0)
// This will be gone if the queue implementation is wrong.
.mockReturnValueOnce(0)
// Push onto the second queue; potentially wiping the earlier job.
.mockReturnValueOnce(1);

const farm = new Farm(2, callback, hash);

// First and second jobs get resolved, so that their hash is sticked to
// the right worker: worker assignment is performed when workers reply, not
// when the call is made.
const p0 = farm.doWork('work-0');
const p1 = farm.doWork('work-1');
workerReply(0, null, 'response-0');
await p0;
workerReply(1, null, 'response-1');
await p1;

// Now we perform the rest of the calls (7 resolves before 5 and 6, since 4
// is in both queues, and as soon as you resolve 4, 7 will be picked).
const p2 = farm.doWork('work-2');
const p3 = farm.doWork('work-3');
const p4 = farm.doWork('work-4');
const p5 = farm.doWork('work-5');
const p6 = farm.doWork('work-6');
const p7 = farm.doWork('work-7');
workerReply(2, null, 'response-2');
await p2;
workerReply(3, null, 'response-3');
await p3;
workerReply(4, null, 'response-4');
await p4;
workerReply(5, null, 'response-7');
await p7;
workerReply(6, null, 'response-5');
await p5;
workerReply(7, null, 'response-6');
await p6;

await expect(p0).resolves.toBe('response-0');
await expect(p1).resolves.toBe('response-1');
await expect(p2).resolves.toBe('response-2');
await expect(p3).resolves.toBe('response-3');
await expect(p4).resolves.toBe('response-4');
await expect(p5).resolves.toBe('response-5');
await expect(p6).resolves.toBe('response-6');
await expect(p7).resolves.toBe('response-7');
});
});
7 changes: 6 additions & 1 deletion packages/jest-worker/src/types.ts
Expand Up @@ -147,12 +147,17 @@ export type ParentMessageError = [
export type ParentMessage = ParentMessageOk | ParentMessageError;

// Queue types.

export type OnStart = (worker: WorkerInterface) => void;
export type OnEnd = (err: Error | null, result: unknown) => void;

export type QueueChildMessage = {
request: ChildMessage;
onStart: OnStart;
onEnd: OnEnd;
next?: QueueChildMessage;
};

export type QueueItem = {
task: QueueChildMessage;
next: QueueItem | null;
};

0 comments on commit 7cc0771

Please sign in to comment.