diff --git a/packages/core/cache/src/FSCache.js b/packages/core/cache/src/FSCache.js index f517be5a77b..92c371013e4 100644 --- a/packages/core/cache/src/FSCache.js +++ b/packages/core/cache/src/FSCache.js @@ -46,17 +46,10 @@ export class FSCache implements Cache { setStream(key: string, stream: Readable): Promise { return new Promise((resolve, reject) => { - let {writeStream, move} = this.fs.createWriteStream( - path.join(this.dir, key), - ); stream - .pipe(writeStream) - .on('error', e => { - reject(e); - }) - .on('finish', () => { - move().then(resolve); - }); + .pipe(this.fs.createWriteStream(path.join(this.dir, key))) + .on('error', reject) + .on('finish', resolve); }); } diff --git a/packages/core/cache/src/LMDBCache.js b/packages/core/cache/src/LMDBCache.js index a11dee74578..8e2366533f4 100644 --- a/packages/core/cache/src/LMDBCache.js +++ b/packages/core/cache/src/LMDBCache.js @@ -65,17 +65,10 @@ export class LMDBCache implements Cache { setStream(key: string, stream: Readable): Promise { return new Promise((resolve, reject) => { - let {writeStream, move} = this.fs.createWriteStream( - path.join(this.dir, key), - ); stream - .pipe(writeStream) - .on('error', e => { - reject(e); - }) - .on('finish', () => { - move().then(resolve); - }); + .pipe(this.fs.createWriteStream(path.join(this.dir, key))) + .on('error', reject) + .on('finish', resolve); }); } diff --git a/packages/core/core/src/requests/WriteBundleRequest.js b/packages/core/core/src/requests/WriteBundleRequest.js index 569033140a0..232d4b0d387 100644 --- a/packages/core/core/src/requests/WriteBundleRequest.js +++ b/packages/core/core/src/requests/WriteBundleRequest.js @@ -251,17 +251,19 @@ async function runCompressor( }); if (res != null) { - let {writeStream, move} = outputFS.createWriteStream( - filePath + (res.type != null ? '.' + res.type : ''), - writeOptions, - ); await new Promise((resolve, reject) => - pipeline(res.stream, writeStream, err => { - if (err) reject(err); - else resolve(); - }), + pipeline( + res.stream, + outputFS.createWriteStream( + filePath + (res.type != null ? '.' + res.type : ''), + writeOptions, + ), + err => { + if (err) reject(err); + else resolve(); + }, + ), ); - await move(); } } catch (err) { throw new ThrowableDiagnostic({ diff --git a/packages/core/fs/src/MemoryFS.js b/packages/core/fs/src/MemoryFS.js index fb76cdafb1e..17afdcc4f75 100644 --- a/packages/core/fs/src/MemoryFS.js +++ b/packages/core/fs/src/MemoryFS.js @@ -483,16 +483,7 @@ export class MemoryFS implements FileSystem { return new ReadStream(this, filePath); } - createWriteStream( - filePath: FilePath, - options: ?FileOptions, - ): {|writeStream: WriteStream, move: () => Promise|} { - return { - writeStream: new WriteStream(this, filePath, options), - move: () => Promise.resolve(), - }; - } - createWriteStr(filePath: FilePath, options: ?FileOptions): WriteStream { + createWriteStream(filePath: FilePath, options: ?FileOptions): WriteStream { return new WriteStream(this, filePath, options); } diff --git a/packages/core/fs/src/NodeFS.js b/packages/core/fs/src/NodeFS.js index 4dbf8daac5a..8b58b369a08 100644 --- a/packages/core/fs/src/NodeFS.js +++ b/packages/core/fs/src/NodeFS.js @@ -57,48 +57,44 @@ export class NodeFS implements FileSystem { ? (...args) => searchJS.findFirstFile(this, ...args) : searchNative.findFirstFile; - createWriteStream( - filePath: string, - options: any, - ): {|writeStream: Writable, move: () => Promise|} { + createWriteStream(filePath: string, options: any): Writable { + // Make createWriteStream atomic let tmpFilePath = getTempFilePath(filePath); - let writeStream = fs.createWriteStream(tmpFilePath, options); + let failed = false; writeStream.once('error', () => { failed = true; fs.unlinkSync(tmpFilePath); }); - - return { - writeStream, - move: async () => { - if (!failed) { - try { - await fs.promises.rename(tmpFilePath, filePath); - } catch (e) { - if ( - process.platform === 'win32' && - e.syscall && - e.syscall === 'rename' && - e.code && - e.code === 'EPERM' - ) { - let [hashTmp, hashTarget] = await Promise.all([ - hashStream(writeStream.__atomicTmp), - hashStream(writeStream.__atomicTarget), - ]); - - await this.unlink(writeStream.__atomicTmp); - - if (hashTmp != hashTarget) { - throw e; - } + writeStream.once('close', async () => { + if (!failed) { + try { + await fs.promises.rename(tmpFilePath, filePath); + } catch (e) { + if ( + process.platform === 'win32' && + e.syscall && + e.syscall === 'rename' && + e.code && + e.code === 'EPERM' + ) { + let [hashTmp, hashTarget] = await Promise.all([ + hashStream(writeStream.__atomicTmp), + hashStream(writeStream.__atomicTarget), + ]); + + await this.unlink(writeStream.__atomicTmp); + + if (hashTmp != hashTarget) { + throw e; } } } - }, - }; + } + }); + + return writeStream; } async writeFile( diff --git a/packages/core/fs/src/index.js b/packages/core/fs/src/index.js index 1633fa8930c..35ccfc7b51b 100644 --- a/packages/core/fs/src/index.js +++ b/packages/core/fs/src/index.js @@ -23,14 +23,11 @@ export async function ncp( let stats = await sourceFS.stat(sourcePath); if (stats.isFile()) { await new Promise((resolve, reject) => { - let {writeStream, move} = destinationFS.createWriteStream(destPath); sourceFS .createReadStream(sourcePath) - .pipe(writeStream) - .on('error', reject) - .on('finish', () => { - move().then(resolve); - }); + .pipe(destinationFS.createWriteStream(destPath)) + .on('finish', () => resolve()) + .on('error', reject); }); } else if (stats.isDirectory()) { await ncp(sourceFS, sourcePath, destinationFS, destPath); diff --git a/packages/core/fs/src/types.js b/packages/core/fs/src/types.js index 409baf3f589..6463e31b3d0 100644 --- a/packages/core/fs/src/types.js +++ b/packages/core/fs/src/types.js @@ -86,10 +86,7 @@ export interface FileSystem { rimraf(path: FilePath): Promise; ncp(source: FilePath, destination: FilePath): Promise; createReadStream(path: FilePath, options?: ?FileOptions): Readable; - createWriteStream( - path: FilePath, - options?: ?FileOptions, - ): {|+writeStream: Writable, move: () => Promise|}; + createWriteStream(path: FilePath, options?: ?FileOptions): Writable; cwd(): FilePath; chdir(dir: FilePath): void; watch(