Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 73 additions & 6 deletions internal/cnpgi/instance/internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ import (
// DefaultTTLSeconds is the default TTL in seconds of cache entries
const DefaultTTLSeconds = 10

// DefaultCleanupIntervalSeconds is the default interval in seconds for cache cleanup
const DefaultCleanupIntervalSeconds = 30

type cachedEntry struct {
entry client.Object
fetchUnixTime int64
Expand All @@ -49,18 +52,30 @@ func (e *cachedEntry) isExpired() bool {
// ExtendedClient is an extended client that is capable of caching multiple secrets without relying on informers
type ExtendedClient struct {
client.Client
cachedObjects []cachedEntry
mux *sync.Mutex
cachedObjects []cachedEntry
mux *sync.Mutex
cleanupInterval time.Duration
cleanupDone chan struct{} // Signals when cleanup routine exits
}

// NewExtendedClient returns an extended client capable of caching secrets on the 'Get' operation
// NewExtendedClient returns an extended client capable of caching secrets on the 'Get' operation.
// It starts a background goroutine that periodically cleans up expired cache entries.
// The cleanup routine will stop when the provided context is cancelled.
func NewExtendedClient(
ctx context.Context,
baseClient client.Client,
) client.Client {
return &ExtendedClient{
Client: baseClient,
mux: &sync.Mutex{},
ec := &ExtendedClient{
Client: baseClient,
mux: &sync.Mutex{},
cleanupInterval: DefaultCleanupIntervalSeconds * time.Second,
cleanupDone: make(chan struct{}),
}

// Start the background cleanup routine
go ec.startCleanupRoutine(ctx)

return ec
}

func (e *ExtendedClient) isObjectCached(obj client.Object) bool {
Expand Down Expand Up @@ -208,3 +223,55 @@ func (e *ExtendedClient) Patch(

return e.Client.Patch(ctx, obj, patch, opts...)
}

// startCleanupRoutine periodically removes expired entries from the cache.
// It runs until the context is cancelled.
func (e *ExtendedClient) startCleanupRoutine(ctx context.Context) {
defer close(e.cleanupDone)
contextLogger := log.FromContext(ctx).WithName("extended_client_cleanup")
ticker := time.NewTicker(e.cleanupInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
contextLogger.Debug("stopping cache cleanup routine")
return
case <-ticker.C:
// Check context before cleanup to avoid unnecessary work during shutdown
if ctx.Err() != nil {
return
}
e.cleanupExpiredEntries(ctx)
}
}
}

// cleanupExpiredEntries removes all expired entries from the cache.
func (e *ExtendedClient) cleanupExpiredEntries(ctx context.Context) {
contextLogger := log.FromContext(ctx).WithName("extended_client_cleanup")

e.mux.Lock()
defer e.mux.Unlock()

initialCount := len(e.cachedObjects)
if initialCount == 0 {
return
}

// Create a new slice with only non-expired entries
validEntries := make([]cachedEntry, 0, initialCount)
for _, entry := range e.cachedObjects {
if !entry.isExpired() {
validEntries = append(validEntries, entry)
}
}

removedCount := initialCount - len(validEntries)
if removedCount > 0 {
e.cachedObjects = validEntries
contextLogger.Debug("cleaned up expired cache entries",
"removedCount", removedCount,
"remainingCount", len(validEntries))
}
}
149 changes: 148 additions & 1 deletion internal/cnpgi/instance/internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ SPDX-License-Identifier: Apache-2.0
package client

import (
"context"
"time"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -59,6 +60,7 @@ var _ = Describe("ExtendedClient Get", func() {
extendedClient *ExtendedClient
secretInClient *corev1.Secret
objectStore *barmancloudv1.ObjectStore
cancelCtx context.CancelFunc
)

BeforeEach(func() {
Expand All @@ -79,7 +81,14 @@ var _ = Describe("ExtendedClient Get", func() {
baseClient := fake.NewClientBuilder().
WithScheme(scheme).
WithObjects(secretInClient, objectStore).Build()
extendedClient = NewExtendedClient(baseClient).(*ExtendedClient)
ctx, cancel := context.WithCancel(context.Background())
cancelCtx = cancel
extendedClient = NewExtendedClient(ctx, baseClient).(*ExtendedClient)
})

AfterEach(func() {
// Cancel the context to stop the cleanup routine
cancelCtx()
})

It("returns secret from cache if not expired", func(ctx SpecContext) {
Expand Down Expand Up @@ -164,3 +173,141 @@ var _ = Describe("ExtendedClient Get", func() {
Expect(objectStore.GetResourceVersion()).To(Equal("from cache"))
})
})

var _ = Describe("ExtendedClient Cache Cleanup", func() {
var (
extendedClient *ExtendedClient
cancelCtx context.CancelFunc
)

BeforeEach(func() {
baseClient := fake.NewClientBuilder().
WithScheme(scheme).
Build()
ctx, cancel := context.WithCancel(context.Background())
cancelCtx = cancel
extendedClient = NewExtendedClient(ctx, baseClient).(*ExtendedClient)
})

AfterEach(func() {
cancelCtx()
})

It("cleans up expired entries", func(ctx SpecContext) {
// Add some expired entries
expiredSecret1 := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "expired-secret-1",
},
}
expiredSecret2 := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "expired-secret-2",
},
}
validSecret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "valid-secret",
},
}

// Add expired entries (2 minutes ago)
addToCache(extendedClient, expiredSecret1, time.Now().Add(-2*time.Minute).Unix())
addToCache(extendedClient, expiredSecret2, time.Now().Add(-2*time.Minute).Unix())
// Add valid entry (just now)
addToCache(extendedClient, validSecret, time.Now().Unix())

Expect(extendedClient.cachedObjects).To(HaveLen(3))

// Trigger cleanup
extendedClient.cleanupExpiredEntries(ctx)

// Only the valid entry should remain
Expect(extendedClient.cachedObjects).To(HaveLen(1))
Expect(extendedClient.cachedObjects[0].entry.GetName()).To(Equal("valid-secret"))
})

It("does nothing when all entries are valid", func(ctx SpecContext) {
validSecret1 := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "valid-secret-1",
},
}
validSecret2 := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "valid-secret-2",
},
}

addToCache(extendedClient, validSecret1, time.Now().Unix())
addToCache(extendedClient, validSecret2, time.Now().Unix())

Expect(extendedClient.cachedObjects).To(HaveLen(2))

// Trigger cleanup
extendedClient.cleanupExpiredEntries(ctx)

// Both entries should remain
Expect(extendedClient.cachedObjects).To(HaveLen(2))
})

It("does nothing when cache is empty", func(ctx SpecContext) {
Expect(extendedClient.cachedObjects).To(BeEmpty())

// Trigger cleanup
extendedClient.cleanupExpiredEntries(ctx)

Expect(extendedClient.cachedObjects).To(BeEmpty())
})

It("removes all entries when all are expired", func(ctx SpecContext) {
expiredSecret1 := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "expired-secret-1",
},
}
expiredSecret2 := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "expired-secret-2",
},
}

addToCache(extendedClient, expiredSecret1, time.Now().Add(-2*time.Minute).Unix())
addToCache(extendedClient, expiredSecret2, time.Now().Add(-2*time.Minute).Unix())

Expect(extendedClient.cachedObjects).To(HaveLen(2))

// Trigger cleanup
extendedClient.cleanupExpiredEntries(ctx)

Expect(extendedClient.cachedObjects).To(BeEmpty())
})

It("stops cleanup routine when context is cancelled", func() {
// Create a new client with a short cleanup interval for testing
baseClient := fake.NewClientBuilder().
WithScheme(scheme).
Build()
ctx, cancel := context.WithCancel(context.Background())
ec := NewExtendedClient(ctx, baseClient).(*ExtendedClient)
ec.cleanupInterval = 10 * time.Millisecond

// Cancel the context immediately
cancel()

// Verify the cleanup routine actually stops by waiting for the done channel
select {
case <-ec.cleanupDone:
// Success: cleanup routine exited as expected
case <-time.After(1 * time.Second):
Fail("cleanup routine did not stop within timeout")
}
})
})
2 changes: 1 addition & 1 deletion internal/cnpgi/instance/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func Start(ctx context.Context) error {
return err
}

customCacheClient := extendedclient.NewExtendedClient(mgr.GetClient())
customCacheClient := extendedclient.NewExtendedClient(ctx, mgr.GetClient())

if err := mgr.Add(&CNPGI{
Client: customCacheClient,
Expand Down
Loading