Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 5 additions & 23 deletions crates/base/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ pub enum WillTerminateReason {
#[derive(Debug)]
pub struct RunOptions {
wait_termination_request_token: bool,
duplex_stream_rx: mpsc::UnboundedReceiver<DuplexStreamEntry>,
duplex_stream_rx: Option<mpsc::UnboundedReceiver<DuplexStreamEntry>>,
maybe_cpu_usage_metrics_tx: Option<mpsc::UnboundedSender<CPUUsageMetrics>>,
}

Expand Down Expand Up @@ -352,11 +352,6 @@ impl RunOptionsBuilder {
maybe_cpu_usage_metrics_tx,
} = self;

// TODO(Nyannyacha): Make this as optional.
let Some(duplex_stream_rx) = duplex_stream_rx else {
return Err(anyhow!("stream_rx can't be empty"));
};

Ok(RunOptions {
wait_termination_request_token,
duplex_stream_rx,
Expand Down Expand Up @@ -1291,8 +1286,10 @@ where
let op_state_rc = self.js_runtime.op_state();
let mut op_state = op_state_rc.borrow_mut();

op_state
.put::<mpsc::UnboundedReceiver<DuplexStreamEntry>>(duplex_stream_rx);
if let Some(duplex_stream_rx) = duplex_stream_rx {
op_state
.put::<mpsc::UnboundedReceiver<DuplexStreamEntry>>(duplex_stream_rx);
}

if self.conf.is_main_worker() {
op_state.put::<mpsc::UnboundedSender<UserWorkerMsgs>>(
Expand Down Expand Up @@ -3201,13 +3198,10 @@ mod test {
.build()
.await;

let (_tx, duplex_stream_rx) =
mpsc::unbounded_channel::<DuplexStreamEntry>();
let (result, _) = user_rt
.run(
RunOptionsBuilder::new()
.wait_termination_request_token(false)
.stream_rx(duplex_stream_rx)
.build()
.unwrap(),
)
Expand All @@ -3231,13 +3225,10 @@ mod test {
.build()
.await;

let (_tx, duplex_stream_rx) =
mpsc::unbounded_channel::<DuplexStreamEntry>();
let (result, _) = user_rt
.run(
RunOptionsBuilder::new()
.wait_termination_request_token(false)
.stream_rx(duplex_stream_rx)
.build()
.unwrap(),
)
Expand All @@ -3259,8 +3250,6 @@ mod test {
memory_limit_mb: u64,
worker_timeout_ms: u64,
) {
let (_duplex_stream_tx, duplex_stream_rx) =
mpsc::unbounded_channel::<DuplexStreamEntry>();
let (callback_tx, mut callback_rx) = mpsc::unbounded_channel::<()>();
let mut user_rt = create_basic_user_runtime_builder(
path,
Expand All @@ -3286,7 +3275,6 @@ mod test {
.run(
RunOptionsBuilder::new()
.wait_termination_request_token(false)
.stream_rx(duplex_stream_rx)
.build()
.unwrap(),
)
Expand Down Expand Up @@ -3393,13 +3381,10 @@ mod test {
.build()
.await;

let (_tx, duplex_stream_rx) = mpsc::unbounded_channel();

user_rt
.run(
RunOptionsBuilder::new()
.wait_termination_request_token(false)
.stream_rx(duplex_stream_rx)
.build()
.unwrap(),
)
Expand All @@ -3425,13 +3410,10 @@ mod test {
.build()
.await;

let (_tx, duplex_stream_rx) = mpsc::unbounded_channel();

user_rt
.run(
RunOptionsBuilder::new()
.wait_termination_request_token(false)
.stream_rx(duplex_stream_rx)
.build()
.unwrap(),
)
Expand Down
2 changes: 1 addition & 1 deletion ext/runtime/ops/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ pub async fn op_net_accept(
};

let Some(rx) = rx else {
return Err(bad_resource("duplex stream receiver is already used"));
return Err(bad_resource("duplex stream receiver is not available"));
};

let mut rx = scopeguard::guard(rx, {
Expand Down