Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions livekit-agents/livekit/agents/voice/agent_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -2562,6 +2562,7 @@ async def _read_fnc_stream() -> None:

def _tool_execution_started_cb(fnc_call: llm.FunctionCall) -> None:
speech_handle._item_added([fnc_call])
fnc_call.extra["dispatched"] = True
self._agent._chat_ctx.items.append(fnc_call)
self._session._tool_items_added([fnc_call])

Expand Down Expand Up @@ -2675,6 +2676,8 @@ def _create_assistant_message(

if speech_handle.interrupted:
await utils.aio.cancel_and_wait(exe_task)
for fc in function_calls:
fc.extra["dispatched"] = True
return

# wait for the tool execution to complete
Expand All @@ -2687,6 +2690,8 @@ def _create_assistant_message(
await exe_task
finally:
self._background_speeches.discard(speech_handle)
for fc in function_calls:
fc.extra["dispatched"] = True

# important: no agent output should be used after this point

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1179,12 +1179,25 @@ def _is_content_empty(msg_id: str) -> bool:
return True
return False

def _is_pending_function_call(msg_id: str) -> bool:
remote_item = remote_ctx.get_by_id(msg_id)
return (
remote_item is not None
and remote_item.type == "function_call"
and not remote_item.extra.get("dispatched", True)
)

for msg_id in diff_ops.to_remove:
# we don't have content synced down for some types of content (audio/images)
# these won't be present in the Agent's view of the context
# so in those cases, we do not want to remove them from the server context
if _is_content_empty(msg_id):
continue
# function_calls arrive in _remote_chat_ctx before _agent._chat_ctx;
# deleting them during this window causes cascading insert failures.
# we protect these function calls until they arrive in local context.
if _is_pending_function_call(msg_id):
continue
_delete_item(msg_id)

for previous_msg_id, msg_id in diff_ops.to_create:
Expand All @@ -1196,6 +1209,9 @@ def _is_content_empty(msg_id: str) -> bool:
# we don't want to recreate these items there
if _is_content_empty(msg_id):
continue
# same guard as above: don't recreate pending function_calls
if _is_pending_function_call(msg_id):
continue
_delete_item(msg_id)
_create_item(previous_msg_id, msg_id)

Expand Down Expand Up @@ -1615,14 +1631,18 @@ def _handle_function_call(self, item: RealtimeConversationItemFunctionCall) -> N
assert item.name is not None, "name is None"
assert item.arguments is not None, "arguments is None"

self._current_generation.function_ch.send_nowait(
llm.FunctionCall(
remote = self._remote_chat_ctx.get(item.id)
if remote is not None and isinstance(remote.item, llm.FunctionCall):
fc = remote.item
else:
fc = llm.FunctionCall(
id=item.id,
call_id=item.call_id,
name=item.name,
arguments=item.arguments,
)
)

self._current_generation.function_ch.send_nowait(fc)

def _handle_response_done(self, event: ResponseDoneEvent) -> None:
if self._current_generation is None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -995,7 +995,8 @@ def _create_update_chat_ctx_events(
self, chat_ctx: llm.ChatContext
) -> list[ConversationItemCreateEvent | ConversationItemDeleteEvent]:
events: list[ConversationItemCreateEvent | ConversationItemDeleteEvent] = []
diff_ops = llm.utils.compute_chat_ctx_diff(self._remote_chat_ctx.to_chat_ctx(), chat_ctx)
remote_ctx = self._remote_chat_ctx.to_chat_ctx()
diff_ops = llm.utils.compute_chat_ctx_diff(remote_ctx, chat_ctx)

def _delete_item(msg_id: str) -> None:
events.append(
Expand All @@ -1018,14 +1019,29 @@ def _create_item(previous_msg_id: str | None, msg_id: str) -> None:
)
)

def _is_pending_function_call(msg_id: str) -> bool:
remote_item = remote_ctx.get_by_id(msg_id)
return (
remote_item is not None
and remote_item.type == "function_call"
and not remote_item.extra.get("dispatched", True)
)

for msg_id in diff_ops.to_remove:
# function_calls arrive in _remote_chat_ctx before _agent._chat_ctx;
# deleting them during this window causes cascading insert failures
if _is_pending_function_call(msg_id):
continue
_delete_item(msg_id)

for previous_msg_id, msg_id in diff_ops.to_create:
_create_item(previous_msg_id, msg_id)

# update the items with the same id but different content
for previous_msg_id, msg_id in diff_ops.to_update:
# same guard as above: don't recreate pending function_calls
if _is_pending_function_call(msg_id):
continue
_delete_item(msg_id)
_create_item(previous_msg_id, msg_id)

Expand Down Expand Up @@ -1416,14 +1432,18 @@ def _handle_response_output_item_done(self, event: ResponseOutputItemDoneEvent)
assert item.name is not None, "name is None"
assert item.arguments is not None, "arguments is None"

self._current_generation.function_ch.send_nowait(
llm.FunctionCall(
remote = self._remote_chat_ctx.get(item_id)
if remote is not None and isinstance(remote.item, llm.FunctionCall):
fc = remote.item
else:
fc = llm.FunctionCall(
id=item_id,
call_id=item.call_id,
name=item.name,
arguments=item.arguments,
)
)

self._current_generation.function_ch.send_nowait(fc)
elif item_type == "message":
item_generation = self._current_generation.messages[item_id]
item_generation.text_ch.close()
Expand Down Expand Up @@ -1646,6 +1666,7 @@ def _openai_item_to_livekit_item(item: ConversationItem) -> llm.ChatItem:
call_id=item.call_id,
name=item.name,
arguments=item.arguments,
extra={"dispatched": False},
)

if item.type == "function_call_output":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ def openai_item_to_livekit_item(item: realtime.ConversationItem) -> llm.ChatItem
call_id=item.call_id,
name=item.name,
arguments=item.arguments,
extra={"dispatched": False}, # protect function calls from premature diff deletion
)

if item.type == "function_call_output":
Expand Down