From e32d6d8b9874ae0c5b35f9b4fb4ecfcec8df065a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Guillermo=20D=C3=ADaz?= Date: Thu, 8 Jan 2026 16:11:35 +0100 Subject: [PATCH] fix(rad): parallelize and control timeouts when dry running rad requests locally --- net/src/client/http/mod.rs | 8 ++-- rad/Cargo.toml | 1 + rad/src/lib.rs | 64 +++++++++++++++++++++++--------- toolkit/src/cli/arguments.rs | 2 +- toolkit/src/cli/data_requests.rs | 2 +- 5 files changed, 55 insertions(+), 22 deletions(-) diff --git a/net/src/client/http/mod.rs b/net/src/client/http/mod.rs index c15217fb6..72c1f1560 100644 --- a/net/src/client/http/mod.rs +++ b/net/src/client/http/mod.rs @@ -6,6 +6,9 @@ use thiserror::Error; /// Maximum number of HTTP redirects to follow const MAX_REDIRECTS: usize = 4; +/// Maximum http response timeout +pub const DEFAULT_TIMEOUT: Duration = Duration::from_millis(10000); + /// A surf-alike HTTP client that additionally supports proxies (HTTP(S), SOCKS4 and SOCKS5) #[derive(Clone, Debug)] pub struct WitnetHttpClient { @@ -15,14 +18,13 @@ pub struct WitnetHttpClient { impl WitnetHttpClient { /// Simple wrapper around `isahc::HttpClient::send_async`. /// - /// Opinionated in only one thing: if a timeout is not specified, it uses a 10 seconds timeout. + /// If a timeout is not specified, it uses a 10 seconds timeout. 
pub async fn send( &self, request: reqwest::RequestBuilder, timeout: Option, ) -> Result { - let timeout = timeout.unwrap_or(Duration::from_secs(10)); - let req = match request.timeout(timeout).build() { + let req = match request.timeout(timeout.unwrap_or(DEFAULT_TIMEOUT)).build() { Ok(req) => req, Err(e) => return Err(WitnetHttpError::HttpRequestError { msg: e.to_string() }), }; diff --git a/rad/Cargo.toml b/rad/Cargo.toml index 7bd68b53e..66b5a4e84 100644 --- a/rad/Cargo.toml +++ b/rad/Cargo.toml @@ -36,6 +36,7 @@ serde_cbor = "0.11.2" serde_json = "1.0.96" slicestring = "0.3.2" thiserror = "2.0.12" +tokio = { version = "1.0.1", features = ["io-util", "net", "time", "sync"] } # the url crate is used to perform additional validations before passing arguments to the surf http client # the version of url must be kept in sync with the version used by surf in the `witnet_net` crate url = "2.1.1" diff --git a/rad/src/lib.rs b/rad/src/lib.rs index cb7318dc7..ecdba2e1c 100644 --- a/rad/src/lib.rs +++ b/rad/src/lib.rs @@ -2,7 +2,7 @@ extern crate witnet_data_structures; -use futures::{executor::block_on, future::join_all}; +use futures::{FutureExt, executor::block_on, future::join_all}; use reqwest::header::{HeaderMap, HeaderName, HeaderValue, USER_AGENT}; use serde::Serialize; pub use serde_cbor::{Value as CborValue, to_vec as cbor_to_vec}; @@ -49,6 +49,9 @@ pub mod user_agents; pub type Result = std::result::Result; +/// Maximum http response timeout +pub const MAX_RETRIEVAL_TIMEOUT: Duration = Duration::from_millis(10000); + /// The return type of any method executing the entire life cycle of a data request. #[derive(Debug, Serialize)] pub struct RADRequestExecutionReport { @@ -76,12 +79,17 @@ pub fn try_data_request( let active_wips = current_active_wips(); #[cfg(test)] let active_wips = all_wips_active(); + // If no timeout is specified, use the default one. If specified, cap it to the maximum allowed. 
+ let timeout = match timeout { + None => MAX_RETRIEVAL_TIMEOUT, + Some(timeout) => std::cmp::min(timeout, MAX_RETRIEVAL_TIMEOUT), + }; // We assume that the "current" protocol version will always work for retrievals because // retrievals should not really be re-evaluated (e.g. during synchronization) let protocol_version = ProtocolVersion::guess(); let mut retrieval_context = ReportContext::from_stage(Stage::Retrieval(RetrievalMetadata::default())); - let retrieve_responses = if let Some(inputs) = inputs_injection { + let retrieval_reports: Vec> = if let Some(inputs) = inputs_injection { assert_eq!( inputs.len(), request.retrieve.len(), @@ -101,35 +109,57 @@ pub fn try_data_request( &mut retrieval_context, settings, ) + .unwrap_or_else(|error| RadonReport::from_result(Err(error), &retrieval_context)) }) .collect() } else { - block_on(join_all( - request - .retrieve + let fut = async move { + #[cfg(not(test))] + let active_wips = current_active_wips(); + #[cfg(test)] + let active_wips = all_wips_active(); + + let sources = request.retrieve.clone(); + let aggregate = request.aggregate.clone(); + + let witnessing = witnessing.clone(); + // Adding a timeout to each source retrieval. + // Since currently the execution of RADON is blocking this thread, we can only + // handle HTTP timeouts. + let retrieve_response_fut = sources .iter() .map(|retrieve| { run_paranoid_retrieval( retrieve, - request.aggregate.clone(), + aggregate.clone(), settings, active_wips.clone(), protocol_version, witnessing.clone().unwrap_or_default(), - timeout, + Some(timeout), ) }) - .collect::>(), - )) - }; + .map(|fut| { + tokio::time::timeout(timeout, fut) + .map(|response| response.unwrap_or(Err(RadError::RetrieveTimeout))) + }); + + // Perform retrievals in parallel for the sake of synchronization between sources + // (increasing the likeliness of multiple sources returning results that are closer to each + // other). 
+ futures::future::join_all(retrieve_response_fut) + .await + .into_iter() + .map(|retrieve| { + retrieve.unwrap_or_else(|error| { + RadonReport::from_result(Err(error), &retrieval_context) + }) + }) + .collect() + }; - let retrieval_reports: Vec> = retrieve_responses - .into_iter() - .map(|retrieve| { - retrieve - .unwrap_or_else(|error| RadonReport::from_result(Err(error), &retrieval_context)) - }) - .collect(); + block_on(fut) + }; // Evaluate aggregation pre-condition by using the same logic than for tally pre-condition, // to ensure that at least 20% of the data sources are not errors. diff --git a/toolkit/src/cli/arguments.rs b/toolkit/src/cli/arguments.rs index 9e132e8a4..85fc12d47 100644 --- a/toolkit/src/cli/arguments.rs +++ b/toolkit/src/cli/arguments.rs @@ -31,7 +31,7 @@ pub(crate) struct TryDataRequest { pub full_trace: Option, #[structopt( long, - help = "Maximum amount of seconds to wait for a data source to respond." + help = "Maximum time in milliseconds to wait for a data source to respond." )] pub timeout: Option, } diff --git a/toolkit/src/cli/data_requests.rs b/toolkit/src/cli/data_requests.rs index 6ec0c6c7e..5f58be561 100644 --- a/toolkit/src/cli/data_requests.rs +++ b/toolkit/src/cli/data_requests.rs @@ -43,7 +43,7 @@ pub(crate) fn try_from_args( args: arguments::TryDataRequest, ) -> Result { let full_trace = args.full_trace.unwrap_or(true); - let timeout = args.timeout.map(Duration::from_secs); + let timeout = args.timeout.map(Duration::from_millis); let request = decode_from_args(args.into())?.data_request; witnet_toolkit::data_requests::try_data_request(&request, full_trace, timeout) }