Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve code quality #138

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 24 additions & 8 deletions src/agents/__init__.py
Original file line number Diff line number Diff line change
def set_default_openai_key(key: str) -> None:
    """Set the default OpenAI API key to use for LLM requests and tracing.

    If provided, this key will be used instead of the OPENAI_API_KEY
    environment variable.

    Args:
        key: The OpenAI API key to use by default.
    """
    # Delegate directly. A broad ``except Exception`` that logs and re-raises
    # (as the original wrapper did) adds no handling, double-logs the error,
    # and obscures the traceback origin — let failures propagate unchanged.
    _config.set_default_openai_key(key)


def set_default_openai_client(client: AsyncOpenAI, use_for_tracing: bool = True) -> None:
    """Set the default OpenAI client to use for LLM requests and/or tracing.

    If ``use_for_tracing`` is False, you'll either need to set the
    OPENAI_API_KEY environment variable or call set_tracing_export_api_key()
    with the API key you want to use for tracing.

    Args:
        client: The AsyncOpenAI client to use by default.
        use_for_tracing: Whether this client's API key is also used for tracing.
    """
    # Delegate directly; a catch-log-reraise wrapper around a single call
    # swallows nothing, double-logs, and hides where the error originated.
    _config.set_default_openai_client(client, use_for_tracing)


def set_default_openai_api(api: Literal["chat_completions", "responses"]) -> None:
    """Set the default API to use for OpenAI LLM requests.

    By default we use the responses API, but you can set this to use the
    chat completions API instead.

    Args:
        api: Either "chat_completions" or "responses".
    """
    # Delegate directly. The previous broad ``except Exception`` + log +
    # re-raise performed no recovery and duplicated the error in the logs;
    # an unhandled exception already carries the full context to the caller.
    _config.set_default_openai_api(api)


def enable_verbose_stdout_logging() -> None:
    """Enable verbose DEBUG logging to stdout for the agents loggers.

    Useful for debugging. Sets the ``openai.agents`` and
    ``openai.agents.tracing`` loggers to DEBUG and attaches a stdout
    StreamHandler to each.

    This function is idempotent: calling it repeatedly will not attach
    duplicate handlers (which would cause every log record to be printed
    multiple times).
    """
    # No try/except here: getLogger/setLevel/addHandler are plain stdlib
    # calls that do not meaningfully raise; wrapping them only added noise.
    for name in ("openai.agents", "openai.agents.tracing"):
        logger = logging.getLogger(name)
        logger.setLevel(logging.DEBUG)
        # Guard against duplicate handlers on repeated calls.
        already_attached = any(
            isinstance(h, logging.StreamHandler) and getattr(h, "stream", None) is sys.stdout
            for h in logger.handlers
        )
        if not already_attached:
            logger.addHandler(logging.StreamHandler(sys.stdout))


__all__ = [
Expand Down
139 changes: 95 additions & 44 deletions src/agents/_run_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import asyncio
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Union, List

from openai.types.responses import (
ResponseComputerToolCall,
Expand Down Expand Up @@ -65,6 +65,7 @@


class QueueCompleteSentinel:
    """Marker type placed on the event queue to signal that no further items will arrive."""


Expand Down Expand Up @@ -97,8 +98,7 @@ class ProcessedResponse:
computer_actions: list[ToolRunComputerAction]

def has_tools_to_run(self) -> bool:
# Handoffs, functions and computer actions need local processing
# Hosted tools have already run, so there's nothing to do.
"""Check if there are any tools to run."""
return any(
[
self.handoffs,
Expand Down Expand Up @@ -151,6 +151,7 @@ def generated_items(self) -> list[RunItem]:
def get_model_tracing_impl(
tracing_disabled: bool, trace_include_sensitive_data: bool
) -> ModelTracing:
"""Get the model tracing implementation based on the tracing configuration."""
if tracing_disabled:
return ModelTracing.DISABLED
elif trace_include_sensitive_data:
Expand All @@ -176,6 +177,7 @@ async def execute_tools_and_side_effects(
context_wrapper: RunContextWrapper[TContext],
run_config: RunConfig,
) -> SingleStepResult:
"""Execute tools and side effects for the current step."""
# Make a copy of the generated items
pre_step_items = list(pre_step_items)

Expand Down Expand Up @@ -271,6 +273,7 @@ def process_model_response(
output_schema: AgentOutputSchema | None,
handoffs: list[Handoff],
) -> ProcessedResponse:
"""Process the model response and extract relevant information."""
items: list[RunItem] = []

run_handoffs = []
Expand Down Expand Up @@ -356,6 +359,7 @@ async def execute_function_tool_calls(
context_wrapper: RunContextWrapper[TContext],
config: RunConfig,
) -> list[RunItem]:
"""Execute function tool calls."""
async def run_single_tool(
func_tool: FunctionTool, tool_call: ResponseFunctionToolCall
) -> str:
Expand Down Expand Up @@ -422,6 +426,7 @@ async def execute_computer_actions(
context_wrapper: RunContextWrapper[TContext],
config: RunConfig,
) -> list[RunItem]:
"""Execute computer actions."""
results: list[RunItem] = []
# Need to run these serially, because each action can affect the computer state
for action in actions:
Expand Down Expand Up @@ -451,6 +456,7 @@ async def execute_handoffs(
context_wrapper: RunContextWrapper[TContext],
run_config: RunConfig,
) -> SingleStepResult:
"""Execute handoffs."""
# If there is more than one handoff, add tool responses that reject those handoffs
if len(run_handoffs) > 1:
output_message = "Multiple handoffs detected, ignoring this one."
Expand All @@ -470,9 +476,20 @@ async def execute_handoffs(
actual_handoff = run_handoffs[0]
with handoff_span(from_agent=agent.name) as span_handoff:
handoff = actual_handoff.handoff
new_agent: Agent[Any] = await handoff.on_invoke_handoff(
context_wrapper, actual_handoff.tool_call.arguments
)
try:
new_agent: Agent[Any] = await handoff.on_invoke_handoff(
context_wrapper, actual_handoff.tool_call.arguments
)
except Exception as e:
_utils.attach_error_to_span(
span_handoff,
SpanError(
message="Error invoking handoff",
data={"error": str(e)},
)
)
raise

span_handoff.span_data.to_agent = new_agent.name

# Append a tool output item for the handoff
Expand Down Expand Up @@ -568,6 +585,7 @@ async def execute_final_output(
hooks: RunHooks[TContext],
context_wrapper: RunContextWrapper[TContext],
) -> SingleStepResult:
"""Execute final output."""
# Run the on_end hooks
await cls.run_final_output_hooks(agent, hooks, context_wrapper, final_output)

Expand All @@ -587,6 +605,7 @@ async def run_final_output_hooks(
context_wrapper: RunContextWrapper[TContext],
final_output: Any,
):
"""Run the final output hooks."""
await asyncio.gather(
hooks.on_agent_end(context_wrapper, agent, final_output),
agent.hooks.on_end(context_wrapper, agent, final_output)
Expand All @@ -602,8 +621,19 @@ async def run_single_input_guardrail(
input: str | list[TResponseInputItem],
context: RunContextWrapper[TContext],
) -> InputGuardrailResult:
"""Run a single input guardrail."""
with guardrail_span(guardrail.get_name()) as span_guardrail:
result = await guardrail.run(agent, input, context)
try:
result = await guardrail.run(agent, input, context)
except Exception as e:
_utils.attach_error_to_span(
span_guardrail,
SpanError(
message="Error running input guardrail",
data={"error": str(e)},
)
)
raise
span_guardrail.span_data.triggered = result.output.tripwire_triggered
return result

Expand All @@ -615,8 +645,19 @@ async def run_single_output_guardrail(
agent_output: Any,
context: RunContextWrapper[TContext],
) -> OutputGuardrailResult:
"""Run a single output guardrail."""
with guardrail_span(guardrail.get_name()) as span_guardrail:
result = await guardrail.run(agent=agent, agent_output=agent_output, context=context)
try:
result = await guardrail.run(agent=agent, agent_output=agent_output, context=context)
except Exception as e:
_utils.attach_error_to_span(
span_guardrail,
SpanError(
message="Error running output guardrail",
data={"error": str(e)},
)
)
raise
span_guardrail.span_data.triggered = result.output.tripwire_triggered
return result

Expand All @@ -626,6 +667,7 @@ def stream_step_result_to_queue(
step_result: SingleStepResult,
queue: asyncio.Queue[StreamEvent | QueueCompleteSentinel],
):
"""Stream the step result to the queue."""
for item in step_result.new_step_items:
if isinstance(item, MessageOutputItem):
event = RunItemStreamEvent(item=item, name="message_output_created")
Expand Down Expand Up @@ -695,6 +737,7 @@ async def execute(
context_wrapper: RunContextWrapper[TContext],
config: RunConfig,
) -> RunItem:
"""Execute a computer action."""
output_func = (
cls._get_screenshot_async(action.computer_tool.computer, action.tool_call)
if isinstance(action.computer_tool.computer, AsyncComputer)
Expand Down Expand Up @@ -741,25 +784,29 @@ async def _get_screenshot_sync(
computer: Computer,
tool_call: ResponseComputerToolCall,
) -> str:
"""Get a screenshot synchronously."""
action = tool_call.action
if isinstance(action, ActionClick):
computer.click(action.x, action.y, action.button)
elif isinstance(action, ActionDoubleClick):
computer.double_click(action.x, action.y)
elif isinstance(action, ActionDrag):
computer.drag([(p.x, p.y) for p in action.path])
elif isinstance(action, ActionKeypress):
computer.keypress(action.keys)
elif isinstance(action, ActionMove):
computer.move(action.x, action.y)
elif isinstance(action, ActionScreenshot):
computer.screenshot()
elif isinstance(action, ActionScroll):
computer.scroll(action.x, action.y, action.scroll_x, action.scroll_y)
elif isinstance(action, ActionType):
computer.type(action.text)
elif isinstance(action, ActionWait):
computer.wait()
try:
if isinstance(action, ActionClick):
computer.click(action.x, action.y, action.button)
elif isinstance(action, ActionDoubleClick):
computer.double_click(action.x, action.y)
elif isinstance(action, ActionDrag):
computer.drag([(p.x, p.y) for p in action.path])
elif isinstance(action, ActionKeypress):
computer.keypress(action.keys)
elif isinstance(action, ActionMove):
computer.move(action.x, action.y)
elif isinstance(action, ActionScreenshot):
computer.screenshot()
elif isinstance(action, ActionScroll):
computer.scroll(action.x, action.y, action.scroll_x, action.scroll_y)
elif isinstance(action, ActionType):
computer.type(action.text)
elif isinstance(action, ActionWait):
computer.wait()
except Exception as e:
raise ModelBehaviorError(f"Error executing computer action: {e}")

return computer.screenshot()

Expand All @@ -769,24 +816,28 @@ async def _get_screenshot_async(
computer: AsyncComputer,
tool_call: ResponseComputerToolCall,
) -> str:
"""Get a screenshot asynchronously."""
action = tool_call.action
if isinstance(action, ActionClick):
await computer.click(action.x, action.y, action.button)
elif isinstance(action, ActionDoubleClick):
await computer.double_click(action.x, action.y)
elif isinstance(action, ActionDrag):
await computer.drag([(p.x, p.y) for p in action.path])
elif isinstance(action, ActionKeypress):
await computer.keypress(action.keys)
elif isinstance(action, ActionMove):
await computer.move(action.x, action.y)
elif isinstance(action, ActionScreenshot):
await computer.screenshot()
elif isinstance(action, ActionScroll):
await computer.scroll(action.x, action.y, action.scroll_x, action.scroll_y)
elif isinstance(action, ActionType):
await computer.type(action.text)
elif isinstance(action, ActionWait):
await computer.wait()
try:
if isinstance(action, ActionClick):
await computer.click(action.x, action.y, action.button)
elif isinstance(action, ActionDoubleClick):
await computer.double_click(action.x, action.y)
elif isinstance(action, ActionDrag):
await computer.drag([(p.x, p.y) for p in action.path])
elif isinstance(action, ActionKeypress):
await computer.keypress(action.keys)
elif isinstance(action, ActionMove):
await computer.move(action.x, action.y)
elif isinstance(action, ActionScreenshot):
await computer.screenshot()
elif isinstance(action, ActionScroll):
await computer.scroll(action.x, action.y, action.scroll_x, action.scroll_y)
elif isinstance(action, ActionType):
await computer.type(action.text)
elif isinstance(action, ActionWait):
await computer.wait()
except Exception as e:
raise ModelBehaviorError(f"Error executing computer action: {e}")

return await computer.screenshot()
48 changes: 28 additions & 20 deletions src/agents/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,28 +132,36 @@ def as_tool(
async def run_agent(context: RunContextWrapper, input: str) -> str:
from .run import Runner

output = await Runner.run(
starting_agent=self,
input=input,
context=context.context,
)
if custom_output_extractor:
return await custom_output_extractor(output)

return ItemHelpers.text_message_outputs(output.new_items)
try:
output = await Runner.run(
starting_agent=self,
input=input,
context=context.context,
)
if custom_output_extractor:
return await custom_output_extractor(output)

return ItemHelpers.text_message_outputs(output.new_items)
except Exception as e:
logger.error(f"Error running agent as tool: {e}")
raise

return run_agent

async def get_system_prompt(self, run_context: RunContextWrapper[TContext]) -> str | None:
    """Resolve the agent's system prompt.

    ``self.instructions`` may be a plain string, a sync callable, or an async
    callable taking ``(run_context, agent)``; any other non-None value is
    logged as an error and ignored.

    Args:
        run_context: The current run context wrapper, forwarded to callable
            instructions.

    Returns:
        The resolved instructions string, or None if no instructions are set
        (or they were of an unsupported type).
    """
    # No blanket try/except here: logging + re-raising the same exception
    # adds noise without handling anything; callers see the original error.
    if isinstance(self.instructions, str):
        return self.instructions
    if callable(self.instructions):
        if inspect.iscoroutinefunction(self.instructions):
            return await cast(Awaitable[str], self.instructions(run_context, self))
        return cast(str, self.instructions(run_context, self))
    if self.instructions is not None:
        # Unsupported type: report loudly but keep the run going.
        logger.error(f"Instructions must be a string or a function, got {self.instructions}")
    return None
Loading