
fix: wait for WebSocket terminal status to prevent event loss #1832

Merged
xingyaoww merged 20 commits into main from fix/restore-post-run-reconcile on Feb 5, 2026

Conversation

@xingyaoww
Collaborator

@xingyaoww xingyaoww commented Jan 26, 2026

Summary

This PR fixes the race condition where events emitted during the final moments of a run could be lost if the WebSocket did not deliver them before run() returned.

Problem

The test test_events_not_lost_during_client_disconnection was failing intermittently in CI (observed in PR #1829) with the following error:

AssertionError: ActionEvent with finish tool not found in client events. 
REST API has 1 ActionEvent(s) but client has 0.

Root Cause

The race condition occurs when:

  1. Server emits events (ActionEvent, ObservationEvent)
  2. Client polls and sees "finished" status
  3. run() returns before WebSocket delivers those events

PR #1820 removed the reconcile() call that was supposed to catch these events, causing the flaky test failure.

Solution

This PR implements a more fundamental fix by waiting for the WebSocket to deliver the terminal status event before returning from run():

  1. Added _run_complete_event: A threading.Event that gets signaled when WebSocket delivers a ConversationStateUpdateEvent with terminal execution_status (idle, finished, error, stuck)

  2. Added is_terminal() method: Added to ConversationExecutionStatus enum to check if a status represents a terminal state

  3. Modified _wait_for_run_completion(): Now waits for the WebSocket to deliver the terminal status event instead of returning immediately when polling detects completion. The polling is kept as a health check for ERROR/STUCK states that need immediate attention.

Why this approach works:

  • Fixes the root cause: By waiting for WebSocket to deliver the terminal status event, we ensure all events are delivered before run() returns (since events come through the same WebSocket channel)
  • No additional API calls: When WebSocket delivers events on time (the common case), no additional API calls are needed
  • Simple and reliable: The WebSocket terminal status event is guaranteed to arrive after all other events from the run
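
For concreteness, here is a minimal sketch of the mechanism described above, assuming illustrative names (run_complete_event, on_state_update, poll_status); the SDK's actual enum members and signatures may differ, and later review discussion dropped IDLE from the terminal set.

import threading
import time
from enum import Enum


class ConversationExecutionStatus(str, Enum):
    IDLE = "idle"
    RUNNING = "running"
    FINISHED = "finished"
    ERROR = "error"
    STUCK = "stuck"

    def is_terminal(self) -> bool:
        # Statuses the PR description treats as terminal for a run.
        cls = type(self)
        return self in (cls.IDLE, cls.FINISHED, cls.ERROR, cls.STUCK)


run_complete_event = threading.Event()  # set by the WebSocket callback


def on_state_update(status_value: str) -> None:
    # WebSocket callback: signal completion once a terminal status arrives.
    if ConversationExecutionStatus(status_value).is_terminal():
        run_complete_event.set()


def wait_for_run_completion(poll_status, poll_interval: float = 5.0,
                            timeout: float = 1800.0) -> None:
    # Prefer the WebSocket signal; keep REST polling only as a health check
    # for ERROR/STUCK states that need immediate attention.
    start = time.monotonic()
    while not run_complete_event.wait(timeout=poll_interval):
        if time.monotonic() - start > timeout:
            raise TimeoutError("run did not reach a terminal status in time")
        status = poll_status()  # hypothetical REST status check
        if status in (ConversationExecutionStatus.ERROR,
                      ConversationExecutionStatus.STUCK):
            raise RuntimeError(f"run ended in {status.value} state")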

Reliable Reproduction

Added test_event_loss_repro.py which reliably reproduces the race condition by injecting a 3s delay in the WebSocket callback for finish events.
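
A bare-bones sketch of that delay-injection idea; make_delayed_callback and the is_finish_event predicate are illustrative stand-ins, since the real test patches the client's WebSocket callback directly.

import time


def make_delayed_callback(orig_cb, is_finish_event, delay_s: float = 3.0):
    # Wrap the original WebSocket callback so finish-related events are
    # delivered late, widening the window for the race described above.
    def delayed_cb(event):
        if is_finish_event(event):
            time.sleep(delay_s)
        orig_cb(event)

    return delayed_cb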

Testing

  • All cross tests pass locally
  • The reproduction test passes with the fix

Related


Agent Server images for this PR

GHCR package: https://github.com/OpenHands/agent-sdk/pkgs/container/agent-server

Variants & Base Images

Variant | Architectures | Base Image                                 | Docs / Tags
java    | amd64, arm64  | eclipse-temurin:17-jdk                     | Link
python  | amd64, arm64  | nikolaik/python-nodejs:python3.12-nodejs22 | Link
golang  | amd64, arm64  | golang:1.21-bookworm                       | Link

Pull (multi-arch manifest)

# Each variant is a multi-arch manifest supporting both amd64 and arm64
docker pull ghcr.io/openhands/agent-server:71f40eb-python

Run

docker run -it --rm \
  -p 8000:8000 \
  --name agent-server-71f40eb-python \
  ghcr.io/openhands/agent-server:71f40eb-python

All tags pushed for this build

ghcr.io/openhands/agent-server:71f40eb-golang-amd64
ghcr.io/openhands/agent-server:71f40eb-golang_tag_1.21-bookworm-amd64
ghcr.io/openhands/agent-server:71f40eb-golang-arm64
ghcr.io/openhands/agent-server:71f40eb-golang_tag_1.21-bookworm-arm64
ghcr.io/openhands/agent-server:71f40eb-java-amd64
ghcr.io/openhands/agent-server:71f40eb-eclipse-temurin_tag_17-jdk-amd64
ghcr.io/openhands/agent-server:71f40eb-java-arm64
ghcr.io/openhands/agent-server:71f40eb-eclipse-temurin_tag_17-jdk-arm64
ghcr.io/openhands/agent-server:71f40eb-python-amd64
ghcr.io/openhands/agent-server:71f40eb-nikolaik_s_python-nodejs_tag_python3.12-nodejs22-amd64
ghcr.io/openhands/agent-server:71f40eb-python-arm64
ghcr.io/openhands/agent-server:71f40eb-nikolaik_s_python-nodejs_tag_python3.12-nodejs22-arm64
ghcr.io/openhands/agent-server:71f40eb-golang
ghcr.io/openhands/agent-server:71f40eb-java
ghcr.io/openhands/agent-server:71f40eb-python

About Multi-Architecture Support

  • Each variant tag (e.g., 71f40eb-python) is a multi-arch manifest supporting both amd64 and arm64
  • Docker automatically pulls the correct architecture for your platform
  • Individual architecture tags (e.g., 71f40eb-python-amd64) are also available if needed

Collaborator

@all-hands-bot all-hands-bot left a comment

Good fix that correctly addresses the race condition between run completion detection and WebSocket event delivery. The restoration of the post-run reconcile call is justified by the test failures and matches the established pattern.

The reconcile() call after run completion was removed in PR #1820, but
this caused a race condition where events emitted during the final
moments of the run could be lost if the WebSocket didn't deliver them
before run() returned.

This was observed in CI where test_events_not_lost_during_client_disconnection
failed because the client only received 3-4 events while the REST API had 6
events - the ActionEvent(finish) and ObservationEvent(finish) were missing.

Reproduction:
- Inject a 3s delay in the WebSocket callback for finish events
- Run the conversation with a finish tool call
- Observe that without the reconcile() call, the client is missing events

The fix restores the reconcile() call in _wait_for_run_completion() to
ensure all events are captured after run completion. This is safe because
reconcile() is idempotent and will only add events that are missing from
the client's cache.

Fixes the flaky test failure in PR #1829.

Co-authored-by: openhands <openhands@all-hands.dev>
@xingyaoww xingyaoww force-pushed the fix/restore-post-run-reconcile branch from c68f00b to ec4b0fb on January 26, 2026 16:47
This PR fixes the race condition where events emitted during the final
moments of a run could be lost if the WebSocket didn't deliver them
before run() returned.

## Root Cause

The race condition occurs when:
1. Server emits events (ActionEvent, ObservationEvent)
2. Client polls and sees 'finished' status
3. run() returns before WebSocket delivers those events

## Solution

Instead of using the expensive reconcile() which fetches ALL events,
we introduce reconcile_recent() which only fetches events after the
last known timestamp. This is much more efficient for long conversations.

The fix:
1. Added reconcile_recent() method to RemoteEventsList that uses the
   timestamp__gte filter to only fetch recent events
2. Call reconcile_recent() after run completion to catch any events
   that were missed by WebSocket

## Reproduction

Added test_event_loss_repro.py which reliably reproduces the race
condition by injecting a 3s delay in the WebSocket callback for
finish events. Without the fix, the test fails because the client
is missing ActionEvent(finish) and ObservationEvent(finish).

## Testing

- All cross tests pass
- The reproduction test passes with the fix

Fixes the flaky test failure in PR #1829.

Co-authored-by: openhands <openhands@all-hands.dev>
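
A rough, self-contained sketch of the reconcile_recent() idea from the commit message above; only the timestamp__gte filter and the dedupe-by-id step come from the text, while the Event shape and the fetch_events callable are stand-ins for the SDK's client internals.

from dataclasses import dataclass
from datetime import datetime
from typing import Callable, Iterable


@dataclass
class Event:
    id: str
    timestamp: datetime


def reconcile_recent(
    cached: list[Event],
    fetch_events: Callable[[dict], Iterable[Event]],
) -> None:
    # Backfill only events at or after the newest cached timestamp.
    if not cached:
        return
    params = {"timestamp__gte": cached[-1].timestamp.isoformat()}
    known_ids = {e.id for e in cached}
    for event in fetch_events(params):
        # timestamp__gte re-fetches the boundary event, so dedupe by id.
        if event.id not in known_ids:
            cached.append(event)
            known_ids.add(event.id)
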
@github-actions
Contributor

github-actions bot commented Jan 26, 2026

Coverage

Coverage Report

File | Stmts | Miss | Cover | Missing
openhands-sdk/openhands/sdk/conversation/state.py | 175 | 24 | 86% | 163, 197, 271, 302, 348–350, 366–367, 373, 379–382, 386, 392–395, 422, 440, 449, 464, 470
openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py | 590 | 201 | 65% | 78–84, 91–94, 125, 132, 156, 159, 166–167, 172, 174–177, 187, 209–210, 215–218, 253, 267, 294, 304–306, 312, 353, 373–374, 407, 410–412, 426, 446, 454, 466, 474–477, 480, 485–488, 490, 495–496, 501–505, 510–514, 519–522, 525, 536–537, 541, 545, 548, 623, 629, 631, 634, 660–661, 665–666, 669, 677, 696–697, 702, 704–705, 716, 727–728, 748–751, 753–754, 767–768, 772, 778–779, 783, 788–789, 795–797, 800–804, 806–807, 811, 813–821, 823, 827, 842, 860, 903, 905, 908, 954, 985–987, 1012, 1028, 1030–1031, 1036, 1044–1048, 1055–1056, 1060, 1065–1069, 1073–1081, 1084–1085, 1094–1095, 1104, 1112, 1117–1119, 1121, 1124, 1126–1127, 1147, 1149, 1155–1156, 1171, 1178, 1184–1185, 1200, 1217, 1235, 1241–1242, 1246, 1249–1251, 1256–1257
TOTAL | 16885 | 8294 | 50% |

…un()

This is a more fundamental fix for the event loss race condition. Instead
of just reconciling after polling detects terminal status, we now:

1. Add a threading.Event that gets signaled when WebSocket delivers a
   ConversationStateUpdateEvent with terminal execution_status (idle,
   finished, error, stuck)

2. In _wait_for_run_completion(), wait for either:
   - WebSocket delivers terminal status (preferred - all events guaranteed)
   - Poll interval expires (fallback - check status via REST)

3. If WebSocket delivers terminal status first, return immediately since
   all events are guaranteed to be delivered through the same channel

4. If polling detects terminal status first (the race condition), use
   reconcile_recent() to fetch any events that may have been missed

This approach:
- Fixes the root cause by waiting for WebSocket to deliver all events
- Uses reconcile_recent() as a fallback (more efficient than full reconcile)
- Maintains backward compatibility with the polling mechanism

Co-authored-by: openhands <openhands@all-hands.dev>
@enyst
Collaborator

enyst commented Jan 27, 2026

@OpenHands Look at this github action failure: https://github.com/OpenHands/software-agent-sdk/actions/runs/21394941864/job/61590830061?pr=1840#step:7:5796

Analyze deeply why test_events_not_lost_during_client_disconnection() failed there; note that the branch is not the same as this branch (the branch with the failure was openhands/bounded-init-state-scan).

Then check this PR's diff and tell me WDYT: does this PR fix the root cause of test_events_not_lost_during_client_disconnection() failures, as it intends to?

@openhands-ai

openhands-ai bot commented Jan 27, 2026

I'm on it! enyst can track my progress at all-hands.dev

@openhands-ai

openhands-ai bot commented Jan 27, 2026

I only answered your question (no code changes were made).

Answer (restated)

  • Why test_events_not_lost_during_client_disconnection() failed in that Actions run (branch openhands/bounded-init-state-scan): it’s a client-side race where conv.run() returns as soon as polling detects a terminal execution status, but the WebSocket thread hasn’t yet delivered/appended the final finish Action/Observation events into conv.state.events. The test then snapshots the WS-derived client event list too early, so it sees 0 finish ActionEvents while REST already has them persisted.

  • Does this PR (fix/restore-post-run-reconcile) fix the root cause? Yes. The PR changes completion semantics to prefer waiting for a terminal status update delivered via WebSocket (so tail events have time to arrive through the same channel), and when polling “wins” anyway it performs a post-run backfill via reconcile_recent(), which is exactly what prevents “REST has it, client missed it” failures.

Checklist vs your request

  • Analyzed the failure mode of test_events_not_lost_during_client_disconnection() (root cause: polling-vs-WS delivery race / missing tail-event backfill)
  • Reviewed this PR’s diff locally and assessed whether it addresses that root cause
  • Did not post PR comments (per instructions)
  • No changes made; git diff is clean

No push needed since the working tree is unchanged.


@enyst
Collaborator

enyst commented Jan 28, 2026

@OpenHands Do a /codereview-roasted on this PR.

We want a solution for the recent events that doesn’t require rethinking the ws/REST sync yet.

@openhands-ai

openhands-ai bot commented Jan 28, 2026

I'm on it! enyst can track my progress at all-hands.dev

@openhands-ai

openhands-ai bot commented Jan 28, 2026

🔴 /codereview-roasted: Needs Improvement

Taste Rating: 🟡 Acceptable - Works but adds complexity to paper over a fundamental architectural issue.


[CRITICAL ISSUES] (Must fix)

[remote_conversation.py, Line 791] Data Structure / Special Case: Including IDLE as a terminal status is dangerous.

if event.key == "execution_status" and event.value in (
    ConversationExecutionStatus.IDLE.value,  # <-- SUSPICIOUS
    ConversationExecutionStatus.FINISHED.value,
    ...

IDLE is the initial state before a run starts. If the WebSocket delivers a stale IDLE status update (e.g., from connection replay or a previous run), _run_complete_event will be set prematurely and run() will return before the actual run completes. This is a new race condition introduced by the fix.

[remote_conversation.py, Line 359] Bug: timestamp__gte will re-fetch the last event we already have.

"timestamp__gte": timestamp_str,  # Should be timestamp__gt

This is wasteful and relies entirely on deduplication to not cause issues. Use timestamp__gt (greater than) instead of timestamp__gte (greater than or equal).

[remote_conversation.py, Lines 1017-1026] Complexity / False Assumption: The comment claims "all events are guaranteed to be delivered since they come through the same channel" - this is only true if the WebSocket connection stayed up the entire time. If WebSocket disconnected and reconnected during the run, events could be lost. The code doesn't verify WebSocket health before trusting this path.


[IMPROVEMENT OPPORTUNITIES] (Should fix)

[remote_conversation.py, Lines 372-376] Silent Failure: reconcile_recent() catches exceptions and just logs a warning, then breaks. This could leave the client in an inconsistent state with missing events and no indication to the caller.

except Exception as e:
    logger.warning(f"Failed to fetch events during recent reconciliation: {e}")
    break  # <-- Silent failure, caller has no idea

At minimum, return a sentinel or raise after logging.

[remote_conversation.py, Lines 786-796] Complexity: Defining a closure inside a 100+ line __init__ method makes this hard to test in isolation. Extract run_complete_callback as a method or separate class.

[test_event_loss_repro.py] Pragmatism: The test injects an artificial 3-second delay. This tests that reconcile_recent() works when WebSocket is slow, but it doesn't test the actual race condition (where polling naturally wins). A better test would use a mock server that delays the WebSocket message relative to the REST response.

[remote_conversation.py, Line 1001] Fragile Ordering: Clearing _run_complete_event at the start of _wait_for_run_completion() means if the WebSocket delivered the terminal event before we entered this method, we'd miss it. The fallback to polling saves us, but this is fragile and hard to reason about.


[STYLE NOTES] (Minor)

[remote_conversation.py, Lines 1024-1025, 1047-1048] Redundant: _run_complete_event.clear() is called in three places (line 1001, 1025, 1048). The one at line 1001 should be sufficient; the others are defensive but add noise.


VERDICT:

Worth merging with fixes - The core approach (prefer WebSocket, fallback to reconcile) is sound and solves the immediate problem without rethinking WS/REST sync. However, the IDLE terminal status issue is a potential regression that should be addressed before merge.

KEY INSIGHT:

The real fix would be server-side: don't report "finished" status until all events are persisted and ready for delivery. This client-side workaround adds complexity to compensate for a server-side ordering guarantee that doesn't exist. It's acceptable as a tactical fix, but the IDLE inclusion is a footgun waiting to fire.


Recommended minimal fixes before merge:

  1. Remove IDLE from terminal statuses (or add a guard that we're actually in a running state)
  2. Change timestamp__gte to timestamp__gt in reconcile_recent()
  3. Consider adding a return value or exception from reconcile_recent() on failure


@enyst
Collaborator

enyst commented Jan 30, 2026

Comparison with PR #1858 (test-only fix)

I just cherry-picked the latest commit from PR #1840 into a new PR (#1858). That patch only hardens the test by:

  • waiting longer for server shutdown + explicitly waiting for port release
  • using shutil.rmtree(..., ignore_errors=True) to avoid teardown races
  • retrying the REST events query (10x with short delay) to handle eventual consistency

No SDK behavior changes there — it’s strictly a test stability change.
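
The third bullet (retrying the REST events query) could look roughly like the sketch below; fetch_events and expected_count are assumptions rather than the actual test helpers.

import time


def get_events_with_retry(fetch_events, expected_count: int,
                          attempts: int = 10, delay_s: float = 0.5):
    # Retry the REST events query to ride out eventual consistency.
    events = []
    for _ in range(attempts):
        events = list(fetch_events())
        if len(events) >= expected_count:
            break
        time.sleep(delay_s)
    return events  # the caller's assertion reports any remaining shortfall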

PR #1832 approach

This PR changes RemoteConversation behavior to avoid the event-loss race:

  • waits for a terminal status delivered via WebSocket before run() returns
  • falls back to polling if the WS signal is missed
  • performs reconcile_recent() when polling wins to fetch late events
  • adds a repro test with delayed WS callback

Take / preference

If the underlying issue is real event loss (run() returns before WS delivers final events), #1832 is the better fix because it addresses the root cause and aligns client behavior with the REST API’s authoritative state.

The #1858-style fix mainly treats CI flakiness / eventual consistency — it reduces false negatives but could mask a genuine race in production. If the delayed-WS repro is valid, I’d favor the behavioral fix in #1832 and keep the test robust as a secondary guard.

Minor nit: reconcile_recent() uses timestamp__gte, so duplicates are expected; you already dedupe by event.id, which is good. Just worth noting that timestamp resolution might be coarse, so the dedupe path is essential.

Remove the reconcile_recent method and simplify _wait_for_run_completion
to rely solely on the _run_complete_event mechanism for synchronization.

When WebSocket delivers the terminal status event, all events are
guaranteed to be delivered since they come through the same channel
in order. This makes reconcile_recent unnecessary.

The polling is now only used as a health check to detect error/stuck
states that need immediate attention. For normal completion, we
continue waiting for WebSocket to deliver the terminal status event.

Co-authored-by: openhands <openhands@all-hands.dev>
openhands-agent and others added 2 commits January 30, 2026 14:55
Add a class method to ConversationExecutionStatus to check if a status
represents a terminal state (IDLE, FINISHED, ERROR, STUCK). This helps
avoid repeating the same tuple check in multiple places.

Update remote_conversation.py to use the new is_terminal() method
instead of the inline tuple check.

Co-authored-by: openhands <openhands@all-hands.dev>
This gives the WebSocket more time to deliver the terminal status event
before falling back to polling, reducing unnecessary API calls while
still maintaining the ability to detect error/stuck states.

Co-authored-by: openhands <openhands@all-hands.dev>
…status

Co-authored-by: openhands <openhands@all-hands.dev>
@xingyaoww xingyaoww marked this pull request as ready for review January 30, 2026 16:08
Collaborator

@all-hands-bot all-hands-bot left a comment

Overall Assessment

The approach of waiting for WebSocket delivery of terminal status is sound and addresses the root cause of the race condition. However, there are some important concerns about behavior changes and missing functionality mentioned in the PR description.

Key Issues:

  1. 🟠 Polling interval increased from 1s to 5s (5x slower) - significant behavior change
  2. 🟠 reconcile_recent() mentioned in PR description but not implemented in this diff
  3. 🟡 Redundant event clearing logic
  4. 🟡 Complex control flow could use clarification

See inline comments for details.

Collaborator Author

🔥 Roasted Code Review

Taste Rating: 🟡 Acceptable - Works but could be cleaner


The Good

This PR solves a real problem - the event loss race condition is a legitimate bug that causes flaky tests. The diagnosis is correct: when polling sees "finished" before WebSocket delivers events, run() returns too early.

The test reproduction (test_event_loss_repro.py) is excellent - injecting a 3s delay to reliably reproduce the race condition is the right way to validate the fix.


The Not-So-Good: Synchronization Complexity

5 pieces of state for one synchronization problem? Let me count:

_run_complete_event: threading.Event      # Signal
_run_counter: int                         # Counter
_terminal_status: str | None              # Payload
_terminal_status_run_counter: int         # Counter correlation
_terminal_status_lock: threading.Lock     # Protection

This is over-engineered. The classic "I need to pass data between threads" problem has simpler solutions.


Answer to Your Questions

1. Can we consolidate _terminal_status and _run_complete_event into ONE event?

Yes, absolutely. Use a queue.Queue:

from queue import Queue, Empty

class RemoteConversation:
    _terminal_status_queue: Queue[str]  # Thread-safe, blocks on get()
    
    def __init__(self, ...):
        self._terminal_status_queue = Queue()
    
    # In callback:
    def run_complete_callback(event: Event) -> None:
        if isinstance(event, ConversationStateUpdateEvent):
            if event.key == "execution_status":
                status = ConversationExecutionStatus(event.value)
                if status.is_terminal():
                    self._terminal_status_queue.put(event.value)
    
    # In _wait_for_run_completion:
    def _wait_for_run_completion(self, poll_interval, timeout):
        while True:
            try:
                ws_status = self._terminal_status_queue.get(timeout=poll_interval)
                self._handle_conversation_status(ws_status)
                return
            except Empty:
                # Fallback: poll REST API
                ...

This eliminates:

  • _run_complete_event (Queue.get() blocks)
  • _terminal_status_lock (Queue is thread-safe)
  • _terminal_status (it's in the queue)

2. Can we simplify the run_counter mechanism?

Yes. Instead of tracking run counters, just drain the queue at the start of run():

def run(self, ...):
    # Drain any stale terminal status from previous runs
    while not self._terminal_status_queue.empty():
        try:
            self._terminal_status_queue.get_nowait()
        except Empty:
            break
    
    # ... trigger run ...
    self._wait_for_run_completion(...)

This eliminates:

  • _run_counter
  • _terminal_status_run_counter

Simplified Implementation (5 variables → 1)

from queue import Queue, Empty

class RemoteConversation:
    _terminal_status_queue: Queue[str]
    
    def __init__(self, ...):
        self._terminal_status_queue = Queue()
    
    def _setup_ws_callback(self):
        def run_complete_callback(event: Event) -> None:
            if isinstance(event, ConversationStateUpdateEvent):
                if event.key == "execution_status":
                    try:
                        status = ConversationExecutionStatus(event.value)
                        if status.is_terminal():
                            self._terminal_status_queue.put(event.value)
                    except ValueError:
                        pass
        return run_complete_callback
    
    def run(self, ...):
        # Drain stale events
        while True:
            try:
                self._terminal_status_queue.get_nowait()
            except Empty:
                break
        
        # Trigger run
        self._trigger_run()
        self._wait_for_run_completion(poll_interval, timeout)
    
    def _wait_for_run_completion(self, poll_interval, timeout):
        start_time = time.monotonic()
        consecutive_terminal_polls = 0
        TERMINAL_POLL_THRESHOLD = 3
        
        while True:
            elapsed = time.monotonic() - start_time
            if elapsed > timeout:
                raise ConversationRunError(...)
            
            try:
                # Wait for WS terminal status (preferred path)
                ws_status = self._terminal_status_queue.get(timeout=poll_interval)
                self._handle_conversation_status(ws_status)
                logger.info("Run completed via WebSocket (status: %s)", ws_status)
                return
            except Empty:
                # Fallback: poll REST API
                try:
                    status = self._poll_status_once()
                except Exception as exc:
                    self._handle_poll_exception(exc)
                    consecutive_terminal_polls = 0
                    continue
                
                self._handle_conversation_status(status)
                
                if status and ConversationExecutionStatus(status).is_terminal():
                    consecutive_terminal_polls += 1
                    if consecutive_terminal_polls >= TERMINAL_POLL_THRESHOLD:
                        logger.info("Run completed via REST fallback")
                        self._state.events.reconcile()
                        return
                else:
                    consecutive_terminal_polls = 0

Other Observations

  1. _handle_conversation_status return value is now ignored in the WS path (line 1001-1002). The method returns bool but you're not using it. Either use it or change the method signature.

  2. The fallback path is reasonable - 3 consecutive terminal polls before returning is a good balance. But document why 3 (you did, good).

  3. The is_terminal() method is clean - good addition to the enum.


Verdict: ✅ Worth merging with simplification

The core logic is sound and solves a real problem. But the synchronization mechanism is more complex than necessary. Consider simplifying with a Queue before merging, or merge as-is and simplify in a follow-up.

Key Insight: When you need to pass data between threads with blocking semantics, queue.Queue is almost always simpler than Event + shared variable + Lock + counter.

@xingyaoww xingyaoww marked this pull request as draft February 4, 2026 14:40
@xingyaoww
Collaborator Author

@OpenHands please update this PR following the suggestions here: #1832 (comment)

@openhands-ai

openhands-ai bot commented Feb 4, 2026

I'm on it! xingyaoww can track my progress at all-hands.dev

Replace 5 pieces of synchronization state with a single thread-safe Queue:
- _run_complete_event (threading.Event)
- _run_counter (int)
- _terminal_status (str | None)
- _terminal_status_run_counter (int)
- _terminal_status_lock (threading.Lock)

Now replaced with:
- _terminal_status_queue (Queue[str])

The Queue provides:
- Thread-safe blocking with timeout via Queue.get(timeout=poll_interval)
- No need for explicit locking (Queue is thread-safe)
- Simpler stale event handling by draining queue at start of run()

This simplification was suggested in code review:
#1832 (comment)

Co-authored-by: openhands <openhands@all-hands.dev>
@openhands-ai

openhands-ai bot commented Feb 4, 2026

I've updated PR #1832 following the code review suggestions from the comment at #1832 (comment).

Summary of Changes

The code review suggested simplifying the synchronization mechanism by replacing 5 pieces of state with a single queue.Queue. I implemented this simplification:

Before (5 variables):

  • _run_complete_event: threading.Event
  • _run_counter: int
  • _terminal_status: str | None
  • _terminal_status_run_counter: int
  • _terminal_status_lock: threading.Lock

After (1 variable):

  • _terminal_status_queue: Queue[str]

Key Changes:

  1. Import: Added from queue import Empty, Queue
  2. Callback: Simplified run_complete_callback to just self._terminal_status_queue.put(event.value)
  3. run(): Drains stale events from queue at start instead of using run counters
  4. _wait_for_run_completion(): Uses Queue.get(timeout=poll_interval) instead of Event.wait() + lock + shared variable

Benefits:

  • Simpler: 5 variables → 1 variable
  • Thread-safe: Queue is inherently thread-safe, no explicit locking needed
  • Cleaner stale event handling: Just drain the queue instead of tracking run counters

Verification:

  • All 68 cross tests pass
  • The event loss reproduction test (test_event_loss_race_condition_with_ws_delay) passes

The changes have been pushed to the fix/restore-post-run-reconcile branch.


@xingyaoww
Collaborator Author

@OpenHands Do a /codereview-roasted on this PR. Investigate the WS implementation deeply and understand the synchronization problems it has, which this PR is trying to solve with a particular approach. WDYT?

Answer directly as a comment to the PR. Note that it will be rendered as markdown.

@openhands-ai

openhands-ai bot commented Feb 4, 2026

I'm on it! xingyaoww can track my progress at all-hands.dev

Collaborator Author

🔥 /codereview-roasted: Deep Dive into the WS Synchronization Problem

Taste Rating: 🟡 Acceptable - A pragmatic tactical fix that papers over a fundamental architectural issue. Works, but adds complexity.


The Core Problem (Well Identified)

The race condition is a classic producer-consumer synchronization issue:

Timeline:
Server:  [emit ActionEvent] → [emit ObservationEvent] → [set status=finished]
                                                              ↓
REST:    ─────────────────────────────────────────────→ [poll sees "finished"]
                                                              ↓
WS:      ────────────[ActionEvent]────────[ObservationEvent]──┼──[status event]
                                                              ↓
Client:  ─────────────────────────────────────────────→ run() returns! 😱
                                                        (events still in flight)

The PR correctly identifies that REST polling can see terminal status before WS delivers all events.


[CRITICAL ISSUES] (Must fix or acknowledge)

[remote_conversation.py, Line 209] Data Flow / Silent Failure: ConnectionClosed breaks the WS loop entirely instead of retrying.

except websockets.exceptions.ConnectionClosed:
    break  # <-- WS client stops forever, no retry

If the server closes the connection gracefully (e.g., during shutdown or load balancing), the WS client dies silently. The fallback mechanism saves us, but there's no logging or visibility into this failure mode. Consider:

  • Logging a warning when WS disconnects
  • Attempting reconnection for graceful closes (not just errors)

[remote_conversation.py, Lines 881-887] Race Condition / Queue Draining: The queue drain at run() start has a subtle race.

# Drain any stale terminal status events from previous runs.
while True:
    try:
        self._terminal_status_queue.get_nowait()
    except Empty:
        break

What if a terminal status arrives between draining and the server actually starting the run? The 409 handling helps, but consider this sequence:

  1. Previous run finishes, terminal status queued
  2. run() starts, drains queue
  3. Server hasn't started new run yet
  4. WS delivers another terminal status from previous run (delayed)
  5. New run starts
  6. _wait_for_run_completion() sees the stale terminal status

This is unlikely but possible. A more robust approach would be to include a run ID or sequence number in the terminal status.
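
To illustrate the run-ID suggestion: if the server echoed a run identifier alongside the terminal status (it does not today, so this is purely hypothetical), the client could filter stale statuses instead of draining the queue:

from queue import Queue

# Each entry pairs a (hypothetical) server-provided run id with a status value.
terminal_queue: Queue = Queue()


def on_terminal_status(event_run_id: int, status_value: str) -> None:
    # The run id would come from the event payload, not from client-side state.
    terminal_queue.put((event_run_id, status_value))


def wait_terminal(expected_run_id: int, poll_interval: float) -> str:
    # Ignore terminal statuses that belong to earlier runs; queue.Empty
    # propagates on timeout, letting the caller fall back to REST polling.
    while True:
        run_id, status = terminal_queue.get(timeout=poll_interval)
        if run_id == expected_run_id:
            return status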


[IMPROVEMENT OPPORTUNITIES] (Should fix - violates good taste)

[remote_conversation.py, Line 945] Magic Number: TERMINAL_POLL_THRESHOLD = 3 is a magic number with a comment, but it's not configurable.

TERMINAL_POLL_THRESHOLD = 3

The comment explains the reasoning well, but this adds 3 seconds of latency in the fallback path. Consider making this configurable via a parameter or class attribute for users who need lower latency and are willing to accept the risk.

[remote_conversation.py, Lines 958-974] Complexity: The dual-path waiting logic is getting complex.

try:
    ws_status = self._terminal_status_queue.get(timeout=poll_interval)
    # Handle ERROR/STUCK states - raises ConversationRunError
    self._handle_conversation_status(ws_status)
    logger.info(...)
    return
except Empty:
    pass  # Queue.get() timed out, fall through to REST polling

This is acceptable, but the method is doing two things: waiting for WS and polling REST. Consider extracting the WS wait into a separate method for clarity.

[test_event_loss_repro.py] Test Validity: The test validates callback completion, not WS-before-REST ordering.

The test injects a delay in the callback:

def delayed_cb(event: Event) -> None:
    if isinstance(event, (ActionEvent, ObservationEvent)) and ...:
        time.sleep(ws_delay_s)  # Delay BEFORE calling orig_cb
    orig_cb(event)

Since run_complete_callback is part of the composed callbacks (added after user callbacks), the terminal status signal is also delayed. This means the test validates that we wait for callbacks to complete, but doesn't directly validate the WS-before-REST ordering guarantee.

A more direct test would mock the WS to deliver events with a delay AFTER the REST poll sees terminal status.


[ARCHITECTURAL OBSERVATIONS] (For future consideration)

The Fundamental Issue: The dual REST/WS synchronization model is inherently racy.

The PR description acknowledges this: "We want a solution for the recent events that doesn't require rethinking the ws/REST sync yet."

For a more robust long-term solution, consider:

  1. Server-side ordering guarantee: Ensure the server sends the terminal status event AFTER all other events
  2. Sequence numbers: Include event sequence numbers so the client can detect gaps
  3. Event ID tracking: Track the last event ID and fetch missing events by ID range
  4. Abandon WS for events: Use REST with long-polling for event delivery (simpler, more reliable)

WS Reconnection Gap: If WS disconnects and reconnects during a run, events during the gap are lost. The fallback reconcile helps, but only if we hit the 3-poll threshold. This is a known limitation.


VERDICT:

Worth merging - The core logic is sound and significantly improves reliability. The fix correctly waits for WS to deliver terminal status before returning, ensuring events are delivered through the same channel.

Key Insight: The PR's approach of "wait for WS terminal status, fall back to REST + reconcile" is a pragmatic solution that works for the common case (WS delivers events on time) while providing a safety net for edge cases (WS delayed/disconnected).

Remaining Risks:

  1. WS reconnection during run can still lose events (mitigated by fallback reconcile)
  2. 3-poll fallback adds latency when WS fails
  3. Silent WS disconnection has no visibility

Recommendation: Merge with the understanding that this is a tactical fix. Consider adding WS health logging and making TERMINAL_POLL_THRESHOLD configurable in a follow-up.

@openhands-ai
Copy link

openhands-ai bot commented Feb 4, 2026

I've completed the /codereview-roasted analysis of PR #1832 and posted a detailed code review comment directly on the PR.

Summary

The PR addresses a race condition in the WebSocket/REST synchronization where events can be lost when run() returns before WebSocket delivers all events.

My Review Verdict: 🟡 Acceptable - Worth Merging

Key Findings:

The Core Problem (Well Identified):

  • REST polling can see terminal status before WebSocket delivers all events
  • This is a classic producer-consumer synchronization issue

Critical Issues Identified:

  1. ConnectionClosed breaks the WS loop entirely instead of retrying (silent failure)
  2. Queue draining at run() start has a subtle race condition with stale terminal status events

Improvement Opportunities:

  1. TERMINAL_POLL_THRESHOLD = 3 is a magic number that adds 3s latency in fallback path - should be configurable
  2. The dual-path waiting logic is getting complex - could be refactored
  3. The test validates callback completion, not WS-before-REST ordering directly

Architectural Observations:

  • The dual REST/WS synchronization model is inherently racy
  • Long-term solutions could include server-side ordering guarantees, sequence numbers, or event ID tracking

Recommendation: Merge with the understanding that this is a tactical fix. The approach of "wait for WS terminal status, fall back to REST + reconcile" is pragmatic and significantly improves reliability.

The review comment has been posted at: #1832 (comment)


@xingyaoww xingyaoww marked this pull request as ready for review February 4, 2026 15:14
Collaborator

@all-hands-bot all-hands-bot left a comment

Summary

Great work fixing this race condition! The approach of waiting for the WebSocket terminal status event is sound and addresses the root cause. The fallback mechanism provides good resilience.

I found a few areas that could use clarification or minor improvements, particularly around the sequential processing assumption that this fix relies on.

Key Findings

🟠 Important Issues

  1. Sequential Processing Assumption (line 723 in remote_conversation.py)

    • This fix relies on the critical assumption that WebSocket events are processed sequentially
    • If ConversationStateUpdateEvent(finished) is processed before ActionEvent(finish) is fully added to state, the race condition persists
    • Recommendation: Add a comment documenting this assumption and consider adding test validation
  2. Timeout Edge Case (line 961 in remote_conversation.py)

    • If elapsed time is close to timeout, we still wait for full poll_interval, potentially exceeding timeout
    • Suggestion: Use wait_time = min(poll_interval, timeout - elapsed) for precise timeout handling

🟡 Suggestions

  1. TERMINAL_POLL_THRESHOLD Value (line 926)

    • Current value of 3 creates 3-second delay in fallback path
    • Well-reasoned in comments, but consider if 2 would be sufficient for faster fallback
  2. Queue Draining Race Condition (line 867)

    • Theoretical race if WebSocket fires between draining and run start (very unlikely)
    • Suggestion: Add logging for drained events for better debugging
  3. Test Performance (test_event_loss_repro.py line 176)

    • 3-second delay makes test slow
    • Recommendation: Add @pytest.mark.slow decorator for optional execution in CI
  4. Minor Inefficiency (line 720)

    • Status converted from string to enum twice
    • Not a bug, just a small optimization opportunity

✅ Good Practices

  • Excellent documentation on is_terminal() method explaining why IDLE is excluded
  • Smart optimization to only reconcile in fallback path
  • Comprehensive reproduction test validates the fix

Overall Assessment

The fix is sound and addresses the root cause effectively. The fallback mechanism provides resilience. Main recommendation is to document the sequential processing assumption more clearly, as it's critical for correctness.

status = ConversationExecutionStatus(event.value)
if status.is_terminal():
    self._terminal_status_queue.put(event.value)
except ValueError:
Collaborator

🟠 Important: This fix relies on a critical assumption that WebSocket events are processed sequentially in the callback.

Why this matters:

  • If ConversationStateUpdateEvent(finished) is processed and put in the queue BEFORE ActionEvent(finish) is fully processed and added to state, the race condition still exists
  • The fix works because callbacks execute sequentially, ensuring all prior events are processed when the terminal status arrives

Suggestion:
Add a comment documenting this critical assumption:

# CRITICAL: This callback must execute sequentially with other event callbacks
# to ensure all events preceding the terminal status have been fully processed
# and added to state when we signal run completion.
def run_complete_callback(event: Event) -> None:

Consider adding a test that validates sequential execution order.

# Wait for either:
# 1. WebSocket delivers terminal status event (preferred)
# 2. Poll interval expires (fallback - check status via REST)
try:
Collaborator

🟡 Suggestion: Minor timeout edge case - if elapsed time is close to timeout, we still wait for the full poll_interval, which could exceed the intended timeout.

Example:

  • timeout = 1800s, poll_interval = 1s
  • At iteration where elapsed = 1799.5s, we pass the timeout check
  • Then we wait up to 1s for the queue, potentially reaching 1800.5s total

Suggested fix:

remaining = timeout - elapsed
wait_time = min(poll_interval, remaining)
try:
    ws_status = self._terminal_status_queue.get(timeout=wait_time)

This ensures we respect the timeout more precisely, though the current behavior is probably acceptable for most use cases.


def delayed_cb(event: Event) -> None:
    if (
        isinstance(event, (ActionEvent, ObservationEvent))
Collaborator

🟡 Suggestion: The 3-second delay effectively demonstrates the race condition, but it makes this test slow to execute.

Options to consider:

  1. Add slow test marker: @pytest.mark.slow - allows skipping in fast CI runs
  2. Reduce delay: Try 1-2 seconds if it still reliably reproduces the issue
  3. Keep as-is: Since this is a critical bug reproduction test

Recommended approach:

@pytest.mark.slow  # Skip in fast test runs with: pytest -m "not slow"
def test_event_loss_race_condition_with_ws_delay(
    server_env_for_repro, monkeypatch: pytest.MonkeyPatch
):

This allows the test to be run in thorough validation while keeping fast feedback loops for development.

@xingyaoww
Collaborator Author

@OpenHands please merge from main, resolve the merge conflict.

And then try running tests/sdk/conversation/remote/test_remote_conversation.py 1000 times to see if we run into concurrency issues / event loss. Then switch back to main and run it 1000 times. Record the average time taken to finish the test and the average success rate, and report back.
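
A rough harness for the 1000-run comparison requested above; the pytest path comes from the comment, while running each iteration in a fresh subprocess and the exact reporting are just one way to do it.

import statistics
import subprocess
import time

TEST = "tests/sdk/conversation/remote/test_remote_conversation.py"

durations, failures = [], 0
for _ in range(1000):
    start = time.monotonic()
    result = subprocess.run(["python", "-m", "pytest", "-q", TEST])
    durations.append(time.monotonic() - start)
    failures += result.returncode != 0

print(f"avg time: {statistics.mean(durations):.2f}s")
print(f"success rate: {(1000 - failures) / 1000:.1%}")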

@openhands-ai

openhands-ai bot commented Feb 5, 2026

I'm on it! xingyaoww can track my progress at all-hands.dev

ws_status,
elapsed,
)
return
Collaborator Author

@OpenHands tell me, is it possible that the terminal status is streamed back FIRST, before the other events are streaming back via websocket?


I'm on it! xingyaoww can track my progress at all-hands.dev


Answer to Your Question

No, it is NOT possible for the terminal status to be streamed back FIRST before other events via WebSocket.

Here's why:

Event Ordering in the Server

Looking at the code flow in agent.py (lines 697-707), when the agent calls the FinishTool:

  1. ActionEvent is emitted first (the finish tool call)
  2. ObservationEvent is emitted via on_event(obs_event) (line 703)
  3. Then state.execution_status = ConversationExecutionStatus.FINISHED is set (line 707)
  4. Setting execution_status triggers the _on_state_change callback, which emits a ConversationStateUpdateEvent

So the terminal status event is emitted after the ActionEvent and ObservationEvent.

WebSocket Delivery Order

All events go through the same pipeline:

  • _on_event → AsyncCallbackWrapper → _pub_sub → WebSocket

The AsyncCallbackWrapper uses asyncio.run_coroutine_threadsafe() which schedules events in FIFO order to the event loop. Events are delivered to WebSocket subscribers in the order they were emitted.
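
A small self-contained demonstration of that FIFO claim: coroutines submitted from one thread via asyncio.run_coroutine_threadsafe() execute on the loop in submission order (the event names here are just labels, not SDK types).

import asyncio
import threading

received = []
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()


async def deliver(name: str) -> None:
    received.append(name)


# Submit in emission order, as the server-side callback wrapper would.
futures = [
    asyncio.run_coroutine_threadsafe(deliver(name), loop)
    for name in ("ActionEvent", "ObservationEvent", "state_update:finished")
]
for f in futures:
    f.result(timeout=5)

print(received)  # ['ActionEvent', 'ObservationEvent', 'state_update:finished']
loop.call_soon_threadsafe(loop.stop)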

The Actual Race Condition (What the PR Fixes)

The race condition mentioned in the PR is not about WebSocket events arriving out of order. It's about:

  1. Server emits events (ActionEvent, ObservationEvent, ConversationStateUpdateEvent)
  2. Client polls REST API and sees "finished" status (REST API reads directly from state)
  3. run() returns before WebSocket delivers those events

The REST API can return the terminal status faster than the WebSocket can deliver all the events, because:

  • REST API reads the state directly from memory
  • WebSocket events go through async queuing and network delivery

The fix in this PR waits for the WebSocket to deliver the terminal status event before returning from run(), which ensures all events have been delivered (since they come through the same WebSocket channel in order).


@openhands-ai

openhands-ai bot commented Feb 5, 2026

Looks like there are a few issues preventing this PR from being merged!

  • GitHub Actions are failing:
    • Run tests

If you'd like me to help, just leave a comment, like

@OpenHands please fix the failing actions on PR #1832 at branch `fix/restore-post-run-reconcile`

Feel free to include any additional details that might help me get this PR into a better state.

You can manage your notification settings

Collaborator

@enyst enyst left a comment

LGTM, thank you for wrestling with this! ❤️

(and SGTM, agent's explanations may be a decent indicator! 😅)

@xingyaoww xingyaoww merged commit 8fd9421 into main Feb 5, 2026
24 of 25 checks passed
@xingyaoww xingyaoww deleted the fix/restore-post-run-reconcile branch February 5, 2026 13:53
xingyaoww pushed a commit that referenced this pull request Feb 5, 2026
All test jobs (sdk-tests, tools-tests, agent-server-tests, cross-tests,
and coverage-report) now run on blacksmith-2vcpu-ubuntu-2404 runners.

Previously, agent-server-tests and cross-tests ran on ubuntu-24.04 which
has a different checkout path (/home/runner/work/...) compared to
blacksmith runners (/home/runner/_work/...). This caused the coverage
combine step to fail because the source paths didn't match.

Fixes #1832 CI failure.

Co-authored-by: openhands <openhands@all-hands.dev>