Skip to content

Commit 2eaa605

Browse files
committed
fix: flaky test
1 parent 9beaa04 commit 2eaa605

File tree

3 files changed

+44
-49
lines changed

3 files changed

+44
-49
lines changed

.github/workflows/tarpaulin.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ jobs:
2828
token: ${{secrets.CODECOV_TOKEN}}
2929

3030
- name: Archive code coverage results
31-
uses: actions/upload-artifact@v1
31+
uses: actions/upload-artifact@v4
3232
with:
3333
name: code-coverage-report
3434
path: cobertura.xml

Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,9 @@ event-listener = "5.4.1"
1717
# conc = {version="0.5.1", optional = true}
1818

1919
[dev-dependencies]
20-
# rand = "0.10.0-rc.0"
20+
rand = "0.10.0-rc.0"
2121
futures-test = "0.3.31"
2222
futures = {version = "0.3.31", features = ["thread-pool"]}
23-
serial_test = "3.2.0"
2423

2524
[features]
2625
default = ["arcswap", "rwlock"]

tests/async_tests.rs

Lines changed: 42 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,53 +1,49 @@
11
use bus_queue::flavors::arc_swap::async_bounded;
2-
// use futures::{executor, pin_mut, task::Poll, task::SpawnExt, FutureExt, SinkExt, StreamExt};
3-
use futures::{FutureExt, SinkExt, pin_mut, task::Poll};
2+
use futures::{FutureExt, SinkExt, StreamExt, executor, pin_mut, task::Poll, task::SpawnExt};
43
use futures_test::task::noop_context;
54
use futures_test::{assert_stream_done, assert_stream_next, assert_stream_pending};
6-
// use rand::Rng;
5+
use rand::Rng;
76
use std::sync::Arc;
8-
// use std::time::Duration;
9-
10-
// pool.spawn alternative
11-
// pool.spawn(stream.forward(publisher).map(drop)).unwrap();
12-
13-
// #[test]
14-
// fn test_subscriber_item_drop_related_to_ratio_of_timing() {
15-
// const LEAD_IN_TIME: Duration = Duration::from_millis(10);
16-
// const MIN_PUB_MS: u64 = 2;
17-
// const MAX_PUB_MS: u64 = 10;
18-
// const MIN_SUB_MULTIPLIER: u64 = 2;
19-
// const MAX_SUB_MULTIPLIER: u64 = 10;
20-
// const NUMBER_OF_GENERATED: usize = 1000;
21-
// let mut rng = rand::thread_rng();
22-
// let pub_ms = rng.gen_range(MIN_PUB_MS, MAX_PUB_MS);
23-
// let pub_time = Duration::from_millis(pub_ms);
24-
// let sub_multiplier = rng.gen_range(MIN_SUB_MULTIPLIER, MAX_SUB_MULTIPLIER);
25-
// let sub_time = Duration::from_millis(sub_multiplier * pub_ms);
26-
// let pool = executor::ThreadPool::new().unwrap();
27-
// let (mut publisher, mut subscriber) = async_bounded::<usize>(1);
28-
// pool.spawn(async move {
29-
// std::thread::sleep(LEAD_IN_TIME);
30-
// for i in 0usize..NUMBER_OF_GENERATED {
31-
// std::thread::sleep(pub_time);
32-
// publisher.send(i).await.unwrap()
33-
// }
34-
// })
35-
// .unwrap();
36-
// let vec: Vec<usize> = executor::block_on(async move {
37-
// let mut vec = Vec::new();
38-
// loop {
39-
// std::thread::sleep(sub_time);
40-
// match subscriber.next().await {
41-
// Some(item) => vec.push(*item),
42-
// _ => return vec,
43-
// }
44-
// }
45-
// });
46-
// assert!(
47-
// (vec.len() >= (NUMBER_OF_GENERATED / (sub_multiplier as usize + 1usize)))
48-
// && (vec.len() <= (NUMBER_OF_GENERATED / (sub_multiplier as usize - 1usize)))
49-
// )
50-
// }
7+
use std::time::Duration;
8+
9+
#[test]
10+
fn test_subscriber_item_drop_related_to_ratio_of_timing() {
11+
const LEAD_IN_TIME: Duration = Duration::from_millis(10);
12+
const MIN_PUB_MS: u64 = 2;
13+
const MAX_PUB_MS: u64 = 10;
14+
const MIN_SUB_MULTIPLIER: u64 = 2;
15+
const MAX_SUB_MULTIPLIER: u64 = 10;
16+
const NUMBER_OF_GENERATED: usize = 1000;
17+
let mut rng = rand::rng();
18+
let pub_ms = rng.random_range(MIN_PUB_MS..=MAX_PUB_MS);
19+
let pub_time = Duration::from_millis(pub_ms);
20+
let sub_multiplier = rng.random_range(MIN_SUB_MULTIPLIER..=MAX_SUB_MULTIPLIER);
21+
let sub_time = Duration::from_millis(sub_multiplier * pub_ms);
22+
let pool = executor::ThreadPool::new().unwrap();
23+
let (mut publisher, mut subscriber) = async_bounded::<usize>(1);
24+
pool.spawn(async move {
25+
std::thread::sleep(LEAD_IN_TIME);
26+
for i in 0usize..NUMBER_OF_GENERATED {
27+
std::thread::sleep(pub_time);
28+
publisher.send(i).await.unwrap()
29+
}
30+
})
31+
.unwrap();
32+
let vec: Vec<usize> = executor::block_on(async move {
33+
let mut vec = Vec::new();
34+
loop {
35+
std::thread::sleep(sub_time);
36+
match subscriber.next().await {
37+
Some(item) => vec.push(*item),
38+
_ => return vec,
39+
}
40+
}
41+
});
42+
assert!(
43+
(vec.len() >= (NUMBER_OF_GENERATED / (sub_multiplier as usize + 1usize)))
44+
&& (vec.len() <= (NUMBER_OF_GENERATED / (sub_multiplier as usize - 1usize)))
45+
)
46+
}
5147
#[test]
5248
fn subscriber_is_in_pending_state_before_first_data_is_published() {
5349
let (_publisher, subscriber) = async_bounded::<usize>(1);

0 commit comments

Comments
 (0)