From b767af53b72cc731b09ffb8a55fad5d4ba049e31 Mon Sep 17 00:00:00 2001
From: Ye Yuan
Date: Sun, 18 Aug 2024 00:28:23 +0800
Subject: [PATCH 1/5] support reading pruned parquet

---
 parquet_derive/src/lib.rs | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/parquet_derive/src/lib.rs b/parquet_derive/src/lib.rs
index 16b6a6699e2d..294116a56de2 100644
--- a/parquet_derive/src/lib.rs
+++ b/parquet_derive/src/lib.rs
@@ -189,7 +189,6 @@ pub fn parquet_record_reader(input: proc_macro::TokenStream) -> proc_macro::Toke
     let field_names: Vec<_> = fields.iter().map(|f| f.ident.clone()).collect();
     let reader_snippets: Vec<proc_macro2::TokenStream> = field_infos.iter().map(|x| x.reader_snippet()).collect();
-    let i: Vec<_> = (0..reader_snippets.len()).collect();
 
     let derived_for = input.ident;
     let generics = input.generics;
 
@@ -206,6 +205,13 @@ pub fn parquet_record_reader(input: proc_macro::TokenStream) -> proc_macro::Toke
 
             let mut row_group_reader = row_group_reader;
 
+            // build map to index
+            let mut name_to_index = std::collections::HashMap::new();
+            for (idx, col) in row_group_reader.metadata().schema_descr().columns().iter().enumerate() {
+                // println!("col {} name {:?}", idx, col.name());
+                name_to_index.insert(col.name().to_string(), idx);
+            }
+
             for _ in 0..num_records {
                 self.push(#derived_for {
                     #(
@@ -218,7 +224,9 @@ pub fn parquet_record_reader(input: proc_macro::TokenStream) -> proc_macro::Toke
 
             #(
                 {
-                    if let Ok(mut column_reader) = row_group_reader.get_column_reader(#i) {
+                    let idx = name_to_index.get(stringify!(#field_names)).unwrap_or_else(
+                        || panic!("column name '{}' not found in the parquet file!", stringify!(#field_names)));
+                    if let Ok(mut column_reader) = row_group_reader.get_column_reader(idx.clone()) {
                         #reader_snippets
                     } else {
                         return Err(::parquet::errors::ParquetError::General("Failed to get next column".into()))
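
Why patch 1 works: the old expansion read the i-th declared struct field from the i-th column of the row group, so a reading struct had to mirror the file schema exactly and in order. The new expansion builds a map from column name to column index out of the row group's schema descriptor and resolves every field through it, which is what makes pruned and reordered structs readable. The same lookup can be written outside the macro roughly as follows; this is a minimal sketch, not part of the patch, and "example.parquet" plus the variable names are illustrative assumptions:

    use std::collections::HashMap;
    use std::fs::File;

    use parquet::file::reader::{FileReader, RowGroupReader, SerializedFileReader};

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let file = File::open("example.parquet")?;
        let reader = SerializedFileReader::new(file)?;
        let row_group = reader.get_row_group(0)?;

        // Same shape as the map the generated code builds:
        // key: parquet column name, value: column index in the row group.
        let name_to_index: HashMap<String, usize> = row_group
            .metadata()
            .schema_descr()
            .columns()
            .iter()
            .enumerate()
            .map(|(idx, col)| (col.name().to_string(), idx))
            .collect();

        // A struct field named `i16` is then resolved by name, not position:
        if let Some(&idx) = name_to_index.get("i16") {
            let _column_reader = row_group.get_column_reader(idx)?;
            // ... decode the column's values here ...
        }
        Ok(())
    }
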
From f9bb8cf6d021442cd0adbd59133df262a08f6699 Mon Sep 17 00:00:00 2001
From: Ye Yuan
Date: Sun, 18 Aug 2024 00:28:49 +0800
Subject: [PATCH 2/5] add pruned parquet reading test

---
 parquet_derive_test/src/lib.rs | 79 +++++++++++++++++++++++++++++-----
 1 file changed, 68 insertions(+), 11 deletions(-)

diff --git a/parquet_derive_test/src/lib.rs b/parquet_derive_test/src/lib.rs
index e7c7896cb7f3..04c4c8fe9ee7 100644
--- a/parquet_derive_test/src/lib.rs
+++ b/parquet_derive_test/src/lib.rs
@@ -70,12 +70,12 @@ struct APartiallyCompleteRecord {
 // If these fields are guaranteed to be valid
 // we can load this struct into APartiallyCompleteRecord
 #[derive(PartialEq, ParquetRecordWriter, Debug)]
-struct APartiallyOptionalRecord {
+struct AnOptionalRecord {
     pub bool: bool,
     pub string: String,
-    pub maybe_i16: Option<i16>,
-    pub maybe_i32: Option<i32>,
-    pub maybe_u64: Option<u64>,
+    pub i16: Option<i16>,
+    pub i32: Option<i32>,
+    pub u64: Option<u64>,
     pub isize: isize,
     pub float: f32,
     pub double: f64,
@@ -85,6 +85,22 @@ struct APartiallyOptionalRecord {
     pub byte_vec: Vec<u8>,
 }
 
+// This struct removes several fields from the "APartiallyCompleteRecord",
+// and it sorts the field in another order.
+// we can load this struct into APartiallyCompleteRecord
+#[derive(PartialEq, ParquetRecordReader, Debug)]
+struct APrunedRecord {
+    pub bool: bool,
+    pub string: String,
+    pub byte_vec: Vec<u8>,
+    pub float: f32,
+    pub double: f64,
+    pub i16: i16,
+    pub i32: i32,
+    pub u64: u64,
+    pub isize: isize,
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -240,12 +256,12 @@ mod tests {
     #[test]
     fn test_parquet_derive_read_optional_but_valid_column() {
         let file = get_temp_file("test_parquet_derive_read_optional", &[]);
-        let drs: Vec<APartiallyOptionalRecord> = vec![APartiallyOptionalRecord {
+        let drs = vec![AnOptionalRecord {
             bool: true,
             string: "a string".into(),
-            maybe_i16: Some(-45),
-            maybe_i32: Some(456),
-            maybe_u64: Some(4563424),
+            i16: Some(-45),
+            i32: Some(456),
+            u64: Some(4563424),
             isize: -365,
             float: 3.5,
             double: f64::NAN,
@@ -273,9 +289,50 @@ mod tests {
         let mut row_group = reader.get_row_group(0).unwrap();
         out.read_from_row_group(&mut *row_group, 1).unwrap();
 
-        assert_eq!(drs[0].maybe_i16.unwrap(), out[0].i16);
-        assert_eq!(drs[0].maybe_i32.unwrap(), out[0].i32);
-        assert_eq!(drs[0].maybe_u64.unwrap(), out[0].u64);
+        assert_eq!(drs[0].i16.unwrap(), out[0].i16);
+        assert_eq!(drs[0].i32.unwrap(), out[0].i32);
+        assert_eq!(drs[0].u64.unwrap(), out[0].u64);
+    }
+
+    #[test]
+    fn test_parquet_derive_read_pruned_and_reordered_columns() {
+        let file = get_temp_file("test_parquet_derive_read_pruned", &[]);
+        let drs = vec![APartiallyCompleteRecord {
+            bool: true,
+            string: "a string".into(),
+            i16: -45,
+            i32: 456,
+            u64: 4563424,
+            isize: -365,
+            float: 3.5,
+            double: f64::NAN,
+            now: chrono::Utc::now().naive_local(),
+            date: chrono::naive::NaiveDate::from_ymd_opt(2015, 3, 14).unwrap(),
+            uuid: uuid::Uuid::new_v4(),
+            byte_vec: vec![0x65, 0x66, 0x67],
+        }];
+
+        let generated_schema = drs.as_slice().schema().unwrap();
+
+        let props = Default::default();
+        let mut writer =
+            SerializedFileWriter::new(file.try_clone().unwrap(), generated_schema, props).unwrap();
+
+        let mut row_group = writer.next_row_group().unwrap();
+        drs.as_slice().write_to_row_group(&mut row_group).unwrap();
+        row_group.close().unwrap();
+        writer.close().unwrap();
+
+        use parquet::file::{reader::FileReader, serialized_reader::SerializedFileReader};
+        let reader = SerializedFileReader::new(file).unwrap();
+        let mut out: Vec<APrunedRecord> = Vec::new();
+
+        let mut row_group = reader.get_row_group(0).unwrap();
+        out.read_from_row_group(&mut *row_group, 1).unwrap();
+
+        assert_eq!(drs[0].i16, out[0].i16);
+        assert_eq!(drs[0].i32, out[0].i32);
+        assert_eq!(drs[0].u64, out[0].u64);
     }
 
     /// Returns file handle for a temp file in 'target' directory with a provided content

From 3590f4ed51cf6760abbb20f6567d2f98d73db8a9 Mon Sep 17 00:00:00 2001
From: Ye Yuan
Date: Sun, 18 Aug 2024 00:36:54 +0800
Subject: [PATCH 3/5] better unit test

---
 parquet_derive_test/src/lib.rs | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/parquet_derive_test/src/lib.rs b/parquet_derive_test/src/lib.rs
index 04c4c8fe9ee7..ee23d6ce3983 100644
--- a/parquet_derive_test/src/lib.rs
+++ b/parquet_derive_test/src/lib.rs
@@ -86,8 +86,8 @@ struct AnOptionalRecord {
 }
 
 // This struct removes several fields from the "APartiallyCompleteRecord",
-// and it sorts the field in another order.
-// we can load this struct into APartiallyCompleteRecord
+// and it shuffles the fields.
+// we should still be able to load it from APartiallyCompleteRecord
 #[derive(PartialEq, ParquetRecordReader, Debug)]
 struct APrunedRecord {
     pub bool: bool,
@@ -295,7 +295,7 @@ mod tests {
     }
 
     #[test]
-    fn test_parquet_derive_read_pruned_and_reordered_columns() {
+    fn test_parquet_derive_read_pruned_and_shuffled_columns() {
         let file = get_temp_file("test_parquet_derive_read_pruned", &[]);
         let drs = vec![APartiallyCompleteRecord {
             bool: true,
@@ -330,9 +330,16 @@ mod tests {
         let mut row_group = reader.get_row_group(0).unwrap();
         out.read_from_row_group(&mut *row_group, 1).unwrap();
 
+        assert_eq!(drs[0].bool, out[0].bool);
+        assert_eq!(drs[0].string, out[0].string);
+        assert_eq!(drs[0].byte_vec, out[0].byte_vec);
+        assert_eq!(drs[0].float, out[0].float);
+        assert!(drs[0].double.is_nan());
+        assert!(out[0].double.is_nan());
         assert_eq!(drs[0].i16, out[0].i16);
         assert_eq!(drs[0].i32, out[0].i32);
         assert_eq!(drs[0].u64, out[0].u64);
+        assert_eq!(drs[0].isize, out[0].isize);
     }
 
     /// Returns file handle for a temp file in 'target' directory with a provided content
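
With patches 2 and 3 in place, the user-facing contract is: a `ParquetRecordReader` struct may keep any subset of the file's columns, in any order, as long as each field's name and type still match the column it selects. From the consumer side that workflow looks roughly like the sketch below; `PrunedRow`, `read_pruned`, and the column names "score" and "name" are illustrative assumptions, not part of the patch:

    use parquet::errors::ParquetError;
    use parquet::file::reader::{FileReader, SerializedFileReader};
    use parquet::record::RecordReader;
    use parquet_derive::ParquetRecordReader;

    // Suppose the file was written with many columns; this struct keeps
    // only two of them, and not in the file's physical column order.
    #[derive(ParquetRecordReader, Debug)]
    struct PrunedRow {
        pub score: f64,
        pub name: String,
    }

    fn read_pruned(file: std::fs::File, num_records: usize) -> Result<Vec<PrunedRow>, ParquetError> {
        let reader = SerializedFileReader::new(file)?;
        let mut row_group = reader.get_row_group(0)?;
        let mut out: Vec<PrunedRow> = Vec::new();
        // The derived impl resolves "score" and "name" by column name and
        // panics only if a requested column is missing from the file.
        out.read_from_row_group(&mut *row_group, num_records)?;
        Ok(out)
    }
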
From 8337c875e6c9b8f24b3131b67511eadf0dc8d4e5 Mon Sep 17 00:00:00 2001
From: Ye Yuan
Date: Sun, 18 Aug 2024 22:46:06 +0800
Subject: [PATCH 4/5] update comments

---
 parquet_derive/src/lib.rs      | 3 +--
 parquet_derive_test/src/lib.rs | 2 +-
 2 files changed, 2 insertions(+), 3 deletions(-)

diff --git a/parquet_derive/src/lib.rs b/parquet_derive/src/lib.rs
index 294116a56de2..afbf1c3a7b5e 100644
--- a/parquet_derive/src/lib.rs
+++ b/parquet_derive/src/lib.rs
@@ -205,10 +205,9 @@ pub fn parquet_record_reader(input: proc_macro::TokenStream) -> proc_macro::Toke
 
             let mut row_group_reader = row_group_reader;
 
-            // build map to index
+            // key: parquet file column name, value: column index
             let mut name_to_index = std::collections::HashMap::new();
             for (idx, col) in row_group_reader.metadata().schema_descr().columns().iter().enumerate() {
-                // println!("col {} name {:?}", idx, col.name());
                 name_to_index.insert(col.name().to_string(), idx);
             }
 
diff --git a/parquet_derive_test/src/lib.rs b/parquet_derive_test/src/lib.rs
index ee23d6ce3983..42c4671f4143 100644
--- a/parquet_derive_test/src/lib.rs
+++ b/parquet_derive_test/src/lib.rs
@@ -87,7 +87,7 @@ struct AnOptionalRecord {
 
 // This struct removes several fields from the "APartiallyCompleteRecord",
 // and it shuffles the fields.
-// we should still be able to load it from APartiallyCompleteRecord
+// we should still be able to load it from an APartiallyCompleteRecord parquet file
 #[derive(PartialEq, ParquetRecordReader, Debug)]
 struct APrunedRecord {
     pub bool: bool,

From 5b3f21a9cc54cde3733419bcd91dd895d8c95dfa Mon Sep 17 00:00:00 2001
From: Ye Yuan
Date: Sun, 18 Aug 2024 22:51:11 +0800
Subject: [PATCH 5/5] deref instead of clone

---
 parquet_derive/src/lib.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/parquet_derive/src/lib.rs b/parquet_derive/src/lib.rs
index afbf1c3a7b5e..48629d4f552a 100644
--- a/parquet_derive/src/lib.rs
+++ b/parquet_derive/src/lib.rs
@@ -225,7 +225,7 @@ pub fn parquet_record_reader(input: proc_macro::TokenStream) -> proc_macro::Toke
                 {
                     let idx = name_to_index.get(stringify!(#field_names)).unwrap_or_else(
                         || panic!("column name '{}' not found in the parquet file!", stringify!(#field_names)));
-                    if let Ok(mut column_reader) = row_group_reader.get_column_reader(idx.clone()) {
+                    if let Ok(mut column_reader) = row_group_reader.get_column_reader(*idx) {
                         #reader_snippets
                     } else {
                         return Err(::parquet::errors::ParquetError::General("Failed to get next column".into()))
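
The final patch is a pure idiom cleanup: `HashMap::get` hands back an `Option<&usize>`, and since `usize` is `Copy`, dereferencing with `*idx` copies the index directly; calling `.clone()` on the shared reference compiles to the same bit-copy but suggests heavier work than actually happens (clippy flags this pattern as `clone_on_copy`). A standalone illustration, with made-up map contents:

    use std::collections::HashMap;

    fn main() {
        let mut name_to_index: HashMap<String, usize> = HashMap::new();
        name_to_index.insert("i32".to_string(), 3);

        // `get` borrows the stored value: Option<&usize>.
        let idx: &usize = name_to_index.get("i32").unwrap();

        // Both lines produce the same owned usize for a Copy type ...
        let by_deref: usize = *idx;
        let by_clone: usize = idx.clone();
        assert_eq!(by_deref, by_clone);
        // ... but `*idx` states the intent (a cheap copy) directly.
    }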