diff --git a/balius-runtime/src/drivers/chainsync.rs b/balius-runtime/src/drivers/chainsync.rs index d878f7a..db8dc98 100644 --- a/balius-runtime/src/drivers/chainsync.rs +++ b/balius-runtime/src/drivers/chainsync.rs @@ -6,7 +6,7 @@ use tokio_util::sync::CancellationToken; use tracing::{info, warn}; use utxorpc::CardanoSyncClient; -use crate::{Block, ChainPoint, Error, Runtime, Store}; +use crate::{Block, ChainPoint, Error, Runtime, Store, StoreTrait}; impl From for utxorpc::spec::sync::BlockRef { fn from(point: ChainPoint) -> Self { @@ -53,7 +53,7 @@ async fn gather_blocks( } Some(utxorpc::TipEvent::Reset(block_ref)) => { tracing::warn!(block_ref =? &block_ref, "received reset event, reseting tip"); - undos = store.handle_reset(block_ref.into())?; + undos = store.handle_reset(block_ref.into()).await?; } None => { tracing::warn!("Received None response from follow_tip, skipping") diff --git a/balius-runtime/src/lib.rs b/balius-runtime/src/lib.rs index bcbd089..3641aaa 100644 --- a/balius-runtime/src/lib.rs +++ b/balius-runtime/src/lib.rs @@ -22,7 +22,6 @@ pub mod wit { mod metrics; mod router; -mod store; // implementations pub mod drivers; @@ -31,9 +30,10 @@ pub mod kv; pub mod ledgers; pub mod logging; pub mod sign; +pub mod store; pub mod submit; -pub use store::Store; +pub use store::{AtomicUpdateTrait, Store, StoreTrait}; pub use wit::Response; pub type WorkerId = String; @@ -44,7 +44,7 @@ pub enum Error { Wasm(wasmtime::Error), #[error("store error {0}")] - Store(Box), + Store(String), #[error("worker not found '{0}'")] WorkerNotFound(WorkerId), @@ -88,37 +88,37 @@ impl From for Error { impl From for Error { fn from(value: redb::Error) -> Self { - Self::Store(Box::new(value)) + Self::Store(value.to_string()) } } impl From for Error { fn from(value: redb::DatabaseError) -> Self { - Self::Store(Box::new(value.into())) + Self::Store(value.to_string()) } } impl From for Error { fn from(value: redb::TransactionError) -> Self { - Self::Store(Box::new(value.into())) + Self::Store(value.to_string()) } } impl From for Error { fn from(value: redb::TableError) -> Self { - Self::Store(Box::new(value.into())) + Self::Store(value.to_string()) } } impl From for Error { fn from(value: redb::CommitError) -> Self { - Self::Store(Box::new(value.into())) + Self::Store(value.to_string()) } } impl From for Error { fn from(value: redb::StorageError) -> Self { - Self::Store(Box::new(value.into())) + Self::Store(value.to_string()) } } @@ -525,7 +525,7 @@ impl Runtime { if let Some(seq) = lowest_seq { debug!(lowest_seq, "found lowest seq"); - return self.store.find_chain_point(seq); + return self.store.find_chain_point(seq).await; } Ok(None) @@ -571,7 +571,7 @@ impl Runtime { let config = serde_json::to_vec(&config).unwrap(); instance.call_init(&mut wasm_store, &config).await?; - let cursor = self.store.get_worker_cursor(id)?; + let cursor = self.store.get_worker_cursor(id).await?; debug!(cursor, id, "found cursor for worker"); let mut loaded = self.loaded.write().await; @@ -643,11 +643,11 @@ impl Runtime { let start = Instant::now(); info!("applying block"); - let log_seq = self.store.write_ahead(undo_blocks, next_block)?; + let log_seq = self.store.write_ahead(undo_blocks, next_block).await?; let workers = self.loaded.read().await; - let mut store_update = self.store.start_atomic_update(log_seq)?; + let mut store_update = self.store.start_atomic_update(log_seq).await?; let update = async |worker: &Mutex| -> Result<(String, f64), Error> { let worker_start = Instant::now(); @@ -661,17 +661,16 @@ impl Runtime { }; let updates = workers.values().map(update).collect_vec(); - join_all(updates) + for (x, duration) in join_all(updates) .await .into_iter() .collect::, _>>()? - .iter() - .try_for_each(|(x, duration)| { - self.metrics.handle_worker_chain_duration_ms(x, *duration); - store_update.update_worker_cursor(x) - })?; + { + self.metrics.handle_worker_chain_duration_ms(&x, duration); + store_update.update_worker_cursor(&x).await?; + } - store_update.commit()?; + store_update.commit().await?; self.metrics .handle_chain_duration_ms(start.elapsed().as_secs_f64() * 1000.0); diff --git a/balius-runtime/src/store/mod.rs b/balius-runtime/src/store/mod.rs new file mode 100644 index 0000000..5a4eba2 --- /dev/null +++ b/balius-runtime/src/store/mod.rs @@ -0,0 +1,110 @@ +pub mod redb; + +use prost::Message; +use std::sync::Arc; +use tokio::sync::Mutex; + +use crate::{Block, ChainPoint, Error}; + +pub type WorkerId = String; +pub type LogSeq = u64; + +#[derive(Message)] +pub struct LogEntry { + #[prost(bytes, tag = "1")] + pub next_block: Vec, + #[prost(bytes, repeated, tag = "2")] + pub undo_blocks: Vec>, +} + +#[async_trait::async_trait] +pub trait AtomicUpdateTrait { + async fn update_worker_cursor(&mut self, id: &str) -> Result<(), super::Error>; + async fn commit(&mut self) -> Result<(), super::Error>; +} + +#[allow(clippy::large_enum_variant)] +pub enum AtomicUpdate { + Redb(redb::AtomicUpdate), + Custom(Arc>), +} + +#[async_trait::async_trait] +impl AtomicUpdateTrait for AtomicUpdate { + async fn update_worker_cursor(&mut self, id: &str) -> Result<(), super::Error> { + match self { + AtomicUpdate::Redb(au) => au.update_worker_cursor(id).await, + AtomicUpdate::Custom(au) => au.lock().await.update_worker_cursor(id).await, + } + } + async fn commit(&mut self) -> Result<(), super::Error> { + match self { + AtomicUpdate::Redb(au) => au.commit().await, + AtomicUpdate::Custom(au) => au.lock().await.commit().await, + } + } +} + +#[derive(Clone)] +pub enum Store { + Redb(redb::Store), + Custom(Arc>), +} + +#[async_trait::async_trait] +pub trait StoreTrait { + async fn find_chain_point(&self, seq: LogSeq) -> Result, Error>; + async fn write_ahead( + &mut self, + undo_blocks: &[Block], + next_block: &Block, + ) -> Result; + async fn get_worker_cursor(&self, id: &str) -> Result, super::Error>; + async fn start_atomic_update(&self, log_seq: LogSeq) -> Result; + async fn handle_reset(&self, point: ChainPoint) -> Result, super::Error>; +} + +#[async_trait::async_trait] +impl StoreTrait for Store { + async fn find_chain_point(&self, seq: LogSeq) -> Result, Error> { + match self { + Store::Redb(store) => store.find_chain_point(seq).await, + Store::Custom(store) => store.lock().await.find_chain_point(seq).await, + } + } + async fn write_ahead( + &mut self, + undo_blocks: &[Block], + next_block: &Block, + ) -> Result { + match self { + Store::Redb(store) => store.write_ahead(undo_blocks, next_block).await, + Store::Custom(store) => { + store + .lock() + .await + .write_ahead(undo_blocks, next_block) + .await + } + } + } + async fn get_worker_cursor(&self, id: &str) -> Result, super::Error> { + match self { + Store::Redb(store) => store.get_worker_cursor(id).await, + Store::Custom(store) => store.lock().await.get_worker_cursor(id).await, + } + } + async fn start_atomic_update(&self, log_seq: LogSeq) -> Result { + match self { + Store::Redb(store) => store.start_atomic_update(log_seq).await, + Store::Custom(store) => store.lock().await.start_atomic_update(log_seq).await, + } + } + + async fn handle_reset(&self, point: ChainPoint) -> Result, super::Error> { + match self { + Store::Redb(store) => store.handle_reset(point).await, + Store::Custom(store) => store.lock().await.handle_reset(point).await, + } + } +} diff --git a/balius-runtime/src/store.rs b/balius-runtime/src/store/redb.rs similarity index 78% rename from balius-runtime/src/store.rs rename to balius-runtime/src/store/redb.rs index d66e762..4734f1b 100644 --- a/balius-runtime/src/store.rs +++ b/balius-runtime/src/store/redb.rs @@ -1,4 +1,3 @@ -use itertools::Itertools; use prost::Message; use redb::{ReadableTable as _, TableDefinition, WriteTransaction}; use std::{collections::VecDeque, path::Path, sync::Arc}; @@ -6,16 +5,8 @@ use tracing::warn; use crate::{Block, ChainPoint, Error}; -pub type WorkerId = String; -pub type LogSeq = u64; - -#[derive(Message)] -pub struct LogEntry { - #[prost(bytes, tag = "1")] - pub next_block: Vec, - #[prost(bytes, repeated, tag = "2")] - pub undo_blocks: Vec>, -} +use super::StoreTrait; +pub use super::{AtomicUpdateTrait, LogEntry, LogSeq, WorkerId}; impl redb::Value for LogEntry { type SelfType<'a> @@ -58,20 +49,40 @@ const WAL: TableDefinition = TableDefinition::new("wal"); const DEFAULT_CACHE_SIZE_MB: usize = 50; pub struct AtomicUpdate { - wx: WriteTransaction, + wx: Option, log_seq: LogSeq, } - impl AtomicUpdate { - pub fn update_worker_cursor(&mut self, id: &str) -> Result<(), super::Error> { - let mut table = self.wx.open_table(CURSORS)?; + pub fn new(wx: WriteTransaction, log_seq: LogSeq) -> Self { + Self { + wx: Some(wx), + log_seq, + } + } +} + +#[async_trait::async_trait] +impl AtomicUpdateTrait for AtomicUpdate { + async fn update_worker_cursor(&mut self, id: &str) -> Result<(), super::Error> { + let Some(wx) = self.wx.as_mut() else { + return Err(super::Error::Store( + "Transaction already commited".to_string(), + )); + }; + + let mut table = wx.open_table(CURSORS)?; table.insert(id.to_owned(), self.log_seq)?; Ok(()) } - pub fn commit(self) -> Result<(), super::Error> { - self.wx.commit()?; + async fn commit(&mut self) -> Result<(), super::Error> { + let Some(wx) = self.wx.take() else { + return Err(super::Error::Store( + "Transaction already commited".to_string(), + )); + }; + wx.commit()?; Ok(()) } } @@ -165,15 +176,18 @@ impl Store { let entry = table.get(seq)?; Ok(entry.map(|x| x.value())) } +} - pub fn find_chain_point(&self, seq: LogSeq) -> Result, Error> { +#[async_trait::async_trait] +impl StoreTrait for Store { + async fn find_chain_point(&self, seq: LogSeq) -> Result, Error> { let entry = self.get_entry(seq)?; let block = Block::from_bytes(&entry.unwrap().next_block); Ok(Some(block.chain_point())) } - pub fn write_ahead( + async fn write_ahead( &mut self, undo_blocks: &[Block], next_block: &Block, @@ -196,7 +210,7 @@ impl Store { } // TODO: see if loading in batch is worth it - pub fn get_worker_cursor(&self, id: &str) -> Result, super::Error> { + async fn get_worker_cursor(&self, id: &str) -> Result, super::Error> { let rx = self.db.begin_read()?; let table = match rx.open_table(CURSORS) { @@ -209,30 +223,15 @@ impl Store { Ok(cursor.map(|x| x.value())) } - pub fn start_atomic_update(&self, log_seq: LogSeq) -> Result { + async fn start_atomic_update( + &self, + log_seq: LogSeq, + ) -> Result { let wx = self.db.begin_write()?; - Ok(AtomicUpdate { wx, log_seq }) - } - - // TODO: I don't think we need this since we're going to load each cursor as - // part of the loaded worker - pub fn lowest_cursor(&self) -> Result, super::Error> { - let rx = self.db.begin_read()?; - - let table = rx.open_table(CURSORS)?; - - let cursors: Vec<_> = table - .iter()? - .map_ok(|(_, value)| value.value()) - .try_collect()?; - - let lowest = cursors.iter().fold(None, |all, item| all.min(Some(*item))); - - Ok(lowest) + Ok(super::AtomicUpdate::Redb(AtomicUpdate::new(wx, log_seq))) } - /// Return list of blocks to undo after receiving a reset response from chainsync. - pub fn handle_reset(&self, point: ChainPoint) -> Result, super::Error> { + async fn handle_reset(&self, point: ChainPoint) -> Result, super::Error> { let rx = self.db.begin_read()?; let table = rx.open_table(WAL)?; diff --git a/balius-runtime/tests/e2e.rs b/balius-runtime/tests/e2e.rs index 0c64931..0b324a2 100644 --- a/balius-runtime/tests/e2e.rs +++ b/balius-runtime/tests/e2e.rs @@ -5,7 +5,7 @@ use std::{ process::Command, }; -use balius_runtime::{ledgers, Runtime, Store}; +use balius_runtime::{ledgers, store::redb::Store as RedbStore, Runtime, Store}; use serde_json::json; use wit_component::ComponentEncoder; @@ -41,9 +41,9 @@ fn build_module(src_dir: impl AsRef, module_name: &str, target: impl AsRef async fn faucet_claim() { build_module("../examples/minter/offchain", "minter", "tests/faucet.wasm"); - let store = Store::open("tests/balius.db", None).unwrap(); + let store = Store::Redb(RedbStore::open("tests/balius.db", None).unwrap()); - let mut runtime = Runtime::builder(store) + let runtime = Runtime::builder(store) .with_ledger(ledgers::mock::Ledger.into()) .build() .unwrap(); diff --git a/balius/src/bin/command/test.rs b/balius/src/bin/command/test.rs index ef61123..603ad61 100644 --- a/balius/src/bin/command/test.rs +++ b/balius/src/bin/command/test.rs @@ -1,6 +1,6 @@ use std::{collections::HashMap, path::PathBuf}; -use balius_runtime::{ledgers, Runtime, Store}; +use balius_runtime::{ledgers, store::redb::Store as RedbStore, Runtime, Store}; use miette::{Context as _, IntoDiagnostic as _}; use tokio_util::sync::CancellationToken; use tracing::{info, warn, Level}; @@ -90,9 +90,11 @@ async fn run_project_with_config( setup_tracing()?; info!("Running Balius project on daemon..."); - let store: Store = Store::open("baliusd.db", None) - .into_diagnostic() - .context("opening store")?; + let store = Store::Redb( + RedbStore::open("baliusd.db", None) + .into_diagnostic() + .context("opening store")?, + ); let config = ledgers::u5c::Config { endpoint_url: utxo_url.clone(), diff --git a/baliusd/src/main.rs b/baliusd/src/main.rs index 3dac126..0311674 100644 --- a/baliusd/src/main.rs +++ b/baliusd/src/main.rs @@ -1,6 +1,8 @@ use std::path::PathBuf; -use balius_runtime::{drivers, ledgers, sign::in_memory::SignerKey, Runtime, Store}; +use balius_runtime::{ + drivers, ledgers, sign::in_memory::SignerKey, store::redb::Store as RedbStore, Runtime, Store, +}; use boilerplate::{init_meter_provider, metrics_server}; use clap::{Parser, Subcommand}; use config::SignerConfig; @@ -60,10 +62,10 @@ async fn daemon(debug: bool) -> miette::Result<()> { boilerplate::setup_tracing(&config.logging).unwrap(); let mut store = match config.store.as_ref() { - Some(cfg) => Store::open(cfg.path.clone(), None) + Some(cfg) => RedbStore::open(cfg.path.clone(), None) .into_diagnostic() .context("opening store")?, - None => Store::in_memory() + None => RedbStore::in_memory() .into_diagnostic() .context("opening in memory store")?, }; @@ -92,7 +94,7 @@ async fn daemon(debug: bool) -> miette::Result<()> { .context("converting kv into ephemeral")?; } - let runtime = Runtime::builder(store) + let runtime = Runtime::builder(Store::Redb(store)) .with_ledger(ledger.into()) .with_kv(kv) .with_logger((&config).into())