diff --git a/src/smolagents/agents.py b/src/smolagents/agents.py
index a1a91aeb3..1516c3a94 100644
--- a/src/smolagents/agents.py
+++ b/src/smolagents/agents.py
@@ -206,29 +206,55 @@ def __init__(
         provide_run_summary: bool = False,
         final_answer_checks: Optional[List[Callable]] = None,
     ):
-        if tool_parser is None:
-            tool_parser = parse_json_tool_call
         self.agent_name = self.__class__.__name__
         self.model = model
         self.prompt_templates = prompt_templates or EMPTY_PROMPT_TEMPLATES
         self.max_steps = max_steps
-        self.step_number: int = 0
-        self.tool_parser = tool_parser
+        self.step_number = 0
+        self.tool_parser = tool_parser or parse_json_tool_call
         self.grammar = grammar
         self.planning_interval = planning_interval
         self.state = {}
         self.name = name
         self.description = description
         self.provide_run_summary = provide_run_summary
+        self.final_answer_checks = final_answer_checks
+
+        self._setup_managed_agents(managed_agents)
+        self._setup_tools(tools, add_base_tools)
+        self._validate_tools_and_managed_agents(tools, managed_agents)
+
+        self.system_prompt = self.initialize_system_prompt()
+        self.input_messages = None
+        self.task = None
+        self.memory = AgentMemory(self.system_prompt)
+        self.logger = AgentLogger(level=verbosity_level)
+        self.monitor = Monitor(self.model, self.logger)
+        self.step_callbacks = step_callbacks if step_callbacks is not None else []
+        self.step_callbacks.append(self.monitor.update_metrics)
 
+    def _setup_managed_agents(self, managed_agents):
         self.managed_agents = {}
-        if managed_agents is not None:
-            for managed_agent in managed_agents:
-                assert managed_agent.name and managed_agent.description, (
-                    "All managed agents need both a name and a description!"
-                )
+        if managed_agents:
+            assert all(agent.name and agent.description for agent in managed_agents), (
+                "All managed agents need both a name and a description!"
+            )
             self.managed_agents = {agent.name: agent for agent in managed_agents}
 
+    def _setup_tools(self, tools, add_base_tools):
+        assert all(isinstance(tool, Tool) for tool in tools), "All elements must be instance of Tool (or a subclass)"
+        self.tools = {tool.name: tool for tool in tools}
+        if add_base_tools:
+            self.tools.update(
+                {
+                    name: cls()
+                    for name, cls in TOOL_MAPPING.items()
+                    if name != "python_interpreter" or self.__class__.__name__ == "ToolCallingAgent"
+                }
+            )
+        self.tools["final_answer"] = FinalAnswerTool()
+
+    def _validate_tools_and_managed_agents(self, tools, managed_agents):
         tool_and_managed_agent_names = [tool.name for tool in tools]
         if managed_agents is not None:
             tool_and_managed_agent_names += [agent.name for agent in managed_agents]
@@ -238,25 +264,242 @@ def __init__(
             f"{[name for name in tool_and_managed_agent_names if tool_and_managed_agent_names.count(name) > 1]}"
         )
 
-        for tool in tools:
-            assert isinstance(tool, Tool), f"This element is not of class Tool: {str(tool)}"
-        self.tools = {tool.name: tool for tool in tools}
+    def run(
+        self,
+        task: str,
+        stream: bool = False,
+        reset: bool = True,
+        images: Optional[List[str]] = None,
+        additional_args: Optional[Dict] = None,
+    ):
+        """
+        Run the agent for the given task.
 
-        if add_base_tools:
-            for tool_name, tool_class in TOOL_MAPPING.items():
-                if tool_name != "python_interpreter" or self.__class__.__name__ == "ToolCallingAgent":
-                    self.tools[tool_name] = tool_class()
-        self.tools["final_answer"] = FinalAnswerTool()
+        Args:
+            task (`str`): Task to perform.
+            stream (`bool`): Whether to run in a streaming way.
+            reset (`bool`): Whether to reset the conversation or keep it going from previous run.
+            images (`list[str]`, *optional*): Paths to image(s).
+            additional_args (`dict`): Any other variables that you want to pass to the agent run, for instance images or dataframes. Give them clear names!
+
+        Example:
+        ```py
+        from smolagents import CodeAgent
+        agent = CodeAgent(tools=[])
+        agent.run("What is the result of 2 power 3.7384?")
+        ```
+        """
+
+        self.task = task
+        if additional_args is not None:
+            self.state.update(additional_args)
+            self.task += f"""
+You have been provided with these additional arguments, that you can access using the keys as variables in your python code:
+{str(additional_args)}."""
 
         self.system_prompt = self.initialize_system_prompt()
-        self.input_messages = None
-        self.task = None
-        self.memory = AgentMemory(self.system_prompt)
-        self.logger = AgentLogger(level=verbosity_level)
-        self.monitor = Monitor(self.model, self.logger)
-        self.step_callbacks = step_callbacks if step_callbacks is not None else []
-        self.step_callbacks.append(self.monitor.update_metrics)
-        self.final_answer_checks = final_answer_checks
+        self.memory.system_prompt = SystemPromptStep(system_prompt=self.system_prompt)
+        if reset:
+            self.memory.reset()
+            self.monitor.reset()
+
+        self.logger.log_task(
+            content=self.task.strip(),
+            subtitle=f"{type(self.model).__name__} - {(self.model.model_id if hasattr(self.model, 'model_id') else '')}",
+            level=LogLevel.INFO,
+            title=self.name if hasattr(self, "name") else None,
+        )
+
+        self.memory.steps.append(TaskStep(task=self.task, task_images=images))
+
+        if stream:
+            # The steps are returned as they are executed through a generator to iterate on.
+            return self._run(task=self.task, images=images)
+        # Outputs are returned only at the end as a string. We only look at the last step
+        return deque(self._run(task=self.task, images=images), maxlen=1)[0]
+
+    def _run(self, task: str, images: List[str] | None = None) -> Generator[ActionStep | AgentType, None, None]:
+        final_answer = None
+        self.step_number = 1
+        while final_answer is None and self.step_number <= self.max_steps:
+            step_start_time = time.time()
+            memory_step = self._create_memory_step(step_start_time, images)
+            try:
+                final_answer = self._execute_step(task, memory_step)
+            except AgentError as e:
+                memory_step.error = e
+            finally:
+                self._finalize_step(memory_step, step_start_time)
+                yield memory_step
+                self.step_number += 1
+
+        if final_answer is None and self.step_number == self.max_steps + 1:
+            final_answer = self._handle_max_steps_reached(task, images, step_start_time)
+            yield memory_step
+        yield handle_agent_output_types(final_answer)
+
+    def _create_memory_step(self, step_start_time: float, images: List[str] | None) -> ActionStep:
+        return ActionStep(step_number=self.step_number, start_time=step_start_time, observations_images=images)
+
+    def _execute_step(self, task: str, memory_step: ActionStep) -> Union[None, Any]:
+        if self.planning_interval is not None and self.step_number % self.planning_interval == 1:
+            self.planning_step(task, is_first_step=(self.step_number == 1), step=self.step_number)
+        self.logger.log_rule(f"Step {self.step_number}", level=LogLevel.INFO)
+        final_answer = self.step(memory_step)
+        if final_answer is not None and self.final_answer_checks:
+            self._validate_final_answer(final_answer)
+        return final_answer
+
+    def _validate_final_answer(self, final_answer: Any):
+        for check_function in self.final_answer_checks:
+            try:
+                assert check_function(final_answer, self.memory)
+            except Exception as e:
+                raise AgentError(f"Check {check_function.__name__} failed with error: {e}", self.logger)
+
+    def _finalize_step(self, memory_step: ActionStep, step_start_time: float):
+        memory_step.end_time = time.time()
+        memory_step.duration = memory_step.end_time - step_start_time
+        self.memory.steps.append(memory_step)
+        for callback in self.step_callbacks:
+            # For compatibility with old callbacks that don't take the agent as an argument
+            callback(memory_step) if len(inspect.signature(callback).parameters) == 1 else callback(
+                memory_step, agent=self
+            )
+
+    def _handle_max_steps_reached(self, task: str, images: List[str], step_start_time: float) -> Any:
+        final_answer = self.provide_final_answer(task, images)
+        final_memory_step = ActionStep(
+            step_number=self.step_number, error=AgentMaxStepsError("Reached max steps.", self.logger)
+        )
+        final_memory_step.action_output = final_answer
+        final_memory_step.end_time = time.time()
+        final_memory_step.duration = final_memory_step.end_time - step_start_time
+        self.memory.steps.append(final_memory_step)
+        for callback in self.step_callbacks:
+            callback(final_memory_step) if len(inspect.signature(callback).parameters) == 1 else callback(
+                final_memory_step, agent=self
+            )
+        return final_answer
+
+    def planning_step(self, task, is_first_step: bool, step: int) -> None:
+        input_messages, facts_message, plan_message = (
+            self._generate_initial_plan(task) if is_first_step else self._generate_updated_plan(task, step)
+        )
+        self._record_planning_step(input_messages, facts_message, plan_message, is_first_step)
+
+    def _generate_initial_plan(self, task: str) -> Tuple[ChatMessage, ChatMessage]:
+        input_messages = [
+            {
+                "role": MessageRole.USER,
+                "content": [
+                    {
+                        "type": "text",
+                        "text": populate_template(
+                            self.prompt_templates["planning"]["initial_facts"], variables={"task": task}
+                        ),
+                    }
+                ],
+            },
+        ]
+        facts_message = self.model(input_messages)
+
+        message_prompt_plan = {
+            "role": MessageRole.USER,
+            "content": [
+                {
+                    "type": "text",
+                    "text": populate_template(
+                        self.prompt_templates["planning"]["initial_plan"],
+                        variables={
+                            "task": task,
+                            "tools": self.tools,
+                            "managed_agents": self.managed_agents,
+                            "answer_facts": facts_message.content,
+                        },
+                    ),
+                }
+            ],
+        }
+        plan_message = self.model([message_prompt_plan], stop_sequences=["<end_plan>"])
+        return input_messages, facts_message, plan_message
+
+    def _generate_updated_plan(self, task: str, step: int) -> Tuple[ChatMessage, ChatMessage]:
+        # Do not take the system prompt message from the memory
+        # summary_mode=False: Do not take previous plan steps to avoid influencing the new plan
+        memory_messages = self.write_memory_to_messages()[1:]
+        facts_update_pre = {
+            "role": MessageRole.SYSTEM,
+            "content": [{"type": "text", "text": self.prompt_templates["planning"]["update_facts_pre_messages"]}],
+        }
+        facts_update_post = {
+            "role": MessageRole.USER,
+            "content": [{"type": "text", "text": self.prompt_templates["planning"]["update_facts_post_messages"]}],
+        }
+        input_messages = [facts_update_pre] + memory_messages + [facts_update_post]
+        facts_message = self.model(input_messages)
+
+        update_plan_pre = {
+            "role": MessageRole.SYSTEM,
+            "content": [
+                {
+                    "type": "text",
+                    "text": populate_template(
+                        self.prompt_templates["planning"]["update_plan_pre_messages"], variables={"task": task}
+                    ),
+                }
+            ],
+        }
+        update_plan_post = {
+            "role": MessageRole.USER,
+            "content": [
+                {
+                    "type": "text",
+                    "text": populate_template(
+                        self.prompt_templates["planning"]["update_plan_post_messages"],
+                        variables={
+                            "task": task,
+                            "tools": self.tools,
+                            "managed_agents": self.managed_agents,
"facts_update": facts_message.content, + "remaining_steps": (self.max_steps - step), + }, + ), + } + ], + } + plan_message = self.model( + [update_plan_pre] + memory_messages + [update_plan_post], stop_sequences=[""] + ) + return input_messages, facts_message, plan_message + + def _record_planning_step( + self, input_messages: list, facts_message: ChatMessage, plan_message: ChatMessage, is_first_step: bool + ) -> None: + if is_first_step: + facts = textwrap.dedent(f"""Here are the facts that I know so far:\n```\n{facts_message.content}\n```""") + plan = textwrap.dedent( + f"""Here is the plan of action that I will follow to solve the task:\n```\n{plan_message.content}\n```""" + ) + log_message = "Initial plan" + else: + facts = textwrap.dedent( + f"""Here is the updated list of the facts that I know:\n```\n{facts_message.content}\n```""" + ) + plan = textwrap.dedent( + f"""I still need to solve the task I was given:\n```\n{self.task}\n```\n\nHere is my new/updated plan of action to solve the task:\n```\n{plan_message.content}\n```""" + ) + log_message = "Updated plan" + self.memory.steps.append( + PlanningStep( + model_input_messages=input_messages, + facts=facts, + plan=plan, + model_output_message_plan=plan_message, + model_output_message_facts=facts_message, + ) + ) + self.logger.log(Rule(f"[bold]{log_message}", style="orange"), Text(plan), level=LogLevel.INFO) @property def logs(self): @@ -403,293 +646,6 @@ def step(self, memory_step: ActionStep) -> Union[None, Any]: """To be implemented in children classes. Should return either None if the step is not final.""" pass - def run( - self, - task: str, - stream: bool = False, - reset: bool = True, - images: Optional[List[str]] = None, - additional_args: Optional[Dict] = None, - ): - """ - Run the agent for the given task. - - Args: - task (`str`): Task to perform. - stream (`bool`): Whether to run in a streaming way. - reset (`bool`): Whether to reset the conversation or keep it going from previous run. - images (`list[str]`, *optional*): Paths to image(s). - additional_args (`dict`): Any other variables that you want to pass to the agent run, for instance images or dataframes. Give them clear names! - - Example: - ```py - from smolagents import CodeAgent - agent = CodeAgent(tools=[]) - agent.run("What is the result of 2 power 3.7384?") - ``` - """ - - self.task = task - if additional_args is not None: - self.state.update(additional_args) - self.task += f""" -You have been provided with these additional arguments, that you can access using the keys as variables in your python code: -{str(additional_args)}.""" - - self.system_prompt = self.initialize_system_prompt() - self.memory.system_prompt = SystemPromptStep(system_prompt=self.system_prompt) - if reset: - self.memory.reset() - self.monitor.reset() - - self.logger.log_task( - content=self.task.strip(), - subtitle=f"{type(self.model).__name__} - {(self.model.model_id if hasattr(self.model, 'model_id') else '')}", - level=LogLevel.INFO, - title=self.name if hasattr(self, "name") else None, - ) - - self.memory.steps.append(TaskStep(task=self.task, task_images=images)) - - if stream: - # The steps are returned as they are executed through a generator to iterate on. - return self._run(task=self.task, images=images) - # Outputs are returned only at the end as a string. 
We only look at the last step - return deque(self._run(task=self.task, images=images), maxlen=1)[0] - - def _run(self, task: str, images: List[str] | None = None) -> Generator[ActionStep | AgentType, None, None]: - """ - Run the agent in streaming mode and returns a generator of all the steps. - - Args: - task (`str`): Task to perform. - images (`list[str]`): Paths to image(s). - """ - final_answer = None - self.step_number = 1 - while final_answer is None and self.step_number <= self.max_steps: - step_start_time = time.time() - memory_step = ActionStep( - step_number=self.step_number, - start_time=step_start_time, - observations_images=images, - ) - try: - if self.planning_interval is not None and self.step_number % self.planning_interval == 1: - self.planning_step( - task, - is_first_step=(self.step_number == 1), - step=self.step_number, - ) - self.logger.log_rule(f"Step {self.step_number}", level=LogLevel.INFO) - - # Run one step! - final_answer = self.step(memory_step) - if final_answer is not None and self.final_answer_checks is not None: - for check_function in self.final_answer_checks: - try: - assert check_function(final_answer, self.memory) - except Exception as e: - final_answer = None - raise AgentError(f"Check {check_function.__name__} failed with error: {e}", self.logger) - except AgentError as e: - memory_step.error = e - finally: - memory_step.end_time = time.time() - memory_step.duration = memory_step.end_time - step_start_time - self.memory.steps.append(memory_step) - for callback in self.step_callbacks: - # For compatibility with old callbacks that don't take the agent as an argument - if len(inspect.signature(callback).parameters) == 1: - callback(memory_step) - else: - callback(memory_step, agent=self) - self.step_number += 1 - yield memory_step - - if final_answer is None and self.step_number == self.max_steps + 1: - error_message = "Reached max steps." - final_answer = self.provide_final_answer(task, images) - final_memory_step = ActionStep( - step_number=self.step_number, error=AgentMaxStepsError(error_message, self.logger) - ) - final_memory_step.action_output = final_answer - final_memory_step.end_time = time.time() - final_memory_step.duration = memory_step.end_time - step_start_time - self.memory.steps.append(final_memory_step) - for callback in self.step_callbacks: - # For compatibility with old callbacks that don't take the agent as an argument - if len(inspect.signature(callback).parameters) == 1: - callback(final_memory_step) - else: - callback(final_memory_step, agent=self) - yield final_memory_step - - yield handle_agent_output_types(final_answer) - - def planning_step(self, task, is_first_step: bool, step: int) -> None: - """ - Used periodically by the agent to plan the next steps to reach the objective. - - Args: - task (`str`): Task to perform. - is_first_step (`bool`): If this step is not the first one, the plan should be an update over a previous plan. - step (`int`): The number of the current step, used as an indication for the LLM. 
- """ - if is_first_step: - input_messages = [ - { - "role": MessageRole.USER, - "content": [ - { - "type": "text", - "text": populate_template( - self.prompt_templates["planning"]["initial_facts"], variables={"task": task} - ), - } - ], - }, - ] - - chat_message_facts: ChatMessage = self.model(input_messages) - answer_facts = chat_message_facts.content - - message_prompt_plan = { - "role": MessageRole.USER, - "content": [ - { - "type": "text", - "text": populate_template( - self.prompt_templates["planning"]["initial_plan"], - variables={ - "task": task, - "tools": self.tools, - "managed_agents": self.managed_agents, - "answer_facts": answer_facts, - }, - ), - } - ], - } - chat_message_plan: ChatMessage = self.model( - [message_prompt_plan], - stop_sequences=[""], - ) - answer_plan = chat_message_plan.content - - final_plan_redaction = textwrap.dedent( - f"""Here is the plan of action that I will follow to solve the task: - ``` - {answer_plan} - ```""" - ) - final_facts_redaction = textwrap.dedent( - f"""Here are the facts that I know so far: - ``` - {answer_facts} - ```""".strip() - ) - self.memory.steps.append( - PlanningStep( - model_input_messages=input_messages, - plan=final_plan_redaction, - facts=final_facts_redaction, - model_output_message_plan=chat_message_plan, - model_output_message_facts=chat_message_facts, - ) - ) - self.logger.log( - Rule("[bold]Initial plan", style="orange"), - Text(final_plan_redaction), - level=LogLevel.INFO, - ) - else: # update plan - # Do not take the system prompt message from the memory - # summary_mode=False: Do not take previous plan steps to avoid influencing the new plan - memory_messages = self.write_memory_to_messages()[1:] - - # Redact updated facts - facts_update_pre_messages = { - "role": MessageRole.SYSTEM, - "content": [{"type": "text", "text": self.prompt_templates["planning"]["update_facts_pre_messages"]}], - } - facts_update_post_messages = { - "role": MessageRole.USER, - "content": [{"type": "text", "text": self.prompt_templates["planning"]["update_facts_post_messages"]}], - } - input_messages = [facts_update_pre_messages] + memory_messages + [facts_update_post_messages] - chat_message_facts: ChatMessage = self.model(input_messages) - facts_update = chat_message_facts.content - - # Redact updated plan - update_plan_pre_messages = { - "role": MessageRole.SYSTEM, - "content": [ - { - "type": "text", - "text": populate_template( - self.prompt_templates["planning"]["update_plan_pre_messages"], variables={"task": task} - ), - } - ], - } - update_plan_post_messages = { - "role": MessageRole.USER, - "content": [ - { - "type": "text", - "text": populate_template( - self.prompt_templates["planning"]["update_plan_post_messages"], - variables={ - "task": task, - "tools": self.tools, - "managed_agents": self.managed_agents, - "facts_update": facts_update, - "remaining_steps": (self.max_steps - step), - }, - ), - } - ], - } - chat_message_plan: ChatMessage = self.model( - [update_plan_pre_messages] + memory_messages + [update_plan_post_messages], - stop_sequences=[""], - ) - - # Log final facts and plan - final_plan_redaction = textwrap.dedent( - f"""I still need to solve the task I was given: - ``` - {task} - ``` - - Here is my new/updated plan of action to solve the task: - ``` - {chat_message_plan.content} - ```""" - ) - - final_facts_redaction = textwrap.dedent( - f"""Here is the updated list of the facts that I know: - ``` - {facts_update} - ```""" - ) - self.memory.steps.append( - PlanningStep( - model_input_messages=input_messages, - 
plan=final_plan_redaction, - facts=final_facts_redaction, - model_output_message_plan=chat_message_plan, - model_output_message_facts=chat_message_facts, - ) - ) - self.logger.log( - Rule("[bold]Updated plan", style="orange"), - Text(final_plan_redaction), - level=LogLevel.INFO, - ) - def replay(self, detailed: bool = False): """Prints a pretty replay of the agent's steps. diff --git a/tests/test_models.py b/tests/test_models.py index 10f36aef2..182e90acd 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -74,7 +74,7 @@ def test_transformers_message_no_tool(self): model = TransformersModel( model_id="HuggingFaceTB/SmolLM2-135M-Instruct", max_new_tokens=5, - device_map="auto", + device_map="cpu", do_sample=False, ) messages = [{"role": "user", "content": [{"type": "text", "text": "Hello!"}]}] @@ -88,7 +88,7 @@ def test_transformers_message_vl_no_tool(self): model = TransformersModel( model_id="llava-hf/llava-interleave-qwen-0.5b-hf", max_new_tokens=5, - device_map="auto", + device_map="cpu", do_sample=False, ) messages = [{"role": "user", "content": [{"type": "text", "text": "Hello!"}, {"type": "image", "image": img}]}] @@ -132,7 +132,7 @@ def test_call_with_custom_role_conversions(self): @pytest.mark.skipif(not os.getenv("RUN_ALL"), reason="RUN_ALL environment variable not set") def test_get_hfapi_message_no_tool(self): - model = HfApiModel(max_tokens=10) + model = HfApiModel(model="Qwen/Qwen2.5-Coder-32B-Instruct", max_tokens=10) messages = [{"role": "user", "content": [{"type": "text", "text": "Hello!"}]}] model(messages, stop_sequences=["great"]) diff --git a/tests/test_utils.py b/tests/test_utils.py index 25f163284..16ba39141 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -109,17 +109,17 @@ def test_func(): ... def test_get_source_ipython_errors_empty_cells(ipython_shell): test_code = textwrap.dedent("""class TestClass:\n ...""").strip() ipython_shell.user_ns["In"] = [""] - exec(test_code) + ipython_shell.run_cell(test_code, store_history=True) with pytest.raises(ValueError, match="No code cells found in IPython session"): - get_source(locals()["TestClass"]) + get_source(ipython_shell.user_ns["TestClass"]) def test_get_source_ipython_errors_definition_not_found(ipython_shell): test_code = textwrap.dedent("""class TestClass:\n ...""").strip() ipython_shell.user_ns["In"] = ["", "print('No class definition here')"] - exec(test_code) + ipython_shell.run_cell(test_code, store_history=True) with pytest.raises(ValueError, match="Could not find source code for TestClass in IPython history"): - get_source(locals()["TestClass"]) + get_source(ipython_shell.user_ns["TestClass"]) def test_get_source_ipython_errors_type_error():
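
Reviewer note: below is a minimal sketch of how the refactored pieces fit together from the caller's side, covering only behavior visible in this patch: the streaming `run()` path that returns the `_run()` generator, `planning_interval`, the new `final_answer_checks` hook validated in `_validate_final_answer()`, and the two-argument callback path in `_finalize_step()`. It is an illustration, not part of the patch: the check and callback functions are hypothetical, and the keyword arguments are assumed to be forwarded by `CodeAgent` to `MultiStepAgent.__init__`.

```py
from smolagents import CodeAgent


def non_empty_answer(final_answer, memory) -> bool:
    # Hypothetical check: _validate_final_answer() asserts the return value of each check.
    return bool(str(final_answer).strip())


def log_step(memory_step, agent) -> None:
    # Two-parameter callback: _finalize_step() calls it as callback(memory_step, agent=self).
    print(f"[{agent.agent_name}] step {memory_step.step_number} took {memory_step.duration:.2f}s")


agent = CodeAgent(
    tools=[],
    max_steps=6,
    planning_interval=3,  # planning_step() fires when step_number % 3 == 1, i.e. steps 1, 4, ...
    final_answer_checks=[non_empty_answer],
    step_callbacks=[log_step],
)

# stream=True makes run() return the generator built by _run(): one ActionStep per
# loop iteration, then the handled final answer as the last yielded item.
for item in agent.run("What is the result of 2 power 3.7384?", stream=True):
    print(type(item).__name__)
```

Depending on the release, `CodeAgent` may also require an explicit `model` argument; the sketch mirrors the `run()` docstring example above, which omits it.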