Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions pingora-timeout/benches/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,32 @@ async fn bench_fast_timeout() -> u32 {
n
}

async fn bench_tokio_timeout_pending_once() -> u32 {
let mut n = 0;
for _ in 0..LOOP_SIZE {
let fut = async {
tokio::task::yield_now().await;
1
};
let to = tokio_timeout(Duration::from_secs(1), fut);
n += to.await.unwrap();
}
n
}

async fn bench_fast_timeout_pending_once() -> u32 {
let mut n = 0;
for _ in 0..LOOP_SIZE {
let fut = async {
tokio::task::yield_now().await;
1
};
let to = fast_timeout::fast_timeout(Duration::from_secs(1), fut);
n += to.await.unwrap();
}
n
}

fn bench_tokio_timer() {
let mut list = Vec::with_capacity(LOOP_SIZE as usize);
let before = Instant::now();
Expand Down Expand Up @@ -155,6 +181,24 @@ async fn main() {
elapsed / LOOP_SIZE
);

let before = Instant::now();
bench_fast_timeout_pending_once().await;
let elapsed = before.elapsed();
println!(
"pingora timeout pending-once {:?} total, {:?} avg per iteration",
elapsed,
elapsed / LOOP_SIZE
);

let before = Instant::now();
bench_tokio_timeout_pending_once().await;
let elapsed = before.elapsed();
println!(
"tokio timeout pending-once {:?} total, {:?} avg per iteration",
elapsed,
elapsed / LOOP_SIZE
);

println!("===========================");

let tm = pingora_timeout::timer::TimerManager::new();
Expand Down
6 changes: 4 additions & 2 deletions pingora-timeout/src/fast_timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ fn check_clock_thread(tm: &Arc<TimerManager>) {
pub struct FastTimeout(Duration);

impl ToTimeout for FastTimeout {
fn timeout(&self) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
Box::pin(TIMER_MANAGER.register_timer(self.0).poll())
type Fut = TimerStubFuture;

fn timeout(&self) -> Self::Fut {
TIMER_MANAGER.register_timer(self.0).poll()
}

fn create(d: Duration) -> Self {
Expand Down
23 changes: 14 additions & 9 deletions pingora-timeout/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ use tokio::time::{sleep as tokio_sleep, Duration};
///
/// Users don't need to interact with this trait
pub trait ToTimeout {
fn timeout(&self) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>>;
type Fut: Future<Output = ()> + Send + Sync;

fn timeout(&self) -> Self::Fut;
fn create(d: Duration) -> Self;
}

Expand All @@ -59,8 +61,10 @@ pub trait ToTimeout {
pub struct TokioTimeout(Duration);

impl ToTimeout for TokioTimeout {
fn timeout(&self) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
Box::pin(tokio_sleep(self.0))
type Fut = tokio::time::Sleep;

fn timeout(&self) -> Self::Fut {
tokio_sleep(self.0)
}

fn create(d: Duration) -> Self {
Expand Down Expand Up @@ -95,11 +99,11 @@ where
pin_project! {
/// The timeout future returned by the timeout functions
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Timeout<T, F> {
pub struct Timeout<T, F: ToTimeout> {
#[pin]
value: T,
#[pin]
delay: Option<Pin<Box<dyn Future<Output = ()> + Send + Sync>>>,
delay: Option<F::Fut>,
callback: F, // callback to create the timer
}
}
Expand Down Expand Up @@ -132,11 +136,12 @@ where
return Poll::Ready(Ok(v));
}

let delay = me
.delay
.get_or_insert_with(|| Box::pin(me.callback.timeout()));
if me.delay.is_none() {
me.delay.as_mut().set(Some(me.callback.timeout()));
}

match delay.as_mut().poll(cx) {
// safe: delay is Some after init
match me.delay.as_pin_mut().unwrap().poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(()) => Poll::Ready(Err(Elapsed {})),
}
Expand Down
55 changes: 51 additions & 4 deletions pingora-timeout/src/timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,15 @@
//! - drop: 10.694154ms total, 106ns avg per iteration

use parking_lot::RwLock;
use pin_project_lite::pin_project;
use std::collections::BTreeMap;
use std::future::Future;
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
use std::sync::Arc;
use std::task::Poll;
use std::time::{Duration, Instant};
use thread_local::ThreadLocal;
use tokio::sync::futures::OwnedNotified;
use tokio::sync::Notify;

const RESOLUTION_MS: u64 = 10;
Expand Down Expand Up @@ -71,11 +75,54 @@ pub struct TimerStub(Arc<Notify>, Arc<AtomicBool>);

impl TimerStub {
/// Wait for the timer to expire.
pub async fn poll(self) {
if self.1.load(Ordering::SeqCst) {
return;
pub fn poll(self) -> TimerStubFuture {
TimerStubFuture::new(self.0, self.1)
}
}

pin_project! {
pub struct TimerStubFuture {
notify_src: Option<Arc<Notify>>,
#[pin]
notified: Option<OwnedNotified>,
fired: Arc<AtomicBool>,
checked_once: bool,
}
}

impl TimerStubFuture {
fn new(notify: Arc<Notify>, fired: Arc<AtomicBool>) -> Self {
TimerStubFuture {
notify_src: Some(notify),
notified: None,
fired,
checked_once: false,
}
}
}

impl Future for TimerStubFuture {
type Output = ();

fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();

if !*this.checked_once {
*this.checked_once = true;

if this.fired.load(Ordering::SeqCst) {
return Poll::Ready(());
}

let notify = this.notify_src.take().expect("polled after completion");
this.notified.set(Some(notify.notified_owned()));
}
self.0.notified().await;

this.notified
.as_mut()
.as_pin_mut()
.expect("initialized on first poll")
.poll(cx)
}
}

Expand Down