From f7606981e9b17915c7e31684a4be6db9edd03014 Mon Sep 17 00:00:00 2001 From: Lance Vincent Salera Date: Thu, 1 May 2025 04:54:17 +0800 Subject: [PATCH 1/2] refactor: update signing related configs to be optional --- src/main.rs | 2 +- src/pipeline/ingest.rs | 12 ++++++------ src/pipeline/mod.rs | 13 ++++++++++--- src/queue/mod.rs | 1 + src/server/submit.rs | 3 ++- 5 files changed, 20 insertions(+), 11 deletions(-) diff --git a/src/main.rs b/src/main.rs index d39a2f7..eec63ba 100644 --- a/src/main.rs +++ b/src/main.rs @@ -77,7 +77,7 @@ struct Config { #[serde(default)] queues: HashSet, u5c: ledger::u5c::Config, - signing: signing::Config, + signing: Option, } impl Config { diff --git a/src/pipeline/ingest.rs b/src/pipeline/ingest.rs index e663f6c..a3ac76e 100644 --- a/src/pipeline/ingest.rs +++ b/src/pipeline/ingest.rs @@ -22,7 +22,7 @@ pub struct Stage { storage: Arc, priority: Arc, u5c_adapter: Arc, - secret_adapter: Arc, + signing_adapter: Option>, config: Config, pub output: OutputPort>, } @@ -32,14 +32,14 @@ impl Stage { storage: Arc, priority: Arc, u5c_adapter: Arc, - secret_adapter: Arc, + signing_adapter: Option>, config: Config, ) -> Self { Self { storage, priority, u5c_adapter, - secret_adapter, + signing_adapter, config, output: Default::default(), } @@ -95,9 +95,9 @@ impl gasket::framework::Worker for Worker { .unwrap_or(false); if should_sign { - info!("Signing transaction {} with server key", tx.id); - tx.raw = stage.secret_adapter.sign(tx.raw).await.or_retry()?; - info!("Transaction {} signed successfully", tx.id); + let signer = stage.signing_adapter.as_ref().ok_or(WorkerError::Retry)?; + tx.raw = signer.sign(tx.raw).await.or_retry()?; + info!(tx_id = %tx.id, "Transaction signed with server key"); } let metx = MultiEraTx::decode(&tx.raw).map_err(|_| WorkerError::Recv)?; diff --git a/src/pipeline/mod.rs b/src/pipeline/mod.rs index 6401c87..fae9969 100644 --- a/src/pipeline/mod.rs +++ b/src/pipeline/mod.rs @@ -10,7 +10,7 @@ use crate::{ }, network::peer_manager::PeerManager, queue::priority::Priority, - signing::hashicorp::HashicorpVaultClient, + signing::{hashicorp::HashicorpVaultClient, SigningAdapter}, storage::{ sqlite::{SqliteCursor, SqliteTransaction}, Cursor, @@ -48,13 +48,20 @@ pub async fn run( let peer_manager = Arc::new(peer_manager); let priority = Arc::new(Priority::new(tx_storage.clone(), config.queues.clone())); - let secret_adapter = Arc::new(HashicorpVaultClient::new(config.signing.clone())?); + let signing_adapter: Option> = config + .signing + .as_ref() + .map(|cfg| { + HashicorpVaultClient::new(cfg.clone()) + .map(|c| Arc::new(c) as Arc) + }) + .transpose()?; let mut ingest = ingest::Stage::new( tx_storage.clone(), priority.clone(), u5c_data_adapter.clone(), - secret_adapter, + signing_adapter, config.clone(), ); diff --git a/src/queue/mod.rs b/src/queue/mod.rs index 0804ce9..f400a4e 100644 --- a/src/queue/mod.rs +++ b/src/queue/mod.rs @@ -15,6 +15,7 @@ pub struct Config { pub weight: u8, #[serde(default)] pub chained: bool, + #[serde(default)] pub server_signing: bool, } impl Default for Config { diff --git a/src/server/submit.rs b/src/server/submit.rs index eca468e..313d570 100644 --- a/src/server/submit.rs +++ b/src/server/submit.rs @@ -69,7 +69,8 @@ impl SubmitService for SubmitServiceImpl { self.queues.iter().find(|q| q.name == *DEFAULT_QUEUE) }) }) - .is_none_or(|config| !config.server_signing); + .map(|cfg| cfg.server_signing) + .is_none_or(|server_signing| !server_signing); if should_validate { if let Err(error) = validate_tx(&metx, self.u5c_adapter.clone()).await { From 5c067a3f5a21a8f277661b6e7b222602f131f743 Mon Sep 17 00:00:00 2001 From: Lance Vincent Salera Date: Thu, 1 May 2025 05:01:49 +0800 Subject: [PATCH 2/2] fix: handle missing queue config gracefully in worker scheduling --- src/pipeline/ingest.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/pipeline/ingest.rs b/src/pipeline/ingest.rs index a3ac76e..3b90a06 100644 --- a/src/pipeline/ingest.rs +++ b/src/pipeline/ingest.rs @@ -91,8 +91,8 @@ impl gasket::framework::Worker for Worker { .config .queues .get(&tx.queue) - .map(|config| config.server_signing) - .unwrap_or(false); + .ok_or(WorkerError::Retry)? + .server_signing; if should_sign { let signer = stage.signing_adapter.as_ref().ok_or(WorkerError::Retry)?;