-
Notifications
You must be signed in to change notification settings - Fork 26.1k
/
invoke-request.ts
150 lines (130 loc) · 4.05 KB
/
invoke-request.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import '../../node-polyfill-fetch'
import type { IncomingMessage } from 'http'
import type { Readable } from 'stream'
import { filterReqHeaders } from './utils'
export const invokeRequest = async (
targetUrl: string,
requestInit: {
headers: IncomingMessage['headers']
method: IncomingMessage['method']
signal?: AbortSignal
},
readableBody?: Readable | ReadableStream
) => {
// force to 127.0.0.1 as IPC always runs on this hostname
// to avoid localhost issues
const parsedTargetUrl = new URL(targetUrl)
parsedTargetUrl.hostname = '127.0.0.1'
const invokeHeaders = filterReqHeaders({
'cache-control': '',
...requestInit.headers,
}) as IncomingMessage['headers']
return await fetch(parsedTargetUrl.toString(), {
headers: invokeHeaders as any as Headers,
method: requestInit.method,
redirect: 'manual',
signal: requestInit.signal,
...(requestInit.method !== 'GET' &&
requestInit.method !== 'HEAD' &&
readableBody
? {
body: readableBody as BodyInit,
duplex: 'half',
}
: {}),
next: {
// @ts-ignore
internal: true,
},
})
}
/**
* This is a minimal implementation of a Writable with just enough
* functionality to handle stream cancellation.
*/
export interface PipeTarget {
/**
* Called when new data is read from readable source.
*/
write: (chunk: Uint8Array) => unknown
/**
* Always called once we read all data (if the writable isn't already
* destroyed by a client disconnect).
*/
end: () => unknown
/**
* An optional method which is called after every write, to support
* immediately streaming in gzip responses.
*/
flush?: () => unknown
/**
* The close event listener is necessary for us to detect an early client
* disconnect while we're attempting to read data. This must be done
* out-of-band so that we can cancel the readable (else we'd have to wait for
* the readable to produce more data before we could tell it to cancel).
*/
on: (event: 'close', cb: () => void) => void
/**
* Allows us to cleanup our onClose listener.
*/
off: (event: 'close', cb: () => void) => void
}
export function isAbortError(e: any): e is Error & { name: 'AbortError' } {
return e?.name === 'AbortError'
}
export async function pipeReadable(
readable: ReadableStream,
writable: PipeTarget
) {
const reader = readable.getReader()
let readerDone = false
let writableClosed = false
// It's not enough just to check for `writable.destroyed`, because the client
// may disconnect while we're waiting for a read. We need to immediately
// cancel the readable, and that requires an out-of-band listener.
function onClose() {
writableClosed = true
writable.off?.('close', onClose)
// If the reader is not yet done, we need to cancel it so that the stream
// source's resources can be cleaned up. If a read is in-progress, this
// will also ensure the read promise rejects and frees our resources.
if (!readerDone) {
readerDone = true
reader.cancel().catch(() => {})
}
}
writable.on?.('close', onClose)
try {
while (true) {
// If the read throws, then the reader is done. If not, then we'll set
// readerDone to the actual done value after the read.
readerDone = true
const { done, value } = await reader.read()
readerDone = done
if (done || writableClosed) {
break
}
if (value) {
writable.write(Buffer.from(value))
writable.flush?.()
}
}
} catch (e) {
// If the client disconnects, we don't want to emit an unhandled error.
if (!isAbortError(e)) {
throw e
}
} finally {
writable.off?.('close', onClose)
// If we broke out of the loop because of a client disconnect, and the
// close event hasn't yet fired, we can early cancel.
if (!readerDone) {
reader.cancel().catch(() => {})
}
// If the client hasn't disconnected yet, end the writable so that the
// response sends the final bytes.
if (!writableClosed) {
writable.end()
}
}
}