-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.py
More file actions
364 lines (306 loc) · 12.2 KB
/
server.py
File metadata and controls
364 lines (306 loc) · 12.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
import sys
import json
import asyncio
from concurrent.futures import ThreadPoolExecutor
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse, StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from utils import get_tools, extract_parameters
from config import MCP_MODE
# Shared, default-sized thread pool. The request handlers below hand tool
# callables to it via `loop.run_in_executor` so tool code runs off the event loop.
_executor = ThreadPoolExecutor()
# Auto-discover all kits (dynamic import via kits/__init__.py)
# The import is kept purely for its side effect, hence the unused-import waiver.
import kits # noqa: F401
# Manual kit imports no longer needed:
# import kits.sqlite_kit # noqa: F401
# import kits.web_kit # noqa: F401
# The ASGI application object; route decorators below register handlers on it.
app = FastAPI(title="SimpleMCP Server")
# Allow OpenWebUI (and any other origin) to reach the MCP endpoint
# NOTE(review): origins, methods and headers are all wildcards — confirm this is
# acceptable for deployments that are reachable beyond a trusted network.
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/list_tools")
def list_tools():
    """
    Return every registered tool as a Groq-style function schema.

    Response shape:
        {
          "tools": [
            {
              "type": "function",
              "function": {
                "name": ...,
                "description": ...,
                "parameters": {...}
              }
            },
            ...
          ]
        }

    The description is the tool's docstring (empty string when it has none)
    and the parameters come from `extract_parameters`.
    """
    def describe(tool_name, tool_func):
        # One schema entry per registered callable.
        spec = {
            "name": tool_name,
            "description": tool_func.__doc__ or "",
            "parameters": extract_parameters(tool_func),
        }
        return {"type": "function", "function": spec}

    registry = get_tools()
    return {"tools": [describe(n, f) for n, f in registry.items()]}
@app.post("/run_tool")
async def run_tool(req: dict):
    """
    Execute a registered tool by name with JSON arguments.

    Request JSON:
        { "tool": "<name>", "arguments": {...} }

    `arguments` may be omitted or null, in which case the tool is called
    with no arguments (the same leniency the /mcp handler applies).

    Returns:
        {"result": <tool return value>} on success, or
        {"error": "<message>"} when the tool is unknown, the arguments are
        not a JSON object, or the tool raises.
    """
    name = req.get("tool")
    # Treat a missing key and an explicit null the same way: no arguments.
    args = req.get("arguments") or {}
    if not isinstance(args, dict):
        return {"error": "'arguments' must be a JSON object"}

    tools = get_tools()
    # The isinstance check keeps an unhashable name (e.g. a JSON array) from
    # raising TypeError in the dict lookup and surfacing as an HTTP 500.
    if not isinstance(name, str) or name not in tools:
        return {"error": f"Tool '{name}' not found"}

    func = tools[name]
    try:
        # Tools are plain blocking callables; run them on the shared pool so
        # the event loop stays responsive.
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(_executor, lambda: func(**args))
    except Exception as e:
        # Boundary handler: report any tool failure to the caller as data.
        return {"error": f"{type(e).__name__}: {e}"}
    return {"result": result}
# =============================================================================
# MCP HTTP mode — registered only when MCP_MODE=True
#
# Implements the MCP Streamable HTTP transport (spec 2025-06-18):
# POST /mcp → JSON-RPC 2.0 dispatch; responds as SSE or plain JSON
# depending on the client's Accept header
# GET /mcp → SSE keepalive stream for server-initiated messages
#
# Enable: MCP_MODE=true uvicorn server:app --host 0.0.0.0 --port 8000
# =============================================================================
def _build_tool_schema_http(name: str, func) -> dict:
    """Describe one tool as an MCP descriptor (name, description, inputSchema)."""
    # The docstring doubles as the human-readable description; tools without
    # one get an empty string rather than None.
    description = func.__doc__ or ""
    input_schema = extract_parameters(func)
    return dict(name=name, description=description, inputSchema=input_schema)
def _sse_message(obj: dict) -> str:
    """Serialize *obj* to JSON and frame it as one SSE `data:` event."""
    encoded = json.dumps(obj)
    # The blank line (second newline) terminates the event.
    return "data: " + encoded + "\n\n"
if MCP_MODE:
    # Headers sent with every SSE response: no caching, and ask buffering
    # reverse proxies to pass events through as they are written.
    _SSE_HEADERS = {"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}

    @app.get("/mcp")
    async def mcp_sse_stream(request: Request):
        """
        GET /mcp — SSE stream for server-initiated messages.

        MCP Inspector opens this to listen for server pushes. No messages
        are pushed here; the stream only emits an SSE comment every 15 s so
        the connection stays open, and ends once the client disconnects.
        """
        async def event_stream():
            # Keepalive: send a comment every 15 s so the connection stays open
            while not await request.is_disconnected():
                yield ": keepalive\n\n"
                await asyncio.sleep(15)

        return StreamingResponse(
            event_stream(),
            media_type="text/event-stream",
            headers=dict(_SSE_HEADERS),
        )

    @app.post("/mcp")
    async def mcp_jsonrpc(request: Request):
        """
        POST /mcp — Streamable HTTP JSON-RPC 2.0 dispatch.

        Responds as SSE when the client sends Accept: text/event-stream,
        otherwise plain application/json.

        Handles: initialize, tools/list, tools/call, notifications/*

        Error mapping:
            -32700 (HTTP 400)  body is not valid JSON
            -32600 (HTTP 400)  body is not a JSON object, or `method` is not a string
            -32602             tools/call `params` is not a JSON object
            -32601             unknown method or unknown tool
        Exceptions raised by a tool are returned as a normal result with
        `isError: true`, not as JSON-RPC errors.
        """
        # Local import: `Response` is not among the names this module imports
        # at the top, and it is only needed for the body-less 202 reply below.
        from fastapi import Response

        try:
            body = await request.json()
        except Exception:
            return JSONResponse(
                {"jsonrpc": "2.0", "id": None,
                 "error": {"code": -32700, "message": "Parse error"}},
                status_code=400,
            )

        # Valid JSON that is not a request object (a batch array, a scalar)
        # would otherwise fail on `.get` below and surface as an HTTP 500.
        if not isinstance(body, dict) or not isinstance(body.get("method", ""), str):
            bad_id = body.get("id") if isinstance(body, dict) else None
            return JSONResponse(
                {"jsonrpc": "2.0", "id": bad_id,
                 "error": {"code": -32600, "message": "Invalid Request"}},
                status_code=400,
            )

        req_id = body.get("id")
        method = body.get("method", "")
        params = body.get("params") or {}
        tools = get_tools()

        accept = request.headers.get("accept", "")
        wants_sse = "text/event-stream" in accept

        def reply(payload: dict):
            # Single place that picks the wire format for a JSON-RPC message.
            if wants_sse:
                return StreamingResponse(
                    iter([_sse_message(payload)]),
                    media_type="text/event-stream",
                    headers=dict(_SSE_HEADERS),
                )
            return JSONResponse(payload)

        def make_response(result: dict):
            return reply({"jsonrpc": "2.0", "id": req_id, "result": result})

        def make_error(code: int, message: str):
            return reply({"jsonrpc": "2.0", "id": req_id,
                          "error": {"code": code, "message": message}})

        # ── notifications (fire-and-forget) ─────────────────────────────────
        if method.startswith("notifications/"):
            # 202 Accepted with an empty body; JSONResponse(content=None)
            # would send the four bytes `null` instead.
            return Response(status_code=202)

        # ── initialize ───────────────────────────────────────────────────────
        if method == "initialize":
            return make_response({
                "protocolVersion": "2025-06-18",
                "capabilities": {"tools": {}},
                "serverInfo": {"name": "SimpleMCP", "version": "1.0.0"},
            })

        # ── tools/list ───────────────────────────────────────────────────────
        if method == "tools/list":
            return make_response({
                "tools": [
                    _build_tool_schema_http(n, f) for n, f in tools.items()
                ]
            })

        # ── tools/call ───────────────────────────────────────────────────────
        if method == "tools/call":
            if not isinstance(params, dict):
                return make_error(-32602, "Invalid params")
            tool_name = params.get("name")
            arguments = params.get("arguments") or {}
            # isinstance guard: an unhashable name must not raise in the lookup
            if not isinstance(tool_name, str) or tool_name not in tools:
                return make_error(-32601, f"Tool '{tool_name}' not found")
            tool = tools[tool_name]
            try:
                # Blocking tool code runs on the shared pool, off the event loop
                loop = asyncio.get_running_loop()
                result = await loop.run_in_executor(
                    _executor, lambda: tool(**arguments)
                )
                # MCP text content must be a string; serialize anything else
                if not isinstance(result, str):
                    result = json.dumps(result, ensure_ascii=False)
                return make_response({
                    "content": [{"type": "text", "text": result}],
                    "isError": False,
                })
            except Exception as e:
                # Tool failures (bad arguments, tool exceptions, results that
                # cannot be serialized) are reported in-band.
                return make_response({
                    "content": [{"type": "text", "text": f"{type(e).__name__}: {e}"}],
                    "isError": True,
                })

        return make_error(-32601, f"Method not found: {method}")
# =============================================================================
# MCP stdio mode — runs when the script is executed directly:
# python server.py
#
# When imported by uvicorn (uvicorn server:app ...) this block is skipped
# and the FastAPI HTTP server runs as normal.
# =============================================================================
def _mcp_send(obj: dict):
    """Emit *obj* on stdout as one newline-terminated JSON line, then flush."""
    out = sys.stdout
    out.write(f"{json.dumps(obj)}\n")
    # Flush right away so the line is not held back in the stdout buffer.
    out.flush()
def _mcp_error(id, code: int, message: str):
    """Send a JSON-RPC 2.0 error response for request *id* via `_mcp_send`."""
    error_body = {"code": code, "message": message}
    _mcp_send({"jsonrpc": "2.0", "id": id, "error": error_body})
async def _handle_request(req: dict):
    """
    Handle a single JSON-RPC request asynchronously and write the reply to stdout.

    Supported methods: initialize, tools/list, tools/call. Methods starting
    with "notifications/" are accepted silently (no reply is written).

    Error mapping:
        -32600  `req` is not a JSON object, or `method` is not a string
        -32602  tools/call `params` is not a JSON object
        -32601  unknown method or unknown tool
        -32603  any unexpected exception while dispatching
    Exceptions raised by a tool are returned as a normal result with
    `isError: true`, not as JSON-RPC errors.
    """
    # A JSON array or scalar parses fine but is not a request object. Without
    # this guard `.get` raises inside a detached task and no reply is written.
    if not isinstance(req, dict):
        _mcp_error(None, -32600, "Invalid Request")
        return

    req_id = req.get("id")
    method = req.get("method", "")
    # `"params": null` is treated like an omitted key
    params = req.get("params") or {}
    loop = asyncio.get_running_loop()

    def tool_result(text: str, is_error: bool) -> dict:
        # JSON-RPC envelope around a single MCP text-content item.
        return {
            "jsonrpc": "2.0",
            "id": req_id,
            "result": {
                "content": [{"type": "text", "text": text}],
                "isError": is_error,
            },
        }

    try:
        if not isinstance(method, str):
            _mcp_error(req_id, -32600, "Invalid Request")
        elif method == "initialize":
            _mcp_send({
                "jsonrpc": "2.0",
                "id": req_id,
                "result": {
                    "protocolVersion": "2024-11-05",
                    "capabilities": {"tools": {}},
                    "serverInfo": {"name": "SimpleMCP", "version": "1.0.0"},
                },
            })
        elif method == "tools/list":
            tool_schemas = [
                _build_tool_schema_http(name, func)
                for name, func in get_tools().items()
            ]
            _mcp_send({
                "jsonrpc": "2.0",
                "id": req_id,
                "result": {"tools": tool_schemas},
            })
        elif method == "tools/call":
            if not isinstance(params, dict):
                _mcp_error(req_id, -32602, "Invalid params")
                return
            tool_name = params.get("name")
            # `"arguments": null` means "call with no arguments"
            arguments = params.get("arguments") or {}
            tools = get_tools()
            # isinstance guard: an unhashable name must not raise in the lookup
            if not isinstance(tool_name, str) or tool_name not in tools:
                _mcp_error(req_id, -32601, f"Tool '{tool_name}' not found")
                return
            tool = tools[tool_name]
            try:
                # Run blocking tool in thread executor so parallel calls don't block
                result = await loop.run_in_executor(
                    _executor, lambda: tool(**arguments)
                )
                # MCP text content must be a string; serialize anything else
                if not isinstance(result, str):
                    result = json.dumps(result, ensure_ascii=False)
                _mcp_send(tool_result(result, False))
            except Exception as e:
                # Tool failures are reported in-band, not as protocol errors
                _mcp_send(tool_result(f"{type(e).__name__}: {e}", True))
        elif method.startswith("notifications/"):
            pass  # fire-and-forget, no response
        else:
            _mcp_error(req_id, -32601, f"Method not found: {method}")
    except Exception as e:
        # Last-resort boundary so a dispatch bug still produces a reply
        _mcp_error(req_id, -32603, f"Internal error: {e}")
async def _stdio_loop():
    """
    Async stdin reader — dispatches each request as a concurrent task
    so parallel tool calls from clients like Codex don't block each other.

    Reads newline-delimited JSON from stdin until EOF or a read failure.
    Blank lines are skipped; lines that are not valid UTF-8 or not valid
    JSON get a -32700 parse-error reply and the loop continues. Before
    returning, all requests still in flight are awaited so their replies
    are written rather than cancelled.
    """
    _log = sys.stderr
    _log.write("[SimpleMCP] stdio/MCP mode active. Waiting for requests...\n")
    _log.flush()

    loop = asyncio.get_running_loop()
    reader = asyncio.StreamReader()
    protocol = asyncio.StreamReaderProtocol(reader)
    await loop.connect_read_pipe(lambda: protocol, sys.stdin)

    # The event loop keeps only weak references to tasks, so an unreferenced
    # task can be garbage-collected mid-run. Hold each one until it finishes.
    in_flight = set()

    while True:
        try:
            raw = await reader.readline()
        except Exception as e:
            # Say why the server is stopping instead of exiting silently
            _log.write(
                f"[SimpleMCP] stdin read failed ({type(e).__name__}: {e}); stopping.\n"
            )
            _log.flush()
            break
        if not raw:
            break  # EOF: the client closed stdin

        try:
            line = raw.decode().strip()
        except UnicodeDecodeError as e:
            # A malformed line must not take the whole server down
            _mcp_error(None, -32700, f"Parse error: {e}")
            continue
        if not line:
            continue

        try:
            req = json.loads(line)
        except json.JSONDecodeError as e:
            _mcp_error(None, -32700, f"Parse error: {e}")
            continue

        # Fire off each request as its own task — parallel tool calls run concurrently
        task = asyncio.create_task(_handle_request(req))
        in_flight.add(task)
        task.add_done_callback(in_flight.discard)

    # Returning now would let asyncio.run() cancel whatever is still running;
    # wait so every accepted request gets its reply.
    if in_flight:
        await asyncio.gather(*in_flight, return_exceptions=True)
def run_stdio_mcp():
    """Start stdio/MCP mode by running `_stdio_loop` to completion under asyncio."""
    stdio_session = _stdio_loop()
    asyncio.run(stdio_session)


# Only reached when the file is executed as a script (`python server.py`).
if __name__ == "__main__":
    run_stdio_mcp()