From 6fe1a7962358770a4d981b3b32b788583ad68311 Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Tue, 8 Jul 2025 13:54:00 -0400 Subject: [PATCH] Add a HollowBatch::push_run method and convert usages --- src/persist-client/src/batch.rs | 24 +-- src/persist-client/src/internal/compact.rs | 33 +--- src/persist-client/src/internal/state.rs | 170 ++++++++------------- src/persist-client/src/internal/trace.rs | 47 ++---- src/persist-client/src/write.rs | 34 +---- 5 files changed, 95 insertions(+), 213 deletions(-) diff --git a/src/persist-client/src/batch.rs b/src/persist-client/src/batch.rs index 94803f4993e2b..6a2f9a079b199 100644 --- a/src/persist-client/src/batch.rs +++ b/src/persist-client/src/batch.rs @@ -695,21 +695,9 @@ where let shard_metrics = Arc::clone(&self.parts.shard_metrics); let runs = self.parts.finish().await; - let mut run_parts = vec![]; - let mut run_splits = vec![]; - let mut run_meta = vec![]; - let total_updates = runs - .iter() - .map(|(_, _, num_updates)| num_updates) - .sum::(); + let mut hollow_batch = HollowBatch::empty(registered_desc); for (order, parts, num_updates) in runs { - if parts.is_empty() { - continue; - } - if run_parts.len() != 0 { - run_splits.push(run_parts.len()); - } - run_meta.push(RunMeta { + let meta = RunMeta { order: Some(order), schema: self.write_schemas.id, // Field has been deprecated but kept around to roundtrip state. @@ -725,10 +713,10 @@ where None }, meta: MetadataMap::default(), - }); - run_parts.extend(parts); + }; + hollow_batch.push_run(meta, parts); + hollow_batch.len += num_updates; } - let desc = registered_desc; let batch = Batch::new( batch_delete_enabled, @@ -740,7 +728,7 @@ where K::encode_schema(&*self.write_schemas.key), V::encode_schema(&*self.write_schemas.val), ), - HollowBatch::new(desc, run_parts, total_updates, run_meta, run_splits), + hollow_batch, ); Ok(batch) diff --git a/src/persist-client/src/internal/compact.rs b/src/persist-client/src/internal/compact.rs index cf315bfc3c7f5..4524371ada286 100644 --- a/src/persist-client/src/internal/compact.rs +++ b/src/persist-client/src/internal/compact.rs @@ -484,43 +484,24 @@ where ) -> Result, anyhow::Error> { pin_mut!(stream); - let mut all_parts = vec![]; - let mut all_run_splits = vec![]; - let mut all_run_meta = vec![]; - let mut len = 0; + let mut output = HollowBatch::empty(req.desc.clone()); while let Some(res) = stream.next().await { let res = res?.output; - let (parts, updates, run_meta, run_splits) = - (res.parts, res.len, res.run_meta, res.run_splits); - - if updates == 0 { + if res.len == 0 { continue; } - - let run_offset = all_parts.len(); - if !all_parts.is_empty() { - all_run_splits.push(run_offset); + debug_assert_eq!(res.desc, output.desc); + for (meta, run) in res.runs() { + output.push_run(meta.clone(), run.iter().cloned()); } - all_run_splits.extend(run_splits.iter().map(|r| r + run_offset)); - all_run_meta.extend(run_meta); - all_parts.extend(parts); - len += updates; + output.len += res.len; } let batches = req.inputs.iter().map(|x| x.id).collect::>(); let input = input_id_range(batches); - Ok(CompactRes { - output: HollowBatch::new( - req.desc.clone(), - all_parts, - len, - all_run_meta, - all_run_splits, - ), - input, - }) + Ok(CompactRes { output, input }) } pub async fn apply( diff --git a/src/persist-client/src/internal/state.rs b/src/persist-client/src/internal/state.rs index 193982966d80a..c65ec2500a132 100644 --- a/src/persist-client/src/internal/state.rs +++ b/src/persist-client/src/internal/state.rs @@ -1010,82 +1010,38 @@ impl HollowBatch { } } impl HollowBatch { - /// Construct an in-memory hollow batch from the given metadata. - /// - /// This method checks that `runs` is a sequence of valid indices into `parts`. The caller - /// is responsible for ensuring that the defined runs are valid. - /// - /// `len` should represent the number of valid updates in the referenced parts. - pub(crate) fn new( - desc: Description, - parts: Vec>, - len: usize, - run_meta: Vec, - run_splits: Vec, - ) -> Self { + pub(crate) fn check_invariants(&self) { + let Self { + desc: _, + len: _, + parts, + run_splits, + run_meta, + } = self; debug_assert!( run_splits.is_strictly_sorted(), "run indices should be strictly increasing" ); - debug_assert!( + assert!( run_splits.first().map_or(true, |i| *i > 0), "run indices should be positive" ); - debug_assert!( + assert!( run_splits.last().map_or(true, |i| *i < parts.len()), "run indices should be valid indices into parts" ); - debug_assert!( + assert!( parts.is_empty() || run_meta.len() == run_splits.len() + 1, "all metadata should correspond to a run" ); - - Self { - desc, - len, - parts, - run_splits, - run_meta, - } } /// Construct a batch of a single run with default metadata. Mostly interesting for tests. pub(crate) fn new_run(desc: Description, parts: Vec>, len: usize) -> Self { - let run_meta = if parts.is_empty() { - vec![] - } else { - vec![RunMeta::default()] - }; - Self { - desc, - len, - parts, - run_splits: vec![], - run_meta, - } - } - - #[cfg(test)] - pub(crate) fn new_run_for_test( - desc: Description, - parts: Vec>, - len: usize, - run_id: RunId, - ) -> Self { - let run_meta = if parts.is_empty() { - vec![] - } else { - let mut meta = RunMeta::default(); - meta.id = Some(run_id); - vec![meta] - }; - Self { - desc, - len, - parts, - run_splits: vec![], - run_meta, - } + let mut out = Self::empty(desc); + out.push_run(RunMeta::default(), parts); + out.len = len; + out } /// An empty hollow batch, representing no updates over the given desc. @@ -1099,6 +1055,35 @@ impl HollowBatch { } } + /// Push a run onto the given batch. + /// + /// NB: This does not currently increment the batch len, since we don't currently track the update + /// count per run. Callers will need to explicitly update the batch len when appending runs. + pub(crate) fn push_run(&mut self, meta: RunMeta, parts: impl IntoIterator>) { + let Self { + desc: _, + len: _, + parts: self_parts, + run_splits, + run_meta, + } = self; + + let old_part_count = self_parts.len(); + self_parts.extend(parts); + let new_part_count = self_parts.len(); + if old_part_count == new_part_count { + // This was a zero-part run; return. + return; + } + if old_part_count != 0 { + run_splits.push(old_part_count); + } + run_meta.push(meta); + // This call means that pushing runs one-at-a-time _in debug mode_ is quadratic + // in the number of runs. If this is too slow even in tests, we can remove the check. + self.check_invariants(); + } + pub(crate) fn runs(&self) -> impl Iterator])> { let run_ends = self .run_splits @@ -2881,25 +2866,15 @@ pub(crate) mod tests { }; let since = Antichain::from_elem(since); - let run_splits = (1..num_runs) - .map(|i| i * parts.len() / num_runs) - .collect::>(); - - let run_meta = (0..num_runs) - .map(|_| { - let mut meta = RunMeta::default(); - meta.id = Some(RunId::new()); - meta - }) - .collect::>(); - - HollowBatch::new( - Description::new(lower, upper, since), - parts, - len % 10, - run_meta, - run_splits, - ) + let mut batch = HollowBatch::empty(Description::new(lower, upper, since)); + let run_splits = (0..=num_runs).map(|i| i * parts.len() / num_runs); + for (lo, hi) in run_splits.tuple_windows() { + let mut meta = RunMeta::default(); + meta.id = Some(RunId::new()); + batch.push_run(meta, parts[lo..hi].iter().cloned()); + } + batch.len += len % 10; + batch }) } @@ -2921,35 +2896,22 @@ pub(crate) mod tests { (Antichain::from_elem(t1), Antichain::from_elem(t0)) }; let since = Antichain::from_elem(since); + let mut batch = HollowBatch::empty(Description::new(lower, upper, since)); if num_runs > 0 && parts.len() > 2 && num_runs < parts.len() { - let run_splits = (1..num_runs) - .map(|i| i * parts.len() / num_runs) - .collect::>(); - - let run_meta = (0..num_runs) - .enumerate() - .map(|(i, _)| { - let mut meta = RunMeta::default(); - meta.id = Some(run_ids[i]); - meta - }) - .collect::>(); - - HollowBatch::new( - Description::new(lower, upper, since), - parts, - len % 10, - run_meta, - run_splits, - ) + let run_splits = (0..=num_runs).map(|i| i * parts.len() / num_runs); + + for (i, (lo, hi)) in run_splits.tuple_windows().enumerate() { + let mut meta = RunMeta::default(); + meta.id = Some(run_ids[i]); + batch.push_run(meta, parts[lo..hi].iter().cloned()); + } } else { - HollowBatch::new_run_for_test( - Description::new(lower, upper, since), - parts, - len % 10, - run_ids[0], - ) + let mut meta = RunMeta::default(); + meta.id = Some(run_ids[0]); + batch.push_run(meta, parts); } + batch.len += len % 10; + batch }, ) } diff --git a/src/persist-client/src/internal/trace.rs b/src/persist-client/src/internal/trace.rs index 20d47847d142b..ff559732850ba 100644 --- a/src/persist-client/src/internal/trace.rs +++ b/src/persist-client/src/internal/trace.rs @@ -1027,50 +1027,21 @@ impl SpineBatch { return Err(ApplyMergeResult::NotAppliedNoMatch); } - let runs: Vec<_> = original + let mut result = HollowBatch::empty(replacement.desc.clone()); + + let runs = original .runs() .filter(|(meta, _)| { !run_ids.contains(&meta.id.expect("id should be present at this point")) }) - .chain(replacement.runs()) - .collect(); - - let len = runs.iter().filter_map(|(meta, _)| meta.len).sum::(); - - let run_meta = runs - .iter() - .map(|(meta, _)| *meta) - .cloned() - .collect::>(); - - let parts = runs - .iter() - .flat_map(|(_, parts)| *parts) - .cloned() - .collect::>(); + .chain(replacement.runs()); - let run_splits = { - let mut splits = Vec::with_capacity(run_meta.len().saturating_sub(1)); - let mut pointer = 0; - for (i, (_, parts)) in runs.into_iter().enumerate() { - if parts.is_empty() { - continue; - } - if i < run_meta.len() - 1 { - splits.push(pointer + parts.len()); - } - pointer += parts.len(); - } - splits - }; + for (meta, parts) in runs { + result.push_run(meta.clone(), parts.iter().cloned()); + result.len += meta.len.unwrap_or(0); + } - Ok(HollowBatch::new( - replacement.desc.clone(), - parts, - len, - run_meta, - run_splits, - )) + Ok(result) } fn maybe_replace_checked( diff --git a/src/persist-client/src/write.rs b/src/persist-client/src/write.rs index 61ab7ba8a6e8a..de3499b86773f 100644 --- a/src/persist-client/src/write.rs +++ b/src/persist-client/src/write.rs @@ -606,8 +606,7 @@ where let any_batch_rewrite = batches .iter() .any(|x| x.batch.parts.iter().any(|x| x.ts_rewrite().is_some())); - let (mut parts, mut num_updates, mut run_splits, mut run_metas) = - (vec![], 0, vec![], vec![]); + let mut combined_batch = HollowBatch::empty(desc.clone()); let mut key_storage = None; let mut val_storage = None; for batch in batches.iter() { @@ -618,7 +617,7 @@ where validate_part_bounds_on_write, )?; for (run_meta, run) in batch.batch.runs() { - let start_index = parts.len(); + let mut run_parts = Vec::with_capacity(run.len()); for part in run { if let ( RunPart::Single( @@ -669,23 +668,12 @@ where .expect("re-encoding just-decoded data"); } } else { - parts.push(part.clone()) + run_parts.push(part.clone()) } } - - let end_index = parts.len(); - - if start_index == end_index { - continue; - } - - // Mark the boundary if this is not the first run in the batch. - if start_index != 0 { - run_splits.push(start_index); - } - run_metas.push(run_meta.clone()); + combined_batch.push_run(run_meta.clone(), run_parts); } - num_updates += batch.batch.len; + combined_batch.len += batch.batch.len; } let mut flushed_inline_batch = if let Some((_, builder)) = inline_batch_builder.take() { @@ -709,19 +697,11 @@ where if let Some(batch) = &flushed_inline_batch { for (run_meta, run) in batch.batch.runs() { - assert!(run.len() > 0); - let start_index = parts.len(); - if start_index != 0 { - run_splits.push(start_index); - } - run_metas.push(run_meta.clone()); - parts.extend(run.iter().cloned()) + combined_batch.push_run(run_meta.clone(), run.iter().cloned()); } + combined_batch.len += batch.batch.len; } - let mut combined_batch = - HollowBatch::new(desc.clone(), parts, num_updates, run_metas, run_splits); - // The batch may have been written by a writer without a registered schema. // Ensure we have a schema ID in the batch metadata before we append, to avoid type // confusion later.