diff --git a/crates/base/src/runtime/mod.rs b/crates/base/src/runtime/mod.rs index 135d9ea3..58208447 100644 --- a/crates/base/src/runtime/mod.rs +++ b/crates/base/src/runtime/mod.rs @@ -299,7 +299,7 @@ pub enum WillTerminateReason { #[derive(Debug)] pub struct RunOptions { wait_termination_request_token: bool, - duplex_stream_rx: mpsc::UnboundedReceiver, + duplex_stream_rx: Option>, maybe_cpu_usage_metrics_tx: Option>, } @@ -352,11 +352,6 @@ impl RunOptionsBuilder { maybe_cpu_usage_metrics_tx, } = self; - // TODO(Nyannyacha): Make this as optional. - let Some(duplex_stream_rx) = duplex_stream_rx else { - return Err(anyhow!("stream_rx can't be empty")); - }; - Ok(RunOptions { wait_termination_request_token, duplex_stream_rx, @@ -1291,8 +1286,10 @@ where let op_state_rc = self.js_runtime.op_state(); let mut op_state = op_state_rc.borrow_mut(); - op_state - .put::>(duplex_stream_rx); + if let Some(duplex_stream_rx) = duplex_stream_rx { + op_state + .put::>(duplex_stream_rx); + } if self.conf.is_main_worker() { op_state.put::>( @@ -3201,13 +3198,10 @@ mod test { .build() .await; - let (_tx, duplex_stream_rx) = - mpsc::unbounded_channel::(); let (result, _) = user_rt .run( RunOptionsBuilder::new() .wait_termination_request_token(false) - .stream_rx(duplex_stream_rx) .build() .unwrap(), ) @@ -3231,13 +3225,10 @@ mod test { .build() .await; - let (_tx, duplex_stream_rx) = - mpsc::unbounded_channel::(); let (result, _) = user_rt .run( RunOptionsBuilder::new() .wait_termination_request_token(false) - .stream_rx(duplex_stream_rx) .build() .unwrap(), ) @@ -3259,8 +3250,6 @@ mod test { memory_limit_mb: u64, worker_timeout_ms: u64, ) { - let (_duplex_stream_tx, duplex_stream_rx) = - mpsc::unbounded_channel::(); let (callback_tx, mut callback_rx) = mpsc::unbounded_channel::<()>(); let mut user_rt = create_basic_user_runtime_builder( path, @@ -3286,7 +3275,6 @@ mod test { .run( RunOptionsBuilder::new() .wait_termination_request_token(false) - .stream_rx(duplex_stream_rx) .build() .unwrap(), ) @@ -3393,13 +3381,10 @@ mod test { .build() .await; - let (_tx, duplex_stream_rx) = mpsc::unbounded_channel(); - user_rt .run( RunOptionsBuilder::new() .wait_termination_request_token(false) - .stream_rx(duplex_stream_rx) .build() .unwrap(), ) @@ -3425,13 +3410,10 @@ mod test { .build() .await; - let (_tx, duplex_stream_rx) = mpsc::unbounded_channel(); - user_rt .run( RunOptionsBuilder::new() .wait_termination_request_token(false) - .stream_rx(duplex_stream_rx) .build() .unwrap(), ) diff --git a/ext/runtime/ops/net.rs b/ext/runtime/ops/net.rs index 2bf66cb3..1e01d4d2 100644 --- a/ext/runtime/ops/net.rs +++ b/ext/runtime/ops/net.rs @@ -158,7 +158,7 @@ pub async fn op_net_accept( }; let Some(rx) = rx else { - return Err(bad_resource("duplex stream receiver is already used")); + return Err(bad_resource("duplex stream receiver is not available")); }; let mut rx = scopeguard::guard(rx, {