|
| 1 | +/* |
| 2 | +Copyright 2025 The Kubernetes Authors. |
| 3 | +
|
| 4 | +Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | +you may not use this file except in compliance with the License. |
| 6 | +You may obtain a copy of the License at |
| 7 | +
|
| 8 | + http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | +
|
| 10 | +Unless required by applicable law or agreed to in writing, software |
| 11 | +distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | +See the License for the specific language governing permissions and |
| 14 | +limitations under the License. |
| 15 | +*/ |
| 16 | + |
| 17 | +package clusters |
| 18 | + |
| 19 | +import ( |
| 20 | + "context" |
| 21 | + "errors" |
| 22 | + "fmt" |
| 23 | + "maps" |
| 24 | + "slices" |
| 25 | + "sync" |
| 26 | + |
| 27 | + "github.com/google/go-cmp/cmp" |
| 28 | + |
| 29 | + "sigs.k8s.io/controller-runtime/pkg/client" |
| 30 | + "sigs.k8s.io/controller-runtime/pkg/cluster" |
| 31 | + |
| 32 | + "sigs.k8s.io/multicluster-runtime/pkg/multicluster" |
| 33 | +) |
| 34 | + |
| 35 | +// Clusters implements the common patterns around managing clusters |
| 36 | +// observed in providers. |
| 37 | +// It partially implements the multicluster.Provider interface. |
| 38 | +type Clusters[T cluster.Cluster] struct { |
| 39 | + // ErrorHandler is called when an error occurs that cannot be |
| 40 | + // returned to a caller, e.g. when a cluster's Start method returns |
| 41 | + // an error. |
| 42 | + ErrorHandler func(error, string, ...any) |
| 43 | + |
| 44 | + Lock sync.RWMutex |
| 45 | + Clusters map[string]T |
| 46 | + Cancels map[string]context.CancelFunc |
| 47 | + // Indexers holds representations of all indexes that were applied |
| 48 | + // and should be applied to clusters that are added. |
| 49 | + Indexers []Index |
| 50 | +} |
| 51 | + |
| 52 | +// Index represents an index on a field in a cluster. |
| 53 | +type Index struct { |
| 54 | + Object client.Object |
| 55 | + Field string |
| 56 | + Extractor client.IndexerFunc |
| 57 | +} |
| 58 | + |
| 59 | +// New returns a new instance of Clusters. |
| 60 | +func New[T cluster.Cluster]() Clusters[T] { |
| 61 | + return Clusters[T]{ |
| 62 | + Clusters: make(map[string]T), |
| 63 | + Cancels: make(map[string]context.CancelFunc), |
| 64 | + Indexers: []Index{}, |
| 65 | + } |
| 66 | +} |
| 67 | + |
| 68 | +// ClusterNames returns the names of all clusters in a sorted order. |
| 69 | +func (c *Clusters[T]) ClusterNames() []string { |
| 70 | + c.Lock.RLock() |
| 71 | + defer c.Lock.RUnlock() |
| 72 | + return slices.Sorted(maps.Keys(c.Clusters)) |
| 73 | +} |
| 74 | + |
| 75 | +// Get returns the cluster with the given name as a cluster.Cluster. |
| 76 | +// It implements the Get method from the Provider interface. |
| 77 | +func (c *Clusters[T]) Get(ctx context.Context, clusterName string) (cluster.Cluster, error) { |
| 78 | + return c.GetTyped(ctx, clusterName) |
| 79 | +} |
| 80 | + |
| 81 | +// GetTyped returns the cluster with the given name. |
| 82 | +func (c *Clusters[T]) GetTyped(_ context.Context, clusterName string) (T, error) { |
| 83 | + c.Lock.RLock() |
| 84 | + defer c.Lock.RUnlock() |
| 85 | + |
| 86 | + cl, ok := c.Clusters[clusterName] |
| 87 | + if !ok { |
| 88 | + return *new(T), fmt.Errorf("cluster with name %s not found: %w", clusterName, multicluster.ErrClusterNotFound) |
| 89 | + } |
| 90 | + |
| 91 | + return cl, nil |
| 92 | +} |
| 93 | + |
| 94 | +// Add adds a new cluster. |
| 95 | +// If a cluster with the given name already exists, it returns an error. |
| 96 | +func (c *Clusters[T]) Add(ctx context.Context, clusterName string, cl T, aware multicluster.Aware) error { |
| 97 | + c.Lock.Lock() |
| 98 | + defer c.Lock.Unlock() |
| 99 | + |
| 100 | + if _, exists := c.Clusters[clusterName]; exists { |
| 101 | + return fmt.Errorf("cluster with name %s already exists", clusterName) |
| 102 | + } |
| 103 | + |
| 104 | + ctx, cancel := context.WithCancel(ctx) |
| 105 | + if aware != nil { |
| 106 | + if err := aware.Engage(ctx, clusterName, cl); err != nil { |
| 107 | + cancel() |
| 108 | + return err |
| 109 | + } |
| 110 | + } |
| 111 | + |
| 112 | + c.Clusters[clusterName] = cl |
| 113 | + c.Cancels[clusterName] = cancel |
| 114 | + go func() { |
| 115 | + defer c.Remove(clusterName) |
| 116 | + if err := cl.Start(ctx); err != nil { |
| 117 | + if c.ErrorHandler != nil { |
| 118 | + c.ErrorHandler(err, "error in cluster", "name", clusterName) |
| 119 | + } |
| 120 | + } |
| 121 | + }() |
| 122 | + |
| 123 | + for _, index := range c.Indexers { |
| 124 | + if err := cl.GetFieldIndexer().IndexField(ctx, index.Object, index.Field, index.Extractor); err != nil { |
| 125 | + defer c.Remove(clusterName) |
| 126 | + return fmt.Errorf("failed to index field %s on cluster %s: %w", index.Field, clusterName, err) |
| 127 | + } |
| 128 | + } |
| 129 | + |
| 130 | + return nil |
| 131 | +} |
| 132 | + |
| 133 | +// Remove removes a cluster by name. |
| 134 | +func (c *Clusters[T]) Remove(clusterName string) { |
| 135 | + c.Lock.Lock() |
| 136 | + defer c.Lock.Unlock() |
| 137 | + |
| 138 | + if cancel, ok := c.Cancels[clusterName]; ok { |
| 139 | + cancel() |
| 140 | + } |
| 141 | + delete(c.Cancels, clusterName) |
| 142 | + delete(c.Clusters, clusterName) |
| 143 | +} |
| 144 | + |
| 145 | +// AddOrReplace adds or replaces a cluster with the given name. |
| 146 | +// If a cluster with the name already exists it compares the |
| 147 | +// configuration as returned by cluster.GetConfig() to compare |
| 148 | +// clusters. |
| 149 | +func (c *Clusters[T]) AddOrReplace(ctx context.Context, clusterName string, cl T, aware multicluster.Aware) error { |
| 150 | + existing, err := c.Get(ctx, clusterName) |
| 151 | + if err != nil { |
| 152 | + // Cluster does not exist, add it |
| 153 | + return c.Add(ctx, clusterName, cl, aware) |
| 154 | + } |
| 155 | + |
| 156 | + if cmp.Equal(existing.GetConfig(), cl.GetConfig()) { |
| 157 | + // Cluster already exists with the same config, nothing to do |
| 158 | + return nil |
| 159 | + } |
| 160 | + |
| 161 | + // Cluster exists with a different config, replace it |
| 162 | + c.Remove(clusterName) |
| 163 | + return c.Add(ctx, clusterName, cl, aware) |
| 164 | +} |
| 165 | + |
| 166 | +// IndexField indexes a field on all clusters. |
| 167 | +// It implements the IndexField method from the Provider interface. |
| 168 | +// Clusters engaged after this call will also have the index applied. |
| 169 | +func (c *Clusters[T]) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error { |
| 170 | + c.Lock.Lock() |
| 171 | + c.Indexers = append(c.Indexers, Index{ |
| 172 | + Object: obj, |
| 173 | + Field: field, |
| 174 | + Extractor: extractValue, |
| 175 | + }) |
| 176 | + c.Lock.Unlock() |
| 177 | + |
| 178 | + var errs error |
| 179 | + c.Lock.RLock() |
| 180 | + for name, cl := range c.Clusters { |
| 181 | + if err := cl.GetFieldIndexer().IndexField(ctx, obj, field, extractValue); err != nil { |
| 182 | + errs = errors.Join(errs, fmt.Errorf("failed to index field on cluster %q: %w", name, err)) |
| 183 | + } |
| 184 | + } |
| 185 | + c.Lock.RUnlock() |
| 186 | + return errs |
| 187 | +} |
0 commit comments