12 changes: 12 additions & 0 deletions internal/prepare/orchestrator.go
@@ -9,6 +9,7 @@ import (

"namespacelabs.dev/foundation/internal/runtime/kubernetes"
"namespacelabs.dev/foundation/orchestration"
orchclient "namespacelabs.dev/foundation/orchestration/client"
"namespacelabs.dev/foundation/schema"
orchpb "namespacelabs.dev/foundation/schema/orchestration"
"namespacelabs.dev/foundation/std/cfg"
@@ -18,6 +19,10 @@ import (
func Orchestrator() ClusterStage {
return ClusterStage{
Pre: func(ch chan *orchpb.Event) {
if !orchclient.UseOrchestrator {
// Skip orchestrator preparation if disabled
return
}
ch <- &orchpb.Event{
ResourceId: "orchestrator",
ResourceLabel: "Deploy Namespace Orchestrator",
@@ -27,13 +32,20 @@ func Orchestrator() ClusterStage {
}
},
Post: func(ch chan *orchpb.Event) {
if !orchclient.UseOrchestrator {
return
}
ch <- &orchpb.Event{
ResourceId: "orchestrator",
Ready: orchpb.Event_READY,
Stage: orchpb.Event_DONE,
}
},
Run: func(ctx context.Context, env cfg.Context, devhost *schema.DevHost_ConfigureEnvironment, kube *kubernetes.Cluster, ch chan *orchpb.Event) error {
if !orchclient.UseOrchestrator {
// Skip orchestrator preparation if disabled
return nil
}
return PrepareOrchestratorInKube(ctx, env, devhost, kube)
},
}
30 changes: 2 additions & 28 deletions internal/runtime/kubernetes/clusternamespace.go
@@ -12,8 +12,6 @@ import (
"net"
"time"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/anypb"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -27,7 +25,6 @@ import (
"namespacelabs.dev/foundation/internal/runtime"
"namespacelabs.dev/foundation/internal/runtime/kubernetes/client"
"namespacelabs.dev/foundation/internal/runtime/kubernetes/kubeobserver"
orchclient "namespacelabs.dev/foundation/orchestration/client"
"namespacelabs.dev/foundation/schema"
runtimepb "namespacelabs.dev/foundation/schema/runtime"
"namespacelabs.dev/foundation/schema/storage"
@@ -214,31 +211,8 @@ func (r *ClusterNamespace) isDeployableReady(ctx context.Context, srv runtime.De
}

func (r *ClusterNamespace) areServicesReady(ctx context.Context, srv runtime.Deployable) (ServiceReadiness, error) {
if client.IsInclusterClient(r.underlying.cli) {
return AreServicesReady(ctx, r.underlying.cli, r.target.namespace, srv)
}

if !orchclient.UseOrchestrator {
fmt.Fprintf(console.Debug(ctx), "will not wait for services of server %s...\n", srv.GetName())
return ServiceReadiness{Ready: true}, nil
}

conn, err := orchclient.ConnectToOrchestrator(ctx, r.parent)
if err != nil {
return ServiceReadiness{}, err
}

res, err := orchclient.CallAreServicesReady(ctx, conn, srv, r.target.namespace)
if err != nil {
if status.Code(err) == codes.Unimplemented {
fmt.Fprintf(console.Debug(ctx), "old orchestrator version, will not wait for services of server %s...\n", srv.GetName())
return ServiceReadiness{Ready: true}, nil
}

return ServiceReadiness{}, err
}

return ServiceReadiness{Ready: res.Ready, Message: res.Message}, nil
// Use the pod-based readiness check, which works for both in-cluster and remote clusters
return AreServicesReady(ctx, r.underlying, r.target.namespace, srv)
}

func (r *ClusterNamespace) isPodReady(ctx context.Context, srv runtime.Deployable) (bool, error) {
202 changes: 180 additions & 22 deletions internal/runtime/kubernetes/readiness.go
@@ -7,55 +7,213 @@ package kubernetes
import (
"context"
"fmt"
"net"
"time"

v1 "k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
applycorev1 "k8s.io/client-go/applyconfigurations/core/v1"
"namespacelabs.dev/foundation/framework/kubernetes/kubedef"
"namespacelabs.dev/foundation/framework/kubernetes/kubeobj"
"namespacelabs.dev/foundation/internal/fnerrors"
"namespacelabs.dev/foundation/internal/runtime"
"namespacelabs.dev/foundation/internal/runtime/kubernetes/client"
"namespacelabs.dev/foundation/internal/runtime/kubernetes/kubeobserver"
"namespacelabs.dev/foundation/std/tasks"
"namespacelabs.dev/go-ids"
)

type ServiceReadiness struct {
Ready bool
Message string
}

func AreServicesReady(ctx context.Context, cli *kubernetes.Clientset, namespace string, srv runtime.Deployable) (ServiceReadiness, error) {
if !client.IsInclusterClient(cli) {
return ServiceReadiness{}, fnerrors.InternalError("cannot check service readiness for remote kubernetes cluster")
}

// AreServicesReady checks if all TCP ports of services for a deployable are accepting connections.
// It deploys a one-shot pod in the cluster that attempts TCP connections using in-cluster DNS.
// This works for both in-cluster and remote clusters, since the checker pod is created through the Kubernetes API.
func AreServicesReady(ctx context.Context, cluster *Cluster, namespace string, srv runtime.Deployable) (ServiceReadiness, error) {
// TODO only check services that are required
services, err := cli.CoreV1().Services(namespace).List(ctx, metav1.ListOptions{
services, err := cluster.cli.CoreV1().Services(namespace).List(ctx, metav1.ListOptions{
LabelSelector: kubeobj.SerializeSelector(kubedef.SelectById(srv)),
})
if err != nil {
return ServiceReadiness{}, err
}

if len(services.Items) == 0 {
// No services to check
return ServiceReadiness{Ready: true}, nil
}

// Build a shell script that checks all TCP ports with retry logic
// This runs inside the cluster so it can use service DNS names
// Uses POSIX-compliant shell syntax for busybox compatibility
script := "#!/bin/sh\nset -e\n"

for _, s := range services.Items {
for _, port := range s.Spec.Ports {
if port.Protocol != v1.ProtocolTCP {
if port.Protocol != corev1.ProtocolTCP {
continue
}

addr := fmt.Sprintf("%s.%s.svc.cluster.local:%d", s.Name, s.Namespace, port.Port)
// Try to connect every 500ms for up to 2 minutes (240 attempts)
// Use nc (netcat) for TCP connectivity test - available in busybox
serviceDNS := fmt.Sprintf("%s.%s.svc.cluster.local", s.Name, s.Namespace)
script += fmt.Sprintf(`
echo "Checking %s:%d..."
i=0
while [ $i -lt 240 ]; do
if nc -z -w 1 %s %d 2>/dev/null; then
echo " ✓ %s:%d is ready"
break
fi
i=$((i + 1))
if [ $i -eq 240 ]; then
echo " ✗ %s:%d failed after 2 minutes"
exit 1
fi
sleep 0.5
done
`, serviceDNS, port.Port, serviceDNS, port.Port, serviceDNS, port.Port, serviceDNS, port.Port)
}
}

conn, err := net.DialTimeout("tcp", addr, 100*time.Millisecond)
if err != nil {
return ServiceReadiness{
Ready: false,
Message: fmt.Sprintf("%q not ready: failed to dial %s:%d: %v", srv.GetName(), s.Name, port.Port, err),
}, nil
}
conn.Close()
script += "echo \"All services ready\"\nexit 0\n"

// Create a unique pod name for this check
podName := fmt.Sprintf("ns-svc-check-%s", ids.NewRandomBase32ID(8))

// Use Chainguard's busybox image for the connection checker
// This is a minimal, secure base image with shell and basic networking tools
container := applycorev1.Container().
WithName("checker").
WithImage("cgr.dev/chainguard/busybox:latest").
WithCommand("/bin/sh", "-c", script).
WithSecurityContext(
applycorev1.SecurityContext().
WithReadOnlyRootFilesystem(true).
WithRunAsNonRoot(true).
WithRunAsUser(65532)) // nonroot user in Chainguard images

podSpec := applycorev1.PodSpec().
WithContainers(container).
WithRestartPolicy(corev1.RestartPolicyNever).
WithSecurityContext(applycorev1.PodSecurityContext())

pod := applycorev1.Pod(podName, namespace).
WithSpec(podSpec).
WithLabels(kubedef.SelectNamespaceDriver()).
WithLabels(kubedef.ManagedByUs())

// Create the pod
if _, err := cluster.cli.CoreV1().Pods(namespace).Apply(ctx, pod, kubedef.Ego()); err != nil {
return ServiceReadiness{}, fmt.Errorf("failed to create service readiness checker pod: %w", err)
}

// Schedule cleanup
defer func() {
cluster.cli.CoreV1().Pods(namespace).Delete(context.Background(), podName, metav1.DeleteOptions{})
}()

// Wait for the pod to complete
var finalStatus corev1.PodStatus
if err := kubeobserver.WaitForCondition(ctx, cluster.cli,
tasks.Action("kubernetes.service-readiness-check").
Arg("namespace", namespace).
Arg("deployable", srv.GetName()),
kubeobserver.WaitForPodConditition(namespace, kubeobserver.PickPod(podName),
func(status corev1.PodStatus) (bool, error) {
finalStatus = status
return status.Phase == corev1.PodSucceeded || status.Phase == corev1.PodFailed, nil
})); err != nil {
return ServiceReadiness{}, fmt.Errorf("service readiness check pod failed to complete: %w", err)
}

// Check the exit code
if finalStatus.Phase == corev1.PodSucceeded {
return ServiceReadiness{Ready: true}, nil
}

// Pod failed - extract error message from container status
var exitCode int32
var reason string
for _, containerStatus := range finalStatus.ContainerStatuses {
if containerStatus.Name == "checker" && containerStatus.State.Terminated != nil {
exitCode = containerStatus.State.Terminated.ExitCode
reason = containerStatus.State.Terminated.Reason
break
}
}

return ServiceReadiness{Ready: true}, nil
if exitCode == 0 {
// Should not happen if phase is Failed, but handle it
return ServiceReadiness{Ready: true}, nil
}

return ServiceReadiness{
Ready: false,
Message: fmt.Sprintf("%q not ready: service connectivity check failed (exit code %d, reason: %s)", srv.GetName(), exitCode, reason),
}, nil
}

// CheckServiceConnectivity is a helper that checks if a specific service port is accepting connections.
// It's used primarily for testing and debugging.
func CheckServiceConnectivity(ctx context.Context, cluster *Cluster, namespace, serviceName string, port int32) error {
podName := fmt.Sprintf("ns-svc-check-%s", ids.NewRandomBase32ID(8))
serviceDNS := fmt.Sprintf("%s.%s.svc.cluster.local", serviceName, namespace)

script := fmt.Sprintf(`#!/bin/sh
i=0
while [ $i -lt 240 ]; do
if nc -z -w 1 %s %d 2>/dev/null; then
exit 0
fi
i=$((i + 1))
sleep 0.5
done
exit 1
`, serviceDNS, port)

container := applycorev1.Container().
WithName("checker").
WithImage("cgr.dev/chainguard/busybox:latest").
WithCommand("/bin/sh", "-c", script).
WithSecurityContext(
applycorev1.SecurityContext().
WithReadOnlyRootFilesystem(true).
WithRunAsNonRoot(true).
WithRunAsUser(65532))

podSpec := applycorev1.PodSpec().
WithContainers(container).
WithRestartPolicy(corev1.RestartPolicyNever)

pod := applycorev1.Pod(podName, namespace).
WithSpec(podSpec).
WithLabels(kubedef.ManagedByUs())

if _, err := cluster.cli.CoreV1().Pods(namespace).Apply(ctx, pod, kubedef.Ego()); err != nil {
return err
}

defer func() {
cluster.cli.CoreV1().Pods(namespace).Delete(context.Background(), podName, metav1.DeleteOptions{})
}()

var finalStatus corev1.PodStatus
if err := kubeobserver.WaitForCondition(ctx, cluster.cli,
tasks.Action("kubernetes.check-service-connectivity").
Arg("namespace", namespace).
Arg("service", serviceName).
Arg("port", fmt.Sprintf("%d", port)),
kubeobserver.WaitForPodConditition(namespace, kubeobserver.PickPod(podName),
func(status corev1.PodStatus) (bool, error) {
finalStatus = status
return status.Phase == corev1.PodSucceeded || status.Phase == corev1.PodFailed, nil
})); err != nil {
return err
}

if finalStatus.Phase == corev1.PodSucceeded {
return nil
}

return fnerrors.Newf("service %s:%d is not accepting connections", serviceName, port)
}
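
Nothing in this diff calls CheckServiceConnectivity directly. As a minimal sketch of how the new helper could be exercised from a test in the same package: clusterForTest is a hypothetical helper standing in for however the existing test harness obtains a *Cluster, and the service name, namespace, and port are purely illustrative.

package kubernetes

import (
	"context"
	"testing"
)

func TestServiceConnectivity(t *testing.T) {
	ctx := context.Background()

	// Hypothetical helper; how a *Cluster is obtained depends on the existing test harness.
	cluster := clusterForTest(t)

	// Verify that the "api" service in the "dev" namespace accepts TCP connections on port 8080.
	if err := CheckServiceConnectivity(ctx, cluster, "dev", "api", 8080); err != nil {
		t.Fatalf("service not reachable: %v", err)
	}
}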
18 changes: 4 additions & 14 deletions orchestration/client/client.go
@@ -15,7 +15,6 @@ import (
"namespacelabs.dev/foundation/internal/compute"
"namespacelabs.dev/foundation/internal/console"
"namespacelabs.dev/foundation/internal/runtime"
"namespacelabs.dev/foundation/orchestration/proto"
"namespacelabs.dev/foundation/orchestration/server/constants"
"namespacelabs.dev/foundation/schema"
"namespacelabs.dev/foundation/std/cfg"
@@ -27,7 +26,10 @@ const (
ConnTimeout = time.Minute // TODO reduce - we've seen slow connections in CI
)

var UseOrchestrator = true
// UseOrchestrator controls whether the Namespace orchestrator is deployed.
// Historically the orchestrator also handled service readiness checks, but that functionality has been removed.
// It still provides Kubernetes controllers for runtime config management.
var UseOrchestrator = false

type remoteOrchestrator struct {
cluster runtime.ClusterNamespace
@@ -74,15 +76,3 @@ func ConnectToOrchestrator(ctx context.Context, cluster runtime.Cluster) (*grpc.

return raw.(*remoteOrchestrator).Connect(ctx)
}

func CallAreServicesReady(ctx context.Context, conn *grpc.ClientConn, srv runtime.Deployable, ns string) (*proto.AreServicesReadyResponse, error) {
req := &proto.AreServicesReadyRequest{
Deployable: runtime.DeployableToProto(srv),
Namespace: ns,
}

ctx, cancel := context.WithTimeout(ctx, ConnTimeout)
defer cancel()

return proto.NewOrchestrationServiceClient(conn).AreServicesReady(ctx, req)
}
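
With the default flipped to false, anything that still wants the orchestrator deployed must opt in by setting the variable before prepare runs. A minimal sketch using the standard flag package; the flag name and where it is registered are assumptions, not part of this change — only orchclient.UseOrchestrator itself comes from this PR.

package main

import (
	"flag"

	orchclient "namespacelabs.dev/foundation/orchestration/client"
)

func main() {
	// Opt back in to deploying the Namespace orchestrator; defaults to the new value (false).
	flag.BoolVar(&orchclient.UseOrchestrator, "deploy_orchestrator",
		orchclient.UseOrchestrator, "Deploy the Namespace orchestrator during prepare.")
	flag.Parse()
	// ... rest of the CLI setup ...
}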