Streaming Writes implementation #305

Open · wants to merge 12 commits into main

Conversation

H-Plus-Time
Contributor

The interface for writing to streams so far is:

const table = wasm.readParquet(buf);
const outputStream = wasm.writeParquetStream(table);
// sink the contents somewhere useful, like to a server that supports streaming POST requests.
await fetch(url, {
  method: 'POST',
  body: outputStream,
  duplex: 'half',
});

// or perhaps to the filesystem
const writableStream = await fileHandle.createWritable();
await outputStream.pipeTo(writableStream); // 🤯

I suspect it won't take that much to get this to take an FFI table (do we need unsafe impl From<FFITable> for Table in the arrow-wasm repo for that?).

What I'd really like this to do is close the loop on a workflow similar to:

flowchart LR
    subgraph geoarrow["Bunch of geoarrow transforms"]
    direction TB
    op0["op(batch)"] -.-> opn["op(batch)"]
    end
    A[FS] -->|"readParquetStream"| B("Stream&lt;RecordBatch&gt;") --> geoarrow
    geoarrow -->|"writeParquetStream"| C[FS]

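For concreteness, the JS side of that loop could look roughly like the sketch below (hypothetical: it assumes readParquetStream yields record batches as an async iterable, op() stands in for a geoarrow transform, and a writeParquetStream variant that accepts a stream of batches rather than a whole table):

// Hypothetical sketch - op() is a stand-in for a geoarrow transform, and
// writeParquetStream is assumed to accept a stream of record batches.
const batchStream = await wasm.readParquetStream(url);

// Apply the per-batch transforms lazily, as batches arrive.
async function* transformed() {
  for await (const batch of batchStream) {
    yield op(batch);
  }
}

const parquetBytes = wasm.writeParquetStream(transformed()); // -> ReadableStream
const writableStream = await fileHandle.createWritable();
await parquetBytes.pipeTo(writableStream);
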
Just taking a table is only somewhat useful (you still have to keep the batches that make up the table in memory until the write stream finishes), and to be honest it's mostly a placeholder until I figure out JsCast properly.

A few design questions come to mind:

  1. The writer_async function(s) always return ReadableStreams - is there any appetite for doing stuff to the parquet bytes before they cross the WASM<->JS boundary? The only thing I can really think of is streaming checksums.

This definitely needs a variant that takes a rust stream of RecordBatches (similar to the read functions), partly to avoid the cost of bouncing pointers out to JS, but much more importantly to remove a lot of JS-side orchestration. It's still subject to the above proviso (the output is always a ReadableStream, since the only real place to direct the output is IO of some form).

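In terms of shape, the ReadableStream-producing half of that variant could look something like this sketch (the RecordBatch -> parquet-bytes encoding is elided; only the wasm_streams plumbing that hands byte chunks across the boundary is shown):

use futures::StreamExt;
use js_sys::Uint8Array;
use wasm_bindgen::JsValue;

// Sketch only: expose a Rust Stream of encoded parquet byte chunks to JS as a
// ReadableStream. Whatever produces the chunks (the actual parquet writer)
// would sit upstream of this.
pub fn chunks_to_readable_stream(
    chunks: impl futures::Stream<Item = Vec<u8>> + 'static,
) -> wasm_streams::readable::sys::ReadableStream {
    let js_chunks = chunks.map(|chunk| {
        // Copy each chunk into a Uint8Array so JS owns the bytes it receives.
        Ok::<JsValue, JsValue>(Uint8Array::from(chunk.as_slice()).into())
    });
    wasm_streams::ReadableStream::from_stream(js_chunks).into_raw()
}
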
@kylebarron
Owner

I suspect it won't take that much to get this to take an FFI table (do we need unsafe impl From<FFITable> for Table in the arrow-wasm repo for that?).

Yeah I left the conversions from FFI objects to normal objects for a follow up.

Also related is that I started working on a minimal JavaScript representation of the Arrow C Data Interface arrays in kylebarron/arrow-js-ffi#45. In particular, I think that will make it easier to write a table from JS memory into Wasm memory, without the memory overhead of going through an IPC buffer. Also it should allow bundle-size conscious libraries to forgo Arrow JS, at the cost of making the data harder to work with. I think it'll make sense especially for wasm-focused Arrow applications.

What I'd really like this to do is close the loop on a workflow similar to:

That's pretty cool! I guess that's pseudocode like

const readStream = await wasm.readParquetStream(url);
const outputStream = await wasm.writeParquetStream(schema);

for await (const table of readStream) {
    outputStream.write(table);
}

I haven't actually used streams in JS much, so I'm not too familiar with the mechanics of piping the stream somewhere.

Just taking a table is only somewhat useful (you still have to keep the batches that make up the table in memory until the write stream finishes)

Seems ideal to have a stream writer like:

  • Initialize the writer stream with the output schema
  • Write instances of Table or RecordBatch to the stream.

Then you don't have to ever materialize the entire table into memory, though it does require you to know the output schema in advance.

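Something like this hypothetical sketch (none of these names exist in parquet-wasm today):

// Hypothetical sketch: a schema-first stream writer.
const writer = new wasm.ParquetStreamWriter(schema);
const sink = writer.readable.pipeTo(await fileHandle.createWritable());

for await (const batch of recordBatchSource) {
  writer.write(batch); // each batch can be dropped once it has been encoded
}
writer.close();
await sink;
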
  1. The writer_async function(s) always return ReadableStreams - is there any appetite for doing stuff to the parquet bytes before they cross the WASM<->JS boundary?

Yeah I don't really know what you'd do to the parquet bytes

This definitely needs a variant that takes a rust stream of RecordBatches (similar to the read functions), partly to avoid the cost of bouncing pointers out to JS, but much more importantly to remove a lot of JS-side orchestration. It's still subject to the above proviso (the output is always a ReadableStream, since the only real place to direct the output is IO of some form).

I need to think about this more.....

Comment on lines +58 to +64
// Need to create an encoding for each column
let mut encodings = vec![];
for _ in &schema.fields {
    // Note, the nested encoding is for nested Parquet columns.
    // Here we assume columns are not nested.
    encodings.push(vec![encoding]);
}
Owner

Just a separate note that this was kind of a hack; at some point I need to come back and make sure that the encoding is compatible with each type in the schema.

@H-Plus-Time
Contributor Author

H-Plus-Time commented Aug 31, 2023

That's pretty cool! I guess that's pseudocode like

More or less, I got a bit carried away there with mermaid :S.

Technically the idiomatic way to do it in JS is fooStream.pipeThrough(transformStream) (that covers just the record batches -> bytes transformation; sinking to a socket/file, etc. typically involves await resultStream.pipeTo(writableStream)). The only problem with that is that it's strictly struct/object based (there's no way to do it without a state container), and it really doesn't gel all that well with Rust (or, for that matter, with other quasi-iterables in JS). The alternative, which dispenses with all of that, is to just do this:

const byteStream = transformFunc(fooStream) // -> produces a ReadableStream.
await byteStream.pipeTo(writableStream);

Then you don't have to ever materialize the entire table into memory, though it does require you to know the output schema in advance.

It turns out StreamExt::Peekable allows this to be done statelessly, provided you assume the schema doesn't change across record batches (I've been bitten by that on very rare occasions) - the most recent commit has the relevant code: peek at the first batch, then schema().into_inner().

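Roughly this shape (a sketch of the peek-for-schema trick, not the exact code in the commit):

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use futures::StreamExt;

// Peek at the first batch to recover the schema without consuming it,
// assuming the schema is constant across batches.
async fn peek_schema(
    batches: impl futures::Stream<Item = RecordBatch>,
) -> Option<SchemaRef> {
    let peekable = batches.peekable();
    futures::pin_mut!(peekable);
    // peek() only borrows the first item, so in the real code the same stream
    // can still be handed on to the writer afterwards.
    peekable.as_mut().peek().await.map(|batch| batch.schema())
}
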
The only thing left at this point is figuring out a way to do this:

#[wasm_bindgen]
pub fn foo(opaque: JsValue) -> RecordBatch {}

(The non-generic ReadableStream type (which is always interpreted as emitting JsValue) is the main impediment).
This (rustwasm/wasm-bindgen#3554) looks like it's very close to merging into wasm-bindgen.

@kylebarron
Owner

I think I'll have to read that a few times to really understand it.

The only thing left at this point is figuring out a way to do this:

Essentially I envision converting a record batch from Arrow JS to Wasm to be:

import {schemaFromArrowJS, arrayFromArrowJS, schemaToFFI, arrayToFFI} from 'arrow-js-ffi';
import {malloc} from 'parquet-wasm';

const recordBatch = arrow.RecordBatch();
const nanoArrowSchema = schemaFromArrowJS(recordBatch.schema())
const nanoArrowArray = arrayFromArrowJS(recordBatch.toStruct())
const schemaFFIPointer = schemaToFFI(nanoArrowSchema);
const arrayFFIPointer = arrayToFFI(nanoArrowArray);
const wasmRecordBatch = wasm.RecordBatch.from_pointers(arrayFFIPointer, schemaFFIPointer);

Then on the rust side, this wasmRecordBatch struct holds an arrow RecordBatch inside it.

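i.e. something along these lines (a minimal sketch of the wrapper shape, not the exact parquet-wasm definition):

use wasm_bindgen::prelude::*;

// Minimal sketch: the JS-visible class is just a wasm-bindgen wrapper around a
// real arrow RecordBatch living in Wasm memory.
#[wasm_bindgen]
pub struct RecordBatch(arrow::record_batch::RecordBatch);

#[wasm_bindgen]
impl RecordBatch {
    #[wasm_bindgen(js_name = numRows)]
    pub fn num_rows(&self) -> usize {
        self.0.num_rows()
    }
}
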
Does this answer your question at all?

@H-Plus-Time
Contributor Author

H-Plus-Time commented Sep 2, 2023

To be honest, that was a miscommunication on my part (too terse); this:

#[wasm_bindgen]
pub fn foo(opaque: JsValue) -> RecordBatch {}

really should have included the full context, namely:

#[wasm_bindgen]
pub async fn foo_readable_stream_consumer(input: wasm_streams::readable::sys::ReadableStream) {
    // The item type here is always Result<JsValue, JsValue> - it can't be anything
    // else, because the ReadableStream type is non-generic.
    let mut rs_stream = wasm_streams::ReadableStream::from_raw(input).into_stream();
    while let Some(item) = rs_stream.next().await {
        // JsValue dynamic casting - requires a somewhat unreliable (and unwieldy)
        // JsCast impl. Also this is really intended for well-known JS classes,
        // rather than JS classes that wrap rust structs.
        let recovered_rs_item: RecordBatch = item.unwrap().dyn_into().unwrap();
    }
}

The syntax we really want for this is:

let recovered_rs_item: RecordBatch = item.try_into().unwrap();

The additional codegen output in rustwasm/wasm-bindgen#3554 gives us exactly that, for any and all wasm-bindgen'd structs (as well as Vec<Struct>). The bonus (which I wasn't expecting) is that it allows you to transfer ownership of a struct out to JS (out of the purview of rust's borrow checker :( ) and back in (where the borrow checker kicks back into gear and frees it once it goes out of scope), neutering the JS wrapper class in the process. The extra bindgen code is surprisingly minimal, really - it boils down to this:

class RecordBatch {
    static __unwrap(jsValue) {
        if (!(jsValue instanceof RecordBatch)) {
            return 0;
        }
        return jsValue.__destroy_into_raw();
    }
    /* etc */
}
module.exports.__wbg_recordbatch_unwrap = function(arg0) {
    const ret = RecordBatch.__unwrap(takeObject(arg0));
    return ret;
};

Happy to leave this PR open as a draft until the wasm-bindgen PR is merged, then repin to wasm-bindgen 0.2.88 and undraft (that also gives me a chance to figure out the equivalent with arrow1, and to adapt to the arrow1 table abstraction).

@kylebarron
Owner

fwiw, I plan to deprecate arrow2 at some point, so I wouldn't put any more emphasis on that

@andresgutgon

Hi @H-Plus-Time, what's the status of this PR? I would love to give it a try (#542).
