
[Feature] Support Anthropic's Model Context Protocol tool servers #7799

Open
1 of 2 tasks
kalanyuz opened this issue Feb 11, 2025 · 2 comments
Labels
enhancement New feature or request

Comments

@kalanyuz
Contributor

kalanyuz commented Feb 11, 2025

What feature would you like to see?

MCP servers are open source and give access to a wide variety of tools provided by third parties and community members, reducing the need to write a bunch of functions yourself.

However, MCP clients require running inside an async function, and ReAct, even with asyncify, still can't run the tools, because the forward function needs to await the tools as well.

Happy to contribute to this, but I'm not sure where the best place to start would be.
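To make the mismatch concrete, here is a minimal, hypothetical sketch (plain asyncio, no DSPy; `async_tool` and `sync_forward` are made-up names): a synchronous forward that calls an async tool only ever gets back a coroutine object, never the result, because it has nothing to await on.

```python
import asyncio

async def async_tool(x):
    # stands in for an MCP tool call such as session.call_tool(...)
    await asyncio.sleep(0)
    return x * 2

def sync_forward():
    # A synchronous forward cannot await: it receives the coroutine
    # object itself rather than the tool's result.
    return async_tool(21)

obj = sync_forward()
print(asyncio.iscoroutine(obj))   # True: no result yet
print(asyncio.run(obj))           # awaiting it elsewhere yields 42
```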

Reference code:

import dspy
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

# Create server parameters for stdio connection
server_params = StdioServerParameters(
    command="docker",  # Executable
    args=[
        "run",
        "-i",
        "--rm",
        "--mount",
        "type=bind,src=/Users/kalanyuz/Github/test_project/playground,dst=/projects/playground",
        "mcp/filesystem",  # image name follows the mount options
        "/projects",
    ],
    env=None,  # Optional environment variables
)


async def run():
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            # Initialize the connection
            await session.initialize()

            # List available prompts
            # prompts = await session.list_prompts()

            # Get a prompt
            # prompt = await session.get_prompt(
            #     "example-prompt", arguments={"arg1": "value"}
            # )

            # List available resources
            # resources = await session.list_resources()

            # List available tools
            tools = await session.list_tools()

            # Read a resource
            # content, mime_type = await session.read_resource("file://system")

            # Call a tool
            result = await session.call_tool("list_directory", arguments={"path": "/projects/src"})

            # map_json_schema_to_tool_args is a helper (defined elsewhere) that
            # converts the tool's JSON Schema into dspy.Tool arguments
            args, arg_types, arg_desc = map_json_schema_to_tool_args(tools.tools[2].inputSchema)
            test_tool = dspy.Tool(
                func=lambda path, content: session.call_tool(tools.tools[2].name, {'path':path, 'content': content}),
                name=tools.tools[2].name,
                desc=tools.tools[2].description,
                args=args,
                arg_types=arg_types,
                arg_desc=arg_desc
            )

            bot = dspy.ReAct('request -> output', tools=[test_tool])
            result = await bot(request='Generate random message and write to /projects/playground/test.txt')
            print(result)


if __name__ == "__main__":
    import asyncio

    asyncio.run(run())
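For reference, `map_json_schema_to_tool_args` is not shown above; here is a hypothetical sketch of what such a helper could look like, assuming a flat JSON Schema with `properties`, `type`, and `description` fields (the mapping table and function body are my assumptions, not the author's code):

```python
# Hypothetical helper: flatten an MCP tool's JSON Schema inputSchema into
# the (args, arg_types, arg_desc) triple used when building dspy.Tool.
_JSON_TO_PY = {
    "string": str, "integer": int, "number": float,
    "boolean": bool, "array": list, "object": dict,
}

def map_json_schema_to_tool_args(schema: dict):
    props = schema.get("properties", {})
    args = dict(props)  # raw per-argument schema
    arg_types = {
        name: _JSON_TO_PY.get(spec.get("type", "string"), str)
        for name, spec in props.items()
    }
    arg_desc = {name: spec.get("description", "") for name, spec in props.items()}
    return args, arg_types, arg_desc
```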

And an updated ReAct forward that can await subroutine tool calls:

    # requires: import inspect; from typing import Any, get_origin; from pydantic import BaseModel
    async def forward(self, **input_args):
        def format(trajectory: dict[str, Any], last_iteration: bool):
            adapter = dspy.settings.adapter or dspy.ChatAdapter()
            trajectory_signature = dspy.Signature(f"{', '.join(trajectory.keys())} -> x")
            return adapter.format_fields(trajectory_signature, trajectory, role="user")

        trajectory = {}
        for idx in range(self.max_iters):
            pred = self.react(
                **input_args,
                trajectory=format(trajectory, last_iteration=(idx == self.max_iters - 1)),
            )

            trajectory[f"thought_{idx}"] = pred.next_thought
            trajectory[f"tool_name_{idx}"] = pred.next_tool_name
            trajectory[f"tool_args_{idx}"] = pred.next_tool_args

            try:
                parsed_tool_args = {}
                tool = self.tools[pred.next_tool_name]
                for k, v in pred.next_tool_args.items():
                    if hasattr(tool, "arg_types") and k in tool.arg_types:
                        arg_type = tool.arg_types[k]
                        if isinstance((origin := get_origin(arg_type) or arg_type), type) and issubclass(
                            origin, BaseModel
                        ):
                            parsed_tool_args[k] = arg_type.model_validate(v)
                            continue
                    parsed_tool_args[k] = v

                result = self.tools[pred.next_tool_name](**parsed_tool_args)
                if inspect.isawaitable(result):
                    trajectory[f"observation_{idx}"] = await result
                else:
                    trajectory[f"observation_{idx}"] = result
            except Exception as e:
                trajectory[f"observation_{idx}"] = f"Failed to execute: {e}"

            if pred.next_tool_name == "finish":
                break

        extract = self.extract(**input_args, trajectory=format(trajectory, last_iteration=False))
        return dspy.Prediction(trajectory=trajectory, **extract)
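The key change is the `isawaitable` branch: the loop awaits a tool's result only when it is awaitable, so sync and async tools coexist. A standalone sketch of that dispatch (hypothetical tool names, no DSPy):

```python
import asyncio
import inspect

def sync_tool(x):
    return x + 1

async def async_tool(x):
    await asyncio.sleep(0)
    return x + 1

async def call_tool(tool, x):
    # Mirrors the loop above: call the tool, then await the result only
    # if it is awaitable, so sync and async tools both work.
    result = tool(x)
    if inspect.isawaitable(result):
        result = await result
    return result

async def main():
    return await call_tool(sync_tool, 1), await call_tool(async_tool, 1)

print(asyncio.run(main()))  # (2, 2)
```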

Would you like to contribute?

  • Yes, I'd like to help implement this.
  • No, I just want to request it.

Additional Context

No response

@kalanyuz kalanyuz added the enhancement New feature or request label Feb 11, 2025
@chenmoneygithub
Collaborator

@kalanyuz Thanks for reporting the issue!

To clarify, the blocking part is that you want to call the tools asynchronously, while ReAct's forward function is synchronous? Could using the @syncify annotation to convert tool calls to sync calls resolve the issue? The caveat is that you lose the benefits of async, but I am not sure async tool calling is useful for ReAct, because you need the full observation before going to the next iteration. Let me know your thoughts!

@kalanyuz
Contributor Author

kalanyuz commented Feb 12, 2025

Hi @chenmoneygithub and @CyrusNuevoDia (since I got similar replies from you both)!

I tried both of your suggestions with @syncify, either at the top of run or wrapping MCP's session.call_tool with it, but to no avail.

There might be some thread conflicts happening: it throws an exception saying the function needs to be called from an anyio worker thread, and supplying the raise_sync_error flag throws another error saying there's already an anyio thread running.

I could, however, use a combination of anyio.to_thread.run_sync with syncify to make this work without directly modifying ReAct's module. I'm not sure whether there's any performance impact on the module itself.

So with this, the code above would be changed to:

            # assumes: import functools, anyio, and a syncify helper (e.g. asyncer.syncify)
            args, arg_types, arg_desc = map_json_schema_to_tool_args(tools.tools[2].inputSchema)
            test_tool = dspy.Tool(
                func=lambda **kwargs: syncify(session.call_tool)(tools.tools[2].name, kwargs),
                name=tools.tools[2].name,
                desc=tools.tools[2].description,
                args=args,
                arg_types=arg_types,
                arg_desc=arg_desc,
            )

            bot = dspy.ReAct("request -> output", tools=[test_tool])
            bot = functools.partial(bot, request="Generate random message and write to /projects/playground/test.txt")
            bot = anyio.to_thread.run_sync(bot)
            result = await bot

now without modifications to ReAct.
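The thread-bridging trick above can be approximated with the stdlib alone; this is a hedged sketch of what syncify plus anyio.to_thread.run_sync do under the hood, rebuilt on asyncio primitives (all names here are illustrative):

```python
import asyncio

async def async_tool(x):
    # stands in for an MCP tool, e.g. session.call_tool(...)
    await asyncio.sleep(0)
    return x * 2

def sync_caller(loop):
    # The syncify side: from a worker thread, schedule the coroutine on
    # the running event loop and block this thread until it completes.
    future = asyncio.run_coroutine_threadsafe(async_tool(21), loop)
    return future.result()

async def main():
    loop = asyncio.get_running_loop()
    # The anyio.to_thread.run_sync side: run the blocking sync caller in
    # a worker thread so it doesn't block the event loop.
    return await loop.run_in_executor(None, sync_caller, loop)

print(asyncio.run(main()))  # 42
```

The event loop stays free while the worker thread blocks on `future.result()`, which is why this avoids the "already running" errors that plain syncify hit.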

Not sure which would be the right path. I can contribute documentation on how to use MCP with DSPy using the above logic if you think this is acceptable.
