Skip to content

Commit a0ee591

Browse files
authored
Merge pull request #1413 from sunnylovestiramisu/leakedVolumeCache
Add In Memory Cache of Node/CSINode Info for Long Provisioning to Prevent Volume Leak
2 parents 07fe125 + f0f04cc commit a0ee591

File tree

22 files changed

+960
-164
lines changed

22 files changed

+960
-164
lines changed

cmd/csi-provisioner/csi-provisioner.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,8 @@ import (
6060
_ "k8s.io/component-base/metrics/prometheus/workqueue" // register work queues in the default legacy registry
6161
csitrans "k8s.io/csi-translation-lib"
6262
"k8s.io/klog/v2"
63-
"sigs.k8s.io/sig-storage-lib-external-provisioner/v12/controller"
64-
libmetrics "sigs.k8s.io/sig-storage-lib-external-provisioner/v12/controller/metrics"
63+
"sigs.k8s.io/sig-storage-lib-external-provisioner/v13/controller"
64+
libmetrics "sigs.k8s.io/sig-storage-lib-external-provisioner/v13/controller/metrics"
6565

6666
"github.com/kubernetes-csi/csi-lib-utils/leaderelection"
6767
"github.com/kubernetes-csi/csi-lib-utils/metrics"
@@ -408,7 +408,6 @@ func main() {
408408
controller.Threadiness(int(*workerThreads)),
409409
controller.CreateProvisionedPVLimiter(workqueue.DefaultTypedControllerRateLimiter[string]()),
410410
controller.ClaimsInformer(claimInformer),
411-
controller.NodesLister(nodeLister),
412411
controller.RetryIntervalMax(*retryIntervalMax),
413412
}
414413

@@ -419,6 +418,10 @@ func main() {
419418
if supportsMigrationFromInTreePluginName != "" {
420419
provisionerOptions = append(provisionerOptions, controller.AdditionalProvisionerNames([]string{supportsMigrationFromInTreePluginName}))
421420
}
421+
var pvcNodeStore *ctrl.InMemoryStore
422+
if ctrl.SupportsTopology(pluginCapabilities) {
423+
pvcNodeStore = ctrl.NewInMemoryStore()
424+
}
422425

423426
// Create the provisioner: it implements the Provisioner interface expected by
424427
// the controller
@@ -448,6 +451,7 @@ func main() {
448451
nodeDeployment,
449452
*controllerPublishReadOnly,
450453
*preventVolumeModeConversion,
454+
pvcNodeStore,
451455
)
452456

453457
var capacityController *capacity.Controller

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ require (
3333
github.com/onsi/ginkgo/v2 v2.23.4
3434
github.com/onsi/gomega v1.37.0
3535
k8s.io/kubernetes v1.34.0
36-
sigs.k8s.io/sig-storage-lib-external-provisioner/v12 v12.0.1
36+
sigs.k8s.io/sig-storage-lib-external-provisioner/v13 v13.0.0
3737
)
3838

3939
require (

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -374,8 +374,8 @@ sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730 h1:IpInykpT6ceI+QxKBbEflcR5E
374374
sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730/go.mod h1:mdzfpAEoE6DHQEN0uh9ZbOCuHbLK5wOm7dK4ctXE9Tg=
375375
sigs.k8s.io/randfill v1.0.0 h1:JfjMILfT8A6RbawdsK2JXGBR5AQVfd+9TbzrlneTyrU=
376376
sigs.k8s.io/randfill v1.0.0/go.mod h1:XeLlZ/jmk4i1HRopwe7/aU3H5n1zNUcX6TM94b3QxOY=
377-
sigs.k8s.io/sig-storage-lib-external-provisioner/v12 v12.0.1 h1:rQcMFPcZ12y82+BA7b29gCWyoI/+/gAQUZu/Cw+8bC0=
378-
sigs.k8s.io/sig-storage-lib-external-provisioner/v12 v12.0.1/go.mod h1:kPy4hBso6PNhP9PdlTDdBZqxP1RKg7DFFM7lIR1bA8k=
377+
sigs.k8s.io/sig-storage-lib-external-provisioner/v13 v13.0.0 h1:bqSqBfqtToTDMDz+FEzfqofXAp5ptt6Z7ShR0g05PGA=
378+
sigs.k8s.io/sig-storage-lib-external-provisioner/v13 v13.0.0/go.mod h1:1xSe5kgJcKbrtNdD5WoytKUoByAGDl3wVHlKP0RZIC8=
379379
sigs.k8s.io/structured-merge-diff/v6 v6.3.0 h1:jTijUJbW353oVOd9oTlifJqOGEkUw2jB/fXCbTiQEco=
380380
sigs.k8s.io/structured-merge-diff/v6 v6.3.0/go.mod h1:M3W8sfWvn2HhQDIbGWj3S099YozAsymCo/wrT5ohRUE=
381381
sigs.k8s.io/yaml v1.6.0 h1:G8fkbMSAFqgEFgh4b1wmtzDnioxFCUgTZhlbj5P9QYs=

pkg/capacity/provision.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import (
2020
"context"
2121

2222
v1 "k8s.io/api/core/v1"
23-
"sigs.k8s.io/sig-storage-lib-external-provisioner/v12/controller"
23+
"sigs.k8s.io/sig-storage-lib-external-provisioner/v13/controller"
2424
)
2525

2626
type provisionWrapper struct {

pkg/controller/cache.go

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
package controller
2+
3+
import (
4+
"fmt"
5+
"sync"
6+
7+
"k8s.io/apimachinery/pkg/types"
8+
)
9+
10+
// TopologyInfo holds the data for NodeLabels and TopologyKeys
11+
type TopologyInfo struct {
12+
NodeLabels map[string]string
13+
TopologyKeys []string
14+
RequisiteTerms []topologyTerm
15+
}
16+
17+
// TopologyProvider is an interface that defines the behavior for looking up
18+
// a TopologyInfo object by its pvc UID.
19+
type TopologyProvider interface {
20+
GetByPvcUID(pvcUID types.UID) (*TopologyInfo, error)
21+
// The entry is deleted when provision succeeds or returns a final error.
22+
Delete(pvcUID types.UID) error
23+
24+
// Update methods now perform an "upsert" and don't return errors.
25+
UpdateNodeLabels(pvcUID types.UID, newLabels map[string]string)
26+
UpdateTopologyKeys(pvcUID types.UID, newKeys []string)
27+
UpdateRequisiteTerms(pvcUID types.UID, requisiteTerms []topologyTerm)
28+
}
29+
30+
// InMemoryStore is a concrete implementation of TopologyProvider.
31+
// It uses an in-memory map for quick lookups.
32+
type InMemoryStore struct {
33+
// The map key is the object's name.
34+
data map[types.UID]*TopologyInfo
35+
// Adding a mutex for thread-safe access
36+
mutex sync.RWMutex
37+
}
38+
39+
// NewInMemoryStore creates and initializes a new store.
40+
func NewInMemoryStore() *InMemoryStore {
41+
return &InMemoryStore{
42+
data: make(map[types.UID]*TopologyInfo),
43+
}
44+
}
45+
46+
// Delete implements the TopologyProvider interface.
47+
// It uses the built-in delete() function to remove the item from the map.
48+
func (s *InMemoryStore) Delete(pvcUID types.UID) error {
49+
s.mutex.Lock()
50+
defer s.mutex.Unlock()
51+
// First, check if the key exists to provide a helpful error.
52+
_, found := s.data[pvcUID]
53+
if !found {
54+
return nil
55+
}
56+
delete(s.data, pvcUID)
57+
return nil
58+
}
59+
60+
// GetByPvcUID implements the TopologyProvider interface.
61+
func (s *InMemoryStore) GetByPvcUID(pvcUID types.UID) (*TopologyInfo, error) {
62+
if s == nil {
63+
return nil, fmt.Errorf("pvcNodeStore is nil")
64+
}
65+
s.mutex.RLock()
66+
defer s.mutex.RUnlock()
67+
info, found := s.data[pvcUID]
68+
if !found {
69+
return nil, fmt.Errorf("topology object with pvcUID '%s' not found", pvcUID)
70+
}
71+
72+
// Return a deep copy to prevent data races
73+
infoCopy := &TopologyInfo{}
74+
if info.NodeLabels != nil {
75+
infoCopy.NodeLabels = make(map[string]string)
76+
for k, v := range info.NodeLabels {
77+
infoCopy.NodeLabels[k] = v
78+
}
79+
}
80+
81+
if info.TopologyKeys != nil {
82+
infoCopy.TopologyKeys = make([]string, len(info.TopologyKeys))
83+
copy(infoCopy.TopologyKeys, info.TopologyKeys)
84+
}
85+
86+
if info.RequisiteTerms != nil {
87+
infoCopy.RequisiteTerms = make([]topologyTerm, len(info.RequisiteTerms))
88+
for i, term := range info.RequisiteTerms {
89+
newTerm := make(topologyTerm, len(term))
90+
copy(newTerm, term)
91+
infoCopy.RequisiteTerms[i] = newTerm
92+
}
93+
}
94+
95+
return infoCopy, nil
96+
}
97+
98+
// UpdateNodeLabels finds an object by pvcUID and replaces its NodeLabels.
99+
func (s *InMemoryStore) UpdateNodeLabels(pvcUID types.UID, newLabels map[string]string) {
100+
s.mutex.Lock()
101+
defer s.mutex.Unlock()
102+
info, found := s.data[pvcUID]
103+
if !found {
104+
s.data[pvcUID] = &TopologyInfo{NodeLabels: newLabels}
105+
} else {
106+
info.NodeLabels = newLabels
107+
}
108+
}
109+
110+
// UpdateTopologyKeys finds an object by pvcUID and replaces its TopologyKeys.
111+
func (s *InMemoryStore) UpdateTopologyKeys(pvcUID types.UID, newKeys []string) {
112+
s.mutex.Lock()
113+
defer s.mutex.Unlock()
114+
info, found := s.data[pvcUID]
115+
if !found {
116+
s.data[pvcUID] = &TopologyInfo{TopologyKeys: newKeys}
117+
} else {
118+
info.TopologyKeys = newKeys
119+
}
120+
}
121+
122+
// UpdateRequisiteTerms finds an object by pvcUID and replaces its RequisiteTerms.
123+
func (s *InMemoryStore) UpdateRequisiteTerms(pvcUID types.UID, requisiteTerms []topologyTerm) {
124+
s.mutex.Lock()
125+
defer s.mutex.Unlock()
126+
info, found := s.data[pvcUID]
127+
if !found {
128+
s.data[pvcUID] = &TopologyInfo{RequisiteTerms: requisiteTerms}
129+
} else {
130+
info.RequisiteTerms = requisiteTerms
131+
}
132+
}

pkg/controller/cache_test.go

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
package controller
2+
3+
import (
4+
"fmt"
5+
"reflect"
6+
"sync"
7+
"testing"
8+
9+
"k8s.io/apimachinery/pkg/types"
10+
)
11+
12+
// TestNewInMemoryStore tests the NewInMemoryStore function.
13+
func TestNewInMemoryStore(t *testing.T) {
14+
store := NewInMemoryStore()
15+
if store == nil {
16+
t.Error("Expected a new store, but got nil")
17+
}
18+
if store.data == nil {
19+
t.Error("Expected store data to be initialized, but it was nil")
20+
}
21+
}
22+
23+
// TestAddAndGet tests the Add and GetByPvcUID methods.
24+
func TestAddAndGetInInMemoryStore(t *testing.T) {
25+
store := NewInMemoryStore()
26+
pvcUID := types.UID("a9e2d5a3-1c64-4787-97b7-1a22d5a0b123")
27+
labels := map[string]string{"foo": "bar"}
28+
store.UpdateNodeLabels(pvcUID, labels)
29+
30+
retrieved, err := store.GetByPvcUID(pvcUID)
31+
if err != nil {
32+
t.Fatalf("unexpected error: %v", err)
33+
}
34+
35+
expectedInfo := &TopologyInfo{NodeLabels: labels}
36+
if !reflect.DeepEqual(expectedInfo, retrieved) {
37+
t.Errorf("Expected %+v, got %+v", expectedInfo, retrieved)
38+
}
39+
40+
_, err = store.GetByPvcUID("nonexistent")
41+
if err == nil {
42+
t.Error("Expected an error for a nonexistent entry, but got nil")
43+
}
44+
}
45+
46+
// TestDelete tests the Delete method.
47+
func TestDeleteInInMemoryStore(t *testing.T) {
48+
store := NewInMemoryStore()
49+
pvcUID := types.UID("a9e2d5a3-1c64-4787-97b7-1a22d5a0b123")
50+
labels := map[string]string{"foo": "bar"}
51+
store.UpdateNodeLabels(pvcUID, labels)
52+
53+
err := store.Delete(pvcUID)
54+
if err != nil {
55+
t.Fatalf("unexpected error: %v", err)
56+
}
57+
58+
_, err = store.GetByPvcUID(pvcUID)
59+
if err == nil {
60+
t.Error("Expected an error after deleting the entry, but got nil")
61+
}
62+
63+
err = store.Delete("nonexistent")
64+
if err != nil {
65+
t.Errorf("Did not expect an error for deleting a nonexistent entry, but got: %v", err)
66+
}
67+
}
68+
69+
// TestUpdate tests the update methods.
70+
func TestUpdateInInMemoryStore(t *testing.T) {
71+
store := NewInMemoryStore()
72+
pvcUID := types.UID("a9e2d5a3-1c64-4787-97b7-1a22d5a0b123")
73+
74+
// Test updating a nonexistent entry
75+
store.UpdateNodeLabels(pvcUID, map[string]string{"foo": "bar"})
76+
retrieved, _ := store.GetByPvcUID(pvcUID)
77+
if retrieved.NodeLabels["foo"] != "bar" {
78+
t.Errorf("Expected NodeLabels to be updated")
79+
}
80+
81+
// Test updating an existing entry
82+
store.UpdateNodeLabels(pvcUID, map[string]string{"foo": "baz"})
83+
retrieved, _ = store.GetByPvcUID(pvcUID)
84+
if retrieved.NodeLabels["foo"] != "baz" {
85+
t.Errorf("Expected NodeLabels to be updated")
86+
}
87+
88+
store.UpdateTopologyKeys(pvcUID, []string{"key1"})
89+
retrieved, _ = store.GetByPvcUID(pvcUID)
90+
if retrieved.TopologyKeys[0] != "key1" {
91+
t.Errorf("Expected TopologyKeys to be updated")
92+
}
93+
}
94+
95+
// TestUpdateTermsInInMemoryStore tests the UpdateRequisiteTerms methods.
96+
func TestUpdateTermsInInMemoryStore(t *testing.T) {
97+
store := NewInMemoryStore()
98+
pvcUID := types.UID("a9e2d5a3-1c64-4787-97b7-1a22d5a0b123")
99+
100+
// Define some topology terms for testing
101+
term1 := topologyTerm{
102+
{Key: "zone", Value: "zone1"},
103+
{Key: "rack", Value: "rack1"},
104+
}
105+
term2 := topologyTerm{
106+
{Key: "zone", Value: "zone2"},
107+
{Key: "rack", Value: "rack2"},
108+
}
109+
110+
// Test updating requisite terms for a nonexistent entry
111+
store.UpdateRequisiteTerms(pvcUID, []topologyTerm{term1})
112+
retrieved, _ := store.GetByPvcUID(pvcUID)
113+
if !reflect.DeepEqual(retrieved.RequisiteTerms, []topologyTerm{term1}) {
114+
t.Errorf("Expected RequisiteTerms to be updated")
115+
}
116+
117+
// Test updating requisite terms for an existing entry
118+
store.UpdateRequisiteTerms(pvcUID, []topologyTerm{term2})
119+
retrieved, _ = store.GetByPvcUID(pvcUID)
120+
if !reflect.DeepEqual(retrieved.RequisiteTerms, []topologyTerm{term2}) {
121+
t.Errorf("Expected RequisiteTerms to be updated")
122+
}
123+
}
124+
125+
// TestConcurrentAccess tests thread-safety of the InMemoryStore.
126+
func TestConcurrentAccessInInMemoryStore(t *testing.T) {
127+
store := NewInMemoryStore()
128+
var wg sync.WaitGroup
129+
130+
// Number of concurrent goroutines
131+
concurrency := 100
132+
133+
wg.Add(concurrency)
134+
for i := 0; i < concurrency; i++ {
135+
go func(i int) {
136+
defer wg.Done()
137+
pvcUID := types.UID(fmt.Sprintf("item-%d", i))
138+
info := &TopologyInfo{TopologyKeys: []string{fmt.Sprintf("key-%d", i)}}
139+
140+
store.UpdateTopologyKeys(pvcUID, info.TopologyKeys)
141+
142+
retrieved, err := store.GetByPvcUID(pvcUID)
143+
if err != nil {
144+
t.Errorf("goroutine %d: unexpected error getting item: %v", i, err)
145+
}
146+
if !reflect.DeepEqual(retrieved.TopologyKeys, info.TopologyKeys) {
147+
t.Errorf("goroutine %d: retrieved wrong data", i)
148+
}
149+
150+
store.UpdateTopologyKeys(pvcUID, []string{fmt.Sprintf("new-key-%d", i)})
151+
retrieved, err = store.GetByPvcUID(pvcUID)
152+
if err != nil {
153+
t.Errorf("goroutine %d: unexpected error getting item: %v", i, err)
154+
}
155+
if !reflect.DeepEqual(retrieved.TopologyKeys, []string{fmt.Sprintf("new-key-%d", i)}) {
156+
t.Errorf("goroutine %d: retrieved wrong data after update", i)
157+
}
158+
159+
err = store.Delete(pvcUID)
160+
if err != nil {
161+
t.Errorf("goroutine %d: unexpected error deleting item: %v", i, err)
162+
}
163+
}(i)
164+
}
165+
166+
wg.Wait()
167+
}

pkg/controller/clone_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import (
2121
"k8s.io/client-go/tools/cache"
2222
"k8s.io/client-go/util/workqueue"
2323
"k8s.io/klog/v2"
24-
"sigs.k8s.io/sig-storage-lib-external-provisioner/v12/controller"
24+
"sigs.k8s.io/sig-storage-lib-external-provisioner/v13/controller"
2525
)
2626

2727
//

0 commit comments

Comments
 (0)