Skip to content

Conversation

@alexhancock
Copy link
Collaborator

@alexhancock alexhancock commented Jan 27, 2026

This PR

Starting Phase I of #6642

  • This PR introduces a new ACP-over-HTTP transport to the goose-acp
  • It enables clients to connect and perform ACP exchanges over streaming HTTP
  • It supports the basics to start: creating sessions, removing them, and acp message exchanges
  • Aligns as closely as possible with the MCP Streamable HTTP Transport in request patterns

Message Flow when using the Transport

Client                              Server
  │                                   │
  │  ═══ Session Initialization ═══   │
  │                                   │
  │─── POST /acp ─────────────────────>│  { method: "initialize", id: 1 }
  │    Accept: application/json,       │  (no Acp-Session-Id header)
  │            text/event-stream       │
  │              ┌─────────────────────│  Server creates session, opens SSE stream
  │              │ (SSE stream open)   │
  │<─────────────│─ SSE event ─────────│  { id: 1, result: { capabilities } }
  │              │                     │  Response includes Acp-Session-Id header
  │              ▼                     │
  │                                    │
  │  ═══ Prompt Flow ═══               │
  │                                    │
  │─── POST /acp ─────────────────────>│  { method: "session/new", id: 2,
  │    Acp-Session-Id: <session_id>    │    params: { cwd, mcp_servers } }
  │              ┌─────────────────────│  Opens new SSE stream for response
  │<─────────────│─ SSE event ─────────│  { id: 2, result: { session_id: <goose_session> } }
  │              ▼                     │
  │                                    │
  │─── POST /acp ─────────────────────>│  { method: "session/prompt", id: 3,
  │    Acp-Session-Id: <session_id>    │    params: { session_id, prompt } }
  │              ┌─────────────────────│  Opens new SSE stream for response
  │<─────────────│─ SSE event ─────────│  notification: AgentMessageChunk
  │<─────────────│─ SSE event ─────────│  notification: AgentThoughtChunk (if reasoning)
  │<─────────────│─ SSE event ─────────│  notification: ToolCall (status: pending)
  │<─────────────│─ SSE event ─────────│  notification: ToolCallUpdate (status: completed)
  │<─────────────│─ SSE event ─────────│  notification: AgentMessageChunk
  │<─────────────│─ SSE event ─────────│  { id: 3, result: { stop_reason: "end_turn" } }
  │              ▼                     │
  │                                    │
  │  ═══ Permission Flow ═══           │
  │  (when tool requires confirmation) │
  │                                    │
  │─── POST /acp ─────────────────────>│  { method: "session/prompt", id: 4, ... }
  │    Acp-Session-Id: <session_id>    │
  │              ┌─────────────────────│
  │<─────────────│─ SSE event ─────────│  notification: ToolCall (status: pending)
  │<─────────────│─ SSE event ─────────│  { method: "request_permission", id: 99, params: {...} }
  │              │                     │  (server-to-client request)
  │              │                     │
  │─── POST /acp ┼────────────────────>│  { id: 99, result: { outcome: "allow_once" } }
  │    Acp-Session-Id: <session_id>    │  (client response, returns 202 Accepted)
  │              │                     │
  │<─────────────│─ SSE event ─────────│  notification: ToolCallUpdate (status: completed)
  │<─────────────│─ SSE event ─────────│  { id: 4, result: { stop_reason: "end_turn" } }
  │              ▼                     │
  │                                    │
  │  ═══ Cancel Flow ═══               │
  │                                    │
  │─── POST /acp ─────────────────────>│  { method: "session/prompt", id: 5, ... }
  │    Acp-Session-Id: <session_id>    │
  │              ┌─────────────────────│
  │<─────────────│─ SSE event ─────────│  notification: AgentMessageChunk
  │              │                     │
  │─── POST /acp ┼────────────────────>│  { method: "session/cancel" }
  │    Acp-Session-Id: <session_id>    │  (notification, no id - returns 202 Accepted)
  │              │                     │
  │<─────────────│─ SSE event ─────────│  { id: 5, result: { stop_reason: "cancelled" } }
  │              ▼                     │
  │                                    │
  │  ═══ Resume Session Flow ═══       │
  │                                    │
  │─── POST /acp ─────────────────────>│  { method: "initialize", id: 1 }
  │    (no Acp-Session-Id)             │  New HTTP session
  │              ┌─────────────────────│
  │<─────────────│─ SSE event ─────────│  { id: 1, result: { capabilities } }
  │              │                     │  Response includes new Acp-Session-Id
  │              ▼                     │
  │                                    │
  │─── POST /acp ─────────────────────>│  { method: "session/load", id: 2,
  │    Acp-Session-Id: <new_session>   │    params: { session_id: <existing_goose_session>, cwd } }
  │              ┌─────────────────────│
  │<─────────────│─ SSE event ─────────│  notification: UserMessageChunk (history replay)
  │<─────────────│─ SSE event ─────────│  notification: AgentMessageChunk (history replay)
  │<─────────────│─ SSE event ─────────│  notification: ToolCall (history replay)
  │<─────────────│─ SSE event ─────────│  notification: ToolCallUpdate (history replay)
  │<─────────────│─ SSE event ─────────│  { id: 2, result: {} }
  │              ▼                     │
  │                                    │
  │  ═══ Standalone SSE Stream ═══     │
  │  (optional, for server-initiated)  │
  │                                    │
  │─── GET /acp ──────────────────────>│  Open dedicated SSE listener
  │    Acp-Session-Id: <session_id>    │
  │    Accept: text/event-stream       │
  │              ┌─────────────────────│  Long-lived connection for
  │              │ (SSE stream open)   │  server-initiated messages
  │              ▼                     │
  │                                    │
  │  ═══ Session Termination ═══       │
  │                                    │
  │─── DELETE /acp ───────────────────>│  Terminate session
  │    Acp-Session-Id: <session_id>    │
  │<────────── 202 Accepted ───────────│

References

There is a prototype TUI client in 112d019 which uses this transport, and will come for PR next. Then we can build up supported feature-set.

A first revision of the message flow is here, before we converted to adhere closely to how the MCP Streamable HTTP transport works: https://github.com/block/goose/blob/219143b64b63053f5d9778e303d0ab3be37fd82d/ACP_HTTP_MIGRATION_PLAN.md#message-flow

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces Phase I of ACP-over-HTTP transport, enabling clients to interact with goose via a standardized HTTP+SSE protocol instead of custom REST APIs. It implements session management, message exchanges, and streaming responses using Axum and Server-Sent Events.

Changes:

  • Added HTTP transport layer with session lifecycle management (create, stream, remove)
  • Created server factory for agent instantiation with configurable builtins and paths
  • Added standalone binary for running goose-acp-server with CLI arguments
  • Refactored server.rs to support upfront session creation for HTTP transport

Reviewed changes

Copilot reviewed 5 out of 7 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
crates/goose-acp/src/http.rs New HTTP server with SSE streaming, channel-based transport adapters, and session management
crates/goose-acp/src/server_factory.rs Factory pattern for creating ACP agents with custom configuration
crates/goose-acp/src/bin/server.rs Binary entry point with CLI parsing for host, port, and builtins
crates/goose-acp/src/server.rs Added create_session method for HTTP transport; cleaned up comments and simplified pattern matching
crates/goose-acp/src/lib.rs Exported new modules and increased recursion limit for async-stream macros
crates/goose-acp/Cargo.toml Added HTTP dependencies: axum, tower-http, async-stream, clap
Cargo.lock Updated dependency versions including new HTTP crates

Comment on lines +192 to +205
"Channel full, dropping message (backpressure): {}",
truncated
);
Copy link

Copilot AI Jan 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Silently dropping messages on backpressure causes data loss. The agent's responses won't reach the client if the channel is full. Consider using async send() instead of try_send() to apply backpressure, or return an error that can propagate to the caller.

Suggested change
"Channel full, dropping message (backpressure): {}",
truncated
);
"Channel full, cannot send message (backpressure): {}",
truncated
);
return Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::WouldBlock,
"Channel full due to backpressure",
)));

Copilot uses AI. Check for mistakes.
let manager = self.agent.config.session_manager.clone();
let goose_session = manager
.create_session(
std::env::current_dir().unwrap_or_default(),
Copy link

Copilot AI Jan 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using the server's current directory instead of a client-specified working directory creates sessions in the wrong location. This differs from the ACP protocol's session/new which accepts a cwd parameter. Consider either accepting a cwd parameter here or documenting that sessions must be loaded via session/load to set the correct working directory.

Copilot uses AI. Check for mistakes.
Comment on lines +44 to +86
async fn create_session(&self) -> Result<String, StatusCode> {
let (to_agent_tx, to_agent_rx) = mpsc::channel::<String>(256);
let (from_agent_tx, from_agent_rx) = mpsc::channel::<String>(256);

let agent = self.server.create_agent().await.map_err(|e| {
error!("Failed to create agent: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;

let session_id = agent.create_session().await.map_err(|e| {
error!("Failed to create ACP session: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;

let handle = tokio::spawn(async move {
let read_stream = ReceiverToAsyncRead::new(to_agent_rx);
let write_stream = SenderToAsyncWrite::new(from_agent_tx);

if let Err(e) =
crate::server::serve(agent, read_stream.compat(), write_stream.compat_write()).await
{
error!("ACP session error: {}", e);
}
});

self.sessions.write().await.insert(
session_id.clone(),
HttpSession {
to_agent_tx,
from_agent_rx: Arc::new(Mutex::new(from_agent_rx)),
handle,
},
);

info!(session_id = %session_id, "Session created");
Ok(session_id)
Copy link

Copilot AI Jan 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sessions created via HTTP POST may leak if the client never calls the stream endpoint. Consider adding a timeout or explicit deletion endpoint, or defer session creation until the stream is established.

Copilot uses AI. Check for mistakes.
@@ -1158,14 +1165,6 @@ print(\"hello, world\")
assert_eq!(format_tool_name("single"), "Single");
}

Copy link

Copilot AI Jan 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed test coverage for edge cases (empty strings, double underscores) in format_tool_name. Consider keeping these tests to ensure the function handles malformed input gracefully.

Copilot uses AI. Check for mistakes.
Comment on lines +99 to +116
async fn get_receiver(
&self,
session_id: &str,
) -> Result<Arc<Mutex<mpsc::Receiver<String>>>, StatusCode> {
let sessions = self.sessions.read().await;
let session = sessions.get(session_id).ok_or(StatusCode::NOT_FOUND)?;
Ok(session.from_agent_rx.clone())
Copy link

Copilot AI Jan 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Multiple concurrent stream connections for the same session will share the receiver, causing messages to be distributed unpredictably between them. Consider rejecting attempts to open multiple streams for the same session or using a broadcast channel pattern instead.

Copilot uses AI. Check for mistakes.
@alexhancock alexhancock requested a review from baxen January 27, 2026 16:11
@codefromthecrypt
Copy link
Collaborator

codefromthecrypt commented Jan 27, 2026

@alexhancock what's your sense that given in MCP SSE is a deprecated transport... Intentionally not doing streamable based for ACP is the right choice? A lot of ACP is based on MCP and also proxies and gateways will already have the streamable transport as a long term protocol for MCP and likely plan to wind down SSE. This would resurrect it...

@alexhancock alexhancock force-pushed the alexhancock/goose-acp-http branch from 4d50090 to 4c72105 Compare January 28, 2026 18:49
@zhixiongdu027
Copy link

I'm currently using Goose as an execution kernel in a very similar way: I wrap the stdin/stdout in a parent process and expose it via WebSockets to the frontend.
While I strongly support the goal of this PR (making ACP accessible over the network), I share the concern regarding the "Split Transport" design (SSE for reads + POST for writes).
From an integrator's perspective, managing a single bidirectional connection (like WS or the standardized MCP Streamable transport) is much cleaner than handling synchronized split connections. I would prefer an implementation that aligns closer to the modern MCP transport layer to avoid future fragmentation.

Copy link
Collaborator

@jamadeo jamadeo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice. I see that this marked as "in progress" in the ACP docs -- do you have more insight there @alexhancock on the status of that draft proposal? I imagine once it's standardized, we'll use that?

re: MCP transports, to me this does resemble the newer streamble HTTP transport more than the legacy one, maybe with some differences of course, but that's worth discussing further @codefromthecrypt @zhixiongdu027

@@ -1 +1,5 @@
#![recursion_limit = "256"]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's this?

.await?;

let session = GooseAcpSession {
messages: Conversation::new_unvalidated(Vec::new()),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can use Conversation::empty() so someday when we get rid of all the new_unvalidated()s, there's one less :)

Ok(goose_session.id)
}

pub async fn has_session(&self, session_id: &str) -> bool {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this isn't used?

Copy link
Collaborator

@codefromthecrypt codefromthecrypt left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @alexhancock this is great progress.

I'm happy to see things move forward with the understanding that this is basically rebranding of http streamable from MCP. It would be nice to change the default path to ensure that relationship is carried, and types and comments. Even if it isn't exactly http streamable yet, knowing it aims to be is the most important thing to me.

like ideally we just have a quick comment somewhere saying

This transport aims to be the same as MCP Streamable HTTP, except base path /acp not /mcp
See https://modelcontextprotocol.io/specification/2025-11-25/basic/transports#streamable-http

Doing so will set the intent which I think is shared and also will help us long term as there are more developers and libraries on MCP protocol, as well proxies like the one I work on.

p.s. rebase should get rid of stack overflow in tests. sacp makes a giant state machine I guess.

Copilot AI review requested due to automatic review settings January 30, 2026 16:57
@alexhancock alexhancock force-pushed the alexhancock/goose-acp-http branch from 4c72105 to 5be9796 Compare January 30, 2026 16:57
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 5 out of 7 changed files in this pull request and generated 6 comments.

Comment on lines +210 to +226
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Poll::Ready(Ok(()))
}

fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Poll::Ready(Ok(()))
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The poll_flush and poll_shutdown implementations return Poll::Ready(Ok(())) without actually flushing any buffered data. If there's partial data in the buffer when flush or shutdown is called, it will be lost. Consider ensuring any remaining buffered data is sent before completing these operations.

Suggested change
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Poll::Ready(Ok(()))
}
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Poll::Ready(Ok(()))
fn poll_flush(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<std::io::Result<()>> {
if !self.buffer.is_empty() {
let line = String::from_utf8_lossy(&self.buffer).to_string();
self.buffer.clear();
if !line.is_empty() {
if let Err(e) = self.tx.try_send(line.clone()) {
match e {
mpsc::error::TrySendError::Full(_) => {
let truncated: String = line.chars().take(100).collect();
error!(
"Channel full, dropping message (flush/backpressure): {}",
truncated
);
}
mpsc::error::TrySendError::Closed(_) => {
return Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"Channel closed",
)));
}
}
}
}
}
Poll::Ready(Ok(()))
}
fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<std::io::Result<()>> {
Self::poll_flush(self, cx)

Copilot uses AI. Check for mistakes.
Comment on lines 1 to 530
use anyhow::Result;
use axum::{
extract::{Path, State},
http::{header, Method, StatusCode},
response::Sse,
routing::{get, post},
Json, Router,
};
use serde::Serialize;
use std::{
collections::HashMap,
convert::Infallible,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};
use tokio::sync::{mpsc, Mutex, RwLock};
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
use tower_http::cors::{Any, CorsLayer};
use tracing::{debug, error, info};

use crate::server_factory::AcpServer;

struct HttpSession {
to_agent_tx: mpsc::Sender<String>,
from_agent_rx: Arc<Mutex<mpsc::Receiver<String>>>,
handle: tokio::task::JoinHandle<()>,
}

pub struct HttpState {
server: Arc<AcpServer>,
sessions: RwLock<HashMap<String, HttpSession>>,
}

impl HttpState {
pub fn new(server: Arc<AcpServer>) -> Self {
Self {
server,
sessions: RwLock::new(HashMap::new()),
}
}

async fn create_session(&self) -> Result<String, StatusCode> {
let (to_agent_tx, to_agent_rx) = mpsc::channel::<String>(256);
let (from_agent_tx, from_agent_rx) = mpsc::channel::<String>(256);

let agent = self.server.create_agent().await.map_err(|e| {
error!("Failed to create agent: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;

let session_id = agent.create_session().await.map_err(|e| {
error!("Failed to create ACP session: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;

let handle = tokio::spawn(async move {
let read_stream = ReceiverToAsyncRead::new(to_agent_rx);
let write_stream = SenderToAsyncWrite::new(from_agent_tx);

if let Err(e) =
crate::server::serve(agent, read_stream.compat(), write_stream.compat_write()).await
{
error!("ACP session error: {}", e);
}
});

self.sessions.write().await.insert(
session_id.clone(),
HttpSession {
to_agent_tx,
from_agent_rx: Arc::new(Mutex::new(from_agent_rx)),
handle,
},
);

info!(session_id = %session_id, "Session created");
Ok(session_id)
}

async fn remove_session(&self, session_id: &str) {
if let Some(session) = self.sessions.write().await.remove(session_id) {
session.handle.abort();
info!(session_id = %session_id, "Session removed");
}
}

async fn send_message(&self, session_id: &str, message: String) -> Result<(), StatusCode> {
let sessions = self.sessions.read().await;
let session = sessions.get(session_id).ok_or(StatusCode::NOT_FOUND)?;
session
.to_agent_tx
.send(message)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)
}

async fn get_receiver(
&self,
session_id: &str,
) -> Result<Arc<Mutex<mpsc::Receiver<String>>>, StatusCode> {
let sessions = self.sessions.read().await;
let session = sessions.get(session_id).ok_or(StatusCode::NOT_FOUND)?;
Ok(session.from_agent_rx.clone())
}
}

struct ReceiverToAsyncRead {
rx: mpsc::Receiver<String>,
buffer: Vec<u8>,
pos: usize,
}

impl ReceiverToAsyncRead {
fn new(rx: mpsc::Receiver<String>) -> Self {
Self {
rx,
buffer: Vec::new(),
pos: 0,
}
}
}

impl tokio::io::AsyncRead for ReceiverToAsyncRead {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
if self.pos < self.buffer.len() {
let remaining = &self.buffer[self.pos..];
let to_copy = remaining.len().min(buf.remaining());
buf.put_slice(&remaining[..to_copy]);
self.pos += to_copy;
if self.pos >= self.buffer.len() {
self.buffer.clear();
self.pos = 0;
}
return Poll::Ready(Ok(()));
}

match Pin::new(&mut self.rx).poll_recv(cx) {
Poll::Ready(Some(msg)) => {
let bytes = format!("{}\n", msg).into_bytes();
let to_copy = bytes.len().min(buf.remaining());
buf.put_slice(&bytes[..to_copy]);
if to_copy < bytes.len() {
self.buffer = bytes[to_copy..].to_vec();
self.pos = 0;
}
Poll::Ready(Ok(()))
}
Poll::Ready(None) => Poll::Ready(Ok(())),
Poll::Pending => Poll::Pending,
}
}
}

struct SenderToAsyncWrite {
tx: mpsc::Sender<String>,
buffer: Vec<u8>,
}

impl SenderToAsyncWrite {
fn new(tx: mpsc::Sender<String>) -> Self {
Self {
tx,
buffer: Vec::new(),
}
}
}

impl tokio::io::AsyncWrite for SenderToAsyncWrite {
fn poll_write(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
self.buffer.extend_from_slice(buf);

while let Some(pos) = self.buffer.iter().position(|&b| b == b'\n') {
let line = String::from_utf8_lossy(&self.buffer[..pos]).to_string();
self.buffer.drain(..=pos);

if !line.is_empty() {
if let Err(e) = self.tx.try_send(line.clone()) {
match e {
mpsc::error::TrySendError::Full(_) => {
let truncated: String = line.chars().take(100).collect();
error!(
"Channel full, dropping message (backpressure): {}",
truncated
);
}
mpsc::error::TrySendError::Closed(_) => {
return Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"Channel closed",
)));
}
}
}
}
}

Poll::Ready(Ok(buf.len()))
}

fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Poll::Ready(Ok(()))
}

fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Poll::Ready(Ok(()))
}
}

#[derive(Serialize)]
struct CreateSessionResponse {
session_id: String,
}

async fn create_session(
State(state): State<Arc<HttpState>>,
) -> Result<Json<CreateSessionResponse>, StatusCode> {
let session_id = state.create_session().await?;
Ok(Json(CreateSessionResponse { session_id }))
}

async fn send_message(
State(state): State<Arc<HttpState>>,
Path(session_id): Path<String>,
body: String,
) -> Result<StatusCode, StatusCode> {
debug!(session_id = %session_id, "Received message");
state.send_message(&session_id, body).await?;
Ok(StatusCode::ACCEPTED)
}

async fn stream_events(
State(state): State<Arc<HttpState>>,
Path(session_id): Path<String>,
) -> Result<
Sse<impl futures::Stream<Item = Result<axum::response::sse::Event, Infallible>>>,
StatusCode,
> {
let receiver = state.get_receiver(&session_id).await?;

let cleanup_state = state.clone();
let cleanup_session_id = session_id.clone();

let stream = async_stream::stream! {
let mut rx = receiver.lock().await;
while let Some(msg) = rx.recv().await {
yield Ok(axum::response::sse::Event::default().data(msg));
}
cleanup_state.remove_session(&cleanup_session_id).await;
};

Ok(Sse::new(stream).keep_alive(
axum::response::sse::KeepAlive::new()
.interval(Duration::from_secs(15))
.text("ping"),
))
}

async fn health() -> &'static str {
"ok"
}

pub fn create_router(state: Arc<HttpState>) -> Router {
let cors = CorsLayer::new()
.allow_origin(Any)
.allow_methods([Method::GET, Method::POST, Method::OPTIONS])
.allow_headers([header::CONTENT_TYPE, header::ACCEPT]);

Router::new()
.route("/health", get(health))
.route("/acp/session", post(create_session))
.route("/acp/session/{session_id}/message", post(send_message))
.route("/acp/session/{session_id}/stream", get(stream_events))
.layer(cors)
.with_state(state)
}

pub async fn serve(state: Arc<HttpState>, addr: std::net::SocketAddr) -> Result<()> {
let router = create_router(state);
let listener = tokio::net::TcpListener::bind(addr).await?;
info!("ACP HTTP server listening on {}", addr);
axum::serve(listener, router).await?;
Ok(())
}
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new HTTP transport layer (HttpState, session management, SSE streaming) lacks test coverage. The existing tests only cover the in-process ACP protocol. Consider adding integration tests that exercise the HTTP endpoints, session lifecycle, and SSE streaming to ensure robustness of the new HTTP transport.

Copilot uses AI. Check for mistakes.
Comment on lines +187 to +206
if let Err(e) = self.tx.try_send(line.clone()) {
match e {
mpsc::error::TrySendError::Full(_) => {
let truncated: String = line.chars().take(100).collect();
error!(
"Channel full, dropping message (backpressure): {}",
truncated
);
}
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The SenderToAsyncWrite implementation silently drops messages when the channel is full, logging only to stderr. This could lead to data loss in production. Consider either blocking until the channel has space (using send instead of try_send), or returning an error to propagate backpressure properly to the caller.

Copilot uses AI. Check for mistakes.
Comment on lines 273 to 276
let cors = CorsLayer::new()
.allow_origin(Any)
.allow_methods([Method::GET, Method::POST, Method::OPTIONS])
.allow_headers([header::CONTENT_TYPE, header::ACCEPT]);
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CORS is configured to allow any origin with allow_origin(Any). This is acceptable for development but represents a potential security risk in production. Consider restricting this to specific trusted origins or adding a configuration option to control CORS policy.

Copilot uses AI. Check for mistakes.
Comment on lines 253 to 292
let stream = async_stream::stream! {
let mut rx = receiver.lock().await;
while let Some(msg) = rx.recv().await {
yield Ok(axum::response::sse::Event::default().data(msg));
}
cleanup_state.remove_session(&cleanup_session_id).await;
};
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The stream_events handler holds a receiver lock for the entire duration of the SSE stream. This design could lead to contention if the same session is accessed concurrently, though this is mitigated by having only one receiver per session. However, the Arc<Mutex> pattern is unusual - consider documenting why the receiver needs to be behind both Arc and Mutex, or refactor to avoid the lock if the receiver is only consumed once.

Copilot uses AI. Check for mistakes.
Comment on lines 253 to 292
let stream = async_stream::stream! {
let mut rx = receiver.lock().await;
while let Some(msg) = rx.recv().await {
yield Ok(axum::response::sse::Event::default().data(msg));
}
cleanup_state.remove_session(&cleanup_session_id).await;
};
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The session is only removed when the SSE stream ends normally. If the client disconnects abruptly or the stream handler panics, the session and its resources (including the spawned task) may leak. Consider adding connection timeout detection or implementing Drop for HttpSession to ensure cleanup happens.

Copilot uses AI. Check for mistakes.
@alexhancock alexhancock force-pushed the alexhancock/goose-acp-http branch from 5be9796 to 6a5500f Compare January 30, 2026 17:56
@alexhancock alexhancock changed the title feat: goose-acp over http feat: Streamable HTTP transport for ACP + goose-acp crate Jan 30, 2026
@alexhancock alexhancock changed the title feat: Streamable HTTP transport for ACP + goose-acp crate feat: Streamable HTTP transport for ACP + goose-acp usage Jan 30, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants