Skip to content

Commit

Permalink
Use stream#pipeline instead of stream.pipe (#8235)
Browse files Browse the repository at this point in the history
  • Loading branch information
mischnic committed Jun 21, 2022
1 parent b8b203d commit 0dcea3f
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 21 deletions.
18 changes: 11 additions & 7 deletions packages/core/cache/src/FSCache.js
@@ -1,16 +1,22 @@
// @flow strict-local

import type {Readable} from 'stream';
import type {Readable, Writable} from 'stream';
import type {FilePath} from '@parcel/types';
import type {FileSystem} from '@parcel/fs';
import type {Cache} from './types';

import stream from 'stream';
import path from 'path';
import {promisify} from 'util';
import logger from '@parcel/logger';
import {serialize, deserialize, registerSerializableClass} from '@parcel/core';
// flowlint-next-line untyped-import:off
import packageJson from '../package.json';

const pipeline: (Readable, Writable) => Promise<void> = promisify(
stream.pipeline,
);

export class FSCache implements Cache {
fs: FileSystem;
dir: FilePath;
Expand Down Expand Up @@ -45,12 +51,10 @@ export class FSCache implements Cache {
}

setStream(key: string, stream: Readable): Promise<void> {
return new Promise((resolve, reject) => {
stream
.pipe(this.fs.createWriteStream(this._getCachePath(`${key}-large`)))
.on('error', reject)
.on('finish', resolve);
});
return pipeline(
stream,
this.fs.createWriteStream(this._getCachePath(`${key}-large`)),
);
}

has(key: string): Promise<boolean> {
Expand Down
18 changes: 11 additions & 7 deletions packages/core/cache/src/LMDBCache.js
@@ -1,16 +1,22 @@
// @flow strict-local
import type {FilePath} from '@parcel/types';
import type {Cache} from './types';
import type {Readable, Writable} from 'stream';

import {Readable} from 'stream';
import stream from 'stream';
import path from 'path';
import {promisify} from 'util';
import {serialize, deserialize, registerSerializableClass} from '@parcel/core';
import {NodeFS} from '@parcel/fs';
// flowlint-next-line untyped-import:off
import packageJson from '../package.json';
// $FlowFixMe
import lmdb from 'lmdb';

const pipeline: (Readable, Writable) => Promise<void> = promisify(
stream.pipeline,
);

export class LMDBCache implements Cache {
fs: NodeFS;
dir: FilePath;
Expand Down Expand Up @@ -64,12 +70,10 @@ export class LMDBCache implements Cache {
}

setStream(key: string, stream: Readable): Promise<void> {
return new Promise((resolve, reject) => {
stream
.pipe(this.fs.createWriteStream(path.join(this.dir, key)))
.on('error', reject)
.on('finish', resolve);
});
return pipeline(
stream,
this.fs.createWriteStream(path.join(this.dir, key)),
);
}

getBlob(key: string): Promise<Buffer> {
Expand Down
19 changes: 12 additions & 7 deletions packages/core/fs/src/index.js
@@ -1,13 +1,21 @@
// @flow strict-local
import type {FileSystem} from './types';
import type {FilePath} from '@parcel/types';
import type {Readable, Writable} from 'stream';

import path from 'path';
import stream from 'stream';
import {promisify} from 'util';

export type * from './types';
export * from './NodeFS';
export * from './MemoryFS';
export * from './OverlayFS';

const pipeline: (Readable, Writable) => Promise<void> = promisify(
stream.pipeline,
);

// Recursively copies a directory from the sourceFS to the destinationFS
export async function ncp(
sourceFS: FileSystem,
Expand All @@ -22,13 +30,10 @@ export async function ncp(
let destPath = path.join(destination, file);
let stats = await sourceFS.stat(sourcePath);
if (stats.isFile()) {
await new Promise((resolve, reject) => {
sourceFS
.createReadStream(sourcePath)
.pipe(destinationFS.createWriteStream(destPath))
.on('finish', () => resolve())
.on('error', reject);
});
await pipeline(
sourceFS.createReadStream(sourcePath),
destinationFS.createWriteStream(destPath),
);
} else if (stats.isDirectory()) {
await ncp(sourceFS, sourcePath, destinationFS, destPath);
}
Expand Down

0 comments on commit 0dcea3f

Please sign in to comment.