Add large blob storage to Cache (#7198)
lettertwo committed Nov 23, 2021 · parent 1069398 · commit cce9e0c
Showing 10 changed files with 93 additions and 26 deletions.
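
In short: large build artifacts — streamed bundle contents, serialized request results, and the request graph — no longer go through the cache's regular key/value path. They are written as separate "large blob" entries via three new Cache methods (hasLargeBlob, getLargeBlob, setLargeBlob), and an isLargeBlob flag on assets and bundle info records which read path to use when the entry is loaded back.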
16 changes: 14 additions & 2 deletions packages/core/cache/src/FSCache.js
@@ -41,13 +41,13 @@ export class FSCache implements Cache {
}

getStream(key: string): Readable {
return this.fs.createReadStream(this._getCachePath(key));
return this.fs.createReadStream(this._getCachePath(`${key}-large`));
}

setStream(key: string, stream: Readable): Promise<void> {
return new Promise((resolve, reject) => {
stream
.pipe(this.fs.createWriteStream(this._getCachePath(key)))
.pipe(this.fs.createWriteStream(this._getCachePath(`${key}-large`)))
.on('error', reject)
.on('finish', resolve);
});
@@ -77,6 +77,18 @@ export class FSCache implements Cache {
}
}

hasLargeBlob(key: string): Promise<boolean> {
return this.fs.exists(this._getCachePath(`${key}-large`));
}

getLargeBlob(key: string): Promise<Buffer> {
return this.fs.readFile(this._getCachePath(`${key}-large`));
}

async setLargeBlob(key: string, contents: Buffer | string): Promise<void> {
await this.fs.writeFile(this._getCachePath(`${key}-large`), contents);
}

async get<T>(key: string): Promise<?T> {
try {
let data = await this.fs.readFile(this._getCachePath(key));
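
FSCache already keeps one file per entry, so the new methods simply resolve a second path derived from the key plus a "-large" suffix; getStream and setStream now read and write that same "-large" path. A minimal round-trip sketch, assuming cache is an FSCache instance created elsewhere and the key is hypothetical:

let key = 'example-content-key'; // hypothetical key
await cache.setLargeBlob(key, Buffer.from('many bytes')); // writes the file at the "-large" path for this key
if (await cache.hasLargeBlob(key)) {
  let contents = await cache.getLargeBlob(key); // Buffer with the same bytes
}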
33 changes: 26 additions & 7 deletions packages/core/cache/src/LMDBCache.js
@@ -1,21 +1,24 @@
// @flow strict-local
import type {Readable} from 'stream';
import type {FilePath} from '@parcel/types';
import type {Cache} from './types';

import {Readable} from 'stream';
import path from 'path';
import {serialize, deserialize, registerSerializableClass} from '@parcel/core';
import {blobToStream, bufferStream} from '@parcel/utils';
import {NodeFS} from '@parcel/fs';
// flowlint-next-line untyped-import:off
import packageJson from '../package.json';
// $FlowFixMe
import lmdb from 'lmdb-store';

export class LMDBCache implements Cache {
fs: NodeFS;
dir: FilePath;
// $FlowFixMe
store: any;

constructor(cacheDir: FilePath) {
this.fs = new NodeFS();
this.dir = cacheDir;

this.store = lmdb.open(cacheDir, {
@@ -53,16 +56,20 @@ export class LMDBCache implements Cache {
}

async set(key: string, value: mixed): Promise<void> {
await this.store.put(key, serialize(value));
await this.setBlob(key, serialize(value));
}

getStream(key: string): Readable {
return blobToStream(this.store.get(key));
return this.fs.createReadStream(path.join(this.dir, key));
}

async setStream(key: string, stream: Readable): Promise<void> {
let buf = await bufferStream(stream);
await this.store.put(key, buf);
setStream(key: string, stream: Readable): Promise<void> {
return new Promise((resolve, reject) => {
stream
.pipe(this.fs.createWriteStream(path.join(this.dir, key)))
.on('error', reject)
.on('finish', resolve);
});
}

getBlob(key: string): Promise<Buffer> {
@@ -79,6 +86,18 @@ getBuffer(key: string): Promise<?Buffer> {
getBuffer(key: string): Promise<?Buffer> {
return Promise.resolve(this.store.get(key));
}

hasLargeBlob(key: string): Promise<boolean> {
return this.fs.exists(path.join(this.dir, key));
}

getLargeBlob(key: string): Promise<Buffer> {
return this.fs.readFile(path.join(this.dir, key));
}

async setLargeBlob(key: string, contents: Buffer | string): Promise<void> {
await this.fs.writeFile(path.join(this.dir, key), contents);
}
}

registerSerializableClass(`${packageJson.version}:LMDBCache`, LMDBCache);
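
LMDBCache now holds a NodeFS handle and the cache directory: small values still live in the LMDB store (set now just delegates to setBlob), while streams and large blobs are written as plain files named after the key directly inside the cache directory, so they never pass through the LMDB store at all. A sketch of the split, assuming cache is an LMDBCache opened on cacheDir and the keys and payload are hypothetical:

let bigBuffer = Buffer.alloc(64 * 1024 * 1024); // hypothetical large payload
await cache.set('bundle-info', {size: 1234});            // stored inside the LMDB store
await cache.setLargeBlob('request-graph', bigBuffer);    // plain file: <cacheDir>/request-graph
let onDisk = await cache.hasLargeBlob('request-graph');  // true once that file exists
let bytes = await cache.getLargeBlob('request-graph');   // reads the file back as a Buffer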
3 changes: 3 additions & 0 deletions packages/core/cache/src/types.js
@@ -10,5 +10,8 @@ export interface Cache {
setStream(key: string, stream: Readable): Promise<void>;
getBlob(key: string): Promise<Buffer>;
setBlob(key: string, contents: Buffer | string): Promise<void>;
hasLargeBlob(key: string): Promise<boolean>;
getLargeBlob(key: string): Promise<Buffer>;
setLargeBlob(key: string, contents: Buffer | string): Promise<void>;
getBuffer(key: string): Promise<?Buffer>;
}
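
Every Cache implementation now has to provide the three large-blob methods alongside the existing blob and stream ones, and callers pick a read path based on how the entry was stored. An illustrative helper (not part of the commit) showing the read-side branch that PackagerRunner and the asset classes use below:

// Illustrative only: prefer the large-blob path when one exists for the key.
async function readContents(cache /*: Cache */, key /*: string */) /*: Promise<Buffer> */ {
  if (await cache.hasLargeBlob(key)) {
    return cache.getLargeBlob(key); // entry was streamed to disk
  }
  return cache.getBlob(key); // regular entry
}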
6 changes: 5 additions & 1 deletion packages/core/core/src/CommittedAsset.js
@@ -27,7 +27,11 @@ export default class CommittedAsset {
getContent(): Blob | Promise<Buffer | string> {
if (this.content == null) {
if (this.value.contentKey != null) {
return this.options.cache.getStream(this.value.contentKey);
if (this.value.isLargeBlob) {
return this.options.cache.getStream(this.value.contentKey);
} else {
return this.options.cache.getBlob(this.value.contentKey);
}
} else if (this.value.astKey != null) {
return streamFromPromise(
generateFromAST(this).then(({content}) => {
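
Committed assets now come back as a stream only when their content was committed as a stream (isLargeBlob); otherwise getContent resolves the whole thing with a single getBlob call. The same branch appears in Transformation further down when cached assets are reloaded.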
16 changes: 13 additions & 3 deletions packages/core/core/src/PackagerRunner.js
@@ -83,6 +83,7 @@ export type BundleInfo = {|
+hashReferences: Array<string>,
+time?: number,
+cacheKeys: CacheKeyMap,
+isLargeBlob: boolean,
|};

type CacheKeyMap = {|
@@ -596,16 +597,22 @@ export default class PackagerRunner {
let contentKey = PackagerRunner.getContentKey(cacheKey);
let mapKey = PackagerRunner.getMapKey(cacheKey);

let contentExists = await this.options.cache.has(contentKey);
let isLargeBlob = await this.options.cache.hasLargeBlob(contentKey);
let contentExists =
isLargeBlob || (await this.options.cache.has(contentKey));
if (!contentExists) {
return null;
}

let mapExists = await this.options.cache.has(mapKey);

return {
contents: this.options.cache.getStream(contentKey),
map: mapExists ? this.options.cache.getStream(mapKey) : null,
contents: isLargeBlob
? this.options.cache.getStream(contentKey)
: blobToStream(await this.options.cache.getBlob(contentKey)),
map: mapExists
? blobToStream(await this.options.cache.getBlob(mapKey))
: null,
};
}

@@ -618,9 +625,11 @@
let size = 0;
let hash;
let hashReferences = [];
let isLargeBlob = false;

// TODO: don't replace hash references in binary files??
if (contents instanceof Readable) {
isLargeBlob = true;
let boundaryStr = '';
let h = new Hash();
await this.options.cache.setStream(
Expand Down Expand Up @@ -659,6 +668,7 @@ export default class PackagerRunner {
hash,
hashReferences,
cacheKeys,
isLargeBlob,
};
await this.options.cache.set(cacheKeys.info, info);
return info;
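
On the write side, a packaged bundle that arrives as a Readable is piped into the cache with setStream and recorded as isLargeBlob in its BundleInfo; buffer or string results stay on the regular blob path, and the read side above branches on hasLargeBlob accordingly. A condensed sketch of that decision (the non-stream branch is abbreviated here and the variable names are illustrative):

import {Readable} from 'stream';

let isLargeBlob = contents instanceof Readable;
if (isLargeBlob) {
  await cache.setStream(contentKey, contents); // stream straight to a file on disk
} else {
  await cache.setBlob(contentKey, contents); // small string/Buffer stays in the store
}
// BundleInfo itself is small, so it keeps using the regular key/value path:
await cache.set(cacheKeys.info, {size, hash, hashReferences, cacheKeys, isLargeBlob});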
26 changes: 17 additions & 9 deletions packages/core/core/src/RequestTracker.js
@@ -29,6 +29,7 @@ import {
} from '@parcel/utils';
import {hashString} from '@parcel/hash';
import {ContentGraph} from '@parcel/graph';
import {deserialize, serialize} from './serializer';
import {assertSignalNotAborted, hashFromOption} from './utils';
import {
type ProjectPath,
@@ -856,10 +857,11 @@ export default class RequestTracker {
let result: T = (node.value.result: any);
return result;
} else if (node.value.resultCacheKey != null && ifMatch == null) {
let cachedResult: T = (nullthrows(
await this.options.cache.get(node.value.resultCacheKey),
// $FlowFixMe
): any);
let key = node.value.resultCacheKey;
invariant(await this.options.cache.hasLargeBlob(key));
let cachedResult: T = deserialize(
await this.options.cache.getLargeBlob(key),
);
node.value.result = cachedResult;
return cachedResult;
}
@@ -1050,13 +1052,18 @@
let resultCacheKey = node.value.resultCacheKey;
if (resultCacheKey != null && node.value.result != null) {
promises.push(
this.options.cache.set(resultCacheKey, node.value.result),
this.options.cache.setLargeBlob(
resultCacheKey,
serialize(node.value.result),
),
);
delete node.value.result;
}
}

promises.push(this.options.cache.set(requestGraphKey, this.graph));
promises.push(
this.options.cache.setLargeBlob(requestGraphKey, serialize(this.graph)),
);

let opts = getWatcherOptions(this.options);
let snapshotPath = path.join(this.options.cacheDir, snapshotKey + '.txt');
@@ -1100,9 +1107,10 @@ async function loadRequestGraph(options): Async<RequestGraph> {

let cacheKey = getCacheKey(options);
let requestGraphKey = hashString(`${cacheKey}:requestGraph`);
let requestGraph = await options.cache.get<RequestGraph>(requestGraphKey);

if (requestGraph) {
if (await options.cache.hasLargeBlob(requestGraphKey)) {
let requestGraph: RequestGraph = deserialize(
await options.cache.getLargeBlob(requestGraphKey),
);
let opts = getWatcherOptions(options);
let snapshotKey = hashString(`${cacheKey}:snapshot`);
let snapshotPath = path.join(options.cacheDir, snapshotKey + '.txt');
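
The request graph and cached request results now bypass cache.set and cache.get entirely: they are serialized explicitly with the serialize/deserialize helpers imported above and stored through the large-blob path. A condensed round trip (the key name is illustrative):

await cache.setLargeBlob(requestGraphKey, serialize(requestGraph)); // written when the graph is persisted
if (await cache.hasLargeBlob(requestGraphKey)) {
  let requestGraph = deserialize(await cache.getLargeBlob(requestGraphKey)); // read when the previous graph is loaded
}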
4 changes: 3 additions & 1 deletion packages/core/core/src/Transformation.js
@@ -512,7 +512,9 @@ export default class Transformation {
cachedAssets.map(async (value: AssetValue) => {
let content =
value.contentKey != null
? this.options.cache.getStream(value.contentKey)
? value.isLargeBlob
? this.options.cache.getStream(value.contentKey)
: await this.options.cache.getBlob(value.contentKey)
: null;
let mapBuffer =
value.astKey != null
1 change: 1 addition & 0 deletions packages/core/core/src/UncommittedAsset.js
@@ -135,6 +135,7 @@ export default class UncommittedAsset {
this.value.stats.size = size;
}

this.value.isLargeBlob = this.content instanceof Readable;
this.value.committed = true;
}

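
When an asset is committed, it records whether its content was still a Readable at that point; that flag is what CommittedAsset and Transformation later check to decide between getStream and getBlob, and it is declared as the optional isLargeBlob field on Asset in types.js at the end of this diff.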
13 changes: 10 additions & 3 deletions packages/core/core/src/requests/WriteBundleRequest.js
@@ -16,7 +16,7 @@ import {HASH_REF_PREFIX, HASH_REF_REGEX} from '../constants';
import nullthrows from 'nullthrows';
import path from 'path';
import {NamedBundle} from '../public/Bundle';
import {TapStream} from '@parcel/utils';
import {blobToStream, TapStream} from '@parcel/utils';
import {Readable, Transform, pipeline} from 'stream';
import {
fromProjectPath,
@@ -123,7 +123,14 @@ async function run({input, options, api}: RunInput) {
: {
mode: (await inputFS.stat(mainEntry.filePath)).mode,
};
let contentStream = options.cache.getStream(cacheKeys.content);
let contentStream: Readable;
if (info.isLargeBlob) {
contentStream = options.cache.getStream(cacheKeys.content);
} else {
contentStream = blobToStream(
await options.cache.getBlob(cacheKeys.content),
);
}
let size = 0;
contentStream = contentStream.pipe(
new TapStream(buf => {
@@ -159,7 +166,7 @@
(await options.cache.has(mapKey))
) {
await writeFiles(
options.cache.getStream(mapKey),
blobToStream(await options.cache.getBlob(mapKey)),
info,
hashRefToNameHash,
options,
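
When the bundle is finally written out, the content stream is rebuilt either directly from the cache (large blob) or by wrapping the cached Buffer with blobToStream from @parcel/utils; the source map is read back through getBlob and wrapped the same way.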
1 change: 1 addition & 0 deletions packages/core/core/src/types.js
@@ -180,6 +180,7 @@ export type Asset = {|
configPath?: ProjectPath,
plugin: ?PackageName,
configKeyPath?: string,
isLargeBlob?: boolean,
|};

export type InternalGlob = ProjectPath;
