Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions net/src/client/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ use thiserror::Error;
/// Maximum number of HTTP redirects to follow
const MAX_REDIRECTS: usize = 4;

/// Maximum http response timeout
pub const DEFAULT_TIMEOUT: Duration = Duration::from_millis(10000);

/// A surf-alike HTTP client that additionally supports proxies (HTTP(S), SOCKS4 and SOCKS5)
#[derive(Clone, Debug)]
pub struct WitnetHttpClient {
Expand All @@ -15,14 +18,13 @@ pub struct WitnetHttpClient {
impl WitnetHttpClient {
/// Simple wrapper around `isahc::HttpClient::send_async`.
///
/// Opinionated in only one thing: if a timeout is not specified, it uses a 10 seconds timeout.
/// If a timeout is not specified, it uses a 10 seconds timeout.
pub async fn send(
&self,
request: reqwest::RequestBuilder,
timeout: Option<Duration>,
) -> Result<WitnetHttpResponse, WitnetHttpError> {
let timeout = timeout.unwrap_or(Duration::from_secs(10));
let req = match request.timeout(timeout).build() {
let req = match request.timeout(timeout.unwrap_or(DEFAULT_TIMEOUT)).build() {
Ok(req) => req,
Err(e) => return Err(WitnetHttpError::HttpRequestError { msg: e.to_string() }),
};
Expand Down
1 change: 1 addition & 0 deletions rad/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ serde_cbor = "0.11.2"
serde_json = "1.0.96"
slicestring = "0.3.2"
thiserror = "2.0.12"
tokio = { version = "1.0.1", features = ["io-util", "net", "time", "sync"] }
# the url crate is used to perform additional validations before passing arguments to the surf http client
# the version of url must be kept in sync with the version used by surf in the `witnet_net` crate
url = "2.1.1"
Expand Down
64 changes: 47 additions & 17 deletions rad/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

extern crate witnet_data_structures;

use futures::{executor::block_on, future::join_all};
use futures::{FutureExt, executor::block_on, future::join_all};
use reqwest::header::{HeaderMap, HeaderName, HeaderValue, USER_AGENT};
use serde::Serialize;
pub use serde_cbor::{Value as CborValue, to_vec as cbor_to_vec};
Expand Down Expand Up @@ -49,6 +49,9 @@ pub mod user_agents;

pub type Result<T> = std::result::Result<T, RadError>;

/// Maximum http response timeout
pub const MAX_RETRIEVAL_TIMEOUT: Duration = Duration::from_millis(10000);

/// The return type of any method executing the entire life cycle of a data request.
#[derive(Debug, Serialize)]
pub struct RADRequestExecutionReport {
Expand Down Expand Up @@ -76,12 +79,17 @@ pub fn try_data_request(
let active_wips = current_active_wips();
#[cfg(test)]
let active_wips = all_wips_active();
// If no timeout is specified, use the default one. If specified, cap it to the maximum allowed.
let timeout = match timeout {
None => MAX_RETRIEVAL_TIMEOUT,
Some(timeout) => std::cmp::min(timeout, MAX_RETRIEVAL_TIMEOUT),
};
// We assume that the "current" protocol version will always work for retrievals because
// retrievals should not really be re-evaluated (e.g. during synchronization)
let protocol_version = ProtocolVersion::guess();
let mut retrieval_context =
ReportContext::from_stage(Stage::Retrieval(RetrievalMetadata::default()));
let retrieve_responses = if let Some(inputs) = inputs_injection {
let retrieval_reports: Vec<RadonReport<RadonTypes>> = if let Some(inputs) = inputs_injection {
assert_eq!(
inputs.len(),
request.retrieve.len(),
Expand All @@ -101,35 +109,57 @@ pub fn try_data_request(
&mut retrieval_context,
settings,
)
.unwrap_or_else(|error| RadonReport::from_result(Err(error), &retrieval_context))
})
.collect()
} else {
block_on(join_all(
request
.retrieve
let fut = async move {
#[cfg(not(test))]
let active_wips = current_active_wips();
#[cfg(test)]
let active_wips = all_wips_active();

let sources = request.retrieve.clone();
let aggregate = request.aggregate.clone();

let witnessing = witnessing.clone();
// Adding a timeout to each source retrieval.
// Since currently the execution of RADON is blocking this thread, we can only
// handle HTTP timeouts.
let retrieve_response_fut = sources
.iter()
.map(|retrieve| {
run_paranoid_retrieval(
retrieve,
request.aggregate.clone(),
aggregate.clone(),
settings,
active_wips.clone(),
protocol_version,
witnessing.clone().unwrap_or_default(),
timeout,
Some(timeout),
)
})
.collect::<Vec<_>>(),
))
};
.map(|fut| {
tokio::time::timeout(timeout, fut)
.map(|response| response.unwrap_or(Err(RadError::RetrieveTimeout)))
});

// Perform retrievals in parallel for the sake of synchronization between sources
// (increasing the likeliness of multiple sources returning results that are closer to each
// other).
futures::future::join_all(retrieve_response_fut)
.await
.into_iter()
.map(|retrieve| {
retrieve.unwrap_or_else(|error| {
RadonReport::from_result(Err(error), &retrieval_context)
})
})
.collect()
};

let retrieval_reports: Vec<RadonReport<RadonTypes>> = retrieve_responses
.into_iter()
.map(|retrieve| {
retrieve
.unwrap_or_else(|error| RadonReport::from_result(Err(error), &retrieval_context))
})
.collect();
block_on(fut)
};

// Evaluate aggregation pre-condition by using the same logic than for tally pre-condition,
// to ensure that at least 20% of the data sources are not errors.
Expand Down
2 changes: 1 addition & 1 deletion toolkit/src/cli/arguments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub(crate) struct TryDataRequest {
pub full_trace: Option<bool>,
#[structopt(
long,
help = "Maximum amount of seconds to wait for a data source to respond."
help = "Maximum time in milliseconds to wait for a data source to respond."
)]
pub timeout: Option<u64>,
}
Expand Down
2 changes: 1 addition & 1 deletion toolkit/src/cli/data_requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub(crate) fn try_from_args(
args: arguments::TryDataRequest,
) -> Result<RADRequestExecutionReport, Error> {
let full_trace = args.full_trace.unwrap_or(true);
let timeout = args.timeout.map(Duration::from_secs);
let timeout = args.timeout.map(Duration::from_millis);
let request = decode_from_args(args.into())?.data_request;
witnet_toolkit::data_requests::try_data_request(&request, full_trace, timeout)
}
Expand Down
Loading