diff --git a/crates/cardano/src/estart/mod.rs b/crates/cardano/src/estart/mod.rs index 42a4949a..eda2aa29 100644 --- a/crates/cardano/src/estart/mod.rs +++ b/crates/cardano/src/estart/mod.rs @@ -1,9 +1,9 @@ -use std::sync::Arc; +use std::{sync::Arc, time::Instant}; use dolos_core::{ batch::WorkDeltas, config::CardanoConfig, BlockSlot, ChainError, Domain, EntityKey, Genesis, }; -use tracing::{debug, info, instrument}; +use tracing::{info, instrument}; use crate::{ AccountState, CardanoDelta, CardanoEntity, CardanoLogic, DRepState, EpochState, EraProtocol, @@ -104,13 +104,14 @@ pub fn execute( _config: &CardanoConfig, genesis: Arc, ) -> Result<(), ChainError> { + let started = Instant::now(); info!("executing ESTART work unit"); let mut work = WorkContext::load::(state, genesis)?; work.commit::(state, archive, slot)?; - debug!("ESTART work unit committed"); + info!(elapsed =? started.elapsed(), "ESTART work unit committed"); Ok(()) } diff --git a/crates/cardano/src/ewrap/mod.rs b/crates/cardano/src/ewrap/mod.rs index bc574776..9d2b627f 100644 --- a/crates/cardano/src/ewrap/mod.rs +++ b/crates/cardano/src/ewrap/mod.rs @@ -1,13 +1,13 @@ use std::{ collections::{HashMap, HashSet}, - sync::Arc, + sync::Arc, time::Instant, }; use dolos_core::{ batch::WorkDeltas, config::CardanoConfig, BlockSlot, ChainError, Domain, EntityKey, Genesis, }; use pallas::ledger::primitives::conway::DRep; -use tracing::{debug, info, instrument}; +use tracing::{info, instrument}; use crate::{ rewards::RewardMap, rupd::RupdWork, AccountState, CardanoDelta, CardanoEntity, CardanoLogic, @@ -156,13 +156,14 @@ pub fn execute( genesis: Arc, rewards: RewardMap, ) -> Result<(), ChainError> { + let started = Instant::now(); info!("executing EWRAP work unit"); let mut boundary = BoundaryWork::load::(state, genesis, rewards)?; boundary.commit::(state, archive)?; - debug!("EWRAP work unit committed"); + info!(elapsed =? started.elapsed(), "EWRAP work unit committed"); Ok(()) } diff --git a/crates/cardano/src/genesis/mod.rs b/crates/cardano/src/genesis/mod.rs index 8da74557..828decf1 100644 --- a/crates/cardano/src/genesis/mod.rs +++ b/crates/cardano/src/genesis/mod.rs @@ -118,10 +118,10 @@ pub fn bootstrap_utxos( let writer = state.start_writer()?; let delta = crate::utxoset::compute_origin_delta(genesis); - writer.apply_utxoset(&delta)?; + writer.apply_utxoset(&delta, false)?; let delta = crate::utxoset::build_custom_utxos_delta(config)?; - writer.apply_utxoset(&delta)?; + writer.apply_utxoset(&delta, false)?; writer.commit()?; diff --git a/crates/cardano/src/rewards/mod.rs b/crates/cardano/src/rewards/mod.rs index ee4169f7..a671bf54 100644 --- a/crates/cardano/src/rewards/mod.rs +++ b/crates/cardano/src/rewards/mod.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, marker::PhantomData}; +use std::{collections::HashMap, marker::PhantomData, time::Instant}; use dolos_core::ChainError; use pallas::ledger::primitives::StakeCredential; @@ -331,6 +331,7 @@ pub trait RewardsContext { pub fn define_rewards(ctx: &C) -> Result, ChainError> { let mut map = RewardMap::::new(ctx.incentives().clone()); + let start = Instant::now(); for pool in ctx.iter_all_pools() { let pool_params = ctx.pool_params(pool); @@ -419,6 +420,8 @@ pub fn define_rewards(ctx: &C) -> Result, ChainE } } + tracing::info!(elapsed =? start.elapsed(), "finished rewards calculation"); + Ok(map) } diff --git a/crates/cardano/src/rupd/mod.rs b/crates/cardano/src/rupd/mod.rs index f261447d..0b66b5ef 100644 --- a/crates/cardano/src/rupd/mod.rs +++ b/crates/cardano/src/rupd/mod.rs @@ -1,4 +1,4 @@ -use std::collections::{HashMap, HashSet}; +use std::{collections::{HashMap, HashSet}, time::Instant}; use dolos_core::{ ArchiveStore, ArchiveWriter, BlockSlot, ChainError, ChainPoint, Domain, EntityKey, Genesis, @@ -111,6 +111,7 @@ fn log_work( rewards: &RewardMap, archive: &D::Archive, ) -> Result<(), ChainError> { + let started = Instant::now(); let Some((_, epoch)) = work.relevant_epochs() else { return Ok(()); }; @@ -160,6 +161,8 @@ fn log_work( writer.commit()?; + info!(elapsed =? started.elapsed(), "log_work finished"); + Ok(()) } @@ -170,6 +173,7 @@ pub fn execute( slot: BlockSlot, genesis: &Genesis, ) -> Result, ChainError> { + let started = Instant::now(); info!(slot, "executing rupd work unit"); let work = RupdWork::load::(state, genesis)?; @@ -181,5 +185,7 @@ pub fn execute( // the time being for simplicity. log_work::(&work, &rewards, archive)?; + info!(elapsed =? started.elapsed(), "finished rupd work unit"); + Ok(rewards) } diff --git a/crates/core/src/batch.rs b/crates/core/src/batch.rs index 78f7c185..8538e61b 100644 --- a/crates/core/src/batch.rs +++ b/crates/core/src/batch.rs @@ -298,7 +298,7 @@ impl WorkBatch { // this into the entity system. for block in self.blocks.iter() { if let Some(utxo_delta) = &block.utxo_delta { - writer.apply_utxoset(utxo_delta)?; + writer.apply_utxoset(utxo_delta, false)?; } } diff --git a/crates/core/src/state.rs b/crates/core/src/state.rs index 3f095ce4..20152474 100644 --- a/crates/core/src/state.rs +++ b/crates/core/src/state.rs @@ -3,7 +3,7 @@ use std::{collections::HashMap, marker::PhantomData, ops::Range}; use itertools::Itertools; use serde::{Deserialize, Serialize}; -use crate::{ChainError, ChainPoint, Domain, TxoRef, UtxoMap, UtxoSet, UtxoSetDelta}; +use crate::{ChainError, ChainPoint, Domain, EraCbor, TxoRef, UtxoMap, UtxoSet, UtxoSetDelta}; pub const KEY_SIZE: usize = 32; @@ -275,7 +275,10 @@ pub trait StateWriter: Sized + Send + Sync { fn delete_entity(&self, ns: Namespace, key: &EntityKey) -> Result<(), StateError>; - fn apply_utxoset(&self, delta: &UtxoSetDelta) -> Result<(), StateError>; + fn apply_utxoset(&self, delta: &UtxoSetDelta, defer_indexes: bool) -> Result<(), StateError>; + + /// Apply utxoset delta to indexes only. + fn index_utxoset(&self, delta: &UtxoSetDelta) -> Result<(), StateError>; #[allow(clippy::double_must_use)] #[must_use] @@ -314,6 +317,7 @@ pub trait StateWriter: Sized + Send + Sync { } pub trait StateStore: Sized + Send + Sync + Clone { + type UtxoIter: Iterator>; type EntityIter: Iterator>; type EntityValueIter: Iterator>; type Writer: StateWriter; @@ -396,6 +400,10 @@ pub trait StateStore: Sized + Send + Sync + Clone { // TODO: generalize UTxO Set into generic entity system + fn iter_utxos(&self) -> Result; + + fn amount_of_utxos(&self) -> Result; + fn get_utxos(&self, refs: Vec) -> Result; fn get_utxo_by_address(&self, address: &[u8]) -> Result; diff --git a/crates/redb3/src/state/mod.rs b/crates/redb3/src/state/mod.rs index 43ae830b..58c9f248 100644 --- a/crates/redb3/src/state/mod.rs +++ b/crates/redb3/src/state/mod.rs @@ -1,8 +1,7 @@ use std::{collections::HashMap, path::Path, sync::Arc}; use dolos_core::{ - ChainPoint, EntityKey, EntityValue, Namespace, StateError, StateSchema, TxoRef, UtxoMap, - UtxoSet, + ChainPoint, EntityKey, EntityValue, Namespace, StateError, StateSchema, TxoRef, UtxoMap, UtxoSet }; use redb::{ @@ -13,7 +12,7 @@ use tracing::warn; mod utxoset; -use crate::{build_tables, Error, Table}; +use crate::{build_tables, state::utxoset::UtxosIterator, Error, Table}; impl From for StateError { fn from(error: Error) -> Self { @@ -113,8 +112,7 @@ impl StateStore { &self.db } - #[allow(dead_code)] - pub(crate) fn db_mut(&mut self) -> Option<&mut Database> { + pub fn db_mut(&mut self) -> Option<&mut Database> { Arc::get_mut(&mut self.db) } @@ -215,8 +213,17 @@ impl dolos_core::StateWriter for StateWriter { Ok(()) } - fn apply_utxoset(&self, delta: &dolos_core::UtxoSetDelta) -> Result<(), StateError> { + fn apply_utxoset(&self, delta: &dolos_core::UtxoSetDelta, defer_indexes: bool) -> Result<(), StateError> { utxoset::UtxosTable::apply(&self.wx, delta)?; + + if !defer_indexes { + utxoset::FilterIndexes::apply(&self.wx, delta)?; + } + + Ok(()) + } + + fn index_utxoset(&self, delta: &dolos_core::UtxoSetDelta) -> Result<(), StateError> { utxoset::FilterIndexes::apply(&self.wx, delta)?; Ok(()) @@ -230,6 +237,7 @@ impl dolos_core::StateWriter for StateWriter { } impl dolos_core::StateStore for StateStore { + type UtxoIter = UtxosIterator; type EntityIter = EntityIter; type EntityValueIter = EntityValueIter; type Writer = StateWriter; @@ -307,6 +315,18 @@ impl dolos_core::StateStore for StateStore { Ok(out) } + fn iter_utxos(&self) -> Result { + let rx = self.db().begin_read().map_err(Error::from)?; + + Ok(utxoset::UtxosTable::iter(&rx)?) + } + + fn amount_of_utxos(&self) -> Result { + let rx = self.db().begin_read().map_err(Error::from)?; + + Ok(utxoset::UtxosTable::len(&rx)?) + } + fn get_utxos(&self, refs: Vec) -> Result { // exit early before opening a read tx in case there's nothing to fetch if refs.is_empty() { diff --git a/crates/redb3/src/state/utxoset.rs b/crates/redb3/src/state/utxoset.rs index 98f2b20c..c66610e9 100644 --- a/crates/redb3/src/state/utxoset.rs +++ b/crates/redb3/src/state/utxoset.rs @@ -1,4 +1,4 @@ -use dolos_core::{EraCbor, TxoRef, UtxoMap, UtxoSetDelta}; +use dolos_core::{EraCbor, StateError, TxoRef, UtxoMap, UtxoSetDelta}; use pallas::ledger::{addresses::ShelleyDelegationPart, traverse::MultiEraOutput}; use redb::{ MultimapTableDefinition, Range, ReadTransaction, ReadableDatabase, ReadableTable as _, @@ -19,10 +19,10 @@ type UtxosValue = (u16, &'static [u8]); pub struct UtxosIterator(Range<'static, UtxosKey, UtxosValue>); impl Iterator for UtxosIterator { - type Item = Result<(TxoRef, EraCbor), ::redb::StorageError>; + type Item = Result<(TxoRef, EraCbor), StateError>; fn next(&mut self) -> Option { - let x = self.0.next()?; + let x = self.0.next()?.map_err(|err| StateError::InternalStoreError(err.to_string())); let x = x.map(|(k, v)| { let (hash, idx) = k.value(); @@ -50,13 +50,17 @@ impl UtxosTable { Ok(()) } - #[allow(unused)] pub fn iter(rx: &ReadTransaction) -> Result { let table = rx.open_table(UtxosTable::DEF)?; let range = table.range::(..)?; Ok(UtxosIterator(range)) } + pub fn len(rx: &ReadTransaction) -> Result { + let table = rx.open_table(UtxosTable::DEF)?; + Ok(table.len()?) + } + pub fn get_sparse(rx: &ReadTransaction, refs: Vec) -> Result { let table = rx.open_table(Self::DEF)?; let mut out = HashMap::new(); diff --git a/src/bin/dolos/data/export.rs b/src/bin/dolos/data/export.rs index b4e69cca..97370367 100644 --- a/src/bin/dolos/data/export.rs +++ b/src/bin/dolos/data/export.rs @@ -51,6 +51,20 @@ fn prepare_chain( Ok(()) } +fn prepare_state( + state: &mut dolos_redb3::state::StateStore, + pb: &crate::feedback::ProgressBar, +) -> miette::Result<()> { + let db = state.db_mut().unwrap(); + pb.set_message("compacting state"); + db.compact().into_diagnostic()?; + + pb.set_message("checking state integrity"); + db.check_integrity().into_diagnostic()?; + + Ok(()) +} + pub fn run( config: &RootConfig, args: &Args, @@ -74,9 +88,8 @@ pub fn run( .append_path_with_name(&path, "wal") .into_diagnostic()?; - prepare_chain(&mut stores.archive, &pb)?; - if args.include_chain { + prepare_chain(&mut stores.archive, &pb)?; let path = root.join("chain"); archive @@ -87,13 +100,14 @@ pub fn run( } if args.include_state { + prepare_state(&mut stores.state, &pb)?; let path = root.join("state"); archive .append_path_with_name(&path, "state") .into_diagnostic()?; - pb.set_message("creating archive"); + pb.set_message("creating state"); } archive.finish().into_diagnostic()?; diff --git a/src/bin/dolos/doctor/build_indexes.rs b/src/bin/dolos/doctor/build_indexes.rs new file mode 100644 index 00000000..35f0c5ae --- /dev/null +++ b/src/bin/dolos/doctor/build_indexes.rs @@ -0,0 +1,59 @@ +use std::{sync::Arc}; + +use dolos_core::config::RootConfig; +use itertools::Itertools; +use miette::{Context, IntoDiagnostic}; + +use dolos::prelude::*; + +use crate::feedback::Feedback; + +#[derive(Debug, clap::Args)] +pub struct Args { + #[arg(short, long, default_value_t = 500)] + pub chunk: usize, +} + +#[tokio::main] +pub async fn run(config: &RootConfig, args: &Args, feedback: &Feedback) -> miette::Result<()> { + //crate::common::setup_tracing(&config.logging)?; + + let progress = feedback.slot_progress_bar(); + progress.set_message("building indexes"); + + let mut domain = crate::common::setup_domain(config).await?; + + progress.set_length(domain.state.amount_of_utxos().into_diagnostic().context("getting amount of utxos")?); + + let remaining = domain + .state + .iter_utxos() + .into_diagnostic() + .context("iterating over utxos")?; + + for chunk in remaining.chunks(args.chunk).into_iter() { + let produced_utxo = chunk.into_iter().map(|x| { + let (k, v) = x.into_diagnostic().context("decoding utxoset")?; + Ok((k, Arc::new(v))) + }).collect::>()?; + let utxoset = UtxoSetDelta {produced_utxo, ..Default::default()}; + + { + let writer = domain.state.start_writer().into_diagnostic().context("starting writer")?; + writer.index_utxoset(&utxoset).into_diagnostic().context("indexing")?; + writer.commit().into_diagnostic().context("committing")?; + } + + progress.inc(args.chunk as u64); + } + + let db = domain.state.db_mut().unwrap(); + + progress.set_message("compacting"); + db.compact().into_diagnostic().context("compacting")?; + + progress.set_message("checking integrity"); + db.check_integrity().into_diagnostic().context("checking integrity")?; + + Ok(()) +} diff --git a/src/bin/dolos/doctor/mod.rs b/src/bin/dolos/doctor/mod.rs index cd144620..b8a348fc 100644 --- a/src/bin/dolos/doctor/mod.rs +++ b/src/bin/dolos/doctor/mod.rs @@ -3,6 +3,7 @@ use dolos_core::config::RootConfig; use crate::feedback::Feedback; +mod build_indexes; mod catchup_stores; mod reset_wal; mod rollback; @@ -32,6 +33,9 @@ pub enum Command { /// manually updates an entity in the state UpdateEntity(update_entity::Args), + + /// catch up store data from WAL records + BuildIndexes(build_indexes::Args), } #[derive(Debug, Parser)] @@ -42,6 +46,7 @@ pub struct Args { pub fn run(config: &RootConfig, args: &Args, feedback: &Feedback) -> miette::Result<()> { match &args.command { + Command::BuildIndexes(x) => build_indexes::run(config, x, feedback)?, Command::CatchupStores(x) => catchup_stores::run(config, x, feedback)?, Command::ResetWal(x) => reset_wal::run(config, x, feedback)?, Command::WalIntegrity(x) => wal_integrity::run(config, x)?, diff --git a/src/facade.rs b/src/facade.rs index 718fa47b..5c429b05 100644 --- a/src/facade.rs +++ b/src/facade.rs @@ -61,7 +61,7 @@ where let utxo_undo = dolos_cardano::utxoset::compute_undo_delta(blockd, &inputs) .map_err(dolos_core::ChainError::from)?; - writer.apply_utxoset(&utxo_undo)?; + writer.apply_utxoset(&utxo_undo, false)?; // TODO: we should differ notifications until the we commit the writers self.notify_tip(TipEvent::Undo(point.clone(), block));