diff --git a/arrow-array/src/array/run_array.rs b/arrow-array/src/array/run_array.rs index 517e84f1d7a6..40f5731a7a4d 100644 --- a/arrow-array/src/array/run_array.rs +++ b/arrow-array/src/array/run_array.rs @@ -141,6 +141,9 @@ impl RunArray { /// /// [`values`]: Self::values pub fn values_slice(&self) -> ArrayRef { + if self.is_empty() { + return self.values.slice(0, 0); + } let start = self.get_start_physical_index(); let end = self.get_end_physical_index(); self.values.slice(start, end - start + 1) @@ -655,6 +658,7 @@ mod tests { use super::*; use crate::builder::PrimitiveRunBuilder; use crate::cast::AsArray; + use crate::new_empty_array; use crate::types::{Int8Type, UInt32Type}; use crate::{Int16Array, Int32Array, StringArray}; @@ -752,6 +756,26 @@ mod tests { assert_eq!(run_ends.values(), &run_ends_values); } + #[test] + fn test_run_array_empty() { + let runs = new_empty_array(&DataType::Int16); + let runs = runs.as_primitive::(); + let values = new_empty_array(&DataType::Int64); + let array = RunArray::try_new(runs, &values).unwrap(); + + fn assertions(array: &RunArray) { + assert!(array.is_empty()); + assert_eq!(array.get_start_physical_index(), 0); + assert_eq!(array.get_end_physical_index(), 0); + assert!(array.get_physical_indices::(&[]).unwrap().is_empty()); + assert!(array.run_ends().is_empty()); + assert_eq!(array.run_ends().sliced_values().count(), 0); + } + + assertions(&array); + assertions(&array.slice(0, 0)); + } + #[test] fn test_run_array_fmt_debug() { let mut builder = PrimitiveRunBuilder::::with_capacity(3); @@ -1186,4 +1210,91 @@ mod tests { let values_slice2 = values_slice2.as_primitive::(); assert_eq!(values_slice2.values(), &[1]); } + + #[test] + fn test_run_array_values_slice_empty() { + let run_ends = Int32Array::from(vec![2, 5, 10]); + let values = StringArray::from(vec!["a", "b", "c"]); + let array = RunArray::::try_new(&run_ends, &values).unwrap(); + + let slice = array.slice(0, 0); + assert_eq!(slice.len(), 0); + + let values_slice = slice.values_slice(); + assert_eq!(values_slice.len(), 0); + assert_eq!(values_slice.data_type(), &DataType::Utf8); + } + + #[test] + fn test_run_array_eq_empty() { + let run_ends = Int32Array::from(vec![2, 5, 10]); + let values = StringArray::from(vec!["a", "b", "c"]); + let array = RunArray::::try_new(&run_ends, &values).unwrap(); + + let slice1 = array.slice(0, 0); + let slice2 = array.slice(1, 0); + let slice3 = array.slice(10, 0); + + assert_eq!(slice1, slice2); + assert_eq!(slice2, slice3); + + let empty_array = new_empty_array(array.data_type()); + let empty_array = crate::cast::as_run_array::(empty_array.as_ref()); + + assert_eq!(&slice1, empty_array); + } + + #[test] + fn test_run_array_eq_diff_physical_same_logical() { + let run_ends1 = Int32Array::from(vec![1, 3, 6]); + let values1 = StringArray::from(vec!["a", "b", "c"]); + let array1 = RunArray::::try_new(&run_ends1, &values1).unwrap(); + + let run_ends2 = Int32Array::from(vec![1, 2, 3, 4, 5, 6]); + let values2 = StringArray::from(vec!["a", "b", "b", "c", "c", "c"]); + let array2 = RunArray::::try_new(&run_ends2, &values2).unwrap(); + + assert_eq!(array1, array2); + } + + #[test] + fn test_run_array_eq_sliced() { + let run_ends1 = Int32Array::from(vec![2, 5, 10]); + let values1 = StringArray::from(vec!["a", "b", "c"]); + let array1 = RunArray::::try_new(&run_ends1, &values1).unwrap(); + // Logical: a, a, b, b, b, c, c, c, c, c + + let slice1 = array1.slice(1, 6); + // Logical: a, b, b, b, c, c + + let run_ends2 = Int32Array::from(vec![1, 4, 6]); + let values2 = StringArray::from(vec!["a", "b", "c"]); + let array2 = RunArray::::try_new(&run_ends2, &values2).unwrap(); + // Logical: a, b, b, b, c, c + + assert_eq!(slice1, array2); + + let slice2 = array1.slice(2, 3); + // Logical: b, b, b + let run_ends3 = Int32Array::from(vec![3]); + let values3 = StringArray::from(vec!["b"]); + let array3 = RunArray::::try_new(&run_ends3, &values3).unwrap(); + assert_eq!(slice2, array3); + } + + #[test] + fn test_run_array_eq_sliced_different_offsets() { + let run_ends1 = Int32Array::from(vec![2, 5, 10]); + let values1 = StringArray::from(vec!["a", "b", "c"]); + let array1 = RunArray::::try_new(&run_ends1, &values1).unwrap(); + let array2 = array1.clone(); + assert_eq!(array1, array2); + + let slice1 = array1.slice(1, 4); // a, b, b, b + let slice2 = array1.slice(1, 4); + assert_eq!(slice1, slice2); + + let slice3 = array1.slice(0, 4); // a, a, b, b + assert_ne!(slice1, slice3); + } } diff --git a/arrow-buffer/src/buffer/run.rs b/arrow-buffer/src/buffer/run.rs index 6603dec1bac1..0f4d9234e4cf 100644 --- a/arrow-buffer/src/buffer/run.rs +++ b/arrow-buffer/src/buffer/run.rs @@ -199,9 +199,16 @@ where pub fn sliced_values(&self) -> impl Iterator + '_ { let offset = self.logical_offset; let len = self.logical_length; - let start = self.get_start_physical_index(); - let end = self.get_end_physical_index(); - self.run_ends[start..=end].iter().map(move |&val| { + // Doing this roundabout way since the iterator type we return must be + // the same (i.e. cannot use std::iter::empty()) + let physical_slice = if self.is_empty() { + &self.run_ends[0..0] + } else { + let start = self.get_start_physical_index(); + let end = self.get_end_physical_index(); + &self.run_ends[start..=end] + }; + physical_slice.iter().map(move |&val| { let val = val.as_usize().saturating_sub(offset).min(len); E::from_usize(val).unwrap() }) diff --git a/arrow-cast/src/cast/mod.rs b/arrow-cast/src/cast/mod.rs index fb77993a3028..ec5009e0e50d 100644 --- a/arrow-cast/src/cast/mod.rs +++ b/arrow-cast/src/cast/mod.rs @@ -12451,4 +12451,32 @@ mod tests { assert_eq!(casted.as_ref(), &expected); } } + + #[test] + fn test_cast_between_sliced_run_end_encoded() { + let run_ends = Int16Array::from(vec![2, 5, 8]); + let values = StringArray::from(vec!["a", "b", "c"]); + + let ree_array = RunArray::::try_new(&run_ends, &values).unwrap(); + let ree_array = ree_array.slice(1, 2); + let array_ref = Arc::new(ree_array) as ArrayRef; + + let target_type = DataType::RunEndEncoded( + Arc::new(Field::new("run_ends", DataType::Int64, false)), + Arc::new(Field::new("values", DataType::Utf8, true)), + ); + let cast_options = CastOptions { + safe: false, + format_options: FormatOptions::default(), + }; + + let result = cast_with_options(&array_ref, &target_type, &cast_options).unwrap(); + let run_array = result.as_run::(); + let run_array = run_array.downcast::().unwrap(); + + let expected = vec!["a", "b"]; + let actual = run_array.into_iter().flatten().collect::>(); + + assert_eq!(expected, actual); + } } diff --git a/arrow-cast/src/cast/run_array.rs b/arrow-cast/src/cast/run_array.rs index 3e14804dc824..9878f0977282 100644 --- a/arrow-cast/src/cast/run_array.rs +++ b/arrow-cast/src/cast/run_array.rs @@ -32,17 +32,18 @@ pub(crate) fn run_end_encoded_cast( .downcast_ref::>() .ok_or_else(|| ArrowError::CastError("Expected RunArray".to_string()))?; - let values = run_array.values(); - match to_type { // Stay as RunEndEncoded, cast only the values DataType::RunEndEncoded(target_index_field, target_value_field) => { - let cast_values = - cast_with_options(values, target_value_field.data_type(), cast_options)?; + let values = run_array.values_slice(); + let cast_values = cast_with_options( + values.as_ref(), + target_value_field.data_type(), + cast_options, + )?; - let run_ends_array = PrimitiveArray::::from_iter_values( - run_array.run_ends().values().iter().copied(), - ); + let run_ends_array = + PrimitiveArray::::from_iter_values(run_array.run_ends().sliced_values()); let cast_run_ends = cast_with_options( &run_ends_array, target_index_field.data_type(), @@ -72,6 +73,7 @@ pub(crate) fn run_end_encoded_cast( // Expand to logical form _ => { + let values = run_array.values(); let len = run_array.len(); let offset = run_array.offset(); let run_ends = run_array.run_ends().values(); diff --git a/arrow-data/src/equal/run.rs b/arrow-data/src/equal/run.rs index 6c9393ecd8d3..0032b56d1aca 100644 --- a/arrow-data/src/equal/run.rs +++ b/arrow-data/src/equal/run.rs @@ -16,13 +16,17 @@ // under the License. use crate::data::ArrayData; +use arrow_buffer::ArrowNativeType; +use arrow_buffer::RunEndBuffer; +use arrow_schema::DataType; +use num_traits::ToPrimitive; use super::equal_range; -/// The current implementation of comparison of run array support physical comparison. -/// Comparing run encoded array based on logical indices (`lhs_start`, `rhs_start`) will -/// be time consuming as converting from logical index to physical index cannot be done -/// in constant time. The current comparison compares the underlying physical arrays. +/// Returns true if the two `RunEndEncoded` arrays are equal. +/// +/// This provides a specialized implementation of equality for REE arrays that +/// handles differences in run-encoding by iterating through the logical range. pub(super) fn run_equal( lhs: &ArrayData, rhs: &ArrayData, @@ -30,57 +34,129 @@ pub(super) fn run_equal( rhs_start: usize, len: usize, ) -> bool { - if lhs_start != 0 - || rhs_start != 0 - || (lhs.len() != len && rhs.len() != len) - || lhs.offset() > 0 - || rhs.offset() > 0 - { - unimplemented!("Logical comparison for run array not supported.") + let lhs_index_type = match lhs.data_type() { + DataType::RunEndEncoded(f, _) => f.data_type(), + _ => unreachable!(), + }; + + match lhs_index_type { + DataType::Int16 => run_equal_inner::(lhs, rhs, lhs_start, rhs_start, len), + DataType::Int32 => run_equal_inner::(lhs, rhs, lhs_start, rhs_start, len), + DataType::Int64 => run_equal_inner::(lhs, rhs, lhs_start, rhs_start, len), + _ => unreachable!(), } +} + +struct RunArrayData<'a, T: ArrowNativeType> { + run_ends: RunEndBuffer, + values: &'a ArrayData, +} + +impl<'a, T: ArrowNativeType + ToPrimitive> RunArrayData<'a, T> { + fn new(data: &'a ArrayData, start: usize, len: usize) -> Self { + debug_assert!( + data.child_data().len() == 2, + "RunEndEncoded arrays are guaranteed to have 2 children [run_ends, values]" + ); + let run_ends_data = &data.child_data()[0]; + let raw_run_ends_buffer = &run_ends_data.buffers()[0]; + // SAFETY: we're reconstructing RunEndBuffer from a known valid RunArray + let run_ends = unsafe { + RunEndBuffer::::new_unchecked( + raw_run_ends_buffer.clone().into(), + run_ends_data.offset() + data.offset() + start, + len, + ) + }; - if lhs.len() != rhs.len() { - return false; + let values = &data.child_data()[1]; + Self { run_ends, values } } - let lhs_child_data = lhs.child_data(); - let lhs_run_ends_array = &lhs_child_data[0]; - let lhs_values_array = &lhs_child_data[1]; + fn run_end(&self, index: usize) -> usize { + self.run_ends.values()[index].as_usize() + } - let rhs_child_data = rhs.child_data(); - let rhs_run_ends_array = &rhs_child_data[0]; - let rhs_values_array = &rhs_child_data[1]; + fn get_start_end_physical_indices(&self) -> (usize, usize) { + let start = self.run_ends.get_start_physical_index(); + let end = self.run_ends.get_end_physical_index(); + (start, end) + } +} - if lhs_run_ends_array.len() != rhs_run_ends_array.len() { - return false; +fn run_equal_inner( + lhs: &ArrayData, + rhs: &ArrayData, + lhs_start: usize, + rhs_start: usize, + len: usize, +) -> bool { + if len == 0 { + return true; } - if lhs_values_array.len() != rhs_values_array.len() { - return false; + let l_array = RunArrayData::::new(lhs, lhs_start, len); + let r_array = RunArrayData::::new(rhs, rhs_start, len); + + let (l_start_phys, l_end_phys) = l_array.get_start_end_physical_indices(); + let (r_start_phys, r_end_phys) = r_array.get_start_end_physical_indices(); + let l_runs = l_end_phys - l_start_phys + 1; + let r_runs = r_end_phys - r_start_phys + 1; + + if l_runs == r_runs { + // When the boundaries align perfectly, we don't need the complex stepping loop that calculates overlaps. + // Instead, we can simply treat the underlying values arrays as if they were standard primitive arrays. + let l_iter = l_array.run_ends.sliced_values(); + let r_iter = r_array.run_ends.sliced_values(); + let physical_match = l_iter.zip(r_iter).all(|(l_re, r_re)| l_re == r_re); + + if physical_match { + // Both arrays are partitioned identically. + // We can just verify if the physical values in those partitions match. + return equal_range( + l_array.values, + r_array.values, + l_start_phys, + r_start_phys, + l_runs, + ); + } } - // check run ends array are equal. The length of the physical array - // is used to validate the child arrays. - let run_ends_equal = equal_range( - lhs_run_ends_array, - rhs_run_ends_array, - lhs_start, - rhs_start, - lhs_run_ends_array.len(), - ); - - // if run ends array are not the same return early without validating - // values array. - if !run_ends_equal { - return false; + let mut l_phys = l_start_phys; + let mut r_phys = r_start_phys; + let mut processed = 0; + while processed < len { + if !equal_range(l_array.values, r_array.values, l_phys, r_phys, 1) { + return false; + } + + let l_run_end = l_array.run_end(l_phys); + let r_run_end = r_array.run_end(r_phys); + + //Calculate how many more logical elements are in the current run of the left and right array + let l_remaining_in_run = l_run_end - (l_array.run_ends.offset() + processed); + let r_remaining_in_run = r_run_end - (r_array.run_ends.offset() + processed); + + //Calculate how many elements are left to compare in the requested range + let remaining_in_range = len - processed; + + //Find the smallest of these three to determine our step size + //The goal is to move the logical cursor (processed) forward as far as possible without: + //Crossing the boundary of a run in the left or right array (where the value might change). + //Going past the total length we were asked to compare. + let step = l_remaining_in_run + .min(r_remaining_in_run) + .min(remaining_in_range); + processed += step; + + if l_array.run_ends.offset() + processed == l_run_end { + l_phys += 1; + } + if r_array.run_ends.offset() + processed == r_run_end { + r_phys += 1; + } } - // check values array are equal - equal_range( - lhs_values_array, - rhs_values_array, - lhs_start, - rhs_start, - rhs_values_array.len(), - ) + true } diff --git a/arrow-row/src/lib.rs b/arrow-row/src/lib.rs index fdad413e0e3b..d535d90cef8e 100644 --- a/arrow-row/src/lib.rs +++ b/arrow-row/src/lib.rs @@ -657,14 +657,14 @@ impl Codec { Codec::RunEndEncoded(converter) => { let values = match array.data_type() { DataType::RunEndEncoded(r, _) => match r.data_type() { - DataType::Int16 => array.as_run::().values(), - DataType::Int32 => array.as_run::().values(), - DataType::Int64 => array.as_run::().values(), + DataType::Int16 => array.as_run::().values_slice(), + DataType::Int32 => array.as_run::().values_slice(), + DataType::Int64 => array.as_run::().values_slice(), _ => unreachable!("Unsupported run end index type: {r:?}"), }, _ => unreachable!(), }; - let rows = converter.convert_columns(std::slice::from_ref(values))?; + let rows = converter.convert_columns(std::slice::from_ref(&values))?; Ok(Encoder::RunEndEncoded(rows)) } Codec::Union(converters, _) => { diff --git a/arrow-row/src/run.rs b/arrow-row/src/run.rs index e12fa87dce4b..f775abb6353d 100644 --- a/arrow-row/src/run.rs +++ b/arrow-row/src/run.rs @@ -27,11 +27,11 @@ pub fn compute_lengths( rows: &Rows, array: &RunArray, ) { - let run_ends = array.run_ends().values(); + let run_ends = array.run_ends().sliced_values(); let mut logical_start = 0; // Iterate over each run and apply the same length to all logical positions in the run - for (physical_idx, &run_end) in run_ends.iter().enumerate() { + for (physical_idx, run_end) in run_ends.enumerate() { let logical_end = run_end.as_usize(); let row_len = rows.row_len(physical_idx); let encoded_len = variable::padded_length(Some(row_len)); @@ -55,14 +55,14 @@ pub fn encode( opts: SortOptions, array: &RunArray, ) { - let run_ends = array.run_ends(); + let run_ends = array.run_ends().sliced_values(); let mut logical_idx = 0; let mut offset_idx = 1; // Skip first offset // Iterate over each run - for physical_idx in 0..run_ends.values().len() { - let run_end = run_ends.values()[physical_idx].as_usize(); + for (physical_idx, run_end) in run_ends.enumerate() { + let run_end = run_end.as_usize(); // Process all elements in this run while logical_idx < run_end && offset_idx < offsets.len() { @@ -639,4 +639,15 @@ mod tests { let result_ree = arrays[0].as_run::(); assert_eq!(result_ree.len(), 0); } + + #[test] + fn test_run_end_encoded_round_trip_sliced() { + let values = Int64Array::from(vec![100, 200, 100, 300]); + let run_ends = vec![2, 3, 5, 6]; + let array: RunArray = + RunArray::try_new(&PrimitiveArray::from(run_ends), &values).unwrap(); + let array = array.slice(2, 3); + + assert_roundtrip(&array, DataType::Int16, DataType::Int64, None); + } }