Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix queue management in jest-worker #7934

Merged
merged 1 commit into from Feb 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Expand Up @@ -14,7 +14,8 @@
- `[jest-changed-files]` Improve default file selection for Mercurial repos ([#7880](https://github.com/facebook/jest/pull/7880))
- `[jest-validate]` Fix validating async functions ([#7894](https://github.com/facebook/jest/issues/7894))
- `[jest-circus]` Fix bug with test.only ([#7888](https://github.com/facebook/jest/pull/7888))
- `[jest-transform]` Normalize config and remove unecessary checks, convert `TestUtils.js` to TypeScript ([#7801](https://github.com/facebook/jest/pull/7801)
- `[jest-transform]` Normalize config and remove unecessary checks, convert `TestUtils.js` to TypeScript ([#7801](https://github.com/facebook/jest/pull/7801))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

took me forever to spot the fixed missing paren :P

- `[jest-worker]` Fix `jest-worker` when using pre-allocated jobs ([#7934](https://github.com/facebook/jest/pull/7934))

### Chore & Maintenance

Expand Down
55 changes: 30 additions & 25 deletions packages/jest-worker/src/Farm.ts
Expand Up @@ -9,6 +9,7 @@ import {
ChildMessage,
FarmOptions,
QueueChildMessage,
QueueItem,
WorkerInterface,
OnStart,
OnEnd,
Expand All @@ -19,24 +20,25 @@ export default class Farm {
private _computeWorkerKey: FarmOptions['computeWorkerKey'];
private _cacheKeys: {[key: string]: WorkerInterface};
private _callback: Function;
private _last: Array<QueueChildMessage>;
private _last: Array<QueueItem>;
private _locks: Array<boolean>;
private _numOfWorkers: number;
private _offset: number;
private _queue: Array<QueueChildMessage | null>;
private _queue: Array<QueueItem | null>;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private _queue: Array<QueueItem | null>;
private _queue: Array<QueueItem>;

right? there can never be nulls in the queue now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The queue can be empty, and that's signaled by a null item.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so Array<QueueItem> | [null]? I guess it doesn't matter in practice

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The array index refers to each of the workers (I.e. this._queue.length is numOfWorkers, not the list of jobs); so each item in the array is either the first element of the linked list, or null.


constructor(
numOfWorkers: number,
callback: Function,
computeWorkerKey?: FarmOptions['computeWorkerKey'],
) {
this._callback = callback;
this._numOfWorkers = numOfWorkers;
this._cacheKeys = Object.create(null);
this._queue = [];
this._callback = callback;
this._last = [];
this._locks = [];
this._numOfWorkers = numOfWorkers;
this._offset = 0;
this._queue = [];

if (computeWorkerKey) {
this._computeWorkerKey = computeWorkerKey;
}
Expand Down Expand Up @@ -70,6 +72,7 @@ export default class Farm {
};

const task = {onEnd, onStart, request};

if (worker) {
this._enqueue(task, worker.getWorkerId());
} else {
Expand All @@ -78,80 +81,82 @@ export default class Farm {
});
}

private _getNextJob(workerId: number): QueueChildMessage | null {
private _getNextTask(workerId: number): QueueChildMessage | null {
let queueHead = this._queue[workerId];

while (queueHead && queueHead.request[1]) {
while (queueHead && queueHead.task.request[1]) {
queueHead = queueHead.next || null;
}

this._queue[workerId] = queueHead;

return queueHead;
return queueHead && queueHead.task;
}

private _process(workerId: number): Farm {
if (this.isLocked(workerId)) {
if (this._isLocked(workerId)) {
return this;
}

const job = this._getNextJob(workerId);
const task = this._getNextTask(workerId);

if (!job) {
if (!task) {
return this;
}

const onEnd = (error: Error | null, result: unknown) => {
job.onEnd(error, result);
this.unlock(workerId);
task.onEnd(error, result);

this._unlock(workerId);
this._process(workerId);
};

this.lock(workerId);
task.request[1] = true;

this._callback(workerId, job.request, job.onStart, onEnd);

job.request[1] = true;
this._lock(workerId);
this._callback(workerId, task.request, task.onStart, onEnd);

return this;
}

private _enqueue(task: QueueChildMessage, workerId: number): Farm {
const item = {next: null, task};

if (task.request[1]) {
return this;
}

if (this._queue[workerId]) {
this._last[workerId].next = task;
this._last[workerId].next = item;
} else {
this._queue[workerId] = task;
this._queue[workerId] = item;
}

this._last[workerId] = task;
this._last[workerId] = item;
this._process(workerId);

return this;
}

private _push(task: QueueChildMessage): Farm {
for (let i = 0; i < this._numOfWorkers; i++) {
const workerIdx = (this._offset + i) % this._numOfWorkers;
this._enqueue(task, workerIdx);
this._enqueue(task, (this._offset + i) % this._numOfWorkers);
}

this._offset++;

return this;
}

lock(workerId: number): void {
private _lock(workerId: number): void {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

technically breaking, but I guess these shouldn't be called anyways by consumers?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, playing with worker locks is a recipe for disaster :D

this._locks[workerId] = true;
}

unlock(workerId: number): void {
private _unlock(workerId: number): void {
this._locks[workerId] = false;
}

isLocked(workerId: number): boolean {
private _isLocked(workerId: number): boolean {
return this._locks[workerId];
}
}
90 changes: 77 additions & 13 deletions packages/jest-worker/src/__tests__/Farm.test.js
Expand Up @@ -200,18 +200,19 @@ describe('Farm', () => {
workerReply(0, null, 17);
await p0;

// Note that the stickiness is not created by the method name or the arguments
// it is solely controlled by the provided "computeWorkerKey" method, which in
// the test example always returns the same key, so all calls should be
// redirected to worker 1 (which is the one that resolved the first call).
// Note that the stickiness is not created by the method name or the
// arguments it is solely controlled by the provided "computeWorkerKey"
// method, which in the test example always returns the same key, so all
// calls should be redirected to worker 1 (which is the one that resolved
// the first call).
const p1 = farm.doWork('foo', 'bar');
workerReply(1, null, 17);
await p1;

// The first time, a call with a "1234567890abcdef" hash had never been done
// earlier ("foo" call), so it got queued to all workers. Later, since the one
// that resolved the call was the one in position 1, all subsequent calls are
// only redirected to that worker.
// The first time, a call with a "1234567890abcdef" hash had never been
// done earlier ("foo" call), so it got queued to all workers. Later, since
// the one that resolved the call was the one in position 1, all subsequent
// calls are only redirected to that worker.
expect(callback).toHaveBeenCalledTimes(2); // Only "foo".
expect(callback).toHaveBeenNthCalledWith(
1,
Expand Down Expand Up @@ -248,11 +249,11 @@ describe('Farm', () => {
workerReply(1, null, 17);
await p1;

// Both requests are send to the same worker
// The first time, a call with a "1234567890abcdef" hash had never been done
// earlier ("foo" call), so it got queued to all workers. Later, since the one
// that resolved the call was the one in position 1, all subsequent calls are
// only redirected to that worker.
// Both requests are send to the same worker. The first time, a call with
// a "1234567890abcdef" hash had never been done earlier ("foo" call), so
// it got queued to all workers. Later, since the one that resolved the
// call was the one in position 1, all subsequent calls are only redirected
// to that worker.
expect(callback).toHaveBeenCalledTimes(2);
expect(callback).toHaveBeenNthCalledWith(
1,
Expand All @@ -269,4 +270,67 @@ describe('Farm', () => {
expect.any(Function),
);
});

it('checks that locking works, and jobs are never lost', async () => {
const hash = jest
.fn()
// This will go to both queues, but picked by the first worker.
.mockReturnValueOnce(0)
// This will go to both queues too, but picked by the second worker.
.mockReturnValueOnce(1)
// This will go to worker 0, now only assigned to it.
.mockReturnValueOnce(0)
// This will go to worker 1, now only assigned to it.
.mockReturnValueOnce(1)
// This will go to both queues too, but will wait, since workers are busy.
.mockReturnValueOnce(2)
// This will only go to the first queue.
.mockReturnValueOnce(0)
// This will be gone if the queue implementation is wrong.
.mockReturnValueOnce(0)
// Push onto the second queue; potentially wiping the earlier job.
.mockReturnValueOnce(1);

const farm = new Farm(2, callback, hash);

// First and second jobs get resolved, so that their hash is sticked to
// the right worker: worker assignment is performed when workers reply, not
// when the call is made.
const p0 = farm.doWork('work-0');
const p1 = farm.doWork('work-1');
workerReply(0, null, 'response-0');
await p0;
workerReply(1, null, 'response-1');
await p1;

// Now we perform the rest of the calls (7 resolves before 5 and 6, since 4
// is in both queues, and as soon as you resolve 4, 7 will be picked).
const p2 = farm.doWork('work-2');
const p3 = farm.doWork('work-3');
const p4 = farm.doWork('work-4');
const p5 = farm.doWork('work-5');
const p6 = farm.doWork('work-6');
const p7 = farm.doWork('work-7');
workerReply(2, null, 'response-2');
await p2;
workerReply(3, null, 'response-3');
await p3;
workerReply(4, null, 'response-4');
await p4;
workerReply(5, null, 'response-7');
await p7;
workerReply(6, null, 'response-5');
await p5;
workerReply(7, null, 'response-6');
await p6;

await expect(p0).resolves.toBe('response-0');
await expect(p1).resolves.toBe('response-1');
await expect(p2).resolves.toBe('response-2');
await expect(p3).resolves.toBe('response-3');
await expect(p4).resolves.toBe('response-4');
await expect(p5).resolves.toBe('response-5');
await expect(p6).resolves.toBe('response-6');
await expect(p7).resolves.toBe('response-7');
});
});
7 changes: 6 additions & 1 deletion packages/jest-worker/src/types.ts
Expand Up @@ -147,12 +147,17 @@ export type ParentMessageError = [
export type ParentMessage = ParentMessageOk | ParentMessageError;

// Queue types.

export type OnStart = (worker: WorkerInterface) => void;
export type OnEnd = (err: Error | null, result: unknown) => void;

export type QueueChildMessage = {
request: ChildMessage;
onStart: OnStart;
onEnd: OnEnd;
next?: QueueChildMessage;
};

export type QueueItem = {
task: QueueChildMessage;
next: QueueItem | null;
};