|
| 1 | +--- |
| 2 | +title: MCP服务 |
| 3 | +date: 2025-04-09 02:23:56 |
| 4 | +updated: 2025-04-09 02:23:56 |
| 5 | +tags: |
| 6 | + - AI |
| 7 | + - MCP |
| 8 | +comments: false |
| 9 | +categories: AI |
| 10 | +thumbnail: https://images.unsplash.com/photo-1682687220923-c58b9a4592ae?crop=entropy&cs=srgb&fm=jpg&ixid=M3w2NDU1OTF8MHwxfHJhbmRvbXx8fHx8fHx8fDE3NDQxNzk4MzV8&ixlib=rb-4.0.3&q=85&w=1920&h=1080 |
| 11 | +published: false |
| 12 | +--- |
| 13 | +# MCP服务 |
| 14 | + |
| 15 | +## 1. 简介 |
| 16 | + |
| 17 | +模型上下文协议(MCP)是一个创新的开源协议,它重新定义了大语言模型(LLM)与外部世界的互动方式。MCP 提供了一种标准化方法,使任意大语言模型能够轻松连接各种数据源和工具,实现信息的无缝访问和处理。MCP 就像是 AI 应用程序的 USB-C 接口,为 AI 模型提供了一种标准化的方式来连接不同的数据源和工具。 |
| 18 | +## 1.1 MCP架构 |
| 19 | + |
| 20 | +### 1.1.1 服务架构 |
| 21 | + |
| 22 | +![[../../images/MCP服务架构.svg]] |
| 23 | + |
| 24 | +### 1.1.2 Agent架构 |
| 25 | + |
| 26 | +![[../../images/Agent架构.svg]] |
| 27 | + |
| 28 | +### 1.1.3 简单客户端 |
| 29 | + |
| 30 | +通过启动本地的一个客户端来实现循环对话调用大模型 |
| 31 | + |
| 32 | +```python |
| 33 | +import asyncio |
| 34 | +from mcp import ClientSession |
| 35 | +from openai import OpenAI |
| 36 | +from contextlib import AsyncExitStack |
| 37 | + |
| 38 | +BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1" |
| 39 | +MODEL_NAME = "qwen-max" |
| 40 | +KEY = "" |
| 41 | + |
| 42 | +class MCPClient: |
| 43 | + |
| 44 | + def __init__(self): |
| 45 | + """初始化MCP客户端""" |
| 46 | + self.session = None |
| 47 | + self.exit_stack = AsyncExitStack() |
| 48 | + self.openai_api_key = KEY |
| 49 | + self.openai_api_base = BASE_URL |
| 50 | + self.model = MODEL_NAME |
| 51 | + |
| 52 | + if not self.openai_api_key: |
| 53 | + raise ValueError("请设置您的OpenAI API密钥") |
| 54 | + |
| 55 | + self.client = OpenAI( |
| 56 | + api_key=self.openai_api_key, |
| 57 | + base_url=self.openai_api_base, |
| 58 | + ) |
| 59 | + |
| 60 | + # 处理对话请求 |
| 61 | + async def process_query(self, query: str) -> str: |
| 62 | + messages = [{ |
| 63 | + "role": "system", |
| 64 | + "content": "你是一个智能助手,帮助用户回答问题", |
| 65 | + }, { |
| 66 | + "role": "user", |
| 67 | + "content": query, |
| 68 | + }] |
| 69 | + try: |
| 70 | + response = await asyncio.get_event_loop().run_in_executor( |
| 71 | + None, |
| 72 | + lambda: self.client.chat.completions.create( |
| 73 | + model=MODEL_NAME, |
| 74 | + messages=messages |
| 75 | + ) |
| 76 | + ) |
| 77 | + return response.choices[0].message.content |
| 78 | + except Exception as e: |
| 79 | + return f"调用OpenAI API错误:{str(e)}" |
| 80 | + |
| 81 | + async def connect_to_mock_server(self): |
| 82 | + """模拟 MCP 服务器的连接 """ print("Connecting to mock MCP server...") |
| 83 | + |
| 84 | + async def chat_loop(self): |
| 85 | + """运行交互式聊天循环""" |
| 86 | + print("\n MCP 客户端已启动!输入 ‘quit’ 退出") |
| 87 | + |
| 88 | + while True: |
| 89 | + try: |
| 90 | + user_input = input("请输入您的问题:").strip() |
| 91 | + if user_input.lower() == "quit": |
| 92 | + print("退出交互式聊天") |
| 93 | + break |
| 94 | + response = await self.process_query(user_input) |
| 95 | + print(f"大模型:{response}") |
| 96 | + except Exception as e: |
| 97 | + print(f"发生错误:{str(e)}") |
| 98 | + |
| 99 | + async def cleanup(self): |
| 100 | + """清理资源""" |
| 101 | + print("Cleaning up resources...") |
| 102 | + await self.exit_stack.aclose() |
| 103 | + |
| 104 | + |
| 105 | +async def main(): |
| 106 | + mcp_client = MCPClient() |
| 107 | + |
| 108 | + try: |
| 109 | + await mcp_client.connect_to_mock_server() |
| 110 | + await mcp_client.chat_loop() |
| 111 | + finally: |
| 112 | + await mcp_client.cleanup() |
| 113 | + |
| 114 | + |
| 115 | +if __name__ == '__main__': |
| 116 | + asyncio.run(main()) |
| 117 | +``` |
| 118 | + |
| 119 | +## 1.2 MCP服务器通讯机制 |
| 120 | + |
| 121 | +**Model Context Protocol(MCP)** 是一种由Anthropic开源的协议,旨在将大型语言模型直接连接至数据源,实现无缝集成。根据 MCP 的规范,当前支持两种传输方式: |
| 122 | +- 标准输入输出(stdio):打开文件流的方式进行传输(同一个服务器,不需要通过端口监听) |
| 123 | +- 基于HTTP的服务器推送事件(SSE):在不同的机器分布式部署 |
| 124 | + |
| 125 | +而近期,开发者在MCP 的 GitHub 仓库中提交了一项提案,建议采用 **"可流式传输的HTTP(Streamable Http)"** 来替代现有的 HTTP+SSE方案。此举旨在解决当前远程 MCP 传输方式的关键限制,同时保留其优势。HTTP和SSE(服务器推送事件)在数据传输方式上存在明显区别 |
| 126 | + |
| 127 | +- 通信方式 |
| 128 | + - HTTP:采用请求-响应模式,客户端发送请求,服务器返回响应,每次请求都是独立的 |
| 129 | + - SSE:允许服务器通过单个持久的HTTP连接,持续向客户端推送数据连 |
| 130 | +- 连接特性 |
| 131 | + - HTTP:每次请求通常建立新的连接,虽然在HTTP/1.1中引入了持久连是短连接 |
| 132 | + - SSE:适用于需要服务器主动向客户端推送数据的场景,如实时通知、股票行情更新等 |
| 133 | + |
| 134 | +注意:SSE仅支持服务器向客户端的单向通信,而websocket则是双向通信 |
| 135 | + |
| 136 | + |
| 137 | +| 传输方式 | 是否需要同时启动服务器 | 是否支持远程连接 | 使用场景 | |
| 138 | +| :-----------: | :---------: | :------: | :-----------: | |
| 139 | +| stdio(标准输入输出) | ✔ | ❌ | 本地通信,低延迟,高速交互 | |
| 140 | +| http(网络api) | ❌ | ✔ | 分布式架构,远程通信 | |
| 141 | +## 1.3 简单天气查询服务端 |
| 142 | + |
| 143 | +通过 **\@mcp.tool()** 来标注服务端提供的工具有哪些 |
| 144 | + |
| 145 | +```python |
| 146 | +from typing import Any |
| 147 | +from mcp.server.fastmcp import FastMCP |
| 148 | +mcp = FastMCP("WeatherServer") |
| 149 | + |
| 150 | +async def fetch_weather(city: str) -> dict[str, Any] | None: |
| 151 | + """ |
| 152 | + 获取天气的api |
| 153 | + :param city: 城市名称(需要使用英文,如Beijing) |
| 154 | + :return: 天气数据字典;若出错返回包含 error信息的字典 |
| 155 | + """ return { |
| 156 | + "城市": f"{city}\n", |
| 157 | + "温度": "25.5°C\n", |
| 158 | + "湿度": "25%\n", |
| 159 | + "风俗": "12 m/s\n", |
| 160 | + "天气": "晴\n", |
| 161 | + } |
| 162 | + |
| 163 | +@mcp.tool() |
| 164 | +async def get_weather(city: str) -> dict[str, Any] | None: |
| 165 | + """ |
| 166 | + 获取天气 |
| 167 | + :param city: 城市名称(需要使用英文,如Beijing) |
| 168 | + :return: 天气数据字典;若出错返回包含 error信息的字典 |
| 169 | + """ return await fetch_weather(city) |
| 170 | + |
| 171 | +if __name__ == '__main__': |
| 172 | + # 使用标准 I/O 方式运行MCP服务器 |
| 173 | + mcp.run(transport='stdio') |
| 174 | +``` |
0 commit comments