Skip to content

Commit 8f7adb0

Browse files
committed
fix: prevent memory leak by periodically cleaning up expired cache entries
The plugin-barman-cloud sidecar container was experiencing a gradual memory leak on primary instances, particularly during continuous WAL archiving operations. The memory usage would steadily increase over time, eventually causing OOM conditions that required cluster restarts to mitigate.

Root cause: The ExtendedClient cache implementation was only replacing expired entries when the exact same object was requested again. Different objects (secrets and object stores) were continuously being added to the cache slice but never removed, causing unbounded memory growth.

This fix introduces a background cleanup routine that periodically scans and removes expired cache entries every 30 seconds. The cleanup routine:

- Runs in a separate goroutine tied to the manager's lifecycle
- Respects context cancellation for clean shutdown
- Efficiently rebuilds the cache slice with only valid entries
- Logs cleanup activity for observability

Users should notice stable memory usage on primary instances after applying this fix, eliminating the need for periodic cluster restarts.

Partially Fixes: #385

Tagging this as a partial fix given that there could be other memory leaks.

Signed-off-by: Armando Ruocco <[email protected]>
1 parent 8901cb9 commit 8f7adb0

File tree

3 files changed

+217
-8
lines changed

3 files changed

+217
-8
lines changed

internal/cnpgi/instance/internal/client/client.go

Lines changed: 70 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ import (
3636
// DefaultTTLSeconds is the default TTL in seconds of cache entries
3737
const DefaultTTLSeconds = 10
3838

39+
// DefaultCleanupIntervalSeconds is the default interval in seconds for cache cleanup
40+
const DefaultCleanupIntervalSeconds = 30
41+
3942
type cachedEntry struct {
4043
entry client.Object
4144
fetchUnixTime int64
@@ -49,18 +52,28 @@ func (e *cachedEntry) isExpired() bool {
4952
// ExtendedClient is an extended client that is capable of caching multiple secrets without relying on informers
5053
type ExtendedClient struct {
5154
client.Client
52-
cachedObjects []cachedEntry
53-
mux *sync.Mutex
55+
cachedObjects []cachedEntry
56+
mux *sync.Mutex
57+
cleanupInterval time.Duration
5458
}
5559

56-
// NewExtendedClient returns an extended client capable of caching secrets on the 'Get' operation
60+
// NewExtendedClient returns an extended client capable of caching secrets on the 'Get' operation.
61+
// It starts a background goroutine that periodically cleans up expired cache entries.
62+
// The cleanup routine will stop when the provided context is cancelled.
5763
func NewExtendedClient(
64+
ctx context.Context,
5865
baseClient client.Client,
5966
) client.Client {
60-
return &ExtendedClient{
61-
Client: baseClient,
62-
mux: &sync.Mutex{},
67+
ec := &ExtendedClient{
68+
Client: baseClient,
69+
mux: &sync.Mutex{},
70+
cleanupInterval: DefaultCleanupIntervalSeconds * time.Second,
6371
}
72+
73+
// Start the background cleanup routine
74+
go ec.startCleanupRoutine(ctx)
75+
76+
return ec
6477
}
6578

6679
func (e *ExtendedClient) isObjectCached(obj client.Object) bool {
@@ -208,3 +221,54 @@ func (e *ExtendedClient) Patch(
208221

209222
return e.Client.Patch(ctx, obj, patch, opts...)
210223
}
224+
225+
// startCleanupRoutine periodically removes expired entries from the cache.
226+
// It runs until the context is cancelled.
227+
func (e *ExtendedClient) startCleanupRoutine(ctx context.Context) {
228+
contextLogger := log.FromContext(ctx).WithName("extended_client_cleanup")
229+
ticker := time.NewTicker(e.cleanupInterval)
230+
defer ticker.Stop()
231+
232+
for {
233+
select {
234+
case <-ctx.Done():
235+
contextLogger.Debug("stopping cache cleanup routine")
236+
return
237+
case <-ticker.C:
238+
// Check context before cleanup to avoid unnecessary work during shutdown
239+
if ctx.Err() != nil {
240+
return
241+
}
242+
e.cleanupExpiredEntries(ctx)
243+
}
244+
}
245+
}
246+
247+
// cleanupExpiredEntries removes all expired entries from the cache.
248+
func (e *ExtendedClient) cleanupExpiredEntries(ctx context.Context) {
249+
contextLogger := log.FromContext(ctx).WithName("extended_client_cleanup")
250+
251+
e.mux.Lock()
252+
defer e.mux.Unlock()
253+
254+
initialCount := len(e.cachedObjects)
255+
if initialCount == 0 {
256+
return
257+
}
258+
259+
// Create a new slice with only non-expired entries
260+
validEntries := make([]cachedEntry, 0, initialCount)
261+
for _, entry := range e.cachedObjects {
262+
if !entry.isExpired() {
263+
validEntries = append(validEntries, entry)
264+
}
265+
}
266+
267+
removedCount := initialCount - len(validEntries)
268+
if removedCount > 0 {
269+
e.cachedObjects = validEntries
270+
contextLogger.Debug("cleaned up expired cache entries",
271+
"removedCount", removedCount,
272+
"remainingCount", len(validEntries))
273+
}
274+
}

internal/cnpgi/instance/internal/client/client_test.go

Lines changed: 146 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ SPDX-License-Identifier: Apache-2.0
2020
package client
2121

2222
import (
23+
"context"
2324
"time"
2425

2526
corev1 "k8s.io/api/core/v1"
@@ -59,6 +60,7 @@ var _ = Describe("ExtendedClient Get", func() {
5960
extendedClient *ExtendedClient
6061
secretInClient *corev1.Secret
6162
objectStore *barmancloudv1.ObjectStore
63+
cancelCtx context.CancelFunc
6264
)
6365

6466
BeforeEach(func() {
@@ -79,7 +81,14 @@ var _ = Describe("ExtendedClient Get", func() {
7981
baseClient := fake.NewClientBuilder().
8082
WithScheme(scheme).
8183
WithObjects(secretInClient, objectStore).Build()
82-
extendedClient = NewExtendedClient(baseClient).(*ExtendedClient)
84+
ctx, cancel := context.WithCancel(context.Background())
85+
cancelCtx = cancel
86+
extendedClient = NewExtendedClient(ctx, baseClient).(*ExtendedClient)
87+
})
88+
89+
AfterEach(func() {
90+
// Cancel the context to stop the cleanup routine
91+
cancelCtx()
8392
})
8493

8594
It("returns secret from cache if not expired", func(ctx SpecContext) {
@@ -164,3 +173,139 @@ var _ = Describe("ExtendedClient Get", func() {
164173
Expect(objectStore.GetResourceVersion()).To(Equal("from cache"))
165174
})
166175
})
176+
177+
var _ = Describe("ExtendedClient Cache Cleanup", func() {
178+
var (
179+
extendedClient *ExtendedClient
180+
cancelCtx context.CancelFunc
181+
)
182+
183+
BeforeEach(func() {
184+
baseClient := fake.NewClientBuilder().
185+
WithScheme(scheme).
186+
Build()
187+
ctx, cancel := context.WithCancel(context.Background())
188+
cancelCtx = cancel
189+
extendedClient = NewExtendedClient(ctx, baseClient).(*ExtendedClient)
190+
})
191+
192+
AfterEach(func() {
193+
cancelCtx()
194+
})
195+
196+
It("cleans up expired entries", func(ctx SpecContext) {
197+
// Add some expired entries
198+
expiredSecret1 := &corev1.Secret{
199+
ObjectMeta: metav1.ObjectMeta{
200+
Namespace: "default",
201+
Name: "expired-secret-1",
202+
},
203+
}
204+
expiredSecret2 := &corev1.Secret{
205+
ObjectMeta: metav1.ObjectMeta{
206+
Namespace: "default",
207+
Name: "expired-secret-2",
208+
},
209+
}
210+
validSecret := &corev1.Secret{
211+
ObjectMeta: metav1.ObjectMeta{
212+
Namespace: "default",
213+
Name: "valid-secret",
214+
},
215+
}
216+
217+
// Add expired entries (2 minutes ago)
218+
addToCache(extendedClient, expiredSecret1, time.Now().Add(-2*time.Minute).Unix())
219+
addToCache(extendedClient, expiredSecret2, time.Now().Add(-2*time.Minute).Unix())
220+
// Add valid entry (just now)
221+
addToCache(extendedClient, validSecret, time.Now().Unix())
222+
223+
Expect(extendedClient.cachedObjects).To(HaveLen(3))
224+
225+
// Trigger cleanup
226+
extendedClient.cleanupExpiredEntries(ctx)
227+
228+
// Only the valid entry should remain
229+
Expect(extendedClient.cachedObjects).To(HaveLen(1))
230+
Expect(extendedClient.cachedObjects[0].entry.GetName()).To(Equal("valid-secret"))
231+
})
232+
233+
It("does nothing when all entries are valid", func(ctx SpecContext) {
234+
validSecret1 := &corev1.Secret{
235+
ObjectMeta: metav1.ObjectMeta{
236+
Namespace: "default",
237+
Name: "valid-secret-1",
238+
},
239+
}
240+
validSecret2 := &corev1.Secret{
241+
ObjectMeta: metav1.ObjectMeta{
242+
Namespace: "default",
243+
Name: "valid-secret-2",
244+
},
245+
}
246+
247+
addToCache(extendedClient, validSecret1, time.Now().Unix())
248+
addToCache(extendedClient, validSecret2, time.Now().Unix())
249+
250+
Expect(extendedClient.cachedObjects).To(HaveLen(2))
251+
252+
// Trigger cleanup
253+
extendedClient.cleanupExpiredEntries(ctx)
254+
255+
// Both entries should remain
256+
Expect(extendedClient.cachedObjects).To(HaveLen(2))
257+
})
258+
259+
It("does nothing when cache is empty", func(ctx SpecContext) {
260+
Expect(extendedClient.cachedObjects).To(BeEmpty())
261+
262+
// Trigger cleanup
263+
extendedClient.cleanupExpiredEntries(ctx)
264+
265+
Expect(extendedClient.cachedObjects).To(BeEmpty())
266+
})
267+
268+
It("removes all entries when all are expired", func(ctx SpecContext) {
269+
expiredSecret1 := &corev1.Secret{
270+
ObjectMeta: metav1.ObjectMeta{
271+
Namespace: "default",
272+
Name: "expired-secret-1",
273+
},
274+
}
275+
expiredSecret2 := &corev1.Secret{
276+
ObjectMeta: metav1.ObjectMeta{
277+
Namespace: "default",
278+
Name: "expired-secret-2",
279+
},
280+
}
281+
282+
addToCache(extendedClient, expiredSecret1, time.Now().Add(-2*time.Minute).Unix())
283+
addToCache(extendedClient, expiredSecret2, time.Now().Add(-2*time.Minute).Unix())
284+
285+
Expect(extendedClient.cachedObjects).To(HaveLen(2))
286+
287+
// Trigger cleanup
288+
extendedClient.cleanupExpiredEntries(ctx)
289+
290+
Expect(extendedClient.cachedObjects).To(BeEmpty())
291+
})
292+
293+
It("stops cleanup routine when context is cancelled", func() {
294+
// Create a new client with a short cleanup interval for testing
295+
baseClient := fake.NewClientBuilder().
296+
WithScheme(scheme).
297+
Build()
298+
ctx, cancel := context.WithCancel(context.Background())
299+
ec := NewExtendedClient(ctx, baseClient).(*ExtendedClient)
300+
ec.cleanupInterval = 10 * time.Millisecond
301+
302+
// Cancel the context immediately
303+
cancel()
304+
305+
// Give the goroutine time to stop
306+
time.Sleep(50 * time.Millisecond)
307+
308+
// The goroutine should have stopped gracefully (no panic or hanging)
309+
// This test mainly verifies the cleanup routine respects context cancellation
310+
})
311+
})

internal/cnpgi/instance/manager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ func Start(ctx context.Context) error {
8383
return err
8484
}
8585

86-
customCacheClient := extendedclient.NewExtendedClient(mgr.GetClient())
86+
customCacheClient := extendedclient.NewExtendedClient(ctx, mgr.GetClient())
8787

8888
if err := mgr.Add(&CNPGI{
8989
Client: customCacheClient,

0 commit comments

Comments
 (0)