Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions balius-runtime/src/drivers/chainsync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use tokio_util::sync::CancellationToken;
use tracing::{info, warn};
use utxorpc::CardanoSyncClient;

use crate::{Block, ChainPoint, Error, Runtime, Store};
use crate::{Block, ChainPoint, Error, Runtime, Store, StoreTrait};

impl From<ChainPoint> for utxorpc::spec::sync::BlockRef {
fn from(point: ChainPoint) -> Self {
Expand Down Expand Up @@ -53,7 +53,7 @@ async fn gather_blocks(
}
Some(utxorpc::TipEvent::Reset(block_ref)) => {
tracing::warn!(block_ref =? &block_ref, "received reset event, reseting tip");
undos = store.handle_reset(block_ref.into())?;
undos = store.handle_reset(block_ref.into()).await?;
}
None => {
tracing::warn!("Received None response from follow_tip, skipping")
Expand Down
39 changes: 19 additions & 20 deletions balius-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ pub mod wit {

mod metrics;
mod router;
mod store;

// implementations
pub mod drivers;
Expand All @@ -31,9 +30,10 @@ pub mod kv;
pub mod ledgers;
pub mod logging;
pub mod sign;
pub mod store;
pub mod submit;

pub use store::Store;
pub use store::{AtomicUpdateTrait, Store, StoreTrait};
pub use wit::Response;

pub type WorkerId = String;
Expand All @@ -44,7 +44,7 @@ pub enum Error {
Wasm(wasmtime::Error),

#[error("store error {0}")]
Store(Box<redb::Error>),
Store(String),
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Error handling degradation: String loses type information.

Changing Error::Store from Box<redb::Error> to String is a breaking change that significantly degrades error handling:

  • Callers can no longer match on specific redb error variants
  • Programmatic error handling becomes impossible
  • Error context and type information is lost

Consider preserving structured error information while still supporting custom stores. For example, introduce an enum that can hold both redb errors and a generic string variant for custom store errors:

 #[derive(Error, Debug)]
 pub enum Error {
     // ... other variants
 
-    #[error("store error {0}")]
-    Store(String),
+    #[error("store error {0}")]
+    Store(#[from] StoreError),
+}
+
+#[derive(Error, Debug)]
+pub enum StoreError {
+    #[error("redb error: {0}")]
+    Redb(#[from] redb::Error),
+    
+    #[error("redb database error: {0}")]
+    RedbDatabase(#[from] redb::DatabaseError),
+    
+    #[error("redb transaction error: {0}")]
+    RedbTransaction(#[from] redb::TransactionError),
+    
+    #[error("redb table error: {0}")]
+    RedbTable(#[from] redb::TableError),
+    
+    #[error("redb commit error: {0}")]
+    RedbCommit(#[from] redb::CommitError),
+    
+    #[error("redb storage error: {0}")]
+    RedbStorage(#[from] redb::StorageError),
+    
+    #[error("custom store error: {0}")]
+    Custom(String),
 }

This approach maintains backward compatibility for error inspection while supporting custom store implementations.

Also applies to: 87-121

🤖 Prompt for AI Agents
In balius-runtime/src/lib.rs around line 45 (and similarly for the related
cases at 87-121), the Error::Store variant was changed from Box<redb::Error> to
String which loses typed error information; revert this by introducing a small
enum (e.g., StoreError { Redb(Box<redb::Error>), Other(String) }) and change
Error::Store to hold that enum instead of plain String; add From implementations
(From<Box<redb::Error>> and From<String>) and update all construction sites to
wrap errors into the appropriate enum variant so callers can still match on
concrete redb errors while custom stores can provide string messages.


#[error("worker not found '{0}'")]
WorkerNotFound(WorkerId),
Expand Down Expand Up @@ -88,37 +88,37 @@ impl From<wasmtime::Error> for Error {

impl From<redb::Error> for Error {
fn from(value: redb::Error) -> Self {
Self::Store(Box::new(value))
Self::Store(value.to_string())
}
}

impl From<redb::DatabaseError> for Error {
fn from(value: redb::DatabaseError) -> Self {
Self::Store(Box::new(value.into()))
Self::Store(value.to_string())
}
}

impl From<redb::TransactionError> for Error {
fn from(value: redb::TransactionError) -> Self {
Self::Store(Box::new(value.into()))
Self::Store(value.to_string())
}
}

impl From<redb::TableError> for Error {
fn from(value: redb::TableError) -> Self {
Self::Store(Box::new(value.into()))
Self::Store(value.to_string())
}
}

impl From<redb::CommitError> for Error {
fn from(value: redb::CommitError) -> Self {
Self::Store(Box::new(value.into()))
Self::Store(value.to_string())
}
}

impl From<redb::StorageError> for Error {
fn from(value: redb::StorageError) -> Self {
Self::Store(Box::new(value.into()))
Self::Store(value.to_string())
}
}

Expand Down Expand Up @@ -525,7 +525,7 @@ impl Runtime {

if let Some(seq) = lowest_seq {
debug!(lowest_seq, "found lowest seq");
return self.store.find_chain_point(seq);
return self.store.find_chain_point(seq).await;
}

Ok(None)
Expand Down Expand Up @@ -571,7 +571,7 @@ impl Runtime {
let config = serde_json::to_vec(&config).unwrap();
instance.call_init(&mut wasm_store, &config).await?;

let cursor = self.store.get_worker_cursor(id)?;
let cursor = self.store.get_worker_cursor(id).await?;
debug!(cursor, id, "found cursor for worker");

let mut loaded = self.loaded.write().await;
Expand Down Expand Up @@ -643,11 +643,11 @@ impl Runtime {
let start = Instant::now();
info!("applying block");

let log_seq = self.store.write_ahead(undo_blocks, next_block)?;
let log_seq = self.store.write_ahead(undo_blocks, next_block).await?;

let workers = self.loaded.read().await;

let mut store_update = self.store.start_atomic_update(log_seq)?;
let mut store_update = self.store.start_atomic_update(log_seq).await?;

let update = async |worker: &Mutex<LoadedWorker>| -> Result<(String, f64), Error> {
let worker_start = Instant::now();
Expand All @@ -661,17 +661,16 @@ impl Runtime {
};
let updates = workers.values().map(update).collect_vec();

join_all(updates)
for (x, duration) in join_all(updates)
.await
.into_iter()
.collect::<Result<Vec<(String, f64)>, _>>()?
.iter()
.try_for_each(|(x, duration)| {
self.metrics.handle_worker_chain_duration_ms(x, *duration);
store_update.update_worker_cursor(x)
})?;
{
self.metrics.handle_worker_chain_duration_ms(&x, duration);
store_update.update_worker_cursor(&x).await?;
}

store_update.commit()?;
store_update.commit().await?;

self.metrics
.handle_chain_duration_ms(start.elapsed().as_secs_f64() * 1000.0);
Expand Down
110 changes: 110 additions & 0 deletions balius-runtime/src/store/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
pub mod redb;

use prost::Message;
use std::sync::Arc;
use tokio::sync::Mutex;

use crate::{Block, ChainPoint, Error};

pub type WorkerId = String;
pub type LogSeq = u64;

/// Log record encoded and decoded through the derived `prost::Message` impl.
///
/// The field tags below define the protobuf wire layout; changing them would
/// make previously written entries unreadable.
#[derive(Message)]
pub struct LogEntry {
/// Raw bytes of the block this entry advances to (protobuf field 1).
/// Presumably the serialized `next_block` given to `StoreTrait::write_ahead` — TODO confirm.
#[prost(bytes, tag = "1")]
pub next_block: Vec<u8>,
/// Raw bytes of the blocks to undo, one element per block (protobuf field 2).
/// Presumably the serialized `undo_blocks` given to `StoreTrait::write_ahead` — TODO confirm.
#[prost(bytes, repeated, tag = "2")]
pub undo_blocks: Vec<Vec<u8>>,
}

/// Operations available on an in-progress atomic store update.
///
/// Declared with `#[async_trait]` so the async methods can be invoked through
/// a `dyn AtomicUpdateTrait` trait object.
#[async_trait::async_trait]
pub trait AtomicUpdateTrait {
/// Registers a cursor update for the worker identified by `id` as part of
/// this update.
///
/// # Errors
/// Returns the store's error if the update cannot be recorded.
async fn update_worker_cursor(&mut self, id: &str) -> Result<(), super::Error>;
/// Commits the update.
///
/// Takes `&mut self` rather than `self`, so the value remains usable by the
/// caller afterwards; what a second call does is up to the implementation.
///
/// # Errors
/// Returns the store's error if the commit fails.
async fn commit(&mut self) -> Result<(), super::Error>;
}
Comment on lines +20 to +24
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Fix non-Send future hazard from locking tokio::Mutex across await

Custom dispatch holds a tokio::MutexGuard across .await, but #[async_trait] defaults to Send futures, leading to "future cannot be sent between threads safely". Make the traits/impls non-Send or refactor to avoid awaiting while holding the guard.

Apply this to relax futures:

-#[async_trait::async_trait]
+#[async_trait::async_trait(?Send)]
 pub trait AtomicUpdateTrait {
-#[async_trait::async_trait]
+#[async_trait::async_trait(?Send)]
 impl AtomicUpdateTrait for AtomicUpdate {
-#[async_trait::async_trait]
+#[async_trait::async_trait(?Send)]
 pub trait StoreTrait {
-#[async_trait::async_trait]
+#[async_trait::async_trait(?Send)]
 impl StoreTrait for Store {

Alternative (larger refactor): avoid outer Arc<Mutex> and require implementers to handle interior mutability, so no guard is held across .await.

Also applies to: 32-46, 54-65, 67-110


/// Atomic-update handle: either the built-in redb implementation or a
/// caller-supplied one. Dispatch between the two happens in the
/// `AtomicUpdateTrait` impl for this enum.
// The lint is silenced so the redb variant can stay inline instead of being boxed.
#[allow(clippy::large_enum_variant)]
pub enum AtomicUpdate {
/// Update backed by the `redb` submodule's `AtomicUpdate`.
Redb(redb::AtomicUpdate),
/// Custom implementation, shared behind an `Arc` and guarded by a tokio
/// async `Mutex`.
Custom(Arc<Mutex<dyn AtomicUpdateTrait + Send + Sync>>),
}

/// Forwards every `AtomicUpdateTrait` call to whichever backend this enum wraps.
#[async_trait::async_trait]
impl AtomicUpdateTrait for AtomicUpdate {
    /// Delegates the cursor update to the wrapped implementation; the custom
    /// variant is locked for the duration of the call.
    async fn update_worker_cursor(&mut self, id: &str) -> Result<(), super::Error> {
        match self {
            Self::Redb(inner) => inner.update_worker_cursor(id).await,
            Self::Custom(shared) => {
                let mut guard = shared.lock().await;
                guard.update_worker_cursor(id).await
            }
        }
    }

    /// Delegates the commit to the wrapped implementation; the custom variant
    /// is locked for the duration of the call.
    async fn commit(&mut self) -> Result<(), super::Error> {
        match self {
            Self::Redb(inner) => inner.commit().await,
            Self::Custom(shared) => {
                let mut guard = shared.lock().await;
                guard.commit().await
            }
        }
    }
}

/// Storage backend handle: either the built-in redb store or a caller-supplied
/// `StoreTrait` implementation. Dispatch between the two happens in the
/// `StoreTrait` impl for this enum.
///
/// Cloning the `Custom` variant clones the `Arc`, so all clones refer to the
/// same underlying store.
#[derive(Clone)]
pub enum Store {
/// Store provided by the `redb` submodule.
Redb(redb::Store),
/// Custom store, shared behind an `Arc` and guarded by a tokio async `Mutex`.
Custom(Arc<Mutex<dyn StoreTrait + Send + Sync>>),
}

/// Async interface every store backend exposes.
///
/// Declared with `#[async_trait]` so it can be used as a `dyn StoreTrait`
/// trait object (see `Store::Custom`).
#[async_trait::async_trait]
pub trait StoreTrait {
/// Looks up the chain point associated with log sequence `seq`.
/// The return type allows `None` when no chain point is available.
async fn find_chain_point(&self, seq: LogSeq) -> Result<Option<ChainPoint>, Error>;
/// Records `next_block` together with the `undo_blocks` that precede it and
/// returns the log sequence assigned to that record.
/// NOTE(review): exact persistence semantics are implementation-defined — confirm per backend.
async fn write_ahead(
&mut self,
undo_blocks: &[Block],
next_block: &Block,
) -> Result<LogSeq, Error>;
/// Returns the log sequence stored as the cursor of worker `id`, or `None`
/// when the worker has no cursor.
async fn get_worker_cursor(&self, id: &str) -> Result<Option<LogSeq>, super::Error>;
/// Begins an atomic update associated with `log_seq`; worker cursors are
/// updated and committed through the returned `AtomicUpdate`.
async fn start_atomic_update(&self, log_seq: LogSeq) -> Result<AtomicUpdate, super::Error>;
/// Returns the list of blocks to undo after a reset to `point`.
async fn handle_reset(&self, point: ChainPoint) -> Result<Vec<Block>, super::Error>;
}

/// Forwards every `StoreTrait` call to whichever backend this enum wraps.
/// For the custom variant the shared store is locked for the duration of each call.
#[async_trait::async_trait]
impl StoreTrait for Store {
    /// Delegates the chain-point lookup for `seq` to the wrapped store.
    async fn find_chain_point(&self, seq: LogSeq) -> Result<Option<ChainPoint>, Error> {
        match self {
            Self::Redb(inner) => inner.find_chain_point(seq).await,
            Self::Custom(shared) => {
                let guard = shared.lock().await;
                guard.find_chain_point(seq).await
            }
        }
    }

    /// Delegates the write-ahead of `next_block` and `undo_blocks` to the wrapped store.
    async fn write_ahead(
        &mut self,
        undo_blocks: &[Block],
        next_block: &Block,
    ) -> Result<LogSeq, Error> {
        match self {
            Self::Redb(inner) => inner.write_ahead(undo_blocks, next_block).await,
            Self::Custom(shared) => {
                let mut guard = shared.lock().await;
                guard.write_ahead(undo_blocks, next_block).await
            }
        }
    }

    /// Delegates the cursor lookup for worker `id` to the wrapped store.
    async fn get_worker_cursor(&self, id: &str) -> Result<Option<LogSeq>, super::Error> {
        match self {
            Self::Redb(inner) => inner.get_worker_cursor(id).await,
            Self::Custom(shared) => {
                let guard = shared.lock().await;
                guard.get_worker_cursor(id).await
            }
        }
    }

    /// Delegates the creation of an atomic update for `log_seq` to the wrapped store.
    async fn start_atomic_update(&self, log_seq: LogSeq) -> Result<AtomicUpdate, super::Error> {
        match self {
            Self::Redb(inner) => inner.start_atomic_update(log_seq).await,
            Self::Custom(shared) => {
                let guard = shared.lock().await;
                guard.start_atomic_update(log_seq).await
            }
        }
    }

    /// Delegates reset handling for `point` to the wrapped store.
    async fn handle_reset(&self, point: ChainPoint) -> Result<Vec<Block>, super::Error> {
        match self {
            Self::Redb(inner) => inner.handle_reset(point).await,
            Self::Custom(shared) => {
                let guard = shared.lock().await;
                guard.handle_reset(point).await
            }
        }
    }
}
81 changes: 40 additions & 41 deletions balius-runtime/src/store.rs → balius-runtime/src/store/redb.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,12 @@
use itertools::Itertools;
use prost::Message;
use redb::{ReadableTable as _, TableDefinition, WriteTransaction};
use std::{collections::VecDeque, path::Path, sync::Arc};
use tracing::warn;

use crate::{Block, ChainPoint, Error};

pub type WorkerId = String;
pub type LogSeq = u64;

#[derive(Message)]
pub struct LogEntry {
#[prost(bytes, tag = "1")]
pub next_block: Vec<u8>,
#[prost(bytes, repeated, tag = "2")]
pub undo_blocks: Vec<Vec<u8>>,
}
use super::StoreTrait;
pub use super::{AtomicUpdateTrait, LogEntry, LogSeq, WorkerId};

impl redb::Value for LogEntry {
type SelfType<'a>
Expand Down Expand Up @@ -58,20 +49,40 @@ const WAL: TableDefinition<LogSeq, LogEntry> = TableDefinition::new("wal");
const DEFAULT_CACHE_SIZE_MB: usize = 50;

pub struct AtomicUpdate {
wx: WriteTransaction,
wx: Option<WriteTransaction>,
log_seq: LogSeq,
}

impl AtomicUpdate {
pub fn update_worker_cursor(&mut self, id: &str) -> Result<(), super::Error> {
let mut table = self.wx.open_table(CURSORS)?;
pub fn new(wx: WriteTransaction, log_seq: LogSeq) -> Self {
Self {
wx: Some(wx),
log_seq,
}
}
}

#[async_trait::async_trait]
impl AtomicUpdateTrait for AtomicUpdate {
async fn update_worker_cursor(&mut self, id: &str) -> Result<(), super::Error> {
let Some(wx) = self.wx.as_mut() else {
return Err(super::Error::Store(
"Transaction already commited".to_string(),
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Fix typo: "commited" → "committed".

The error messages contain a spelling error.

Apply this diff:

             return Err(super::Error::Store(
-                "Transaction already commited".to_string(),
+                "Transaction already committed".to_string(),
             ));

Also applies to: 82-82

🤖 Prompt for AI Agents
In balius-runtime/src/store/redb.rs around lines 69 and 82, the error string
"Transaction already commited" contains a typo; update both occurrences to
"Transaction already committed" so the error messages use the correct spelling.

));
};

Comment on lines +66 to +72
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Nit: fix typo in error message ("committed")

-            return Err(super::Error::Store(
-                "Transaction already commited".to_string(),
-            ));
+            return Err(super::Error::Store(
+                "Transaction already committed".to_string(),
+            ));
-            return Err(super::Error::Store(
-                "Transaction already commited".to_string(),
-            ));
+            return Err(super::Error::Store(
+                "Transaction already committed".to_string(),
+            ));

Also applies to: 80-85

🤖 Prompt for AI Agents
In balius-runtime/src/store/redb.rs around lines 66-72 and 80-85, there is a
typo in the error message "Transaction already commited"; update the string to
the correct spelling "Transaction already committed" in both places where the
error is constructed so the messages read consistently and correctly.

let mut table = wx.open_table(CURSORS)?;
table.insert(id.to_owned(), self.log_seq)?;

Ok(())
}

pub fn commit(self) -> Result<(), super::Error> {
self.wx.commit()?;
async fn commit(&mut self) -> Result<(), super::Error> {
let Some(wx) = self.wx.take() else {
return Err(super::Error::Store(
"Transaction already commited".to_string(),
));
};
wx.commit()?;
Ok(())
}
}
Expand Down Expand Up @@ -165,15 +176,18 @@ impl Store {
let entry = table.get(seq)?;
Ok(entry.map(|x| x.value()))
}
}

pub fn find_chain_point(&self, seq: LogSeq) -> Result<Option<ChainPoint>, Error> {
#[async_trait::async_trait]
impl StoreTrait for Store {
async fn find_chain_point(&self, seq: LogSeq) -> Result<Option<ChainPoint>, Error> {
let entry = self.get_entry(seq)?;
let block = Block::from_bytes(&entry.unwrap().next_block);

Ok(Some(block.chain_point()))
}
Comment on lines +183 to 188
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Avoid panic: handle missing WAL entry gracefully in find_chain_point.

entry.unwrap() will panic if the sequence isn’t found. Return Ok(None) instead.

Apply this diff:

-    async fn find_chain_point(&self, seq: LogSeq) -> Result<Option<ChainPoint>, Error> {
-        let entry = self.get_entry(seq)?;
-        let block = Block::from_bytes(&entry.unwrap().next_block);
-
-        Ok(Some(block.chain_point()))
-    }
+    async fn find_chain_point(&self, seq: LogSeq) -> Result<Option<ChainPoint>, Error> {
+        match self.get_entry(seq)? {
+            Some(entry) => {
+                let block = Block::from_bytes(&entry.next_block);
+                Ok(Some(block.chain_point()))
+            }
+            None => Ok(None),
+        }
+    }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async fn find_chain_point(&self, seq: LogSeq) -> Result<Option<ChainPoint>, Error> {
let entry = self.get_entry(seq)?;
let block = Block::from_bytes(&entry.unwrap().next_block);
Ok(Some(block.chain_point()))
}
async fn find_chain_point(&self, seq: LogSeq) -> Result<Option<ChainPoint>, Error> {
match self.get_entry(seq)? {
Some(entry) => {
let block = Block::from_bytes(&entry.next_block);
Ok(Some(block.chain_point()))
}
None => Ok(None),
}
}
🤖 Prompt for AI Agents
In balius-runtime/src/store/redb.rs around lines 183 to 188, replace the
entry.unwrap() panic by handling a missing WAL entry: call self.get_entry(seq)?
and if it returns None return Ok(None); otherwise take the entry, construct the
Block from entry.next_block and return Ok(Some(block.chain_point())). Ensure you
do not call unwrap() so the function returns Ok(None) for missing sequences and
still propagates actual errors via the ? operator.


pub fn write_ahead(
async fn write_ahead(
&mut self,
undo_blocks: &[Block],
next_block: &Block,
Expand All @@ -196,7 +210,7 @@ impl Store {
}

// TODO: see if loading in batch is worth it
pub fn get_worker_cursor(&self, id: &str) -> Result<Option<LogSeq>, super::Error> {
async fn get_worker_cursor(&self, id: &str) -> Result<Option<LogSeq>, super::Error> {
let rx = self.db.begin_read()?;

let table = match rx.open_table(CURSORS) {
Expand All @@ -209,30 +223,15 @@ impl Store {
Ok(cursor.map(|x| x.value()))
}

pub fn start_atomic_update(&self, log_seq: LogSeq) -> Result<AtomicUpdate, super::Error> {
async fn start_atomic_update(
&self,
log_seq: LogSeq,
) -> Result<super::AtomicUpdate, super::Error> {
let wx = self.db.begin_write()?;
Ok(AtomicUpdate { wx, log_seq })
}

// TODO: I don't think we need this since we're going to load each cursor as
// part of the loaded worker
pub fn lowest_cursor(&self) -> Result<Option<LogSeq>, super::Error> {
let rx = self.db.begin_read()?;

let table = rx.open_table(CURSORS)?;

let cursors: Vec<_> = table
.iter()?
.map_ok(|(_, value)| value.value())
.try_collect()?;

let lowest = cursors.iter().fold(None, |all, item| all.min(Some(*item)));

Ok(lowest)
Ok(super::AtomicUpdate::Redb(AtomicUpdate::new(wx, log_seq)))
}

/// Return list of blocks to undo after receiving a reset response from chainsync.
pub fn handle_reset(&self, point: ChainPoint) -> Result<Vec<Block>, super::Error> {
async fn handle_reset(&self, point: ChainPoint) -> Result<Vec<Block>, super::Error> {
let rx = self.db.begin_read()?;
let table = rx.open_table(WAL)?;

Expand Down
Loading
Loading