Skip to content

Commit dba5a39

Browse files
committed
refactor: use common sidecar flags functionality
1 parent 07fe125 commit dba5a39

File tree

1 file changed

+33
-82
lines changed

1 file changed

+33
-82
lines changed

cmd/csi-provisioner/csi-provisioner.go

Lines changed: 33 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -78,42 +78,20 @@ import (
7878
)
7979

8080
var (
81-
master = flag.String("master", "", "Master URL to build a client config from. Either this or kubeconfig needs to be set if the provisioner is being run out of cluster.")
82-
kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Either this or master needs to be set if the provisioner is being run out of cluster.")
83-
csiEndpoint = flag.String("csi-address", "/run/csi/socket", "The gRPC endpoint for Target CSI Volume.")
8481
volumeNamePrefix = flag.String("volume-name-prefix", "pvc", "Prefix to apply to the name of a created volume.")
8582
volumeNameUUIDLength = flag.Int("volume-name-uuid-length", -1, "Truncates generated UUID of a created volume to this length. Defaults behavior is to NOT truncate.")
86-
showVersion = flag.Bool("version", false, "Show version.")
87-
retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed provisioning or deletion. It doubles with each failure, up to retry-interval-max.")
88-
retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed provisioning or deletion.")
8983
workerThreads = flag.Uint("worker-threads", 100, "Number of provisioner worker threads, in other words nr. of simultaneous CSI calls.")
9084
finalizerThreads = flag.Uint("cloning-protection-threads", 1, "Number of simultaneously running threads, handling cloning finalizer removal")
9185
capacityThreads = flag.Uint("capacity-threads", 1, "Number of simultaneously running threads, handling CSIStorageCapacity objects")
9286
operationTimeout = flag.Duration("timeout", 10*time.Second, "Timeout for waiting for volume operation (creation, deletion, capacity queries)")
9387

94-
enableLeaderElection = flag.Bool("leader-election", false, "Enables leader election. If leader election is enabled, additional RBAC rules are required. Please refer to the Kubernetes CSI documentation for instructions on setting up these RBAC rules.")
95-
96-
leaderElectionNamespace = flag.String("leader-election-namespace", "", "Namespace where the leader election resource lives. Defaults to the pod namespace if not set.")
97-
strictTopology = flag.Bool("strict-topology", false, "Late binding: pass only selected node topology to CreateVolume Request, unlike default behavior of passing aggregated cluster topologies that match with topology keys of the selected node.")
98-
immediateTopology = flag.Bool("immediate-topology", true, "Immediate binding: pass aggregated cluster topologies for all nodes where the CSI driver is available (enabled, the default) or no topology requirements (if disabled).")
99-
extraCreateMetadata = flag.Bool("extra-create-metadata", false, "If set, add pv/pvc metadata to plugin create requests as parameters.")
100-
metricsAddress = flag.String("metrics-address", "", "(deprecated) The TCP network address where the prometheus metrics endpoint will listen (example: `:8080`). The default is empty string, which means metrics endpoint is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.")
101-
httpEndpoint = flag.String("http-endpoint", "", "The TCP network address where the HTTP server for diagnostics, including pprof, metrics and leader election health check, will listen (example: `:8080`). The default is empty string, which means the server is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.")
102-
metricsPath = flag.String("metrics-path", "/metrics", "The HTTP path where prometheus metrics will be exposed. Default is `/metrics`.")
103-
enableProfile = flag.Bool("enable-pprof", false, "Enable pprof profiling on the TCP network address specified by --http-endpoint. The HTTP path is `/debug/pprof/`.")
104-
105-
leaderElectionLeaseDuration = flag.Duration("leader-election-lease-duration", 15*time.Second, "Duration, in seconds, that non-leader candidates will wait to force acquire leadership. Defaults to 15 seconds.")
106-
leaderElectionRenewDeadline = flag.Duration("leader-election-renew-deadline", 10*time.Second, "Duration, in seconds, that the acting leader will retry refreshing leadership before giving up. Defaults to 10 seconds.")
107-
leaderElectionRetryPeriod = flag.Duration("leader-election-retry-period", 5*time.Second, "Duration, in seconds, the LeaderElector clients should wait between tries of actions. Defaults to 5 seconds.")
88+
strictTopology = flag.Bool("strict-topology", false, "Late binding: pass only selected node topology to CreateVolume Request, unlike default behavior of passing aggregated cluster topologies that match with topology keys of the selected node.")
89+
immediateTopology = flag.Bool("immediate-topology", true, "Immediate binding: pass aggregated cluster topologies for all nodes where the CSI driver is available (enabled, the default) or no topology requirements (if disabled).")
90+
extraCreateMetadata = flag.Bool("extra-create-metadata", false, "If set, add pv/pvc metadata to plugin create requests as parameters.")
91+
enableProfile = flag.Bool("enable-pprof", false, "Enable pprof profiling on the TCP network address specified by --http-endpoint. The HTTP path is `/debug/pprof/`.")
10892

10993
defaultFSType = flag.String("default-fstype", "", "The default filesystem type of the volume to provision when fstype is unspecified in the StorageClass. If the default is not set and fstype is unset in the StorageClass, then no fstype will be set")
11094

111-
kubeAPIQPS = flag.Float32("kube-api-qps", 5, "QPS to use while communicating with the kubernetes apiserver. Defaults to 5.0.")
112-
kubeAPIBurst = flag.Int("kube-api-burst", 10, "Burst to use while communicating with the kubernetes apiserver. Defaults to 10.")
113-
114-
kubeAPICapacityQPS = flag.Float32("kube-api-capacity-qps", 1, "QPS to use for storage capacity updates while communicating with the kubernetes apiserver. Defaults to 1.0.")
115-
kubeAPICapacityBurst = flag.Int("kube-api-capacity-burst", 5, "Burst to use for storage capacity updates while communicating with the kubernetes apiserver. Defaults to 5.")
116-
11795
enableCapacity = flag.Bool("enable-capacity", false, "This enables producing CSIStorageCapacity objects with capacity information from the driver's GetCapacity call.")
11896
capacityImmediateBinding = flag.Bool("capacity-for-immediate-binding", false, "Enables producing capacity information for storage classes with immediate binding. Not needed for the Kubernetes scheduler, maybe useful for other consumers or for debugging.")
11997
capacityPollInterval = flag.Duration("capacity-poll-interval", time.Minute, "How long the external-provisioner waits before checking for storage capacity changes.")
@@ -144,6 +122,7 @@ func main() {
144122
c := logsapi.NewLoggingConfiguration()
145123
logsapi.AddFlags(c, flag.CommandLine)
146124
logs.InitLogs()
125+
standardflags.RegisterCommonFlags(goflag.CommandLine)
147126
standardflags.AddAutomaxprocs(klog.Infof)
148127
flag.CommandLine.AddGoFlagSet(goflag.CommandLine)
149128
flag.Parse()
@@ -163,32 +142,32 @@ func main() {
163142
klog.Fatal("The NODE_NAME environment variable must be set when using --enable-node-deployment.")
164143
}
165144

166-
if *showVersion {
145+
if standardflags.Configuration.ShowVersion {
167146
fmt.Println(os.Args[0], version)
168147
os.Exit(0)
169148
}
170149
klog.InfoS("Version", "version", version)
171150

172-
if *metricsAddress != "" && *httpEndpoint != "" {
151+
if standardflags.Configuration.MetricsAddress != "" && standardflags.Configuration.HttpEndpoint != "" {
173152
klog.Error("only one of `--metrics-address` and `--http-endpoint` can be set.")
174153
os.Exit(1)
175154
}
176-
addr := *metricsAddress
155+
addr := standardflags.Configuration.MetricsAddress
177156
if addr == "" {
178-
addr = *httpEndpoint
157+
addr = standardflags.Configuration.HttpEndpoint
179158
}
180159

181160
// get the KUBECONFIG from env if specified (useful for local/debug cluster)
182161
kubeconfigEnv := os.Getenv("KUBECONFIG")
183162

184163
if kubeconfigEnv != "" {
185164
klog.Infof("Found KUBECONFIG environment variable set, using that..")
186-
kubeconfig = &kubeconfigEnv
165+
standardflags.Configuration.KubeConfig = kubeconfigEnv
187166
}
188167

189-
if *master != "" || *kubeconfig != "" {
168+
if standardflags.Configuration.Master != "" || standardflags.Configuration.KubeConfig != "" {
190169
klog.Infof("Either master or kubeconfig specified. building kube config from that..")
191-
config, err = clientcmd.BuildConfigFromFlags(*master, *kubeconfig)
170+
config, err = clientcmd.BuildConfigFromFlags(standardflags.Configuration.Master, standardflags.Configuration.KubeConfig)
192171
} else {
193172
klog.Infof("Building kube configs for running in cluster...")
194173
config, err = rest.InClusterConfig()
@@ -197,8 +176,8 @@ func main() {
197176
klog.Fatalf("Failed to create config: %v", err)
198177
}
199178

200-
config.QPS = *kubeAPIQPS
201-
config.Burst = *kubeAPIBurst
179+
config.QPS = float32(standardflags.Configuration.KubeAPIQPS)
180+
config.Burst = standardflags.Configuration.KubeAPIBurst
202181

203182
coreConfig := rest.CopyConfig(config)
204183
coreConfig.ContentType = runtime.ContentTypeProtobuf
@@ -228,7 +207,7 @@ func main() {
228207
metrics.WithSubsystem(metrics.SubsystemSidecar),
229208
)
230209

231-
grpcClient, err := ctrl.Connect(ctx, *csiEndpoint, metricsManager)
210+
grpcClient, err := ctrl.Connect(ctx, standardflags.Configuration.CSIAddress, metricsManager)
232211
if err != nil {
233212
klog.Error(err.Error())
234213
os.Exit(1)
@@ -262,7 +241,7 @@ func main() {
262241
// Will be provided via default gatherer.
263242
metrics.WithProcessStartTime(false),
264243
metrics.WithMigration())
265-
migratedGrpcClient, err := ctrl.Connect(ctx, *csiEndpoint, metricsManager)
244+
migratedGrpcClient, err := ctrl.Connect(ctx, standardflags.Configuration.CSIAddress, metricsManager)
266245
if err != nil {
267246
klog.Error(err.Error())
268247
os.Exit(1)
@@ -394,8 +373,8 @@ func main() {
394373

395374
// -------------------------------
396375
// PersistentVolumeClaims informer
397-
genericRateLimiter := workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax)
398-
claimRateLimiter := workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax)
376+
genericRateLimiter := workqueue.NewTypedItemExponentialFailureRateLimiter[string](standardflags.Configuration.RetryIntervalStart, standardflags.Configuration.RetryIntervalMax)
377+
claimRateLimiter := workqueue.NewTypedItemExponentialFailureRateLimiter[string](standardflags.Configuration.RetryIntervalStart, standardflags.Configuration.RetryIntervalMax)
399378
claimQueue := workqueue.NewTypedRateLimitingQueueWithConfig(claimRateLimiter, workqueue.TypedRateLimitingQueueConfig[string]{Name: "claims"})
400379
claimInformer := factory.Core().V1().PersistentVolumeClaims().Informer()
401380

@@ -409,7 +388,7 @@ func main() {
409388
controller.CreateProvisionedPVLimiter(workqueue.DefaultTypedControllerRateLimiter[string]()),
410389
controller.ClaimsInformer(claimInformer),
411390
controller.NodesLister(nodeLister),
412-
controller.RetryIntervalMax(*retryIntervalMax),
391+
controller.RetryIntervalMax(standardflags.Configuration.RetryIntervalMax),
413392
}
414393

415394
if utilfeature.DefaultFeatureGate.Enabled(features.HonorPVReclaimPolicy) {
@@ -454,8 +433,6 @@ func main() {
454433
if *enableCapacity {
455434
// Publishing storage capacity information uses its own client
456435
// with separate rate limiting.
457-
config.QPS = *kubeAPICapacityQPS
458-
config.Burst = *kubeAPICapacityBurst
459436
clientset, err := kubernetes.NewForConfig(config)
460437
if err != nil {
461438
klog.Fatalf("Failed to create client: %v", err)
@@ -487,7 +464,7 @@ func main() {
487464

488465
var topologyInformer topology.Informer
489466
if nodeDeployment == nil {
490-
topologyRateLimiter := workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax)
467+
topologyRateLimiter := workqueue.NewTypedItemExponentialFailureRateLimiter[string](standardflags.Configuration.RetryIntervalStart, standardflags.Configuration.RetryIntervalMax)
491468
topologyInformer = topology.NewNodeTopology(
492469
provisionerName,
493470
clientset,
@@ -553,7 +530,7 @@ func main() {
553530
klog.Fatalf("unexpected error when checking for the V1 CSIStorageCapacity API: %v", err)
554531
}
555532

556-
capacityRateLimiter := workqueue.NewTypedItemExponentialFailureRateLimiter[capacity.QueueKey](*retryIntervalStart, *retryIntervalMax)
533+
capacityRateLimiter := workqueue.NewTypedItemExponentialFailureRateLimiter[capacity.QueueKey](standardflags.Configuration.RetryIntervalStart, standardflags.Configuration.RetryIntervalMax)
557534
capacityController = capacity.NewCentralCapacityController(
558535
csi.NewControllerClient(grpcClient),
559536
provisionerName,
@@ -597,14 +574,14 @@ func main() {
597574
// because both CSI metrics manager and component-base manage
598575
// their own registry. Probably could be avoided by making
599576
// CSI metrics manager a bit more flexible.
600-
mux.Handle(*metricsPath,
577+
mux.Handle(standardflags.Configuration.MetricsPath,
601578
promhttp.InstrumentMetricHandler(
602579
reg,
603580
promhttp.HandlerFor(gatherers, promhttp.HandlerOpts{}),
604581
))
605582

606583
if *enableProfile {
607-
klog.InfoS("Starting profiling", "endpoint", httpEndpoint)
584+
klog.InfoS("Starting profiling", "endpoint", standardflags.Configuration.HttpEndpoint)
608585

609586
mux.HandleFunc("/debug/pprof/", pprof.Index)
610587
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
@@ -616,7 +593,7 @@ func main() {
616593
klog.Infof("ServeMux listening at %q", addr)
617594
err := http.ListenAndServe(addr, mux)
618595
if err != nil {
619-
klog.Fatalf("Failed to start HTTP server at specified address (%q) and metrics path (%q): %s", addr, *metricsPath, err)
596+
klog.Fatalf("Failed to start HTTP server at specified address (%q) and metrics path (%q): %s", addr, standardflags.Configuration.MetricsPath, err)
620597
}
621598
}()
622599
}
@@ -718,39 +695,13 @@ func main() {
718695
}
719696
}
720697

721-
if !*enableLeaderElection {
722-
run(ctx)
723-
} else {
724-
// this lock name pattern is also copied from sigs.k8s.io/sig-storage-lib-external-provisioner/controller
725-
// to preserve backwards compatibility
726-
lockName := strings.Replace(provisionerName, "/", "-", -1)
727-
728-
// create a new clientset for leader election
729-
leClientset, err := kubernetes.NewForConfig(coreConfig)
730-
if err != nil {
731-
klog.Fatalf("Failed to create leaderelection client: %v", err)
732-
}
733-
734-
le := leaderelection.NewLeaderElection(leClientset, lockName, run)
735-
if *httpEndpoint != "" {
736-
le.PrepareHealthCheck(mux, leaderelection.DefaultHealthCheckTimeout)
737-
}
738-
739-
if *leaderElectionNamespace != "" {
740-
le.WithNamespace(*leaderElectionNamespace)
741-
}
742-
743-
le.WithLeaseDuration(*leaderElectionLeaseDuration)
744-
le.WithRenewDeadline(*leaderElectionRenewDeadline)
745-
le.WithRetryPeriod(*leaderElectionRetryPeriod)
746-
le.WithIdentity(identity)
747-
if utilfeature.DefaultFeatureGate.Enabled(features.ReleaseLeaderElectionOnExit) {
748-
le.WithReleaseOnCancel(true)
749-
le.WithContext(ctx)
750-
}
751-
752-
if err := le.Run(); err != nil {
753-
klog.Fatalf("failed to initialize leader election: %v", err)
754-
}
755-
}
698+
leaderelection.RunWithLeaderElection(
699+
ctx,
700+
config,
701+
standardflags.Configuration,
702+
run,
703+
strings.Replace(provisionerName, "/", "-", -1),
704+
mux,
705+
utilfeature.DefaultFeatureGate.Enabled(features.ReleaseLeaderElectionOnExit),
706+
)
756707
}

0 commit comments

Comments
 (0)