From 21581ea199471bfffee12e8de003557c04819b0f Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Thu, 24 Apr 2025 13:05:33 +0400 Subject: [PATCH 1/7] Add custom decoder in arrow-json --- arrow-json/src/lib.rs | 2 +- arrow-json/src/reader/list_array.rs | 5 + arrow-json/src/reader/map_array.rs | 7 + arrow-json/src/reader/mod.rs | 191 +++++++++++++++++++++++++- arrow-json/src/reader/struct_array.rs | 6 + arrow-json/src/reader/tape.rs | 1 + 6 files changed, 205 insertions(+), 7 deletions(-) diff --git a/arrow-json/src/lib.rs b/arrow-json/src/lib.rs index 1b18e0094708..a53ca74fd618 100644 --- a/arrow-json/src/lib.rs +++ b/arrow-json/src/lib.rs @@ -86,7 +86,7 @@ pub mod reader; pub mod writer; -pub use self::reader::{Reader, ReaderBuilder}; +pub use self::reader::{ArrayDecoder, DecoderFactory, Reader, ReaderBuilder, Tape, TapeElement}; pub use self::writer::{ ArrayWriter, Encoder, EncoderFactory, EncoderOptions, LineDelimitedWriter, Writer, WriterBuilder, diff --git a/arrow-json/src/reader/list_array.rs b/arrow-json/src/reader/list_array.rs index e74fef79178a..72a2d0e1bb9b 100644 --- a/arrow-json/src/reader/list_array.rs +++ b/arrow-json/src/reader/list_array.rs @@ -24,6 +24,9 @@ use arrow_buffer::buffer::NullBuffer; use arrow_data::{ArrayData, ArrayDataBuilder}; use arrow_schema::{ArrowError, DataType}; use std::marker::PhantomData; +use std::sync::Arc; + +use super::DecoderFactory; pub struct ListArrayDecoder { data_type: DataType, @@ -39,6 +42,7 @@ impl ListArrayDecoder { strict_mode: bool, is_nullable: bool, struct_mode: StructMode, + decoder_factory: Option>, ) -> Result { let field = match &data_type { DataType::List(f) if !O::IS_LARGE => f, @@ -51,6 +55,7 @@ impl ListArrayDecoder { strict_mode, field.is_nullable(), struct_mode, + decoder_factory, )?; Ok(Self { diff --git a/arrow-json/src/reader/map_array.rs b/arrow-json/src/reader/map_array.rs index c2068577a094..572f4f38522c 100644 --- a/arrow-json/src/reader/map_array.rs +++ b/arrow-json/src/reader/map_array.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +use std::sync::Arc; + use crate::StructMode; use crate::reader::tape::{Tape, TapeElement}; use crate::reader::{ArrayDecoder, make_decoder}; @@ -24,6 +26,8 @@ use arrow_buffer::buffer::NullBuffer; use arrow_data::{ArrayData, ArrayDataBuilder}; use arrow_schema::{ArrowError, DataType}; +use super::DecoderFactory; + pub struct MapArrayDecoder { data_type: DataType, keys: Box, @@ -38,6 +42,7 @@ impl MapArrayDecoder { strict_mode: bool, is_nullable: bool, struct_mode: StructMode, + decoder_factory: Option>, ) -> Result { let fields = match &data_type { DataType::Map(_, true) => { @@ -62,6 +67,7 @@ impl MapArrayDecoder { strict_mode, fields[0].is_nullable(), struct_mode, + decoder_factory.clone(), )?; let values = make_decoder( fields[1].data_type().clone(), @@ -69,6 +75,7 @@ impl MapArrayDecoder { strict_mode, fields[1].is_nullable(), struct_mode, + decoder_factory, )?; Ok(Self { diff --git a/arrow-json/src/reader/mod.rs b/arrow-json/src/reader/mod.rs index f5fd1a8e7c38..f1bfd979f391 100644 --- a/arrow-json/src/reader/mod.rs +++ b/arrow-json/src/reader/mod.rs @@ -149,6 +149,7 @@ use arrow_array::{RecordBatch, RecordBatchReader, StructArray, downcast_integer, use arrow_data::ArrayData; use arrow_schema::{ArrowError, DataType, FieldRef, Schema, SchemaRef, TimeUnit}; pub use schema::*; +pub use tape::*; use crate::reader::boolean_array::BooleanArrayDecoder; use crate::reader::decimal_array::DecimalArrayDecoder; @@ -159,7 +160,6 @@ use crate::reader::primitive_array::PrimitiveArrayDecoder; use crate::reader::string_array::StringArrayDecoder; use crate::reader::string_view_array::StringViewArrayDecoder; use crate::reader::struct_array::StructArrayDecoder; -use crate::reader::tape::{Tape, TapeDecoder}; use crate::reader::timestamp_array::TimestampArrayDecoder; mod binary_array; @@ -184,6 +184,7 @@ pub struct ReaderBuilder { strict_mode: bool, is_field: bool, struct_mode: StructMode, + decoder_factory: Option>, schema: SchemaRef, } @@ -205,6 +206,7 @@ impl ReaderBuilder { is_field: false, struct_mode: Default::default(), schema, + decoder_factory: None, } } @@ -246,6 +248,7 @@ impl ReaderBuilder { is_field: true, struct_mode: Default::default(), schema: Arc::new(Schema::new([field.into()])), + decoder_factory: None, } } @@ -285,6 +288,14 @@ impl ReaderBuilder { } } + /// Set an optional hook for customizing decoding behavior. + pub fn with_decoder_factory(self, decoder_factory: Arc) -> Self { + Self { + decoder_factory: Some(decoder_factory), + ..self + } + } + /// Create a [`Reader`] with the provided [`BufRead`] pub fn build(self, reader: R) -> Result, ArrowError> { Ok(Reader { @@ -309,6 +320,7 @@ impl ReaderBuilder { self.strict_mode, nullable, self.struct_mode, + self.decoder_factory, )?; let num_fields = self.schema.flattened_fields().len(); @@ -373,6 +385,95 @@ impl RecordBatchReader for Reader { } } +/// A trait to create custom decoders for specific data types. +/// +/// This allows overriding the default decoders for specific data types, +/// or adding new decoders for custom data types. +/// +/// # Examples +/// +/// ``` +/// use arrow_json::{ArrayDecoder, DecoderFactory, TapeElement, Tape, ReaderBuilder, StructMode}; +/// use arrow_schema::ArrowError; +/// use arrow_schema::{DataType, Field, Fields, Schema}; +/// use arrow_array::cast::AsArray; +/// use arrow_array::Array; +/// use arrow_array::builder::StringBuilder; +/// use arrow_data::ArrayData; +/// use std::sync::Arc; +/// +/// struct IncorrectStringAsNullDecoder {} +/// +/// impl ArrayDecoder for IncorrectStringAsNullDecoder { +/// fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { +/// let mut builder = StringBuilder::new(); +/// for p in pos { +/// match tape.get(*p) { +/// TapeElement::String(idx) => { +/// builder.append_value(tape.get_string(idx)); +/// } +/// _ => builder.append_null(), +/// } +/// } +/// Ok(builder.finish().into_data()) +/// } +/// } +/// +/// #[derive(Debug)] +/// struct IncorrectStringAsNullDecoderFactory; +/// +/// impl DecoderFactory for IncorrectStringAsNullDecoderFactory { +/// fn make_default_decoder<'a>( +/// &self, +/// data_type: DataType, +/// _coerce_primitive: bool, +/// _strict_mode: bool, +/// _is_nullable: bool, +/// _struct_mode: StructMode, +/// ) -> Result>, ArrowError> { +/// match data_type { +/// DataType::Utf8 => Ok(Some(Box::new(IncorrectStringAsNullDecoder {}))), +/// _ => Ok(None), +/// } +/// } +/// } +/// +/// let json = r#" +/// {"a": "a"} +/// {"a": 12} +/// "#; +/// let batch = ReaderBuilder::new(Arc::new(Schema::new(Fields::from(vec![Field::new( +/// "a", +/// DataType::Utf8, +/// true, +/// )])))) +/// .with_decoder_factory(Arc::new(IncorrectStringAsNullDecoderFactory)) +/// .build(json.as_bytes()) +/// .unwrap() +/// .next() +/// .unwrap() +/// .unwrap(); +/// +/// let values = batch.column(0).as_string::(); +/// assert_eq!(values.len(), 2); +/// assert_eq!(values.value(0), "a"); +/// assert!(values.is_null(1)); +/// ``` +pub trait DecoderFactory: std::fmt::Debug + Send + Sync { + /// Make a decoder that overrides the default decoder for a specific data type. + /// This can be used to override how e.g. error in decoding are handled. + fn make_default_decoder( + &self, + _data_type: DataType, + _coerce_primitive: bool, + _strict_mode: bool, + _is_nullable: bool, + _struct_mode: StructMode, + ) -> Result>, ArrowError> { + Ok(None) + } +} + /// A low-level interface for reading JSON data from a byte stream /// /// See [`Reader`] for a higher-level interface for interface with [`BufRead`] @@ -674,7 +775,8 @@ impl Decoder { } } -trait ArrayDecoder: Send { +/// A trait to decode JSON values into arrow arrays +pub trait ArrayDecoder: Send { /// Decode elements from `tape` starting at the indexes contained in `pos` fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result; } @@ -691,7 +793,20 @@ fn make_decoder( strict_mode: bool, is_nullable: bool, struct_mode: StructMode, + decoder_factory: Option>, ) -> Result, ArrowError> { + if let Some(ref factory) = decoder_factory { + if let Some(decoder) = factory.make_default_decoder( + data_type.clone(), + coerce_primitive, + strict_mode, + is_nullable, + struct_mode, + )? { + return Ok(decoder); + } + } + downcast_integer! { data_type => (primitive_decoder, data_type), DataType::Null => Ok(Box::::default()), @@ -744,14 +859,14 @@ fn make_decoder( DataType::Utf8 => Ok(Box::new(StringArrayDecoder::::new(coerce_primitive))), DataType::Utf8View => Ok(Box::new(StringViewArrayDecoder::new(coerce_primitive))), DataType::LargeUtf8 => Ok(Box::new(StringArrayDecoder::::new(coerce_primitive))), - DataType::List(_) => Ok(Box::new(ListArrayDecoder::::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)), - DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)), - DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)), + DataType::List(_) => Ok(Box::new(ListArrayDecoder::::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode, decoder_factory)?)), + DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode, decoder_factory)?)), + DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode, decoder_factory)?)), DataType::Binary => Ok(Box::new(BinaryArrayDecoder::::default())), DataType::LargeBinary => Ok(Box::new(BinaryArrayDecoder::::default())), DataType::FixedSizeBinary(len) => Ok(Box::new(FixedSizeBinaryArrayDecoder::new(len))), DataType::BinaryView => Ok(Box::new(BinaryViewDecoder::default())), - DataType::Map(_, _) => Ok(Box::new(MapArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)), + DataType::Map(_, _) => Ok(Box::new(MapArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode, decoder_factory)?)), d => Err(ArrowError::NotYetImplemented(format!("Support for {d} in JSON reader"))) } } @@ -2815,4 +2930,68 @@ mod tests { "Json error: whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned() ); } + + #[test] + fn test_decoder_factory() { + use arrow_array::builder; + + struct AlwaysNullStringArrayDecoder; + + impl ArrayDecoder for AlwaysNullStringArrayDecoder { + fn decode(&mut self, _tape: &Tape<'_>, pos: &[u32]) -> Result { + let mut builder = builder::StringBuilder::new(); + for _ in pos { + builder.append_null(); + } + Ok(builder.finish().into_data()) + } + } + + #[derive(Debug)] + struct AlwaysNullStringArrayDecoderFactory; + + impl DecoderFactory for AlwaysNullStringArrayDecoderFactory { + fn make_default_decoder<'a>( + &self, + data_type: DataType, + _coerce_primitive: bool, + _strict_mode: bool, + _is_nullable: bool, + _struct_mode: StructMode, + ) -> Result>, ArrowError> { + match data_type { + DataType::Utf8 => Ok(Some(Box::new(AlwaysNullStringArrayDecoder {}))), + _ => Ok(None), + } + } + } + + let buf = r#" + {"a": "1", "b": 2} + {"a": "hello", "b": 23} + "#; + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, true), + Field::new("b", DataType::Int32, true), + ])); + + let batches = ReaderBuilder::new(schema.clone()) + .with_batch_size(2) + .with_decoder_factory(Arc::new(AlwaysNullStringArrayDecoderFactory)) + .build(Cursor::new(buf.as_bytes())) + .unwrap() + .collect::, _>>() + .unwrap(); + + assert_eq!(batches.len(), 1); + + let col1 = batches[0].column(0).as_string::(); + assert_eq!(col1.null_count(), 2); + assert!(col1.is_null(0)); + assert!(col1.is_null(1)); + + let col2 = batches[0].column(1).as_primitive::(); + assert_eq!(col2.value(0), 2); + assert_eq!(col2.value(1), 23); + } } diff --git a/arrow-json/src/reader/struct_array.rs b/arrow-json/src/reader/struct_array.rs index 262097ace396..0b1832c530ed 100644 --- a/arrow-json/src/reader/struct_array.rs +++ b/arrow-json/src/reader/struct_array.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +use std::sync::Arc; + use crate::reader::tape::{Tape, TapeElement}; use crate::reader::{ArrayDecoder, StructMode, make_decoder}; use arrow_array::builder::BooleanBufferBuilder; @@ -22,6 +24,8 @@ use arrow_buffer::buffer::NullBuffer; use arrow_data::{ArrayData, ArrayDataBuilder}; use arrow_schema::{ArrowError, DataType, Fields}; +use super::DecoderFactory; + pub struct StructArrayDecoder { data_type: DataType, decoders: Vec>, @@ -37,6 +41,7 @@ impl StructArrayDecoder { strict_mode: bool, is_nullable: bool, struct_mode: StructMode, + decoder_factory: Option>, ) -> Result { let decoders = struct_fields(&data_type) .iter() @@ -51,6 +56,7 @@ impl StructArrayDecoder { strict_mode, nullable, struct_mode, + decoder_factory.clone(), ) }) .collect::, ArrowError>>()?; diff --git a/arrow-json/src/reader/tape.rs b/arrow-json/src/reader/tape.rs index 89ee3f778765..fcab173ef110 100644 --- a/arrow-json/src/reader/tape.rs +++ b/arrow-json/src/reader/tape.rs @@ -338,6 +338,7 @@ impl TapeDecoder { } } + /// Decodes JSON data from the provided buffer, returning the number of bytes consumed pub fn decode(&mut self, buf: &[u8]) -> Result { let mut iter = BufIter::new(buf); From 5fe364287881ad91ace7b6f4bfca70c1ec384cf1 Mon Sep 17 00:00:00 2001 From: Doug Miller Date: Fri, 19 Dec 2025 08:23:33 -0500 Subject: [PATCH 2/7] include field --- arrow-json/src/reader/list_array.rs | 1 + arrow-json/src/reader/map_array.rs | 2 + arrow-json/src/reader/mod.rs | 10 +- arrow-json/src/reader/struct_array.rs | 1 + parquet-variant-compute/Cargo.toml | 4 + parquet-variant-compute/src/decoder.rs | 252 +++++++++++++++++++++++++ parquet-variant-compute/src/lib.rs | 2 + 7 files changed, 269 insertions(+), 3 deletions(-) create mode 100644 parquet-variant-compute/src/decoder.rs diff --git a/arrow-json/src/reader/list_array.rs b/arrow-json/src/reader/list_array.rs index 72a2d0e1bb9b..7a697faf9bc2 100644 --- a/arrow-json/src/reader/list_array.rs +++ b/arrow-json/src/reader/list_array.rs @@ -50,6 +50,7 @@ impl ListArrayDecoder { _ => unreachable!(), }; let decoder = make_decoder( + Some(field.clone()), field.data_type().clone(), coerce_primitive, strict_mode, diff --git a/arrow-json/src/reader/map_array.rs b/arrow-json/src/reader/map_array.rs index 572f4f38522c..4a115d49bea3 100644 --- a/arrow-json/src/reader/map_array.rs +++ b/arrow-json/src/reader/map_array.rs @@ -62,6 +62,7 @@ impl MapArrayDecoder { }; let keys = make_decoder( + Some(fields[0].clone()), fields[0].data_type().clone(), coerce_primitive, strict_mode, @@ -70,6 +71,7 @@ impl MapArrayDecoder { decoder_factory.clone(), )?; let values = make_decoder( + Some(fields[1].clone()), fields[1].data_type().clone(), coerce_primitive, strict_mode, diff --git a/arrow-json/src/reader/mod.rs b/arrow-json/src/reader/mod.rs index f1bfd979f391..cdd035d83347 100644 --- a/arrow-json/src/reader/mod.rs +++ b/arrow-json/src/reader/mod.rs @@ -315,6 +315,7 @@ impl ReaderBuilder { }; let decoder = make_decoder( + None, data_type, self.coerce_primitive, self.strict_mode, @@ -425,6 +426,7 @@ impl RecordBatchReader for Reader { /// impl DecoderFactory for IncorrectStringAsNullDecoderFactory { /// fn make_default_decoder<'a>( /// &self, +/// _field: Option, /// data_type: DataType, /// _coerce_primitive: bool, /// _strict_mode: bool, @@ -464,14 +466,13 @@ pub trait DecoderFactory: std::fmt::Debug + Send + Sync { /// This can be used to override how e.g. error in decoding are handled. fn make_default_decoder( &self, + _field: Option, _data_type: DataType, _coerce_primitive: bool, _strict_mode: bool, _is_nullable: bool, _struct_mode: StructMode, - ) -> Result>, ArrowError> { - Ok(None) - } + ) -> Result>, ArrowError>; } /// A low-level interface for reading JSON data from a byte stream @@ -788,6 +789,7 @@ macro_rules! primitive_decoder { } fn make_decoder( + field: Option, data_type: DataType, coerce_primitive: bool, strict_mode: bool, @@ -797,6 +799,7 @@ fn make_decoder( ) -> Result, ArrowError> { if let Some(ref factory) = decoder_factory { if let Some(decoder) = factory.make_default_decoder( + field.clone(), data_type.clone(), coerce_primitive, strict_mode, @@ -2953,6 +2956,7 @@ mod tests { impl DecoderFactory for AlwaysNullStringArrayDecoderFactory { fn make_default_decoder<'a>( &self, + _field: Option, data_type: DataType, _coerce_primitive: bool, _strict_mode: bool, diff --git a/arrow-json/src/reader/struct_array.rs b/arrow-json/src/reader/struct_array.rs index 0b1832c530ed..71a17141a4d4 100644 --- a/arrow-json/src/reader/struct_array.rs +++ b/arrow-json/src/reader/struct_array.rs @@ -51,6 +51,7 @@ impl StructArrayDecoder { // it doesn't contain any nulls not masked by its parent let nullable = f.is_nullable() || is_nullable; make_decoder( + Some(f.clone()), f.data_type().clone(), coerce_primitive, strict_mode, diff --git a/parquet-variant-compute/Cargo.toml b/parquet-variant-compute/Cargo.toml index 74c3dd3fb72f..c88d479393ad 100644 --- a/parquet-variant-compute/Cargo.toml +++ b/parquet-variant-compute/Cargo.toml @@ -30,9 +30,13 @@ rust-version = { workspace = true } [dependencies] arrow = { workspace = true , features = ["canonical_extension_types"]} +arrow-array = { workspace = true } +arrow-data = { workspace = true } +arrow-json = { workspace = true } arrow-schema = { workspace = true } half = { version = "2.1", default-features = false } indexmap = "2.10.0" +lexical-core = { version = "1.0", default-features = false} parquet-variant = { workspace = true } parquet-variant-json = { workspace = true } chrono = { workspace = true } diff --git a/parquet-variant-compute/src/decoder.rs b/parquet-variant-compute/src/decoder.rs new file mode 100644 index 000000000000..d20ff20e0a00 --- /dev/null +++ b/parquet-variant-compute/src/decoder.rs @@ -0,0 +1,252 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_array::{Array, StructArray}; +use arrow_json::{DecoderFactory, StructMode}; +use parquet_variant::{ObjectFieldBuilder, Variant, VariantBuilder, VariantBuilderExt}; +use crate::{VariantArrayBuilder, VariantType}; +use arrow_data::ArrayData; +use arrow_schema::{ArrowError, DataType, FieldRef}; + +use arrow_json::reader::ArrayDecoder; +use arrow_json::reader::{Tape, TapeElement}; + +/// An [`ArrayDecoder`] implementation that decodes JSON values into a Variant array. +/// +/// This decoder converts JSON tape elements (parsed JSON tokens) into Parquet Variant +/// format, preserving the full structure of arbitrary JSON including nested objects, +/// arrays, and primitive types. +/// +/// This decoder is typically used indirectly via [`VariantArrayDecoderFactory`] when +/// reading JSON data into Variant columns. +#[derive(Default)] +pub struct VariantArrayDecoder; + +impl ArrayDecoder for VariantArrayDecoder { + fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { + let mut array_builder = VariantArrayBuilder::new(pos.len()); + for p in pos { + let mut builder = VariantBuilder::new(); + variant_from_tape_element(&mut builder, *p, tape)?; + let (metadata, value) = builder.finish(); + array_builder.append_value(Variant::new(&metadata, &value)); + } + let variant_struct_array: StructArray = array_builder.build().into(); + Ok(variant_struct_array.into_data()) + } +} + +/// A [`DecoderFactory`] that creates [`VariantArrayDecoder`] instances for Variant-typed fields. +/// +/// This factory integrates with the Arrow JSON reader to automatically decode JSON values +/// into Variant arrays when the target field is registered as a [`VariantType`] extension type. +/// +/// # Example +/// +/// ```ignore +/// use arrow_json::reader::ReaderBuilder; +/// use arrow_json::StructMode; +/// use std::sync::Arc; +/// +/// let builder = ReaderBuilder::new(Arc::new(schema)); +/// let reader = builder +/// .with_struct_mode(StructMode::ObjectOnly) +/// .with_decoder_factory(Arc::new(VariantArrayDecoderFactory)) +/// .build(json_input)?; +/// ``` +#[derive(Default, Debug)] +#[allow(unused)] +pub struct VariantArrayDecoderFactory; + +impl DecoderFactory for VariantArrayDecoderFactory { + fn make_default_decoder<'a>(&self, field: Option, + _data_type: DataType, + _coerce_primitive: bool, + _strict_mode: bool, + _is_nullable: bool, + _struct_mode: StructMode, + ) -> Result>, ArrowError> { + if let Some(field) = field && field.try_extension_type::().is_ok() { + return Ok(Some(Box::new(VariantArrayDecoder))); + } + Ok(None) + } +} + +fn variant_from_tape_element(builder: &mut impl VariantBuilderExt, mut p: u32, tape: &Tape) -> Result { + match tape.get(p) { + TapeElement::StartObject(end_idx) => { + let mut object_builder = builder.try_new_object()?; + p += 1; + while p < end_idx { + // Read field name + let field_name = match tape.get(p) { + TapeElement::String(s) => tape.get_string(s), + _ => return Err(tape.error(p, "field name")), + }; + + let mut field_builder = ObjectFieldBuilder::new(field_name, &mut object_builder); + p = tape.next(p, "field value")?; + p = variant_from_tape_element(&mut field_builder, p, tape)?; + } + object_builder.finish(); + } + TapeElement::EndObject(_u32) => { return Err(ArrowError::JsonError("unexpected end of object".to_string())) }, + TapeElement::StartList(end_idx) => { + let mut list_builder = builder.try_new_list()?; + p+= 1; + while p < end_idx { + p = variant_from_tape_element(&mut list_builder, p, tape)?; + } + list_builder.finish(); + } + TapeElement::EndList(_u32) => { return Err(ArrowError::JsonError("unexpected end of list".to_string())) }, + TapeElement::String(idx) => builder.append_value(tape.get_string(idx)), + TapeElement::Number(idx) => { + let s = tape.get_string(idx); + builder.append_value(parse_number(s)?) + }, + TapeElement::I64(i) => builder.append_value(i), + TapeElement::I32(i) => builder.append_value(i), + TapeElement::F64(f) => builder.append_value(f), + TapeElement::F32(f) => builder.append_value(f), + TapeElement::True => builder.append_value(true), + TapeElement::False => builder.append_value(false), + TapeElement::Null => builder.append_value(Variant::Null), + } + p += 1; + Ok(p) +} + +fn parse_number<'a, 'b>(s: &'a str) -> Result, ArrowError> { + if let Ok(v) = lexical_core::parse(s.as_bytes()) { + return Ok(Variant::Int64(v)); + } + + match lexical_core::parse(s.as_bytes()) { + Ok(v) => Ok(Variant::Double(v)), + Err(_) => Err(ArrowError::JsonError(format!("failed to parse {s} as number"))), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow_schema::{Schema, Field, DataType}; + use arrow_json::reader::ReaderBuilder; + use arrow_json::StructMode; + use std::sync::Arc; + use std::io::Cursor; + use arrow_array::RecordBatch; + + #[test] + fn test_variant() { + let do_test = |json_input: &str, ids: Vec, variants: Vec| { + let variant_array = VariantArrayBuilder::new(0).build(); + + let struct_field = Schema::new(vec![ + Field::new("id", DataType::Int32, false), + // call VariantArray::field to get the correct Field + variant_array.field("var"), + ]); + + let builder = ReaderBuilder::new(Arc::new(struct_field.clone())); + let result = builder + .with_struct_mode(StructMode::ObjectOnly) + .with_decoder_factory(Arc::new(VariantArrayDecoderFactory)) + .build(Cursor::new(json_input.as_bytes())) + .unwrap() + .next() + .unwrap() + .unwrap(); + + let int_array = arrow_array::array::Int32Array::from(ids); + + let variant_array = { + let mut variant_builder = VariantArrayBuilder::new(variants.len()); + for v in variants { + variant_builder.append_variant(v); + } + variant_builder.build() + }; + + let variant_struct_array: StructArray = variant_array.into(); + + let expected = RecordBatch::try_new( + struct_field.into(), + vec![Arc::new(int_array), Arc::new(variant_struct_array)], + ) + .unwrap(); + + assert_eq!(result, expected); + }; + + do_test( + "{\"id\": 1, \"var\": \"a\"}\n{\"id\": 2, \"var\": \"b\"}", + vec![1, 2], + vec![Variant::from("a"), Variant::from("b")], + ); + + let mut builder = VariantBuilder::new(); + let mut object_builder = builder.new_object(); + object_builder.insert("int64", Variant::Int64(1)); + object_builder.insert("double", Variant::Double(1.0)); + object_builder.insert("null", Variant::Null); + object_builder.insert("true", Variant::BooleanTrue); + object_builder.insert("false", Variant::BooleanFalse); + object_builder.insert("string", Variant::from("a")); + object_builder.finish(); + let (metadata, value) = builder.finish(); + let variant = Variant::try_new(&metadata, &value).unwrap(); + + do_test( + "{\"id\": 1, \"var\": {\"int64\": 1, \"double\": 1.0, \"null\": null, \"true\": true, \"false\": false, \"string\": \"a\"}}", + vec![1], + vec![variant], + ); + + // nested structs + let mut builder = VariantBuilder::new(); + let mut object_builder = builder.new_object(); + { + let mut list_builder = object_builder.new_list("somelist"); + { + let mut nested_object_builder = list_builder.new_object(); + nested_object_builder.insert("num", Variant::Int64(2)); + nested_object_builder.finish(); + } + { + let mut nested_object_builder = list_builder.new_object(); + nested_object_builder.insert("num", Variant::Int64(3)); + nested_object_builder.finish(); + } + list_builder.finish(); + object_builder.insert("scalar", Variant::from("a")); + } + object_builder.finish(); + + let (metadata, value) = builder.finish(); + let variant = Variant::try_new(&metadata, &value).unwrap(); + + do_test( + "{\"id\": 1, \"var\": {\"somelist\": [{\"num\": 2}, {\"num\": 3}], \"scalar\": \"a\"}}", + vec![1], + vec![variant], + ); + } + +} diff --git a/parquet-variant-compute/src/lib.rs b/parquet-variant-compute/src/lib.rs index 9b8008f58422..cd8025e011aa 100644 --- a/parquet-variant-compute/src/lib.rs +++ b/parquet-variant-compute/src/lib.rs @@ -41,6 +41,7 @@ mod arrow_to_variant; mod cast_to_variant; +mod decoder; mod from_json; mod shred_variant; mod to_json; @@ -61,3 +62,4 @@ pub use to_json::variant_to_json; pub use type_conversion::CastOptions; pub use unshred_variant::unshred_variant; pub use variant_get::{GetOptions, variant_get}; +pub use decoder::VariantArrayDecoder; From 53bffa29a028eb28a31f16283a55d5b69409b8d7 Mon Sep 17 00:00:00 2001 From: Doug Miller Date: Fri, 19 Dec 2025 17:44:02 -0500 Subject: [PATCH 3/7] change export --- parquet-variant-compute/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet-variant-compute/src/lib.rs b/parquet-variant-compute/src/lib.rs index cd8025e011aa..e6eab80283fd 100644 --- a/parquet-variant-compute/src/lib.rs +++ b/parquet-variant-compute/src/lib.rs @@ -62,4 +62,4 @@ pub use to_json::variant_to_json; pub use type_conversion::CastOptions; pub use unshred_variant::unshred_variant; pub use variant_get::{GetOptions, variant_get}; -pub use decoder::VariantArrayDecoder; +pub use decoder::VariantArrayDecoderFactory; From 66b75126b2426c6a6f4decc0082099150c3f7db7 Mon Sep 17 00:00:00 2001 From: Doug Miller Date: Mon, 22 Dec 2025 07:30:23 -0500 Subject: [PATCH 4/7] cleanup --- arrow-json/src/reader/mod.rs | 8 +- parquet-variant-compute/src/decoder.rs | 117 +++++++++++++++---------- parquet-variant-compute/src/lib.rs | 2 +- 3 files changed, 77 insertions(+), 50 deletions(-) diff --git a/arrow-json/src/reader/mod.rs b/arrow-json/src/reader/mod.rs index cdd035d83347..dadb437611dc 100644 --- a/arrow-json/src/reader/mod.rs +++ b/arrow-json/src/reader/mod.rs @@ -424,7 +424,7 @@ impl RecordBatchReader for Reader { /// struct IncorrectStringAsNullDecoderFactory; /// /// impl DecoderFactory for IncorrectStringAsNullDecoderFactory { -/// fn make_default_decoder<'a>( +/// fn make_custom_decoder<'a>( /// &self, /// _field: Option, /// data_type: DataType, @@ -464,7 +464,7 @@ impl RecordBatchReader for Reader { pub trait DecoderFactory: std::fmt::Debug + Send + Sync { /// Make a decoder that overrides the default decoder for a specific data type. /// This can be used to override how e.g. error in decoding are handled. - fn make_default_decoder( + fn make_custom_decoder( &self, _field: Option, _data_type: DataType, @@ -798,7 +798,7 @@ fn make_decoder( decoder_factory: Option>, ) -> Result, ArrowError> { if let Some(ref factory) = decoder_factory { - if let Some(decoder) = factory.make_default_decoder( + if let Some(decoder) = factory.make_custom_decoder( field.clone(), data_type.clone(), coerce_primitive, @@ -2954,7 +2954,7 @@ mod tests { struct AlwaysNullStringArrayDecoderFactory; impl DecoderFactory for AlwaysNullStringArrayDecoderFactory { - fn make_default_decoder<'a>( + fn make_custom_decoder<'a>( &self, _field: Option, data_type: DataType, diff --git a/parquet-variant-compute/src/decoder.rs b/parquet-variant-compute/src/decoder.rs index d20ff20e0a00..f05668bfa65e 100644 --- a/parquet-variant-compute/src/decoder.rs +++ b/parquet-variant-compute/src/decoder.rs @@ -15,12 +15,13 @@ // specific language governing permissions and limitations // under the License. -use arrow_array::{Array, StructArray}; -use arrow_json::{DecoderFactory, StructMode}; -use parquet_variant::{ObjectFieldBuilder, Variant, VariantBuilder, VariantBuilderExt}; use crate::{VariantArrayBuilder, VariantType}; +use arrow_array::{Array, StructArray}; use arrow_data::ArrayData; +use arrow_json::{DecoderFactory, StructMode}; +use arrow_schema::extension::ExtensionType; use arrow_schema::{ArrowError, DataType, FieldRef}; +use parquet_variant::{ObjectFieldBuilder, Variant, VariantBuilderExt}; use arrow_json::reader::ArrayDecoder; use arrow_json::reader::{Tape, TapeElement}; @@ -40,12 +41,9 @@ impl ArrayDecoder for VariantArrayDecoder { fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { let mut array_builder = VariantArrayBuilder::new(pos.len()); for p in pos { - let mut builder = VariantBuilder::new(); - variant_from_tape_element(&mut builder, *p, tape)?; - let (metadata, value) = builder.finish(); - array_builder.append_value(Variant::new(&metadata, &value)); + variant_from_tape_element(&mut array_builder, *p, tape)?; } - let variant_struct_array: StructArray = array_builder.build().into(); + let variant_struct_array = StructArray::from(array_builder.build()); Ok(variant_struct_array.into_data()) } } @@ -73,21 +71,29 @@ impl ArrayDecoder for VariantArrayDecoder { pub struct VariantArrayDecoderFactory; impl DecoderFactory for VariantArrayDecoderFactory { - fn make_default_decoder<'a>(&self, field: Option, + fn make_custom_decoder<'a>( + &self, + field: Option, _data_type: DataType, _coerce_primitive: bool, _strict_mode: bool, _is_nullable: bool, _struct_mode: StructMode, ) -> Result>, ArrowError> { - if let Some(field) = field && field.try_extension_type::().is_ok() { + if let Some(field) = field + && field.extension_type_name() == Some(VariantType::NAME) + { return Ok(Some(Box::new(VariantArrayDecoder))); } - Ok(None) + Ok(None) } } -fn variant_from_tape_element(builder: &mut impl VariantBuilderExt, mut p: u32, tape: &Tape) -> Result { +fn variant_from_tape_element( + builder: &mut impl VariantBuilderExt, + mut p: u32, + tape: &Tape, +) -> Result { match tape.get(p) { TapeElement::StartObject(end_idx) => { let mut object_builder = builder.try_new_object()?; @@ -98,28 +104,34 @@ fn variant_from_tape_element(builder: &mut impl VariantBuilderExt, mut p: u32, t TapeElement::String(s) => tape.get_string(s), _ => return Err(tape.error(p, "field name")), }; - + let mut field_builder = ObjectFieldBuilder::new(field_name, &mut object_builder); p = tape.next(p, "field value")?; p = variant_from_tape_element(&mut field_builder, p, tape)?; } object_builder.finish(); } - TapeElement::EndObject(_u32) => { return Err(ArrowError::JsonError("unexpected end of object".to_string())) }, + TapeElement::EndObject(_u32) => { + return Err(ArrowError::JsonError( + "unexpected end of object".to_string(), + )); + } TapeElement::StartList(end_idx) => { let mut list_builder = builder.try_new_list()?; - p+= 1; + p += 1; while p < end_idx { p = variant_from_tape_element(&mut list_builder, p, tape)?; } list_builder.finish(); } - TapeElement::EndList(_u32) => { return Err(ArrowError::JsonError("unexpected end of list".to_string())) }, + TapeElement::EndList(_u32) => { + return Err(ArrowError::JsonError("unexpected end of list".to_string())); + } TapeElement::String(idx) => builder.append_value(tape.get_string(idx)), TapeElement::Number(idx) => { let s = tape.get_string(idx); builder.append_value(parse_number(s)?) - }, + } TapeElement::I64(i) => builder.append_value(i), TapeElement::I32(i) => builder.append_value(i), TapeElement::F64(f) => builder.append_value(f), @@ -139,23 +151,28 @@ fn parse_number<'a, 'b>(s: &'a str) -> Result, ArrowError> { match lexical_core::parse(s.as_bytes()) { Ok(v) => Ok(Variant::Double(v)), - Err(_) => Err(ArrowError::JsonError(format!("failed to parse {s} as number"))), + Err(_) => Err(ArrowError::JsonError(format!( + "failed to parse {s} as number" + ))), } } #[cfg(test)] mod tests { + use crate::VariantArray; + use super::*; - use arrow_schema::{Schema, Field, DataType}; - use arrow_json::reader::ReaderBuilder; + use arrow_array::Int32Array; use arrow_json::StructMode; - use std::sync::Arc; + use arrow_json::reader::ReaderBuilder; + use arrow_schema::{DataType, Field, Schema}; + use parquet_variant::VariantBuilder; use std::io::Cursor; - use arrow_array::RecordBatch; + use std::sync::Arc; #[test] fn test_variant() { - let do_test = |json_input: &str, ids: Vec, variants: Vec| { + let do_test = |json_input: &str, ids: Vec, variants: Vec>| { let variant_array = VariantArrayBuilder::new(0).build(); let struct_field = Schema::new(vec![ @@ -174,31 +191,28 @@ mod tests { .unwrap() .unwrap(); + assert_eq!(result.num_columns(), 2); let int_array = arrow_array::array::Int32Array::from(ids); + assert_eq!( + result + .column(0) + .as_any() + .downcast_ref::() + .unwrap(), + &int_array + ); - let variant_array = { - let mut variant_builder = VariantArrayBuilder::new(variants.len()); - for v in variants { - variant_builder.append_variant(v); - } - variant_builder.build() - }; - - let variant_struct_array: StructArray = variant_array.into(); + let result_variant_array: VariantArray = + VariantArray::try_new(result.column(1)).unwrap(); + let values = result_variant_array.iter().collect::>(); - let expected = RecordBatch::try_new( - struct_field.into(), - vec![Arc::new(int_array), Arc::new(variant_struct_array)], - ) - .unwrap(); - - assert_eq!(result, expected); + assert_eq!(values, variants); }; do_test( "{\"id\": 1, \"var\": \"a\"}\n{\"id\": 2, \"var\": \"b\"}", vec![1, 2], - vec![Variant::from("a"), Variant::from("b")], + vec![Some(Variant::from("a")), Some(Variant::from("b"))], ); let mut builder = VariantBuilder::new(); @@ -216,10 +230,10 @@ mod tests { do_test( "{\"id\": 1, \"var\": {\"int64\": 1, \"double\": 1.0, \"null\": null, \"true\": true, \"false\": false, \"string\": \"a\"}}", vec![1], - vec![variant], + vec![Some(variant)], ); - // nested structs + // nested structs let mut builder = VariantBuilder::new(); let mut object_builder = builder.new_object(); { @@ -233,7 +247,7 @@ mod tests { let mut nested_object_builder = list_builder.new_object(); nested_object_builder.insert("num", Variant::Int64(3)); nested_object_builder.finish(); - } + } list_builder.finish(); object_builder.insert("scalar", Variant::from("a")); } @@ -245,8 +259,21 @@ mod tests { do_test( "{\"id\": 1, \"var\": {\"somelist\": [{\"num\": 2}, {\"num\": 3}], \"scalar\": \"a\"}}", vec![1], - vec![variant], + vec![Some(variant)], ); - } + let mut builder = VariantBuilder::new(); + let mut list_builder = builder.new_list(); + list_builder.append_value(Variant::Int64(1000000000000)); + list_builder.append_value(Variant::Double(2.718281828459045)); + list_builder.finish(); + let (metadata, value) = builder.finish(); + let variant = Variant::try_new(&metadata, &value).unwrap(); + + do_test( + "{\"id\": 1, \"var\": [1000000000000, 2.718281828459045]}", + vec![1], + vec![Some(variant)], + ); + } } diff --git a/parquet-variant-compute/src/lib.rs b/parquet-variant-compute/src/lib.rs index e6eab80283fd..afc064b3c106 100644 --- a/parquet-variant-compute/src/lib.rs +++ b/parquet-variant-compute/src/lib.rs @@ -56,10 +56,10 @@ pub use variant_array::{BorrowedShreddingState, ShreddingState, VariantArray, Va pub use variant_array_builder::{VariantArrayBuilder, VariantValueArrayBuilder}; pub use cast_to_variant::{cast_to_variant, cast_to_variant_with_options}; +pub use decoder::VariantArrayDecoderFactory; pub use from_json::json_to_variant; pub use shred_variant::{IntoShreddingField, ShreddedSchemaBuilder, shred_variant}; pub use to_json::variant_to_json; pub use type_conversion::CastOptions; pub use unshred_variant::unshred_variant; pub use variant_get::{GetOptions, variant_get}; -pub use decoder::VariantArrayDecoderFactory; From 6aeb892706742afc69e421cf9b3766ff26a4e459 Mon Sep 17 00:00:00 2001 From: Doug Miller Date: Mon, 22 Dec 2025 07:37:34 -0500 Subject: [PATCH 5/7] more obvious --- parquet-variant-compute/src/decoder.rs | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/parquet-variant-compute/src/decoder.rs b/parquet-variant-compute/src/decoder.rs index f05668bfa65e..a7150338b1f1 100644 --- a/parquet-variant-compute/src/decoder.rs +++ b/parquet-variant-compute/src/decoder.rs @@ -132,10 +132,26 @@ fn variant_from_tape_element( let s = tape.get_string(idx); builder.append_value(parse_number(s)?) } - TapeElement::I64(i) => builder.append_value(i), - TapeElement::I32(i) => builder.append_value(i), - TapeElement::F64(f) => builder.append_value(f), - TapeElement::F32(f) => builder.append_value(f), + TapeElement::I64(i) => { + return Err(ArrowError::JsonError(format!( + "I64 tape element not supported: {i}" + ))); + } + TapeElement::I32(i) => { + return Err(ArrowError::JsonError(format!( + "I32 tape element not supported: {i}" + ))); + } + TapeElement::F64(f) => { + return Err(ArrowError::JsonError(format!( + "F64 tape element not supported: {f}" + ))); + } + TapeElement::F32(f) => { + return Err(ArrowError::JsonError(format!( + "F32 tape element not supported: {f}" + ))); + } TapeElement::True => builder.append_value(true), TapeElement::False => builder.append_value(false), TapeElement::Null => builder.append_value(Variant::Null), From 93ad08c3a046b3daf38ba5d189e7cbe69e47ac54 Mon Sep 17 00:00:00 2001 From: Doug Miller Date: Thu, 8 Jan 2026 14:23:06 -0500 Subject: [PATCH 6/7] fix doctest and more exhaustive check --- arrow-json/src/reader/mod.rs | 4 ++-- parquet-variant-compute/src/decoder.rs | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/arrow-json/src/reader/mod.rs b/arrow-json/src/reader/mod.rs index dadb437611dc..5602d5e92f64 100644 --- a/arrow-json/src/reader/mod.rs +++ b/arrow-json/src/reader/mod.rs @@ -220,7 +220,7 @@ impl ReaderBuilder { /// # use arrow_array::cast::AsArray; /// # use arrow_array::types::Int32Type; /// # use arrow_json::ReaderBuilder; - /// # use arrow_schema::{DataType, Field}; + /// # use arrow_schema::{DataType, Field, FieldRef}; /// // Root of JSON schema is a numeric type /// let data = "1\n2\n3\n"; /// let field = Arc::new(Field::new("int", DataType::Int32, true)); @@ -396,7 +396,7 @@ impl RecordBatchReader for Reader { /// ``` /// use arrow_json::{ArrayDecoder, DecoderFactory, TapeElement, Tape, ReaderBuilder, StructMode}; /// use arrow_schema::ArrowError; -/// use arrow_schema::{DataType, Field, Fields, Schema}; +/// use arrow_schema::{DataType, Field, FieldRef, Fields, Schema}; /// use arrow_array::cast::AsArray; /// use arrow_array::Array; /// use arrow_array::builder::StringBuilder; diff --git a/parquet-variant-compute/src/decoder.rs b/parquet-variant-compute/src/decoder.rs index a7150338b1f1..68cc6c9c6299 100644 --- a/parquet-variant-compute/src/decoder.rs +++ b/parquet-variant-compute/src/decoder.rs @@ -82,6 +82,7 @@ impl DecoderFactory for VariantArrayDecoderFactory { ) -> Result>, ArrowError> { if let Some(field) = field && field.extension_type_name() == Some(VariantType::NAME) + && field.try_extension_type::().is_ok() { return Ok(Some(Box::new(VariantArrayDecoder))); } From 1fb7c47f3cd105b35014e9050776c06f14f9fbb8 Mon Sep 17 00:00:00 2001 From: Doug Miller Date: Sun, 11 Jan 2026 12:10:13 -0500 Subject: [PATCH 7/7] more lint fixes --- parquet-variant-compute/src/decoder.rs | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/parquet-variant-compute/src/decoder.rs b/parquet-variant-compute/src/decoder.rs index 68cc6c9c6299..7f9eaa5267c4 100644 --- a/parquet-variant-compute/src/decoder.rs +++ b/parquet-variant-compute/src/decoder.rs @@ -48,10 +48,9 @@ impl ArrayDecoder for VariantArrayDecoder { } } -/// A [`DecoderFactory`] that creates [`VariantArrayDecoder`] instances for Variant-typed fields. -/// -/// This factory integrates with the Arrow JSON reader to automatically decode JSON values -/// into Variant arrays when the target field is registered as a [`VariantType`] extension type. +/// A [`DecoderFactory`] that integrates with the Arrow JSON reader to automatically +/// decode JSON values into Variant arrays when the target field is registered as a +/// [`VariantType`] extension type. /// /// # Example /// @@ -80,11 +79,14 @@ impl DecoderFactory for VariantArrayDecoderFactory { _is_nullable: bool, _struct_mode: StructMode, ) -> Result>, ArrowError> { - if let Some(field) = field - && field.extension_type_name() == Some(VariantType::NAME) + let field = match field { + Some(inner_field) => inner_field, + None => return Ok(None), + }; + if field.extension_type_name() == Some(VariantType::NAME) && field.try_extension_type::().is_ok() { - return Ok(Some(Box::new(VariantArrayDecoder))); + return Ok(Some(Box::new(VariantArrayDecoder))) } Ok(None) } @@ -282,7 +284,7 @@ mod tests { let mut builder = VariantBuilder::new(); let mut list_builder = builder.new_list(); list_builder.append_value(Variant::Int64(1000000000000)); - list_builder.append_value(Variant::Double(2.718281828459045)); + list_builder.append_value(Variant::Double(std::f64::consts::E)); list_builder.finish(); let (metadata, value) = builder.finish(); let variant = Variant::try_new(&metadata, &value).unwrap();