Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 6 additions & 12 deletions cmd/ydb-disk-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,14 @@ import (
"time"

"github.com/fsnotify/fsnotify"
"github.com/ydb-platform/ydb-disk-manager/config"
"github.com/ydb-platform/ydb-disk-manager/internal/hostdev"
"github.com/ydb-platform/ydb-disk-manager/pkg/api"
"gopkg.in/yaml.v2"
"k8s.io/klog/v2"
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)

type Config struct {
DiskMatch string `yaml:"diskMatch"`
HostProcPath string `yaml:"hostProcPath"`
UpdateInterval time.Duration `yaml:"updateInterval"`
DeviceCount uint `yaml:"deviceCount"`
}

var cfg Config
var confFileName string

func usage() {
Expand All @@ -49,6 +42,7 @@ func main() {
if err != nil {
klog.Fatalf("Reading configuration file failed with: %s", err)
}
cfg := config.Config{}
cfg.DeviceCount = 1 // Default
err = yaml.Unmarshal(yamlFile, &cfg)
if err != nil {
Expand All @@ -60,7 +54,7 @@ func main() {
klog.V(0).Info("Starting FS watcher.")
watcher, err := newFSWatcher(pluginapi.DevicePluginPath)
if err != nil {
klog.Error("Failed to create FS watcher.")
klog.Error("Failed to create FS watcher: %v", err)
os.Exit(1)
}
defer watcher.Close()
Expand All @@ -77,10 +71,10 @@ func main() {
}
}()

runWatcherLoop(watcher, sigs)
runWatcherLoop(cfg, watcher, sigs)
}

func runWatcherLoop(watcher *fsnotify.Watcher, sigs chan os.Signal) {
func runWatcherLoop(cfg config.Config, watcher *fsnotify.Watcher, sigs chan os.Signal) {
var diskManager *hostdev.DiskManager
var devicePluginInstance *api.DevicePlugin
ticker := time.NewTicker(cfg.UpdateInterval)
Expand All @@ -95,7 +89,7 @@ func runWatcherLoop(watcher *fsnotify.Watcher, sigs chan os.Signal) {
}

diskManager = hostdev.NewDiskmanager(tickerCh)
devicePluginInstance = api.NewDevicePlugin(diskManager, cfg.DeviceCount)
devicePluginInstance = api.NewDevicePlugin(diskManager, cfg)

if err := diskManager.UpdateDisks(cfg.DiskMatch, "/dev"); err != nil {
klog.V(0).Infof("Failed to update disks: %v", err)
Expand Down
10 changes: 10 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package config

import "time"

type Config struct {
DiskMatch string `yaml:"diskMatch"`
HostProcPath string `yaml:"hostProcPath"`
UpdateInterval time.Duration `yaml:"updateInterval"`
DeviceCount uint `yaml:"deviceCount"`
}
4 changes: 2 additions & 2 deletions helm/ydb-disk-manager/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ name: ydb-disk-manager

type: application

version: 0.2.9
appVersion: 0.2.9
version: 0.2.10
appVersion: 0.2.10
15 changes: 10 additions & 5 deletions internal/hostdev/disks.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ type FileLock struct {
type DiskManager struct {
DiskFilenames []string
DiskInodes map[uint64]string
fileLocks FileLock
tickerChan chan<- bool

diskFilenamesLock sync.Mutex
fileLocks FileLock
tickerChan chan<- bool
}

// NewSmarterDevicePlugin returns an initialized SmarterDevicePlugin
Expand All @@ -33,6 +35,9 @@ func NewDiskmanager(ch chan<- bool) *DiskManager {
}

func (mgr *DiskManager) UpdateDisks(diskRegexp string, devDir string) error {
mgr.diskFilenamesLock.Lock()
defer mgr.diskFilenamesLock.Unlock()

existingDisks, err := readDevDirectory(devDir, 10)
if err != nil {
return err
Expand All @@ -45,7 +50,7 @@ func (mgr *DiskManager) UpdateDisks(diskRegexp string, devDir string) error {

// Check for new disks
for _, diskToAppend := range matchedDisks {
diskPath := filepath.Join("/dev", diskToAppend)
diskPath := filepath.Join(devDir, diskToAppend)
diskIno, err := readInode(diskPath)
if err != nil {
return err
Expand All @@ -65,11 +70,11 @@ func (mgr *DiskManager) UpdateDisks(diskRegexp string, devDir string) error {
}

// Check for removed disks
temp := mgr.DiskFilenames[:0]
temp := []string{}
for _, existingDisk := range mgr.DiskFilenames {
shouldRemove := true
for _, diskToRemove := range matchedDisks {
diskPath := filepath.Join("/dev", diskToRemove)
diskPath := filepath.Join(devDir, diskToRemove)
if existingDisk == diskPath {
temp = append(temp, existingDisk)
shouldRemove = false
Expand Down
16 changes: 14 additions & 2 deletions pkg/api/deviceplugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"google.golang.org/grpc"
"k8s.io/klog/v2"

"github.com/ydb-platform/ydb-disk-manager/config"
"github.com/ydb-platform/ydb-disk-manager/internal/hostdev"
"github.com/ydb-platform/ydb-disk-manager/proto/locks"
pb "github.com/ydb-platform/ydb-disk-manager/proto/locks"
Expand All @@ -38,16 +39,19 @@ type DevicePlugin struct {
server *grpc.Server

disks *hostdev.DiskManager

cfg config.Config
}

// NewDevicePlugin returns an initialized DevicePlugin
func NewDevicePlugin(hostDisks *hostdev.DiskManager, deviceCount uint) *DevicePlugin {
func NewDevicePlugin(hostDisks *hostdev.DiskManager, cfg config.Config) *DevicePlugin {
return &DevicePlugin{
devs: getDevices(deviceCount),
devs: getDevices(cfg.DeviceCount),
stop: make(chan interface{}),
health: make(chan *pluginapi.Device),

disks: hostDisks,
cfg: cfg,
}
}

Expand Down Expand Up @@ -174,6 +178,14 @@ func (plugin *DevicePlugin) Allocate(ctx context.Context, reqs *pluginapi.Alloca
responses := pluginapi.AllocateResponse{}
var devices []*pluginapi.DeviceSpec

for _, req := range reqs.ContainerRequests {
klog.V(0).Infof("Allocation request incoming: device IDs %v\n", req.DevicesIDs)
}

plugin.disks.UpdateDisks(plugin.cfg.DiskMatch, "/dev")

klog.V(0).Infof("Going to allocate following devices: %v\n", plugin.disks.DiskFilenames)

// This is the important part - we push all the disk devices we know about
// instead of the meta-device 'ydb-disk-manager/hostdev'
for _, diskPath := range plugin.disks.DiskFilenames {
Expand Down