Skip to content

Commit

Permalink
Only serialize and send shared references to workers that need them (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
devongovett committed Nov 1, 2022
1 parent c4a898c commit 3480705
Show file tree
Hide file tree
Showing 12 changed files with 111 additions and 37 deletions.
2 changes: 1 addition & 1 deletion packages/core/core/src/Parcel.js
Expand Up @@ -110,7 +110,7 @@ export default class Parcel {
await resolvedOptions.cache.ensure();

let {dispose: disposeOptions, ref: optionsRef} =
await this.#farm.createSharedReference(resolvedOptions);
await this.#farm.createSharedReference(resolvedOptions, false);
this.#optionsRef = optionsRef;

this.#disposable = new Disposable();
Expand Down
21 changes: 21 additions & 0 deletions packages/core/core/src/RequestTracker.js
Expand Up @@ -145,6 +145,7 @@ export type RunAPI = {|
getRequestResult<T>(contentKey: ContentKey): Async<?T>,
getPreviousResult<T>(ifMatch?: string): Async<?T>,
getSubRequests(): Array<StoredRequest>,
getInvalidSubRequests(): Array<StoredRequest>,
canSkipSubrequest(ContentKey): boolean,
runRequest: <TInput, TResult>(
subRequest: Request<TInput, TResult>,
Expand Down Expand Up @@ -635,6 +636,25 @@ export class RequestGraph extends ContentGraph<
});
}

getInvalidSubRequests(requestNodeId: NodeId): Array<StoredRequest> {
if (!this.hasNode(requestNodeId)) {
return [];
}

let subRequests = this.getNodeIdsConnectedFrom(
requestNodeId,
requestGraphEdgeTypes.subrequest,
);

return subRequests
.filter(id => this.invalidNodeIds.has(id))
.map(nodeId => {
let node = nullthrows(this.getNode(nodeId));
invariant(node.type === 'request');
return node.value;
});
}

invalidateFileNameNode(
node: FileNameNode,
filePath: ProjectPath,
Expand Down Expand Up @@ -1028,6 +1048,7 @@ export default class RequestTracker {
this.storeResult(requestId, result, cacheKey);
},
getSubRequests: () => this.graph.getSubRequests(requestId),
getInvalidSubRequests: () => this.graph.getInvalidSubRequests(requestId),
getPreviousResult: <T>(ifMatch?: string): Async<?T> => {
let contentKey = nullthrows(this.graph.getNode(requestId)?.id);
return this.getRequestResult<T>(contentKey, ifMatch);
Expand Down
7 changes: 7 additions & 0 deletions packages/core/core/src/requests/AssetGraphRequest.js
Expand Up @@ -113,6 +113,7 @@ export class AssetGraphBuilder {
cacheKey: string;
shouldBuildLazily: boolean;
requestedAssetIds: Set<string>;
isSingleChangeRebuild: boolean;

constructor(
{input, api, options}: RunInput,
Expand Down Expand Up @@ -143,6 +144,9 @@ export class AssetGraphBuilder {
`${PARCEL_VERSION}${name}${JSON.stringify(entries) ?? ''}${options.mode}`,
);

this.isSingleChangeRebuild =
api.getInvalidSubRequests().filter(req => req.type === 'asset_request')
.length === 1;
this.queue = new PromiseQueue();
}

Expand Down Expand Up @@ -981,6 +985,7 @@ export class AssetGraphBuilder {
...input,
name: this.name,
optionsRef: this.optionsRef,
isSingleChangeRebuild: this.isSingleChangeRebuild,
});
let assets = await this.api.runRequest<AssetRequestInput, Array<Asset>>(
request,
Expand All @@ -1007,6 +1012,8 @@ export class AssetGraphBuilder {
} else {
this.assetGraph.safeToIncrementallyBundle = false;
}

this.isSingleChangeRebuild = false;
}

/**
Expand Down
5 changes: 4 additions & 1 deletion packages/core/core/src/requests/AssetRequest.js
Expand Up @@ -133,7 +133,10 @@ async function run({input, api, farm, invalidateReason, options}: RunInput) {
invalidations,
invalidateOnFileCreate,
devDepRequests,
} = (await farm.createHandle('runTransform')({
} = (await farm.createHandle(
'runTransform',
input.isSingleChangeRebuild,
)({
configCachePath: cachePath,
optionsRef,
request,
Expand Down
6 changes: 1 addition & 5 deletions packages/core/core/src/requests/BundleGraphRequest.js
Expand Up @@ -48,7 +48,6 @@ import {
runConfigRequest,
type PluginWithLoadConfig,
} from './ConfigRequest';
import {cacheSerializedObject} from '../serializer';
import {
joinProjectPath,
fromProjectPathRelative,
Expand Down Expand Up @@ -366,11 +365,8 @@ class BundlerRunner {
configs: this.configs,
});

// Store the serialized bundle graph in an in memory cache so that we avoid serializing it
// many times to send to each worker, and in build mode, when writing to cache on shutdown.
// Also, pre-compute the hashes for each bundle so they are only computed once and shared between workers.
// Pre-compute the hashes for each bundle so they are only computed once and shared between workers.
internalBundleGraph.getBundleGraphHash();
cacheSerializedObject(internalBundleGraph);
}

await dumpGraphToGraphViz(
Expand Down
6 changes: 4 additions & 2 deletions packages/core/core/src/requests/PackageRequest.js
Expand Up @@ -20,6 +20,7 @@ type PackageRequestInput = {|
bundle: Bundle,
bundleGraphReference: SharedReference,
optionsRef: SharedReference,
useMainThread?: boolean,
|};

type RunInput = {|
Expand All @@ -46,14 +47,15 @@ export function createPackageRequest(
}

async function run({input, api, farm}: RunInput) {
let {bundleGraphReference, optionsRef, bundle} = input;
let runPackage = farm.createHandle('runPackage');
let {bundleGraphReference, optionsRef, bundle, useMainThread} = input;
let runPackage = farm.createHandle('runPackage', useMainThread);

let start = Date.now();
let {devDeps, invalidDevDeps} = await getDevDepRequests(api);
let {cachePath} = nullthrows(
await api.runRequest<null, ConfigAndCachePath>(createParcelConfigRequest()),
);

let {devDepRequests, configRequests, bundleInfo, invalidations} =
(await runPackage({
bundle,
Expand Down
14 changes: 9 additions & 5 deletions packages/core/core/src/requests/WriteBundlesRequest.js
Expand Up @@ -9,7 +9,6 @@ import type BundleGraph from '../BundleGraph';
import type {BundleInfo} from '../PackagerRunner';

import {HASH_REF_PREFIX} from '../constants';
import {serialize} from '../serializer';
import {joinProjectPath} from '../projectPath';
import nullthrows from 'nullthrows';
import {hashString} from '@parcel/hash';
Expand Down Expand Up @@ -49,10 +48,7 @@ export default function createWriteBundlesRequest(

async function run({input, api, farm, options}: RunInput) {
let {bundleGraph, optionsRef} = input;
let {ref, dispose} = await farm.createSharedReference(
bundleGraph,
serialize(bundleGraph),
);
let {ref, dispose} = await farm.createSharedReference(bundleGraph);

api.invalidateOnOptionChange('shouldContentHash');

Expand Down Expand Up @@ -83,6 +79,13 @@ async function run({input, api, farm, options}: RunInput) {
return true;
});

// Package on the main thread if there is only one bundle to package.
// This avoids the cost of serializing the bundle graph for single file change builds.
let useMainThread =
bundles.length === 1 ||
bundles.filter(b => !api.canSkipSubrequest(bundleGraph.getHash(b)))
.length === 1;

try {
await Promise.all(
bundles.map(async bundle => {
Expand All @@ -91,6 +94,7 @@ async function run({input, api, farm, options}: RunInput) {
bundleGraph,
bundleGraphReference: ref,
optionsRef,
useMainThread,
});

let info = await api.runRequest(request);
Expand Down
1 change: 1 addition & 0 deletions packages/core/core/src/types.js
Expand Up @@ -342,6 +342,7 @@ export type AssetRequestInput = {|
optionsRef: SharedReference,
isURL?: boolean,
query?: ?string,
isSingleChangeRebuild?: boolean,
|};
export type AssetRequestResult = Array<Asset>;
Expand Down
5 changes: 3 additions & 2 deletions packages/core/core/src/worker.js
Expand Up @@ -17,14 +17,15 @@ import Transformation, {
type TransformationOpts,
type TransformationResult,
} from './Transformation';
import {reportWorker} from './ReporterRunner';
import {reportWorker, report} from './ReporterRunner';
import PackagerRunner, {type PackageRequestResult} from './PackagerRunner';
import Validation, {type ValidationOpts} from './Validation';
import ParcelConfig from './ParcelConfig';
import {registerCoreWithSerializer} from './utils';
import {clearBuildCaches} from './buildCache';
import {init as initSourcemaps} from '@parcel/source-map';
import {init as initHash} from '@parcel/hash';
import WorkerFarm from '@parcel/workers';

import '@parcel/cache'; // register with serializer
import '@parcel/package-manager';
Expand Down Expand Up @@ -137,7 +138,7 @@ export async function runPackage(
let runner = new PackagerRunner({
config: parcelConfig,
options,
report: reportWorker.bind(null, workerApi),
report: WorkerFarm.isWorker() ? reportWorker.bind(null, workerApi) : report,
previousDevDeps,
previousInvalidations,
});
Expand Down
3 changes: 3 additions & 0 deletions packages/core/core/test/TargetRequest.test.js
Expand Up @@ -93,6 +93,9 @@ describe('TargetResolver', () => {
getSubRequests() {
return [];
},
getInvalidSubRequests() {
return [];
},
};

it('resolves exactly specified targets', async () => {
Expand Down
3 changes: 2 additions & 1 deletion packages/core/workers/src/Worker.js
Expand Up @@ -31,7 +31,7 @@ export default class Worker extends EventEmitter {
+options: WorkerOpts;
worker: WorkerImpl;
id: number = WORKER_ID++;
sharedReferences: $ReadOnlyMap<SharedReference, mixed> = new Map();
sentSharedReferences: Set<SharedReference> = new Set();

calls: Map<number, WorkerCall> = new Map();
exitCode: ?number = null;
Expand Down Expand Up @@ -135,6 +135,7 @@ export default class Worker extends EventEmitter {
}

sendSharedReference(ref: SharedReference, value: mixed): Promise<any> {
this.sentSharedReferences.add(ref);
return new Promise((resolve, reject) => {
this.call({
method: 'createSharedReference',
Expand Down
75 changes: 55 additions & 20 deletions packages/core/workers/src/WorkerFarm.js
Expand Up @@ -76,6 +76,7 @@ export default class WorkerFarm extends EventEmitter {
handles: Map<number, Handle> = new Map();
sharedReferences: Map<SharedReference, mixed> = new Map();
sharedReferencesByValue: Map<mixed, SharedReference> = new Map();
serializedSharedReferences: Map<SharedReference, ?ArrayBuffer> = new Map();
profiler: ?Profiler;

constructor(farmOptions: $Shape<FarmOptions> = {}) {
Expand Down Expand Up @@ -175,21 +176,26 @@ export default class WorkerFarm extends EventEmitter {
);
}

createHandle(method: string): HandleFunction {
createHandle(method: string, useMainThread: boolean = false): HandleFunction {
return async (...args) => {
// Child process workers are slow to start (~600ms).
// While we're waiting, just run on the main thread.
// This significantly speeds up startup time.
if (this.shouldUseRemoteWorkers()) {
if (this.shouldUseRemoteWorkers() && !useMainThread) {
return this.addCall(method, [...args, false]);
} else {
if (this.options.warmWorkers && this.shouldStartRemoteWorkers()) {
this.warmupWorker(method, args);
}

let processedArgs = restoreDeserializedObject(
prepareForSerialization([...args, false]),
);
let processedArgs;
if (!useMainThread) {
processedArgs = restoreDeserializedObject(
prepareForSerialization([...args, false]),
);
} else {
processedArgs = args;
}

if (this.localWorkerInit != null) {
await this.localWorkerInit;
Expand Down Expand Up @@ -273,11 +279,24 @@ export default class WorkerFarm extends EventEmitter {
}

if (worker.calls.size < this.options.maxConcurrentCallsPerWorker) {
worker.call(this.callQueue.shift());
this.callWorker(worker, this.callQueue.shift());
}
}
}

async callWorker(worker: Worker, call: WorkerCall): Promise<void> {
for (let ref of this.sharedReferences.keys()) {
if (!worker.sentSharedReferences.has(ref)) {
await worker.sendSharedReference(
ref,
this.getSerializedSharedReference(ref),
);
}
}

worker.call(call);
}

async processRequest(
data: {|
location: FilePath,
Expand Down Expand Up @@ -400,33 +419,31 @@ export default class WorkerFarm extends EventEmitter {
return handle;
}

async createSharedReference(
createSharedReference(
value: mixed,
// An optional, pre-serialized representation of the value to be used
// in its place.
buffer?: Buffer,
): Promise<{|ref: SharedReference, dispose(): Promise<mixed>|}> {
isCacheable: boolean = true,
): {|ref: SharedReference, dispose(): Promise<mixed>|} {
let ref = referenceId++;
this.sharedReferences.set(ref, value);
this.sharedReferencesByValue.set(value, ref);

let toSend = buffer ? buffer.buffer : value;
let promises = [];
for (let worker of this.workers.values()) {
if (worker.ready) {
promises.push(worker.sendSharedReference(ref, toSend));
}
if (!isCacheable) {
this.serializedSharedReferences.set(ref, null);
}

await Promise.all(promises);

return {
ref,
dispose: () => {
this.sharedReferences.delete(ref);
this.sharedReferencesByValue.delete(value);
this.serializedSharedReferences.delete(ref);

let promises = [];
for (let worker of this.workers.values()) {
if (!worker.sentSharedReferences.has(ref)) {
continue;
}

worker.sentSharedReferences.delete(ref);
promises.push(
new Promise((resolve, reject) => {
worker.call({
Expand All @@ -445,6 +462,24 @@ export default class WorkerFarm extends EventEmitter {
};
}

getSerializedSharedReference(ref: SharedReference): ArrayBuffer {
let cached = this.serializedSharedReferences.get(ref);
if (cached) {
return cached;
}

let value = this.sharedReferences.get(ref);
let buf = serialize(value).buffer;

// If the reference was created with the isCacheable option set to false,
// serializedSharedReferences will contain `null` as the value.
if (cached !== null) {
this.serializedSharedReferences.set(ref, buf);
}

return buf;
}

async startProfile() {
let promises = [];
for (let worker of this.workers.values()) {
Expand Down

0 comments on commit 3480705

Please sign in to comment.