diff --git a/cmd/ydb-disk-manager/main.go b/cmd/ydb-disk-manager/main.go index 3f9469a..0dceee8 100644 --- a/cmd/ydb-disk-manager/main.go +++ b/cmd/ydb-disk-manager/main.go @@ -11,6 +11,7 @@ import ( "time" "github.com/fsnotify/fsnotify" + "github.com/ydb-platform/ydb-disk-manager/config" "github.com/ydb-platform/ydb-disk-manager/internal/hostdev" "github.com/ydb-platform/ydb-disk-manager/pkg/api" "gopkg.in/yaml.v2" @@ -18,14 +19,6 @@ import ( pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" ) -type Config struct { - DiskMatch string `yaml:"diskMatch"` - HostProcPath string `yaml:"hostProcPath"` - UpdateInterval time.Duration `yaml:"updateInterval"` - DeviceCount uint `yaml:"deviceCount"` -} - -var cfg Config var confFileName string func usage() { @@ -49,6 +42,7 @@ func main() { if err != nil { klog.Fatalf("Reading configuration file failed with: %s", err) } + cfg := config.Config{} cfg.DeviceCount = 1 // Default err = yaml.Unmarshal(yamlFile, &cfg) if err != nil { @@ -60,7 +54,7 @@ func main() { klog.V(0).Info("Starting FS watcher.") watcher, err := newFSWatcher(pluginapi.DevicePluginPath) if err != nil { - klog.Error("Failed to create FS watcher.") + klog.Error("Failed to create FS watcher: %v", err) os.Exit(1) } defer watcher.Close() @@ -77,10 +71,10 @@ func main() { } }() - runWatcherLoop(watcher, sigs) + runWatcherLoop(cfg, watcher, sigs) } -func runWatcherLoop(watcher *fsnotify.Watcher, sigs chan os.Signal) { +func runWatcherLoop(cfg config.Config, watcher *fsnotify.Watcher, sigs chan os.Signal) { var diskManager *hostdev.DiskManager var devicePluginInstance *api.DevicePlugin ticker := time.NewTicker(cfg.UpdateInterval) @@ -95,7 +89,7 @@ func runWatcherLoop(watcher *fsnotify.Watcher, sigs chan os.Signal) { } diskManager = hostdev.NewDiskmanager(tickerCh) - devicePluginInstance = api.NewDevicePlugin(diskManager, cfg.DeviceCount) + devicePluginInstance = api.NewDevicePlugin(diskManager, cfg) if err := diskManager.UpdateDisks(cfg.DiskMatch, "/dev"); err != nil { klog.V(0).Infof("Failed to update disks: %v", err) diff --git a/config/config.go b/config/config.go new file mode 100644 index 0000000..b87b3e3 --- /dev/null +++ b/config/config.go @@ -0,0 +1,10 @@ +package config + +import "time" + +type Config struct { + DiskMatch string `yaml:"diskMatch"` + HostProcPath string `yaml:"hostProcPath"` + UpdateInterval time.Duration `yaml:"updateInterval"` + DeviceCount uint `yaml:"deviceCount"` +} diff --git a/helm/ydb-disk-manager/Chart.yaml b/helm/ydb-disk-manager/Chart.yaml index dc20d71..f3529fc 100644 --- a/helm/ydb-disk-manager/Chart.yaml +++ b/helm/ydb-disk-manager/Chart.yaml @@ -3,5 +3,5 @@ name: ydb-disk-manager type: application -version: 0.2.9 -appVersion: 0.2.9 +version: 0.2.10 +appVersion: 0.2.10 diff --git a/internal/hostdev/disks.go b/internal/hostdev/disks.go index 8d0b8fb..1e0bf28 100644 --- a/internal/hostdev/disks.go +++ b/internal/hostdev/disks.go @@ -17,8 +17,10 @@ type FileLock struct { type DiskManager struct { DiskFilenames []string DiskInodes map[uint64]string - fileLocks FileLock - tickerChan chan<- bool + + diskFilenamesLock sync.Mutex + fileLocks FileLock + tickerChan chan<- bool } // NewSmarterDevicePlugin returns an initialized SmarterDevicePlugin @@ -33,6 +35,9 @@ func NewDiskmanager(ch chan<- bool) *DiskManager { } func (mgr *DiskManager) UpdateDisks(diskRegexp string, devDir string) error { + mgr.diskFilenamesLock.Lock() + defer mgr.diskFilenamesLock.Unlock() + existingDisks, err := readDevDirectory(devDir, 10) if err != nil { return err @@ -45,7 +50,7 @@ func (mgr *DiskManager) UpdateDisks(diskRegexp string, devDir string) error { // Check for new disks for _, diskToAppend := range matchedDisks { - diskPath := filepath.Join("/dev", diskToAppend) + diskPath := filepath.Join(devDir, diskToAppend) diskIno, err := readInode(diskPath) if err != nil { return err @@ -65,11 +70,11 @@ func (mgr *DiskManager) UpdateDisks(diskRegexp string, devDir string) error { } // Check for removed disks - temp := mgr.DiskFilenames[:0] + temp := []string{} for _, existingDisk := range mgr.DiskFilenames { shouldRemove := true for _, diskToRemove := range matchedDisks { - diskPath := filepath.Join("/dev", diskToRemove) + diskPath := filepath.Join(devDir, diskToRemove) if existingDisk == diskPath { temp = append(temp, existingDisk) shouldRemove = false diff --git a/pkg/api/deviceplugin.go b/pkg/api/deviceplugin.go index 99589d1..bdc62f3 100644 --- a/pkg/api/deviceplugin.go +++ b/pkg/api/deviceplugin.go @@ -12,6 +12,7 @@ import ( "google.golang.org/grpc" "k8s.io/klog/v2" + "github.com/ydb-platform/ydb-disk-manager/config" "github.com/ydb-platform/ydb-disk-manager/internal/hostdev" "github.com/ydb-platform/ydb-disk-manager/proto/locks" pb "github.com/ydb-platform/ydb-disk-manager/proto/locks" @@ -38,16 +39,19 @@ type DevicePlugin struct { server *grpc.Server disks *hostdev.DiskManager + + cfg config.Config } // NewDevicePlugin returns an initialized DevicePlugin -func NewDevicePlugin(hostDisks *hostdev.DiskManager, deviceCount uint) *DevicePlugin { +func NewDevicePlugin(hostDisks *hostdev.DiskManager, cfg config.Config) *DevicePlugin { return &DevicePlugin{ - devs: getDevices(deviceCount), + devs: getDevices(cfg.DeviceCount), stop: make(chan interface{}), health: make(chan *pluginapi.Device), disks: hostDisks, + cfg: cfg, } } @@ -174,6 +178,14 @@ func (plugin *DevicePlugin) Allocate(ctx context.Context, reqs *pluginapi.Alloca responses := pluginapi.AllocateResponse{} var devices []*pluginapi.DeviceSpec + for _, req := range reqs.ContainerRequests { + klog.V(0).Infof("Allocation request incoming: device IDs %v\n", req.DevicesIDs) + } + + plugin.disks.UpdateDisks(plugin.cfg.DiskMatch, "/dev") + + klog.V(0).Infof("Going to allocate following devices: %v\n", plugin.disks.DiskFilenames) + // This is the important part - we push all the disk devices we know about // instead of the meta-device 'ydb-disk-manager/hostdev' for _, diskPath := range plugin.disks.DiskFilenames {