feat: Streamable HTTP transport for ACP + goose-acp usage #6741
base: main
Conversation
Pull request overview
This PR introduces Phase I of ACP-over-HTTP transport, enabling clients to interact with goose via a standardized HTTP+SSE protocol instead of custom REST APIs. It implements session management, message exchanges, and streaming responses using Axum and Server-Sent Events.
Changes:
- Added HTTP transport layer with session lifecycle management (create, stream, remove)
- Created server factory for agent instantiation with configurable builtins and paths
- Added standalone binary for running goose-acp-server with CLI arguments
- Refactored server.rs to support upfront session creation for HTTP transport
Reviewed changes
Copilot reviewed 5 out of 7 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| crates/goose-acp/src/http.rs | New HTTP server with SSE streaming, channel-based transport adapters, and session management |
| crates/goose-acp/src/server_factory.rs | Factory pattern for creating ACP agents with custom configuration |
| crates/goose-acp/src/bin/server.rs | Binary entry point with CLI parsing for host, port, and builtins |
| crates/goose-acp/src/server.rs | Added create_session method for HTTP transport; cleaned up comments and simplified pattern matching |
| crates/goose-acp/src/lib.rs | Exported new modules and increased recursion limit for async-stream macros |
| crates/goose-acp/Cargo.toml | Added HTTP dependencies: axum, tower-http, async-stream, clap |
| Cargo.lock | Updated dependency versions including new HTTP crates |
```rust
"Channel full, dropping message (backpressure): {}",
truncated
);
```
Copilot AI (Jan 27, 2026)
Silently dropping messages on backpressure causes data loss. The agent's responses won't reach the client if the channel is full. Consider using async send() instead of try_send() to apply backpressure, or return an error that can propagate to the caller.
```diff
-    "Channel full, dropping message (backpressure): {}",
-    truncated
-);
+    "Channel full, cannot send message (backpressure): {}",
+    truncated
+);
+return Poll::Ready(Err(std::io::Error::new(
+    std::io::ErrorKind::WouldBlock,
+    "Channel full due to backpressure",
+)));
```
```rust
let manager = self.agent.config.session_manager.clone();
let goose_session = manager
    .create_session(
        std::env::current_dir().unwrap_or_default(),
```
Copilot AI (Jan 27, 2026)
Using the server's current directory instead of a client-specified working directory creates sessions in the wrong location. This differs from the ACP protocol's session/new which accepts a cwd parameter. Consider either accepting a cwd parameter here or documenting that sessions must be loaded via session/load to set the correct working directory.
```rust
async fn create_session(&self) -> Result<String, StatusCode> {
    let (to_agent_tx, to_agent_rx) = mpsc::channel::<String>(256);
    let (from_agent_tx, from_agent_rx) = mpsc::channel::<String>(256);

    let agent = self.server.create_agent().await.map_err(|e| {
        error!("Failed to create agent: {}", e);
        StatusCode::INTERNAL_SERVER_ERROR
    })?;

    let session_id = agent.create_session().await.map_err(|e| {
        error!("Failed to create ACP session: {}", e);
        StatusCode::INTERNAL_SERVER_ERROR
    })?;

    let handle = tokio::spawn(async move {
        let read_stream = ReceiverToAsyncRead::new(to_agent_rx);
        let write_stream = SenderToAsyncWrite::new(from_agent_tx);

        if let Err(e) =
            crate::server::serve(agent, read_stream.compat(), write_stream.compat_write()).await
        {
            error!("ACP session error: {}", e);
        }
    });

    self.sessions.write().await.insert(
        session_id.clone(),
        HttpSession {
            to_agent_tx,
            from_agent_rx: Arc::new(Mutex::new(from_agent_rx)),
            handle,
        },
    );

    info!(session_id = %session_id, "Session created");
    Ok(session_id)
```
Copilot AI (Jan 27, 2026)
Sessions created via HTTP POST may leak if the client never calls the stream endpoint. Consider adding a timeout or explicit deletion endpoint, or defer session creation until the stream is established.
```diff
@@ -1158,14 +1165,6 @@ print("hello, world")
     assert_eq!(format_tool_name("single"), "Single");
 }
```
Copilot AI (Jan 27, 2026)
Removed test coverage for edge cases (empty strings, double underscores) in format_tool_name. Consider keeping these tests to ensure the function handles malformed input gracefully.
```rust
async fn get_receiver(
    &self,
    session_id: &str,
) -> Result<Arc<Mutex<mpsc::Receiver<String>>>, StatusCode> {
    let sessions = self.sessions.read().await;
    let session = sessions.get(session_id).ok_or(StatusCode::NOT_FOUND)?;
    Ok(session.from_agent_rx.clone())
```
Copilot AI (Jan 27, 2026)
Multiple concurrent stream connections for the same session will share the receiver, causing messages to be distributed unpredictably between them. Consider rejecting attempts to open multiple streams for the same session or using a broadcast channel pattern instead.
@alexhancock given that SSE is a deprecated transport in MCP, what's your sense that intentionally not basing this on Streamable HTTP is the right choice for ACP? A lot of ACP is based on MCP, and proxies and gateways will already have the Streamable HTTP transport as a long-term protocol for MCP and likely plan to wind down SSE. This would resurrect it...
Force-pushed from 4d50090 to 4c72105.
I'm currently using Goose as an execution kernel in a very similar way: I wrap the stdin/stdout in a parent process and expose it via WebSockets to the frontend. |
jamadeo left a comment
Nice. I see that this is marked as "in progress" in the ACP docs -- do you have more insight there @alexhancock on the status of that draft proposal? I imagine once it's standardized, we'll use that?
re: MCP transports, to me this does resemble the newer Streamable HTTP transport more than the legacy one, maybe with some differences of course, but that's worth discussing further @codefromthecrypt @zhixiongdu027
```diff
@@ -1 +1,5 @@
+#![recursion_limit = "256"]
```
what's this?
```rust
        .await?;

    let session = GooseAcpSession {
        messages: Conversation::new_unvalidated(Vec::new()),
```
you can use Conversation::empty() so someday when we get rid of all the new_unvalidated()s, there's one less :)
```rust
    Ok(goose_session.id)
}

pub async fn has_session(&self, session_id: &str) -> bool {
```
I think this isn't used?
codefromthecrypt left a comment
Thanks @alexhancock this is great progress.
I'm happy to see things move forward with the understanding that this is basically a rebranding of Streamable HTTP from MCP. It would be nice to change the default path to ensure that relationship is carried through, and the types and comments as well. Even if it isn't exactly Streamable HTTP yet, knowing it aims to be is the most important thing to me.
like ideally we just have a quick comment somewhere saying
This transport aims to be the same as MCP Streamable HTTP, except base path /acp not /mcp
See https://modelcontextprotocol.io/specification/2025-11-25/basic/transports#streamable-http
Doing so will set the intent, which I think is shared, and will also help us long term as there are more developers and libraries on the MCP protocol, as well as proxies like the one I work on.
p.s. rebase should get rid of stack overflow in tests. sacp makes a giant state machine I guess.
Force-pushed from 4c72105 to 5be9796.
Pull request overview
Copilot reviewed 5 out of 7 changed files in this pull request and generated 6 comments.
```rust
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
    Poll::Ready(Ok(()))
}

fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
    Poll::Ready(Ok(()))
```
Copilot AI (Jan 30, 2026)
The poll_flush and poll_shutdown implementations return Poll::Ready(Ok(())) without actually flushing any buffered data. If there's partial data in the buffer when flush or shutdown is called, it will be lost. Consider ensuring any remaining buffered data is sent before completing these operations.
```diff
-    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
-        Poll::Ready(Ok(()))
-    }
-    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
-        Poll::Ready(Ok(()))
+    fn poll_flush(
+        mut self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+    ) -> Poll<std::io::Result<()>> {
+        if !self.buffer.is_empty() {
+            let line = String::from_utf8_lossy(&self.buffer).to_string();
+            self.buffer.clear();
+            if !line.is_empty() {
+                if let Err(e) = self.tx.try_send(line.clone()) {
+                    match e {
+                        mpsc::error::TrySendError::Full(_) => {
+                            let truncated: String = line.chars().take(100).collect();
+                            error!(
+                                "Channel full, dropping message (flush/backpressure): {}",
+                                truncated
+                            );
+                        }
+                        mpsc::error::TrySendError::Closed(_) => {
+                            return Poll::Ready(Err(std::io::Error::new(
+                                std::io::ErrorKind::BrokenPipe,
+                                "Channel closed",
+                            )));
+                        }
+                    }
+                }
+            }
+        }
+        Poll::Ready(Ok(()))
+    }
+    fn poll_shutdown(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<std::io::Result<()>> {
+        Self::poll_flush(self, cx)
```
```rust
use anyhow::Result;
use axum::{
    extract::{Path, State},
    http::{header, Method, StatusCode},
    response::Sse,
    routing::{get, post},
    Json, Router,
};
use serde::Serialize;
use std::{
    collections::HashMap,
    convert::Infallible,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
    time::Duration,
};
use tokio::sync::{mpsc, Mutex, RwLock};
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
use tower_http::cors::{Any, CorsLayer};
use tracing::{debug, error, info};

use crate::server_factory::AcpServer;

struct HttpSession {
    to_agent_tx: mpsc::Sender<String>,
    from_agent_rx: Arc<Mutex<mpsc::Receiver<String>>>,
    handle: tokio::task::JoinHandle<()>,
}

pub struct HttpState {
    server: Arc<AcpServer>,
    sessions: RwLock<HashMap<String, HttpSession>>,
}

impl HttpState {
    pub fn new(server: Arc<AcpServer>) -> Self {
        Self {
            server,
            sessions: RwLock::new(HashMap::new()),
        }
    }

    async fn create_session(&self) -> Result<String, StatusCode> {
        let (to_agent_tx, to_agent_rx) = mpsc::channel::<String>(256);
        let (from_agent_tx, from_agent_rx) = mpsc::channel::<String>(256);

        let agent = self.server.create_agent().await.map_err(|e| {
            error!("Failed to create agent: {}", e);
            StatusCode::INTERNAL_SERVER_ERROR
        })?;

        let session_id = agent.create_session().await.map_err(|e| {
            error!("Failed to create ACP session: {}", e);
            StatusCode::INTERNAL_SERVER_ERROR
        })?;

        let handle = tokio::spawn(async move {
            let read_stream = ReceiverToAsyncRead::new(to_agent_rx);
            let write_stream = SenderToAsyncWrite::new(from_agent_tx);

            if let Err(e) =
                crate::server::serve(agent, read_stream.compat(), write_stream.compat_write()).await
            {
                error!("ACP session error: {}", e);
            }
        });

        self.sessions.write().await.insert(
            session_id.clone(),
            HttpSession {
                to_agent_tx,
                from_agent_rx: Arc::new(Mutex::new(from_agent_rx)),
                handle,
            },
        );

        info!(session_id = %session_id, "Session created");
        Ok(session_id)
    }

    async fn remove_session(&self, session_id: &str) {
        if let Some(session) = self.sessions.write().await.remove(session_id) {
            session.handle.abort();
            info!(session_id = %session_id, "Session removed");
        }
    }

    async fn send_message(&self, session_id: &str, message: String) -> Result<(), StatusCode> {
        let sessions = self.sessions.read().await;
        let session = sessions.get(session_id).ok_or(StatusCode::NOT_FOUND)?;
        session
            .to_agent_tx
            .send(message)
            .await
            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)
    }

    async fn get_receiver(
        &self,
        session_id: &str,
    ) -> Result<Arc<Mutex<mpsc::Receiver<String>>>, StatusCode> {
        let sessions = self.sessions.read().await;
        let session = sessions.get(session_id).ok_or(StatusCode::NOT_FOUND)?;
        Ok(session.from_agent_rx.clone())
    }
}

struct ReceiverToAsyncRead {
    rx: mpsc::Receiver<String>,
    buffer: Vec<u8>,
    pos: usize,
}

impl ReceiverToAsyncRead {
    fn new(rx: mpsc::Receiver<String>) -> Self {
        Self {
            rx,
            buffer: Vec::new(),
            pos: 0,
        }
    }
}

impl tokio::io::AsyncRead for ReceiverToAsyncRead {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        if self.pos < self.buffer.len() {
            let remaining = &self.buffer[self.pos..];
            let to_copy = remaining.len().min(buf.remaining());
            buf.put_slice(&remaining[..to_copy]);
            self.pos += to_copy;
            if self.pos >= self.buffer.len() {
                self.buffer.clear();
                self.pos = 0;
            }
            return Poll::Ready(Ok(()));
        }

        match Pin::new(&mut self.rx).poll_recv(cx) {
            Poll::Ready(Some(msg)) => {
                let bytes = format!("{}\n", msg).into_bytes();
                let to_copy = bytes.len().min(buf.remaining());
                buf.put_slice(&bytes[..to_copy]);
                if to_copy < bytes.len() {
                    self.buffer = bytes[to_copy..].to_vec();
                    self.pos = 0;
                }
                Poll::Ready(Ok(()))
            }
            Poll::Ready(None) => Poll::Ready(Ok(())),
            Poll::Pending => Poll::Pending,
        }
    }
}

struct SenderToAsyncWrite {
    tx: mpsc::Sender<String>,
    buffer: Vec<u8>,
}

impl SenderToAsyncWrite {
    fn new(tx: mpsc::Sender<String>) -> Self {
        Self {
            tx,
            buffer: Vec::new(),
        }
    }
}

impl tokio::io::AsyncWrite for SenderToAsyncWrite {
    fn poll_write(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::io::Result<usize>> {
        self.buffer.extend_from_slice(buf);

        while let Some(pos) = self.buffer.iter().position(|&b| b == b'\n') {
            let line = String::from_utf8_lossy(&self.buffer[..pos]).to_string();
            self.buffer.drain(..=pos);

            if !line.is_empty() {
                if let Err(e) = self.tx.try_send(line.clone()) {
                    match e {
                        mpsc::error::TrySendError::Full(_) => {
                            let truncated: String = line.chars().take(100).collect();
                            error!(
                                "Channel full, dropping message (backpressure): {}",
                                truncated
                            );
                        }
                        mpsc::error::TrySendError::Closed(_) => {
                            return Poll::Ready(Err(std::io::Error::new(
                                std::io::ErrorKind::BrokenPipe,
                                "Channel closed",
                            )));
                        }
                    }
                }
            }
        }

        Poll::Ready(Ok(buf.len()))
    }

    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        Poll::Ready(Ok(()))
    }

    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        Poll::Ready(Ok(()))
    }
}

#[derive(Serialize)]
struct CreateSessionResponse {
    session_id: String,
}

async fn create_session(
    State(state): State<Arc<HttpState>>,
) -> Result<Json<CreateSessionResponse>, StatusCode> {
    let session_id = state.create_session().await?;
    Ok(Json(CreateSessionResponse { session_id }))
}

async fn send_message(
    State(state): State<Arc<HttpState>>,
    Path(session_id): Path<String>,
    body: String,
) -> Result<StatusCode, StatusCode> {
    debug!(session_id = %session_id, "Received message");
    state.send_message(&session_id, body).await?;
    Ok(StatusCode::ACCEPTED)
}

async fn stream_events(
    State(state): State<Arc<HttpState>>,
    Path(session_id): Path<String>,
) -> Result<
    Sse<impl futures::Stream<Item = Result<axum::response::sse::Event, Infallible>>>,
    StatusCode,
> {
    let receiver = state.get_receiver(&session_id).await?;

    let cleanup_state = state.clone();
    let cleanup_session_id = session_id.clone();

    let stream = async_stream::stream! {
        let mut rx = receiver.lock().await;
        while let Some(msg) = rx.recv().await {
            yield Ok(axum::response::sse::Event::default().data(msg));
        }
        cleanup_state.remove_session(&cleanup_session_id).await;
    };

    Ok(Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(Duration::from_secs(15))
            .text("ping"),
    ))
}

async fn health() -> &'static str {
    "ok"
}

pub fn create_router(state: Arc<HttpState>) -> Router {
    let cors = CorsLayer::new()
        .allow_origin(Any)
        .allow_methods([Method::GET, Method::POST, Method::OPTIONS])
        .allow_headers([header::CONTENT_TYPE, header::ACCEPT]);

    Router::new()
        .route("/health", get(health))
        .route("/acp/session", post(create_session))
        .route("/acp/session/{session_id}/message", post(send_message))
        .route("/acp/session/{session_id}/stream", get(stream_events))
        .layer(cors)
        .with_state(state)
}

pub async fn serve(state: Arc<HttpState>, addr: std::net::SocketAddr) -> Result<()> {
    let router = create_router(state);
    let listener = tokio::net::TcpListener::bind(addr).await?;
    info!("ACP HTTP server listening on {}", addr);
    axum::serve(listener, router).await?;
    Ok(())
}
```
Copilot AI (Jan 30, 2026)
The new HTTP transport layer (HttpState, session management, SSE streaming) lacks test coverage. The existing tests only cover the in-process ACP protocol. Consider adding integration tests that exercise the HTTP endpoints, session lifecycle, and SSE streaming to ensure robustness of the new HTTP transport.
```rust
if let Err(e) = self.tx.try_send(line.clone()) {
    match e {
        mpsc::error::TrySendError::Full(_) => {
            let truncated: String = line.chars().take(100).collect();
            error!(
                "Channel full, dropping message (backpressure): {}",
                truncated
            );
        }
```
Copilot AI (Jan 30, 2026)
The SenderToAsyncWrite implementation silently drops messages when the channel is full, logging only to stderr. This could lead to data loss in production. Consider either blocking until the channel has space (using send instead of try_send), or returning an error to propagate backpressure properly to the caller.
crates/goose-acp/src/http.rs (outdated)
```rust
let cors = CorsLayer::new()
    .allow_origin(Any)
    .allow_methods([Method::GET, Method::POST, Method::OPTIONS])
    .allow_headers([header::CONTENT_TYPE, header::ACCEPT]);
```
Copilot AI (Jan 30, 2026)
CORS is configured to allow any origin with allow_origin(Any). This is acceptable for development but represents a potential security risk in production. Consider restricting this to specific trusted origins or adding a configuration option to control CORS policy.
```rust
let stream = async_stream::stream! {
    let mut rx = receiver.lock().await;
    while let Some(msg) = rx.recv().await {
        yield Ok(axum::response::sse::Event::default().data(msg));
    }
    cleanup_state.remove_session(&cleanup_session_id).await;
};
```
Copilot AI (Jan 30, 2026)
The stream_events handler holds a receiver lock for the entire duration of the SSE stream. This design could lead to contention if the same session is accessed concurrently, though this is mitigated by having only one receiver per session. However, the Arc<Mutex> pattern is unusual - consider documenting why the receiver needs to be behind both Arc and Mutex, or refactor to avoid the lock if the receiver is only consumed once.
```rust
let stream = async_stream::stream! {
    let mut rx = receiver.lock().await;
    while let Some(msg) = rx.recv().await {
        yield Ok(axum::response::sse::Event::default().data(msg));
    }
    cleanup_state.remove_session(&cleanup_session_id).await;
};
```
Copilot AI (Jan 30, 2026)
The session is only removed when the SSE stream ends normally. If the client disconnects abruptly or the stream handler panics, the session and its resources (including the spawned task) may leak. Consider adding connection timeout detection or implementing Drop for HttpSession to ensure cleanup happens.
Force-pushed from 5be9796 to 6a5500f.
This PR
Starting Phase I of #6642
Message Flow when using the goose-acp transport
References
There is a prototype TUI client in 112d019 which uses this transport; it will come in a follow-up PR. Then we can build up the supported feature set.
A first revision of the message flow is here, before we converted to adhere closely to how the MCP Streamable HTTP transport works: https://github.com/block/goose/blob/219143b64b63053f5d9778e303d0ab3be37fd82d/ACP_HTTP_MIGRATION_PLAN.md#message-flow