Commits (19)
ec4b0fb
fix: restore post-run reconcile call to prevent event loss
openhands-agent Jan 26, 2026
25aa320
fix: add efficient reconcile_recent() to prevent event loss
openhands-agent Jan 26, 2026
0b407d8
fix: wait for WebSocket terminal status event before returning from r…
openhands-agent Jan 26, 2026
ffff022
Merge branch 'main' into fix/restore-post-run-reconcile
enyst Jan 28, 2026
13ad596
refactor: remove reconcile_recent, rely solely on _run_complete_event
openhands-agent Jan 30, 2026
bd8e311
refactor: add is_terminal() method to ConversationExecutionStatus
openhands-agent Jan 30, 2026
83261dd
Merge branch 'main' into fix/restore-post-run-reconcile
xingyaoww Jan 30, 2026
5287639
chore: increase WebSocket wait timeout to 5*poll_interval
openhands-agent Jan 30, 2026
939f74b
docs: add comment explaining why we don't return on polling terminal …
openhands-agent Jan 30, 2026
69610ab
Apply suggestion from @xingyaoww
xingyaoww Jan 30, 2026
35ebdd1
Merge branch 'main' into fix/restore-post-run-reconcile
xingyaoww Jan 30, 2026
802a991
Merge branch 'main' into fix/restore-post-run-reconcile
enyst Jan 31, 2026
622b711
fix: address critical issues in _wait_for_run_completion
openhands-agent Feb 2, 2026
d03112e
Merge branch 'main' into fix/restore-post-run-reconcile
openhands-agent Feb 2, 2026
3fa5173
fix: remove IDLE from terminal states and clear event at run() start
openhands-agent Feb 2, 2026
f054895
fix: address code review issues for WebSocket terminal status handling
openhands-agent Feb 3, 2026
cc6c1be
fix: add thread safety for terminal status state and document TERMINA…
openhands-agent Feb 3, 2026
6e354e6
refactor: simplify synchronization using queue.Queue
openhands-agent Feb 4, 2026
cd3346e
Merge branch 'main' into fix/restore-post-run-reconcile
xingyaoww Feb 4, 2026
103 changes: 91 additions & 12 deletions openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py
@@ -6,6 +6,7 @@
import time
import uuid
from collections.abc import Mapping
from queue import Empty, Queue
from typing import SupportsIndex, overload
from urllib.parse import urlparse

@@ -555,6 +556,7 @@ class RemoteConversation(BaseConversation):
_client: httpx.Client
_hook_processor: HookEventProcessor | None
_cleanup_initiated: bool
_terminal_status_queue: Queue[str] # Thread-safe queue for terminal status from WS
delete_on_close: bool = False

def __init__(
@@ -609,6 +611,7 @@ def __init__(
self._client = workspace.client
self._hook_processor = None
self._cleanup_initiated = False
self._terminal_status_queue: Queue[str] = Queue()

should_create = conversation_id is None
if conversation_id is not None:
@@ -708,8 +711,21 @@ def __init__(
# No visualization (visualizer is None)
self._visualizer = None

# Add a callback that signals when run completes via WebSocket
# This ensures we wait for all events to be delivered before run() returns
def run_complete_callback(event: Event) -> None:
    if isinstance(event, ConversationStateUpdateEvent):
        if event.key == "execution_status":
            try:
                status = ConversationExecutionStatus(event.value)
                if status.is_terminal():
                    self._terminal_status_queue.put(event.value)
            except ValueError:
Collaborator review comment:
🟠 Important: This fix relies on a critical assumption that WebSocket events are processed sequentially in the callback.

Why this matters:

  • If ConversationStateUpdateEvent(finished) is processed and put in the queue BEFORE ActionEvent(finish) is fully processed and added to state, the race condition still exists
  • The fix works because callbacks execute sequentially, ensuring all prior events are processed when the terminal status arrives

Suggestion:
Add a comment documenting this critical assumption:

# CRITICAL: This callback must execute sequentially with other event callbacks
# to ensure all events preceding the terminal status have been fully processed
# and added to state when we signal run completion.
def run_complete_callback(event: Event) -> None:

Consider adding a test that validates sequential execution order.
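A minimal sketch of such a test, using stand-in callbacks and a plain list in place of the SDK's real event pipeline and conversation state (record_event, signal_terminal, and dispatch are hypothetical names):

import queue


def test_terminal_status_signaled_after_prior_events():
    state_events: list[str] = []                     # stand-in for conversation state
    terminal_queue: queue.Queue[str] = queue.Queue()

    def record_event(event: str) -> None:
        state_events.append(event)

    def signal_terminal(event: str) -> None:
        if event == "execution_status:finished":
            terminal_queue.put(event)

    callbacks = [record_event, signal_terminal]

    def dispatch(event: str) -> None:
        # Sequential dispatch is the property under test: every callback sees
        # each event before the next event is delivered.
        for cb in callbacks:
            cb(event)

    for event in ("action:finish", "observation:done", "execution_status:finished"):
        dispatch(event)

    # If the terminal status is in the queue, all earlier events must already
    # have been recorded in state.
    assert terminal_queue.get_nowait() == "execution_status:finished"
    assert state_events[:2] == ["action:finish", "observation:done"]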

                pass  # Unknown status value, ignore

# Compose all callbacks into a single callback
composed_callback = BaseConversation.compose_callbacks(self._callbacks)
all_callbacks = self._callbacks + [run_complete_callback]
composed_callback = BaseConversation.compose_callbacks(all_callbacks)

# Initialize WebSocket client for callbacks
self._ws_client = WebSocketCallbackClient(
@@ -862,6 +878,14 @@ def run(
Raises:
ConversationRunError: If the run fails or times out.
"""
# Drain any stale terminal status events from previous runs.
# This prevents stale events from causing early returns.
while True:
    try:
        self._terminal_status_queue.get_nowait()
    except Empty:
        break

# Trigger a run on the server using the dedicated run endpoint.
# Let the server tell us if it's already running (409), avoiding an extra GET.
try:
@@ -889,10 +913,20 @@ def _wait_for_run_completion(
poll_interval: float = 1.0,
timeout: float = 1800.0,
) -> None:
"""Poll the server until the conversation is no longer running.
"""Wait for the conversation run to complete.

This method waits for the run to complete by listening for the terminal
status event via WebSocket. This ensures all events are delivered before
returning, avoiding the race condition where polling sees "finished"
status before WebSocket delivers the final events.

As a fallback, it also polls the server periodically. If the WebSocket
is delayed or disconnected, we return after multiple consecutive polls
show a terminal status, and reconcile events to catch any that were
missed via WebSocket.

Args:
poll_interval: Time in seconds between status polls.
poll_interval: Time in seconds between status polls (fallback).
timeout: Maximum time in seconds to wait.

Raises:
@@ -901,6 +935,14 @@
responses are retried until timeout.
"""
start_time = time.monotonic()
consecutive_terminal_polls = 0
# Return after this many consecutive terminal polls (fallback for WS issues).
# We use 3 polls to balance latency vs reliability:
# - 1 poll could be a transient state during shutdown
# - 2 polls might still catch a race condition
# - 3 polls (with default 1s interval = 3s total) provides high confidence
# that the run is truly complete while keeping fallback latency reasonable
TERMINAL_POLL_THRESHOLD = 3

while True:
elapsed = time.monotonic() - start_time
@@ -913,20 +955,57 @@
),
)

# Wait for either:
# 1. WebSocket delivers terminal status event (preferred)
# 2. Poll interval expires (fallback - check status via REST)
try:
Collaborator review comment:
🟡 Suggestion: Minor timeout edge case - if elapsed time is close to timeout, we still wait for the full poll_interval, which could exceed the intended timeout.

Example:

  • timeout = 1800s, poll_interval = 1s
  • At iteration where elapsed = 1799.5s, we pass the timeout check
  • Then we wait up to 1s for the queue, potentially reaching 1800.5s total

Suggested fix:

remaining = timeout - elapsed
wait_time = min(poll_interval, remaining)
try:
    ws_status = self._terminal_status_queue.get(timeout=wait_time)

This ensures we respect the timeout more precisely, though the current behavior is probably acceptable for most use cases.
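A self-contained sketch of that bounded wait, following the reviewer's suggestion and assuming the queue and timing values shown above (the helper name bounded_queue_wait is hypothetical):

import queue
import time


def bounded_queue_wait(
    q: queue.Queue, poll_interval: float, timeout: float, start: float
) -> str | None:
    """Wait on the queue without overshooting the overall timeout."""
    remaining = timeout - (time.monotonic() - start)
    if remaining <= 0:
        raise TimeoutError("run did not complete within timeout")
    try:
        return q.get(timeout=min(poll_interval, remaining))
    except queue.Empty:
        return None  # fall through to REST polling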

    ws_status = self._terminal_status_queue.get(timeout=poll_interval)
    # Handle ERROR/STUCK states - raises ConversationRunError
    self._handle_conversation_status(ws_status)

    logger.info(
        "Run completed via WebSocket notification "
        "(status: %s, elapsed: %.1fs)",
        ws_status,
        elapsed,
    )
    return
except Empty:
    pass  # Queue.get() timed out, fall through to REST polling

# Poll the server for status as a health check and fallback.
# This catches ERROR/STUCK states that need immediate attention,
# and provides a fallback if WebSocket is delayed/disconnected.
try:
status = self._poll_status_once()
except Exception as exc:
self._handle_poll_exception(exc)
consecutive_terminal_polls = 0 # Reset on error
else:
if self._handle_conversation_status(status):
logger.info(
"Run completed with status: %s (elapsed: %.1fs)",
status,
elapsed,
)
return

time.sleep(poll_interval)
# Raises ConversationRunError for ERROR/STUCK states
self._handle_conversation_status(status)

# Track consecutive terminal polls as a fallback for WS issues.
# If WebSocket is delayed/disconnected, we return after multiple
# consecutive polls confirm the terminal status.
if status and ConversationExecutionStatus(status).is_terminal():
consecutive_terminal_polls += 1
if consecutive_terminal_polls >= TERMINAL_POLL_THRESHOLD:
logger.info(
"Run completed via REST fallback after %d consecutive "
"terminal polls (status: %s, elapsed: %.1fs). "
"Reconciling events...",
consecutive_terminal_polls,
status,
elapsed,
)
# Reconcile events to catch any that were missed via WS.
# This is only called in the fallback path, so it doesn't
# add overhead in the common case where WS works.
self._state.events.reconcile()
return
else:
consecutive_terminal_polls = 0
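Read end to end, the new control flow amounts to roughly the following condensed sketch; poll_status and is_terminal are stand-ins for _poll_status_once and ConversationExecutionStatus.is_terminal, and the ERROR/STUCK handling and event reconciliation are omitted:

import queue
import time
from collections.abc import Callable


def wait_for_completion(
    terminal_queue: queue.Queue,
    poll_status: Callable[[], str | None],
    is_terminal: Callable[[str], bool],
    poll_interval: float = 1.0,
    timeout: float = 1800.0,
    threshold: int = 3,
) -> str:
    start = time.monotonic()
    consecutive = 0
    while time.monotonic() - start < timeout:
        try:
            # Preferred path: the WebSocket callback put a terminal status on
            # the queue, so every prior event has already been processed.
            return terminal_queue.get(timeout=poll_interval)
        except queue.Empty:
            pass
        status = poll_status()  # REST fallback / health check
        if status is not None and is_terminal(status):
            consecutive += 1
            if consecutive >= threshold:
                return status  # caller reconciles events in this path
        else:
            consecutive = 0
    raise TimeoutError("run did not complete within timeout")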

def _poll_status_once(self) -> str | None:
"""Fetch the current execution status from the remote conversation."""
19 changes: 19 additions & 0 deletions openhands-sdk/openhands/sdk/conversation/state.py
@@ -45,6 +45,25 @@ class ConversationExecutionStatus(str, Enum):
STUCK = "stuck" # Conversation is stuck in a loop or unable to proceed
DELETING = "deleting" # Conversation is in the process of being deleted

def is_terminal(self) -> bool:
    """Check if this status represents a terminal state.

    Terminal states indicate the run has completed and the agent is no longer
    actively processing. These are: FINISHED, ERROR, STUCK.

    Note: IDLE is NOT a terminal state - it's the initial state of a conversation
    before any run has started. Including IDLE would cause false positives when
    the WebSocket delivers the initial state update during connection.

    Returns:
        True if this is a terminal status, False otherwise.
    """
    return self in (
        ConversationExecutionStatus.FINISHED,
        ConversationExecutionStatus.ERROR,
        ConversationExecutionStatus.STUCK,
    )
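A short usage sketch, assuming the module path matches the file shown above and that the enum defines FINISHED and IDLE members with the obvious string values:

from openhands.sdk.conversation.state import ConversationExecutionStatus

# A status parsed from a state-update event value.
status = ConversationExecutionStatus("finished")
assert status.is_terminal()

# IDLE is deliberately excluded, so the initial state update delivered on
# WebSocket connection does not look like run completion.
assert not ConversationExecutionStatus.IDLE.is_terminal()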


class ConversationState(OpenHandsModel):
# ===== Public, validated fields =====