feat: add response-level retry for RealtimeModel.generate_reply (#6205) #6242

Open: C1-BA-B1-F3 wants to merge 2 commits into livekit:main from C1-BA-B1-F3:fix/realtime-generate-reply-retry

@C1-BA-B1-F3

Fixes #6205

Problem

RealtimeModel.generate_reply() had no retry mechanism for recoverable errors (timeout, transient API errors). Errors propagated directly to the caller.

Solution

  • Added recoverable flag to RealtimeError
  • Converted generate_reply from abstract to concrete with retry logic + exponential backoff
  • Added _do_generate_reply as the new abstract method for plugins
  • Updated all 6 plugin implementations
  • Retry configuration via environment variables
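
As a rough illustration of the control flow described in the list above, here is a minimal sketch. It is not the PR's code: the PR chains done-callbacks on futures, while this sketch uses a plain `await` loop for brevity. `RealtimeError` below is a local stand-in that models only the `recoverable` flag, and `generate_with_retry`, `do_generate`, and `delay` are hypothetical names.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class RealtimeError(Exception):
    """Local stand-in for the PR's RealtimeError (only the flag is modelled)."""

    def __init__(self, message: str, *, recoverable: bool = True) -> None:
        super().__init__(message)
        self.recoverable = recoverable


async def generate_with_retry(
    do_generate: Callable[[], Awaitable[T]],
    *,
    max_retries: int = 3,
    delay: Callable[[int], float] = lambda attempt: 0.0,
) -> T:
    """Call do_generate, retrying recoverable RealtimeErrors up to max_retries times."""
    attempt = 0  # number of retries performed so far
    while True:
        try:
            return await do_generate()
        except RealtimeError as e:
            if not e.recoverable:
                raise  # auth failures, invalid input, etc. go straight to the caller
            if attempt >= max_retries:
                # Exhaustion is reported as a non-recoverable error.
                raise RealtimeError(
                    f"generate_reply failed after {max_retries} retries",
                    recoverable=False,
                ) from e
            attempt += 1
            await asyncio.sleep(delay(attempt))
```

Any exception that is not a `RealtimeError`, including `asyncio.CancelledError`, propagates to the caller unchanged in this form.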

Error Classification

| Error Type | Recoverable | Retry? |
|---|---|---|
| Timeout | ✅ | Yes |
| Transient API error | ✅ | Yes |
| Rate limit | ❌ | No |
| Auth failure | ❌ | No |

Tests

8 passing tests covering retry on recoverable errors, no retry on non-recoverable, retry exhaustion, and env var configuration.

Fixes livekit#6205

- Add recoverable flag to RealtimeError (defaults to True)
- Implement retry mechanism in base RealtimeSession.generate_reply
- Rename plugin generate_reply to _do_generate_reply (abstract)
- Set recoverable=False for non-retryable errors (auth, invalid input)
- Set recoverable=True for timeout and transient errors
- Make retry configurable via env vars:
  - LIVEKIT_REALTIME_MAX_RETRIES (default: 3)
  - LIVEKIT_REALTIME_RETRY_BASE_DELAY (default: 1.0s)
  - LIVEKIT_REALTIME_RETRY_MAX_DELAY (default: 10.0s)
- Use exponential backoff with jitter for retries
- Add comprehensive tests for retry behavior
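
For reference, the configuration and backoff items above could look roughly like the sketch below. The environment variable names and defaults are the ones listed in this commit message; everything else is an assumption for illustration: `RetryConfig`, `load_retry_config`, and `backoff_delay` are hypothetical names, and "full jitter" (a uniform draw between zero and the capped exponential delay) is only one possible jitter scheme, since the commit message does not say which one is used.

```python
import os
import random
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class RetryConfig:
    max_retries: int = 3
    base_delay: float = 1.0  # seconds
    max_delay: float = 10.0  # seconds


def load_retry_config(env: Mapping[str, str] = os.environ) -> RetryConfig:
    """Read the retry settings from the environment, falling back to the defaults."""
    return RetryConfig(
        max_retries=int(env.get("LIVEKIT_REALTIME_MAX_RETRIES", "3")),
        base_delay=float(env.get("LIVEKIT_REALTIME_RETRY_BASE_DELAY", "1.0")),
        max_delay=float(env.get("LIVEKIT_REALTIME_RETRY_MAX_DELAY", "10.0")),
    )


def backoff_delay(attempt: int, cfg: RetryConfig) -> float:
    """Delay before retry number `attempt` (1-based), with full jitter (assumed)."""
    # The exponential ceiling doubles per attempt and is capped at max_delay.
    ceiling = min(cfg.max_delay, cfg.base_delay * 2 ** (attempt - 1))
    return random.uniform(0.0, ceiling)
```

With the defaults, the ceiling grows as 1 s, 2 s, 4 s, 8 s and then stays at 10 s; the jitter spreads concurrent sessions out so they do not retry in lockstep.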

Updated plugins:
- openai
- google
- phonic
- nvidia
- ultravox
- aws

C1-BA-B1-F3 requested a review from a team as a code owner on June 26, 2026, 12:32

- Remove unused imports (AsyncIterable, Literal, AsyncMock, DEFAULT_*)
- Fix import sorting (I001)
- Fix formatting with ruff format

@devin-ai-integration devin-ai-integration Bot left a comment

Devin Review found 5 potential issues.

Comment on lines +247 to +266
def _on_impl_done(f: asyncio.Future[GenerationCreatedEvent]) -> None:
    if fut.done():
        return
    try:
        fut.set_result(f.result())
    except RealtimeError as e:
        if e.recoverable:
            asyncio.ensure_future(
                self._retry_generate_reply(
                    fut=fut,
                    instructions=instructions,
                    tool_choice=tool_choice,
                    tools=tools,
                    attempt=1,
                )
            )
        else:
            fut.set_exception(e)
    except Exception as e:
        fut.set_exception(e)

🔴 Cancelled implementation future leaves the caller's reply future permanently unresolved

The wrapper future is never resolved when the underlying provider future is cancelled (f.result() at livekit-agents/livekit/agents/llm/realtime.py:251), because CancelledError has been a BaseException since Python 3.8 and is not caught by except Exception.

Impact: Callers awaiting a reply generation can hang indefinitely when a provider cancels a superseded request.

CancelledError escapes the done-callback, orphaning the wrapper future

In _on_impl_done (lines 247-266) and _on_retry_done (lines 313-339), f.result() is called inside a try/except that only catches RealtimeError and Exception. On Python 3.10 (the project's minimum), as on every release since 3.8, CancelledError inherits from BaseException, not Exception, so it propagates uncaught out of the callback.

When a provider cancels a pending impl future (for example, Google's _do_generate_reply at livekit-plugins/livekit-plugins-google/livekit/plugins/google/realtime/realtime_api.py:751 cancels a superseded _pending_generation_fut via old_fut.cancel(...)), the _on_impl_done callback fires. f.result() raises CancelledError, which escapes the callback (asyncio logs it but doesn't propagate). The outer wrapper fut created at line 240 is never resolved (no set_result or set_exception is called), so any caller doing await generate_reply() blocks forever.

The same pattern appears in _on_retry_done at lines 313-339.
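
The failure mode can be reproduced outside the PR with plain asyncio futures. The snippet below is a standalone sketch, not code from this repository: `link` and `demo` are hypothetical helpers, and `handle_cancel` toggles between the current behaviour (CancelledError escapes the callback) and the behaviour with an explicit `except asyncio.CancelledError` branch.

```python
import asyncio


def link(impl: asyncio.Future, wrapper: asyncio.Future, *, handle_cancel: bool) -> None:
    """Mirror impl's outcome onto wrapper through a done-callback."""

    def _on_done(f: asyncio.Future) -> None:
        if wrapper.done():
            return
        try:
            wrapper.set_result(f.result())
        except asyncio.CancelledError:
            if not handle_cancel:
                raise  # equivalent to having no CancelledError branch at all
            wrapper.cancel()
        except Exception as e:
            wrapper.set_exception(e)

    impl.add_done_callback(_on_done)


async def demo(handle_cancel: bool) -> str:
    loop = asyncio.get_running_loop()
    # Swallow the "Exception in callback" report that asyncio would otherwise log.
    loop.set_exception_handler(lambda _loop, _context: None)
    impl, wrapper = loop.create_future(), loop.create_future()
    link(impl, wrapper, handle_cancel=handle_cancel)
    impl.cancel()  # e.g. the provider cancels a superseded request
    await asyncio.sleep(0.01)  # give the done-callback a chance to run
    if not wrapper.done():
        return "pending"  # a caller awaiting wrapper would block here
    return "cancelled" if wrapper.cancelled() else "resolved"
```

Running `asyncio.run(demo(False))` leaves the wrapper pending, while `asyncio.run(demo(True))` cancels it so that an awaiting caller is released.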

Suggested change
def _on_impl_done(f: asyncio.Future[GenerationCreatedEvent]) -> None:
    if fut.done():
        return
    try:
        fut.set_result(f.result())
    except RealtimeError as e:
        if e.recoverable:
            asyncio.ensure_future(
                self._retry_generate_reply(
                    fut=fut,
                    instructions=instructions,
                    tool_choice=tool_choice,
                    tools=tools,
                    attempt=1,
                )
            )
        else:
            fut.set_exception(e)
    except asyncio.CancelledError:
        fut.cancel()
    except Exception as e:
        fut.set_exception(e)

Comment on lines +313 to +339
def _on_retry_done(f: asyncio.Future[GenerationCreatedEvent]) -> None:
    if fut.done():
        return
    try:
        fut.set_result(f.result())
    except RealtimeError as e:
        if e.recoverable and attempt < max_retries:
            asyncio.ensure_future(
                self._retry_generate_reply(
                    fut=fut,
                    instructions=instructions,
                    tool_choice=tool_choice,
                    tools=tools,
                    attempt=attempt + 1,
                )
            )
        elif e.recoverable:
            fut.set_exception(
                RealtimeError(
                    f"generate_reply failed after {max_retries} retries",
                    recoverable=False,
                )
            )
        else:
            fut.set_exception(e)
    except Exception as e:
        fut.set_exception(e)

🔴 Same missing CancelledError handling in the retry done callback leaves future unresolved

The retry-path callback also fails to catch CancelledError from a cancelled provider future (f.result() at livekit-agents/livekit/agents/llm/realtime.py:317), leaving the wrapper future permanently unresolved.

Impact: A cancelled retry attempt can leave the caller hanging forever, same root cause as in the initial attempt callback.

Same pattern as _on_impl_done repeated in _on_retry_done

The _on_retry_done closure at lines 313-339 catches RealtimeError and Exception, but not asyncio.CancelledError (a BaseException since Python 3.8). If the implementation future returned by _do_generate_reply during a retry is cancelled by the provider, f.result() at line 317 raises CancelledError, which escapes the callback. The shared wrapper fut (passed via the fut parameter of _retry_generate_reply) is never resolved.

Suggested change
def _on_retry_done(f: asyncio.Future[GenerationCreatedEvent]) -> None:
    if fut.done():
        return
    try:
        fut.set_result(f.result())
    except RealtimeError as e:
        if e.recoverable and attempt < max_retries:
            asyncio.ensure_future(
                self._retry_generate_reply(
                    fut=fut,
                    instructions=instructions,
                    tool_choice=tool_choice,
                    tools=tools,
                    attempt=attempt + 1,
                )
            )
        elif e.recoverable:
            fut.set_exception(
                RealtimeError(
                    f"generate_reply failed after {max_retries} retries",
                    recoverable=False,
                )
            )
        else:
            fut.set_exception(e)
    except asyncio.CancelledError:
        fut.cancel()
    except Exception as e:
        fut.set_exception(e)

Comment on lines +2000 to +2001
if details.error.code == "rate_limit_exceeded":
    recoverable = False

🟡 Rate-limit errors are marked non-recoverable, preventing future retry logic from handling them

Rate-limit responses are flagged as non-recoverable (recoverable = False at livekit-plugins/livekit-plugins-openai/livekit/plugins/openai/realtime/realtime_model.py:2001), which contradicts the comment at lines 2084-2085 that says all failures are assumed recoverable.

Impact: Rate-limit errors, the most naturally transient kind, are marked as permanent failures, preventing any downstream recovery logic from retrying them.

Inconsistency between _handle_response_done and _handle_response_done_but_not_complete

In _handle_response_done at lines 1997-2001, rate_limit_exceeded errors set recoverable = False on the RealtimeError passed to _done_fut.set_exception(). However, the same event is then processed by _handle_response_done_but_not_complete (called at line 2056) which emits the error with recoverable=True at line 2086, and includes a comment explicitly stating: "all possible failures undocumented by openai, so we assume optimistically all retryable/recoverable".

Rate-limit errors are inherently transient and are the canonical example of a recoverable error. Setting recoverable=False means any code that inspects this flag on _done_fut exceptions would refuse to retry the most retry-worthy class of errors. While the current retry logic in generate_reply operates on the creation future (not _done_fut), this sets incorrect semantics for current or future consumers of generation lifecycle errors.

Suggested change
if details.error.code == "rate_limit_exceeded":
    recoverable = True

Comment on lines +306 to +311
try:
    impl_fut = self._do_generate_reply(
        instructions=instructions,
        tool_choice=tool_choice,
        tools=tools,
    )

🚩 Retry logic does not cancel in-flight provider requests before retrying

When a _do_generate_reply call times out (e.g., after 5-10s depending on provider), the base class retry mechanism at livekit-agents/livekit/agents/llm/realtime.py:306-311 calls _do_generate_reply again. However, the previous provider request (e.g., OpenAI's response.create or Google's LiveClientContent with turn_complete=True) was already sent to the server and is not cancelled.

For OpenAI, _on_fut_done at livekit-plugins/livekit-plugins-openai/livekit/plugins/openai/realtime/realtime_model.py:1590-1595 only sends response.cancel when the future is cancelled (via .cancel()), not when it has an exception set. Since the timeout handler sets an exception (not a cancellation), no cancel event is sent to the server. The retry then sends a second response.create.

In most timeout cases the server was genuinely unresponsive, so this is harmless. But if the server was just slow, both requests could produce responses. Whether this causes user-visible duplicate output depends on server-side behavior (OpenAI may supersede the first response when it receives the second response.create).
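
The distinction between a cancelled future and a future with an exception set can be checked with plain asyncio. This is a standalone sketch, not the plugin's code: `make_request` and `demo` are hypothetical, and the `sent` list stands in for events that would be sent to the server.

```python
import asyncio


async def demo() -> tuple[list[str], bool]:
    loop = asyncio.get_running_loop()
    sent: list[str] = []  # stand-in for events sent to the provider

    def make_request() -> asyncio.Future:
        fut = loop.create_future()

        def _on_fut_done(f: asyncio.Future) -> None:
            if f.cancelled():
                sent.append("response.cancel")  # only on real cancellation
            else:
                f.exception()  # mark the exception as retrieved

        fut.add_done_callback(_on_fut_done)
        return fut

    timed_out = make_request()
    timed_out.set_exception(TimeoutError("generate_reply timed out"))

    cancelled = make_request()
    cancelled.cancel()

    await asyncio.sleep(0.01)  # let both done-callbacks run

    # Cancelling after the fact is a no-op: the future is already done.
    late_cancel = timed_out.cancel()
    return sent, late_cancel
```

Only the explicitly cancelled request produces a `response.cancel` entry, and calling `cancel()` on the timed-out future afterwards returns `False`. A retry layer that wants the provider to abort the earlier request would therefore need some other hook than cancelling the failed future.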


Comment on lines 776 to 783
  if not fut.done():
      fut.set_exception(
          llm.RealtimeError(
-             "generate_reply timed out waiting for generation_created event."
+             "generate_reply timed out waiting for generation_created event.",
+             recoverable=True,
          )
      )
  if self._pending_generation_fut is fut:

🚩 Google and Ultravox plugins manage their own pending-generation state which may conflict with retry

Google's _do_generate_reply at livekit-plugins/livekit-plugins-google/livekit/plugins/google/realtime/realtime_api.py:743-754 cancels an existing _pending_generation_fut and replaces it. The base class retry mechanism may call _do_generate_reply multiple times, each creating a new _pending_generation_fut. If a server response arrives and resolves _pending_generation_fut, both the plugin's internal future and the base class's wrapper future will be resolved correctly via the callback chain. However, the plugin's _on_fut_done callback (lines 784-795) has side effects such as calling self.interrupt() on external cancellation. During retry, the old impl_fut completes with an exception (not a cancellation), so its _on_fut_done does not send an interrupt. This appears to work correctly in the retry case, but the interaction between two layers of future management (plugin + base class) adds complexity worth monitoring.

(Refers to lines 743-798)

Development

Successfully merging this pull request may close these issues.

Realtime models have no response-level retry for recoverable generate_reply failures (parity gap with the pipeline LLM)