fix: wait for WebSocket terminal status to prevent event loss #1832
Conversation
all-hands-bot left a comment
Good fix that correctly addresses the race condition between run completion detection and WebSocket event delivery. The restoration of the post-run reconcile call is justified by the test failures and matches the established pattern.
The reconcile() call after run completion was removed in PR #1820, but this caused a race condition where events emitted during the final moments of the run could be lost if the WebSocket didn't deliver them before run() returned. This was observed in CI where test_events_not_lost_during_client_disconnection failed because the client only received 3-4 events while the REST API had 6 events - the ActionEvent(finish) and ObservationEvent(finish) were missing.

Reproduction:
- Inject a 3s delay in the WebSocket callback for finish events
- Run the conversation with a finish tool call
- Observe that without the reconcile() call, the client is missing events

The fix restores the reconcile() call in _wait_for_run_completion() to ensure all events are captured after run completion. This is safe because reconcile() is idempotent and will only add events that are missing from the client's cache.

Fixes the flaky test failure in PR #1829.

Co-authored-by: openhands <[email protected]>
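The reproduction steps above can be sketched roughly as follows; this is illustrative only - `orig_cb`, the event predicate, and the wiring are assumptions, with only the 3-second delay and the idea of slowing the WebSocket callback taken from the commit message.

```python
import time

WS_DELAY_S = 3.0  # the 3s delay described in the reproduction steps


def make_delayed_callback(orig_cb, is_finish_event):
    """Wrap a WebSocket event callback so that 'finish' events are delivered late.

    Both arguments are hypothetical stand-ins for whatever the real test wires up.
    """

    def delayed_cb(event) -> None:
        if is_finish_event(event):
            # Blocking the (sequential) WS callback here delays delivery, widening
            # the window in which REST polling can observe 'finished' first.
            time.sleep(WS_DELAY_S)
        orig_cb(event)

    return delayed_cb
```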
Force-pushed from c68f00b to ec4b0fb
This PR fixes the race condition where events emitted during the final moments of a run could be lost if the WebSocket didn't deliver them before run() returned.

## Root Cause

The race condition occurs when:
1. Server emits events (ActionEvent, ObservationEvent)
2. Client polls and sees 'finished' status
3. run() returns before WebSocket delivers those events

## Solution

Instead of using the expensive reconcile() which fetches ALL events, we introduce reconcile_recent() which only fetches events after the last known timestamp. This is much more efficient for long conversations.

The fix:
1. Added reconcile_recent() method to RemoteEventsList that uses the timestamp__gte filter to only fetch recent events
2. Call reconcile_recent() after run completion to catch any events that were missed by WebSocket

## Reproduction

Added test_event_loss_repro.py which reliably reproduces the race condition by injecting a 3s delay in the WebSocket callback for finish events. Without the fix, the test fails because the client is missing ActionEvent(finish) and ObservationEvent(finish).

## Testing

- All cross tests pass
- The reproduction test passes with the fix

Fixes the flaky test failure in PR #1829.

Co-authored-by: openhands <[email protected]>
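A rough sketch of what a timestamp-filtered reconcile could look like. Only the `timestamp__gte` filter name comes from the description above; the endpoint path, response shape, and function name are assumptions rather than the SDK's actual API.

```python
import httpx


def reconcile_recent(
    base_url: str,
    conversation_id: str,
    last_known_timestamp: str,
    known_ids: set[str],
) -> list[dict]:
    """Fetch only events at or after the last known timestamp and drop known ones."""
    resp = httpx.get(
        f"{base_url}/conversations/{conversation_id}/events",  # assumed endpoint
        params={"timestamp__gte": last_known_timestamp},  # filter named in the PR
        timeout=10.0,
    )
    resp.raise_for_status()
    # The filter is inclusive (gte), so the last known event comes back as well;
    # filtering out already-known ids is what keeps the call idempotent.
    return [event for event in resp.json() if event.get("id") not in known_ids]
```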
…un()

This is a more fundamental fix for the event loss race condition. Instead of just reconciling after polling detects terminal status, we now:

1. Add a threading.Event that gets signaled when WebSocket delivers a ConversationStateUpdateEvent with terminal execution_status (idle, finished, error, stuck)
2. In _wait_for_run_completion(), wait for either:
   - WebSocket delivers terminal status (preferred - all events guaranteed)
   - Poll interval expires (fallback - check status via REST)
3. If WebSocket delivers terminal status first, return immediately since all events are guaranteed to be delivered through the same channel
4. If polling detects terminal status first (the race condition), use reconcile_recent() to fetch any events that may have been missed

This approach:
- Fixes the root cause by waiting for WebSocket to deliver all events
- Uses reconcile_recent() as a fallback (more efficient than full reconcile)
- Maintains backward compatibility with the polling mechanism

Co-authored-by: openhands <[email protected]>
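A minimal sketch of the wait logic this commit describes, assuming a `threading.Event` set by the WebSocket callback plus callables for the REST status poll and the recent reconcile; the terminal-status set is taken from the commit text, everything else is illustrative.

```python
import threading
import time
from collections.abc import Callable

TERMINAL_STATUSES = {"idle", "finished", "error", "stuck"}


def wait_for_run_completion(
    run_complete: threading.Event,
    poll_status: Callable[[], str],
    reconcile_recent: Callable[[], None],
    poll_interval: float = 1.0,
    timeout: float = 1800.0,
) -> None:
    """Prefer the WebSocket signal; fall back to REST polling plus reconciliation."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        # Preferred path: the WS callback sets the event on a terminal status,
        # so every earlier event has already arrived on the same channel.
        if run_complete.wait(timeout=poll_interval):
            return
        # Fallback path: REST polling won the race, so the final events may still
        # be in flight on the WebSocket; fetch anything recent explicitly.
        if poll_status() in TERMINAL_STATUSES:
            reconcile_recent()
            return
    raise TimeoutError("conversation run did not reach a terminal state in time")
```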
@OpenHands Look at this github action failure: https://github.com/OpenHands/software-agent-sdk/actions/runs/21394941864/job/61590830061?pr=1840#step:7:5796 Analyze deeply why test_events_not_lost_during_client_disconnection() failed there; note that the branch is not the same as this branch (the branch with the failure was openhands/bounded-init-state-scan). Then check this PR's diff and tell me WDYT: does this PR fix the root cause of test_events_not_lost_during_client_disconnection() failures, as it intends to?

I'm on it! enyst can track my progress at all-hands.dev
I only answered your question (no code changes were made).

Answer (restated)

Checklist vs your request

No push needed since the working tree is unchanged.
@OpenHands Do a /codereview-roasted on this PR. We want a solution for the recent events that doesn’t require rethinking the ws/REST sync yet.

I'm on it! enyst can track my progress at all-hands.dev
🔴 /codereview-roasted: Needs Improvement

Taste Rating: 🟡 Acceptable - Works but adds complexity to paper over a fundamental architectural issue.

[CRITICAL ISSUES] (Must fix)

[remote_conversation.py, Line 791] Data Structure / Special Case: Including IDLE in the terminal-status check is suspicious:

```python
if event.key == "execution_status" and event.value in (
    ConversationExecutionStatus.IDLE.value,  # <-- SUSPICIOUS
    ConversationExecutionStatus.FINISHED.value,
    ...
```

[remote_conversation.py, Line 359] Bug:

```python
"timestamp__gte": timestamp_str,  # Should be timestamp__gt
```

This is wasteful and relies entirely on deduplication to not cause issues. Use timestamp__gt instead.

[remote_conversation.py, Lines 1017-1026] Complexity / False Assumption: The comment claims "all events are guaranteed to be delivered since they come through the same channel" - this is only true if the WebSocket connection stayed up the entire time. If WebSocket disconnected and reconnected during the run, events could be lost. The code doesn't verify WebSocket health before trusting this path.

[IMPROVEMENT OPPORTUNITIES] (Should fix)

[remote_conversation.py, Lines 372-376] Silent Failure:

```python
except Exception as e:
    logger.warning(f"Failed to fetch events during recent reconciliation: {e}")
    break  # <-- Silent failure, caller has no idea
```

At minimum, return a sentinel or raise after logging.

[remote_conversation.py, Lines 786-796] Complexity: Defining a closure inside a 100+ line method.

[test_event_loss_repro.py] Pragmatism: The test injects an artificial 3-second delay. This tests that the client tolerates a slow callback more than it tests the real WS/REST ordering.

[remote_conversation.py, Line 1001] Fragile Ordering: Clearing the completion state relative to starting the run is order-sensitive.

[STYLE NOTES] (Minor)

[remote_conversation.py, Lines 1024-1025, 1047-1048] Redundant: duplicated event-clearing logic.

VERDICT:

✅ Worth merging with fixes - The core approach (prefer WebSocket, fallback to reconcile) is sound and solves the immediate problem without rethinking WS/REST sync. However, the issues above should be addressed first.

KEY INSIGHT:

The real fix would be server-side: don't report "finished" status until all events are persisted and ready for delivery. This client-side workaround adds complexity to compensate for a server-side ordering guarantee that doesn't exist. It's acceptable as a tactical fix, but the underlying gap remains.

Recommended minimal fixes before merge:
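The "silent failure" point above could be handled along these lines; `fetch_page` is a hypothetical pagination callable, not the SDK's actual helper, and the loop is only a sketch of "log, then re-raise instead of swallowing".

```python
import logging
from collections.abc import Callable

logger = logging.getLogger(__name__)


def fetch_all_pages(fetch_page: Callable[[int], list[dict]]) -> list[dict]:
    """Collect paginated events, surfacing failures instead of silently stopping."""
    events: list[dict] = []
    page = 0
    while True:
        try:
            batch = fetch_page(page)  # hypothetical: returns [] when exhausted
        except Exception:
            logger.exception("Failed to fetch events during recent reconciliation")
            raise  # let the caller decide how to degrade instead of hiding the gap
        if not batch:
            return events
        events.extend(batch)
        page += 1
```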
Comparison with PR #1858 (test-only fix)

I just cherry-picked the latest commit from PR #1840 into a new PR (#1858). That patch only hardens the test by:

No SDK behavior changes there — it's strictly a test stability change.

PR #1832 approach

This PR changes RemoteConversation behavior to avoid the event-loss race:

Take / preference

If the underlying issue is real event loss (run() returns before WS delivers final events), #1832 is the better fix because it addresses the root cause and aligns client behavior with the REST API's authoritative state. The #1858-style fix mainly treats CI flakiness / eventual consistency — it reduces false negatives but could mask a genuine race in production. If the delayed-WS repro is valid, I'd favor the behavioral fix in #1832 and keep the test robust as a secondary guard.

Minor nit:
Remove the reconcile_recent method and simplify _wait_for_run_completion to rely solely on the _run_complete_event mechanism for synchronization. When WebSocket delivers the terminal status event, all events are guaranteed to be delivered since they come through the same channel in order. This makes reconcile_recent unnecessary. The polling is now only used as a health check to detect error/stuck states that need immediate attention. For normal completion, we continue waiting for WebSocket to deliver the terminal status event. Co-authored-by: openhands <[email protected]>
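A small sketch of the "polling as health check" idea described above: normal completion is left to the WebSocket signal, and polling only surfaces failure states. `ConversationRunError` is the exception name mentioned later in the review comments; the function and its wiring are assumptions.

```python
class ConversationRunError(RuntimeError):
    """Raised when the server reports a failure state for the run."""


def check_run_health(execution_status: str) -> None:
    """Raise on error/stuck; say nothing about normal completion.

    Completion itself is signaled by the WebSocket terminal-status event, so the
    poller only needs to catch states that require immediate attention.
    """
    if execution_status in ("error", "stuck"):
        raise ConversationRunError(
            f"conversation entered {execution_status!r} state during run"
        )
```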
Add a class method to ConversationExecutionStatus to check if a status represents a terminal state (IDLE, FINISHED, ERROR, STUCK). This helps avoid repeating the same tuple check in multiple places. Update remote_conversation.py to use the new is_terminal() method instead of the inline tuple check. Co-authored-by: openhands <[email protected]>
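A sketch of what such a helper can look like. The class and member names mirror the ones discussed in this thread, but the exact member set (in particular whether IDLE counts as terminal) changed during the PR, so the tuple below is just one illustrative choice.

```python
from enum import Enum


class ConversationExecutionStatus(str, Enum):
    """Illustrative stand-in; the real enum lives in the SDK."""

    RUNNING = "running"
    IDLE = "idle"
    FINISHED = "finished"
    ERROR = "error"
    STUCK = "stuck"

    def is_terminal(self) -> bool:
        # Whether IDLE belongs here was debated later in this PR; pick one set here.
        return self in (
            ConversationExecutionStatus.FINISHED,
            ConversationExecutionStatus.ERROR,
            ConversationExecutionStatus.STUCK,
        )
```

With a helper like this, the inline tuple checks reduce to `ConversationExecutionStatus(event.value).is_terminal()`, which matches how the diff quoted further down uses it.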
This gives the WebSocket more time to deliver the terminal status event before falling back to polling, reducing unnecessary API calls while still maintaining the ability to detect error/stuck states. Co-authored-by: openhands <[email protected]>
…status Co-authored-by: openhands <[email protected]>
all-hands-bot left a comment
Overall Assessment
The approach of waiting for WebSocket delivery of terminal status is sound and addresses the root cause of the race condition. However, there are some important concerns about behavior changes and missing functionality mentioned in the PR description.
Key Issues:
- 🟠 Polling interval increased from 1s to 5s (5x slower) - significant behavior change
- 🟠 reconcile_recent() mentioned in PR description but not implemented in this diff
- 🟡 Redundant event clearing logic
- 🟡 Complex control flow could use clarification
See inline comments for details.
🔴 /codereview-roasted: Needs Improvement (but close to acceptable)

Taste Rating: 🟡 Acceptable - A pragmatic fix that addresses the root cause, but relies on implicit ordering guarantees and has some thread safety concerns.

Deep Dive: The Synchronization Problem

The WebSocket implementation has a fundamental race condition between two parallel channels (the WebSocket event stream and REST status polling). The race window exists because REST polling can detect the terminal status before WebSocket delivers all events.

Why This Fix Works

The key insight is that the server guarantees event ordering:

```python
# event_service.py, lines 531-543
async def _run_and_publish():
    try:
        await loop.run_in_executor(None, conversation.run)
    finally:
        # Wait for all pending events to be published
        if self._callback_wrapper:
            await loop.run_in_executor(
                None, self._callback_wrapper.wait_for_pending, 30.0
            )
        # THEN publish terminal status
        await self._publish_state_update()
```

Combined with the client's sequential WebSocket message processing, this means that if we see the terminal status via WebSocket, all prior events have been processed. This is the correct fix.

[CRITICAL ISSUES] (Should fix)

[remote_conversation.py, Lines 555-558, 731-733] Thread Safety: Shared state accessed from multiple threads without explicit synchronization.

```python
# Written from WebSocket thread:
self._terminal_status = event.value
self._terminal_status_run_counter = self._run_counter

# Read from main thread:
if self._terminal_status_run_counter == expected_run_counter:
    ws_status = self._terminal_status
```

While Python's GIL provides some protection, this is fragile. Consider using a lock or a thread-safe queue.

[remote_conversation.py, Line 951] Magic Number:

```python
TERMINAL_POLL_THRESHOLD = 3  # Why 3? Document the reasoning
```

This adds 3 * poll_interval (default 3 seconds) of latency in the fallback path. Consider making this configurable or documenting why 3 is the right number.

[IMPROVEMENT OPPORTUNITIES] (Should consider)

[Implicit Contract] Server Ordering Guarantee: The fix relies on the server sending events in order (all events before terminal status). This is currently true but not documented as a contract. If someone changes the server-side publish ordering, the client-side fix silently breaks.

[test_event_loss_repro.py] Test Relies on Implementation Detail: The test injects a delay in the callback, which works because WebSocket message processing is sequential. This is correct but subtle - the test should document why this works:

```python
# The delay works because WebSocket processes messages sequentially:
# 1. ActionEvent arrives → callback sleeps 3s
# 2. ConversationStateUpdateEvent waits in queue
# 3. Terminal status can't be processed until ActionEvent callback completes
```

[state.py, is_terminal()] Good Design: Excluding IDLE and documenting the reasoning is a good call.

[STYLE NOTES] (Minor)

[remote_conversation.py, Lines 719-735] Nested Callback Definition: The callback is defined inline inside an already long method; consider extracting it.

VERDICT:

✅ Worth merging with minor fixes: The core logic is sound and addresses the root cause correctly. The thread safety concern is the main issue to address.

KEY INSIGHT:

The fix correctly leverages the fact that WebSocket message processing is sequential - if we see the terminal status via WebSocket, all prior events have been processed. This is a fundamental property of the system that makes the fix reliable.

Comparison with PR #1858 (test-only fix):

PR #1858 hardens the test but doesn't fix the SDK. This PR (#1832) fixes the actual race condition in the SDK, which is the right approach. The test-only fix would leave users vulnerable to the same race condition in production.
I've completed the review.

Summary of the Review

Verdict: 🟡 Acceptable - Worth merging with minor fixes

The Problem

The WebSocket implementation has a race condition where REST polling can detect "finished" status before WebSocket delivers all events, causing event loss.

Why This PR's Fix Works

The fix correctly leverages three key properties of the system, so if we see the terminal status via WebSocket, all prior events have been processed.

Issues Identified

Recommendation

The core logic is sound and addresses the root cause correctly. This is the right approach compared to PR #1858 (test-only fix), as it fixes the actual SDK race condition rather than just hardening tests.

The review comment has been posted to: #1832 (comment)
@OpenHands please fix #1832 (comment)

I'm on it! xingyaoww can track my progress at all-hands.dev
…L_POLL_THRESHOLD
Address code review feedback:
1. Thread Safety: Added threading.Lock to protect shared state
(_terminal_status, _terminal_status_run_counter, _run_counter)
accessed from both WebSocket callback thread and main thread.
All reads and writes to these variables are now atomic.
2. Magic Number Documentation: Added detailed comment explaining
why TERMINAL_POLL_THRESHOLD = 3 is the right value:
- 1 poll could be a transient state during shutdown
- 2 polls might still catch a race condition
- 3 polls (with default 1s interval = 3s total) provides high
confidence while keeping fallback latency reasonable
Co-authored-by: openhands <[email protected]>
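A condensed sketch of the locking pattern this commit describes. The attribute names follow the commit message; the surrounding class is reduced to the bare minimum and is not the SDK's actual implementation.

```python
import threading


class TerminalStatusTracker:
    """Share a terminal status between the WebSocket thread and the main thread."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._run_counter = 0
        self._terminal_status: str | None = None
        self._terminal_status_run_counter = -1
        self._run_complete = threading.Event()

    def on_terminal_status(self, status: str) -> None:
        # Called from the WebSocket callback thread.
        with self._lock:
            self._terminal_status = status
            self._terminal_status_run_counter = self._run_counter
        self._run_complete.set()

    def read_for_run(self, expected_run_counter: int) -> str | None:
        # Called from the main thread; only trust a status recorded for this run.
        with self._lock:
            if self._terminal_status_run_counter == expected_run_counter:
                return self._terminal_status
            return None
```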
I've addressed the code review feedback from @xingyaoww on PR #1832. The changes have been committed and pushed to the PR branch.

Changes Made

1. Thread Safety Fix (Critical Issue)

Added a threading.Lock to protect the shared state (_terminal_status, _terminal_status_run_counter, _run_counter) accessed from both the WebSocket callback thread and the main thread. All reads and writes to these variables are now atomic, wrapped in the new lock.

2. Magic Number Documentation (Critical Issue)

Added detailed documentation for TERMINAL_POLL_THRESHOLD explaining why 3 is the right value.

Commit

The PR has been updated: #1832
@OpenHands Do a /codereview-roasted on this PR. Investigate deeply the WS implementation and understand the synchronization problems it has, and this PR is trying to solve with some approach. WDYT? Answer directly as a comment to the PR. Note that it will be rendered as markdown.

I'm on it! xingyaoww can track my progress at all-hands.dev
🔥 /codereview-roasted: Deep Dive into the WS Synchronization Problem

Taste Rating: 🟡 Acceptable - A pragmatic fix that addresses the immediate problem, though it adds complexity to paper over a fundamental dual-path architecture issue.

The Core Problem (Well Understood)

The SDK has two parallel data paths that need synchronization: the WebSocket event stream and REST status polling. The race condition: REST polling can report a terminal status before the WebSocket has delivered the final events of the run.

Why This PR's Approach Works

After digging into the server-side code (event_service.py), the server only publishes the final state update once all pending events have been flushed:

```python
# Wait for all pending events to be published via
# AsyncCallbackWrapper before publishing the final state update.
if self._callback_wrapper:
    await loop.run_in_executor(None, self._callback_wrapper.wait_for_pending, 30.0)

# Clear task reference and publish state update
self._run_task = None
await self._publish_state_update()
```

The server guarantees the terminal status event is sent AFTER all other events. This makes the PR's approach of waiting for the WS terminal status fundamentally sound - if you receive the terminal status via WS, all events have already been sent through the same channel.

[CRITICAL ISSUES] - None blocking merge

The previous review's concern about IDLE being a terminal status has been addressed.

[IMPROVEMENT OPPORTUNITIES]

[remote_conversation.py, Lines 986-1012] Potential Tight Loop on Stale Events:

```python
if ws_status is not None:
    # ... handle and return
else:
    continue  # ← No sleep here
```

If there are multiple stale events from previous runs, this could spin without sleeping. In practice unlikely, but consider adding a small sleep or rate limit.

[remote_conversation.py, Line 958] Magic Number:

```python
TERMINAL_POLL_THRESHOLD = 3
```

The comment explains the rationale well, but this could be a class constant or configurable parameter for different deployment scenarios (e.g., high-latency networks might want a higher threshold).

[test_event_loss_repro.py] Test Artificiality

[ARCHITECTURAL OBSERVATION]

The complexity added here (5 new instance variables, run counter correlation, dual-path waiting logic) is a symptom of the fundamental tension between the two channels: REST as the authoritative source of truth and the WebSocket as the delivery path.

The PR correctly identifies that the WS terminal status is the synchronization point, but the fallback mechanism (3 consecutive terminal polls + reconcile) adds significant complexity.

Long-term consideration: Could the SDK be simplified by making WS the primary path and REST purely for initial sync/fallback? The server already guarantees event ordering.

VERDICT: ✅ Worth Merging

The core logic is sound: wait for the WS terminal status first, and only fall back to polling when it doesn't arrive. The complexity is justified by the need to handle both happy path (WS works) and degraded path (WS fails).

KEY INSIGHT

This PR correctly leverages the server's existing guarantee that terminal status is published after all events are flushed. The synchronization point was always there - this PR just makes the client wait for it properly.
I've completed the review.

Summary

No code changes were made - this was a review-only task.

Key Findings from the Deep Dive:

The review has been posted to the PR: #1832 (comment)
@enyst 🤣 looks like after i repeat your prompt three times in a row, now openhands thinks the solution is ok |
```python
with self._terminal_status_lock:
    self._terminal_status = event.value
    self._terminal_status_run_counter = self._run_counter
    self._run_complete_event.set()
```
@OpenHands Do a /codereview-roasted on this PR. Investigate deeply the WS implementation and understand the synchronization problems it has, and this PR is trying to solve with some approach. WDYT?
- Tell me, can we consolidate the `_terminal_status` and `_run_complete_event` into just ONE event?
- Also, can we simplify the current implementation?
Answer directly as a comment to the PR. Note that it will be rendered as markdown.
I'm on it! xingyaoww can track my progress at all-hands.dev
🔥 Roasted Code Review

Taste Rating: 🟡 Acceptable - Works but could be cleaner

The Good

This PR solves a real problem - the event loss race condition is a legitimate bug that causes flaky tests. The diagnosis is correct: when polling sees "finished" before WebSocket delivers events, run() returns with events still in flight. The test reproduction (test_event_loss_repro.py) demonstrates this reliably.

The Not-So-Good: Synchronization Complexity

5 pieces of state for one synchronization problem? Let me count:

```python
_run_complete_event: threading.Event   # Signal
_run_counter: int                      # Counter
_terminal_status: str | None           # Payload
_terminal_status_run_counter: int      # Counter correlation
_terminal_status_lock: threading.Lock  # Protection
```

This is over-engineered. The classic "I need to pass data between threads" problem has simpler solutions.

Answer to Your Questions

1. Can we consolidate _terminal_status and _run_complete_event into just ONE event?
@OpenHands please update this PR following the suggestions here: #1832 (comment)

I'm on it! xingyaoww can track my progress at all-hands.dev
Replace 5 pieces of synchronization state with a single thread-safe Queue:
- _run_complete_event (threading.Event)
- _run_counter (int)
- _terminal_status (str | None)
- _terminal_status_run_counter (int)
- _terminal_status_lock (threading.Lock)

Now replaced with:
- _terminal_status_queue (Queue[str])

The Queue provides:
- Thread-safe blocking with timeout via Queue.get(timeout=poll_interval)
- No need for explicit locking (Queue is thread-safe)
- Simpler stale event handling by draining queue at start of run()

This simplification was suggested in code review: #1832 (comment)

Co-authored-by: openhands <[email protected]>
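A minimal sketch of the queue-based handoff this commit describes; the drain-then-wait structure follows the commit text, while the class, method names, and types are illustrative rather than the SDK's.

```python
import queue


class TerminalStatusChannel:
    """Single thread-safe channel from the WebSocket callback to the waiter."""

    def __init__(self) -> None:
        self._statuses: "queue.Queue[str]" = queue.Queue()

    def publish(self, status: str) -> None:
        # WebSocket callback thread: hand the terminal status to the main thread.
        self._statuses.put(status)

    def drain_stale(self) -> None:
        # Called at the start of run(): discard terminal statuses from earlier runs.
        while True:
            try:
                self._statuses.get_nowait()
            except queue.Empty:
                return

    def wait(self, poll_interval: float) -> str | None:
        # Main thread: block up to one poll interval, then let the caller poll REST.
        try:
            return self._statuses.get(timeout=poll_interval)
        except queue.Empty:
            return None
```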
I've updated PR #1832 following the code review suggestions from the comment at #1832 (comment).

Summary of Changes

The code review suggested simplifying the synchronization mechanism by replacing 5 pieces of state with a single thread-safe Queue.

Before (5 variables):

After (1 variable):

Key Changes:

Benefits:

Verification:

The changes have been pushed to the PR branch.
@OpenHands Do a /codereview-roasted on this PR. Investigate deeply the WS implementation and understand the synchronization problems it has, and this PR is trying to solve with some approach. WDYT? Answer directly as a comment to the PR. Note that it will be rendered as markdown.

I'm on it! xingyaoww can track my progress at all-hands.dev
🔥 /codereview-roasted: Deep Dive into the WS Synchronization Problem

Taste Rating: 🟡 Acceptable - A pragmatic tactical fix that papers over a fundamental architectural issue. Works, but adds complexity.

The Core Problem (Well Identified)

The race condition is a classic producer-consumer synchronization issue: the PR correctly identifies that REST polling can see terminal status before WS delivers all events.

[CRITICAL ISSUES] (Must fix or acknowledge)

[remote_conversation.py, Line 209] Data Flow / Silent Failure:

```python
except websockets.exceptions.ConnectionClosed:
    break  # <-- WS client stops forever, no retry
```

If the server closes the connection gracefully (e.g., during shutdown or load balancing), the WS client dies silently. The fallback mechanism saves us, but there's no logging or visibility into this failure mode. Consider at least logging the closure, and possibly reconnecting.

[remote_conversation.py, Lines 881-887] Race Condition / Queue Draining: The queue drain at the start of run():

```python
# Drain any stale terminal status events from previous runs.
while True:
    try:
        self._terminal_status_queue.get_nowait()
    except Empty:
        break
```

What if a terminal status arrives between draining and the server actually starting the run? The 409 handling helps, but consider this sequence: a stale terminal status from the previous run lands in the queue right after the drain, and the new run then treats it as its own completion. This is unlikely but possible. A more robust approach would be to include a run ID or sequence number in the terminal status.

[IMPROVEMENT OPPORTUNITIES] (Should fix - violates good taste)

[remote_conversation.py, Line 945] Magic Number:

```python
TERMINAL_POLL_THRESHOLD = 3
```

The comment explains the reasoning well, but this adds 3 seconds of latency in the fallback path. Consider making this configurable via a parameter or class attribute for users who need lower latency and are willing to accept the risk.

[remote_conversation.py, Lines 958-974] Complexity: The dual-path waiting logic is getting complex.

```python
try:
    ws_status = self._terminal_status_queue.get(timeout=poll_interval)
    # Handle ERROR/STUCK states - raises ConversationRunError
    self._handle_conversation_status(ws_status)
    logger.info(...)
    return
except Empty:
    pass  # Queue.get() timed out, fall through to REST polling
```

This is acceptable, but the method is doing two things: waiting for WS and polling REST. Consider extracting the WS wait into a separate method for clarity.

[test_event_loss_repro.py] Test Validity: The test validates callback completion, not WS-before-REST ordering. The test injects a delay in the callback:

```python
def delayed_cb(event: Event) -> None:
    if isinstance(event, (ActionEvent, ObservationEvent)) and ...:
        time.sleep(ws_delay_s)  # Delay BEFORE calling orig_cb
    orig_cb(event)
```

Since the delay happens inside the client's callback, the WebSocket has technically already delivered the event; the test exercises a slow consumer rather than true out-of-order delivery. A more direct test would mock the WS to deliver events with a delay AFTER the REST poll sees terminal status.

[ARCHITECTURAL OBSERVATIONS] (For future consideration)

The Fundamental Issue: The dual REST/WS synchronization model is inherently racy. The PR description acknowledges this: "We want a solution for the recent events that doesn't require rethinking the ws/REST sync yet." For a more robust long-term solution, consider server-side sequence numbers or making the WebSocket the single ordered delivery channel.

WS Reconnection Gap: If WS disconnects and reconnects during a run, events during the gap are lost. The fallback reconcile helps, but only if we hit the 3-poll threshold. This is a known limitation.

VERDICT:

✅ Worth merging - The core logic is sound and significantly improves reliability. The fix correctly waits for WS to deliver terminal status before returning, ensuring events are delivered through the same channel.

Key Insight: The PR's approach of "wait for WS terminal status, fall back to REST + reconcile" is a pragmatic solution that works for the common case (WS delivers events on time) while providing a safety net for edge cases (WS delayed/disconnected).

Remaining Risks:

Recommendation: Merge with the understanding that this is a tactical fix. Consider adding WS health logging and making TERMINAL_POLL_THRESHOLD configurable.
I've completed the review.

Summary

The PR addresses a race condition in the WebSocket/REST synchronization where events can be lost when run() returns before the WebSocket has delivered them.

My Review

Verdict: 🟡 Acceptable - Worth Merging

Key Findings:

The Core Problem (Well Identified):

Critical Issues Identified:

Improvement Opportunities:

Architectural Observations:

Recommendation: Merge with the understanding that this is a tactical fix. The approach of "wait for WS terminal status, fall back to REST + reconcile" is pragmatic and significantly improves reliability.

The review comment has been posted at: #1832 (comment)
all-hands-bot left a comment
Summary
Great work fixing this race condition! The approach of waiting for the WebSocket terminal status event is sound and addresses the root cause. The fallback mechanism provides good resilience.
I found a few areas that could use clarification or minor improvements, particularly around the sequential processing assumption that this fix relies on.
Key Findings
🟠 Important Issues
- Sequential Processing Assumption (line 723 in remote_conversation.py)
  - This fix relies on the critical assumption that WebSocket events are processed sequentially
  - If ConversationStateUpdateEvent(finished) is processed before ActionEvent(finish) is fully added to state, the race condition persists
  - Recommendation: Add a comment documenting this assumption and consider adding test validation
- Timeout Edge Case (line 961 in remote_conversation.py)
  - If elapsed time is close to the timeout, we still wait for the full poll_interval, potentially exceeding the timeout
  - Suggestion: Use wait_time = min(poll_interval, timeout - elapsed) for precise timeout handling
🟡 Suggestions
- TERMINAL_POLL_THRESHOLD Value (line 926)
  - Current value of 3 creates a 3-second delay in the fallback path
  - Well-reasoned in comments, but consider if 2 would be sufficient for faster fallback
- Queue Draining Race Condition (line 867)
  - Theoretical race if WebSocket fires between draining and run start (very unlikely)
  - Suggestion: Add logging for drained events for better debugging
- Test Performance (test_event_loss_repro.py, line 176)
  - The 3-second delay makes the test slow
  - Recommendation: Add a @pytest.mark.slow decorator for optional execution in CI
- Minor Inefficiency (line 720)
  - Status converted from string to enum twice
  - Not a bug, just a small optimization opportunity
✅ Good Practices
- Excellent documentation on the is_terminal() method explaining why IDLE is excluded
- Smart optimization to only reconcile in the fallback path
- Comprehensive reproduction test validates the fix
Overall Assessment
The fix is sound and addresses the root cause effectively. The fallback mechanism provides resilience. Main recommendation is to document the sequential processing assumption more clearly, as it's critical for correctness.
```python
    status = ConversationExecutionStatus(event.value)
    if status.is_terminal():
        self._terminal_status_queue.put(event.value)
except ValueError:
```
🟠 Important: This fix relies on a critical assumption that WebSocket events are processed sequentially in the callback.
Why this matters:
- If ConversationStateUpdateEvent(finished) is processed and put in the queue BEFORE ActionEvent(finish) is fully processed and added to state, the race condition still exists
- The fix works because callbacks execute sequentially, ensuring all prior events are processed when the terminal status arrives
Suggestion:
Add a comment documenting this critical assumption:
```python
# CRITICAL: This callback must execute sequentially with other event callbacks
# to ensure all events preceding the terminal status have been fully processed
# and added to state when we signal run completion.
def run_complete_callback(event: Event) -> None:
```

Consider adding a test that validates sequential execution order.
```python
# Wait for either:
# 1. WebSocket delivers terminal status event (preferred)
# 2. Poll interval expires (fallback - check status via REST)
try:
```
🟡 Suggestion: Minor timeout edge case - if elapsed time is close to timeout, we still wait for the full poll_interval, which could exceed the intended timeout.
Example:
- timeout = 1800s, poll_interval = 1s
- At the iteration where elapsed = 1799.5s, we pass the timeout check
- Then we wait up to 1s for the queue, potentially reaching 1800.5s total
Suggested fix:
```python
remaining = timeout - elapsed
wait_time = min(poll_interval, remaining)
try:
    ws_status = self._terminal_status_queue.get(timeout=wait_time)
```

This ensures we respect the timeout more precisely, though the current behavior is probably acceptable for most use cases.
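Expanding the suggestion above into a self-contained sketch; `q` stands in for the terminal-status queue, and the REST poll that would normally follow a timeout is omitted for brevity.

```python
import queue
import time


def wait_for_terminal_status(
    q: "queue.Queue[str]", timeout: float, poll_interval: float = 1.0
) -> str | None:
    """Wait for a terminal status without overshooting the overall timeout."""
    deadline = time.monotonic() + timeout
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return None  # timed out; the caller decides whether to raise
        try:
            # Never block longer than the time left, per the suggestion above.
            return q.get(timeout=min(poll_interval, remaining))
        except queue.Empty:
            continue  # a REST health check would normally go here
```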
```python
def delayed_cb(event: Event) -> None:
    if (
        isinstance(event, (ActionEvent, ObservationEvent))
```
🟡 Suggestion: The 3-second delay effectively demonstrates the race condition, but it makes this test slow to execute.
Options to consider:
- Add slow test marker: @pytest.mark.slow allows skipping in fast CI runs
- Reduce delay: Try 1-2 seconds if it still reliably reproduces the issue
- Keep as-is: Since this is a critical bug reproduction test
Recommended approach:
```python
@pytest.mark.slow  # Skip in fast test runs with: pytest -m "not slow"
def test_event_loss_race_condition_with_ws_delay(
    server_env_for_repro, monkeypatch: pytest.MonkeyPatch
):
```

This allows the test to be run in thorough validation while keeping fast feedback loops for development.
Looks like there are a few issues preventing this PR from being merged!
If you'd like me to help, just leave a comment. Feel free to include any additional details that might help me get this PR into a better state. You can manage your notification settings.
Summary

This PR fixes the race condition where events emitted during the final moments of a run could be lost if the WebSocket did not deliver them before run() returned.

Problem

The test test_events_not_lost_during_client_disconnection was failing intermittently in CI (observed in PR #1829) with the following error:

Root Cause

The race condition occurs when:
1. Server emits events (ActionEvent, ObservationEvent)
2. Client polls and sees 'finished' status
3. run() returns before WebSocket delivers those events

PR #1820 removed the reconcile() call that was supposed to catch these events, causing the flaky test failure.

Solution

This PR implements a more fundamental fix by waiting for the WebSocket to deliver the terminal status event before returning from run():

1. Added _run_complete_event: A threading.Event that gets signaled when WebSocket delivers a ConversationStateUpdateEvent with terminal execution_status (idle, finished, error, stuck)
2. Added is_terminal() method: Added to ConversationExecutionStatus enum to check if a status represents a terminal state
3. Modified _wait_for_run_completion(): Now waits for the WebSocket to deliver the terminal status event instead of returning immediately when polling detects completion. The polling is kept as a health check for ERROR/STUCK states that need immediate attention.

Why this approach works:
- All events are guaranteed to be delivered before run() returns (since events come through the same WebSocket channel)

Reliable Reproduction

Added test_event_loss_repro.py which reliably reproduces the race condition by injecting a 3s delay in the WebSocket callback for finish events.

Testing

Related
Agent Server images for this PR

• GHCR package: https://github.com/OpenHands/agent-sdk/pkgs/container/agent-server

Variants & Base Images

- eclipse-temurin:17-jdk
- nikolaik/python-nodejs:python3.13-nodejs22
- golang:1.21-bookworm

Pull (multi-arch manifest)

```
# Each variant is a multi-arch manifest supporting both amd64 and arm64
docker pull ghcr.io/openhands/agent-server:7b08629-python
```

Run

All tags pushed for this build

About Multi-Architecture Support

- Each variant tag (e.g., 7b08629-python) is a multi-arch manifest supporting both amd64 and arm64
- Architecture-specific tags (e.g., 7b08629-python-amd64) are also available if needed