From 6b11068008a6e154bbd4f596e0d111581a645ec4 Mon Sep 17 00:00:00 2001 From: Evgeny Shirykalov Date: Thu, 8 Jan 2026 10:44:50 -0800 Subject: [PATCH 1/6] feat: trait for HTTP request handler --- examples/async.rs | 133 +++++++++++++++++------------------ examples/awssig.rs | 166 ++++++++++++++++++++++---------------------- examples/curl.rs | 59 ++++++++-------- src/http/conf.rs | 66 ++++++++++++++++-- src/http/request.rs | 79 +++++++++++++++++++++ src/http/status.rs | 6 ++ 6 files changed, 322 insertions(+), 187 deletions(-) diff --git a/examples/async.rs b/examples/async.rs index 47f5de8e..6e3fcc25 100644 --- a/examples/async.rs +++ b/examples/async.rs @@ -6,14 +6,12 @@ use std::time::Instant; use ngx::core; use ngx::ffi::{ - ngx_array_push, ngx_command_t, ngx_conf_t, ngx_connection_t, ngx_event_t, ngx_http_handler_pt, - ngx_http_module_t, ngx_http_phases_NGX_HTTP_ACCESS_PHASE, ngx_int_t, ngx_module_t, - ngx_post_event, ngx_posted_events, ngx_posted_next_events, ngx_str_t, ngx_uint_t, + ngx_command_t, ngx_conf_t, ngx_connection_t, ngx_event_t, ngx_http_module_t, ngx_int_t, + ngx_module_t, ngx_post_event, ngx_posted_events, ngx_posted_next_events, ngx_str_t, ngx_uint_t, NGX_CONF_TAKE1, NGX_HTTP_LOC_CONF, NGX_HTTP_LOC_CONF_OFFSET, NGX_HTTP_MODULE, NGX_LOG_EMERG, }; -use ngx::http::{self, HttpModule, MergeConfigError}; -use ngx::http::{HttpModuleLocationConf, HttpModuleMainConf, NgxHttpCoreModule}; -use ngx::{http_request_handler, ngx_conf_log_error, ngx_log_debug_http, ngx_string}; +use ngx::http::{self, HttpModule, HttpModuleLocationConf, HttpRequestHandler, MergeConfigError}; +use ngx::{ngx_conf_log_error, ngx_log_debug_http, ngx_string}; use tokio::runtime::Runtime; struct Module; @@ -25,18 +23,10 @@ impl http::HttpModule for Module { unsafe extern "C" fn postconfiguration(cf: *mut ngx_conf_t) -> ngx_int_t { // SAFETY: this function is called with non-NULL cf always - let cf = &mut *cf; - let cmcf = NgxHttpCoreModule::main_conf_mut(cf).expect("http core main conf"); - - let h = ngx_array_push( - &mut cmcf.phases[ngx_http_phases_NGX_HTTP_ACCESS_PHASE as usize].handlers, - ) as *mut ngx_http_handler_pt; - if h.is_null() { - return core::Status::NGX_ERROR.into(); - } - // set an Access phase handler - *h = Some(async_access_handler); - core::Status::NGX_OK.into() + let cf = unsafe { &mut *cf }; + http::add_phase_handler::(cf) + .map_or(core::Status::NGX_ERROR, |_| core::Status::NGX_OK) + .into() } } @@ -139,63 +129,70 @@ impl Drop for RequestCTX { } } -http_request_handler!(async_access_handler, |request: &mut http::Request| { - let co = Module::location_conf(request).expect("module config is none"); +struct AsyncAccessHandler; - ngx_log_debug_http!(request, "async module enabled: {}", co.enable); +impl HttpRequestHandler for AsyncAccessHandler { + const PHASE: ngx::http::HttpPhase = ngx::http::HttpPhase::Access; + type ReturnType = core::Status; - if !co.enable { - return core::Status::NGX_DECLINED; - } + fn handler(request: &mut http::Request) -> Self::ReturnType { + let co = Module::location_conf(request).expect("module config is none"); + + ngx_log_debug_http!(request, "async module enabled: {}", co.enable); - if let Some(ctx) = - unsafe { request.get_module_ctx::(&*addr_of!(ngx_http_async_module)) } - { - if !ctx.done.load(Ordering::Relaxed) { - return core::Status::NGX_AGAIN; + if !co.enable { + return core::Status::NGX_DECLINED; } - return core::Status::NGX_OK; - } + if let Some(ctx) = + unsafe { request.get_module_ctx::(&*addr_of!(ngx_http_async_module)) } + { + if !ctx.done.load(Ordering::Relaxed) { + return core::Status::NGX_AGAIN; + } - let ctx = request.pool().allocate(RequestCTX::default()); - if ctx.is_null() { - return core::Status::NGX_ERROR; + return core::Status::NGX_OK; + } + + let ctx = request.pool().allocate(RequestCTX::default()); + if ctx.is_null() { + return core::Status::NGX_ERROR; + } + request.set_module_ctx(ctx.cast(), unsafe { &*addr_of!(ngx_http_async_module) }); + + let ctx = unsafe { &mut *ctx }; + ctx.event.handler = Some(check_async_work_done); + ctx.event.data = request.connection().cast(); + ctx.event.log = unsafe { (*request.connection()).log }; + unsafe { ngx_post_event(&mut ctx.event, addr_of_mut!(ngx_posted_next_events)) }; + + // Request is no longer needed and can be converted to something movable to the async block + let req = AtomicPtr::new(request.into()); + let done_flag = ctx.done.clone(); + + let rt = ngx_http_async_runtime(); + ctx.task = Some(rt.spawn(async move { + let start = Instant::now(); + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + let req = unsafe { http::Request::from_ngx_http_request(req.load(Ordering::Relaxed)) }; + // not really thread safe, we should apply all these operation in nginx thread + // but this is just an example. proper way would be storing these headers in the request ctx + // and apply them when we get back to the nginx thread. + req.add_header_out( + "X-Async-Time", + start.elapsed().as_millis().to_string().as_str(), + ); + + done_flag.store(true, Ordering::Release); + // there is a small issue here. If traffic is low we may get stuck behind a 300ms timer + // in the nginx event loop. To workaround it we can notify the event loop using + // pthread_kill( nginx_thread, SIGIO ) to wake up the event loop. (or patch nginx + // and use the same trick as the thread pool) + })); + + core::Status::NGX_AGAIN } - request.set_module_ctx(ctx.cast(), unsafe { &*addr_of!(ngx_http_async_module) }); - - let ctx = unsafe { &mut *ctx }; - ctx.event.handler = Some(check_async_work_done); - ctx.event.data = request.connection().cast(); - ctx.event.log = unsafe { (*request.connection()).log }; - unsafe { ngx_post_event(&mut ctx.event, addr_of_mut!(ngx_posted_next_events)) }; - - // Request is no longer needed and can be converted to something movable to the async block - let req = AtomicPtr::new(request.into()); - let done_flag = ctx.done.clone(); - - let rt = ngx_http_async_runtime(); - ctx.task = Some(rt.spawn(async move { - let start = Instant::now(); - tokio::time::sleep(std::time::Duration::from_secs(2)).await; - let req = unsafe { http::Request::from_ngx_http_request(req.load(Ordering::Relaxed)) }; - // not really thread safe, we should apply all these operation in nginx thread - // but this is just an example. proper way would be storing these headers in the request ctx - // and apply them when we get back to the nginx thread. - req.add_header_out( - "X-Async-Time", - start.elapsed().as_millis().to_string().as_str(), - ); - - done_flag.store(true, Ordering::Release); - // there is a small issue here. If traffic is low we may get stuck behind a 300ms timer - // in the nginx event loop. To workaround it we can notify the event loop using - // pthread_kill( nginx_thread, SIGIO ) to wake up the event loop. (or patch nginx - // and use the same trick as the thread pool) - })); - - core::Status::NGX_AGAIN -}); +} extern "C" fn ngx_http_async_commands_set_enable( cf: *mut ngx_conf_t, diff --git a/examples/awssig.rs b/examples/awssig.rs index cbbea102..c86faac6 100644 --- a/examples/awssig.rs +++ b/examples/awssig.rs @@ -3,13 +3,12 @@ use std::ffi::{c_char, c_void}; use http::HeaderMap; use ngx::core; use ngx::ffi::{ - ngx_array_push, ngx_command_t, ngx_conf_t, ngx_http_handler_pt, ngx_http_module_t, - ngx_http_phases_NGX_HTTP_PRECONTENT_PHASE, ngx_int_t, ngx_module_t, ngx_str_t, ngx_uint_t, + ngx_command_t, ngx_conf_t, ngx_http_module_t, ngx_int_t, ngx_module_t, ngx_str_t, ngx_uint_t, NGX_CONF_TAKE1, NGX_HTTP_LOC_CONF, NGX_HTTP_LOC_CONF_OFFSET, NGX_HTTP_MODULE, NGX_HTTP_SRV_CONF, NGX_LOG_EMERG, }; use ngx::http::*; -use ngx::{http_request_handler, ngx_conf_log_error, ngx_log_debug_http, ngx_string}; +use ngx::{ngx_conf_log_error, ngx_log_debug_http, ngx_string}; struct Module; @@ -20,18 +19,10 @@ impl HttpModule for Module { unsafe extern "C" fn postconfiguration(cf: *mut ngx_conf_t) -> ngx_int_t { // SAFETY: this function is called with non-NULL cf always - let cf = &mut *cf; - let cmcf = NgxHttpCoreModule::main_conf_mut(cf).expect("http core main conf"); - - let h = ngx_array_push( - &mut cmcf.phases[ngx_http_phases_NGX_HTTP_PRECONTENT_PHASE as usize].handlers, - ) as *mut ngx_http_handler_pt; - if h.is_null() { - return core::Status::NGX_ERROR.into(); - } - // set an phase handler - *h = Some(awssigv4_header_handler); - core::Status::NGX_OK.into() + let cf = unsafe { &mut *cf }; + ngx::http::add_phase_handler::(cf) + .map_or(core::Status::NGX_ERROR, |_| core::Status::NGX_OK) + .into() } } @@ -261,82 +252,89 @@ extern "C" fn ngx_http_awssigv4_commands_set_s3_endpoint( ngx::core::NGX_CONF_OK } -http_request_handler!(awssigv4_header_handler, |request: &mut Request| { - // get Module Config from request - let conf = Module::location_conf(request).expect("module conf"); - ngx_log_debug_http!(request, "AWS signature V4 module {}", { - if conf.enable { - "enabled" - } else { - "disabled" - } - }); - if !conf.enable { - return core::Status::NGX_DECLINED; - } +struct AwsSigV4HeaderHandler; - // TODO: build url properly from the original URL from client - let method = request.method(); - if !matches!(method, ngx::http::Method::HEAD | ngx::http::Method::GET) { - return HTTPStatus::FORBIDDEN.into(); - } +impl HttpRequestHandler for AwsSigV4HeaderHandler { + const PHASE: HttpPhase = HttpPhase::PreContent; + type ReturnType = core::Status; - let datetime = chrono::Utc::now(); - let uri = match request.unparsed_uri().to_str() { - Ok(v) => format!("https://{}.{}{}", conf.s3_bucket, conf.s3_endpoint, v), - Err(_) => return core::Status::NGX_DECLINED, - }; + fn handler(request: &mut Request) -> Self::ReturnType { + // get Module Config from request + let conf = Module::location_conf(request).expect("module conf"); + ngx_log_debug_http!(request, "AWS signature V4 module {}", { + if conf.enable { + "enabled" + } else { + "disabled" + } + }); + if !conf.enable { + return core::Status::NGX_DECLINED; + } + + // TODO: build url properly from the original URL from client + let method = request.method(); + if !matches!(method, ngx::http::Method::HEAD | ngx::http::Method::GET) { + return HTTPStatus::FORBIDDEN.into(); + } - let datetime_now = datetime.format("%Y%m%dT%H%M%SZ"); - let datetime_now = datetime_now.to_string(); + let datetime = chrono::Utc::now(); + let uri = match request.unparsed_uri().to_str() { + Ok(v) => format!("https://{}.{}{}", conf.s3_bucket, conf.s3_endpoint, v), + Err(_) => return core::Status::NGX_DECLINED, + }; - let signature = { - // NOTE: aws_sign_v4::AwsSign::new() implementation requires a HeaderMap. - // Iterate over requests headers_in and copy into HeaderMap - // Copy only headers that will be used to sign the request - let mut headers = HeaderMap::new(); - for (name, value) in request.headers_in_iterator() { - if let Ok(name) = name.to_str() { - if name.to_lowercase() == "host" { - if let Ok(value) = http::HeaderValue::from_bytes(value.as_bytes()) { - headers.insert(http::header::HOST, value); - } else { - return core::Status::NGX_DECLINED; + let datetime_now = datetime.format("%Y%m%dT%H%M%SZ"); + let datetime_now = datetime_now.to_string(); + + let signature = { + // NOTE: aws_sign_v4::AwsSign::new() implementation requires a HeaderMap. + // Iterate over requests headers_in and copy into HeaderMap + // Copy only headers that will be used to sign the request + let mut headers = HeaderMap::new(); + for (name, value) in request.headers_in_iterator() { + if let Ok(name) = name.to_str() { + if name.to_lowercase() == "host" { + if let Ok(value) = http::HeaderValue::from_bytes(value.as_bytes()) { + headers.insert(http::header::HOST, value); + } else { + return core::Status::NGX_DECLINED; + } } + } else { + return core::Status::NGX_DECLINED; } - } else { - return core::Status::NGX_DECLINED; } - } - headers.insert("X-Amz-Date", datetime_now.parse().unwrap()); - ngx_log_debug_http!(request, "headers {:?}", headers); - ngx_log_debug_http!(request, "method {:?}", method); - ngx_log_debug_http!(request, "uri {:?}", uri); - ngx_log_debug_http!(request, "datetime_now {:?}", datetime_now); - - let s = aws_sign_v4::AwsSign::new( - method.as_str(), - &uri, - &datetime, - &headers, - "us-east-1", - conf.access_key.as_str(), - conf.secret_key.as_str(), - "s3", - "", - ); - s.sign() - }; + headers.insert("X-Amz-Date", datetime_now.parse().unwrap()); + ngx_log_debug_http!(request, "headers {:?}", headers); + ngx_log_debug_http!(request, "method {:?}", method); + ngx_log_debug_http!(request, "uri {:?}", uri); + ngx_log_debug_http!(request, "datetime_now {:?}", datetime_now); + + let s = aws_sign_v4::AwsSign::new( + method.as_str(), + &uri, + &datetime, + &headers, + "us-east-1", + conf.access_key.as_str(), + conf.secret_key.as_str(), + "s3", + "", + ); + s.sign() + }; - request.add_header_in("authorization", signature.as_str()); - request.add_header_in("X-Amz-Date", datetime_now.as_str()); + request.add_header_in("authorization", signature.as_str()); + request.add_header_in("X-Amz-Date", datetime_now.as_str()); - for (name, value) in request.headers_out_iterator() { - ngx_log_debug_http!(request, "headers_out {name}: {value}",); - } - for (name, value) in request.headers_in_iterator() { - ngx_log_debug_http!(request, "headers_in {name}: {value}",); - } + for (name, value) in request.headers_out_iterator() { + ngx_log_debug_http!(request, "headers_out {name}: {value}",); + } + for (name, value) in request.headers_in_iterator() { + ngx_log_debug_http!(request, "headers_in {name}: {value}",); + } - core::Status::NGX_OK -}); + core::Status::NGX_OK + } +} diff --git a/examples/curl.rs b/examples/curl.rs index 3d07002f..2350cf00 100644 --- a/examples/curl.rs +++ b/examples/curl.rs @@ -2,13 +2,11 @@ use std::ffi::{c_char, c_void}; use ngx::core; use ngx::ffi::{ - ngx_array_push, ngx_command_t, ngx_conf_t, ngx_http_handler_pt, ngx_http_module_t, - ngx_http_phases_NGX_HTTP_ACCESS_PHASE, ngx_int_t, ngx_module_t, ngx_str_t, ngx_uint_t, + ngx_command_t, ngx_conf_t, ngx_http_module_t, ngx_int_t, ngx_module_t, ngx_str_t, ngx_uint_t, NGX_CONF_TAKE1, NGX_HTTP_LOC_CONF, NGX_HTTP_LOC_CONF_OFFSET, NGX_HTTP_MODULE, NGX_LOG_EMERG, }; -use ngx::http::{self, HttpModule, MergeConfigError}; -use ngx::http::{HttpModuleLocationConf, HttpModuleMainConf, NgxHttpCoreModule}; -use ngx::{http_request_handler, ngx_conf_log_error, ngx_log_debug_http, ngx_string}; +use ngx::http::{self, HttpModule, HttpModuleLocationConf, HttpRequestHandler, MergeConfigError}; +use ngx::{ngx_conf_log_error, ngx_log_debug_http, ngx_string}; struct Module; @@ -19,18 +17,10 @@ impl http::HttpModule for Module { unsafe extern "C" fn postconfiguration(cf: *mut ngx_conf_t) -> ngx_int_t { // SAFETY: this function is called with non-NULL cf always - let cf = &mut *cf; - let cmcf = NgxHttpCoreModule::main_conf_mut(cf).expect("http core main conf"); - - let h = ngx_array_push( - &mut cmcf.phases[ngx_http_phases_NGX_HTTP_ACCESS_PHASE as usize].handlers, - ) as *mut ngx_http_handler_pt; - if h.is_null() { - return core::Status::NGX_ERROR.into(); - } - // set an Access phase handler - *h = Some(curl_access_handler); - core::Status::NGX_OK.into() + let cf = unsafe { &mut *cf }; + http::add_phase_handler::(cf) + .map_or(core::Status::NGX_ERROR, |_| core::Status::NGX_OK) + .into() } } @@ -90,25 +80,32 @@ impl http::Merge for ModuleConfig { } } -http_request_handler!(curl_access_handler, |request: &mut http::Request| { - let co = Module::location_conf(request).expect("module config is none"); +struct CurlRequestHandler; + +impl HttpRequestHandler for CurlRequestHandler { + const PHASE: ngx::http::HttpPhase = ngx::http::HttpPhase::Access; + type ReturnType = core::Status; - ngx_log_debug_http!(request, "curl module enabled: {}", co.enable); + fn handler(request: &mut http::Request) -> Self::ReturnType { + let co = Module::location_conf(request).expect("module config is none"); - match co.enable { - true => { - if request - .user_agent() - .is_some_and(|ua| ua.as_bytes().starts_with(b"curl")) - { - http::HTTPStatus::FORBIDDEN.into() - } else { - core::Status::NGX_DECLINED + ngx_log_debug_http!(request, "curl module enabled: {}", co.enable); + + match co.enable { + true => { + if request + .user_agent() + .is_some_and(|ua| ua.as_bytes().starts_with(b"curl")) + { + http::HTTPStatus::FORBIDDEN.into() + } else { + core::Status::NGX_DECLINED + } } + false => core::Status::NGX_DECLINED, } - false => core::Status::NGX_DECLINED, } -}); +} extern "C" fn ngx_http_curl_commands_set_enable( cf: *mut ngx_conf_t, diff --git a/src/http/conf.rs b/src/http/conf.rs index 06426bd4..36d583bc 100644 --- a/src/http/conf.rs +++ b/src/http/conf.rs @@ -207,9 +207,15 @@ pub unsafe trait HttpModuleLocationConf: HttpModule { } mod core { - use crate::ffi::{ - ngx_http_core_loc_conf_t, ngx_http_core_main_conf_t, ngx_http_core_module, - ngx_http_core_srv_conf_t, + use allocator_api2::alloc::AllocError; + + use crate::{ + ffi::{ + ngx_http_core_loc_conf_t, ngx_http_core_main_conf_t, ngx_http_core_module, + ngx_http_core_srv_conf_t, + }, + http::{HttpModuleMainConf, HttpRequestHandler}, + ngx_conf_log_error, }; /// Auxiliary structure to access `ngx_http_core_module` configuration. @@ -229,9 +235,61 @@ mod core { unsafe impl crate::http::HttpModuleLocationConf for NgxHttpCoreModule { type LocationConf = ngx_http_core_loc_conf_t; } + + /// HTTP phases in which a module can register handlers. + #[repr(usize)] + pub enum HttpPhase { + /// Post-read phase + PostRead = crate::ffi::ngx_http_phases_NGX_HTTP_POST_READ_PHASE as _, + /// Server rewrite phase + ServerRewrite = crate::ffi::ngx_http_phases_NGX_HTTP_SERVER_REWRITE_PHASE as _, + /// Find configuration phase + FindConfig = crate::ffi::ngx_http_phases_NGX_HTTP_FIND_CONFIG_PHASE as _, + /// Rewrite phase + Rewrite = crate::ffi::ngx_http_phases_NGX_HTTP_REWRITE_PHASE as _, + /// Post-rewrite phase + PostRewrite = crate::ffi::ngx_http_phases_NGX_HTTP_POST_REWRITE_PHASE as _, + /// Pre-access phase + Preaccess = crate::ffi::ngx_http_phases_NGX_HTTP_PREACCESS_PHASE as _, + /// Access phase + Access = crate::ffi::ngx_http_phases_NGX_HTTP_ACCESS_PHASE as _, + /// Post-access phase + PostAccess = crate::ffi::ngx_http_phases_NGX_HTTP_POST_ACCESS_PHASE as _, + /// Pre-content phase + PreContent = crate::ffi::ngx_http_phases_NGX_HTTP_PRECONTENT_PHASE as _, + /// Content phase + Content = crate::ffi::ngx_http_phases_NGX_HTTP_CONTENT_PHASE as _, + /// Log phase + Log = crate::ffi::ngx_http_phases_NGX_HTTP_LOG_PHASE as _, + } + + /// Register a request handler for a specified phase. + /// This function must be called from the module's `postconfiguration()` function. + pub fn add_phase_handler(cf: &mut nginx_sys::ngx_conf_t) -> Result<(), AllocError> + where + H: HttpRequestHandler, + { + let cmcf = NgxHttpCoreModule::main_conf_mut(cf).expect("http core main conf"); + let h: *mut nginx_sys::ngx_http_handler_pt = + unsafe { nginx_sys::ngx_array_push(&mut cmcf.phases[H::PHASE as usize].handlers) as _ }; + if h.is_null() { + ngx_conf_log_error!( + nginx_sys::NGX_LOG_EMERG, + cf, + "failed to register {} handler", + H::name(), + ); + return Err(AllocError); + } + // set an H::PHASE phase handler + unsafe { + *h = Some(crate::http::handler_wrapper::); + } + Ok(()) + } } -pub use core::NgxHttpCoreModule; +pub use core::{add_phase_handler, HttpPhase, NgxHttpCoreModule}; #[cfg(ngx_feature = "http_ssl")] mod ssl { diff --git a/src/http/request.rs b/src/http/request.rs index 3671717c..22201664 100644 --- a/src/http/request.rs +++ b/src/http/request.rs @@ -1,6 +1,7 @@ use core::error; use core::ffi::c_void; use core::fmt; +use core::fmt::Display; use core::ptr::NonNull; use core::slice; use core::str::FromStr; @@ -8,6 +9,7 @@ use core::str::FromStr; use crate::core::*; use crate::ffi::*; use crate::http::status::*; +use crate::http::HttpPhase; /// Define a static request handler. /// @@ -85,6 +87,83 @@ macro_rules! http_variable_get { }; } +/// Trait for converting handler return types into `ngx_int_t`. +/// Any desired error handling / logging logic can be implemented in the `into_ngx_int_t` method. +pub trait HttpHandlerReturn: Sized { + /// Convert the handler return type into an `ngx_int_t`. + fn into_ngx_int_t(self, _r: &Request) -> ngx_int_t; +} + +impl HttpHandlerReturn for Option { + #[inline] + fn into_ngx_int_t(self, _r: &Request) -> ngx_int_t { + self.unwrap_or(NGX_ERROR as _) + } +} + +impl HttpHandlerReturn for ngx_int_t { + #[inline] + fn into_ngx_int_t(self, _r: &Request) -> ngx_int_t { + self + } +} + +impl HttpHandlerReturn for Status { + #[inline] + fn into_ngx_int_t(self, _r: &Request) -> ngx_int_t { + self.0 + } +} + +impl HttpHandlerReturn for core::result::Result +where + R: HttpHandlerReturn, + E: Display, +{ + #[inline] + fn into_ngx_int_t(self, r: &Request) -> ngx_int_t { + match self { + Ok(val) => val.into_ngx_int_t(r), + Err(e) => { + crate::ngx_log_error!(NGX_LOG_ERR, r.log(), "{e}",); + NGX_ERROR as _ + } + } + } +} + +/// Trait for static request handler. +/// Return type must implement [`HttpHandlerReturn`]. +/// There are predefined implementations for `ngx_int_t`, [`Status`], `Option`, +/// and `Result` with value type implementing [`HttpHandlerReturn`] and error type +/// implementing `Display`. +pub trait HttpRequestHandler { + /// The phase in which the handler is invoked. + const PHASE: HttpPhase; + /// The return type of the handler. + type ReturnType: HttpHandlerReturn; + /// The handler function. + fn handler(request: &mut Request) -> Self::ReturnType; + /// Handler name for logging purposes. + /// `core::any::type_name` is used by default. + fn name() -> &'static str { + core::any::type_name::() + } +} + +/// The C-compatible handler wrapper function. +/// +/// # Safety +/// +/// The caller has provided a valid non-null pointer to an `ngx_http_request_t`. +pub(crate) unsafe extern "C" fn handler_wrapper(r: *mut ngx_http_request_t) -> ngx_int_t +where + H: HttpRequestHandler, +{ + let r = unsafe { Request::from_ngx_http_request(r) }; + H::handler(r).into_ngx_int_t(r) +} + /// Wrapper struct for an [`ngx_http_request_t`] pointer, providing methods for working with HTTP /// requests. /// diff --git a/src/http/status.rs b/src/http/status.rs index 8b067cb6..a545f65b 100644 --- a/src/http/status.rs +++ b/src/http/status.rs @@ -36,6 +36,12 @@ impl From for Status { } } +impl From for ngx_int_t { + fn from(val: HTTPStatus) -> Self { + val.0 as _ + } +} + impl From for ngx_uint_t { fn from(val: HTTPStatus) -> Self { val.0 From 520b4ff434c4f4b264ec43b190e3839350af5e6c Mon Sep 17 00:00:00 2001 From: Evgeny Shirykalov Date: Thu, 8 Jan 2026 11:10:27 -0800 Subject: [PATCH 2/6] chore: make clippy tool happy --- src/log.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/log.rs b/src/log.rs index 51825c9c..ae6ac5c2 100644 --- a/src/log.rs +++ b/src/log.rs @@ -153,7 +153,8 @@ macro_rules! ngx_log_debug { #[macro_export] macro_rules! ngx_log_debug_http { ( $request:expr, $($arg:tt)+ ) => { - let log = unsafe { (*$request.connection()).log }; + let c = $request.connection(); + let log = unsafe { (*c).log }; $crate::ngx_log_debug!(mask: $crate::log::DebugMask::Http, log, $($arg)+); } } From 856fc077a05e0d6152c75594384f2a57e575639c Mon Sep 17 00:00:00 2001 From: Evgeny Shirykalov Date: Thu, 8 Jan 2026 11:10:27 -0800 Subject: [PATCH 3/6] feat: trait for module request context --- src/core/pool.rs | 32 ++++++++++++++++++++++++++++ src/http/mod.rs | 2 ++ src/http/request.rs | 8 +++++++ src/http/request_context.rs | 42 +++++++++++++++++++++++++++++++++++++ 4 files changed, 84 insertions(+) create mode 100644 src/http/request_context.rs diff --git a/src/core/pool.rs b/src/core/pool.rs index 685178cd..be53f67d 100644 --- a/src/core/pool.rs +++ b/src/core/pool.rs @@ -221,6 +221,28 @@ impl Pool { Ok(()) } + /// Removes a cleanup handler for a value in the memory pool. + unsafe fn remove_cleanup_for_value( + &self, + value: *const T, + ) -> Option { + let mut cln = (*self.0.as_ptr()).cleanup; + + while !cln.is_null() { + // SAFETY: comparing function pointers is generally unreliable, but in this specific + // case we can assume that the same function pointer was used when adding the cleanup + // handler. + #[allow(unknown_lints)] + #[allow(unpredictable_function_pointer_comparisons)] + if (*cln).data == value as *mut c_void && (*cln).handler == Some(cleanup_type::) { + return (*cln).handler.take(); + } + cln = (*cln).next; + } + + None + } + /// Allocates memory from the pool of the specified size. /// The resulting pointer is aligned to a platform word size. /// @@ -284,6 +306,16 @@ impl Pool { } } + /// Runs the cleanup handler for a value and removes it. + /// + /// # Safety + /// The caller must ensure that `value` is a valid pointer to a value that has an + /// associated cleanup handler in the pool. + pub unsafe fn remove(&self, value: *const T) { + self.remove_cleanup_for_value(value) + .inspect(|cleanup| cleanup(value as _)); + } + /// Resizes a memory allocation in place if possible. /// /// If resizing is requested for the last allocation in the pool, it may be diff --git a/src/http/mod.rs b/src/http/mod.rs index 00c329a8..4c41fa8f 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs @@ -1,10 +1,12 @@ mod conf; mod module; mod request; +mod request_context; mod status; mod upstream; pub use conf::*; pub use module::*; pub use request::*; +pub use request_context::*; pub use status::*; diff --git a/src/http/request.rs b/src/http/request.rs index 22201664..8c8db6cd 100644 --- a/src/http/request.rs +++ b/src/http/request.rs @@ -259,6 +259,14 @@ impl Request { unsafe { ctx.as_ref() } } + /// Get mutable Module context + pub fn get_module_ctx_mut(&mut self, module: &ngx_module_t) -> Option<&mut T> { + let ctx = self.get_module_ctx_ptr(module).cast::(); + // SAFETY: ctx is either NULL or allocated with ngx_p(c)alloc and + // explicitly initialized by the module + unsafe { ctx.as_mut() } + } + /// Sets the value as the module's context. /// /// See diff --git a/src/http/request_context.rs b/src/http/request_context.rs new file mode 100644 index 00000000..3c0298f1 --- /dev/null +++ b/src/http/request_context.rs @@ -0,0 +1,42 @@ +use crate::http::{HttpModule, Request}; +use crate::ngx_log_debug_http; + +/// A trait for managing request-specific context data. +pub trait RequestContext: Sized { + /// Creates a new context and associates it with the given request. + /// No check is performed to see if a context already exists. + fn create(request: &mut Request, value: Self) -> Option<&mut Self> { + // `Pool::allocate()` adds pool cleanup handler to drop the allocated object + // when the pool is destroyed. + let ctx_ptr = request.pool().allocate(value); + if !ctx_ptr.is_null() { + request.set_module_ctx(ctx_ptr as _, Module::module()); + } + + unsafe { ctx_ptr.as_mut() } + } + + /// Removes the context associated with the given request. + fn remove(request: &mut Request) { + if let Some(ctx_ptr) = request.get_module_ctx::(Module::module()) { + request.set_module_ctx(core::ptr::null_mut(), Module::module()); + unsafe { request.pool().remove(ctx_ptr as *const Self) }; + ngx_log_debug_http!(request, "RequestContext removed from request"); + } + } + + /// Retrieves an immutable reference to the context associated with the given request. + fn get(request: &Request) -> Option<&Self> { + request.get_module_ctx::(Module::module()) + } + + /// Retrieves a mutable reference to the context associated with the given request. + fn get_mut(request: &mut Request) -> Option<&mut Self> { + request.get_module_ctx_mut::(Module::module()) + } + + /// Checks if a context is associated with the given request. + fn exists(request: &Request) -> bool { + request.get_module_ctx::(Module::module()).is_some() + } +} From ae4ea939abb1190955a21aee5787c71199cf657c Mon Sep 17 00:00:00 2001 From: Evgeny Shirykalov Date: Fri, 9 Jan 2026 15:27:19 -0800 Subject: [PATCH 4/6] feat: typed storage in pool --- src/core/pool.rs | 144 ++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 117 insertions(+), 27 deletions(-) diff --git a/src/core/pool.rs b/src/core/pool.rs index be53f67d..8da67b63 100644 --- a/src/core/pool.rs +++ b/src/core/pool.rs @@ -5,7 +5,7 @@ use core::ptr::{self, NonNull}; use nginx_sys::{ ngx_buf_t, ngx_create_temp_buf, ngx_palloc, ngx_pcalloc, ngx_pfree, ngx_pmemalign, ngx_pnalloc, - ngx_pool_cleanup_add, ngx_pool_t, NGX_ALIGNMENT, + ngx_pool_cleanup_add, ngx_pool_cleanup_t, ngx_pool_t, NGX_ALIGNMENT, }; use crate::allocator::{dangling_for_layout, AllocError, Allocator}; @@ -133,6 +133,11 @@ impl AsMut for Pool { } } +// Wrapper to create an unique value type +struct Item { + value: T, +} + impl Pool { /// Creates a new `Pool` from an `ngx_pool_t` pointer. /// @@ -203,29 +208,65 @@ impl Pool { Some(MemoryBuffer::from_ngx_buf(buf)) } - /// Adds a cleanup handler for a value in the memory pool. + /// Allocates memory for a value and adds a cleanup handler for it in the memory pool. /// - /// Returns `Ok(())` if the cleanup handler is successfully added, or `Err(())` if the cleanup - /// handler cannot be added. + /// Returns `Some(NonNull)` if the allocation and cleanup handler addition are successful, + /// or `None` if allocation fails. /// /// # Safety /// This function is marked as unsafe because it involves raw pointer manipulation. - unsafe fn add_cleanup_for_value(&self, value: *mut T) -> Result<(), ()> { - let cln = ngx_pool_cleanup_add(self.0.as_ptr(), 0); + unsafe fn allocate_with_cleanup(&self, value: T) -> Option> { + let cln = ngx_pool_cleanup_add(self.0.as_ptr(), mem::size_of::()); if cln.is_null() { - return Err(()); + return None; } (*cln).handler = Some(cleanup_type::); - (*cln).data = value as *mut c_void; + // `data` points to the memory allocated for the value by `ngx_pool_cleanup_add()` + ptr::write((*cln).data as *mut T, value); + NonNull::new((*cln).data as *mut T) + } - Ok(()) + /// Allocates memory for a value and adds a cleanup handler for it in the memory pool. + /// Memory is not allocated if the value of the same type already exists in the pool. + /// + /// Returns `Some(NonNull)` if the allocation and cleanup handler addition are successful, + /// or `None` if allocation fails or a cleanup handler for the value type already exists. + /// + /// # Safety + /// This function is marked as unsafe because it involves raw pointer manipulation. + unsafe fn allocate_with_cleanup_unique(&self, value: T) -> Option> { + if self.cleanup_lookup::(None).is_some() { + return None; + } + self.allocate_with_cleanup(value) } - /// Removes a cleanup handler for a value in the memory pool. - unsafe fn remove_cleanup_for_value( + /// Runs a cleanup handler for a value in the memory pool and removes it. + /// + /// Returns `true` if a cleanup handler was found and removed, or `false` otherwise. + /// + /// # Safety + /// This function is marked as unsafe because it involves raw pointer manipulation. + unsafe fn remove_cleanup(&self, value: Option<*const T>) -> Option<()> { + self.cleanup_lookup::(value).map(|mut cln| { + let cln = cln.as_mut(); + cln.handler.take().inspect(|handler| { + handler(cln.data); + }); + cln.data = core::ptr::null_mut(); + }) + } + + /// Looks up a cleanup handler for a value in the memory pool. + /// + /// Returns `Some(NonNull)` if a cleanup handler is found, or `None` otherwise. + /// + /// # Safety + /// This function is marked as unsafe because it involves raw pointer manipulation. + unsafe fn cleanup_lookup( &self, - value: *const T, - ) -> Option { + value: Option<*const T>, + ) -> Option> { let mut cln = (*self.0.as_ptr()).cleanup; while !cln.is_null() { @@ -234,8 +275,10 @@ impl Pool { // handler. #[allow(unknown_lints)] #[allow(unpredictable_function_pointer_comparisons)] - if (*cln).data == value as *mut c_void && (*cln).handler == Some(cleanup_type::) { - return (*cln).handler.take(); + if (*cln).handler == Some(cleanup_type::) + && (value.is_none() || (*cln).data == value.unwrap() as *mut c_void) + { + return NonNull::new(cln); } cln = (*cln).next; } @@ -294,26 +337,71 @@ impl Pool { /// /// Returns a typed pointer to the allocated memory if successful, or a null pointer if /// allocation or cleanup handler addition fails. - pub fn allocate(&self, value: T) -> *mut T { + pub fn allocate(&self, value: T) -> *mut T { unsafe { - let p = self.alloc(mem::size_of::()) as *mut T; - ptr::write(p, value); - if self.add_cleanup_for_value(p).is_err() { - ptr::drop_in_place(p); - return ptr::null_mut(); - }; - p + match self.allocate_with_cleanup(value) { + None => ptr::null_mut(), + Some(mut ptr) => ptr.as_mut(), + } + } + } + + /// Allocates memory for a value of a specified type and adds a cleanup handler to the memory + /// pool. Allocation is unique for the value type. + /// + /// Returns a typed pointer to the allocated memory if successful, or `None` if + /// allocation or cleanup handler addition fails. + pub fn allocate_unique(&mut self, value: T) -> Option<&mut T> { + unsafe { + self.allocate_with_cleanup_unique(Item { value }) + .map(|mut ptr| &mut ptr.as_mut().value) + } + } + + /// Gets the value of a specified type from the memory pool. This value must be allocated with + /// [`Pool::allocate_unique`]. + /// + /// Returns a reference to the value if found, or `None` if not found. + pub fn get_unique(&self) -> Option<&T> { + unsafe { + self.cleanup_lookup::>(None).map(|cln| { + let item = cln.as_ref().data as *const Item; + &(*item).value + }) + } + } + + /// Gets a mutable reference to the value of a specified type from the memory pool. + /// This value must be allocated with [`Pool::allocate_unique`]. + /// + /// Returns a mutable reference to the value if found, or `None` if not found. + pub fn get_unique_mut(&mut self) -> Option<&mut T> { + unsafe { + self.cleanup_lookup::>(None).map(|cln| { + let item = cln.as_ref().data as *mut Item; + &mut (*item).value + }) } } /// Runs the cleanup handler for a value and removes it. /// + /// Returns `Some(())` if the value was successfully removed, + /// or `None` if the value was not found. + /// /// # Safety /// The caller must ensure that `value` is a valid pointer to a value that has an /// associated cleanup handler in the pool. - pub unsafe fn remove(&self, value: *const T) { - self.remove_cleanup_for_value(value) - .inspect(|cleanup| cleanup(value as _)); + pub unsafe fn remove(&self, value: *const T) -> Option<()> { + self.remove_cleanup(Some(value)) + } + + /// Runs the cleanup handler for a unique value and removes it. + /// + /// Returns `Some(())` if the value was successfully removed, + /// or `None` if the value was not found. + pub fn remove_unique(&self) -> Option<()> { + unsafe { self.remove_cleanup::>(None) } } /// Resizes a memory allocation in place if possible. @@ -362,5 +450,7 @@ impl Pool { /// /// * `data` - A raw pointer to the value of type `T` to be cleaned up. unsafe extern "C" fn cleanup_type(data: *mut c_void) { - ptr::drop_in_place(data as *mut T); + if !data.is_null() { + ptr::drop_in_place(data as *mut T); + } } From eededf338bfdcd347008c3105a19de807f97dc2d Mon Sep 17 00:00:00 2001 From: Evgeny Shirykalov Date: Thu, 8 Jan 2026 11:10:27 -0800 Subject: [PATCH 5/6] feat: async phase handler --- Cargo.lock | 105 ++++++++++++++++++++++++++ Cargo.toml | 1 + examples/Cargo.toml | 10 +++ examples/async_request.rs | 64 ++++++++++++++++ src/http/async_request.rs | 151 ++++++++++++++++++++++++++++++++++++++ src/http/mod.rs | 6 ++ src/http/request.rs | 23 ++++++ 7 files changed, 360 insertions(+) create mode 100644 examples/async_request.rs create mode 100644 src/http/async_request.rs diff --git a/Cargo.lock b/Cargo.lock index fb2f523d..d7d17d4c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -256,8 +256,11 @@ dependencies = [ name = "examples" version = "0.0.0" dependencies = [ + "allocator-api2", + "async-task", "aws-sign-v4", "chrono", + "futures", "http", "idna_adapter", "libc", @@ -309,6 +312,95 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" + +[[package]] +name = "futures-executor" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" + +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" + +[[package]] +name = "futures-task" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" + +[[package]] +name = "futures-util" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -548,6 +640,7 @@ version = "0.5.0" dependencies = [ "allocator-api2", "async-task", + "futures", "lock_api", "nginx-sys", "pin-project-lite", @@ -624,6 +717,12 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + [[package]] name = "prettyplease" version = "0.2.37" @@ -892,6 +991,12 @@ version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e320a6c5ad31d271ad523dcf3ad13e2767ad8b1cb8f047f75a8aeaf8da139da2" +[[package]] +name = "slab" +version = "0.4.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a2ae44ef20feb57a68b23d846850f861394c2e02dc425a50098ae8c90267589" + [[package]] name = "smallvec" version = "1.15.1" diff --git a/Cargo.toml b/Cargo.toml index 63b10f0a..86bde949 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,6 +39,7 @@ targets = [] [dependencies] allocator-api2 = { version = "0.4.0", default-features = false, features = ["fresh-rust"] } async-task = { version = "4.7.1", optional = true } +futures = "0.3" lock_api = "0.4.13" nginx-sys = { path = "nginx-sys", version = "0.5.0"} pin-project-lite = { version = "0.2.16", optional = true } diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 9006aba7..e257166e 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -13,6 +13,9 @@ autobins = false build = "../build.rs" [dependencies] +allocator-api2 = { version = "0.4.0", default-features = false, features = ["fresh-rust"] } +async-task = { version = "4.7.1" } +futures = "0.3" nginx-sys = { path = "../nginx-sys/", default-features = false } ngx = { path = "../", default-features = false, features = ["std"] } @@ -51,6 +54,12 @@ name = "async" path = "async.rs" crate-type = ["cdylib"] +[[example]] +name = "async_request" +path = "async_request.rs" +crate-type = ["cdylib"] +required-features = ["async"] + [[example]] name = "shared_dict" path = "shared_dict.rs" @@ -63,5 +72,6 @@ default = ["export-modules", "ngx/vendored"] # outside of the NGINX buildsystem. However, cargo currently does not detect # this configuration automatically. # See https://github.com/rust-lang/rust/issues/20267 +async = ["ngx/async"] export-modules = [] linux = [] diff --git a/examples/async_request.rs b/examples/async_request.rs new file mode 100644 index 00000000..11f016dc --- /dev/null +++ b/examples/async_request.rs @@ -0,0 +1,64 @@ +use ngx::http::{ + add_phase_handler, AsyncHandler, HttpModule, HttpPhase, Request, +}; +use ngx::{async_ as ngx_async, ngx_log_debug_http, ngx_log_error}; + +use nginx_sys::ngx_int_t; + +struct SampleAsyncHandler; + +impl AsyncHandler for SampleAsyncHandler { + const PHASE: HttpPhase = HttpPhase::Access; + type Module = Module; + type ReturnType = ngx_int_t; + + async fn worker(request: &mut Request) -> Self::ReturnType { + ngx_log_debug_http!(request, "worker started"); + ngx_async::sleep(core::time::Duration::from_secs(2)).await; + ngx_log_error!( + nginx_sys::NGX_LOG_INFO, + request.log(), + "Async handler after timeout", + ); + nginx_sys::NGX_OK as _ + } +} + +static NGX_HTTP_ASYNC_REQUEST_MODULE_CTX: nginx_sys::ngx_http_module_t = + nginx_sys::ngx_http_module_t { + preconfiguration: None, + postconfiguration: Some(Module::postconfiguration), + create_main_conf: None, + init_main_conf: None, + create_srv_conf: None, + merge_srv_conf: None, + create_loc_conf: None, + merge_loc_conf: None, + }; + +#[cfg(feature = "export-modules")] +ngx::ngx_modules!(ngx_http_async_request_module); + +#[used] +#[allow(non_upper_case_globals)] +#[cfg_attr(not(feature = "export-modules"), no_mangle)] +pub static mut ngx_http_async_request_module: nginx_sys::ngx_module_t = nginx_sys::ngx_module_t { + ctx: core::ptr::addr_of!(NGX_HTTP_ASYNC_REQUEST_MODULE_CTX) as _, + type_: nginx_sys::NGX_HTTP_MODULE as _, + ..nginx_sys::ngx_module_t::default() +}; + +struct Module; + +impl HttpModule for Module { + fn module() -> &'static nginx_sys::ngx_module_t { + unsafe { &*::core::ptr::addr_of!(ngx_http_async_request_module) } + } + + unsafe extern "C" fn postconfiguration(cf: *mut nginx_sys::ngx_conf_t) -> ngx_int_t { + // SAFETY: this function is called with non-NULL cf always + let cf = unsafe { &mut *cf }; + add_phase_handler::(cf) + .map_or(nginx_sys::NGX_ERROR as _, |_| nginx_sys::NGX_OK as _) + } +} diff --git a/src/http/async_request.rs b/src/http/async_request.rs new file mode 100644 index 00000000..c99b282b --- /dev/null +++ b/src/http/async_request.rs @@ -0,0 +1,151 @@ +use core::fmt::Display; +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; + +use crate::core::type_storage::*; +use crate::http::{HttpHandlerReturn, HttpModule, HttpPhase, HttpRequestHandler, Request}; +use crate::{async_ as ngx_async, ngx_log_debug_http}; + +use crate::ffi::{ngx_http_request_t, ngx_int_t, ngx_post_event, ngx_posted_events}; + +use pin_project_lite::*; + +/// An asynchronous HTTP request handler trait. +pub trait AsyncHandler { + /// The phase in which the handler will be executed. + const PHASE: HttpPhase; + /// The associated HTTP module type. + type Module: HttpModule; + /// The return type of the asynchronous worker function. + type ReturnType: HttpHandlerReturn; + /// The asynchronous worker function to be implemented. + fn worker(request: &mut Request) -> impl Future; +} + +const fn async_phase(phase: HttpPhase) -> HttpPhase { + assert!( + !matches!(phase, HttpPhase::Content), + "Content phase is not supported" + ); + phase +} + +/// An error type for asynchronous handler operations. +#[derive(Debug)] +pub enum AsyncHandlerError { + /// Indicates that the context creation failed. + ContextCreationFailed, + /// Indicates that there is no async launcher available. + NoAsyncLauncher, +} + +impl Display for AsyncHandlerError { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + AsyncHandlerError::ContextCreationFailed => { + write!(f, "AsyncHandler: Context creation failed") + } + AsyncHandlerError::NoAsyncLauncher => { + write!(f, "AsyncHandler: No async launcher available") + } + } + } +} + +impl HttpRequestHandler for AH +where + AH: AsyncHandler + 'static, +{ + const PHASE: HttpPhase = async_phase(AH::PHASE); + type ReturnType = Result; + + fn handler(request: &mut Request) -> Self::ReturnType { + let mut pool = request.pool(); + let mut ctx = ::get_mut(&mut pool); + #[allow(clippy::manual_inspect)] + if ctx.is_none() { + ctx = + ::add(AsyncRequestContext::default(), &mut pool) + .map(|ctx| { + let request_ptr: *mut ngx_http_request_t = request.as_mut() as *mut _ as _; + ctx.launcher = Some(ngx_async::spawn(handler_future::(request_ptr))); + ctx + }) + }; + + let ctx = ctx.ok_or(AsyncHandlerError::ContextCreationFailed)?; + + if ctx.launcher.is_none() { + Err(AsyncHandlerError::NoAsyncLauncher) + } else if ctx.launcher.as_ref().unwrap().is_finished() { + let rc = futures::executor::block_on(ctx.launcher.take().unwrap()); + ngx_log_debug_http!(request, "handler_wrapper: task joined; rc = {}", rc); + ::delete(&pool); + Ok(rc) + } else { + ngx_log_debug_http!(request, "handler_wrapper: running"); + Ok(nginx_sys::NGX_AGAIN as _) + } + } +} + +#[derive(Default)] +struct AsyncRequestContext { + launcher: Option>, +} + +pin_project! { + struct HandlerFuture + where + Fut: Future, + { + #[pin] + worker_fut: Fut, + request: *const ngx_http_request_t, + } +} + +fn handler_future(request: *mut ngx_http_request_t) -> impl Future +where + AH: AsyncHandler, +{ + let fut = async move { + let request = unsafe { Request::from_ngx_http_request(request) }; + AH::worker(request).await.into_ngx_int_t(request) + }; + + HandlerFuture::<_> { + worker_fut: fut, + request, + } +} + +impl Future for HandlerFuture +where + Fut: Future, +{ + type Output = ngx_int_t; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let request = unsafe { Request::from_const_ngx_http_request(*this.request) }; + + match this.worker_fut.poll(cx) { + Poll::Pending => { + ngx_log_debug_http!(request, "HandlerFuture: pending"); + Poll::Pending + } + Poll::Ready(rc) => { + unsafe { + ngx_post_event( + (*request.connection()).write, + core::ptr::addr_of_mut!(ngx_posted_events), + ) + }; + ngx_log_debug_http!(request, "HandlerFuture: ready"); + Poll::Ready(rc) + } + } + } +} diff --git a/src/http/mod.rs b/src/http/mod.rs index 4c41fa8f..ad492e55 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs @@ -1,3 +1,6 @@ +#[cfg(feature = "async")] +mod async_request; + mod conf; mod module; mod request; @@ -5,6 +8,9 @@ mod request_context; mod status; mod upstream; +#[cfg(feature = "async")] +pub use async_request::*; + pub use conf::*; pub use module::*; pub use request::*; diff --git a/src/http/request.rs b/src/http/request.rs index 8c8db6cd..6dbf0f36 100644 --- a/src/http/request.rs +++ b/src/http/request.rs @@ -206,6 +206,16 @@ impl Request { &mut *r.cast::() } + /// Create a const [`Request`] from a const [`ngx_http_request_t`]. + /// + /// # Safety + /// + /// The caller has provided a valid non-null pointer to a valid `ngx_http_request_t` + /// which shares the same representation as `Request`. + pub unsafe fn from_const_ngx_http_request<'a>(r: *const ngx_http_request_t) -> &'a Request { + &*r.cast::() + } + /// Is this the main request (as opposed to a subrequest)? pub fn is_main(&self) -> bool { let main = self.0.main.cast(); @@ -311,6 +321,11 @@ impl Request { } } + /// Get HTTP status of response. + pub fn get_status(&self) -> HTTPStatus { + HTTPStatus::from_u16(self.0.headers_out.status as u16).unwrap_or(HTTPStatus(0)) + } + /// Set HTTP status of response. pub fn set_status(&mut self, status: HTTPStatus) { self.0.headers_out.status = status.into(); @@ -380,6 +395,14 @@ impl Request { unsafe { Status(ngx_http_output_filter(&mut self.0, body)) } } + /// Get the output chain buffer. + pub fn get_out(&self) -> Option<&ngx_chain_t> { + if self.0.out.is_null() { + return None; + } + unsafe { Some(&*self.0.out) } + } + /// Perform internal redirect to a location pub fn internal_redirect(&self, location: &str) -> Status { assert!(!location.is_empty(), "uri location is empty"); From 27338b62c3caf912edbf2e555fab45711a37a111 Mon Sep 17 00:00:00 2001 From: Evgeny Shirykalov Date: Thu, 8 Jan 2026 17:01:52 -0800 Subject: [PATCH 6/6] feat: async subrequest --- .github/workflows/nginx.yaml | 1 + examples/async_request.rs | 204 ++++++++++++++++++++++++++++---- examples/config | 11 ++ examples/t/async_request.t | 66 +++++++++++ src/http/async_request.rs | 221 +++++++++++++++++++++++++++++++++-- 5 files changed, 469 insertions(+), 34 deletions(-) create mode 100644 examples/t/async_request.t diff --git a/.github/workflows/nginx.yaml b/.github/workflows/nginx.yaml index 55f1345f..c3239cec 100644 --- a/.github/workflows/nginx.yaml +++ b/.github/workflows/nginx.yaml @@ -50,6 +50,7 @@ env: NGX_TEST_FILES: examples/t NGX_TEST_GLOBALS_DYNAMIC: >- load_module ${{ github.workspace }}/nginx/objs/ngx_http_async_module.so; + load_module ${{ github.workspace }}/nginx/objs/ngx_http_async_request_module.so; load_module ${{ github.workspace }}/nginx/objs/ngx_http_awssigv4_module.so; load_module ${{ github.workspace }}/nginx/objs/ngx_http_curl_module.so; load_module ${{ github.workspace }}/nginx/objs/ngx_http_shared_dict_module.so; diff --git a/examples/async_request.rs b/examples/async_request.rs index 11f016dc..a616b7d3 100644 --- a/examples/async_request.rs +++ b/examples/async_request.rs @@ -1,40 +1,142 @@ +use std::ffi::{c_char, c_void}; + use ngx::http::{ - add_phase_handler, AsyncHandler, HttpModule, HttpPhase, Request, + add_phase_handler, AsyncHandler, AsyncSubRequestBuilder, AsyncSubRequestError, HttpModule, + HttpModuleLocationConf, HttpPhase, Merge, MergeConfigError, Request, }; -use ngx::{async_ as ngx_async, ngx_log_debug_http, ngx_log_error}; +use ngx::{async_ as ngx_async, ngx_conf_log_error, ngx_log_debug_http, ngx_log_error}; -use nginx_sys::ngx_int_t; +use nginx_sys::{ + ngx_command_t, ngx_conf_t, ngx_http_complex_value_t, ngx_http_module_t, ngx_http_request_t, + ngx_http_send_response, ngx_int_t, ngx_module_t, ngx_str_t, ngx_uint_t, NGX_CONF_TAKE1, + NGX_HTTP_LOC_CONF, NGX_HTTP_LOC_CONF_OFFSET, +}; struct SampleAsyncHandler; +enum SampleAsyncHandlerError { + SubrequestCreationFailed(AsyncSubRequestError), + SubrequestFailed(ngx_int_t), +} + +impl std::fmt::Display for SampleAsyncHandlerError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + SampleAsyncHandlerError::SubrequestCreationFailed(e) => { + write!(f, "Subrequest creation failed: {}", e) + } + SampleAsyncHandlerError::SubrequestFailed(rc) => { + write!(f, "Subrequest failed with return code: {}", rc) + } + } + } +} + +impl From for SampleAsyncHandlerError { + fn from(err: AsyncSubRequestError) -> Self { + SampleAsyncHandlerError::SubrequestCreationFailed(err) + } +} + +impl From for SampleAsyncHandlerError { + fn from(rc: ngx_int_t) -> Self { + SampleAsyncHandlerError::SubrequestFailed(rc) + } +} + impl AsyncHandler for SampleAsyncHandler { const PHASE: HttpPhase = HttpPhase::Access; type Module = Module; - type ReturnType = ngx_int_t; + type ReturnType = Result; async fn worker(request: &mut Request) -> Self::ReturnType { ngx_log_debug_http!(request, "worker started"); + + let co = Module::location_conf(request).expect("module config is none"); + ngx_log_debug_http!(request, "async_request module enabled: {}", co.enable); + + if !co.enable { + return Ok(nginx_sys::NGX_DECLINED as _); + } + + let log = request.log(); + let request_ptr: *mut ngx_http_request_t = request.as_mut(); + + let fut = AsyncSubRequestBuilder::new("/proxy") + //.args("arg1=val1&arg2=val2") + .in_memory() + .waited() + .build(request)?; + + let subrc = fut.await; + + ngx_log_error!(nginx_sys::NGX_LOG_INFO, log, "Subrequest rc {}", subrc.0); + + if subrc.0 != nginx_sys::NGX_OK as _ || subrc.1.is_none() { + return Err(SampleAsyncHandlerError::from(subrc.0)); + } + + let sr = subrc.1.unwrap(); + + ngx_log_error!( + nginx_sys::NGX_LOG_INFO, + log, + "Subrequest status: {:?}", + sr.get_status() + ); + ngx_async::sleep(core::time::Duration::from_secs(2)).await; + + let mut resp_len: usize = 0; + + let mut rc = nginx_sys::NGX_OK as ngx_int_t; + + if let Some(out) = sr.get_out() { + if !out.buf.is_null() { + let b = unsafe { &*out.buf }; + resp_len = unsafe { b.last.offset_from(b.pos) } as usize; + + let sr_ptr: *const ngx_http_request_t = sr.as_ref(); + + let mut ct: ngx_str_t = (unsafe { *sr_ptr }).headers_out.content_type; + + let mut cv: ngx_http_complex_value_t = unsafe { core::mem::zeroed() }; + cv.value = ngx_str_t { + len: resp_len as _, + data: b.pos as _, + }; + + rc = unsafe { + ngx_http_send_response(request_ptr, sr.get_status().0, &mut ct, &mut cv) + }; + + if rc == nginx_sys::NGX_OK as _ { + rc = nginx_sys::NGX_HTTP_OK as _; + } + } + } + ngx_log_error!( nginx_sys::NGX_LOG_INFO, - request.log(), - "Async handler after timeout", + log, + "Async handler after timeout; subrequest response length: {}", + resp_len ); - nginx_sys::NGX_OK as _ + + Ok(rc) } } -static NGX_HTTP_ASYNC_REQUEST_MODULE_CTX: nginx_sys::ngx_http_module_t = - nginx_sys::ngx_http_module_t { - preconfiguration: None, - postconfiguration: Some(Module::postconfiguration), - create_main_conf: None, - init_main_conf: None, - create_srv_conf: None, - merge_srv_conf: None, - create_loc_conf: None, - merge_loc_conf: None, - }; +static NGX_HTTP_ASYNC_REQUEST_MODULE_CTX: ngx_http_module_t = ngx_http_module_t { + preconfiguration: None, + postconfiguration: Some(Module::postconfiguration), + create_main_conf: None, + init_main_conf: None, + create_srv_conf: None, + merge_srv_conf: None, + create_loc_conf: Some(Module::create_loc_conf), + merge_loc_conf: Some(Module::merge_loc_conf), +}; #[cfg(feature = "export-modules")] ngx::ngx_modules!(ngx_http_async_request_module); @@ -42,23 +144,81 @@ ngx::ngx_modules!(ngx_http_async_request_module); #[used] #[allow(non_upper_case_globals)] #[cfg_attr(not(feature = "export-modules"), no_mangle)] -pub static mut ngx_http_async_request_module: nginx_sys::ngx_module_t = nginx_sys::ngx_module_t { +pub static mut ngx_http_async_request_module: ngx_module_t = ngx_module_t { ctx: core::ptr::addr_of!(NGX_HTTP_ASYNC_REQUEST_MODULE_CTX) as _, + commands: unsafe { &NGX_HTTP_ASYNC_REQUEST_COMMANDS[0] as *const _ as *mut _ }, type_: nginx_sys::NGX_HTTP_MODULE as _, - ..nginx_sys::ngx_module_t::default() + ..ngx_module_t::default() }; struct Module; impl HttpModule for Module { - fn module() -> &'static nginx_sys::ngx_module_t { + fn module() -> &'static ngx_module_t { unsafe { &*::core::ptr::addr_of!(ngx_http_async_request_module) } } - unsafe extern "C" fn postconfiguration(cf: *mut nginx_sys::ngx_conf_t) -> ngx_int_t { + unsafe extern "C" fn postconfiguration(cf: *mut ngx_conf_t) -> ngx_int_t { // SAFETY: this function is called with non-NULL cf always let cf = unsafe { &mut *cf }; add_phase_handler::(cf) .map_or(nginx_sys::NGX_ERROR as _, |_| nginx_sys::NGX_OK as _) } } + +#[derive(Debug, Default)] +struct ModuleConfig { + enable: bool, +} + +unsafe impl HttpModuleLocationConf for Module { + type LocationConf = ModuleConfig; +} + +impl Merge for ModuleConfig { + fn merge(&mut self, prev: &ModuleConfig) -> Result<(), MergeConfigError> { + if prev.enable { + self.enable = true; + }; + Ok(()) + } +} + +static mut NGX_HTTP_ASYNC_REQUEST_COMMANDS: [ngx_command_t; 2] = [ + ngx_command_t { + name: ngx::ngx_string!("async_request"), + type_: (NGX_HTTP_LOC_CONF | NGX_CONF_TAKE1) as ngx_uint_t, + set: Some(ngx_http_async_request_commands_set_enable), + conf: NGX_HTTP_LOC_CONF_OFFSET, + offset: 0, + post: std::ptr::null_mut(), + }, + ngx_command_t::empty(), +]; + +extern "C" fn ngx_http_async_request_commands_set_enable( + cf: *mut ngx_conf_t, + _cmd: *mut ngx_command_t, + conf: *mut c_void, +) -> *mut c_char { + unsafe { + let conf = &mut *(conf as *mut ModuleConfig); + let args: &[ngx_str_t] = (*(*cf).args).as_slice(); + + conf.enable = match args[1].to_str() { + Err(_) => false, + Ok(s) if s.len() == 2 && s.eq_ignore_ascii_case("on") => true, + Ok(s) if s.len() == 3 && s.eq_ignore_ascii_case("off") => false, + _ => { + ngx_conf_log_error!( + nginx_sys::NGX_LOG_EMERG, + cf, + "`async_request` argument must be 'on' or 'off'" + ); + return ngx::core::NGX_CONF_ERROR; + } + }; + } + + ngx::core::NGX_CONF_OK +} diff --git a/examples/config b/examples/config index 6b763652..eee01ff8 100644 --- a/examples/config +++ b/examples/config @@ -23,6 +23,17 @@ if [ $HTTP = YES ]; then ngx_rust_module fi + if :; then + ngx_module_name=ngx_http_async_request_module + ngx_module_libs="-lm" + ngx_rust_target_name=async_request + ngx_rust_target_features=async + + ngx_rust_module + + ngx_rust_target_features= + fi + if :; then ngx_module_name=ngx_http_awssigv4_module ngx_module_libs="-lm" diff --git a/examples/t/async_request.t b/examples/t/async_request.t new file mode 100644 index 00000000..3d09ddd5 --- /dev/null +++ b/examples/t/async_request.t @@ -0,0 +1,66 @@ +#!/usr/bin/perl + +# (C) Nginx, Inc + +# Tests for ngx-rust example modules. + +############################################################################### + +use warnings; +use strict; + +use Test::More; + +BEGIN { use FindBin; chdir($FindBin::Bin); } + +use lib 'lib'; +use Test::Nginx; + +############################################################################### + +select STDERR; $| = 1; +select STDOUT; $| = 1; + +my $t = Test::Nginx->new()->has(qw/http/)->plan(1) + ->write_file_expand('nginx.conf', <<"EOF"); + +%%TEST_GLOBALS%% + +daemon off; + +events { +} + +http { + %%TEST_GLOBALS_HTTP%% + + server { + listen 127.0.0.1:8080; + server_name localhost; + + location / { + async_request on; + } + + location /proxy { + internal; + proxy_pass http://127.0.0.1:8081; + } + } + + server { + listen 127.0.0.1:8081; + server_name localhost; + + location / { + return 200 'Hello from backend'; + } + } +} + +EOF + +$t->write_file('index.html', ''); +$t->run(); + +like(http_get('/'), qr/200 OK.*Hello from backend/s, 'async subrequest works'); diff --git a/src/http/async_request.rs b/src/http/async_request.rs index c99b282b..83ac81b4 100644 --- a/src/http/async_request.rs +++ b/src/http/async_request.rs @@ -1,14 +1,19 @@ +use core::ffi::c_void; use core::fmt::Display; use core::future::Future; use core::pin::Pin; -use core::task::{Context, Poll}; +use core::task::{Context, Poll, Waker}; -use crate::core::type_storage::*; +use crate::core::Pool; use crate::http::{HttpHandlerReturn, HttpModule, HttpPhase, HttpRequestHandler, Request}; -use crate::{async_ as ngx_async, ngx_log_debug_http}; +use crate::log::ngx_cycle_log; +use crate::{async_ as ngx_async, ngx_log_debug_http, ngx_log_error}; use crate::ffi::{ngx_http_request_t, ngx_int_t, ngx_post_event, ngx_posted_events}; +use alloc::string::String; +use allocator_api2::boxed::Box; +use nginx_sys::{ngx_http_post_subrequest_t, ngx_str_t, ngx_uint_t}; use pin_project_lite::*; /// An asynchronous HTTP request handler trait. @@ -38,6 +43,8 @@ pub enum AsyncHandlerError { ContextCreationFailed, /// Indicates that there is no async launcher available. NoAsyncLauncher, + /// Indicates that the context deletion failed. + ContextDeletionFailed, } impl Display for AsyncHandlerError { @@ -49,6 +56,9 @@ impl Display for AsyncHandlerError { AsyncHandlerError::NoAsyncLauncher => { write!(f, "AsyncHandler: No async launcher available") } + AsyncHandlerError::ContextDeletionFailed => { + write!(f, "AsyncHandler: Context deletion failed") + } } } } @@ -62,16 +72,16 @@ where fn handler(request: &mut Request) -> Self::ReturnType { let mut pool = request.pool(); - let mut ctx = ::get_mut(&mut pool); + let mut ctx = pool.get_unique_mut::(); #[allow(clippy::manual_inspect)] if ctx.is_none() { - ctx = - ::add(AsyncRequestContext::default(), &mut pool) - .map(|ctx| { - let request_ptr: *mut ngx_http_request_t = request.as_mut() as *mut _ as _; - ctx.launcher = Some(ngx_async::spawn(handler_future::(request_ptr))); - ctx - }) + ctx = pool + .allocate_unique(AsyncRequestContext::default()) + .map(|ctx| { + let request_ptr: *mut ngx_http_request_t = request.as_mut() as *mut _ as _; + ctx.launcher = Some(ngx_async::spawn(handler_future::(request_ptr))); + ctx + }) }; let ctx = ctx.ok_or(AsyncHandlerError::ContextCreationFailed)?; @@ -81,7 +91,8 @@ where } else if ctx.launcher.as_ref().unwrap().is_finished() { let rc = futures::executor::block_on(ctx.launcher.take().unwrap()); ngx_log_debug_http!(request, "handler_wrapper: task joined; rc = {}", rc); - ::delete(&pool); + pool.remove_unique::() + .ok_or(AsyncHandlerError::ContextDeletionFailed)?; Ok(rc) } else { ngx_log_debug_http!(request, "handler_wrapper: running"); @@ -149,3 +160,189 @@ where } } } + +/// A builder for creating asynchronous subrequests. +#[derive(Default)] +pub struct AsyncSubRequestBuilder { + uri: String, + args: Option, + flags: ngx_uint_t, +} + +/// An error type for asynchronous subrequest operations. +#[derive(Debug)] +pub enum AsyncSubRequestError { + /// Indicates that the subrequest allocation failed. + RequestAllocFailed, + /// Indicates that the post subrequest allocation failed. + PostRequestAllocFailed, + /// Indicates that the URI allocation failed. + UriAllocFailed, + /// Indicates that the arguments allocation failed. + ArgsAllocFailed, + /// Indicates that the subrequest creation failed. + CreationFailed, +} + +impl Display for AsyncSubRequestError { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + AsyncSubRequestError::RequestAllocFailed => write!(f, "Subrequest allocation failed"), + AsyncSubRequestError::PostRequestAllocFailed => { + write!(f, "Post subrequest allocation failed") + } + AsyncSubRequestError::UriAllocFailed => write!(f, "URI allocation failed"), + AsyncSubRequestError::ArgsAllocFailed => write!(f, "Arguments allocation failed"), + AsyncSubRequestError::CreationFailed => write!(f, "Subrequest creation failed"), + } + } +} + +impl AsyncSubRequestBuilder { + /// Creates a new `AsyncSubRequestBuilder` with the specified URI. + pub fn new>(uri: S) -> Self { + Self { + uri: uri.into(), + ..Default::default() + } + } + + /// Sets the arguments for the subrequest. + pub fn args>(mut self, args: S) -> Self { + self.args = Some(args.into()); + self + } + + /// Sets the subrequest to be in-memory. + pub fn in_memory(mut self) -> Self { + self.flags |= nginx_sys::NGX_HTTP_SUBREQUEST_IN_MEMORY as ngx_uint_t; + self + } + + /// Sets the subrequest to be waited. + pub fn waited(mut self) -> Self { + self.flags |= nginx_sys::NGX_HTTP_SUBREQUEST_WAITED as ngx_uint_t; + self + } + + /// Sets the subrequest to be a background request. + pub fn background(mut self) -> Self { + self.flags |= nginx_sys::NGX_HTTP_SUBREQUEST_BACKGROUND as ngx_uint_t; + self + } + + /// Builds and initiates the asynchronous subrequest. + pub fn build<'r>( + &self, + request: &'r mut Request, + ) -> Result, Pool>>, AsyncSubRequestError> { + let mut this = Box::::try_new_in(Default::default(), request.pool()) + .map_err(|_| AsyncSubRequestError::RequestAllocFailed)?; + + let mut uri = + unsafe { ngx_str_t::from_bytes(request.pool().as_ptr(), self.uri.as_bytes()) } + .ok_or(AsyncSubRequestError::UriAllocFailed)?; + + let mut sr_args: ngx_str_t; + let mut sr_args_ptr: *mut ngx_str_t = core::ptr::null_mut(); + + if let Some(args) = &self.args { + sr_args = unsafe { ngx_str_t::from_bytes(request.pool().as_ptr(), args.as_bytes()) } + .ok_or(AsyncSubRequestError::ArgsAllocFailed)?; + sr_args_ptr = &mut sr_args as *mut ngx_str_t; + } + + let mut psr = Box::try_new_in( + ngx_http_post_subrequest_t { + handler: Some(AsyncSubRequest::sr_handler), + data: core::ptr::null_mut(), + }, + request.pool(), + ) + .map_err(|_| AsyncSubRequestError::PostRequestAllocFailed)?; + + unsafe { + let mut sr_ptr: *mut ngx_http_request_t = core::ptr::null_mut(); + let rc = nginx_sys::ngx_http_subrequest( + request.as_mut() as *mut _ as _, + &mut uri, + sr_args_ptr, + &mut sr_ptr, + Box::as_mut_ptr(&mut psr), + self.flags as ngx_uint_t, + ); + + if rc != nginx_sys::NGX_OK as _ { + return Err(AsyncSubRequestError::CreationFailed); + } + + this.sr = Some(Request::from_ngx_http_request(sr_ptr)); + } + + let this = Box::into_pin(this); + + psr.data = this.as_ref().get_ref() as *const _ as *mut c_void; + + Ok(this) + } +} + +/// An asynchronous subrequest structure. +#[derive(Default)] +pub struct AsyncSubRequest<'sr> { + /// The subrequest reference. + pub sr: Option<&'sr mut Request>, + waker: Option, + rc: Option, +} + +impl<'sr> AsyncSubRequest<'sr> { + extern "C" fn sr_handler( + r: *mut ngx_http_request_t, + data: *mut c_void, + rc: ngx_int_t, + ) -> ngx_int_t { + let request = unsafe { Request::from_ngx_http_request(r) }; + ngx_log_debug_http!(request, "subrequest completed with rc = {}", rc); + + let this = unsafe { &mut *(data as *mut Self) }; + // ngx_log_debug_http!(request, "subrequest handler: at {:p} / {:p}", this, data); + this.rc = Some(rc); + if let Some(waker) = this.waker.take() { + ngx_log_debug_http!(request, "subrequest completed; call waker"); + waker.wake(); + } + rc + } +} + +impl<'sr> core::future::Future for AsyncSubRequest<'sr> { + type Output = (ngx_int_t, Option<&'sr Request>); + + fn poll( + self: Pin<&mut Self>, + cx: &mut core::task::Context<'_>, + ) -> core::task::Poll { + let this = self.get_mut(); + this.waker = Some(cx.waker().clone()); + + if this.sr.is_none() { + ngx_log_error!( + nginx_sys::NGX_LOG_ERR, + ngx_cycle_log().as_ptr(), + "Subrequest not created" + ); + return core::task::Poll::Ready((nginx_sys::NGX_ERROR as _, None)); + } + + if this.rc.is_none() { + // ngx_log_debug_http!(request, "subrequest poll: pending because rc is none"); + return core::task::Poll::Pending; + } + + // let request: &Request = unsafe { Request::from_ngx_http_request(this.sr.take().unwrap()) }; + let rc = this.rc.unwrap(); + // ngx_log_debug_http!(request, "subrequest poll: ready({rc})"); + core::task::Poll::Ready((rc, Some(this.sr.take().unwrap()))) + } +}