diff --git a/pingora-core/Cargo.toml b/pingora-core/Cargo.toml index c5bce77e..208d97d5 100644 --- a/pingora-core/Cargo.toml +++ b/pingora-core/Cargo.toml @@ -76,9 +76,27 @@ reqwest = { version = "0.11", features = ["rustls"], default-features = false } hyperlocal = "0.8" hyper = "0.14" jemallocator = "0.5" +iai-callgrind = "0.13.1" +axum = { version = "0.7.5", features = ["http2"] } +axum-server = { version = "0.7.1", features = ["tls-rustls"] } [features] default = ["openssl"] openssl = ["pingora-openssl"] boringssl = ["pingora-boringssl"] -patched_http1 = [] \ No newline at end of file +patched_http1 = [] + +[[bench]] +name = "tls_connector" +harness = false + +[[example]] +name = "bench_server" + + +[[bench]] +name = "tls_acceptor" +harness = false + +[[example]] +name = "bench_client" \ No newline at end of file diff --git a/pingora-core/benches/tls_acceptor.rs b/pingora-core/benches/tls_acceptor.rs new file mode 100644 index 00000000..25b510fb --- /dev/null +++ b/pingora-core/benches/tls_acceptor.rs @@ -0,0 +1,214 @@ +use ahash::{HashMap, HashMapExt}; +use iai_callgrind::{ + binary_benchmark, binary_benchmark_group, main, BinaryBenchmarkConfig, Command, + FlamegraphConfig, Tool, ValgrindTool, +}; +use iai_callgrind::{Pipe, Stdin}; +use reqwest::blocking::Client; +use reqwest::{Certificate, StatusCode, Version}; +use std::env; +use std::fs::File; +use std::io::Read; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use tokio::task::JoinSet; + +mod utils; + +use utils::{generate_random_ascii_data, http_version_to_port, wait_for_tcp_connect, CERT_PATH}; + +main!(binary_benchmark_groups = tls_acceptor); + +binary_benchmark_group!( + name = tls_acceptor; + config = BinaryBenchmarkConfig::default() + .flamegraph(FlamegraphConfig::default()) + .tool(Tool::new(ValgrindTool::DHAT)) + .raw_callgrind_args(["" + // NOTE: toggle values can be extracted from .out files + // see '^fn=' values, need to be suffixed with '*' or '()' + // grep -E '^fn=' *.out | cut -d '=' -f2- | sort -u + //"--toggle-collect=pingora_core::services::listening::Service::run_endpoint*" + // NOTE: for usage with callgrind::start_instrumentation() & stop_instrumentation() + //"--instr-atstart=no" + ]); + benchmarks = bench_server +); + +static SEQUENTIAL_REQUEST_COUNT: i32 = 64; +static SEQUENTIAL_REQUEST_SIZE: usize = 64; +static PARALLEL_ACCEPTORS: u16 = 16; +static PARALLEL_REQUEST_COUNT: i32 = SEQUENTIAL_REQUEST_COUNT / PARALLEL_ACCEPTORS as i32; +static PARALLEL_REQUEST_SIZE: usize = 64; +#[binary_benchmark] +#[bench::seq_http_11_handshake_always(args = (1, Version::HTTP_11), + setup = send_requests(1, false, Version::HTTP_11, SEQUENTIAL_REQUEST_COUNT, SEQUENTIAL_REQUEST_SIZE))] +#[bench::seq_http_11_handshake_once(args = (1, Version::HTTP_11), + setup = send_requests(1, true, Version::HTTP_11, SEQUENTIAL_REQUEST_COUNT, SEQUENTIAL_REQUEST_SIZE))] +#[bench::seq_http_2_handshake_always(args = (1, Version::HTTP_2), + setup = send_requests(1, false, Version::HTTP_2, SEQUENTIAL_REQUEST_COUNT, SEQUENTIAL_REQUEST_SIZE))] +#[bench::seq_http_2_handshake_once(args = (1, Version::HTTP_2), + setup = send_requests(1, true, Version::HTTP_2, SEQUENTIAL_REQUEST_COUNT, SEQUENTIAL_REQUEST_SIZE))] +#[bench::par_http_11_handshake_always(args = (PARALLEL_ACCEPTORS, Version::HTTP_11), + setup = send_requests(PARALLEL_ACCEPTORS, false, Version::HTTP_11, PARALLEL_REQUEST_COUNT, PARALLEL_REQUEST_SIZE))] +#[bench::par_http_11_handshake_once(args = (PARALLEL_ACCEPTORS, Version::HTTP_11), + setup = send_requests(PARALLEL_ACCEPTORS, true, Version::HTTP_11, PARALLEL_REQUEST_COUNT, PARALLEL_REQUEST_SIZE))] +#[bench::par_http_2_handshake_always(args = (PARALLEL_ACCEPTORS, Version::HTTP_2), + setup = send_requests(PARALLEL_ACCEPTORS, false, Version::HTTP_2, PARALLEL_REQUEST_COUNT, PARALLEL_REQUEST_SIZE))] +#[bench::par_http_2_handshake_once(args = (PARALLEL_ACCEPTORS, Version::HTTP_2), + setup = send_requests(PARALLEL_ACCEPTORS, true, Version::HTTP_2, PARALLEL_REQUEST_COUNT, PARALLEL_REQUEST_SIZE))] +fn bench_server(parallel_acceptors: u16, http_version: Version) -> Command { + let path = format!( + "{}/../target/release/examples/bench_server", + env!("CARGO_MANIFEST_DIR") + ); + Command::new(path) + // TODO: currently a workaround to keep the setup function running parallel with benchmark execution + .stdin(Stdin::Setup(Pipe::Stderr)) + .args([ + format!("--http-version={:?}", http_version), + format!("--parallel-acceptors={}", parallel_acceptors), + ]) + .build() +} + +fn send_requests( + parallel_acceptors: u16, + client_reuse: bool, + http_version: Version, + request_count: i32, + request_size: usize, +) { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + .block_on(async { + println!("Waiting for TCP connect..."); + wait_for_tcp_connect(http_version, parallel_acceptors).await; + println!("TCP connect successful."); + + println!("Sending benchmark requests..."); + tls_post_data( + parallel_acceptors, + client_reuse, + http_version, + generate_random_ascii_data(request_count, request_size), + ) + .await; + println!("Benchmark requests successfully sent."); + println!("Waiting for server(s) to gracefully shutdown..."); + }) +} + +async fn tls_post_data( + parallel_acceptors: u16, + client_reuse: bool, + http_version: Version, + data: Vec, +) { + if http_version != Version::HTTP_2 && http_version != Version::HTTP_11 { + panic!("HTTP version not supported"); + } + let base_port = http_version_to_port(http_version); + + let mut clients: HashMap = HashMap::new(); + if client_reuse { + for i in 0..parallel_acceptors { + if http_version == Version::HTTP_11 { + clients.insert(i.to_string(), client_http11(base_port + i)); + } + if http_version == Version::HTTP_2 { + clients.insert(i.to_string(), client_http2(base_port + i)); + }; + } + }; + + let mut req_set = JoinSet::new(); + for i in 0..parallel_acceptors { + // spawn request for all elements within data + for d in data.iter() { + if client_reuse { + // reuse same connection & avoid new handshake + let reuse_client = clients.get(&i.to_string()).unwrap(); + req_set.spawn(post_data( + reuse_client.to_owned(), + http_version, + base_port + i, + d.to_string(), + )); + } else { + // always create a new client to ensure handshake is performed + if http_version == Version::HTTP_11 { + req_set.spawn(post_data( + client_http11(base_port + i), + http_version, + base_port + i, + d.to_string(), + )); + } + if http_version == Version::HTTP_2 { + req_set.spawn(post_data( + client_http2(base_port + i), + http_version, + base_port + i, + d.to_string(), + )); + }; + } + } + } + // wait for all responses + while let Some(res) = req_set.join_next().await { + res.unwrap(); + } +} + +async fn post_data(client: Client, version: Version, port: u16, data: String) { + // using blocking client to reuse same connection in case of client_reuse=true + // async client does not ensure to use the same connection, will start a new one + // in case the existing is still blocked + let resp = client + .post(format! {"https://openrusty.org:{}", port}) + .body(data) + .send() + .unwrap_or_else(|err| { + println!("HTTP client error: {err}"); + panic!("error: {err}"); + }); + + assert_eq!(resp.status(), StatusCode::OK); + assert_eq!(resp.version(), version); + + // read full response, important for consistent tests + let _resp_body = resp.text().unwrap(); + // println!("resp_body: {}", resp_body) +} + +fn client_http11(port: u16) -> Client { + let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port); + Client::builder() + .resolve_to_addrs("openrusty.org", &[socket]) + .add_root_certificate(read_cert()) + .build() + .unwrap() +} + +fn client_http2(port: u16) -> Client { + let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port); + Client::builder() + .resolve_to_addrs("openrusty.org", &[socket]) + .add_root_certificate(read_cert()) + // avoid error messages during first set of connections (os error 32, broken pipe) + .http2_prior_knowledge() + .build() + .unwrap() +} + +fn read_cert() -> Certificate { + let mut buf = Vec::new(); + File::open(CERT_PATH.to_string()) + .unwrap() + .read_to_end(&mut buf) + .unwrap(); + Certificate::from_pem(&buf).unwrap() +} diff --git a/pingora-core/benches/tls_benchmarks.md b/pingora-core/benches/tls_benchmarks.md new file mode 100644 index 00000000..ac5e85e1 --- /dev/null +++ b/pingora-core/benches/tls_benchmarks.md @@ -0,0 +1,138 @@ +# TLS Benchmarks +The benchmarks are using [Valgrind](https://valgrind.org/) through the [iai_callgrind](https://docs.rs/iai-callgrind/latest/iai_callgrind/) benchmark framework. +For measuring performance the Valgrind tool [callgrind](https://valgrind.org/docs/manual/cl-manual.html) is used. + +```mermaid +C4Context + title Overview + + System_Ext(ContinuousIntegration, "Continuous Integration") + System_Boundary(OS, "Linux") { + System(Cargo, "Cargo", "bench") + Container(Results, "Benchmark Results") + System_Boundary(Valgrind, "Valgrind") { + Container(LogFile, "Log File") + System_Ext(Valgrind, "Valgrind") + Container(CallGraph, "Call Graph") + Rel(Valgrind, CallGraph, "creates") + Rel(Valgrind, LogFile, "creates") + } + Rel(Cargo, Valgrind, "executes") + } + + Person(Developer, "Developer") + System_Ext(QCacheGrind, "QCacheGrind", "KCacheGrind") + + Rel(Developer, Cargo, "runs") + Rel(ContinuousIntegration, Cargo, "runs") + + Rel(Developer, QCacheGrind, "utilizes") + Rel(QCacheGrind, CallGraph, "to visualize") + + Rel(Cargo, Results, "reports") + +``` + +## Visualization +With [kcachegrind](https://github.com/KDE/kcachegrind)/[qcachegrind](https://github.com/KDE/kcachegrind) the call-graphs +can be interactively visualized and navigated. + +[gprof2dot](https://github.com/jrfonseca/gprof2dot) and [graphviz](https://graphviz.org/) can create call-graph images. + +```bash +gprof2dot -f callgrind *out | dot -T png -o out.png +``` + +The iai_callgrind default [Flamegrahps](https://docs.rs/iai-callgrind/latest/iai_callgrind/struct.FlamegraphConfig.html#impl-Default-for-FlamegraphConfig) +are activated and stored in [SVG](https://en.wikipedia.org/wiki/SVG) format next to the call-graph files. + + +## Technical Details +The TLS Benchmarks are intended to capture full `connect/accept` cycles. To benchmark such scenario it is required +to have some parallel processes running (`server/client`) while only one of them should be benchmarked. + +### Challenges +pingora-core uses [tokio](https://tokio.rs/) as runtime and [pingora-core::server::Server](https://docs.rs/pingora-core/latest/pingora_core/server/struct.Server.html) +spawns threads when being setup. +This leads to implications on the benchmark process as multiple threads need to be covered. + +As tokio is used and network requests are issued during the benchmarking the results will always have a certain variance. + +To limit the variance impact the following pre-cautions where considered: +- running benchmarks (where possible) within a single thread and utilize tokio single-threaded runtime +- issuing multiple requests during benchmarking + +### Scenario Setup +Within `pingora-core/examples/` the BinaryBenchmark Command executables for benchmarking are built using `dev-dependencies`. +The `pingora-core/benches/` contains the opposite side and the iai_callgrind definitions. + +The benchmarked part (`server/client` executable) is built with `pingora-core`. The opposite part is built using +external components (`reqwest/axum`). + +The `servers` are instantiated to accept `POST` requests and echo the transmitted bodies in the response. + +The binaries (`bench_server/bench_client`) are launched through iai_callgrind as [BinaryBenchmark](https://docs.rs/iai-callgrind/latest/iai_callgrind/struct.BinaryBenchmark.html) +within `valgrind/callgrind`. +The BinaryBenchmark [setup](https://docs.rs/iai-callgrind/latest/iai_callgrind/struct.BinaryBenchmark.html#structfield.setup) +function is used to run the opposite part (`client/server`) of the benchmark in parallel. + +For the server benchmark scenario the layout looks like: +- iai_callgrind starts the client on the setup + - the client waits for a TCP connect before issuing the requests +- iai_callgrind launches the server within valgrind +- once the server is up the setup function client successfuly connects and starts to run the requests +- the server stops after a pre-configured period of time + +```mermaid +sequenceDiagram + iai_callgrind->>Setup (Client): starts + Setup (Client)->>BinaryBechmark (Server): TcpStream::connect + BinaryBechmark (Server)-->>Setup (Client): Failed - Startup Phase + iai_callgrind->>BinaryBechmark (Server): starts + Setup (Client)->>BinaryBechmark (Server): TcpStream::connect + BinaryBechmark (Server)->>Setup (Client): Succeeded - Server Running + Setup (Client)->>BinaryBechmark (Server): HTTP Request + BinaryBechmark (Server)->>Setup (Client): HTTP Response + iai_callgrind->>BinaryBechmark (Server): waits for success + iai_callgrind->>Setup (Client): waits for success +``` + +For the client benchmark the setup is similar, but inverse as the server runs within the iai_callgrind setup function. + +### Running +The benchmarks can be run using the following commands: +```bash +VERSION="$(cargo metadata --format-version=1 |\ + jq -r '.packages[] | select(.name == "iai-callgrind").version')" +cargo install iai-callgrind-runner --version "${VERSION}" + +FEATURES="openssl" +cargo build --no-default-features --features "${FEATURES}" --release --examples +cargo bench --no-default-features --features "${FEATURES}" --package pingora-core --bench tls_acceptor -- --nocapture +cargo bench --no-default-features --features "${FEATURES}" --package pingora-core --bench tls_connector -- --nocapture +``` + +### Output +Generated benchmark files are located below `target/iai/`: +``` +target/iai/ +└── pingora-core # + └── tls_acceptor # + └── tls_acceptor # + └── bench_server.http_11_handshake_always # . + ├── callgrind.bench_server.http_11_handshake_always.flamegraph.Ir.diff.old.svg + ├── callgrind.bench_server.http_11_handshake_always.flamegraph.Ir.old.svg + ├── callgrind.bench_server.http_11_handshake_always.flamegraph.Ir.svg + ├── callgrind.bench_server.http_11_handshake_always.log + ├── callgrind.bench_server.http_11_handshake_always.log.old + ├── callgrind.bench_server.http_11_handshake_always.out + └── callgrind.bench_server.http_11_handshake_always.out.old +``` + +### Parameters +Server and client benchmark are parameterized with the following options: +- number of parallel acceptors/connectors and servers/clients +- client/session re-use +- HTTP version `1.1|2.0` +- number of requests +- request body size \ No newline at end of file diff --git a/pingora-core/benches/tls_connector.rs b/pingora-core/benches/tls_connector.rs new file mode 100644 index 00000000..0200df9a --- /dev/null +++ b/pingora-core/benches/tls_connector.rs @@ -0,0 +1,131 @@ +use axum::routing::post; +use axum::Router; +use axum_server::tls_rustls::RustlsConfig; +use axum_server::Handle; +use iai_callgrind::{ + binary_benchmark, binary_benchmark_group, main, BinaryBenchmarkConfig, Command, + FlamegraphConfig, Tool, ValgrindTool, +}; +use iai_callgrind::{Pipe, Stdin}; +use reqwest::Version; +use std::net::SocketAddr; +use std::path::PathBuf; +use std::time::Duration; +use tokio::task::JoinSet; + +mod utils; + +use utils::{CERT_PATH, KEY_PATH, TLS_HTTP11_PORT, TLS_HTTP2_PORT}; + +main!(binary_benchmark_groups = tls_connector); + +binary_benchmark_group!( + name = tls_connector; + config = BinaryBenchmarkConfig::default() + .flamegraph(FlamegraphConfig::default()) + .tool(Tool::new(ValgrindTool::DHAT)) + .raw_callgrind_args(["" + // NOTE: toggle values can be extracted from .out files + // see '^fn=' values, need to be suffixed with '*' or '()' + // grep -E '^fn=' *.out | cut -d '=' -f2- | sort -u + //, "--toggle-collect=bench_client::post_http*" + // NOTE: for usage with callgrind::start_instrumentation() & stop_instrumentation() + //"--instr-atstart=no" + ]); + benchmarks = bench_client +); + +static SEQUENTIAL_REQUEST_COUNT: i32 = 64; +static SEQUENTIAL_REQUEST_SIZE: usize = 64; +static PARALLEL_CONNECTORS: u16 = 16; +static PARALLEL_REQUEST_COUNT: i32 = SEQUENTIAL_REQUEST_COUNT / PARALLEL_CONNECTORS as i32; +static PARALLEL_REQUEST_SIZE: usize = 64; +#[binary_benchmark] +#[bench::seq_http_11_handshake_always(setup = start_servers(Version::HTTP_11, 1), + args = [1, false, Version::HTTP_11, SEQUENTIAL_REQUEST_COUNT, SEQUENTIAL_REQUEST_SIZE])] +#[bench::seq_http_11_handshake_once(setup = start_servers(Version::HTTP_11, 1), + args = [1, true, Version::HTTP_11, SEQUENTIAL_REQUEST_COUNT, SEQUENTIAL_REQUEST_SIZE])] +#[bench::seq_http_2_handshake_always(setup = start_servers(Version::HTTP_2, 1), + args = [1, false, Version::HTTP_2, SEQUENTIAL_REQUEST_COUNT, SEQUENTIAL_REQUEST_SIZE])] +#[bench::seq_http_2_handshake_once(setup = start_servers(Version::HTTP_2, 1), + args = [1, true, Version::HTTP_2, SEQUENTIAL_REQUEST_COUNT, SEQUENTIAL_REQUEST_SIZE])] +#[bench::par_http_11_handshake_always(setup = start_servers(Version::HTTP_11, PARALLEL_CONNECTORS), + args = [PARALLEL_CONNECTORS, false, Version::HTTP_11, PARALLEL_REQUEST_COUNT, PARALLEL_REQUEST_SIZE])] +#[bench::par_http_11_handshake_once(setup = start_servers(Version::HTTP_11, PARALLEL_CONNECTORS), + args = [PARALLEL_CONNECTORS, true, Version::HTTP_11, PARALLEL_REQUEST_COUNT, PARALLEL_REQUEST_SIZE])] +#[bench::par_http_2_handshake_always(setup = start_servers(Version::HTTP_2, PARALLEL_CONNECTORS), + args = [PARALLEL_CONNECTORS, false, Version::HTTP_2, PARALLEL_REQUEST_COUNT, PARALLEL_REQUEST_SIZE])] +#[bench::par_http_2_handshake_once(setup = start_servers(Version::HTTP_2, PARALLEL_CONNECTORS), + args = [PARALLEL_CONNECTORS, true, Version::HTTP_2, PARALLEL_REQUEST_COUNT, PARALLEL_REQUEST_SIZE])] +fn bench_client( + parallel_connectors: u16, + stream_reuse: bool, + http_version: Version, + request_count: i32, + request_size: usize, +) -> Command { + let path = format!( + "{}/../target/release/examples/bench_client", + env!("CARGO_MANIFEST_DIR") + ); + Command::new(path) + // TODO: currently a workaround to keep the setup function running parallel with benchmark execution + .stdin(Stdin::Setup(Pipe::Stderr)) + .args([ + format!("--parallel-connectors={}", parallel_connectors), + format!("--stream-reuse={}", stream_reuse), + format!("--http-version={:?}", http_version), + format!("--request-count={}", request_count), + format!("--request-size={}", request_size), + ]) + .build() +} + +fn start_servers(http_version: Version, parallel_connectors: u16) { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() + .block_on(async { + let mut server_set = JoinSet::new(); + + for i in 0..parallel_connectors { + server_set.spawn(start_server(http_version, i)); + } + + while !server_set.is_empty() { + server_set.join_next().await; + } + }); +} + +async fn start_server(http_version: Version, port_offset: u16) { + let addr = match http_version { + Version::HTTP_11 => SocketAddr::from(([127, 0, 0, 1], TLS_HTTP11_PORT + port_offset)), + Version::HTTP_2 => SocketAddr::from(([127, 0, 0, 1], TLS_HTTP2_PORT + port_offset)), + _ => { + panic!("HTTP version not supported.") + } + }; + + let app = Router::new().route("/", post(|body: String| async { body })); + let handle = Handle::new(); + + // configure certificate and private key used by https + let config = RustlsConfig::from_pem_file(PathBuf::from(&*CERT_PATH), PathBuf::from(&*KEY_PATH)) + .await + .unwrap(); + + let (http_server, _) = tokio::join!( + axum_server::bind_rustls(addr, config.clone()) + .handle(handle.clone()) + .serve(app.into_make_service()), + graceful_shutdown(handle), + ); + http_server.unwrap(); +} + +async fn graceful_shutdown(handle: Handle) { + tokio::time::sleep(Duration::from_secs(10)).await; + handle.graceful_shutdown(None); +} diff --git a/pingora-core/benches/utils/mod.rs b/pingora-core/benches/utils/mod.rs new file mode 100644 index 00000000..334090cb --- /dev/null +++ b/pingora-core/benches/utils/mod.rs @@ -0,0 +1,69 @@ +#![allow(dead_code)] + +use once_cell::sync::Lazy; +use rand::distributions::{Alphanumeric, DistString}; +use reqwest::Version; +use std::io::ErrorKind::Unsupported; +use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream}; +use std::time::Duration; +use tokio::task::JoinSet; + +pub static CERT_PATH: Lazy = Lazy::new(|| { + format!( + "{}/../pingora-proxy/tests/utils/conf/keys/server_rustls.crt", + env!("CARGO_MANIFEST_DIR") + ) +}); + +pub static KEY_PATH: Lazy = Lazy::new(|| { + format!( + "{}/../pingora-proxy/tests/utils/conf/keys/key.pem", + env!("CARGO_MANIFEST_DIR") + ) +}); +pub static TLS_HTTP11_PORT: u16 = 6100; +pub static TLS_HTTP2_PORT: u16 = 7200; + +pub fn http_version_parser(version: &str) -> Result { + match version { + "HTTP/1.1" => Ok(Version::HTTP_11), + "HTTP/2.0" => Ok(Version::HTTP_2), + _ => Err(std::io::Error::from(Unsupported)), + } +} + +pub async fn wait_for_tcp_connect(version: Version, parallelism: u16) { + let port = http_version_to_port(version); + + let mut wait_set = JoinSet::new(); + for i in 0..parallelism { + wait_set.spawn(async move { + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port + i); + while let Err(_err) = TcpStream::connect(addr) { + let _ = tokio::time::sleep(Duration::from_millis(100)).await; + } + }); + } + while !wait_set.is_empty() { + wait_set.join_next().await; + } +} + +pub fn http_version_to_port(version: Version) -> u16 { + match version { + Version::HTTP_11 => TLS_HTTP11_PORT, + Version::HTTP_2 => TLS_HTTP2_PORT, + _ => { + panic!("HTTP version not supported.") + } + } +} + +pub fn generate_random_ascii_data(count: i32, len: usize) -> Vec { + let mut random_data = vec![]; + for _i in 0..count { + let random_string = Alphanumeric.sample_string(&mut rand::thread_rng(), len); + random_data.push(random_string) + } + random_data +} diff --git a/pingora-core/benches/utils/pingora_conf.yaml b/pingora-core/benches/utils/pingora_conf.yaml new file mode 100644 index 00000000..a30d66bd --- /dev/null +++ b/pingora-core/benches/utils/pingora_conf.yaml @@ -0,0 +1,7 @@ +--- +version: 1 +client_bind_to_ipv4: + - 127.0.0.2 +ca_file: tests/keys/server.crt +threads: 1 +work_stealing: false \ No newline at end of file diff --git a/pingora-core/examples/bench_client.rs b/pingora-core/examples/bench_client.rs new file mode 100644 index 00000000..e50bb222 --- /dev/null +++ b/pingora-core/examples/bench_client.rs @@ -0,0 +1,231 @@ +use bytes::Bytes; +use clap::Parser; +use http::StatusCode; +use log::debug; +use pingora_core::connectors::http::v1::Connector as ConnectorV11; +use pingora_core::connectors::http::v2::Connector as ConnectorV2; +use pingora_core::connectors::ConnectorOptions; +use pingora_core::prelude::HttpPeer; +use pingora_core::protocols::http::v1::client::HttpSession as HttpSessionV11; +use pingora_core::protocols::http::v2::client::Http2Session; +use pingora_http::RequestHeader; +use reqwest::Version; + +#[path = "../benches/utils/mod.rs"] +mod bench_utils; +use crate::bench_utils::TLS_HTTP11_PORT; +use bench_utils::{ + generate_random_ascii_data, http_version_parser, wait_for_tcp_connect, CERT_PATH, KEY_PATH, + TLS_HTTP2_PORT, +}; +use pingora_core::protocols::http::client::HttpSession; + +#[derive(Parser, Debug)] +struct Args { + #[clap(long, parse(try_from_str = http_version_parser))] + http_version: Version, + + #[clap(long, action = clap::ArgAction::Set)] + stream_reuse: bool, + + #[clap(long)] + request_count: i32, + + #[clap(long)] + request_size: usize, + + #[clap(long, default_value = "1")] + parallel_connectors: u16, +} + +#[tokio::main(flavor = "current_thread")] +async fn main() { + let args = Args::parse(); + println!("{:?}", args); + + println!("Waiting for TCP connect..."); + wait_for_tcp_connect(args.http_version, args.parallel_connectors).await; + println!("TCP connect successful."); + + println!("Sending benchmark requests..."); + connector_tls_post_data( + args.stream_reuse, + args.http_version, + args.parallel_connectors, + generate_random_ascii_data(args.request_count, args.request_size), + ) + .await; + println!("Benchmark requests successfully sent."); + println!("Waiting for server(s) to gracefully shutdown..."); +} + +async fn connector_tls_post_data( + client_reuse: bool, + version: Version, + parallel_connectors: u16, + data: Vec, +) { + match version { + Version::HTTP_11 => { + for i in 0..parallel_connectors { + let (connector, peer) = connector_http11_session(i as i32).await; + post_http11(client_reuse, connector, peer, data.clone()).await; + } + } + Version::HTTP_2 => { + for i in 0..parallel_connectors { + let (connector, peer) = connector_http2(i as i32).await; + post_http2(client_reuse, connector, peer, data.clone()).await; + } + } + _ => { + panic!("HTTP version not supported.") + } + }; +} + +const DEFAULT_POOL_SIZE: usize = 2; +const HTTP_HOST: &str = "openrusty.org"; +const SERVER_IP: &str = "127.0.0.1"; + +async fn connector_http11_session(port_offset: i32) -> (ConnectorV11, HttpPeer) { + let connector = ConnectorV11::new(Some(get_tls_connector_options())); + let peer = get_http_peer(TLS_HTTP11_PORT as i32 + port_offset); + + (connector, peer) +} + +async fn connector_http2(port_offset: i32) -> (ConnectorV2, HttpPeer) { + let connector = ConnectorV2::new(Some(get_tls_connector_options())); + let mut peer = get_http_peer(TLS_HTTP2_PORT as i32 + port_offset); + peer.options.set_http_version(2, 2); + peer.options.max_h2_streams = 1; + + (connector, peer) +} + +fn get_tls_connector_options() -> ConnectorOptions { + let mut options = ConnectorOptions::new(DEFAULT_POOL_SIZE); + options.ca_file = Some(CERT_PATH.clone()); + options.cert_key_file = Some((CERT_PATH.clone(), KEY_PATH.clone())); + options +} + +fn get_http_peer(port: i32) -> HttpPeer { + HttpPeer::new(format!("{}:{}", SERVER_IP, port), true, HTTP_HOST.into()) +} + +async fn post_http11( + client_reuse: bool, + connector: ConnectorV11, + peer: HttpPeer, + data: Vec, +) { + let mut first = true; + for d in data { + let mut http_session = if client_reuse { + if first { + debug!("Creating a new HTTP stream for the first request."); + session_new_http11(&connector, peer.clone()).await + } else { + debug!("Re-using existing HTTP stream for request."); + session_reuse_http11(&connector, peer.clone()).await + } + } else { + debug!("Using new HTTP stream for request."); + session_new_http11(&connector, peer.clone()).await + }; + + let mut req = Box::new(RequestHeader::build("POST", b"/", Some(d.len())).unwrap()); + req.append_header("Host", HTTP_HOST).unwrap(); + req.append_header("Content-Length", d.len()).unwrap(); + req.append_header("Content-Type", "text/plain").unwrap(); + debug!("request_headers: {:?}", req.headers); + + http_session.write_request_header(req).await.unwrap(); + http_session.write_body(d.as_bytes()).await.unwrap(); + + let res_headers = *http_session.read_resp_header_parts().await.unwrap(); + debug!("response_headers: {:?}", res_headers); + + let res_body = http_session.read_body_bytes().await.unwrap().unwrap(); + debug!("res_body: {:?}", res_body); + assert_eq!(res_body, Bytes::from(d.clone())); + + assert_eq!(res_headers.version, http::version::Version::HTTP_11); + assert_eq!(res_headers.status, StatusCode::OK); + + if client_reuse { + connector + .release_http_session(http_session, &peer, None) + .await; + first = false; + } + } +} +async fn post_http2(client_reuse: bool, connector: ConnectorV2, peer: HttpPeer, data: Vec) { + let mut first = true; + for d in data { + let mut http_session = if client_reuse { + if first { + debug!("Creating a new HTTP stream for the first request."); + session_new_http2(&connector, peer.clone()).await + } else { + debug!("Re-using existing HTTP stream for request."); + session_reuse_http2(&connector, peer.clone()).await + } + } else { + debug!("Using new HTTP stream for request."); + session_new_http2(&connector, peer.clone()).await + }; + + let mut req = Box::new(RequestHeader::build("POST", b"/", Some(d.len())).unwrap()); + req.append_header("Host", HTTP_HOST).unwrap(); + req.append_header("Content-Length", d.len()).unwrap(); + req.append_header("Content-Type", "text/plain").unwrap(); + debug!("res_headers: {:?}", req.headers); + + http_session.write_request_header(req, false).unwrap(); + http_session + .write_request_body(Bytes::from(d.clone()), true) + .unwrap(); + http_session.finish_request_body().unwrap(); + + http_session.read_response_header().await.unwrap(); + let res_body = http_session.read_response_body().await.unwrap().unwrap(); + debug!("res_body: {:?}", res_body); + assert_eq!(res_body, Bytes::from(d)); + + let res_headers = http_session.response_header().unwrap(); + debug!("res_header: {:?}", res_headers); + assert_eq!(res_headers.version, http::version::Version::HTTP_2); + assert_eq!(res_headers.status, StatusCode::OK); + + if client_reuse { + connector.release_http_session(http_session, &peer, None); + first = false; + } + } +} + +async fn session_new_http11(connector: &ConnectorV11, peer: HttpPeer) -> HttpSessionV11 { + let (http_session, reused) = connector.get_http_session(&peer).await.unwrap(); + assert!(!reused); + http_session +} + +async fn session_new_http2(connector: &ConnectorV2, peer: HttpPeer) -> Http2Session { + let http2_session = connector.new_http_session(&peer).await.unwrap(); + match http2_session { + HttpSession::H1(_) => panic!("expect h2"), + HttpSession::H2(h2_stream) => h2_stream, + } +} + +async fn session_reuse_http2(connector: &ConnectorV2, peer: HttpPeer) -> Http2Session { + connector.reused_http_session(&peer).await.unwrap().unwrap() +} + +async fn session_reuse_http11(connector: &ConnectorV11, peer: HttpPeer) -> HttpSessionV11 { + connector.reused_http_session(&peer).await.unwrap() +} diff --git a/pingora-core/examples/bench_server.rs b/pingora-core/examples/bench_server.rs new file mode 100644 index 00000000..eb117611 --- /dev/null +++ b/pingora-core/examples/bench_server.rs @@ -0,0 +1,107 @@ +use clap::Parser; +use hyper::Version; +use pingora_core::listeners::{Listeners, TlsSettings}; +use pingora_core::prelude::{Opt, Server}; +use pingora_core::services::listening::Service; +use std::env::current_dir; +use std::thread; +use std::time::Duration; + +#[allow(dead_code, unused_imports)] +#[path = "../tests/utils/mod.rs"] +mod test_utils; +use test_utils::EchoApp; + +#[path = "../benches/utils/mod.rs"] +mod bench_utils; +use bench_utils::{http_version_parser, CERT_PATH, KEY_PATH, TLS_HTTP11_PORT, TLS_HTTP2_PORT}; + +pub struct BenchServer { + // Maybe useful in the future + #[allow(dead_code)] + pub handle: thread::JoinHandle<()>, +} + +#[derive(Parser, Debug)] +struct Args { + #[clap(long, parse(try_from_str = http_version_parser))] + http_version: Version, + + #[clap(long, default_value = "1")] + parallel_acceptors: u16, +} + +fn entry_point(opt: Option, args: Args) { + env_logger::init(); + + let mut test_server = Server::new(opt).unwrap(); + test_server.bootstrap(); + + let mut listeners = Listeners::new(); + + match args.http_version { + Version::HTTP_11 => { + for i in 0..args.parallel_acceptors { + let tls_settings = + TlsSettings::intermediate(CERT_PATH.as_str(), KEY_PATH.as_str()).unwrap(); + listeners.add_tls_with_settings( + format! {"0.0.0.0:{}", TLS_HTTP11_PORT + i}.as_str(), + None, + tls_settings, + ); + } + } + Version::HTTP_2 => { + for i in 0..args.parallel_acceptors { + let mut tls_settings = + TlsSettings::intermediate(CERT_PATH.as_str(), KEY_PATH.as_str()).unwrap(); + tls_settings.enable_h2(); + + listeners.add_tls_with_settings( + format! {"0.0.0.0:{}", TLS_HTTP2_PORT + i}.as_str(), + None, + tls_settings, + ); + } + } + _ => { + panic!("HTTP version not supported.") + } + }; + + let echo_service_http = + Service::with_listeners("Echo Service HTTP".to_string(), listeners, EchoApp); + + test_server.add_service(echo_service_http); + test_server.run_forever(); +} + +impl BenchServer { + pub fn start() -> Self { + println!("WorkingDir: {:?}", current_dir().unwrap()); + + let opts = Opt::parse_from::, String>(vec![ + "pingora".into(), + "-c".into(), + "benches/utils/pingora_conf.yaml".into(), + ]); + println!("{:?}", opts); + + let args = Args::parse(); + println!("{:?}", args); + + let server_handle = thread::spawn(|| { + entry_point(Some(opts), args); + }); + // wait until the server is up + thread::sleep(Duration::from_secs(2)); + BenchServer { + handle: server_handle, + } + } +} + +fn main() { + let _server = BenchServer::start(); + thread::sleep(Duration::from_secs(10)); +} diff --git a/pingora-proxy/tests/utils/conf/keys/server_rustls.crt b/pingora-proxy/tests/utils/conf/keys/server_rustls.crt new file mode 100644 index 00000000..c02a214b --- /dev/null +++ b/pingora-proxy/tests/utils/conf/keys/server_rustls.crt @@ -0,0 +1,14 @@ +-----BEGIN CERTIFICATE----- +MIICJzCCAc6gAwIBAgIUU+G0acG/uiMu1ZDSjlcoY4gH53QwCgYIKoZIzj0EAwIw +ZDELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAkNBMRYwFAYDVQQHDA1TYW4gRnJhbmNp +c2NvMRgwFgYDVQQKDA9DbG91ZGZsYXJlLCBJbmMxFjAUBgNVBAMMDW9wZW5ydXN0 +eS5vcmcwHhcNMjQwNzI0MTMzOTQ4WhcNMzQwNzIyMTMzOTQ4WjBkMQswCQYDVQQG +EwJVUzELMAkGA1UECAwCQ0ExFjAUBgNVBAcMDVNhbiBGcmFuY2lzY28xGDAWBgNV +BAoMD0Nsb3VkZmxhcmUsIEluYzEWMBQGA1UEAwwNb3BlbnJ1c3R5Lm9yZzBZMBMG +ByqGSM49AgEGCCqGSM49AwEHA0IABNn/9RZtR48knaJD6tk9BdccaJfZ0hGEPn6B +SDXmlmJPhcTBqa4iUwW/ABpGvO3FpJcNWasrX2k+qZLq3g205MKjXjBcMDsGA1Ud +EQQ0MDKCDyoub3BlbnJ1c3R5Lm9yZ4INb3BlbnJ1c3R5Lm9yZ4IHY2F0LmNvbYIH +ZG9nLmNvbTAdBgNVHQ4EFgQUnfYAFWyQnSN57IGokj7jcz8ChJQwCgYIKoZIzj0E +AwIDRwAwRAIgQr+Ly2cH04CncbnbhUf4hBl5frTp1pXgGnn8dYjd+UcCICuunEtp +H/a42/sVGBFvjS6FOFe6ZDs4oWBNEqQSw0S2 +-----END CERTIFICATE-----