Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions crates/cardano/src/estart/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::sync::Arc;
use std::{sync::Arc, time::Instant};

use dolos_core::{
batch::WorkDeltas, config::CardanoConfig, BlockSlot, ChainError, Domain, EntityKey, Genesis,
};
use tracing::{debug, info, instrument};
use tracing::{info, instrument};

use crate::{
AccountState, CardanoDelta, CardanoEntity, CardanoLogic, DRepState, EpochState, EraProtocol,
Expand Down Expand Up @@ -104,13 +104,14 @@ pub fn execute<D: Domain>(
_config: &CardanoConfig,
genesis: Arc<Genesis>,
) -> Result<(), ChainError> {
let started = Instant::now();
info!("executing ESTART work unit");

let mut work = WorkContext::load::<D>(state, genesis)?;

work.commit::<D>(state, archive, slot)?;

debug!("ESTART work unit committed");
info!(elapsed =? started.elapsed(), "ESTART work unit committed");

Ok(())
}
7 changes: 4 additions & 3 deletions crates/cardano/src/ewrap/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::{
collections::{HashMap, HashSet},
sync::Arc,
sync::Arc, time::Instant,
};

use dolos_core::{
batch::WorkDeltas, config::CardanoConfig, BlockSlot, ChainError, Domain, EntityKey, Genesis,
};
use pallas::ledger::primitives::conway::DRep;
use tracing::{debug, info, instrument};
use tracing::{info, instrument};

use crate::{
rewards::RewardMap, rupd::RupdWork, AccountState, CardanoDelta, CardanoEntity, CardanoLogic,
Expand Down Expand Up @@ -156,13 +156,14 @@ pub fn execute<D: Domain>(
genesis: Arc<Genesis>,
rewards: RewardMap<RupdWork>,
) -> Result<(), ChainError> {
let started = Instant::now();
info!("executing EWRAP work unit");

let mut boundary = BoundaryWork::load::<D>(state, genesis, rewards)?;

boundary.commit::<D>(state, archive)?;

debug!("EWRAP work unit committed");
info!(elapsed =? started.elapsed(), "EWRAP work unit committed");

Ok(())
}
4 changes: 2 additions & 2 deletions crates/cardano/src/genesis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,10 @@ pub fn bootstrap_utxos<D: Domain>(
let writer = state.start_writer()?;

let delta = crate::utxoset::compute_origin_delta(genesis);
writer.apply_utxoset(&delta)?;
writer.apply_utxoset(&delta, false)?;

let delta = crate::utxoset::build_custom_utxos_delta(config)?;
writer.apply_utxoset(&delta)?;
writer.apply_utxoset(&delta, false)?;

writer.commit()?;

Expand Down
5 changes: 4 additions & 1 deletion crates/cardano/src/rewards/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashMap, marker::PhantomData};
use std::{collections::HashMap, marker::PhantomData, time::Instant};

use dolos_core::ChainError;
use pallas::ledger::primitives::StakeCredential;
Expand Down Expand Up @@ -331,6 +331,7 @@ pub trait RewardsContext {

pub fn define_rewards<C: RewardsContext>(ctx: &C) -> Result<RewardMap<C>, ChainError> {
let mut map = RewardMap::<C>::new(ctx.incentives().clone());
let start = Instant::now();

for pool in ctx.iter_all_pools() {
let pool_params = ctx.pool_params(pool);
Expand Down Expand Up @@ -419,6 +420,8 @@ pub fn define_rewards<C: RewardsContext>(ctx: &C) -> Result<RewardMap<C>, ChainE
}
}

tracing::info!(elapsed =? start.elapsed(), "finished rewards calculation");

Ok(map)
}

Expand Down
8 changes: 7 additions & 1 deletion crates/cardano/src/rupd/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::{HashMap, HashSet};
use std::{collections::{HashMap, HashSet}, time::Instant};

use dolos_core::{
ArchiveStore, ArchiveWriter, BlockSlot, ChainError, ChainPoint, Domain, EntityKey, Genesis,
Expand Down Expand Up @@ -111,6 +111,7 @@ fn log_work<D: Domain>(
rewards: &RewardMap<RupdWork>,
archive: &D::Archive,
) -> Result<(), ChainError> {
let started = Instant::now();
let Some((_, epoch)) = work.relevant_epochs() else {
return Ok(());
};
Expand Down Expand Up @@ -160,6 +161,8 @@ fn log_work<D: Domain>(

writer.commit()?;

info!(elapsed =? started.elapsed(), "log_work finished");

Ok(())
}

Expand All @@ -170,6 +173,7 @@ pub fn execute<D: Domain>(
slot: BlockSlot,
genesis: &Genesis,
) -> Result<RewardMap<RupdWork>, ChainError> {
let started = Instant::now();
info!(slot, "executing rupd work unit");

let work = RupdWork::load::<D>(state, genesis)?;
Expand All @@ -181,5 +185,7 @@ pub fn execute<D: Domain>(
// the time being for simplicity.
log_work::<D>(&work, &rewards, archive)?;

info!(elapsed =? started.elapsed(), "finished rupd work unit");

Ok(rewards)
}
2 changes: 1 addition & 1 deletion crates/core/src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ impl<C: ChainLogic> WorkBatch<C> {
// this into the entity system.
for block in self.blocks.iter() {
if let Some(utxo_delta) = &block.utxo_delta {
writer.apply_utxoset(utxo_delta)?;
writer.apply_utxoset(utxo_delta, false)?;
}
}

Expand Down
12 changes: 10 additions & 2 deletions crates/core/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{collections::HashMap, marker::PhantomData, ops::Range};
use itertools::Itertools;
use serde::{Deserialize, Serialize};

use crate::{ChainError, ChainPoint, Domain, TxoRef, UtxoMap, UtxoSet, UtxoSetDelta};
use crate::{ChainError, ChainPoint, Domain, EraCbor, TxoRef, UtxoMap, UtxoSet, UtxoSetDelta};

pub const KEY_SIZE: usize = 32;

Expand Down Expand Up @@ -275,7 +275,10 @@ pub trait StateWriter: Sized + Send + Sync {

fn delete_entity(&self, ns: Namespace, key: &EntityKey) -> Result<(), StateError>;

fn apply_utxoset(&self, delta: &UtxoSetDelta) -> Result<(), StateError>;
fn apply_utxoset(&self, delta: &UtxoSetDelta, defer_indexes: bool) -> Result<(), StateError>;

/// Apply utxoset delta to indexes only.
fn index_utxoset(&self, delta: &UtxoSetDelta) -> Result<(), StateError>;
Comment on lines +278 to +281
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

rg -n "apply_utxoset\(" --type rust -A 2

Repository: txpipe/dolos

Length of output: 1950


Breaking API change incomplete: apply_utxoset requires defer_indexes parameter, but 3 call sites were not updated.

The signature change from apply_utxoset(&self, delta: &UtxoSetDelta) to apply_utxoset(&self, delta: &UtxoSetDelta, defer_indexes: bool) is a breaking change. However, the following call sites were not updated:

  • crates/testing/src/toy_domain.rs:20
  • crates/testing/src/toy_domain.rs:130
  • crates/redb3/src/state/utxoset.rs:456

All these calls still use the old signature without the defer_indexes parameter. Update them to pass the parameter (typically false based on other call sites).

🤖 Prompt for AI Agents
In crates/core/src/state.rs around lines 278-281 the method signature for
apply_utxoset was changed to add a defer_indexes: bool parameter, but three call
sites still use the old signature; update each call to pass the new boolean
argument (use false where other call sites use false): modify
crates/testing/src/toy_domain.rs at lines ~20 and ~130 to call
apply_utxoset(delta, false) and update crates/redb3/src/state/utxoset.rs at ~456
to call apply_utxoset(delta, false) so the calls match the new signature and
compile.


#[allow(clippy::double_must_use)]
#[must_use]
Expand Down Expand Up @@ -314,6 +317,7 @@ pub trait StateWriter: Sized + Send + Sync {
}

pub trait StateStore: Sized + Send + Sync + Clone {
type UtxoIter: Iterator<Item = Result<(TxoRef, EraCbor), StateError>>;
type EntityIter: Iterator<Item = Result<(EntityKey, EntityValue), StateError>>;
type EntityValueIter: Iterator<Item = Result<EntityValue, StateError>>;
type Writer: StateWriter;
Expand Down Expand Up @@ -396,6 +400,10 @@ pub trait StateStore: Sized + Send + Sync + Clone {

// TODO: generalize UTxO Set into generic entity system

fn iter_utxos(&self) -> Result<Self::UtxoIter, StateError>;

fn amount_of_utxos(&self) -> Result<u64, StateError>;

fn get_utxos(&self, refs: Vec<TxoRef>) -> Result<UtxoMap, StateError>;

fn get_utxo_by_address(&self, address: &[u8]) -> Result<UtxoSet, StateError>;
Expand Down
32 changes: 26 additions & 6 deletions crates/redb3/src/state/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use std::{collections::HashMap, path::Path, sync::Arc};

use dolos_core::{
ChainPoint, EntityKey, EntityValue, Namespace, StateError, StateSchema, TxoRef, UtxoMap,
UtxoSet,
ChainPoint, EntityKey, EntityValue, Namespace, StateError, StateSchema, TxoRef, UtxoMap, UtxoSet
};

use redb::{
Expand All @@ -13,7 +12,7 @@ use tracing::warn;

mod utxoset;

use crate::{build_tables, Error, Table};
use crate::{build_tables, state::utxoset::UtxosIterator, Error, Table};

impl From<Error> for StateError {
fn from(error: Error) -> Self {
Expand Down Expand Up @@ -113,8 +112,7 @@ impl StateStore {
&self.db
}

#[allow(dead_code)]
pub(crate) fn db_mut(&mut self) -> Option<&mut Database> {
pub fn db_mut(&mut self) -> Option<&mut Database> {
Arc::get_mut(&mut self.db)
}

Expand Down Expand Up @@ -215,8 +213,17 @@ impl dolos_core::StateWriter for StateWriter {
Ok(())
}

fn apply_utxoset(&self, delta: &dolos_core::UtxoSetDelta) -> Result<(), StateError> {
fn apply_utxoset(&self, delta: &dolos_core::UtxoSetDelta, defer_indexes: bool) -> Result<(), StateError> {
utxoset::UtxosTable::apply(&self.wx, delta)?;

if !defer_indexes {
utxoset::FilterIndexes::apply(&self.wx, delta)?;
}

Ok(())
}

fn index_utxoset(&self, delta: &dolos_core::UtxoSetDelta) -> Result<(), StateError> {
utxoset::FilterIndexes::apply(&self.wx, delta)?;

Ok(())
Expand All @@ -230,6 +237,7 @@ impl dolos_core::StateWriter for StateWriter {
}

impl dolos_core::StateStore for StateStore {
type UtxoIter = UtxosIterator;
type EntityIter = EntityIter;
type EntityValueIter = EntityValueIter;
type Writer = StateWriter;
Expand Down Expand Up @@ -307,6 +315,18 @@ impl dolos_core::StateStore for StateStore {
Ok(out)
}

fn iter_utxos(&self) -> Result<Self::UtxoIter, StateError> {
let rx = self.db().begin_read().map_err(Error::from)?;

Ok(utxoset::UtxosTable::iter(&rx)?)
}

fn amount_of_utxos(&self) -> Result<u64, StateError> {
let rx = self.db().begin_read().map_err(Error::from)?;

Ok(utxoset::UtxosTable::len(&rx)?)
}

fn get_utxos(&self, refs: Vec<TxoRef>) -> Result<UtxoMap, StateError> {
// exit early before opening a read tx in case there's nothing to fetch
if refs.is_empty() {
Expand Down
12 changes: 8 additions & 4 deletions crates/redb3/src/state/utxoset.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use dolos_core::{EraCbor, TxoRef, UtxoMap, UtxoSetDelta};
use dolos_core::{EraCbor, StateError, TxoRef, UtxoMap, UtxoSetDelta};
use pallas::ledger::{addresses::ShelleyDelegationPart, traverse::MultiEraOutput};
use redb::{
MultimapTableDefinition, Range, ReadTransaction, ReadableDatabase, ReadableTable as _,
Expand All @@ -19,10 +19,10 @@ type UtxosValue = (u16, &'static [u8]);
pub struct UtxosIterator(Range<'static, UtxosKey, UtxosValue>);

impl Iterator for UtxosIterator {
type Item = Result<(TxoRef, EraCbor), ::redb::StorageError>;
type Item = Result<(TxoRef, EraCbor), StateError>;

fn next(&mut self) -> Option<Self::Item> {
let x = self.0.next()?;
let x = self.0.next()?.map_err(|err| StateError::InternalStoreError(err.to_string()));

let x = x.map(|(k, v)| {
let (hash, idx) = k.value();
Expand Down Expand Up @@ -50,13 +50,17 @@ impl UtxosTable {
Ok(())
}

#[allow(unused)]
pub fn iter(rx: &ReadTransaction) -> Result<UtxosIterator, Error> {
let table = rx.open_table(UtxosTable::DEF)?;
let range = table.range::<UtxosKey>(..)?;
Ok(UtxosIterator(range))
}

pub fn len(rx: &ReadTransaction) -> Result<u64, Error> {
let table = rx.open_table(UtxosTable::DEF)?;
Ok(table.len()?)
}

pub fn get_sparse(rx: &ReadTransaction, refs: Vec<TxoRef>) -> Result<UtxoMap, Error> {
let table = rx.open_table(Self::DEF)?;
let mut out = HashMap::new();
Expand Down
20 changes: 17 additions & 3 deletions src/bin/dolos/data/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,20 @@ fn prepare_chain(
Ok(())
}

fn prepare_state(
state: &mut dolos_redb3::state::StateStore,
pb: &crate::feedback::ProgressBar,
) -> miette::Result<()> {
let db = state.db_mut().unwrap();
pb.set_message("compacting state");
db.compact().into_diagnostic()?;

pb.set_message("checking state integrity");
db.check_integrity().into_diagnostic()?;

Ok(())
}

pub fn run(
config: &RootConfig,
args: &Args,
Expand All @@ -74,9 +88,8 @@ pub fn run(
.append_path_with_name(&path, "wal")
.into_diagnostic()?;

prepare_chain(&mut stores.archive, &pb)?;

if args.include_chain {
prepare_chain(&mut stores.archive, &pb)?;
let path = root.join("chain");

archive
Expand All @@ -87,13 +100,14 @@ pub fn run(
}

if args.include_state {
prepare_state(&mut stores.state, &pb)?;
let path = root.join("state");

archive
.append_path_with_name(&path, "state")
.into_diagnostic()?;

pb.set_message("creating archive");
pb.set_message("creating state");
}
Comment on lines 102 to 111
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Progress message is set after the operation completes.

The message "creating state" is set on line 110 after the archive append operation has already completed. This provides misleading feedback to the user. Consider setting the message before the operation begins, similar to how prepare_state sets its messages before the operations.

🔎 Proposed fix
     if args.include_state {
+        pb.set_message("creating state");
         prepare_state(&mut stores.state, &pb)?;
         let path = root.join("state");

         archive
             .append_path_with_name(&path, "state")
             .into_diagnostic()?;
-
-        pb.set_message("creating state");
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if args.include_state {
prepare_state(&mut stores.state, &pb)?;
let path = root.join("state");
archive
.append_path_with_name(&path, "state")
.into_diagnostic()?;
pb.set_message("creating archive");
pb.set_message("creating state");
}
if args.include_state {
pb.set_message("creating state");
prepare_state(&mut stores.state, &pb)?;
let path = root.join("state");
archive
.append_path_with_name(&path, "state")
.into_diagnostic()?;
}
🤖 Prompt for AI Agents
In src/bin/dolos/data/export.rs around lines 102 to 111, the progress message
"creating state" is being set after the archive.append_path_with_name call,
which gives misleading feedback; move the pb.set_message("creating state") to
occur before calling archive.append_path_with_name(&path, "state") (i.e., set
the progress message immediately after preparing the path and before starting
the append operation) so the user sees the message while the archive operation
runs.


archive.finish().into_diagnostic()?;
Expand Down
Loading
Loading