Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions foundations/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ tokio-runtime-metrics = [
# Enables logging functionality.
logging = [
"ratelimit",
"dep:governor",
"dep:parking_lot",
"dep:slog-async",
"dep:slog-json",
Expand All @@ -119,8 +118,8 @@ logging = [

# Enables distributed tracing functionality.
tracing = [
"ratelimit",
"dep:foundations-macros",
"dep:governor",
"dep:parking_lot",
"dep:rand",
"dep:cf-rustracing-jaeger",
Expand Down Expand Up @@ -173,7 +172,7 @@ cli = ["settings", "dep:clap"]
testing = ["dep:foundations-macros"]

# Enables the ratelimit! utility macro.
ratelimit = ["dep:governor"]
ratelimit = ["dep:governor", "dep:crossbeam-utils"]

# Enables panicking when too much nesting is reached on the logger
panic_on_too_much_logger_nesting = []
Expand Down
9 changes: 6 additions & 3 deletions foundations/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
//! - **metrics**: Enables metrics functionality.
//! - **logging**: Enables logging functionality.
//! - **tracing**: Enables distributed tracing functionality.
//! - **ratelimit**: Enables helpers to simplify rate-limiting your code.
//! - **testing**: Enables testing-related functionality.
//! - **security**: Enables security features. Available only on Linux (x86_64, aarch64) with the `libclang-dev` package installed (for bindgen).
//! - **jemalloc**: Enables [jemalloc] memory allocator which is known to perform much better than
Expand Down Expand Up @@ -99,12 +100,14 @@ pub mod telemetry;
))]
pub mod security;

#[doc(hidden)]
#[cfg(feature = "ratelimit")]
pub mod ratelimit;

#[doc(hidden)]
pub mod reexports_for_macros {
#[cfg(feature = "tracing")]
pub use cf_rustracing;
#[cfg(feature = "ratelimit")]
pub use governor;
#[cfg(feature = "security")]
pub use once_cell;
#[cfg(feature = "metrics")]
Expand Down Expand Up @@ -157,7 +160,7 @@ pub type Result<T> = std::result::Result<T, Error>;

#[cfg(feature = "ratelimit")]
#[doc(inline)]
pub use utils::ratelimit;
pub use self::ratelimit::ratelimit;

/// Basic service information.
#[derive(Clone, Debug, Default)]
Expand Down
169 changes: 169 additions & 0 deletions foundations/src/ratelimit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
use crossbeam_utils::CachePadded;
use governor::clock::{Clock, QuantaClock};
use std::sync::OnceLock;

// Reexport for macro
pub use governor::Quota;

/// Wrapper for sharing a `&'static QuantaClock` between many RateLimiter instances.
///
/// [`QuantaClock`] is 40 bytes but all state is the same across all instances, so keeping
/// a single static instance around saves some memory and helps with cache locality.
#[derive(Debug, Clone, Copy)]
pub struct StaticQuantaClock(&'static QuantaClock);

impl Default for StaticQuantaClock {
fn default() -> Self {
static CLOCK: CachePadded<OnceLock<QuantaClock>> = CachePadded::new(OnceLock::new());
Self(CLOCK.get_or_init(Default::default))
}
}

impl Clock for StaticQuantaClock {
type Instant = <QuantaClock as Clock>::Instant;

#[inline]
fn now(&self) -> Self::Instant {
Clock::now(self.0)
}
}

/// Type alias for a [`governor::RateLimiter`] using our [`StaticQuantaClock`].
pub type DirectRateLimiter = governor::RateLimiter<
governor::state::NotKeyed,
governor::state::InMemoryState,
StaticQuantaClock,
governor::middleware::NoOpMiddleware<<StaticQuantaClock as Clock>::Instant>,
>;

/// Applies a rate limit to the evaluation of an expression.
///
/// The macro takes two arguments, separated by a `;`. The first is the quota to use
/// for the ratelimit. This can either be a const expression evaluating to a
/// [`governor::Quota`], or a rate specifier like `200/s`, `10/m`, or `5/h`. The latter
/// three are equivalent to [`Quota`]'s `per_second`/`per_minute`/`per_hour` constructors.
///
/// The second argument is the expression to evaluate if the rate limit has not been
/// reached yet. The expression's result will be discarded.
///
/// # Examples
/// ```rust
/// # fn expensive_computation() -> u32 { 42 }
/// #
/// use foundations::telemetry::log;
/// use governor::Quota;
/// use std::num::NonZeroU32;
///
/// foundations::ratelimit!(10/s; println!("frequently failing operation failed!") );
///
/// // You can return data from the expression with an Option:
/// let mut output = None;
/// foundations::ratelimit!(1/h; output.insert(expensive_computation()) );
/// assert_eq!(output, Some(42));
///
/// // A quota expression allows customizing the burst size. By default,
/// // it is equivalent to the rate per time unit (i.e., 10/m yields a burst size of 10).
/// // Note: you could also reference a `const` declared somewhere else here.
/// foundations::ratelimit!(
/// Quota::per_hour(NonZeroU32::new(100).unwrap()).allow_burst(NonZeroU32::new(1).unwrap());
/// println!("this will be printed only once before the rate limit kicks in")
/// );
///
/// // Here the rate limit kicks in after the initial burst of 60 iterations:
/// let mut counter = 0;
/// for _ in 0..1000 {
/// foundations::ratelimit!(60/h; counter += 1);
/// }
/// assert_eq!(counter, 60);
/// ```
///
/// [`Quota`]: governor::Quota
#[macro_export]
#[doc(hidden)]
macro_rules! __ratelimit {
($limit:literal / s ; $expr:expr) => {
$crate::__ratelimit!(
$crate::ratelimit::Quota::per_second(::std::num::NonZeroU32::new($limit).unwrap());
$expr
)
};

($limit:literal / m ; $expr:expr) => {
$crate::__ratelimit!(
$crate::ratelimit::Quota::per_minute(::std::num::NonZeroU32::new($limit).unwrap());
$expr
)
};

($limit:literal / h ; $expr:expr) => {
$crate::__ratelimit!(
$crate::ratelimit::Quota::per_hour(::std::num::NonZeroU32::new($limit).unwrap());
$expr
)
};

($quota:expr ; $expr:expr) => {{
const QUOTA: $crate::ratelimit::Quota = $quota;
static LIMITER: ::std::sync::LazyLock<$crate::ratelimit::DirectRateLimiter> = ::std::sync::LazyLock::new(
|| $crate::ratelimit::DirectRateLimiter::direct_with_clock(QUOTA, ::std::default::Default::default())
);
if LIMITER.check().is_ok() {
$expr;
}
}};
}

pub use __ratelimit as ratelimit;

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_ratelimit() {
use governor::Quota;
use std::num::NonZeroU32;

const CUSTOM_QUOTA: Quota =
Quota::per_hour(NonZeroU32::new(60).unwrap()).allow_burst(NonZeroU32::new(20).unwrap());

// Burst size is only 20 for this quota, despite the refill rate being 60/h
let mut res_custom = 0;
for _ in 0..200 {
ratelimit!(CUSTOM_QUOTA; res_custom += 1);
}

assert_eq!(res_custom, 20);

// Cells may refill as the loop executes already, so a value >20 is possible
let mut res_sec = 0;
for _ in 0..100 {
ratelimit!(20/s; res_sec += 1);
}

assert!(res_sec >= 20);
assert!(res_sec < 100);

// This should execute exactly 3 times; we don't expect any cells to refill
let mut res_minute = 1;
for _ in 0..20 {
ratelimit!(3/m; res_minute *= 2);
}

assert_eq!(res_minute, 1 << 3);

let mut res_hour_a = 0;
let mut res_hour_b = 0;

for _ in 0..1000 {
ratelimit!(100/h; {
res_hour_a += 1;
res_hour_b += 2;
});
}

assert!(res_hour_a >= 100);
assert!(res_hour_a < 1000);
assert_eq!(res_hour_b, 2 * res_hour_a);
}
}
7 changes: 4 additions & 3 deletions foundations/src/sentry/hook.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Sentry hook implementation for tracking sentry events and rate-limiting them.

use super::SentrySettings;
use crate::ratelimit::StaticQuantaClock;
use governor::{Quota, RateLimiter};
use std::borrow::Cow;
use std::num::NonZeroU32;
Expand Down Expand Up @@ -34,9 +35,9 @@ pub fn install_hook_with_settings(
options: &mut sentry_core::ClientOptions,
settings: &SentrySettings,
) {
let rate_limiter = settings
.max_events_per_second
.map(|rl| RateLimiter::<Fingerprint, _, _>::dashmap(Quota::per_second(rl)));
let rate_limiter = settings.max_events_per_second.map(|rl| {
RateLimiter::dashmap_with_clock(Quota::per_second(rl), StaticQuantaClock::default())
});

let previous = options.before_send.take();

Expand Down
9 changes: 6 additions & 3 deletions foundations/src/telemetry/log/rate_limit.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
use crate::ratelimit::{DirectRateLimiter, Quota, StaticQuantaClock};
use crate::telemetry::settings::RateLimitingSettings;
use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
use slog::{Drain, OwnedKVList, Record};
use std::num::NonZeroU32;

pub(crate) struct RateLimitingDrain<D> {
inner: D,
rate_limiter: Option<DefaultDirectRateLimiter>,
rate_limiter: Option<DirectRateLimiter>,
}

impl<D: Drain> RateLimitingDrain<D> {
pub(crate) fn new(inner: D, settings: &RateLimitingSettings) -> Self {
let rate_limiter = if settings.enabled
&& let Some(rate) = NonZeroU32::new(settings.max_events_per_second)
{
Some(RateLimiter::direct(Quota::per_second(rate)))
Some(DirectRateLimiter::direct_with_clock(
Quota::per_second(rate),
StaticQuantaClock::default(),
))
} else {
None
};
Expand Down
22 changes: 10 additions & 12 deletions foundations/src/telemetry/tracing/rate_limit.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
use crate::ratelimit::{DirectRateLimiter, Quota, StaticQuantaClock};
use crate::telemetry::settings::ActiveSamplingSettings;
use cf_rustracing::sampler::Sampler;
use cf_rustracing::span::CandidateSpan;
use cf_rustracing::{Result, sampler::ProbabilisticSampler};
use governor::clock::DefaultClock;
use governor::middleware::NoOpMiddleware;
use governor::state::{InMemoryState, NotKeyed};
use governor::{Quota, RateLimiter};
use std::num::NonZeroU32;

#[derive(Debug)]
pub(crate) struct RateLimitingProbabilisticSampler {
inner: ProbabilisticSampler,
rate_limiter: Option<RateLimiter<NotKeyed, InMemoryState, DefaultClock, NoOpMiddleware>>,
rate_limiter: Option<DirectRateLimiter>,
}

impl Default for RateLimitingProbabilisticSampler {
Expand All @@ -27,13 +25,13 @@ impl RateLimitingProbabilisticSampler {
/// If `sampling_rate` is not in the range `0.0...1.0`,
/// it will return an error with the kind `ErrorKind::InvalidInput`.
pub(crate) fn new(settings: &ActiveSamplingSettings) -> Result<Self> {
let rate_limiter = if settings.rate_limit.enabled {
settings
.rate_limit
.max_events_per_second
.try_into()
.ok()
.map(|r| RateLimiter::direct(Quota::per_second(r)))
let rate_limiter = if settings.rate_limit.enabled
&& let Some(rate) = NonZeroU32::new(settings.rate_limit.max_events_per_second)
{
Some(DirectRateLimiter::direct_with_clock(
Quota::per_second(rate),
StaticQuantaClock::default(),
))
} else {
None
};
Expand Down
Loading
Loading