Multi bucket support and index advisor tool addition #20

Open
wants to merge 8 commits into base: main
14 changes: 6 additions & 8 deletions README.md
@@ -6,11 +6,12 @@ An [MCP](https://modelcontextprotocol.io/) server implementation of Couchbase th

 - Get a list of all the scopes and collections in the specified bucket
 - Get the structure for a collection
-- Get a document by ID from a specified scope and collection
-- Upsert a document by ID to a specified scope and collection
-- Delete a document by ID from a specified scope and collection
-- Run a [SQL++ query](https://www.couchbase.com/sqlplusplus/) on a specified scope
+- Get a document by ID from a specified bucket, scope and collection
+- Upsert a document by ID to a specified bucket, scope and collection
+- Delete a document by ID from a specified bucket, scope and collection
+- Run a [SQL++ query](https://www.couchbase.com/sqlplusplus/) on a specified bucket and scope
 - There is an option in the MCP server, `READ_ONLY_QUERY_MODE` that is set to true by default to disable running SQL++ queries that change the data or the underlying collection structure. Note that the documents can still be updated by ID.
+- Retrieve Index Advisor advice for a query on a specified bucket and scope.

## Prerequisites

@@ -46,7 +47,6 @@ This is the common configuration for the MCP clients such as Claude Desktop, Cur
         "CB_CONNECTION_STRING": "couchbases://connection-string",
         "CB_USERNAME": "username",
         "CB_PASSWORD": "password",
-        "CB_BUCKET_NAME": "bucket_name"
       }
     }
   }
@@ -58,7 +58,6 @@ The server can be configured using environment variables. The following variable
 - `CB_CONNECTION_STRING`: The connection string to the Couchbase cluster
 - `CB_USERNAME`: The username with access to the bucket to use to connect
 - `CB_PASSWORD`: The password for the username to connect
-- `CB_BUCKET_NAME`: The name of the bucket that the server will access
 - `READ_ONLY_QUERY_MODE`: Setting to configure whether SQL++ queries that allow data to be modified are allowed. It is set to True by default.
 - `path/to/cloned/repo/mcp-server-couchbase/` should be the path to the cloned repository on your local machine. Don't forget the trailing slash at the end!

@@ -138,7 +137,7 @@ There is an option to run the MCP server in [Server-Sent Events (SSE)](https://m

 By default, the MCP server will run on port 8080 but this can be configured using the `FASTMCP_PORT` environment variable.

-> uv run src/mcp_server.py --connection-string='<couchbase_connection_string>' --username='<database_username>' --password='<database_password>' --bucket-name='<couchbase_bucket_to_use>' --read-only-query-mode=true --transport=sse
+> uv run src/mcp_server.py --connection-string='<couchbase_connection_string>' --username='<database_username>' --password='<database_password>' --read-only-query-mode=true --transport=sse

 The server will be available on http://localhost:8080/sse. This can be used in MCP clients supporting SSE transport mode.

@@ -159,7 +158,6 @@ docker run -i \
   -e CB_CONNECTION_STRING='<couchbase_connection_string>' \
   -e CB_USERNAME='<database_user>' \
   -e CB_PASSWORD='<database_password>' \
-  -e CB_BUCKET_NAME='<bucket_name>' \
   -e MCP_TRANSPORT='stdio/sse' \
   -e READ_ONLY_QUERY_MODE="true/false" \
   mcp/couchbase
8 changes: 1 addition & 7 deletions src/mcp_server.py
@@ -72,11 +72,7 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]:
     envvar="CB_PASSWORD",
     help="Couchbase database password (required for operations)",
 )
-@click.option(
-    "--bucket-name",
-    envvar="CB_BUCKET_NAME",
-    help="Couchbase bucket name (required for operations)",
-)
+
 @click.option(
     "--read-only-query-mode",
     envvar="READ_ONLY_QUERY_MODE",
@@ -97,7 +93,6 @@ def main(
     connection_string,
     username,
     password,
-    bucket_name,
     read_only_query_mode,
     transport,
 ):
@@ -107,7 +102,6 @@ def main(
         "connection_string": connection_string,
         "username": username,
         "password": password,
-        "bucket_name": bucket_name,
         "read_only_query_mode": read_only_query_mode,
     }

6 changes: 6 additions & 0 deletions src/tools/__init__.py
@@ -15,37 +15,43 @@
 from .query import (
     get_schema_for_collection,
     run_sql_plus_plus_query,
+    advise_index_for_sql_plus_plus_query,
 )

 # Server tools
 from .server import (
     get_scopes_and_collections_in_bucket,
     get_server_configuration_status,
     test_connection,
+    get_list_of_buckets_with_settings,
 )

 # List of all tools for easy registration
 ALL_TOOLS = [
     get_server_configuration_status,
     test_connection,
+    get_list_of_buckets_with_settings,
     get_scopes_and_collections_in_bucket,
     get_document_by_id,
     upsert_document_by_id,
     delete_document_by_id,
     get_schema_for_collection,
     run_sql_plus_plus_query,
+    advise_index_for_sql_plus_plus_query,
 ]

 __all__ = [
     # Individual tools
     "get_server_configuration_status",
     "test_connection",
+    "get_list_of_buckets_with_settings",
     "get_scopes_and_collections_in_bucket",
     "get_document_by_id",
     "upsert_document_by_id",
     "delete_document_by_id",
     "get_schema_for_collection",
     "run_sql_plus_plus_query",
+    "advise_index_for_sql_plus_plus_query",
     # Convenience
     "ALL_TOOLS",
 ]
32 changes: 22 additions & 10 deletions src/tools/kv.py
@@ -10,17 +10,20 @@
 from mcp.server.fastmcp import Context

 from utils.constants import MCP_SERVER_NAME
-from utils.context import ensure_bucket_connection
+from utils.context import ensure_cluster_connection, ensure_bucket_connection

 logger = logging.getLogger(f"{MCP_SERVER_NAME}.tools.kv")


 def get_document_by_id(
-    ctx: Context, scope_name: str, collection_name: str, document_id: str
+    ctx: Context, bucket_name: str, scope_name: str, collection_name: str, document_id: str
 ) -> dict[str, Any]:
-    """Get a document by its ID from the specified scope and collection.
-    If the document is not found, it will raise an exception."""
-    bucket = ensure_bucket_connection(ctx)
+    """Get a document by its ID from the specified bucket, scope and collection."""
+    try:
+        bucket = ensure_bucket_connection(ctx, bucket_name)
+    except Exception as e:
+        logger.error(f"Error accessing bucket: {e}")
+        raise ValueError("Tool does not have access to bucket, or bucket does not exist.") from e
Comment on lines +22 to +26

medium

This try...except block for handling the bucket connection is duplicated in upsert_document_by_id and delete_document_by_id in this file, as well as in run_sql_plus_plus_query (src/tools/query.py) and get_scopes_and_collections_in_bucket (src/tools/server.py).

To improve maintainability and adhere to the Don't Repeat Yourself (DRY) principle, I recommend extracting this logic into a common helper function. This would centralize the error handling and make the tool functions cleaner and easier to maintain.
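
A minimal sketch of the kind of helper this comment asks for; it is not code from the PR. The name `get_bucket_or_raise` is hypothetical, and the `connect` parameter stands in for the PR's `ensure_bucket_connection(ctx, bucket_name)` so that the snippet runs without a Couchbase cluster.

```python
import logging
from typing import Any, Callable

logger = logging.getLogger("example.tools")


def get_bucket_or_raise(
    connect: Callable[[Any, str], Any], ctx: Any, bucket_name: str
) -> Any:
    """Return the bucket from `connect`, or raise one uniform ValueError.

    `connect` is a stand-in (assumption) for ensure_bucket_connection.
    """
    try:
        return connect(ctx, bucket_name)
    except Exception as e:
        logger.error(f"Error accessing bucket: {e}")
        raise ValueError(
            "Tool does not have access to bucket, or bucket does not exist."
        ) from e


# Hypothetical connector that always fails, to exercise the error path.
def failing_connect(ctx: Any, bucket_name: str) -> Any:
    raise RuntimeError(f"bucket {bucket_name!r} not found")


try:
    get_bucket_or_raise(failing_connect, ctx=None, bucket_name="missing")
except ValueError as err:
    print(err)
```

With something like this in a shared module, each tool function could replace its five-line `try`/`except` with a single call, and the error message would live in one place.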

     try:
         collection = bucket.scope(scope_name).collection(collection_name)
         result = collection.get(document_id)
@@ -32,14 +35,19 @@ def get_document_by_id(

 def upsert_document_by_id(
     ctx: Context,
+    bucket_name: str,
     scope_name: str,
     collection_name: str,
     document_id: str,
     document_content: dict[str, Any],
 ) -> bool:
-    """Insert or update a document by its ID.
+    """Insert or update a document in a bucket, scope and collection by its ID.
     Returns True on success, False on failure."""
-    bucket = ensure_bucket_connection(ctx)
+    try:
+        bucket = ensure_bucket_connection(ctx, bucket_name)
+    except Exception as e:
+        logger.error(f"Error accessing bucket: {e}")
+        raise ValueError("Tool does not have access to bucket, or bucket does not exist.") from e
     try:
         collection = bucket.scope(scope_name).collection(collection_name)
         collection.upsert(document_id, document_content)
@@ -51,11 +59,15 @@ def upsert_document_by_id(


 def delete_document_by_id(
-    ctx: Context, scope_name: str, collection_name: str, document_id: str
+    ctx: Context, bucket_name: str, scope_name: str, collection_name: str, document_id: str
 ) -> bool:
-    """Delete a document by its ID.
+    """Delete a document in a bucket, scope and collection by its ID.
     Returns True on success, False on failure."""
-    bucket = ensure_bucket_connection(ctx)
+    try:
+        bucket = ensure_bucket_connection(ctx, bucket_name)
+    except Exception as e:
+        logger.error(f"Error accessing bucket: {e}")
+        raise ValueError("Tool does not have access to bucket, or bucket does not exist.") from e
     try:
         collection = bucket.scope(scope_name).collection(collection_name)
         collection.remove(document_id)
39 changes: 34 additions & 5 deletions src/tools/query.py
@@ -11,31 +11,60 @@
 from mcp.server.fastmcp import Context

 from utils.constants import MCP_SERVER_NAME
-from utils.context import ensure_bucket_connection
+from utils.context import ensure_cluster_connection, ensure_bucket_connection

 logger = logging.getLogger(f"{MCP_SERVER_NAME}.tools.query")


 def get_schema_for_collection(
-    ctx: Context, scope_name: str, collection_name: str
+    ctx: Context, bucket_name: str, scope_name: str, collection_name: str
 ) -> list[dict[str, Any]]:
     """Get the schema for a collection in the specified scope.
     Returns a dictionary with the schema returned by running INFER on the Couchbase collection.
     """
     try:
         query = f"INFER {collection_name}"
-        result = run_sql_plus_plus_query(ctx, scope_name, query)
+        result = run_sql_plus_plus_query(ctx, bucket_name, scope_name, query)
         return result
     except Exception as e:
         logger.error(f"Error getting schema: {e}")
         raise

+def advise_index_for_sql_plus_plus_query(
+    ctx: Context, bucket_name: str, scope_name: str, query: str
+) -> dict[str, Any]:
+    """Get an index recommendation from the SQL++ index advisor for a specified query on a specified bucket and scope.
+    Returns a dictionary with the query advised on, as well as:
+    1. an array of the current indexes used and their status (or a string indicating no existing indexes available)
+    2. an array of recommended indexes and/or covering indexes with reasoning (or a string indicating no possible index improvements)
+    """
+    response = {}
+
+    try:
+        advise_query = f"ADVISE {query}"
+        result = run_sql_plus_plus_query(ctx, bucket_name, scope_name, advise_query)
+
+        if result and (advice := result[0].get("advice")):
+            if (advice is not None):
+                advise_info = advice.get("adviseinfo")
+                if ( advise_info is not None):
+                    response["current_indexes"] = advise_info.get("current_indexes", "No current indexes")
+                    response["recommended_indexes"] = advise_info.get("recommended_indexes","No index recommendations available")
+                    response["query"]=result[0].get("query","Query statement unavailable")
Comment on lines +47 to +53

medium

The nested if statements and explicit is not None checks make this logic for parsing the advisor's response a bit difficult to follow. The if (advice is not None): check is redundant since the walrus operator in the outer if already ensures advice is a truthy value.

I suggest refactoring this to be more concise and Pythonic by flattening the conditional logic and using the walrus operator more effectively. This will improve readability.

if result and (advice := result[0].get("advice")):
    if advise_info := advice.get("adviseinfo"):
        response["current_indexes"] = advise_info.get("current_indexes", "No current indexes")
        response["recommended_indexes"] = advise_info.get("recommended_indexes", "No index recommendations available")
        response["query"] = result[0].get("query", "Query statement unavailable")
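
To see how the flattened logic behaves, here is a self-contained sketch that wraps the suggested lines in a function and runs it on sample data. The function name `summarize_advice` and the sample row are hypothetical; the row only mimics the keys the PR code reads (`advice`, `adviseinfo`, `query`), not a full ADVISE response.

```python
from typing import Any


def summarize_advice(result: list[dict[str, Any]]) -> dict[str, Any]:
    """Apply the flattened parsing logic to a list of ADVISE result rows."""
    response: dict[str, Any] = {}
    if result and (advice := result[0].get("advice")):
        if advise_info := advice.get("adviseinfo"):
            response["current_indexes"] = advise_info.get(
                "current_indexes", "No current indexes"
            )
            response["recommended_indexes"] = advise_info.get(
                "recommended_indexes", "No index recommendations available"
            )
            response["query"] = result[0].get("query", "Query statement unavailable")
    return response


# Hypothetical row, reduced to the keys the parsing code looks at.
sample = [
    {
        "query": "SELECT * FROM airline WHERE country = 'France'",
        "advice": {
            "adviseinfo": {
                "recommended_indexes": {
                    "indexes": [
                        {"index_statement": "CREATE INDEX adv_country ON `airline`(`country`)"}
                    ]
                }
            }
        },
    }
]

summary = summarize_advice(sample)
print(summary["current_indexes"])  # the default string, since the key is absent
print(summarize_advice([]))  # an empty dict when there is no advice at all
```

Note that an empty result still yields an empty dictionary rather than an error, which matches the behavior of the original nested version.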

+        return response
+    except Exception as e:
+        logger.error(f"Error running Advise on query: {e}")
+        raise ValueError(f"Unable to run ADVISE on: {query} for keyspace {bucket_name}.{scope_name}") from e

 def run_sql_plus_plus_query(
-    ctx: Context, scope_name: str, query: str
+    ctx: Context, bucket_name: str, scope_name: str, query: str
 ) -> list[dict[str, Any]]:
     """Run a SQL++ query on a scope and return the results as a list of JSON objects."""
-    bucket = ensure_bucket_connection(ctx)
+    try:
+        bucket = ensure_bucket_connection(ctx, bucket_name)
+    except Exception as e:
+        logger.error(f"Error accessing bucket: {e}")
+        raise ValueError("Tool does not have access to bucket, or bucket does not exist.") from e
Comment on lines +63 to +67

medium

This is another instance of the duplicated bucket connection logic. As mentioned in the review for src/tools/kv.py, extracting this into a shared utility function would improve code maintainability.

     app_context = ctx.request_context.lifespan_context
     read_only_query_mode = app_context.read_only_query_mode
     logger.info(f"Running SQL++ queries in read-only mode: {read_only_query_mode}")
98 changes: 78 additions & 20 deletions src/tools/server.py
@@ -11,7 +11,7 @@

 from utils.config import get_settings
 from utils.constants import MCP_SERVER_NAME
-from utils.context import ensure_bucket_connection
+from utils.context import ensure_cluster_connection, ensure_bucket_connection

 logger = logging.getLogger(f"{MCP_SERVER_NAME}.tools.server")

@@ -45,38 +45,96 @@ def get_server_configuration_status(ctx: Context) -> dict[str, Any]:
     }


-def test_connection(ctx: Context) -> dict[str, Any]:
-    """Test the connection to Couchbase cluster and bucket.
+def test_connection(ctx: Context, bucket_name: str = None) -> dict[str, Any]:
+    """Test the connection to Couchbase cluster and optionally a specified bucket.
     Returns connection status and basic cluster information.
     """
+    cluster_connected = False
+    bucket_connected = False
     try:
-        bucket = ensure_bucket_connection(ctx)
-
-        # Test basic connectivity by getting bucket name
-        bucket_name = bucket.name
-
-        return {
-            "status": "success",
-            "cluster_connected": True,
-            "bucket_connected": True,
-            "bucket_name": bucket_name,
-            "message": "Successfully connected to Couchbase cluster and bucket",
-        }
+        cluster = ensure_cluster_connection(ctx)
+        cluster_connected = True
+        if bucket_name is not None:
+            try:
+                bucket = ensure_bucket_connection(ctx, bucket_name)
+                bucket_connected = True
+                return {
+                    "status": "success",
+                    "cluster_connected": cluster_connected,
+                    "bucket_connected": bucket_connected,
+                    "message": f"Successfully connected to Couchbase cluster and bucket `{bucket_name}`",
+                }
+            except Exception as e:
+                return {
+                    "status": "error",
+                    "cluster_connected": cluster_connected,
+                    "bucket_connected": bucket_connected,
+                    "error": str(e),
+                    "message": f"Failed to connect to bucket named `{bucket_name}`",
+                }
+        else:
+            return {
+                "status": "success",
+                "cluster_connected": cluster_connected,
+                "message": "Successfully connected to Couchbase cluster",
+            }
     except Exception as e:
         return {
             "status": "error",
-            "cluster_connected": False,
-            "bucket_connected": False,
+            "cluster_connected": cluster_connected,
             "error": str(e),
             "message": "Failed to connect to Couchbase",
         }


-def get_scopes_and_collections_in_bucket(ctx: Context) -> dict[str, list[str]]:
-    """Get the names of all scopes and collections in the bucket.
+def get_list_of_buckets_with_settings(
+    ctx: Context
+) -> list[dict[str, Any]]:
+    """Get the list of buckets from the Couchbase cluster, including their bucket settings and additional statistics.
+    Returns a list of comprehensive bucket information objects including settings.
+    """
+
+    result = []
+
+    try:
+        cluster = ensure_cluster_connection(ctx)
+        bucket_manager = cluster.buckets()
+        buckets = bucket_manager.get_all_buckets()
+
+        for bucket_settings in buckets:
+            # Convert BucketSettings object to dictionary using available attributes
+            bucket_dict = {"bucket_name": bucket_settings.name}
+
+            # Add basic bucket settings with safe access
+            for attr in ["bucket_type", "ram_quota", "num_replicas", "replica_indexes",
+                         "flush_enabled", "max_expiry", "compression_mode",
+                         "minimum_durability_level", "storage_backend", "eviction_policy",
+                         "conflict_resolution", "history_retention_collection_default",
+                         "history_retention_bytes", "history_retention_duration"]:
+                if hasattr(bucket_settings, attr):
+                    value = getattr(bucket_settings, attr)
+                    # If the value has a .value attribute (enum), use that
+                    if hasattr(value, 'value'):
+                        bucket_dict[attr] = value.value
+                    else:
+                        bucket_dict[attr] = value
+
+            result.append(bucket_dict)
+
+        return result
+    except Exception as e:
+        logger.error(f"Error getting bucket information: {e}")
+        raise
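
The attribute-copying loop in `get_list_of_buckets_with_settings` can be tried out without a cluster. The sketch below is illustrative only: `FakeBucketType`, `FakeBucketSettings`, and `settings_to_dict` are hypothetical stand-ins, not Couchbase SDK types, and they only mimic the pattern of copying attributes that exist and unwrapping enum members.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any


class FakeBucketType(Enum):
    """Hypothetical enum standing in for an SDK enum-valued setting."""
    COUCHBASE = "membase"


@dataclass
class FakeBucketSettings:
    """Hypothetical settings object with only a few attributes."""
    name: str
    bucket_type: FakeBucketType
    ram_quota: int


def settings_to_dict(settings: Any, attrs: list[str]) -> dict[str, Any]:
    """Copy the attributes that exist, unwrapping enum members via .value."""
    out: dict[str, Any] = {"bucket_name": settings.name}
    for attr in attrs:
        if hasattr(settings, attr):
            value = getattr(settings, attr)
            out[attr] = value.value if hasattr(value, "value") else value
    return out


info = settings_to_dict(
    FakeBucketSettings("travel-sample", FakeBucketType.COUCHBASE, 256),
    ["bucket_type", "ram_quota", "num_replicas"],
)
print(info)
```

Attributes missing from the object (`num_replicas` here) are skipped silently, so a misspelled attribute name would not raise an error; it would simply be absent from the output.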

+def get_scopes_and_collections_in_bucket(ctx: Context, bucket_name: str) -> dict[str, list[str]]:
+    """Get the names of all scopes and collections for a specified bucket.
     Returns a dictionary with scope names as keys and lists of collection names as values.
     """
-    bucket = ensure_bucket_connection(ctx)
+    try:
+        bucket = ensure_bucket_connection(ctx, bucket_name)
+    except Exception as e:
+        logger.error(f"Error accessing bucket: {e}")
+        raise ValueError("Tool does not have access to bucket, or bucket does not exist.") from e
Comment on lines +133 to +137

medium

This is another instance of the duplicated bucket connection logic. As mentioned in the review for src/tools/kv.py, extracting this into a shared utility function would improve code maintainability.

     try:
         scopes_collections = {}
         collection_manager = bucket.collections()