Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 123 additions & 3 deletions livekit-agents/livekit/agents/llm/realtime.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import asyncio
import os
import time
from abc import ABC, abstractmethod
from collections.abc import AsyncIterable, Awaitable
Expand All @@ -18,6 +19,11 @@
from .chat_context import ChatContext, ChatItem, FunctionCall
from .tool_context import Tool, ToolChoice, ToolContext

# Retry tuning for generate_reply. Every value is read from the environment once,
# at import time, and falls back to the literal default when the variable is unset.
DEFAULT_MAX_RETRIES: int = int(os.getenv("LIVEKIT_REALTIME_MAX_RETRIES", "3"))
# Backoff bounds: the delay before a retry starts at the base value, doubles per
# attempt, and is capped at the max value before being handed to asyncio.sleep.
DEFAULT_RETRY_BASE_DELAY: float = float(os.getenv("LIVEKIT_REALTIME_RETRY_BASE_DELAY", "1.0"))
DEFAULT_RETRY_MAX_DELAY: float = float(os.getenv("LIVEKIT_REALTIME_RETRY_MAX_DELAY", "10.0"))


@dataclass
class InputSpeechStartedEvent:
Expand Down Expand Up @@ -87,8 +93,9 @@ class RealtimeCapabilities:


class RealtimeError(Exception):
    """Error raised by a realtime model session.

    Args:
        message: Human-readable description of the failure.
        recoverable: Whether the failed operation may be attempted again.
            Keyword-only; defaults to ``True`` so callers that construct the
            error with just a message keep working.

    Attributes:
        recoverable: The flag passed at construction time. ``generate_reply``
            only schedules a retry when this is true.
    """

    def __init__(self, message: str, *, recoverable: bool = True) -> None:
        super().__init__(message)
        self.recoverable = recoverable


class RealtimeModel:
Expand Down Expand Up @@ -223,14 +230,127 @@ def push_audio(self, frame: rtc.AudioFrame) -> None: ...
@abstractmethod
def push_video(self, frame: rtc.VideoFrame) -> None: ...

@abstractmethod
def generate_reply(
self,
*,
instructions: NotGivenOr[str] = NOT_GIVEN,
tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
tools: NotGivenOr[list[Tool]] = NOT_GIVEN,
) -> asyncio.Future[GenerationCreatedEvent]: ... # can raise RealtimeError on Timeout
) -> asyncio.Future[GenerationCreatedEvent]:
fut: asyncio.Future[GenerationCreatedEvent] = asyncio.Future()
impl_fut = self._do_generate_reply(
instructions=instructions,
tool_choice=tool_choice,
tools=tools,
)

def _on_impl_done(f: asyncio.Future[GenerationCreatedEvent]) -> None:
    """Forward the outcome of the first attempt to the caller-facing future.

    Args:
        f: The completed future returned by ``_do_generate_reply``.

    A successful result or a non-recoverable error is copied onto ``fut``.
    A recoverable ``RealtimeError`` schedules ``_retry_generate_reply`` instead,
    which becomes responsible for resolving ``fut``.
    """
    if fut.done():
        # The caller-facing future was already resolved or cancelled.
        return
    if f.cancelled():
        # f.result() would raise asyncio.CancelledError, which derives from
        # BaseException and so bypasses the `except Exception` handler below.
        # Propagate the cancellation so awaiters of `fut` do not hang.
        fut.cancel()
        return
    try:
        fut.set_result(f.result())
    except RealtimeError as e:
        if e.recoverable:
            asyncio.ensure_future(
                self._retry_generate_reply(
                    fut=fut,
                    instructions=instructions,
                    tool_choice=tool_choice,
                    tools=tools,
                    attempt=1,
                )
            )
        else:
            fut.set_exception(e)
    except Exception as e:
        fut.set_exception(e)
Comment on lines +247 to +266

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Cancelled implementation future leaves the caller's reply future permanently unresolved

The wrapper future is never resolved when the underlying provider future is cancelled (f.result() at livekit-agents/livekit/agents/llm/realtime.py:251), because asyncio.CancelledError has derived from BaseException since Python 3.8 and is therefore not caught by except Exception.

Impact: Callers awaiting a reply generation can hang indefinitely when a provider cancels a superseded request.

CancelledError escapes the done-callback, orphaning the wrapper future

In _on_impl_done (lines 247-266) and _on_retry_done (lines 313-339), f.result() is called inside a try/except that only catches RealtimeError and Exception. Since Python 3.8 — and therefore on the project's 3.10+ minimum — asyncio.CancelledError inherits from BaseException, not Exception, so it propagates uncaught out of the callback.

When a provider cancels a pending impl future — for example, Google's _do_generate_reply at livekit-plugins/livekit-plugins-google/livekit/plugins/google/realtime/realtime_api.py:751 cancels a superseded _pending_generation_fut via old_fut.cancel(...) — the _on_impl_done callback fires. f.result() raises CancelledError, which escapes the callback (asyncio logs it but doesn't propagate). The outer wrapper fut created at line 240 is never resolved (no set_result or set_exception is called), so any caller doing await generate_reply() blocks forever.

The same pattern appears in _on_retry_done at lines 313-339.

Suggested change
def _on_impl_done(f: asyncio.Future[GenerationCreatedEvent]) -> None:
if fut.done():
return
try:
fut.set_result(f.result())
except RealtimeError as e:
if e.recoverable:
asyncio.ensure_future(
self._retry_generate_reply(
fut=fut,
instructions=instructions,
tool_choice=tool_choice,
tools=tools,
attempt=1,
)
)
else:
fut.set_exception(e)
except Exception as e:
fut.set_exception(e)
def _on_impl_done(f: asyncio.Future[GenerationCreatedEvent]) -> None:
if fut.done():
return
try:
fut.set_result(f.result())
except RealtimeError as e:
if e.recoverable:
asyncio.ensure_future(
self._retry_generate_reply(
fut=fut,
instructions=instructions,
tool_choice=tool_choice,
tools=tools,
attempt=1,
)
)
else:
fut.set_exception(e)
except asyncio.CancelledError:
fut.cancel()
except Exception as e:
fut.set_exception(e)
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.


impl_fut.add_done_callback(_on_impl_done)
return fut

async def _retry_generate_reply(
self,
*,
fut: asyncio.Future[GenerationCreatedEvent],
instructions: NotGivenOr[str],
tool_choice: NotGivenOr[ToolChoice],
tools: NotGivenOr[list[Tool]],
attempt: int,
) -> None:
max_retries = DEFAULT_MAX_RETRIES
base_delay = DEFAULT_RETRY_BASE_DELAY
max_delay = DEFAULT_RETRY_MAX_DELAY

if attempt > max_retries:
if not fut.done():
fut.set_exception(
RealtimeError(
f"generate_reply failed after {max_retries} retries",
recoverable=False,
)
)
return

delay = min(base_delay * (2 ** (attempt - 1)), max_delay)
logger.warning(
"generate_reply failed (recoverable), retrying in %.1fs (attempt %d/%d)",
delay,
attempt,
max_retries,
)
await asyncio.sleep(delay)

if fut.done():
return

try:
impl_fut = self._do_generate_reply(
instructions=instructions,
tool_choice=tool_choice,
tools=tools,
)
Comment on lines +306 to +311

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 Retry logic does not cancel in-flight provider requests before retrying

When a _do_generate_reply call times out (e.g., after 5-10s depending on provider), the base class retry mechanism at livekit-agents/livekit/agents/llm/realtime.py:306-311 calls _do_generate_reply again. However, the previous provider request (e.g., OpenAI's response.create or Google's LiveClientContent with turn_complete=True) was already sent to the server and is not cancelled.

For OpenAI, _on_fut_done at livekit-plugins/livekit-plugins-openai/livekit/plugins/openai/realtime/realtime_model.py:1590-1595 only sends response.cancel when the future is cancelled (via .cancel()), not when it has an exception set. Since the timeout handler sets an exception (not a cancellation), no cancel event is sent to the server. The retry then sends a second response.create.

In most timeout cases the server was genuinely unresponsive, so this is harmless. But if the server was just slow, both requests could produce responses. Whether this causes user-visible duplicate output depends on server-side behavior (OpenAI may supersede the first response when it receives the second response.create).

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.


def _on_retry_done(f: asyncio.Future[GenerationCreatedEvent]) -> None:
    """Forward the outcome of a retry attempt to the caller-facing future.

    Args:
        f: The completed future returned by ``_do_generate_reply`` for this
            retry attempt.

    A successful result or a non-recoverable error is copied onto ``fut``.
    A recoverable ``RealtimeError`` schedules the next attempt while attempts
    remain; once they are exhausted, ``fut`` fails with a non-recoverable
    ``RealtimeError``.
    """
    if fut.done():
        # The caller-facing future was already resolved or cancelled.
        return
    if f.cancelled():
        # f.result() would raise asyncio.CancelledError, which derives from
        # BaseException and so bypasses the `except Exception` handler below.
        # Propagate the cancellation so awaiters of `fut` do not hang.
        fut.cancel()
        return
    try:
        fut.set_result(f.result())
    except RealtimeError as e:
        if e.recoverable and attempt < max_retries:
            asyncio.ensure_future(
                self._retry_generate_reply(
                    fut=fut,
                    instructions=instructions,
                    tool_choice=tool_choice,
                    tools=tools,
                    attempt=attempt + 1,
                )
            )
        elif e.recoverable:
            # Recoverable in principle, but the retry budget is spent.
            fut.set_exception(
                RealtimeError(
                    f"generate_reply failed after {max_retries} retries",
                    recoverable=False,
                )
            )
        else:
            fut.set_exception(e)
    except Exception as e:
        fut.set_exception(e)
Comment on lines +313 to +339

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Same missing CancelledError handling in the retry done callback leaves future unresolved

The retry-path callback also fails to catch CancelledError from a cancelled provider future (f.result() at livekit-agents/livekit/agents/llm/realtime.py:317), leaving the wrapper future permanently unresolved.

Impact: A cancelled retry attempt can leave the caller hanging forever, same root cause as in the initial attempt callback.

Same pattern as _on_impl_done repeated in _on_retry_done

The _on_retry_done closure at lines 313-339 catches RealtimeError and Exception, but not asyncio.CancelledError (a BaseException subclass since Python 3.8, so also on the project's 3.10+ minimum). If the implementation future returned by _do_generate_reply during a retry is cancelled by the provider, f.result() at line 317 raises CancelledError, which escapes the callback. The shared wrapper fut (passed via the fut parameter of _retry_generate_reply) is never resolved.

Suggested change
def _on_retry_done(f: asyncio.Future[GenerationCreatedEvent]) -> None:
if fut.done():
return
try:
fut.set_result(f.result())
except RealtimeError as e:
if e.recoverable and attempt < max_retries:
asyncio.ensure_future(
self._retry_generate_reply(
fut=fut,
instructions=instructions,
tool_choice=tool_choice,
tools=tools,
attempt=attempt + 1,
)
)
elif e.recoverable:
fut.set_exception(
RealtimeError(
f"generate_reply failed after {max_retries} retries",
recoverable=False,
)
)
else:
fut.set_exception(e)
except Exception as e:
fut.set_exception(e)
def _on_retry_done(f: asyncio.Future[GenerationCreatedEvent]) -> None:
if fut.done():
return
try:
fut.set_result(f.result())
except RealtimeError as e:
if e.recoverable and attempt < max_retries:
asyncio.ensure_future(
self._retry_generate_reply(
fut=fut,
instructions=instructions,
tool_choice=tool_choice,
tools=tools,
attempt=attempt + 1,
)
)
elif e.recoverable:
fut.set_exception(
RealtimeError(
f"generate_reply failed after {max_retries} retries",
recoverable=False,
)
)
else:
fut.set_exception(e)
except asyncio.CancelledError:
fut.cancel()
except Exception as e:
fut.set_exception(e)
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.


impl_fut.add_done_callback(_on_retry_done)
except Exception as e:
if not fut.done():
fut.set_exception(e)

# Provider hook that performs a single reply-generation attempt.
#
# generate_reply() and _retry_generate_reply() call this with the caller's
# instructions / tool_choice / tools and attach a done-callback to the returned
# future. A successful result is forwarded to the caller's future. A
# RealtimeError with recoverable=True triggers another attempt (bounded by
# DEFAULT_MAX_RETRIES); any other exception is forwarded to the caller's future.
@abstractmethod
def _do_generate_reply(
self,
*,
instructions: NotGivenOr[str] = NOT_GIVEN,
tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
tools: NotGivenOr[list[Tool]] = NOT_GIVEN,
) -> asyncio.Future[GenerationCreatedEvent]: ...

# commit the input audio buffer to the server
@abstractmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2024,7 +2024,7 @@ def push_audio(self, frame: rtc.AudioFrame) -> None:
else:
logger.warning("audio input channel closed, skipping audio")

def generate_reply(
def _do_generate_reply(
self,
*,
instructions: NotGivenOr[str] = NOT_GIVEN,
Expand Down Expand Up @@ -2132,7 +2132,10 @@ async def _send_text() -> None:
def _on_timeout() -> None:
if not fut.done():
fut.set_exception(
llm.RealtimeError("generate_reply timed out waiting for generation")
llm.RealtimeError(
"generate_reply timed out waiting for generation",
recoverable=True,
)
)
if self._pending_generation_fut is fut:
self._pending_generation_fut = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,7 @@ def _send_client_event(self, event: ClientEvents) -> None:
with contextlib.suppress(utils.aio.channel.ChanClosed):
self._msg_ch.send_nowait(event)

def generate_reply(
def _do_generate_reply(
self,
*,
instructions: NotGivenOr[str] = NOT_GIVEN,
Expand All @@ -737,7 +737,10 @@ def generate_reply(
)
fut = asyncio.Future[llm.GenerationCreatedEvent]()
fut.set_exception(
llm.RealtimeError(f"generate_reply is not compatible with '{self._opts.model}'")
llm.RealtimeError(
f"generate_reply is not compatible with '{self._opts.model}'",
recoverable=False,
)
)
return fut
if self._pending_generation_fut and not self._pending_generation_fut.done():
Expand Down Expand Up @@ -773,7 +776,8 @@ def _on_timeout() -> None:
if not fut.done():
fut.set_exception(
llm.RealtimeError(
"generate_reply timed out waiting for generation_created event."
"generate_reply timed out waiting for generation_created event.",
recoverable=True,
)
)
if self._pending_generation_fut is fut:
Comment on lines 776 to 783

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 Google and Ultravox plugins manage their own pending-generation state which may conflict with retry

Google's _do_generate_reply at livekit-plugins/livekit-plugins-google/livekit/plugins/google/realtime/realtime_api.py:743-754 cancels an existing _pending_generation_fut and replaces it. The base class retry mechanism may call _do_generate_reply multiple times, each creating a new _pending_generation_fut. If a server response arrives and resolves _pending_generation_fut, both the plugin's internal future and the base class's wrapper future will be resolved correctly via the callback chain. However, the plugin's _on_fut_done callback (line 784-795) has side effects like calling self.interrupt() on external cancellation. During retry, the old impl_fut's exception (not cancellation) triggers its _on_fut_done which doesn't send interrupt. This appears to work correctly in the retry case, but the interaction between two layers of future management (plugin + base class) adds complexity worth monitoring.

(Refers to lines 743-798)

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ def push_video(self, frame: rtc.VideoFrame) -> None:

# -- Public API: generation control --

def generate_reply(
def _do_generate_reply(
self,
*,
instructions: NotGivenOr[str] = NOT_GIVEN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1551,7 +1551,7 @@ def clear_audio(self) -> None:
self.send_event(InputAudioBufferClearEvent(type="input_audio_buffer.clear"))
self._pushed_duration_s = 0

def generate_reply(
def _do_generate_reply(
self,
*,
instructions: NotGivenOr[str] = NOT_GIVEN,
Expand Down Expand Up @@ -1583,7 +1583,7 @@ def generate_reply(
def _on_timeout() -> None:
self._response_created_futures.pop(event_id, None)
if fut and not fut.done():
fut.set_exception(llm.RealtimeError("generate_reply timed out."))
fut.set_exception(llm.RealtimeError("generate_reply timed out.", recoverable=True))

handle = asyncio.get_event_loop().call_later(10.0, _on_timeout)

Expand Down Expand Up @@ -1994,11 +1994,16 @@ def _handle_response_done(self, event: ResponseDoneEvent) -> None:
if event.response.status in ("failed", "incomplete"):
details = event.response.status_details
msg = f"response {event.response.status}"
recoverable = True
if details and details.error:
msg = f"{msg}: [{details.error.type}] {details.error.code}"
if details.error.code == "rate_limit_exceeded":
recoverable = False
Comment on lines +2000 to +2001

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Rate-limit errors are marked non-recoverable, preventing future retry logic from handling them

Rate-limit responses are flagged as non-recoverable (recoverable = False at livekit-plugins/livekit-plugins-openai/livekit/plugins/openai/realtime/realtime_model.py:2001), which contradicts the comment at line 2084-2085 that says all failures are assumed recoverable.

Impact: Rate-limit errors — the most naturally transient kind — are marked as permanent failures, preventing any downstream recovery logic from retrying them.

Inconsistency between _handle_response_done and _handle_response_done_but_not_complete

In _handle_response_done at lines 1997-2001, rate_limit_exceeded errors set recoverable = False on the RealtimeError passed to _done_fut.set_exception(). However, the same event is then processed by _handle_response_done_but_not_complete (called at line 2056) which emits the error with recoverable=True at line 2086, and includes a comment explicitly stating: "all possible failures undocumented by openai, so we assume optimistically all retryable/recoverable".

Rate-limit errors are inherently transient and are the canonical example of a recoverable error. Setting recoverable=False means any code that inspects this flag on _done_fut exceptions would refuse to retry the most retry-worthy class of errors. While the current retry logic in generate_reply operates on the creation future (not _done_fut), this sets incorrect semantics for current or future consumers of generation lifecycle errors.

Suggested change
if details.error.code == "rate_limit_exceeded":
recoverable = False
if details.error.code == "rate_limit_exceeded":
recoverable = True
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

elif details and details.reason:
msg = f"{msg}: {details.reason}"
self._current_generation._done_fut.set_exception(llm.RealtimeError(msg))
self._current_generation._done_fut.set_exception(
llm.RealtimeError(msg, recoverable=recoverable)
)
else:
self._current_generation._done_fut.set_result(None)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ async def _send_say(
)
)

def generate_reply(
def _do_generate_reply(
self,
*,
instructions: NotGivenOr[str] = NOT_GIVEN,
Expand Down Expand Up @@ -639,7 +639,7 @@ def generate_reply(

def _on_timeout() -> None:
if not fut.done():
fut.set_exception(llm.RealtimeError("generate_reply timed out."))
fut.set_exception(llm.RealtimeError("generate_reply timed out.", recoverable=True))

handle = asyncio.get_event_loop().call_later(10.0, _on_timeout)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ def _send_audio_bytes(self, audio_data: bytes) -> None:
self._msg_ch.send_nowait(audio_data)

@utils.log_exceptions(logger=logger)
def generate_reply(
def _do_generate_reply(
self,
*,
instructions: NotGivenOr[str] = NOT_GIVEN,
Expand Down Expand Up @@ -518,7 +518,8 @@ def _on_timeout() -> None:
if not fut.done():
fut.set_exception(
llm.RealtimeError(
"generate_reply timed out waiting for generation_created event."
"generate_reply timed out waiting for generation_created event.",
recoverable=True,
)
)
if self._pending_generation_fut is fut:
Expand Down
Loading
Loading