@@ -1584,6 +1584,9 @@ def _on_timeout() -> None:
self._response_created_futures.pop(event_id, None)
if fut and not fut.done():
fut.set_exception(llm.RealtimeError("generate_reply timed out."))
# The response.create was already sent; ask the server to cancel it so
# that any audio it produces does not arrive and play back unexpectedly.
self.send_event(ResponseCancelEvent(type="response.cancel"))
Comment on lines +1587 to +1589

🚩 The cancel sent by _on_timeout may cancel an unrelated active response

At livekit-plugins/livekit-plugins-openai/livekit/plugins/openai/realtime/realtime_model.py:1589, the newly added ResponseCancelEvent is sent without specifying a response_id. If the generate_reply's response hasn't actually started on the server (no response.created received yet), but another response IS active (e.g., a VAD-triggered server-initiated response), the cancel may target that unrelated response instead. The OpenAI Realtime API's response.cancel cancels the currently in-progress response, which may not be the one that timed out. This is the same pattern used elsewhere in the file (e.g., line 1610 in interrupt()), so it's consistent with existing behavior, but worth noting as a potential race condition in multi-response scenarios.
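
One way to narrow the race described above is to scope the cancel to a specific response once its ID is known. The sketch below is illustrative only: `build_cancel_event` is a hypothetical helper, not part of the plugin, and it assumes the `response.cancel` client event accepts an optional `response_id` field as described in the OpenAI Realtime API reference. In `_on_timeout` the ID is not yet known (no `response.created` has arrived), so this would only apply where `event.response.id` is available, such as in `_handle_response_created`.

```python
from typing import Optional


def build_cancel_event(response_id: Optional[str] = None) -> dict:
    """Build a response.cancel payload (hypothetical helper).

    Assumption: the server accepts an optional `response_id` and, when it
    is omitted, cancels whichever response is currently in progress.
    """
    event = {"type": "response.cancel"}
    if response_id is not None:
        # Scope the cancel to one response so an unrelated active
        # response (e.g. a VAD-triggered one) is left alone.
        event["response_id"] = response_id
    return event
```

Without an ID the payload matches what the PR sends today; with an ID, the cancel can only affect the late response.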



handle = asyncio.get_event_loop().call_later(10.0, _on_timeout)

@@ -1722,6 +1725,7 @@ def _handle_response_created(self, event: ResponseCreatedEvent) -> None:
response_id=event.response.id,
)

timed_out = False
if (
isinstance(event.response.metadata, dict)
and (client_event_id := event.response.metadata.get("client_event_id"))
@@ -1731,9 +1735,22 @@ def _handle_response_created(self, event: ResponseCreatedEvent) -> None:
generation_ev.user_initiated = True
fut.set_result(generation_ev)
else:
logger.warning("response of generate_reply received after it's timed out.")
# The generate_reply caller already received a timeout error. The
# server kept running and delivered response.created anyway. Cancel it
# so no audio frames are queued for playback. We keep
# _current_generation alive so that the subsequent response.output_item.*
# and response.done events (which may arrive before the server honours
# the cancel) do not trip their assertions; response.done will close the
# generation normally.
logger.warning(
"response of generate_reply received after it's timed out; "
"cancelling to prevent unexpected playback."
)
self.send_event(ResponseCancelEvent(type="response.cancel"))
timed_out = True

self.emit("generation_created", generation_ev)
if not timed_out:
self.emit("generation_created", generation_ev)
Comment on lines 1735 to +1753


🔴 Timeout detection in response handler is unreachable, so unexpected audio playback still occurs after timeout

A late-arriving response is never detected as timed out. By the time _handle_response_created calls self._response_created_futures.pop(client_event_id, None) (livekit-plugins/livekit-plugins-openai/livekit/plugins/openai/realtime/realtime_model.py:1732), the timeout callback has already removed the future from the dictionary (realtime_model.py:1584). The lookup returns None, so the generation event is still emitted to listeners and triggers unintended speech playback.

Impact: After a generate_reply timeout, audio from the late server response can still play back to the user — the exact scenario the PR intends to prevent.

Detailed mechanism: _on_timeout pops the future before _handle_response_created can find it

In _on_timeout (line 1583-1589):

  1. Line 1584: self._response_created_futures.pop(event_id, None) removes the future from the dict
  2. Line 1586: fut.set_exception(...) marks the future as done
  3. Line 1589: sends ResponseCancelEvent

Later, when the server delivers response.created (because the cancel wasn't processed yet), _handle_response_created runs:

  • Line 1732: self._response_created_futures.pop(client_event_id, None) returns None (already removed)
  • The walrus assignment makes the condition falsy, so the entire if block (lines 1729-1750) is skipped
  • timed_out stays False
  • Line 1753: self.emit("generation_created", generation_ev) fires with user_initiated=False

The _on_generation_created handler at livekit-agents/livekit/agents/voice/agent_activity.py:1744 then processes this event (since user_initiated is False, it doesn't return early at line 1745), creates a SpeechHandle, and starts a _realtime_generation_task — causing unexpected audio playback.
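
The ordering problem can be reproduced in isolation. The following is a minimal sketch, not the plugin's code: `FakeSession`, its attribute names, and `reproduce` are hypothetical stand-ins that mirror only the pop-then-check logic described above.

```python
import asyncio


class FakeSession:
    """Hypothetical stand-in mirroring the pop/check logic in the diff."""

    def __init__(self):
        self.response_created_futures = {}
        self.emitted = []

    def on_timeout(self, event_id):
        # Mirrors _on_timeout: the future is popped before it is failed.
        fut = self.response_created_futures.pop(event_id, None)
        if fut and not fut.done():
            fut.set_exception(RuntimeError("generate_reply timed out."))

    def handle_response_created(self, metadata):
        timed_out = False
        if (
            isinstance(metadata, dict)
            and (client_event_id := metadata.get("client_event_id"))
            and (fut := self.response_created_futures.pop(client_event_id, None))
        ):
            if not fut.done():
                fut.set_result("generation")
            else:
                timed_out = True  # never reached after a timeout
        if not timed_out:
            self.emitted.append("generation_created")
        return timed_out


def reproduce():
    loop = asyncio.new_event_loop()
    try:
        session = FakeSession()
        fut = loop.create_future()
        session.response_created_futures["evt_1"] = fut
        session.on_timeout("evt_1")
        timed_out = session.handle_response_created({"client_event_id": "evt_1"})
        fut.exception()  # mark the exception as retrieved
        return timed_out, session.emitted
    finally:
        loop.close()
```

In this sketch `reproduce()` reports that the late response was not flagged as timed out and that `generation_created` was still emitted, which matches the behavior the comment describes.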

To fix this, _on_timeout should NOT pop the future from the dict, so that _handle_response_created can later detect it as done. Alternatively, keep a separate set of timed-out event_ids that _handle_response_created can check.

(Refers to lines 1728-1753)

Prompt for agents
The bug is that _on_timeout (line 1584) pops the future from self._response_created_futures BEFORE _handle_response_created can check it. This makes the else branch at line 1737 unreachable for timeouts.

The fix should ensure that when response.created arrives for a timed-out generate_reply, the code can detect it. Two possible approaches:

1. In _on_timeout, do NOT pop from self._response_created_futures. Instead, leave the (now-done) future in the dict so _handle_response_created can find it and check fut.done(). Then rely on _handle_response_created to pop it. You'd also need to ensure _on_fut_done doesn't pop it prematurely (since set_exception triggers the done callback).

2. Maintain a separate set (e.g., self._timed_out_event_ids) that _on_timeout adds to, and _handle_response_created checks. This avoids changing the existing pop semantics.

Either way, the goal is: when _handle_response_created runs and finds the client_event_id corresponds to a timed-out request, it should set timed_out=True, send the cancel, and suppress the generation_created emission.
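
Approach 2 might look roughly like the sketch below. It is a simplified model under stated assumptions, not the plugin's implementation: `FixedSession`, `timed_out_event_ids`, `sent`, and `run_fixed` are hypothetical names, and cancel events are represented as plain dicts instead of `ResponseCancelEvent`.

```python
import asyncio


class FixedSession:
    """Hypothetical model of approach 2: track timed-out event ids."""

    def __init__(self):
        self.response_created_futures = {}
        self.timed_out_event_ids = set()
        self.emitted = []
        self.sent = []

    def on_timeout(self, event_id):
        fut = self.response_created_futures.pop(event_id, None)
        if fut and not fut.done():
            fut.set_exception(RuntimeError("generate_reply timed out."))
            # Remember the id so a late response.created can be recognised.
            self.timed_out_event_ids.add(event_id)
            self.sent.append({"type": "response.cancel"})

    def handle_response_created(self, metadata):
        client_event_id = (
            metadata.get("client_event_id") if isinstance(metadata, dict) else None
        )
        if client_event_id in self.timed_out_event_ids:
            # Late response for a timed-out request: cancel and suppress.
            self.timed_out_event_ids.discard(client_event_id)
            self.sent.append({"type": "response.cancel"})
            return True
        fut = None
        if client_event_id:
            fut = self.response_created_futures.pop(client_event_id, None)
        if fut and not fut.done():
            fut.set_result("generation")
        self.emitted.append("generation_created")
        return False


def run_fixed():
    loop = asyncio.new_event_loop()
    try:
        session = FixedSession()
        fut = loop.create_future()
        session.response_created_futures["evt_1"] = fut
        session.on_timeout("evt_1")
        timed_out = session.handle_response_created({"client_event_id": "evt_1"})
        fut.exception()  # mark the exception as retrieved
        return timed_out, session.emitted, len(session.sent)
    finally:
        loop.close()
```

One design caveat: if the server honours the first cancel and never delivers `response.created`, the id stays in the set, so a real implementation would likely need to expire or clear entries (for example on session close).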


def _handle_response_output_item_added(self, event: ResponseOutputItemAddedEvent) -> None:
assert self._current_generation is not None, "current_generation is None"