diff --git a/crates/test-programs/src/bin/p3_sockets_tcp_streams.rs b/crates/test-programs/src/bin/p3_sockets_tcp_streams.rs index 3aaef7a7fc6d..9b6b40157572 100644 --- a/crates/test-programs/src/bin/p3_sockets_tcp_streams.rs +++ b/crates/test-programs/src/bin/p3_sockets_tcp_streams.rs @@ -2,113 +2,197 @@ use futures::join; use std::pin::pin; use std::task::{Context, Poll, Waker}; use test_programs::p3::wasi::sockets::types::{ - IpAddress, IpAddressFamily, IpSocketAddress, TcpSocket, + ErrorCode, IpAddress, IpAddressFamily, IpSocketAddress, TcpSocket, }; use test_programs::p3::wit_stream; use test_programs::sockets::supports_ipv6; -use wit_bindgen::StreamResult; +use wit_bindgen::{FutureReader, StreamReader, StreamResult, StreamWriter}; struct Component; test_programs::p3::export!(Component); -/// InputStream::read should return `StreamError::Closed` after the connection has been shut down by the server. -async fn test_tcp_input_stream_should_be_closed_by_remote_shutdown(family: IpAddressFamily) { - setup(family, |server, client| async move { - drop(server); - - let (mut client_rx, client_fut) = client.receive(); +/// Test basic functionality. +async fn test_tcp_ping_pong(family: IpAddressFamily) { + setup(family, |mut server, mut client| async move { + { + let rest = server.send_stream.write_all(b"ping".into()).await; + assert!(rest.is_empty()); + } + { + let (status, buf) = client.receive_stream.read(Vec::with_capacity(4)).await; + assert_eq!(status, StreamResult::Complete(4)); + assert_eq!(buf, b"ping"); + } + { + let rest = client.send_stream.write_all(b"pong".into()).await; + assert!(rest.is_empty()); + } + { + let (status, buf) = server.receive_stream.read(Vec::with_capacity(4)).await; + assert_eq!(status, StreamResult::Complete(4)); + assert_eq!(buf, b"pong"); + } + }) + .await; +} - // The input stream should immediately signal StreamError::Closed. - // Notably, it should _not_ return an empty list (the wasi-io equivalent of EWOULDBLOCK) - // See: https://github.com/bytecodealliance/wasmtime/pull/8968 +/// The stream and future returned by `receive` should complete/resolve after +/// the connection has been shut down by the remote. +async fn test_tcp_receive_stream_should_be_dropped_by_remote_shutdown(family: IpAddressFamily) { + setup(family, |server, mut client| async move { + drop(server); // Wait for the shutdown signal to reach the client: - assert!(client_rx.next().await.is_none()); - assert_eq!(client_fut.await, Ok(())); + let (stream_result, data) = client.receive_stream.read(Vec::with_capacity(1)).await; + assert_eq!(data.len(), 0); + assert_eq!(stream_result, StreamResult::Dropped); + assert_eq!(client.receive_result.await, Ok(())); }) .await; } -/// InputStream::read should return `StreamError::Closed` after the connection has been shut down locally. -async fn test_tcp_input_stream_should_be_closed_by_local_shutdown(family: IpAddressFamily) { - setup(family, |server, client| async move { - let (mut server_tx, server_rx) = wit_stream::new(); - join!( - async { - server.send(server_rx).await.unwrap(); - }, - async { - // On Linux, `recv` continues to work even after `shutdown(sock, SHUT_RD)` - // has been called. To properly test that this behavior doesn't happen in - // WASI, we make sure there's some data to read by the client: - let rest = server_tx.write_all(b"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.".into()).await; - assert!(rest.is_empty()); - drop(server_tx); - }, - ); +/// The future returned by `receive` should resolve once the companion stream +/// has been dropped. Regardless of whether there was still data pending. +async fn test_tcp_receive_future_should_resolve_when_stream_dropped(family: IpAddressFamily) { + setup(family, |mut server, client| async move { + { + let rest = server.send_stream.write_all(b"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.".into()).await; + assert!(rest.is_empty()); + } + { + let Connection { mut receive_stream, receive_result, .. } = client; - let (client_rx, client_fut) = client.receive(); + // Wait for the data to be ready: + receive_stream.next().await.unwrap(); + drop(receive_stream); - // Shut down socket locally: - drop(client_rx); - // Wait for the shutdown signal to reach the client: - assert_eq!(client_fut.await, Ok(())); + // Dropping the stream should've caused the future to resolve even + // though there was still data pending: + assert_eq!(receive_result.await, Ok(())); + } }).await; } -/// StreamWriter should return `StreamError::Closed` after the connection has been locally shut down for sending. -async fn test_tcp_output_stream_should_be_closed_by_local_shutdown(family: IpAddressFamily) { +/// The future returned by `send` should resolve after the input stream is dropped. +async fn test_tcp_send_future_should_resolve_when_stream_dropped(family: IpAddressFamily) { setup(family, |_server, client| async move { - let (client_tx, client_rx) = wit_stream::new(); - join!( - async { - client.send(client_rx).await.unwrap(); - }, - async { - // TODO: Verify if send on the stream should return an error - //assert!(client_tx.send(b"Hi!".into()).await.is_err()); - drop(client_tx); + let Connection { + send_stream, + send_result, + .. + } = client; + drop(send_stream); + assert_eq!(send_result.await, Ok(())); + }) + .await; +} + +/// `send` should drop the input stream when the connection is shut down by the remote. +async fn test_tcp_send_drops_stream_when_remote_shutdown(family: IpAddressFamily) { + setup(family, |server, mut client| async move { + drop(server); + + // Give it a few tries for the shutdown signal to reach the client: + loop { + let stream_result = client.send_stream.write(b"undeliverable".into()).await.0; + if stream_result == StreamResult::Dropped { + break; } - ); + } + + _ = client.send_result.await; + }) + .await; +} + +/// `receive` may be called successfully at most once. +async fn test_tcp_receive_once(family: IpAddressFamily) { + setup(family, |mut server, client| async move { + // Give the client some potential data to _hopefully never_ read. + { + let rest = server.send_stream.write_all(b"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.".into()).await; + assert!(rest.is_empty()); + } + + // FYI, the first call to `receive` is part of the `setup` code, so every + // `receive` in here should fail. + for _ in 0..3 { + let (mut reader, future) = client.socket.receive(); + + let (stream_result, data) = reader.read(Vec::with_capacity(10)).await; + assert_eq!(data.len(), 0); + assert_eq!(stream_result, StreamResult::Dropped); + assert_eq!(future.await, Err(ErrorCode::InvalidState)); + } + }) + .await; +} + +/// `send` may be called successfully at most once. +async fn test_tcp_send_once(family: IpAddressFamily) { + setup(family, |_server, client| async move { + // FYI, the first call to `send` is part of the `setup` code, so every + // `send` in here should fail. + for _ in 0..3 { + let (mut writer, send_rx) = wit_stream::new(); + let future = client.socket.send(send_rx); + + const DATA: &[u8] = b"undeliverable"; + let (stream_result, rest) = writer.write(DATA.into()).await; + assert_eq!(rest.into_vec(), DATA); + assert_eq!(stream_result, StreamResult::Dropped); + assert_eq!(future.await, Err(ErrorCode::InvalidState)); + } }) .await; } -/// Calling `shutdown` while the StreamWriter is in the middle of a background write should not cause that write to be lost. -async fn test_tcp_shutdown_should_not_lose_data(family: IpAddressFamily) { +/// The streams and futures returned by `send` and `receive` should remain +/// operational even after the socket that spawned them has been dropped. +async fn test_tcp_stream_lifetimes(family: IpAddressFamily) { setup(family, |server, client| async move { - // Minimize the local send buffer: - client.set_send_buffer_size(1024).unwrap(); - let small_buffer_size = client.get_send_buffer_size().unwrap(); + let Connection { + socket: server_socket, + send_stream: mut server_send_stream, + receive_stream: server_receive_stream, + send_result: server_send_result, + receive_result: server_receive_result, + } = server; + let Connection { + socket: client_socket, + send_stream: mut client_send_stream, + receive_stream: client_receive_stream, + send_result: client_send_result, + receive_result: client_receive_result, + } = client; - // Create a significantly bigger buffer, so that we can be pretty sure the `write` won't finish immediately: - let big_buffer_size = 100 * small_buffer_size; - assert!(big_buffer_size > small_buffer_size); - let outgoing_data = vec![0; big_buffer_size as usize]; + // Drop the parent sockets: + drop(server_socket); + drop(client_socket); - // Submit the oversized buffer and immediately initiate the shutdown: - let (mut client_tx, client_rx) = wit_stream::new(); - join!( - async { - client.send(client_rx).await.unwrap(); - }, - async { - let ret = client_tx.write_all(outgoing_data.clone()).await; - assert!(ret.is_empty()); - drop(client_tx); - }, - async { - // The peer should receive _all_ data: - let (server_rx, server_fut) = server.receive(); - let incoming_data = server_rx.collect().await; - assert_eq!( - outgoing_data, incoming_data, - "Received data should match the sent data" - ); - server_fut.await.unwrap(); - }, - ); + { + let rest = server_send_stream.write_all(b"ping".into()).await; + assert!(rest.is_empty()); + drop(server_send_stream); + assert_eq!(server_send_result.await, Ok(())); + } + { + let data = client_receive_stream.collect().await; + assert_eq!(data, b"ping"); + assert_eq!(client_receive_result.await, Ok(())); + } + { + let rest = client_send_stream.write_all(b"pong".into()).await; + assert!(rest.is_empty()); + drop(client_send_stream); + assert_eq!(client_send_result.await, Ok(())); + } + { + let data = server_receive_stream.collect().await; + assert_eq!(data, b"pong"); + assert_eq!(server_receive_result.await, Ok(())); + } }) .await; } @@ -125,31 +209,26 @@ async fn test_tcp_read_cancellation(family: IpAddressFamily) { *slot = i as u8; } - setup(family, |server, client| async move { + setup(family, |mut server, mut client| async move { // Minimize the local send buffer: - client.set_send_buffer_size(1024).unwrap(); + client.socket.set_send_buffer_size(1024).unwrap(); - let (mut client_tx, client_rx) = wit_stream::new(); join!( - async { - client.send(client_rx).await.unwrap(); - }, async { for _ in 0..CHUNKS { - let ret = client_tx.write_all(data.to_vec()).await; + let ret = client.send_stream.write_all(data.to_vec()).await; assert!(ret.is_empty()); } - drop(client_tx); + drop(client.send_stream); }, async { let mut buf = Vec::with_capacity(1024); - let (mut server_rx, server_fut) = server.receive(); let mut i = 0_usize; let mut consecutive_zero_length_reads = 0; loop { assert!(buf.is_empty()); let (status, b) = { - let mut fut = pin!(server_rx.read(buf)); + let mut fut = pin!(server.receive_stream.read(buf)); let mut cx = Context::from_waker(Waker::noop()); match fut.as_mut().poll(&mut cx) { Poll::Ready(pair) => pair, @@ -171,12 +250,12 @@ async fn test_tcp_read_cancellation(family: IpAddressFamily) { StreamResult::Cancelled => { assert!(consecutive_zero_length_reads < 10); consecutive_zero_length_reads += 1; - server_rx.read(Vec::new()).await; + server.receive_stream.read(Vec::new()).await; } } } assert_eq!(i, CHUNKS * 256); - server_fut.await.unwrap(); + server.receive_result.await.unwrap(); }, ); }) @@ -185,17 +264,27 @@ async fn test_tcp_read_cancellation(family: IpAddressFamily) { impl test_programs::p3::exports::wasi::cli::run::Guest for Component { async fn run() -> Result<(), ()> { - test_tcp_input_stream_should_be_closed_by_remote_shutdown(IpAddressFamily::Ipv4).await; - test_tcp_input_stream_should_be_closed_by_local_shutdown(IpAddressFamily::Ipv4).await; - test_tcp_output_stream_should_be_closed_by_local_shutdown(IpAddressFamily::Ipv4).await; - test_tcp_shutdown_should_not_lose_data(IpAddressFamily::Ipv4).await; + test_tcp_ping_pong(IpAddressFamily::Ipv4).await; + test_tcp_receive_stream_should_be_dropped_by_remote_shutdown(IpAddressFamily::Ipv4).await; + test_tcp_receive_future_should_resolve_when_stream_dropped(IpAddressFamily::Ipv4).await; + test_tcp_send_future_should_resolve_when_stream_dropped(IpAddressFamily::Ipv4).await; + test_tcp_send_drops_stream_when_remote_shutdown(IpAddressFamily::Ipv4).await; + test_tcp_receive_once(IpAddressFamily::Ipv4).await; + test_tcp_send_once(IpAddressFamily::Ipv4).await; + test_tcp_stream_lifetimes(IpAddressFamily::Ipv4).await; test_tcp_read_cancellation(IpAddressFamily::Ipv4).await; if supports_ipv6() { - test_tcp_input_stream_should_be_closed_by_remote_shutdown(IpAddressFamily::Ipv6).await; - test_tcp_input_stream_should_be_closed_by_local_shutdown(IpAddressFamily::Ipv6).await; - test_tcp_output_stream_should_be_closed_by_local_shutdown(IpAddressFamily::Ipv6).await; - test_tcp_shutdown_should_not_lose_data(IpAddressFamily::Ipv6).await; + test_tcp_ping_pong(IpAddressFamily::Ipv6).await; + test_tcp_receive_stream_should_be_dropped_by_remote_shutdown(IpAddressFamily::Ipv6) + .await; + test_tcp_receive_future_should_resolve_when_stream_dropped(IpAddressFamily::Ipv6).await; + test_tcp_send_future_should_resolve_when_stream_dropped(IpAddressFamily::Ipv6).await; + test_tcp_send_drops_stream_when_remote_shutdown(IpAddressFamily::Ipv6).await; + test_tcp_receive_once(IpAddressFamily::Ipv6).await; + test_tcp_send_once(IpAddressFamily::Ipv6).await; + test_tcp_stream_lifetimes(IpAddressFamily::Ipv6).await; + test_tcp_read_cancellation(IpAddressFamily::Ipv6).await; } Ok(()) } @@ -203,10 +292,32 @@ impl test_programs::p3::exports::wasi::cli::run::Guest for Component { fn main() {} +struct Connection { + socket: TcpSocket, + receive_stream: StreamReader, + receive_result: FutureReader>, + send_stream: StreamWriter, + send_result: FutureReader>, +} +impl Connection { + fn new(socket: TcpSocket) -> Self { + let (send_stream, send_rx) = wit_stream::new(); + let send_result = socket.send(send_rx); + let (receive_stream, receive_result) = socket.receive(); + Self { + socket, + receive_stream, + receive_result, + send_stream, + send_result, + } + } +} + /// Set up a connected pair of sockets async fn setup>( family: IpAddressFamily, - body: impl FnOnce(TcpSocket, TcpSocket) -> Fut, + body: impl FnOnce(Connection, Connection) -> Fut, ) { let bind_address = IpSocketAddress::new(IpAddress::new_loopback(family), 0); let listener = TcpSocket::create(family).unwrap(); @@ -220,5 +331,10 @@ async fn setup>( }, async { accept.next().await.unwrap() }, ); - body(accepted_socket, client_socket).await; + + body( + Connection::new(accepted_socket), + Connection::new(client_socket), + ) + .await; } diff --git a/crates/wasi/src/p2/tcp.rs b/crates/wasi/src/p2/tcp.rs index 67edf98ba868..15f93f1e77c1 100644 --- a/crates/wasi/src/p2/tcp.rs +++ b/crates/wasi/src/p2/tcp.rs @@ -15,11 +15,9 @@ use wasmtime::Result; impl TcpSocket { pub(crate) fn p2_streams(&mut self) -> SocketResult<(DynInputStream, DynOutputStream)> { - let client = self.tcp_stream_arc()?; - let reader = Arc::new(Mutex::new(TcpReader::new(client.clone()))); - let writer = Arc::new(Mutex::new(TcpWriter::new(client.clone()))); + let reader = Arc::new(Mutex::new(TcpReader::new(self.take_receive_stream()?))); + let writer = Arc::new(Mutex::new(TcpWriter::new(self.take_send_stream()?))); self.set_p2_streaming_state(P2TcpStreamingState { - stream: client.clone(), reader: reader.clone(), writer: writer.clone(), })?; @@ -30,7 +28,6 @@ impl TcpSocket { } pub(crate) struct P2TcpStreamingState { - pub(crate) stream: Arc, reader: Arc>, writer: Arc>, } diff --git a/crates/wasi/src/p3/bindings.rs b/crates/wasi/src/p3/bindings.rs index cc589e2f3ca0..a20a32750e8b 100644 --- a/crates/wasi/src/p3/bindings.rs +++ b/crates/wasi/src/p3/bindings.rs @@ -87,7 +87,7 @@ mod generated { "wasi:filesystem/types.[method]descriptor.read-directory": store | tracing, "wasi:sockets/types.[method]tcp-socket.bind": async | tracing | trappable, "wasi:sockets/types.[method]tcp-socket.listen": store | tracing | trappable, - "wasi:sockets/types.[method]tcp-socket.send": store | tracing, + "wasi:sockets/types.[method]tcp-socket.send": store | tracing | trappable, "wasi:sockets/types.[method]tcp-socket.receive": store | tracing | trappable, "wasi:sockets/types.[method]udp-socket.bind": async | tracing | trappable, "wasi:sockets/types.[method]udp-socket.connect": async | tracing | trappable, diff --git a/crates/wasi/src/p3/sockets/host/types/tcp.rs b/crates/wasi/src/p3/sockets/host/types/tcp.rs index a8815fd55bec..0a080c349968 100644 --- a/crates/wasi/src/p3/sockets/host/types/tcp.rs +++ b/crates/wasi/src/p3/sockets/host/types/tcp.rs @@ -226,6 +226,9 @@ impl StreamConsumer for SendStreamConsumer { Poll::Pending => return Poll::Pending, } } + Err(err) if err.kind() == std::io::ErrorKind::BrokenPipe => { + break 'result Ok(()); + } Err(err) => break 'result Err(err.into()), } } @@ -286,12 +289,11 @@ impl HostTcpSocketWithStore for WasiSockets { mut store: Access<'_, T, Self>, socket: Resource, mut data: StreamReader, - ) -> FutureReader> { - let (result_tx, result_rx) = oneshot::channel(); - match get_socket(store.get().table, &socket) - .and_then(|sock| sock.tcp_stream_arc().map(Arc::clone).map_err(Into::into)) - { + ) -> wasmtime::Result>> { + let socket = get_socket_mut(store.get().table, &socket)?; + match socket.take_send_stream() { Ok(stream) => { + let (result_tx, result_rx) = oneshot::channel(); data.pipe( &mut store, SendStreamConsumer { @@ -299,13 +301,15 @@ impl HostTcpSocketWithStore for WasiSockets { result: Some(result_tx), }, ); + Ok(FutureReader::new(&mut store, result_rx)) } Err(err) => { data.close(&mut store); - let _ = result_tx.send(Err(err.downcast().unwrap_or(ErrorCode::Unknown))); + Ok(FutureReader::new(&mut store, async { + wasmtime::error::Ok(Err(err.into())) + })) } } - FutureReader::new(&mut store, result_rx) } fn receive( @@ -313,9 +317,8 @@ impl HostTcpSocketWithStore for WasiSockets { socket: Resource, ) -> wasmtime::Result<(StreamReader, FutureReader>)> { let socket = get_socket_mut(store.get().table, &socket)?; - match socket.start_receive() { - Some(stream) => { - let stream = Arc::clone(stream); + match socket.take_receive_stream() { + Ok(stream) => { let (result_tx, result_rx) = oneshot::channel(); Ok(( StreamReader::new( @@ -328,11 +331,9 @@ impl HostTcpSocketWithStore for WasiSockets { FutureReader::new(&mut store, result_rx), )) } - None => Ok(( + Err(err) => Ok(( StreamReader::new(&mut store, iter::empty()), - FutureReader::new(&mut store, async { - wasmtime::error::Ok(Err(ErrorCode::InvalidState)) - }), + FutureReader::new(&mut store, async { wasmtime::error::Ok(Err(err.into())) }), )), } } diff --git a/crates/wasi/src/sockets/tcp.rs b/crates/wasi/src/sockets/tcp.rs index d31e4640786d..2dc7fff55c95 100644 --- a/crates/wasi/src/sockets/tcp.rs +++ b/crates/wasi/src/sockets/tcp.rs @@ -87,20 +87,12 @@ enum TcpState { /// This is created either via `finish_connect` or for freshly accepted /// sockets from a TCP listener. /// - /// From here a socket can transition to `Receiving` or `P2Streaming`. - Connected(Arc), - - /// A connection has been established and `receive` has been called. - /// - /// A socket will not transition out of this state. - #[cfg(feature = "p3")] - Receiving(Arc), - - /// This is a WASIp2-bound socket which stores some extra state for - /// read/write streams to handle TCP shutdown. - /// /// A socket will not transition out of this state. - P2Streaming(Box), + Connected { + stream: Arc, + taken_streams: TakenStreams, + p2_state: Option, + }, /// This is not actually a socket but a deferred error. /// @@ -112,7 +104,18 @@ enum TcpState { /// The socket is closed and no more operations can be performed. Closed, } - +impl TcpState { + fn connected(stream: tokio::net::TcpStream) -> Self { + TcpState::Connected { + stream: Arc::new(stream), + taken_streams: TakenStreams { + receive: false, + send: false, + }, + p2_state: None, + } + } +} impl Debug for TcpState { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { @@ -125,15 +128,17 @@ impl Debug for TcpState { Self::ConnectReady(..) => f.debug_tuple("ConnectReady").finish(), Self::Connected { .. } => f.debug_tuple("Connected").finish(), #[cfg(feature = "p3")] - Self::Receiving { .. } => f.debug_tuple("Receiving").finish(), - Self::P2Streaming(_) => f.debug_tuple("P2Streaming").finish(), - #[cfg(feature = "p3")] Self::Error(..) => f.debug_tuple("Error").finish(), Self::Closed => write!(f, "Closed"), } } } +struct TakenStreams { + receive: bool, + send: bool, +} + /// A host TCP socket, plus associated bookkeeping. pub struct TcpSocket { /// The current state in the bind/listen/accept/connect progression. @@ -218,10 +223,7 @@ impl TcpSocket { _ => err, })?; options.apply(family, &client); - Ok(Self::from_state( - TcpState::Connected(Arc::new(client)), - family, - )) + Ok(Self::from_state(TcpState::connected(client), family)) } /// Create a `TcpSocket` from an existing socket. @@ -240,11 +242,8 @@ impl TcpSocket { | TcpState::BindStarted(socket) | TcpState::Bound(socket) | TcpState::ListenStarted(socket) => Ok(socket.as_socketlike_view()), - TcpState::Connected(stream) => Ok(stream.as_socketlike_view()), - #[cfg(feature = "p3")] - TcpState::Receiving(stream) => Ok(stream.as_socketlike_view()), + TcpState::Connected { stream, .. } => Ok(stream.as_socketlike_view()), TcpState::Listening { listener, .. } => Ok(listener.as_socketlike_view()), - TcpState::P2Streaming(state) => Ok(state.stream.as_socketlike_view()), TcpState::Connecting(..) | TcpState::ConnectReady(_) | TcpState::Closed => { Err(ErrorCode::InvalidState) } @@ -371,7 +370,7 @@ impl TcpSocket { } match result { Ok(stream) => { - self.tcp_state = TcpState::Connected(Arc::new(stream)); + self.tcp_state = TcpState::connected(stream); Ok(()) } Err(err) => { @@ -509,27 +508,10 @@ impl TcpSocket { Ok(Some(Self::new_accept(result, &self.options, self.family)?)) } - #[cfg(feature = "p3")] - pub(crate) fn start_receive(&mut self) -> Option<&Arc> { - match mem::replace(&mut self.tcp_state, TcpState::Closed) { - TcpState::Connected(stream) => { - self.tcp_state = TcpState::Receiving(stream); - Some(self.tcp_stream_arc().unwrap()) - } - prev => { - self.tcp_state = prev; - None - } - } - } - pub(crate) fn local_address(&self) -> Result { match &self.tcp_state { TcpState::Bound(socket) => Ok(socket.local_addr()?), - TcpState::Connected(stream) => Ok(stream.local_addr()?), - #[cfg(feature = "p3")] - TcpState::Receiving(stream) => Ok(stream.local_addr()?), - TcpState::P2Streaming(state) => Ok(state.stream.local_addr()?), + TcpState::Connected { stream, .. } => Ok(stream.local_addr()?), TcpState::Listening { listener, .. } => Ok(listener.local_addr()?), #[cfg(feature = "p3")] TcpState::Error(err) => Err(err.into()), @@ -538,9 +520,12 @@ impl TcpSocket { } pub(crate) fn remote_address(&self) -> Result { - let stream = self.tcp_stream_arc()?; - let addr = stream.peer_addr()?; - Ok(addr) + match &self.tcp_state { + TcpState::Connected { stream, .. } => Ok(stream.peer_addr()?), + #[cfg(feature = "p3")] + TcpState::Error(err) => Err(err.into()), + _ => Err(ErrorCode::InvalidState), + } } pub(crate) fn is_listening(&self) -> bool { @@ -695,21 +680,43 @@ impl TcpSocket { } } - pub(crate) fn tcp_stream_arc(&self) -> Result<&Arc, ErrorCode> { - match &self.tcp_state { - TcpState::Connected(socket) => Ok(socket), - #[cfg(feature = "p3")] - TcpState::Receiving(socket) => Ok(socket), - TcpState::P2Streaming(state) => Ok(&state.stream), + pub(crate) fn take_receive_stream(&mut self) -> Result, ErrorCode> { + self.take_stream(|s| &mut s.receive) + } + + pub(crate) fn take_send_stream(&mut self) -> Result, ErrorCode> { + self.take_stream(|s| &mut s.send) + } + + fn take_stream( + &mut self, + direction: impl FnOnce(&mut TakenStreams) -> &mut bool, + ) -> Result, ErrorCode> { + match &mut self.tcp_state { + TcpState::Connected { + stream, + taken_streams, + .. + } => { + let taken = direction(taken_streams); + if *taken { + return Err(ErrorCode::InvalidState); + } + *taken = true; + Ok(stream.clone()) + } #[cfg(feature = "p3")] - TcpState::Error(err) => Err(err.into()), + TcpState::Error(err) => Err((&*err).into()), _ => Err(ErrorCode::InvalidState), } } pub(crate) fn p2_streaming_state(&self) -> Result<&P2TcpStreamingState, ErrorCode> { match &self.tcp_state { - TcpState::P2Streaming(state) => Ok(state), + TcpState::Connected { + p2_state: Some(state), + .. + } => Ok(state), #[cfg(feature = "p3")] TcpState::Error(err) => Err(err.into()), _ => Err(ErrorCode::InvalidState), @@ -720,11 +727,12 @@ impl TcpSocket { &mut self, state: P2TcpStreamingState, ) -> Result<(), ErrorCode> { - if !matches!(self.tcp_state, TcpState::Connected(_)) { - return Err(ErrorCode::InvalidState); + if let TcpState::Connected { p2_state, .. } = &mut self.tcp_state { + *p2_state = Some(state); + Ok(()) + } else { + Err(ErrorCode::InvalidState) } - self.tcp_state = TcpState::P2Streaming(Box::new(state)); - Ok(()) } /// Used for `Pollable` in the WASIp2 implementation this awaits the socket @@ -745,11 +753,10 @@ impl TcpSocket { | TcpState::Listening { pending_accept: Some(_), .. - } - | TcpState::P2Streaming(_) => {} + } => {} #[cfg(feature = "p3")] - TcpState::Receiving(_) | TcpState::Error(_) => {} + TcpState::Error(_) => {} TcpState::Connecting(Some(future)) => { self.tcp_state = TcpState::ConnectReady(future.as_mut().await);