Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/strands/experimental/bidi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# Model interface (for custom implementations)
from .models.model import BidiModel

# Built-in tools
# Built-in tools (deprecated - use strands_tools.stop instead)
from .tools import stop_conversation

# Event types - For type hints and event handling
Expand All @@ -39,8 +39,6 @@
__all__ = [
# Main interface
"BidiAgent",
# Built-in tools
"stop_conversation",
# Input Event types
"BidiTextInputEvent",
"BidiAudioInputEvent",
Expand All @@ -64,6 +62,8 @@
"ToolStreamEvent",
# Model interface
"BidiModel",
# Built-in tools (deprecated)
"stop_conversation",
]


Expand Down
28 changes: 23 additions & 5 deletions src/strands/experimental/bidi/agent/loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import asyncio
import logging
import warnings
from typing import TYPE_CHECKING, Any, AsyncGenerator, cast

from ....types._events import ToolInterruptEvent, ToolResultEvent, ToolResultMessageEvent, ToolUseStreamEvent
Expand Down Expand Up @@ -248,6 +249,10 @@ async def _run_tool(self, tool_use: ToolUse) -> None:

tool_results: list[ToolResult] = []

# Ensure request_state exists for tools like strands_tools.stop
if "request_state" not in self._invocation_state:
self._invocation_state["request_state"] = {}

invocation_state: dict[str, Any] = {
**self._invocation_state,
"agent": self._agent,
Expand Down Expand Up @@ -282,16 +287,29 @@ async def _run_tool(self, tool_use: ToolUse) -> None:

await self._event_queue.put(ToolResultMessageEvent(tool_result_message))

# Check for stop_conversation before sending to model
if tool_use["name"] == "stop_conversation":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@agent-of-mkmeral we should probably keep it for now, but add a warning that we will remove in next versions. So we support both, and we can remove the tool later.

It gives users who depend on it some time

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done! Commit 1381ba1 keeps stop_conversation with backward compatibility:

What How
stop_conversation tool Restored — emits DeprecationWarning when called
bidi/__init__.py export Restored — from strands.experimental.bidi import stop_conversation works
Loop check Primary: request_state["stop_event_loop"] flag. Fallback: tool_use["name"] == "stop_conversation" with deprecation warning
Tests New test_bidi_agent_loop_stop_conversation_deprecated_but_works verifies both the tool works AND the warning is emitted

Users who depend on stop_conversation get a clear deprecation message pointing them to strands_tools.stop or request_state["stop_event_loop"].

logger.info("tool_name=<%s> | conversation stop requested, skipping model send", tool_use["name"])
# Check for stop_event_loop flag (set by strands_tools.stop, stop_conversation, or any custom tool)
request_state = invocation_state.get("request_state", {})
should_stop = request_state.get("stop_event_loop", False)

# Backward compatibility: also check for stop_conversation by name (deprecated)
if not should_stop and tool_use["name"] == "stop_conversation":
warnings.warn(
"Stopping the event loop by tool name 'stop_conversation' is deprecated. "
"Use request_state['stop_event_loop'] = True instead.",
DeprecationWarning,
stacklevel=2,
)
should_stop = True

if should_stop:
logger.info("stop_event_loop=<True> | stopping conversation")
connection_id = getattr(self._agent.model, "_connection_id", "unknown")
await self._event_queue.put(
BidiConnectionCloseEvent(connection_id=connection_id, reason="user_request")
)
return # Skip the model send
return # Skip sending result to model

# Send result to model (all tools except stop_conversation)
# Send result to model
await self.send(tool_result_event)

except Exception as error:
Expand Down
2 changes: 1 addition & 1 deletion src/strands/experimental/bidi/io/text.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ async def __call__(self, event: BidiOutputEvent) -> None:

elif isinstance(event, BidiConnectionCloseEvent):
if event.reason == "user_request":
print("user requested connection close using the stop_conversation tool.")
print("user requested connection close using the stop tool.")
logger.debug("connection_id=<%s> | user requested connection close", event.connection_id)
elif isinstance(event, BidiTranscriptStreamEvent):
text = event["text"]
Expand Down
15 changes: 14 additions & 1 deletion src/strands/experimental/bidi/tools/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,17 @@
"""Built-in tools for bidirectional agents."""
"""Built-in tools for bidirectional agents.

.. deprecated::
The built-in ``stop_conversation`` tool is deprecated. Use ``strands_tools.stop`` or set
``request_state["stop_event_loop"] = True`` in any custom tool instead.

To stop a bidirectional conversation, use the standard ``stop`` tool from strands_tools::

from strands_tools import stop
agent = BidiAgent(tools=[stop, ...])

The stop tool sets ``request_state["stop_event_loop"] = True``, which signals the
BidiAgent to gracefully close the connection.
"""

from .stop_conversation import stop_conversation

Expand Down
20 changes: 18 additions & 2 deletions src/strands/experimental/bidi/tools/stop_conversation.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
"""Tool to gracefully stop a bidirectional connection."""
"""Tool to gracefully stop a bidirectional connection.

.. deprecated::
The ``stop_conversation`` tool is deprecated and will be removed in a future version.
Use ``strands_tools.stop`` or set ``request_state["stop_event_loop"] = True`` in any custom tool instead.
"""

import warnings

from ....tools.decorator import tool

Expand All @@ -7,10 +14,19 @@
def stop_conversation() -> str:
"""Stop the bidirectional conversation gracefully.

.. deprecated::
Use ``strands_tools.stop`` or set ``request_state["stop_event_loop"] = True`` in a custom tool instead.

Use ONLY when user says "stop conversation" exactly.
Do NOT use for: "stop", "goodbye", "bye", "exit", "quit", "end" or other farewells or phrases.

Returns:
Success message confirming the conversation will end
Success message confirming the conversation will end.
"""
warnings.warn(
"stop_conversation is deprecated and will be removed in a future version. "
"Use strands_tools.stop or set request_state['stop_event_loop'] = True in any custom tool instead.",
DeprecationWarning,
stacklevel=2,
)
return "Ending conversation"
157 changes: 156 additions & 1 deletion tests/strands/experimental/bidi/agent/test_loop.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import unittest.mock
import warnings

import pytest
import pytest_asyncio

from strands import tool
from strands.experimental.bidi import BidiAgent
from strands.experimental.bidi.models import BidiModel, BidiModelTimeoutError
from strands.experimental.bidi.types.events import BidiConnectionRestartEvent, BidiTextInputEvent
from strands.experimental.bidi.types.events import BidiConnectionCloseEvent, BidiConnectionRestartEvent, BidiTextInputEvent
from strands.types._events import ToolResultEvent, ToolResultMessageEvent, ToolUseStreamEvent


Expand Down Expand Up @@ -93,3 +94,157 @@ async def test_bidi_agent_loop_receive_tool_use(loop, agent, agenerator):
assert tru_messages == exp_messages

agent.model.send.assert_called_with(tool_result_event)


@pytest.mark.asyncio
async def test_bidi_agent_loop_request_state_initialized_for_tools(loop, agent, agenerator):
"""Test that request_state is initialized in invocation_state before tool execution.

This ensures request_state exists for tools that may need it via invocation_state,
even when invocation_state is not provided by the user.
"""
tool_use = {"toolUseId": "t2", "name": "time_tool", "input": {}}
tool_use_event = ToolUseStreamEvent(current_tool_use=tool_use, delta="")

agent.model.receive = unittest.mock.Mock(return_value=agenerator([tool_use_event]))

# Start without providing invocation_state
await loop.start()

tru_events = []
async for event in loop.receive():
tru_events.append(event)
if len(tru_events) >= 3:
break

# Verify tool executed successfully
tool_result_event = tru_events[1]
assert isinstance(tool_result_event, ToolResultEvent)
assert tool_result_event.tool_result["status"] == "success"

# Verify request_state was initialized in invocation_state
assert "request_state" in loop._invocation_state
assert isinstance(loop._invocation_state["request_state"], dict)


@pytest.mark.asyncio
async def test_bidi_agent_loop_stop_event_loop_flag(agent, agenerator):
"""Test that the stop_event_loop flag in request_state gracefully closes the connection.

This simulates a tool (like strands_tools.stop) setting the flag via invocation_state.
"""
# Use a tool that modifies invocation_state to set the stop flag
# We'll mock the tool executor to simulate this behavior
loop = agent._loop

tool_use = {"toolUseId": "t3", "name": "time_tool", "input": {}}
tool_use_event = ToolUseStreamEvent(current_tool_use=tool_use, delta="")
tool_result = {"toolUseId": "t3", "status": "success", "content": [{"text": "12:00"}]}

agent.model.receive = unittest.mock.Mock(return_value=agenerator([tool_use_event]))

# Start with request_state that already has stop_event_loop=True
# This simulates a tool having set it during execution
await loop.start(invocation_state={"request_state": {"stop_event_loop": True}})

tru_events = []
async for event in loop.receive():
tru_events.append(event)

# Should receive: tool_use_event, tool_result_event, tool_result_message, connection_close
assert len(tru_events) == 4

# Verify tool executed successfully
tool_result_event = tru_events[1]
assert isinstance(tool_result_event, ToolResultEvent)
assert tool_result_event.tool_result["status"] == "success"

# Verify connection close event was emitted
connection_close_event = tru_events[3]
assert isinstance(connection_close_event, BidiConnectionCloseEvent)
assert connection_close_event["reason"] == "user_request"

# Verify model.send was NOT called (tool result not sent to model)
agent.model.send.assert_not_called()


@pytest.mark.asyncio
async def test_bidi_agent_loop_stop_conversation_deprecated_but_works(loop, agent, agenerator):
"""Test that stop_conversation tool still works but emits a deprecation warning.

The stop_conversation tool is deprecated in favor of request_state["stop_event_loop"],
but should continue to work for backward compatibility via the name-based check.
"""
from strands.experimental.bidi.tools import stop_conversation

agent.tool_registry.register_tool(stop_conversation)

tool_use = {"toolUseId": "t5", "name": "stop_conversation", "input": {}}
tool_use_event = ToolUseStreamEvent(current_tool_use=tool_use, delta="")

agent.model.receive = unittest.mock.Mock(return_value=agenerator([tool_use_event]))

await loop.start()

tru_events = []
with warnings.catch_warnings(record=True) as caught_warnings:
warnings.simplefilter("always")
async for event in loop.receive():
tru_events.append(event)

# Should receive: tool_use_event, tool_result_event, tool_result_message, connection_close
assert len(tru_events) == 4

# Verify tool executed successfully
tool_result_event = tru_events[1]
assert isinstance(tool_result_event, ToolResultEvent)
assert tool_result_event.tool_result["status"] == "success"
assert "Ending conversation" in tool_result_event.tool_result["content"][0]["text"]

# Verify connection close event was emitted
connection_close_event = tru_events[3]
assert isinstance(connection_close_event, BidiConnectionCloseEvent)
assert connection_close_event["reason"] == "user_request"

# Verify model.send was NOT called (tool result not sent to model)
agent.model.send.assert_not_called()

# Verify deprecation warnings were emitted (from both the tool itself and the loop name check)
deprecation_warnings = [w for w in caught_warnings if issubclass(w.category, DeprecationWarning)]
assert len(deprecation_warnings) >= 1
assert any("stop_conversation" in str(w.message).lower() for w in deprecation_warnings)


@pytest.mark.asyncio
async def test_bidi_agent_loop_request_state_preserved_with_invocation_state(agent, agenerator):
"""Test that existing invocation_state is preserved when request_state is initialized."""

@tool(name="check_invocation_state")
async def check_invocation_state(custom_key: str) -> str:
return f"custom_key: {custom_key}"

agent.tool_registry.register_tool(check_invocation_state)

tool_use = {"toolUseId": "t4", "name": "check_invocation_state", "input": {"custom_key": "from_state"}}
tool_use_event = ToolUseStreamEvent(current_tool_use=tool_use, delta="")

agent.model.receive = unittest.mock.Mock(return_value=agenerator([tool_use_event]))

loop = agent._loop
# Start with custom invocation_state but no request_state
await loop.start(invocation_state={"custom_data": "preserved"})

tru_events = []
async for event in loop.receive():
tru_events.append(event)
if len(tru_events) >= 3:
break

# Verify tool executed successfully
tool_result_event = tru_events[1]
assert isinstance(tool_result_event, ToolResultEvent)
assert tool_result_event.tool_result["status"] == "success"

# Verify request_state was added without removing custom_data
assert "request_state" in loop._invocation_state
assert loop._invocation_state.get("custom_data") == "preserved"