From a32664291aafa41fe4d9f30143203a096c063894 Mon Sep 17 00:00:00 2001 From: Felipe Gonzalez Date: Mon, 26 May 2025 17:14:59 -0300 Subject: [PATCH 1/5] feat: Allow development of custom stores --- Cargo.lock | 1 + balius-runtime/src/lib.rs | 32 +++--- balius-runtime/src/store/mod.rs | 102 ++++++++++++++++++ .../src/{store.rs => store/redb.rs} | 78 +++++++------- balius-runtime/tests/e2e.rs | 6 +- balius/src/bin/command/test.rs | 10 +- baliusd/src/main.rs | 12 ++- 7 files changed, 176 insertions(+), 65 deletions(-) create mode 100644 balius-runtime/src/store/mod.rs rename balius-runtime/src/{store.rs => store/redb.rs} (72%) diff --git a/Cargo.lock b/Cargo.lock index c0290c6..a3b404c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -608,6 +608,7 @@ name = "comprehensive" version = "0.1.0" dependencies = [ "balius-sdk", + "hex", "serde", "serde_with", "tracing", diff --git a/balius-runtime/src/lib.rs b/balius-runtime/src/lib.rs index 5fe2589..9680be9 100644 --- a/balius-runtime/src/lib.rs +++ b/balius-runtime/src/lib.rs @@ -16,7 +16,6 @@ pub mod wit { } mod router; -mod store; // implementations pub mod drivers; @@ -25,9 +24,10 @@ pub mod kv; pub mod ledgers; pub mod logging; pub mod sign; +pub mod store; pub mod submit; -pub use store::Store; +pub use store::{AtomicUpdateTrait, Store, StoreTrait}; pub use wit::Response; pub type WorkerId = String; @@ -38,7 +38,7 @@ pub enum Error { Wasm(wasmtime::Error), #[error("store error {0}")] - Store(Box), + Store(String), #[error("worker not found '{0}'")] WorkerNotFound(WorkerId), @@ -79,37 +79,37 @@ impl From for Error { impl From for Error { fn from(value: redb::Error) -> Self { - Self::Store(Box::new(value)) + Self::Store(value.to_string()) } } impl From for Error { fn from(value: redb::DatabaseError) -> Self { - Self::Store(Box::new(value.into())) + Self::Store(value.to_string()) } } impl From for Error { fn from(value: redb::TransactionError) -> Self { - Self::Store(Box::new(value.into())) + Self::Store(value.to_string()) } } impl From for Error { fn from(value: redb::TableError) -> Self { - Self::Store(Box::new(value.into())) + Self::Store(value.to_string()) } } impl From for Error { fn from(value: redb::CommitError) -> Self { - Self::Store(Box::new(value.into())) + Self::Store(value.to_string()) } } impl From for Error { fn from(value: redb::StorageError) -> Self { - Self::Store(Box::new(value.into())) + Self::Store(value.to_string()) } } @@ -475,7 +475,7 @@ impl Runtime { if let Some(seq) = lowest_seq { debug!(lowest_seq, "found lowest seq"); - return self.store.find_chain_point(seq); + return self.store.find_chain_point(seq).await; } Ok(None) @@ -509,7 +509,7 @@ impl Runtime { let config = serde_json::to_vec(&config).unwrap(); instance.call_init(&mut wasm_store, &config).await?; - let cursor = self.store.get_worker_cursor(id)?; + let cursor = self.store.get_worker_cursor(id).await?; debug!(cursor, id, "found cursor for worker"); self.loaded.lock().await.insert( @@ -571,18 +571,20 @@ impl Runtime { ) -> Result<(), Error> { info!("applying block"); - let log_seq = self.store.write_ahead(undo_blocks, next_block)?; + let log_seq = self.store.write_ahead(undo_blocks, next_block).await?; let mut workers = self.loaded.lock().await; - let mut store_update = self.store.start_atomic_update(log_seq)?; + let mut store_update = self.store.start_atomic_update(log_seq).await?; for (_, worker) in workers.iter_mut() { worker.apply_chain(undo_blocks, next_block).await?; - store_update.update_worker_cursor(&worker.wasm_store.data().worker_id)?; + store_update + .update_worker_cursor(&worker.wasm_store.data().worker_id) + .await?; } - store_update.commit()?; + store_update.commit().await?; Ok(()) } diff --git a/balius-runtime/src/store/mod.rs b/balius-runtime/src/store/mod.rs new file mode 100644 index 0000000..ea4c117 --- /dev/null +++ b/balius-runtime/src/store/mod.rs @@ -0,0 +1,102 @@ +pub mod redb; + +use prost::Message; +use std::sync::Arc; +use tokio::sync::Mutex; + +use crate::{Block, ChainPoint, Error}; + +pub type WorkerId = String; +pub type LogSeq = u64; + +#[derive(Message)] +pub struct LogEntry { + #[prost(bytes, tag = "1")] + pub next_block: Vec, + #[prost(bytes, repeated, tag = "2")] + pub undo_blocks: Vec>, +} + +#[async_trait::async_trait] +pub trait AtomicUpdateTrait { + async fn update_worker_cursor(&mut self, id: &str) -> Result<(), super::Error>; + async fn commit(&mut self) -> Result<(), super::Error>; +} + +#[allow(clippy::large_enum_variant)] +pub enum AtomicUpdate { + Redb(redb::AtomicUpdate), + Custom(Arc>), +} + +#[async_trait::async_trait] +impl AtomicUpdateTrait for AtomicUpdate { + async fn update_worker_cursor(&mut self, id: &str) -> Result<(), super::Error> { + match self { + AtomicUpdate::Redb(au) => au.update_worker_cursor(id).await, + AtomicUpdate::Custom(au) => au.lock().await.update_worker_cursor(id).await, + } + } + async fn commit(&mut self) -> Result<(), super::Error> { + match self { + AtomicUpdate::Redb(au) => au.commit().await, + AtomicUpdate::Custom(au) => au.lock().await.commit().await, + } + } +} + +#[derive(Clone)] +pub enum Store { + Redb(redb::Store), + Custom(Arc>), +} + +#[async_trait::async_trait] +pub trait StoreTrait { + async fn find_chain_point(&self, seq: LogSeq) -> Result, Error>; + async fn write_ahead( + &mut self, + undo_blocks: &[Block], + next_block: &Block, + ) -> Result; + async fn get_worker_cursor(&self, id: &str) -> Result, super::Error>; + async fn start_atomic_update(&self, log_seq: LogSeq) -> Result; +} + +#[async_trait::async_trait] +impl StoreTrait for Store { + async fn find_chain_point(&self, seq: LogSeq) -> Result, Error> { + match self { + Store::Redb(store) => store.find_chain_point(seq).await, + Store::Custom(store) => store.lock().await.find_chain_point(seq).await, + } + } + async fn write_ahead( + &mut self, + undo_blocks: &[Block], + next_block: &Block, + ) -> Result { + match self { + Store::Redb(store) => store.write_ahead(undo_blocks, next_block).await, + Store::Custom(store) => { + store + .lock() + .await + .write_ahead(undo_blocks, next_block) + .await + } + } + } + async fn get_worker_cursor(&self, id: &str) -> Result, super::Error> { + match self { + Store::Redb(store) => store.get_worker_cursor(id).await, + Store::Custom(store) => store.lock().await.get_worker_cursor(id).await, + } + } + async fn start_atomic_update(&self, log_seq: LogSeq) -> Result { + match self { + Store::Redb(store) => store.start_atomic_update(log_seq).await, + Store::Custom(store) => store.lock().await.start_atomic_update(log_seq).await, + } + } +} diff --git a/balius-runtime/src/store.rs b/balius-runtime/src/store/redb.rs similarity index 72% rename from balius-runtime/src/store.rs rename to balius-runtime/src/store/redb.rs index 16dc340..9516a35 100644 --- a/balius-runtime/src/store.rs +++ b/balius-runtime/src/store/redb.rs @@ -1,4 +1,3 @@ -use itertools::Itertools; use prost::Message; use redb::{ReadableTable as _, TableDefinition, WriteTransaction}; use std::{path::Path, sync::Arc}; @@ -6,16 +5,8 @@ use tracing::warn; use crate::{Block, ChainPoint, Error}; -pub type WorkerId = String; -pub type LogSeq = u64; - -#[derive(Message)] -pub struct LogEntry { - #[prost(bytes, tag = "1")] - pub next_block: Vec, - #[prost(bytes, repeated, tag = "2")] - pub undo_blocks: Vec>, -} +use super::StoreTrait; +pub use super::{AtomicUpdateTrait, LogEntry, LogSeq, WorkerId}; impl redb::Value for LogEntry { type SelfType<'a> @@ -58,20 +49,40 @@ const WAL: TableDefinition = TableDefinition::new("wal"); const DEFAULT_CACHE_SIZE_MB: usize = 50; pub struct AtomicUpdate { - wx: WriteTransaction, + wx: Option, log_seq: LogSeq, } - impl AtomicUpdate { - pub fn update_worker_cursor(&mut self, id: &str) -> Result<(), super::Error> { - let mut table = self.wx.open_table(CURSORS)?; + pub fn new(wx: WriteTransaction, log_seq: LogSeq) -> Self { + Self { + wx: Some(wx), + log_seq, + } + } +} + +#[async_trait::async_trait] +impl AtomicUpdateTrait for AtomicUpdate { + async fn update_worker_cursor(&mut self, id: &str) -> Result<(), super::Error> { + let Some(wx) = self.wx.as_mut() else { + return Err(super::Error::Store( + "Transaction already commited".to_string(), + )); + }; + + let mut table = wx.open_table(CURSORS)?; table.insert(id.to_owned(), self.log_seq)?; Ok(()) } - pub fn commit(self) -> Result<(), super::Error> { - self.wx.commit()?; + async fn commit(&mut self) -> Result<(), super::Error> { + let Some(wx) = self.wx.take() else { + return Err(super::Error::Store( + "Transaction already commited".to_string(), + )); + }; + wx.commit()?; Ok(()) } } @@ -120,15 +131,18 @@ impl Store { let entry = table.get(seq)?; Ok(entry.map(|x| x.value())) } +} - pub fn find_chain_point(&self, seq: LogSeq) -> Result, Error> { +#[async_trait::async_trait] +impl StoreTrait for Store { + async fn find_chain_point(&self, seq: LogSeq) -> Result, Error> { let entry = self.get_entry(seq)?; let block = Block::from_bytes(&entry.unwrap().next_block); Ok(Some(block.chain_point())) } - pub fn write_ahead( + async fn write_ahead( &mut self, undo_blocks: &[Block], next_block: &Block, @@ -151,7 +165,7 @@ impl Store { } // TODO: see if loading in batch is worth it - pub fn get_worker_cursor(&self, id: &str) -> Result, super::Error> { + async fn get_worker_cursor(&self, id: &str) -> Result, super::Error> { let rx = self.db.begin_read()?; let table = match rx.open_table(CURSORS) { @@ -164,25 +178,11 @@ impl Store { Ok(cursor.map(|x| x.value())) } - pub fn start_atomic_update(&self, log_seq: LogSeq) -> Result { + async fn start_atomic_update( + &self, + log_seq: LogSeq, + ) -> Result { let wx = self.db.begin_write()?; - Ok(AtomicUpdate { wx, log_seq }) - } - - // TODO: I don't think we need this since we're going to load each cursor as - // part of the loaded worker - pub fn lowest_cursor(&self) -> Result, super::Error> { - let rx = self.db.begin_read()?; - - let table = rx.open_table(CURSORS)?; - - let cursors: Vec<_> = table - .iter()? - .map_ok(|(_, value)| value.value()) - .try_collect()?; - - let lowest = cursors.iter().fold(None, |all, item| all.min(Some(*item))); - - Ok(lowest) + Ok(super::AtomicUpdate::Redb(AtomicUpdate::new(wx, log_seq))) } } diff --git a/balius-runtime/tests/e2e.rs b/balius-runtime/tests/e2e.rs index 0c64931..0b324a2 100644 --- a/balius-runtime/tests/e2e.rs +++ b/balius-runtime/tests/e2e.rs @@ -5,7 +5,7 @@ use std::{ process::Command, }; -use balius_runtime::{ledgers, Runtime, Store}; +use balius_runtime::{ledgers, store::redb::Store as RedbStore, Runtime, Store}; use serde_json::json; use wit_component::ComponentEncoder; @@ -41,9 +41,9 @@ fn build_module(src_dir: impl AsRef, module_name: &str, target: impl AsRef async fn faucet_claim() { build_module("../examples/minter/offchain", "minter", "tests/faucet.wasm"); - let store = Store::open("tests/balius.db", None).unwrap(); + let store = Store::Redb(RedbStore::open("tests/balius.db", None).unwrap()); - let mut runtime = Runtime::builder(store) + let runtime = Runtime::builder(store) .with_ledger(ledgers::mock::Ledger.into()) .build() .unwrap(); diff --git a/balius/src/bin/command/test.rs b/balius/src/bin/command/test.rs index f4476c4..e9a5023 100644 --- a/balius/src/bin/command/test.rs +++ b/balius/src/bin/command/test.rs @@ -1,6 +1,6 @@ use std::{collections::HashMap, path::PathBuf}; -use balius_runtime::{ledgers, Runtime, Store}; +use balius_runtime::{ledgers, store::redb::Store as RedbStore, Runtime, Store}; use miette::{Context as _, IntoDiagnostic as _}; use tokio_util::sync::CancellationToken; use tracing::{info, warn, Level}; @@ -90,9 +90,11 @@ async fn run_project_with_config( setup_tracing()?; info!("Running Balius project on daemon..."); - let store: Store = Store::open("baliusd.db", None) - .into_diagnostic() - .context("opening store")?; + let store = Store::Redb( + RedbStore::open("baliusd.db", None) + .into_diagnostic() + .context("opening store")?, + ); let config = ledgers::u5c::Config { endpoint_url: utxo_url.clone(), diff --git a/baliusd/src/main.rs b/baliusd/src/main.rs index 320d631..bc4009e 100644 --- a/baliusd/src/main.rs +++ b/baliusd/src/main.rs @@ -1,6 +1,8 @@ use std::{path::PathBuf, sync::Arc}; -use balius_runtime::{drivers, ledgers, logging::file::FileLogger, Runtime, Store}; +use balius_runtime::{ + drivers, ledgers, logging::file::FileLogger, store::redb::Store as RedbStore, Runtime, Store, +}; use miette::{Context as _, IntoDiagnostic as _}; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DisplayFromStr}; @@ -106,9 +108,11 @@ async fn main() -> miette::Result<()> { boilerplate::setup_tracing(&config.logging).unwrap(); - let store = Store::open("baliusd.db", None) - .into_diagnostic() - .context("opening store")?; + let store = Store::Redb( + RedbStore::open("baliusd.db", None) + .into_diagnostic() + .context("opening store")?, + ); let ledger = ledgers::u5c::Ledger::new(&config.ledger) .await From e6cb82132cbfb380cd800bce66b6db473ba29c14 Mon Sep 17 00:00:00 2001 From: Felipe Gonzalez Date: Mon, 23 Jun 2025 13:46:23 -0300 Subject: [PATCH 2/5] Fix after rebase --- baliusd/src/main.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/baliusd/src/main.rs b/baliusd/src/main.rs index 3e5918f..92c9867 100644 --- a/baliusd/src/main.rs +++ b/baliusd/src/main.rs @@ -125,10 +125,10 @@ async fn main() -> miette::Result<()> { boilerplate::setup_tracing(&config.logging).unwrap(); let store = match config.store.as_ref() { - Some(cfg) => Store::open(cfg.path.clone(), None) + Some(cfg) => RedbStore::open(cfg.path.clone(), None) .into_diagnostic() .context("opening store")?, - None => Store::in_memory() + None => RedbStore::in_memory() .into_diagnostic() .context("opening in memory store")?, }; @@ -138,7 +138,7 @@ async fn main() -> miette::Result<()> { .into_diagnostic() .context("setting up ledger")?; - let runtime = Runtime::builder(store) + let runtime = Runtime::builder(Store::Redb(store)) .with_ledger(ledger.into()) .with_kv((&config).into()) .with_logger((&config).into()) From 53ddfe2cae526a4b570d78c36bc7f96272953d99 Mon Sep 17 00:00:00 2001 From: Felipe Gonzalez Date: Mon, 13 Oct 2025 18:13:31 -0300 Subject: [PATCH 3/5] Rebase --- balius-runtime/src/drivers/chainsync.rs | 4 ++-- balius-runtime/src/store/mod.rs | 8 ++++++++ balius-runtime/src/store/redb.rs | 3 +-- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/balius-runtime/src/drivers/chainsync.rs b/balius-runtime/src/drivers/chainsync.rs index 77e09be..98fdf26 100644 --- a/balius-runtime/src/drivers/chainsync.rs +++ b/balius-runtime/src/drivers/chainsync.rs @@ -6,7 +6,7 @@ use tokio_util::sync::CancellationToken; use tracing::{info, warn}; use utxorpc::CardanoSyncClient; -use crate::{Block, ChainPoint, Error, Runtime, Store}; +use crate::{Block, ChainPoint, Error, Runtime, Store, StoreTrait}; impl From for utxorpc::spec::sync::BlockRef { fn from(point: ChainPoint) -> Self { @@ -53,7 +53,7 @@ async fn gather_blocks( } utxorpc::TipEvent::Reset(block_ref) => { tracing::warn!(block_ref =? &block_ref, "received reset event, reseting tip"); - undos = store.handle_reset(block_ref.into())?; + undos = store.handle_reset(block_ref.into()).await?; } } } diff --git a/balius-runtime/src/store/mod.rs b/balius-runtime/src/store/mod.rs index ea4c117..5a4eba2 100644 --- a/balius-runtime/src/store/mod.rs +++ b/balius-runtime/src/store/mod.rs @@ -61,6 +61,7 @@ pub trait StoreTrait { ) -> Result; async fn get_worker_cursor(&self, id: &str) -> Result, super::Error>; async fn start_atomic_update(&self, log_seq: LogSeq) -> Result; + async fn handle_reset(&self, point: ChainPoint) -> Result, super::Error>; } #[async_trait::async_trait] @@ -99,4 +100,11 @@ impl StoreTrait for Store { Store::Custom(store) => store.lock().await.start_atomic_update(log_seq).await, } } + + async fn handle_reset(&self, point: ChainPoint) -> Result, super::Error> { + match self { + Store::Redb(store) => store.handle_reset(point).await, + Store::Custom(store) => store.lock().await.handle_reset(point).await, + } + } } diff --git a/balius-runtime/src/store/redb.rs b/balius-runtime/src/store/redb.rs index 788d2fd..4734f1b 100644 --- a/balius-runtime/src/store/redb.rs +++ b/balius-runtime/src/store/redb.rs @@ -231,8 +231,7 @@ impl StoreTrait for Store { Ok(super::AtomicUpdate::Redb(AtomicUpdate::new(wx, log_seq))) } - /// Return list of blocks to undo after receiving a reset response from chainsync. - pub fn handle_reset(&self, point: ChainPoint) -> Result, super::Error> { + async fn handle_reset(&self, point: ChainPoint) -> Result, super::Error> { let rx = self.db.begin_read()?; let table = rx.open_table(WAL)?; From dc525a18af6fe472da16982e402476868a834966 Mon Sep 17 00:00:00 2001 From: Felipe Gonzalez Date: Tue, 9 Dec 2025 12:37:13 -0300 Subject: [PATCH 4/5] fix: Improve state locks --- balius-runtime/src/lib.rs | 50 +++++++++++++++++++++------------------ 1 file changed, 27 insertions(+), 23 deletions(-) diff --git a/balius-runtime/src/lib.rs b/balius-runtime/src/lib.rs index a5c4d8d..de4ffdb 100644 --- a/balius-runtime/src/lib.rs +++ b/balius-runtime/src/lib.rs @@ -5,7 +5,7 @@ use router::Router; use sign::SignerHost; use std::{collections::HashMap, io::Read, path::Path, sync::Arc}; use thiserror::Error; -use tokio::sync::Mutex; +use tokio::sync::{Mutex, RwLock}; use tracing::{debug, info, warn}; use utxorpc::spec::sync::BlockRef; use wasmtime::component::HasSelf; @@ -485,13 +485,13 @@ impl LoadedWorker { } } -type WorkerMap = HashMap; +type WorkerMap = HashMap>; #[derive(Clone)] pub struct Runtime { engine: wasmtime::Engine, linker: wasmtime::component::Linker, - loaded: Arc>, + loaded: Arc>, store: store::Store, ledger: Option, @@ -510,13 +510,15 @@ impl Runtime { } pub async fn chain_cursor(&self) -> Result, Error> { - let lowest_seq = self - .loaded - .lock() - .await - .values() - .flat_map(|w| w.cursor) - .min(); + let mut lowest_seq = None; + for w in self.loaded.read().await.values() { + if let Some(cursor) = w.lock().await.cursor { + lowest_seq = match lowest_seq { + Some(prev) => Some(std::cmp::min(prev, cursor)), + None => Some(cursor), + }; + } + } if let Some(seq) = lowest_seq { debug!(lowest_seq, "found lowest seq"); @@ -569,14 +571,14 @@ impl Runtime { let cursor = self.store.get_worker_cursor(id)?; debug!(cursor, id, "found cursor for worker"); - self.loaded.lock().await.insert( + self.loaded.write().await.insert( id.to_owned(), - LoadedWorker { + Mutex::new(LoadedWorker { wasm_store, instance, cursor, metrics: self.metrics.clone(), - }, + }), ); Ok(()) @@ -610,7 +612,7 @@ impl Runtime { } pub async fn remove_worker(&self, id: &str) -> Result<(), Error> { - match self.loaded.lock().await.remove(id) { + match self.loaded.write().await.remove(id) { Some(_) => { info!(worker = id, "Successfully removed worker from runtime.") } @@ -631,13 +633,14 @@ impl Runtime { let log_seq = self.store.write_ahead(undo_blocks, next_block)?; - let mut workers = self.loaded.lock().await; + let workers = self.loaded.read().await; let mut store_update = self.store.start_atomic_update(log_seq)?; - for (_, worker) in workers.iter_mut() { - worker.apply_chain(undo_blocks, next_block).await?; - store_update.update_worker_cursor(&worker.wasm_store.data().worker_id)?; + for (_, worker) in workers.iter() { + let mut lock = worker.lock().await; + lock.apply_chain(undo_blocks, next_block).await?; + store_update.update_worker_cursor(&lock.wasm_store.data().worker_id)?; } store_update.commit()?; @@ -651,11 +654,12 @@ impl Runtime { method: &str, params: Vec, ) -> Result { - let mut lock = self.loaded.lock().await; - - let worker = lock - .get_mut(worker_id) - .ok_or(Error::WorkerNotFound(worker_id.to_string()))?; + let workers = self.loaded.read().await; + let mut worker = workers + .get(worker_id) + .ok_or(Error::WorkerNotFound(worker_id.to_string()))? + .lock() + .await; let channel = worker .wasm_store From 4d886037b79ed0fa7ce2154f8f4fa41cb9a738b8 Mon Sep 17 00:00:00 2001 From: Felipe Gonzalez Date: Tue, 9 Dec 2025 13:32:44 -0300 Subject: [PATCH 5/5] chore: Update u5c --- Cargo.lock | 181 ++++++++++++++++++++---- balius-runtime/Cargo.toml | 2 +- balius-runtime/src/drivers/chainsync.rs | 13 +- balius-runtime/src/ledgers/u5c.rs | 15 +- balius-runtime/src/lib.rs | 5 +- 5 files changed, 167 insertions(+), 49 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4118935..56a74af 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -225,7 +225,7 @@ dependencies = [ "miniz_oxide", "object", "rustc-demangle", - "windows-targets", + "windows-targets 0.52.6", ] [[package]] @@ -499,7 +499,7 @@ dependencies = [ "num-traits", "serde", "wasm-bindgen", - "windows-link", + "windows-link 0.1.1", ] [[package]] @@ -1476,7 +1476,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2", + "socket2 0.5.9", "tokio", "tower-service", "tracing", @@ -1554,7 +1554,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2", + "socket2 0.5.9", "system-configuration", "tokio", "tower-service", @@ -1727,6 +1727,17 @@ dependencies = [ "serde", ] +[[package]] +name = "io-uring" +version = "0.7.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdd7bddefd0a8833b88a4b68f90dae22c7450d11b354198baee3874fd811b344" +dependencies = [ + "bitflags", + "cfg-if", + "libc", +] + [[package]] name = "ipnet" version = "2.11.0" @@ -2366,7 +2377,7 @@ dependencies = [ "pallas-codec", "pallas-crypto", "rand 0.8.5", - "socket2", + "socket2 0.5.9", "thiserror 1.0.69", "tokio", "tracing", @@ -2473,7 +2484,7 @@ dependencies = [ "libc", "redox_syscall", "smallvec", - "windows-targets", + "windows-targets 0.52.6", ] [[package]] @@ -2776,7 +2787,7 @@ dependencies = [ "quinn-udp", "rustc-hash", "rustls", - "socket2", + "socket2 0.5.9", "thiserror 2.0.12", "tokio", "tracing", @@ -2813,7 +2824,7 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2", + "socket2 0.5.9", "tracing", "windows-sys 0.59.0", ] @@ -3379,6 +3390,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "socket2" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17129e116933cf371d018bb80ae557e889637989d8638274fb25622827b03881" +dependencies = [ + "libc", + "windows-sys 0.60.2", +] + [[package]] name = "spdx" version = "0.10.8" @@ -3662,19 +3683,21 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.45.0" +version = "1.47.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2513ca694ef9ede0fb23fe71a4ee4107cb102b9dc1930f6d0fd77aae068ae165" +checksum = "36cde6a64bcbb101731e7db34c087674206357a5316e6f695f5fef730bd711de" dependencies = [ "backtrace", "bytes", + "io-uring", "libc", "mio", "pin-project-lite", "signal-hook-registry", - "socket2", + "slab", + "socket2 0.6.1", "tokio-macros", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -3807,7 +3830,7 @@ dependencies = [ "prost", "rustls-native-certs", "rustls-pemfile", - "socket2", + "socket2 0.5.9", "tokio", "tokio-rustls", "tokio-stream", @@ -4064,15 +4087,15 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "utxorpc" -version = "0.10.0" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bea8f520897c0d0b8048413a2ad72044cc5cbc00c98219a750f048be616f3d7f" +checksum = "d60b6baab3e86e71acf144ebfe97263f22eddcf2707d19fca5ffdaeeea95e96f" dependencies = [ "bytes", - "thiserror 1.0.69", + "thiserror 2.0.12", "tokio", "tonic", - "utxorpc-spec 0.15.0", + "utxorpc-spec 0.17.0", ] [[package]] @@ -4107,6 +4130,22 @@ dependencies = [ "tonic", ] +[[package]] +name = "utxorpc-spec" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1d984ee351b308377e118135e638a5d544fdb0855f12a3b088d9dcaf0432052" +dependencies = [ + "bytes", + "futures-core", + "pbjson", + "pbjson-types", + "prost", + "prost-types", + "serde", + "tonic", +] + [[package]] name = "uuid" version = "1.16.0" @@ -4754,7 +4793,7 @@ checksum = "46ec44dc15085cea82cf9c78f85a9114c463a369786585ad2882d1ff0b0acf40" dependencies = [ "windows-implement", "windows-interface", - "windows-link", + "windows-link 0.1.1", "windows-result", "windows-strings", ] @@ -4787,13 +4826,19 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76840935b766e1b0a05c0066835fb9ec80071d4c09a16f6bd5f7e655e3c14c38" +[[package]] +name = "windows-link" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" + [[package]] name = "windows-registry" version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3bab093bdd303a1240bb99b8aba8ea8a69ee19d34c9e2ef9594e708a4878820" dependencies = [ - "windows-link", + "windows-link 0.1.1", "windows-result", "windows-strings", ] @@ -4804,7 +4849,7 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4b895b5356fc36103d0f64dd1e94dfa7ac5633f1c9dd6e80fe9ec4adef69e09d" dependencies = [ - "windows-link", + "windows-link 0.1.1", ] [[package]] @@ -4813,7 +4858,7 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a7ab927b2637c19b3dbe0965e75d8f2d30bdd697a1516191cad2ec4df8fb28a" dependencies = [ - "windows-link", + "windows-link 0.1.1", ] [[package]] @@ -4822,7 +4867,7 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" dependencies = [ - "windows-targets", + "windows-targets 0.52.6", ] [[package]] @@ -4831,7 +4876,16 @@ version = "0.59.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" dependencies = [ - "windows-targets", + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-sys" +version = "0.60.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2f500e4d28234f72040990ec9d39e3a6b950f9f22d3dba18416c35882612bcb" +dependencies = [ + "windows-targets 0.53.5", ] [[package]] @@ -4840,14 +4894,31 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_gnullvm", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", + "windows_aarch64_gnullvm 0.52.6", + "windows_aarch64_msvc 0.52.6", + "windows_i686_gnu 0.52.6", + "windows_i686_gnullvm 0.52.6", + "windows_i686_msvc 0.52.6", + "windows_x86_64_gnu 0.52.6", + "windows_x86_64_gnullvm 0.52.6", + "windows_x86_64_msvc 0.52.6", +] + +[[package]] +name = "windows-targets" +version = "0.53.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4945f9f551b88e0d65f3db0bc25c33b8acea4d9e41163edf90dcd0b19f9069f3" +dependencies = [ + "windows-link 0.2.1", + "windows_aarch64_gnullvm 0.53.1", + "windows_aarch64_msvc 0.53.1", + "windows_i686_gnu 0.53.1", + "windows_i686_gnullvm 0.53.1", + "windows_i686_msvc 0.53.1", + "windows_x86_64_gnu 0.53.1", + "windows_x86_64_gnullvm 0.53.1", + "windows_x86_64_msvc 0.53.1", ] [[package]] @@ -4856,48 +4927,96 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53" + [[package]] name = "windows_aarch64_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" +[[package]] +name = "windows_aarch64_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006" + [[package]] name = "windows_i686_gnu" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" +[[package]] +name = "windows_i686_gnu" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "960e6da069d81e09becb0ca57a65220ddff016ff2d6af6a223cf372a506593a3" + [[package]] name = "windows_i686_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" +[[package]] +name = "windows_i686_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c" + [[package]] name = "windows_i686_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" +[[package]] +name = "windows_i686_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2" + [[package]] name = "windows_x86_64_gnu" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" +[[package]] +name = "windows_x86_64_gnu" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499" + [[package]] name = "windows_x86_64_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1" + [[package]] name = "windows_x86_64_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" +[[package]] +name = "windows_x86_64_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" + [[package]] name = "winnow" version = "0.7.10" diff --git a/balius-runtime/Cargo.toml b/balius-runtime/Cargo.toml index 6555edd..785ae7d 100644 --- a/balius-runtime/Cargo.toml +++ b/balius-runtime/Cargo.toml @@ -23,7 +23,7 @@ tracing = "0.1.40" hex = "0.4.3" itertools = "0.14.0" async-trait = "0.1.83" -utxorpc = { version = "0.10.0" } +utxorpc = { version = "0.12.0" } # utxorpc = { path = "../../../utxorpc/rust-sdk" } tokio-util = "0.7.12" prost = "0.13" diff --git a/balius-runtime/src/drivers/chainsync.rs b/balius-runtime/src/drivers/chainsync.rs index 77e09be..d878f7a 100644 --- a/balius-runtime/src/drivers/chainsync.rs +++ b/balius-runtime/src/drivers/chainsync.rs @@ -44,17 +44,20 @@ async fn gather_blocks( let event = tip.event().await?; match event { - utxorpc::TipEvent::Apply(chain_block) => { + Some(utxorpc::TipEvent::Apply(chain_block)) => { let next = Block::Cardano(chain_block.parsed.unwrap()); break Ok((next, undos)); } - utxorpc::TipEvent::Undo(chain_block) => { + Some(utxorpc::TipEvent::Undo(chain_block)) => { undos.push(Block::Cardano(chain_block.parsed.unwrap())); } - utxorpc::TipEvent::Reset(block_ref) => { + Some(utxorpc::TipEvent::Reset(block_ref)) => { tracing::warn!(block_ref =? &block_ref, "received reset event, reseting tip"); undos = store.handle_reset(block_ref.into())?; } + None => { + tracing::warn!("Received None response from follow_tip, skipping") + } } } } @@ -96,9 +99,9 @@ pub async fn run( // confirm first event is a reset to the requested chain point match tip.event().await? { - utxorpc::TipEvent::Reset(point) => { + Some(utxorpc::TipEvent::Reset(point)) => { warn!( - slot = point.index, + slot = point.slot, "TODO: check that reset is to the requested chain point" ); } diff --git a/balius-runtime/src/ledgers/u5c.rs b/balius-runtime/src/ledgers/u5c.rs index 9a21243..f36b0ce 100644 --- a/balius-runtime/src/ledgers/u5c.rs +++ b/balius-runtime/src/ledgers/u5c.rs @@ -126,20 +126,15 @@ impl Ledger { } pub async fn read_params(&mut self) -> Result { - let req = utxorpc::spec::query::ReadParamsRequest::default(); let res = self .queries - .read_params(req) + .read_params() .await - .map_err(|err| wit::LedgerError::Upstream(format!("{err:?}")))? - .into_inner(); + .map_err(|err| wit::LedgerError::Upstream(format!("{err:?}")))?; - let params = res - .values - .and_then(|v| v.params) - .ok_or(wit::LedgerError::Upstream( - "unexpected response from read_params".to_string(), - ))?; + let params = res.params.ok_or(wit::LedgerError::Upstream( + "unexpected response from read_params".to_string(), + ))?; match params { utxorpc::spec::query::any_chain_params::Params::Cardano(params) => { Ok(serde_json::to_vec(¶ms).unwrap()) diff --git a/balius-runtime/src/lib.rs b/balius-runtime/src/lib.rs index a5c4d8d..3b41c46 100644 --- a/balius-runtime/src/lib.rs +++ b/balius-runtime/src/lib.rs @@ -156,7 +156,7 @@ pub enum ChainPoint { impl ChainPoint { pub fn slot(&self) -> BlockSlot { match self { - Self::Cardano(point) => point.index, + Self::Cardano(point) => point.slot, } } @@ -281,8 +281,9 @@ impl Block { pub fn chain_point(&self) -> ChainPoint { match self { Self::Cardano(block) => ChainPoint::Cardano(BlockRef { - index: block.header.as_ref().unwrap().slot, + slot: block.header.as_ref().unwrap().slot, hash: block.header.as_ref().unwrap().hash.clone(), + height: block.header.as_ref().unwrap().height, }), } }