157 changes: 157 additions & 0 deletions crates/store/re_query2/examples/range.rs
@@ -0,0 +1,157 @@
use std::sync::Arc;

use itertools::{izip, Itertools};
use re_chunk::{Chunk, RowId};
use re_chunk_store::{ChunkStore, RangeQuery};
use re_log_types::example_components::{MyColor, MyLabel, MyPoint, MyPoints};
use re_log_types::{build_frame_nr, ResolvedTimeRange, TimeType, Timeline};
use re_types::ComponentBatch;
use re_types_core::{Archetype as _, Loggable as _};

use re_query2::{clamped_zip_1x2, range_zip_1x2, RangeResults};

// ---

fn main() -> anyhow::Result<()> {
let store = store()?;
eprintln!("store:\n{store}");

let entity_path = "points";
let timeline = Timeline::new("frame_nr", TimeType::Sequence);
let query = RangeQuery::new(timeline, ResolvedTimeRange::EVERYTHING);
eprintln!("query:{query:?}");

let caches = re_query2::Caches::new(&store);

// First, get the (potentially cached) results for this query.
let results: RangeResults = caches.range(
&store,
&query,
&entity_path.into(),
MyPoints::all_components().iter().copied(), // no generics!
);
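// The results are raw chunks, grouped per component: they still need to be deserialized
// and joined together, as shown below.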

// * `get_required` returns an error if the component is missing.
// * `get` returns an option.
let all_points_chunks = results.get_required(&MyPoint::name())?;
let all_colors_chunks = results.get(&MyColor::name());
let all_labels_chunks = results.get(&MyLabel::name());

// You can always use the standard deserialization path.
//
// The underlying operator is optimized to only pay the cost of downcasting and deserialization
// once for the whole column, and will then return references into that data.
// This is why you have to process the data in two steps: the iterator needs something
// to reference into.
let mut all_points_iters = all_points_chunks
.iter()
.map(|chunk| chunk.iter_component::<MyPoint>())
.collect_vec();
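// Pair each deserialized batch of points with its `(data_time, row_id)` index so that it
// can be joined with the other components further down.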
let all_points_indexed = {
let all_points = all_points_iters.iter_mut().flat_map(|it| it.into_iter());
let all_points_indices = all_points_chunks
.iter()
.flat_map(|chunk| chunk.iter_component_indices(&query.timeline(), &MyPoint::name()));
izip!(all_points_indices, all_points)
};
let mut all_labels_iters = all_labels_chunks
.unwrap_or_default()
.iter()
.map(|chunk| chunk.iter_component::<MyLabel>())
.collect_vec();
let all_labels_indexed = {
let all_labels = all_labels_iters.iter_mut().flat_map(|it| it.into_iter());
let all_labels_indices = all_labels_chunks
.unwrap_or_default()
.iter()
.flat_map(|chunk| chunk.iter_component_indices(&query.timeline(), &MyLabel::name()));
izip!(all_labels_indices, all_labels)
};

// Or, if you want every last bit of performance you can get, you can manipulate the raw
// data directly:
let all_colors_indexed = all_colors_chunks
.unwrap_or_default()
.iter()
.flat_map(|chunk| {
itertools::izip!(
chunk.iter_component_indices(&query.timeline(), &MyColor::name()),
chunk.iter_primitive::<u32>(&MyColor::name()),
)
});
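// NOTE: this raw path works because `MyColor` is a simple wrapper around a `u32`; the raw
// values get re-wrapped into `MyColor` further down.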

// Zip the results together using a stateful time-based join.
let all_frames = range_zip_1x2(all_points_indexed, all_colors_indexed, all_labels_indexed);
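// For each batch of points (the primary), this yields the most recent batches of colors and
// labels logged at or before that batch's time, if any.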

// And finally, inspect the results:
{
let color_default_fn = || Some(MyColor(0xFF00FFFF));
let label_default_fn = || None;
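// `clamped_zip_1x2` repeats the last color/label if those columns run out before the
// points do, and falls back on these defaults if they are empty to begin with.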

eprintln!("results:");
for ((data_time, row_id), points, colors, labels) in all_frames {
let colors = colors.unwrap_or(&[]).iter().map(|c| Some(MyColor(*c)));
let labels = labels.unwrap_or(&[]).iter().cloned().map(Some);

// Apply your instance-level joining logic, if any:
let results =
clamped_zip_1x2(points, colors, color_default_fn, labels, label_default_fn)
.collect_vec();
eprintln!("{data_time:?} @ {row_id}:\n {results:?}");
}
}

Ok(())
}

// ---

fn store() -> anyhow::Result<ChunkStore> {
let mut store = ChunkStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
Default::default(),
);

let entity_path = "points";

{
let timepoint = [build_frame_nr(123)];

let chunk = Chunk::builder(entity_path.into())
.with_component_batches(
RowId::new(),
timepoint,
[
&[MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)] as &dyn ComponentBatch, //
&[MyColor::from_rgb(255, 0, 0)],
&[MyLabel("a".into()), MyLabel("b".into())],
],
)
.build()?;

store.insert_chunk(&Arc::new(chunk))?;
}

{
let timepoint = [build_frame_nr(423)];

let chunk = Chunk::builder(entity_path.into())
.with_component_batches(
RowId::new(),
timepoint,
[
&[
MyPoint::new(10.0, 20.0),
MyPoint::new(30.0, 40.0),
MyPoint::new(50.0, 60.0),
] as &dyn ComponentBatch, //
&[MyColor::from_rgb(255, 0, 0), MyColor::from_rgb(0, 0, 255)],
],
)
.build()?;

store.insert_chunk(&Arc::new(chunk))?;
}

Ok(store)
}
52 changes: 50 additions & 2 deletions crates/store/re_query2/src/cache.rs
@@ -12,7 +12,7 @@ use re_chunk_store::{ChunkStore, ChunkStoreDiff, ChunkStoreEvent, ChunkStoreSubs
use re_log_types::{EntityPath, ResolvedTimeRange, StoreId, TimeInt, Timeline};
use re_types_core::{components::ClearIsRecursive, ComponentName, Loggable as _};

use crate::LatestAtCache;
use crate::{LatestAtCache, RangeCache};

// ---

@@ -82,6 +82,9 @@ pub struct Caches {

// NOTE: `Arc` so we can cheaply free the top-level lock early when needed.
pub(crate) latest_at_per_cache_key: RwLock<HashMap<CacheKey, Arc<RwLock<LatestAtCache>>>>,

// NOTE: `Arc` so we can cheaply free the top-level lock early when needed.
pub(crate) range_per_cache_key: RwLock<HashMap<CacheKey, Arc<RwLock<RangeCache>>>>,
}

impl std::fmt::Debug for Caches {
@@ -90,6 +93,7 @@ impl std::fmt::Debug for Caches {
store_id,
might_require_clearing,
latest_at_per_cache_key,
range_per_cache_key,
} = self;

let mut strings = Vec::new();
@@ -123,6 +127,21 @@ impl std::fmt::Debug for Caches {
}
}

strings.push(format!("[Range @ {store_id}]"));
{
let range_per_cache_key = range_per_cache_key.read();
let range_per_cache_key: BTreeMap<_, _> = range_per_cache_key.iter().collect();

for (cache_key, cache) in &range_per_cache_key {
let cache = cache.read();
strings.push(format!(
" [{cache_key:?} (pending_invalidations={:?})]",
cache.pending_invalidations,
));
strings.push(indent::indent_all_by(4, format!("{cache:?}")));
}
}

f.write_str(&strings.join("\n").replace("\n\n", "\n"))
}
}
@@ -134,6 +153,7 @@ impl Caches {
store_id: store.id().clone(),
might_require_clearing: Default::default(),
latest_at_per_cache_key: Default::default(),
range_per_cache_key: Default::default(),
}
}

@@ -143,10 +163,12 @@ impl Caches {
store_id: _,
might_require_clearing,
latest_at_per_cache_key,
range_per_cache_key,
} = self;

might_require_clearing.write().clear();
latest_at_per_cache_key.write().clear();
range_per_cache_key.write().clear();
}
}

@@ -173,6 +195,7 @@ impl ChunkStoreSubscriber for Caches {
struct CompactedEvents {
static_: HashMap<(EntityPath, ComponentName), BTreeSet<ChunkId>>,
temporal_latest_at: HashMap<CacheKey, TimeInt>,
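// Every chunk that was added or removed, per affected range cache key.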
temporal_range: HashMap<CacheKey, BTreeSet<ChunkId>>,
}

let mut compacted = CompactedEvents::default();
@@ -225,6 +248,12 @@ impl ChunkStoreSubscriber for Caches {
.entry(key.clone())
.and_modify(|time| *time = TimeInt::min(*time, data_time))
.or_insert(data_time);

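// Range caches, on the other hand, are invalidated at chunk granularity: record the
// affected chunk itself rather than just its minimum data time.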
compacted
.temporal_range
.entry(key)
.or_default()
.insert(chunk.id());
}
}
}
@@ -233,6 +262,7 @@ impl ChunkStoreSubscriber for Caches {

let mut might_require_clearing = self.might_require_clearing.write();
let caches_latest_at = self.latest_at_per_cache_key.write();
let caches_range = self.range_per_cache_key.write();
// NOTE: Don't release the top-level locks -- even though this cannot happen yet with
// our current macro-architecture, we want to prevent queries from concurrently
// running while we're updating the invalidation flags.
@@ -244,7 +274,7 @@ impl ChunkStoreSubscriber for Caches {
// yet another layer of caching indirection.
// But since this pretty much never happens in practice, let's not go there until we
// have metrics showing that we need to.
for ((entity_path, component_name), _chunk_ids) in compacted.static_ {
for ((entity_path, component_name), chunk_ids) in compacted.static_ {
if component_name == ClearIsRecursive::name() {
might_require_clearing.insert(entity_path.clone());
}
@@ -254,6 +284,15 @@ impl ChunkStoreSubscriber for Caches {
cache.write().pending_invalidation = Some(TimeInt::STATIC);
}
}

for (key, cache) in caches_range.iter() {
if key.entity_path == entity_path && key.component_name == component_name {
cache
.write()
.pending_invalidations
.extend(chunk_ids.iter().copied());
}
}
}
}

@@ -270,6 +309,15 @@ impl ChunkStoreSubscriber for Caches {
cache.pending_invalidation = Some(time);
}
}

for (key, chunk_ids) in compacted.temporal_range {
if let Some(cache) = caches_range.get(&key) {
cache
.write()
.pending_invalidations
.extend(chunk_ids.iter().copied());
}
}
}
}
}
36 changes: 33 additions & 3 deletions crates/store/re_query2/src/cache_stats.rs
@@ -12,21 +12,26 @@ use crate::{CacheKey, Caches};
#[derive(Default, Debug, Clone)]
pub struct CachesStats {
pub latest_at: BTreeMap<CacheKey, CacheStats>,
pub range: BTreeMap<CacheKey, CacheStats>,
}

impl CachesStats {
#[inline]
pub fn total_size_bytes(&self) -> u64 {
re_tracing::profile_function!();

let Self { latest_at } = self;
let Self { latest_at, range } = self;

let latest_at_size_bytes: u64 = latest_at
.values()
.map(|stats| stats.total_actual_size_bytes)
.sum();
let range_size_bytes: u64 = range
.values()
.map(|stats| stats.total_actual_size_bytes)
.sum();

latest_at_size_bytes
latest_at_size_bytes + range_size_bytes
}
}

@@ -73,6 +78,31 @@ impl Caches {
.collect()
};

CachesStats { latest_at }
let range = {
let range = self.range_per_cache_key.read().clone();
// Implicitly releasing top-level cache mappings -- concurrent queries can run once again.
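// Cloning the map clones `Arc` handles to the per-key caches, not the cached data itself,
// which keeps the critical section short.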

range
.iter()
.map(|(key, cache)| {
let cache = cache.read();

(
key.clone(),
CacheStats {
total_chunks: cache.chunks.len() as _,
total_effective_size_bytes: cache
.chunks
.values()
.map(|cached| cached.chunk.total_size_bytes())
.sum(),
total_actual_size_bytes: cache.chunks.total_size_bytes(),
},
)
})
.collect()
};

CachesStats { latest_at, range }
}
}
3 changes: 3 additions & 0 deletions crates/store/re_query2/src/lib.rs
@@ -3,6 +3,7 @@
mod cache;
mod cache_stats;
mod latest_at;
mod range;

pub mod clamped_zip;
pub mod range_zip;
@@ -11,9 +12,11 @@ pub use self::cache::{CacheKey, Caches};
pub use self::cache_stats::{CacheStats, CachesStats};
pub use self::clamped_zip::*;
pub use self::latest_at::LatestAtResults;
pub use self::range::RangeResults;
pub use self::range_zip::*;

pub(crate) use self::latest_at::LatestAtCache;
pub(crate) use self::range::RangeCache;

pub mod external {
pub use paste;