Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

proposal: Make AsyncArrowWriter accepts AsyncFileWriter trait instead #5738

Closed
Xuanwo opened this issue May 9, 2024 · 2 comments · Fixed by #5753
Closed

proposal: Make AsyncArrowWriter accepts AsyncFileWriter trait instead #5738

Xuanwo opened this issue May 9, 2024 · 2 comments · Fixed by #5753
Assignees
Labels
enhancement Any new improvement worthy of a entry in the changelog

Comments

@Xuanwo
Copy link
Member

Xuanwo commented May 9, 2024

This proposal intends to make AsyncArrowWriter accepts a new trait called AsyncFileWriter instead like what we do for ParquetRecordBatchStream.

AsyncArrowWriter accepts AsyncWrite currently:

impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
/// Try to create a new Async Arrow Writer
pub fn try_new(
writer: W,
arrow_schema: SchemaRef,
props: Option<WriterProperties>,
) -> Result<Self> {
let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
Self::try_new_with_options(writer, arrow_schema, options)
}

AsyncWrite is a low-level, poll-based API. Users with writers that provide async fn write() will need to encapsulate it within a manually written future state machine.

For example:

impl AsyncWrite for BufWriter {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, Error>> {
let cap = self.capacity;
let max_concurrency = self.max_concurrency;

Describe the solution you'd like

I propose to make AsyncArrowWriter accepts a new trait called AsyncFileWriter:

pub struct AsyncArrowWriter<W> {
    /// Underlying sync writer
    sync_writer: ArrowWriter<Vec<u8>>,

    /// Async writer provided by caller
    async_writer: W,
}

impl<W: AsyncFileWriter + Unpin + Send> AsyncArrowWriter<W> {
  ...
}

pub trait AsyncFileWriter: Send {
    async fn write(&mut self, bs: Bytes) -> Result<()>;
    async fn complete(&mut self) -> Result<()>;
}

impl<T: AsyncWrite> AsyncFileWriter for T {
   ...
}

Describe alternatives you've considered

Not yet.

Additional context

ParquetRecordBatchStream accetps AsyncFileReader:

/// The asynchronous interface used by [`ParquetRecordBatchStream`] to read parquet files
pub trait AsyncFileReader: Send {
/// Retrieve the bytes in `range`
fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
/// Retrieve multiple byte ranges. The default implementation will call `get_bytes` sequentially
fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
async move {
let mut result = Vec::with_capacity(ranges.len());
for range in ranges.into_iter() {
let data = self.get_bytes(range).await?;
result.push(data);
}
Ok(result)
}
.boxed()
}
/// Provides asynchronous access to the [`ParquetMetaData`] of a parquet file,
/// allowing fine-grained control over how metadata is sourced, in particular allowing
/// for caching, pre-fetching, catalog metadata, etc...
fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>>;
}


I'm willing help implement this proposal.

@Xuanwo Xuanwo added the enhancement Any new improvement worthy of a entry in the changelog label May 9, 2024
@tustvold
Copy link
Contributor

tustvold commented May 9, 2024

I think this would be a very nice addition, and would tie in well with the new object_store vectored write APIs such as https://docs.rs/object_store/latest/object_store/struct.WriteMultipart.html.

@Xuanwo
Copy link
Member Author

Xuanwo commented May 9, 2024

I think this would be a very nice addition, and would tie in well with the new object_store vectored write APIs such as https://docs.rs/object_store/latest/object_store/struct.WriteMultipart.html.

Great, I will start the work. Please assign this issue to me 💌

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement Any new improvement worthy of a entry in the changelog
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants