Commit 7c1a99d

sjwiesman and def- authored
Introduce Materialize MCP Server (#32373)
Add an official Materialize MCP Server that exposes indexed views as tools to LLMs. From the README:

> Instantly turn your Materialize indexed views into live context providers for Retrieval‑Augmented Generation (RAG) pipelines and LLM-driven applications. By defining views and indexes, you create typed, fast, and consistent data tools. No extra services or stale caches required.
>
> These live, indexed views form operational data products.
> Self-contained, discoverable services that deliver real-time context instantly.

### Checklist

- [ ] This PR has adequate test coverage / QA involvement has been duly considered. ([trigger-ci for additional test/nightly runs](https://trigger-ci.dev.materialize.com/))
- [ ] This PR has an associated up-to-date [design doc](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/README.md), is a design doc ([template](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/00000000_template.md)), or is sufficiently small to not require a design.
- [ ] If this PR evolves [an existing `$T ⇔ Proto$T` mapping](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/command-and-response-binary-encoding.md) (possibly in a backwards-incompatible way), then it is tagged with a `T-proto` label.
- [ ] If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label ([example](MaterializeInc/cloud#5021)).
- [ ] If this PR includes major [user-facing behavior changes](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/guide-changes.md#what-changes-require-a-release-note), I have pinged the relevant PM to schedule a changelog post.

---------

Co-authored-by: Dennis Felsing <[email protected]>
1 parent 83acbd1 commit 7c1a99d

15 files changed, +2346 -0 lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
@@ -39,6 +39,7 @@ services.log
 /temp
 test/scalability/results/**/*.csv
 test/scalability/results/**/*.png
+misc/mcp-materialize/dist
 misc/wasm/target
 parallel-benchmark.db
 license_key

ci/test/lint-main/checks/check-copyright.sh

Lines changed: 1 addition & 0 deletions
@@ -48,6 +48,7 @@ copyright_files=$(grep -vE \
     -e '^ci/www/public/_redirects$' \
     -e '^ci/test/lint-deps/' \
     -e '^misc/completions/.*' \
+    -e '^misc/mcp-materialize/uv.lock' \
     -e '^misc/python/MANIFEST\.in' \
     -e '^test/chbench/chbench' \
     -e '^src/pgtz/tznames/.*' \

ci/test/pipeline.template.yml

Lines changed: 9 additions & 0 deletions
@@ -735,6 +735,15 @@ steps:
           composition: dbt-materialize
     agents:
       queue: hetzner-aarch64-4cpu-8gb
+  - id: mcp-materialize
+    label: mcp-materialize tests
+    depends_on: build-aarch64
+    timeout_in_minutes: 30
+    plugins:
+      - ./ci/plugins/mzcompose:
+          composition: mcp-materialize
+    agents:
+      queue: hetzner-aarch64-4cpu-8gb
 
   - id: storage-usage
     label: "Storage Usage Table Test"

misc/mcp-materialize/Dockerfile

Lines changed: 18 additions & 0 deletions
@@ -0,0 +1,18 @@
+# Copyright Materialize, Inc. and contributors. All rights reserved.
+#
+# Use of this software is governed by the Business Source License
+# included in the LICENSE file at the root of this repository.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0.
+
+FROM python:3.13-alpine
+
+RUN pip install uv
+
+WORKDIR /app/mcp-materialize
+
+COPY . .
+
+RUN uv sync --dev

misc/mcp-materialize/README.md

Lines changed: 90 additions & 0 deletions
@@ -0,0 +1,90 @@
+# Materialize MCP Server
+
+**Instantly turn indexed views in Materialize into real-time context tools for LLM-powered applications.**
+
+Materialize MCP Server exposes your Materialize views—when indexed and documented—as live, typed, callable tools. These tools behave like stable APIs for structured data, enabling models to act on fresh, consistent, and trustworthy information.
+No pipelines, no glue code, no stale caches.
+
+---
+
+## ✨ What Are Operational Data Products?
+
+Views + indexes + comments = **Operational Data Products**:
+Self-contained, versioned services that model real-world concepts and provide **fast, reliable, and testable** access to dynamic data.
+
+| Feature        | Benefit                                                               |
+| -------------- | --------------------------------------------------------------------- |
+| **Stable**     | Define once, use repeatedly across use cases.                         |
+| **Typed**      | Input/output schemas inferred directly from indexes.                  |
+| **Observable** | Tool usage is logged per client, revealing real cost and performance. |
+| **Secure**     | If it’s not indexed and documented, it’s not exposed.                 |
+
+---
+
+## 🚀 Quickstart
+
+```bash
+uv run mcp-materialize
+```
+
+This launches the server with default settings and immediately exposes any indexed views as tools.
+
+---
+
+## ⚙️ Configuration
+
+The server can be configured via CLI flags or environment variables:
+
+| Argument          | Env Var             | Default                                               | Description                                |
+| ----------------- | ------------------- | ----------------------------------------------------- | ------------------------------------------ |
+| `--mz-dsn`        | `MZ_DSN`            | `postgresql://materialize@localhost:6875/materialize` | Materialize connection string              |
+| `--transport`     | `MCP_TRANSPORT`     | `stdio`                                               | Communication transport (`stdio` or `sse`) |
+| `--host`          | `MCP_HOST`          | `0.0.0.0`                                             | Host address                               |
+| `--port`          | `MCP_PORT`          | `3001`                                                | Port number                                |
+| `--pool-min-size` | `MCP_POOL_MIN_SIZE` | `1`                                                   | Minimum DB pool size                       |
+| `--pool-max-size` | `MCP_POOL_MAX_SIZE` | `10`                                                  | Maximum DB pool size                       |
+| `--log-level`     | `MCP_LOG_LEVEL`     | `INFO`                                                | Log verbosity                              |
+
+---
+
+## 🛠 Defining a Tool
+
+1. **Write a view** that captures your business logic.
+2. **Create an index** on its primary lookup key.
+3. **Document it** with `COMMENT` statements.
+
+```sql
+CREATE VIEW order_status_summary AS
+SELECT o.order_id, o.status, s.carrier, c.estimated_delivery, e.delay_reason
+FROM orders o
+LEFT JOIN shipments s ON o.order_id = s.order_id
+LEFT JOIN carrier_tracking c ON s.shipment_id = c.shipment_id
+LEFT JOIN delivery_exceptions e ON c.tracking_id = e.tracking_id;
+
+CREATE INDEX ON order_status_summary (order_id);
+
+COMMENT ON VIEW order_status_summary IS
+  'Given an order ID, retrieve the current status, shipping carrier, estimated delivery date, and any delivery exceptions. Use this tool to show real-time order tracking information to users.';
+
+COMMENT ON COLUMN order_status_summary.order_id IS
+  'The unique id for an order';
+```
+
+Now, this tool appears in `/tools/list`:
+
+```json
+{
+  "name": "order_status_summary",
+  "description": "Given an order ID, retrieve the current status, shipping carrier, estimated delivery date, and any delivery exceptions. Use this tool to show real-time order tracking information to users.",
+  "inputSchema": {
+    "type": "object",
+    "required": ["order_id"],
+    "properties": {
+      "order_id": {
+        "type": "text",
+        "description": "The unique id for an order"
+      }
+    }
+  }
+}
+```
Lines changed: 208 additions & 0 deletions
@@ -0,0 +1,208 @@
+# Copyright Materialize, Inc. and contributors. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License in the LICENSE file at the
+# root of this repository, or online at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+Materialize MCP Server
+
+A server that exposes Materialize indexes as "tools" over the Model Context
+Protocol (MCP).  Each Materialize index that the connected role is allowed to
+`SELECT` from (and whose cluster it can `USAGE`) is surfaced as a tool whose
+inputs correspond to the indexed columns and whose output is the remaining
+columns of the underlying view.
+
+The server supports two transports:
+
+* stdio – lines of JSON over stdin/stdout (handy for local CLIs)
+* sse   – server‑sent events suitable for web browsers
+
+---------------
+
+1.  ``list_tools`` executes a catalog query to derive the list of exposable
+    indexes; the result is translated into MCP ``Tool`` objects.
+2.  ``call_tool`` validates the requested tool, switches the session to the
+    appropriate cluster, executes a parameterised ``SELECT`` against the
+    indexed view, and returns the first matching row (minus any columns whose
+    values were supplied as inputs).
+"""
+
+import asyncio
+import logging
+from collections.abc import AsyncIterator, Sequence
+from contextlib import asynccontextmanager
+from typing import Any
+
+import uvicorn
+from mcp import stdio_server
+from mcp.server import NotificationOptions, Server
+from mcp.server.sse import SseServerTransport
+from mcp.types import EmbeddedResource, ImageContent, TextContent, Tool
+from psycopg.rows import dict_row
+from psycopg_pool import AsyncConnectionPool
+
+from .config import load_config
+from .mz_client import MzClient
+
+logger = logging.getLogger("mz_mcp_server")
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
+)
+
+
+def get_lifespan(cfg):
+    @asynccontextmanager
+    async def lifespan(server) -> AsyncIterator[MzClient]:
+        logger.info(
+            "Initializing connection pool with min_size=%s, max_size=%s",
+            cfg.pool_min_size,
+            cfg.pool_max_size,
+        )
+
+        async def configure(conn):
+            await conn.set_autocommit(True)
+            logger.debug("Configured new database connection")
+
+        try:
+            async with AsyncConnectionPool(
+                conninfo=cfg.dsn,
+                min_size=cfg.pool_min_size,
+                max_size=cfg.pool_max_size,
+                kwargs={"application_name": "mcp_materialize"},
+                configure=configure,
+            ) as pool:
+                try:
+                    logger.debug("Testing database connection...")
+                    async with pool.connection() as conn:
+                        await conn.set_autocommit(True)
+                        async with conn.cursor(row_factory=dict_row) as cur:
+                            await cur.execute(
+                                "SELECT"
+                                " mz_environment_id() AS env,"
+                                " current_role AS role;"
+                            )
+                            meta = await cur.fetchone()
+                            logger.info(
+                                "Connected to Materialize environment %s as user %s",
+                                meta["env"],
+                                meta["role"],
+                            )
+                    logger.debug("Connection pool initialized successfully")
+                    async with MzClient(pool=pool) as client:
+                        yield client
+                except Exception as e:
+                    logger.error(f"Failed to initialize connection pool: {str(e)}")
+                    raise
+                finally:
+                    logger.info("Closing connection pool...")
+                    await pool.close()
+        except Exception as e:
+            logger.error(f"Failed to create connection pool: {str(e)}")
+            raise
+
+    return lifespan
+
+
+async def run():
+    cfg = load_config()
+    server = Server("mcp_materialize", lifespan=get_lifespan(cfg))
+
+    @server.list_tools()
+    async def list_tools() -> list[Tool]:
+        logger.debug("Listing available tools...")
+        tools = await server.request_context.lifespan_context.list_tools()
+        return tools
+
+    @server.call_tool()
+    async def call_tool(
+        name: str, arguments: dict[str, Any]
+    ) -> Sequence[TextContent | ImageContent | EmbeddedResource]:
+        logger.debug(f"Calling tool '{name}' with arguments: {arguments}")
+        try:
+            result = await server.request_context.lifespan_context.call_tool(
+                name, arguments
+            )
+            logger.debug(f"Tool '{name}' executed successfully")
+            return result
+        except Exception as e:
+            logger.error(f"Error executing tool '{name}': {str(e)}")
+            await server.request_context.session.send_tool_list_changed()
+            raise
+
+    options = server.create_initialization_options(
+        notification_options=NotificationOptions(tools_changed=True)
+    )
+    match cfg.transport:
+        case "stdio":
+            logger.info("Starting server in stdio mode...")
+            async with stdio_server() as (read_stream, write_stream):
+                await server.run(
+                    read_stream,
+                    write_stream,
+                    options,
+                )
+        case "sse":
+            logger.info(f"Starting SSE server on {cfg.host}:{cfg.port}...")
+            from starlette.applications import Starlette
+            from starlette.routing import Mount, Route
+
+            sse = SseServerTransport("/messages/")
+
+            async def handle_sse(request):
+                logger.debug(
+                    "New SSE connection from %s",
+                    request.client.host if request.client else "unknown",
+                )
+                try:
+                    async with sse.connect_sse(
+                        request.scope, request.receive, request._send
+                    ) as streams:
+                        await server.run(
+                            streams[0],
+                            streams[1],
+                            options,
+                        )
+                except Exception as e:
+                    logger.error(f"Error handling SSE connection: {str(e)}")
+                    raise
+
+            starlette_app = Starlette(
+                routes=[
+                    Route("/sse", endpoint=handle_sse),
+                    Mount("/messages/", app=sse.handle_post_message),
+                ],
+            )
+
+            config = uvicorn.Config(
+                starlette_app,
+                host=cfg.host,
+                port=cfg.port,
+                log_level=cfg.log_level.lower(),  # uvicorn expects lowercase level names
+            )
+            uvicorn_server = uvicorn.Server(config)  # keep `server` bound to the MCP server used by the handlers
+            await uvicorn_server.serve()
+        case t:
+            raise ValueError(f"Unknown transport: {t}")
+
+
+def main():
+    """Synchronous wrapper for the async main function."""
+    try:
+        logger.info("Starting Materialize MCP Server...")
+        asyncio.run(run())
+    except KeyboardInterrupt:
+        logger.info("Shutting down …")
+
+
+if __name__ == "__main__":
+    main()
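
The module docstring describes `call_tool` as executing a parameterised `SELECT` against the indexed view and returning the first matching row minus the columns supplied as inputs. The `MzClient` that implements this is not shown on this page, so the following is only a sketch of that query-building step under stated assumptions: the helper names `quote_ident`, `build_lookup_query`, and `strip_inputs` are hypothetical, the `%s` placeholder style is the one psycopg accepts, and tool validation, cluster switching, and actual execution are deliberately left out.

```python
from typing import Any


def quote_ident(name: str) -> str:
    """Quote an SQL identifier, doubling any embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def build_lookup_query(
    view: str, arguments: dict[str, Any]
) -> tuple[str, list[Any]]:
    """Build a parameterised point lookup on the indexed columns.

    Values are never interpolated into the SQL text; they are returned
    separately so the driver can bind them.
    """
    if not arguments:
        raise ValueError("at least one indexed column value is required")
    columns = sorted(arguments)  # stable ordering keeps query text deterministic
    predicates = " AND ".join(f"{quote_ident(c)} = %s" for c in columns)
    query = f"SELECT * FROM {quote_ident(view)} WHERE {predicates} LIMIT 1"
    return query, [arguments[c] for c in columns]


def strip_inputs(row: dict[str, Any], arguments: dict[str, Any]) -> dict[str, Any]:
    """Drop the columns whose values the caller already supplied."""
    return {key: value for key, value in row.items() if key not in arguments}
```

With the README's example view, `build_lookup_query("order_status_summary", {"order_id": "o-1"})` produces a single-predicate lookup on `order_id`, and `strip_inputs` would then remove `order_id` from the returned row so the tool output contains only the remaining columns.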
