From ca13e14d65277a24e1c375b83f643edeedad4208 Mon Sep 17 00:00:00 2001 From: Harald Gutmann Date: Fri, 30 Aug 2024 17:04:34 +0200 Subject: [PATCH 1/5] Add TLS connector/acceptor benchmarks using valgrind for further details please refer to the documentation in pingora-core/benches/tls_benchmarks.md --- pingora-core/Cargo.toml | 20 +- pingora-core/benches/tls_acceptor.rs | 163 +++++++++++++ pingora-core/benches/tls_benchmarks.md | 137 +++++++++++ pingora-core/benches/tls_connector.rs | 118 +++++++++ pingora-core/benches/utils/mod.rs | 48 ++++ pingora-core/benches/utils/pingora_conf.yaml | 7 + pingora-core/examples/bench_client.rs | 226 ++++++++++++++++++ pingora-core/examples/bench_server.rs | 85 +++++++ .../tests/utils/conf/keys/server_rustls.crt | 14 ++ 9 files changed, 817 insertions(+), 1 deletion(-) create mode 100644 pingora-core/benches/tls_acceptor.rs create mode 100644 pingora-core/benches/tls_benchmarks.md create mode 100644 pingora-core/benches/tls_connector.rs create mode 100644 pingora-core/benches/utils/mod.rs create mode 100644 pingora-core/benches/utils/pingora_conf.yaml create mode 100644 pingora-core/examples/bench_client.rs create mode 100644 pingora-core/examples/bench_server.rs create mode 100644 pingora-proxy/tests/utils/conf/keys/server_rustls.crt diff --git a/pingora-core/Cargo.toml b/pingora-core/Cargo.toml index c5bce77e..208d97d5 100644 --- a/pingora-core/Cargo.toml +++ b/pingora-core/Cargo.toml @@ -76,9 +76,27 @@ reqwest = { version = "0.11", features = ["rustls"], default-features = false } hyperlocal = "0.8" hyper = "0.14" jemallocator = "0.5" +iai-callgrind = "0.13.1" +axum = { version = "0.7.5", features = ["http2"] } +axum-server = { version = "0.7.1", features = ["tls-rustls"] } [features] default = ["openssl"] openssl = ["pingora-openssl"] boringssl = ["pingora-boringssl"] -patched_http1 = [] \ No newline at end of file +patched_http1 = [] + +[[bench]] +name = "tls_connector" +harness = false + +[[example]] +name = "bench_server" + + +[[bench]] +name = "tls_acceptor" +harness = false + +[[example]] +name = "bench_client" \ No newline at end of file diff --git a/pingora-core/benches/tls_acceptor.rs b/pingora-core/benches/tls_acceptor.rs new file mode 100644 index 00000000..c1e7e149 --- /dev/null +++ b/pingora-core/benches/tls_acceptor.rs @@ -0,0 +1,163 @@ +use iai_callgrind::{ + binary_benchmark, binary_benchmark_group, main, BinaryBenchmarkConfig, Command, + FlamegraphConfig, +}; +use iai_callgrind::{Pipe, Stdin}; +use once_cell::sync::Lazy; +use reqwest::{Certificate, Client, StatusCode, Version}; +use std::fs::File; +use std::io::Read; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use tokio::task::JoinSet; + +mod utils; + +use utils::{ + generate_random_ascii_data, version_to_port, wait_for_tcp_connect, CERT_PATH, TLS_HTTP11_PORT, + TLS_HTTP2_PORT, +}; + +fn read_cert() -> Certificate { + let mut buf = Vec::new(); + File::open(CERT_PATH.to_string()) + .unwrap() + .read_to_end(&mut buf) + .unwrap(); + Certificate::from_pem(&buf).unwrap() +} + +fn client_http11() -> Client { + let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), TLS_HTTP11_PORT); + Client::builder() + .resolve_to_addrs("openrusty.org", &[socket]) + .add_root_certificate(read_cert()) + .build() + .unwrap() +} +fn client_http2() -> Client { + let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), TLS_HTTP2_PORT); + Client::builder() + .resolve_to_addrs("openrusty.org", &[socket]) + .add_root_certificate(read_cert()) + // avoid error messages during first set of connections (os error 32, broken pipe) + .http2_prior_knowledge() + .build() + .unwrap() +} + +pub static CLIENT_HTTP11: Lazy = Lazy::new(client_http11); +pub static CLIENT_HTTP2: Lazy = Lazy::new(client_http2); + +/// using with client: None instantiates a new client and performs a full handshake +/// providing Some(client) will re-use the provided client/session +async fn post_data(client_reuse: bool, version: Version, port: u16, data: String) { + let client = if client_reuse { + // NOTE: do not perform TLS handshake for each request + match version { + Version::HTTP_11 => &*CLIENT_HTTP11, + Version::HTTP_2 => &*CLIENT_HTTP2, + _ => { + panic!("HTTP version not supported.") + } + } + } else { + // NOTE: perform TLS handshake for each request + match version { + Version::HTTP_11 => &client_http11(), + Version::HTTP_2 => &client_http2(), + _ => { + panic!("HTTP version not supported.") + } + } + }; + + let resp = client + .post(format! {"https://openrusty.org:{}", port}) + .body(data) + .send() + .await + .unwrap(); + + assert_eq!(resp.status(), StatusCode::OK); + assert_eq!(resp.version(), version); + + // read full response, important for consistent tests + let _resp_body = resp.text().await.unwrap(); + // println!("resp_body: {}", resp_body) +} + +async fn tls_post_data(client_reuse: bool, version: Version, data: Vec) { + let port = version_to_port(version); + + let mut req_set = JoinSet::new(); + // spawn request for all elements within data + data.iter().for_each(|d| { + req_set.spawn(post_data(client_reuse, version, port, d.to_string())); + }); + + // wait for all responses + while let Some(res) = req_set.join_next().await { + let _ = res.unwrap(); + } +} + +fn run_benchmark_requests( + client_reuse: bool, + http_version: Version, + request_count: i32, + request_size: usize, +) { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + .block_on(async { + println!("Waiting for TCP connect..."); + wait_for_tcp_connect(http_version).await; + println!("TCP connect successful."); + + println!("Starting to send benchmark requests."); + tls_post_data( + client_reuse, + http_version, + generate_random_ascii_data(request_count, request_size), + ) + .await; + println!("Successfully sent benchmark requests."); + }) +} + +static REQUEST_COUNT: i32 = 128; +static REQUEST_SIZE: usize = 64; +#[binary_benchmark] +#[bench::http_11_handshake_always(setup = run_benchmark_requests(false, Version::HTTP_11, REQUEST_COUNT, REQUEST_SIZE))] +#[bench::http_11_handshake_once(setup = run_benchmark_requests(true, Version::HTTP_11, REQUEST_COUNT, REQUEST_SIZE))] +#[bench::http_2_handshake_always(setup = run_benchmark_requests(false, Version::HTTP_2, REQUEST_COUNT, REQUEST_SIZE))] +#[bench::http_2_handshake_once(setup = run_benchmark_requests(true, Version::HTTP_2, REQUEST_COUNT, REQUEST_SIZE))] +fn bench_server() -> Command { + let path = format!( + "{}/../target/release/examples/bench_server", + env!("CARGO_MANIFEST_DIR") + ); + Command::new(path) + // TODO: currently a workaround to keep the setup function running parallel with benchmark execution + .stdin(Stdin::Setup(Pipe::Stderr)) + .build() +} + +binary_benchmark_group!( + name = tls_acceptor; + config = BinaryBenchmarkConfig::default() + .flamegraph(FlamegraphConfig::default()) + .raw_callgrind_args(["" + // NOTE: toggle values can be extracted from .out files + // see '^fn=' values, need to be suffixed with '*' or '()' + // grep -E '^fn=' *.out | cut -d '=' -f2- | sort -u + //"--toggle-collect=pingora_core::services::listening::Service::run_endpoint*" + // NOTE: for usage with callgrind::start_instrumentation() & stop_instrumentation() + //"--instr-atstart=no" + ]); + benchmarks = bench_server +); + +main!(binary_benchmark_groups = tls_acceptor); diff --git a/pingora-core/benches/tls_benchmarks.md b/pingora-core/benches/tls_benchmarks.md new file mode 100644 index 00000000..4c012ef6 --- /dev/null +++ b/pingora-core/benches/tls_benchmarks.md @@ -0,0 +1,137 @@ +# TLS Benchmarks +The benchmarks are using [Valgrind](https://valgrind.org/) through the [iai_callgrind](https://docs.rs/iai-callgrind/latest/iai_callgrind/) benchmark framework. +For measuring performance the Valgrind tool [callgrind](https://valgrind.org/docs/manual/cl-manual.html) is used. + +```mermaid +C4Context + title Overview + + System_Ext(ContinuousIntegration, "Continuous Integration") + System_Boundary(OS, "Linux") { + System(Cargo, "Cargo", "bench") + Container(Results, "Benchmark Results") + System_Boundary(Valgrind, "Valgrind") { + Container(LogFile, "Log File") + System_Ext(Valgrind, "Valgrind") + Container(CallGraph, "Call Graph") + Rel(Valgrind, CallGraph, "creates") + Rel(Valgrind, LogFile, "creates") + } + Rel(Cargo, Valgrind, "executes") + } + + Person(Developer, "Developer") + System_Ext(QCacheGrind, "QCacheGrind", "KCacheGrind") + + Rel(Developer, Cargo, "runs") + Rel(ContinuousIntegration, Cargo, "runs") + + Rel(Developer, QCacheGrind, "utilizes") + Rel(QCacheGrind, CallGraph, "to visualize") + + Rel(Cargo, Results, "reports") + +``` + +## Visualization +With [kcachegrind](https://github.com/KDE/kcachegrind)/[qcachegrind](https://github.com/KDE/kcachegrind) the call-graphs +can be interactively visualized and navigated. + +[gprof2dot](https://github.com/jrfonseca/gprof2dot) and [graphviz](https://graphviz.org/) can create call-graph images. + +```bash +gprof2dot -f callgrind *out | dot -T png -o out.png +``` + +The iai_callgrind default [Flamegrahps](https://docs.rs/iai-callgrind/latest/iai_callgrind/struct.FlamegraphConfig.html#impl-Default-for-FlamegraphConfig) +are activated and stored in [SVG](https://en.wikipedia.org/wiki/SVG) format next to the call-graph files. + + +## Technical Details +The TLS Benchmarks are intended to capture full `connect/accept` cycles. To benchmark such scenario it is required +to have some parallel processes running (`server/client`) while only one of them should be benchmarked. + +### Challenges +pingora-core uses [tokio](https://tokio.rs/) as runtime and [pingora-core::server::Server](https://docs.rs/pingora-core/latest/pingora_core/server/struct.Server.html) +spawns threads when being setup. +This leads to implications on the benchmark process as multiple threads need to be covered. + +As tokio is used and network requests are issued during the benchmarking the results will always have a certain variance. + +To limit the variance impact the following pre-cautions where considered: +- running benchmarks (where possible) within a single thread and utilize tokio single-threaded runtime +- issuing multiple requests during benchmarking + +### Scenario Setup +Within `pingora-core/examples/` the BinaryBenchmark Command executables for benchmarking are built using `dev-dependencies`. +The `pingora-core/benches/` contains the opposite side and the iai_callgrind definitions. + +The benchmarked part (`server/client` executable) is built with `pingora-core`. The opposite part is built using +external components (`reqwest/axum`). + +The `servers` are instantiated to accept `POST` requests and echo the transmitted bodies in the response. + +The binaries (`bench_server/bench_client`) are launched through iai_callgrind as [BinaryBenchmark](https://docs.rs/iai-callgrind/latest/iai_callgrind/struct.BinaryBenchmark.html) +within `valgrind/callgrind`. +The BinaryBenchmark [setup](https://docs.rs/iai-callgrind/latest/iai_callgrind/struct.BinaryBenchmark.html#structfield.setup) +function is used to run the opposite part (`client/server`) of the benchmark in parallel. + +For the server benchmark scenario the layout looks like: +- iai_callgrind starts the client on the setup + - the client waits for a TCP connect before issuing the requests +- iai_callgrind launches the server within valgrind +- once the server is up the setup function client successfuly connects and starts to run the requests +- the server stops after a pre-configured period of time + +```mermaid +sequenceDiagram + iai_callgrind->>Setup (Client): starts + Setup (Client)->>BinaryBechmark (Server): TcpStream::connect + BinaryBechmark (Server)-->>Setup (Client): Failed - Startup Phase + iai_callgrind->>BinaryBechmark (Server): starts + Setup (Client)->>BinaryBechmark (Server): TcpStream::connect + BinaryBechmark (Server)->>Setup (Client): Succeeded - Server Running + Setup (Client)->>BinaryBechmark (Server): HTTP Request + BinaryBechmark (Server)->>Setup (Client): HTTP Response + iai_callgrind->>BinaryBechmark (Server): waits for success + iai_callgrind->>Setup (Client): waits for success +``` + +For the client benchmark the setup is similar, but inverse as the server runs within the iai_callgrind setup function. + +### Running +The benchmarks can be run using the following commands: +```bash +VERSION="$(cargo metadata --format-version=1 |\ + jq -r '.packages[] | select(.name == "iai-callgrind").version')" +cargo install iai-callgrind-runner --version "${VERSION}" + +FEATURES="openssl" +cargo build --no-default-features --features "${FEATURES}" --release --examples +cargo bench --no-default-features --features "${FEATURES}" --package pingora-core --bench tls_acceptor -- --nocapture +cargo bench --no-default-features --features "${FEATURES}" --package pingora-core --bench tls_connector -- --nocapture +``` + +### Output +Generated benchmark files are located below `target/iai/`: +``` +target/iai/ +└── pingora-core # + └── tls_acceptor # + └── tls_acceptor # + └── bench_server.http_11_handshake_always # . + ├── callgrind.bench_server.http_11_handshake_always.flamegraph.Ir.diff.old.svg + ├── callgrind.bench_server.http_11_handshake_always.flamegraph.Ir.old.svg + ├── callgrind.bench_server.http_11_handshake_always.flamegraph.Ir.svg + ├── callgrind.bench_server.http_11_handshake_always.log + ├── callgrind.bench_server.http_11_handshake_always.log.old + ├── callgrind.bench_server.http_11_handshake_always.out + └── callgrind.bench_server.http_11_handshake_always.out.old +``` + +### Parameters +Server and client benchmark are parameterized with the following options: +- client/session re-use +- HTTP version `1.1|2.0` +- number of requests +- request body size \ No newline at end of file diff --git a/pingora-core/benches/tls_connector.rs b/pingora-core/benches/tls_connector.rs new file mode 100644 index 00000000..745fd820 --- /dev/null +++ b/pingora-core/benches/tls_connector.rs @@ -0,0 +1,118 @@ +use axum::routing::post; +use axum::Router; +use axum_server::tls_rustls::RustlsConfig; +use axum_server::Handle; +use iai_callgrind::{ + binary_benchmark, binary_benchmark_group, main, BinaryBenchmarkConfig, Command, + FlamegraphConfig, +}; +use iai_callgrind::{Pipe, Stdin}; +use once_cell::sync::Lazy; +use reqwest::Version; +use std::net::SocketAddr; +use std::path::PathBuf; +use std::time::Duration; + +static CERT_PATH: Lazy = Lazy::new(|| { + format!( + "{}/../pingora-proxy/tests/utils/conf/keys/server_rustls.crt", + env!("CARGO_MANIFEST_DIR") + ) +}); +static KEY_PATH: Lazy = Lazy::new(|| { + format!( + "{}/../pingora-proxy/tests/utils/conf/keys/key.pem", + env!("CARGO_MANIFEST_DIR") + ) +}); + +static TLS_HTTP11_PORT: u16 = 6204; +static TLS_HTTP2_PORT: u16 = 6205; + +async fn graceful_shutdown(handle: Handle) { + tokio::time::sleep(Duration::from_secs(10)).await; + + println!("Sending graceful shutdown signal."); + handle.graceful_shutdown(None); +} + +fn run_benchmark_server() { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() + .block_on(async { + let addr_http11 = SocketAddr::from(([127, 0, 0, 1], TLS_HTTP11_PORT)); + let addr_http2 = SocketAddr::from(([127, 0, 0, 1], TLS_HTTP2_PORT)); + + let app = Router::new().route("/", post(|body: String| async { body })); + + let handle_http11 = Handle::new(); + let handle_http2 = Handle::new(); + + // configure certificate and private key used by https + let config = + RustlsConfig::from_pem_file(PathBuf::from(&*CERT_PATH), PathBuf::from(&*KEY_PATH)) + .await + .unwrap(); + + let (http11_server, _, http2_server, _) = tokio::join!( + axum_server::bind_rustls(addr_http11, config.clone()) + .handle(handle_http11.clone()) + .serve(app.clone().into_make_service()), + graceful_shutdown(handle_http11), + axum_server::bind_rustls(addr_http2, config) + .handle(handle_http2.clone()) + .serve(app.into_make_service()), + graceful_shutdown(handle_http2) + ); + http11_server.unwrap(); + http2_server.unwrap(); + }); +} + +static REQUEST_COUNT: i32 = 128; +static REQUEST_SIZE: usize = 64; +#[binary_benchmark] +#[bench::http_11_handshake_always(setup = run_benchmark_server(), args = [false, Version::HTTP_11, REQUEST_COUNT, REQUEST_SIZE])] +#[bench::http_11_handshake_once(setup = run_benchmark_server(), args = [true, Version::HTTP_11, REQUEST_COUNT, REQUEST_SIZE])] +#[bench::http_2_handshake_always(setup = run_benchmark_server(), args = [false, Version::HTTP_2, REQUEST_COUNT, REQUEST_SIZE])] +#[bench::http_2_handshake_once(setup = run_benchmark_server(), args = [true, Version::HTTP_2, REQUEST_COUNT, REQUEST_SIZE])] +fn bench_client( + stream_reuse: bool, + http_version: Version, + request_count: i32, + request_size: usize, +) -> Command { + let path = format!( + "{}/../target/release/examples/bench_client", + env!("CARGO_MANIFEST_DIR") + ); + Command::new(path) + // TODO: currently a workaround to keep the setup function running parallel with benchmark execution + .stdin(Stdin::Setup(Pipe::Stderr)) + .args([ + format!("--stream-reuse={}", stream_reuse), + format!("--http-version={:?}", http_version), + format!("--request-count={}", request_count), + format!("--request-size={}", request_size), + ]) + .build() +} + +binary_benchmark_group!( + name = tls_connector; + config = BinaryBenchmarkConfig::default() + .flamegraph(FlamegraphConfig::default()) + .raw_callgrind_args(["" + // NOTE: toggle values can be extracted from .out files + // see '^fn=' values, need to be suffixed with '*' or '()' + // grep -E '^fn=' *.out | cut -d '=' -f2- | sort -u + //, "--toggle-collect=bench_client::post_http*" + // NOTE: for usage with callgrind::start_instrumentation() & stop_instrumentation() + //"--instr-atstart=no" + ]); + benchmarks = bench_client +); + +main!(binary_benchmark_groups = tls_connector); diff --git a/pingora-core/benches/utils/mod.rs b/pingora-core/benches/utils/mod.rs new file mode 100644 index 00000000..bff060be --- /dev/null +++ b/pingora-core/benches/utils/mod.rs @@ -0,0 +1,48 @@ +use once_cell::sync::Lazy; +use rand::distributions::{Alphanumeric, DistString}; +use reqwest::Version; +use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream}; +use std::time::Duration; + +pub static CERT_PATH: Lazy = Lazy::new(|| { + format!( + "{}/../pingora-proxy/tests/utils/conf/keys/server_rustls.crt", + env!("CARGO_MANIFEST_DIR") + ) +}); +#[allow(dead_code)] +pub static KEY_PATH: Lazy = Lazy::new(|| { + format!( + "{}/../pingora-proxy/tests/utils/conf/keys/key.pem", + env!("CARGO_MANIFEST_DIR") + ) +}); +pub static TLS_HTTP11_PORT: u16 = 6204; +pub static TLS_HTTP2_PORT: u16 = 6205; + +pub fn generate_random_ascii_data(count: i32, len: usize) -> Vec { + let mut random_data = vec![]; + for _i in 0..count { + let random_string = Alphanumeric.sample_string(&mut rand::thread_rng(), len); + random_data.push(random_string) + } + random_data +} + +pub fn version_to_port(version: Version) -> u16 { + match version { + Version::HTTP_11 => TLS_HTTP11_PORT, + Version::HTTP_2 => TLS_HTTP2_PORT, + _ => { + panic!("HTTP version not supported.") + } + } +} + +pub async fn wait_for_tcp_connect(version: Version) { + let port = version_to_port(version); + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port); + while let Err(_err) = TcpStream::connect(addr) { + let _ = tokio::time::sleep(Duration::from_millis(100)).await; + } +} diff --git a/pingora-core/benches/utils/pingora_conf.yaml b/pingora-core/benches/utils/pingora_conf.yaml new file mode 100644 index 00000000..a30d66bd --- /dev/null +++ b/pingora-core/benches/utils/pingora_conf.yaml @@ -0,0 +1,7 @@ +--- +version: 1 +client_bind_to_ipv4: + - 127.0.0.2 +ca_file: tests/keys/server.crt +threads: 1 +work_stealing: false \ No newline at end of file diff --git a/pingora-core/examples/bench_client.rs b/pingora-core/examples/bench_client.rs new file mode 100644 index 00000000..ff77730a --- /dev/null +++ b/pingora-core/examples/bench_client.rs @@ -0,0 +1,226 @@ +use bytes::Bytes; +use clap::Parser; +use http::StatusCode; +use log::debug; +use pingora_core::connectors::http::v1::Connector as ConnectorV11; +use pingora_core::connectors::http::v2::Connector as ConnectorV2; +use pingora_core::connectors::ConnectorOptions; +use pingora_core::prelude::HttpPeer; +use pingora_core::protocols::http::v1::client::HttpSession as HttpSessionV11; +use pingora_core::protocols::http::v2::client::Http2Session; +use pingora_http::RequestHeader; +use reqwest::Version; +use std::io::ErrorKind::Unsupported; + +#[allow(dead_code, unused_imports)] +#[path = "../benches/utils/mod.rs"] +mod bench_utils; +use bench_utils::{ + generate_random_ascii_data, wait_for_tcp_connect, CERT_PATH, KEY_PATH, TLS_HTTP2_PORT, +}; +use pingora_core::protocols::http::client::HttpSession; + +const DEFAULT_POOL_SIZE: usize = 2; +const HTTP_HOST: &str = "openrusty.org"; +const SERVER_IP: &str = "127.0.0.1"; + +fn get_tls_connector_options() -> ConnectorOptions { + let mut options = ConnectorOptions::new(DEFAULT_POOL_SIZE); + options.ca_file = Some(CERT_PATH.clone()); + options.cert_key_file = Some((CERT_PATH.clone(), KEY_PATH.clone())); + options +} + +async fn connector_http11_session() -> (ConnectorV11, HttpPeer) { + let connector = ConnectorV11::new(Some(get_tls_connector_options())); + let peer = get_http_peer(TLS_HTTP2_PORT as i32); + + (connector, peer) +} + +async fn connector_http2() -> (ConnectorV2, HttpPeer) { + let connector = ConnectorV2::new(Some(get_tls_connector_options())); + let mut peer = get_http_peer(TLS_HTTP2_PORT as i32); + peer.options.set_http_version(2, 2); + peer.options.max_h2_streams = 1; + + (connector, peer) +} + +async fn session_new_http2(connector: &ConnectorV2, peer: HttpPeer) -> Http2Session { + let http2_session = connector.new_http_session(&peer).await.unwrap(); + match http2_session { + HttpSession::H1(_) => panic!("expect h2"), + HttpSession::H2(h2_stream) => h2_stream, + } +} + +async fn session_new_http11(connector: &ConnectorV11, peer: HttpPeer) -> HttpSessionV11 { + let (http_session, reused) = connector.get_http_session(&peer).await.unwrap(); + assert!(!reused); + http_session +} + +async fn session_reuse_http2(connector: &ConnectorV2, peer: HttpPeer) -> Http2Session { + connector.reused_http_session(&peer).await.unwrap().unwrap() +} + +async fn session_reuse_http11(connector: &ConnectorV11, peer: HttpPeer) -> HttpSessionV11 { + connector.reused_http_session(&peer).await.unwrap() +} + +fn get_http_peer(port: i32) -> HttpPeer { + HttpPeer::new(format!("{}:{}", SERVER_IP, port), true, HTTP_HOST.into()) +} + +async fn post_http11( + client_reuse: bool, + connector: ConnectorV11, + peer: HttpPeer, + data: Vec, +) { + let mut first = true; + for d in data { + let mut http_session = if client_reuse { + if first { + debug!("Creating a new HTTP stream for the first request."); + session_new_http11(&connector, peer.clone()).await + } else { + debug!("Re-using existing HTTP stream for request."); + session_reuse_http11(&connector, peer.clone()).await + } + } else { + debug!("Using new HTTP stream for request."); + session_new_http11(&connector, peer.clone()).await + }; + + let mut req = Box::new(RequestHeader::build("POST", b"/", Some(d.len())).unwrap()); + req.append_header("Host", HTTP_HOST).unwrap(); + req.append_header("Content-Length", d.len()).unwrap(); + req.append_header("Content-Type", "text/plain").unwrap(); + debug!("request_headers: {:?}", req.headers); + + http_session.write_request_header(req).await.unwrap(); + http_session.write_body(d.as_bytes()).await.unwrap(); + + let res_headers = *http_session.read_resp_header_parts().await.unwrap(); + debug!("response_headers: {:?}", res_headers); + + let res_body = http_session.read_body_bytes().await.unwrap().unwrap(); + debug!("res_body: {:?}", res_body); + assert_eq!(res_body, Bytes::from(d.clone())); + + assert_eq!(res_headers.version, http::version::Version::HTTP_11); + assert_eq!(res_headers.status, StatusCode::OK); + + if client_reuse { + connector + .release_http_session(http_session, &peer, None) + .await; + first = false; + } + } +} + +async fn post_http2(client_reuse: bool, connector: ConnectorV2, peer: HttpPeer, data: Vec) { + let mut first = true; + for d in data { + let mut http_session = if client_reuse { + if first { + debug!("Creating a new HTTP stream for the first request."); + session_new_http2(&connector, peer.clone()).await + } else { + debug!("Re-using existing HTTP stream for request."); + session_reuse_http2(&connector, peer.clone()).await + } + } else { + debug!("Using new HTTP stream for request."); + session_new_http2(&connector, peer.clone()).await + }; + + let mut req = Box::new(RequestHeader::build("POST", b"/", Some(d.len())).unwrap()); + req.append_header("Host", HTTP_HOST).unwrap(); + req.append_header("Content-Length", d.len()).unwrap(); + req.append_header("Content-Type", "text/plain").unwrap(); + debug!("res_headers: {:?}", req.headers); + + http_session.write_request_header(req, false).unwrap(); + http_session + .write_request_body(Bytes::from(d.clone()), true) + .unwrap(); + http_session.finish_request_body().unwrap(); + + http_session.read_response_header().await.unwrap(); + let res_body = http_session.read_response_body().await.unwrap().unwrap(); + debug!("res_body: {:?}", res_body); + assert_eq!(res_body, Bytes::from(d)); + + let res_headers = http_session.response_header().unwrap(); + debug!("res_header: {:?}", res_headers); + assert_eq!(res_headers.version, http::version::Version::HTTP_2); + assert_eq!(res_headers.status, StatusCode::OK); + + if client_reuse { + connector.release_http_session(http_session, &peer, None); + first = false; + } + } +} + +async fn connector_tls_post_data(client_reuse: bool, version: Version, data: Vec) { + match version { + Version::HTTP_11 => { + let (connector, peer) = connector_http11_session().await; + post_http11(client_reuse, connector, peer, data).await; + } + Version::HTTP_2 => { + let (connector, peer) = connector_http2().await; + post_http2(client_reuse, connector, peer, data).await; + } + _ => { + panic!("HTTP version not supported.") + } + }; +} + +fn http_version_parser(version: &str) -> Result { + match version { + "HTTP/1.1" => Ok(Version::HTTP_11), + "HTTP/2.0" => Ok(Version::HTTP_2), + _ => Err(std::io::Error::from(Unsupported)), + } +} + +#[derive(Parser, Debug)] +struct Args { + #[clap(long, parse(try_from_str = http_version_parser))] + http_version: Version, + + #[clap(long, action = clap::ArgAction::Set)] + stream_reuse: bool, + + #[clap(long)] + request_count: i32, + + #[clap(long)] + request_size: usize, +} + +#[tokio::main(flavor = "current_thread")] +async fn main() { + let args = Args::parse(); + println!("{:?}", args); + + println!("Waiting for TCP connect..."); + wait_for_tcp_connect(args.http_version).await; + println!("TCP connect successful."); + + println!("Starting to send benchmark requests."); + connector_tls_post_data( + args.stream_reuse, + args.http_version, + generate_random_ascii_data(args.request_count, args.request_size), + ) + .await; + println!("Successfully sent benchmark requests."); +} diff --git a/pingora-core/examples/bench_server.rs b/pingora-core/examples/bench_server.rs new file mode 100644 index 00000000..c0f8d17a --- /dev/null +++ b/pingora-core/examples/bench_server.rs @@ -0,0 +1,85 @@ +use clap::Parser; +use pingora_core::listeners::Listeners; +use pingora_core::prelude::{Opt, Server}; +use pingora_core::services::listening::Service; +use std::env::current_dir; +use std::thread; +use std::time::Duration; + +#[allow(dead_code, unused_imports)] +#[path = "../tests/utils/mod.rs"] +mod test_utils; +use crate::test_utils::EchoApp; + +#[allow(dead_code, unused_imports)] +#[path = "../benches/utils/mod.rs"] +mod bench_utils; +use bench_utils::{CERT_PATH, KEY_PATH, TLS_HTTP11_PORT, TLS_HTTP2_PORT}; + +pub struct BenchServer { + // Maybe useful in the future + #[allow(dead_code)] + pub handle: thread::JoinHandle<()>, +} + +fn entry_point(opt: Option) { + env_logger::init(); + + let mut test_server = Server::new(opt).unwrap(); + test_server.bootstrap(); + + let mut listeners = Listeners::new(); + + let tls_settings_h1 = + pingora_core::listeners::TlsSettings::intermediate(CERT_PATH.as_str(), KEY_PATH.as_str()) + .unwrap(); + let mut tls_settings_h2 = + pingora_core::listeners::TlsSettings::intermediate(CERT_PATH.as_str(), KEY_PATH.as_str()) + .unwrap(); + tls_settings_h2.enable_h2(); + + listeners.add_tls_with_settings( + format! {"0.0.0.0:{}", TLS_HTTP11_PORT}.as_str(), + None, + tls_settings_h1, + ); + listeners.add_tls_with_settings( + format! {"0.0.0.0:{}", TLS_HTTP2_PORT}.as_str(), + None, + tls_settings_h2, + ); + + let echo_service_http = + Service::with_listeners("Echo Service HTTP".to_string(), listeners, EchoApp); + + test_server.add_service(echo_service_http); + test_server.run_forever(); +} + +impl BenchServer { + pub fn start() -> Self { + println!("{:?}", current_dir().unwrap()); + let opts: Vec = vec![ + "pingora".into(), + "-c".into(), + "benches/utils/pingora_conf.yaml".into(), + ]; + println!("{:?}", opts); + + let server_handle = thread::spawn(|| { + entry_point(Some(Opt::parse_from(opts))); + }); + // wait until the server is up + thread::sleep(Duration::from_secs(2)); + BenchServer { + handle: server_handle, + } + } +} + +fn main() { + println!("bench_server: starting."); + let _server = BenchServer::start(); + thread::sleep(Duration::from_secs(10)); + println!("bench_server: finished."); +} diff --git a/pingora-proxy/tests/utils/conf/keys/server_rustls.crt b/pingora-proxy/tests/utils/conf/keys/server_rustls.crt new file mode 100644 index 00000000..c02a214b --- /dev/null +++ b/pingora-proxy/tests/utils/conf/keys/server_rustls.crt @@ -0,0 +1,14 @@ +-----BEGIN CERTIFICATE----- +MIICJzCCAc6gAwIBAgIUU+G0acG/uiMu1ZDSjlcoY4gH53QwCgYIKoZIzj0EAwIw +ZDELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAkNBMRYwFAYDVQQHDA1TYW4gRnJhbmNp +c2NvMRgwFgYDVQQKDA9DbG91ZGZsYXJlLCBJbmMxFjAUBgNVBAMMDW9wZW5ydXN0 +eS5vcmcwHhcNMjQwNzI0MTMzOTQ4WhcNMzQwNzIyMTMzOTQ4WjBkMQswCQYDVQQG +EwJVUzELMAkGA1UECAwCQ0ExFjAUBgNVBAcMDVNhbiBGcmFuY2lzY28xGDAWBgNV +BAoMD0Nsb3VkZmxhcmUsIEluYzEWMBQGA1UEAwwNb3BlbnJ1c3R5Lm9yZzBZMBMG +ByqGSM49AgEGCCqGSM49AwEHA0IABNn/9RZtR48knaJD6tk9BdccaJfZ0hGEPn6B +SDXmlmJPhcTBqa4iUwW/ABpGvO3FpJcNWasrX2k+qZLq3g205MKjXjBcMDsGA1Ud +EQQ0MDKCDyoub3BlbnJ1c3R5Lm9yZ4INb3BlbnJ1c3R5Lm9yZ4IHY2F0LmNvbYIH +ZG9nLmNvbTAdBgNVHQ4EFgQUnfYAFWyQnSN57IGokj7jcz8ChJQwCgYIKoZIzj0E +AwIDRwAwRAIgQr+Ly2cH04CncbnbhUf4hBl5frTp1pXgGnn8dYjd+UcCICuunEtp +H/a42/sVGBFvjS6FOFe6ZDs4oWBNEqQSw0S2 +-----END CERTIFICATE----- From ac0d8d363662a093472f4fe006498e26a0f8a8f0 Mon Sep 17 00:00:00 2001 From: Harald Gutmann Date: Fri, 30 Aug 2024 19:08:00 +0200 Subject: [PATCH 2/5] Fix clippy lint warning --- pingora-core/benches/tls_acceptor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pingora-core/benches/tls_acceptor.rs b/pingora-core/benches/tls_acceptor.rs index c1e7e149..bfd2b00d 100644 --- a/pingora-core/benches/tls_acceptor.rs +++ b/pingora-core/benches/tls_acceptor.rs @@ -97,7 +97,7 @@ async fn tls_post_data(client_reuse: bool, version: Version, data: Vec) // wait for all responses while let Some(res) = req_set.join_next().await { - let _ = res.unwrap(); + res.unwrap(); } } From 93c557ba5116df4e81cef8b5cec9d0bf4d9b321e Mon Sep 17 00:00:00 2001 From: Harald Gutmann Date: Fri, 30 Aug 2024 19:42:15 +0200 Subject: [PATCH 3/5] Fix clippy lint warning for 1.72.0 --- pingora-core/benches/tls_acceptor.rs | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/pingora-core/benches/tls_acceptor.rs b/pingora-core/benches/tls_acceptor.rs index bfd2b00d..f6b6009d 100644 --- a/pingora-core/benches/tls_acceptor.rs +++ b/pingora-core/benches/tls_acceptor.rs @@ -4,7 +4,7 @@ use iai_callgrind::{ }; use iai_callgrind::{Pipe, Stdin}; use once_cell::sync::Lazy; -use reqwest::{Certificate, Client, StatusCode, Version}; +use reqwest::{Certificate, Client, Response, StatusCode, Version}; use std::fs::File; use std::io::Read; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; @@ -51,11 +51,11 @@ pub static CLIENT_HTTP2: Lazy = Lazy::new(client_http2); /// using with client: None instantiates a new client and performs a full handshake /// providing Some(client) will re-use the provided client/session async fn post_data(client_reuse: bool, version: Version, port: u16, data: String) { - let client = if client_reuse { + let resp = if client_reuse { // NOTE: do not perform TLS handshake for each request match version { - Version::HTTP_11 => &*CLIENT_HTTP11, - Version::HTTP_2 => &*CLIENT_HTTP2, + Version::HTTP_11 => do_post(port, data, &CLIENT_HTTP11).await, + Version::HTTP_2 => do_post(port, data, &CLIENT_HTTP2).await, _ => { panic!("HTTP version not supported.") } @@ -63,21 +63,14 @@ async fn post_data(client_reuse: bool, version: Version, port: u16, data: String } else { // NOTE: perform TLS handshake for each request match version { - Version::HTTP_11 => &client_http11(), - Version::HTTP_2 => &client_http2(), + Version::HTTP_11 => do_post(port, data, &client_http11()).await, + Version::HTTP_2 => do_post(port, data, &client_http2()).await, _ => { panic!("HTTP version not supported.") } } }; - let resp = client - .post(format! {"https://openrusty.org:{}", port}) - .body(data) - .send() - .await - .unwrap(); - assert_eq!(resp.status(), StatusCode::OK); assert_eq!(resp.version(), version); @@ -86,6 +79,15 @@ async fn post_data(client_reuse: bool, version: Version, port: u16, data: String // println!("resp_body: {}", resp_body) } +async fn do_post(port: u16, data: String, client: &Client) -> Response { + client + .post(format! {"https://openrusty.org:{}", port}) + .body(data) + .send() + .await + .unwrap() +} + async fn tls_post_data(client_reuse: bool, version: Version, data: Vec) { let port = version_to_port(version); From 9f3922f04f5fdefcabd94d273d06ac295a68a079 Mon Sep 17 00:00:00 2001 From: Harald Gutmann Date: Fri, 6 Sep 2024 17:05:08 +0200 Subject: [PATCH 4/5] Add support for parallel benchmarking, enable DHAT --- pingora-core/benches/tls_acceptor.rs | 308 ++++++++++++++++---------- pingora-core/benches/tls_connector.rs | 186 +++++++++------- pingora-core/benches/utils/mod.rs | 51 +++-- pingora-core/examples/bench_client.rs | 172 +++++++------- pingora-core/examples/bench_server.rs | 79 ++++--- 5 files changed, 475 insertions(+), 321 deletions(-) diff --git a/pingora-core/benches/tls_acceptor.rs b/pingora-core/benches/tls_acceptor.rs index f6b6009d..13ee6b95 100644 --- a/pingora-core/benches/tls_acceptor.rs +++ b/pingora-core/benches/tls_acceptor.rs @@ -1,109 +1,97 @@ +use ahash::{HashMap, HashMapExt}; use iai_callgrind::{ binary_benchmark, binary_benchmark_group, main, BinaryBenchmarkConfig, Command, - FlamegraphConfig, + FlamegraphConfig, Stdio, Tool, ValgrindTool, }; use iai_callgrind::{Pipe, Stdin}; -use once_cell::sync::Lazy; -use reqwest::{Certificate, Client, Response, StatusCode, Version}; +use log::{debug, info, LevelFilter}; +use regex::Regex; +use reqwest::blocking::Client; +use reqwest::{Certificate, Error, Response, StatusCode, Version}; +use std::env; use std::fs::File; -use std::io::Read; +use std::io::{Read, Stdout}; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::path::PathBuf; use tokio::task::JoinSet; mod utils; use utils::{ - generate_random_ascii_data, version_to_port, wait_for_tcp_connect, CERT_PATH, TLS_HTTP11_PORT, - TLS_HTTP2_PORT, + generate_random_ascii_data, http_version_to_port, wait_for_tcp_connect, CERT_PATH, + TLS_HTTP11_PORT, TLS_HTTP2_PORT, }; -fn read_cert() -> Certificate { - let mut buf = Vec::new(); - File::open(CERT_PATH.to_string()) - .unwrap() - .read_to_end(&mut buf) - .unwrap(); - Certificate::from_pem(&buf).unwrap() -} - -fn client_http11() -> Client { - let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), TLS_HTTP11_PORT); - Client::builder() - .resolve_to_addrs("openrusty.org", &[socket]) - .add_root_certificate(read_cert()) - .build() - .unwrap() -} -fn client_http2() -> Client { - let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), TLS_HTTP2_PORT); - Client::builder() - .resolve_to_addrs("openrusty.org", &[socket]) - .add_root_certificate(read_cert()) - // avoid error messages during first set of connections (os error 32, broken pipe) - .http2_prior_knowledge() - .build() - .unwrap() -} - -pub static CLIENT_HTTP11: Lazy = Lazy::new(client_http11); -pub static CLIENT_HTTP2: Lazy = Lazy::new(client_http2); - -/// using with client: None instantiates a new client and performs a full handshake -/// providing Some(client) will re-use the provided client/session -async fn post_data(client_reuse: bool, version: Version, port: u16, data: String) { - let resp = if client_reuse { - // NOTE: do not perform TLS handshake for each request - match version { - Version::HTTP_11 => do_post(port, data, &CLIENT_HTTP11).await, - Version::HTTP_2 => do_post(port, data, &CLIENT_HTTP2).await, - _ => { - panic!("HTTP version not supported.") - } - } - } else { - // NOTE: perform TLS handshake for each request - match version { - Version::HTTP_11 => do_post(port, data, &client_http11()).await, - Version::HTTP_2 => do_post(port, data, &client_http2()).await, - _ => { - panic!("HTTP version not supported.") - } - } - }; - - assert_eq!(resp.status(), StatusCode::OK); - assert_eq!(resp.version(), version); +main!(binary_benchmark_groups = tls_acceptor); - // read full response, important for consistent tests - let _resp_body = resp.text().await.unwrap(); - // println!("resp_body: {}", resp_body) -} +binary_benchmark_group!( + name = tls_acceptor; + config = BinaryBenchmarkConfig::default() + .flamegraph(FlamegraphConfig::default()) + .tool(Tool::new(ValgrindTool::DHAT)) + .raw_callgrind_args(["" + // NOTE: toggle values can be extracted from .out files + // see '^fn=' values, need to be suffixed with '*' or '()' + // grep -E '^fn=' *.out | cut -d '=' -f2- | sort -u + //"--toggle-collect=pingora_core::services::listening::Service::run_endpoint*" + // NOTE: for usage with callgrind::start_instrumentation() & stop_instrumentation() + //"--instr-atstart=no" + ]); + benchmarks = bench_server_sequential, bench_server_parallel +); -async fn do_post(port: u16, data: String, client: &Client) -> Response { - client - .post(format! {"https://openrusty.org:{}", port}) - .body(data) - .send() - .await - .unwrap() +static SEQUENTIAL_REQUEST_COUNT: i32 = 64; +static SEQUENTIAL_REQUEST_SIZE: usize = 64; +#[binary_benchmark] +#[bench::http_11_handshake_always(args = (Version::HTTP_11), + setup = send_requests(1, false, Version::HTTP_11, SEQUENTIAL_REQUEST_COUNT, SEQUENTIAL_REQUEST_SIZE))] +#[bench::http_11_handshake_once(args = (Version::HTTP_11), + setup = send_requests(1, true, Version::HTTP_11, SEQUENTIAL_REQUEST_COUNT, SEQUENTIAL_REQUEST_SIZE))] +#[bench::http_2_handshake_always(args = (Version::HTTP_2), + setup = send_requests(1, false, Version::HTTP_2, SEQUENTIAL_REQUEST_COUNT, SEQUENTIAL_REQUEST_SIZE))] +#[bench::http_2_handshake_once(args = (Version::HTTP_2), + setup = send_requests(1, true, Version::HTTP_2, SEQUENTIAL_REQUEST_COUNT, SEQUENTIAL_REQUEST_SIZE))] +fn bench_server_sequential(http_version: Version) -> Command { + let path = format!( + "{}/../target/release/examples/bench_server", + env!("CARGO_MANIFEST_DIR") + ); + Command::new(path) + // TODO: currently a workaround to keep the setup function running parallel with benchmark execution + .stdin(Stdin::Setup(Pipe::Stderr)) + .args([format!("--http-version={:?}", http_version)]) + .build() } -async fn tls_post_data(client_reuse: bool, version: Version, data: Vec) { - let port = version_to_port(version); - - let mut req_set = JoinSet::new(); - // spawn request for all elements within data - data.iter().for_each(|d| { - req_set.spawn(post_data(client_reuse, version, port, d.to_string())); - }); - - // wait for all responses - while let Some(res) = req_set.join_next().await { - res.unwrap(); - } +static PARALLEL_ACCEPTORS: u16 = 16; +static PARALLEL_REQUEST_COUNT: i32 = SEQUENTIAL_REQUEST_COUNT / PARALLEL_ACCEPTORS as i32; +static PARALLEL_REQUEST_SIZE: usize = 64; +#[binary_benchmark] +#[bench::http_11_handshake_always(args = (PARALLEL_ACCEPTORS, Version::HTTP_11), + setup = send_requests(PARALLEL_ACCEPTORS, false, Version::HTTP_11, PARALLEL_REQUEST_COUNT, PARALLEL_REQUEST_SIZE))] +#[bench::http_11_handshake_once(args = (PARALLEL_ACCEPTORS, Version::HTTP_11), + setup = send_requests(PARALLEL_ACCEPTORS, true, Version::HTTP_11, PARALLEL_REQUEST_COUNT, PARALLEL_REQUEST_SIZE))] +#[bench::http_2_handshake_always(args = (PARALLEL_ACCEPTORS, Version::HTTP_2), + setup = send_requests(PARALLEL_ACCEPTORS, false, Version::HTTP_2, PARALLEL_REQUEST_COUNT, PARALLEL_REQUEST_SIZE))] +#[bench::http_2_handshake_once(args = (PARALLEL_ACCEPTORS, Version::HTTP_2), + setup = send_requests(PARALLEL_ACCEPTORS, true, Version::HTTP_2, PARALLEL_REQUEST_COUNT, PARALLEL_REQUEST_SIZE))] +fn bench_server_parallel(parallel_acceptors: u16, http_version: Version) -> Command { + let path = format!( + "{}/../target/release/examples/bench_server", + env!("CARGO_MANIFEST_DIR") + ); + Command::new(path) + // TODO: currently a workaround to keep the setup function running parallel with benchmark execution + .stdin(Stdin::Setup(Pipe::Stderr)) + .args([ + format!("--http-version={:?}", http_version), + format!("--parallel-acceptors={}", parallel_acceptors), + ]) + .build() } -fn run_benchmark_requests( +fn send_requests( + parallel_acceptors: u16, client_reuse: bool, http_version: Version, request_count: i32, @@ -115,51 +103,131 @@ fn run_benchmark_requests( .unwrap() .block_on(async { println!("Waiting for TCP connect..."); - wait_for_tcp_connect(http_version).await; + wait_for_tcp_connect(http_version, parallel_acceptors).await; println!("TCP connect successful."); - println!("Starting to send benchmark requests."); + println!("Sending benchmark requests..."); tls_post_data( + parallel_acceptors, client_reuse, http_version, generate_random_ascii_data(request_count, request_size), ) .await; - println!("Successfully sent benchmark requests."); + println!("Benchmark requests successfully sent."); + println!("Waiting for server(s) to gracefully shutdown..."); }) } -static REQUEST_COUNT: i32 = 128; -static REQUEST_SIZE: usize = 64; -#[binary_benchmark] -#[bench::http_11_handshake_always(setup = run_benchmark_requests(false, Version::HTTP_11, REQUEST_COUNT, REQUEST_SIZE))] -#[bench::http_11_handshake_once(setup = run_benchmark_requests(true, Version::HTTP_11, REQUEST_COUNT, REQUEST_SIZE))] -#[bench::http_2_handshake_always(setup = run_benchmark_requests(false, Version::HTTP_2, REQUEST_COUNT, REQUEST_SIZE))] -#[bench::http_2_handshake_once(setup = run_benchmark_requests(true, Version::HTTP_2, REQUEST_COUNT, REQUEST_SIZE))] -fn bench_server() -> Command { - let path = format!( - "{}/../target/release/examples/bench_server", - env!("CARGO_MANIFEST_DIR") - ); - Command::new(path) - // TODO: currently a workaround to keep the setup function running parallel with benchmark execution - .stdin(Stdin::Setup(Pipe::Stderr)) +async fn tls_post_data( + parallel_acceptors: u16, + client_reuse: bool, + http_version: Version, + data: Vec, +) { + if http_version != Version::HTTP_2 && http_version != Version::HTTP_11 { + panic!("HTTP version not supported"); + } + let base_port = http_version_to_port(http_version); + + let mut clients: HashMap = HashMap::new(); + if client_reuse { + for i in 0..parallel_acceptors { + if http_version == Version::HTTP_11 { + clients.insert(i.to_string(), client_http11(base_port + i)); + } + if http_version == Version::HTTP_2 { + clients.insert(i.to_string(), client_http2(base_port + i)); + }; + } + }; + + let mut req_set = JoinSet::new(); + for i in 0..parallel_acceptors { + // spawn request for all elements within data + for d in data.iter() { + if client_reuse { + // reuse same connection & avoid new handshake + let reuse_client = clients.get(&i.to_string()).unwrap(); + req_set.spawn(post_data( + reuse_client.to_owned(), + http_version, + base_port + i, + d.to_string(), + )); + } else { + // always create a new client to ensure handshake is performed + if http_version == Version::HTTP_11 { + req_set.spawn(post_data( + client_http11(base_port + i), + http_version, + base_port + i, + d.to_string(), + )); + } + if http_version == Version::HTTP_2 { + req_set.spawn(post_data( + client_http2(base_port + i), + http_version, + base_port + i, + d.to_string(), + )); + }; + } + } + } + // wait for all responses + while let Some(res) = req_set.join_next().await { + res.unwrap(); + } +} + +async fn post_data(client: Client, version: Version, port: u16, data: String) { + // using blocking client to reuse same connection in case of client_reuse=true + // async client does not ensure to use the same connection, will start a new one + // in case the existing is still blocked + let resp = client + .post(format! {"https://openrusty.org:{}", port}) + .body(data) + .send() + .unwrap_or_else(|err| { + println!("HTTP client error: {err}"); + panic!("error: {err}"); + }); + + assert_eq!(resp.status(), StatusCode::OK); + assert_eq!(resp.version(), version); + + // read full response, important for consistent tests + let _resp_body = resp.text().unwrap(); + // println!("resp_body: {}", resp_body) +} + +fn client_http11(port: u16) -> Client { + let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port); + Client::builder() + .resolve_to_addrs("openrusty.org", &[socket]) + .add_root_certificate(read_cert()) .build() + .unwrap() } -binary_benchmark_group!( - name = tls_acceptor; - config = BinaryBenchmarkConfig::default() - .flamegraph(FlamegraphConfig::default()) - .raw_callgrind_args(["" - // NOTE: toggle values can be extracted from .out files - // see '^fn=' values, need to be suffixed with '*' or '()' - // grep -E '^fn=' *.out | cut -d '=' -f2- | sort -u - //"--toggle-collect=pingora_core::services::listening::Service::run_endpoint*" - // NOTE: for usage with callgrind::start_instrumentation() & stop_instrumentation() - //"--instr-atstart=no" - ]); - benchmarks = bench_server -); +fn client_http2(port: u16) -> Client { + let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port); + Client::builder() + .resolve_to_addrs("openrusty.org", &[socket]) + .add_root_certificate(read_cert()) + // avoid error messages during first set of connections (os error 32, broken pipe) + .http2_prior_knowledge() + .build() + .unwrap() +} -main!(binary_benchmark_groups = tls_acceptor); +fn read_cert() -> Certificate { + let mut buf = Vec::new(); + File::open(CERT_PATH.to_string()) + .unwrap() + .read_to_end(&mut buf) + .unwrap(); + Certificate::from_pem(&buf).unwrap() +} diff --git a/pingora-core/benches/tls_connector.rs b/pingora-core/benches/tls_connector.rs index 745fd820..09a6790d 100644 --- a/pingora-core/benches/tls_connector.rs +++ b/pingora-core/benches/tls_connector.rs @@ -4,81 +4,84 @@ use axum_server::tls_rustls::RustlsConfig; use axum_server::Handle; use iai_callgrind::{ binary_benchmark, binary_benchmark_group, main, BinaryBenchmarkConfig, Command, - FlamegraphConfig, + FlamegraphConfig, Tool, ValgrindTool, }; use iai_callgrind::{Pipe, Stdin}; -use once_cell::sync::Lazy; use reqwest::Version; use std::net::SocketAddr; use std::path::PathBuf; use std::time::Duration; +use tokio::task::JoinSet; -static CERT_PATH: Lazy = Lazy::new(|| { - format!( - "{}/../pingora-proxy/tests/utils/conf/keys/server_rustls.crt", - env!("CARGO_MANIFEST_DIR") - ) -}); -static KEY_PATH: Lazy = Lazy::new(|| { - format!( - "{}/../pingora-proxy/tests/utils/conf/keys/key.pem", - env!("CARGO_MANIFEST_DIR") - ) -}); +mod utils; -static TLS_HTTP11_PORT: u16 = 6204; -static TLS_HTTP2_PORT: u16 = 6205; +use utils::{CERT_PATH, KEY_PATH, TLS_HTTP11_PORT, TLS_HTTP2_PORT}; -async fn graceful_shutdown(handle: Handle) { - tokio::time::sleep(Duration::from_secs(10)).await; +main!(binary_benchmark_groups = tls_connector); - println!("Sending graceful shutdown signal."); - handle.graceful_shutdown(None); -} +binary_benchmark_group!( + name = tls_connector; + config = BinaryBenchmarkConfig::default() + .flamegraph(FlamegraphConfig::default()) + .tool(Tool::new(ValgrindTool::DHAT)) + .raw_callgrind_args(["" + // NOTE: toggle values can be extracted from .out files + // see '^fn=' values, need to be suffixed with '*' or '()' + // grep -E '^fn=' *.out | cut -d '=' -f2- | sort -u + //, "--toggle-collect=bench_client::post_http*" + // NOTE: for usage with callgrind::start_instrumentation() & stop_instrumentation() + //"--instr-atstart=no" + ]); + benchmarks = bench_client_sequential, bench_client_parallel +); -fn run_benchmark_server() { - tokio::runtime::Builder::new_multi_thread() - .enable_all() +static SEQUENTIAL_REQUEST_COUNT: i32 = 64; +static SEQUENTIAL_REQUEST_SIZE: usize = 64; +#[binary_benchmark] +#[bench::http_11_handshake_always(setup = start_servers(Version::HTTP_11, 1), + args = [false, Version::HTTP_11, SEQUENTIAL_REQUEST_COUNT, SEQUENTIAL_REQUEST_SIZE])] +#[bench::http_11_handshake_once(setup = start_servers(Version::HTTP_11, 1), + args = [true, Version::HTTP_11, SEQUENTIAL_REQUEST_COUNT, SEQUENTIAL_REQUEST_SIZE])] +#[bench::http_2_handshake_always(setup = start_servers(Version::HTTP_2, 1), + args = [false, Version::HTTP_2, SEQUENTIAL_REQUEST_COUNT, SEQUENTIAL_REQUEST_SIZE])] +#[bench::http_2_handshake_once(setup = start_servers(Version::HTTP_2, 1), + args = [true, Version::HTTP_2, SEQUENTIAL_REQUEST_COUNT, SEQUENTIAL_REQUEST_SIZE])] +fn bench_client_sequential( + stream_reuse: bool, + http_version: Version, + request_count: i32, + request_size: usize, +) -> Command { + let path = format!( + "{}/../target/release/examples/bench_client", + env!("CARGO_MANIFEST_DIR") + ); + Command::new(path) + // TODO: currently a workaround to keep the setup function running parallel with benchmark execution + .stdin(Stdin::Setup(Pipe::Stderr)) + .args([ + format!("--stream-reuse={}", stream_reuse), + format!("--http-version={:?}", http_version), + format!("--request-count={}", request_count), + format!("--request-size={}", request_size), + ]) .build() - .unwrap() - .block_on(async { - let addr_http11 = SocketAddr::from(([127, 0, 0, 1], TLS_HTTP11_PORT)); - let addr_http2 = SocketAddr::from(([127, 0, 0, 1], TLS_HTTP2_PORT)); - - let app = Router::new().route("/", post(|body: String| async { body })); - - let handle_http11 = Handle::new(); - let handle_http2 = Handle::new(); - - // configure certificate and private key used by https - let config = - RustlsConfig::from_pem_file(PathBuf::from(&*CERT_PATH), PathBuf::from(&*KEY_PATH)) - .await - .unwrap(); - - let (http11_server, _, http2_server, _) = tokio::join!( - axum_server::bind_rustls(addr_http11, config.clone()) - .handle(handle_http11.clone()) - .serve(app.clone().into_make_service()), - graceful_shutdown(handle_http11), - axum_server::bind_rustls(addr_http2, config) - .handle(handle_http2.clone()) - .serve(app.into_make_service()), - graceful_shutdown(handle_http2) - ); - http11_server.unwrap(); - http2_server.unwrap(); - }); } -static REQUEST_COUNT: i32 = 128; -static REQUEST_SIZE: usize = 64; +static PARALLEL_CONNECTORS: u16 = 16; +static PARALLEL_REQUEST_COUNT: i32 = SEQUENTIAL_REQUEST_COUNT / PARALLEL_CONNECTORS as i32; +static PARALLEL_REQUEST_SIZE: usize = 64; #[binary_benchmark] -#[bench::http_11_handshake_always(setup = run_benchmark_server(), args = [false, Version::HTTP_11, REQUEST_COUNT, REQUEST_SIZE])] -#[bench::http_11_handshake_once(setup = run_benchmark_server(), args = [true, Version::HTTP_11, REQUEST_COUNT, REQUEST_SIZE])] -#[bench::http_2_handshake_always(setup = run_benchmark_server(), args = [false, Version::HTTP_2, REQUEST_COUNT, REQUEST_SIZE])] -#[bench::http_2_handshake_once(setup = run_benchmark_server(), args = [true, Version::HTTP_2, REQUEST_COUNT, REQUEST_SIZE])] -fn bench_client( +#[bench::http_11_handshake_always(setup = start_servers(Version::HTTP_11, PARALLEL_CONNECTORS), + args = [PARALLEL_CONNECTORS, false, Version::HTTP_11, PARALLEL_REQUEST_COUNT, PARALLEL_REQUEST_SIZE])] +#[bench::http_11_handshake_once(setup = start_servers(Version::HTTP_11, PARALLEL_CONNECTORS), + args = [PARALLEL_CONNECTORS, true, Version::HTTP_11, PARALLEL_REQUEST_COUNT, PARALLEL_REQUEST_SIZE])] +#[bench::http_2_handshake_always(setup = start_servers(Version::HTTP_2, PARALLEL_CONNECTORS), + args = [PARALLEL_CONNECTORS, false, Version::HTTP_2, PARALLEL_REQUEST_COUNT, PARALLEL_REQUEST_SIZE])] +#[bench::http_2_handshake_once(setup = start_servers(Version::HTTP_2, PARALLEL_CONNECTORS), + args = [PARALLEL_CONNECTORS, true, Version::HTTP_2, PARALLEL_REQUEST_COUNT, PARALLEL_REQUEST_SIZE])] +fn bench_client_parallel( + parallel_connectors: u16, stream_reuse: bool, http_version: Version, request_count: i32, @@ -92,6 +95,7 @@ fn bench_client( // TODO: currently a workaround to keep the setup function running parallel with benchmark execution .stdin(Stdin::Setup(Pipe::Stderr)) .args([ + format!("--parallel-connectors={}", parallel_connectors), format!("--stream-reuse={}", stream_reuse), format!("--http-version={:?}", http_version), format!("--request-count={}", request_count), @@ -100,19 +104,51 @@ fn bench_client( .build() } -binary_benchmark_group!( - name = tls_connector; - config = BinaryBenchmarkConfig::default() - .flamegraph(FlamegraphConfig::default()) - .raw_callgrind_args(["" - // NOTE: toggle values can be extracted from .out files - // see '^fn=' values, need to be suffixed with '*' or '()' - // grep -E '^fn=' *.out | cut -d '=' -f2- | sort -u - //, "--toggle-collect=bench_client::post_http*" - // NOTE: for usage with callgrind::start_instrumentation() & stop_instrumentation() - //"--instr-atstart=no" - ]); - benchmarks = bench_client -); +fn start_servers(http_version: Version, parallel_connectors: u16) { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() + .block_on(async { + let mut server_set = JoinSet::new(); -main!(binary_benchmark_groups = tls_connector); + for i in 0..parallel_connectors { + server_set.spawn(start_server(http_version, i)); + } + + while !server_set.is_empty() { + server_set.join_next().await; + } + }); +} + +async fn start_server(http_version: Version, port_offset: u16) { + let addr = match http_version { + Version::HTTP_11 => SocketAddr::from(([127, 0, 0, 1], TLS_HTTP11_PORT + port_offset)), + Version::HTTP_2 => SocketAddr::from(([127, 0, 0, 1], TLS_HTTP2_PORT + port_offset)), + _ => { + panic!("HTTP version not supported.") + } + }; + + let app = Router::new().route("/", post(|body: String| async { body })); + let handle = Handle::new(); + + // configure certificate and private key used by https + let config = RustlsConfig::from_pem_file(PathBuf::from(&*CERT_PATH), PathBuf::from(&*KEY_PATH)) + .await + .unwrap(); + + let (http_server, _) = tokio::join!( + axum_server::bind_rustls(addr, config.clone()) + .handle(handle.clone()) + .serve(app.into_make_service()), + graceful_shutdown(handle), + ); + http_server.unwrap(); +} + +async fn graceful_shutdown(handle: Handle) { + tokio::time::sleep(Duration::from_secs(10)).await; + handle.graceful_shutdown(None); +} diff --git a/pingora-core/benches/utils/mod.rs b/pingora-core/benches/utils/mod.rs index bff060be..334090cb 100644 --- a/pingora-core/benches/utils/mod.rs +++ b/pingora-core/benches/utils/mod.rs @@ -1,8 +1,12 @@ +#![allow(dead_code)] + use once_cell::sync::Lazy; use rand::distributions::{Alphanumeric, DistString}; use reqwest::Version; +use std::io::ErrorKind::Unsupported; use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream}; use std::time::Duration; +use tokio::task::JoinSet; pub static CERT_PATH: Lazy = Lazy::new(|| { format!( @@ -10,26 +14,42 @@ pub static CERT_PATH: Lazy = Lazy::new(|| { env!("CARGO_MANIFEST_DIR") ) }); -#[allow(dead_code)] + pub static KEY_PATH: Lazy = Lazy::new(|| { format!( "{}/../pingora-proxy/tests/utils/conf/keys/key.pem", env!("CARGO_MANIFEST_DIR") ) }); -pub static TLS_HTTP11_PORT: u16 = 6204; -pub static TLS_HTTP2_PORT: u16 = 6205; +pub static TLS_HTTP11_PORT: u16 = 6100; +pub static TLS_HTTP2_PORT: u16 = 7200; -pub fn generate_random_ascii_data(count: i32, len: usize) -> Vec { - let mut random_data = vec![]; - for _i in 0..count { - let random_string = Alphanumeric.sample_string(&mut rand::thread_rng(), len); - random_data.push(random_string) +pub fn http_version_parser(version: &str) -> Result { + match version { + "HTTP/1.1" => Ok(Version::HTTP_11), + "HTTP/2.0" => Ok(Version::HTTP_2), + _ => Err(std::io::Error::from(Unsupported)), + } +} + +pub async fn wait_for_tcp_connect(version: Version, parallelism: u16) { + let port = http_version_to_port(version); + + let mut wait_set = JoinSet::new(); + for i in 0..parallelism { + wait_set.spawn(async move { + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port + i); + while let Err(_err) = TcpStream::connect(addr) { + let _ = tokio::time::sleep(Duration::from_millis(100)).await; + } + }); + } + while !wait_set.is_empty() { + wait_set.join_next().await; } - random_data } -pub fn version_to_port(version: Version) -> u16 { +pub fn http_version_to_port(version: Version) -> u16 { match version { Version::HTTP_11 => TLS_HTTP11_PORT, Version::HTTP_2 => TLS_HTTP2_PORT, @@ -39,10 +59,11 @@ pub fn version_to_port(version: Version) -> u16 { } } -pub async fn wait_for_tcp_connect(version: Version) { - let port = version_to_port(version); - let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port); - while let Err(_err) = TcpStream::connect(addr) { - let _ = tokio::time::sleep(Duration::from_millis(100)).await; +pub fn generate_random_ascii_data(count: i32, len: usize) -> Vec { + let mut random_data = vec![]; + for _i in 0..count { + let random_string = Alphanumeric.sample_string(&mut rand::thread_rng(), len); + random_data.push(random_string) } + random_data } diff --git a/pingora-core/examples/bench_client.rs b/pingora-core/examples/bench_client.rs index ff77730a..5e8c3c24 100644 --- a/pingora-core/examples/bench_client.rs +++ b/pingora-core/examples/bench_client.rs @@ -10,63 +10,106 @@ use pingora_core::protocols::http::v1::client::HttpSession as HttpSessionV11; use pingora_core::protocols::http::v2::client::Http2Session; use pingora_http::RequestHeader; use reqwest::Version; -use std::io::ErrorKind::Unsupported; #[allow(dead_code, unused_imports)] #[path = "../benches/utils/mod.rs"] mod bench_utils; +use crate::bench_utils::TLS_HTTP11_PORT; use bench_utils::{ - generate_random_ascii_data, wait_for_tcp_connect, CERT_PATH, KEY_PATH, TLS_HTTP2_PORT, + generate_random_ascii_data, http_version_parser, wait_for_tcp_connect, CERT_PATH, KEY_PATH, + TLS_HTTP2_PORT, }; use pingora_core::protocols::http::client::HttpSession; +#[derive(Parser, Debug)] +struct Args { + #[clap(long, parse(try_from_str = http_version_parser))] + http_version: Version, + + #[clap(long, action = clap::ArgAction::Set)] + stream_reuse: bool, + + #[clap(long)] + request_count: i32, + + #[clap(long)] + request_size: usize, + + #[clap(long, default_value = "1")] + parallel_connectors: u16, +} + +#[tokio::main(flavor = "current_thread")] +async fn main() { + let args = Args::parse(); + println!("{:?}", args); + + println!("Waiting for TCP connect..."); + wait_for_tcp_connect(args.http_version, args.parallel_connectors).await; + println!("TCP connect successful."); + + println!("Sending benchmark requests..."); + connector_tls_post_data( + args.stream_reuse, + args.http_version, + args.parallel_connectors, + generate_random_ascii_data(args.request_count, args.request_size), + ) + .await; + println!("Benchmark requests successfully sent."); + println!("Waiting for server(s) to gracefully shutdown..."); +} + +async fn connector_tls_post_data( + client_reuse: bool, + version: Version, + parallel_connectors: u16, + data: Vec, +) { + match version { + Version::HTTP_11 => { + for i in 0..parallel_connectors { + let (connector, peer) = connector_http11_session(i as i32).await; + post_http11(client_reuse, connector, peer, data.clone()).await; + } + } + Version::HTTP_2 => { + for i in 0..parallel_connectors { + let (connector, peer) = connector_http2(i as i32).await; + post_http2(client_reuse, connector, peer, data.clone()).await; + } + } + _ => { + panic!("HTTP version not supported.") + } + }; +} + const DEFAULT_POOL_SIZE: usize = 2; const HTTP_HOST: &str = "openrusty.org"; const SERVER_IP: &str = "127.0.0.1"; -fn get_tls_connector_options() -> ConnectorOptions { - let mut options = ConnectorOptions::new(DEFAULT_POOL_SIZE); - options.ca_file = Some(CERT_PATH.clone()); - options.cert_key_file = Some((CERT_PATH.clone(), KEY_PATH.clone())); - options -} - -async fn connector_http11_session() -> (ConnectorV11, HttpPeer) { +async fn connector_http11_session(port_offset: i32) -> (ConnectorV11, HttpPeer) { let connector = ConnectorV11::new(Some(get_tls_connector_options())); - let peer = get_http_peer(TLS_HTTP2_PORT as i32); + let peer = get_http_peer(TLS_HTTP11_PORT as i32 + port_offset); (connector, peer) } -async fn connector_http2() -> (ConnectorV2, HttpPeer) { +async fn connector_http2(port_offset: i32) -> (ConnectorV2, HttpPeer) { let connector = ConnectorV2::new(Some(get_tls_connector_options())); - let mut peer = get_http_peer(TLS_HTTP2_PORT as i32); + let mut peer = get_http_peer(TLS_HTTP2_PORT as i32 + port_offset); peer.options.set_http_version(2, 2); peer.options.max_h2_streams = 1; (connector, peer) } -async fn session_new_http2(connector: &ConnectorV2, peer: HttpPeer) -> Http2Session { - let http2_session = connector.new_http_session(&peer).await.unwrap(); - match http2_session { - HttpSession::H1(_) => panic!("expect h2"), - HttpSession::H2(h2_stream) => h2_stream, - } -} - -async fn session_new_http11(connector: &ConnectorV11, peer: HttpPeer) -> HttpSessionV11 { - let (http_session, reused) = connector.get_http_session(&peer).await.unwrap(); - assert!(!reused); - http_session -} - -async fn session_reuse_http2(connector: &ConnectorV2, peer: HttpPeer) -> Http2Session { - connector.reused_http_session(&peer).await.unwrap().unwrap() -} - -async fn session_reuse_http11(connector: &ConnectorV11, peer: HttpPeer) -> HttpSessionV11 { - connector.reused_http_session(&peer).await.unwrap() +fn get_tls_connector_options() -> ConnectorOptions { + let mut options = ConnectorOptions::new(DEFAULT_POOL_SIZE); + options.ca_file = Some(CERT_PATH.clone()); + options.cert_key_file = Some((CERT_PATH.clone(), KEY_PATH.clone())); + options } fn get_http_peer(port: i32) -> HttpPeer { @@ -121,7 +164,6 @@ async fn post_http11( } } } - async fn post_http2(client_reuse: bool, connector: ConnectorV2, peer: HttpPeer, data: Vec) { let mut first = true; for d in data { @@ -167,60 +209,24 @@ async fn post_http2(client_reuse: bool, connector: ConnectorV2, peer: HttpPeer, } } -async fn connector_tls_post_data(client_reuse: bool, version: Version, data: Vec) { - match version { - Version::HTTP_11 => { - let (connector, peer) = connector_http11_session().await; - post_http11(client_reuse, connector, peer, data).await; - } - Version::HTTP_2 => { - let (connector, peer) = connector_http2().await; - post_http2(client_reuse, connector, peer, data).await; - } - _ => { - panic!("HTTP version not supported.") - } - }; +async fn session_new_http11(connector: &ConnectorV11, peer: HttpPeer) -> HttpSessionV11 { + let (http_session, reused) = connector.get_http_session(&peer).await.unwrap(); + assert!(!reused); + http_session } -fn http_version_parser(version: &str) -> Result { - match version { - "HTTP/1.1" => Ok(Version::HTTP_11), - "HTTP/2.0" => Ok(Version::HTTP_2), - _ => Err(std::io::Error::from(Unsupported)), +async fn session_new_http2(connector: &ConnectorV2, peer: HttpPeer) -> Http2Session { + let http2_session = connector.new_http_session(&peer).await.unwrap(); + match http2_session { + HttpSession::H1(_) => panic!("expect h2"), + HttpSession::H2(h2_stream) => h2_stream, } } -#[derive(Parser, Debug)] -struct Args { - #[clap(long, parse(try_from_str = http_version_parser))] - http_version: Version, - - #[clap(long, action = clap::ArgAction::Set)] - stream_reuse: bool, - - #[clap(long)] - request_count: i32, - - #[clap(long)] - request_size: usize, +async fn session_reuse_http2(connector: &ConnectorV2, peer: HttpPeer) -> Http2Session { + connector.reused_http_session(&peer).await.unwrap().unwrap() } -#[tokio::main(flavor = "current_thread")] -async fn main() { - let args = Args::parse(); - println!("{:?}", args); - - println!("Waiting for TCP connect..."); - wait_for_tcp_connect(args.http_version).await; - println!("TCP connect successful."); - - println!("Starting to send benchmark requests."); - connector_tls_post_data( - args.stream_reuse, - args.http_version, - generate_random_ascii_data(args.request_count, args.request_size), - ) - .await; - println!("Successfully sent benchmark requests."); +async fn session_reuse_http11(connector: &ConnectorV11, peer: HttpPeer) -> HttpSessionV11 { + connector.reused_http_session(&peer).await.unwrap() } diff --git a/pingora-core/examples/bench_server.rs b/pingora-core/examples/bench_server.rs index c0f8d17a..9b918dab 100644 --- a/pingora-core/examples/bench_server.rs +++ b/pingora-core/examples/bench_server.rs @@ -1,5 +1,6 @@ use clap::Parser; -use pingora_core::listeners::Listeners; +use hyper::Version; +use pingora_core::listeners::{Listeners, TlsSettings}; use pingora_core::prelude::{Opt, Server}; use pingora_core::services::listening::Service; use std::env::current_dir; @@ -9,12 +10,12 @@ use std::time::Duration; #[allow(dead_code, unused_imports)] #[path = "../tests/utils/mod.rs"] mod test_utils; -use crate::test_utils::EchoApp; +use test_utils::EchoApp; #[allow(dead_code, unused_imports)] #[path = "../benches/utils/mod.rs"] mod bench_utils; -use bench_utils::{CERT_PATH, KEY_PATH, TLS_HTTP11_PORT, TLS_HTTP2_PORT}; +use bench_utils::{http_version_parser, CERT_PATH, KEY_PATH, TLS_HTTP11_PORT, TLS_HTTP2_PORT}; pub struct BenchServer { // Maybe useful in the future @@ -22,7 +23,16 @@ pub struct BenchServer { pub handle: thread::JoinHandle<()>, } -fn entry_point(opt: Option) { +#[derive(Parser, Debug)] +struct Args { + #[clap(long, parse(try_from_str = http_version_parser))] + http_version: Version, + + #[clap(long, default_value = "1")] + parallel_acceptors: u16, +} + +fn entry_point(opt: Option, args: Args) { env_logger::init(); let mut test_server = Server::new(opt).unwrap(); @@ -30,24 +40,35 @@ fn entry_point(opt: Option) { let mut listeners = Listeners::new(); - let tls_settings_h1 = - pingora_core::listeners::TlsSettings::intermediate(CERT_PATH.as_str(), KEY_PATH.as_str()) - .unwrap(); - let mut tls_settings_h2 = - pingora_core::listeners::TlsSettings::intermediate(CERT_PATH.as_str(), KEY_PATH.as_str()) - .unwrap(); - tls_settings_h2.enable_h2(); - - listeners.add_tls_with_settings( - format! {"0.0.0.0:{}", TLS_HTTP11_PORT}.as_str(), - None, - tls_settings_h1, - ); - listeners.add_tls_with_settings( - format! {"0.0.0.0:{}", TLS_HTTP2_PORT}.as_str(), - None, - tls_settings_h2, - ); + match args.http_version { + Version::HTTP_11 => { + for i in 0..args.parallel_acceptors { + let tls_settings = + TlsSettings::intermediate(CERT_PATH.as_str(), KEY_PATH.as_str()).unwrap(); + listeners.add_tls_with_settings( + format! {"0.0.0.0:{}", TLS_HTTP11_PORT + i}.as_str(), + None, + tls_settings, + ); + } + } + Version::HTTP_2 => { + for i in 0..args.parallel_acceptors { + let mut tls_settings = + TlsSettings::intermediate(CERT_PATH.as_str(), KEY_PATH.as_str()).unwrap(); + tls_settings.enable_h2(); + + listeners.add_tls_with_settings( + format! {"0.0.0.0:{}", TLS_HTTP2_PORT + i}.as_str(), + None, + tls_settings, + ); + } + } + _ => { + panic!("HTTP version not supported.") + } + }; let echo_service_http = Service::with_listeners("Echo Service HTTP".to_string(), listeners, EchoApp); @@ -58,16 +79,20 @@ fn entry_point(opt: Option) { impl BenchServer { pub fn start() -> Self { - println!("{:?}", current_dir().unwrap()); - let opts: Vec = vec![ + println!("WorkingDir: {:?}", current_dir().unwrap()); + + let opts = Opt::parse_from::, String>(vec![ "pingora".into(), "-c".into(), "benches/utils/pingora_conf.yaml".into(), - ]; + ]); println!("{:?}", opts); + let args = Args::parse(); + println!("{:?}", args); + let server_handle = thread::spawn(|| { - entry_point(Some(Opt::parse_from(opts))); + entry_point(Some(opts), args); }); // wait until the server is up thread::sleep(Duration::from_secs(2)); @@ -78,8 +103,6 @@ impl BenchServer { } fn main() { - println!("bench_server: starting."); let _server = BenchServer::start(); thread::sleep(Duration::from_secs(10)); - println!("bench_server: finished."); } From 36274b4b1514f187142ccc337fb5769d13e1a8ae Mon Sep 17 00:00:00 2001 From: Harald Gutmann Date: Mon, 9 Sep 2024 13:16:12 +0200 Subject: [PATCH 5/5] fix: unify sequential/parallel benchmarks --- pingora-core/benches/tls_acceptor.rs | 53 +++++++++----------------- pingora-core/benches/tls_benchmarks.md | 1 + pingora-core/benches/tls_connector.rs | 51 +++++++------------------ pingora-core/examples/bench_client.rs | 1 - pingora-core/examples/bench_server.rs | 1 - 5 files changed, 32 insertions(+), 75 deletions(-) diff --git a/pingora-core/benches/tls_acceptor.rs b/pingora-core/benches/tls_acceptor.rs index 13ee6b95..25b510fb 100644 --- a/pingora-core/benches/tls_acceptor.rs +++ b/pingora-core/benches/tls_acceptor.rs @@ -1,26 +1,20 @@ use ahash::{HashMap, HashMapExt}; use iai_callgrind::{ binary_benchmark, binary_benchmark_group, main, BinaryBenchmarkConfig, Command, - FlamegraphConfig, Stdio, Tool, ValgrindTool, + FlamegraphConfig, Tool, ValgrindTool, }; use iai_callgrind::{Pipe, Stdin}; -use log::{debug, info, LevelFilter}; -use regex::Regex; use reqwest::blocking::Client; -use reqwest::{Certificate, Error, Response, StatusCode, Version}; +use reqwest::{Certificate, StatusCode, Version}; use std::env; use std::fs::File; -use std::io::{Read, Stdout}; +use std::io::Read; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; -use std::path::PathBuf; use tokio::task::JoinSet; mod utils; -use utils::{ - generate_random_ascii_data, http_version_to_port, wait_for_tcp_connect, CERT_PATH, - TLS_HTTP11_PORT, TLS_HTTP2_PORT, -}; +use utils::{generate_random_ascii_data, http_version_to_port, wait_for_tcp_connect, CERT_PATH}; main!(binary_benchmark_groups = tls_acceptor); @@ -37,45 +31,32 @@ binary_benchmark_group!( // NOTE: for usage with callgrind::start_instrumentation() & stop_instrumentation() //"--instr-atstart=no" ]); - benchmarks = bench_server_sequential, bench_server_parallel + benchmarks = bench_server ); static SEQUENTIAL_REQUEST_COUNT: i32 = 64; static SEQUENTIAL_REQUEST_SIZE: usize = 64; +static PARALLEL_ACCEPTORS: u16 = 16; +static PARALLEL_REQUEST_COUNT: i32 = SEQUENTIAL_REQUEST_COUNT / PARALLEL_ACCEPTORS as i32; +static PARALLEL_REQUEST_SIZE: usize = 64; #[binary_benchmark] -#[bench::http_11_handshake_always(args = (Version::HTTP_11), +#[bench::seq_http_11_handshake_always(args = (1, Version::HTTP_11), setup = send_requests(1, false, Version::HTTP_11, SEQUENTIAL_REQUEST_COUNT, SEQUENTIAL_REQUEST_SIZE))] -#[bench::http_11_handshake_once(args = (Version::HTTP_11), +#[bench::seq_http_11_handshake_once(args = (1, Version::HTTP_11), setup = send_requests(1, true, Version::HTTP_11, SEQUENTIAL_REQUEST_COUNT, SEQUENTIAL_REQUEST_SIZE))] -#[bench::http_2_handshake_always(args = (Version::HTTP_2), +#[bench::seq_http_2_handshake_always(args = (1, Version::HTTP_2), setup = send_requests(1, false, Version::HTTP_2, SEQUENTIAL_REQUEST_COUNT, SEQUENTIAL_REQUEST_SIZE))] -#[bench::http_2_handshake_once(args = (Version::HTTP_2), +#[bench::seq_http_2_handshake_once(args = (1, Version::HTTP_2), setup = send_requests(1, true, Version::HTTP_2, SEQUENTIAL_REQUEST_COUNT, SEQUENTIAL_REQUEST_SIZE))] -fn bench_server_sequential(http_version: Version) -> Command { - let path = format!( - "{}/../target/release/examples/bench_server", - env!("CARGO_MANIFEST_DIR") - ); - Command::new(path) - // TODO: currently a workaround to keep the setup function running parallel with benchmark execution - .stdin(Stdin::Setup(Pipe::Stderr)) - .args([format!("--http-version={:?}", http_version)]) - .build() -} - -static PARALLEL_ACCEPTORS: u16 = 16; -static PARALLEL_REQUEST_COUNT: i32 = SEQUENTIAL_REQUEST_COUNT / PARALLEL_ACCEPTORS as i32; -static PARALLEL_REQUEST_SIZE: usize = 64; -#[binary_benchmark] -#[bench::http_11_handshake_always(args = (PARALLEL_ACCEPTORS, Version::HTTP_11), +#[bench::par_http_11_handshake_always(args = (PARALLEL_ACCEPTORS, Version::HTTP_11), setup = send_requests(PARALLEL_ACCEPTORS, false, Version::HTTP_11, PARALLEL_REQUEST_COUNT, PARALLEL_REQUEST_SIZE))] -#[bench::http_11_handshake_once(args = (PARALLEL_ACCEPTORS, Version::HTTP_11), +#[bench::par_http_11_handshake_once(args = (PARALLEL_ACCEPTORS, Version::HTTP_11), setup = send_requests(PARALLEL_ACCEPTORS, true, Version::HTTP_11, PARALLEL_REQUEST_COUNT, PARALLEL_REQUEST_SIZE))] -#[bench::http_2_handshake_always(args = (PARALLEL_ACCEPTORS, Version::HTTP_2), +#[bench::par_http_2_handshake_always(args = (PARALLEL_ACCEPTORS, Version::HTTP_2), setup = send_requests(PARALLEL_ACCEPTORS, false, Version::HTTP_2, PARALLEL_REQUEST_COUNT, PARALLEL_REQUEST_SIZE))] -#[bench::http_2_handshake_once(args = (PARALLEL_ACCEPTORS, Version::HTTP_2), +#[bench::par_http_2_handshake_once(args = (PARALLEL_ACCEPTORS, Version::HTTP_2), setup = send_requests(PARALLEL_ACCEPTORS, true, Version::HTTP_2, PARALLEL_REQUEST_COUNT, PARALLEL_REQUEST_SIZE))] -fn bench_server_parallel(parallel_acceptors: u16, http_version: Version) -> Command { +fn bench_server(parallel_acceptors: u16, http_version: Version) -> Command { let path = format!( "{}/../target/release/examples/bench_server", env!("CARGO_MANIFEST_DIR") diff --git a/pingora-core/benches/tls_benchmarks.md b/pingora-core/benches/tls_benchmarks.md index 4c012ef6..ac5e85e1 100644 --- a/pingora-core/benches/tls_benchmarks.md +++ b/pingora-core/benches/tls_benchmarks.md @@ -131,6 +131,7 @@ target/iai/ ### Parameters Server and client benchmark are parameterized with the following options: +- number of parallel acceptors/connectors and servers/clients - client/session re-use - HTTP version `1.1|2.0` - number of requests diff --git a/pingora-core/benches/tls_connector.rs b/pingora-core/benches/tls_connector.rs index 09a6790d..0200df9a 100644 --- a/pingora-core/benches/tls_connector.rs +++ b/pingora-core/benches/tls_connector.rs @@ -32,55 +32,32 @@ binary_benchmark_group!( // NOTE: for usage with callgrind::start_instrumentation() & stop_instrumentation() //"--instr-atstart=no" ]); - benchmarks = bench_client_sequential, bench_client_parallel + benchmarks = bench_client ); static SEQUENTIAL_REQUEST_COUNT: i32 = 64; static SEQUENTIAL_REQUEST_SIZE: usize = 64; -#[binary_benchmark] -#[bench::http_11_handshake_always(setup = start_servers(Version::HTTP_11, 1), - args = [false, Version::HTTP_11, SEQUENTIAL_REQUEST_COUNT, SEQUENTIAL_REQUEST_SIZE])] -#[bench::http_11_handshake_once(setup = start_servers(Version::HTTP_11, 1), - args = [true, Version::HTTP_11, SEQUENTIAL_REQUEST_COUNT, SEQUENTIAL_REQUEST_SIZE])] -#[bench::http_2_handshake_always(setup = start_servers(Version::HTTP_2, 1), - args = [false, Version::HTTP_2, SEQUENTIAL_REQUEST_COUNT, SEQUENTIAL_REQUEST_SIZE])] -#[bench::http_2_handshake_once(setup = start_servers(Version::HTTP_2, 1), - args = [true, Version::HTTP_2, SEQUENTIAL_REQUEST_COUNT, SEQUENTIAL_REQUEST_SIZE])] -fn bench_client_sequential( - stream_reuse: bool, - http_version: Version, - request_count: i32, - request_size: usize, -) -> Command { - let path = format!( - "{}/../target/release/examples/bench_client", - env!("CARGO_MANIFEST_DIR") - ); - Command::new(path) - // TODO: currently a workaround to keep the setup function running parallel with benchmark execution - .stdin(Stdin::Setup(Pipe::Stderr)) - .args([ - format!("--stream-reuse={}", stream_reuse), - format!("--http-version={:?}", http_version), - format!("--request-count={}", request_count), - format!("--request-size={}", request_size), - ]) - .build() -} - static PARALLEL_CONNECTORS: u16 = 16; static PARALLEL_REQUEST_COUNT: i32 = SEQUENTIAL_REQUEST_COUNT / PARALLEL_CONNECTORS as i32; static PARALLEL_REQUEST_SIZE: usize = 64; #[binary_benchmark] -#[bench::http_11_handshake_always(setup = start_servers(Version::HTTP_11, PARALLEL_CONNECTORS), +#[bench::seq_http_11_handshake_always(setup = start_servers(Version::HTTP_11, 1), + args = [1, false, Version::HTTP_11, SEQUENTIAL_REQUEST_COUNT, SEQUENTIAL_REQUEST_SIZE])] +#[bench::seq_http_11_handshake_once(setup = start_servers(Version::HTTP_11, 1), + args = [1, true, Version::HTTP_11, SEQUENTIAL_REQUEST_COUNT, SEQUENTIAL_REQUEST_SIZE])] +#[bench::seq_http_2_handshake_always(setup = start_servers(Version::HTTP_2, 1), + args = [1, false, Version::HTTP_2, SEQUENTIAL_REQUEST_COUNT, SEQUENTIAL_REQUEST_SIZE])] +#[bench::seq_http_2_handshake_once(setup = start_servers(Version::HTTP_2, 1), + args = [1, true, Version::HTTP_2, SEQUENTIAL_REQUEST_COUNT, SEQUENTIAL_REQUEST_SIZE])] +#[bench::par_http_11_handshake_always(setup = start_servers(Version::HTTP_11, PARALLEL_CONNECTORS), args = [PARALLEL_CONNECTORS, false, Version::HTTP_11, PARALLEL_REQUEST_COUNT, PARALLEL_REQUEST_SIZE])] -#[bench::http_11_handshake_once(setup = start_servers(Version::HTTP_11, PARALLEL_CONNECTORS), +#[bench::par_http_11_handshake_once(setup = start_servers(Version::HTTP_11, PARALLEL_CONNECTORS), args = [PARALLEL_CONNECTORS, true, Version::HTTP_11, PARALLEL_REQUEST_COUNT, PARALLEL_REQUEST_SIZE])] -#[bench::http_2_handshake_always(setup = start_servers(Version::HTTP_2, PARALLEL_CONNECTORS), +#[bench::par_http_2_handshake_always(setup = start_servers(Version::HTTP_2, PARALLEL_CONNECTORS), args = [PARALLEL_CONNECTORS, false, Version::HTTP_2, PARALLEL_REQUEST_COUNT, PARALLEL_REQUEST_SIZE])] -#[bench::http_2_handshake_once(setup = start_servers(Version::HTTP_2, PARALLEL_CONNECTORS), +#[bench::par_http_2_handshake_once(setup = start_servers(Version::HTTP_2, PARALLEL_CONNECTORS), args = [PARALLEL_CONNECTORS, true, Version::HTTP_2, PARALLEL_REQUEST_COUNT, PARALLEL_REQUEST_SIZE])] -fn bench_client_parallel( +fn bench_client( parallel_connectors: u16, stream_reuse: bool, http_version: Version, diff --git a/pingora-core/examples/bench_client.rs b/pingora-core/examples/bench_client.rs index 5e8c3c24..e50bb222 100644 --- a/pingora-core/examples/bench_client.rs +++ b/pingora-core/examples/bench_client.rs @@ -11,7 +11,6 @@ use pingora_core::protocols::http::v2::client::Http2Session; use pingora_http::RequestHeader; use reqwest::Version; -#[allow(dead_code, unused_imports)] #[path = "../benches/utils/mod.rs"] mod bench_utils; use crate::bench_utils::TLS_HTTP11_PORT; diff --git a/pingora-core/examples/bench_server.rs b/pingora-core/examples/bench_server.rs index 9b918dab..eb117611 100644 --- a/pingora-core/examples/bench_server.rs +++ b/pingora-core/examples/bench_server.rs @@ -12,7 +12,6 @@ use std::time::Duration; mod test_utils; use test_utils::EchoApp; -#[allow(dead_code, unused_imports)] #[path = "../benches/utils/mod.rs"] mod bench_utils; use bench_utils::{http_version_parser, CERT_PATH, KEY_PATH, TLS_HTTP11_PORT, TLS_HTTP2_PORT};