From 860295277cac0a51c3166d3871d5bf73698d66bd Mon Sep 17 00:00:00 2001 From: simbiont666 Date: Fri, 13 Feb 2026 00:33:59 +0300 Subject: [PATCH 1/3] remove boxed timeout future allocation Use concrete future types in ToTimeout/Timeout instead of Box --- pingora-timeout/src/fast_timeout.rs | 6 +++-- pingora-timeout/src/lib.rs | 23 +++++++++++-------- pingora-timeout/src/timer.rs | 34 +++++++++++++++++++++++++---- 3 files changed, 48 insertions(+), 15 deletions(-) diff --git a/pingora-timeout/src/fast_timeout.rs b/pingora-timeout/src/fast_timeout.rs index 27535e11..b8aaa787 100644 --- a/pingora-timeout/src/fast_timeout.rs +++ b/pingora-timeout/src/fast_timeout.rs @@ -50,8 +50,10 @@ fn check_clock_thread(tm: &Arc) { pub struct FastTimeout(Duration); impl ToTimeout for FastTimeout { - fn timeout(&self) -> Pin + Send + Sync>> { - Box::pin(TIMER_MANAGER.register_timer(self.0).poll()) + type Fut = TimerStubFuture; + + fn timeout(&self) -> Self::Fut { + TIMER_MANAGER.register_timer(self.0).poll() } fn create(d: Duration) -> Self { diff --git a/pingora-timeout/src/lib.rs b/pingora-timeout/src/lib.rs index 707f7be8..4c765f2b 100644 --- a/pingora-timeout/src/lib.rs +++ b/pingora-timeout/src/lib.rs @@ -49,7 +49,9 @@ use tokio::time::{sleep as tokio_sleep, Duration}; /// /// Users don't need to interact with this trait pub trait ToTimeout { - fn timeout(&self) -> Pin + Send + Sync>>; + type Fut: Future + Send + Sync; + + fn timeout(&self) -> Self::Fut; fn create(d: Duration) -> Self; } @@ -59,8 +61,10 @@ pub trait ToTimeout { pub struct TokioTimeout(Duration); impl ToTimeout for TokioTimeout { - fn timeout(&self) -> Pin + Send + Sync>> { - Box::pin(tokio_sleep(self.0)) + type Fut = tokio::time::Sleep; + + fn timeout(&self) -> Self::Fut { + tokio_sleep(self.0) } fn create(d: Duration) -> Self { @@ -95,11 +99,11 @@ where pin_project! { /// The timeout future returned by the timeout functions #[must_use = "futures do nothing unless you `.await` or poll them"] - pub struct Timeout { + pub struct Timeout { #[pin] value: T, #[pin] - delay: Option + Send + Sync>>>, + delay: Option, callback: F, // callback to create the timer } } @@ -132,11 +136,12 @@ where return Poll::Ready(Ok(v)); } - let delay = me - .delay - .get_or_insert_with(|| Box::pin(me.callback.timeout())); + if me.delay.is_none() { + me.delay.as_mut().set(Some(me.callback.timeout())); + } - match delay.as_mut().poll(cx) { + // safe: delay is Some after init + match me.delay.as_pin_mut().unwrap().poll(cx) { Poll::Pending => Poll::Pending, Poll::Ready(()) => Poll::Ready(Err(Elapsed {})), } diff --git a/pingora-timeout/src/timer.rs b/pingora-timeout/src/timer.rs index c6c587e0..998c1ab2 100644 --- a/pingora-timeout/src/timer.rs +++ b/pingora-timeout/src/timer.rs @@ -29,11 +29,15 @@ //! - drop: 10.694154ms total, 106ns avg per iteration use parking_lot::RwLock; +use pin_project_lite::pin_project; use std::collections::BTreeMap; +use std::future::Future; use std::sync::atomic::{AtomicBool, AtomicI64, Ordering}; use std::sync::Arc; +use std::task::Poll; use std::time::{Duration, Instant}; use thread_local::ThreadLocal; +use tokio::sync::futures::OwnedNotified; use tokio::sync::Notify; const RESOLUTION_MS: u64 = 10; @@ -71,11 +75,33 @@ pub struct TimerStub(Arc, Arc); impl TimerStub { /// Wait for the timer to expire. - pub async fn poll(self) { - if self.1.load(Ordering::SeqCst) { - return; + pub fn poll(self) -> TimerStubFuture { + TimerStubFuture { + notify: self.0.notified_owned(), + fired: self.1, } - self.0.notified().await; + } +} + +pin_project! { + pub struct TimerStubFuture { + #[pin] + notify: OwnedNotified, + fired: Arc + } +} + +impl Future for TimerStubFuture { + type Output = (); + + fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + let this = self.project(); + + if this.fired.load(Ordering::SeqCst) { + return Poll::Ready(()); + } + + this.notify.poll(cx) } } From 62733fc0d5d81f260872dab4ed0226f6f623ab2c Mon Sep 17 00:00:00 2001 From: simbiont666 Date: Fri, 13 Feb 2026 11:35:51 +0300 Subject: [PATCH 2/3] refactor: align TimerStubFuture with async poll semantics --- pingora-timeout/src/timer.rs | 41 +++++++++++++++++++++++++++--------- 1 file changed, 31 insertions(+), 10 deletions(-) diff --git a/pingora-timeout/src/timer.rs b/pingora-timeout/src/timer.rs index 998c1ab2..581525c4 100644 --- a/pingora-timeout/src/timer.rs +++ b/pingora-timeout/src/timer.rs @@ -76,18 +76,28 @@ pub struct TimerStub(Arc, Arc); impl TimerStub { /// Wait for the timer to expire. pub fn poll(self) -> TimerStubFuture { - TimerStubFuture { - notify: self.0.notified_owned(), - fired: self.1, - } + TimerStubFuture::new(self.0, self.1) } } pin_project! { pub struct TimerStubFuture { + notify_src: Option>, #[pin] - notify: OwnedNotified, - fired: Arc + notified: Option, + fired: Arc, + checked_once: bool, + } +} + +impl TimerStubFuture { + fn new(notify: Arc, fired: Arc) -> Self { + TimerStubFuture { + notify_src: Some(notify), + notified: None, + fired, + checked_once: false, + } } } @@ -95,13 +105,24 @@ impl Future for TimerStubFuture { type Output = (); fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { - let this = self.project(); + let mut this = self.project(); + + if !*this.checked_once { + *this.checked_once = true; + + if this.fired.load(Ordering::SeqCst) { + return Poll::Ready(()); + } - if this.fired.load(Ordering::SeqCst) { - return Poll::Ready(()); + let notify = this.notify_src.take().expect("polled after completion"); + this.notified.set(Some(notify.notified_owned())); } - this.notify.poll(cx) + this.notified + .as_mut() + .as_pin_mut() + .expect("initialized on first poll") + .poll(cx) } } From cfeb5a101dff6b0999211f34c1c77fe646d73c5a Mon Sep 17 00:00:00 2001 From: simbiont666 Date: Fri, 13 Feb 2026 23:10:49 +0300 Subject: [PATCH 3/3] add benchmarks --- pingora-timeout/benches/benchmark.rs | 44 ++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/pingora-timeout/benches/benchmark.rs b/pingora-timeout/benches/benchmark.rs index 64fd053d..1950af52 100644 --- a/pingora-timeout/benches/benchmark.rs +++ b/pingora-timeout/benches/benchmark.rs @@ -49,6 +49,32 @@ async fn bench_fast_timeout() -> u32 { n } +async fn bench_tokio_timeout_pending_once() -> u32 { + let mut n = 0; + for _ in 0..LOOP_SIZE { + let fut = async { + tokio::task::yield_now().await; + 1 + }; + let to = tokio_timeout(Duration::from_secs(1), fut); + n += to.await.unwrap(); + } + n +} + +async fn bench_fast_timeout_pending_once() -> u32 { + let mut n = 0; + for _ in 0..LOOP_SIZE { + let fut = async { + tokio::task::yield_now().await; + 1 + }; + let to = fast_timeout::fast_timeout(Duration::from_secs(1), fut); + n += to.await.unwrap(); + } + n +} + fn bench_tokio_timer() { let mut list = Vec::with_capacity(LOOP_SIZE as usize); let before = Instant::now(); @@ -155,6 +181,24 @@ async fn main() { elapsed / LOOP_SIZE ); + let before = Instant::now(); + bench_fast_timeout_pending_once().await; + let elapsed = before.elapsed(); + println!( + "pingora timeout pending-once {:?} total, {:?} avg per iteration", + elapsed, + elapsed / LOOP_SIZE + ); + + let before = Instant::now(); + bench_tokio_timeout_pending_once().await; + let elapsed = before.elapsed(); + println!( + "tokio timeout pending-once {:?} total, {:?} avg per iteration", + elapsed, + elapsed / LOOP_SIZE + ); + println!("==========================="); let tm = pingora_timeout::timer::TimerManager::new();