From aee2ad8562a9fae27e375ab60a7c0b402f6a96f7 Mon Sep 17 00:00:00 2001 From: Dov Alperin Date: Tue, 15 Jul 2025 10:10:32 -0400 Subject: [PATCH] Persist: Add new API for run based spine replacement --- src/persist-client/src/cli/admin.rs | 8 +- src/persist-client/src/internal/compact.rs | 8 +- src/persist-client/src/internal/machine.rs | 7 +- src/persist-client/src/internal/state.rs | 106 +- src/persist-client/src/internal/state_diff.rs | 13 +- .../src/internal/state_serde.json | 1401 +++++++++++++---- src/persist-client/src/internal/trace.rs | 380 ++++- 7 files changed, 1553 insertions(+), 370 deletions(-) diff --git a/src/persist-client/src/cli/admin.rs b/src/persist-client/src/cli/admin.rs index 87868133e44a1..70e2f905b2999 100644 --- a/src/persist-client/src/cli/admin.rs +++ b/src/persist-client/src/cli/admin.rs @@ -40,7 +40,7 @@ use crate::internal::compact::{CompactConfig, CompactReq, Compactor}; use crate::internal::encoding::Schemas; use crate::internal::gc::{GarbageCollector, GcReq}; use crate::internal::machine::Machine; -use crate::internal::trace::FueledMergeRes; +use crate::internal::trace::{CompactionInput, FueledMergeRes}; use crate::rpc::{NoopPubSubSender, PubSubSender}; use crate::write::{WriteHandle, WriterId}; use crate::{ @@ -509,7 +509,11 @@ where start.elapsed(), ); let (apply_res, maintenance) = machine - .merge_res(&FueledMergeRes { output: res.output }) + .merge_res(&FueledMergeRes { + output: res.output, + input: CompactionInput::Legacy, + new_active_compaction: None, + }) .await; if !maintenance.is_empty() { info!("ignoring non-empty requested maintenance: {maintenance:?}") diff --git a/src/persist-client/src/internal/compact.rs b/src/persist-client/src/internal/compact.rs index 1e41926d33879..7b432afb18bfb 100644 --- a/src/persist-client/src/internal/compact.rs +++ b/src/persist-client/src/internal/compact.rs @@ -49,7 +49,7 @@ use crate::internal::metrics::ShardMetrics; use crate::internal::state::{ ENABLE_INCREMENTAL_COMPACTION, HollowBatch, RunMeta, RunOrder, RunPart, }; -use crate::internal::trace::{ApplyMergeResult, FueledMergeRes}; +use crate::internal::trace::{ApplyMergeResult, CompactionInput, FueledMergeRes}; use crate::iter::{Consolidator, StructuredSort}; use crate::{Metrics, PersistConfig, ShardId}; @@ -396,7 +396,11 @@ where ); let res = Self::compact_all(stream, req.clone()).await?; let maintenance = Self::apply( - FueledMergeRes { output: res.output }, + FueledMergeRes { + output: res.output, + input: CompactionInput::Legacy, + new_active_compaction: None, + }, &metrics_clone, &machine_clone, ) diff --git a/src/persist-client/src/internal/machine.rs b/src/persist-client/src/internal/machine.rs index 7231de7b3e674..52ac6f914f52d 100644 --- a/src/persist-client/src/internal/machine.rs +++ b/src/persist-client/src/internal/machine.rs @@ -1438,6 +1438,7 @@ pub mod datadriven { use crate::internal::paths::{BlobKey, BlobKeyPrefix, PartialBlobKey}; use crate::internal::state::{BatchPart, RunOrder, RunPart}; use crate::internal::state_versions::EncodedRollup; + use crate::internal::trace::CompactionInput; use crate::read::{Listen, ListenEvent, READER_LEASE_DURATION}; use crate::rpc::NoopPubSubSender; use crate::tests::new_test_client; @@ -2464,7 +2465,11 @@ pub mod datadriven { .clone(); let (merge_res, maintenance) = datadriven .machine - .merge_res(&FueledMergeRes { output: batch }) + .merge_res(&FueledMergeRes { + output: batch, + input: CompactionInput::Legacy, + new_active_compaction: None, + }) .await; datadriven.routine.push(maintenance); Ok(format!( diff --git a/src/persist-client/src/internal/state.rs b/src/persist-client/src/internal/state.rs index 31d8a66dbfac6..bffa0b3f168e0 100644 --- a/src/persist-client/src/internal/state.rs +++ b/src/persist-client/src/internal/state.rs @@ -1053,6 +1053,29 @@ impl HollowBatch { } } + #[cfg(test)] + pub(crate) fn new_run_for_test( + desc: Description, + parts: Vec>, + len: usize, + run_id: RunId, + ) -> Self { + let run_meta = if parts.is_empty() { + vec![] + } else { + let mut meta = RunMeta::default(); + meta.id = Some(run_id); + vec![meta] + }; + Self { + desc, + len, + parts, + run_splits: vec![], + run_meta, + } + } + /// An empty hollow batch, representing no updates over the given desc. pub(crate) fn empty(desc: Description) -> Self { Self { @@ -1810,7 +1833,9 @@ where return Break(NoOpStateTransition(ApplyMergeResult::NotAppliedNoMatch)); } - let apply_merge_result = self.trace.apply_merge_res_checked::(res, metrics); + let apply_merge_result = self + .trace + .apply_merge_res_checked_classic::(res, metrics); Continue(apply_merge_result) } @@ -2162,10 +2187,7 @@ where // We have a nonempty batch: replace it with an empty batch and return. // This should not produce an excessively large diff: if it did, we wouldn't have been // able to append that batch in the first place. - let fake_merge = FueledMergeRes { - output: HollowBatch::empty(desc), - }; - let result = self.trace.apply_tombstone_merge(&fake_merge); + let result = self.trace.apply_tombstone_merge(&desc); assert!( result.matched(), "merge with a matching desc should always match" @@ -2833,34 +2855,92 @@ pub(crate) mod tests { } } + pub fn any_hollow_batch_with_exact_runs( + num_runs: usize, + ) -> impl Strategy> { + ( + any::(), + any::(), + any::(), + proptest::collection::vec(any_run_part::(), num_runs + 1..20), + any::(), + ) + .prop_map(move |(t0, t1, since, parts, len)| { + let (lower, upper) = if t0 <= t1 { + (Antichain::from_elem(t0), Antichain::from_elem(t1)) + } else { + (Antichain::from_elem(t1), Antichain::from_elem(t0)) + }; + let since = Antichain::from_elem(since); + + let run_splits = (1..num_runs) + .map(|i| i * parts.len() / num_runs) + .collect::>(); + + let run_meta = (0..num_runs) + .map(|_| { + let mut meta = RunMeta::default(); + meta.id = Some(RunId::new()); + meta + }) + .collect::>(); + + HollowBatch::new( + Description::new(lower, upper, since), + parts, + len % 10, + run_meta, + run_splits, + ) + }) + } + pub fn any_hollow_batch() -> impl Strategy> { Strategy::prop_map( ( any::(), any::(), any::(), - proptest::collection::vec(any_run_part::(), 0..3), + proptest::collection::vec(any_run_part::(), 0..20), any::(), - any::(), + 0..=10usize, + proptest::collection::vec(any::(), 10), ), - |(t0, t1, since, parts, len, runs)| { + |(t0, t1, since, parts, len, num_runs, run_ids)| { let (lower, upper) = if t0 <= t1 { (Antichain::from_elem(t0), Antichain::from_elem(t1)) } else { (Antichain::from_elem(t1), Antichain::from_elem(t0)) }; let since = Antichain::from_elem(since); - if runs && parts.len() > 2 { - let split_at = parts.len() / 2; + if num_runs > 0 && parts.len() > 2 && num_runs < parts.len() { + let run_splits = (1..num_runs) + .map(|i| i * parts.len() / num_runs) + .collect::>(); + + let run_meta = (0..num_runs) + .enumerate() + .map(|(i, _)| { + let mut meta = RunMeta::default(); + meta.id = Some(run_ids[i]); + meta + }) + .collect::>(); + HollowBatch::new( Description::new(lower, upper, since), parts, len % 10, - vec![RunMeta::default(), RunMeta::default()], - vec![split_at], + run_meta, + run_splits, ) } else { - HollowBatch::new_run(Description::new(lower, upper, since), parts, len % 10) + HollowBatch::new_run_for_test( + Description::new(lower, upper, since), + parts, + len % 10, + run_ids[0], + ) } }, ) diff --git a/src/persist-client/src/internal/state_diff.rs b/src/persist-client/src/internal/state_diff.rs index 74d7ad90cfcdf..5e4f34862026e 100644 --- a/src/persist-client/src/internal/state_diff.rs +++ b/src/persist-client/src/internal/state_diff.rs @@ -32,6 +32,7 @@ use crate::internal::state::{ LeasedReaderState, ProtoStateField, ProtoStateFieldDiffType, ProtoStateFieldDiffs, RunPart, State, StateCollections, WriterState, }; +use crate::internal::trace::CompactionInput; use crate::internal::trace::{FueledMergeRes, SpineId, ThinMerge, ThinSpineBatch, Trace}; use crate::read::LeasedReaderId; use crate::write::WriterId; @@ -877,7 +878,11 @@ fn apply_diffs_spine( // Fast-path: compaction if let Some((_inputs, output)) = sniff_compaction(&diffs) { - let res = FueledMergeRes { output }; + let res = FueledMergeRes { + output, + input: CompactionInput::Legacy, + new_active_compaction: None, + }; // We can't predict how spine will arrange the batches when it's // hydrated. This means that something that is maintaining a Spine // starting at some seqno may not exactly match something else @@ -1444,7 +1449,11 @@ mod tests { leader .collections .trace - .apply_merge_res_unchecked(&FueledMergeRes { output }); + .apply_merge_res_unchecked(&FueledMergeRes { + output, + input: CompactionInput::Legacy, + new_active_compaction: None, + }); } } } diff --git a/src/persist-client/src/internal/state_serde.json b/src/persist-client/src/internal/state_serde.json index 5d0fa4ef83db6..63ac1ac66da25 100644 --- a/src/persist-client/src/internal/state_serde.json +++ b/src/persist-client/src/internal/state_serde.json @@ -226,7 +226,7 @@ }, "since": [], "upper": [ - 15911884402577608130 + 18337917978372154106 ], "batches": [ { @@ -239,119 +239,208 @@ "since": [ 13842126995014298771 ], - "len": 1, - "part_runs": [] + "len": 8, + "part_runs": [ + [ + { + "order": null, + "schema": null, + "deprecated_schema": null, + "id": [ + 15, + 133, + 112, + 245, + 251, + 173, + 34, + 244, + 69, + 197, + 39, + 78, + 121, + 91, + 206, + 240 + ], + "len": null + }, + [ + { + "type": "Inline", + "updates": { + "desc": null, + "index": 0, + "updates[len]": 0 + }, + "ts_rewrite": { + "elements": [ + 11170199980915058343 + ] + }, + "schema_id": null, + "deprecated_schema_id": null + }, + { + "type": "Inline", + "updates": { + "desc": null, + "index": 0, + "updates[len]": 0 + }, + "ts_rewrite": { + "elements": [ + 17388963502712311841 + ] + }, + "schema_id": "h11180089241843540696", + "deprecated_schema_id": "h4368574412886530792" + }, + { + "type": "Inline", + "updates": { + "desc": null, + "index": 0, + "updates[len]": 0 + }, + "ts_rewrite": { + "elements": [ + 11968885213966612837 + ] + }, + "schema_id": "h14763836069492172040", + "deprecated_schema_id": null + } + ] + ] + ] }, { "lower": [ 1487032587275842953 ], "upper": [ - 3574036754853368823 + 5229731053409306593 ], "since": [ - 8227539279766548057 + 4027847723496697373 ], - "len": 2, + "len": 3, "part_runs": [] }, { "lower": [ - 3574036754853368823 + 5229731053409306593 ], "upper": [ - 3781569559775162213 + 6043507527885355412 ], "since": [ - 11984174003953907109 + 8067289630802337135 ], - "len": 5, + "len": 0, "part_runs": [ [ { "order": null, "schema": null, "deprecated_schema": null, - "id": null, + "id": [ + 212, + 132, + 176, + 165, + 187, + 209, + 204, + 145, + 114, + 70, + 161, + 207, + 211, + 126, + 255, + 136 + ], "len": null }, [ { "type": "Hollow", - "key": "𐆖,᥀🕴\\rኵ/ጕ&", - "encoded_size_bytes": 11603461394316114801, - "key_lower": "639639373cebe660daf1497ae20ff6cc7bc7dfcf3459fd251a471a451249db85884e42b4d0046214b96f7e55a6ed579b630f38a740bc6f8ec0b7cbaccdd8150c2f0ee72ece8173ea0bdb11afc11fb62a1a7d46e03b53dbad52d87264a52aa97eaa", + "key": "ῷೖ`.𐣴/\\¥ö^Ѩ7ୟ<<$\"O*cMBi9Q", + "encoded_size_bytes": 10916098782634356875, + "key_lower": "8b4633d7a4fcd734ffb37eaf4ccaa943395a656804d27b116d8ce2abed749a9be743c6decde1f145317b4503af22c72ceafc748c84a2ab6c83c75fc9a070cbf0f91067028380b257988db0b9114af7e70b7237ab", "structured_key_lower": null, "stats": { - "len": 2968031493642118933, + "len": 5091478577841501061, "cols": { - "*<|EȺ🕴gਫ਼y0~/*$yR.𞀃(RP'": { - "len": 10775437753553688434, - "Ⱥລ&🕴𑂙࿔xr/": { - "lower": "𑈑k≫=<ӎs𑌣K\\8�\":𐒦Z4n�=i,Ѩ𑃕\\𐞃Ѩ!ⷅȺG\\", - "upper": "𑖽=𑛃^ஐê᳙'ৈ¥kC%" - }, - "῎🠗Ⱥ𝄡᪑🕴𑊌BÖ{$ଳ𛄲'c𐔋Gzan|Xt&ꧼ": { - "lower": false, - "upper": false - } + "$d$ﶝ": { + "len": 11844154136178971445 }, - ".ྵ𞴙~\"<;UÃ[.b={𜾯ೋ;🕴𝚝\"𖿰𞹾4&i𑏊%?\"+": { - "len": 5879932233594401360 - }, - "𑴭类𚿱𔘲<Ε<ö&'𑯅𑶨/એA%ઑ{ཱིȺ:🟰𖫲/": { - "len": 15959088740213781786 - }, - "🕴:፮.:֏.\\Gcꩌ": { - "len": 5805315787594892454, - "𑃴.%ῴ*Z&𑤌𑵅ৗȺ<:𐨩𞴬/?$𐖐B֎": { - "len": 13099288627052854649, - "/@𮴷ൌq3𑤕𐮜𑴶": { - "+\\🂂Î\\U𑜰&_𛲜𝓥k𞋿𝈏=P𞋿𝔐ஏ4><𐦾C": { - "len": 1, - "stats": "json_nulls" - }, - "Pn⁏𑣒ᨣ": { - "len": 1, - "stats": "json_nulls" - } - }, - "/zկbೈz&ጔ𞻰ᥳ": { - "lower": true, - "upper": true - } - } + "𑏂": { + "len": 15875759530920683941 } } }, + "ts_rewrite": { + "elements": [ + 16266003546132748144 + ] + }, + "diffs_sum": -500294152410720689, + "format": null, + "schema_id": "h10231746688330205956", + "deprecated_schema_id": null + }, + { + "type": "Inline", + "updates": { + "desc": null, + "index": 0, + "updates[len]": 0 + }, "ts_rewrite": null, - "diffs_sum": 7873705479009338732, - "format": "Row", "schema_id": null, + "deprecated_schema_id": "h11761505288833802467" + }, + { + "type": "Inline", + "updates": { + "desc": null, + "index": 0, + "updates[len]": 0 + }, + "ts_rewrite": null, + "schema_id": "h9508791340183792843", "deprecated_schema_id": null } ] - ] - ] - }, - { - "lower": [ - 3781569559775162213 - ], - "upper": [ - 14534366478183009698 - ], - "since": [ - 8567821759643630250 - ], - "len": 6, - "part_runs": [ + ], [ { "order": null, "schema": null, "deprecated_schema": null, - "id": null, + "id": [ + 158, + 131, + 179, + 68, + 44, + 172, + 184, + 240, + 14, + 191, + 16, + 54, + 99, + 51, + 125, + 216 + ], "len": null }, [ @@ -364,59 +453,145 @@ }, "ts_rewrite": { "elements": [ - 1185719131061393945 + 6515801232818394029 + ] + }, + "schema_id": null, + "deprecated_schema_id": "h9678259999936834161" + }, + { + "type": "Inline", + "updates": { + "desc": null, + "index": 0, + "updates[len]": 0 + }, + "ts_rewrite": { + "elements": [ + 14563520833126537201 ] }, "schema_id": null, - "deprecated_schema_id": "h11876841927855735564" + "deprecated_schema_id": "h12442307507880511790" + }, + { + "type": "Inline", + "updates": { + "desc": null, + "index": 0, + "updates[len]": 0 + }, + "ts_rewrite": { + "elements": [ + 9800338850320209635 + ] + }, + "schema_id": "h5748251180211398577", + "deprecated_schema_id": null + }, + { + "type": "Inline", + "updates": { + "desc": null, + "index": 0, + "updates[len]": 0 + }, + "ts_rewrite": null, + "schema_id": "h7612581775072434645", + "deprecated_schema_id": "h7104300681417612915" } ] - ] - ] - }, - { - "lower": [ - 14534366478183009698 - ], - "upper": [ - 15431917281868552340 - ], - "since": [ - 17823482996456209977 - ], - "len": 3, - "part_runs": [ + ], [ { "order": null, "schema": null, "deprecated_schema": null, - "id": null, + "id": [ + 142, + 30, + 50, + 91, + 5, + 37, + 29, + 105, + 152, + 235, + 83, + 204, + 171, + 179, + 66, + 35 + ], "len": null }, [ + { + "type": "Inline", + "updates": { + "desc": null, + "index": 0, + "updates[len]": 0 + }, + "ts_rewrite": null, + "schema_id": null, + "deprecated_schema_id": null + }, { "type": "Hollow", - "key": "ገi𑵶쬙?¥ⷛp𖪺�'<={i.ý[([Ao/9𑛒*F:𘱐", - "encoded_size_bytes": 5973179277770248155, - "key_lower": "bdc6c9a4ec2f", + "key": "\"𑁲L𐺚`.?}%$K𑵅7x𖭟6yA𞂏𖵈/e", + "encoded_size_bytes": 10612595034917046658, + "key_lower": "a100eeed07db3f700b0cd50d8df1f0c845ba", "structured_key_lower": null, "stats": { - "len": 2140897803534795047, + "len": 15737784327506677475, "cols": { - "𐦟𐇚/𐤟`𑆽G𑄂<:<}&&(#{ೞȺ\\=�I&𞴣E&꣬𐖯{ᱯ<": { - "len": 12966863957103026765 + "=2\"*'ু\\`?\\𞗺BVà\"jꠦåN": { + "len": 17702684700609520577 + }, + "KEy{è$𒐋zn¥ᮋ𐺫W\"ગȺ¥5⵰*'5'&=/𞹗𞸖a": { + "lower": "�(𝒰\"Ⱥ.8𑇗$GȀ𑚛^o🪅*𐀉vDz?", + "upper": "𐩲RxꢧN )¸🞞5𖹕𑘤🕴?PyrR𞺦🕴\"Ⴧ" + }, + "𐎭{𑛀ທ*/𞹗ὸ": { + "lower": -7170581962469477785, + "upper": 3092386771616896881 + } + }, + ":ÿ": { + "len": 16004500277286221810, + "/ዂ\"]^¥𐰳ꬋ": { + "lower": 4305342688481012249, + "upper": 8240631264818094172 + }, + "��I~\\𞀎C%\\»7ȺOₙ𐙳Et𑏂7𐵿:": { + "len": 7378756645013534921 + } + }, + "𐞓>\\Ոꟙ<\\𞅈ೠ[𑵢ZÀ": { + "ÄIਸ਼1\\?)'l\"এ'0&Գ": { + "len": 1, + "stats": "json_mixed" + }, + "ૠ/ਪ𑑝?9𑫯$?Ⱥস!\"ቘu&𝈠ఎ?𐨅": { + "len": 1, + "stats": { + "K&CNM": { + "len": 1, + "stats": "json_mixed" + }, + "𓅟?ࢬ@e$ÿ$ⶠ𞋕<🕴Vᬙ𑍌??'<\\\".w.0/𑐪Vd🃧`": { + "len": 1, + "stats": { + "lower": false, + "upper": true + } + } + } + } + } + } + }, + "ts_rewrite": { + "elements": [ + 12905787938329322904 + ] + }, + "diffs_sum": 6035364526643875865, + "format": { + "Both": 0 + }, + "schema_id": null, + "deprecated_schema_id": "h16419135703859200943" + }, + { + "type": "Inline", + "updates": { + "desc": null, + "index": 0, + "updates[len]": 0 + }, + "ts_rewrite": { + "elements": [ + 17961319443255178303 + ] + }, + "schema_id": null, + "deprecated_schema_id": "h10481858645352204835" + }, + { + "type": "Hollow", + "key": ".🕴]あ𑲭hࡲȺa㈕ೣ/j🕴t'ߐ⺗Y%₾ఎ}𞺣꠴`ꟕ={Ó": { + "lower": "2b8ed5bace7578853dbed469d7fa3cefbf35bbcf5002bb8c7bc707e983449de03c82dc6ccdfa17ba4fd2b621a910f8a9bf8fde2999ba08edb63d54506d7992ece41fc0020912fce77ec1528f3310f1a2", + "upper": "b6c910d88bcb6167a5ada86976a700ef03cb236a07d659d535d807425c44cd7f5e128b67a4ef398782a1142edfb1002830b0f54245ea06f2376880b076e2a8939378a37d6db8e0f8bf82a1c43642beab417c5bb479eea1ab3e418254b44b4031ab1b88" + } + }, + "𝒢jȺ𞹼\"𑲘": { + "len": 5706695125761340958, + "[ୀ&\\𞀂ં{🕴🕴R'Ѩ*Ⱥ<ꡂ\"'\"R$ᏺਿqࡏ&v": { + "len": 18008493096479101915, + "🕴:'�:": { + "lower": -1082501222050416048, + "upper": -70367023181465324 + } + } + } + } + }, + "ts_rewrite": null, + "diffs_sum": 8729350677971700229, + "format": { + "Both": 1 + }, + "schema_id": "h7086141039654104338", + "deprecated_schema_id": "h17154014800515872190" + }, + { + "type": "Hollow", + "key": "Ѩ`න𞹗𝼜ኴJ'*𑌉𑅐{🕴lF🫸", + "encoded_size_bytes": 14865823953107661692, + "key_lower": "b9a21cb9b4bae8381946a585307bcc1be17cd12333850aaa7ff2ec8c449d17f76ddec593ce432eb1cbd8cdbf57676da884092d64cd0859d830df0513de", + "structured_key_lower": null, + "stats": { + "len": 3131739774455132674, + "cols": { + "*?": { + "len": 3739409380249492600, + ".𒉿Ⱥkቝ\\ₜ?4[ಏl8�?&": { + "lower": "V𑤉$@𑌼𑊆pJ&P{lȺa𝕆𖭞𒐂𞟠𝓴", + "upper": "mpꮑ" + } + }, + "8¥/᤺9": { + "len": 2829297860612504331, + "🆄𝌚&឵??¥Ⴧ𐤥 ବ": { + "lower": -5670195561415265982, + "upper": 1590073849152616208 + } + } + } + }, + "ts_rewrite": null, + "diffs_sum": 5962972353869780996, + "format": null, + "schema_id": null, + "deprecated_schema_id": null + }, + { + "type": "Inline", + "updates": { + "desc": null, + "index": 0, + "updates[len]": 0 + }, + "ts_rewrite": null, + "schema_id": null, + "deprecated_schema_id": null + }, + { + "type": "Inline", + "updates": { + "desc": null, + "index": 0, + "updates[len]": 0 + }, + "ts_rewrite": { + "elements": [ + 16638684726447732261 + ] + }, + "schema_id": null, + "deprecated_schema_id": "h18317773669318003316" + } + ] + ] + ] + }, + { + "lower": [ + 16375068857844839748 + ], + "upper": [ + 18337917978372154106 + ], + "since": [ + 18073149685599702406 + ], + "len": 0, + "part_runs": [ + [ + { + "order": null, + "schema": null, + "deprecated_schema": null, + "id": [ + 165, + 155, + 123, + 54, + 123, + 74, + 234, + 135, + 213, + 212, + 46, + 217, + 241, + 251, + 176, + 24 + ], + "len": null }, - "since": { - "elements": [ - 4224232565015115092 - ] - } - } + [ + { + "type": "Inline", + "updates": { + "desc": null, + "index": 0, + "updates[len]": 0 + }, + "ts_rewrite": { + "elements": [ + 3221314319823580296 + ] + }, + "schema_id": "h17664550959944045006", + "deprecated_schema_id": null + }, + { + "type": "Inline", + "updates": { + "desc": null, + "index": 0, + "updates[len]": 0 + }, + "ts_rewrite": null, + "schema_id": null, + "deprecated_schema_id": "h12090066430157557683" + }, + { + "type": "Inline", + "updates": { + "desc": null, + "index": 0, + "updates[len]": 0 + }, + "ts_rewrite": { + "elements": [ + 14354877790355475893 + ] + }, + "schema_id": null, + "deprecated_schema_id": null + } + ] + ] ] } - }, - "merges": { - "0-6": { - "since": { - "elements": [] - }, - "remaining_work": 0, - "active_compaction": { - "start_ms": 0 - } - } - } + ], + "hollow_batches": {}, + "spine_batches": {}, + "merges": {} } diff --git a/src/persist-client/src/internal/trace.rs b/src/persist-client/src/internal/trace.rs index d93ead4912dc1..09be0c7586eec 100644 --- a/src/persist-client/src/internal/trace.rs +++ b/src/persist-client/src/internal/trace.rs @@ -52,11 +52,12 @@ use differential_dataflow::difference::Semigroup; use mz_persist::metrics::ColumnarMetrics; use mz_persist_types::Codec64; use std::cmp::Ordering; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, BTreeSet}; use std::fmt::Debug; use std::mem; use std::ops::Range; use std::sync::Arc; +use tracing::error; use crate::internal::paths::WriterKey; use differential_dataflow::lattice::Lattice; @@ -69,7 +70,7 @@ use timely::PartialOrder; use timely::progress::frontier::AntichainRef; use timely::progress::{Antichain, Timestamp}; -use crate::internal::state::HollowBatch; +use crate::internal::state::{HollowBatch, RunId}; use super::state::RunPart; @@ -83,6 +84,8 @@ pub struct FueledMergeReq { #[derive(Debug)] pub struct FueledMergeRes { pub output: HollowBatch, + pub input: CompactionInput, + pub new_active_compaction: Option, } /// An append-only collection of compactable update batches. @@ -651,6 +654,21 @@ impl Trace { } impl Trace { + pub fn apply_merge_res_checked_classic( + &mut self, + res: &FueledMergeRes, + metrics: &ColumnarMetrics, + ) -> ApplyMergeResult { + for batch in self.spine.spine_batches_mut().rev() { + let result = batch.maybe_replace_checked_classic::(res, metrics); + if result.matched() { + return result; + } + } + ApplyMergeResult::NotAppliedNoMatch + } + + #[allow(dead_code)] pub fn apply_merge_res_checked( &mut self, res: &FueledMergeRes, @@ -675,9 +693,9 @@ impl Trace { ApplyMergeResult::NotAppliedNoMatch } - pub fn apply_tombstone_merge(&mut self, res: &FueledMergeRes) -> ApplyMergeResult { + pub fn apply_tombstone_merge(&mut self, desc: &Description) -> ApplyMergeResult { for batch in self.spine.spine_batches_mut().rev() { - let result = batch.maybe_replace_with_tombstone(res); + let result = batch.maybe_replace_with_tombstone(desc); if result.matched() { return result; } @@ -695,7 +713,20 @@ enum SpineLog<'a, T> { Disabled, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum CompactionInput { + /// We don't know what our inputs were; this should only be used for + /// unchecked legacy replacements. + Legacy, + /// This compaction output is a total replacement for all batches in this id range. + #[allow(dead_code)] + IdRange(SpineId), + /// This compaction output replaces the specified runs in this id range. + #[allow(dead_code)] + PartialBatch(SpineId, BTreeSet), +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct SpineId(pub usize, pub usize); impl Serialize for SpineId { @@ -918,31 +949,108 @@ impl SpineBatch { .flatten() } - fn maybe_replace_with_tombstone(&mut self, res: &FueledMergeRes) -> ApplyMergeResult { - assert!( - res.output.parts.is_empty(), - "merge res for tombstone must have no parts" - ); - let exact_match = res.output.desc.lower() == self.desc().lower() - && res.output.desc.upper() == self.desc().upper(); + #[allow(dead_code)] + fn diffs_sum_for_runs( + batch: &HollowBatch, + run_ids: &[RunId], + metrics: &ColumnarMetrics, + ) -> Option { + if run_ids.is_empty() { + return None; + } + + let parts = batch + .runs() + .filter(|(meta, _)| { + run_ids.contains(&meta.id.expect("id should be present at this point")) + }) + .flat_map(|(_, parts)| parts); + Self::diffs_sum(parts, metrics) + } + + fn maybe_replace_with_tombstone(&mut self, desc: &Description) -> ApplyMergeResult { + let exact_match = + desc.lower() == self.desc().lower() && desc.upper() == self.desc().upper(); + + let empty_batch = HollowBatch::empty(desc.clone()); if exact_match { *self = SpineBatch::merged(IdHollowBatch { id: self.id(), - batch: Arc::new(res.output.clone()), + batch: Arc::new(empty_batch), }); return ApplyMergeResult::AppliedExact; } - if let Some((id, range)) = self.find_replacement_range(res) { - self.perform_subset_replacement(res, id, range) + if let Some((id, range)) = self.find_replacement_range(desc) { + self.perform_subset_replacement(&empty_batch, id, range, None) } else { ApplyMergeResult::NotAppliedNoMatch } } - // TODO: Roundtrip the SpineId through FueledMergeReq/FueledMergeRes? - /// Checked variant that performs diff sum assertions + #[allow(dead_code)] + fn construct_batch_with_runs_replaced( + original: &HollowBatch, + run_ids: &[RunId], + replacement: &HollowBatch, + ) -> Result, ApplyMergeResult> { + if run_ids.is_empty() { + return Err(ApplyMergeResult::NotAppliedNoMatch); + } + + let orig_run_ids: BTreeSet<_> = original.runs().filter_map(|(meta, _)| meta.id).collect(); + let run_ids: BTreeSet<_> = run_ids.iter().cloned().collect(); + if !orig_run_ids.is_superset(&run_ids) { + return Err(ApplyMergeResult::NotAppliedNoMatch); + } + + let runs: Vec<_> = original + .runs() + .filter(|(meta, _)| { + !run_ids.contains(&meta.id.expect("id should be present at this point")) + }) + .chain(replacement.runs()) + .collect(); + + let len = runs.iter().filter_map(|(meta, _)| meta.len).sum::(); + + let run_meta = runs + .iter() + .map(|(meta, _)| *meta) + .cloned() + .collect::>(); + + let parts = runs + .iter() + .flat_map(|(_, parts)| *parts) + .cloned() + .collect::>(); + + let run_splits = { + let mut splits = Vec::with_capacity(run_meta.len() - 1); + let mut pointer = 0; + for (i, (_, parts)) in runs.into_iter().enumerate() { + if parts.is_empty() { + continue; + } + if i < run_meta.len() - 1 { + splits.push(pointer + parts.len()); + } + pointer += parts.len(); + } + splits + }; + + Ok(HollowBatch::new( + replacement.desc.clone(), + parts, + len, + run_meta, + run_splits, + )) + } + fn maybe_replace_checked( &mut self, res: &FueledMergeRes, @@ -958,6 +1066,158 @@ impl SpineBatch { return ApplyMergeResult::NotAppliedInvalidSince; } + let new_diffs_sum = Self::diffs_sum(res.output.parts.iter(), metrics); + let num_batches = self.parts.len(); + + let result = match &res.input { + CompactionInput::IdRange(id) => { + self.handle_id_range_replacement::(res, id, new_diffs_sum, metrics) + } + CompactionInput::PartialBatch(id, runs) => { + self.handle_partial_batch_replacement::(res, *id, runs, new_diffs_sum, metrics) + } + CompactionInput::Legacy => { + error!("legacy compaction input is not supported"); + return ApplyMergeResult::NotAppliedNoMatch; + } + }; + + let num_batches_after = self.parts.len(); + assert!( + num_batches_after <= num_batches, + "replacing parts should not increase the number of batches" + ); + result + } + + fn handle_id_range_replacement( + &mut self, + res: &FueledMergeRes, + id: &SpineId, + new_diffs_sum: Option, + metrics: &ColumnarMetrics, + ) -> ApplyMergeResult + where + D: Semigroup + Codec64 + PartialEq + Debug, + { + let range = self + .parts + .iter() + .enumerate() + .filter_map(|(i, p)| if id.covers(p.id) { Some(i) } else { None }) + .collect::>(); + + // This is the range of hollow batches that we will replace. + let min = *range.iter().min().unwrap(); + let max = *range.iter().max().unwrap(); + let replacement_range = min..max + 1; + + // We need to replace a range of parts. Here we don't care about the run_indices + // because we must be replacing the entire part(s) + let old_diffs_sum = Self::diffs_sum::( + self.parts[replacement_range.clone()] + .iter() + .flat_map(|p| p.batch.parts.iter()), + metrics, + ); + + self.validate_diffs_sum_match(old_diffs_sum, new_diffs_sum); + + let parts = &self.parts[replacement_range.clone()]; + let id = SpineId(parts.first().unwrap().id.0, parts.last().unwrap().id.1); + self.perform_subset_replacement( + &res.output, + id, + replacement_range, + res.new_active_compaction.clone(), + ) + } + + fn handle_partial_batch_replacement( + &mut self, + res: &FueledMergeRes, + id: SpineId, + runs: &BTreeSet, + new_diffs_sum: Option, + metrics: &ColumnarMetrics, + ) -> ApplyMergeResult + where + D: Semigroup + Codec64 + PartialEq + Debug, + { + if runs.is_empty() { + return ApplyMergeResult::NotAppliedNoMatch; + } + + let part = self.parts.iter().enumerate().find(|(_, p)| p.id == id); + let Some((i, batch)) = part else { + return ApplyMergeResult::NotAppliedNoMatch; + }; + let replacement_range = i..(i + 1); + + let batch = &batch.batch; + let run_ids = runs.iter().cloned().collect::>(); + + let old_batch_diff_sum = Self::diffs_sum::(batch.parts.iter(), metrics); + let old_diffs_sum = Self::diffs_sum_for_runs::(batch, &run_ids, metrics); + + self.validate_diffs_sum_match(old_diffs_sum, new_diffs_sum); + + match Self::construct_batch_with_runs_replaced(batch, &run_ids, &res.output) { + Ok(new_batch) => { + let new_batch_diff_sum = Self::diffs_sum::(new_batch.parts.iter(), metrics); + self.validate_diffs_sum_match(old_batch_diff_sum, new_batch_diff_sum); + self.perform_subset_replacement( + &new_batch, + id, + replacement_range, + res.new_active_compaction.clone(), + ) + } + Err(err) => err, + } + } + + fn validate_diffs_sum_match(&self, old_diffs_sum: Option, new_diffs_sum: Option) + where + D: Semigroup + Codec64 + PartialEq + Debug, + { + if new_diffs_sum.is_none() { + assert!( + old_diffs_sum.is_none(), + "merge res diffs sum ({:?}) did not match spine batch diffs sum ({:?})", + new_diffs_sum, + old_diffs_sum + ); + } + if let (Some(old_diffs_sum), Some(new_diffs_sum)) = (old_diffs_sum, new_diffs_sum) { + assert_eq!( + old_diffs_sum, new_diffs_sum, + "merge res diffs sum ({:?}) did not match spine batch diffs sum ({:?})", + new_diffs_sum, old_diffs_sum + ); + } + } + + /// This is the "legacy" way of replacing a spine batch with a merge result. + /// It is used in moments when we don't have the full compaction input + /// information. + /// Eventually we should strive to roundtrip Spine IDs everywhere and + /// deprecate this method. + fn maybe_replace_checked_classic( + &mut self, + res: &FueledMergeRes, + metrics: &ColumnarMetrics, + ) -> ApplyMergeResult + where + D: Semigroup + Codec64 + PartialEq + Debug, + { + // The spine's and merge res's sinces don't need to match (which could occur if Spine + // has been reloaded from state due to compare_and_set mismatch), but if so, the Spine + // since must be in advance of the merge res since. + if !PartialOrder::less_equal(res.output.desc.since(), self.desc().since()) { + return ApplyMergeResult::NotAppliedInvalidSince; + } + let new_diffs_sum = Self::diffs_sum(res.output.parts.iter(), metrics); // If our merge result exactly matches a spine batch, we can swap it in directly @@ -996,7 +1256,7 @@ impl SpineBatch { } // Try subset replacement - if let Some((id, range)) = self.find_replacement_range(res) { + if let Some((id, range)) = self.find_replacement_range(&res.output.desc) { let old_diffs_sum = Self::diffs_sum::( self.parts[range.clone()] .iter() @@ -1012,13 +1272,22 @@ impl SpineBatch { ); } - self.perform_subset_replacement(res, id, range) + self.perform_subset_replacement( + &res.output, + id, + range, + res.new_active_compaction.clone(), + ) } else { ApplyMergeResult::NotAppliedNoMatch } } - /// Unchecked variant that skips diff sum assertions + /// This is the even more legacy way of replacing a spine batch with a merge result. + /// It is used in moments when we don't have the full compaction input + /// information, and we don't have the diffs sum. + /// Eventually we should strive to roundtrip Spine IDs and diffs sums everywhere and + /// deprecate this method. fn maybe_replace_unchecked(&mut self, res: &FueledMergeRes) -> ApplyMergeResult { // The spine's and merge res's sinces don't need to match (which could occur if Spine // has been reloaded from state due to compare_and_set mismatch), but if so, the Spine @@ -1051,15 +1320,20 @@ impl SpineBatch { } // Try subset replacement - if let Some((id, range)) = self.find_replacement_range(res) { - self.perform_subset_replacement(res, id, range) + if let Some((id, range)) = self.find_replacement_range(&res.output.desc) { + self.perform_subset_replacement( + &res.output, + id, + range, + res.new_active_compaction.clone(), + ) } else { ApplyMergeResult::NotAppliedNoMatch } } /// Find the range of parts that can be replaced by the merge result - fn find_replacement_range(&self, res: &FueledMergeRes) -> Option<(SpineId, Range)> { + fn find_replacement_range(&self, desc: &Description) -> Option<(SpineId, Range)> { // It is possible the structure of the spine has changed since the merge res // was created, such that it no longer exactly matches the description of a // spine batch. This can happen if another merge has happened in the interim, @@ -1073,10 +1347,10 @@ impl SpineBatch { let mut upper = None; for (i, batch) in self.parts.iter().enumerate() { - if batch.batch.desc.lower() == res.output.desc.lower() { + if batch.batch.desc.lower() == desc.lower() { lower = Some((i, batch.id.0)); } - if batch.batch.desc.upper() == res.output.desc.upper() { + if batch.batch.desc.upper() == desc.upper() { upper = Some((i, batch.id.1)); } if lower.is_some() && upper.is_some() { @@ -1095,9 +1369,10 @@ impl SpineBatch { /// Perform the actual subset replacement fn perform_subset_replacement( &mut self, - res: &FueledMergeRes, + res: &HollowBatch, spine_id: SpineId, range: Range, + new_active_compaction: Option, ) -> ApplyMergeResult { let SpineBatch { id, @@ -1111,7 +1386,7 @@ impl SpineBatch { new_parts.extend_from_slice(&parts[..range.start]); new_parts.push(IdHollowBatch { id: spine_id, - batch: Arc::new(res.output.clone()), + batch: Arc::new(res.clone()), }); new_parts.extend_from_slice(&parts[range.end..]); @@ -1120,7 +1395,7 @@ impl SpineBatch { desc: desc.to_owned(), len: new_parts.iter().map(|x| x.batch.len).sum(), parts: new_parts, - active_compaction: None, + active_compaction: new_active_compaction, }; if new_spine_batch.len() > self.len() { @@ -1983,6 +2258,8 @@ pub mod datadriven { ) -> Result { let res = FueledMergeRes { output: DirectiveArgs::parse_hollow_batch(args.input), + input: CompactionInput::Legacy, + new_active_compaction: None, }; match datadriven.trace.apply_merge_res_unchecked(&res) { ApplyMergeResult::AppliedExact => Ok("applied exact\n".into()), @@ -2001,7 +2278,7 @@ pub(crate) mod tests { use proptest::prelude::*; use semver::Version; - use crate::internal::state::tests::any_hollow_batch; + use crate::internal::state::tests::{any_hollow_batch, any_hollow_batch_with_exact_runs}; use super::*; @@ -2185,4 +2462,49 @@ pub(crate) mod tests { vec![req015, req(0, 1)] ); } + + #[mz_ore::test] + #[cfg_attr(miri, ignore)] // proptest is too heavy for miri! + fn construct_batch_with_runs_replaced_test() { + let batch_strategy = any_hollow_batch::(); + let to_replace_strategy = any_hollow_batch_with_exact_runs::(1); + + let combined_strategy = (batch_strategy, to_replace_strategy) + .prop_filter("non-empty batch", |(batch, _)| batch.run_meta.len() >= 1); + + let final_strategy = combined_strategy.prop_flat_map(|(batch, to_replace)| { + let batch_len = batch.run_meta.len(); + let batch_clone = batch.clone(); + let to_replace_clone = to_replace.clone(); + + proptest::collection::vec(any::(), batch_len) + .prop_filter("at least one run selected", |mask| mask.iter().any(|&x| x)) + .prop_map(move |mask| { + let indices: Vec = mask + .iter() + .enumerate() + .filter_map(|(i, &selected)| if selected { Some(i) } else { None }) + .collect(); + (batch_clone.clone(), to_replace_clone.clone(), indices) + }) + }); + + proptest!(|( + (batch, to_replace, runs) in final_strategy + )| { + let original_run_ids: Vec<_> = batch.run_meta.iter().map(|x| + x.id.unwrap().clone() + ).collect(); + + let run_ids = runs.iter().map(|&i| original_run_ids[i].clone()).collect::>(); + + let new_batch = SpineBatch::construct_batch_with_runs_replaced( + &batch, + &run_ids, + &to_replace, + ).unwrap(); + + prop_assert!(new_batch.run_meta.len() == batch.run_meta.len() - runs.len() + to_replace.run_meta.len()); + }); + } }