diff --git a/charts/csi-hyperstack/templates/daemonset-node.yaml b/charts/csi-hyperstack/templates/daemonset-node.yaml
index 8a7a400..c90391e 100644
--- a/charts/csi-hyperstack/templates/daemonset-node.yaml
+++ b/charts/csi-hyperstack/templates/daemonset-node.yaml
@@ -16,7 +16,7 @@ spec:
     spec:
       {{- if .Values.imagePullSecrets }}
       imagePullSecrets:
-        - name: {{ .Values.imagePullSecrets }}
+{{ toYaml .Values.imagePullSecrets | indent 8 }}
       {{- end }}
       serviceAccountName: csi-hyperstack-node-sa
       hostNetwork: true
@@ -63,6 +63,8 @@ spec:
           value: "hyperstack.csi.nexgencloud.com"
         - name: DRIVER_VERSION
          value: "v1.0.0"
+        - name: HYPERSTACK_MAX_VOLUMES_PER_NODE
+          value: "{{ .Values.hyperstack.maxVolumesPerNode }}"
        - name: KUBE_NODE_NAME
          valueFrom:
            fieldRef:
diff --git a/charts/csi-hyperstack/values.yaml b/charts/csi-hyperstack/values.yaml
index e1834f1..fc7b03d 100644
--- a/charts/csi-hyperstack/values.yaml
+++ b/charts/csi-hyperstack/values.yaml
@@ -18,6 +18,7 @@ components:
 
 hyperstack:
   apiAddress: "https://infrahub-api.nexgencloud.com/v1"
+  maxVolumesPerNode: 5
 
 storageClass:
   enabled: true
diff --git a/main.go b/main.go
index cdc5bcb..96dee79 100755
--- a/main.go
+++ b/main.go
@@ -25,9 +25,11 @@ func main() {
 	_ = viper.BindEnv("hyperstack-api-key", "HYPERSTACK_API_KEY")
 	_ = viper.BindEnv("hyperstack-api-address", "HYPERSTACK_API_ADDRESS")
 	// _ = viper.BindEnv("hyperstack-environment", "HYPERSTACK_ENVIRONMENT")
+	_ = viper.BindEnv("max-volumes-per-node", "HYPERSTACK_MAX_VOLUMES_PER_NODE")
 	viper.SetDefault("endpoint", "unix://var/run/csi.sock")
 	viper.SetDefault("metrics-enabled", true)
 	viper.SetDefault("http-endpoint", ":8080")
+	viper.SetDefault("max-volumes-per-node", 5)
 
 	rootCmd := &cobra.Command{
 		Use: name,
@@ -61,6 +63,7 @@ func main() {
 	flags.String("hyperstack-api-key", viper.GetString("hyperstack-api-key"), "Hyperstack API key (env: HYPERSTACK_API_KEY)")
 	flags.String("hyperstack-api-address", viper.GetString("hyperstack-api-address"), "Hyperstack API server address (env: HYPERSTACK_API_ADDRESS)")
 	// flags.String("hyperstack-environment", viper.GetString("hyperstack-environment"), "Hyperstack environment name")
+	flags.Int64("max-volumes-per-node", viper.GetInt64("max-volumes-per-node"), "Maximum number of volumes per node (env: HYPERSTACK_MAX_VOLUMES_PER_NODE)")
 
 	flags.Bool("service-controller-enabled", false, "Enables CSI controller service")
 	flags.Bool("service-node-enabled", false, "Enables CSI node service")
@@ -115,6 +118,7 @@ Global Flags:
 Environment variables:
   HYPERSTACK_API_KEY               Hyperstack API key
   HYPERSTACK_API_ADDRESS           Hyperstack API server address
+  HYPERSTACK_MAX_VOLUMES_PER_NODE  Maximum number of volumes per node (default: 5)
 
 Use "{{.CommandPath}} [command] --help" for more information about a command.
 `
@@ -128,6 +132,7 @@ func driverStart(ctx context.Context) (err error) {
 		HyperstackApiKey:     viper.GetString("hyperstack-api-key"),
 		HyperstackApiAddress: viper.GetString("hyperstack-api-address"),
 		// Environment:       viper.GetString("hyperstack-environment"),
+		MaxVolumesPerNode:    viper.GetInt64("max-volumes-per-node"),
 	})
 
 	drv.SetupIdentityService()
diff --git a/pkg/driver/controllerserver.go b/pkg/driver/controllerserver.go
index eadd9f9..01c9e72 100644
--- a/pkg/driver/controllerserver.go
+++ b/pkg/driver/controllerserver.go
@@ -199,15 +199,41 @@ func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *cs
 		return nil, status.Errorf(codes.NotFound, "ControllerPublishVolume: GetVolume returned nil volume")
 	}
 	klog.Infof("ControllerPublishVolume: GetVolume succeeded -\nStatus: %s\nName: %s\nID: %d\nSize:%d", *getVolume.Status, *getVolume.Name, *getVolume.Id, *getVolume.Size)
-	if *getVolume.Status == "in-use" { //Volume is already attached
+	if *getVolume.Status == "in-use" {
 		klog.Infof("ControllerPublishVolume: Volume %s is already in use", *getVolume.Name)
 		if len(*getVolume.Attachments) > 0 {
-			klog.Infof("ControllerPublishVolume: Volume %s is already attached to node %s", *getVolume.Name, *(*getVolume.Attachments)[0].InstanceId)
-			return &csi.ControllerPublishVolumeResponse{
-				PublishContext: map[string]string{
-					volNameKeyFromControllerPublishVolume: *(*getVolume.Attachments)[0].Device,
-				},
-			}, nil
+			attachedInstanceId := *(*getVolume.Attachments)[0].InstanceId
+			klog.Infof("ControllerPublishVolume: Volume %s is already attached to node %d", *getVolume.Name, attachedInstanceId)
+			if attachedInstanceId == vmId {
+				return &csi.ControllerPublishVolumeResponse{
+					PublishContext: map[string]string{
+						volNameKeyFromControllerPublishVolume: *(*getVolume.Attachments)[0].Device,
+					},
+				}, nil
+			}
+			klog.Infof("ControllerPublishVolume: Volume attached to wrong node (attached: %d, requested: %d), detaching", attachedInstanceId, vmId)
+			_, err = cloud.DetachVolumeFromNode(ctx, attachedInstanceId, volumeIDInt)
+			if err != nil {
+				klog.Errorf("ControllerPublishVolume: Failed to detach from wrong node: %v", err)
+				return nil, status.Errorf(codes.Internal, "ControllerPublishVolume: Failed to detach from wrong node: %v", err)
+			}
+			maxAttempts := 30
+			for i := 0; i < maxAttempts; i++ {
+				v, err := cloud.GetVolume(ctx, volumeIDInt)
+				if err != nil || v == nil {
+					time.Sleep(2 * time.Second)
+					continue
+				}
+				if *v.Status == "available" {
+					klog.Infof("ControllerPublishVolume: Volume detached from wrong node, now available")
+					getVolume = v
+					break
+				}
+				time.Sleep(2 * time.Second)
+			}
+			if *getVolume.Status != "available" {
+				return nil, status.Error(codes.DeadlineExceeded, "ControllerPublishVolume: Timeout waiting for detachment from wrong node")
+			}
 		}
 	}
 	if *getVolume.Status == "available" {
@@ -217,8 +244,42 @@ func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *cs
 			return nil, status.Errorf(codes.Internal, "ControllerPublishVolume: Failed to AttachVolumeToNode: %v", err)
 		}
 		klog.Infof("ControllerPublishVolume: AttachVolumeToNode succeeded -\nID: %v\nInstance id ID: %v\nStatus: %v\nVolume ID: %v", *attachVolume.Id, *attachVolume.InstanceId, *attachVolume.Status, *attachVolume.VolumeId)
+
+		maxAttempts := 30
+		klog.Infof("ControllerPublishVolume: Polling for attachment to complete with max attempts: %d", maxAttempts)
+		for i := 0; i < maxAttempts; i++ {
+			klog.Infof("ControllerPublishVolume: Polling for attachment %d/%d", i+1, maxAttempts)
+			v, err := cloud.GetVolume(ctx, volumeIDInt)
+			if err != nil {
+				klog.Errorf("ControllerPublishVolume: Failed to GetVolume while polling: %v", err)
+				time.Sleep(2 * time.Second)
+				continue
+			}
+			if v == nil {
+				klog.Warningf("ControllerPublishVolume: GetVolume attempt %d returned nil volume", i+1)
+				time.Sleep(2 * time.Second)
+				continue
+			}
+			if *v.Status == "in-use" && len(*v.Attachments) > 0 {
+				attachment := (*v.Attachments)[0]
+				attachedId := *attachment.InstanceId
+				if attachedId == vmId && attachment.Device != nil {
+					klog.Infof("ControllerPublishVolume: Volume attached to correct node with device %s", *attachment.Device)
+					return &csi.ControllerPublishVolumeResponse{
+						PublishContext: map[string]string{
+							volNameKeyFromControllerPublishVolume: *attachment.Device,
+						},
+					}, nil
+				}
+			}
+			time.Sleep(2 * time.Second)
+		}
+		return nil, status.Error(codes.DeadlineExceeded, "ControllerPublishVolume: Timeout waiting for attachment to complete")
 	}
-	return &csi.ControllerPublishVolumeResponse{}, nil
+
+	klog.Errorf("ControllerPublishVolume: Volume %s in unexpected state: %s", *getVolume.Name, *getVolume.Status)
+	return nil, status.Errorf(codes.FailedPrecondition,
+		"Volume in unexpected state: %s (expected 'available' or 'in-use')", *getVolume.Status)
 }
 
 func (cs *controllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
@@ -254,8 +315,31 @@ func (cs *controllerServer) ControllerUnpublishVolume(ctx context.Context, req *
 		return nil, status.Errorf(codes.Internal, "ControllerUnpublishVolume: Failed to DetachVolumeFromNode: %v", err)
 		}
 		klog.Infof("ControllerUnpublishVolume: DetachVolumeFromNode succeeded -\nMessage: %v\nStatus: %v\nVolume Attachments: %v", *detachVolume.Message, *detachVolume.Status, *detachVolume.VolumeAttachments)
+
+		maxAttempts := 30
+		klog.Infof("ControllerUnpublishVolume: Polling for volume to become available with max attempts: %d", maxAttempts)
+		for i := 0; i < maxAttempts; i++ {
+			klog.Infof("ControllerUnpublishVolume: Polling for volume to become available %d/%d", i+1, maxAttempts)
+			v, err := cloud.GetVolume(ctx, volumeIDInt)
+			if err != nil {
+				klog.Errorf("ControllerUnpublishVolume: Failed to GetVolume while polling: %v", err)
+				time.Sleep(2 * time.Second)
+				continue
+			}
+			if v == nil {
+				klog.Warningf("ControllerUnpublishVolume: GetVolume attempt %d returned nil volume", i+1)
+				time.Sleep(2 * time.Second)
+				continue
+			}
+			if *v.Status == "available" {
+				klog.Infof("ControllerUnpublishVolume: Volume is now available after detachment")
+				return &csi.ControllerUnpublishVolumeResponse{}, nil
+			}
+			time.Sleep(2 * time.Second)
+		}
+		return nil, status.Error(codes.DeadlineExceeded, "ControllerUnpublishVolume: Timeout waiting for volume to detach")
 	}
-	return &csi.ControllerUnpublishVolumeResponse{}, nil
+	return nil, status.Errorf(codes.Internal, "ControllerUnpublishVolume: Volume in unexpected state: %s", *getVolume.Status)
 }
 
 func (cs *controllerServer) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go
index cf462fd..e265972 100644
--- a/pkg/driver/driver.go
+++ b/pkg/driver/driver.go
@@ -30,6 +30,7 @@ type DriverOpts struct {
 	// HyperstackNodeId  string
 	HyperstackApiKey     string
 	HyperstackApiAddress string
+	MaxVolumesPerNode    int64
 }
 
 var (
@@ -136,9 +137,10 @@ func (d *Driver) SetupControllerService() {
 
 func (d *Driver) SetupNodeService() {
klog.Info("Providing node service") d.serviceNode = &nodeServer{ - driver: d, - mount: mount.GetMountProvider(), - metadata: metadata.GetMetadataProvider(d.hyperstackClient.GetMetadataOpts().SearchOrder), + driver: d, + mount: mount.GetMountProvider(), + metadata: metadata.GetMetadataProvider(d.hyperstackClient.GetMetadataOpts().SearchOrder), + maxVolumesPerNode: d.opts.MaxVolumesPerNode, } } diff --git a/pkg/driver/nodeserver.go b/pkg/driver/nodeserver.go index 6e37113..10064e5 100644 --- a/pkg/driver/nodeserver.go +++ b/pkg/driver/nodeserver.go @@ -5,6 +5,7 @@ import ( "os" "os/exec" "strings" + "time" "github.com/container-storage-interface/spec/lib/go/csi" "google.golang.org/grpc/codes" @@ -26,9 +27,10 @@ const ( ) type nodeServer struct { - driver *Driver - mount mount.IMount - metadata metadata.IMetadata + driver *Driver + mount mount.IMount + metadata metadata.IMetadata + maxVolumesPerNode int64 csi.UnimplementedNodeServer } @@ -55,21 +57,81 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol func formateAndMakeFS(device string, fstype string) error { klog.Infof("formateAndMakeFS: called with args %s, %s", device, fstype) + + // Wait for device to be ready and check if filesystem exists + // This prevents race condition where device is attached but not yet accessible + const maxRetries = 10 + const retryDelay = 500 * time.Millisecond + + var existingFS string + var deviceReady bool + + for i := 0; i < maxRetries; i++ { + // Check if device file exists + if _, err := os.Stat(device); err != nil { + if i < maxRetries-1 { + klog.Infof("Device %s not ready (attempt %d/%d), waiting...", device, i+1, maxRetries) + time.Sleep(retryDelay) + continue + } + return fmt.Errorf("device %s not found after %d attempts: %v", device, maxRetries, err) + } + + // Try to read filesystem type with blkid + blkidCmd := exec.Command("blkid", "-o", "value", "-s", "TYPE", device) + output, err := blkidCmd.CombinedOutput() + existingFS = strings.TrimSpace(string(output)) + + if err == nil { + // blkid succeeded - filesystem exists + deviceReady = true + break + } + + // Check if error indicates device is not ready (vs no filesystem) + exitErr, ok := err.(*exec.ExitError) + if ok && exitErr.ExitCode() == 2 { + // Exit code 2 means no filesystem found - device is ready but empty + deviceReady = true + break + } + + // Other errors might indicate device not ready + if i < maxRetries-1 { + klog.Infof("Device %s not ready for blkid (attempt %d/%d): %v, waiting...", device, i+1, maxRetries, err) + time.Sleep(retryDelay) + continue + } + + return fmt.Errorf("device %s not ready after %d attempts: %v", device, maxRetries, err) + } + + if !deviceReady { + return fmt.Errorf("device %s did not become ready", device) + } + + if existingFS != "" { + klog.Infof("Filesystem %s already exists on %s, skipping format", existingFS, device) + return nil // Don't format if filesystem exists + } + + klog.Infof("No filesystem detected on %s, creating %s filesystem", device, fstype) + + // Only format if no filesystem exists mkfsCmd := fmt.Sprintf("mkfs.%s", fstype) - _, err := exec.LookPath(mkfsCmd) - if err != nil { + if _, err := exec.LookPath(mkfsCmd); err != nil { return fmt.Errorf("unable to find the mkfs (%s) utiltiy errors is %s", mkfsCmd, err.Error()) } // actually run mkfs.ext4 -F source mkfsArgs := []string{"-F", device} - out, err := exec.Command(mkfsCmd, mkfsArgs...).CombinedOutput() - if err != nil { + if out, err := exec.Command(mkfsCmd, mkfsArgs...).CombinedOutput(); err != nil { 
return fmt.Errorf("create fs command failed output: %s, and err: %s", out, err.Error()) + } else { + klog.Infof("formateAndMakeFS: command output: %s", out) } - klog.Infof("formateAndMakeFS: command output: %s", out) return nil } @@ -177,9 +239,13 @@ func (ns *nodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoReque return nil, fmt.Errorf("failed to get node UUID: %v", err) } klog.Infof("NodeGetInfo called with nodeID: %#v\n", nodeID) + maxVolumes := ns.maxVolumesPerNode + if maxVolumes == 0 { + maxVolumes = 5 // Default to 5 if not configured + } return &csi.NodeGetInfoResponse{ NodeId: nodeID, - MaxVolumesPerNode: 5, + MaxVolumesPerNode: maxVolumes, AccessibleTopology: &csi.Topology{ Segments: map[string]string{ "hyperstack.cloud/instance-id": nodeID, diff --git a/pkg/hyperstack/hyperstack-sdk-go/lib/volume/volume_client.go b/pkg/hyperstack/hyperstack-sdk-go/lib/volume/volume_client.go index 905cad7..c97c474 100644 --- a/pkg/hyperstack/hyperstack-sdk-go/lib/volume/volume_client.go +++ b/pkg/hyperstack/hyperstack-sdk-go/lib/volume/volume_client.go @@ -21,7 +21,7 @@ import ( type AttachmentsFieldsForVolume struct { Device *string `json:"device,omitempty"` Id *int `json:"id,omitempty"` - InstanceId *string `json:"instance_id,omitempty"` + InstanceId *int `json:"instance_id,omitempty"` Protected *bool `json:"protected,omitempty"` Status *string `json:"status,omitempty"` }