diff --git a/parquet_derive/src/lib.rs b/parquet_derive/src/lib.rs
index 16b6a6699e2d..48629d4f552a 100644
--- a/parquet_derive/src/lib.rs
+++ b/parquet_derive/src/lib.rs
@@ -189,7 +189,6 @@ pub fn parquet_record_reader(input: proc_macro::TokenStream) -> proc_macro::Toke
     let field_names: Vec<_> = fields.iter().map(|f| f.ident.clone()).collect();
     let reader_snippets: Vec<proc_macro2::TokenStream> =
         field_infos.iter().map(|x| x.reader_snippet()).collect();
-    let i: Vec<_> = (0..reader_snippets.len()).collect();
 
     let derived_for = input.ident;
     let generics = input.generics;
@@ -206,6 +205,12 @@ pub fn parquet_record_reader(input: proc_macro::TokenStream) -> proc_macro::Toke
 
         let mut row_group_reader = row_group_reader;
 
+        // key: parquet file column name, value: column index
+        let mut name_to_index = std::collections::HashMap::new();
+        for (idx, col) in row_group_reader.metadata().schema_descr().columns().iter().enumerate() {
+            name_to_index.insert(col.name().to_string(), idx);
+        }
+
         for _ in 0..num_records {
           self.push(#derived_for {
             #(
@@ -218,7 +223,9 @@ pub fn parquet_record_reader(input: proc_macro::TokenStream) -> proc_macro::Toke
         #(
          {
-            if let Ok(mut column_reader) = row_group_reader.get_column_reader(#i) {
+            let idx = name_to_index.get(stringify!(#field_names)).unwrap_or_else(
+                || panic!("column name '{}' is not found in parquet file!", stringify!(#field_names)));
+            if let Ok(mut column_reader) = row_group_reader.get_column_reader(*idx) {
              #reader_snippets
            } else {
              return Err(::parquet::errors::ParquetError::General("Failed to get next column".into()))
diff --git a/parquet_derive_test/src/lib.rs b/parquet_derive_test/src/lib.rs
index e7c7896cb7f3..42c4671f4143 100644
--- a/parquet_derive_test/src/lib.rs
+++ b/parquet_derive_test/src/lib.rs
@@ -70,12 +70,12 @@
 // If these fields are guaranteed to be valid
 // we can load this struct into APartiallyCompleteRecord
 #[derive(PartialEq, ParquetRecordWriter, Debug)]
-struct APartiallyOptionalRecord {
+struct AnOptionalRecord {
     pub bool: bool,
     pub string: String,
-    pub maybe_i16: Option<i16>,
-    pub maybe_i32: Option<i32>,
-    pub maybe_u64: Option<u64>,
+    pub i16: Option<i16>,
+    pub i32: Option<i32>,
+    pub u64: Option<u64>,
     pub isize: isize,
     pub float: f32,
     pub double: f64,
@@ -85,6 +85,22 @@
     pub byte_vec: Vec<u8>,
 }
 
+// This struct removes several fields from APartiallyCompleteRecord
+// and shuffles the remaining fields;
+// we should still be able to load it from an APartiallyCompleteRecord parquet file
+#[derive(PartialEq, ParquetRecordReader, Debug)]
+struct APrunedRecord {
+    pub bool: bool,
+    pub string: String,
+    pub byte_vec: Vec<u8>,
+    pub float: f32,
+    pub double: f64,
+    pub i16: i16,
+    pub i32: i32,
+    pub u64: u64,
+    pub isize: isize,
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -240,12 +256,12 @@
     #[test]
     fn test_parquet_derive_read_optional_but_valid_column() {
         let file = get_temp_file("test_parquet_derive_read_optional", &[]);
-        let drs: Vec<APartiallyOptionalRecord> = vec![APartiallyOptionalRecord {
+        let drs = vec![AnOptionalRecord {
             bool: true,
             string: "a string".into(),
-            maybe_i16: Some(-45),
-            maybe_i32: Some(456),
-            maybe_u64: Some(4563424),
+            i16: Some(-45),
+            i32: Some(456),
+            u64: Some(4563424),
             isize: -365,
             float: 3.5,
             double: f64::NAN,
@@ -273,9 +289,57 @@ mod tests {
         let mut row_group = reader.get_row_group(0).unwrap();
         out.read_from_row_group(&mut *row_group, 1).unwrap();
 
-        assert_eq!(drs[0].maybe_i16.unwrap(), out[0].i16);
-        assert_eq!(drs[0].maybe_i32.unwrap(), out[0].i32);
-        assert_eq!(drs[0].maybe_u64.unwrap(), out[0].u64);
+        assert_eq!(drs[0].i16.unwrap(), out[0].i16);
+        assert_eq!(drs[0].i32.unwrap(), out[0].i32);
+        assert_eq!(drs[0].u64.unwrap(), out[0].u64);
+    }
+
+    #[test]
+    fn test_parquet_derive_read_pruned_and_shuffled_columns() {
+        let file = get_temp_file("test_parquet_derive_read_pruned", &[]);
+        let drs = vec![APartiallyCompleteRecord {
+            bool: true,
+            string: "a string".into(),
+            i16: -45,
+            i32: 456,
+            u64: 4563424,
+            isize: -365,
+            float: 3.5,
+            double: f64::NAN,
+            now: chrono::Utc::now().naive_local(),
+            date: chrono::naive::NaiveDate::from_ymd_opt(2015, 3, 14).unwrap(),
+            uuid: uuid::Uuid::new_v4(),
+            byte_vec: vec![0x65, 0x66, 0x67],
+        }];
+
+        let generated_schema = drs.as_slice().schema().unwrap();
+
+        let props = Default::default();
+        let mut writer =
+            SerializedFileWriter::new(file.try_clone().unwrap(), generated_schema, props).unwrap();
+
+        let mut row_group = writer.next_row_group().unwrap();
+        drs.as_slice().write_to_row_group(&mut row_group).unwrap();
+        row_group.close().unwrap();
+        writer.close().unwrap();
+
+        use parquet::file::{reader::FileReader, serialized_reader::SerializedFileReader};
+        let reader = SerializedFileReader::new(file).unwrap();
+        let mut out: Vec<APrunedRecord> = Vec::new();
+
+        let mut row_group = reader.get_row_group(0).unwrap();
+        out.read_from_row_group(&mut *row_group, 1).unwrap();
+
+        assert_eq!(drs[0].bool, out[0].bool);
+        assert_eq!(drs[0].string, out[0].string);
+        assert_eq!(drs[0].byte_vec, out[0].byte_vec);
+        assert_eq!(drs[0].float, out[0].float);
+        assert!(drs[0].double.is_nan());
+        assert!(out[0].double.is_nan());
+        assert_eq!(drs[0].i16, out[0].i16);
+        assert_eq!(drs[0].i32, out[0].i32);
+        assert_eq!(drs[0].u64, out[0].u64);
+        assert_eq!(drs[0].isize, out[0].isize);
     }
 
     /// Returns file handle for a temp file in 'target' directory with a provided content
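
Note (reviewer sketch, not part of the patch): with the positional `let i: Vec<_>` indices removed, the generated `read_from_row_group` body resolves each struct field by column name instead of by position. A hand-expanded approximation for a hypothetical two-field struct looks roughly like the following; `ACoolStruct` and the field list are illustrative, and the per-field `#reader_snippet` bodies are elided as comments.

    use std::collections::HashMap;

    use parquet::errors::ParquetError;
    use parquet::file::reader::RowGroupReader;

    // Illustrative target struct; the derive would generate the body
    // below inside `impl RecordReader<ACoolStruct> for Vec<ACoolStruct>`.
    #[allow(dead_code)]
    struct ACoolStruct {
        pub a: i32,
        pub b: String,
    }

    fn read_from_row_group(
        row_group_reader: &mut dyn RowGroupReader,
    ) -> Result<(), ParquetError> {
        // key: parquet file column name, value: column index
        let mut name_to_index = HashMap::new();
        for (idx, col) in row_group_reader
            .metadata()
            .schema_descr()
            .columns()
            .iter()
            .enumerate()
        {
            name_to_index.insert(col.name().to_string(), idx);
        }

        // One lookup per struct field (`stringify!(#field_names)` in the
        // macro), so the file's column order no longer has to match the
        // struct's field order, and extra file columns are never looked up.
        for field in ["a", "b"] {
            let idx = name_to_index
                .get(field)
                .unwrap_or_else(|| panic!("column name '{}' is not found in parquet file!", field));
            let mut _column_reader = row_group_reader.get_column_reader(*idx)?;
            // ... the derived `#reader_snippet` for this field decodes its
            // values from `_column_reader` here ...
        }
        Ok(())
    }

A missing column now panics with the offending field name, rather than silently pairing a field with whatever column happened to sit at its index, which is what makes the pruned-and-shuffled test below possible.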