Describe the bug
Continuation of #3705 (comment)
Turning on `ADK_ENABLE_PROGRESSIVE_SSE_STREAMING=1` in `.env` causes Gemini 3 models to make excessive tool calls. The model calls the same tool with different args 4 or 5 times in parallel, and repeats this for dozens of iterations before returning a response. It appears to duplicate tool invocations, though that may be an artifact of this feature emitting partial tool-invocation events.
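For reference, the only configuration difference between the good and bad behavior is this flag (sketch of the `.env` toggle described above):

```
# .env
ADK_ENABLE_PROGRESSIVE_SSE_STREAMING=1   # enabled: excessive/duplicate tool calls
# disabled (unset): tool usage is normal
```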
To Reproduce
Unfortunately I am up against a deadline right now and can't take the time to build a proper reproducible demo, but Claude (Copilot) put the following together based on my repo, which hopefully points in the right direction:
"""
Minimal reproduction of excessive tool usage with ADK_ENABLE_PROGRESSIVE_SSE_STREAMING=1
Issue: When progressive SSE streaming is enabled, Gemini makes excessive/duplicate
tool calls even when instructed to limit to 3 calls per turn. With progressive streaming disabled,
behavior is normal.
Architecture:
- Custom OrchestratorAgent (BaseAgent) routes to sub-agents (LlmAgents)
- Sub-agents have input_schema set to Pydantic models
- Sub-agents use BuiltInPlanner with ThinkingConfig
- Tools are given directly to sub-agents (not orchestrator)
"""
import asyncio
from google.adk.agents import Agent, BaseAgent, RunConfig
from google.adk.agents.invocation_context import InvocationContext
from google.adk.agents.run_config import StreamingMode
from google.adk.planners import BuiltInPlanner
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types
from google.genai.types import Content
from pydantic import BaseModel, Field
from typing import Literal, AsyncGenerator
from google.adk.events import Event
import json
# Structured input for sub-agent
class UserPromptRequest(BaseModel):
message_type: Literal['user_prompt'] = 'user_prompt'
text: str = Field(..., description='User question')
# Simple tool that should be called once
async def search_knowledge(query: str) -> dict:
"""Search knowledge base for information."""
return {"status": "success", "data": {"result": f"Found info about: {query}"}}
# Sub-agent with input schema and tools
sub_agent = Agent(
name='knowledge_agent',
description='Agent that searches knowledge and answers questions',
model='gemini-3-flash-preview',
instruction="""You are a helpful assistant. Use the search_knowledge tool to answer questions.
CRITICAL: You are limited to a maximum of 2 tool calls per turn. Never make duplicate calls.""",
planner=BuiltInPlanner(
thinking_config=types.ThinkingConfig(
include_thoughts=True,
thinking_level=types.ThinkingLevel.HIGH,
)
),
input_schema=UserPromptRequest,
tools=[search_knowledge],
)
# Custom orchestrator that routes to sub-agent
class OrchestratorAgent(BaseAgent):
knowledge_agent: Agent
def __init__(self, knowledge_agent: Agent, **kwargs):
super().__init__(
name='orchestrator',
description='Routes messages to appropriate agent',
knowledge_agent=knowledge_agent,
sub_agents=[knowledge_agent],
**kwargs
)
async def _run_async_impl(self, context: InvocationContext) -> AsyncGenerator[Event, None]:
# Extract message from user content
message_str = context.user_content.parts[0].text if context.user_content else ''
message = json.loads(message_str)
# Route to sub-agent
async for event in self.knowledge_agent.run_async(context):
yield event
async def test_streaming():
root = OrchestratorAgent(knowledge_agent=sub_agent)
session_service = InMemorySessionService()
runner = Runner(agent=root, session_service=session_service)
# Test with progressive SSE streaming enabled (set env var: ADK_ENABLE_PROGRESSIVE_SSE_STREAMING=1)
run_config = RunConfig(
response_modalities=['TEXT'],
streaming_mode=StreamingMode.SSE,
)
# Structured message (like your WebSocket flow)
message = json.dumps({
"message_type": "user_prompt",
"text": "What is Python programming?"
})
print("Testing with streaming_mode=StreamingMode.SSE")
print("(Set ADK_ENABLE_PROGRESSIVE_SSE_STREAMING=1 to reproduce issue)\n")
tool_call_count = 0
async for event in runner.run_async(
user_id='test_user',
session_id='test_session',
new_message=Content(role='user', parts=[{'text': message}]),
run_config=run_config
):
tool_calls = event.get_function_calls()
if tool_calls:
tool_call_count += len(tool_calls)
for call in tool_calls:
print(f"Tool call #{tool_call_count}: {call.name}({call.args})")
if event.content and event.content.parts:
for part in event.content.parts:
if hasattr(part, 'text') and part.text:
print(f"Response: {part.text[:100]}...")
print(f"\nTotal tool calls: {tool_call_count}")
if tool_call_count > 2:
print("⚠️ ISSUE REPRODUCED: Agent exceeded 2 tool call limit!")
if __name__ == '__main__':
asyncio.run(test_streaming())Claude believes this is related to using an OrchestratorAgent.
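One way to test the partial-event hypothesis above (a sketch only; it assumes the `Event.partial` flag that ADK sets on streamed text chunks is also set on progressive tool-call events, which I have not verified) is to count only finalized events in the loop:

```python
def is_final_tool_call_event(event) -> bool:
    """Return True only for non-partial events that carry function calls."""
    # If the apparent duplicates are progressive partial emissions, filtering on
    # `partial` should bring the count back to the expected 1-2 calls; if they are
    # genuinely repeated invocations, the count stays high.
    return bool(event.get_function_calls()) and not getattr(event, 'partial', False)
```

In the repro, this would replace the bare `if tool_calls:` check before incrementing `tool_call_count`.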
Expected behavior
Distinct tools may be executed in parallel, but the same tool should only be executed once per turn, following the limit stated in the system prompt.
Desktop (please complete the following information):
- OS: Ubuntu
- Python version: 3.11
- ADK version: 1.21
Model Information:
- Are you using LiteLLM: No
- Which model is being used: Gemini 3 Flash and Pro