Skip to content

Commit

Permalink
fixup! fixup! fixup! Add stream example
Browse files Browse the repository at this point in the history
  • Loading branch information
jasnell committed May 4, 2020
1 parent cd9b6e0 commit 39a4b22
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 9 deletions.
8 changes: 6 additions & 2 deletions examples/stream/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,21 @@ const piscina = new Piscina({
});

class W extends Writable {
length = 0;
_write (chunk, encoding, callback) {
console.log(chunk.length);
this.length += chunk.length;
callback();
}
};

(async function () {
const channel = new MessageChannel();
const duplex = new PortDuplex(channel.port2, { writable: false });
const w = new W();

pipeline(duplex, new W(), () => {});
duplex.on('close', () => channel.port2.close());

pipeline(duplex, w, () => console.log(w.length));

await piscina.runTask({ port: channel.port1 }, [channel.port1]);
})();
19 changes: 12 additions & 7 deletions examples/stream/port_duplex.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,23 @@ class PortDuplex extends Duplex {
callback();
}

_final (callback) {
this.#port.postMessage(null);
callback();
}

_read () {
// Do nothing here. A more complete example would
// implement proper read/pause behavior.
}

_destroy () {
this.#port.close();
_destroy (err, callback) {
if (err) {
// TODO(@jasnell): A more complete example would
// handle this error more appropriately.
this.#port.close();
console.error(err);
return;
}
if (this.writableEnded) {
this.#port.postMessage(null);
}
callback();
}

static #onmessage = function ({ data }) {
Expand Down

0 comments on commit 39a4b22

Please sign in to comment.