Skip to content

Commit

Permalink
fix(child_process): prevent writing to terminating process (#85)
Browse files Browse the repository at this point in the history
* test: rename test file

* fix(child_process): prevent writing to terminating process
  • Loading branch information
AriPerkkio committed Apr 13, 2024
1 parent c045ecf commit b003003
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 3 deletions.
14 changes: 11 additions & 3 deletions src/runtime/process-worker.ts
Expand Up @@ -18,6 +18,7 @@ export default class ProcessWorker implements TinypoolWorker {
port?: MessagePort
channel?: TinypoolChannel
waitForExit!: Promise<void>
isTerminating = false

initialize(options: Parameters<TinypoolWorker['initialize']>[0]) {
this.process = fork(
Expand All @@ -42,6 +43,7 @@ export default class ProcessWorker implements TinypoolWorker {
}

async terminate() {
this.isTerminating = true
this.process.off('exit', this.onUnexpectedExit)

const sigkillTimeout = setTimeout(
Expand All @@ -61,10 +63,16 @@ export default class ProcessWorker implements TinypoolWorker {

// Mirror channel's messages to process
this.channel.onMessage((message: any) => {
this.process.send(message)
this.send(message)
})
}

private send(message: Parameters<NonNullable<typeof process['send']>>[0]) {
if (!this.isTerminating) {
this.process.send(message)
}
}

postMessage(message: any, transferListItem?: Readonly<TransferListItem[]>) {
transferListItem?.forEach((item) => {
if (item instanceof MessagePort) {
Expand All @@ -75,15 +83,15 @@ export default class ProcessWorker implements TinypoolWorker {
// Mirror port's messages to process
if (this.port) {
this.port.on('message', (message) =>
this.process.send(<TinypoolWorkerMessage<'port'>>{
this.send(<TinypoolWorkerMessage<'port'>>{
...message,
source: 'port',
__tinypool_worker_message__,
})
)
}

return this.process.send(<TinypoolWorkerMessage<'pool'>>{
return this.send(<TinypoolWorkerMessage<'pool'>>{
...message,
source: 'pool',
__tinypool_worker_message__,
Expand Down
26 changes: 26 additions & 0 deletions test/termination-timeout.test.ts → test/termination.test.ts
Expand Up @@ -30,3 +30,29 @@ test('termination timeout throws when worker does not terminate in time', async
'Failed to terminate worker'
)
})

test('writing to terminating worker does not crash', async () => {
const listeners: ((msg: any) => void)[] = []

const pool = new Tinypool({
runtime: 'child_process',
filename: resolve(__dirname, 'fixtures/sleep.js'),
minThreads: 1,
maxThreads: 1,
})

await pool.run(
{},
{
channel: {
onMessage: (listener) => listeners.push(listener),
postMessage: () => {},
},
}
)

const destroyed = pool.destroy()
listeners.forEach((listener) => listener('Hello from main thread'))

await destroyed
})

0 comments on commit b003003

Please sign in to comment.