Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions arrow-array/src/array/primitive_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1016,6 +1016,36 @@ impl<T: ArrowPrimitiveType> PrimitiveArray<T> {
PrimitiveArray::new(values, Some(nulls))
}

/// Applies a unary infallible function to each value in an array, producing a
/// new primitive array.
///
/// # Null Handling
///
/// See [`Self::unary`] for more information on null handling.
///
/// # Example: create an [`Int16Array`] from an [`ArrayAccessor`] with item type `&[u8]`
/// ```
/// use arrow_array::{Array, FixedSizeBinaryArray, Int16Array};
/// let input_arg = vec![ vec![1, 0], vec![2, 0], vec![3, 0] ];
/// let arr = FixedSizeBinaryArray::try_from_iter(input_arg.into_iter()).unwrap();
/// let c = Int16Array::from_unary(&arr, |x| i16::from_le_bytes(x[..2].try_into().unwrap()));
/// assert_eq!(c, Int16Array::from(vec![Some(1i16), Some(2i16), Some(3i16)]));
/// ```
pub fn from_unary<U: ArrayAccessor, F>(left: U, mut op: F) -> Self
where
F: FnMut(U::Item) -> T::Native,
{
let nulls = left.logical_nulls();
let buffer = unsafe {
// SAFETY: i in range 0..left.len()
let iter = (0..left.len()).map(|i| op(left.value_unchecked(i)));
// SAFETY: upper bound is trusted because `iter` is over a range
Buffer::from_trusted_len_iter(iter)
};

PrimitiveArray::new(buffer.into(), nulls)
}

/// Returns a `PrimitiveBuilder` for this array, suitable for mutating values
/// in place.
///
Expand Down
29 changes: 17 additions & 12 deletions parquet/src/arrow/array_reader/byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,26 +120,31 @@ impl<I: OffsetSizeTrait> ArrayReader for ByteArrayReader<I> {
self.record_reader.reset();

let array: ArrayRef = match self.data_type {
// Apply conversion to all elements regardless of null slots as the conversions
// are infallible. This improves performance by avoiding a branch in the inner
// loop (see docs for `PrimitiveArray::from_unary`).
ArrowType::Decimal128(p, s) => {
let array = buffer.into_array(null_buffer, ArrowType::Binary);
let binary = array.as_any().downcast_ref::<BinaryArray>().unwrap();
let decimal = binary
.iter()
.map(|opt| Some(i128::from_be_bytes(sign_extend_be(opt?))))
.collect::<Decimal128Array>()
.with_precision_and_scale(p, s)?;

// Null slots will have 0 length, so we need to check for that in the lambda
// or sign_extend_be will panic.
let decimal = Decimal128Array::from_unary(binary, |x| match x.len() {
0 => i128::default(),
_ => i128::from_be_bytes(sign_extend_be(x)),
})
.with_precision_and_scale(p, s)?;
Arc::new(decimal)
}
ArrowType::Decimal256(p, s) => {
let array = buffer.into_array(null_buffer, ArrowType::Binary);
let binary = array.as_any().downcast_ref::<BinaryArray>().unwrap();
let decimal = binary
.iter()
.map(|opt| Some(i256::from_be_bytes(sign_extend_be(opt?))))
.collect::<Decimal256Array>()
.with_precision_and_scale(p, s)?;

// Null slots will have 0 length, so we need to check for that in the lambda
// or sign_extend_be will panic.
let decimal = Decimal256Array::from_unary(binary, |x| match x.len() {
0 => i256::default(),
_ => i256::from_be_bytes(sign_extend_be(x)),
})
.with_precision_and_scale(p, s)?;
Arc::new(decimal)
}
_ => buffer.into_array(null_buffer, self.data_type.clone()),
Expand Down
64 changes: 20 additions & 44 deletions parquet/src/arrow/array_reader/fixed_len_byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::column::reader::decoder::ColumnValueDecoder;
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
use arrow_array::{
Array, ArrayRef, Decimal128Array, Decimal256Array, FixedSizeBinaryArray, Float16Array,
ArrayRef, Decimal128Array, Decimal256Array, FixedSizeBinaryArray, Float16Array,
IntervalDayTimeArray, IntervalYearMonthArray,
};
use arrow_buffer::{i256, Buffer, IntervalDayTime};
Expand Down Expand Up @@ -163,69 +163,45 @@ impl ArrayReader for FixedLenByteArrayReader {
let binary = FixedSizeBinaryArray::from(unsafe { array_data.build_unchecked() });

// TODO: An improvement might be to do this conversion on read
// Note the conversions below apply to all elements regardless of null slots as the
// conversion lambdas are all infallible. This improves performance by avoiding a branch in
// the inner loop (see docs for `PrimitiveArray::from_unary`).
let array: ArrayRef = match &self.data_type {
ArrowType::Decimal128(p, s) => {
// We can simply reuse the null buffer from `binary` rather than recomputing it
// (as was the case when we simply used `collect` to produce the new array).
// The same applies to the transformations below.
let nulls = binary.nulls().cloned();
let decimal = binary.iter().map(|o| match o {
Some(b) => i128::from_be_bytes(sign_extend_be(b)),
None => i128::default(),
});
let decimal = Decimal128Array::from_iter_values_with_nulls(decimal, nulls)
.with_precision_and_scale(*p, *s)?;
Arc::new(decimal)
let f = |b: &[u8]| i128::from_be_bytes(sign_extend_be(b));
Arc::new(Decimal128Array::from_unary(&binary, f).with_precision_and_scale(*p, *s)?)
as ArrayRef
}
ArrowType::Decimal256(p, s) => {
let nulls = binary.nulls().cloned();
let decimal = binary.iter().map(|o| match o {
Some(b) => i256::from_be_bytes(sign_extend_be(b)),
None => i256::default(),
});
let decimal = Decimal256Array::from_iter_values_with_nulls(decimal, nulls)
.with_precision_and_scale(*p, *s)?;
Arc::new(decimal)
let f = |b: &[u8]| i256::from_be_bytes(sign_extend_be(b));
Arc::new(Decimal256Array::from_unary(&binary, f).with_precision_and_scale(*p, *s)?)
as ArrayRef
}
ArrowType::Interval(unit) => {
let nulls = binary.nulls().cloned();
// An interval is stored as 3x 32-bit unsigned integers storing months, days,
// and milliseconds
match unit {
IntervalUnit::YearMonth => {
let iter = binary.iter().map(|o| match o {
Some(b) => i32::from_le_bytes(b[0..4].try_into().unwrap()),
None => i32::default(),
});
let interval =
IntervalYearMonthArray::from_iter_values_with_nulls(iter, nulls);
Arc::new(interval) as ArrayRef
let f = |b: &[u8]| i32::from_le_bytes(b[0..4].try_into().unwrap());
Arc::new(IntervalYearMonthArray::from_unary(&binary, f)) as ArrayRef
}
IntervalUnit::DayTime => {
let iter = binary.iter().map(|o| match o {
Some(b) => IntervalDayTime::new(
let f = |b: &[u8]| {
IntervalDayTime::new(
i32::from_le_bytes(b[4..8].try_into().unwrap()),
i32::from_le_bytes(b[8..12].try_into().unwrap()),
),
None => IntervalDayTime::default(),
});
let interval =
IntervalDayTimeArray::from_iter_values_with_nulls(iter, nulls);
Arc::new(interval) as ArrayRef
)
};
Arc::new(IntervalDayTimeArray::from_unary(&binary, f)) as ArrayRef
}
IntervalUnit::MonthDayNano => {
return Err(nyi_err!("MonthDayNano intervals not supported"));
}
}
}
ArrowType::Float16 => {
let nulls = binary.nulls().cloned();
let f16s = binary.iter().map(|o| match o {
Some(b) => f16::from_le_bytes(b[..2].try_into().unwrap()),
None => f16::default(),
});
let f16s = Float16Array::from_iter_values_with_nulls(f16s, nulls);
Arc::new(f16s) as ArrayRef
let f = |b: &[u8]| f16::from_le_bytes(b[..2].try_into().unwrap());
Arc::new(Float16Array::from_unary(&binary, f)) as ArrayRef
}
_ => Arc::new(binary) as ArrayRef,
};
Expand Down Expand Up @@ -488,8 +464,8 @@ mod tests {
use crate::arrow::ArrowWriter;
use arrow::datatypes::Field;
use arrow::error::Result as ArrowResult;
use arrow_array::RecordBatch;
use arrow_array::{Array, ListArray};
use arrow_array::{Decimal256Array, RecordBatch};
use bytes::Bytes;
use std::sync::Arc;

Expand Down
84 changes: 28 additions & 56 deletions parquet/src/arrow/array_reader/primitive_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,35 +217,22 @@ where
arrow_cast::cast(&a, target_type)?
}
ArrowType::Decimal128(p, s) => {
// We can simply reuse the null buffer from `array` rather than recomputing it
// (as was the case when we simply used `collect` to produce the new array).
let nulls = array.nulls().cloned();
// Apply conversion to all elements regardless of null slots as the conversion
// to `i128` is infallible. This improves performance by avoiding a branch in
// the inner loop (see docs for `PrimitiveArray::unary`).
let array = match array.data_type() {
ArrowType::Int32 => {
let decimal = array
.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
.iter()
.map(|v| match v {
Some(i) => i as i128,
None => i128::default(),
});
Decimal128Array::from_iter_values_with_nulls(decimal, nulls)
}

ArrowType::Int64 => {
let decimal = array
.as_any()
.downcast_ref::<Int64Array>()
.unwrap()
.iter()
.map(|v| match v {
Some(i) => i as i128,
None => i128::default(),
});
Decimal128Array::from_iter_values_with_nulls(decimal, nulls)
}
ArrowType::Int32 => array
.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
.unary(|i| i as i128)
as Decimal128Array,
ArrowType::Int64 => array
.as_any()
.downcast_ref::<Int64Array>()
.unwrap()
.unary(|i| i as i128)
as Decimal128Array,
_ => {
return Err(arrow_err!(
"Cannot convert {:?} to decimal",
Expand All @@ -258,35 +245,20 @@ where
Arc::new(array) as ArrayRef
}
ArrowType::Decimal256(p, s) => {
// We can simply reuse the null buffer from `array` rather than recomputing it
// (as was the case when we simply used `collect` to produce the new array).
let nulls = array.nulls().cloned();
// See above comment. Conversion to `i256` is likewise infallible.
let array = match array.data_type() {
ArrowType::Int32 => {
let decimal = array
.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
.iter()
.map(|v| match v {
Some(i) => i256::from_i128(i as i128),
None => i256::default(),
});
Decimal256Array::from_iter_values_with_nulls(decimal, nulls)
}

ArrowType::Int64 => {
let decimal = array
.as_any()
.downcast_ref::<Int64Array>()
.unwrap()
.iter()
.map(|v| match v {
Some(i) => i256::from_i128(i as i128),
None => i256::default(),
});
Decimal256Array::from_iter_values_with_nulls(decimal, nulls)
}
ArrowType::Int32 => array
.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
.unary(|i| i256::from_i128(i as i128))
as Decimal256Array,
ArrowType::Int64 => array
.as_any()
.downcast_ref::<Int64Array>()
.unwrap()
.unary(|i| i256::from_i128(i as i128))
as Decimal256Array,
_ => {
return Err(arrow_err!(
"Cannot convert {:?} to decimal",
Expand Down