4 changes: 3 additions & 1 deletion charts/csi-hyperstack/templates/daemonset-node.yaml
@@ -16,7 +16,7 @@ spec:
spec:
{{- if .Values.imagePullSecrets }}
imagePullSecrets:
- name: {{ .Values.imagePullSecrets }}
{{ toYaml .Values.imagePullSecrets | indent 8 }}
{{- end }}
serviceAccountName: csi-hyperstack-node-sa
hostNetwork: true
@@ -63,6 +63,8 @@ spec:
value: "hyperstack.csi.nexgencloud.com"
- name: DRIVER_VERSION
value: "v1.0.0"
- name: HYPERSTACK_MAX_VOLUMES_PER_NODE
value: "{{ .Values.hyperstack.maxVolumesPerNode }}"
- name: KUBE_NODE_NAME
valueFrom:
fieldRef:
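The template now renders .Values.imagePullSecrets with toYaml instead of interpolating a single name, so the value must be a list of secret references rather than a string. A minimal sketch of the expected values.yaml shape, using a hypothetical secret name:

    imagePullSecrets:
      - name: regcred   # hypothetical secret; any list of name entries is rendered verbatim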
1 change: 1 addition & 0 deletions charts/csi-hyperstack/values.yaml
@@ -18,6 +18,7 @@ components:

hyperstack:
apiAddress: "https://infrahub-api.nexgencloud.com/v1"
maxVolumesPerNode: 5

storageClass:
enabled: true
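The new knob can be overridden at install or upgrade time without editing the chart; a sketch, with the release name and chart path as placeholders:

    helm upgrade --install csi-hyperstack ./charts/csi-hyperstack \
      --set hyperstack.maxVolumesPerNode=8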
5 changes: 5 additions & 0 deletions main.go
@@ -25,9 +25,11 @@ func main() {
_ = viper.BindEnv("hyperstack-api-key", "HYPERSTACK_API_KEY")
_ = viper.BindEnv("hyperstack-api-address", "HYPERSTACK_API_ADDRESS")
// _ = viper.BindEnv("hyperstack-environment", "HYPERSTACK_ENVIRONMENT")
_ = viper.BindEnv("max-volumes-per-node", "HYPERSTACK_MAX_VOLUMES_PER_NODE")
viper.SetDefault("endpoint", "unix://var/run/csi.sock")
viper.SetDefault("metrics-enabled", true)
viper.SetDefault("http-endpoint", ":8080")
viper.SetDefault("max-volumes-per-node", 5)

rootCmd := &cobra.Command{
Use: name,
@@ -61,6 +63,7 @@ func main() {
flags.String("hyperstack-api-key", viper.GetString("hyperstack-api-key"), "Hyperstack API key (env: HYPERSTACK_API_KEY)")
flags.String("hyperstack-api-address", viper.GetString("hyperstack-api-address"), "Hyperstack API server address (env: HYPERSTACK_API_ADDRESS)")
// flags.String("hyperstack-environment", viper.GetString("hyperstack-environment"), "Hyperstack environment name")
flags.Int64("max-volumes-per-node", viper.GetInt64("max-volumes-per-node"), "Maximum number of volumes per node (env: HYPERSTACK_MAX_VOLUMES_PER_NODE)")
flags.Bool("service-controller-enabled", false, "Enables CSI controller service")
flags.Bool("service-node-enabled", false, "Enables CSI node service")

@@ -115,6 +118,7 @@ Global Flags:
Environment variables:
HYPERSTACK_API_KEY Hyperstack API key
HYPERSTACK_API_ADDRESS Hyperstack API server address
HYPERSTACK_MAX_VOLUMES_PER_NODE Maximum number of volumes per node (default: 5)

Use "{{.CommandPath}} [command] --help" for more information about a command.
`
@@ -128,6 +132,7 @@ func driverStart(ctx context.Context) (err error) {
HyperstackApiKey: viper.GetString("hyperstack-api-key"),
HyperstackApiAddress: viper.GetString("hyperstack-api-address"),
// Environment: viper.GetString("hyperstack-environment"),
MaxVolumesPerNode: viper.GetInt64("max-volumes-per-node"),
})

drv.SetupIdentityService()
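For readers unfamiliar with viper, the BindEnv/SetDefault pair added here means the environment variable wins over the built-in default of 5. A self-contained sketch of that precedence, assuming only the spf13/viper calls shown in this diff:

    package main

    import (
        "fmt"
        "os"

        "github.com/spf13/viper"
    )

    func main() {
        _ = viper.BindEnv("max-volumes-per-node", "HYPERSTACK_MAX_VOLUMES_PER_NODE")
        viper.SetDefault("max-volumes-per-node", 5)

        fmt.Println(viper.GetInt64("max-volumes-per-node")) // 5: default applies

        os.Setenv("HYPERSTACK_MAX_VOLUMES_PER_NODE", "8")
        fmt.Println(viper.GetInt64("max-volumes-per-node")) // 8: the bound env var overrides the default
    }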
102 changes: 93 additions & 9 deletions pkg/driver/controllerserver.go
@@ -199,15 +199,42 @@ func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *cs
return nil, status.Errorf(codes.NotFound, "ControllerPublishVolume: GetVolume returned nil volume")
}
klog.Infof("ControllerPublishVolume: GetVolume succeeded -\nStatus: %s\nName: %s\nID: %d\nSize:%d", *getVolume.Status, *getVolume.Name, *getVolume.Id, *getVolume.Size)
if *getVolume.Status == "in-use" { //Volume is already attached
if *getVolume.Status == "in-use" {
klog.Infof("ControllerPublishVolume: Volume %s is already in use", *getVolume.Name)
if len(*getVolume.Attachments) > 0 {
klog.Infof("ControllerPublishVolume: Volume %s is already attached to node %s", *getVolume.Name, *(*getVolume.Attachments)[0].InstanceId)
return &csi.ControllerPublishVolumeResponse{
PublishContext: map[string]string{
volNameKeyFromControllerPublishVolume: *(*getVolume.Attachments)[0].Device,
},
}, nil
attachedInstanceId := *(*getVolume.Attachments)[0].InstanceId
klog.Infof("ControllerPublishVolume: Volume %s is already attached to node %d", *getVolume.Name, attachedInstanceId)
if attachedInstanceId == vmId {
return &csi.ControllerPublishVolumeResponse{
PublishContext: map[string]string{
volNameKeyFromControllerPublishVolume: *(*getVolume.Attachments)[0].Device,
},
}, nil
}
klog.Infof("ControllerPublishVolume: Volume attached to wrong node (attached: %d, requested: %d), detaching", attachedInstanceId, vmId)
_, err = cloud.DetachVolumeFromNode(ctx, attachedInstanceId, volumeIDInt)
if err != nil {
klog.Errorf("ControllerPublishVolume: Failed to detach from wrong node: %v", err)
return nil, status.Errorf(codes.Internal, "ControllerPublishVolume: Failed to detach from wrong node: %v", err)
}
maxAttempts := 30
for i := 0; i < maxAttempts; i++ {
v, err := cloud.GetVolume(ctx, volumeIDInt)
if err != nil || v == nil {
time.Sleep(2 * time.Second)
continue
}
if *v.Status == "available" {
klog.Infof("ControllerPublishVolume: Volume detached from wrong node, now available")
getVolume = v
break
}
time.Sleep(2 * time.Second)
}
if *getVolume.Status != "available" {
return nil, status.Error(codes.DeadlineExceeded, "ControllerPublishVolume: Timeout waiting for detachment from wrong node")
}
}
}
if *getVolume.Status == "available" {
@@ -217,8 +244,42 @@ func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *cs
return nil, status.Errorf(codes.Internal, "ControllerPublishVolume: Failed to AttachVolumeToNode: %v", err)
}
klog.Infof("ControllerPublishVolume: AttachVolumeToNode succeeded -\nID: %v\nInstance id ID: %v\nStatus: %v\nVolume ID: %v", *attachVolume.Id, *attachVolume.InstanceId, *attachVolume.Status, *attachVolume.VolumeId)

maxAttempts := 30
klog.Infof("ControllerPublishVolume: Polling for attachment to complete with max attempts: %d", maxAttempts)
for i := 0; i < maxAttempts; i++ {
klog.Infof("ControllerPublishVolume: Polling for attachment %d/%d", i+1, maxAttempts)
v, err := cloud.GetVolume(ctx, volumeIDInt)
if err != nil {
klog.Errorf("ControllerPublishVolume: Failed to GetVolume while polling: %v", err)
time.Sleep(2 * time.Second)
continue
}
if v == nil {
klog.Warningf("ControllerPublishVolume: GetVolume attempt %d returned nil volume", i+1)
time.Sleep(2 * time.Second)
continue
}
if *v.Status == "in-use" && len(*v.Attachments) > 0 {
attachment := (*v.Attachments)[0]
attachedId := *attachment.InstanceId
if attachedId == vmId && attachment.Device != nil {
klog.Infof("ControllerPublishVolume: Volume attached to correct node with device %s", *attachment.Device)
return &csi.ControllerPublishVolumeResponse{
PublishContext: map[string]string{
volNameKeyFromControllerPublishVolume: *attachment.Device,
},
}, nil
}
}
time.Sleep(2 * time.Second)
}
return nil, status.Error(codes.DeadlineExceeded, "ControllerPublishVolume: Timeout waiting for attachment to complete")
}
return &csi.ControllerPublishVolumeResponse{}, nil

klog.Errorf("ControllerPublishVolume: Volume %s in unexpected state: %s", *getVolume.Name, *getVolume.Status)
return nil, status.Errorf(codes.FailedPrecondition,
"Volume in unexpected state: %s (expected 'available' or 'in-use')", *getVolume.Status)
}
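The detach-and-reattach path above repeats the same 30-attempt, 2-second polling loop twice, and ControllerUnpublishVolume below adds a third. A hedged sketch of a helper that could consolidate the pattern; CloudClient and Volume here are stand-in types, not types from this PR:

    // Stand-in types mirroring how the surrounding diff uses the cloud client.
    type Volume struct{ Status *string }

    type CloudClient interface {
        GetVolume(ctx context.Context, id int) (*Volume, error)
    }

    // waitForVolumeStatus polls until the volume reports the wanted status,
    // retrying transient errors and nil results exactly as the loops above do.
    // It assumes the context, time, grpc codes, and grpc status imports already
    // present in this file.
    func waitForVolumeStatus(ctx context.Context, cloud CloudClient, id int, want string) (*Volume, error) {
        const (
            maxAttempts = 30
            interval    = 2 * time.Second
        )
        for i := 0; i < maxAttempts; i++ {
            v, err := cloud.GetVolume(ctx, id)
            if err == nil && v != nil && *v.Status == want {
                return v, nil
            }
            time.Sleep(interval)
        }
        return nil, status.Errorf(codes.DeadlineExceeded, "timeout waiting for volume %d to reach status %q", id, want)
    }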

func (cs *controllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
@@ -254,8 +315,31 @@ func (cs *controllerServer) ControllerUnpublishVolume(ctx context.Context, req *
return nil, status.Errorf(codes.Internal, "ControllerUnpublishVolume: Failed to DetachVolumeFromNode: %v", err)
}
klog.Infof("ControllerUnpublishVolume: DetachVolumeFromNode succeeded -\nMessage: %v\nStatus: %v\nVolume Attachments: %v", *detachVolume.Message, *detachVolume.Status, *detachVolume.VolumeAttachments)

maxAttempts := 30
klog.Infof("ControllerUnpublishVolume: Polling for volume to become available with max attempts: %d", maxAttempts)
for i := 0; i < maxAttempts; i++ {
klog.Infof("ControllerUnpublishVolume: Polling for volume to become available %d/%d", i+1, maxAttempts)
v, err := cloud.GetVolume(ctx, volumeIDInt)
if err != nil {
klog.Errorf("ControllerUnpublishVolume: Failed to GetVolume while polling: %v", err)
time.Sleep(2 * time.Second)
continue
}
if v == nil {
klog.Warningf("ControllerUnpublishVolume: GetVolume attempt %d returned nil volume", i+1)
time.Sleep(2 * time.Second)
continue
}
if *v.Status == "available" {
klog.Infof("ControllerUnpublishVolume: Volume is now available after detachment")
return &csi.ControllerUnpublishVolumeResponse{}, nil
}
time.Sleep(2 * time.Second)
}
return nil, status.Error(codes.DeadlineExceeded, "ControllerUnpublishVolume: Timeout waiting for volume to detach")
}
return &csi.ControllerUnpublishVolumeResponse{}, nil
return nil, status.Errorf(codes.Internal, "ControllerPublishVolume: Volume in unexpected state: %s", *getVolume.Status)
}

func (cs *controllerServer) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
8 changes: 5 additions & 3 deletions pkg/driver/driver.go
@@ -30,6 +30,7 @@ type DriverOpts struct {
// HyperstackNodeId string
HyperstackApiKey string
HyperstackApiAddress string
MaxVolumesPerNode int64
}

var (
@@ -136,9 +137,10 @@ func (d *Driver) SetupControllerService() {
func (d *Driver) SetupNodeService() {
klog.Info("Providing node service")
d.serviceNode = &nodeServer{
driver: d,
mount: mount.GetMountProvider(),
metadata: metadata.GetMetadataProvider(d.hyperstackClient.GetMetadataOpts().SearchOrder),
driver: d,
mount: mount.GetMountProvider(),
metadata: metadata.GetMetadataProvider(d.hyperstackClient.GetMetadataOpts().SearchOrder),
maxVolumesPerNode: d.opts.MaxVolumesPerNode,
}
}

84 changes: 75 additions & 9 deletions pkg/driver/nodeserver.go
@@ -5,6 +5,7 @@ import (
"os"
"os/exec"
"strings"
"time"

"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc/codes"
@@ -26,9 +27,10 @@ const (
)

type nodeServer struct {
driver *Driver
mount mount.IMount
metadata metadata.IMetadata
driver *Driver
mount mount.IMount
metadata metadata.IMetadata
maxVolumesPerNode int64
csi.UnimplementedNodeServer
}

@@ -55,21 +57,81 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol

func formateAndMakeFS(device string, fstype string) error {
klog.Infof("formateAndMakeFS: called with args %s, %s", device, fstype)

// Wait for device to be ready and check if filesystem exists
// This prevents race condition where device is attached but not yet accessible
const maxRetries = 10
const retryDelay = 500 * time.Millisecond

var existingFS string
var deviceReady bool

for i := 0; i < maxRetries; i++ {
// Check if device file exists
if _, err := os.Stat(device); err != nil {
if i < maxRetries-1 {
klog.Infof("Device %s not ready (attempt %d/%d), waiting...", device, i+1, maxRetries)
time.Sleep(retryDelay)
continue
}
return fmt.Errorf("device %s not found after %d attempts: %v", device, maxRetries, err)
}

// Try to read filesystem type with blkid
blkidCmd := exec.Command("blkid", "-o", "value", "-s", "TYPE", device)
output, err := blkidCmd.CombinedOutput()
existingFS = strings.TrimSpace(string(output))

if err == nil {
// blkid succeeded - filesystem exists
deviceReady = true
break
}

// Check if error indicates device is not ready (vs no filesystem)
exitErr, ok := err.(*exec.ExitError)
if ok && exitErr.ExitCode() == 2 {
// Exit code 2 means no filesystem found - device is ready but empty
deviceReady = true
break
}

// Other errors might indicate device not ready
if i < maxRetries-1 {
klog.Infof("Device %s not ready for blkid (attempt %d/%d): %v, waiting...", device, i+1, maxRetries, err)
time.Sleep(retryDelay)
continue
}

return fmt.Errorf("device %s not ready after %d attempts: %v", device, maxRetries, err)
}

if !deviceReady {
return fmt.Errorf("device %s did not become ready", device)
}

if existingFS != "" {
klog.Infof("Filesystem %s already exists on %s, skipping format", existingFS, device)
return nil // Don't format if filesystem exists
}

klog.Infof("No filesystem detected on %s, creating %s filesystem", device, fstype)

// Only format if no filesystem exists
mkfsCmd := fmt.Sprintf("mkfs.%s", fstype)

_, err := exec.LookPath(mkfsCmd)
if err != nil {
if _, err := exec.LookPath(mkfsCmd); err != nil {
return fmt.Errorf("unable to find the mkfs (%s) utiltiy errors is %s", mkfsCmd, err.Error())
}

// actually run mkfs.ext4 -F source
mkfsArgs := []string{"-F", device}

out, err := exec.Command(mkfsCmd, mkfsArgs...).CombinedOutput()
if err != nil {
if out, err := exec.Command(mkfsCmd, mkfsArgs...).CombinedOutput(); err != nil {
return fmt.Errorf("create fs command failed output: %s, and err: %s", out, err.Error())
} else {
klog.Infof("formateAndMakeFS: command output: %s", out)
}
klog.Infof("formateAndMakeFS: command output: %s", out)
return nil
}
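The retry loop leans on blkid's exit-status contract: util-linux blkid exits 0 when it finds the requested tag and 2 when the device is readable but carries no recognizable filesystem; anything else is treated here as the device not being ready yet. A condensed sketch of that classification (probeFilesystem is a hypothetical name, not a function in this PR; it assumes the os/exec and strings imports used elsewhere in this file):

    // probeFilesystem distills the blkid handling above into a single call.
    // It returns the detected filesystem type ("" if none) and whether the
    // device was ready to be probed at all.
    func probeFilesystem(device string) (fstype string, ready bool, err error) {
        out, err := exec.Command("blkid", "-o", "value", "-s", "TYPE", device).CombinedOutput()
        if err == nil {
            return strings.TrimSpace(string(out)), true, nil // filesystem already present
        }
        if exitErr, ok := err.(*exec.ExitError); ok && exitErr.ExitCode() == 2 {
            return "", true, nil // exit 2: device readable, no filesystem -- safe to mkfs
        }
        return "", false, err // anything else: device likely not ready, caller should retry
    }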

@@ -177,9 +239,13 @@ func (ns *nodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoReque
return nil, fmt.Errorf("failed to get node UUID: %v", err)
}
klog.Infof("NodeGetInfo called with nodeID: %#v\n", nodeID)
maxVolumes := ns.maxVolumesPerNode
if maxVolumes == 0 {
maxVolumes = 5 // Default to 5 if not configured
}
return &csi.NodeGetInfoResponse{
NodeId: nodeID,
MaxVolumesPerNode: 5,
MaxVolumesPerNode: maxVolumes,
AccessibleTopology: &csi.Topology{
Segments: map[string]string{
"hyperstack.cloud/instance-id": nodeID,
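Once a node (re)registers with the driver, the configured limit surfaces in that node's CSINode object, which is what the scheduler consults when enforcing per-node volume counts. A quick way to verify, with the node name as a placeholder:

    kubectl get csinode <node-name> -o jsonpath='{.spec.drivers[?(@.name=="hyperstack.csi.nexgencloud.com")].allocatable.count}'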
