Skip to content

Commit a9f98c8

Browse files
committed
feat: async phase handler
1 parent fb2731b commit a9f98c8

File tree

7 files changed

+360
-0
lines changed

7 files changed

+360
-0
lines changed

Cargo.lock

Lines changed: 105 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ targets = []
3939
[dependencies]
4040
allocator-api2 = { version = "0.4.0", default-features = false, features = ["fresh-rust"] }
4141
async-task = { version = "4.7.1", optional = true }
42+
futures = "0.3"
4243
lock_api = "0.4.13"
4344
nginx-sys = { path = "nginx-sys", version = "0.5.0"}
4445
pin-project-lite = { version = "0.2.16", optional = true }

examples/Cargo.toml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ autobins = false
1313
build = "../build.rs"
1414

1515
[dependencies]
16+
allocator-api2 = { version = "0.4.0", default-features = false, features = ["fresh-rust"] }
17+
async-task = { version = "4.7.1" }
18+
futures = "0.3"
1619
nginx-sys = { path = "../nginx-sys/", default-features = false }
1720
ngx = { path = "../", default-features = false, features = ["std"] }
1821

@@ -51,6 +54,12 @@ name = "async"
5154
path = "async.rs"
5255
crate-type = ["cdylib"]
5356

57+
[[example]]
58+
name = "async_request"
59+
path = "async_request.rs"
60+
crate-type = ["cdylib"]
61+
required-features = ["async"]
62+
5463
[[example]]
5564
name = "shared_dict"
5665
path = "shared_dict.rs"
@@ -63,5 +72,6 @@ default = ["export-modules", "ngx/vendored"]
6372
# outside of the NGINX buildsystem. However, cargo currently does not detect
6473
# this configuration automatically.
6574
# See https://github.com/rust-lang/rust/issues/20267
75+
async = ["ngx/async"]
6676
export-modules = []
6777
linux = []

examples/async_request.rs

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
use ngx::http::{
2+
add_phase_handler, AsyncHandler, HttpModule, HttpPhase, Request,
3+
};
4+
use ngx::{async_ as ngx_async, ngx_log_debug_http, ngx_log_error};
5+
6+
use nginx_sys::ngx_int_t;
7+
8+
struct SampleAsyncHandler;
9+
10+
impl AsyncHandler for SampleAsyncHandler {
11+
const PHASE: HttpPhase = HttpPhase::Access;
12+
type Module = Module;
13+
type ReturnType = ngx_int_t;
14+
15+
async fn worker(request: &mut Request) -> Self::ReturnType {
16+
ngx_log_debug_http!(request, "worker started");
17+
ngx_async::sleep(core::time::Duration::from_secs(2)).await;
18+
ngx_log_error!(
19+
nginx_sys::NGX_LOG_INFO,
20+
request.log(),
21+
"Async handler after timeout",
22+
);
23+
nginx_sys::NGX_OK as _
24+
}
25+
}
26+
27+
static NGX_HTTP_ASYNC_REQUEST_MODULE_CTX: nginx_sys::ngx_http_module_t =
28+
nginx_sys::ngx_http_module_t {
29+
preconfiguration: None,
30+
postconfiguration: Some(Module::postconfiguration),
31+
create_main_conf: None,
32+
init_main_conf: None,
33+
create_srv_conf: None,
34+
merge_srv_conf: None,
35+
create_loc_conf: None,
36+
merge_loc_conf: None,
37+
};
38+
39+
#[cfg(feature = "export-modules")]
40+
ngx::ngx_modules!(ngx_http_async_request_module);
41+
42+
#[used]
43+
#[allow(non_upper_case_globals)]
44+
#[cfg_attr(not(feature = "export-modules"), no_mangle)]
45+
pub static mut ngx_http_async_request_module: nginx_sys::ngx_module_t = nginx_sys::ngx_module_t {
46+
ctx: core::ptr::addr_of!(NGX_HTTP_ASYNC_REQUEST_MODULE_CTX) as _,
47+
type_: nginx_sys::NGX_HTTP_MODULE as _,
48+
..nginx_sys::ngx_module_t::default()
49+
};
50+
51+
struct Module;
52+
53+
impl HttpModule for Module {
54+
fn module() -> &'static nginx_sys::ngx_module_t {
55+
unsafe { &*::core::ptr::addr_of!(ngx_http_async_request_module) }
56+
}
57+
58+
unsafe extern "C" fn postconfiguration(cf: *mut nginx_sys::ngx_conf_t) -> ngx_int_t {
59+
// SAFETY: this function is called with non-NULL cf always
60+
let cf = unsafe { &mut *cf };
61+
add_phase_handler::<SampleAsyncHandler>(cf)
62+
.map_or(nginx_sys::NGX_ERROR as _, |_| nginx_sys::NGX_OK as _)
63+
}
64+
}

src/http/async_request.rs

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
use core::fmt::Display;
2+
use core::future::Future;
3+
use core::pin::Pin;
4+
use core::task::{Context, Poll};
5+
6+
use crate::core::type_storage::*;
7+
use crate::http::{HttpHandlerReturn, HttpModule, HttpPhase, HttpRequestHandler, Request};
8+
use crate::{async_ as ngx_async, ngx_log_debug_http};
9+
10+
use crate::ffi::{ngx_http_request_t, ngx_int_t, ngx_post_event, ngx_posted_events};
11+
12+
use pin_project_lite::*;
13+
14+
/// An asynchronous HTTP request handler trait.
15+
pub trait AsyncHandler {
16+
/// The phase in which the handler will be executed.
17+
const PHASE: HttpPhase;
18+
/// The associated HTTP module type.
19+
type Module: HttpModule;
20+
/// The return type of the asynchronous worker function.
21+
type ReturnType: HttpHandlerReturn;
22+
/// The asynchronous worker function to be implemented.
23+
fn worker(request: &mut Request) -> impl Future<Output = Self::ReturnType>;
24+
}
25+
26+
const fn async_phase(phase: HttpPhase) -> HttpPhase {
27+
assert!(
28+
!matches!(phase, HttpPhase::Content),
29+
"Content phase is not supported"
30+
);
31+
phase
32+
}
33+
34+
/// An error type for asynchronous handler operations.
35+
#[derive(Debug)]
36+
pub enum AsyncHandlerError {
37+
/// Indicates that the context creation failed.
38+
ContextCreationFailed,
39+
/// Indicates that there is no async launcher available.
40+
NoAsyncLauncher,
41+
}
42+
43+
impl Display for AsyncHandlerError {
44+
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
45+
match self {
46+
AsyncHandlerError::ContextCreationFailed => {
47+
write!(f, "AsyncHandler: Context creation failed")
48+
}
49+
AsyncHandlerError::NoAsyncLauncher => {
50+
write!(f, "AsyncHandler: No async launcher available")
51+
}
52+
}
53+
}
54+
}
55+
56+
impl<AH> HttpRequestHandler for AH
57+
where
58+
AH: AsyncHandler + 'static,
59+
{
60+
const PHASE: HttpPhase = async_phase(AH::PHASE);
61+
type ReturnType = Result<ngx_int_t, AsyncHandlerError>;
62+
63+
fn handler(request: &mut Request) -> Self::ReturnType {
64+
let mut pool = request.pool();
65+
let mut ctx = <AsyncRequestContext as TypeStorage>::get_mut(&mut pool);
66+
#[allow(clippy::manual_inspect)]
67+
if ctx.is_none() {
68+
ctx =
69+
<AsyncRequestContext as TypeStorage>::add(AsyncRequestContext::default(), &mut pool)
70+
.map(|ctx| {
71+
let request_ptr: *mut ngx_http_request_t = request.as_mut() as *mut _ as _;
72+
ctx.launcher = Some(ngx_async::spawn(handler_future::<AH>(request_ptr)));
73+
ctx
74+
})
75+
};
76+
77+
let ctx = ctx.ok_or(AsyncHandlerError::ContextCreationFailed)?;
78+
79+
if ctx.launcher.is_none() {
80+
Err(AsyncHandlerError::NoAsyncLauncher)
81+
} else if ctx.launcher.as_ref().unwrap().is_finished() {
82+
let rc = futures::executor::block_on(ctx.launcher.take().unwrap());
83+
ngx_log_debug_http!(request, "handler_wrapper: task joined; rc = {}", rc);
84+
<AsyncRequestContext as TypeStorage>::delete(&pool);
85+
Ok(rc)
86+
} else {
87+
ngx_log_debug_http!(request, "handler_wrapper: running");
88+
Ok(nginx_sys::NGX_AGAIN as _)
89+
}
90+
}
91+
}
92+
93+
#[derive(Default)]
94+
struct AsyncRequestContext {
95+
launcher: Option<async_task::Task<ngx_int_t>>,
96+
}
97+
98+
pin_project! {
99+
struct HandlerFuture<Fut>
100+
where
101+
Fut: Future<Output = ngx_int_t>,
102+
{
103+
#[pin]
104+
worker_fut: Fut,
105+
request: *const ngx_http_request_t,
106+
}
107+
}
108+
109+
fn handler_future<AH>(request: *mut ngx_http_request_t) -> impl Future<Output = ngx_int_t>
110+
where
111+
AH: AsyncHandler,
112+
{
113+
let fut = async move {
114+
let request = unsafe { Request::from_ngx_http_request(request) };
115+
AH::worker(request).await.into_ngx_int_t(request)
116+
};
117+
118+
HandlerFuture::<_> {
119+
worker_fut: fut,
120+
request,
121+
}
122+
}
123+
124+
impl<Fut> Future for HandlerFuture<Fut>
125+
where
126+
Fut: Future<Output = ngx_int_t>,
127+
{
128+
type Output = ngx_int_t;
129+
130+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
131+
let this = self.project();
132+
let request = unsafe { Request::from_const_ngx_http_request(*this.request) };
133+
134+
match this.worker_fut.poll(cx) {
135+
Poll::Pending => {
136+
ngx_log_debug_http!(request, "HandlerFuture: pending");
137+
Poll::Pending
138+
}
139+
Poll::Ready(rc) => {
140+
unsafe {
141+
ngx_post_event(
142+
(*request.connection()).write,
143+
core::ptr::addr_of_mut!(ngx_posted_events),
144+
)
145+
};
146+
ngx_log_debug_http!(request, "HandlerFuture: ready");
147+
Poll::Ready(rc)
148+
}
149+
}
150+
}
151+
}

0 commit comments

Comments
 (0)