Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/nginx.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ env:
NGX_TEST_FILES: examples/t
NGX_TEST_GLOBALS_DYNAMIC: >-
load_module ${{ github.workspace }}/nginx/objs/ngx_http_async_module.so;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we have native async support now, we should remove half-working ngx_http_async_module example and remove tokio dependency.

load_module ${{ github.workspace }}/nginx/objs/ngx_http_async_request_module.so;
load_module ${{ github.workspace }}/nginx/objs/ngx_http_awssigv4_module.so;
load_module ${{ github.workspace }}/nginx/objs/ngx_http_curl_module.so;
load_module ${{ github.workspace }}/nginx/objs/ngx_http_shared_dict_module.so;
Expand Down
105 changes: 105 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ targets = []
[dependencies]
allocator-api2 = { version = "0.4.0", default-features = false, features = ["fresh-rust"] }
async-task = { version = "4.7.1", optional = true }
futures = "0.3"
lock_api = "0.4.13"
nginx-sys = { path = "nginx-sys", version = "0.5.0"}
pin-project-lite = { version = "0.2.16", optional = true }
Expand Down
10 changes: 10 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ autobins = false
build = "../build.rs"

[dependencies]
allocator-api2 = { version = "0.4.0", default-features = false, features = ["fresh-rust"] }
async-task = { version = "4.7.1" }
futures = "0.3"
nginx-sys = { path = "../nginx-sys/", default-features = false }
ngx = { path = "../", default-features = false, features = ["std"] }

Expand Down Expand Up @@ -51,6 +54,12 @@ name = "async"
path = "async.rs"
crate-type = ["cdylib"]

[[example]]
name = "async_request"
path = "async_request.rs"
crate-type = ["cdylib"]
required-features = ["async"]

[[example]]
name = "shared_dict"
path = "shared_dict.rs"
Expand All @@ -63,5 +72,6 @@ default = ["export-modules", "ngx/vendored"]
# outside of the NGINX buildsystem. However, cargo currently does not detect
# this configuration automatically.
# See https://github.com/rust-lang/rust/issues/20267
async = ["ngx/async"]
export-modules = []
linux = []
133 changes: 65 additions & 68 deletions examples/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,12 @@ use std::time::Instant;

use ngx::core;
use ngx::ffi::{
ngx_array_push, ngx_command_t, ngx_conf_t, ngx_connection_t, ngx_event_t, ngx_http_handler_pt,
ngx_http_module_t, ngx_http_phases_NGX_HTTP_ACCESS_PHASE, ngx_int_t, ngx_module_t,
ngx_post_event, ngx_posted_events, ngx_posted_next_events, ngx_str_t, ngx_uint_t,
ngx_command_t, ngx_conf_t, ngx_connection_t, ngx_event_t, ngx_http_module_t, ngx_int_t,
ngx_module_t, ngx_post_event, ngx_posted_events, ngx_posted_next_events, ngx_str_t, ngx_uint_t,
NGX_CONF_TAKE1, NGX_HTTP_LOC_CONF, NGX_HTTP_LOC_CONF_OFFSET, NGX_HTTP_MODULE, NGX_LOG_EMERG,
};
use ngx::http::{self, HttpModule, MergeConfigError};
use ngx::http::{HttpModuleLocationConf, HttpModuleMainConf, NgxHttpCoreModule};
use ngx::{http_request_handler, ngx_conf_log_error, ngx_log_debug_http, ngx_string};
use ngx::http::{self, HttpModule, HttpModuleLocationConf, HttpRequestHandler, MergeConfigError};
use ngx::{ngx_conf_log_error, ngx_log_debug_http, ngx_string};
use tokio::runtime::Runtime;

struct Module;
Expand All @@ -25,18 +23,10 @@ impl http::HttpModule for Module {

unsafe extern "C" fn postconfiguration(cf: *mut ngx_conf_t) -> ngx_int_t {
// SAFETY: this function is called with non-NULL cf always
let cf = &mut *cf;
let cmcf = NgxHttpCoreModule::main_conf_mut(cf).expect("http core main conf");

let h = ngx_array_push(
&mut cmcf.phases[ngx_http_phases_NGX_HTTP_ACCESS_PHASE as usize].handlers,
) as *mut ngx_http_handler_pt;
if h.is_null() {
return core::Status::NGX_ERROR.into();
}
// set an Access phase handler
*h = Some(async_access_handler);
core::Status::NGX_OK.into()
let cf = unsafe { &mut *cf };
http::add_phase_handler::<AsyncAccessHandler>(cf)
.map_or(core::Status::NGX_ERROR, |_| core::Status::NGX_OK)
.into()
}
}

Expand Down Expand Up @@ -139,63 +129,70 @@ impl Drop for RequestCTX {
}
}

http_request_handler!(async_access_handler, |request: &mut http::Request| {
let co = Module::location_conf(request).expect("module config is none");
struct AsyncAccessHandler;

ngx_log_debug_http!(request, "async module enabled: {}", co.enable);
impl HttpRequestHandler for AsyncAccessHandler {
const PHASE: ngx::http::HttpPhase = ngx::http::HttpPhase::Access;
type ReturnType = core::Status;

if !co.enable {
return core::Status::NGX_DECLINED;
}
fn handler(request: &mut http::Request) -> Self::ReturnType {
let co = Module::location_conf(request).expect("module config is none");

ngx_log_debug_http!(request, "async module enabled: {}", co.enable);

if let Some(ctx) =
unsafe { request.get_module_ctx::<RequestCTX>(&*addr_of!(ngx_http_async_module)) }
{
if !ctx.done.load(Ordering::Relaxed) {
return core::Status::NGX_AGAIN;
if !co.enable {
return core::Status::NGX_DECLINED;
}

return core::Status::NGX_OK;
}
if let Some(ctx) =
unsafe { request.get_module_ctx::<RequestCTX>(&*addr_of!(ngx_http_async_module)) }
{
if !ctx.done.load(Ordering::Relaxed) {
return core::Status::NGX_AGAIN;
}

let ctx = request.pool().allocate(RequestCTX::default());
if ctx.is_null() {
return core::Status::NGX_ERROR;
return core::Status::NGX_OK;
}

let ctx = request.pool().allocate(RequestCTX::default());
if ctx.is_null() {
return core::Status::NGX_ERROR;
}
request.set_module_ctx(ctx.cast(), unsafe { &*addr_of!(ngx_http_async_module) });

let ctx = unsafe { &mut *ctx };
ctx.event.handler = Some(check_async_work_done);
ctx.event.data = request.connection().cast();
ctx.event.log = unsafe { (*request.connection()).log };
unsafe { ngx_post_event(&mut ctx.event, addr_of_mut!(ngx_posted_next_events)) };

// Request is no longer needed and can be converted to something movable to the async block
let req = AtomicPtr::new(request.into());
let done_flag = ctx.done.clone();

let rt = ngx_http_async_runtime();
ctx.task = Some(rt.spawn(async move {
let start = Instant::now();
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
let req = unsafe { http::Request::from_ngx_http_request(req.load(Ordering::Relaxed)) };
// not really thread safe, we should apply all these operation in nginx thread
// but this is just an example. proper way would be storing these headers in the request ctx
// and apply them when we get back to the nginx thread.
req.add_header_out(
"X-Async-Time",
start.elapsed().as_millis().to_string().as_str(),
);

done_flag.store(true, Ordering::Release);
// there is a small issue here. If traffic is low we may get stuck behind a 300ms timer
// in the nginx event loop. To workaround it we can notify the event loop using
// pthread_kill( nginx_thread, SIGIO ) to wake up the event loop. (or patch nginx
// and use the same trick as the thread pool)
}));

core::Status::NGX_AGAIN
}
request.set_module_ctx(ctx.cast(), unsafe { &*addr_of!(ngx_http_async_module) });

let ctx = unsafe { &mut *ctx };
ctx.event.handler = Some(check_async_work_done);
ctx.event.data = request.connection().cast();
ctx.event.log = unsafe { (*request.connection()).log };
unsafe { ngx_post_event(&mut ctx.event, addr_of_mut!(ngx_posted_next_events)) };

// Request is no longer needed and can be converted to something movable to the async block
let req = AtomicPtr::new(request.into());
let done_flag = ctx.done.clone();

let rt = ngx_http_async_runtime();
ctx.task = Some(rt.spawn(async move {
let start = Instant::now();
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
let req = unsafe { http::Request::from_ngx_http_request(req.load(Ordering::Relaxed)) };
// not really thread safe, we should apply all these operation in nginx thread
// but this is just an example. proper way would be storing these headers in the request ctx
// and apply them when we get back to the nginx thread.
req.add_header_out(
"X-Async-Time",
start.elapsed().as_millis().to_string().as_str(),
);

done_flag.store(true, Ordering::Release);
// there is a small issue here. If traffic is low we may get stuck behind a 300ms timer
// in the nginx event loop. To workaround it we can notify the event loop using
// pthread_kill( nginx_thread, SIGIO ) to wake up the event loop. (or patch nginx
// and use the same trick as the thread pool)
}));

core::Status::NGX_AGAIN
});
}

extern "C" fn ngx_http_async_commands_set_enable(
cf: *mut ngx_conf_t,
Expand Down
Loading
Loading