From 83f31ee9509c2eedaf0b28650f10196ceaf370c2 Mon Sep 17 00:00:00 2001
From: "xikai.wxk"
Date: Wed, 21 Aug 2024 09:59:49 +0800
Subject: [PATCH 1/4] avoid resizing if capacity is enough when (de)compressing

---
 parquet/src/compression.rs | 93 ++++++++++++++++++++++++++++++--------
 1 file changed, 75 insertions(+), 18 deletions(-)

diff --git a/parquet/src/compression.rs b/parquet/src/compression.rs
index 10560210e4e8..99e279173537 100644
--- a/parquet/src/compression.rs
+++ b/parquet/src/compression.rs
@@ -150,40 +150,63 @@ pub fn create_codec(codec: CodecType, _options: &CodecOptions) -> Result<Option<Box<dyn Codec>>> {
             #[cfg(any(feature = "brotli", test))]
             return Ok(Some(Box::new(BrotliCodec::new(level))));
-            Err(ParquetError::General("Disabled feature at compile time: brotli".into()))
-        },
+            Err(ParquetError::General(
+                "Disabled feature at compile time: brotli".into(),
+            ))
+        }
         CodecType::GZIP(level) => {
             #[cfg(any(feature = "flate2", test))]
             return Ok(Some(Box::new(GZipCodec::new(level))));
-            Err(ParquetError::General("Disabled feature at compile time: flate2".into()))
-        },
+            Err(ParquetError::General(
+                "Disabled feature at compile time: flate2".into(),
+            ))
+        }
         CodecType::SNAPPY => {
             #[cfg(any(feature = "snap", test))]
             return Ok(Some(Box::new(SnappyCodec::new())));
-            Err(ParquetError::General("Disabled feature at compile time: snap".into()))
-        },
+            Err(ParquetError::General(
+                "Disabled feature at compile time: snap".into(),
+            ))
+        }
         CodecType::LZ4 => {
             #[cfg(any(feature = "lz4", test))]
             return Ok(Some(Box::new(LZ4HadoopCodec::new(
                 _options.backward_compatible_lz4,
             ))));
-            Err(ParquetError::General("Disabled feature at compile time: lz4".into()))
-        },
+            Err(ParquetError::General(
+                "Disabled feature at compile time: lz4".into(),
+            ))
+        }
         CodecType::ZSTD(level) => {
             #[cfg(any(feature = "zstd", test))]
             return Ok(Some(Box::new(ZSTDCodec::new(level))));
-            Err(ParquetError::General("Disabled feature at compile time: zstd".into()))
-        },
+            Err(ParquetError::General(
+                "Disabled feature at compile time: zstd".into(),
+            ))
+        }
         CodecType::LZ4_RAW => {
             #[cfg(any(feature = "lz4", test))]
             return Ok(Some(Box::new(LZ4RawCodec::new())));
-            Err(ParquetError::General("Disabled feature at compile time: lz4".into()))
-        },
+            Err(ParquetError::General(
+                "Disabled feature at compile time: lz4".into(),
+            ))
+        }
         CodecType::UNCOMPRESSED => Ok(None),
         _ => Err(nyi_err!("The codec type {} is not supported yet", codec)),
     }
 }
 
+/// Resize the `buf` to a new len `n`.
+///
+/// Replace resizing on the buffer with reserve and set_len for a good performance (no initialization).
+/// And the `set_len` here is safe for the element of the vector is byte whose destruction does nothing.
+fn resize_without_init(buf: &mut Vec<u8>, n: usize) {
+    if n > buf.capacity() {
+        buf.reserve(n - buf.len());
+    }
+    unsafe { buf.set_len(n) };
+}
+
 #[cfg(any(feature = "snap", test))]
 mod snappy_codec {
     use snap::raw::{decompress_len, max_compress_len, Decoder, Encoder};
@@ -191,6 +214,8 @@ mod snappy_codec {
     use crate::compression::Codec;
     use crate::errors::Result;
 
+    use super::resize_without_init;
+
     /// Codec for Snappy compression format.
     pub struct SnappyCodec {
         decoder: Decoder,
@@ -219,7 +244,8 @@ mod snappy_codec {
             None => decompress_len(input_buf)?,
         };
         let offset = output_buf.len();
-        output_buf.resize(offset + len, 0);
+        resize_without_init(output_buf, offset + len);
+
         self.decoder
             .decompress(input_buf, &mut output_buf[offset..])
             .map_err(|e| e.into())
@@ -228,7 +254,7 @@ mod snappy_codec {
     fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
         let output_buf_len = output_buf.len();
         let required_len = max_compress_len(input_buf.len());
-        output_buf.resize(output_buf_len + required_len, 0);
+        resize_without_init(output_buf, required_len);
         let n = self
             .encoder
             .compress(input_buf, &mut output_buf[output_buf_len..])?;
@@ -543,6 +569,8 @@ mod lz4_raw_codec {
     use crate::errors::ParquetError;
     use crate::errors::Result;
 
+    use super::resize_without_init;
+
     /// Codec for LZ4 Raw compression algorithm.
     pub struct LZ4RawCodec {}
@@ -569,7 +597,7 @@ mod lz4_raw_codec {
                 ))
             }
         };
-        output_buf.resize(offset + required_len, 0);
+        resize_without_init(output_buf, offset + required_len);
         match lz4_flex::block::decompress_into(input_buf, &mut output_buf[offset..]) {
             Ok(n) => {
                 if n != required_len {
@@ -586,7 +614,7 @@ mod lz4_raw_codec {
     fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
         let offset = output_buf.len();
         let required_len = lz4_flex::block::get_maximum_output_size(input_buf.len());
-        output_buf.resize(offset + required_len, 0);
+        resize_without_init(output_buf, offset + required_len);
         match lz4_flex::block::compress_into(input_buf, &mut output_buf[offset..]) {
             Ok(n) => {
                 output_buf.truncate(offset + n);
@@ -608,6 +636,8 @@ mod lz4_hadoop_codec {
     use crate::errors::{ParquetError, Result};
     use std::io;
 
+    use super::resize_without_init;
+
     /// Size of u32 type.
     const SIZE_U32: usize = std::mem::size_of::<u32>();
@@ -718,7 +748,7 @@ mod lz4_hadoop_codec {
                 ))
             }
         };
-        output_buf.resize(output_len + required_len, 0);
+        resize_without_init(output_buf, output_len + required_len);
         match try_decompress_hadoop(input_buf, &mut output_buf[output_len..]) {
             Ok(n) => {
                 if n != required_len {
@@ -749,7 +779,7 @@ mod lz4_hadoop_codec {
     fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
         // Allocate memory to store the LZ4_HADOOP prefix.
         let offset = output_buf.len();
-        output_buf.resize(offset + PREFIX_LEN, 0);
+        resize_without_init(output_buf, offset + PREFIX_LEN);
 
         // Append LZ4_RAW compressed bytes after prefix.
         LZ4RawCodec::new().compress(input_buf, output_buf)?;
@@ -890,4 +920,31 @@ mod tests {
     fn test_codec_lz4_raw() {
         test_codec_with_size(CodecType::LZ4_RAW);
     }
+
+    fn resize_and_check(source: Vec<u8>, new_len: usize) {
+        let mut new = source.clone();
+        resize_without_init(&mut new, new_len);
+
+        assert_eq!(new.len(), new_len);
+        if new.len() > source.len() {
+            assert_eq!(&new[..source.len()], &source[..]);
+        } else {
+            assert_eq!(&new[..], &source[..new.len()]);
+        }
+    }
+
+    #[test]
+    fn test_resize_without_init() {
+        let cases = [
+            (vec![1, 2, 3], 10),
+            (vec![1, 2, 3], 3),
+            (vec![1, 2, 3, 4, 5], 3),
+            (vec![], 10),
+            (vec![], 0),
+        ];
+
+        for (vector, resize_len) in cases {
+            resize_and_check(vector, resize_len);
+        }
+    }
 }
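
Note: a self-contained sketch of the reserve + set_len pattern that patch 1
introduces, assuming only std. The grown region holds uninitialized bytes, so
every byte past the old len must be written before it is read; the codecs
above guarantee this by having the (de)compressor fill `output_buf[offset..]`.

    fn resize_without_init(buf: &mut Vec<u8>, n: usize) {
        if n > buf.capacity() {
            // `reserve` takes the additional capacity beyond the current
            // *len*, so the resulting capacity is at least `n`.
            buf.reserve(n - buf.len());
        }
        // Skipping initialization is tolerable only because u8 has no
        // destructor and the caller overwrites the tail before reading it.
        unsafe { buf.set_len(n) };
    }

    fn main() {
        let mut buf = b"header".to_vec();
        let offset = buf.len();
        resize_without_init(&mut buf, offset + 4);
        // Stand-in for a decompressor writing into the spare tail.
        buf[offset..].copy_from_slice(&[1, 2, 3, 4]);
        assert_eq!(&buf[..offset], b"header");
        assert_eq!(&buf[offset..], &[1, 2, 3, 4]);
    }
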
From bffc5dfaa7a2f6f67fd09a841445476216ce1d1a Mon Sep 17 00:00:00 2001
From: "xikai.wxk"
Date: Wed, 21 Aug 2024 10:29:10 +0800
Subject: [PATCH 2/4] polish up the comments

---
 parquet/src/compression.rs | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/parquet/src/compression.rs b/parquet/src/compression.rs
index 99e279173537..cdfc62d84a98 100644
--- a/parquet/src/compression.rs
+++ b/parquet/src/compression.rs
@@ -198,8 +198,10 @@ pub fn create_codec(codec: CodecType, _options: &CodecOptions) -> Result<Option<Box<dyn Codec>>> {
 /// Resize the `buf` to a new len `n`.
 ///
-/// Replace resizing on the buffer with reserve and set_len for a good performance (no initialization).
-/// And the `set_len` here is safe for the element of the vector is byte whose destruction does nothing.
+/// Replacing `resize` on the `buf` with `reserve` and `set_len` can skip the initialization
+/// cost for a good performance.
+/// And the `set_len` here is safe because the element of the vector is byte whose destruction
+/// does nothing.
 fn resize_without_init(buf: &mut Vec<u8>, n: usize) {
     if n > buf.capacity() {
         buf.reserve(n - buf.len());
     }
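
Note: the comment patch 2 polishes carries the safety argument for the whole
helper: `u8` has no destructor, so `set_len` over a partly uninitialized tail
drops nothing. The same trick on an element type with a destructor would be
unsound. A type-level way to keep that constraint explicit (an alternative the
series does not take) is sketched below; the caller must still write elements
before reading them.

    // Restricting the helper to Copy element types rules out destructors at
    // compile time; u8 satisfies the bound.
    fn resize_without_init<T: Copy>(buf: &mut Vec<T>, n: usize) {
        if n > buf.capacity() {
            buf.reserve(n - buf.len());
        }
        // Still only length bookkeeping: nothing is dropped, nothing is read.
        unsafe { buf.set_len(n) };
    }

    fn main() {
        let mut v: Vec<u8> = Vec::new();
        resize_without_init(&mut v, 8);
        assert_eq!(v.len(), 8);
    }
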
From d7ffb67cdf808a67b4a874ec4a4694f50d0229d4 Mon Sep 17 00:00:00 2001
From: "xikai.wxk"
Date: Wed, 21 Aug 2024 11:59:38 +0800
Subject: [PATCH 3/4] fix failed unit tests

---
 parquet/src/compression.rs   | 62 ++++++------------------------------
 parquet/src/util/mod.rs      |  1 +
 parquet/src/util/vec_util.rs | 61 ++++++++++++++++++++++++++++++++++++
 3 files changed, 72 insertions(+), 52 deletions(-)
 create mode 100644 parquet/src/util/vec_util.rs

diff --git a/parquet/src/compression.rs b/parquet/src/compression.rs
index cdfc62d84a98..70be73081cca 100644
--- a/parquet/src/compression.rs
+++ b/parquet/src/compression.rs
@@ -196,27 +196,13 @@ pub fn create_codec(codec: CodecType, _options: &CodecOptions) -> Result<Option<Box<dyn Codec>>> {
     }
 }
 
-/// Resize the `buf` to a new len `n`.
-///
-/// Replacing `resize` on the `buf` with `reserve` and `set_len` can skip the initialization
-/// cost for a good performance.
-/// And the `set_len` here is safe because the element of the vector is byte whose destruction
-/// does nothing.
-fn resize_without_init(buf: &mut Vec<u8>, n: usize) {
-    if n > buf.capacity() {
-        buf.reserve(n - buf.len());
-    }
-    unsafe { buf.set_len(n) };
-}
-
 #[cfg(any(feature = "snap", test))]
 mod snappy_codec {
     use snap::raw::{decompress_len, max_compress_len, Decoder, Encoder};
 
     use crate::compression::Codec;
     use crate::errors::Result;
-
-    use super::resize_without_init;
+    use crate::util::vec_util;
 
     /// Codec for Snappy compression format.
     pub struct SnappyCodec {
@@ -246,7 +232,7 @@ mod snappy_codec {
             None => decompress_len(input_buf)?,
         };
         let offset = output_buf.len();
-        resize_without_init(output_buf, offset + len);
+        vec_util::resize_without_init(output_buf, offset + len);
 
         self.decoder
             .decompress(input_buf, &mut output_buf[offset..])
@@ -256,7 +242,7 @@ mod snappy_codec {
     fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
         let output_buf_len = output_buf.len();
         let required_len = max_compress_len(input_buf.len());
-        resize_without_init(output_buf, required_len);
+        vec_util::resize_without_init(output_buf, output_buf_len + required_len);
         let n = self
             .encoder
             .compress(input_buf, &mut output_buf[output_buf_len..])?;
@@ -265,6 +251,7 @@ mod snappy_codec {
         }
     }
 }
+
 #[cfg(any(feature = "snap", test))]
 pub use snappy_codec::*;
 
@@ -570,8 +557,7 @@ mod lz4_raw_codec {
     use crate::compression::Codec;
     use crate::errors::ParquetError;
     use crate::errors::Result;
-
-    use super::resize_without_init;
+    use crate::util::vec_util;
 
     /// Codec for LZ4 Raw compression algorithm.
     pub struct LZ4RawCodec {}
@@ -599,7 +585,7 @@ mod lz4_raw_codec {
                 ))
             }
         };
-        resize_without_init(output_buf, offset + required_len);
+        vec_util::resize_without_init(output_buf, offset + required_len);
         match lz4_flex::block::decompress_into(input_buf, &mut output_buf[offset..]) {
             Ok(n) => {
                 if n != required_len {
@@ -616,7 +602,7 @@ mod lz4_raw_codec {
     fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
         let offset = output_buf.len();
         let required_len = lz4_flex::block::get_maximum_output_size(input_buf.len());
-        resize_without_init(output_buf, offset + required_len);
+        vec_util::resize_without_init(output_buf, offset + required_len);
         match lz4_flex::block::compress_into(input_buf, &mut output_buf[offset..]) {
             Ok(n) => {
                 output_buf.truncate(offset + n);
@@ -636,10 +622,9 @@ mod lz4_hadoop_codec {
     use crate::compression::lz4_raw_codec::LZ4RawCodec;
     use crate::compression::Codec;
     use crate::errors::{ParquetError, Result};
+    use crate::util::vec_util;
     use std::io;
 
-    use super::resize_without_init;
-
     /// Size of u32 type.
     const SIZE_U32: usize = std::mem::size_of::<u32>();
@@ -750,7 +735,7 @@ mod lz4_hadoop_codec {
                 ))
             }
         };
-        resize_without_init(output_buf, output_len + required_len);
+        vec_util::resize_without_init(output_buf, output_len + required_len);
         match try_decompress_hadoop(input_buf, &mut output_buf[output_len..]) {
             Ok(n) => {
                 if n != required_len {
@@ -781,7 +766,7 @@ mod lz4_hadoop_codec {
     fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
         // Allocate memory to store the LZ4_HADOOP prefix.
         let offset = output_buf.len();
-        resize_without_init(output_buf, offset + PREFIX_LEN);
+        vec_util::resize_without_init(output_buf, offset + PREFIX_LEN);
 
         // Append LZ4_RAW compressed bytes after prefix.
         LZ4RawCodec::new().compress(input_buf, output_buf)?;
@@ -922,31 +907,4 @@ mod tests {
     fn test_codec_lz4_raw() {
         test_codec_with_size(CodecType::LZ4_RAW);
     }
-
-    fn resize_and_check(source: Vec<u8>, new_len: usize) {
-        let mut new = source.clone();
-        resize_without_init(&mut new, new_len);
-
-        assert_eq!(new.len(), new_len);
-        if new.len() > source.len() {
-            assert_eq!(&new[..source.len()], &source[..]);
-        } else {
-            assert_eq!(&new[..], &source[..new.len()]);
-        }
-    }
-
-    #[test]
-    fn test_resize_without_init() {
-        let cases = [
-            (vec![1, 2, 3], 10),
-            (vec![1, 2, 3], 3),
-            (vec![1, 2, 3, 4, 5], 3),
-            (vec![], 10),
-            (vec![], 0),
-        ];
-
-        for (vector, resize_len) in cases {
-            resize_and_check(vector, resize_len);
-        }
-    }
 }

diff --git a/parquet/src/util/mod.rs b/parquet/src/util/mod.rs
index dfa1285afcf2..5959ce03994c 100644
--- a/parquet/src/util/mod.rs
+++ b/parquet/src/util/mod.rs
@@ -21,6 +21,7 @@ mod bit_pack;
 pub(crate) mod interner;
 #[cfg(any(test, feature = "test_common"))]
 pub(crate) mod test_common;
+pub mod vec_util;
 
 #[cfg(any(test, feature = "test_common"))]
 pub use self::test_common::page_util::{

diff --git a/parquet/src/util/vec_util.rs b/parquet/src/util/vec_util.rs
new file mode 100644
index 000000000000..a64ee6c704f5
--- /dev/null
+++ b/parquet/src/util/vec_util.rs
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// Resize the `buf` to a new len `n`.
+///
+/// Replacing `resize` on the `buf` with `reserve` and `set_len` can skip the initialization
+/// cost for a good performance.
+/// And the `set_len` here is safe because the element of the vector is byte whose destruction
+/// does nothing.
+pub fn resize_without_init(buf: &mut Vec<u8>, n: usize) {
+    if n > buf.capacity() {
+        buf.reserve(n - buf.len());
+    }
+    unsafe { buf.set_len(n) };
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn resize_and_check(source: Vec<u8>, new_len: usize) {
+        let mut new = source.clone();
+        resize_without_init(&mut new, new_len);
+
+        assert_eq!(new.len(), new_len);
+        if new.len() > source.len() {
+            assert_eq!(&new[..source.len()], &source[..]);
+        } else {
+            assert_eq!(&new[..], &source[..new.len()]);
+        }
+    }
+
+    #[test]
+    fn test_resize_without_init() {
+        let cases = [
+            (vec![1, 2, 3], 10),
+            (vec![1, 2, 3], 3),
+            (vec![1, 2, 3, 4, 5], 3),
+            (vec![], 10),
+            (vec![], 0),
+        ];
+
+        for (vector, resize_len) in cases {
+            resize_and_check(vector, resize_len);
+        }
+    }
+}
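
Note: beyond moving the helper into `vec_util`, the behavioral fix in patch 3
is the snappy `compress` call: patch 1 resized to `required_len` alone, but
`compress` appends, so the buffer must grow to `output_buf_len + required_len`
or bytes already in `output_buf` get clobbered. A sketch of the arithmetic,
using plain `resize` and a copying stand-in for the encoder (not the real
snap API):

    fn main() {
        let input = b"data";
        let mut output = b"prefix".to_vec();

        let old_len = output.len();
        let required = input.len(); // stand-in for max_compress_len(input.len())
        output.resize(old_len + required, 0);

        // The encoder writes into the tail, mirroring
        // `self.encoder.compress(input_buf, &mut output_buf[output_buf_len..])`.
        output[old_len..].copy_from_slice(input);
        output.truncate(old_len + input.len());

        assert_eq!(output, b"prefixdata");
    }
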
From adba5eb6ab1b0630528580c0c5a84bc59373dbea Mon Sep 17 00:00:00 2001
From: "xikai.wxk"
Date: Wed, 21 Aug 2024 12:54:44 +0800
Subject: [PATCH 4/4] renaming and add clippy lint

---
 parquet/src/compression.rs   | 12 ++++++------
 parquet/src/util/vec_util.rs | 10 ++++++----
 2 files changed, 12 insertions(+), 10 deletions(-)

diff --git a/parquet/src/compression.rs b/parquet/src/compression.rs
index 70be73081cca..ac91e0a4c678 100644
--- a/parquet/src/compression.rs
+++ b/parquet/src/compression.rs
@@ -232,7 +232,7 @@ mod snappy_codec {
             None => decompress_len(input_buf)?,
         };
         let offset = output_buf.len();
-        vec_util::resize_without_init(output_buf, offset + len);
+        vec_util::resize_buffer_without_init(output_buf, offset + len);
 
         self.decoder
             .decompress(input_buf, &mut output_buf[offset..])
@@ -242,7 +242,7 @@ mod snappy_codec {
     fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
         let output_buf_len = output_buf.len();
         let required_len = max_compress_len(input_buf.len());
-        vec_util::resize_without_init(output_buf, output_buf_len + required_len);
+        vec_util::resize_buffer_without_init(output_buf, output_buf_len + required_len);
         let n = self
             .encoder
             .compress(input_buf, &mut output_buf[output_buf_len..])?;
@@ -585,7 +585,7 @@ mod lz4_raw_codec {
                 ))
             }
         };
-        vec_util::resize_without_init(output_buf, offset + required_len);
+        vec_util::resize_buffer_without_init(output_buf, offset + required_len);
         match lz4_flex::block::decompress_into(input_buf, &mut output_buf[offset..]) {
             Ok(n) => {
                 if n != required_len {
@@ -602,7 +602,7 @@ mod lz4_raw_codec {
     fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
         let offset = output_buf.len();
         let required_len = lz4_flex::block::get_maximum_output_size(input_buf.len());
-        vec_util::resize_without_init(output_buf, offset + required_len);
+        vec_util::resize_buffer_without_init(output_buf, offset + required_len);
         match lz4_flex::block::compress_into(input_buf, &mut output_buf[offset..]) {
             Ok(n) => {
                 output_buf.truncate(offset + n);
@@ -735,7 +735,7 @@ mod lz4_hadoop_codec {
                 ))
             }
         };
-        vec_util::resize_without_init(output_buf, output_len + required_len);
+        vec_util::resize_buffer_without_init(output_buf, output_len + required_len);
         match try_decompress_hadoop(input_buf, &mut output_buf[output_len..]) {
             Ok(n) => {
                 if n != required_len {
@@ -766,7 +766,7 @@ mod lz4_hadoop_codec {
     fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
         // Allocate memory to store the LZ4_HADOOP prefix.
         let offset = output_buf.len();
-        vec_util::resize_without_init(output_buf, offset + PREFIX_LEN);
+        vec_util::resize_buffer_without_init(output_buf, offset + PREFIX_LEN);
 
         // Append LZ4_RAW compressed bytes after prefix.
         LZ4RawCodec::new().compress(input_buf, output_buf)?;

diff --git a/parquet/src/util/vec_util.rs b/parquet/src/util/vec_util.rs
index a64ee6c704f5..42d215b08a02 100644
--- a/parquet/src/util/vec_util.rs
+++ b/parquet/src/util/vec_util.rs
@@ -15,13 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
-/// Resize the `buf` to a new len `n`.
+/// Resize the `buf` to a new len `n` without initialization.
 ///
 /// Replacing `resize` on the `buf` with `reserve` and `set_len` can skip the initialization
 /// cost for a good performance.
 /// And the `set_len` here is safe because the element of the vector is byte whose destruction
 /// does nothing.
-pub fn resize_without_init(buf: &mut Vec<u8>, n: usize) {
+// TODO: remove this clippy allow lint if it is used by a module without feature gate.
+#[allow(dead_code)]
+pub fn resize_buffer_without_init(buf: &mut Vec<u8>, n: usize) {
     if n > buf.capacity() {
         buf.reserve(n - buf.len());
     }
@@ -34,7 +36,7 @@ mod tests {
 
     fn resize_and_check(source: Vec<u8>, new_len: usize) {
         let mut new = source.clone();
-        resize_without_init(&mut new, new_len);
+        resize_buffer_without_init(&mut new, new_len);
 
         assert_eq!(new.len(), new_len);
         if new.len() > source.len() {
@@ -45,7 +47,7 @@ mod tests {
     }
 
     #[test]
-    fn test_resize_without_init() {
+    fn test_resize_buffer_without_init() {
         let cases = [
             (vec![1, 2, 3], 10),
             (vec![1, 2, 3], 3),
             (vec![1, 2, 3, 4, 5], 3),
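
Note: the performance claim behind the whole series — `resize` writes every
new byte, while `reserve` + `set_len` only bumps the length — can be eyeballed
with a rough sketch like the one below (illustrative only, not a rigorous
benchmark; it never reads the uninitialized tail):

    use std::time::Instant;

    fn resize_buffer_without_init(buf: &mut Vec<u8>, n: usize) {
        if n > buf.capacity() {
            buf.reserve(n - buf.len());
        }
        unsafe { buf.set_len(n) };
    }

    fn main() {
        const N: usize = 64 * 1024 * 1024;

        // Zero-initializing resize: memsets N bytes on every iteration.
        let mut a: Vec<u8> = Vec::with_capacity(N);
        let t = Instant::now();
        for _ in 0..100 {
            a.clear();
            a.resize(N, 0);
            std::hint::black_box(&a);
        }
        println!("resize:            {:?}", t.elapsed());

        // Capacity is already there, so only the length changes.
        let mut b: Vec<u8> = Vec::with_capacity(N);
        let t = Instant::now();
        for _ in 0..100 {
            b.clear();
            resize_buffer_without_init(&mut b, N);
            std::hint::black_box(&b);
        }
        println!("reserve + set_len: {:?}", t.elapsed());
    }
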