From 62b579101f1091e2abe0dd6321675d6a26b80d29 Mon Sep 17 00:00:00 2001 From: Armando Ruocco Date: Tue, 25 Nov 2025 17:54:42 +0100 Subject: [PATCH 1/2] fix: prevent memory leak by periodically cleaning up expired cache entries Signed-off-by: Armando Ruocco --- .../cnpgi/instance/internal/client/client.go | 76 ++++++++- .../instance/internal/client/client_test.go | 147 +++++++++++++++++- internal/cnpgi/instance/manager.go | 2 +- 3 files changed, 217 insertions(+), 8 deletions(-) diff --git a/internal/cnpgi/instance/internal/client/client.go b/internal/cnpgi/instance/internal/client/client.go index 536f1ddd..c436c058 100644 --- a/internal/cnpgi/instance/internal/client/client.go +++ b/internal/cnpgi/instance/internal/client/client.go @@ -36,6 +36,9 @@ import ( // DefaultTTLSeconds is the default TTL in seconds of cache entries const DefaultTTLSeconds = 10 +// DefaultCleanupIntervalSeconds is the default interval in seconds for cache cleanup +const DefaultCleanupIntervalSeconds = 30 + type cachedEntry struct { entry client.Object fetchUnixTime int64 @@ -49,18 +52,28 @@ func (e *cachedEntry) isExpired() bool { // ExtendedClient is an extended client that is capable of caching multiple secrets without relying on informers type ExtendedClient struct { client.Client - cachedObjects []cachedEntry - mux *sync.Mutex + cachedObjects []cachedEntry + mux *sync.Mutex + cleanupInterval time.Duration } -// NewExtendedClient returns an extended client capable of caching secrets on the 'Get' operation +// NewExtendedClient returns an extended client capable of caching secrets on the 'Get' operation. +// It starts a background goroutine that periodically cleans up expired cache entries. +// The cleanup routine will stop when the provided context is cancelled. func NewExtendedClient( + ctx context.Context, baseClient client.Client, ) client.Client { - return &ExtendedClient{ - Client: baseClient, - mux: &sync.Mutex{}, + ec := &ExtendedClient{ + Client: baseClient, + mux: &sync.Mutex{}, + cleanupInterval: DefaultCleanupIntervalSeconds * time.Second, } + + // Start the background cleanup routine + go ec.startCleanupRoutine(ctx) + + return ec } func (e *ExtendedClient) isObjectCached(obj client.Object) bool { @@ -208,3 +221,54 @@ func (e *ExtendedClient) Patch( return e.Client.Patch(ctx, obj, patch, opts...) } + +// startCleanupRoutine periodically removes expired entries from the cache. +// It runs until the context is cancelled. +func (e *ExtendedClient) startCleanupRoutine(ctx context.Context) { + contextLogger := log.FromContext(ctx).WithName("extended_client_cleanup") + ticker := time.NewTicker(e.cleanupInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + contextLogger.Debug("stopping cache cleanup routine") + return + case <-ticker.C: + // Check context before cleanup to avoid unnecessary work during shutdown + if ctx.Err() != nil { + return + } + e.cleanupExpiredEntries(ctx) + } + } +} + +// cleanupExpiredEntries removes all expired entries from the cache. +func (e *ExtendedClient) cleanupExpiredEntries(ctx context.Context) { + contextLogger := log.FromContext(ctx).WithName("extended_client_cleanup") + + e.mux.Lock() + defer e.mux.Unlock() + + initialCount := len(e.cachedObjects) + if initialCount == 0 { + return + } + + // Create a new slice with only non-expired entries + validEntries := make([]cachedEntry, 0, initialCount) + for _, entry := range e.cachedObjects { + if !entry.isExpired() { + validEntries = append(validEntries, entry) + } + } + + removedCount := initialCount - len(validEntries) + if removedCount > 0 { + e.cachedObjects = validEntries + contextLogger.Debug("cleaned up expired cache entries", + "removedCount", removedCount, + "remainingCount", len(validEntries)) + } +} diff --git a/internal/cnpgi/instance/internal/client/client_test.go b/internal/cnpgi/instance/internal/client/client_test.go index c7d2fa68..dc1c7465 100644 --- a/internal/cnpgi/instance/internal/client/client_test.go +++ b/internal/cnpgi/instance/internal/client/client_test.go @@ -20,6 +20,7 @@ SPDX-License-Identifier: Apache-2.0 package client import ( + "context" "time" corev1 "k8s.io/api/core/v1" @@ -59,6 +60,7 @@ var _ = Describe("ExtendedClient Get", func() { extendedClient *ExtendedClient secretInClient *corev1.Secret objectStore *barmancloudv1.ObjectStore + cancelCtx context.CancelFunc ) BeforeEach(func() { @@ -79,7 +81,14 @@ var _ = Describe("ExtendedClient Get", func() { baseClient := fake.NewClientBuilder(). WithScheme(scheme). WithObjects(secretInClient, objectStore).Build() - extendedClient = NewExtendedClient(baseClient).(*ExtendedClient) + ctx, cancel := context.WithCancel(context.Background()) + cancelCtx = cancel + extendedClient = NewExtendedClient(ctx, baseClient).(*ExtendedClient) + }) + + AfterEach(func() { + // Cancel the context to stop the cleanup routine + cancelCtx() }) It("returns secret from cache if not expired", func(ctx SpecContext) { @@ -164,3 +173,139 @@ var _ = Describe("ExtendedClient Get", func() { Expect(objectStore.GetResourceVersion()).To(Equal("from cache")) }) }) + +var _ = Describe("ExtendedClient Cache Cleanup", func() { + var ( + extendedClient *ExtendedClient + cancelCtx context.CancelFunc + ) + + BeforeEach(func() { + baseClient := fake.NewClientBuilder(). + WithScheme(scheme). + Build() + ctx, cancel := context.WithCancel(context.Background()) + cancelCtx = cancel + extendedClient = NewExtendedClient(ctx, baseClient).(*ExtendedClient) + }) + + AfterEach(func() { + cancelCtx() + }) + + It("cleans up expired entries", func(ctx SpecContext) { + // Add some expired entries + expiredSecret1 := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "expired-secret-1", + }, + } + expiredSecret2 := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "expired-secret-2", + }, + } + validSecret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "valid-secret", + }, + } + + // Add expired entries (2 minutes ago) + addToCache(extendedClient, expiredSecret1, time.Now().Add(-2*time.Minute).Unix()) + addToCache(extendedClient, expiredSecret2, time.Now().Add(-2*time.Minute).Unix()) + // Add valid entry (just now) + addToCache(extendedClient, validSecret, time.Now().Unix()) + + Expect(extendedClient.cachedObjects).To(HaveLen(3)) + + // Trigger cleanup + extendedClient.cleanupExpiredEntries(ctx) + + // Only the valid entry should remain + Expect(extendedClient.cachedObjects).To(HaveLen(1)) + Expect(extendedClient.cachedObjects[0].entry.GetName()).To(Equal("valid-secret")) + }) + + It("does nothing when all entries are valid", func(ctx SpecContext) { + validSecret1 := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "valid-secret-1", + }, + } + validSecret2 := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "valid-secret-2", + }, + } + + addToCache(extendedClient, validSecret1, time.Now().Unix()) + addToCache(extendedClient, validSecret2, time.Now().Unix()) + + Expect(extendedClient.cachedObjects).To(HaveLen(2)) + + // Trigger cleanup + extendedClient.cleanupExpiredEntries(ctx) + + // Both entries should remain + Expect(extendedClient.cachedObjects).To(HaveLen(2)) + }) + + It("does nothing when cache is empty", func(ctx SpecContext) { + Expect(extendedClient.cachedObjects).To(BeEmpty()) + + // Trigger cleanup + extendedClient.cleanupExpiredEntries(ctx) + + Expect(extendedClient.cachedObjects).To(BeEmpty()) + }) + + It("removes all entries when all are expired", func(ctx SpecContext) { + expiredSecret1 := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "expired-secret-1", + }, + } + expiredSecret2 := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "expired-secret-2", + }, + } + + addToCache(extendedClient, expiredSecret1, time.Now().Add(-2*time.Minute).Unix()) + addToCache(extendedClient, expiredSecret2, time.Now().Add(-2*time.Minute).Unix()) + + Expect(extendedClient.cachedObjects).To(HaveLen(2)) + + // Trigger cleanup + extendedClient.cleanupExpiredEntries(ctx) + + Expect(extendedClient.cachedObjects).To(BeEmpty()) + }) + + It("stops cleanup routine when context is cancelled", func() { + // Create a new client with a short cleanup interval for testing + baseClient := fake.NewClientBuilder(). + WithScheme(scheme). + Build() + ctx, cancel := context.WithCancel(context.Background()) + ec := NewExtendedClient(ctx, baseClient).(*ExtendedClient) + ec.cleanupInterval = 10 * time.Millisecond + + // Cancel the context immediately + cancel() + + // Give the goroutine time to stop + time.Sleep(50 * time.Millisecond) + + // The goroutine should have stopped gracefully (no panic or hanging) + // This test mainly verifies the cleanup routine respects context cancellation + }) +}) diff --git a/internal/cnpgi/instance/manager.go b/internal/cnpgi/instance/manager.go index 6f82c4c1..a6c70d1c 100644 --- a/internal/cnpgi/instance/manager.go +++ b/internal/cnpgi/instance/manager.go @@ -84,7 +84,7 @@ func Start(ctx context.Context) error { return err } - customCacheClient := extendedclient.NewExtendedClient(mgr.GetClient()) + customCacheClient := extendedclient.NewExtendedClient(ctx, mgr.GetClient()) if err := mgr.Add(&CNPGI{ Client: customCacheClient, From 6a55a361a36d09cfeff0bb61edadd1c4cdc6a553 Mon Sep 17 00:00:00 2001 From: Marco Nenciarini Date: Mon, 1 Dec 2025 18:46:34 +0100 Subject: [PATCH 2/2] test: replace sleep-based test with deterministic channel verification The cleanup routine test used time.Sleep() without actually verifying the goroutine stopped. Added a done channel to provide deterministic verification of goroutine termination. Signed-off-by: Marco Nenciarini --- internal/cnpgi/instance/internal/client/client.go | 3 +++ .../cnpgi/instance/internal/client/client_test.go | 12 +++++++----- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/internal/cnpgi/instance/internal/client/client.go b/internal/cnpgi/instance/internal/client/client.go index c436c058..3b990d8f 100644 --- a/internal/cnpgi/instance/internal/client/client.go +++ b/internal/cnpgi/instance/internal/client/client.go @@ -55,6 +55,7 @@ type ExtendedClient struct { cachedObjects []cachedEntry mux *sync.Mutex cleanupInterval time.Duration + cleanupDone chan struct{} // Signals when cleanup routine exits } // NewExtendedClient returns an extended client capable of caching secrets on the 'Get' operation. @@ -68,6 +69,7 @@ func NewExtendedClient( Client: baseClient, mux: &sync.Mutex{}, cleanupInterval: DefaultCleanupIntervalSeconds * time.Second, + cleanupDone: make(chan struct{}), } // Start the background cleanup routine @@ -225,6 +227,7 @@ func (e *ExtendedClient) Patch( // startCleanupRoutine periodically removes expired entries from the cache. // It runs until the context is cancelled. func (e *ExtendedClient) startCleanupRoutine(ctx context.Context) { + defer close(e.cleanupDone) contextLogger := log.FromContext(ctx).WithName("extended_client_cleanup") ticker := time.NewTicker(e.cleanupInterval) defer ticker.Stop() diff --git a/internal/cnpgi/instance/internal/client/client_test.go b/internal/cnpgi/instance/internal/client/client_test.go index dc1c7465..fb57173f 100644 --- a/internal/cnpgi/instance/internal/client/client_test.go +++ b/internal/cnpgi/instance/internal/client/client_test.go @@ -302,10 +302,12 @@ var _ = Describe("ExtendedClient Cache Cleanup", func() { // Cancel the context immediately cancel() - // Give the goroutine time to stop - time.Sleep(50 * time.Millisecond) - - // The goroutine should have stopped gracefully (no panic or hanging) - // This test mainly verifies the cleanup routine respects context cancellation + // Verify the cleanup routine actually stops by waiting for the done channel + select { + case <-ec.cleanupDone: + // Success: cleanup routine exited as expected + case <-time.After(1 * time.Second): + Fail("cleanup routine did not stop within timeout") + } }) })