Skip to content

Commit

Permalink
Implementation without move function
Browse files Browse the repository at this point in the history
  • Loading branch information
mischnic committed Jun 10, 2022
1 parent cbfb5a4 commit 577578e
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 81 deletions.
13 changes: 3 additions & 10 deletions packages/core/cache/src/FSCache.js
Expand Up @@ -46,17 +46,10 @@ export class FSCache implements Cache {

setStream(key: string, stream: Readable): Promise<void> {
return new Promise((resolve, reject) => {
let {writeStream, move} = this.fs.createWriteStream(
path.join(this.dir, key),
);
stream
.pipe(writeStream)
.on('error', e => {
reject(e);
})
.on('finish', () => {
move().then(resolve);
});
.pipe(this.fs.createWriteStream(path.join(this.dir, key)))
.on('error', reject)
.on('finish', resolve);
});
}

Expand Down
13 changes: 3 additions & 10 deletions packages/core/cache/src/LMDBCache.js
Expand Up @@ -65,17 +65,10 @@ export class LMDBCache implements Cache {

setStream(key: string, stream: Readable): Promise<void> {
return new Promise((resolve, reject) => {
let {writeStream, move} = this.fs.createWriteStream(
path.join(this.dir, key),
);
stream
.pipe(writeStream)
.on('error', e => {
reject(e);
})
.on('finish', () => {
move().then(resolve);
});
.pipe(this.fs.createWriteStream(path.join(this.dir, key)))
.on('error', reject)
.on('finish', resolve);
});
}

Expand Down
20 changes: 11 additions & 9 deletions packages/core/core/src/requests/WriteBundleRequest.js
Expand Up @@ -251,17 +251,19 @@ async function runCompressor(
});

if (res != null) {
let {writeStream, move} = outputFS.createWriteStream(
filePath + (res.type != null ? '.' + res.type : ''),
writeOptions,
);
await new Promise((resolve, reject) =>
pipeline(res.stream, writeStream, err => {
if (err) reject(err);
else resolve();
}),
pipeline(
res.stream,
outputFS.createWriteStream(
filePath + (res.type != null ? '.' + res.type : ''),
writeOptions,
),
err => {
if (err) reject(err);
else resolve();
},
),
);
await move();
}
} catch (err) {
throw new ThrowableDiagnostic({
Expand Down
11 changes: 1 addition & 10 deletions packages/core/fs/src/MemoryFS.js
Expand Up @@ -483,16 +483,7 @@ export class MemoryFS implements FileSystem {
return new ReadStream(this, filePath);
}

createWriteStream(
filePath: FilePath,
options: ?FileOptions,
): {|writeStream: WriteStream, move: () => Promise<void>|} {
return {
writeStream: new WriteStream(this, filePath, options),
move: () => Promise.resolve(),
};
}
createWriteStr(filePath: FilePath, options: ?FileOptions): WriteStream {
createWriteStream(filePath: FilePath, options: ?FileOptions): WriteStream {
return new WriteStream(this, filePath, options);
}

Expand Down
60 changes: 28 additions & 32 deletions packages/core/fs/src/NodeFS.js
Expand Up @@ -57,48 +57,44 @@ export class NodeFS implements FileSystem {
? (...args) => searchJS.findFirstFile(this, ...args)
: searchNative.findFirstFile;

createWriteStream(
filePath: string,
options: any,
): {|writeStream: Writable, move: () => Promise<void>|} {
createWriteStream(filePath: string, options: any): Writable {
// Make createWriteStream atomic
let tmpFilePath = getTempFilePath(filePath);

let writeStream = fs.createWriteStream(tmpFilePath, options);

let failed = false;
writeStream.once('error', () => {
failed = true;
fs.unlinkSync(tmpFilePath);
});

return {
writeStream,
move: async () => {
if (!failed) {
try {
await fs.promises.rename(tmpFilePath, filePath);
} catch (e) {
if (
process.platform === 'win32' &&
e.syscall &&
e.syscall === 'rename' &&
e.code &&
e.code === 'EPERM'
) {
let [hashTmp, hashTarget] = await Promise.all([
hashStream(writeStream.__atomicTmp),
hashStream(writeStream.__atomicTarget),
]);

await this.unlink(writeStream.__atomicTmp);

if (hashTmp != hashTarget) {
throw e;
}
writeStream.once('close', async () => {
if (!failed) {
try {
await fs.promises.rename(tmpFilePath, filePath);
} catch (e) {
if (
process.platform === 'win32' &&
e.syscall &&
e.syscall === 'rename' &&
e.code &&
e.code === 'EPERM'
) {
let [hashTmp, hashTarget] = await Promise.all([
hashStream(writeStream.__atomicTmp),
hashStream(writeStream.__atomicTarget),
]);

await this.unlink(writeStream.__atomicTmp);

if (hashTmp != hashTarget) {
throw e;
}
}
}
},
};
}
});

return writeStream;
}

async writeFile(
Expand Down
9 changes: 3 additions & 6 deletions packages/core/fs/src/index.js
Expand Up @@ -23,14 +23,11 @@ export async function ncp(
let stats = await sourceFS.stat(sourcePath);
if (stats.isFile()) {
await new Promise((resolve, reject) => {
let {writeStream, move} = destinationFS.createWriteStream(destPath);
sourceFS
.createReadStream(sourcePath)
.pipe(writeStream)
.on('error', reject)
.on('finish', () => {
move().then(resolve);
});
.pipe(destinationFS.createWriteStream(destPath))
.on('finish', () => resolve())
.on('error', reject);
});
} else if (stats.isDirectory()) {
await ncp(sourceFS, sourcePath, destinationFS, destPath);
Expand Down
5 changes: 1 addition & 4 deletions packages/core/fs/src/types.js
Expand Up @@ -86,10 +86,7 @@ export interface FileSystem {
rimraf(path: FilePath): Promise<void>;
ncp(source: FilePath, destination: FilePath): Promise<void>;
createReadStream(path: FilePath, options?: ?FileOptions): Readable;
createWriteStream(
path: FilePath,
options?: ?FileOptions,
): {|+writeStream: Writable, move: () => Promise<void>|};
createWriteStream(path: FilePath, options?: ?FileOptions): Writable;
cwd(): FilePath;
chdir(dir: FilePath): void;
watch(
Expand Down

0 comments on commit 577578e

Please sign in to comment.