Streaming Writes implementation #305
base: main
Conversation
Yeah, I left the conversions from FFI objects to normal objects for a follow-up. Also related is that I started working on a minimal JavaScript representation of the Arrow C Data Interface arrays in kylebarron/arrow-js-ffi#45. In particular, I think that will make it easier to write a table from JS memory into Wasm memory, without the memory overhead of going through an IPC buffer. It should also allow bundle-size-conscious libraries to forgo Arrow JS, at the cost of making the data harder to work with. I think it'll make sense especially for wasm-focused Arrow applications.
That's pretty cool! I guess that's pseudocode like:

```js
const readStream = await wasm.readParquetStream(url);
const outputStream = await wasm.writeParquetStream(schema);
for await (const table of readStream) {
  outputStream.write(table);
}
```

I haven't actually used streams in JS much, so I'm not too familiar with the mechanics of piping the stream somewhere.
Seems ideal to have a stream writer like:
Then you never have to materialize the entire table in memory, though it does require knowing the output schema in advance.
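A minimal sketch of what such a stream writer could look like, using a web `TransformStream` with a stub encoder standing in for real Parquet encoding (all names here are hypothetical, not the actual parquet-wasm API):

```javascript
// Hypothetical sketch: a writer whose writable side accepts record
// batches and whose readable side emits encoded bytes. `encodeBatch`
// is a stand-in for real Parquet encoding against a known schema.
function makeParquetWriteStream(encodeBatch) {
  return new TransformStream({
    transform(batch, controller) {
      // Encode each batch as it arrives; nothing is materialized
      // beyond the batch currently in flight.
      controller.enqueue(encodeBatch(batch));
    },
  });
}
```

Batches would then be written to `.writable` while the encoded bytes are consumed from `.readable`, e.g. via `pipeTo` into a file sink.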
Yeah, I don't really know what you'd do to the parquet bytes.
I need to think about this more...
```rust
// Need to create an encoding for each column
let mut encodings = vec![];
for _ in &schema.fields {
    // Note, the nested encoding is for nested Parquet columns
    // Here we assume columns are not nested
    encodings.push(vec![encoding]);
}
```
just a separate note that this was kinda a hack and at some point I need to come back and make sure that the encoding is compatible for each type in the schema
More or less, I got a bit carried away there with mermaid :S. Technically the idiomatic way to do it in JS is:

```js
const byteStream = transformFunc(fooStream); // -> produces a ReadableStream
await byteStream.pipeTo(writableStream);
```

It turns out the only thing left at this point is figuring out a way to do this:

```rust
#[wasm_bindgen]
pub fn foo(opaque: JsValue) -> RecordBatch {}
```

(The non-generic ReadableStream type, which is always interpreted as emitting JsValue, is the main impediment.)
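As a self-contained illustration of that `pipeTo` pattern (stub streams only; a real `transformFunc` would produce Parquet bytes, and the names here are placeholders):

```javascript
// Stub version of the pattern above: a source ReadableStream is run
// through a transform (standing in for the Parquet encoder) and piped
// into a WritableStream sink.
async function runPipeline(items, sink) {
  const fooStream = new ReadableStream({
    start(controller) {
      for (const item of items) controller.enqueue(item);
      controller.close();
    },
  });
  // transformFunc: consumes a ReadableStream, produces a ReadableStream
  const transformFunc = (stream) =>
    stream.pipeThrough(
      new TransformStream({
        transform(chunk, controller) {
          controller.enqueue(`encoded(${chunk})`);
        },
      })
    );
  const writableStream = new WritableStream({
    write(chunk) {
      sink.push(chunk);
    },
  });
  const byteStream = transformFunc(fooStream); // -> produces a ReadableStream
  await byteStream.pipeTo(writableStream);
}
```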
I think I'll have to read that a few times to really understand it.
Essentially I envision converting a record batch from Arrow JS to Wasm to be:

```js
import { schemaFromArrowJS, arrayFromArrowJS, schemaToFFI, arrayToFFI } from 'arrow-js-ffi';
import { malloc } from 'parquet-wasm';

const recordBatch = arrow.RecordBatch();
const nanoArrowSchema = schemaFromArrowJS(recordBatch.schema());
const nanoArrowArray = arrayFromArrowJS(recordBatch.toStruct());
const schemaFFIPointer = schemaToFFI(nanoArrowSchema);
const arrayFFIPointer = arrayToFFI(nanoArrowArray);
const wasmRecordBatch = wasm.RecordBatch.from_pointers(arrayFFIPointer, schemaFFIPointer);
```

Then on the rust side, this […]

Does this answer your question at all?
…: Requires as-yet unmerged PR to wasm-bindgen (retarget in separate reversible commit)
To be honest, miscommunication on my part (too terse); this:

really should have included the full context, namely:

```rust
#[wasm_bindgen]
pub async fn foo_readable_stream_consumer(input: wasm_streams::readable::sys::ReadableStream) {
    let rs_stream: wasm_streams::ReadableStream<Result<JsValue, JsValue>> =
        wasm_streams::ReadableStream::from_raw(input).into_stream();
    // The type annotation above cannot be altered because the ReadableStream type is non-generic.
    while let Some(item) = rs_stream.next().await {
        let recovered_rs_item: RecordBatch = item.dyn_into();
        // JsValue dynamic casting - requires a somewhat unreliable (and unwieldy) JsCast impl.
        // Also, this is really intended for well-known JS classes, rather than JS classes
        // that wrap rust structs.
    }
}
```

The syntax we really want for this is:

```rust
let recovered_rs_item: RecordBatch = item.try_into().unwrap();
```

The additional codegen output in rustwasm/wasm-bindgen#3554 gives us exactly that, for any and all wasm-bindgen'd structs (as well as […]):

```js
class RecordBatch {
  static __unwrap(jsValue) {
    if (!(jsValue instanceof RecordBatch)) {
      return 0;
    }
    return jsValue.__destroy_into_raw();
  }
  /* etc */
}

module.exports.__wbg_recordbatch_unwrap = function (arg0) {
  const ret = RecordBatch.__unwrap(takeObject(arg0));
  return ret;
};
```

Happy to leave this PR open in draft until the wasm-bindgen PR is merged, repin to wasm-bindgen 0.2.88 and undraft (also gives me a chance to figure out the equivalent with arrow1, and adapt to the arrow1 table abstraction).
fwiw, I plan to deprecate arrow2 at some point, so I wouldn't put any more emphasis on that.
Hi @H-Plus-Time, what's the status of this PR? I would love to give it a try #542
The interface for writing to streams so far is:
I suspect it won't take that much to get this to take an FFI table (do we need `unsafe impl From<FFITable> for Table` in the arrow-wasm repo for that?). What I'd really like this to do is close the loop on a workflow similar to:
Just taking a table is only somewhat useful (you still have to keep the batches that make up the table in memory until the write stream finishes), and tbh is mostly a placeholder until I figure out JsCast properly.
A few design questions come to mind:
This definitely needs a variant that takes a rust stream of RecordBatches (similar to the read functions), partly to avoid the cost of bouncing pointers out to JS, but more importantly to remove a lot of JS-side orchestration. It's still subject to the above proviso: the output is always a ReadableStream, since the only real place to direct the output is to IO of some form.
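For contrast, the JS-side orchestration being referred to looks roughly like the following (mocked: `writeBatch` is a hypothetical stand-in for handing a pointer back into wasm):

```javascript
// Mocked sketch of per-batch JS orchestration: every batch is pulled
// out of the read stream in JS, then handed straight back into wasm.
// A rust-side stream variant would collapse this loop into one call.
async function orchestrate(readable, writeBatch) {
  const reader = readable.getReader();
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    writeBatch(value); // pointer bounced out to JS, then back in
  }
}
```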