Skip to content

Added MCP Server Registry Failover monitoring & Automatic fallback Feature #12

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ dependencies = [
"langgraph>=0.4.3",
"langchain-aws>=0.2.23",
"pytz>=2025.2",
"aiohttp>=3.9.0",
"aiofiles>=23.0.0",
]

[tool.setuptools]
Expand Down
30 changes: 30 additions & 0 deletions registry/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""
MCP Registry Package

This package provides registry functionality for MCP (Model Context Protocol) servers,
including health monitoring, failover capabilities, and server discovery.
"""

from .registry_health_monitor import (
RegistryHealthMonitor,
RegistryConfig,
RegistryStatus,
RegistryHealthMetrics,
HealthCheckResult
)

from .registry_failover_client import (
RegistryFailoverClient,
FailoverResult
)

# Version string of the registry package.
__version__ = "0.1.0"
# Public API of the package: exactly the names imported above from
# registry_health_monitor and registry_failover_client. This list is what
# a star-import of the package exposes.
__all__ = [
"RegistryHealthMonitor",
"RegistryConfig",
"RegistryStatus",
"RegistryHealthMetrics",
"HealthCheckResult",
"RegistryFailoverClient",
"FailoverResult"
]
316 changes: 315 additions & 1 deletion registry/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@
from mcp.client.sse import sse_client
# --- MCP Client Imports --- END

# --- Registry Health Monitoring Imports --- START
from .registry_health_monitor import (
RegistryHealthMonitor,
RegistryConfig,
RegistryStatus,
RegistryHealthMetrics
)
from .registry_failover_client import RegistryFailoverClient, FailoverResult
# --- Registry Health Monitoring Imports --- END

# --- Define paths based on container structure --- START
CONTAINER_APP_DIR = Path("/app")
CONTAINER_REGISTRY_DIR = CONTAINER_APP_DIR / "registry"
Expand Down Expand Up @@ -131,6 +141,12 @@
# --- WebSocket Connection Management ---
active_connections: Set[WebSocket] = set()

# --- Registry Health Monitoring Global Variables --- START
# Path of the external-registry configuration file, located under SERVERS_DIR;
# it is handed to RegistryHealthMonitor when the monitor is constructed.
REGISTRY_CONFIG_PATH = SERVERS_DIR / "external_registries.json"
# Both singletons stay None until the application lifespan startup assigns
# them; the registry API routes check for None and answer 503 in that case.
registry_health_monitor: Optional[RegistryHealthMonitor] = None
registry_failover_client: Optional[RegistryFailoverClient] = None
# --- Registry Health Monitoring Global Variables --- END

# --- FAISS Helper Functions --- START

def _get_text_for_embedding(server_info: dict) -> str:
Expand Down Expand Up @@ -1074,7 +1090,21 @@ async def lifespan(app: FastAPI):
logger.info("Generating initial Nginx configuration...")
regenerate_nginx_config() # Generate config based on initial health status

# 4. Start the background periodic health check task
# 4. Initialize Registry Health Monitoring System
logger.info("Initializing registry health monitoring system...")
global registry_health_monitor, registry_failover_client

registry_health_monitor = RegistryHealthMonitor(REGISTRY_CONFIG_PATH)
await registry_health_monitor.initialize()

registry_failover_client = RegistryFailoverClient(registry_health_monitor)
await registry_failover_client.initialize()

# Start health monitoring
await registry_health_monitor.start_monitoring()
logger.info("Registry health monitoring system started")

# 5. Start the background periodic health check task
logger.info("Starting background health check task...")
health_check_task = asyncio.create_task(run_health_checks())

Expand All @@ -1084,6 +1114,17 @@ async def lifespan(app: FastAPI):

# --- Shutdown tasks --- START
logger.info("Running shutdown tasks...")

# Stop registry health monitoring
if registry_health_monitor:
logger.info("Stopping registry health monitoring...")
await registry_health_monitor.stop_monitoring()
await registry_health_monitor.cleanup()

if registry_failover_client:
logger.info("Cleaning up registry failover client...")
await registry_failover_client.cleanup()

logger.info("Cancelling background health check task...")
health_check_task.cancel()
try:
Expand Down Expand Up @@ -1154,6 +1195,279 @@ def api_auth(
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")
templates = Jinja2Templates(directory=TEMPLATES_DIR)

# --- Registry Health Monitoring API Routes ---

@app.post("/api/registries/add")
async def add_external_registry(
    name: Annotated[str, Form()],
    url: Annotated[str, Form()],
    priority: Annotated[int, Form()] = 1,
    timeout_seconds: Annotated[int, Form()] = 10,
    retry_attempts: Annotated[int, Form()] = 3,
    health_check_interval_seconds: Annotated[int, Form()] = 30,
    api_key: Annotated[Optional[str], Form()] = None,
    username: Annotated[str, Depends(api_auth)] = None,
):
    """Add a new external registry for monitoring.

    Args:
        name: Unique registry name; used as the key in the monitor's registries.
        url: Registry URL; must contain both a scheme and a network location.
        priority: Priority value stored in the registry configuration.
        timeout_seconds: Timeout stored in the registry configuration.
        retry_attempts: Retry count stored in the registry configuration.
        health_check_interval_seconds: Health-check interval stored in the
            registry configuration.
        api_key: Optional API key; an empty value is stored as None.
        username: Authenticated user resolved by the ``api_auth`` dependency.

    Returns:
        A 201 JSONResponse echoing the registry name, URL and priority with an
        initial status of "checking".

    Raises:
        HTTPException: 503 if the health monitor is not initialized, 400 if the
            URL is malformed or the name is already registered, 500 if the
            monitor fails to add the registry.
    """
    if not registry_health_monitor:
        raise HTTPException(status_code=503, detail="Registry health monitoring not initialized")

    # Validate URL format. urlparse itself can raise ValueError (for example on
    # a malformed IPv6 literal), so only that call is guarded.
    from urllib.parse import urlparse

    try:
        parsed_url = urlparse(url)
    except ValueError as e:
        raise HTTPException(status_code=400, detail="Invalid URL format") from e
    if not parsed_url.scheme or not parsed_url.netloc:
        raise HTTPException(status_code=400, detail="Invalid URL format")

    # Check if registry already exists
    if name in registry_health_monitor.registries:
        raise HTTPException(status_code=400, detail=f"Registry '{name}' already exists")

    # Create registry configuration
    config = RegistryConfig(
        name=name,
        url=url,
        priority=priority,
        timeout_seconds=timeout_seconds,
        retry_attempts=retry_attempts,
        health_check_interval_seconds=health_check_interval_seconds,
        api_key=api_key or None,  # normalize an empty form value to None
    )

    # Only the monitor call is guarded, so a failure is attributed correctly
    # and the original traceback is preserved in the log and the chain.
    try:
        await registry_health_monitor.add_registry(config)
    except Exception as e:
        logger.exception(f"Failed to add registry '{name}': {e}")
        raise HTTPException(status_code=500, detail=f"Failed to add registry: {str(e)}") from e

    logger.info(f"Added external registry '{name}' by user '{username}'")
    return JSONResponse(
        status_code=201,
        content={
            "message": f"Registry '{name}' added successfully",
            "registry": {
                "name": name,
                "url": url,
                "priority": priority,
                "status": "checking",
            },
        },
    )


@app.delete("/api/registries/{registry_name}")
async def remove_external_registry(
    registry_name: str,
    username: Annotated[str, Depends(api_auth)] = None,
):
    """Remove an external registry.

    Args:
        registry_name: Name of the registry to remove (path parameter).
        username: Authenticated user resolved by the ``api_auth`` dependency.

    Returns:
        A 200 JSONResponse with a confirmation message.

    Raises:
        HTTPException: 503 if the health monitor is not initialized, 404 if the
            registry is unknown, 500 if the monitor fails to remove it.
    """
    if not registry_health_monitor:
        raise HTTPException(status_code=503, detail="Registry health monitoring not initialized")

    if registry_name not in registry_health_monitor.registries:
        raise HTTPException(status_code=404, detail=f"Registry '{registry_name}' not found")

    # Guard only the monitor call; keep the traceback in the log and chain the
    # original exception onto the HTTP error.
    try:
        await registry_health_monitor.remove_registry(registry_name)
    except Exception as e:
        logger.exception(f"Failed to remove registry '{registry_name}': {e}")
        raise HTTPException(status_code=500, detail=f"Failed to remove registry: {str(e)}") from e

    logger.info(f"Removed external registry '{registry_name}' by user '{username}'")
    return JSONResponse(
        status_code=200,
        content={"message": f"Registry '{registry_name}' removed successfully"},
    )


@app.get("/api/registries")
async def list_external_registries(
    username: Annotated[str, Depends(api_auth)] = None,
):
    """Return every configured external registry with its current health data.

    Each entry combines the registry's configuration (url, priority, enabled)
    with its health metrics. When no metrics exist for a registry, the status
    is "unknown", the numeric figures are 0 and the remaining fields are None.
    The response also carries the monitor's system health summary.

    Raises:
        HTTPException: 503 if the health monitor is not initialized.
    """
    monitor = registry_health_monitor
    if not monitor:
        raise HTTPException(status_code=503, detail="Registry health monitoring not initialized")

    def summarize(registry_name, registry_config):
        # Build one response entry; key order matches the public payload.
        snapshot = monitor.health_metrics.get(registry_name)
        entry = {
            "name": registry_name,
            "url": registry_config.url,
            "priority": registry_config.priority,
            "enabled": registry_config.enabled,
        }
        if snapshot:
            checked_at = snapshot.last_check_time
            entry["status"] = snapshot.status.value
            entry["last_check"] = checked_at.isoformat() if checked_at else None
            entry["response_time_ms"] = snapshot.response_time_ms
            entry["success_rate"] = snapshot.success_rate
            entry["uptime_percentage"] = snapshot.uptime_percentage
            entry["consecutive_failures"] = snapshot.consecutive_failures
            entry["last_error"] = snapshot.last_error
        else:
            # No metrics recorded yet for this registry.
            entry["status"] = "unknown"
            entry["last_check"] = None
            entry["response_time_ms"] = None
            entry["success_rate"] = 0
            entry["uptime_percentage"] = 0
            entry["consecutive_failures"] = 0
            entry["last_error"] = None
        return entry

    listing = [
        summarize(registry_name, registry_config)
        for registry_name, registry_config in monitor.registries.items()
    ]

    return {
        "registries": listing,
        "system_health": monitor.get_system_health_summary(),
    }


@app.get("/api/registries/{registry_name}/health")
async def get_registry_health(
    registry_name: str,
    username: Annotated[str, Depends(api_auth)] = None,
):
    """Return configuration, metrics and recent check history for one registry.

    The history is requested from the monitor with a limit of 50 entries. When
    no metrics exist for the registry, the status is "unknown", counters are 0
    and the remaining metric fields are None.

    Raises:
        HTTPException: 503 if the health monitor is not initialized, 404 if the
            registry is unknown.
    """
    monitor = registry_health_monitor
    if not monitor:
        raise HTTPException(status_code=503, detail="Registry health monitoring not initialized")

    if registry_name not in monitor.registries:
        raise HTTPException(status_code=404, detail=f"Registry '{registry_name}' not found")

    settings = monitor.registries[registry_name]
    snapshot = monitor.health_metrics.get(registry_name)
    past_checks = monitor.get_registry_history(registry_name, limit=50)

    config_view = {
        "url": settings.url,
        "priority": settings.priority,
        "timeout_seconds": settings.timeout_seconds,
        "enabled": settings.enabled,
        "health_check_interval_seconds": settings.health_check_interval_seconds,
    }

    if snapshot:
        checked_at = snapshot.last_check_time
        metrics_view = {
            "status": snapshot.status.value,
            "last_check": checked_at.isoformat() if checked_at else None,
            "response_time_ms": snapshot.response_time_ms,
            "success_rate": snapshot.success_rate,
            "uptime_percentage": snapshot.uptime_percentage,
            "consecutive_failures": snapshot.consecutive_failures,
            "total_requests": snapshot.total_requests,
            "successful_requests": snapshot.successful_requests,
            "last_error": snapshot.last_error,
        }
    else:
        # No metrics recorded yet for this registry.
        metrics_view = {
            "status": "unknown",
            "last_check": None,
            "response_time_ms": None,
            "success_rate": 0,
            "uptime_percentage": 0,
            "consecutive_failures": 0,
            "total_requests": 0,
            "successful_requests": 0,
            "last_error": None,
        }

    history_view = []
    for entry in past_checks:
        history_view.append(
            {
                "timestamp": entry.timestamp.isoformat(),
                "status": entry.status.value,
                "response_time_ms": entry.response_time_ms,
                "error": entry.error,
            }
        )

    return {
        "registry_name": registry_name,
        "config": config_view,
        "metrics": metrics_view,
        "recent_history": history_view,
    }


@app.post("/api/registries/{registry_name}/health-check")
async def force_registry_health_check(
    registry_name: str,
    username: Annotated[str, Depends(api_auth)] = None,
):
    """Force an immediate health check for a specific registry.

    Args:
        registry_name: Name of the registry to check (path parameter).
        username: Authenticated user resolved by the ``api_auth`` dependency.

    Returns:
        A dict with the registry name and the check's status, response time,
        error and ISO-formatted timestamp.

    Raises:
        HTTPException: 503 if the health monitor is not initialized, 404 if the
            registry is unknown, 500 if the health check itself raises.
    """
    if not registry_health_monitor:
        raise HTTPException(status_code=503, detail="Registry health monitoring not initialized")

    if registry_name not in registry_health_monitor.registries:
        raise HTTPException(status_code=404, detail=f"Registry '{registry_name}' not found")

    # Guard only the check; keep the traceback in the log and chain the
    # original exception onto the HTTP error.
    try:
        result = await registry_health_monitor.check_registry_health(registry_name)
    except Exception as e:
        logger.exception(f"Failed to perform health check for registry '{registry_name}': {e}")
        raise HTTPException(status_code=500, detail=f"Health check failed: {str(e)}") from e

    logger.info(f"Manual health check triggered for registry '{registry_name}' by user '{username}'")

    return {
        "registry_name": registry_name,
        "status": result.status.value,
        "response_time_ms": result.response_time_ms,
        "error": result.error,
        "timestamp": result.timestamp.isoformat(),
    }


@app.post("/api/registries/health-check-all")
async def force_all_registries_health_check(
    username: Annotated[str, Depends(api_auth)] = None,
):
    """Force an immediate health check for all registries.

    Args:
        username: Authenticated user resolved by the ``api_auth`` dependency.

    Returns:
        The same payload as the registry listing endpoint, reflecting the
        state after the forced check.

    Raises:
        HTTPException: 503 if the health monitor is not initialized, 500 if
            the forced health check raises.
    """
    if not registry_health_monitor:
        raise HTTPException(status_code=503, detail="Registry health monitoring not initialized")

    # Guard only the forced check. The listing call below stays outside the
    # try so that an HTTPException it raises keeps its own status code instead
    # of being rewritten into a 500 "Health check failed".
    try:
        await registry_health_monitor.force_health_check()
    except Exception as e:
        logger.exception(f"Failed to perform health check for all registries: {e}")
        raise HTTPException(status_code=500, detail=f"Health check failed: {str(e)}") from e

    logger.info(f"Manual health check triggered for all registries by user '{username}'")

    # Return current status of all registries
    return await list_external_registries(username)


@app.get("/api/registries/failover-status")
async def get_failover_status(
    username: Annotated[str, Depends(api_auth)] = None,
):
    """Report failover statistics plus the current primary and backup registries.

    If the health monitor is not available, the primary registry is reported
    as None and the backup list as empty.

    Raises:
        HTTPException: 503 if the failover client is not initialized.
    """
    if not registry_failover_client:
        raise HTTPException(status_code=503, detail="Registry failover client not initialized")

    failover_stats = registry_failover_client.get_failover_stats()

    monitor = registry_health_monitor
    if monitor:
        primary_registry = monitor.get_primary_registry()
        backup_registries = monitor.get_backup_registries()
    else:
        primary_registry = None
        backup_registries = []

    # Failover is only possible when at least one backup is known.
    has_backup = len(backup_registries) > 0

    return {
        "failover_stats": failover_stats,
        "primary_registry": primary_registry,
        "backup_registries": backup_registries,
        "failover_available": has_backup,
    }


async def _set_registry_enabled(registry_name: str, enabled: bool, username: Optional[str]) -> dict:
    """Set a registry's ``enabled`` flag; shared by the enable and disable routes.

    Args:
        registry_name: Name of the registry to update.
        enabled: New value for the registry's ``enabled`` setting.
        username: Name of the acting user, used only for the audit log line.

    Returns:
        A dict holding a confirmation message.

    Raises:
        HTTPException: 503 if the health monitor is not initialized, 404 if the
            registry is unknown, 500 if the configuration update fails.
    """
    action = "enable" if enabled else "disable"

    if not registry_health_monitor:
        raise HTTPException(status_code=503, detail="Registry health monitoring not initialized")

    if registry_name not in registry_health_monitor.registries:
        raise HTTPException(status_code=404, detail=f"Registry '{registry_name}' not found")

    # Guard only the update; keep the traceback in the log and chain the
    # original exception onto the HTTP error.
    try:
        await registry_health_monitor.update_registry_config(registry_name, {"enabled": enabled})
    except Exception as e:
        logger.exception(f"Failed to {action} registry '{registry_name}': {e}")
        raise HTTPException(status_code=500, detail=f"Failed to {action} registry: {str(e)}") from e

    logger.info(f"Registry '{registry_name}' {action}d by user '{username}'")
    return {"message": f"Registry '{registry_name}' {action}d successfully"}


@app.post("/api/registries/{registry_name}/enable")
async def enable_registry(
    registry_name: str,
    username: Annotated[str, Depends(api_auth)] = None,
):
    """Enable a registry.

    Raises:
        HTTPException: 503 if the health monitor is not initialized, 404 if the
            registry is unknown, 500 if the configuration update fails.
    """
    return await _set_registry_enabled(registry_name, True, username)


@app.post("/api/registries/{registry_name}/disable")
async def disable_registry(
    registry_name: str,
    username: Annotated[str, Depends(api_auth)] = None,
):
    """Disable a registry.

    Raises:
        HTTPException: 503 if the health monitor is not initialized, 404 if the
            registry is unknown, 500 if the configuration update fails.
    """
    return await _set_registry_enabled(registry_name, False, username)


# --- Routes ---


Expand Down
Loading