From d78d30fd0afb92873585169833ef46f9c5faf31c Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Thu, 25 Jul 2024 15:55:35 +0200 Subject: [PATCH 1/2] new and improved Chunk iteration APIs --- crates/store/re_chunk/Cargo.toml | 1 + crates/store/re_chunk/src/chunk.rs | 49 ++- crates/store/re_chunk/src/iter.rs | 470 ++++++++++++++++----------- crates/store/re_chunk/src/shuffle.rs | 39 ++- crates/store/re_chunk/src/slice.rs | 200 +++++++++++- crates/store/re_chunk/src/util.rs | 2 + 6 files changed, 554 insertions(+), 207 deletions(-) diff --git a/crates/store/re_chunk/Cargo.toml b/crates/store/re_chunk/Cargo.toml index f32fb079e859..e6007cb1dbc6 100644 --- a/crates/store/re_chunk/Cargo.toml +++ b/crates/store/re_chunk/Cargo.toml @@ -37,6 +37,7 @@ serde = [ # Rerun re_build_info.workspace = true +re_error.workspace = true re_format.workspace = true re_format_arrow.workspace = true re_log.workspace = true diff --git a/crates/store/re_chunk/src/chunk.rs b/crates/store/re_chunk/src/chunk.rs index 40ec249852df..f6e082e575ea 100644 --- a/crates/store/re_chunk/src/chunk.rs +++ b/crates/store/re_chunk/src/chunk.rs @@ -3,14 +3,19 @@ use std::{ sync::atomic::{AtomicU64, Ordering}, }; -use arrow2::array::{ - Array as ArrowArray, ListArray as ArrowListArray, PrimitiveArray as ArrowPrimitiveArray, - StructArray as ArrowStructArray, +use arrow2::{ + array::{ + Array as ArrowArray, ListArray as ArrowListArray, PrimitiveArray as ArrowPrimitiveArray, + StructArray as ArrowStructArray, + }, + Either, }; use itertools::{izip, Itertools}; use re_log_types::{EntityPath, ResolvedTimeRange, Time, TimeInt, TimePoint, Timeline}; -use re_types_core::{ComponentName, Loggable, LoggableBatch, SerializationError, SizeBytes}; +use re_types_core::{ + ComponentName, DeserializationError, Loggable, LoggableBatch, SerializationError, SizeBytes, +}; use crate::{ChunkId, RowId}; @@ -26,8 +31,18 @@ pub enum ChunkError { #[error(transparent)] Arrow(#[from] arrow2::error::Error), + #[error("{kind} index out of 
bounds: {index} (len={len})")] + IndexOutOfBounds { + kind: String, + len: usize, + index: usize, + }, + #[error(transparent)] Serialization(#[from] SerializationError), + + #[error(transparent)] + Deserialization(#[from] DeserializationError), } pub type ChunkResult = Result; @@ -822,6 +837,32 @@ impl Chunk { .map(|(&time, &counter)| RowId::from_u128((time as u128) << 64 | (counter as u128))) } + /// Returns an iterator over the [`RowId`]s of a [`Chunk`], for a given component. + /// + /// This is different than [`Self::row_ids`]: it will only yield `RowId`s for rows at which + /// there is data for the specified `component_name`. + #[inline] + pub fn component_row_ids( + &self, + component_name: &ComponentName, + ) -> impl Iterator + '_ { + let Some(list_array) = self.components.get(component_name) else { + return Either::Left(std::iter::empty()); + }; + + let row_ids = self.row_ids(); + + if let Some(validity) = list_array.validity() { + Either::Right(Either::Left( + row_ids + .enumerate() + .filter_map(|(i, o)| validity.get_bit(i).then_some(o)), + )) + } else { + Either::Right(Either::Right(row_ids)) + } + } + /// Returns the [`RowId`]-range covered by this [`Chunk`]. /// /// `None` if the chunk `is_empty`. diff --git a/crates/store/re_chunk/src/iter.rs b/crates/store/re_chunk/src/iter.rs index 59a67b33dfb4..58691cc258f2 100644 --- a/crates/store/re_chunk/src/iter.rs +++ b/crates/store/re_chunk/src/iter.rs @@ -1,15 +1,172 @@ use std::sync::Arc; -use arrow2::array::Array as ArrowArray; -use itertools::Itertools as _; +use arrow2::{ + array::{Array as ArrowArray, PrimitiveArray}, + Either, +}; +use itertools::izip; use re_log_types::{TimeInt, Timeline}; -use re_types_core::ComponentName; +use re_types_core::{Component, ComponentName}; use crate::{Chunk, ChunkTimeline, RowId}; // --- +impl Chunk { + /// Returns an iterator over the indices (`(TimeInt, RowId)`) of a [`Chunk`], for a given timeline. + /// + /// If the chunk is static, `timeline` will be ignored. 
+ /// + /// See also: + /// * [`Self::iter_component_indices`]. + /// * [`Self::iter_indices_owned`]. + #[inline] + pub fn iter_indices(&self, timeline: &Timeline) -> impl Iterator + '_ { + if self.is_static() { + Either::Right(Either::Left(izip!( + std::iter::repeat(TimeInt::STATIC), + self.row_ids() + ))) + } else { + let Some(time_chunk) = self.timelines.get(timeline) else { + return Either::Left(std::iter::empty()); + }; + + Either::Right(Either::Right(izip!(time_chunk.times(), self.row_ids()))) + } + } + + /// Returns an iterator over the indices (`(TimeInt, RowId)`) of a [`Chunk`], for a given + /// timeline and component. + /// + /// If the chunk is static, `timeline` will be ignored. + /// + /// This is different than [`Self::iter_indices`] in that it will only yield indices for rows + /// at which there is data for the specified `component_name`. + /// + /// See also [`Self::iter_indices`]. + pub fn iter_component_indices( + &self, + timeline: &Timeline, + component_name: &ComponentName, + ) -> impl Iterator + '_ { + let Some(list_array) = self.components.get(component_name) else { + return Either::Left(std::iter::empty()); + }; + + if self.is_static() { + let indices = izip!(std::iter::repeat(TimeInt::STATIC), self.row_ids()); + + if let Some(validity) = list_array.validity() { + Either::Right(Either::Left(Either::Left( + indices + .enumerate() + .filter_map(|(i, o)| validity.get_bit(i).then_some(o)), + ))) + } else { + Either::Right(Either::Left(Either::Right(indices))) + } + } else { + let Some(time_chunk) = self.timelines.get(timeline) else { + return Either::Left(std::iter::empty()); + }; + + let indices = izip!(time_chunk.times(), self.row_ids()); + + if let Some(validity) = list_array.validity() { + Either::Right(Either::Right(Either::Left( + indices + .enumerate() + .filter_map(|(i, o)| validity.get_bit(i).then_some(o)), + ))) + } else { + Either::Right(Either::Right(Either::Right(indices))) + } + } + } + + /// Returns an iterator over the offsets 
(`(offset, len)`) of a [`Chunk`], for a given + /// component. + pub fn iter_component_offsets( + &self, + component_name: &ComponentName, + ) -> impl Iterator + '_ { + let Some(list_array) = self.components.get(component_name) else { + return Either::Left(std::iter::empty()); + }; + + let offsets = list_array.offsets().iter().map(|idx| *idx as usize); + let lengths = list_array.offsets().lengths(); + + if let Some(validity) = list_array.validity() { + Either::Right(Either::Left( + izip!(offsets, lengths) + .enumerate() + .filter_map(|(i, o)| validity.get_bit(i).then_some(o)), + )) + } else { + Either::Right(Either::Right(izip!(offsets, lengths))) + } + } + + /// Returns an iterator over the raw arrays of a [`Chunk`], for a given component. + /// + /// See also: + /// * [`Self::iter_component`]. + /// * [`Self::iter_primitive`]. + pub fn iter_component_arrays( + &self, + component_name: &ComponentName, + ) -> impl Iterator> + '_ { + let Some(list_array) = self.components.get(component_name) else { + return Either::Left(std::iter::empty()); + }; + + Either::Right(list_array.iter().flatten()) + } + + /// Returns an iterator over the raw primitive values of a [`Chunk`], for a given component. + /// + /// This is a very fast path: the entire column will be downcasted at once, and then every + /// component batch will be a slice reference into that global slice. + /// Use this when working with simple arrow datatypes and performance matters (e.g. scalars, + /// points, etc). + /// + /// * [`Self::iter_component_arrays`]. + /// * [`Self::iter_component`].
+ #[inline] + pub fn iter_primitive( + &self, + component_name: &ComponentName, + ) -> impl Iterator + '_ { + let Some(list_array) = self.components.get(component_name) else { + return Either::Left(std::iter::empty()); + }; + + let Some(values) = list_array + .values() + .as_any() + .downcast_ref::>() + else { + if cfg!(debug_assertions) { + panic!("downcast failed for {component_name}, data discarded"); + } else { + re_log::error_once!("downcast failed for {component_name}, data discarded"); + } + return Either::Left(std::iter::empty()); + }; + let values = values.values().as_slice(); + + // NOTE: No need for validity checks here, `iter_component_offsets` already takes care of that. + Either::Right( + self.iter_component_offsets(component_name) + .map(move |(idx, len)| &values[idx..idx + len]), + ) + } +} + +// TODO(cmc): get rid of this impl Chunk { /// Returns an iterator over the rows of the [`Chunk`]. /// @@ -17,13 +174,6 @@ impl Chunk { /// /// Iterating a [`Chunk`] on a row basis is very wasteful, performance-wise. /// Prefer columnar access when possible. - // - // TODO(cmc): a row-based iterator is obviously not what we want -- one of the benefits of - // chunks is to amortize the cost of downcasting & "deserialization". - // But at the moment we still need to run with the native deserialization cache, which expects - // row-based data. - // As soon as we remove the native cache and start exposing `Chunk`s directly to downstream - // systems, we will look into ergonomic ways to do columnar access. pub fn iter_rows( &self, timeline: &Timeline, @@ -55,45 +205,10 @@ impl Chunk { itertools::izip!(data_times, row_ids, arrays) } - - /// Returns the cell corresponding to the specified [`RowId`] for a given [`ComponentName`]. - /// - /// This is `O(log(n))` if `self.is_sorted()`, and `O(n)` otherwise. - /// - /// Reminder: duplicated `RowId`s results in undefined behavior.
- pub fn cell( - &self, - row_id: RowId, - component_name: &ComponentName, - ) -> Option> { - let list_array = self.components.get(component_name)?; - - if self.is_sorted() { - let row_id_128 = row_id.as_u128(); - let row_id_time_ns = (row_id_128 >> 64) as u64; - let row_id_inc = (row_id_128 & (!0 >> 64)) as u64; - - let (times, incs) = self.row_ids_raw(); - let times = times.values().as_slice(); - let incs = incs.values().as_slice(); - - let mut index = times.partition_point(|&time| time < row_id_time_ns); - while index < incs.len() && incs[index] < row_id_inc { - index += 1; - } - - let found_it = - times.get(index) == Some(&row_id_time_ns) && incs.get(index) == Some(&row_id_inc); - - (found_it && list_array.is_valid(index)).then(|| list_array.value(index)) - } else { - self.row_ids() - .find_position(|id| *id == row_id) - .and_then(|(index, _)| list_array.is_valid(index).then(|| list_array.value(index))) - } - } } +// --- + pub struct ChunkIndicesIter { chunk: Arc, @@ -135,161 +250,144 @@ impl Chunk { /// If the chunk is static, `timeline` will be ignored. /// /// The returned iterator outlives `self`, thus it can be passed around freely. + /// The tradeoff is that `self` must be an `Arc`. + /// + /// See also [`Self::iter_indices`]. 
#[inline] - pub fn iter_indices(self: Arc, timeline: &Timeline) -> Option { + pub fn iter_indices_owned( + self: Arc, + timeline: &Timeline, + ) -> impl Iterator { if self.is_static() { - Some(ChunkIndicesIter { + Either::Left(ChunkIndicesIter { chunk: self, time_chunk: None, index: 0, }) } else { - self.timelines - .get(timeline) - .cloned() - .map(|time_chunk| ChunkIndicesIter { - chunk: self, - time_chunk: Some(time_chunk), - index: 0, - }) + self.timelines.get(timeline).cloned().map_or_else( + || Either::Right(Either::Left(std::iter::empty())), + |time_chunk| { + Either::Right(Either::Right(ChunkIndicesIter { + chunk: self, + time_chunk: Some(time_chunk), + index: 0, + })) + }, + ) } } } -#[cfg(test)] -mod tests { - use std::sync::Arc; +// --- - use itertools::{izip, Itertools}; - use re_log_types::{ - example_components::{MyColor, MyLabel, MyPoint}, - EntityPath, TimeInt, TimePoint, - }; - use re_types_core::{ComponentBatch, Loggable}; +/// The actual iterator implementation for [`Chunk::iter_component`]. +pub struct ChunkComponentIter { + values: Vec, + offsets: IO, +} - use crate::{Chunk, RowId, Timeline}; +/// The intermediate state for [`ChunkComponentIter`]. +/// +/// Required so that we can return references to the inner data. 
+pub struct ChunkComponentIterRef<'a, C, IO> { + values: &'a [C], + offsets: &'a mut IO, +} - #[test] - fn cell() -> anyhow::Result<()> { - let entity_path = "my/entity"; +impl<'a, C: Component, IO: Iterator> IntoIterator + for &'a mut ChunkComponentIter +{ + type Item = &'a [C]; - let row_id1 = RowId::ZERO.incremented_by(10); - let row_id2 = RowId::ZERO.incremented_by(20); - let row_id3 = RowId::ZERO.incremented_by(30); - let row_id4 = RowId::new(); - let row_id5 = RowId::new(); + type IntoIter = ChunkComponentIterRef<'a, C, IO>; - let timepoint1 = [ - (Timeline::log_time(), 1000), - (Timeline::new_sequence("frame"), 1), - ]; - let timepoint2 = [ - (Timeline::log_time(), 1032), - (Timeline::new_sequence("frame"), 3), - ]; - let timepoint3 = [ - (Timeline::log_time(), 1064), - (Timeline::new_sequence("frame"), 5), - ]; - let timepoint4 = [ - (Timeline::log_time(), 1096), - (Timeline::new_sequence("frame"), 7), - ]; - let timepoint5 = [ - (Timeline::log_time(), 1128), - (Timeline::new_sequence("frame"), 9), - ]; - - let points1 = &[MyPoint::new(1.0, 1.0), MyPoint::new(2.0, 2.0)]; - let points3 = &[MyPoint::new(6.0, 7.0)]; - - let colors4 = &[MyColor::from_rgb(1, 1, 1)]; - let colors5 = &[MyColor::from_rgb(2, 2, 2), MyColor::from_rgb(3, 3, 3)]; - - let labels1 = &[MyLabel("a".into())]; - let labels2 = &[MyLabel("b".into())]; - let labels3 = &[MyLabel("c".into())]; - let labels4 = &[MyLabel("d".into())]; - let labels5 = &[MyLabel("e".into())]; - - let mut chunk = Chunk::builder(entity_path.into()) - .with_sparse_component_batches( - row_id2, - timepoint4, - [ - (MyPoint::name(), None), - (MyColor::name(), Some(colors4 as _)), - (MyLabel::name(), Some(labels4 as _)), - ], - ) - .with_sparse_component_batches( - row_id5, - timepoint5, - [ - (MyPoint::name(), None), - (MyColor::name(), Some(colors5 as _)), - (MyLabel::name(), Some(labels5 as _)), - ], - ) - .with_sparse_component_batches( - row_id1, - timepoint3, - [ - (MyPoint::name(), Some(points1 as _)), - 
(MyColor::name(), None), - (MyLabel::name(), Some(labels1 as _)), - ], - ) - .with_sparse_component_batches( - row_id4, - timepoint2, - [ - (MyPoint::name(), None), - (MyColor::name(), None), - (MyLabel::name(), Some(labels2 as _)), - ], - ) - .with_sparse_component_batches( - row_id3, - timepoint1, - [ - (MyPoint::name(), Some(points3 as _)), - (MyColor::name(), None), - (MyLabel::name(), Some(labels3 as _)), - ], - ) - .build()?; - - eprintln!("chunk:\n{chunk}"); - - let expectations: &[(_, _, Option<&dyn ComponentBatch>)] = &[ - (row_id1, MyPoint::name(), Some(points1 as _)), - (row_id2, MyLabel::name(), Some(labels4 as _)), - (row_id3, MyColor::name(), None), - (row_id4, MyLabel::name(), Some(labels2 as _)), - (row_id5, MyColor::name(), Some(colors5 as _)), - ]; - - assert!(!chunk.is_sorted()); - for (row_id, component_name, expected) in expectations { - let expected = - expected.and_then(|expected| re_types_core::LoggableBatch::to_arrow(expected).ok()); - eprintln!("{component_name} @ {row_id}"); - similar_asserts::assert_eq!(expected, chunk.cell(*row_id, component_name)); + #[inline] + fn into_iter(self) -> Self::IntoIter { + ChunkComponentIterRef { + values: &self.values, + offsets: &mut self.offsets, } + } +} - chunk.sort_if_unsorted(); - assert!(chunk.is_sorted()); +impl<'a, C: Component, IO: Iterator> Iterator + for ChunkComponentIterRef<'a, C, IO> +{ + type Item = &'a [C]; - for (row_id, component_name, expected) in expectations { - let expected = - expected.and_then(|expected| re_types_core::LoggableBatch::to_arrow(expected).ok()); - eprintln!("{component_name} @ {row_id}"); - similar_asserts::assert_eq!(expected, chunk.cell(*row_id, component_name)); - } + #[inline] + fn next(&mut self) -> Option { + self.offsets + .next() + .map(move |(idx, len)| &self.values[idx..idx + len]) + } +} - Ok(()) +impl Chunk { + /// Returns an iterator over the deserialized batches of a [`Chunk`], for a given component. 
+ /// + /// This is a dedicated fast path: the entire column will be downcasted and deserialized at + /// once, and then every component batch will be a slice reference into that global slice. + /// Use this when working with complex arrow datatypes and performance matters (e.g. ranging + /// through enum types across many timestamps). + /// + /// See also: + /// * [`Self::iter_component_arrays`]. + /// * [`Self::iter_primitive`]. + #[inline] + pub fn iter_component( + &self, + ) -> ChunkComponentIter + '_> { + let Some(list_array) = self.components.get(&C::name()) else { + return ChunkComponentIter { + values: vec![], + offsets: Either::Left(std::iter::empty()), + }; + }; + + let values = list_array.values(); + let values = match C::from_arrow(&**values) { + Ok(values) => values, + Err(err) => { + if cfg!(debug_assertions) { + panic!( + "deserialization failed for {}, data discarded: {}", + C::name(), + re_error::format_ref(&err), + ); + } else { + re_log::error_once!( + "deserialization failed for {}, data discarded: {}", + C::name(), + re_error::format_ref(&err), + ); + } + return ChunkComponentIter { + values: vec![], + offsets: Either::Left(std::iter::empty()), + }; + } + }; + + // NOTE: No need for validity checks here, `iter_component_offsets` already takes care of that.
+ ChunkComponentIter { + values, + offsets: Either::Right(self.iter_component_offsets(&C::name())), + } } +} + +// --- + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use itertools::{izip, Itertools}; + use re_log_types::{example_components::MyPoint, EntityPath, TimeInt, TimePoint}; + + use crate::{Chunk, RowId, Timeline}; #[test] fn iter_indices_temporal() -> anyhow::Result<()> { @@ -327,9 +425,8 @@ mod tests { { let got = Arc::clone(&chunk) - .iter_indices(&timeline_frame) - .map(|it| it.collect_vec()) - .unwrap_or_default(); + .iter_indices_owned(&timeline_frame) + .collect_vec(); let expected = izip!( chunk .timelines @@ -376,9 +473,8 @@ mod tests { { let got = Arc::clone(&chunk) - .iter_indices(&timeline_frame) - .map(|it| it.collect_vec()) - .unwrap_or_default(); + .iter_indices_owned(&timeline_frame) + .collect_vec(); let expected = izip!(std::iter::repeat(TimeInt::STATIC), chunk.row_ids()).collect_vec(); similar_asserts::assert_eq!(expected, got); diff --git a/crates/store/re_chunk/src/shuffle.rs b/crates/store/re_chunk/src/shuffle.rs index fe409a3f9547..a19c88e3d23a 100644 --- a/crates/store/re_chunk/src/shuffle.rs +++ b/crates/store/re_chunk/src/shuffle.rs @@ -23,6 +23,17 @@ impl Chunk { self.is_sorted } + /// For debugging purposes. + #[doc(hidden)] + #[inline] + pub fn is_sorted_uncached(&self) -> bool { + re_tracing::profile_function!(); + + self.row_ids() + .tuple_windows::<(_, _)>() + .all(|row_ids| row_ids.0 <= row_ids.1) + } + /// Is the chunk ascendingly sorted by time, for all of its timelines? /// /// This is O(1) (cached). @@ -33,19 +44,29 @@ impl Chunk { .all(|time_chunk| time_chunk.is_sorted()) } - /// Like [`Self::is_sorted`], but actually checks the entire dataset rather than relying on the - /// cached value. + /// Is the chunk ascendingly sorted by time, for a specific timeline? /// - /// O(n). Useful for tests/debugging, or when you just don't know. + /// This is O(1) (cached). /// - /// See also [`Self::is_sorted`]. 
+ /// See also [`Self::is_timeline_sorted_uncached`]. #[inline] - pub fn is_sorted_uncached(&self) -> bool { - re_tracing::profile_function!(); + pub fn is_timeline_sorted(&self, timeline: &Timeline) -> bool { + self.is_static() + || self + .timelines + .get(timeline) + .map_or(false, |time_chunk| time_chunk.is_sorted()) + } - self.row_ids() - .tuple_windows::<(_, _)>() - .all(|row_ids| row_ids.0 <= row_ids.1) + /// For debugging purposes. + #[doc(hidden)] + #[inline] + pub fn is_timeline_sorted_uncached(&self, timeline: &Timeline) -> bool { + self.is_static() + || self + .timelines + .get(timeline) + .map_or(false, |time_chunk| time_chunk.is_sorted_uncached()) } /// Sort the chunk, if needed. diff --git a/crates/store/re_chunk/src/slice.rs b/crates/store/re_chunk/src/slice.rs index 345b195efa41..7be5da414190 100644 --- a/crates/store/re_chunk/src/slice.rs +++ b/crates/store/re_chunk/src/slice.rs @@ -5,10 +5,11 @@ use arrow2::array::{ use itertools::Itertools; use nohash_hasher::IntSet; + use re_log_types::Timeline; use re_types_core::ComponentName; -use crate::{Chunk, ChunkTimeline}; +use crate::{Chunk, ChunkTimeline, RowId}; // --- @@ -16,6 +17,43 @@ use crate::{Chunk, ChunkTimeline}; // Most of them are indirectly stressed by our higher-level query tests anyhow. impl Chunk { + /// Returns the cell corresponding to the specified [`RowId`] for a given [`ComponentName`]. + /// + /// This is `O(log(n))` if `self.is_sorted()`, and `O(n)` otherwise. + /// + /// Reminder: duplicated `RowId`s results in undefined behavior. 
+ pub fn cell( + &self, + row_id: RowId, + component_name: &ComponentName, + ) -> Option> { + let list_array = self.components.get(component_name)?; + + if self.is_sorted() { + let row_id_128 = row_id.as_u128(); + let row_id_time_ns = (row_id_128 >> 64) as u64; + let row_id_inc = (row_id_128 & (!0 >> 64)) as u64; + + let (times, incs) = self.row_ids_raw(); + let times = times.values().as_slice(); + let incs = incs.values().as_slice(); + + let mut index = times.partition_point(|&time| time < row_id_time_ns); + while index < incs.len() && incs[index] < row_id_inc { + index += 1; + } + + let found_it = + times.get(index) == Some(&row_id_time_ns) && incs.get(index) == Some(&row_id_inc); + + (found_it && list_array.is_valid(index)).then(|| list_array.value(index)) + } else { + self.row_ids() + .find_position(|id| *id == row_id) + .and_then(|(index, _)| list_array.is_valid(index).then(|| list_array.value(index))) + } + } + /// Slices the [`Chunk`] vertically. /// /// The result is a new [`Chunk`] with the same columns and (potentially) less rows. @@ -267,7 +305,7 @@ impl Chunk { /// If `component_name` doesn't exist in this [`Chunk`], or if it is already dense, this method /// is a no-op. 
#[inline] - pub fn densified(&self, component_name: ComponentName) -> Self { + pub fn densified(&self, component_name_pov: ComponentName) -> Self { let Self { id, entity_path, @@ -282,7 +320,7 @@ impl Chunk { return self.clone(); } - let Some(component_list_array) = components.get(&component_name) else { + let Some(component_list_array) = components.get(&component_name_pov) else { return self.clone(); }; @@ -307,10 +345,26 @@ impl Chunk { components: components .iter() .map(|(&component_name, list_array)| { - ( - component_name, - crate::util::filter_array(list_array, &validity_filter), - ) + let filtered = crate::util::filter_array(list_array, &validity_filter); + let filtered = if component_name == component_name_pov { + // Make sure we fully remove the validity bitmap for the densified + // component. + // This will allow further operations on this densified chunk to take some + // very optimized paths. + + #[allow(clippy::unwrap_used)] + filtered + .with_validity(None) + .as_any() + .downcast_ref::>() + // Unwrap: cannot possibly fail -- going from a ListArray back to a ListArray. 
+ .unwrap() + .clone() + } else { + filtered + }; + + (component_name, filtered) }) .collect(), }; @@ -493,3 +547,135 @@ impl ChunkTimeline { ) } } + +// --- + +#[cfg(test)] +mod tests { + use re_log_types::example_components::{MyColor, MyLabel, MyPoint}; + use re_types_core::{ComponentBatch, Loggable}; + + use crate::{Chunk, RowId, Timeline}; + + #[test] + fn cell() -> anyhow::Result<()> { + let entity_path = "my/entity"; + + let row_id1 = RowId::ZERO.incremented_by(10); + let row_id2 = RowId::ZERO.incremented_by(20); + let row_id3 = RowId::ZERO.incremented_by(30); + let row_id4 = RowId::new(); + let row_id5 = RowId::new(); + + let timepoint1 = [ + (Timeline::log_time(), 1000), + (Timeline::new_sequence("frame"), 1), + ]; + let timepoint2 = [ + (Timeline::log_time(), 1032), + (Timeline::new_sequence("frame"), 3), + ]; + let timepoint3 = [ + (Timeline::log_time(), 1064), + (Timeline::new_sequence("frame"), 5), + ]; + let timepoint4 = [ + (Timeline::log_time(), 1096), + (Timeline::new_sequence("frame"), 7), + ]; + let timepoint5 = [ + (Timeline::log_time(), 1128), + (Timeline::new_sequence("frame"), 9), + ]; + + let points1 = &[MyPoint::new(1.0, 1.0), MyPoint::new(2.0, 2.0)]; + let points3 = &[MyPoint::new(6.0, 7.0)]; + + let colors4 = &[MyColor::from_rgb(1, 1, 1)]; + let colors5 = &[MyColor::from_rgb(2, 2, 2), MyColor::from_rgb(3, 3, 3)]; + + let labels1 = &[MyLabel("a".into())]; + let labels2 = &[MyLabel("b".into())]; + let labels3 = &[MyLabel("c".into())]; + let labels4 = &[MyLabel("d".into())]; + let labels5 = &[MyLabel("e".into())]; + + let mut chunk = Chunk::builder(entity_path.into()) + .with_sparse_component_batches( + row_id2, + timepoint4, + [ + (MyPoint::name(), None), + (MyColor::name(), Some(colors4 as _)), + (MyLabel::name(), Some(labels4 as _)), + ], + ) + .with_sparse_component_batches( + row_id5, + timepoint5, + [ + (MyPoint::name(), None), + (MyColor::name(), Some(colors5 as _)), + (MyLabel::name(), Some(labels5 as _)), + ], + ) + 
.with_sparse_component_batches( + row_id1, + timepoint3, + [ + (MyPoint::name(), Some(points1 as _)), + (MyColor::name(), None), + (MyLabel::name(), Some(labels1 as _)), + ], + ) + .with_sparse_component_batches( + row_id4, + timepoint2, + [ + (MyPoint::name(), None), + (MyColor::name(), None), + (MyLabel::name(), Some(labels2 as _)), + ], + ) + .with_sparse_component_batches( + row_id3, + timepoint1, + [ + (MyPoint::name(), Some(points3 as _)), + (MyColor::name(), None), + (MyLabel::name(), Some(labels3 as _)), + ], + ) + .build()?; + + eprintln!("chunk:\n{chunk}"); + + let expectations: &[(_, _, Option<&dyn ComponentBatch>)] = &[ + (row_id1, MyPoint::name(), Some(points1 as _)), + (row_id2, MyLabel::name(), Some(labels4 as _)), + (row_id3, MyColor::name(), None), + (row_id4, MyLabel::name(), Some(labels2 as _)), + (row_id5, MyColor::name(), Some(colors5 as _)), + ]; + + assert!(!chunk.is_sorted()); + for (row_id, component_name, expected) in expectations { + let expected = + expected.and_then(|expected| re_types_core::LoggableBatch::to_arrow(expected).ok()); + eprintln!("{component_name} @ {row_id}"); + similar_asserts::assert_eq!(expected, chunk.cell(*row_id, component_name)); + } + + chunk.sort_if_unsorted(); + assert!(chunk.is_sorted()); + + for (row_id, component_name, expected) in expectations { + let expected = + expected.and_then(|expected| re_types_core::LoggableBatch::to_arrow(expected).ok()); + eprintln!("{component_name} @ {row_id}"); + similar_asserts::assert_eq!(expected, chunk.cell(*row_id, component_name)); + } + + Ok(()) + } +} diff --git a/crates/store/re_chunk/src/util.rs b/crates/store/re_chunk/src/util.rs index 87a80e74df69..916de440c33e 100644 --- a/crates/store/re_chunk/src/util.rs +++ b/crates/store/re_chunk/src/util.rs @@ -199,6 +199,8 @@ pub fn pad_list_array_front( /// /// Takes care of up- and down-casting the data back and forth on behalf of the caller. 
pub fn filter_array(array: &A, filter: &ArrowBooleanArray) -> A { + debug_assert!(filter.validity().is_none()); // just for good measure + #[allow(clippy::unwrap_used)] arrow2::compute::filter::filter(array, filter) // Unwrap: this literally cannot fail. From 8fb2a393599a4f46064f1c1d0856ff19a832ea55 Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Thu, 25 Jul 2024 16:06:42 +0200 Subject: [PATCH 2/2] propagate changes --- Cargo.lock | 1 + .../src/time_range_table.rs | 15 ++++----------- 2 files changed, 5 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 07f02659d20c..e0f1268b141e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4311,6 +4311,7 @@ dependencies = [ "rand", "re_arrow2", "re_build_info", + "re_error", "re_format", "re_format_arrow", "re_log", diff --git a/crates/viewer/re_space_view_dataframe/src/time_range_table.rs b/crates/viewer/re_space_view_dataframe/src/time_range_table.rs index 1d18cb17d22d..7448ea0b0e57 100644 --- a/crates/viewer/re_space_view_dataframe/src/time_range_table.rs +++ b/crates/viewer/re_space_view_dataframe/src/time_range_table.rs @@ -98,18 +98,11 @@ pub(crate) fn time_range_table_ui( ctx.recording_store() .range_relevant_chunks(&range_query, entity_path, *component) .into_iter() - // This does two things: - // 1) Filter out instances where `chunk.iter_indices()` returns `None`. - // 2) Exploit the fact that the returned iterator (if any) is *not* bound to the - // lifetime of the chunk (it has an internal Arc). - .filter_map(move |chunk| { - //TODO(ab, cmc): remove this line when a range-aware, iter_indices API is available. + // Exploit the fact that the returned iterator (if any) is *not* bound to the lifetime + // of the chunk (it has an internal Arc). 
+ .map(move |chunk| { let chunk = Arc::new(chunk.range(&range_query, *component)); - - chunk - .clone() - .iter_indices(&timeline) - .map(|iter_indices| (iter_indices, chunk)) + (Arc::clone(&chunk).iter_indices_owned(&timeline), chunk) }) .flat_map(move |(indices_iter, chunk)| { map_chunk_indices_to_key_value_iter(