Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 141 additions & 8 deletions pkg/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
package runner

import (
"bytes"
"context"
"fmt"
"net/http"
"os"
"os/signal"
"strings"
"syscall"
"time"

Expand Down Expand Up @@ -272,21 +274,40 @@ func (r *Runner) Run(ctx context.Context) error {

logger.Infof("MCP server %s started successfully", r.Config.ContainerName)

// Wait for the MCP server to accept initialize requests before updating client configurations.
// This prevents timing issues where clients try to connect before the server is fully ready.
// We repeatedly call initialize until it succeeds (up to 5 minutes).
// Note: We skip this check for pure STDIO transport because STDIO servers may reject
// multiple initialize calls (see #1982).
transportType := labels.GetTransportType(r.Config.ContainerLabels)
serverURL := transport.GenerateMCPServerURL(
transportType,
"localhost",
r.Config.Port,
r.Config.ContainerName,
r.Config.RemoteURL)

// Only wait for initialization on non-STDIO transports
// STDIO servers communicate directly via stdin/stdout and calling initialize multiple times
// can cause issues as the behavior is not specified by the MCP spec
if transportType != "stdio" {
// Repeatedly try calling initialize until it succeeds (up to 5 minutes)
// Some servers (like mcp-optimizer) can take significant time to start up
if err := waitForInitializeSuccess(ctx, serverURL, transportType, 5*time.Minute); err != nil {
logger.Warnf("Warning: Initialize not successful, but continuing: %v", err)
// Continue anyway to maintain backward compatibility, but log a warning
}
} else {
logger.Debugf("Skipping initialize check for STDIO transport")
}

// Update client configurations with the MCP server URL.
// Note that this function checks the configuration to determine which
// clients should be updated, if any.
clientManager, err := client.NewManager(ctx)
if err != nil {
logger.Warnf("Warning: Failed to create client manager: %v", err)
} else {
transportType := labels.GetTransportType(r.Config.ContainerLabels)
serverURL := transport.GenerateMCPServerURL(
transportType,
"localhost",
r.Config.Port,
r.Config.ContainerName,
r.Config.RemoteURL)

if err := clientManager.AddServerToClients(ctx, r.Config.ContainerName, serverURL, transportType, r.Config.Group); err != nil {
logger.Warnf("Warning: Failed to add server to client configurations: %v", err)
}
Expand Down Expand Up @@ -448,3 +469,115 @@ func (r *Runner) Cleanup(ctx context.Context) error {

return lastErr
}

// waitForInitializeSuccess repeatedly checks if the MCP server is ready to accept requests.
// This prevents timing issues where clients try to connect before the server is fully ready.
// It makes repeated attempts with exponential backoff up to a maximum timeout.
// Note: This function should not be called for STDIO transport.
func waitForInitializeSuccess(ctx context.Context, serverURL, transportType string, maxWaitTime time.Duration) error {
// Determine the endpoint and method to use based on transport type
var endpoint string
var method string
var payload string

switch transportType {
case "streamable-http", "streamable":
// For streamable-http, send initialize request to /mcp endpoint
// Format: http://localhost:port/mcp
endpoint = serverURL
method = "POST"
payload = `{"jsonrpc":"2.0","method":"initialize","id":"toolhive-init-check",` +
`"params":{"protocolVersion":"2024-11-05","capabilities":{},` +
`"clientInfo":{"name":"toolhive","version":"1.0"}}}`
case "sse":
// For SSE, just check if the SSE endpoint is available
// We can't easily call initialize without establishing a full SSE connection,
// so we just verify the endpoint responds.
// Format: http://localhost:port/sse#container-name -> http://localhost:port/sse
endpoint = serverURL
// Remove fragment if present (everything after #)
if idx := strings.Index(endpoint, "#"); idx != -1 {
endpoint = endpoint[:idx]
}
method = "GET"
payload = ""
default:
// For other transports, no HTTP check is needed
logger.Debugf("Skipping readiness check for transport type: %s", transportType)
return nil
}

// Setup retry logic with exponential backoff
startTime := time.Now()
attempt := 0
delay := 100 * time.Millisecond
maxDelay := 2 * time.Second // Cap at 2 seconds between retries

logger.Infof("Waiting for MCP server to be ready at %s (timeout: %v)", endpoint, maxWaitTime)

// Create HTTP client with a reasonable timeout for requests
httpClient := &http.Client{
Timeout: 10 * time.Second,
}

for {
attempt++

// Make the readiness check request
var req *http.Request
var err error
if payload != "" {
req, err = http.NewRequestWithContext(ctx, method, endpoint, bytes.NewBufferString(payload))
} else {
req, err = http.NewRequestWithContext(ctx, method, endpoint, nil)
}

if err != nil {
logger.Debugf("Failed to create request (attempt %d): %v", attempt, err)
} else {
if method == "POST" {
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept", "application/json, text/event-stream")
req.Header.Set("MCP-Protocol-Version", "2024-11-05")
}

resp, err := httpClient.Do(req)
if err == nil {
//nolint:errcheck // Ignoring close error on response body in error path
defer resp.Body.Close()

// For GET (SSE), accept 200 OK
// For POST (streamable-http), also accept 200 OK
if resp.StatusCode == http.StatusOK {
elapsed := time.Since(startTime)
logger.Infof("MCP server is ready after %v (attempt %d)", elapsed, attempt)
return nil
}

logger.Debugf("Server returned status %d (attempt %d)", resp.StatusCode, attempt)
} else {
logger.Debugf("Failed to reach endpoint (attempt %d): %v", attempt, err)
}
}

// Check if we've exceeded the maximum wait time
elapsed := time.Since(startTime)
if elapsed >= maxWaitTime {
return fmt.Errorf("initialize not successful after %v (%d attempts)", elapsed, attempt)
}

// Wait before retrying
select {
case <-ctx.Done():
return fmt.Errorf("context cancelled while waiting for initialize")
case <-time.After(delay):
// Continue to next attempt
}

// Update delay for next iteration with exponential backoff
delay *= 2
if delay > maxDelay {
delay = maxDelay
}
}
}
Loading
Loading