diff --git a/parquet/src/compression.rs b/parquet/src/compression.rs index 10560210e4e8..ac91e0a4c678 100644 --- a/parquet/src/compression.rs +++ b/parquet/src/compression.rs @@ -150,35 +150,47 @@ pub fn create_codec(codec: CodecType, _options: &CodecOptions) -> Result { #[cfg(any(feature = "brotli", test))] return Ok(Some(Box::new(BrotliCodec::new(level)))); - Err(ParquetError::General("Disabled feature at compile time: brotli".into())) - }, + Err(ParquetError::General( + "Disabled feature at compile time: brotli".into(), + )) + } CodecType::GZIP(level) => { #[cfg(any(feature = "flate2", test))] return Ok(Some(Box::new(GZipCodec::new(level)))); - Err(ParquetError::General("Disabled feature at compile time: flate2".into())) - }, + Err(ParquetError::General( + "Disabled feature at compile time: flate2".into(), + )) + } CodecType::SNAPPY => { #[cfg(any(feature = "snap", test))] return Ok(Some(Box::new(SnappyCodec::new()))); - Err(ParquetError::General("Disabled feature at compile time: snap".into())) - }, + Err(ParquetError::General( + "Disabled feature at compile time: snap".into(), + )) + } CodecType::LZ4 => { #[cfg(any(feature = "lz4", test))] return Ok(Some(Box::new(LZ4HadoopCodec::new( _options.backward_compatible_lz4, )))); - Err(ParquetError::General("Disabled feature at compile time: lz4".into())) - }, + Err(ParquetError::General( + "Disabled feature at compile time: lz4".into(), + )) + } CodecType::ZSTD(level) => { #[cfg(any(feature = "zstd", test))] return Ok(Some(Box::new(ZSTDCodec::new(level)))); - Err(ParquetError::General("Disabled feature at compile time: zstd".into())) - }, + Err(ParquetError::General( + "Disabled feature at compile time: zstd".into(), + )) + } CodecType::LZ4_RAW => { #[cfg(any(feature = "lz4", test))] return Ok(Some(Box::new(LZ4RawCodec::new()))); - Err(ParquetError::General("Disabled feature at compile time: lz4".into())) - }, + Err(ParquetError::General( + "Disabled feature at compile time: lz4".into(), + )) + } CodecType::UNCOMPRESSED => Ok(None), _ => Err(nyi_err!("The codec type {} is not supported yet", codec)), } @@ -190,6 +202,7 @@ mod snappy_codec { use crate::compression::Codec; use crate::errors::Result; + use crate::util::vec_util; /// Codec for Snappy compression format. pub struct SnappyCodec { @@ -219,7 +232,8 @@ mod snappy_codec { None => decompress_len(input_buf)?, }; let offset = output_buf.len(); - output_buf.resize(offset + len, 0); + vec_util::resize_buffer_without_init(output_buf, offset + len); + self.decoder .decompress(input_buf, &mut output_buf[offset..]) .map_err(|e| e.into()) @@ -228,7 +242,7 @@ mod snappy_codec { fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec) -> Result<()> { let output_buf_len = output_buf.len(); let required_len = max_compress_len(input_buf.len()); - output_buf.resize(output_buf_len + required_len, 0); + vec_util::resize_buffer_without_init(output_buf, output_buf_len + required_len); let n = self .encoder .compress(input_buf, &mut output_buf[output_buf_len..])?; @@ -237,6 +251,7 @@ mod snappy_codec { } } } + #[cfg(any(feature = "snap", test))] pub use snappy_codec::*; @@ -542,6 +557,7 @@ mod lz4_raw_codec { use crate::compression::Codec; use crate::errors::ParquetError; use crate::errors::Result; + use crate::util::vec_util; /// Codec for LZ4 Raw compression algorithm. pub struct LZ4RawCodec {} @@ -569,7 +585,7 @@ mod lz4_raw_codec { )) } }; - output_buf.resize(offset + required_len, 0); + vec_util::resize_buffer_without_init(output_buf, offset + required_len); match lz4_flex::block::decompress_into(input_buf, &mut output_buf[offset..]) { Ok(n) => { if n != required_len { @@ -586,7 +602,7 @@ mod lz4_raw_codec { fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec) -> Result<()> { let offset = output_buf.len(); let required_len = lz4_flex::block::get_maximum_output_size(input_buf.len()); - output_buf.resize(offset + required_len, 0); + vec_util::resize_buffer_without_init(output_buf, offset + required_len); match lz4_flex::block::compress_into(input_buf, &mut output_buf[offset..]) { Ok(n) => { output_buf.truncate(offset + n); @@ -606,6 +622,7 @@ mod lz4_hadoop_codec { use crate::compression::lz4_raw_codec::LZ4RawCodec; use crate::compression::Codec; use crate::errors::{ParquetError, Result}; + use crate::util::vec_util; use std::io; /// Size of u32 type. @@ -718,7 +735,7 @@ mod lz4_hadoop_codec { )) } }; - output_buf.resize(output_len + required_len, 0); + vec_util::resize_buffer_without_init(output_buf, output_len + required_len); match try_decompress_hadoop(input_buf, &mut output_buf[output_len..]) { Ok(n) => { if n != required_len { @@ -749,7 +766,7 @@ mod lz4_hadoop_codec { fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec) -> Result<()> { // Allocate memory to store the LZ4_HADOOP prefix. let offset = output_buf.len(); - output_buf.resize(offset + PREFIX_LEN, 0); + vec_util::resize_buffer_without_init(output_buf, offset + PREFIX_LEN); // Append LZ4_RAW compressed bytes after prefix. LZ4RawCodec::new().compress(input_buf, output_buf)?; diff --git a/parquet/src/util/mod.rs b/parquet/src/util/mod.rs index dfa1285afcf2..5959ce03994c 100644 --- a/parquet/src/util/mod.rs +++ b/parquet/src/util/mod.rs @@ -21,6 +21,7 @@ mod bit_pack; pub(crate) mod interner; #[cfg(any(test, feature = "test_common"))] pub(crate) mod test_common; +pub mod vec_util; #[cfg(any(test, feature = "test_common"))] pub use self::test_common::page_util::{ diff --git a/parquet/src/util/vec_util.rs b/parquet/src/util/vec_util.rs new file mode 100644 index 000000000000..42d215b08a02 --- /dev/null +++ b/parquet/src/util/vec_util.rs @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/// Resize the `buf` to a new len `n` without initialization. +/// +/// Replacing `resize` on the `buf` with `reserve` and `set_len` can skip the initialization +/// cost for a good performance. +/// And the `set_len` here is safe because the element of the vector is byte whose destruction +/// does nothing. +// TODO: remove this clippy allow lint if it is used by a module without feature gate. +#[allow(dead_code)] +pub fn resize_buffer_without_init(buf: &mut Vec, n: usize) { + if n > buf.capacity() { + buf.reserve(n - buf.len()); + } + unsafe { buf.set_len(n) }; +} + +#[cfg(test)] +mod tests { + use super::*; + + fn resize_and_check(source: Vec, new_len: usize) { + let mut new = source.clone(); + resize_buffer_without_init(&mut new, new_len); + + assert_eq!(new.len(), new_len); + if new.len() > source.len() { + assert_eq!(&new[..source.len()], &source[..]); + } else { + assert_eq!(&new[..], &source[..new.len()]); + } + } + + #[test] + fn test_resize_buffer_without_init() { + let cases = [ + (vec![1, 2, 3], 10), + (vec![1, 2, 3], 3), + (vec![1, 2, 3, 4, 5], 3), + (vec![], 10), + (vec![], 0), + ]; + + for (vector, resize_len) in cases { + resize_and_check(vector, resize_len); + } + } +}