Skip to content

Commit 9119ad9

Browse files
authored
fix: Delete OTLP log agent resources when no longer needed (#1815)
1 parent 816055c commit 9119ad9

File tree

5 files changed

+405
-5
lines changed

5 files changed

+405
-5
lines changed

docs/contributor/development.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ Telemetry Manager has been bootstrapped with [Kubebuilder](https://github.com/ku
66

77
- Install [kubebuilder 3.6.0](https://github.com/kubernetes-sigs/kubebuilder), which is the base framework for Telemetry Manager. Required to add new APIs.
88
- Install [Golang 1.20](https://golang.org/dl/) or newer (for development and local execution).
9-
- Install [Docker](https://www.docker.com/get-started).
9+
- Install [Docker](https://www.docker.com/get-started/).
1010
- Install [golangci-lint](https://golangci-lint.run).
1111
- Install [ginkgo CLI](https://pkg.go.dev/github.com/onsi/ginkgo/ginkgo) to run the E2E test commands straight from your terminal.
1212

internal/reconciler/logpipeline/otel/reconciler.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,16 @@ func (r *Reconciler) doReconcile(ctx context.Context, pipeline *telemetryv1alpha
134134
return fmt.Errorf("failed to fetch deployable log pipelines: %w", err)
135135
}
136136

137+
var reconcilablePipelinesRequiringAgents = r.getPipelinesRequiringAgents(reconcilablePipelines)
138+
139+
if len(reconcilablePipelinesRequiringAgents) == 0 {
140+
logf.FromContext(ctx).V(1).Info("cleaning up log agent resources: no log pipelines require an agent")
141+
142+
if err = r.agentApplierDeleter.DeleteResources(ctx, r.Client); err != nil {
143+
return fmt.Errorf("failed to delete agent resources: %w", err)
144+
}
145+
}
146+
137147
if len(reconcilablePipelines) == 0 {
138148
logf.FromContext(ctx).V(1).Info("cleaning up log pipeline resources: all log pipelines are non-reconcilable")
139149

@@ -317,6 +327,18 @@ func getAgentPorts() []int32 {
317327
}
318328
}
319329

330+
func (r *Reconciler) getPipelinesRequiringAgents(allPipelines []telemetryv1alpha1.LogPipeline) []telemetryv1alpha1.LogPipeline {
331+
var pipelinesRequiringAgents []telemetryv1alpha1.LogPipeline
332+
333+
for i := range allPipelines {
334+
if isLogAgentRequired(&allPipelines[i]) {
335+
pipelinesRequiringAgents = append(pipelinesRequiringAgents, allPipelines[i])
336+
}
337+
}
338+
339+
return pipelinesRequiringAgents
340+
}
341+
320342
func isLogAgentRequired(pipeline *telemetryv1alpha1.LogPipeline) bool {
321343
input := pipeline.Spec.Input
322344

internal/reconciler/logpipeline/otel/reconciler_test.go

Lines changed: 212 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func TestReconcile(t *testing.T) {
4444
fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(&pipeline).WithStatusSubresource(&pipeline).Build()
4545

4646
agentApplierDeleterMock := &mocks.AgentApplierDeleter{}
47-
agentApplierDeleterMock.On("ApplyResources", mock.Anything, mock.Anything, mock.Anything).Return(nil)
47+
agentApplierDeleterMock.On("DeleteResources", mock.Anything, mock.Anything).Return(nil).Times(1)
4848

4949
agentConfigBuilderMock := &mocks.AgentConfigBuilder{}
5050
agentConfigBuilderMock.On("Build", mock.Anything).Return(&gateway.Config{}, nil, nil).Times(1)
@@ -97,7 +97,7 @@ func TestReconcile(t *testing.T) {
9797
fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(&pipeline).WithStatusSubresource(&pipeline).Build()
9898

9999
agentApplierDeleterMock := &mocks.AgentApplierDeleter{}
100-
agentApplierDeleterMock.On("ApplyResources", mock.Anything, mock.Anything, mock.Anything).Return(nil)
100+
agentApplierDeleterMock.On("DeleteResources", mock.Anything, mock.Anything).Return(nil).Times(1)
101101

102102
agentConfigBuilderMock := &mocks.AgentConfigBuilder{}
103103
agentConfigBuilderMock.On("Build", containsPipeline(pipeline), mock.Anything).Return(&gateway.Config{}, nil, nil).Times(1)
@@ -150,7 +150,7 @@ func TestReconcile(t *testing.T) {
150150
fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(&pipeline).WithStatusSubresource(&pipeline).Build()
151151

152152
agentApplierDeleterMock := &mocks.AgentApplierDeleter{}
153-
agentApplierDeleterMock.On("ApplyResources", mock.Anything, mock.Anything, mock.Anything).Return(nil)
153+
agentApplierDeleterMock.On("DeleteResources", mock.Anything, mock.Anything).Return(nil).Times(1)
154154

155155
agentConfigBuilderMock := &mocks.AgentConfigBuilder{}
156156
agentConfigBuilderMock.On("Build", containsPipeline(pipeline), mock.Anything).Return(&gateway.Config{}, nil, nil).Times(1)
@@ -196,6 +196,7 @@ func TestReconcile(t *testing.T) {
196196

197197
gatewayConfigBuilderMock.AssertExpectations(t)
198198
})
199+
199200
t.Run("log agent daemonset is not ready", func(t *testing.T) {
200201
pipeline := testutils.NewLogPipelineBuilder().WithName("pipeline").WithOTLPOutput().WithApplicationInput(true).Build()
201202
fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(&pipeline).WithStatusSubresource(&pipeline).Build()
@@ -299,12 +300,199 @@ func TestReconcile(t *testing.T) {
299300
agentConfigBuilderMock.AssertExpectations(t)
300301
gatewayConfigBuilderMock.AssertExpectations(t)
301302
})
303+
302304
// TODO: "referenced secret missing" (requires SecretRefValidator to be implemented)
303305
// TODO: "referenced secret exists" (requires SecretRefValidator to be implemented)
304306
// TODO: "flow healthy" (requires SelfMonitoring to be implemented)
305307
// TODO: "tls conditions" (requires TLSCertValidator to be implemented)
306308
// TODO: "all log pipelines are non-reconcilable" (requires SecretRefValidator to be implemented)
307309
// TODO: "Check different Pod Error Conditions" (requires SecretRefValidator to be implemented)
310+
311+
t.Run("one log pipeline does not require an agent", func(t *testing.T) {
312+
pipeline := testutils.NewLogPipelineBuilder().WithName("pipeline").WithOTLPOutput().WithApplicationInput(false).Build()
313+
fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(&pipeline).WithStatusSubresource(&pipeline).Build()
314+
315+
agentApplierDeleterMock := &mocks.AgentApplierDeleter{}
316+
agentApplierDeleterMock.On("DeleteResources", mock.Anything, mock.Anything).Return(nil).Times(1)
317+
318+
gatewayConfigBuilderMock := &mocks.GatewayConfigBuilder{}
319+
gatewayConfigBuilderMock.On("Build", mock.Anything, containsPipeline(pipeline), mock.Anything).Return(&gateway.Config{}, nil, nil).Times(1)
320+
321+
gatewayApplierDeleterMock := &mocks.GatewayApplierDeleter{}
322+
gatewayApplierDeleterMock.On("ApplyResources", mock.Anything, mock.Anything, mock.Anything).Return(nil)
323+
324+
gatewayProberStub := commonStatusStubs.NewDeploymentSetProber(nil)
325+
agentProberStub := commonStatusStubs.NewDaemonSetProber(nil)
326+
327+
sut := New(
328+
fakeClient,
329+
telemetryNamespace,
330+
moduleVersion,
331+
&mocks.AgentConfigBuilder{},
332+
agentApplierDeleterMock,
333+
agentProberStub,
334+
gatewayApplierDeleterMock,
335+
gatewayConfigBuilderMock,
336+
gatewayProberStub,
337+
istioStatusCheckerStub,
338+
&Validator{},
339+
&conditions.ErrorToMessageConverter{})
340+
err := sut.Reconcile(context.Background(), &pipeline)
341+
require.NoError(t, err)
342+
343+
var updatedPipeline telemetryv1alpha1.LogPipeline
344+
_ = fakeClient.Get(context.Background(), types.NamespacedName{Name: pipeline.Name}, &updatedPipeline)
345+
346+
requireHasStatusCondition(t, updatedPipeline,
347+
conditions.TypeAgentHealthy,
348+
metav1.ConditionTrue,
349+
conditions.ReasonLogAgentNotRequired,
350+
"")
351+
352+
agentApplierDeleterMock.AssertExpectations(t)
353+
gatewayConfigBuilderMock.AssertExpectations(t)
354+
})
355+
356+
t.Run("some log pipelines do not require an agent", func(t *testing.T) {
357+
pipeline1 := testutils.NewLogPipelineBuilder().WithName("pipeline1").WithOTLPOutput().WithApplicationInput(false).Build()
358+
pipeline2 := testutils.NewLogPipelineBuilder().WithName("pipeline2").WithOTLPOutput().WithApplicationInput(true).Build()
359+
fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(&pipeline1, &pipeline2).WithStatusSubresource(&pipeline1, &pipeline2).Build()
360+
361+
agentConfigBuilderMock := &mocks.AgentConfigBuilder{}
362+
agentConfigBuilderMock.On("Build", containsPipelines([]telemetryv1alpha1.LogPipeline{pipeline1, pipeline2}), mock.Anything).Return(&agent.Config{}, nil, nil).Times(1)
363+
364+
agentApplierDeleterMock := &mocks.AgentApplierDeleter{}
365+
agentApplierDeleterMock.On("ApplyResources", mock.Anything, mock.Anything, mock.Anything).Return(nil).Times(1)
366+
367+
gatewayConfigBuilderMock := &mocks.GatewayConfigBuilder{}
368+
gatewayConfigBuilderMock.On("Build", mock.Anything, containsPipelines([]telemetryv1alpha1.LogPipeline{pipeline1, pipeline2}), mock.Anything).Return(&gateway.Config{}, nil, nil)
369+
370+
gatewayApplierDeleterMock := &mocks.GatewayApplierDeleter{}
371+
gatewayApplierDeleterMock.On("ApplyResources", mock.Anything, mock.Anything, mock.Anything).Return(nil)
372+
373+
gatewayProberStub := commonStatusStubs.NewDeploymentSetProber(nil)
374+
agentProberStub := commonStatusStubs.NewDaemonSetProber(nil)
375+
376+
sut := New(
377+
fakeClient,
378+
telemetryNamespace,
379+
moduleVersion,
380+
agentConfigBuilderMock,
381+
agentApplierDeleterMock,
382+
agentProberStub,
383+
gatewayApplierDeleterMock,
384+
gatewayConfigBuilderMock,
385+
gatewayProberStub,
386+
istioStatusCheckerStub,
387+
&Validator{},
388+
&conditions.ErrorToMessageConverter{})
389+
err1 := sut.Reconcile(context.Background(), &pipeline1)
390+
err2 := sut.Reconcile(context.Background(), &pipeline2)
391+
392+
require.NoError(t, err1)
393+
require.NoError(t, err2)
394+
395+
var updatedPipeline1 telemetryv1alpha1.LogPipeline
396+
_ = fakeClient.Get(context.Background(), types.NamespacedName{Name: pipeline1.Name}, &updatedPipeline1)
397+
398+
requireHasStatusCondition(t, updatedPipeline1,
399+
conditions.TypeAgentHealthy,
400+
metav1.ConditionTrue,
401+
conditions.ReasonLogAgentNotRequired,
402+
"")
403+
404+
agentConfigBuilderMock.AssertExpectations(t)
405+
agentApplierDeleterMock.AssertExpectations(t)
406+
gatewayConfigBuilderMock.AssertExpectations(t)
407+
})
408+
409+
t.Run("all log pipelines do not require an agent", func(t *testing.T) {
410+
pipeline1 := testutils.NewLogPipelineBuilder().WithName("pipeline1").WithOTLPOutput().WithApplicationInput(false).Build()
411+
pipeline2 := testutils.NewLogPipelineBuilder().WithName("pipeline2").WithOTLPOutput().WithApplicationInput(false).Build()
412+
fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(&pipeline1, &pipeline2).WithStatusSubresource(&pipeline1, &pipeline2).Build()
413+
414+
agentApplierDeleterMock := &mocks.AgentApplierDeleter{}
415+
agentApplierDeleterMock.On("DeleteResources", mock.Anything, mock.Anything).Return(nil).Times(2)
416+
417+
gatewayConfigBuilderMock := &mocks.GatewayConfigBuilder{}
418+
gatewayConfigBuilderMock.On("Build", mock.Anything, containsPipelines([]telemetryv1alpha1.LogPipeline{pipeline1, pipeline2}), mock.Anything).Return(&gateway.Config{}, nil, nil)
419+
420+
gatewayApplierDeleterMock := &mocks.GatewayApplierDeleter{}
421+
gatewayApplierDeleterMock.On("ApplyResources", mock.Anything, mock.Anything, mock.Anything).Return(nil)
422+
423+
gatewayProberStub := commonStatusStubs.NewDeploymentSetProber(nil)
424+
agentProberStub := commonStatusStubs.NewDaemonSetProber(nil)
425+
426+
sut := New(
427+
fakeClient,
428+
telemetryNamespace,
429+
moduleVersion,
430+
&mocks.AgentConfigBuilder{},
431+
agentApplierDeleterMock,
432+
agentProberStub,
433+
gatewayApplierDeleterMock,
434+
gatewayConfigBuilderMock,
435+
gatewayProberStub,
436+
istioStatusCheckerStub,
437+
&Validator{},
438+
&conditions.ErrorToMessageConverter{})
439+
err1 := sut.Reconcile(context.Background(), &pipeline1)
440+
err2 := sut.Reconcile(context.Background(), &pipeline2)
441+
442+
require.NoError(t, err1)
443+
require.NoError(t, err2)
444+
445+
var updatedPipeline1 telemetryv1alpha1.LogPipeline
446+
447+
var updatedPipeline2 telemetryv1alpha1.LogPipeline
448+
449+
_ = fakeClient.Get(context.Background(), types.NamespacedName{Name: pipeline1.Name}, &updatedPipeline1)
450+
_ = fakeClient.Get(context.Background(), types.NamespacedName{Name: pipeline1.Name}, &updatedPipeline2)
451+
452+
requireHasStatusCondition(t, updatedPipeline1,
453+
conditions.TypeAgentHealthy,
454+
metav1.ConditionTrue,
455+
conditions.ReasonLogAgentNotRequired,
456+
"")
457+
requireHasStatusCondition(t, updatedPipeline2,
458+
conditions.TypeAgentHealthy,
459+
metav1.ConditionTrue,
460+
conditions.ReasonLogAgentNotRequired,
461+
"")
462+
463+
agentApplierDeleterMock.AssertExpectations(t)
464+
gatewayConfigBuilderMock.AssertExpectations(t)
465+
})
466+
}
467+
468+
func TestGetPipelinesRequiringAgents(t *testing.T) {
469+
r := Reconciler{}
470+
471+
t.Run("no pipelines", func(t *testing.T) {
472+
pipelines := []telemetryv1alpha1.LogPipeline{}
473+
require.Empty(t, r.getPipelinesRequiringAgents(pipelines))
474+
})
475+
476+
t.Run("no pipeline requires an agent", func(t *testing.T) {
477+
pipeline1 := testutils.NewLogPipelineBuilder().WithOTLPOutput().WithApplicationInput(false).Build()
478+
pipeline2 := testutils.NewLogPipelineBuilder().WithOTLPOutput().WithApplicationInput(false).Build()
479+
pipelines := []telemetryv1alpha1.LogPipeline{pipeline1, pipeline2}
480+
require.Empty(t, r.getPipelinesRequiringAgents(pipelines))
481+
})
482+
483+
t.Run("some pipelines require an agent", func(t *testing.T) {
484+
pipeline1 := testutils.NewLogPipelineBuilder().WithOTLPOutput().WithApplicationInput(true).Build()
485+
pipeline2 := testutils.NewLogPipelineBuilder().WithOTLPOutput().WithApplicationInput(false).Build()
486+
pipelines := []telemetryv1alpha1.LogPipeline{pipeline1, pipeline2}
487+
require.ElementsMatch(t, []telemetryv1alpha1.LogPipeline{pipeline1}, r.getPipelinesRequiringAgents(pipelines))
488+
})
489+
490+
t.Run("all pipelines require an agent", func(t *testing.T) {
491+
pipeline1 := testutils.NewLogPipelineBuilder().WithOTLPOutput().WithApplicationInput(true).Build()
492+
pipeline2 := testutils.NewLogPipelineBuilder().WithOTLPOutput().WithApplicationInput(true).Build()
493+
pipelines := []telemetryv1alpha1.LogPipeline{pipeline1, pipeline2}
494+
require.ElementsMatch(t, []telemetryv1alpha1.LogPipeline{pipeline1, pipeline2}, r.getPipelinesRequiringAgents(pipelines))
495+
})
308496
}
309497

310498
func requireHasStatusCondition(t *testing.T, pipeline telemetryv1alpha1.LogPipeline, condType string, status metav1.ConditionStatus, reason, message string) {
@@ -322,3 +510,24 @@ func containsPipeline(p telemetryv1alpha1.LogPipeline) any {
322510
return len(pipelines) == 1 && pipelines[0].Name == p.Name
323511
})
324512
}
513+
514+
func containsPipelines(pp []telemetryv1alpha1.LogPipeline) any {
515+
return mock.MatchedBy(func(pipelines []telemetryv1alpha1.LogPipeline) bool {
516+
if len(pipelines) != len(pp) {
517+
return false
518+
}
519+
520+
pipelineMap := make(map[string]bool)
521+
for _, p := range pipelines {
522+
pipelineMap[p.Name] = true
523+
}
524+
525+
for _, p := range pp {
526+
if !pipelineMap[p.Name] {
527+
return false
528+
}
529+
}
530+
531+
return true
532+
})
533+
}

0 commit comments

Comments
 (0)