Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[POA-2928] Changed the event watcher to watch pod changes not event changes #82

Merged
merged 26 commits into from
Feb 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
49306e9
Chores:
mudit-postman Feb 14, 2025
38dcb25
chore: change StateChangeMutex to sync.Mutex instead of a pointer
mudit-postman Feb 14, 2025
317c5e1
chore: change PodTrafficMonitorState from int to string
mudit-postman Feb 14, 2025
79d2b4e
chore: pass pointer of mutex parent and run go mod tidy
mudit-postman Feb 14, 2025
4fe1937
chore: use for range instead of single switch case
mudit-postman Feb 14, 2025
2b63162
chore: set revisionVersion to only fetch events after agent creation
mudit-postman Feb 14, 2025
0335c52
fix: parent run command starting in separate go routine
mudit-postman Feb 14, 2025
b8f9005
Fixes:
mudit-postman Feb 14, 2025
aa932f6
chore: if pod is already running but agent is not listening to it sta…
mudit-postman Feb 14, 2025
d11593e
fix: prepend '/host' to network namespace
mudit-postman Feb 14, 2025
7718c43
fix: pass learnClient in util.NewLearnSession()
mudit-postman Feb 14, 2025
2168fc4
chore: set default apidump args
mudit-postman Feb 17, 2025
3811e45
fix:
mudit-postman Feb 17, 2025
295b355
fix:
mudit-postman Feb 17, 2025
a9075e0
Merge branch 'mudit/poa-2609-testing-changes' of github.com:postmanla…
mudit-postman Feb 17, 2025
2e4cde2
chores:
mudit-postman Feb 18, 2025
6e5f658
Chores:
mudit-postman Feb 18, 2025
2290089
chore: improve pods map dump formatting
mudit-postman Feb 18, 2025
662b4e3
POA-2904 Fix double close in redactor (#83)
liujed Feb 18, 2025
c8b4193
Merge branch 'main' of github.com:postmanlabs/postman-insights-agent …
mudit-postman Feb 18, 2025
2a7667c
chore: refactor ENV vars
mudit-postman Feb 19, 2025
15eb4bc
chore: refactor ENV vars 2
mudit-postman Feb 19, 2025
0fb81ee
chore: fix failing tests
mudit-postman Feb 19, 2025
33e5b2a
chore: merge POSTMAN_ENV and POSTMAN_INSIGHTS_ENV
mudit-postman Feb 19, 2025
3dcabf3
Merge branch 'mudit/poa-2609' of github.com:postmanlabs/postman-insig…
mudit-postman Feb 19, 2025
4f12d50
chore: address PR comments
mudit-postman Feb 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 17 additions & 22 deletions cmd/internal/kube/daemonset/apidump_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,21 @@ func (d *Daemonset) StartApiDumpProcess(podUID types.UID) error {
return err
}

err = podArgs.changePodTrafficMonitorState(TrafficMonitoringStarted, PodDetected, PodInitialized)
err = podArgs.changePodTrafficMonitorState(TrafficMonitoringRunning, PodRunning)
if err != nil {
return errors.Wrapf(err, "failed to change pod state, pod name: %s, from: %s to: %s",
podArgs.PodName, podArgs.PodTrafficMonitorState, TrafficMonitoringStarted)
podArgs.PodName, podArgs.PodTrafficMonitorState, TrafficMonitoringRunning)
}

// Increment the wait group counter
d.ApidumpProcessesWG.Add(1)

go func() (funcErr error) {
// defer function handle the error (if any) in the apidump process and change the pod state accordingly
defer func() {
// Decrement the wait group counter
d.ApidumpProcessesWG.Done()

nextState := TrafficMonitoringEnded

if err := recover(); err != nil {
Expand All @@ -41,22 +47,17 @@ func (d *Daemonset) StartApiDumpProcess(podUID types.UID) error {
printer.Errorf("Error occurred in apidump process for pod %s, err: %v\n", podArgs.PodName, funcErr)
nextState = TrafficMonitoringFailed
} else {
printer.Infof("Apidump process ended for pod %s", podArgs.PodName)
printer.Infof("Apidump process ended for pod %s\n", podArgs.PodName)
}

err = podArgs.changePodTrafficMonitorState(nextState, TrafficMonitoringStarted)
// Move monitoring state to final apidump processing state
err = podArgs.changePodTrafficMonitorState(nextState,
TrafficMonitoringRunning, PodSucceeded, PodFailed, PodTerminated, DaemonSetShutdown)
if err != nil {
printer.Errorf("Failed to change pod state, pod name: %s, from: %s to: %s, error: %v\n",
podArgs.PodName, podArgs.PodTrafficMonitorState, nextState, err)
return
}

// It is possible that the apidump process is already stopped and the stopChannel is of no use
// This is just a safety check
err := d.StopApiDumpProcess(podUID, err)
if err != nil {
printer.Errorf("Failed to stop api dump process, pod name: %s, error: %v\n", podArgs.PodName, err)
}
}()

networkNamespace, err := d.CRIClient.GetNetworkNamespace(podArgs.ContainerUUID)
Expand Down Expand Up @@ -90,32 +91,26 @@ func (d *Daemonset) StartApiDumpProcess(podUID types.UID) error {
}

if err := apidump.Run(apidumpArgs); err != nil {
funcErr = errors.Wrapf(err, "Failed to run apidump process for pod %s", podArgs.PodName)
funcErr = errors.Wrapf(err, "failed to run apidump process for pod %s", podArgs.PodName)
}
return
}()

return nil
}

// StopApiDumpProcess stops the API dump process for a given pod identified by its UID.
// It retrieves the pod arguments from a map and changes the pod's traffic monitor state.
// If successful, it sends a stop signal through the pod's stop channel.
// StopApiDumpProcess signals the API dump process to stop for a given pod
// identified by its UID. It retrieves the process's stop channel object from a map
// and sends a stop signal to trigger apidump shutdown.
func (d *Daemonset) StopApiDumpProcess(podUID types.UID, stopErr error) error {
podArgs, err := d.getPodArgsFromMap(podUID)
if err != nil {
return err
}

err = podArgs.changePodTrafficMonitorState(TrafficMonitoringStopped,
PodTerminated, DaemonSetShutdown, TrafficMonitoringFailed, TrafficMonitoringEnded)
if err != nil {
return errors.Wrapf(err, "failed to change pod state, pod name: %s, from: %s to: %s",
podArgs.PodName, podArgs.PodTrafficMonitorState, TrafficMonitoringStopped)
}

printer.Infof("Stopping API dump process for pod %s\n", podArgs.PodName)
podArgs.StopChan <- stopErr
close(podArgs.StopChan)

return nil
}
13 changes: 8 additions & 5 deletions cmd/internal/kube/daemonset/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ package daemonset

import "time"

type podEnvVars string

const (
// Pod environment variables
POSTMAN_INSIGHTS_PROJECT_ID podEnvVars = "POSTMAN_INSIGHTS_PROJECT_ID"
POSTMAN_INSIGHTS_API_KEY podEnvVars = "POSTMAN_INSIGHTS_API_KEY"
POSTMAN_INSIGHTS_ENV podEnvVars = "POSTMAN_INSIGHTS_ENV"
POSTMAN_INSIGHTS_PROJECT_ID = "POSTMAN_INSIGHTS_PROJECT_ID"
POSTMAN_INSIGHTS_API_KEY = "POSTMAN_INSIGHTS_API_KEY"

// Daemonset environment variables
POSTMAN_INSIGHTS_ENV = "POSTMAN_ENV" // This is same as root POSTMAN_ENV
POSTMAN_INSIGHTS_VERIFICATION_TOKEN = "POSTMAN_INSIGHTS_VERIFICATION_TOKEN"
POSTMAN_INSIGHTS_CLUSTER_NAME = "POSTMAN_INSIGHTS_CLUSTER_NAME"
POSTMAN_INSIGHTS_REPRO_MODE_ENABLED = "POSTMAN_INSIGHTS_REPRO_MODE_ENABLED"

// Workers intervals
DefaultTelemetryInterval = 5 * time.Minute
Expand Down
60 changes: 43 additions & 17 deletions cmd/internal/kube/daemonset/daemonset.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const (

type Daemonset struct {
ClusterName string
InsightsEnvironment string
InsightsReproModeEnabled bool

KubeClient kube_apis.KubeClient
Expand All @@ -38,6 +39,9 @@ type Daemonset struct {
// and do not have the agent sidecar container
PodArgsByNameMap sync.Map

// WaitGroup to wait for all apidump processes to stop
ApidumpProcessesWG sync.WaitGroup

PodHealthCheckInterval time.Duration
TelemetryInterval time.Duration
}
Expand All @@ -49,7 +53,7 @@ func StartDaemonset() error {
}

// Initialize the front client
postmanInsightsVerificationToken := os.Getenv("POSTMAN_INSIGHTS_VERIFICATION_TOKEN")
postmanInsightsVerificationToken := os.Getenv(POSTMAN_INSIGHTS_VERIFICATION_TOKEN)
frontClient := rest.NewFrontClient(
rest.Domain,
telemetry.GetClientID(),
Expand All @@ -59,7 +63,7 @@ func StartDaemonset() error {
defer cancel()

// Send initial telemetry
clusterName := os.Getenv("POSTMAN_CLUSTER_NAME")
clusterName := os.Getenv(POSTMAN_INSIGHTS_CLUSTER_NAME)
telemetryInterval := DefaultTelemetryInterval
if clusterName == "" {
printer.Infof(
Expand All @@ -81,8 +85,6 @@ func StartDaemonset() error {
}
}

insightsReproModeEnabled := os.Getenv("POSTMAN_INSIGHTS_REPRO_MODE_ENABLED") == "true"

kubeClient, err := kube_apis.NewKubeClient()
if err != nil {
return errors.Wrap(err, "failed to create kube client")
Expand All @@ -93,8 +95,12 @@ func StartDaemonset() error {
return errors.Wrap(err, "failed to create CRI client")
}

insightsReproModeEnabled := os.Getenv(POSTMAN_INSIGHTS_REPRO_MODE_ENABLED) == "true"
insightsEnvironment := os.Getenv(POSTMAN_INSIGHTS_ENV)

daemonsetRun := &Daemonset{
ClusterName: clusterName,
InsightsEnvironment: insightsEnvironment,
InsightsReproModeEnabled: insightsReproModeEnabled,
KubeClient: kubeClient,
CRIClient: criClient,
Expand Down Expand Up @@ -126,8 +132,8 @@ func (d *Daemonset) Run() error {
go d.TelemetryWorker(done)

// Start the kubernetes events worker
printer.Infof("Starting kubernetes events worker...\n")
go d.KubernetesEventsWorker(done)
printer.Infof("Starting kubernetes pods events worker...\n")
go d.KubernetesPodEventsWorker(done)

// Start the pods health worker
printer.Infof("Starting pods health worker...\n")
Expand Down Expand Up @@ -241,7 +247,9 @@ func (d *Daemonset) StartProcessInExistingPods() error {

// Iterate over each pod without the agent sidecar
for _, pod := range podsWithoutAgentSidecar {
args, err := d.inspectPodForEnvVars(pod)
// Empty pod_args object for PodPending state
args := NewPodArgs(pod.Name)
err := d.inspectPodForEnvVars(pod, args)
if err != nil {
switch err {
case allRequiredEnvVarsAbsentErr:
Expand All @@ -254,7 +262,7 @@ func (d *Daemonset) StartProcessInExistingPods() error {
continue
}

err = d.addPodArgsToMap(pod.UID, args, PodDetected)
err = d.addPodArgsToMap(pod.UID, args, PodRunning)
if err != nil {
printer.Errorf("Failed to add pod args to map, pod name: %s, error: %v\n", pod.Name, err)
continue
Expand All @@ -269,26 +277,34 @@ func (d *Daemonset) StartProcessInExistingPods() error {
return nil
}

// KubernetesEventsWorker listens for Kubernetes events and handles them accordingly.
// KubernetesPodEventsWorker listens for Kubernetes pod watch events (added, deleted, modified) and handles them accordingly.
// It runs indefinitely until the provided done channel is closed.
func (d *Daemonset) KubernetesEventsWorker(done <-chan struct{}) {
func (d *Daemonset) KubernetesPodEventsWorker(done <-chan struct{}) {
for {
select {
case <-done:
printer.Debugf("Kubernetes pod events worker stopped\n")
return
case event := <-d.KubeClient.EventWatch.ResultChan():
case event := <-d.KubeClient.PodEventWatch.ResultChan():
switch event.Type {
case watch.Added:
printer.Debugf("Got k8s added event: %v\n", event.Object)
if e, ok := event.Object.(*coreV1.Event); ok {
go d.handlePodAddEvent(e.InvolvedObject.UID)
printer.Debugf("Got k8s pod added event: %v\n", event.Object)
if p, ok := event.Object.(*coreV1.Pod); ok {
go d.handlePodAddEvent(*p)
}
// A pod is deleted
case watch.Deleted:
printer.Debugf("Got k8s deleted event: %v\n", event.Object)
if e, ok := event.Object.(*coreV1.Event); ok {
go d.handlePodDeleteEvent(e.InvolvedObject.UID)
printer.Debugf("Got k8s pod deleted event: %v\n", event.Object)
if p, ok := event.Object.(*coreV1.Pod); ok {
go d.handlePodDeleteEvent(*p)
}
case watch.Modified:
printer.Debugf("Got k8s pod modified event: %v\n", event.Object)
if p, ok := event.Object.(*coreV1.Pod); ok {
go d.handlePodModifyEvent(*p)
}
}

}
}
}
Expand Down Expand Up @@ -319,11 +335,17 @@ func (d *Daemonset) PodsHealthWorker(done <-chan struct{}) {
// 2. Stops the API dump process for the pod.
// 3. Logs any errors encountered during the state change or stopping process.
// 4. Removes the pod from the PodArgsByNameMap.
// 5. Waits for all the apidump processes to stop.
func (d *Daemonset) StopAllApiDumpProcesses() {
d.PodArgsByNameMap.Range(func(k, v interface{}) bool {
podUID := k.(types.UID)
podArgs := v.(*PodArgs)

if podArgs.isEndState() {
printer.Debugf("API dump process for pod %s already stopped, state: %s\n", podArgs.PodName, podArgs.PodTrafficMonitorState)
return true
}

// Since this state can happen at any time so no check for allowed current states
err := podArgs.changePodTrafficMonitorState(DaemonSetShutdown)
if err != nil {
Expand All @@ -341,4 +363,8 @@ func (d *Daemonset) StopAllApiDumpProcesses() {
d.PodArgsByNameMap.Delete(podUID)
return true
})

// Wait for all apidump processes to stop
printer.Debugf("Waiting for all apidump processes to stop...\n")
d.ApidumpProcessesWG.Wait()
}
Loading