205 changes: 198 additions & 7 deletions pkg/monitortests/node/kubeletlogcollector/node.go
@@ -5,6 +5,7 @@ import (
 	"bytes"
 	"context"
 	"fmt"
+	"io"
 	"io/ioutil"
 	"os"
 	"regexp"
@@ -14,12 +15,16 @@ import (
 	"sync"
 	"time"
 
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	utilerrors "k8s.io/apimachinery/pkg/util/errors"
+	"k8s.io/apimachinery/pkg/util/wait"
 
 	"github.com/openshift/origin/pkg/monitor/monitorapi"
 	"github.com/openshift/origin/pkg/monitortestlibrary/utility"
 	"github.com/openshift/origin/pkg/monitortests/kubeapiserver/staticpodinstall/kubeletlogparser"
+	"github.com/openshift/origin/test/extended/util/image"
 
 	"k8s.io/client-go/kubernetes"
 )
@@ -42,31 +47,31 @@ func intervalsFromNodeLogs(ctx context.Context, kubeClient kubernetes.Interface,
 		defer wg.Done()
 
 		// TODO limit by begin/end here instead of post-processing
-		nodeLogs, err := getNodeLog(ctx, kubeClient, nodeName, "kubelet")
+		nodeLogs, err := GetNodeLog(ctx, kubeClient, nodeName, "kubelet")
 		if err != nil {
 			fmt.Fprintf(os.Stderr, "Error getting node logs from %s: %s", nodeName, err.Error())
 			errCh <- err
 			return
 		}
 		newEvents := eventsFromKubeletLogs(nodeName, nodeLogs)
 
-		ovsVswitchdLogs, err := getNodeLog(ctx, kubeClient, nodeName, "ovs-vswitchd")
+		ovsVswitchdLogs, err := GetNodeLog(ctx, kubeClient, nodeName, "ovs-vswitchd")
 		if err != nil {
 			fmt.Fprintf(os.Stderr, "Error getting node ovs-vswitchd logs from %s: %s", nodeName, err.Error())
 			errCh <- err
 			return
 		}
 		newOVSEvents := intervalsFromOVSVswitchdLogs(nodeName, ovsVswitchdLogs)
 
-		networkManagerLogs, err := getNodeLog(ctx, kubeClient, nodeName, "NetworkManager")
+		networkManagerLogs, err := GetNodeLog(ctx, kubeClient, nodeName, "NetworkManager")
 		if err != nil {
 			fmt.Fprintf(os.Stderr, "Error getting node NetworkManager logs from %s: %s", nodeName, err.Error())
 			errCh <- err
 			return
 		}
 		newNetworkManagerIntervals := intervalsFromNetworkManagerLogs(nodeName, networkManagerLogs)
 
-		systemdCoreDumpLogs, err := getNodeLog(ctx, kubeClient, nodeName, "systemd-coredump")
+		systemdCoreDumpLogs, err := GetNodeLog(ctx, kubeClient, nodeName, "systemd-coredump")
 		if err != nil {
 			fmt.Fprintf(os.Stderr, "Error getting node systemd-coredump logs from %s: %s", nodeName, err.Error())
 			errCh <- err
@@ -692,9 +697,10 @@ func commonErrorInterval(nodeName, logLine string, messageExp *regexp.Regexp, re
 	}
 }
 
-// getNodeLog returns logs for a particular systemd service on a given node.
+// GetNodeLog returns logs for a particular systemd service on a given node.
 // We're counting on these logs to fit into some reasonable memory size.
-func getNodeLog(ctx context.Context, client kubernetes.Interface, nodeName, systemdServiceName string) ([]byte, error) {
+// If the kubelet API is unavailable, it falls back to using a debug pod.
+func GetNodeLog(ctx context.Context, client kubernetes.Interface, nodeName, systemdServiceName string) ([]byte, error) {
 	path := client.CoreV1().RESTClient().Get().
 		Namespace("").Name(nodeName).
 		Resource("nodes").SubResource("proxy", "logs").Suffix("journal").URL().Path
@@ -706,9 +712,194 @@ func getNodeLog(ctx context.Context, client kubernetes.Interface, nodeName, systemdServiceName string) ([]byte, error) {
 
 	in, err := req.Stream(ctx)
 	if err != nil {
-		return nil, err
+		// The kubelet API might be down; try the fallback method using a debug pod.
+		fmt.Fprintf(os.Stderr, "Failed to get logs via kubelet API for %s on node %s: %v. Trying fallback method...\n",
+			systemdServiceName, nodeName, err)
+		return getNodeLogViaDebugPod(ctx, client, nodeName, systemdServiceName)
 	}
 	defer in.Close()
 
 	return ioutil.ReadAll(in)
 }
+
+// getNodeLogViaDebugPod collects systemd service logs using a privileged debug pod.
+// This method works even when kubelet is down because it directly accesses the host filesystem.
+func getNodeLogViaDebugPod(ctx context.Context, client kubernetes.Interface, nodeName, systemdServiceName string) ([]byte, error) {
+	const debugNamespace = "openshift-e2e-node-log-collector"
+
+	// Ensure namespace exists
+	if err := ensureNamespace(ctx, client, debugNamespace); err != nil {
+		return nil, fmt.Errorf("failed to ensure namespace %s: %w", debugNamespace, err)
+	}
+
+	// Create debug pod
+	podName := fmt.Sprintf("node-log-%s-%d", nodeName, time.Now().Unix())
+	_, err := createDebugPod(ctx, client, debugNamespace, podName, nodeName, systemdServiceName)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create debug pod: %w", err)
+	}
+
+	// Ensure pod cleanup
+	defer func() {
+		deleteCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+		defer cancel()
+		client.CoreV1().Pods(debugNamespace).Delete(deleteCtx, podName, metav1.DeleteOptions{})
+	}()
+
+	// Wait for pod to complete
+	if err := waitForPodCompletion(ctx, client, debugNamespace, podName); err != nil {
+		return nil, fmt.Errorf("pod did not complete: %w", err)
+	}
+
+	// Get logs from the pod
+	logs, err := getPodLogs(ctx, client, debugNamespace, podName)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get pod logs: %w", err)
+	}
+
+	return logs, nil
+}
+
+// ensureNamespace creates the namespace if it doesn't exist
+func ensureNamespace(ctx context.Context, client kubernetes.Interface, namespace string) error {
+	_, err := client.CoreV1().Namespaces().Get(ctx, namespace, metav1.GetOptions{})
+	if err == nil {
+		return nil
+	}
+
+	if !errors.IsNotFound(err) {
+		return err
+	}
+
+	// Create namespace with privileged labels
+	ns := &corev1.Namespace{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: namespace,
+			Labels: map[string]string{
+				"pod-security.kubernetes.io/enforce":             "privileged",
+				"pod-security.kubernetes.io/audit":               "privileged",
+				"pod-security.kubernetes.io/warn":                "privileged",
+				"security.openshift.io/scc.podSecurityLabelSync": "false",
+			},
+		},
+	}
+	_, err = client.CoreV1().Namespaces().Create(ctx, ns, metav1.CreateOptions{})
+	return err
+}
+
+// createDebugPod creates a privileged pod that runs journalctl to collect logs
+func createDebugPod(ctx context.Context, client kubernetes.Interface, namespace, podName, nodeName, systemdServiceName string) (*corev1.Pod, error) {
+	privileged := true
+	hostPathDirectory := corev1.HostPathDirectory
+	zero := int64(0)
+
+	// Build journalctl command to collect logs from the last 24 hours
+	command := fmt.Sprintf("chroot /host journalctl --utc --no-pager -u %s --since=-1d", systemdServiceName)
+
+	pod := &corev1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      podName,
+			Namespace: namespace,
+			Labels: map[string]string{
+				"app": "node-log-collector",
+			},
+		},
+		Spec: corev1.PodSpec{
+			NodeName:      nodeName,
+			HostPID:       true,
+			RestartPolicy: corev1.RestartPolicyNever,
+			Containers: []corev1.Container{
+				{
+					Name:  "log-collector",
+					Image: image.ShellImage(),
+					Command: []string{
+						"/bin/bash",
+						"-c",
+						command,
+					},
+					SecurityContext: &corev1.SecurityContext{
+						Privileged: &privileged,
+						RunAsUser:  &zero,
+					},
+					VolumeMounts: []corev1.VolumeMount{
+						{
+							Name:      "host",
+							MountPath: "/host",
+						},
+					},
+				},
+			},
+			Volumes: []corev1.Volume{
+				{
+					Name: "host",
+					VolumeSource: corev1.VolumeSource{
+						HostPath: &corev1.HostPathVolumeSource{
+							Path: "/",
+							Type: &hostPathDirectory,
+						},
+					},
+				},
+			},
+			Tolerations: []corev1.Toleration{
+				{
+					Operator: corev1.TolerationOpExists,
+				},
+			},
+		},
+	}
+
+	return client.CoreV1().Pods(namespace).Create(ctx, pod, metav1.CreateOptions{})
+}
+
+// waitForPodCompletion waits for the pod to reach completed or failed state
+func waitForPodCompletion(ctx context.Context, client kubernetes.Interface, namespace, podName string) error {
+	return wait.PollUntilContextTimeout(ctx, 2*time.Second, 2*time.Minute, true, func(ctx context.Context) (bool, error) {
+		pod, err := client.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
+		if err != nil {
+			return false, err
+		}
+
+		switch pod.Status.Phase {
+		case corev1.PodSucceeded:
+			return true, nil
+		case corev1.PodFailed:
+			return false, fmt.Errorf("pod failed with reason: %s, message: %s",
+				pod.Status.Reason, pod.Status.Message)
+		case corev1.PodPending, corev1.PodRunning:
+			// Check if container has terminated
+			if len(pod.Status.ContainerStatuses) > 0 {
+				containerStatus := pod.Status.ContainerStatuses[0]
+				if containerStatus.State.Terminated != nil {
+					if containerStatus.State.Terminated.ExitCode == 0 {
+						return true, nil
+					}
+					return false, fmt.Errorf("container exited with code %d: %s",
+						containerStatus.State.Terminated.ExitCode,
+						containerStatus.State.Terminated.Message)
+				}
+			}
+			return false, nil
+		default:
+			return false, fmt.Errorf("unexpected pod phase: %s", pod.Status.Phase)
+		}
+	})
+}
+
+// getPodLogs retrieves the logs from a completed pod
+func getPodLogs(ctx context.Context, client kubernetes.Interface, namespace, podName string) ([]byte, error) {
+	req := client.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{})
+
+	podLogs, err := req.Stream(ctx)
+	if err != nil {
+		return nil, err
+	}
+	defer podLogs.Close()
+
+	buf := new(bytes.Buffer)
+	_, err = io.Copy(buf, podLogs)
+	if err != nil {
+		return nil, err
+	}
+
+	return buf.Bytes(), nil
+}
59 changes: 59 additions & 0 deletions pkg/monitortests/node/logcollectorservice/logcollector-server.sh
@@ -0,0 +1,59 @@
+#!/bin/bash
+# OpenShift Node Log Collector Service
+# This service starts early in the boot process and provides HTTP access to systemd logs
+# even when kubelet is down.
+
+PORT=9333
+LOG_LINES=50
+
+# Function to get the last N lines of a systemd service
+get_service_logs() {
+    local service=$1
+    local lines=${2:-$LOG_LINES}
+
+    # Use journalctl to get logs, with fallback to "not found" if service doesn't exist
+    journalctl -u "${service}.service" --no-pager -n "$lines" 2>/dev/null || \
+        echo "Service ${service} not found or no logs available"
+}
+
+# Simple HTTP server using netcat
+handle_request() {
+    local request
+    read -r request
+
+    # Parse the request path
+    local path=$(echo "$request" | awk '{print $2}')
+
+    case "$path" in
+        /logs/kubelet)
+            echo -ne "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\n\r\n"
+            get_service_logs "kubelet"
+            ;;
+        /logs/crio)
+            echo -ne "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\n\r\n"
+            get_service_logs "crio"
+            ;;
+        /logs/both)
+            echo -ne "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\n\r\n"
+            echo "=== KUBELET LOGS ==="
+            get_service_logs "kubelet"
+            echo ""
+            echo "=== CRIO LOGS ==="
+            get_service_logs "crio"
+            ;;
+        /health)
+            echo -ne "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\n\r\n"
+            echo "OK"
+            ;;
+        *)
+            echo -ne "HTTP/1.1 404 Not Found\r\nContent-Type: text/plain\r\n\r\n"
+            echo "Available endpoints: /logs/kubelet, /logs/crio, /logs/both, /health"
+            ;;
+    esac
+}
+
+# Main server loop. A FIFO carries each client's request from nc to
+# handle_request; piping handle_request straight into nc would leave it
+# reading from the script's own stdin instead of from the connection.
+FIFO=$(mktemp -u)
+mkfifo "$FIFO"
+trap 'rm -f "$FIFO"' EXIT
+
+echo "Starting OpenShift Node Log Collector Service on port $PORT"
+while true; do
+    handle_request <"$FIFO" | nc -l -p "$PORT" -q 1 >"$FIFO"
+done
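
To show how the endpoints above are meant to be consumed, here is a hedged Go sketch that probes /health and then pulls kubelet logs. The node IP is a placeholder; in a real monitor test it would come from the Node object's addresses.

package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	base := "http://10.0.0.5:9333" // hypothetical node IP, plus the script's PORT

	for _, path := range []string{"/health", "/logs/kubelet"} {
		resp, err := http.Get(base + path)
		if err != nil {
			fmt.Println("fetch failed:", err)
			return
		}
		body, _ := io.ReadAll(resp.Body)
		resp.Body.Close()
		fmt.Printf("GET %s -> %s (%d bytes)\n", path, resp.Status, len(body))
	}
}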