Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only serialize and send shared references to workers that need them #8589

Merged
merged 16 commits into from Nov 1, 2022
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/core/core/src/Parcel.js
Expand Up @@ -110,7 +110,7 @@ export default class Parcel {
await resolvedOptions.cache.ensure();

let {dispose: disposeOptions, ref: optionsRef} =
await this.#farm.createSharedReference(resolvedOptions);
await this.#farm.createSharedReference(resolvedOptions, false);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Serializing the options aren't cacheable because of the MemoryFS which has a side effect in the serialize() method... 😬

this.#optionsRef = optionsRef;

this.#disposable = new Disposable();
Expand Down
21 changes: 21 additions & 0 deletions packages/core/core/src/RequestTracker.js
Expand Up @@ -145,6 +145,7 @@ export type RunAPI = {|
getRequestResult<T>(contentKey: ContentKey): Async<?T>,
getPreviousResult<T>(ifMatch?: string): Async<?T>,
getSubRequests(): Array<StoredRequest>,
getInvalidSubRequests(): Array<StoredRequest>,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This new API is subtly different than api.getSubRequest().filter(req => !api.canSkipSubrequest(req.id)) in that it doesn't have the side effect of preserving all previous sub requests even if not used.

canSkipSubrequest(ContentKey): boolean,
runRequest: <TInput, TResult>(
subRequest: Request<TInput, TResult>,
Expand Down Expand Up @@ -635,6 +636,25 @@ export class RequestGraph extends ContentGraph<
});
}

getInvalidSubRequests(requestNodeId: NodeId): Array<StoredRequest> {
if (!this.hasNode(requestNodeId)) {
return [];
}

let subRequests = this.getNodeIdsConnectedFrom(
requestNodeId,
requestGraphEdgeTypes.subrequest,
);

return subRequests
.filter(id => this.invalidNodeIds.has(id))
.map(nodeId => {
let node = nullthrows(this.getNode(nodeId));
invariant(node.type === 'request');
return node.value;
});
}

invalidateFileNameNode(
node: FileNameNode,
filePath: ProjectPath,
Expand Down Expand Up @@ -1028,6 +1048,7 @@ export default class RequestTracker {
this.storeResult(requestId, result, cacheKey);
},
getSubRequests: () => this.graph.getSubRequests(requestId),
getInvalidSubRequests: () => this.graph.getInvalidSubRequests(requestId),
getPreviousResult: <T>(ifMatch?: string): Async<?T> => {
let contentKey = nullthrows(this.graph.getNode(requestId)?.id);
return this.getRequestResult<T>(contentKey, ifMatch);
Expand Down
7 changes: 7 additions & 0 deletions packages/core/core/src/requests/AssetGraphRequest.js
Expand Up @@ -121,6 +121,7 @@ export class AssetGraphBuilder {
cacheKey: string;
shouldBuildLazily: boolean;
requestedAssetIds: Set<string>;
isSingleChangeRebuild: boolean;

constructor(
{input, api, options}: RunInput,
Expand Down Expand Up @@ -151,6 +152,9 @@ export class AssetGraphBuilder {
`${PARCEL_VERSION}${name}${JSON.stringify(entries) ?? ''}${options.mode}`,
);

this.isSingleChangeRebuild =
api.getInvalidSubRequests().filter(req => req.type === 'asset_request')
.length === 1;
this.queue = new PromiseQueue();
}

Expand Down Expand Up @@ -989,6 +993,7 @@ export class AssetGraphBuilder {
...input,
name: this.name,
optionsRef: this.optionsRef,
isSingleChangeRebuild: this.isSingleChangeRebuild,
});
let assets = await this.api.runRequest<AssetRequestInput, Array<Asset>>(
request,
Expand All @@ -1015,6 +1020,8 @@ export class AssetGraphBuilder {
} else {
this.assetGraph.safeToIncrementallyBundle = false;
}

this.isSingleChangeRebuild = false;
}

/**
Expand Down
5 changes: 4 additions & 1 deletion packages/core/core/src/requests/AssetRequest.js
Expand Up @@ -133,7 +133,10 @@ async function run({input, api, farm, invalidateReason, options}: RunInput) {
invalidations,
invalidateOnFileCreate,
devDepRequests,
} = (await farm.createHandle('runTransform')({
} = (await farm.createHandle(
'runTransform',
input.isSingleChangeRebuild,
)({
configCachePath: cachePath,
optionsRef,
request,
Expand Down
7 changes: 2 additions & 5 deletions packages/core/core/src/requests/BundleGraphRequest.js
Expand Up @@ -48,7 +48,7 @@ import {
getConfigHash,
type PluginWithLoadConfig,
} from './ConfigRequest';
import {cacheSerializedObject, deserializeToCache} from '../serializer';
import {deserializeToCache} from '../serializer';
import {
joinProjectPath,
fromProjectPathRelative,
Expand Down Expand Up @@ -356,11 +356,8 @@ class BundlerRunner {
bundleGraphEdgeTypes,
);

// Store the serialized bundle graph in an in memory cache so that we avoid serializing it
// many times to send to each worker, and in build mode, when writing to cache on shutdown.
// Also, pre-compute the hashes for each bundle so they are only computed once and shared between workers.
// Pre-compute the hashes for each bundle so they are only computed once and shared between workers.
internalBundleGraph.getBundleGraphHash();
cacheSerializedObject(internalBundleGraph);

// Recompute the cache key to account for new dev dependencies and invalidations.
let {cacheKey: updatedCacheKey} = await this.getHashes(graph);
Expand Down
6 changes: 4 additions & 2 deletions packages/core/core/src/requests/PackageRequest.js
Expand Up @@ -20,6 +20,7 @@ type PackageRequestInput = {|
bundle: Bundle,
bundleGraphReference: SharedReference,
optionsRef: SharedReference,
useMainThread?: boolean,
|};

type RunInput = {|
Expand All @@ -46,14 +47,15 @@ export function createPackageRequest(
}

async function run({input, api, farm}: RunInput) {
let {bundleGraphReference, optionsRef, bundle} = input;
let runPackage = farm.createHandle('runPackage');
let {bundleGraphReference, optionsRef, bundle, useMainThread} = input;
let runPackage = farm.createHandle('runPackage', useMainThread);

let start = Date.now();
let {devDeps, invalidDevDeps} = await getDevDepRequests(api);
let {cachePath} = nullthrows(
await api.runRequest<null, ConfigAndCachePath>(createParcelConfigRequest()),
);

let {devDepRequests, configRequests, bundleInfo, invalidations} =
(await runPackage({
bundle,
Expand Down
14 changes: 9 additions & 5 deletions packages/core/core/src/requests/WriteBundlesRequest.js
Expand Up @@ -9,7 +9,6 @@ import type BundleGraph from '../BundleGraph';
import type {BundleInfo} from '../PackagerRunner';

import {HASH_REF_PREFIX} from '../constants';
import {serialize} from '../serializer';
import {joinProjectPath} from '../projectPath';
import nullthrows from 'nullthrows';
import {hashString} from '@parcel/hash';
Expand Down Expand Up @@ -49,10 +48,7 @@ export default function createWriteBundlesRequest(

async function run({input, api, farm, options}: RunInput) {
let {bundleGraph, optionsRef} = input;
let {ref, dispose} = await farm.createSharedReference(
bundleGraph,
serialize(bundleGraph),
);
let {ref, dispose} = await farm.createSharedReference(bundleGraph);

api.invalidateOnOptionChange('shouldContentHash');

Expand Down Expand Up @@ -83,6 +79,13 @@ async function run({input, api, farm, options}: RunInput) {
return true;
});

// Package on the main thread if there is only one bundle to package.
// This avoids the cost of serializing the bundle graph for single file change builds.
let useMainThread =
bundles.length === 1 ||
bundles.filter(b => !api.canSkipSubrequest(bundleGraph.getHash(b)))
.length === 1;

try {
await Promise.all(
bundles.map(async bundle => {
Expand All @@ -91,6 +94,7 @@ async function run({input, api, farm, options}: RunInput) {
bundleGraph,
bundleGraphReference: ref,
optionsRef,
useMainThread,
});

let info = await api.runRequest(request);
Expand Down
1 change: 1 addition & 0 deletions packages/core/core/src/types.js
Expand Up @@ -342,6 +342,7 @@ export type AssetRequestInput = {|
optionsRef: SharedReference,
isURL?: boolean,
query?: ?string,
isSingleChangeRebuild?: boolean,
|};

export type AssetRequestResult = Array<Asset>;
Expand Down
5 changes: 3 additions & 2 deletions packages/core/core/src/worker.js
Expand Up @@ -17,14 +17,15 @@ import Transformation, {
type TransformationOpts,
type TransformationResult,
} from './Transformation';
import {reportWorker} from './ReporterRunner';
import {reportWorker, report} from './ReporterRunner';
import PackagerRunner, {type PackageRequestResult} from './PackagerRunner';
import Validation, {type ValidationOpts} from './Validation';
import ParcelConfig from './ParcelConfig';
import {registerCoreWithSerializer} from './utils';
import {clearBuildCaches} from './buildCache';
import {init as initSourcemaps} from '@parcel/source-map';
import {init as initHash} from '@parcel/hash';
import WorkerFarm from '@parcel/workers';

import '@parcel/cache'; // register with serializer
import '@parcel/package-manager';
Expand Down Expand Up @@ -137,7 +138,7 @@ export async function runPackage(
let runner = new PackagerRunner({
config: parcelConfig,
options,
report: reportWorker.bind(null, workerApi),
report: WorkerFarm.isWorker() ? reportWorker.bind(null, workerApi) : report,
previousDevDeps,
previousInvalidations,
});
Expand Down
3 changes: 3 additions & 0 deletions packages/core/core/test/TargetRequest.test.js
Expand Up @@ -93,6 +93,9 @@ describe('TargetResolver', () => {
getSubRequests() {
return [];
},
getInvalidSubRequests() {
return [];
},
};

it('resolves exactly specified targets', async () => {
Expand Down
3 changes: 2 additions & 1 deletion packages/core/workers/src/Worker.js
Expand Up @@ -31,7 +31,7 @@ export default class Worker extends EventEmitter {
+options: WorkerOpts;
worker: WorkerImpl;
id: number = WORKER_ID++;
sharedReferences: $ReadOnlyMap<SharedReference, mixed> = new Map();
sentSharedReferences: Set<SharedReference> = new Set();

calls: Map<number, WorkerCall> = new Map();
exitCode: ?number = null;
Expand Down Expand Up @@ -135,6 +135,7 @@ export default class Worker extends EventEmitter {
}

sendSharedReference(ref: SharedReference, value: mixed): Promise<any> {
this.sentSharedReferences.add(ref);
return new Promise((resolve, reject) => {
this.call({
method: 'createSharedReference',
Expand Down
75 changes: 55 additions & 20 deletions packages/core/workers/src/WorkerFarm.js
Expand Up @@ -76,6 +76,7 @@ export default class WorkerFarm extends EventEmitter {
handles: Map<number, Handle> = new Map();
sharedReferences: Map<SharedReference, mixed> = new Map();
sharedReferencesByValue: Map<mixed, SharedReference> = new Map();
serializedSharedReferences: Map<SharedReference, ?ArrayBuffer> = new Map();
profiler: ?Profiler;

constructor(farmOptions: $Shape<FarmOptions> = {}) {
Expand Down Expand Up @@ -175,21 +176,26 @@ export default class WorkerFarm extends EventEmitter {
);
}

createHandle(method: string): HandleFunction {
createHandle(method: string, useMainThread: boolean = false): HandleFunction {
return async (...args) => {
// Child process workers are slow to start (~600ms).
// While we're waiting, just run on the main thread.
// This significantly speeds up startup time.
if (this.shouldUseRemoteWorkers()) {
if (this.shouldUseRemoteWorkers() && !useMainThread) {
return this.addCall(method, [...args, false]);
} else {
if (this.options.warmWorkers && this.shouldStartRemoteWorkers()) {
this.warmupWorker(method, args);
}

let processedArgs = restoreDeserializedObject(
prepareForSerialization([...args, false]),
);
let processedArgs;
if (!useMainThread) {
processedArgs = restoreDeserializedObject(
prepareForSerialization([...args, false]),
);
} else {
processedArgs = args;
}

if (this.localWorkerInit != null) {
await this.localWorkerInit;
Expand Down Expand Up @@ -273,11 +279,24 @@ export default class WorkerFarm extends EventEmitter {
}

if (worker.calls.size < this.options.maxConcurrentCallsPerWorker) {
worker.call(this.callQueue.shift());
this.callWorker(worker, this.callQueue.shift());
}
}
}

async callWorker(worker: Worker, call: WorkerCall): Promise<void> {
for (let ref of this.sharedReferences.keys()) {
if (!worker.sentSharedReferences.has(ref)) {
await worker.sendSharedReference(
ref,
this.getSerializedSharedReference(ref),
);
}
}

worker.call(call);
}

async processRequest(
data: {|
location: FilePath,
Expand Down Expand Up @@ -400,33 +419,31 @@ export default class WorkerFarm extends EventEmitter {
return handle;
}

async createSharedReference(
createSharedReference(
value: mixed,
// An optional, pre-serialized representation of the value to be used
// in its place.
buffer?: Buffer,
): Promise<{|ref: SharedReference, dispose(): Promise<mixed>|}> {
isCacheable: boolean = true,
): {|ref: SharedReference, dispose(): Promise<mixed>|} {
let ref = referenceId++;
this.sharedReferences.set(ref, value);
this.sharedReferencesByValue.set(value, ref);

let toSend = buffer ? buffer.buffer : value;
let promises = [];
for (let worker of this.workers.values()) {
if (worker.ready) {
promises.push(worker.sendSharedReference(ref, toSend));
}
if (!isCacheable) {
this.serializedSharedReferences.set(ref, null);
}

await Promise.all(promises);

return {
ref,
dispose: () => {
this.sharedReferences.delete(ref);
this.sharedReferencesByValue.delete(value);
this.serializedSharedReferences.delete(ref);

let promises = [];
for (let worker of this.workers.values()) {
if (!worker.sentSharedReferences.has(ref)) {
continue;
}

worker.sentSharedReferences.delete(ref);
promises.push(
new Promise((resolve, reject) => {
worker.call({
Expand All @@ -445,6 +462,24 @@ export default class WorkerFarm extends EventEmitter {
};
}

getSerializedSharedReference(ref: SharedReference): ArrayBuffer {
let cached = this.serializedSharedReferences.get(ref);
if (cached) {
return cached;
}

let value = this.sharedReferences.get(ref);
let buf = serialize(value).buffer;

// If the reference was created with the isCacheable option set to false,
// serializedSharedReferences will contain `null` as the value.
if (cached !== null) {
this.serializedSharedReferences.set(ref, buf);
}

return buf;
}

async startProfile() {
let promises = [];
for (let worker of this.workers.values()) {
Expand Down