Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cherry-pick(#13584): avoid premature stop of worker in fullyParallel #13597

Merged
merged 1 commit into from Apr 18, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 5 additions & 5 deletions packages/playwright-test/src/dispatcher.ts
Expand Up @@ -43,7 +43,7 @@ type TestData = {
export class Dispatcher {
private _workerSlots: { busy: boolean, worker?: Worker }[] = [];
private _queue: TestGroup[] = [];
private _queueHashCount = new Map<string, number>();
private _queuedOrRunningHashCount = new Map<string, number>();
private _finished = new ManualPromise<void>();
private _isStopped = false;

Expand All @@ -58,7 +58,7 @@ export class Dispatcher {
this._reporter = reporter;
this._queue = testGroups;
for (const group of testGroups) {
this._queueHashCount.set(group.workerHash, 1 + (this._queueHashCount.get(group.workerHash) || 0));
this._queuedOrRunningHashCount.set(group.workerHash, 1 + (this._queuedOrRunningHashCount.get(group.workerHash) || 0));
for (const test of group.tests)
this._testById.set(test._id, { test, resultByWorkerIndex: new Map() });
}
Expand All @@ -80,7 +80,6 @@ export class Dispatcher {

// 3. Claim both the job and the worker, run the job and release the worker.
this._queue.shift();
this._queueHashCount.set(job.workerHash, this._queueHashCount.get(job.workerHash)! - 1);
this._workerSlots[index].busy = true;
await this._startJobInWorker(index, job);
this._workerSlots[index].busy = false;
Expand Down Expand Up @@ -143,7 +142,7 @@ export class Dispatcher {
if (slot.worker && !slot.worker.didSendStop() && slot.worker.hash() === worker.hash())
workersWithSameHash++;
}
return workersWithSameHash > this._queueHashCount.get(worker.hash())!;
return workersWithSameHash > this._queuedOrRunningHashCount.get(worker.hash())!;
}

async run() {
Expand Down Expand Up @@ -273,6 +272,7 @@ export class Dispatcher {
worker.on('stepEnd', onStepEnd);

const onDone = (params: DonePayload) => {
this._queuedOrRunningHashCount.set(worker.hash(), this._queuedOrRunningHashCount.get(worker.hash())! - 1);
let remaining = [...remainingByTestId.values()];

// We won't file remaining if:
Expand Down Expand Up @@ -379,7 +379,7 @@ export class Dispatcher {

if (remaining.length) {
this._queue.unshift({ ...testGroup, tests: remaining });
this._queueHashCount.set(testGroup.workerHash, this._queueHashCount.get(testGroup.workerHash)! + 1);
this._queuedOrRunningHashCount.set(testGroup.workerHash, this._queuedOrRunningHashCount.get(testGroup.workerHash)! + 1);
// Perhaps we can immediately start the new job if there is a worker available?
this._scheduleJob();
}
Expand Down