
Streams, Buffers, and Worker Threads #33240

Closed
jasnell opened this issue May 4, 2020 · 20 comments
Labels
stream Issues and PRs related to the stream subsystem. worker Issues and PRs related to Worker support. zlib Issues and PRs related to the zlib subsystem.

Comments

@jasnell
Member

jasnell commented May 4, 2020

There is a bit of an undefined grey area when passing a Buffer instance into a stream.Writable: who has ownership responsibility for the Buffer?

See this example as reference: https://github.com/jasnell/piscina/pull/34/files#diff-123d6328fe4f2579686ee49111e38427

In the example, I create a stream.Duplex instance that wraps a MessagePort. On _write, I post the given Buffer to the MessagePort and include it in the transfer list to avoid copying the data:

  _write (chunk, encoding, callback) {
    if (typeof chunk === 'string') {
      chunk = Buffer.from(chunk, encoding);
    }
    this.#port.postMessage(chunk, [chunk.buffer]);
    callback();
  }

This is where it gets fun. In my example, from within a worker thread, I open a pipeline that reads from a file, passes that into a gzip compressor, then out to my custom duplex. What happens is a fun little Abort...

james@ubuntu:~/nearform/piscina/examples/stream$ node index
node[30456]: ../src/node_zlib.cc:323:static void node::{anonymous}::CompressionStream<CompressionContext>::Write(const v8::FunctionCallbackInfo<v8::Value>&) [with bool async = true; CompressionContext = node::{anonymous}::ZlibContext]: Assertion `Buffer::IsWithinBounds(out_off, out_len, Buffer::Length(out_buf))' failed.
 1: 0xa295b0 node::Abort() [node]
 2: 0xa2962e  [node]
 3: 0xae6b1a  [node]
 4: 0xc0251b  [node]
 5: 0xc03ac6  [node]
 6: 0xc04146 v8::internal::Builtin_HandleApiCall(int, unsigned long*, v8::internal::Isolate*) [node]
2
 7: 0x13a5919  [node]
Aborted (core dumped)

The reason for the Abort is that my custom Duplex just transferred ownership of the Buffer away to the main thread while the gzip compressor was still using it.

Replace the gzip compressor with the brotli compressor, and things work! Change the postMessage() call to remove the transferList and things work!
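One way to keep the zero-copy path only for chunks that are actually safe to transfer is sketched below. This is a hypothetical helper (not the actual piscina code or the eventual fix): a Buffer is only safe to transfer when it is the sole view over its ArrayBuffer; otherwise detaching the ArrayBuffer would invalidate every other Buffer sharing it, such as pooled allocations. Note it only avoids corrupting unrelated buffers; it still detaches a caller-owned chunk, which is exactly the ownership question raised here.

```javascript
// Hypothetical helper: transfer only when the chunk is the sole view
// over its ArrayBuffer; otherwise copy into fresh, non-pooled memory
// (Buffer.alloc does not use the pool) and transfer the copy instead.
function prepareForTransfer (chunk, encoding) {
  if (typeof chunk === 'string') {
    chunk = Buffer.from(chunk, encoding);
  }
  const soleView =
    chunk.byteOffset === 0 && chunk.byteLength === chunk.buffer.byteLength;
  if (soleView) {
    return { payload: chunk, transferList: [chunk.buffer] };
  }
  const copy = Buffer.alloc(chunk.byteLength);
  chunk.copy(copy);
  return { payload: copy, transferList: [copy.buffer] };
}
```

A `_write` implementation could then do `this.#port.postMessage(payload, transferList)` with the result, at the cost of a copy for pooled or sliced chunks.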

Obviously the Abort itself is problematic and we need to figure out how to avoid it in zlib, so that's issue #1... Issue #2 is that we really should try to specify some ownership expectations for Buffer instances passed into stream instances.

/cc @nodejs/streams @ronag @addaleax @nodejs/zlib

@jasnell jasnell added stream Issues and PRs related to the stream subsystem. worker Issues and PRs related to Worker support. zlib Issues and PRs related to the zlib subsystem. labels May 4, 2020
@ronag
Member

ronag commented May 4, 2020

Oh, move semantics in JavaScript. How exciting.

I would prefer if the ownership was transferred down the pipe. However, I find it unlikely that we will be able to enforce this in user land...

It's not the most performant way but I think in this case you should not transfer the Buffer, but copy it.

@ronag
Member

ronag commented May 4, 2020

Maybe stream implementers can set a standardised flag on the Buffer to indicate it is safe to transfer?

@jasnell
Member Author

jasnell commented May 4, 2020

Yep, for my case, I'm going to allow an option that does both, with copy as the default. We should, however, deal with the Abort case here more elegantly to avoid the crash as it's just a matter of time before other end users hit it.

@addaleax
Member

addaleax commented May 4, 2020

I think the question you posed in the linked issue is the relevant one here for the second issue:

when using a Writable, and passing a Buffer, who takes ownership responsibility for that Buffer? The caller or the receiver? I don't think we have a clear answer for that but we definitely need a fix that does not trigger an Abort.

We kind of do have an answer for that: Modifications to the Buffer that happen between write() and _write() are reflected in the written data, which implies that the caller is responsible for maintaining ownership of the Buffer. As @ronag said, that’s not really preferable, but it’s unlikely that we can do something about it.

There kind of is a good and performant solution here, which would be having copy-on-write semantics for ArrayBuffers (where detaching an ArrayBuffer would qualify as a “write”). I’m afraid that that’s a language-level feature that we can’t necessarily implement, though.

And for issue 1, yeah, it might make sense to audit the codebase for calls that read from buffers and check how they react to detaching ArrayBuffers.

(None of this is really Worker-thread specific, btw.)
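Such an audit needs a way to recognize a detached ArrayBuffer from JS in the first place. There is no direct predicate exposed, but detachment can be detected indirectly; a sketch, not an official API:

```javascript
// Sketch: a detached ArrayBuffer reports byteLength 0, and constructing
// a typed-array view over it throws ("Cannot perform Construct on a
// detached ArrayBuffer", as in the stack trace earlier in this thread),
// while a live zero-length buffer does not.
function isDetached (ab) {
  if (ab.byteLength !== 0) return false;
  try {
    new Uint8Array(ab); // throws TypeError only when detached
    return false;
  } catch {
    return true;
  }
}
```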

@jasnell
Member Author

jasnell commented May 4, 2020

Nope, definitely not worker-thread specific but makes it easy to spot the issue :-)

@mafintosh
Member

@jasnell I think your issue is due to your stream "breaking" the pooling contract here https://github.com/jasnell/piscina/pull/34/files#r419929366

@jasnell
Member Author

jasnell commented May 5, 2020

@mafintosh yeah, that's precisely the point here tho. The streams contract is underdefined here, with an implicit, undocumented rule that the writer owns the buffer most of the time. That doesn't hold true all of the time, however, such as when the Buffer is sliced off the pool, or when the stream instance itself creates the Buffer because it was given a string instead. Meanwhile, there aren't protections downstream to prevent the Buffer from being misused (such as being put into a transfer list and triggering an abort).

There are two actions here:

  1. Make the implicit rule explicit through better documentation
  2. As @addaleax suggested, audit our uses of Buffer to ensure we catch and better handle violations of that rule.

@mafintosh
Member

mafintosh commented May 5, 2020

@jasnell I don't think this is a stream issue tho. Node pooling makes this impossible to solve, in general.

const a = Buffer.from('hello')
const b = Buffer.from('world')

// Both small allocations are sliced off the same pre-allocated pool:
console.log(a.buffer === b.buffer) // true

@mafintosh
Member

To expand on the above: is there ever a case where transferring an unsafely allocated buffer via the transfer list won't cause undefined behaviour in your Node program, since it'll "free" all kinds of unrelated buffers?

@ronag
Member

ronag commented May 5, 2020

Given that, is there any case where we can allow a Buffer in the transfer list? Shouldn't we always make a copy (even if the user puts the buffer into the transfer list)? I.e., we support the API, but with a slow implementation, until we have a better solution.

@addaleax
Member

addaleax commented May 5, 2020

@mafintosh Just fyi, you are right, but in the near future, that is no longer going to be a concern here: #32759

@mafintosh
Member

@addaleax excellent!

I still don't think this is a stream issue. When you call write with a Buffer, you're implicitly giving ownership of that Buffer to the stream.

If you mess with the ArrayBuffer without taking byteOffset and byteLength into account, you're breaking out of the "Buffer view" sandbox, which requires some kind of coupling with whoever else uses that ArrayBuffer.

@jasnell
Member Author

jasnell commented May 5, 2020

For streams, I think the rule is actually that the stream has zero ownership over the Buffer at all.

For handling of MessagePort and Buffer interactions, the situation really does depend on how the Buffer was created and what version of Node.js is being used.

Assuming the setup:

const Piscina = require('piscina');
const { resolve } = require('path');

const pool = new Piscina({
  filename: resolve(__dirname, 'worker.js')
});

const b = Buffer.from([1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,10]);
const b1 = b.slice(0, 10);
const b2 = b.slice(10);
console.log(b.buffer === b1.buffer);
console.log(b.buffer === b2.buffer);
console.log(b1.buffer === b2.buffer);

(async () => {
  console.log(await pool.runTask({data:b2}, [b2.buffer]));

  console.log(b2, b2.buffer);
})();

Note the use of Buffer.from() to allocate the Buffer (it uses the pool).

Then, in worker.js:

module.exports = ({data}) => {
    const u = new Uint8Array(data.buffer);
    u[0] = 1;
    u[1] = 2;
    u[2] = 3;
    data.fill(2);
    console.log('\t',data, data.buffer);
}

Running this in Node.js 12.x and 13.x yields an error.

C:\Users\jasne\Projects\tmp>nvs use 13
PATH -= %LOCALAPPDATA%\nvs\node\14.1.0\x64
PATH += %LOCALAPPDATA%\nvs\node\13.13.0\x64

C:\Users\jasne\Projects\tmp>node index
true
true
true
internal/buffer.js:945
class FastBuffer extends Uint8Array {}
^

TypeError: Cannot perform Construct on a detached ArrayBuffer
    at new Uint8Array (<anonymous>)
    at new FastBuffer (internal/buffer.js:945:1)
    at fromStringFast (buffer.js:427:11)
    at fromString (buffer.js:452:10)
    at Function.from (buffer.js:302:12)
    at readableAddChunk (_stream_readable.js:235:24)
    at ReadableWorkerStdio.Readable.push (_stream_readable.js:214:10)
    at Worker.[kOnMessage] (internal/worker.js:246:47)
    at MessagePort.<anonymous> (internal/worker.js:163:57)
    at MessagePort.emit (events.js:315:20)

Running this in Node.js 14.1 yields:

C:\Users\jasne\Projects\tmp>node index
true
true
true
         Uint8Array(10) [
  2, 2, 2, 2, 2,
  2, 2, 2, 2, 2
] ArrayBuffer {
  [Uint8Contents]: <01 02 03 65 20 73 74 72 69 63 74 27 0a 0a 63 6f 6e 73 74 20 70 65 72 63 65 6e 74 69 6c 65 73 20 3d 20 6d 6f 64 75 6c 65 2e 65 78 70 6f 72 74 73 2e 70 65 72 63 65 6e 74 69 6c 65 73 20 3d 20 5b 0a 20 20 30 2e 30 30 31 2c 0a 20 20 30 2e 30 31 2c 0a 20 20 30 2e 31 2c 0a 20 20 31 2c 0a 20 20 32 2e 35 2c ... 8092 more bytes>,
  byteLength: 8192
}
undefined
<Buffer 01 02 03 04 05 06 07 08 09 0a> ArrayBuffer {
  [Uint8Contents]: <27 75 73 65 20 73 74 72 69 63 74 27 0a 0a 63 6f 6e 73 74 20 70 65 72 63 65 6e 74 69 6c 65 73 20 3d 20 6d 6f 64 75 6c 65 2e 65 78 70 6f 72 74 73 2e 70 65 72 63 65 6e 74 69 6c 65 73 20 3d 20 5b 0a 20 20 30 2e 30 30 31 2c 0a 20 20 30 2e 30 31 2c 0a 20 20 30 2e 31 2c 0a 20 20 31 2c 0a 20 20 32 2e 35 2c ... 8092 more bytes>,
  byteLength: 8192
}

Note that, in this case, b2 (and its entire underlying ArrayBuffer) ends up being cloned rather than transferred, despite the use of the transfer list, and all three Buffer instances in the main thread continue to be usable.

Things change, however, if we switch from using Buffer.from to Buffer.alloc(), which does not use the buffer pool... That is...

const Piscina = require('piscina');
const { resolve } = require('path');

const pool = new Piscina({
  filename: resolve(__dirname, 'worker.js')
});

const b = Buffer.alloc(20).fill(1);
//const b = Buffer.from([1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,10]);
const b1 = b.slice(0, 10);
const b2 = b.slice(10);
console.log(b.buffer === b1.buffer);
console.log(b.buffer === b2.buffer);
console.log(b1.buffer === b2.buffer);

(async () => {
  console.log(await pool.runTask({data:b2}, [b2.buffer]));

  console.log(b2, b2.buffer);
  console.log(b1, b1.buffer);
})();

If we run this in Node.js 14, 13, or 12 we get:

C:\Users\jasne\Projects\tmp>node -v
v14.1.0

C:\Users\jasne\Projects\tmp>node index
true
true
true
         Uint8Array(10) [
  2, 2, 2, 2, 2,
  2, 2, 2, 2, 2
] ArrayBuffer {
  [Uint8Contents]: <01 02 03 01 01 01 01 01 01 01 02 02 02 02 02 02 02 02 02 02>,
  byteLength: 20
}
undefined
<Buffer > ArrayBuffer { (detached), byteLength: 0 }
<Buffer > ArrayBuffer { (detached), byteLength: 0 }

The Buffer in this case was transferred and all instances in the main thread are no longer usable.

So, with Buffer instances sliced off the pre-allocated pool, it looks like we're actually currently fairly safe, surprisingly! That said, as illustrated by the Abort in the original case, we're not handling detached buffers appropriately elsewhere throughout the code.
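The distinction the two runs above hinge on can be checked directly; a small sketch, assuming the default Buffer.poolSize of 8192:

```javascript
// Buffer.from(array/string) and Buffer.allocUnsafe() slice small
// allocations off a shared pre-allocated pool; Buffer.alloc() always
// gets its own zero-filled ArrayBuffer of exactly the requested size.
const pooled = Buffer.from([1, 2, 3, 4]);
const owned = Buffer.alloc(4);

console.log(pooled.byteLength === pooled.buffer.byteLength); // false: a slice of the 8 KiB pool
console.log(owned.byteLength === owned.buffer.byteLength);   // true: sole view over its memory
```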

@jasnell
Member Author

jasnell commented May 5, 2020

One bit to point out... using postMessage() to send a Buffer created using either Buffer.from() or Buffer.allocUnsafe() in 14.x will actually cause the entire underlying pre-allocation pool to be cloned.

@mafintosh
Member

@jasnell Unless something has changed, almost all of core's APIs return unsafely allocated buffers (I did some research into this for a potential security issue).

So any buffer from fs.readFile, fs.createReadStream, etc. won't work.

@mafintosh
Member

For the streams discussion, I'd still argue that the stream owns the Buffer when you pass it to it, but not the arraybuffer.

I.e., when you do fsWriteStream.write(buf), the fs write stream is free to buffer it for as long as it wants without worrying about anyone changing the contents, but it cannot assume anything about buf.buffer, except in the cases where buf.byteLength === buf.buffer.byteLength.

@jasnell
Member Author

jasnell commented May 5, 2020

@mafintosh:

the fs write stream is free to buffer that for as long as it wants, without worrying about anyone changing the contents...

That's not really true tho... because if that Buffer is just a view over some other ArrayBuffer, the contents can change at any time after it has been buffered.

const { createWriteStream, readFileSync } = require('fs')

const assert = require('assert');

const u = new Uint8Array(10);
const b = Buffer.from(u.buffer);

assert.strictEqual(b[0], 0);

const out = createWriteStream('./test');
out.cork();
out.write(b);

u[0] = 1;

out.uncork();
out.end();

out.on('close', () => {
  const check = readFileSync('./test');
  console.log(check);
  assert.strictEqual(check[0], 0);  // fails
})

// and...

const { createWriteStream, readFileSync } = require('fs')

const assert = require('assert');

const b = Buffer.from('hello');

const out = createWriteStream('./test');
out.cork();
out.write(b);

const ab = Buffer.from(Buffer.from('world').buffer);
ab.fill('a');

out.uncork();
out.end();

out.on('close', () => {
  const check = readFileSync('./test');
  console.log(check.toString());  // prints 'aaaaa'
})

@mafintosh
Member

@jasnell That's why I wrote the Buffer and not the attached ArrayBuffer.

Messing around with the attached ArrayBuffer will have side effects and security issues all over the program. The only one that owns the ArrayBuffer is Node.js internals for pooled buffers, and whoever allocated it for slices of safely allocated ones.

@jasnell
Member Author

jasnell commented May 5, 2020

Btw, conversation moving over to the PR at #33252

@jasnell
Member Author

jasnell commented May 30, 2020

Closing, as PR #33252 landed.

@jasnell jasnell closed this as completed May 30, 2020