// FsaNodeWriteStream.ts
import { Writable } from 'stream';
import { Defer } from '../thingies/Defer';
import { concurrency } from '../thingies/concurrency';
import { flagsToNumber } from '../node/util';
import { FLAG } from '../consts/FLAG';
import { FsaNodeFsOpenFile } from './FsaNodeFsOpenFile';
import queueMicrotask from '../queueMicrotask';
import type { IFileSystemWritableFileStream } from '../fsa/types';
import type { IWriteStream } from '../node/types/misc';
import type { IWriteStreamOptions } from '../node/types/options';
/**
 * This WriteStream implementation does not build on top of the `fs` module,
 * but instead uses the lower-level `FileSystemFileHandle` interface. The reason
 * is the difference in semantics between `fs` and FSA (File System Access API)
 * write streams.
 *
 * When data is written to an FSA file, a new FSA stream is created, which
 * copies the file to a temporary swap file. After each written chunk, that swap
 * file is closed and the original file is replaced with the swap file. This
 * means that, if WriteStream was built on top of `fs`, each chunk write would
 * result in a file copy, a write, a close and a rename operation, which is not
 * what we want.
 *
 * Instead, this implementation hooks into the lower level and closes the swap
 * file only once the stream is closed. The downside is that the written data
 * is not immediately visible to other processes (because it is written to the
 * swap file), but that is the trade-off we have to make.
 *
 * @todo Could make this flush the data to the original file periodically, so that
 *     the data is visible to other processes.
 * @todo This stream could work through `FileSystemSyncAccessHandle.write` in a
 *     Worker thread instead.
 */
export class FsaNodeWriteStream extends Writable implements IWriteStream {
  /** `true` until the FSA writable stream has been created successfully. */
  protected __pending__: boolean = true;
  /** Set once a close has been attempted; later writes and closes become no-ops. */
  protected __closed__: boolean = false;
  /** Running total of bytes passed to the FSA writable stream. */
  protected __bytes__: number = 0;
  /** Resolves to the FSA writable stream, or rejects if opening it failed. */
  protected readonly __stream__: Promise<IFileSystemWritableFileStream>;
  /** Limiter created with `concurrency(1)`; all writes and closes go through it. */
  protected readonly __mutex__ = concurrency(1);

  /**
   * @param handle Promise of the opened file the stream writes to.
   * @param path Path of the file, exposed as the `path` property.
   * @param options Stream options; `start`, `fd`, `flags` and `emitClose` are read.
   * @throws TypeError If `options.start` is defined but is not a number, or is
   *     negative.
   */
  public constructor(
    handle: Promise<FsaNodeFsOpenFile>,
    public readonly path: string,
    protected readonly options: IWriteStreamOptions,
  ) {
    super();
    if (options.start !== undefined) {
      if (typeof options.start !== 'number') {
        throw new TypeError('"start" option must be a Number');
      }
      if (options.start < 0) {
        throw new TypeError('"start" must be >= zero');
      }
    }
    const stream = new Defer<IFileSystemWritableFileStream>();
    this.__stream__ = stream.promise;
    // The FSA writable is created asynchronously; writes and closes await
    // `__stream__`, so they wait for this to finish.
    (async () => {
      const fsaHandle = await handle;
      // 'open' is emitted only when no `fd` was supplied through the options.
      const fileWasOpened = !options.fd;
      if (fileWasOpened) this.emit('open', fsaHandle.fd);
      const flags = flagsToNumber(options.flags ?? 'w');
      const keepExistingData = flags & FLAG.O_APPEND ? true : false;
      const writable = await fsaHandle.file.createWritable({ keepExistingData });
      // NOTE(review): `start` is only honoured when the append flag is set;
      // confirm this is intended for other flags (e.g. 'r+').
      if (keepExistingData) {
        const start = Number(options.start ?? 0);
        if (start) await writable.seek(start);
      }
      this.__pending__ = false;
      stream.resolve(writable);
    })().catch(error => {
      stream.reject(error);
    });
  }

  /**
   * Writes the buffers, in order, to the FSA writable stream and adds their
   * sizes to the byte counter. Writes arriving after a close are silently
   * dropped.
   */
  private async ___write___(buffers: Buffer[]): Promise<void> {
    await this.__mutex__(async () => {
      if (this.__closed__) return;
      const writable = await this.__stream__;
      for (const buffer of buffers) {
        await writable.write(buffer);
        this.__bytes__ += buffer.byteLength;
      }
    });
  }

  /**
   * Closes the FSA writable stream at most once.
   *
   * Repeated calls never close the underlying stream again; when the
   * `emitClose` option is set they only re-emit 'close' (in a microtask).
   * Failures are reported through the 'error' event, followed by 'close' when
   * `emitClose` is set.
   */
  private async __close__(): Promise<void> {
    const emitClose = this.options.emitClose;
    await this.__mutex__(async () => {
      if (this.__closed__) {
        // Already closed (or a close already failed): never call
        // `writable.close()` twice, regardless of `emitClose`.
        if (emitClose) queueMicrotask(() => this.emit('close'));
        return;
      }
      // A close attempt is final, even if opening or closing the stream fails,
      // so that the same failure is not reported again by a later close.
      this.__closed__ = true;
      try {
        const writable = await this.__stream__;
        await writable.close();
        if (emitClose) this.emit('close');
      } catch (error) {
        this.emit('error', error);
        if (emitClose) this.emit('close', error);
      }
    });
  }

  // ------------------------------------------------------------- IWriteStream

  /** Number of bytes written to the FSA writable stream so far. */
  public get bytesWritten(): number {
    return this.__bytes__;
  }

  /** `true` while the FSA writable stream has not been created yet. */
  public get pending(): boolean {
    return this.__pending__;
  }

  /**
   * Closes the stream.
   *
   * @param cb Optional listener registered once for the 'close' event.
   */
  public close(cb?: (error?: Error | null) => void): void {
    if (cb) this.once('close', cb);
    // `__close__` reports close failures through the 'error' event; the promise
    // is deliberately not awaited, so a rejection is discarded here.
    this.__close__().catch(() => {});
  }

  // ----------------------------------------------------------------- Writable

  /** `Writable` hook: writes a single chunk. */
  _write(chunk: any, encoding: string, callback: (error?: Error | null) => void): void {
    this.___write___([chunk])
      .then(() => {
        if (callback) callback(null);
      })
      .catch(error => {
        if (callback) callback(error);
      });
  }

  /** `Writable` hook: writes a batch of buffered chunks in order. */
  _writev(chunks: Array<{ chunk: any; encoding: string }>, callback: (error?: Error | null) => void): void {
    const buffers = chunks.map(({ chunk }) => chunk);
    this.___write___(buffers)
      .then(() => {
        if (callback) callback(null);
      })
      .catch(error => {
        if (callback) callback(error);
      });
  }

  /** `Writable` hook: closes the FSA writable stream when the stream ends. */
  _final(callback: (error?: Error | null) => void): void {
    this.__close__()
      .then(() => {
        if (callback) callback(null);
      })
      .catch(error => {
        if (callback) callback(error);
      });
  }
}