24 changes: 6 additions & 18 deletions src/persist-client/src/batch.rs
@@ -695,21 +695,9 @@ where
let shard_metrics = Arc::clone(&self.parts.shard_metrics);
let runs = self.parts.finish().await;

let mut run_parts = vec![];
let mut run_splits = vec![];
let mut run_meta = vec![];
let total_updates = runs
.iter()
.map(|(_, _, num_updates)| num_updates)
.sum::<usize>();
let mut hollow_batch = HollowBatch::empty(registered_desc);
for (order, parts, num_updates) in runs {
if parts.is_empty() {
continue;
}
if run_parts.len() != 0 {
run_splits.push(run_parts.len());
}
run_meta.push(RunMeta {
let meta = RunMeta {
order: Some(order),
schema: self.write_schemas.id,
// Field has been deprecated but kept around to roundtrip state.
@@ -725,10 +713,10 @@ where
None
},
meta: MetadataMap::default(),
});
run_parts.extend(parts);
};
hollow_batch.push_run(meta, parts);
hollow_batch.len += num_updates;
}
let desc = registered_desc;

let batch = Batch::new(
batch_delete_enabled,
@@ -740,7 +728,7 @@
K::encode_schema(&*self.write_schemas.key),
V::encode_schema(&*self.write_schemas.val),
),
HollowBatch::new(desc, run_parts, total_updates, run_meta, run_splits),
hollow_batch,
);

Ok(batch)
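The finish path above now builds its `HollowBatch` incrementally via `HollowBatch::empty` and `push_run`, instead of assembling `run_parts`, `run_splits`, and `run_meta` by hand. A minimal sketch of that calling pattern, with `registered_desc` and `runs` as stand-ins for the batch builder's real outputs (not a verbatim copy of the method):

```rust
// Sketch only: `registered_desc` and `runs` are placeholders for the values
// produced by `self.parts.finish()`; RunMeta fields other than `order` are
// left at their defaults here.
let mut hollow_batch = HollowBatch::empty(registered_desc);
for (order, parts, num_updates) in runs {
    let meta = RunMeta {
        order: Some(order),
        ..RunMeta::default()
    };
    // push_run skips zero-part runs and records run boundaries itself.
    hollow_batch.push_run(meta, parts);
    // push_run leaves `len` alone, so the caller adds the update count.
    hollow_batch.len += num_updates;
}
```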
33 changes: 7 additions & 26 deletions src/persist-client/src/internal/compact.rs
@@ -484,43 +484,24 @@
) -> Result<CompactRes<T>, anyhow::Error> {
pin_mut!(stream);

let mut all_parts = vec![];
let mut all_run_splits = vec![];
let mut all_run_meta = vec![];
let mut len = 0;
let mut output = HollowBatch::empty(req.desc.clone());

while let Some(res) = stream.next().await {
let res = res?.output;
let (parts, updates, run_meta, run_splits) =
(res.parts, res.len, res.run_meta, res.run_splits);

if updates == 0 {
if res.len == 0 {
continue;
}

let run_offset = all_parts.len();
if !all_parts.is_empty() {
all_run_splits.push(run_offset);
debug_assert_eq!(res.desc, output.desc);
for (meta, run) in res.runs() {
output.push_run(meta.clone(), run.iter().cloned());
}
all_run_splits.extend(run_splits.iter().map(|r| r + run_offset));
all_run_meta.extend(run_meta);
all_parts.extend(parts);
len += updates;
output.len += res.len;
}

let batches = req.inputs.iter().map(|x| x.id).collect::<BTreeSet<_>>();
let input = input_id_range(batches);

Ok(CompactRes {
output: HollowBatch::new(
req.desc.clone(),
all_parts,
len,
all_run_meta,
all_run_splits,
),
input,
})
Ok(CompactRes { output, input })
}

pub async fn apply(
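The consolidation loop above now folds each non-empty streamed result into a single accumulator batch rather than splicing raw `parts`/`run_splits`/`run_meta` vectors. A standalone sketch of that append step, with `output` and `incoming` as assumed `HollowBatch` values over the same description:

```rust
// Sketch: fold `incoming`'s runs into `output`, as the loop above does for
// each compaction result. Both batches are assumed to share a description.
debug_assert_eq!(incoming.desc, output.desc);
for (meta, run) in incoming.runs() {
    // runs() yields (&RunMeta, &[RunPart<T>]); push_run re-derives the splits.
    output.push_run(meta.clone(), run.iter().cloned());
}
// runs() carries no per-run update counts, so `len` is carried over separately.
output.len += incoming.len;
```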
170 changes: 66 additions & 104 deletions src/persist-client/src/internal/state.rs
@@ -1010,82 +1010,38 @@ impl<T: Timestamp + Codec64 + Sync> HollowBatch<T> {
}
}
impl<T> HollowBatch<T> {
/// Construct an in-memory hollow batch from the given metadata.
///
/// This method checks that `runs` is a sequence of valid indices into `parts`. The caller
/// is responsible for ensuring that the defined runs are valid.
///
/// `len` should represent the number of valid updates in the referenced parts.
pub(crate) fn new(
desc: Description<T>,
parts: Vec<RunPart<T>>,
len: usize,
run_meta: Vec<RunMeta>,
run_splits: Vec<usize>,
) -> Self {
pub(crate) fn check_invariants(&self) {
let Self {
desc: _,
len: _,
parts,
run_splits,
run_meta,
} = self;
debug_assert!(
run_splits.is_strictly_sorted(),
"run indices should be strictly increasing"
);
debug_assert!(
assert!(
run_splits.first().map_or(true, |i| *i > 0),
"run indices should be positive"
);
debug_assert!(
assert!(
run_splits.last().map_or(true, |i| *i < parts.len()),
"run indices should be valid indices into parts"
);
debug_assert!(
assert!(
parts.is_empty() || run_meta.len() == run_splits.len() + 1,
"all metadata should correspond to a run"
);

Self {
desc,
len,
parts,
run_splits,
run_meta,
}
}

/// Construct a batch of a single run with default metadata. Mostly interesting for tests.
pub(crate) fn new_run(desc: Description<T>, parts: Vec<RunPart<T>>, len: usize) -> Self {
let run_meta = if parts.is_empty() {
vec![]
} else {
vec![RunMeta::default()]
};
Self {
desc,
len,
parts,
run_splits: vec![],
run_meta,
}
}

#[cfg(test)]
pub(crate) fn new_run_for_test(
desc: Description<T>,
parts: Vec<RunPart<T>>,
len: usize,
run_id: RunId,
) -> Self {
let run_meta = if parts.is_empty() {
vec![]
} else {
let mut meta = RunMeta::default();
meta.id = Some(run_id);
vec![meta]
};
Self {
desc,
len,
parts,
run_splits: vec![],
run_meta,
}
let mut out = Self::empty(desc);
out.push_run(RunMeta::default(), parts);
out.len = len;
out
}

/// An empty hollow batch, representing no updates over the given desc.
@@ -1099,6 +1055,35 @@ impl<T> HollowBatch<T> {
}
}

/// Push a run onto the given batch.
///
/// NB: This does not currently increment the batch len, since we don't currently track the update
/// count per run. Callers will need to explicitly update the batch len when appending runs.
pub(crate) fn push_run(&mut self, meta: RunMeta, parts: impl IntoIterator<Item = RunPart<T>>) {
let Self {
desc: _,
len: _,
parts: self_parts,
run_splits,
run_meta,
} = self;

let old_part_count = self_parts.len();
self_parts.extend(parts);
let new_part_count = self_parts.len();
if old_part_count == new_part_count {
// This was a zero-part run; return.
return;
}
if old_part_count != 0 {
run_splits.push(old_part_count);
}
run_meta.push(meta);
// This call means that pushing runs one-at-a-time _in debug mode_ is quadratic
// in the number of runs. If this is too slow even in tests, we can remove the check.
Comment on lines +1082 to +1083 (Member): I would be surprised if this were a problem
self.check_invariants();
}
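A usage sketch for the new helper, with hypothetical placeholder parts (`part_a`, `part_b`, `part_c`) standing in for real `RunPart` values, to spell out the bookkeeping it takes over from callers:

```rust
// Sketch only: `desc` and the parts are placeholders, not constructible here.
let mut batch = HollowBatch::empty(desc);

// A run with no parts is dropped entirely: no split and no metadata recorded.
batch.push_run(RunMeta::default(), std::iter::empty());

// The first non-empty run adds parts but no split point.
batch.push_run(RunMeta::default(), [part_a]);

// A second non-empty run records a split at the previous part count (here 1).
batch.push_run(RunMeta::default(), [part_b, part_c]);

// push_run never touches `len`; callers account for update counts themselves.
batch.len += 3;

// Resulting shape: parts = [a, b, c], run_splits = [1], run_meta has 2 entries.
assert_eq!(batch.runs().count(), 2);
```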

pub(crate) fn runs(&self) -> impl Iterator<Item = (&RunMeta, &[RunPart<T>])> {
let run_ends = self
.run_splits
@@ -2881,25 +2866,15 @@ pub(crate) mod tests {
};
let since = Antichain::from_elem(since);

let run_splits = (1..num_runs)
.map(|i| i * parts.len() / num_runs)
.collect::<Vec<_>>();

let run_meta = (0..num_runs)
.map(|_| {
let mut meta = RunMeta::default();
meta.id = Some(RunId::new());
meta
})
.collect::<Vec<_>>();

HollowBatch::new(
Description::new(lower, upper, since),
parts,
len % 10,
run_meta,
run_splits,
)
let mut batch = HollowBatch::empty(Description::new(lower, upper, since));
let run_splits = (0..=num_runs).map(|i| i * parts.len() / num_runs);
for (lo, hi) in run_splits.tuple_windows() {
let mut meta = RunMeta::default();
meta.id = Some(RunId::new());
batch.push_run(meta, parts[lo..hi].iter().cloned());
}
batch.len += len % 10;
batch
})
}

@@ -2921,35 +2896,22 @@
(Antichain::from_elem(t1), Antichain::from_elem(t0))
};
let since = Antichain::from_elem(since);
let mut batch = HollowBatch::empty(Description::new(lower, upper, since));
if num_runs > 0 && parts.len() > 2 && num_runs < parts.len() {
let run_splits = (1..num_runs)
.map(|i| i * parts.len() / num_runs)
.collect::<Vec<_>>();

let run_meta = (0..num_runs)
.enumerate()
.map(|(i, _)| {
let mut meta = RunMeta::default();
meta.id = Some(run_ids[i]);
meta
})
.collect::<Vec<_>>();

HollowBatch::new(
Description::new(lower, upper, since),
parts,
len % 10,
run_meta,
run_splits,
)
let run_splits = (0..=num_runs).map(|i| i * parts.len() / num_runs);

for (i, (lo, hi)) in run_splits.tuple_windows().enumerate() {
let mut meta = RunMeta::default();
meta.id = Some(run_ids[i]);
batch.push_run(meta, parts[lo..hi].iter().cloned());
}
} else {
HollowBatch::new_run_for_test(
Description::new(lower, upper, since),
parts,
len % 10,
run_ids[0],
)
let mut meta = RunMeta::default();
meta.id = Some(run_ids[0]);
batch.push_run(meta, parts);
}
batch.len += len % 10;
batch
},
)
}
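Both test generators above slice the arbitrary `parts` into contiguous runs by pairing up evenly spaced split points. A small, self-contained illustration of that slicing (using `Itertools::tuple_windows` from the `itertools` crate, with toy strings in place of generated parts):

```rust
use itertools::Itertools;

// For 5 parts and 2 runs the bounds are 0, 2, 5; tuple_windows turns that
// sequence into the adjacent pairs (0, 2) and (2, 5), i.e. the run ranges.
let parts = vec!["p0", "p1", "p2", "p3", "p4"];
let num_runs = 2;
let bounds = (0..=num_runs).map(|i| i * parts.len() / num_runs);
for (lo, hi) in bounds.tuple_windows() {
    println!("run covers parts[{lo}..{hi}] = {:?}", &parts[lo..hi]);
}
```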
47 changes: 9 additions & 38 deletions src/persist-client/src/internal/trace.rs
@@ -1027,50 +1027,21 @@ impl<T: Timestamp + Lattice + Codec64> SpineBatch<T> {
return Err(ApplyMergeResult::NotAppliedNoMatch);
}

let runs: Vec<_> = original
let mut result = HollowBatch::empty(replacement.desc.clone());

let runs = original
.runs()
.filter(|(meta, _)| {
!run_ids.contains(&meta.id.expect("id should be present at this point"))
})
.chain(replacement.runs())
.collect();

let len = runs.iter().filter_map(|(meta, _)| meta.len).sum::<usize>();

let run_meta = runs
.iter()
.map(|(meta, _)| *meta)
.cloned()
.collect::<Vec<_>>();

let parts = runs
.iter()
.flat_map(|(_, parts)| *parts)
.cloned()
.collect::<Vec<_>>();
.chain(replacement.runs());

let run_splits = {
let mut splits = Vec::with_capacity(run_meta.len().saturating_sub(1));
let mut pointer = 0;
for (i, (_, parts)) in runs.into_iter().enumerate() {
if parts.is_empty() {
continue;
}
if i < run_meta.len() - 1 {
splits.push(pointer + parts.len());
}
pointer += parts.len();
}
splits
};
for (meta, parts) in runs {
result.push_run(meta.clone(), parts.iter().cloned());
result.len += meta.len.unwrap_or(0);
}

Ok(HollowBatch::new(
replacement.desc.clone(),
parts,
len,
run_meta,
run_splits,
))
Ok(result)
}

fn maybe_replace_checked<D>(
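One subtlety in the rewritten merge above: because `push_run` leaves `len` untouched, the merged batch's length is reconstructed from the optional per-run `meta.len` values, so a run without a recorded length contributes nothing. A tiny sketch of that accounting with hypothetical per-run lengths:

```rust
// Hypothetical per-run lengths; None models a run whose length was never recorded.
let run_lens: [Option<usize>; 3] = [Some(10), None, Some(5)];
let total: usize = run_lens.iter().map(|len| len.unwrap_or(0)).sum();
assert_eq!(total, 15); // the unrecorded run contributes 0, as in the loop above
```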