diff --git a/pingora-timeout/benches/benchmark.rs b/pingora-timeout/benches/benchmark.rs index 64fd053d..1950af52 100644 --- a/pingora-timeout/benches/benchmark.rs +++ b/pingora-timeout/benches/benchmark.rs @@ -49,6 +49,32 @@ async fn bench_fast_timeout() -> u32 { n } +async fn bench_tokio_timeout_pending_once() -> u32 { + let mut n = 0; + for _ in 0..LOOP_SIZE { + let fut = async { + tokio::task::yield_now().await; + 1 + }; + let to = tokio_timeout(Duration::from_secs(1), fut); + n += to.await.unwrap(); + } + n +} + +async fn bench_fast_timeout_pending_once() -> u32 { + let mut n = 0; + for _ in 0..LOOP_SIZE { + let fut = async { + tokio::task::yield_now().await; + 1 + }; + let to = fast_timeout::fast_timeout(Duration::from_secs(1), fut); + n += to.await.unwrap(); + } + n +} + fn bench_tokio_timer() { let mut list = Vec::with_capacity(LOOP_SIZE as usize); let before = Instant::now(); @@ -155,6 +181,24 @@ async fn main() { elapsed / LOOP_SIZE ); + let before = Instant::now(); + bench_fast_timeout_pending_once().await; + let elapsed = before.elapsed(); + println!( + "pingora timeout pending-once {:?} total, {:?} avg per iteration", + elapsed, + elapsed / LOOP_SIZE + ); + + let before = Instant::now(); + bench_tokio_timeout_pending_once().await; + let elapsed = before.elapsed(); + println!( + "tokio timeout pending-once {:?} total, {:?} avg per iteration", + elapsed, + elapsed / LOOP_SIZE + ); + println!("==========================="); let tm = pingora_timeout::timer::TimerManager::new(); diff --git a/pingora-timeout/src/fast_timeout.rs b/pingora-timeout/src/fast_timeout.rs index 27535e11..b8aaa787 100644 --- a/pingora-timeout/src/fast_timeout.rs +++ b/pingora-timeout/src/fast_timeout.rs @@ -50,8 +50,10 @@ fn check_clock_thread(tm: &Arc) { pub struct FastTimeout(Duration); impl ToTimeout for FastTimeout { - fn timeout(&self) -> Pin + Send + Sync>> { - Box::pin(TIMER_MANAGER.register_timer(self.0).poll()) + type Fut = TimerStubFuture; + + fn timeout(&self) -> Self::Fut { + TIMER_MANAGER.register_timer(self.0).poll() } fn create(d: Duration) -> Self { diff --git a/pingora-timeout/src/lib.rs b/pingora-timeout/src/lib.rs index 707f7be8..4c765f2b 100644 --- a/pingora-timeout/src/lib.rs +++ b/pingora-timeout/src/lib.rs @@ -49,7 +49,9 @@ use tokio::time::{sleep as tokio_sleep, Duration}; /// /// Users don't need to interact with this trait pub trait ToTimeout { - fn timeout(&self) -> Pin + Send + Sync>>; + type Fut: Future + Send + Sync; + + fn timeout(&self) -> Self::Fut; fn create(d: Duration) -> Self; } @@ -59,8 +61,10 @@ pub trait ToTimeout { pub struct TokioTimeout(Duration); impl ToTimeout for TokioTimeout { - fn timeout(&self) -> Pin + Send + Sync>> { - Box::pin(tokio_sleep(self.0)) + type Fut = tokio::time::Sleep; + + fn timeout(&self) -> Self::Fut { + tokio_sleep(self.0) } fn create(d: Duration) -> Self { @@ -95,11 +99,11 @@ where pin_project! { /// The timeout future returned by the timeout functions #[must_use = "futures do nothing unless you `.await` or poll them"] - pub struct Timeout { + pub struct Timeout { #[pin] value: T, #[pin] - delay: Option + Send + Sync>>>, + delay: Option, callback: F, // callback to create the timer } } @@ -132,11 +136,12 @@ where return Poll::Ready(Ok(v)); } - let delay = me - .delay - .get_or_insert_with(|| Box::pin(me.callback.timeout())); + if me.delay.is_none() { + me.delay.as_mut().set(Some(me.callback.timeout())); + } - match delay.as_mut().poll(cx) { + // safe: delay is Some after init + match me.delay.as_pin_mut().unwrap().poll(cx) { Poll::Pending => Poll::Pending, Poll::Ready(()) => Poll::Ready(Err(Elapsed {})), } diff --git a/pingora-timeout/src/timer.rs b/pingora-timeout/src/timer.rs index c6c587e0..581525c4 100644 --- a/pingora-timeout/src/timer.rs +++ b/pingora-timeout/src/timer.rs @@ -29,11 +29,15 @@ //! - drop: 10.694154ms total, 106ns avg per iteration use parking_lot::RwLock; +use pin_project_lite::pin_project; use std::collections::BTreeMap; +use std::future::Future; use std::sync::atomic::{AtomicBool, AtomicI64, Ordering}; use std::sync::Arc; +use std::task::Poll; use std::time::{Duration, Instant}; use thread_local::ThreadLocal; +use tokio::sync::futures::OwnedNotified; use tokio::sync::Notify; const RESOLUTION_MS: u64 = 10; @@ -71,11 +75,54 @@ pub struct TimerStub(Arc, Arc); impl TimerStub { /// Wait for the timer to expire. - pub async fn poll(self) { - if self.1.load(Ordering::SeqCst) { - return; + pub fn poll(self) -> TimerStubFuture { + TimerStubFuture::new(self.0, self.1) + } +} + +pin_project! { + pub struct TimerStubFuture { + notify_src: Option>, + #[pin] + notified: Option, + fired: Arc, + checked_once: bool, + } +} + +impl TimerStubFuture { + fn new(notify: Arc, fired: Arc) -> Self { + TimerStubFuture { + notify_src: Some(notify), + notified: None, + fired, + checked_once: false, + } + } +} + +impl Future for TimerStubFuture { + type Output = (); + + fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + let mut this = self.project(); + + if !*this.checked_once { + *this.checked_once = true; + + if this.fired.load(Ordering::SeqCst) { + return Poll::Ready(()); + } + + let notify = this.notify_src.take().expect("polled after completion"); + this.notified.set(Some(notify.notified_owned())); } - self.0.notified().await; + + this.notified + .as_mut() + .as_pin_mut() + .expect("initialized on first poll") + .poll(cx) } }