From d16bab64f3baff5d5be49dd792fb83ea00c6548a Mon Sep 17 00:00:00 2001 From: Dov Alperin Date: Tue, 15 Jul 2025 14:37:48 -0400 Subject: [PATCH 1/3] Persist: plumb incremental apply APIs through Depends on https://github.com/MaterializeInc/materialize/pull/33044 --- src/persist-client/src/cli/admin.rs | 41 +- src/persist-client/src/internal/compact.rs | 431 ++++++++++++++++----- src/persist-client/src/internal/machine.rs | 150 +++++-- src/persist-client/src/internal/state.rs | 17 + src/persist-client/src/internal/trace.rs | 117 ++++-- 5 files changed, 561 insertions(+), 195 deletions(-) diff --git a/src/persist-client/src/cli/admin.rs b/src/persist-client/src/cli/admin.rs index 5afd9f2f6abfc..a7ae44f068dc8 100644 --- a/src/persist-client/src/cli/admin.rs +++ b/src/persist-client/src/cli/admin.rs @@ -40,7 +40,7 @@ use crate::internal::compact::{CompactConfig, CompactReq, Compactor}; use crate::internal::encoding::Schemas; use crate::internal::gc::{GarbageCollector, GcReq}; use crate::internal::machine::Machine; -use crate::internal::trace::{CompactionInput, FueledMergeRes}; +use crate::internal::trace::FueledMergeRes; use crate::rpc::{NoopPubSubSender, PubSubSender}; use crate::write::{WriteHandle, WriterId}; use crate::{ @@ -455,17 +455,17 @@ where let req = CompactReq { shard_id, desc: req.desc, - inputs: req - .inputs - .into_iter() - .map(|b| Arc::unwrap_or_clone(b.batch)) - .collect(), + inputs: req.inputs, }; - let parts = req.inputs.iter().map(|x| x.part_count()).sum::(); + let parts = req + .inputs + .iter() + .map(|x| x.batch.part_count()) + .sum::(); let bytes = req .inputs .iter() - .map(|x| x.encoded_size_bytes()) + .map(|x| x.batch.encoded_size_bytes()) .sum::(); let start = Instant::now(); info!( @@ -514,7 +514,7 @@ where let (apply_res, maintenance) = machine .merge_res(&FueledMergeRes { output: res.output, - input: CompactionInput::Legacy, + input: res.input, new_active_compaction: None, }) .await; @@ -749,14 +749,18 @@ pub async fn dangerous_force_compaction_and_break_pushdown( let (reqs, mut maintenance) = machine.spine_exert(fuel).await; for req in reqs { info!( - "force_compaction {} {} compacting {} batches in {} parts totaling {} bytes: lower={:?} upper={:?} since={:?}", + "force_compaction {} {} compacting {} batches in {} parts with {} runs totaling {} bytes: lower={:?} upper={:?} since={:?}", machine.applier.shard_metrics.name, machine.applier.shard_metrics.shard_id, req.inputs.len(), - req.inputs.iter().flat_map(|x| &x.parts).count(), + req.inputs.iter().flat_map(|x| &x.batch.parts).count(), + req.inputs + .iter() + .map(|x| x.batch.runs().count()) + .sum::(), req.inputs .iter() - .flat_map(|x| &x.parts) + .flat_map(|x| &x.batch.parts) .map(|x| x.encoded_size_bytes()) .sum::(), req.desc.lower().elements(), @@ -801,13 +805,18 @@ pub async fn dangerous_force_compaction_and_break_pushdown( // NB: This check is intentionally at the end so that it's safe to call // this method in a loop. 
- let num_batches = machine.applier.all_batches().len(); - if num_batches < 2 { + let num_runs: usize = machine + .applier + .all_batches() + .iter() + .map(|x| x.runs().count()) + .sum(); + if num_runs <= 1 { info!( - "force_compaction {} {} exiting with {} batches", + "force_compaction {} {} exiting with {} runs", machine.applier.shard_metrics.name, machine.applier.shard_metrics.shard_id, - num_batches + num_runs ); return; } diff --git a/src/persist-client/src/internal/compact.rs b/src/persist-client/src/internal/compact.rs index 464599bea4244..273d55b650c3e 100644 --- a/src/persist-client/src/internal/compact.rs +++ b/src/persist-client/src/internal/compact.rs @@ -8,7 +8,7 @@ // by the Apache License, Version 2.0. use std::borrow::Cow; -use std::collections::VecDeque; +use std::collections::{BTreeMap, BTreeSet}; use std::fmt::Debug; use std::marker::PhantomData; use std::pin::pin; @@ -24,6 +24,7 @@ use futures_util::StreamExt; use mz_dyncfg::Config; use mz_ore::cast::CastFrom; use mz_ore::error::ErrorExt; +use mz_ore::now::NowFn; use mz_persist::location::Blob; use mz_persist_types::part::Part; use mz_persist_types::{Codec, Codec64}; @@ -47,9 +48,12 @@ use crate::internal::machine::Machine; use crate::internal::maintenance::RoutineMaintenance; use crate::internal::metrics::ShardMetrics; use crate::internal::state::{ - ENABLE_INCREMENTAL_COMPACTION, HollowBatch, RunMeta, RunOrder, RunPart, + ENABLE_INCREMENTAL_COMPACTION, HollowBatch, RunId, RunMeta, RunOrder, RunPart, +}; +use crate::internal::trace::{ + ActiveCompaction, ApplyMergeResult, CompactionInput, FueledMergeRes, IdHollowBatch, SpineId, + id_range, }; -use crate::internal::trace::{ApplyMergeResult, CompactionInput, FueledMergeRes}; use crate::iter::{Consolidator, StructuredSort}; use crate::{Metrics, PersistConfig, ShardId}; @@ -66,7 +70,7 @@ pub struct CompactReq { pub desc: Description, /// The updates to include in the output batch. Any data in these outside of /// the output descriptions bounds should be ignored. - pub inputs: Vec>, + pub inputs: Vec>, } /// A response from compaction. @@ -74,8 +78,15 @@ pub struct CompactReq { pub struct CompactRes { /// The compacted batch. pub output: HollowBatch, + /// The runs that were compacted together to produce the output batch. + pub input: CompactionInput, } +/// A location in a spine, used to identify the Run that a batch belongs to. +/// If the Run is not known, the RunId is None. +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct RunLocation(pub SpineId, pub Option); + /// A snapshot of dynamic configs to make it easier to reason about an /// individual run of compaction. #[derive(Debug, Clone)] @@ -86,6 +97,7 @@ pub struct CompactConfig { pub(crate) version: semver::Version, pub(crate) batch: BatchBuilderConfig, pub(crate) fetch_config: FetchConfig, + pub(crate) now: NowFn, } impl CompactConfig { @@ -98,6 +110,7 @@ impl CompactConfig { version: value.build_version.clone(), batch: BatchBuilderConfig::new(value, shard_id), fetch_config: FetchConfig::from_persist_config(value), + now: value.now.clone(), } } } @@ -158,6 +171,13 @@ pub(crate) const COMPACTION_CHECK_PROCESS_FLAG: Config = Config::new( which allows dynamically disabling compaction. If false, all compaction requests will be processed.", ); +/// Create a `[CompactionInput::IdRange]` from a set of `SpineId`s. 
+fn input_id_range(ids: BTreeSet) -> CompactionInput { + let id = id_range(ids); + + CompactionInput::IdRange(id) +} + impl Compactor where K: Debug + Codec, @@ -277,9 +297,13 @@ where // (especially in aggregate). This heuristic is something we'll need to // tune over time. let should_compact = req.inputs.len() >= COMPACTION_HEURISTIC_MIN_INPUTS.get(&self.cfg) - || req.inputs.iter().map(|x| x.part_count()).sum::() + || req + .inputs + .iter() + .map(|x| x.batch.part_count()) + .sum::() >= COMPACTION_HEURISTIC_MIN_PARTS.get(&self.cfg) - || req.inputs.iter().map(|x| x.len).sum::() + || req.inputs.iter().map(|x| x.batch.len).sum::() >= COMPACTION_HEURISTIC_MIN_UPDATES.get(&self.cfg); if !should_compact { self.metrics.compaction.skipped.inc(); @@ -323,7 +347,7 @@ where let total_input_bytes = req .inputs .iter() - .map(|batch| batch.encoded_size_bytes()) + .map(|batch| batch.batch.encoded_size_bytes()) .sum::(); let timeout = Duration::max( // either our minimum timeout @@ -336,7 +360,7 @@ where let compaction_schema_id = req .inputs .iter() - .flat_map(|batch| batch.run_meta.iter()) + .flat_map(|batch| batch.batch.run_meta.iter()) .filter_map(|run_meta| run_meta.schema) // It's an invariant that SchemaIds are ordered. .max(); @@ -385,6 +409,22 @@ where isolated_runtime.spawn_named( || "persist::compact::consolidate", async move { + // If the batches we are compacting are written with old versions of persist, + // we may not have run UUIDs for them, meaning we don't have enough info to + // safely compact them incrementally. + let all_runs_have_uuids = req + .inputs + .iter() + .all(|x| x.batch.runs().all(|(meta, _)| meta.id.is_some())); + let all_runs_have_len = req + .inputs + .iter() + .all(|x| x.batch.runs().all(|(meta, _)| meta.len.is_some())); + + let incremental_enabled = ENABLE_INCREMENTAL_COMPACTION + .get(&machine_clone.applier.cfg) + && all_runs_have_uuids + && all_runs_have_len; let stream = Self::compact_stream( CompactConfig::new(&machine_clone.applier.cfg, machine_clone.shard_id()), Arc::clone(&machine_clone.applier.state_versions.blob), @@ -393,18 +433,32 @@ where Arc::clone(&machine_clone.isolated_runtime), req.clone(), compaction_schema, + incremental_enabled, ); - let res = Self::compact_all(stream, req.clone()).await?; - let maintenance = Self::apply( - FueledMergeRes { - output: res.output, - input: CompactionInput::Legacy, - new_active_compaction: None, - }, - &metrics_clone, - &machine_clone, - ) - .await?; + + let maintenance = if incremental_enabled { + let mut maintenance = RoutineMaintenance::default(); + pin_mut!(stream); + while let Some(res) = stream.next().await { + let res = res?; + let new_maintenance = + Self::apply(res, &metrics_clone, &machine_clone).await?; + maintenance.merge(new_maintenance); + } + maintenance + } else { + let res = Self::compact_all(stream, req.clone()).await?; + Self::apply( + FueledMergeRes { + output: res.output, + input: res.input, + new_active_compaction: None, + }, + &metrics_clone, + &machine_clone, + ) + .await? 
+ }; Ok::<_, anyhow::Error>(maintenance) } @@ -443,7 +497,7 @@ where } pub async fn compact_all( - stream: impl Stream, anyhow::Error>>, + stream: impl Stream, anyhow::Error>>, req: CompactReq, ) -> Result, anyhow::Error> { pin_mut!(stream); @@ -454,7 +508,7 @@ where let mut len = 0; while let Some(res) = stream.next().await { - let res = res?; + let res = res?.output; let (parts, updates, run_meta, run_splits) = (res.parts, res.len, res.run_meta, res.run_splits); @@ -472,6 +526,9 @@ where len += updates; } + let batches = req.inputs.iter().map(|x| x.id).collect::>(); + let input = input_id_range(batches); + Ok(CompactRes { output: HollowBatch::new( req.desc.clone(), @@ -480,6 +537,7 @@ where all_run_meta, all_run_splits, ), + input, }) } @@ -558,7 +616,8 @@ where isolated_runtime: Arc, req: CompactReq, write_schemas: Schemas, - ) -> impl Stream, anyhow::Error>> { + incremental_enabled: bool, + ) -> impl Stream, anyhow::Error>> { async_stream::stream! { let () = Self::validate_req(&req)?; @@ -568,7 +627,7 @@ where // reintroduce it. let mut single_nonempty_batch = None; for batch in &req.inputs { - if batch.len > 0 { + if batch.batch.len > 0 { match single_nonempty_batch { None => single_nonempty_batch = Some(batch), Some(_previous_nonempty_batch) => { @@ -579,8 +638,8 @@ where } } if let Some(single_nonempty_batch) = single_nonempty_batch { - if single_nonempty_batch.run_splits.len() == 0 - && single_nonempty_batch.desc.since() != &Antichain::from_elem(T::minimum()) + if single_nonempty_batch.batch.run_splits.len() == 0 + && single_nonempty_batch.batch.desc.since() != &Antichain::from_elem(T::minimum()) { metrics.compaction.fast_path_eligible.inc(); } @@ -597,9 +656,20 @@ where .saturating_sub(in_progress_part_reserved_memory_bytes); let ordered_runs = - Self::order_runs(&req, cfg.batch.preferred_order, &*blob, &*metrics).await?; + Self::flatten_runs(&req, cfg.batch.preferred_order, &*blob, &*metrics).await?; + + let chunked_runs = Self::chunk_runs( + &ordered_runs, + &cfg, + &*metrics, + run_reserved_memory_bytes, + ); + let total_chunked_runs = chunked_runs.len(); + + let mut applied = 0; + for (runs, run_chunk_max_memory_usage) in - Self::chunk_runs(&ordered_runs, &cfg, &*metrics, run_reserved_memory_bytes) + chunked_runs { metrics.compaction.chunks_compacted.inc(); metrics @@ -615,10 +685,59 @@ where / cfg.batch.blob_target_size; let mut run_cfg = cfg.clone(); run_cfg.batch.batch_builder_max_outstanding_parts = 1 + extra_outstanding_parts; + let descriptions = runs.iter() + .map(|(_, desc, _, _)| *desc) + .collect::>(); + + let input = if incremental_enabled { + let batch_ids = runs.iter() + .map(|(run_id, _, _, _)| run_id.0) + .collect::>(); + if let Some(batch_id) = batch_ids.first() && batch_ids.len() == 1 { + CompactionInput::PartialBatch( + batch_id.clone(), + runs.iter() + .map(|(run_id, _, _, _)| run_id.1.expect("run_id must be present")) + .collect::>(), + ) + } else { + input_id_range(batch_ids) + } + } else { + let runs = runs.iter() + .map(|(run_id, _, _, _)| run_id.0) + .collect::>(); + input_id_range(runs) + }; + + let desc = if incremental_enabled { + let desc_lower = descriptions + .iter() + .map(|desc| desc.lower()) + .cloned() + .reduce(|a, b| a.meet(&b)) + .unwrap_or_else(|| req.desc.lower().clone()); + + let desc_upper = descriptions + .iter() + .map(|desc| desc.upper()) + .cloned() + .reduce(|a, b| a.join(&b)) + .unwrap_or_else(|| req.desc.upper().clone()); + + Description::new(desc_lower, desc_upper, req.desc.since().clone()) + } else { + req.desc.clone() + }; + + 
let runs = runs.iter() + .map(|(_, desc, meta, run)| (*desc, *meta, *run)) + .collect::>(); + let batch = Self::compact_runs( &run_cfg, &req.shard_id, - &req.desc, + &desc, runs, Arc::clone(&blob), Arc::clone(&metrics), @@ -635,8 +754,27 @@ where batch.parts.len(), ); + // Set up active compaction metadata + let clock = cfg.now.clone(); + let active_compaction = if applied < total_chunked_runs - 1 { + Some(ActiveCompaction { start_ms: clock() }) + } else { + None + }; + + let res = CompactRes { + output: batch, + input, + }; - yield Ok(batch); + let res = FueledMergeRes { + output: res.output, + new_active_compaction: active_compaction, + input: res.input, + }; + + yield Ok(res); + applied += 1; } } } @@ -662,6 +800,7 @@ where Arc::clone(&isolated_runtime), req.clone(), write_schemas, + false, ); Self::compact_all(stream, req).await @@ -669,110 +808,164 @@ where /// Sorts and groups all runs from the inputs into chunks, each of which has been determined /// to consume no more than `run_reserved_memory_bytes` at a time, unless the input parts - /// were written with a different target size than this build. Uses [Self::order_runs] to - /// determine the order in which runs are selected. + /// were written with a different target size than this build. fn chunk_runs<'a>( - ordered_runs: &'a [(&'a Description, &'a RunMeta, Cow<'a, [RunPart]>)], + ordered_runs: &'a [( + RunLocation, + &'a Description, + &'a RunMeta, + Cow<'a, [RunPart]>, + )], cfg: &CompactConfig, metrics: &Metrics, run_reserved_memory_bytes: usize, ) -> Vec<( - Vec<(&'a Description, &'a RunMeta, &'a [RunPart])>, + Vec<( + &'a RunLocation, + &'a Description, + &'a RunMeta, + &'a [RunPart], + )>, usize, )> { - let mut ordered_runs = ordered_runs.into_iter().peekable(); - - let mut chunks = vec![]; - let mut current_chunk = vec![]; - let mut current_chunk_max_memory_usage = 0; - while let Some((desc, meta, run)) = ordered_runs.next() { - let run_greatest_part_size = run - .iter() - .map(|x| x.max_part_bytes()) - .max() - .unwrap_or(cfg.batch.blob_target_size); - current_chunk.push((*desc, *meta, &**run)); - current_chunk_max_memory_usage += run_greatest_part_size; - - if let Some((_next_desc, _next_meta, next_run)) = ordered_runs.peek() { - let next_run_greatest_part_size = next_run + // Group runs by SpineId + let grouped: BTreeMap> = ordered_runs + .iter() + .map(|(run_id, desc, meta, parts)| (run_id.0, (run_id, *desc, *meta, &**parts))) + .fold(BTreeMap::new(), |mut acc, item| { + acc.entry(item.0).or_default().push(item.1); + acc + }); + + let all_batches_have_one_run = grouped.values().all(|runs| runs.len() <= 1); + + let mut chunks = Vec::new(); + + // There are two cases to consider here: + // 1. All input batches have ≤1 run each, in which case we can compact runs + // from different batches together as memory allows. + // 2. Some input batches have >1 run, in which case we must compact runs only + // within each individual batch to maintain batch boundaries. + // In both cases, we should compact runs in the order they were written. + // This is all to make it easy to apply the compaction result incrementally. + // By ensuring we never write out batches that contain runs from separate input batches + // (except for when each input batch has exactly one run), we can easily slot the + // results in to the existing batches. The special case of a single run per input batch + // means that each batch is, by itself, fully "compact", and the result of compaction + // will cleanly replace the input batches in a grouped manner. 
+ + if all_batches_have_one_run { + // All batches have ≤1 run — compact across batches + let mut current_chunk = Vec::new(); + let mut current_memory = 0; + + for (_spine_id, mut runs) in grouped.into_iter() { + let (run_id, desc, meta, parts) = runs.pop().unwrap(); + + let run_size = parts .iter() - .map(|x| x.max_part_bytes()) + .map(|p| p.max_part_bytes()) .max() .unwrap_or(cfg.batch.blob_target_size); - // if we can fit the next run in our chunk without going over our reserved memory, we should do so - if current_chunk_max_memory_usage + next_run_greatest_part_size - <= run_reserved_memory_bytes + if !current_chunk.is_empty() + && current_memory + run_size > run_reserved_memory_bytes { - continue; - } + if current_chunk.len() == 1 { + metrics.compaction.memory_violations.inc(); + current_chunk.push((run_id, desc, meta, &*parts)); + current_memory += run_size; + chunks.push((std::mem::take(&mut current_chunk), current_memory)); + current_memory = 0; + continue; + } - // NB: There's an edge case where we cannot fit at least 2 runs into a chunk - // with our reserved memory. This could happen if blobs were written with a - // larger target size than the current build. When this happens, we violate - // our memory requirement and force chunks to be at least length 2, so that we - // can be assured runs are merged and converge over time. - if current_chunk.len() == 1 { - // in the steady state we expect this counter to be 0, and would only - // anticipate it being temporarily nonzero if we changed target blob size - // or our memory requirement calculations - metrics.compaction.memory_violations.inc(); - continue; + chunks.push((std::mem::take(&mut current_chunk), current_memory)); + current_memory = 0; } + + current_chunk.push((run_id, desc, meta, &*parts)); + current_memory += run_size; } - chunks.push(( - std::mem::take(&mut current_chunk), - current_chunk_max_memory_usage, - )); - current_chunk_max_memory_usage = 0; + if !current_chunk.is_empty() { + chunks.push((current_chunk, current_memory)); + } + } else { + // Some batches have >1 run — compact only within each batch + for (_spine_id, runs) in grouped.into_iter() { + let mut current_chunk = Vec::new(); + let mut current_memory = 0; + + for (run_id, desc, meta, parts) in runs { + let run_size = parts + .iter() + .map(|p| p.max_part_bytes()) + .max() + .unwrap_or(cfg.batch.blob_target_size); + + if !current_chunk.is_empty() + && current_memory + run_size > run_reserved_memory_bytes + { + if current_chunk.len() == 1 { + metrics.compaction.memory_violations.inc(); + current_chunk.push((run_id, desc, meta, &*parts)); + current_memory += run_size; + chunks.push((std::mem::take(&mut current_chunk), current_memory)); + current_memory = 0; + continue; + } + + chunks.push((std::mem::take(&mut current_chunk), current_memory)); + current_memory = 0; + } + + current_chunk.push((run_id, desc, meta, &*parts)); + current_memory += run_size; + } + + if !current_chunk.is_empty() { + chunks.push((current_chunk, current_memory)); + } + } } chunks } - /// With bounded memory where we cannot compact all runs/parts together, the groupings - /// in which we select runs to compact together will affect how much we're able to - /// consolidate updates. - /// - /// This approach orders the input runs by cycling through each batch, selecting the - /// head element until all are consumed. It assumes that it is generally more effective - /// to prioritize compacting runs from different batches, rather than runs from within - /// a single batch. - /// - /// ex. 
- /// ```text - /// inputs output - /// b0 runs=[A, B] - /// b1 runs=[C] output=[A, C, D, B, E, F] - /// b2 runs=[D, E, F] - /// ``` - async fn order_runs<'a>( + /// Flattens the runs in the input batches into a single ordered list of runs. + async fn flatten_runs<'a>( req: &'a CompactReq, target_order: RunOrder, blob: &'a dyn Blob, metrics: &'a Metrics, - ) -> anyhow::Result, &'a RunMeta, Cow<'a, [RunPart]>)>> { + ) -> anyhow::Result< + Vec<( + RunLocation, + &'a Description, + &'a RunMeta, + Cow<'a, [RunPart]>, + )>, + > { let total_number_of_runs = req .inputs .iter() - .map(|x| x.run_splits.len() + 1) + .map(|x| x.batch.run_splits.len() + 1) .sum::(); - let mut batch_runs: VecDeque<_> = req + let batch_runs: Vec<_> = req .inputs .iter() - .map(|batch| (&batch.desc, batch.runs())) + .map(|x| (x.id, &x.batch.desc, x.batch.runs())) .collect(); let mut ordered_runs = Vec::with_capacity(total_number_of_runs); - - while let Some((desc, mut runs)) = batch_runs.pop_front() { - if let Some((meta, run)) = runs.next() { + for (spine_id, desc, runs) in batch_runs { + for (meta, run) in runs { + let run_id = RunLocation(spine_id, meta.id); let same_order = meta.order.unwrap_or(RunOrder::Codec) == target_order; if same_order { - ordered_runs.push((desc, meta, Cow::Borrowed(run))); + ordered_runs.push((run_id, desc, meta, Cow::Borrowed(run))); } else { // The downstream consolidation step will handle a long run that's not in // the desired order by splitting it up into many single-element runs. This preserves @@ -787,6 +980,7 @@ where let mut batch_parts = pin!(part.part_stream(req.shard_id, blob, metrics)); while let Some(part) = batch_parts.next().await { ordered_runs.push(( + run_id, desc, meta, Cow::Owned(vec![RunPart::Single(part?.into_owned())]), @@ -794,7 +988,6 @@ where } } } - batch_runs.push_back((desc, runs)); } } @@ -942,21 +1135,21 @@ where fn validate_req(req: &CompactReq) -> Result<(), anyhow::Error> { let mut frontier = req.desc.lower(); for input in req.inputs.iter() { - if PartialOrder::less_than(req.desc.since(), input.desc.since()) { + if PartialOrder::less_than(req.desc.since(), input.batch.desc.since()) { return Err(anyhow!( "output since {:?} must be at or in advance of input since {:?}", req.desc.since(), - input.desc.since() + input.batch.desc.since() )); } - if frontier != input.desc.lower() { + if frontier != input.batch.desc.lower() { return Err(anyhow!( "invalid merge of non-consecutive batches {:?} vs {:?}", frontier, - input.desc.lower() + input.batch.desc.lower() )); } - frontier = input.desc.upper(); + frontier = input.batch.desc.upper(); } if frontier != req.desc.upper() { return Err(anyhow!( @@ -1005,6 +1198,7 @@ mod tests { use crate::PersistLocation; use crate::batch::BLOB_TARGET_SIZE; + use crate::internal::trace::SpineId; use crate::tests::{all_ok, expect_fetch_part, new_test_client_cache}; use super::*; @@ -1045,7 +1239,16 @@ mod tests { b1.desc.upper().clone(), Antichain::from_elem(10u64), ), - inputs: vec![b0, b1], + inputs: vec![ + IdHollowBatch { + batch: Arc::new(b0), + id: SpineId(0, 1), + }, + IdHollowBatch { + batch: Arc::new(b1), + id: SpineId(1, 2), + }, + ], }; let schemas = Schemas { id: None, @@ -1112,7 +1315,16 @@ mod tests { b1.desc.upper().clone(), Antichain::from_elem(10u64), ), - inputs: vec![b0, b1], + inputs: vec![ + IdHollowBatch { + batch: Arc::new(b0), + id: SpineId(0, 1), + }, + IdHollowBatch { + batch: Arc::new(b1), + id: SpineId(1, 2), + }, + ], }; write.cfg.set_config(&COMPACTION_HEURISTIC_MIN_INPUTS, 1); let compactor = 
write.compact.as_ref().expect("compaction hard disabled"); @@ -1157,7 +1369,16 @@ mod tests { b3.desc.upper().clone(), Antichain::from_elem(20u64), ), - inputs: vec![b2, b3], + inputs: vec![ + IdHollowBatch { + batch: Arc::new(b2), + id: SpineId(0, 1), + }, + IdHollowBatch { + batch: Arc::new(b3), + id: SpineId(1, 2), + }, + ], }; let compactor = write.compact.as_ref().expect("compaction hard disabled"); diff --git a/src/persist-client/src/internal/machine.rs b/src/persist-client/src/internal/machine.rs index 52ac6f914f52d..748e220ef7780 100644 --- a/src/persist-client/src/internal/machine.rs +++ b/src/persist-client/src/internal/machine.rs @@ -47,9 +47,9 @@ use crate::internal::maintenance::{RoutineMaintenance, WriterMaintenance}; use crate::internal::metrics::{CmdMetrics, Metrics, MetricsRetryStream, RetryMetrics}; use crate::internal::paths::PartialRollupKey; use crate::internal::state::{ - CompareAndAppendBreak, CriticalReaderState, HandleDebugState, HollowBatch, HollowRollup, - IdempotencyToken, LeasedReaderState, NoOpStateTransition, Since, SnapshotErr, StateCollections, - Upper, + CompareAndAppendBreak, CriticalReaderState, ENABLE_INCREMENTAL_COMPACTION, HandleDebugState, + HollowBatch, HollowRollup, IdempotencyToken, LeasedReaderState, NoOpStateTransition, Since, + SnapshotErr, StateCollections, Upper, }; use crate::internal::state_versions::StateVersions; use crate::internal::trace::{ApplyMergeResult, FueledMergeRes}; @@ -273,11 +273,7 @@ where .map(|req| CompactReq { shard_id: self.shard_id(), desc: req.desc, - inputs: req - .inputs - .into_iter() - .map(|b| Arc::unwrap_or_clone(b.batch)) - .collect(), + inputs: req.inputs, }) .collect(); (reqs, maintenance) @@ -495,11 +491,7 @@ where let req = CompactReq { shard_id: self.shard_id(), desc: req.desc, - inputs: req - .inputs - .into_iter() - .map(|b| Arc::unwrap_or_clone(b.batch)) - .collect(), + inputs: req.inputs, }; compact_reqs.push(req); } @@ -1095,6 +1087,7 @@ where res: &FueledMergeRes, ) -> (ApplyMergeResult, RoutineMaintenance) { let metrics = Arc::clone(&self.applier.metrics); + let use_incremental_compaction = ENABLE_INCREMENTAL_COMPACTION.get(&self.applier.cfg); // SUBTLE! If Machine::merge_res returns false, the blobs referenced in // compaction output are deleted so we don't leak them. 
Naively passing @@ -1127,7 +1120,11 @@ where let mut merge_result_ever_applied = ApplyMergeResult::NotAppliedNoMatch; let (_seqno, _apply_merge_result, maintenance) = self .apply_unbatched_idempotent_cmd(&metrics.cmds.merge_res, |_, _, state| { - let ret = state.apply_merge_res::(res, &Arc::clone(&metrics).columnar); + let ret = if use_incremental_compaction { + state.apply_merge_res::(res, &Arc::clone(&metrics).columnar) + } else { + state.apply_merge_res_classic::(res, &Arc::clone(&metrics).columnar) + }; if let Continue(result) = ret { // record if we've ever applied the merge if result.applied() { @@ -1413,7 +1410,7 @@ where #[cfg(test)] pub mod datadriven { - use std::collections::BTreeMap; + use std::collections::{BTreeMap, BTreeSet}; use std::pin::pin; use std::sync::{Arc, LazyLock}; @@ -1438,7 +1435,7 @@ pub mod datadriven { use crate::internal::paths::{BlobKey, BlobKeyPrefix, PartialBlobKey}; use crate::internal::state::{BatchPart, RunOrder, RunPart}; use crate::internal::state_versions::EncodedRollup; - use crate::internal::trace::CompactionInput; + use crate::internal::trace::{CompactionInput, IdHollowBatch, SpineId}; use crate::read::{Listen, ListenEvent, READER_LEASE_DURATION}; use crate::rpc::NoopPubSubSender; use crate::tests::new_test_client; @@ -1461,10 +1458,12 @@ pub mod datadriven { pub state_versions: Arc, pub machine: Machine, pub gc: GarbageCollector, - pub batches: BTreeMap>, + pub batches: BTreeMap>, + pub next_id: usize, pub rollups: BTreeMap, pub listens: BTreeMap>, pub routine: Vec, + pub compactions: BTreeMap>, } impl MachineState { @@ -1509,6 +1508,8 @@ pub mod datadriven { rollups: BTreeMap::default(), listens: BTreeMap::default(), routine: Vec::new(), + compactions: BTreeMap::default(), + next_id: 0, } } @@ -1559,7 +1560,7 @@ pub mod datadriven { datadriven .batches .iter() - .find(|(_, original_batch)| original_batch.parts == b.parts) + .find(|(_, original_batch)| original_batch.batch.parts == b.parts) .map(|(batch_name, _)| batch_name.to_owned()) }) .collect(); @@ -1821,21 +1822,32 @@ pub mod datadriven { .await; } let batch = batch.into_hollow_batch(); + let batch = IdHollowBatch { + batch: Arc::new(batch), + id: SpineId(datadriven.next_id, datadriven.next_id + 1), + }; + datadriven.next_id += 1; if let Some(size) = parts_size_override { let mut batch = batch.clone(); - for part in batch.parts.iter_mut() { + let mut hollow_batch = (*batch.batch).clone(); + for part in hollow_batch.parts.iter_mut() { match part { RunPart::Many(run) => run.max_part_bytes = size, RunPart::Single(BatchPart::Hollow(part)) => part.encoded_size_bytes = size, RunPart::Single(BatchPart::Inline { .. 
}) => unreachable!("flushed out above"), } } + batch.batch = Arc::new(hollow_batch); datadriven.batches.insert(output.to_owned(), batch); } else { datadriven.batches.insert(output.to_owned(), batch.clone()); } - Ok(format!("parts={} len={}\n", batch.part_count(), batch.len)) + Ok(format!( + "parts={} len={}\n", + batch.batch.part_count(), + batch.batch.len + )) } pub async fn fetch_batch( @@ -1849,6 +1861,7 @@ pub mod datadriven { let mut s = String::new(); let mut stream = pin!( batch + .batch .part_stream( datadriven.shard_id, &*datadriven.state_versions.blob, @@ -1901,7 +1914,7 @@ pub mod datadriven { datadriven.client.metrics.as_ref(), datadriven.machine.applier.shard_metrics.as_ref(), &datadriven.client.metrics.read.batch_fetcher, - &batch.desc, + &batch.batch.desc, part, ) .await @@ -1918,10 +1931,11 @@ pub mod datadriven { } } if !s.is_empty() { - for (idx, (_meta, run)) in batch.runs().enumerate() { + for (idx, (_meta, run)) in batch.batch.runs().enumerate() { write!(s, "\n"); for part in run { let part_idx = batch + .batch .parts .iter() .position(|p| p == part) @@ -1943,16 +1957,27 @@ pub mod datadriven { let lower = args.expect_antichain("lower"); let upper = args.expect_antichain("upper"); - let mut batch = datadriven + let batch = datadriven .batches .get(input) .expect("unknown batch") .clone(); - let truncated_desc = Description::new(lower, upper, batch.desc.since().clone()); - let () = validate_truncate_batch(&batch, &truncated_desc, false, true)?; - batch.desc = truncated_desc; - datadriven.batches.insert(output.to_owned(), batch.clone()); - Ok(format!("parts={} len={}\n", batch.part_count(), batch.len)) + let truncated_desc = Description::new(lower, upper, batch.batch.desc.since().clone()); + let () = validate_truncate_batch(&batch.batch, &truncated_desc, false, true)?; + let mut new_hollow_batch = (*batch.batch).clone(); + new_hollow_batch.desc = truncated_desc; + let new_batch = IdHollowBatch { + batch: Arc::new(new_hollow_batch), + id: batch.id, + }; + datadriven + .batches + .insert(output.to_owned(), new_batch.clone()); + Ok(format!( + "parts={} len={}\n", + batch.batch.part_count(), + batch.batch.len + )) } #[allow(clippy::unused_async)] @@ -1963,7 +1988,8 @@ pub mod datadriven { let input = args.expect_str("input"); let size = args.expect("size"); let batch = datadriven.batches.get_mut(input).expect("unknown batch"); - for part in batch.parts.iter_mut() { + let mut hollow_batch = (*batch.batch).clone(); + for part in hollow_batch.parts.iter_mut() { match part { RunPart::Single(BatchPart::Hollow(x)) => x.encoded_size_bytes = size, _ => { @@ -1971,6 +1997,7 @@ pub mod datadriven { } } } + batch.batch = Arc::new(hollow_batch); Ok("ok\n".to_string()) } @@ -2006,8 +2033,22 @@ pub mod datadriven { let req = CompactReq { shard_id: datadriven.shard_id, desc: Description::new(lower, upper, since), - inputs, + inputs: inputs.clone(), }; + datadriven + .compactions + .insert(output.to_owned(), req.clone()); + let spine_lower = inputs + .first() + .map_or_else(|| datadriven.next_id, |x| x.id.0); + let spine_upper = inputs.last().map_or_else( + || { + datadriven.next_id += 1; + datadriven.next_id + }, + |x| x.id.1, + ); + let new_spine_id = SpineId(spine_lower, spine_upper); let res = Compactor::::compact( CompactConfig::new(&cfg, datadriven.shard_id), Arc::clone(&datadriven.client.blob), @@ -2019,9 +2060,12 @@ pub mod datadriven { ) .await?; - datadriven - .batches - .insert(output.to_owned(), res.output.clone()); + let batch = IdHollowBatch { + batch: 
Arc::new(res.output.clone()), + id: new_spine_id, + }; + + datadriven.batches.insert(output.to_owned(), batch.clone()); Ok(format!( "parts={} len={}\n", res.output.part_count(), @@ -2076,9 +2120,11 @@ pub mod datadriven { let upper = args.expect_antichain("upper"); let batch = datadriven.batches.get_mut(input).expect("unknown batch"); - let () = batch + let mut hollow_batch = (*batch.batch).clone(); + let () = hollow_batch .rewrite_ts(&ts_rewrite, upper) .map_err(|err| anyhow!("invalid rewrite: {}", err))?; + batch.batch = Arc::new(hollow_batch); Ok("ok\n".into()) } @@ -2328,10 +2374,12 @@ pub mod datadriven { .expect("missing batches") .into_iter() .map(|batch| { - let hollow = datadriven + let hollow = (*datadriven .batches .get(batch) .expect("unknown batch") + .clone() + .batch) .clone(); datadriven.to_batch(hollow) }) @@ -2414,7 +2462,7 @@ pub mod datadriven { let res = datadriven .machine .compare_and_append_idempotent( - &batch, + &batch.batch, &writer_id, now, &token, @@ -2428,7 +2476,8 @@ pub mod datadriven { return Err(anyhow!("{:?}", Upper(upper))); } CompareAndAppendRes::InlineBackpressure => { - let mut b = datadriven.to_batch(batch.clone()); + let hollow_batch = (*batch.batch).clone(); + let mut b = datadriven.to_batch(hollow_batch); let cfg = BatchBuilderConfig::new(&datadriven.client.cfg, datadriven.shard_id); b.flush_to_blob( &cfg, @@ -2437,7 +2486,7 @@ pub mod datadriven { &*SCHEMAS, ) .await; - batch = b.into_hollow_batch(); + batch.batch = Arc::new(b.into_hollow_batch()); continue; } _ => panic!("{:?}", res), @@ -2463,11 +2512,32 @@ pub mod datadriven { .get(input) .expect("unknown batch") .clone(); + let compact_req = datadriven + .compactions + .get(input) + .expect("unknown compact req") + .clone(); + let input_batches = compact_req + .inputs + .iter() + .map(|x| x.id) + .collect::>(); + let lower_spine_bound = input_batches + .first() + .map(|id| id.0) + .expect("at least one batch must be present"); + let upper_spine_bound = input_batches + .last() + .map(|id| id.1) + .expect("at least one batch must be present"); + let id = SpineId(lower_spine_bound, upper_spine_bound); + let hollow_batch = (*batch.batch).clone(); + let (merge_res, maintenance) = datadriven .machine .merge_res(&FueledMergeRes { - output: batch, - input: CompactionInput::Legacy, + output: hollow_batch, + input: CompactionInput::IdRange(id), new_active_compaction: None, }) .await; diff --git a/src/persist-client/src/internal/state.rs b/src/persist-client/src/internal/state.rs index 9ee32eab870da..6dd16fb81fd9e 100644 --- a/src/persist-client/src/internal/state.rs +++ b/src/persist-client/src/internal/state.rs @@ -1833,6 +1833,23 @@ where return Break(NoOpStateTransition(ApplyMergeResult::NotAppliedNoMatch)); } + let apply_merge_result = self.trace.apply_merge_res_checked::(res, metrics); + Continue(apply_merge_result) + } + + pub fn apply_merge_res_classic( + &mut self, + res: &FueledMergeRes, + metrics: &ColumnarMetrics, + ) -> ControlFlow, ApplyMergeResult> { + // We expire all writers if the upper and since both advance to the + // empty antichain. Gracefully handle this. At the same time, + // short-circuit the cmd application so we don't needlessly create new + // SeqNos. 
+ if self.is_tombstone() { + return Break(NoOpStateTransition(ApplyMergeResult::NotAppliedNoMatch)); + } + let apply_merge_result = self .trace .apply_merge_res_checked_classic::(res, metrics); diff --git a/src/persist-client/src/internal/trace.rs b/src/persist-client/src/internal/trace.rs index 09be0c7586eec..e0219f603db87 100644 --- a/src/persist-client/src/internal/trace.rs +++ b/src/persist-client/src/internal/trace.rs @@ -361,7 +361,7 @@ impl Trace { batch = Arc::new(HollowBatch::empty(Description::new( batch.desc.lower().clone(), new_upper, - expected_desc.since().clone(), + batch.desc.since().clone(), ))) } @@ -668,7 +668,6 @@ impl Trace { ApplyMergeResult::NotAppliedNoMatch } - #[allow(dead_code)] pub fn apply_merge_res_checked( &mut self, res: &FueledMergeRes, @@ -719,10 +718,8 @@ pub enum CompactionInput { /// unchecked legacy replacements. Legacy, /// This compaction output is a total replacement for all batches in this id range. - #[allow(dead_code)] IdRange(SpineId), /// This compaction output replaces the specified runs in this id range. - #[allow(dead_code)] PartialBatch(SpineId, BTreeSet), } @@ -739,6 +736,20 @@ impl Serialize for SpineId { } } +/// Creates a `SpineId` that covers the range of ids in the set. +pub fn id_range(ids: BTreeSet) -> SpineId { + let lower_spine_bound = ids + .first() + .map(|id| id.0) + .expect("at least one batch must be present"); + let upper_spine_bound = ids + .last() + .map(|id| id.1) + .expect("at least one batch must be present"); + + SpineId(lower_spine_bound, upper_spine_bound) +} + impl SpineId { fn covers(self, other: SpineId) -> bool { self.0 <= other.0 && other.1 <= self.1 @@ -823,9 +834,17 @@ impl SpineBatch { } pub fn is_compact(&self) -> bool { - // This definition is extremely likely to change, but for now, we consider a batch - // "compact" if it has at most one hollow batch with at most one run. - self.parts.len() <= 1 && self.parts.iter().all(|p| p.batch.run_splits.is_empty()) + // A compact batch has at most one run. + // This check used to be if there was at most one hollow batch with at most one run, + // but that was a bit too strict since introducing incremental compaction. + // Incremental compaction can result in a batch with a single run, but multiple empty + // hollow batches, which we still consider compact. As levels are merged, we + // will eventually clean up the empty hollow batches. + self.parts + .iter() + .map(|p| p.batch.run_meta.len()) + .sum::() + <= 1 } pub fn is_merging(&self) -> bool { @@ -949,7 +968,6 @@ impl SpineBatch { .flatten() } - #[allow(dead_code)] fn diffs_sum_for_runs( batch: &HollowBatch, run_ids: &[RunId], @@ -989,7 +1007,6 @@ impl SpineBatch { } } - #[allow(dead_code)] fn construct_batch_with_runs_replaced( original: &HollowBatch, run_ids: &[RunId], @@ -1028,7 +1045,7 @@ impl SpineBatch { .collect::>(); let run_splits = { - let mut splits = Vec::with_capacity(run_meta.len() - 1); + let mut splits = Vec::with_capacity(run_meta.len().saturating_sub(1)); let mut pointer = 0; for (i, (_, parts)) in runs.into_iter().enumerate() { if parts.is_empty() { @@ -1104,9 +1121,32 @@ impl SpineBatch { .parts .iter() .enumerate() - .filter_map(|(i, p)| if id.covers(p.id) { Some(i) } else { None }) + .filter_map(|(i, p)| { + if id.covers(p.id) { + Some((i, p.id)) + } else { + None + } + }) .collect::>(); + let ids: BTreeSet<_> = range.iter().map(|(_, id)| *id).collect(); + + // If ids is empty, it means that we didn't find any parts that match the id range. 
+        // We also check that the id matches the range of ids we found.
+        // At scale, sometimes regular compaction will race forced compaction,
+        // for things like the catalog. In that case, we may have a
+        // replacement that no longer lines up with the spine batches.
+        // I think this is because forced compaction ignores the active_compaction
+        // and just goes for it. This is slightly annoying but probably the right behavior
+        // for a function whose prefix is `force_`, so we just return
+        // NotAppliedNoMatch here.
+        if ids.is_empty() || id != &id_range(ids) {
+            return ApplyMergeResult::NotAppliedNoMatch;
+        }
+
+        let range: BTreeSet<_> = range.iter().map(|(i, _)| *i).collect();
+
         // This is the range of hollow batches that we will replace.
         let min = *range.iter().min().unwrap();
         let max = *range.iter().max().unwrap();
@@ -1121,13 +1161,11 @@ impl SpineBatch {
             metrics,
         );
 
-        self.validate_diffs_sum_match(old_diffs_sum, new_diffs_sum);
+        self.validate_diffs_sum_match(old_diffs_sum, new_diffs_sum, "id range replacement");
 
-        let parts = &self.parts[replacement_range.clone()];
-        let id = SpineId(parts.first().unwrap().id.0, parts.last().unwrap().id.1);
         self.perform_subset_replacement(
             &res.output,
-            id,
+            *id,
             replacement_range,
             res.new_active_compaction.clone(),
         )
@@ -1160,12 +1198,16 @@ impl SpineBatch {
         let old_batch_diff_sum = Self::diffs_sum::<D>(batch.parts.iter(), metrics);
         let old_diffs_sum = Self::diffs_sum_for_runs::<D>(batch, &run_ids, metrics);
 
-        self.validate_diffs_sum_match(old_diffs_sum, new_diffs_sum);
+        self.validate_diffs_sum_match(old_diffs_sum, new_diffs_sum, "partial batch replacement");
 
         match Self::construct_batch_with_runs_replaced(batch, &run_ids, &res.output) {
             Ok(new_batch) => {
                 let new_batch_diff_sum = Self::diffs_sum::<D>(new_batch.parts.iter(), metrics);
-                self.validate_diffs_sum_match(old_batch_diff_sum, new_batch_diff_sum);
+                self.validate_diffs_sum_match(
+                    old_batch_diff_sum,
+                    new_batch_diff_sum,
+                    "sanity checking diffs sum for replaced runs",
+                );
                 self.perform_subset_replacement(
                     &new_batch,
                     id,
@@ -1177,25 +1219,32 @@ impl SpineBatch {
         }
     }
 
-    fn validate_diffs_sum_match<D>(&self, old_diffs_sum: Option<D>, new_diffs_sum: Option<D>)
-    where
+    fn validate_diffs_sum_match<D>(
+        &self,
+        old_diffs_sum: Option<D>,
+        new_diffs_sum: Option<D>,
+        context: &str,
+    ) where
         D: Semigroup + Codec64 + PartialEq + Debug,
     {
-        if new_diffs_sum.is_none() {
-            assert!(
-                old_diffs_sum.is_none(),
-                "merge res diffs sum ({:?}) did not match spine batch diffs sum ({:?})",
-                new_diffs_sum,
-                old_diffs_sum
-            );
-        }
-        if let (Some(old_diffs_sum), Some(new_diffs_sum)) = (old_diffs_sum, new_diffs_sum) {
-            assert_eq!(
-                old_diffs_sum, new_diffs_sum,
-                "merge res diffs sum ({:?}) did not match spine batch diffs sum ({:?})",
-                new_diffs_sum, old_diffs_sum
-            );
-        }
+        match (new_diffs_sum, old_diffs_sum) {
+            (None, Some(old)) => {
+                if !D::is_zero(&old) {
+                    panic!(
+                        "merge res diffs sum is None, but spine batch diffs sum ({:?}) is not zero ({})",
+                        old, context
+                    );
+                }
+            }
+            (Some(new_diffs_sum), Some(old_diffs_sum)) => {
+                assert_eq!(
+                    old_diffs_sum, new_diffs_sum,
+                    "merge res diffs sum ({:?}) did not match spine batch diffs sum ({:?}) ({})",
+                    new_diffs_sum, old_diffs_sum, context
+                );
+            }
+            _ => {}
+        };
     }
 
     /// This is the "legacy" way of replacing a spine batch with a merge result.
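
A minimal standalone sketch of how a compaction's inputs are summarized under the new scheme. The `SpineId`, `RunId`, and `CompactionInput` types below are simplified stand-ins for the crate's own, and `summarize` is a hypothetical helper that mirrors the per-chunk decision in `compact_stream`: runs drawn from exactly one input batch are recorded as a `PartialBatch` replacement, while anything spanning multiple batches collapses to the covering `IdRange`.

    use std::collections::BTreeSet;

    // Stand-in for the crate's SpineId: a half-open range of spine positions.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
    struct SpineId(usize, usize);

    // Stand-in for the crate's opaque run identifier.
    type RunId = u64;

    #[derive(Debug, PartialEq)]
    enum CompactionInput {
        IdRange(SpineId),
        PartialBatch(SpineId, BTreeSet<RunId>),
    }

    // Mirrors `id_range`: cover the whole range spanned by the input ids.
    fn id_range(ids: &BTreeSet<SpineId>) -> SpineId {
        let lower = ids.first().expect("at least one batch must be present").0;
        let upper = ids.last().expect("at least one batch must be present").1;
        SpineId(lower, upper)
    }

    // Hypothetical helper mirroring the chunk-level decision in the patch.
    fn summarize(batch_ids: &BTreeSet<SpineId>, run_ids: BTreeSet<RunId>) -> CompactionInput {
        if batch_ids.len() == 1 {
            CompactionInput::PartialBatch(*batch_ids.first().unwrap(), run_ids)
        } else {
            CompactionInput::IdRange(id_range(batch_ids))
        }
    }

    fn main() {
        // Runs drawn from several adjacent batches: the whole id range is replaced.
        let many: BTreeSet<_> = [SpineId(0, 1), SpineId(1, 2)].into();
        assert_eq!(
            summarize(&many, BTreeSet::new()),
            CompactionInput::IdRange(SpineId(0, 2))
        );

        // Runs drawn from a single batch: only those runs are replaced.
        let one: BTreeSet<_> = [SpineId(2, 3)].into();
        let runs: BTreeSet<RunId> = [7, 9].into();
        assert_eq!(
            summarize(&one, runs.clone()),
            CompactionInput::PartialBatch(SpineId(2, 3), runs)
        );
    }
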
From 83ea2101f0eda3648e77e4232079d1ebbc7ee7d4 Mon Sep 17 00:00:00 2001
From: Dov Alperin
Date: Wed, 6 Aug 2025 22:34:18 -0400
Subject: [PATCH 2/3] Clean up some grossly nested code

---
 src/persist-client/src/internal/compact.rs | 34 ++++++++++------------
 1 file changed, 16 insertions(+), 18 deletions(-)

diff --git a/src/persist-client/src/internal/compact.rs b/src/persist-client/src/internal/compact.rs
index 273d55b650c3e..7623301840836 100644
--- a/src/persist-client/src/internal/compact.rs
+++ b/src/persist-client/src/internal/compact.rs
@@ -21,6 +21,7 @@ use differential_dataflow::lattice::Lattice;
 use differential_dataflow::trace::Description;
 use futures::{Stream, pin_mut};
 use futures_util::StreamExt;
+use itertools::Itertools;
 use mz_dyncfg::Config;
 use mz_ore::cast::CastFrom;
 use mz_ore::error::ErrorExt;
@@ -685,29 +686,26 @@ where
                     / cfg.batch.blob_target_size;
                 let mut run_cfg = cfg.clone();
                 run_cfg.batch.batch_builder_max_outstanding_parts = 1 + extra_outstanding_parts;
-                let descriptions = runs.iter()
-                    .map(|(_, desc, _, _)| *desc)
-                    .collect::<Vec<_>>();
+
+                let (batch_ids, descriptions): (BTreeSet<_>, Vec<_>) = runs.iter()
+                    .map(|(run_id, desc, _, _)| (run_id.0, *desc))
+                    .unzip();
 
                 let input = if incremental_enabled {
-                    let batch_ids = runs.iter()
-                        .map(|(run_id, _, _, _)| run_id.0)
+                    let run_ids = runs.iter()
+                        .map(|(run_id, _, _, _)| run_id.1.expect("run_id should be present"))
                         .collect::<BTreeSet<_>>();
-                    if let Some(batch_id) = batch_ids.first() && batch_ids.len() == 1 {
-                        CompactionInput::PartialBatch(
-                            batch_id.clone(),
-                            runs.iter()
-                                .map(|(run_id, _, _, _)| run_id.1.expect("run_id must be present"))
-                                .collect::<BTreeSet<_>>(),
-                        )
-                    } else {
-                        input_id_range(batch_ids)
+                    match batch_ids.iter().exactly_one().ok() {
+                        Some(batch_id) => {
+                            CompactionInput::PartialBatch(
+                                *batch_id,
+                                run_ids
+                            )
+                        }
+                        None => input_id_range(batch_ids),
                     }
                 } else {
-                    let runs = runs.iter()
-                        .map(|(run_id, _, _, _)| run_id.0)
-                        .collect::<BTreeSet<_>>();
-                    input_id_range(runs)
+                    input_id_range(batch_ids)
                 };
 
                 let desc = if incremental_enabled {

From f64d01dcba7d3816d295079a4b938d67c642e1c3 Mon Sep 17 00:00:00 2001
From: Dov Alperin
Date: Thu, 7 Aug 2025 15:34:29 -0400
Subject: [PATCH 3/3] Add specific fetch handling

---
 src/persist-client/src/fetch.rs | 29 +++++++++++++++++++++++++----
 1 file changed, 25 insertions(+), 4 deletions(-)

diff --git a/src/persist-client/src/fetch.rs b/src/persist-client/src/fetch.rs
index 4e030bc0416b7..41465712d81c5 100644
--- a/src/persist-client/src/fetch.rs
+++ b/src/persist-client/src/fetch.rs
@@ -1255,8 +1255,11 @@ where
         // truncated bounds must be ignored. Not every user batch is
         // truncated.
         // - Batches written by compaction. These always have an inline desc
-        //   that exactly matches the one they are registered with. The since
-        //   can be anything.
+        //   lower and upper that match the registered desc's lower and upper,
+        //   and a since that is less than or equal to the registered desc's since.
+        //   The inline since may be less than the registered since because of
+        //   incremental compaction, which can rewrite some runs in a batch but
+        //   not others.
let inline_desc = &parsed.desc; let needs_truncation = inline_desc.lower() != registered_desc.lower() || inline_desc.upper() != registered_desc.upper(); @@ -1297,10 +1300,28 @@ where registered_desc ); } else { + assert!( + PartialOrder::less_equal(inline_desc.since(), registered_desc.since()), + "key={} inline={:?} registered={:?}", + printable_name, + inline_desc, + registered_desc + ); + assert_eq!( + inline_desc.lower(), + registered_desc.lower(), + "key={} inline={:?} registered={:?}", + printable_name, + inline_desc, + registered_desc + ); assert_eq!( - inline_desc, ®istered_desc, + inline_desc.upper(), + registered_desc.upper(), "key={} inline={:?} registered={:?}", - printable_name, inline_desc, registered_desc + printable_name, + inline_desc, + registered_desc ); }
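
A minimal standalone sketch of the relaxed descriptor check that the fetch.rs hunk above implements, assuming a plain `Desc` struct with integer frontiers in place of the `Antichain`-based `Description`; the `check_compaction_desc` helper is illustrative, not the crate's API. Lower and upper must match the registered desc exactly, while the inline since only needs to be at or before the registered since.

    // Simplified stand-in for persist's Description, using u64 frontiers.
    #[derive(Debug, Clone, Copy)]
    struct Desc {
        lower: u64,
        upper: u64,
        since: u64,
    }

    fn check_compaction_desc(inline: Desc, registered: Desc) {
        // Compaction writes bounds for the whole registered batch, so lower and
        // upper must line up exactly.
        assert_eq!(
            inline.lower, registered.lower,
            "inline={inline:?} registered={registered:?}"
        );
        assert_eq!(
            inline.upper, registered.upper,
            "inline={inline:?} registered={registered:?}"
        );
        // Incremental compaction may rewrite only some runs of a batch, so an
        // older (smaller) inline since is acceptable.
        assert!(
            inline.since <= registered.since,
            "inline={inline:?} registered={registered:?}"
        );
    }

    fn main() {
        let registered = Desc { lower: 0, upper: 10, since: 8 };
        // A part written before the since advanced still passes the check.
        check_compaction_desc(Desc { lower: 0, upper: 10, since: 5 }, registered);
        // An exact match passes as well.
        check_compaction_desc(Desc { lower: 0, upper: 10, since: 8 }, registered);
    }
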