From e954940ad74df7b19689ec4d169481c1a747a0d9 Mon Sep 17 00:00:00 2001 From: badeend Date: Sun, 15 Feb 2026 16:35:12 +0100 Subject: [PATCH 1/8] Merge the `Connected`, `Receiving` & `P2Streaming` states into one --- crates/wasi/src/p2/tcp.rs | 2 - crates/wasi/src/sockets/tcp.rs | 77 +++++++++++++--------------------- 2 files changed, 30 insertions(+), 49 deletions(-) diff --git a/crates/wasi/src/p2/tcp.rs b/crates/wasi/src/p2/tcp.rs index 67edf98ba868..abd5f76514cc 100644 --- a/crates/wasi/src/p2/tcp.rs +++ b/crates/wasi/src/p2/tcp.rs @@ -19,7 +19,6 @@ impl TcpSocket { let reader = Arc::new(Mutex::new(TcpReader::new(client.clone()))); let writer = Arc::new(Mutex::new(TcpWriter::new(client.clone()))); self.set_p2_streaming_state(P2TcpStreamingState { - stream: client.clone(), reader: reader.clone(), writer: writer.clone(), })?; @@ -30,7 +29,6 @@ impl TcpSocket { } pub(crate) struct P2TcpStreamingState { - pub(crate) stream: Arc, reader: Arc>, writer: Arc>, } diff --git a/crates/wasi/src/sockets/tcp.rs b/crates/wasi/src/sockets/tcp.rs index d31e4640786d..558a9f072209 100644 --- a/crates/wasi/src/sockets/tcp.rs +++ b/crates/wasi/src/sockets/tcp.rs @@ -88,19 +88,10 @@ enum TcpState { /// sockets from a TCP listener. /// /// From here a socket can transition to `Receiving` or `P2Streaming`. - Connected(Arc), - - /// A connection has been established and `receive` has been called. - /// - /// A socket will not transition out of this state. - #[cfg(feature = "p3")] - Receiving(Arc), - - /// This is a WASIp2-bound socket which stores some extra state for - /// read/write streams to handle TCP shutdown. - /// - /// A socket will not transition out of this state. - P2Streaming(Box), + Connected { + stream: Arc, + p2_state: Option, + }, /// This is not actually a socket but a deferred error. /// @@ -125,9 +116,6 @@ impl Debug for TcpState { Self::ConnectReady(..) => f.debug_tuple("ConnectReady").finish(), Self::Connected { .. } => f.debug_tuple("Connected").finish(), #[cfg(feature = "p3")] - Self::Receiving { .. } => f.debug_tuple("Receiving").finish(), - Self::P2Streaming(_) => f.debug_tuple("P2Streaming").finish(), - #[cfg(feature = "p3")] Self::Error(..) => f.debug_tuple("Error").finish(), Self::Closed => write!(f, "Closed"), } @@ -219,7 +207,10 @@ impl TcpSocket { })?; options.apply(family, &client); Ok(Self::from_state( - TcpState::Connected(Arc::new(client)), + TcpState::Connected { + stream: Arc::new(client), + p2_state: None, + }, family, )) } @@ -240,11 +231,8 @@ impl TcpSocket { | TcpState::BindStarted(socket) | TcpState::Bound(socket) | TcpState::ListenStarted(socket) => Ok(socket.as_socketlike_view()), - TcpState::Connected(stream) => Ok(stream.as_socketlike_view()), - #[cfg(feature = "p3")] - TcpState::Receiving(stream) => Ok(stream.as_socketlike_view()), + TcpState::Connected { stream, .. } => Ok(stream.as_socketlike_view()), TcpState::Listening { listener, .. } => Ok(listener.as_socketlike_view()), - TcpState::P2Streaming(state) => Ok(state.stream.as_socketlike_view()), TcpState::Connecting(..) | TcpState::ConnectReady(_) | TcpState::Closed => { Err(ErrorCode::InvalidState) } @@ -371,7 +359,10 @@ impl TcpSocket { } match result { Ok(stream) => { - self.tcp_state = TcpState::Connected(Arc::new(stream)); + self.tcp_state = TcpState::Connected { + stream: Arc::new(stream), + p2_state: None, + }; Ok(()) } Err(err) => { @@ -511,25 +502,17 @@ impl TcpSocket { #[cfg(feature = "p3")] pub(crate) fn start_receive(&mut self) -> Option<&Arc> { - match mem::replace(&mut self.tcp_state, TcpState::Closed) { - TcpState::Connected(stream) => { - self.tcp_state = TcpState::Receiving(stream); - Some(self.tcp_stream_arc().unwrap()) - } - prev => { - self.tcp_state = prev; - None - } + if let TcpState::Connected { stream, .. } = &self.tcp_state { + Some(stream) + } else { + None } } pub(crate) fn local_address(&self) -> Result { match &self.tcp_state { TcpState::Bound(socket) => Ok(socket.local_addr()?), - TcpState::Connected(stream) => Ok(stream.local_addr()?), - #[cfg(feature = "p3")] - TcpState::Receiving(stream) => Ok(stream.local_addr()?), - TcpState::P2Streaming(state) => Ok(state.stream.local_addr()?), + TcpState::Connected { stream, .. } => Ok(stream.local_addr()?), TcpState::Listening { listener, .. } => Ok(listener.local_addr()?), #[cfg(feature = "p3")] TcpState::Error(err) => Err(err.into()), @@ -697,10 +680,7 @@ impl TcpSocket { pub(crate) fn tcp_stream_arc(&self) -> Result<&Arc, ErrorCode> { match &self.tcp_state { - TcpState::Connected(socket) => Ok(socket), - #[cfg(feature = "p3")] - TcpState::Receiving(socket) => Ok(socket), - TcpState::P2Streaming(state) => Ok(&state.stream), + TcpState::Connected { stream, .. } => Ok(stream), #[cfg(feature = "p3")] TcpState::Error(err) => Err(err.into()), _ => Err(ErrorCode::InvalidState), @@ -709,7 +689,10 @@ impl TcpSocket { pub(crate) fn p2_streaming_state(&self) -> Result<&P2TcpStreamingState, ErrorCode> { match &self.tcp_state { - TcpState::P2Streaming(state) => Ok(state), + TcpState::Connected { + p2_state: Some(state), + .. + } => Ok(state), #[cfg(feature = "p3")] TcpState::Error(err) => Err(err.into()), _ => Err(ErrorCode::InvalidState), @@ -720,11 +703,12 @@ impl TcpSocket { &mut self, state: P2TcpStreamingState, ) -> Result<(), ErrorCode> { - if !matches!(self.tcp_state, TcpState::Connected(_)) { - return Err(ErrorCode::InvalidState); + if let TcpState::Connected { p2_state, .. } = &mut self.tcp_state { + *p2_state = Some(state); + Ok(()) + } else { + Err(ErrorCode::InvalidState) } - self.tcp_state = TcpState::P2Streaming(Box::new(state)); - Ok(()) } /// Used for `Pollable` in the WASIp2 implementation this awaits the socket @@ -745,11 +729,10 @@ impl TcpSocket { | TcpState::Listening { pending_accept: Some(_), .. - } - | TcpState::P2Streaming(_) => {} + } => {} #[cfg(feature = "p3")] - TcpState::Receiving(_) | TcpState::Error(_) => {} + TcpState::Error(_) => {} TcpState::Connecting(Some(future)) => { self.tcp_state = TcpState::ConnectReady(future.as_mut().await); From 5589f729807e8882733204a5dbfd141239c6183e Mon Sep 17 00:00:00 2001 From: badeend Date: Sun, 15 Feb 2026 18:33:31 +0100 Subject: [PATCH 2/8] Handle remote shutdowns gracefully. Before it would fail with `ErrorCode::Unknown` because EPIPE is not mapped --- crates/wasi/src/p3/sockets/host/types/tcp.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/crates/wasi/src/p3/sockets/host/types/tcp.rs b/crates/wasi/src/p3/sockets/host/types/tcp.rs index a8815fd55bec..be58828ac668 100644 --- a/crates/wasi/src/p3/sockets/host/types/tcp.rs +++ b/crates/wasi/src/p3/sockets/host/types/tcp.rs @@ -226,6 +226,9 @@ impl StreamConsumer for SendStreamConsumer { Poll::Pending => return Poll::Pending, } } + Err(err) if err.kind() == std::io::ErrorKind::BrokenPipe => { + break 'result Ok(()); + } Err(err) => break 'result Err(err.into()), } } From 40a605ab8e8b0dd3479ebf05bd76158bb4e2d35d Mon Sep 17 00:00:00 2001 From: badeend Date: Sun, 15 Feb 2026 19:16:08 +0100 Subject: [PATCH 3/8] Rewrite P3 TCP stream tests. Aside from `test_tcp_read_cancellation`, all other tests were ported directly from their P2 counterpart. However, many of those P2 tests were testing semantics that are not relevant in P3. For example, they were testing the interaction between the old `tcp-socket::shutdown` method and its effect of the input/output streams. In P3, `shutdown` is called automatically when the stream is dropped --- .../src/bin/p3_sockets_tcp_streams.rs | 223 ++++++++++-------- 1 file changed, 121 insertions(+), 102 deletions(-) diff --git a/crates/test-programs/src/bin/p3_sockets_tcp_streams.rs b/crates/test-programs/src/bin/p3_sockets_tcp_streams.rs index 3aaef7a7fc6d..24f889a04d81 100644 --- a/crates/test-programs/src/bin/p3_sockets_tcp_streams.rs +++ b/crates/test-programs/src/bin/p3_sockets_tcp_streams.rs @@ -2,113 +2,106 @@ use futures::join; use std::pin::pin; use std::task::{Context, Poll, Waker}; use test_programs::p3::wasi::sockets::types::{ - IpAddress, IpAddressFamily, IpSocketAddress, TcpSocket, + ErrorCode, IpAddress, IpAddressFamily, IpSocketAddress, TcpSocket, }; use test_programs::p3::wit_stream; use test_programs::sockets::supports_ipv6; -use wit_bindgen::StreamResult; +use wit_bindgen::{FutureReader, StreamReader, StreamResult, StreamWriter}; struct Component; test_programs::p3::export!(Component); -/// InputStream::read should return `StreamError::Closed` after the connection has been shut down by the server. -async fn test_tcp_input_stream_should_be_closed_by_remote_shutdown(family: IpAddressFamily) { - setup(family, |server, client| async move { - drop(server); - - let (mut client_rx, client_fut) = client.receive(); +/// Test basic functionality. +async fn test_tcp_ping_pong(family: IpAddressFamily) { + setup(family, |mut server, mut client| async move { + { + let rest = server.send.write_all(b"ping".into()).await; + assert!(rest.is_empty()); + } + { + let (status, buf) = client.receive.read(Vec::with_capacity(4)).await; + assert_eq!(status, StreamResult::Complete(4)); + assert_eq!(buf, b"ping"); + } + { + let rest = client.send.write_all(b"pong".into()).await; + assert!(rest.is_empty()); + } + { + let (status, buf) = server.receive.read(Vec::with_capacity(4)).await; + assert_eq!(status, StreamResult::Complete(4)); + assert_eq!(buf, b"pong"); + } + }) + .await; +} - // The input stream should immediately signal StreamError::Closed. - // Notably, it should _not_ return an empty list (the wasi-io equivalent of EWOULDBLOCK) - // See: https://github.com/bytecodealliance/wasmtime/pull/8968 +/// The stream and future returned by `receive` should complete/resolve after +/// the connection has been shut down by the remote. +async fn test_tcp_receive_stream_should_be_dropped_by_remote_shutdown(family: IpAddressFamily) { + setup(family, |server, mut client| async move { + drop(server); // Wait for the shutdown signal to reach the client: - assert!(client_rx.next().await.is_none()); - assert_eq!(client_fut.await, Ok(())); + let (stream_result, data) = client.receive.read(Vec::with_capacity(1)).await; + assert_eq!(data.len(), 0); + assert_eq!(stream_result, StreamResult::Dropped); + assert_eq!(client.receive_result.await, Ok(())); }) .await; } -/// InputStream::read should return `StreamError::Closed` after the connection has been shut down locally. -async fn test_tcp_input_stream_should_be_closed_by_local_shutdown(family: IpAddressFamily) { - setup(family, |server, client| async move { - let (mut server_tx, server_rx) = wit_stream::new(); - join!( - async { - server.send(server_rx).await.unwrap(); - }, - async { - // On Linux, `recv` continues to work even after `shutdown(sock, SHUT_RD)` - // has been called. To properly test that this behavior doesn't happen in - // WASI, we make sure there's some data to read by the client: - let rest = server_tx.write_all(b"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.".into()).await; - assert!(rest.is_empty()); - drop(server_tx); - }, - ); +/// The future returned by `receive` should resolve once the companion stream +/// has been dropped. Regardless of whether there was still data pending. +async fn test_tcp_receive_future_should_resolve_when_stream_dropped(family: IpAddressFamily) { + setup(family, |mut server, client| async move { + { + let rest = server.send.write_all(b"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.".into()).await; + assert!(rest.is_empty()); + } + { + let Connection { mut receive, receive_result, .. } = client; - let (client_rx, client_fut) = client.receive(); + // Wait for the data to be ready: + receive.next().await.unwrap(); + drop(receive); - // Shut down socket locally: - drop(client_rx); - // Wait for the shutdown signal to reach the client: - assert_eq!(client_fut.await, Ok(())); + // Dropping the stream should've caused the future to resolve even + // though there was still data pending: + assert_eq!(receive_result.await, Ok(())); + } }).await; } -/// StreamWriter should return `StreamError::Closed` after the connection has been locally shut down for sending. -async fn test_tcp_output_stream_should_be_closed_by_local_shutdown(family: IpAddressFamily) { +/// The future returned by `send` should resolve after the input stream is dropped. +async fn test_tcp_send_future_should_resolve_when_stream_dropped(family: IpAddressFamily) { setup(family, |_server, client| async move { - let (client_tx, client_rx) = wit_stream::new(); - join!( - async { - client.send(client_rx).await.unwrap(); - }, - async { - // TODO: Verify if send on the stream should return an error - //assert!(client_tx.send(b"Hi!".into()).await.is_err()); - drop(client_tx); - } - ); + let Connection { + send, send_result, .. + } = client; + drop(send); + assert_eq!(send_result.await, Ok(())); }) .await; } -/// Calling `shutdown` while the StreamWriter is in the middle of a background write should not cause that write to be lost. -async fn test_tcp_shutdown_should_not_lose_data(family: IpAddressFamily) { - setup(family, |server, client| async move { - // Minimize the local send buffer: - client.set_send_buffer_size(1024).unwrap(); - let small_buffer_size = client.get_send_buffer_size().unwrap(); +/// `send` should drop the input stream when the connection is shut down by the remote. +async fn test_tcp_send_drops_stream_when_remote_shutdown(family: IpAddressFamily) { + setup(family, |server, mut client| async move { + drop(server); - // Create a significantly bigger buffer, so that we can be pretty sure the `write` won't finish immediately: - let big_buffer_size = 100 * small_buffer_size; - assert!(big_buffer_size > small_buffer_size); - let outgoing_data = vec![0; big_buffer_size as usize]; + // Give it a few tries for the shutdown signal to reach the client: + loop { + let stream_result = client.send.write(b"undeliverable".into()).await.0; + if stream_result == StreamResult::Dropped { + break; + } + } - // Submit the oversized buffer and immediately initiate the shutdown: - let (mut client_tx, client_rx) = wit_stream::new(); - join!( - async { - client.send(client_rx).await.unwrap(); - }, - async { - let ret = client_tx.write_all(outgoing_data.clone()).await; - assert!(ret.is_empty()); - drop(client_tx); - }, - async { - // The peer should receive _all_ data: - let (server_rx, server_fut) = server.receive(); - let incoming_data = server_rx.collect().await; - assert_eq!( - outgoing_data, incoming_data, - "Received data should match the sent data" - ); - server_fut.await.unwrap(); - }, - ); + // A remote shutdown is part of normal TCP connection teardown, hence + // the expected Ok: + assert_eq!(client.send_result.await, Ok(())); }) .await; } @@ -125,31 +118,26 @@ async fn test_tcp_read_cancellation(family: IpAddressFamily) { *slot = i as u8; } - setup(family, |server, client| async move { + setup(family, |mut server, mut client| async move { // Minimize the local send buffer: - client.set_send_buffer_size(1024).unwrap(); + client.socket.set_send_buffer_size(1024).unwrap(); - let (mut client_tx, client_rx) = wit_stream::new(); join!( - async { - client.send(client_rx).await.unwrap(); - }, async { for _ in 0..CHUNKS { - let ret = client_tx.write_all(data.to_vec()).await; + let ret = client.send.write_all(data.to_vec()).await; assert!(ret.is_empty()); } - drop(client_tx); + drop(client.send); }, async { let mut buf = Vec::with_capacity(1024); - let (mut server_rx, server_fut) = server.receive(); let mut i = 0_usize; let mut consecutive_zero_length_reads = 0; loop { assert!(buf.is_empty()); let (status, b) = { - let mut fut = pin!(server_rx.read(buf)); + let mut fut = pin!(server.receive.read(buf)); let mut cx = Context::from_waker(Waker::noop()); match fut.as_mut().poll(&mut cx) { Poll::Ready(pair) => pair, @@ -171,12 +159,12 @@ async fn test_tcp_read_cancellation(family: IpAddressFamily) { StreamResult::Cancelled => { assert!(consecutive_zero_length_reads < 10); consecutive_zero_length_reads += 1; - server_rx.read(Vec::new()).await; + server.receive.read(Vec::new()).await; } } } assert_eq!(i, CHUNKS * 256); - server_fut.await.unwrap(); + server.receive_result.await.unwrap(); }, ); }) @@ -185,17 +173,21 @@ async fn test_tcp_read_cancellation(family: IpAddressFamily) { impl test_programs::p3::exports::wasi::cli::run::Guest for Component { async fn run() -> Result<(), ()> { - test_tcp_input_stream_should_be_closed_by_remote_shutdown(IpAddressFamily::Ipv4).await; - test_tcp_input_stream_should_be_closed_by_local_shutdown(IpAddressFamily::Ipv4).await; - test_tcp_output_stream_should_be_closed_by_local_shutdown(IpAddressFamily::Ipv4).await; - test_tcp_shutdown_should_not_lose_data(IpAddressFamily::Ipv4).await; + test_tcp_ping_pong(IpAddressFamily::Ipv4).await; + test_tcp_receive_stream_should_be_dropped_by_remote_shutdown(IpAddressFamily::Ipv4).await; + test_tcp_receive_future_should_resolve_when_stream_dropped(IpAddressFamily::Ipv4).await; + test_tcp_send_future_should_resolve_when_stream_dropped(IpAddressFamily::Ipv4).await; + test_tcp_send_drops_stream_when_remote_shutdown(IpAddressFamily::Ipv4).await; test_tcp_read_cancellation(IpAddressFamily::Ipv4).await; if supports_ipv6() { - test_tcp_input_stream_should_be_closed_by_remote_shutdown(IpAddressFamily::Ipv6).await; - test_tcp_input_stream_should_be_closed_by_local_shutdown(IpAddressFamily::Ipv6).await; - test_tcp_output_stream_should_be_closed_by_local_shutdown(IpAddressFamily::Ipv6).await; - test_tcp_shutdown_should_not_lose_data(IpAddressFamily::Ipv6).await; + test_tcp_ping_pong(IpAddressFamily::Ipv6).await; + test_tcp_receive_stream_should_be_dropped_by_remote_shutdown(IpAddressFamily::Ipv6) + .await; + test_tcp_receive_future_should_resolve_when_stream_dropped(IpAddressFamily::Ipv6).await; + test_tcp_send_future_should_resolve_when_stream_dropped(IpAddressFamily::Ipv6).await; + test_tcp_send_drops_stream_when_remote_shutdown(IpAddressFamily::Ipv6).await; + test_tcp_read_cancellation(IpAddressFamily::Ipv6).await; } Ok(()) } @@ -203,10 +195,32 @@ impl test_programs::p3::exports::wasi::cli::run::Guest for Component { fn main() {} +struct Connection { + socket: TcpSocket, + receive: StreamReader, + receive_result: FutureReader>, + send: StreamWriter, + send_result: FutureReader>, +} +impl Connection { + fn new(socket: TcpSocket) -> Self { + let (send, send_rx) = wit_stream::new(); + let send_result = socket.send(send_rx); + let (receive, receive_result) = socket.receive(); + Self { + socket, + receive, + receive_result, + send, + send_result, + } + } +} + /// Set up a connected pair of sockets async fn setup>( family: IpAddressFamily, - body: impl FnOnce(TcpSocket, TcpSocket) -> Fut, + body: impl FnOnce(Connection, Connection) -> Fut, ) { let bind_address = IpSocketAddress::new(IpAddress::new_loopback(family), 0); let listener = TcpSocket::create(family).unwrap(); @@ -220,5 +234,10 @@ async fn setup>( }, async { accept.next().await.unwrap() }, ); - body(accepted_socket, client_socket).await; + + body( + Connection::new(accepted_socket), + Connection::new(client_socket), + ) + .await; } From 73e8cfbb328f79a0d6619e01b3f64c43cfa70105 Mon Sep 17 00:00:00 2001 From: badeend Date: Sun, 15 Feb 2026 22:11:02 +0100 Subject: [PATCH 4/8] `send & `receive` may be called successfully at most once. --- .../src/bin/p3_sockets_tcp_streams.rs | 46 ++++++++++++++ crates/wasi/src/p2/tcp.rs | 5 +- crates/wasi/src/p3/bindings.rs | 2 +- crates/wasi/src/p3/sockets/host/types/tcp.rs | 26 ++++---- crates/wasi/src/sockets/tcp.rs | 61 ++++++++++++++----- 5 files changed, 106 insertions(+), 34 deletions(-) diff --git a/crates/test-programs/src/bin/p3_sockets_tcp_streams.rs b/crates/test-programs/src/bin/p3_sockets_tcp_streams.rs index 24f889a04d81..04b4eba8cd11 100644 --- a/crates/test-programs/src/bin/p3_sockets_tcp_streams.rs +++ b/crates/test-programs/src/bin/p3_sockets_tcp_streams.rs @@ -106,6 +106,48 @@ async fn test_tcp_send_drops_stream_when_remote_shutdown(family: IpAddressFamily .await; } +/// `receive` may be called successfully at most once. +async fn test_tcp_receive_once(family: IpAddressFamily) { + setup(family, |mut server, client| async move { + // Give the client some potential data to _hopefully never_ read. + { + let rest = server.send.write_all(b"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.".into()).await; + assert!(rest.is_empty()); + } + + // FYI, the first call to `receive` is part of the `setup` code, so every + // `receive` in here should fail. + for _ in 0..3 { + let (mut reader, future) = client.socket.receive(); + + let (stream_result, data) = reader.read(Vec::with_capacity(10)).await; + assert_eq!(data.len(), 0); + assert_eq!(stream_result, StreamResult::Dropped); + assert_eq!(future.await, Err(ErrorCode::InvalidState)); + } + }) + .await; +} + +/// `send` may be called successfully at most once. +async fn test_tcp_send_once(family: IpAddressFamily) { + setup(family, |_server, client| async move { + // FYI, the first call to `send` is part of the `setup` code, so every + // `send` in here should fail. + for _ in 0..3 { + let (mut writer, send_rx) = wit_stream::new(); + let future = client.socket.send(send_rx); + + const DATA: &[u8] = b"undeliverable"; + let (stream_result, rest) = writer.write(DATA.into()).await; + assert_eq!(rest.into_vec(), DATA); + assert_eq!(stream_result, StreamResult::Dropped); + assert_eq!(future.await, Err(ErrorCode::InvalidState)); + } + }) + .await; +} + /// Model a situation where there's a continuous stream of data coming into the /// guest from one side and the other side is reading in chunks but also /// cancelling reads occasionally. Should receive the complete stream of data @@ -178,6 +220,8 @@ impl test_programs::p3::exports::wasi::cli::run::Guest for Component { test_tcp_receive_future_should_resolve_when_stream_dropped(IpAddressFamily::Ipv4).await; test_tcp_send_future_should_resolve_when_stream_dropped(IpAddressFamily::Ipv4).await; test_tcp_send_drops_stream_when_remote_shutdown(IpAddressFamily::Ipv4).await; + test_tcp_receive_once(IpAddressFamily::Ipv4).await; + test_tcp_send_once(IpAddressFamily::Ipv4).await; test_tcp_read_cancellation(IpAddressFamily::Ipv4).await; if supports_ipv6() { @@ -187,6 +231,8 @@ impl test_programs::p3::exports::wasi::cli::run::Guest for Component { test_tcp_receive_future_should_resolve_when_stream_dropped(IpAddressFamily::Ipv6).await; test_tcp_send_future_should_resolve_when_stream_dropped(IpAddressFamily::Ipv6).await; test_tcp_send_drops_stream_when_remote_shutdown(IpAddressFamily::Ipv6).await; + test_tcp_receive_once(IpAddressFamily::Ipv6).await; + test_tcp_send_once(IpAddressFamily::Ipv6).await; test_tcp_read_cancellation(IpAddressFamily::Ipv6).await; } Ok(()) diff --git a/crates/wasi/src/p2/tcp.rs b/crates/wasi/src/p2/tcp.rs index abd5f76514cc..15f93f1e77c1 100644 --- a/crates/wasi/src/p2/tcp.rs +++ b/crates/wasi/src/p2/tcp.rs @@ -15,9 +15,8 @@ use wasmtime::Result; impl TcpSocket { pub(crate) fn p2_streams(&mut self) -> SocketResult<(DynInputStream, DynOutputStream)> { - let client = self.tcp_stream_arc()?; - let reader = Arc::new(Mutex::new(TcpReader::new(client.clone()))); - let writer = Arc::new(Mutex::new(TcpWriter::new(client.clone()))); + let reader = Arc::new(Mutex::new(TcpReader::new(self.take_receive_stream()?))); + let writer = Arc::new(Mutex::new(TcpWriter::new(self.take_send_stream()?))); self.set_p2_streaming_state(P2TcpStreamingState { reader: reader.clone(), writer: writer.clone(), diff --git a/crates/wasi/src/p3/bindings.rs b/crates/wasi/src/p3/bindings.rs index cc589e2f3ca0..a20a32750e8b 100644 --- a/crates/wasi/src/p3/bindings.rs +++ b/crates/wasi/src/p3/bindings.rs @@ -87,7 +87,7 @@ mod generated { "wasi:filesystem/types.[method]descriptor.read-directory": store | tracing, "wasi:sockets/types.[method]tcp-socket.bind": async | tracing | trappable, "wasi:sockets/types.[method]tcp-socket.listen": store | tracing | trappable, - "wasi:sockets/types.[method]tcp-socket.send": store | tracing, + "wasi:sockets/types.[method]tcp-socket.send": store | tracing | trappable, "wasi:sockets/types.[method]tcp-socket.receive": store | tracing | trappable, "wasi:sockets/types.[method]udp-socket.bind": async | tracing | trappable, "wasi:sockets/types.[method]udp-socket.connect": async | tracing | trappable, diff --git a/crates/wasi/src/p3/sockets/host/types/tcp.rs b/crates/wasi/src/p3/sockets/host/types/tcp.rs index be58828ac668..0a080c349968 100644 --- a/crates/wasi/src/p3/sockets/host/types/tcp.rs +++ b/crates/wasi/src/p3/sockets/host/types/tcp.rs @@ -289,12 +289,11 @@ impl HostTcpSocketWithStore for WasiSockets { mut store: Access<'_, T, Self>, socket: Resource, mut data: StreamReader, - ) -> FutureReader> { - let (result_tx, result_rx) = oneshot::channel(); - match get_socket(store.get().table, &socket) - .and_then(|sock| sock.tcp_stream_arc().map(Arc::clone).map_err(Into::into)) - { + ) -> wasmtime::Result>> { + let socket = get_socket_mut(store.get().table, &socket)?; + match socket.take_send_stream() { Ok(stream) => { + let (result_tx, result_rx) = oneshot::channel(); data.pipe( &mut store, SendStreamConsumer { @@ -302,13 +301,15 @@ impl HostTcpSocketWithStore for WasiSockets { result: Some(result_tx), }, ); + Ok(FutureReader::new(&mut store, result_rx)) } Err(err) => { data.close(&mut store); - let _ = result_tx.send(Err(err.downcast().unwrap_or(ErrorCode::Unknown))); + Ok(FutureReader::new(&mut store, async { + wasmtime::error::Ok(Err(err.into())) + })) } } - FutureReader::new(&mut store, result_rx) } fn receive( @@ -316,9 +317,8 @@ impl HostTcpSocketWithStore for WasiSockets { socket: Resource, ) -> wasmtime::Result<(StreamReader, FutureReader>)> { let socket = get_socket_mut(store.get().table, &socket)?; - match socket.start_receive() { - Some(stream) => { - let stream = Arc::clone(stream); + match socket.take_receive_stream() { + Ok(stream) => { let (result_tx, result_rx) = oneshot::channel(); Ok(( StreamReader::new( @@ -331,11 +331,9 @@ impl HostTcpSocketWithStore for WasiSockets { FutureReader::new(&mut store, result_rx), )) } - None => Ok(( + Err(err) => Ok(( StreamReader::new(&mut store, iter::empty()), - FutureReader::new(&mut store, async { - wasmtime::error::Ok(Err(ErrorCode::InvalidState)) - }), + FutureReader::new(&mut store, async { wasmtime::error::Ok(Err(err.into())) }), )), } } diff --git a/crates/wasi/src/sockets/tcp.rs b/crates/wasi/src/sockets/tcp.rs index 558a9f072209..6cfa71c69d6d 100644 --- a/crates/wasi/src/sockets/tcp.rs +++ b/crates/wasi/src/sockets/tcp.rs @@ -90,6 +90,8 @@ enum TcpState { /// From here a socket can transition to `Receiving` or `P2Streaming`. Connected { stream: Arc, + receive_stream_taken: bool, + send_stream_taken: bool, p2_state: Option, }, @@ -209,6 +211,8 @@ impl TcpSocket { Ok(Self::from_state( TcpState::Connected { stream: Arc::new(client), + receive_stream_taken: false, + send_stream_taken: false, p2_state: None, }, family, @@ -361,6 +365,8 @@ impl TcpSocket { Ok(stream) => { self.tcp_state = TcpState::Connected { stream: Arc::new(stream), + receive_stream_taken: false, + send_stream_taken: false, p2_state: None, }; Ok(()) @@ -500,15 +506,6 @@ impl TcpSocket { Ok(Some(Self::new_accept(result, &self.options, self.family)?)) } - #[cfg(feature = "p3")] - pub(crate) fn start_receive(&mut self) -> Option<&Arc> { - if let TcpState::Connected { stream, .. } = &self.tcp_state { - Some(stream) - } else { - None - } - } - pub(crate) fn local_address(&self) -> Result { match &self.tcp_state { TcpState::Bound(socket) => Ok(socket.local_addr()?), @@ -521,9 +518,12 @@ impl TcpSocket { } pub(crate) fn remote_address(&self) -> Result { - let stream = self.tcp_stream_arc()?; - let addr = stream.peer_addr()?; - Ok(addr) + match &self.tcp_state { + TcpState::Connected { stream, .. } => Ok(stream.peer_addr()?), + #[cfg(feature = "p3")] + TcpState::Error(err) => Err(err.into()), + _ => Err(ErrorCode::InvalidState), + } } pub(crate) fn is_listening(&self) -> bool { @@ -678,11 +678,40 @@ impl TcpSocket { } } - pub(crate) fn tcp_stream_arc(&self) -> Result<&Arc, ErrorCode> { - match &self.tcp_state { - TcpState::Connected { stream, .. } => Ok(stream), + pub(crate) fn take_receive_stream(&mut self) -> Result, ErrorCode> { + match &mut self.tcp_state { + TcpState::Connected { + stream, + receive_stream_taken, + .. + } => { + if *receive_stream_taken { + return Err(ErrorCode::InvalidState); + } + *receive_stream_taken = true; + Ok(stream.clone()) + } #[cfg(feature = "p3")] - TcpState::Error(err) => Err(err.into()), + TcpState::Error(err) => Err((&*err).into()), + _ => Err(ErrorCode::InvalidState), + } + } + + pub(crate) fn take_send_stream(&mut self) -> Result, ErrorCode> { + match &mut self.tcp_state { + TcpState::Connected { + stream, + send_stream_taken, + .. + } => { + if *send_stream_taken { + return Err(ErrorCode::InvalidState); + } + *send_stream_taken = true; + Ok(stream.clone()) + } + #[cfg(feature = "p3")] + TcpState::Error(err) => Err((&*err).into()), _ => Err(ErrorCode::InvalidState), } } From f6ab013ac6cfcd0f01a0212c7dd6e883025637b4 Mon Sep 17 00:00:00 2001 From: badeend Date: Sun, 15 Feb 2026 23:10:12 +0100 Subject: [PATCH 5/8] The streams and futures returned by `send` and `receive` should remain operational even after the socket that spawned them has been dropped. --- .../src/bin/p3_sockets_tcp_streams.rs | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/crates/test-programs/src/bin/p3_sockets_tcp_streams.rs b/crates/test-programs/src/bin/p3_sockets_tcp_streams.rs index 04b4eba8cd11..a4b81f686a8d 100644 --- a/crates/test-programs/src/bin/p3_sockets_tcp_streams.rs +++ b/crates/test-programs/src/bin/p3_sockets_tcp_streams.rs @@ -148,6 +148,55 @@ async fn test_tcp_send_once(family: IpAddressFamily) { .await; } +/// The streams and futures returned by `send` and `receive` should remain +/// operational even after the socket that spawned them has been dropped. +async fn test_tcp_stream_lifetimes(family: IpAddressFamily) { + setup(family, |server, client| async move { + let Connection { + socket: server_socket, + send: mut server_send, + receive: server_receive, + send_result: server_send_result, + receive_result: server_receive_result, + } = server; + let Connection { + socket: client_socket, + send: mut client_send, + receive: client_receive, + send_result: client_send_result, + receive_result: client_receive_result, + } = client; + + // Drop the parent sockets: + drop(server_socket); + drop(client_socket); + + { + let rest = server_send.write_all(b"ping".into()).await; + assert!(rest.is_empty()); + drop(server_send); + assert_eq!(server_send_result.await, Ok(())); + } + { + let data = client_receive.collect().await; + assert_eq!(data, b"ping"); + assert_eq!(client_receive_result.await, Ok(())); + } + { + let rest = client_send.write_all(b"pong".into()).await; + assert!(rest.is_empty()); + drop(client_send); + assert_eq!(client_send_result.await, Ok(())); + } + { + let data = server_receive.collect().await; + assert_eq!(data, b"pong"); + assert_eq!(server_receive_result.await, Ok(())); + } + }) + .await; +} + /// Model a situation where there's a continuous stream of data coming into the /// guest from one side and the other side is reading in chunks but also /// cancelling reads occasionally. Should receive the complete stream of data @@ -222,6 +271,7 @@ impl test_programs::p3::exports::wasi::cli::run::Guest for Component { test_tcp_send_drops_stream_when_remote_shutdown(IpAddressFamily::Ipv4).await; test_tcp_receive_once(IpAddressFamily::Ipv4).await; test_tcp_send_once(IpAddressFamily::Ipv4).await; + test_tcp_stream_lifetimes(IpAddressFamily::Ipv4).await; test_tcp_read_cancellation(IpAddressFamily::Ipv4).await; if supports_ipv6() { @@ -233,6 +283,7 @@ impl test_programs::p3::exports::wasi::cli::run::Guest for Component { test_tcp_send_drops_stream_when_remote_shutdown(IpAddressFamily::Ipv6).await; test_tcp_receive_once(IpAddressFamily::Ipv6).await; test_tcp_send_once(IpAddressFamily::Ipv6).await; + test_tcp_stream_lifetimes(IpAddressFamily::Ipv6).await; test_tcp_read_cancellation(IpAddressFamily::Ipv6).await; } Ok(()) From 5ff5a1718819c0755322cb169b701abad09f5b48 Mon Sep 17 00:00:00 2001 From: badeend Date: Mon, 16 Feb 2026 20:11:59 +0100 Subject: [PATCH 6/8] Clarify the difference between `send` and `send_result` --- .../src/bin/p3_sockets_tcp_streams.rs | 68 ++++++++++--------- 1 file changed, 35 insertions(+), 33 deletions(-) diff --git a/crates/test-programs/src/bin/p3_sockets_tcp_streams.rs b/crates/test-programs/src/bin/p3_sockets_tcp_streams.rs index a4b81f686a8d..9b6038497825 100644 --- a/crates/test-programs/src/bin/p3_sockets_tcp_streams.rs +++ b/crates/test-programs/src/bin/p3_sockets_tcp_streams.rs @@ -16,20 +16,20 @@ test_programs::p3::export!(Component); async fn test_tcp_ping_pong(family: IpAddressFamily) { setup(family, |mut server, mut client| async move { { - let rest = server.send.write_all(b"ping".into()).await; + let rest = server.send_stream.write_all(b"ping".into()).await; assert!(rest.is_empty()); } { - let (status, buf) = client.receive.read(Vec::with_capacity(4)).await; + let (status, buf) = client.receive_stream.read(Vec::with_capacity(4)).await; assert_eq!(status, StreamResult::Complete(4)); assert_eq!(buf, b"ping"); } { - let rest = client.send.write_all(b"pong".into()).await; + let rest = client.send_stream.write_all(b"pong".into()).await; assert!(rest.is_empty()); } { - let (status, buf) = server.receive.read(Vec::with_capacity(4)).await; + let (status, buf) = server.receive_stream.read(Vec::with_capacity(4)).await; assert_eq!(status, StreamResult::Complete(4)); assert_eq!(buf, b"pong"); } @@ -44,7 +44,7 @@ async fn test_tcp_receive_stream_should_be_dropped_by_remote_shutdown(family: Ip drop(server); // Wait for the shutdown signal to reach the client: - let (stream_result, data) = client.receive.read(Vec::with_capacity(1)).await; + let (stream_result, data) = client.receive_stream.read(Vec::with_capacity(1)).await; assert_eq!(data.len(), 0); assert_eq!(stream_result, StreamResult::Dropped); assert_eq!(client.receive_result.await, Ok(())); @@ -57,15 +57,15 @@ async fn test_tcp_receive_stream_should_be_dropped_by_remote_shutdown(family: Ip async fn test_tcp_receive_future_should_resolve_when_stream_dropped(family: IpAddressFamily) { setup(family, |mut server, client| async move { { - let rest = server.send.write_all(b"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.".into()).await; + let rest = server.send_stream.write_all(b"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.".into()).await; assert!(rest.is_empty()); } { - let Connection { mut receive, receive_result, .. } = client; + let Connection { mut receive_stream, receive_result, .. } = client; // Wait for the data to be ready: - receive.next().await.unwrap(); - drop(receive); + receive_stream.next().await.unwrap(); + drop(receive_stream); // Dropping the stream should've caused the future to resolve even // though there was still data pending: @@ -78,9 +78,11 @@ async fn test_tcp_receive_future_should_resolve_when_stream_dropped(family: IpAd async fn test_tcp_send_future_should_resolve_when_stream_dropped(family: IpAddressFamily) { setup(family, |_server, client| async move { let Connection { - send, send_result, .. + send_stream, + send_result, + .. } = client; - drop(send); + drop(send_stream); assert_eq!(send_result.await, Ok(())); }) .await; @@ -93,7 +95,7 @@ async fn test_tcp_send_drops_stream_when_remote_shutdown(family: IpAddressFamily // Give it a few tries for the shutdown signal to reach the client: loop { - let stream_result = client.send.write(b"undeliverable".into()).await.0; + let stream_result = client.send_stream.write(b"undeliverable".into()).await.0; if stream_result == StreamResult::Dropped { break; } @@ -111,7 +113,7 @@ async fn test_tcp_receive_once(family: IpAddressFamily) { setup(family, |mut server, client| async move { // Give the client some potential data to _hopefully never_ read. { - let rest = server.send.write_all(b"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.".into()).await; + let rest = server.send_stream.write_all(b"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.".into()).await; assert!(rest.is_empty()); } @@ -154,15 +156,15 @@ async fn test_tcp_stream_lifetimes(family: IpAddressFamily) { setup(family, |server, client| async move { let Connection { socket: server_socket, - send: mut server_send, - receive: server_receive, + send_stream: mut server_send_stream, + receive_stream: server_receive_stream, send_result: server_send_result, receive_result: server_receive_result, } = server; let Connection { socket: client_socket, - send: mut client_send, - receive: client_receive, + send_stream: mut client_send_stream, + receive_stream: client_receive_stream, send_result: client_send_result, receive_result: client_receive_result, } = client; @@ -172,24 +174,24 @@ async fn test_tcp_stream_lifetimes(family: IpAddressFamily) { drop(client_socket); { - let rest = server_send.write_all(b"ping".into()).await; + let rest = server_send_stream.write_all(b"ping".into()).await; assert!(rest.is_empty()); - drop(server_send); + drop(server_send_stream); assert_eq!(server_send_result.await, Ok(())); } { - let data = client_receive.collect().await; + let data = client_receive_stream.collect().await; assert_eq!(data, b"ping"); assert_eq!(client_receive_result.await, Ok(())); } { - let rest = client_send.write_all(b"pong".into()).await; + let rest = client_send_stream.write_all(b"pong".into()).await; assert!(rest.is_empty()); - drop(client_send); + drop(client_send_stream); assert_eq!(client_send_result.await, Ok(())); } { - let data = server_receive.collect().await; + let data = server_receive_stream.collect().await; assert_eq!(data, b"pong"); assert_eq!(server_receive_result.await, Ok(())); } @@ -216,10 +218,10 @@ async fn test_tcp_read_cancellation(family: IpAddressFamily) { join!( async { for _ in 0..CHUNKS { - let ret = client.send.write_all(data.to_vec()).await; + let ret = client.send_stream.write_all(data.to_vec()).await; assert!(ret.is_empty()); } - drop(client.send); + drop(client.send_stream); }, async { let mut buf = Vec::with_capacity(1024); @@ -228,7 +230,7 @@ async fn test_tcp_read_cancellation(family: IpAddressFamily) { loop { assert!(buf.is_empty()); let (status, b) = { - let mut fut = pin!(server.receive.read(buf)); + let mut fut = pin!(server.receive_stream.read(buf)); let mut cx = Context::from_waker(Waker::noop()); match fut.as_mut().poll(&mut cx) { Poll::Ready(pair) => pair, @@ -250,7 +252,7 @@ async fn test_tcp_read_cancellation(family: IpAddressFamily) { StreamResult::Cancelled => { assert!(consecutive_zero_length_reads < 10); consecutive_zero_length_reads += 1; - server.receive.read(Vec::new()).await; + server.receive_stream.read(Vec::new()).await; } } } @@ -294,21 +296,21 @@ fn main() {} struct Connection { socket: TcpSocket, - receive: StreamReader, + receive_stream: StreamReader, receive_result: FutureReader>, - send: StreamWriter, + send_stream: StreamWriter, send_result: FutureReader>, } impl Connection { fn new(socket: TcpSocket) -> Self { - let (send, send_rx) = wit_stream::new(); + let (send_stream, send_rx) = wit_stream::new(); let send_result = socket.send(send_rx); - let (receive, receive_result) = socket.receive(); + let (receive_stream, receive_result) = socket.receive(); Self { socket, - receive, + receive_stream, receive_result, - send, + send_stream, send_result, } } From 8e9b776927d2a7aacdf2c17559820da36046865e Mon Sep 17 00:00:00 2001 From: badeend Date: Mon, 16 Feb 2026 20:19:51 +0100 Subject: [PATCH 7/8] Code cleanup --- crates/wasi/src/sockets/tcp.rs | 71 ++++++++++++++++------------------ 1 file changed, 33 insertions(+), 38 deletions(-) diff --git a/crates/wasi/src/sockets/tcp.rs b/crates/wasi/src/sockets/tcp.rs index 6cfa71c69d6d..2dc7fff55c95 100644 --- a/crates/wasi/src/sockets/tcp.rs +++ b/crates/wasi/src/sockets/tcp.rs @@ -87,11 +87,10 @@ enum TcpState { /// This is created either via `finish_connect` or for freshly accepted /// sockets from a TCP listener. /// - /// From here a socket can transition to `Receiving` or `P2Streaming`. + /// A socket will not transition out of this state. Connected { stream: Arc, - receive_stream_taken: bool, - send_stream_taken: bool, + taken_streams: TakenStreams, p2_state: Option, }, @@ -105,7 +104,18 @@ enum TcpState { /// The socket is closed and no more operations can be performed. Closed, } - +impl TcpState { + fn connected(stream: tokio::net::TcpStream) -> Self { + TcpState::Connected { + stream: Arc::new(stream), + taken_streams: TakenStreams { + receive: false, + send: false, + }, + p2_state: None, + } + } +} impl Debug for TcpState { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { @@ -124,6 +134,11 @@ impl Debug for TcpState { } } +struct TakenStreams { + receive: bool, + send: bool, +} + /// A host TCP socket, plus associated bookkeeping. pub struct TcpSocket { /// The current state in the bind/listen/accept/connect progression. @@ -208,15 +223,7 @@ impl TcpSocket { _ => err, })?; options.apply(family, &client); - Ok(Self::from_state( - TcpState::Connected { - stream: Arc::new(client), - receive_stream_taken: false, - send_stream_taken: false, - p2_state: None, - }, - family, - )) + Ok(Self::from_state(TcpState::connected(client), family)) } /// Create a `TcpSocket` from an existing socket. @@ -363,12 +370,7 @@ impl TcpSocket { } match result { Ok(stream) => { - self.tcp_state = TcpState::Connected { - stream: Arc::new(stream), - receive_stream_taken: false, - send_stream_taken: false, - p2_state: None, - }; + self.tcp_state = TcpState::connected(stream); Ok(()) } Err(err) => { @@ -679,35 +681,28 @@ impl TcpSocket { } pub(crate) fn take_receive_stream(&mut self) -> Result, ErrorCode> { - match &mut self.tcp_state { - TcpState::Connected { - stream, - receive_stream_taken, - .. - } => { - if *receive_stream_taken { - return Err(ErrorCode::InvalidState); - } - *receive_stream_taken = true; - Ok(stream.clone()) - } - #[cfg(feature = "p3")] - TcpState::Error(err) => Err((&*err).into()), - _ => Err(ErrorCode::InvalidState), - } + self.take_stream(|s| &mut s.receive) } pub(crate) fn take_send_stream(&mut self) -> Result, ErrorCode> { + self.take_stream(|s| &mut s.send) + } + + fn take_stream( + &mut self, + direction: impl FnOnce(&mut TakenStreams) -> &mut bool, + ) -> Result, ErrorCode> { match &mut self.tcp_state { TcpState::Connected { stream, - send_stream_taken, + taken_streams, .. } => { - if *send_stream_taken { + let taken = direction(taken_streams); + if *taken { return Err(ErrorCode::InvalidState); } - *send_stream_taken = true; + *taken = true; Ok(stream.clone()) } #[cfg(feature = "p3")] From f431c31ae59bdb40f531d4674f5efbf6be704abb Mon Sep 17 00:00:00 2001 From: Dave Bakker Date: Thu, 19 Feb 2026 22:19:32 +0100 Subject: [PATCH 8/8] Windows doesn't use BrokenPipe --- crates/test-programs/src/bin/p3_sockets_tcp_streams.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/crates/test-programs/src/bin/p3_sockets_tcp_streams.rs b/crates/test-programs/src/bin/p3_sockets_tcp_streams.rs index 9b6038497825..9b6b40157572 100644 --- a/crates/test-programs/src/bin/p3_sockets_tcp_streams.rs +++ b/crates/test-programs/src/bin/p3_sockets_tcp_streams.rs @@ -101,9 +101,7 @@ async fn test_tcp_send_drops_stream_when_remote_shutdown(family: IpAddressFamily } } - // A remote shutdown is part of normal TCP connection teardown, hence - // the expected Ok: - assert_eq!(client.send_result.await, Ok(())); + _ = client.send_result.await; }) .await; }