diff --git a/crates/store/re_chunk/src/helpers.rs b/crates/store/re_chunk/src/helpers.rs new file mode 100644 index 000000000000..f9969587ae9c --- /dev/null +++ b/crates/store/re_chunk/src/helpers.rs @@ -0,0 +1,360 @@ +use std::sync::Arc; + +use arrow2::array::Array as ArrowArray; + +use re_log_types::{TimeInt, Timeline}; +use re_types_core::{Component, ComponentName, SizeBytes}; + +use crate::{Chunk, ChunkResult, RowId}; + +// --- Helpers --- + +impl Chunk { + // --- Batch --- + + /// Returns the raw data for the specified component. + /// + /// Returns an error if the row index is out of bounds. + #[inline] + pub fn component_batch_raw( + &self, + component_name: &ComponentName, + row_index: usize, + ) -> Option>> { + self.components.get(component_name).map(|list_array| { + if list_array.len() > row_index { + Ok(list_array.value(row_index)) + } else { + Err(crate::ChunkError::IndexOutOfBounds { + kind: "row".to_owned(), + len: list_array.len(), + index: row_index, + }) + } + }) + } + + /// Returns the deserialized data for the specified component. + /// + /// Returns an error if the data cannot be deserialized, or if the row index is out of bounds. + #[inline] + pub fn component_batch(&self, row_index: usize) -> Option>> { + let res = self.component_batch_raw(&C::name(), row_index)?; + + let array = match res { + Ok(array) => array, + Err(err) => return Some(Err(err)), + }; + + let data = C::from_arrow(&*array); + Some(data.map_err(Into::into)) + } + + // --- Instance --- + + /// Returns the raw data for the specified component at the given instance index. + /// + /// Returns an error if either the row index or instance index are out of bounds. + #[inline] + pub fn component_instance_raw( + &self, + component_name: &ComponentName, + row_index: usize, + instance_index: usize, + ) -> Option>> { + let res = self.component_batch_raw(component_name, row_index)?; + + let array = match res { + Ok(array) => array, + Err(err) => return Some(Err(err)), + }; + + if array.len() > instance_index { + Some(Ok(array.sliced(instance_index, 1))) + } else { + Some(Err(crate::ChunkError::IndexOutOfBounds { + kind: "instance".to_owned(), + len: array.len(), + index: instance_index, + })) + } + } + + /// Returns the component data of the specified instance. + /// + /// Returns an error if the data cannot be deserialized, or if either the row index or instance index + /// are out of bounds. + #[inline] + pub fn component_instance( + &self, + row_index: usize, + instance_index: usize, + ) -> Option> { + let res = self.component_instance_raw(&C::name(), row_index, instance_index)?; + + let array = match res { + Ok(array) => array, + Err(err) => return Some(Err(err)), + }; + + match C::from_arrow(&*array) { + Ok(data) => data.into_iter().next().map(Ok), // NOTE: It's already sliced! + Err(err) => Some(Err(err.into())), + } + } + + // --- Mono --- + + /// Returns the raw data for the specified component, assuming a mono-batch. + /// + /// Returns an error if either the row index is out of bounds, or the underlying batch is not + /// of unit length. + #[inline] + pub fn component_mono_raw( + &self, + component_name: &ComponentName, + row_index: usize, + ) -> Option>> { + let res = self.component_batch_raw(component_name, row_index)?; + + let array = match res { + Ok(array) => array, + Err(err) => return Some(Err(err)), + }; + + if array.len() == 1 { + Some(Ok(array.sliced(0, 1))) + } else { + Some(Err(crate::ChunkError::IndexOutOfBounds { + kind: "mono".to_owned(), + len: array.len(), + index: 0, + })) + } + } + + /// Returns the deserialized data for the specified component, assuming a mono-batch. + /// + /// Returns an error if the data cannot be deserialized, or if either the row index is out of bounds, + /// or the underlying batch is not of unit length. + #[inline] + pub fn component_mono(&self, row_index: usize) -> Option> { + let res = self.component_mono_raw(&C::name(), row_index)?; + + let array = match res { + Ok(array) => array, + Err(err) => return Some(Err(err)), + }; + + match C::from_arrow(&*array) { + Ok(data) => data.into_iter().next().map(Ok), // NOTE: It's already sliced! + Err(err) => Some(Err(err.into())), + } + } +} + +// --- Unit --- + +/// A simple type alias for an `Arc`. +pub type ChunkShared = Arc; + +/// A [`ChunkShared`] that is guaranteed to always contain a single row's worth of data. +#[derive(Debug, Clone)] +pub struct UnitChunkShared(ChunkShared); + +impl std::ops::Deref for UnitChunkShared { + type Target = Chunk; + + #[inline] + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl SizeBytes for UnitChunkShared { + #[inline] + fn heap_size_bytes(&self) -> u64 { + Chunk::heap_size_bytes(&self.0) + } +} + +impl Chunk { + /// Turns the chunk into a [`UnitChunkShared`], if possible. + #[inline] + pub fn to_unit(self: &ChunkShared) -> Option { + (self.num_rows() == 1).then(|| UnitChunkShared(Arc::clone(self))) + } + + /// Turns the chunk into a [`UnitChunkShared`], if possible. + #[inline] + pub fn into_unit(self) -> Option { + (self.num_rows() == 1).then(|| UnitChunkShared(Arc::new(self))) + } +} + +impl UnitChunkShared { + // Turns the unit chunk back into a standard [`Chunk`]. + #[inline] + pub fn into_chunk(self) -> ChunkShared { + self.0 + } +} + +impl UnitChunkShared { + /// Returns the index (`(TimeInt, RowId)` pair) of the single row within, on the given timeline. + /// + /// Returns the single static index if the chunk is static. + #[inline] + pub fn index(&self, timeline: &Timeline) -> Option<(TimeInt, RowId)> { + debug_assert!(self.num_rows() == 1); + if self.is_static() { + self.row_ids() + .next() + .map(|row_id| (TimeInt::STATIC, row_id)) + } else { + self.timelines.get(timeline).and_then(|time_chunk| { + time_chunk + .times() + .next() + .and_then(|time| self.row_ids().next().map(|row_id| (time, row_id))) + }) + } + } + + /// Returns the [`RowId`] of the single row within, on the given timeline. + /// + /// Returns the single static `RowId` if the chunk is static. + #[inline] + pub fn row_id(&self) -> Option { + debug_assert!(self.num_rows() == 1); + self.row_ids().next() + } + + /// Returns the number of instances of the single row within. + /// + /// The maximum value amongst all components is what's returned. + #[inline] + pub fn num_instances(&self) -> u64 { + self.components + .values() + .map(|list_array| { + list_array.validity().map_or_else( + || list_array.len(), + |validity| validity.len() - validity.unset_bits(), + ) + }) + .max() + .unwrap_or(0) as u64 + } +} + +// --- Unit helpers --- + +impl UnitChunkShared { + // --- Batch --- + + /// Returns the raw data for the specified component. + #[inline] + pub fn component_batch_raw( + &self, + component_name: &ComponentName, + ) -> Option> { + debug_assert!(self.num_rows() == 1); + self.components + .get(component_name) + .map(|list_array| list_array.value(0)) + } + + /// Returns the deserialized data for the specified component. + /// + /// Returns an error if the data cannot be deserialized. + #[inline] + pub fn component_batch(&self) -> Option>> { + let data = C::from_arrow(&*self.component_batch_raw(&C::name())?); + Some(data.map_err(Into::into)) + } + + // --- Instance --- + + /// Returns the raw data for the specified component at the given instance index. + /// + /// Returns an error if the instance index is out of bounds. + #[inline] + pub fn component_instance_raw( + &self, + component_name: &ComponentName, + instance_index: usize, + ) -> Option>> { + let array = self.component_batch_raw(component_name)?; + if array.len() > instance_index { + Some(Ok(array.sliced(instance_index, 1))) + } else { + Some(Err(crate::ChunkError::IndexOutOfBounds { + kind: "instance".to_owned(), + len: array.len(), + index: instance_index, + })) + } + } + + /// Returns the deserialized data for the specified component at the given instance index. + /// + /// Returns an error if the data cannot be deserialized, or if the instance index is out of bounds. + #[inline] + pub fn component_instance( + &self, + instance_index: usize, + ) -> Option> { + let res = self.component_instance_raw(&C::name(), instance_index)?; + + let array = match res { + Ok(array) => array, + Err(err) => return Some(Err(err)), + }; + + match C::from_arrow(&*array) { + Ok(data) => data.into_iter().next().map(Ok), // NOTE: It's already sliced! + Err(err) => Some(Err(err.into())), + } + } + + // --- Mono --- + + /// Returns the raw data for the specified component, assuming a mono-batch. + /// + /// Returns an error if the underlying batch is not of unit length. + #[inline] + pub fn component_mono_raw( + &self, + component_name: &ComponentName, + ) -> Option>> { + let array = self.component_batch_raw(component_name)?; + if array.len() == 1 { + Some(Ok(array.sliced(0, 1))) + } else { + Some(Err(crate::ChunkError::IndexOutOfBounds { + kind: "mono".to_owned(), + len: array.len(), + index: 0, + })) + } + } + + /// Returns the deserialized data for the specified component, assuming a mono-batch. + /// + /// Returns an error if the data cannot be deserialized, or if the underlying batch is not of unit length. + #[inline] + pub fn component_mono(&self) -> Option> { + let res = self.component_mono_raw(&C::name())?; + + let array = match res { + Ok(array) => array, + Err(err) => return Some(Err(err)), + }; + + match C::from_arrow(&*array) { + Ok(data) => data.into_iter().next().map(Ok), // NOTE: It's already sliced! + Err(err) => Some(Err(err.into())), + } + } +} diff --git a/crates/store/re_chunk/src/iter.rs b/crates/store/re_chunk/src/iter.rs index 27a37ac69c95..4dcc2406161e 100644 --- a/crates/store/re_chunk/src/iter.rs +++ b/crates/store/re_chunk/src/iter.rs @@ -176,47 +176,6 @@ impl Chunk { } } -// TODO(cmc): get rid of this -impl Chunk { - /// Returns an iterator over the rows of the [`Chunk`]. - /// - /// Each yielded item is a component batch with its associated index ([`RowId`] + data time). - /// - /// Iterating a [`Chunk`] on a row basis is very wasteful, performance-wise. - /// Prefer columnar access when possible. - pub fn iter_rows( - &self, - timeline: &Timeline, - component_name: &ComponentName, - ) -> impl Iterator>)> + '_ { - let Self { - id: _, - entity_path: _, - heap_size_bytes: _, - is_sorted: _, - row_ids: _, - timelines, - components, - } = self; - - let row_ids = self.row_ids(); - - let data_times = timelines - .get(timeline) - .into_iter() - .flat_map(|time_chunk| time_chunk.times().collect::>()) - // If there's no time data, then the associate data time must be `TimeInt::STATIC`. - .chain(std::iter::repeat(TimeInt::STATIC)); - - let arrays = components - .get(component_name) - .into_iter() - .flat_map(|list_array| list_array.into_iter()); - - itertools::izip!(data_times, row_ids, arrays) - } -} - // --- pub struct ChunkIndicesIter { diff --git a/crates/store/re_chunk/src/lib.rs b/crates/store/re_chunk/src/lib.rs index 6b320ef7dcb3..0f2693f41ced 100644 --- a/crates/store/re_chunk/src/lib.rs +++ b/crates/store/re_chunk/src/lib.rs @@ -6,6 +6,7 @@ mod builder; mod chunk; +mod helpers; mod id; mod iter; mod latest_at; @@ -21,6 +22,7 @@ mod batcher; pub use self::builder::{ChunkBuilder, ChunkTimelineBuilder}; pub use self::chunk::{Chunk, ChunkError, ChunkResult, ChunkTimeline}; +pub use self::helpers::{ChunkShared, UnitChunkShared}; pub use self::id::{ChunkId, RowId}; pub use self::latest_at::LatestAtQuery; pub use self::range::RangeQuery; diff --git a/crates/store/re_chunk_store/src/lib.rs b/crates/store/re_chunk_store/src/lib.rs index ec6e606f69a2..3cf4b54304eb 100644 --- a/crates/store/re_chunk_store/src/lib.rs +++ b/crates/store/re_chunk_store/src/lib.rs @@ -30,7 +30,9 @@ pub use self::subscribers::{ChunkStoreSubscriber, ChunkStoreSubscriberHandle}; // Re-exports #[doc(no_inline)] -pub use re_chunk::{Chunk, ChunkId, LatestAtQuery, RangeQuery, RowId}; +pub use re_chunk::{ + Chunk, ChunkId, ChunkShared, LatestAtQuery, RangeQuery, RowId, UnitChunkShared, +}; #[doc(no_inline)] pub use re_log_types::{ResolvedTimeRange, TimeInt, TimeType, Timeline}; diff --git a/crates/store/re_chunk_store/tests/correctness.rs b/crates/store/re_chunk_store/tests/correctness.rs index 4a06e65e4550..a8ee192f7283 100644 --- a/crates/store/re_chunk_store/tests/correctness.rs +++ b/crates/store/re_chunk_store/tests/correctness.rs @@ -3,7 +3,6 @@ use std::sync::Arc; -use itertools::Itertools as _; use re_chunk::{Chunk, ChunkId, RowId}; use re_chunk_store::{ChunkStore, ChunkStoreError, LatestAtQuery}; use re_log_types::example_components::{MyIndex, MyPoint}; @@ -22,21 +21,20 @@ fn query_latest_component( ) -> Option<(TimeInt, RowId, C)> { re_tracing::profile_function!(); - let (data_time, row_id, array) = store + let ((data_time, row_id), unit) = store .latest_at_relevant_chunks(query, entity_path, C::name()) .into_iter() - .flat_map(|chunk| { + .filter_map(|chunk| { chunk .latest_at(query, C::name()) - .iter_rows(&query.timeline(), &C::name()) - .collect_vec() + .into_unit() + .and_then(|unit| unit.index(&query.timeline()).map(|index| (index, unit))) }) - .max_by_key(|(data_time, row_id, _)| (*data_time, *row_id)) - .and_then(|(data_time, row_id, array)| array.map(|array| (data_time, row_id, array)))?; + .max_by_key(|(index, _unit)| *index)?; - let value = C::from_arrow(&*array).ok()?.first()?.clone(); - - Some((data_time, row_id, value)) + unit.component_mono()? + .ok() + .map(|values| (data_time, row_id, values)) } // --- diff --git a/crates/store/re_chunk_store/tests/gc.rs b/crates/store/re_chunk_store/tests/gc.rs index f91b4205b996..03dff3db286b 100644 --- a/crates/store/re_chunk_store/tests/gc.rs +++ b/crates/store/re_chunk_store/tests/gc.rs @@ -1,7 +1,6 @@ use std::sync::Arc; use arrow2::array::Array as ArrowArray; -use itertools::Itertools as _; use rand::Rng as _; use re_chunk::{Chunk, ChunkId, ComponentName, LatestAtQuery, RowId, TimeInt, TimePoint}; @@ -27,19 +26,19 @@ fn query_latest_array( ) -> Option<(TimeInt, RowId, Box)> { re_tracing::profile_function!(); - let (data_time, row_id, array) = store + let ((data_time, row_id), unit) = store .latest_at_relevant_chunks(query, entity_path, component_name) .into_iter() - .flat_map(|chunk| { + .filter_map(|chunk| { chunk .latest_at(query, component_name) - .iter_rows(&query.timeline(), &component_name) - .collect_vec() + .into_unit() + .and_then(|chunk| chunk.index(&query.timeline()).map(|index| (index, chunk))) }) - .max_by_key(|(data_time, row_id, _)| (*data_time, *row_id)) - .and_then(|(data_time, row_id, array)| array.map(|array| (data_time, row_id, array)))?; + .max_by_key(|(index, _chunk)| *index)?; - Some((data_time, row_id, array)) + unit.component_batch_raw(&component_name) + .map(|array| (data_time, row_id, array)) } // --- diff --git a/crates/store/re_chunk_store/tests/reads.rs b/crates/store/re_chunk_store/tests/reads.rs index fe5ff17d5c62..8ff27964164f 100644 --- a/crates/store/re_chunk_store/tests/reads.rs +++ b/crates/store/re_chunk_store/tests/reads.rs @@ -26,19 +26,19 @@ fn query_latest_array( ) -> Option<(TimeInt, RowId, Box)> { re_tracing::profile_function!(); - let (data_time, row_id, array) = store + let ((data_time, row_id), unit) = store .latest_at_relevant_chunks(query, entity_path, component_name) .into_iter() - .flat_map(|chunk| { + .filter_map(|chunk| { chunk .latest_at(query, component_name) - .iter_rows(&query.timeline(), &component_name) - .collect_vec() + .into_unit() + .and_then(|chunk| chunk.index(&query.timeline()).map(|index| (index, chunk))) }) - .max_by_key(|(data_time, row_id, _)| (*data_time, *row_id)) - .and_then(|(data_time, row_id, array)| array.map(|array| (data_time, row_id, array)))?; + .max_by_key(|(index, _chunk)| *index)?; - Some((data_time, row_id, array)) + unit.component_batch_raw(&component_name) + .map(|array| (data_time, row_id, array)) } // --- @@ -723,9 +723,7 @@ fn range() -> anyhow::Result<()> { for chunk in results { let chunk = chunk.range(&query, component_name); eprintln!("{chunk}"); - for (data_time, row_id, _array) in - chunk.iter_rows(&timeline_frame_nr, &component_name) - { + for (data_time, row_id) in chunk.iter_indices(&timeline_frame_nr) { let (expected_data_time, expected_row_id) = row_ids_at_times[results_processed]; assert_eq!(expected_data_time, data_time); assert_eq!(expected_row_id, row_id); diff --git a/crates/store/re_query/src/latest_at/query.rs b/crates/store/re_query/src/latest_at/query.rs index df5c90d1a3b4..8418ba80edb7 100644 --- a/crates/store/re_query/src/latest_at/query.rs +++ b/crates/store/re_query/src/latest_at/query.rs @@ -297,17 +297,19 @@ pub fn latest_at( entity_path: &EntityPath, component_name: ComponentName, ) -> Option<(TimeInt, RowId, Box)> { - store + let ((data_time, row_id), unit) = store .latest_at_relevant_chunks(query, entity_path, component_name) .into_iter() - .flat_map(|chunk| { + .filter_map(|chunk| { chunk .latest_at(query, component_name) - .iter_rows(&query.timeline(), &component_name) - .collect_vec() + .into_unit() + .and_then(|chunk| chunk.index(&query.timeline()).map(|index| (index, chunk))) }) - .max_by_key(|(data_time, row_id, _)| (*data_time, *row_id)) - .and_then(|(data_time, row_id, array)| array.map(|array| (data_time, row_id, array))) + .max_by_key(|(index, _chunk)| *index)?; + + unit.component_batch_raw(&component_name) + .map(|array| (data_time, row_id, array)) } impl LatestAtCache { diff --git a/crates/store/re_query/src/range/query.rs b/crates/store/re_query/src/range/query.rs index 932902a85e38..afe0ec158f6a 100644 --- a/crates/store/re_query/src/range/query.rs +++ b/crates/store/re_query/src/range/query.rs @@ -209,19 +209,19 @@ pub fn range<'a>( query: &'a RangeQuery, entity_path: &EntityPath, component_name: ComponentName, -) -> impl Iterator)> + 'a { +) -> impl Iterator)> + 'a { store .range_relevant_chunks(query, entity_path, component_name) .into_iter() .map(move |chunk| chunk.range(query, component_name)) .filter(|chunk| !chunk.is_empty()) .flat_map(move |chunk| { - chunk - .iter_rows(&query.timeline(), &component_name) - .filter_map(|(data_time, row_id, array)| { - array.map(|array| (data_time, row_id, array)) - }) - .collect_vec() + itertools::izip!( + chunk + .iter_component_indices(&query.timeline(), &component_name) + .collect_vec(), + chunk.iter_component_arrays(&component_name).collect_vec(), + ) }) } @@ -248,7 +248,8 @@ impl RangeCache { if let Some(query_front) = query_front.as_ref() { re_tracing::profile_scope!("front"); - for (data_time, row_id, array) in range(store, query_front, entity_path, component_name) + for ((data_time, row_id), array) in + range(store, query_front, entity_path, component_name) { per_data_time .promises_front @@ -265,9 +266,10 @@ impl RangeCache { if let Some(query_back) = per_data_time.compute_back_query(query, query_front.as_ref()) { re_tracing::profile_scope!("back"); - for (data_time, row_id, array) in range(store, &query_back, entity_path, component_name) - // If there's static data to be found, the front query will take care of it already. - .filter(|(data_time, _, _)| !data_time.is_static()) + for ((data_time, row_id), array) in + range(store, &query_back, entity_path, component_name) + // If there's static data to be found, the front query will take care of it already. + .filter(|((data_time, _), _)| !data_time.is_static()) { per_data_time .promises_back diff --git a/crates/viewer/re_space_view_dataframe/src/latest_at_table.rs b/crates/viewer/re_space_view_dataframe/src/latest_at_table.rs index 658ef78c470a..6eccb88c94f4 100644 --- a/crates/viewer/re_space_view_dataframe/src/latest_at_table.rs +++ b/crates/viewer/re_space_view_dataframe/src/latest_at_table.rs @@ -146,6 +146,7 @@ pub(crate) fn latest_at_table_ui( for component_name in &sorted_components { row.col(|ui| { // TODO(ab, cmc): use the suitable API from re_query when it becomes available. + let result = ctx .recording_store() .latest_at_relevant_chunks( @@ -154,20 +155,22 @@ pub(crate) fn latest_at_table_ui( *component_name, ) .into_iter() - .flat_map(|chunk| { - chunk + .filter_map(|chunk| { + let (index, unit) = chunk .latest_at(&latest_at_query, *component_name) - .iter_rows(&query.timeline, component_name) - .collect::>() + .into_unit() + .and_then(|unit| { + unit.index(&query.timeline).map(|index| (index, unit)) + })?; + + unit.component_batch_raw(component_name) + .map(|array| (index, array)) }) - .max_by_key(|(data_time, row_id, _)| (*data_time, *row_id)) - .and_then(|(data_time, row_id, array)| { - array.map(|array| (data_time, row_id, array)) - }); + .max_by_key(|(index, _array)| *index); // TODO(#4466): it would be nice to display the time and row id somewhere, since we // have them. - if let Some((_time, _row_id, array)) = result { + if let Some(((_time, _row_id), array)) = result { let instance_index = instance_path.instance.get() as usize; let (data, clamped) = if instance_index >= array.len() { diff --git a/crates/viewer/re_space_view_dataframe/src/utils.rs b/crates/viewer/re_space_view_dataframe/src/utils.rs index 49eeaa1f8a9d..e622e2cb413a 100644 --- a/crates/viewer/re_space_view_dataframe/src/utils.rs +++ b/crates/viewer/re_space_view_dataframe/src/utils.rs @@ -27,6 +27,8 @@ pub(crate) fn sorted_instance_paths_for<'a>( ) -> impl Iterator + 'a { re_tracing::profile_function!(); + // TODO(cmc): This should be using re_query. + store .all_components_on_timeline(timeline, entity_path) .unwrap_or_default() @@ -37,14 +39,17 @@ pub(crate) fn sorted_instance_paths_for<'a>( .latest_at_relevant_chunks(latest_at_query, entity_path, component_name) .into_iter() .filter_map(|chunk| { - let (data_time, row_id, batch) = chunk + let (index, unit) = chunk .latest_at(latest_at_query, component_name) - .iter_rows(timeline, &component_name) - .next()?; - batch.map(|batch| (data_time, row_id, batch)) + .into_unit() + .and_then(|unit| unit.index(timeline).map(|index| (index, unit)))?; + + unit.component_batch_raw(&component_name) + .map(|array| (index, array)) }) - .max_by_key(|(data_time, row_id, _)| (*data_time, *row_id)) - .map_or(0, |(_, _, batch)| batch.len()); + .max_by_key(|(index, _array)| *index) + .map_or(0, |(_index, array)| array.len()); + (0..num_instances).map(|i| Instance::from(i as u64)) }) .collect::>() // dedup and sort