Skip to content

Commit fcef592

Browse files
authored
chore: Use tokio-stream UnboundedReceiverStream (#831)
1 parent fe31565 commit fcef592

File tree

5 files changed

+6
-38
lines changed

5 files changed

+6
-38
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ rand = "0.9"
2020
slab = "0.4.9"
2121
sync_wrapper = "1"
2222
tokio = "1.6.2"
23-
tokio-stream = "0.1.0"
23+
tokio-stream = "0.1.1"
2424
tokio-test = "0.4"
2525
tokio-util = { version = "0.7.0", default-features = false }
2626
tracing = { version = "0.1.2", default-features = false }

tower/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@ sync_wrapper = { workspace = true, optional = true }
7676
[dev-dependencies]
7777
futures = { workspace = true }
7878
hdrhistogram = { workspace = true }
79-
pin-project-lite = { workspace = true }
8079
tokio = { workspace = true, features = ["macros", "sync", "test-util", "rt-multi-thread"] }
8180
tokio-stream = { workspace = true }
8281
tokio-test = { workspace = true }

tower/tests/balance/main.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ mod support;
44

55
use std::future::Future;
66
use std::task::{Context, Poll};
7+
use tokio_stream::wrappers::UnboundedReceiverStream;
78
use tokio_test::{assert_pending, assert_ready, task};
89
use tower::balance::p2c::Balance;
910
use tower::discover::Change;
@@ -37,7 +38,7 @@ fn stress() {
3738
let _t = support::trace_init();
3839
let mut task = task::spawn(());
3940
let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<Result<_, &'static str>>();
40-
let mut cache = Balance::<_, Req>::new(support::IntoStream::new(rx));
41+
let mut cache = Balance::<_, Req>::new(UnboundedReceiverStream::new(rx));
4142

4243
let mut nready = 0;
4344
let mut services = slab::Slab::<(mock::Handle<Req, Req>, bool)>::new();

tower/tests/support.rs

Lines changed: 0 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,7 @@
22

33
use std::fmt;
44
use std::future;
5-
use std::pin::Pin;
65
use std::task::{Context, Poll};
7-
use tokio::sync::mpsc;
8-
use tokio_stream::Stream;
96
use tower::Service;
107

118
pub(crate) fn trace_init() -> tracing::subscriber::DefaultGuard {
@@ -17,36 +14,6 @@ pub(crate) fn trace_init() -> tracing::subscriber::DefaultGuard {
1714
tracing::subscriber::set_default(subscriber)
1815
}
1916

20-
pin_project_lite::pin_project! {
21-
#[derive(Clone, Debug)]
22-
pub struct IntoStream<S> {
23-
#[pin]
24-
inner: S
25-
}
26-
}
27-
28-
impl<S> IntoStream<S> {
29-
pub fn new(inner: S) -> Self {
30-
Self { inner }
31-
}
32-
}
33-
34-
impl<I> Stream for IntoStream<mpsc::Receiver<I>> {
35-
type Item = I;
36-
37-
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
38-
self.project().inner.poll_recv(cx)
39-
}
40-
}
41-
42-
impl<I> Stream for IntoStream<mpsc::UnboundedReceiver<I>> {
43-
type Item = I;
44-
45-
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
46-
self.project().inner.poll_recv(cx)
47-
}
48-
}
49-
5017
#[derive(Clone, Debug)]
5118
pub struct AssertSpanSvc {
5219
span: tracing::Span,

tower/tests/util/call_all.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use std::fmt;
55
use std::future::{ready, Future, Ready};
66
use std::task::{Context, Poll};
77
use std::{cell::Cell, rc::Rc};
8+
use tokio_stream::wrappers::UnboundedReceiverStream;
89
use tokio_test::{assert_pending, assert_ready, task};
910
use tower::util::ServiceExt;
1011
use tower_service::*;
@@ -50,7 +51,7 @@ fn ordered() {
5051
admit: admit.clone(),
5152
};
5253
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
53-
let ca = srv.call_all(support::IntoStream::new(rx));
54+
let ca = srv.call_all(UnboundedReceiverStream::new(rx));
5455
pin_mut!(ca);
5556

5657
assert_pending!(mock.enter(|cx, _| ca.as_mut().poll_next(cx)));
@@ -152,7 +153,7 @@ async fn pending() {
152153
let mut task = task::spawn(());
153154

154155
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
155-
let ca = mock.call_all(support::IntoStream::new(rx));
156+
let ca = mock.call_all(UnboundedReceiverStream::new(rx));
156157
pin_mut!(ca);
157158

158159
assert_pending!(task.enter(|cx, _| ca.as_mut().poll_next(cx)));

0 commit comments

Comments
 (0)