53 changes: 35 additions & 18 deletions parquet/src/compression.rs
@@ -150,35 +150,47 @@ pub fn create_codec(codec: CodecType, _options: &CodecOptions) -> Result<Option<
CodecType::BROTLI(level) => {
#[cfg(any(feature = "brotli", test))]
return Ok(Some(Box::new(BrotliCodec::new(level))));
Err(ParquetError::General("Disabled feature at compile time: brotli".into()))
},
Err(ParquetError::General(
"Disabled feature at compile time: brotli".into(),
))
}
CodecType::GZIP(level) => {
#[cfg(any(feature = "flate2", test))]
return Ok(Some(Box::new(GZipCodec::new(level))));
Err(ParquetError::General("Disabled feature at compile time: flate2".into()))
},
Err(ParquetError::General(
"Disabled feature at compile time: flate2".into(),
))
}
CodecType::SNAPPY => {
#[cfg(any(feature = "snap", test))]
return Ok(Some(Box::new(SnappyCodec::new())));
Err(ParquetError::General("Disabled feature at compile time: snap".into()))
},
Err(ParquetError::General(
"Disabled feature at compile time: snap".into(),
))
}
CodecType::LZ4 => {
#[cfg(any(feature = "lz4", test))]
return Ok(Some(Box::new(LZ4HadoopCodec::new(
_options.backward_compatible_lz4,
))));
Err(ParquetError::General("Disabled feature at compile time: lz4".into()))
},
Err(ParquetError::General(
"Disabled feature at compile time: lz4".into(),
))
}
CodecType::ZSTD(level) => {
#[cfg(any(feature = "zstd", test))]
return Ok(Some(Box::new(ZSTDCodec::new(level))));
Err(ParquetError::General("Disabled feature at compile time: zstd".into()))
},
Err(ParquetError::General(
"Disabled feature at compile time: zstd".into(),
))
}
CodecType::LZ4_RAW => {
#[cfg(any(feature = "lz4", test))]
return Ok(Some(Box::new(LZ4RawCodec::new())));
Err(ParquetError::General("Disabled feature at compile time: lz4".into()))
},
Err(ParquetError::General(
"Disabled feature at compile time: lz4".into(),
))
}
CodecType::UNCOMPRESSED => Ok(None),
_ => Err(nyi_err!("The codec type {} is not supported yet", codec)),
}
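Reviewer note, not part of the diff: every reformatted arm above has the same shape, a cfg-gated early return when the matching feature (or test) is enabled, leaving the General error as the only value when the feature is compiled out. A minimal self-contained sketch of that shape, using made-up names (demo-codec, Codec), is:

#[derive(Debug)]
struct Codec;

#[allow(unreachable_code)] // the Err tail is dead code when the feature is enabled
fn create() -> Result<Option<Codec>, String> {
    #[cfg(any(feature = "demo-codec", test))]
    return Ok(Some(Codec));

    Err("Disabled feature at compile time: demo-codec".to_string())
}

fn main() {
    // Without `--features demo-codec` (and outside `cargo test`) this prints the error.
    println!("{:?}", create());
}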
@@ -190,6 +202,7 @@ mod snappy_codec {

use crate::compression::Codec;
use crate::errors::Result;
use crate::util::vec_util;

/// Codec for Snappy compression format.
pub struct SnappyCodec {
@@ -219,7 +232,8 @@ mod snappy_codec {
None => decompress_len(input_buf)?,
};
let offset = output_buf.len();
output_buf.resize(offset + len, 0);
vec_util::resize_buffer_without_init(output_buf, offset + len);

self.decoder
.decompress(input_buf, &mut output_buf[offset..])
.map_err(|e| e.into())
@@ -228,7 +242,7 @@
fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
let output_buf_len = output_buf.len();
let required_len = max_compress_len(input_buf.len());
output_buf.resize(output_buf_len + required_len, 0);
vec_util::resize_buffer_without_init(output_buf, output_buf_len + required_len);
let n = self
.encoder
.compress(input_buf, &mut output_buf[output_buf_len..])?;
@@ -237,6 +251,7 @@
}
}
}

#[cfg(any(feature = "snap", test))]
pub use snappy_codec::*;

@@ -542,6 +557,7 @@ mod lz4_raw_codec {
use crate::compression::Codec;
use crate::errors::ParquetError;
use crate::errors::Result;
use crate::util::vec_util;

/// Codec for LZ4 Raw compression algorithm.
pub struct LZ4RawCodec {}
@@ -569,7 +585,7 @@
))
}
};
output_buf.resize(offset + required_len, 0);
vec_util::resize_buffer_without_init(output_buf, offset + required_len);
match lz4_flex::block::decompress_into(input_buf, &mut output_buf[offset..]) {
Ok(n) => {
if n != required_len {
@@ -586,7 +602,7 @@
fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
let offset = output_buf.len();
let required_len = lz4_flex::block::get_maximum_output_size(input_buf.len());
output_buf.resize(offset + required_len, 0);
vec_util::resize_buffer_without_init(output_buf, offset + required_len);
match lz4_flex::block::compress_into(input_buf, &mut output_buf[offset..]) {
Ok(n) => {
output_buf.truncate(offset + n);
@@ -606,6 +622,7 @@ mod lz4_hadoop_codec {
use crate::compression::lz4_raw_codec::LZ4RawCodec;
use crate::compression::Codec;
use crate::errors::{ParquetError, Result};
use crate::util::vec_util;
use std::io;

/// Size of u32 type.
@@ -718,7 +735,7 @@
))
}
};
output_buf.resize(output_len + required_len, 0);
vec_util::resize_buffer_without_init(output_buf, output_len + required_len);
match try_decompress_hadoop(input_buf, &mut output_buf[output_len..]) {
Ok(n) => {
if n != required_len {
@@ -749,7 +766,7 @@
fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
// Allocate memory to store the LZ4_HADOOP prefix.
let offset = output_buf.len();
output_buf.resize(offset + PREFIX_LEN, 0);
vec_util::resize_buffer_without_init(output_buf, offset + PREFIX_LEN);

// Append LZ4_RAW compressed bytes after prefix.
LZ4RawCodec::new().compress(input_buf, output_buf)?;
1 change: 1 addition & 0 deletions parquet/src/util/mod.rs
@@ -21,6 +21,7 @@ mod bit_pack;
pub(crate) mod interner;
#[cfg(any(test, feature = "test_common"))]
pub(crate) mod test_common;
pub mod vec_util;

#[cfg(any(test, feature = "test_common"))]
pub use self::test_common::page_util::{
63 changes: 63 additions & 0 deletions parquet/src/util/vec_util.rs
@@ -0,0 +1,63 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

/// Resize `buf` to the new length `n` without initializing the newly added bytes.
///
/// Replacing `resize` with `reserve` plus `set_len` skips the cost of zero-filling the
/// grown region, which helps performance on the hot (de)compression paths.
/// The `set_len` call is safe because the vector's elements are bytes, which have no
/// destructor, and callers overwrite the uninitialized tail before reading from it.
// TODO: remove this `allow(dead_code)` once the function is used by a module that is not
// behind a feature gate.
#[allow(dead_code)]
pub fn resize_buffer_without_init(buf: &mut Vec<u8>, n: usize) {
if n > buf.capacity() {
buf.reserve(n - buf.len());
}
unsafe { buf.set_len(n) };
}

#[cfg(test)]
mod tests {
use super::*;

fn resize_and_check(source: Vec<u8>, new_len: usize) {
let mut new = source.clone();
resize_buffer_without_init(&mut new, new_len);

assert_eq!(new.len(), new_len);
if new.len() > source.len() {
assert_eq!(&new[..source.len()], &source[..]);
} else {
assert_eq!(&new[..], &source[..new.len()]);
}
}

#[test]
fn test_resize_buffer_without_init() {
let cases = [
(vec![1, 2, 3], 10),
(vec![1, 2, 3], 3),
(vec![1, 2, 3, 4, 5], 3),
(vec![], 10),
(vec![], 0),
];

for (vector, resize_len) in cases {
resize_and_check(vector, resize_len);
}
}
}
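
Reviewer note, not part of the diff: a standalone sketch of the pattern the new helper wraps and of how the codecs above call it, grow the buffer with reserve plus set_len, then let the (de)compressor overwrite the uninitialized tail. fill_decompressed is a made-up stand-in for a real decoder; resize_without_init mirrors resize_buffer_without_init from this file.

// Reviewer sketch only; mirrors resize_buffer_without_init above.
fn resize_without_init(buf: &mut Vec<u8>, n: usize) {
    if n > buf.capacity() {
        buf.reserve(n - buf.len());
    }
    // SAFETY (as argued in the doc comment): u8 needs no drop, and the caller
    // overwrites the newly exposed bytes before reading them.
    unsafe { buf.set_len(n) };
}

// Made-up stand-in for a decompressor that writes every byte of `out`.
fn fill_decompressed(out: &mut [u8]) {
    for (i, byte) in out.iter_mut().enumerate() {
        *byte = (i % 251) as u8;
    }
}

fn main() {
    let mut output = b"already-written prefix".to_vec();
    let offset = output.len();
    let decompressed_len = 1024;

    // Grow without paying for zero-initialization of the new region...
    resize_without_init(&mut output, offset + decompressed_len);
    // ...then hand only the tail to the "decoder", as the codecs do with
    // &mut output_buf[offset..].
    fill_decompressed(&mut output[offset..]);

    assert_eq!(output.len(), offset + decompressed_len);
    assert_eq!(&output[..offset], b"already-written prefix");
}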