Skip to content

Conversation

@gonzalezzfelipe
Copy link
Contributor

@gonzalezzfelipe gonzalezzfelipe commented May 26, 2025

Summary by CodeRabbit

  • New Features

    • Store module is now publicly accessible for external use.
    • Added support for pluggable store backends, enabling greater flexibility in storage configurations.
  • Refactor

    • Converted store operations to asynchronous, improving runtime responsiveness and concurrent handling.
    • Simplified error handling throughout the runtime for better consistency.

✏️ Tip: You can customize this high-level summary in your review settings.

@gonzalezzfelipe gonzalezzfelipe marked this pull request as draft May 27, 2025 01:56
@gonzalezzfelipe gonzalezzfelipe marked this pull request as ready for review May 27, 2025 18:32
@coderabbitai
Copy link

coderabbitai bot commented Oct 13, 2025

Walkthrough

This PR introduces a pluggable storage abstraction layer with trait-based design and async/await patterns throughout the store layer. The store module is now public with pluggable Redb and Custom backends, trait definitions for StoreTrait and AtomicUpdateTrait, and error representation changed from Boxredb::Error to String.

Changes

Cohort / File(s) Summary
Store abstraction layer
balius-runtime/src/store/mod.rs
Introduces new trait-based store abstraction with StoreTrait and AtomicUpdateTrait, Store and AtomicUpdate enums supporting Redb and Custom backends, type aliases (WorkerId, LogSeq), and LogEntry struct
Async store backend implementation
balius-runtime/src/store/redb.rs
Converts AtomicUpdate and StoreTrait implementations to async, re-exports types from parent module, adds AtomicUpdate::new constructor
Public API and error handling
balius-runtime/src/lib.rs
Makes store module public, expands re-exports to include AtomicUpdateTrait and StoreTrait, changes Store error variant from Boxredb::Error to String, updates all redb error conversions to String
Chain sync driver
balius-runtime/src/drivers/chainsync.rs
Imports StoreTrait, updates Reset handling to async, adjusts pattern matching to expect Some(..) wrappers for events
Store integration updates
balius-runtime/tests/e2e.rs, balius/src/bin/command/test.rs, baliusd/src/main.rs
Updates store initialization to use Store::Redb wrapper variant with RedbStore alias from balius_runtime::store::redb::Store

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Key areas requiring attention:

  • Error variant changes: Verify all sites correctly handle Store(String) instead of Store(Boxredb::Error), especially error logging and propagation
  • Async/await conversions: Ensure all .await calls on StoreTrait methods are properly placed and handle futures correctly throughout runtime
  • Trait dispatch logic: Verify Store enum variants (Redb vs Custom) correctly implement StoreTrait and AtomicUpdateTrait contracts in both sync and async contexts
  • AtomicUpdate implementation: Confirm Redb and Custom backend paths in update_worker_cursor and commit are equivalent in behavior
  • Public API surface: Validate that new public types (WorkerId, LogSeq, LogEntry, StoreTrait, AtomicUpdateTrait) and Store enum variants don't create unintended exposure or breaking changes

Possibly related PRs

  • fix: Handle reset event on chainsync driver #91: Directly related—both PRs add and integrate store.handle_reset API, with this PR providing the trait abstraction and async implementation that enables handle_reset usage in the chainsync driver

Suggested reviewers

  • scarmuega

Poem

🐰 Behold! The burrow grows more modular and swift,
Async paths await where sync once stood firm,
Trait-based magic binds backends with grace,
From Redb's depths to Custom's embrace—
One Store to rule them all, no more! 🎩✨

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 7.14% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately reflects the main change: introducing a pluggable store abstraction that allows development of custom store backends alongside the existing Redb implementation.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (8)
baliusd/src/main.rs (1)

65-71: Consider making cache size configurable.

Currently passing None uses the default 50MB. If config has a cache/tuning field, plumb it through to RedbStore::open to control memory.

balius-runtime/src/lib.rs (1)

45-46: Avoid flattening store errors to String; preserve error context.

Converting redb errors to String drops causes and hinders diagnostics. Prefer a structured variant, e.g., Store(#[source] anyhow::Error) or Store(#[source] Box<dyn std::error::Error + Send + Sync>), and keep From impls to wrap the source.

Also applies to: 87-121

balius-runtime/src/store/redb.rs (1)

66-86: Fix typo in error messages ("commited" → "committed").

Small polish to error strings returned after transaction consumption.

Apply this diff:

-            return Err(super::Error::Store(
-                "Transaction already commited".to_string(),
-            ));
+            return Err(super::Error::Store(
+                "Transaction already committed".to_string(),
+            ));
         };
@@
-        let Some(wx) = self.wx.take() else {
-            return Err(super::Error::Store(
-                "Transaction already commited".to_string(),
-            ));
+        let Some(wx) = self.wx.take() else {
+            return Err(super::Error::Store(
+                "Transaction already committed".to_string(),
+            ));
         };
balius-runtime/src/store/mod.rs (5)

26-30: Avoid locking AtomicUpdate behind an async Mutex; use Box and drop the lock across await.

AtomicUpdate is an ephemeral transaction handle and does not need shared ownership. Locking a tokio::Mutex across awaits adds contention and complexity without benefits here.

Switch Custom to Box<dyn AtomicUpdateTrait + Send + Sync> and call directly.

Apply:

 #[allow(clippy::large_enum_variant)]
 pub enum AtomicUpdate {
     Redb(redb::AtomicUpdate),
-    Custom(Arc<Mutex<dyn AtomicUpdateTrait + Send + Sync>>),
+    Custom(Box<dyn AtomicUpdateTrait + Send + Sync>),
 }

 #[async_trait::async_trait]
 impl AtomicUpdateTrait for AtomicUpdate {
     async fn update_worker_cursor(&mut self, id: &str) -> Result<(), super::Error> {
         match self {
             AtomicUpdate::Redb(au) => au.update_worker_cursor(id).await,
-            AtomicUpdate::Custom(au) => au.lock().await.update_worker_cursor(id).await,
+            AtomicUpdate::Custom(au) => au.update_worker_cursor(id).await,
         }
     }
     async fn commit(&mut self) -> Result<(), super::Error> {
         match self {
             AtomicUpdate::Redb(au) => au.commit().await,
-            AtomicUpdate::Custom(au) => au.lock().await.commit().await,
+            AtomicUpdate::Custom(au) => au.commit().await,
         }
     }
 }

Also applies to: 32-46


69-72: Async locking around Custom store holds the mutex across awaits; consider redesign to avoid this hot‑path lock.

The outer tokio::Mutex guard is held while awaiting find_chain_point/write_ahead/get_worker_cursor/start_atomic_update. This serializes all operations on Custom stores and risks priority inversion.

Recommended:

  • Change StoreTrait receivers to &self and move synchronization inside backends (interior mutability).
  • Store Custom as Arc<dyn StoreTrait + Send + Sync> (no Mutex), or Box<dyn StoreTrait + Send + Sync> if Clone is not required.

This removes the lock across await and lets each backend choose optimal concurrency.

Also applies to: 81-87, 91-94, 98-100


22-24: Unify Error type path for readability.

Mixing crate::Error (imported as Error) and super::Error is inconsistent. Prefer one, e.g., Error everywhere in this module.

Minimal change:

-    async fn update_worker_cursor(&mut self, id: &str) -> Result<(), super::Error>;
-    async fn commit(&mut self) -> Result<(), super::Error>;
+    async fn update_worker_cursor(&mut self, id: &str) -> Result<(), Error>;
+    async fn commit(&mut self) -> Result<(), Error>;

…and likewise adjust the impl signatures and StoreTrait methods returning super::Error to Error.

Also applies to: 34-34, 40-40, 63-64, 90-90, 96-96


12-18: Derive common traits on LogEntry for diagnostics and tests.

Deriving Clone, Debug, and PartialEq is handy for logging/tracing and assertions.

-#[derive(Message)]
+#[derive(Clone, PartialEq, ::prost::Message, Debug)]
 pub struct LogEntry {

22-22: Use the WorkerId alias in APIs.

Be consistent and accept &WorkerId instead of &str to document intent and avoid accidental type mixups at call sites.

-    async fn update_worker_cursor(&mut self, id: &str) -> Result<(), Error>;
+    async fn update_worker_cursor(&mut self, id: &WorkerId) -> Result<(), Error>;
...
-    async fn get_worker_cursor(&self, id: &str) -> Result<Option<LogSeq>, Error>;
+    async fn get_worker_cursor(&self, id: &WorkerId) -> Result<Option<LogSeq>, Error>;

Also applies to: 62-63, 90-90

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 86b459d and 6507f71.

📒 Files selected for processing (6)
  • balius-runtime/src/lib.rs (6 hunks)
  • balius-runtime/src/store/mod.rs (1 hunks)
  • balius-runtime/src/store/redb.rs (5 hunks)
  • balius-runtime/tests/e2e.rs (2 hunks)
  • balius/src/bin/command/test.rs (2 hunks)
  • baliusd/src/main.rs (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (5)
balius-runtime/tests/e2e.rs (2)
balius-runtime/src/store/redb.rs (1)
  • open (104-120)
balius-runtime/src/lib.rs (1)
  • builder (494-496)
balius/src/bin/command/test.rs (1)
balius-runtime/src/store/redb.rs (1)
  • open (104-120)
balius-runtime/src/store/mod.rs (1)
balius-runtime/src/store/redb.rs (6)
  • update_worker_cursor (66-77)
  • commit (79-87)
  • find_chain_point (183-188)
  • write_ahead (190-210)
  • get_worker_cursor (213-224)
  • start_atomic_update (226-232)
baliusd/src/main.rs (2)
balius-runtime/src/store/redb.rs (2)
  • in_memory (97-102)
  • open (104-120)
balius-runtime/src/lib.rs (1)
  • builder (494-496)
balius-runtime/src/store/redb.rs (1)
balius-runtime/src/store/mod.rs (12)
  • update_worker_cursor (22-22)
  • update_worker_cursor (34-39)
  • commit (23-23)
  • commit (40-45)
  • find_chain_point (56-56)
  • find_chain_point (68-73)
  • write_ahead (57-61)
  • write_ahead (74-89)
  • get_worker_cursor (62-62)
  • get_worker_cursor (90-95)
  • start_atomic_update (63-63)
  • start_atomic_update (96-101)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build
🔇 Additional comments (10)
baliusd/src/main.rs (2)

3-5: Import changes look correct.

New RedbStore alias and Store enum integration are consistent with the new API.


97-97: Runtime builder wrapping is correct.

Passing Store::Redb(store) matches the new pluggable store design.

balius/src/bin/command/test.rs (1)

93-97: LGTM.

Store initialization via Store::Redb(...) and error handling with IntoDiagnostic are correct and align with the new API.

balius-runtime/src/lib.rs (2)

31-35: Re-exports look good.

Publicly exposing store traits and types is consistent with the new pluggable store design.


618-632: Async store update flow looks correct.

write_ahead → start_atomic_update → per-worker cursor updates → commit matches WAL + atomic cursor update semantics.

balius-runtime/src/store/redb.rs (4)

51-62: AtomicUpdate constructor change is sound.

Option + new() cleanly prevents double-commit and allows move semantics.


190-210: write_ahead implementation looks correct.

Monotonic seq, WAL insert, and commit are in the right order.


212-224: get_worker_cursor read path is fine.

Gracefully handles missing table; returns Option as expected.


226-232: start_atomic_update is correct.

Begins a write transaction and wraps it in AtomicUpdate.

balius-runtime/src/store/mod.rs (1)

48-52: Confirm Clone semantics for Store.

Cloning Store duplicates handles to underlying backends. Ensure redb::Store’s Clone is cheap and safe (shared state correctness), and that cloning Custom stores is desired behavior.

Do we require Store to be Clone? If not, dropping Clone could simplify Custom to Box<dyn StoreTrait + Send + Sync> and remove the outer Mutex.

Comment on lines +183 to 188
async fn find_chain_point(&self, seq: LogSeq) -> Result<Option<ChainPoint>, Error> {
let entry = self.get_entry(seq)?;
let block = Block::from_bytes(&entry.unwrap().next_block);

Ok(Some(block.chain_point()))
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Avoid panic: handle missing WAL entry gracefully in find_chain_point.

entry.unwrap() will panic if the sequence isn’t found. Return Ok(None) instead.

Apply this diff:

-    async fn find_chain_point(&self, seq: LogSeq) -> Result<Option<ChainPoint>, Error> {
-        let entry = self.get_entry(seq)?;
-        let block = Block::from_bytes(&entry.unwrap().next_block);
-
-        Ok(Some(block.chain_point()))
-    }
+    async fn find_chain_point(&self, seq: LogSeq) -> Result<Option<ChainPoint>, Error> {
+        match self.get_entry(seq)? {
+            Some(entry) => {
+                let block = Block::from_bytes(&entry.next_block);
+                Ok(Some(block.chain_point()))
+            }
+            None => Ok(None),
+        }
+    }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async fn find_chain_point(&self, seq: LogSeq) -> Result<Option<ChainPoint>, Error> {
let entry = self.get_entry(seq)?;
let block = Block::from_bytes(&entry.unwrap().next_block);
Ok(Some(block.chain_point()))
}
async fn find_chain_point(&self, seq: LogSeq) -> Result<Option<ChainPoint>, Error> {
match self.get_entry(seq)? {
Some(entry) => {
let block = Block::from_bytes(&entry.next_block);
Ok(Some(block.chain_point()))
}
None => Ok(None),
}
}
🤖 Prompt for AI Agents
In balius-runtime/src/store/redb.rs around lines 183 to 188, replace the
entry.unwrap() panic by handling a missing WAL entry: call self.get_entry(seq)?
and if it returns None return Ok(None); otherwise take the entry, construct the
Block from entry.next_block and return Ok(Some(block.chain_point())). Ensure you
do not call unwrap() so the function returns Ok(None) for missing sequences and
still propagates actual errors via the ? operator.

Comment on lines +44 to 47
let store = Store::Redb(RedbStore::open("tests/balius.db", None).unwrap());

let mut runtime = Runtime::builder(store)
let runtime = Runtime::builder(store)
.with_ledger(ledgers::mock::Ledger.into())
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Use in-memory store for tests to avoid cross-run contamination.

A fixed file path can introduce flakiness and require cleanup. Prefer in-memory for reliability and speed.

Apply this diff:

-    let store = Store::Redb(RedbStore::open("tests/balius.db", None).unwrap());
+    let store = Store::Redb(RedbStore::in_memory().unwrap());
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let store = Store::Redb(RedbStore::open("tests/balius.db", None).unwrap());
let mut runtime = Runtime::builder(store)
let runtime = Runtime::builder(store)
.with_ledger(ledgers::mock::Ledger.into())
let store = Store::Redb(RedbStore::in_memory().unwrap());
let runtime = Runtime::builder(store)
.with_ledger(ledgers::mock::Ledger.into())
🤖 Prompt for AI Agents
In balius-runtime/tests/e2e.rs around lines 44 to 47, the test opens a
persistent DB at "tests/balius.db" which can cause cross-run contamination;
replace the persistent RedbStore::open(...) call with the in-memory Redb store
constructor (e.g., RedbStore::open_in_memory() or the library's equivalent) so
Store::Redb(...) uses an in-memory instance, remove the file path argument, and
ensure unwrap() is still used or handled the same way so the test remains
deterministic and leaves no on-disk artifacts.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
balius-runtime/src/store/redb.rs (1)

183-188: Avoid panic: handle missing WAL entry gracefully in find_chain_point.

The previous review flagged that entry.unwrap() will panic if the sequence isn't found. This issue remains unresolved.

Apply this diff:

     async fn find_chain_point(&self, seq: LogSeq) -> Result<Option<ChainPoint>, Error> {
-        let entry = self.get_entry(seq)?;
-        let block = Block::from_bytes(&entry.unwrap().next_block);
-
-        Ok(Some(block.chain_point()))
+        match self.get_entry(seq)? {
+            Some(entry) => {
+                let block = Block::from_bytes(&entry.next_block);
+                Ok(Some(block.chain_point()))
+            }
+            None => Ok(None),
+        }
     }
🧹 Nitpick comments (1)
balius-runtime/src/lib.rs (1)

45-45: Consider preserving error type information.

Converting all redb errors to String simplifies the API but loses type information, making it harder for library consumers to handle specific error cases programmatically. If you expect users to need programmatic error handling, consider using an enum or preserving error variants.

If you choose to keep String-based errors, this is acceptable for the current use case.

Also applies to: 89-89, 95-95, 101-101, 107-107, 113-113, 119-119

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6507f71 and 53ddfe2.

📒 Files selected for processing (4)
  • balius-runtime/src/drivers/chainsync.rs (2 hunks)
  • balius-runtime/src/lib.rs (6 hunks)
  • balius-runtime/src/store/mod.rs (1 hunks)
  • balius-runtime/src/store/redb.rs (5 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
balius-runtime/src/store/mod.rs (1)
balius-runtime/src/store/redb.rs (7)
  • update_worker_cursor (66-77)
  • commit (79-87)
  • find_chain_point (183-188)
  • write_ahead (190-210)
  • get_worker_cursor (213-224)
  • start_atomic_update (226-232)
  • handle_reset (234-250)
balius-runtime/src/store/redb.rs (1)
balius-runtime/src/store/mod.rs (14)
  • update_worker_cursor (22-22)
  • update_worker_cursor (34-39)
  • commit (23-23)
  • commit (40-45)
  • find_chain_point (56-56)
  • find_chain_point (69-74)
  • write_ahead (57-61)
  • write_ahead (75-90)
  • get_worker_cursor (62-62)
  • get_worker_cursor (91-96)
  • start_atomic_update (63-63)
  • start_atomic_update (97-102)
  • handle_reset (64-64)
  • handle_reset (104-109)
🔇 Additional comments (6)
balius-runtime/src/drivers/chainsync.rs (1)

56-56: LGTM! Async store operation integrated correctly.

The .await addition correctly handles the async handle_reset method from the new StoreTrait interface.

balius-runtime/src/lib.rs (2)

31-34: LGTM! Public API expanded to support custom stores.

The public store module and trait re-exports enable users to implement custom storage backends while maintaining the existing Redb implementation.


523-523: LGTM! Async store operations integrated correctly.

All store operations now correctly use .await to handle the async trait methods introduced in the pluggable store abstraction.

Also applies to: 569-569, 632-632, 636-636, 640-642, 645-645

balius-runtime/src/store/mod.rs (1)

1-110: LGTM! Well-designed pluggable store abstraction.

The introduction of StoreTrait and AtomicUpdateTrait with Redb and Custom variants provides a clean abstraction for pluggable storage backends. The use of Arc<Mutex<...>> for the Custom variant appropriately handles thread-safety and async access.

Key design decisions:

  • Async traits enable non-blocking storage operations
  • Enum dispatch pattern provides type-safe backend selection
  • Public traits allow external implementations
balius-runtime/src/store/redb.rs (2)

52-62: LGTM! Option pattern prevents double-commit.

Changing wx to Option<WriteTransaction> with take() semantics in the commit method correctly prevents accidental double-commits or operations after commit.


64-88: LGTM! Async trait implementation follows the design pattern.

The StoreTrait and AtomicUpdateTrait implementations correctly use async fn and integrate with the redb backend. The trait methods properly handle errors and maintain consistency with the pluggable store abstraction.

Also applies to: 181-250

async fn update_worker_cursor(&mut self, id: &str) -> Result<(), super::Error> {
let Some(wx) = self.wx.as_mut() else {
return Err(super::Error::Store(
"Transaction already commited".to_string(),
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Fix typo: "commited" → "committed".

The error messages contain a spelling error.

Apply this diff:

             return Err(super::Error::Store(
-                "Transaction already commited".to_string(),
+                "Transaction already committed".to_string(),
             ));

Also applies to: 82-82

🤖 Prompt for AI Agents
In balius-runtime/src/store/redb.rs around lines 69 and 82, the error string
"Transaction already commited" contains a typo; update both occurrences to
"Transaction already committed" so the error messages use the correct spelling.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

♻️ Duplicate comments (1)
balius-runtime/src/store/redb.rs (1)

183-188: Avoid panic in find_chain_point: handle missing WAL entry

entry.unwrap() will panic if seq not found; return Ok(None) instead.

-    async fn find_chain_point(&self, seq: LogSeq) -> Result<Option<ChainPoint>, Error> {
-        let entry = self.get_entry(seq)?;
-        let block = Block::from_bytes(&entry.unwrap().next_block);
-
-        Ok(Some(block.chain_point()))
-    }
+    async fn find_chain_point(&self, seq: LogSeq) -> Result<Option<ChainPoint>, Error> {
+        match self.get_entry(seq)? {
+            Some(entry) => {
+                let block = Block::from_bytes(&entry.next_block);
+                Ok(Some(block.chain_point()))
+            }
+            None => Ok(None),
+        }
+    }
🧹 Nitpick comments (2)
balius-runtime/src/lib.rs (1)

31-35: Consider preserving error sources instead of String

Switching to String loses the original error chain. Prefer a source-carrying variant that stays backend-agnostic:

-    #[error("store error {0}")]
-    Store(String),
+    #[error("store error")]
+    Store(#[source] Box<dyn std::error::Error + Send + Sync>),

And map conversions as:

-        Self::Store(value.to_string())
+        Self::Store(Box::new(value))

Applies similarly to other redb::* conversions. This keeps rich context while decoupling from specific backends.

Also applies to: 45-46, 87-121

balius-runtime/src/store/redb.rs (1)

64-66: Match trait’s non-Send futures in impls if adopting ?Send

If StoreTrait/AtomicUpdateTrait switch to #[async_trait(?Send)], mirror it here:

-#[async_trait::async_trait]
+#[async_trait::async_trait(?Send)]
 impl AtomicUpdateTrait for AtomicUpdate { ... }

-#[async_trait::async_trait]
+#[async_trait::async_trait(?Send)]
 impl StoreTrait for Store { ... }

Prevents mismatched future bounds during implementation.

Also applies to: 79-88, 181-188

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6507f71 and 53ddfe2.

📒 Files selected for processing (4)
  • balius-runtime/src/drivers/chainsync.rs (2 hunks)
  • balius-runtime/src/lib.rs (6 hunks)
  • balius-runtime/src/store/mod.rs (1 hunks)
  • balius-runtime/src/store/redb.rs (5 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
balius-runtime/src/store/mod.rs (1)
balius-runtime/src/store/redb.rs (7)
  • update_worker_cursor (66-77)
  • commit (79-87)
  • find_chain_point (183-188)
  • write_ahead (190-210)
  • get_worker_cursor (213-224)
  • start_atomic_update (226-232)
  • handle_reset (234-250)
balius-runtime/src/store/redb.rs (1)
balius-runtime/src/store/mod.rs (14)
  • update_worker_cursor (22-22)
  • update_worker_cursor (34-39)
  • commit (23-23)
  • commit (40-45)
  • find_chain_point (56-56)
  • find_chain_point (69-74)
  • write_ahead (57-61)
  • write_ahead (75-90)
  • get_worker_cursor (62-62)
  • get_worker_cursor (91-96)
  • start_atomic_update (63-63)
  • start_atomic_update (97-102)
  • handle_reset (64-64)
  • handle_reset (104-109)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build
🔇 Additional comments (1)
balius-runtime/src/drivers/chainsync.rs (1)

9-9: LGTM: chainsync updated to async StoreTrait

Import and await changes align with the new async store API.

Also applies to: 56-57

Comment on lines +20 to +24
#[async_trait::async_trait]
pub trait AtomicUpdateTrait {
async fn update_worker_cursor(&mut self, id: &str) -> Result<(), super::Error>;
async fn commit(&mut self) -> Result<(), super::Error>;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Fix non-Send future hazard from locking tokio::Mutex across await

Custom dispatch holds a tokio::MutexGuard across .await, but #[async_trait] defaults to Send futures, leading to "future cannot be sent between threads safely". Make the traits/impls non-Send or refactor to avoid awaiting while holding the guard.

Apply this to relax futures:

-#[async_trait::async_trait]
+#[async_trait::async_trait(?Send)]
 pub trait AtomicUpdateTrait {
-#[async_trait::async_trait]
+#[async_trait::async_trait(?Send)]
 impl AtomicUpdateTrait for AtomicUpdate {
-#[async_trait::async_trait]
+#[async_trait::async_trait(?Send)]
 pub trait StoreTrait {
-#[async_trait::async_trait]
+#[async_trait::async_trait(?Send)]
 impl StoreTrait for Store {

Alternative (larger refactor): avoid outer Arc<Mutex> and require implementers to handle interior mutability, so no guard is held across .await.

Also applies to: 32-46, 54-65, 67-110

Comment on lines +66 to +72
async fn update_worker_cursor(&mut self, id: &str) -> Result<(), super::Error> {
let Some(wx) = self.wx.as_mut() else {
return Err(super::Error::Store(
"Transaction already commited".to_string(),
));
};

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Nit: fix typo in error message ("committed")

-            return Err(super::Error::Store(
-                "Transaction already commited".to_string(),
-            ));
+            return Err(super::Error::Store(
+                "Transaction already committed".to_string(),
+            ));
-            return Err(super::Error::Store(
-                "Transaction already commited".to_string(),
-            ));
+            return Err(super::Error::Store(
+                "Transaction already committed".to_string(),
+            ));

Also applies to: 80-85

🤖 Prompt for AI Agents
In balius-runtime/src/store/redb.rs around lines 66-72 and 80-85, there is a
typo in the error message "Transaction already commited"; update the string to
the correct spelling "Transaction already committed" in both places where the
error is constructed so the messages read consistently and correctly.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 53ddfe2 and 7f271a7.

📒 Files selected for processing (1)
  • balius-runtime/src/lib.rs (10 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build
🔇 Additional comments (6)
balius-runtime/src/lib.rs (6)

31-34: LGTM: Store abstraction properly exposed.

The new public store module and trait re-exports enable custom store implementations as intended by this PR.


488-494: LGTM: Improved concurrency model.

The shift from Arc<Mutex<WorkerMap>> to Arc<RwLock<HashMap<String, Mutex<LoadedWorker>>>> enables:

  • Multiple concurrent readers (e.g., parallel handle_request calls)
  • Per-worker granular locking for better scalability
  • Reduced contention in read-heavy workloads

The locking hierarchy is consistently applied throughout (outer RwLock → inner worker Mutex).


512-529: LGTM: Correct async locking pattern.

The method correctly:

  • Uses read lock for concurrent access to the worker map
  • Acquires per-worker locks only when needed
  • Properly awaits the async find_chain_point operation

531-585: LGTM: Proper async initialization.

Worker registration correctly:

  • Awaits async store operations
  • Uses write lock for map mutation
  • Wraps worker in Mutex for per-worker concurrency control

653-678: LGTM: Efficient concurrent request handling.

The method correctly uses read lock on the worker map followed by per-worker locking, enabling concurrent request handling for different workers.


627-651: Verify atomic update rollback on error.

The method iterates through workers and updates their cursors within an atomic transaction. If a worker's apply_chain fails (line 642), the error propagates via ?, but it's unclear whether the store_update transaction is properly rolled back.

Ensure that:

  1. AtomicUpdateTrait has a Drop implementation that rolls back uncommitted changes, or
  2. Explicit error handling is added to call a rollback method before returning

This should be verified against the AtomicUpdateTrait implementation in the store module to confirm transaction safety guarantees.


#[error("store error {0}")]
Store(Box<redb::Error>),
Store(String),
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Error handling degradation: String loses type information.

Changing Error::Store from Box<redb::Error> to String is a breaking change that significantly degrades error handling:

  • Callers can no longer match on specific redb error variants
  • Programmatic error handling becomes impossible
  • Error context and type information is lost

Consider preserving structured error information while still supporting custom stores. For example, introduce an enum that can hold both redb errors and a generic string variant for custom store errors:

 #[derive(Error, Debug)]
 pub enum Error {
     // ... other variants
 
-    #[error("store error {0}")]
-    Store(String),
+    #[error("store error {0}")]
+    Store(#[from] StoreError),
+}
+
+#[derive(Error, Debug)]
+pub enum StoreError {
+    #[error("redb error: {0}")]
+    Redb(#[from] redb::Error),
+    
+    #[error("redb database error: {0}")]
+    RedbDatabase(#[from] redb::DatabaseError),
+    
+    #[error("redb transaction error: {0}")]
+    RedbTransaction(#[from] redb::TransactionError),
+    
+    #[error("redb table error: {0}")]
+    RedbTable(#[from] redb::TableError),
+    
+    #[error("redb commit error: {0}")]
+    RedbCommit(#[from] redb::CommitError),
+    
+    #[error("redb storage error: {0}")]
+    RedbStorage(#[from] redb::StorageError),
+    
+    #[error("custom store error: {0}")]
+    Custom(String),
 }

This approach maintains backward compatibility for error inspection while supporting custom store implementations.

Also applies to: 87-121

🤖 Prompt for AI Agents
In balius-runtime/src/lib.rs around lines 45 (and similarly for the related
cases at 87-121), the Error::Store variant was changed from Box<redb::Error> to
String which loses typed error information; revert this by introducing a small
enum (e.g., StoreError { Redb(Box<redb::Error>), Other(String) }) and change
Error::Store to hold that enum instead of plain String; add From implementations
(From<Box<redb::Error>> and From<String>) and update all construction sites to
wrap errors into the appropriate enum variant so callers can still match on
concrete redb errors while custom stores can provide string messages.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
balius-runtime/src/lib.rs (1)

45-45: Error handling degradation: String loses type information.

The change from Box<redb::Error> to String prevents callers from matching on specific error variants or performing programmatic error handling. As noted in the previous review, consider introducing a StoreError enum that can hold both redb errors and custom store errors to preserve type information while supporting the pluggable store architecture.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7f271a7 and 5046d00.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (4)
  • balius-runtime/Cargo.toml (1 hunks)
  • balius-runtime/src/drivers/chainsync.rs (3 hunks)
  • balius-runtime/src/ledgers/u5c.rs (1 hunks)
  • balius-runtime/src/lib.rs (12 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
balius-runtime/src/drivers/chainsync.rs (1)
balius-runtime/src/lib.rs (2)
  • slot (157-161)
  • slot (265-269)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: lint
  • GitHub Check: build
🔇 Additional comments (8)
balius-runtime/src/ledgers/u5c.rs (1)

128-147: LGTM: API simplification improves readability.

The updated read_params implementation is cleaner and more maintainable. The direct method call and simplified error handling reduce potential error points.

balius-runtime/src/drivers/chainsync.rs (1)

101-109: LGTM: Correct field access for BlockRef.

The change from point.index to point.slot correctly aligns with the BlockRef structure in utxorpc 0.12.0.

balius-runtime/src/lib.rs (5)

489-495: LGTM: Improved concurrency with RwLock pattern.

The change from Arc<Mutex<WorkerMap>> to Arc<RwLock<HashMap<String, Mutex<LoadedWorker>>>> enables better concurrency. Multiple readers can access the worker map simultaneously, and per-worker mutexes ensure thread-safe state updates without blocking unrelated workers.


513-530: LGTM: Correct async implementation with RwLock.

The rewritten chain_cursor method correctly uses read-write lock semantics, allowing concurrent cursor checks while properly locking individual workers during iteration.


572-583: LGTM: Correct async store integration and locking.

Worker registration correctly uses async store operations and follows the new concurrency model by acquiring a write lock for insertion and wrapping the worker in a per-worker Mutex.


628-652: LGTM: Correct atomic update flow with proper locking.

The handle_chain method correctly implements async store operations with proper lock separation: a read lock for the worker map allows concurrent access, while per-worker locks ensure thread-safe state updates during the atomic update.


654-679: LGTM: Efficient request handling with fine-grained locking.

The handle_request method correctly uses a read lock for the worker map and a per-worker lock for processing, allowing concurrent requests to different workers without blocking each other.

balius-runtime/Cargo.toml (1)

26-26: utxorpc 0.12.0 does not exist for the Rust crate ecosystem.

The Rust utxorpc crate only has version 0.11.0 available on crates.io. Version 0.12.0 exists for other ecosystems (PyPI for Python, NuGet for .NET) but not for Rust. Correct the dependency to an existing version (e.g., 0.11.0 or check if a newer Rust release is available).

Likely an incorrect or invalid review comment.

Comment on lines 58 to 60
None => {
tracing::warn!("Received None response from follow_tip, skipping")
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Potential infinite loop on persistent None responses.

If follow_tip consistently returns None, the loop will spin indefinitely, logging warnings. Consider adding a counter or circuit breaker to handle repeated None responses gracefully.

Apply this diff to add a circuit breaker:

 async fn gather_blocks(
     tip: &mut utxorpc::LiveTip<utxorpc::Cardano>,
     store: &Store,
 ) -> Result<(NextBlock, UndoBlocks), Error> {
     let mut undos = vec![];
+    let mut none_count = 0;
+    const MAX_NONE_RESPONSES: u32 = 10;
 
     loop {
         let event = tip.event().await?;
 
         match event {
             Some(utxorpc::TipEvent::Apply(chain_block)) => {
                 let next = Block::Cardano(chain_block.parsed.unwrap());
                 break Ok((next, undos));
             }
             Some(utxorpc::TipEvent::Undo(chain_block)) => {
                 undos.push(Block::Cardano(chain_block.parsed.unwrap()));
+                none_count = 0;
             }
             Some(utxorpc::TipEvent::Reset(block_ref)) => {
                 tracing::warn!(block_ref =? &block_ref, "received reset event, reseting tip");
                 undos = store.handle_reset(block_ref.into()).await?;
+                none_count = 0;
             }
             None => {
-                tracing::warn!("Received None response from follow_tip, skipping")
+                none_count += 1;
+                tracing::warn!(none_count, "Received None response from follow_tip");
+                if none_count >= MAX_NONE_RESPONSES {
+                    return Err(Error::Driver(format!(
+                        "Received {} consecutive None responses from follow_tip",
+                        MAX_NONE_RESPONSES
+                    )));
+                }
             }
         }
     }
 }

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (1)
balius-runtime/src/lib.rs (1)

47-122: Store error should preserve structured redb error information

Switching Error::Store from a typed redb error representation to a plain String plus to_string() in all From<redb::...> impls discards structured error information and makes programmatic handling (matching on specific redb error kinds) impossible. This is a regression in error handling even though it simplifies custom store integration.

To keep the API usable for both redb and custom stores, consider introducing a dedicated StoreError enum and wrapping it in Error::Store instead of using a String. That way:

  • redb‑specific variants remain matchable by callers,
  • custom backends can still surface opaque errors via a generic variant.

For example:

 #[derive(Error, Debug)]
 pub enum Error {
@@
-    #[error("store error {0}")]
-    Store(String),
+    #[error("store error {0}")]
+    Store(StoreError),
@@
 }
+
+#[derive(Error, Debug)]
+pub enum StoreError {
+    #[error("redb error: {0}")]
+    Redb(#[from] redb::Error),
+
+    #[error("redb database error: {0}")]
+    RedbDatabase(#[from] redb::DatabaseError),
+
+    #[error("redb transaction error: {0}")]
+    RedbTransaction(#[from] redb::TransactionError),
+
+    #[error("redb table error: {0}")]
+    RedbTable(#[from] redb::TableError),
+
+    #[error("redb commit error: {0}")]
+    RedbCommit(#[from] redb::CommitError),
+
+    #[error("redb storage error: {0}")]
+    RedbStorage(#[from] redb::StorageError),
+
+    #[error("custom store error: {0}")]
+    Custom(String),
+}
@@
 impl From<redb::Error> for Error {
     fn from(value: redb::Error) -> Self {
-        Self::Store(value.to_string())
+        Self::Store(StoreError::Redb(value))
     }
 }
@@
 impl From<redb::DatabaseError> for Error {
     fn from(value: redb::DatabaseError) -> Self {
-        Self::Store(value.to_string())
+        Self::Store(StoreError::RedbDatabase(value))
     }
 }
@@
 impl From<redb::TransactionError> for Error {
     fn from(value: redb::TransactionError) -> Self {
-        Self::Store(value.to_string())
+        Self::Store(StoreError::RedbTransaction(value))
     }
 }
@@
 impl From<redb::TableError> for Error {
     fn from(value: redb::TableError) -> Self {
-        Self::Store(value.to_string())
+        Self::Store(StoreError::RedbTable(value))
     }
 }
@@
 impl From<redb::CommitError> for Error {
     fn from(value: redb::CommitError) -> Self {
-        Self::Store(value.to_string())
+        Self::Store(StoreError::RedbCommit(value))
     }
 }
@@
 impl From<redb::StorageError> for Error {
     fn from(value: redb::StorageError) -> Self {
-        Self::Store(value.to_string())
+        Self::Store(StoreError::RedbStorage(value))
     }
 }

Custom store implementations can then use Error::Store(StoreError::Custom(msg)) (or StoreError::Custom(msg).into()) without being tied to redb, while existing callers can still inspect redb failures precisely.

🧹 Nitpick comments (2)
balius-runtime/src/lib.rs (2)

574-575: Async worker cursor lookup is fine; consider defaulting semantics

Using self.store.get_worker_cursor(id).await? to hydrate the initial cursor is consistent with the new async store API. One design question: should the store be responsible for defaulting missing workers to None vs. treating that as an error? If you expect missing workers to be common (e.g., first start), ensure the store implementation returns Ok(None) instead of an error so this path doesn't fail unnecessarily.


646-673: Atomic update flow: document commit/rollback semantics for Store

The new async write‑ahead + atomic update sequence:

  • log_seq = self.store.write_ahead(undo_blocks, next_block).await?;
  • let mut store_update = self.store.start_atomic_update(log_seq).await?;
  • parallel apply_chain across workers (via join_all)
  • per‑worker update_worker_cursor(&x).await?;
  • final store_update.commit().await?;

is a solid pattern for pluggable backends, but it implicitly relies on a few contracts that should be guaranteed by the Store / AtomicUpdateTrait APIs:

  1. Dropping store_update without calling commit() must reliably roll back / no‑op any partial state, since any worker error causes ? to short‑circuit before commit.
  2. write_ahead + start_atomic_update(log_seq) must be idempotent and safe to retry after a crash between these calls or between write_ahead and commit.
  3. update_worker_cursor should not have externally visible side effects until the enclosing commit() succeeds.

If these guarantees are not already documented and enforced in the trait definitions, I recommend clarifying them there so custom store implementors don't accidentally introduce subtle consistency bugs.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5046d00 and cc9998c.

📒 Files selected for processing (1)
  • balius-runtime/src/lib.rs (7 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: lint
  • GitHub Check: build
🔇 Additional comments (2)
balius-runtime/src/lib.rs (2)

33-37: Public store module and re-exports look coherent

Exposing pub mod store; and re‑exporting AtomicUpdateTrait, Store, and StoreTrait gives consumers a clean entry point for custom backends and keeps the runtime API focused. Naming and visibility look good for the intended pluggable store abstraction.


526-529: Async find_chain_point integration looks correct

The new async call self.store.find_chain_point(seq).await cleanly plugs into the existing Result<Option<ChainPoint>, Error> signature, and the lowest‑cursor selection logic is unchanged. No issues here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants