-
Notifications
You must be signed in to change notification settings - Fork 1.1k
[arrow-avro] Add AsyncWriter #9241
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
[arrow-avro] Add AsyncWriter #9241
Conversation
- Introduce AsyncFileWriter trait for async sink abstraction - Implement AsyncWriter<W, F> generic over async sink and format - Provide AsyncAvroWriter and AsyncAvroStreamWriter type aliases - Support OCF and SOE formats with identical API to sync Writer - Add AsyncWriterBuilder for configuration - Include comprehensive tests for OCF, SOE, and batch writing - Gate behind 'async' feature with tokio/futures/bytes dependencies
- Test OCF and stream writing modes - Test multiple batch writing with write_batches - Test builder configuration and capacity settings - Test schema mismatch error handling - Test deflate compression with conditional feature gate - Test into_inner to verify writer consumption
- Add comprehensive feature list in module docs - Add note about future object_store integration - Update code formatting
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR adds a comprehensive async writer API for the arrow-avro crate, providing an idiomatic async counterpart to the existing synchronous writer. The implementation mirrors the sync writer's API while following established Arrow async patterns (consistent with Parquet's async writer).
Changes:
- Added async writer feature with tokio, futures, and bytes dependencies
- Implemented
AsyncFileWritertrait andAsyncWritergeneric struct with type aliases for OCF and SOE formats - Added 7 comprehensive tests covering OCF/SOE round-trips, multiple batches, compression, and error handling
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| arrow-avro/Cargo.toml | Added async feature with optional dependencies for tokio, futures, and bytes |
| arrow-avro/src/writer/mod.rs | Added feature-gated public exports for async writer types |
| arrow-avro/src/writer/async_writer.rs | New module implementing full async writer API with trait, builder, writer struct, and comprehensive tests |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
- Add detailed schema resolution behavior documentation - Explain metadata key usage and fallback to conversion - Document default settings and customization options - Match documentation level with sync WriterBuilder
jecsand838
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks you so much for getting this up so quickly! I went ahead and left some initial comments, but overall this is a really solid start.
| pub struct AsyncWriterBuilder { | ||
| schema: Schema, | ||
| codec: Option<CompressionCodec>, | ||
| capacity: usize, | ||
| fingerprint_strategy: Option<FingerprintStrategy>, | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maintainability thought: AsyncWriterBuilder duplicates the same knobs/fields as the existing sync WriterBuilder (schema/codec/capacity/fingerprint_strategy).
To avoid future drift, could we factor shared builder logic, or add something like WriterBuilder::build_async(...) in mod.rs behind cfg(feature="async") so there is a single source of truth for configuration + schema selection?
| async fn test_async_writer_into_inner() -> Result<(), Box<dyn std::error::Error>> { | ||
| let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]); | ||
| let batch = RecordBatch::try_new( | ||
| Arc::new(schema.clone()), | ||
| vec![Arc::new(Int32Array::from(vec![99])) as ArrayRef], | ||
| )?; | ||
|
|
||
| let mut buffer = Vec::new(); | ||
| { | ||
| let mut writer = AsyncAvroWriter::new(&mut buffer, schema).await?; | ||
| writer.write(&batch).await?; | ||
| writer.finish().await?; | ||
| } | ||
|
|
||
| assert!(!buffer.is_empty()); | ||
| Ok(()) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test is named *_into_inner but never calls into_inner. Would you be open to either renaming it (e.g. test_async_writer) or updating it to exercise into_inner?
One way is to use an owned test sink (custom AsyncFileWriter that collects Bytes) and assert that writer.into_inner() returns it with non-empty contents.
| #[tokio::test] | ||
| async fn test_async_avro_stream_writer() -> Result<(), Box<dyn std::error::Error>> { | ||
| let schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]); | ||
|
|
||
| let batch = RecordBatch::try_new( | ||
| Arc::new(schema.clone()), | ||
| vec![Arc::new(Int32Array::from(vec![10, 20, 30])) as ArrayRef], | ||
| )?; | ||
|
|
||
| let mut buffer = Vec::new(); | ||
| let mut writer = AsyncAvroStreamWriter::new(&mut buffer, schema).await?; | ||
| writer.write(&batch).await?; | ||
| writer.finish().await?; | ||
|
|
||
| assert!(!buffer.is_empty()); | ||
| Ok(()) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This SOE test is currently a non-empty buffer smoke test. What do you think about strengthening it to catch regressions in SOE framing by asserting on exact matches?
Ideas:
- Validate the stream prefix matches the expected SOE magic + fingerprint (e.g. compare against
schema::SINGLE_OBJECT_MAGICand ensure the fingerprint bytes are present), and/or - Do a real round-trip decode using the existing streaming decode path (
Decoder+SchemaStore) with the writer schema registered.
| async fn write_ocf_block( | ||
| &mut self, | ||
| batch: &RecordBatch, | ||
| sync: &[u8; 16], | ||
| ) -> Result<(), ArrowError> { | ||
| let mut buf = Vec::<u8>::with_capacity(self.capacity); | ||
| self.encoder.encode(&mut buf, batch)?; | ||
| let encoded = match self.compression { | ||
| Some(codec) => codec.compress(&buf)?, | ||
| None => buf, | ||
| }; | ||
|
|
||
| let mut block_buf = Vec::<u8>::new(); | ||
| write_long(&mut block_buf, batch.num_rows() as i64)?; | ||
| write_long(&mut block_buf, encoded.len() as i64)?; | ||
| block_buf.extend_from_slice(&encoded); | ||
| block_buf.extend_from_slice(sync); | ||
|
|
||
| self.writer.write(Bytes::from(block_buf)).await | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
write_ocf_block currently does a full memcpy of the encoded/compressed payload via block_buf.extend_from_slice(&encoded). For large batches this can significantly increase peak memory (encode/compress buffer + block_buf copy) and adds extra CPU.
What do you think about writing in multiple chunks (header bytes, then Bytes::from(encoded) to move without copy, then sync marker) to avoid copying, instead of re-buffering the entire block into block_buf?
| let mut header_buf = Vec::<u8>::with_capacity(256); | ||
| format.start_stream(&mut header_buf, &schema, self.codec)?; | ||
| writer.write(Bytes::from(header_buf)).await?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor nit: for formats where start_stream writes no header bytes, this will still call write with an empty buffer. Might be worth guarding with if !header_buf.is_empty() to avoid a no-op write (some sinks treat zero-length writes oddly).
| /// The asynchronous interface used by [`AsyncWriter`] to write Avro files. | ||
| pub trait AsyncFileWriter: Send { | ||
| /// Write the provided bytes to the underlying writer | ||
| fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<(), ArrowError>>; | ||
|
|
||
| /// Flush any buffered data and finish the writing process. | ||
| /// | ||
| /// After `complete` returns `Ok(())`, the caller SHOULD not call write again. | ||
| fn complete(&mut self) -> BoxFuture<'_, Result<(), ArrowError>>; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe worth expanding the trait docs a bit to clarify intended semantics imo.
Specifically that implementations may buffer internally (or write immediately), and may implement retry logic, and that write is expected to append all bytes or return an error.
This becomes especially important once we add an object_store-backed implementation.
| /// Create a new builder with default settings. | ||
| /// | ||
| /// The Avro schema used for writing is determined as follows: | ||
| /// 1) If the Arrow schema metadata contains `avro::schema` (see `SCHEMA_METADATA_KEY`), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| /// 1) If the Arrow schema metadata contains `avro::schema` (see `SCHEMA_METADATA_KEY`), | |
| /// 1) If the Arrow schema metadata contains `avro.schema` (see `SCHEMA_METADATA_KEY`), |
| //! ```no_run | ||
| //! use std::sync::Arc; | ||
| //! use arrow_array::{ArrayRef, Int64Array, RecordBatch}; | ||
| //! use arrow_schema::{DataType, Field, Schema}; | ||
| //! use arrow_avro::writer::AsyncAvroWriter; | ||
| //! use bytes::Bytes; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use bytes::Bytes; appears unused in the example. Also I'd recommend we make all of our examples runnable imo.
| //! ```no_run | |
| //! use std::sync::Arc; | |
| //! use arrow_array::{ArrayRef, Int64Array, RecordBatch}; | |
| //! use arrow_schema::{DataType, Field, Schema}; | |
| //! use arrow_avro::writer::AsyncAvroWriter; | |
| //! use bytes::Bytes; | |
| //! ``` | |
| //! use std::sync::Arc; | |
| //! use arrow_array::{ArrayRef, Int64Array, RecordBatch}; | |
| //! use arrow_schema::{DataType, Field, Schema}; | |
| //! use arrow_avro::writer::AsyncAvroWriter; |
Summary
This PR implements a fully functional async Avro writer for
arrow-avro, providing a symmetric and idiomatic async API that mirrors the existing synchronousWriterwhile following Arrow's established async patterns (consistent with Parquet's async writer).Fixes: #9212
Design Overview
New Types
AsyncFileWritertrait: Minimal abstraction for async I/O sinkswrite(Bytes) -> BoxFuture<Result<()>>complete() -> BoxFuture<Result<()>>tokio::io::AsyncWrite + Unpin + SendAsyncWriter<W, F>struct: Generic async writerW: AnyAsyncFileWriter(tokio types, custom implementations, etc.)F: AnyAvroFormat(OCF or SOE)WriterType aliases:
AsyncAvroWriter<W>- OCF (Object Container File) formatAsyncAvroStreamWriter<W>- SOE (Single Object Encoding) formatAsyncWriterBuilder: Configuration builderwith_compression()- All codecs (Deflate, Snappy, ZStandard, etc.)with_fingerprint_strategy()- SOE fingerprintingwith_capacity()- Buffer sizingbuild()methodKey Implementation Details
RecordEncoder- no re-implementation of Avro encodingVec<u8>, converts toBytes, flushes asynchronouslyasyncfeature withtokio,futures,bytesdependenciesAPI Parity
The async writer provides identical methods to the sync writer:
Test Coverage
7 comprehensive tests covering:
into_inner()All tests verify data integrity through round-trip with sync
ReaderBuilder, not byte-for-byte equality (OCF sync markers are random).Feature Gating
All async code is guarded with
#[cfg(feature = "async")].Commits
[arrow-avro] add async writer module with feature gating- Core implementation[arrow-avro] add comprehensive async writer tests- Test coverage[arrow-avro] format code with rustfmt- Formatting[arrow-avro] improve async writer documentation- DocsTesting
All tests pass: ✅
Clippy: No warnings ✅
Rustfmt: Clean ✅
Files Modified
arrow-avro/Cargo.toml- Added async feature and dependenciesarrow-avro/src/writer/mod.rs- Module exportsarrow-avro/src/writer/async_writer.rs- New module (486 lines, 7 tests)Future Work
object_storefeature for cloud storage integration (S3, GCS, Azure)ParquetObjectWriterExample Usage
References
AsyncArrowWriter(parquet/src/arrow/async_writer/)