Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,20 @@ license = "Apache-2.0/MIT"
keywords = ["pubsub", "lock-free", "queue","async","futures"]
repository = "https://github.com/filipdulic/bus-queue"
readme = "README.md"
edition = "2018"
edition = "2024"

[dependencies]
arc-swap = {version = "0.4.6", optional = true}
futures-core = "0.3.5"
futures-sink = "0.3.5"
event-listener = "1.0.0"
arc-swap = {version = "1.7.1", optional = true}
futures-core = "0.3.31"
futures-sink = "0.3.31"
event-listener = "5.4.1"
# conc = {version="0.5.1", optional = true}

[dev-dependencies]
rand = "0.7.3"
futures-test = "0.3.5"
futures = {version = "0.3.5", features = ["thread-pool"]}
# rand = "0.10.0-rc.0"
futures-test = "0.3.31"
futures = {version = "0.3.31", features = ["thread-pool"]}
serial_test = "3.2.0"

[features]
default = ["arcswap", "rwlock"]
Expand Down
2 changes: 1 addition & 1 deletion examples/async-simple.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use bus_queue::flavors::arc_swap::async_bounded;
use futures::executor::block_on;
use futures::stream::{iter, StreamExt};
use futures::stream::{StreamExt, iter};

fn main() {
let (publisher, subscriber1) = async_bounded(10);
Expand Down
4 changes: 2 additions & 2 deletions src/async_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl<T, S: SwapSlot<T>> Sink<T> for AsyncPublisher<T, S> {
self: Pin<&mut Self>,
_: &mut task::Context<'_>,
) -> Poll<Result<(), Self::Error>> {
self.event.notify_all();
self.event.notify(usize::MAX);
Poll::Ready(Ok(()))
}

Expand All @@ -62,7 +62,7 @@ impl<T, S: SwapSlot<T>> PartialEq for AsyncPublisher<T, S> {
impl<T, S: SwapSlot<T>> Drop for AsyncPublisher<T, S> {
fn drop(&mut self) {
self.publisher.close();
self.event.notify_all();
self.event.notify(usize::MAX);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/async_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use crate::swap_slot::SwapSlot;
use event_listener::{Event, EventListener};
//use piper::{Event, EventListener};
use futures_core::{
Stream,
future::Future,
task::{self, Poll},
Stream,
};
use std::pin::Pin;
use std::sync::Arc;
Expand Down
45 changes: 1 addition & 44 deletions src/flavors/arc_swap.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#![allow(dead_code)]
use crate::{async_publisher, async_subscriber, publisher, subscriber, SwapSlot};
use crate::{SwapSlot, async_publisher, async_subscriber, publisher, subscriber};
use arc_swap::ArcSwapOption;
use std::sync::Arc;

Expand Down Expand Up @@ -70,46 +70,3 @@ mod test {
assert_eq!(Arc::strong_count(&arc.unwrap()), 2)
}
}

#[cfg(test)]
mod allocation_tests {
use crate::flavors::allocation_tests::{allocs_current_thread, reset_allocs_current_thread};

use super::*;

#[test]
fn store_with_arc_does_not_allocate_new_arc() {
let slot = Slot::<u32>::none();
let arc = Arc::new(123u32);

// Ignore allocations from constructing `slot` and `arc`.
reset_allocs_current_thread();

// This should only move / clone the Arc; no new heap allocation for T.
slot.store(arc.clone());

// Might still be 0 or some tiny number depending on RwLock internals,
// but definitely shouldn't be "one Arc allocation vs another" difference.
let after = allocs_current_thread();
assert_eq!(
after, 0,
"expected no additional allocations when storing an Arc"
);
}

#[test]
fn store_with_value_allocates_arc() {
let slot = Slot::<u32>::none();

reset_allocs_current_thread();

// This goes through `impl From<T> for Arc<T>` and must allocate.
slot.store(5u32);

let after = allocs_current_thread();
assert!(
after == 1,
"expected at least one allocation when storing a bare value T"
);
}
}
39 changes: 0 additions & 39 deletions src/flavors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,42 +3,3 @@ pub mod arc_swap;

#[cfg(feature = "rwlock")]
pub mod rw_lock;

#[cfg(test)]
mod allocation_tests {
use std::alloc::{GlobalAlloc, Layout, System};
use std::cell::Cell;

struct CountingAlloc;

#[global_allocator]
static GLOBAL: CountingAlloc = CountingAlloc;

thread_local! {
// Each OS thread gets its own counter.
static ALLOCS_THIS_THREAD: Cell<usize> = Cell::new(0);
}

unsafe impl GlobalAlloc for CountingAlloc {
unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
let ptr = System.alloc(layout);
if !ptr.is_null() {
// Only bump the counter for the *current* thread.
ALLOCS_THIS_THREAD.with(|c| c.set(c.get() + 1));
}
ptr
}

unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
System.dealloc(ptr, layout);
}
}

pub(crate) fn reset_allocs_current_thread() {
ALLOCS_THIS_THREAD.with(|c| c.set(0));
}

pub(crate) fn allocs_current_thread() -> usize {
ALLOCS_THIS_THREAD.with(|c| c.get())
}
}
45 changes: 1 addition & 44 deletions src/flavors/rw_lock.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#![allow(dead_code)]
use crate::{async_publisher, async_subscriber, publisher, subscriber, SwapSlot};
use crate::{SwapSlot, async_publisher, async_subscriber, publisher, subscriber};
use std::sync::{Arc, RwLock};

pub struct Slot<T> {
Expand Down Expand Up @@ -69,46 +69,3 @@ mod test {
assert_eq!(Arc::strong_count(&arc.unwrap()), 2)
}
}

#[cfg(test)]
mod allocation_tests {
use crate::flavors::allocation_tests::{allocs_current_thread, reset_allocs_current_thread};

use super::*;

#[test]
fn store_with_arc_does_not_allocate_new_arc() {
let slot = Slot::<u32>::none();
let arc = Arc::new(123u32);

// Ignore allocations from constructing `slot` and `arc`.
reset_allocs_current_thread();

// This should only move / clone the Arc; no new heap allocation for T.
slot.store(arc.clone());

// Might still be 0 or some tiny number depending on RwLock internals,
// but definitely shouldn't be "one Arc allocation vs another" difference.
let after = allocs_current_thread();
assert_eq!(
after, 0,
"expected no additional allocations when storing an Arc"
);
}

#[test]
fn store_with_value_allocates_arc() {
let slot = Slot::<u32>::none();

reset_allocs_current_thread();

// This goes through `impl From<T> for Arc<T>` and must allocate.
slot.store(5u32);

let after = allocs_current_thread();
assert!(
after == 1,
"expected at least one allocation when storing a bare value T"
);
}
}
2 changes: 1 addition & 1 deletion src/ring_buffer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::atomic_counter::AtomicCounter;
use std::sync::{atomic::AtomicBool, atomic::Ordering, Arc};
use std::sync::{Arc, atomic::AtomicBool, atomic::Ordering};
// Use std mpsc's error types as our own
use crate::swap_slot::SwapSlot;
use std::fmt::Debug;
Expand Down
2 changes: 1 addition & 1 deletion tests/async_tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use bus_queue::flavors::arc_swap::async_bounded;
// use futures::{executor, pin_mut, task::Poll, task::SpawnExt, FutureExt, SinkExt, StreamExt};
use futures::{pin_mut, task::Poll, FutureExt, SinkExt};
use futures::{FutureExt, SinkExt, pin_mut, task::Poll};
use futures_test::task::noop_context;
use futures_test::{assert_stream_done, assert_stream_next, assert_stream_pending};
// use rand::Rng;
Expand Down
Loading