// forked from tinylibs/tinypool
// worker.ts
import {
parentPort,
MessagePort,
receiveMessageOnPort,
workerData as tinypoolData,
} from 'worker_threads'
import {
ReadyMessage,
RequestMessage,
ResponseMessage,
StartupMessage,
TinypoolData,
kResponseCountField,
kRequestCountField,
isMovable,
kTransferable,
kValue,
} from '../common'
import { stderr, stdout } from '../utils'
import { getHandler, throwInNextTick } from './utils'
// Split the data passed at worker creation into Tinypool's internal metadata
// and the user-supplied workerData.
const [tinypoolPrivateData, workerData] = tinypoolData as TinypoolData

// Mark this process as a Tinypool worker thread so user code (and Tinypool
// itself) can detect the environment and read its workerData / workerId.
process.__tinypool_state__ = {
  isWorkerThread: true,
  isTinypoolWorker: true,
  workerData: workerData,
  workerId: tinypoolPrivateData.workerId,
}

// Eagerly bound copy of process.memoryUsage, used when building responses.
const memoryUsage = process.memoryUsage.bind(process)

// Whether to use the Atomics-based fast wakeup path. Defaults from the env
// var (name inherited from piscina, which this code descends from); may be
// overridden by the StartupMessage below.
let useAtomics: boolean = process.env.PISCINA_DISABLE_ATOMICS !== '1'
// We should only receive this message once, when the Worker starts. It gives
// us the MessagePort used for receiving tasks, a SharedArrayBuffer for fast
// communication using Atomics, and the name of the default filename for tasks
// (so we can pre-load and cache the handler).
parentPort!.on('message', (message: StartupMessage) => {
  // The environment variable takes priority over the pool's setting.
  useAtomics =
    process.env.PISCINA_DISABLE_ATOMICS === '1' ? false : message.useAtomics

  const { port, sharedBuffer, filename, name } = message

  const initialize = async (): Promise<void> => {
    // Pre-load and cache the handler for the default filename, if one was given.
    if (filename !== null) {
      await getHandler(filename, name)
    }

    // Signal the main thread that this worker can start accepting tasks.
    const readyMessage: ReadyMessage = { ready: true }
    parentPort!.postMessage(readyMessage)

    // Tasks arrive either as regular port messages or, when atomics are
    // enabled, are drained synchronously inside atomicsWaitLoop.
    port.on('message', onMessage.bind(null, port, sharedBuffer))
    atomicsWaitLoop(port, sharedBuffer)
  }

  initialize().catch(throwInNextTick)
})
// Number of tasks currently executing in this worker; atomicsWaitLoop only
// blocks while this is zero.
let currentTasks: number = 0
// Last value of the request counter read from the shared buffer; compared
// against kRequestCountField to detect newly posted requests.
let lastSeenRequestCount: number = 0
// Blocks on Atomics.wait() while the worker is idle, draining task messages
// synchronously from `port` instead of waiting for 'message' events.
function atomicsWaitLoop(port: MessagePort, sharedBuffer: Int32Array) {
  if (!useAtomics) return

  // This function is entered either after receiving the startup message, or
  // when we are done with a task. In those situations, the *only* thing we
  // expect to happen next is a 'message' on `port`.
  // That call would come with the overhead of a C++ → JS boundary crossing,
  // including async tracking. So, instead, if there is no task currently
  // running, we wait for a signal from the parent thread using Atomics.wait(),
  // and read the message from the port instead of generating an event,
  // in order to avoid that overhead.
  // The one catch is that this stops asynchronous operations that are still
  // running from proceeding. Generally, tasks should not spawn asynchronous
  // operations without waiting for them to finish, though.
  while (currentTasks === 0) {
    // Check whether there are new messages by testing whether the current
    // number of requests posted by the parent thread matches the number of
    // requests received.
    // Atomics.wait returns immediately if the counter no longer equals
    // lastSeenRequestCount (i.e. a request arrived before we blocked).
    Atomics.wait(sharedBuffer, kRequestCountField, lastSeenRequestCount)
    lastSeenRequestCount = Atomics.load(sharedBuffer, kRequestCountField)

    // We have to read messages *after* updating lastSeenRequestCount in order
    // to avoid race conditions.
    let entry
    while ((entry = receiveMessageOnPort(port)) !== undefined) {
      onMessage(port, sharedBuffer, entry.message)
    }
  }
}
/**
 * Executes a single task request from the main thread.
 *
 * Resolves the handler for the request's filename/name, runs it with the
 * task payload, then posts a ResponseMessage (result or error, plus heap
 * usage) back over `port`, bumps the shared response counter, and re-enters
 * the atomics wait loop if the worker is idle again.
 */
function onMessage(
  port: MessagePort,
  sharedBuffer: Int32Array,
  message: RequestMessage
) {
  currentTasks++
  const { taskId, task, filename, name } = message

  ;(async function () {
    let response: ResponseMessage
    let transferList: any[] = []
    try {
      const handler = await getHandler(filename, name)
      if (handler === null) {
        // Interpolate the offending filename into the error message
        // (previously the literal text "$(unknown)" was thrown).
        throw new Error(`No handler function exported from ${filename}`)
      }
      let result = await handler(task)
      if (isMovable(result)) {
        // Result was wrapped by Tinypool.move(): collect its transferables
        // and unwrap the actual value.
        transferList = transferList.concat(result[kTransferable])
        result = result[kValue]
      }
      response = {
        taskId,
        result: result,
        error: null,
        usedMemory: memoryUsage().heapUsed,
      }

      // If the task used e.g. console.log(), wait for the stream to drain
      // before potentially entering the `Atomics.wait()` loop, and before
      // returning the result so that messages will always be printed even
      // if the process would otherwise be ready to exit.
      // (`?? 0` instead of a non-null assertion: when the stream getter
      // returns undefined, the check is simply false.)
      if ((stdout()?.writableLength ?? 0) > 0) {
        await new Promise((resolve) => process.stdout.write('', resolve))
      }
      if ((stderr()?.writableLength ?? 0) > 0) {
        await new Promise((resolve) => process.stderr.write('', resolve))
      }
    } catch (error) {
      response = {
        taskId,
        result: null,
        // It may be worth taking a look at the error cloning algorithm we
        // use in Node.js core here, it's quite a bit more flexible
        error,
        usedMemory: memoryUsage().heapUsed,
      }
    }
    currentTasks--

    // Post the response to the parent thread, and let it know that we have
    // an additional message available. If possible, use Atomics.wait()
    // to wait for the next message.
    port.postMessage(response, transferList)
    Atomics.add(sharedBuffer, kResponseCountField, 1)
    atomicsWaitLoop(port, sharedBuffer)
  })().catch(throwInNextTick)
}