|
| 1 | +package main |
| 2 | + |
| 3 | +import ( |
| 4 | + "flag" |
| 5 | + "fmt" |
| 6 | + "io/ioutil" |
| 7 | + "net/http" |
| 8 | + "os" |
| 9 | + "os/signal" |
| 10 | + "syscall" |
| 11 | + "time" |
| 12 | + |
| 13 | + "github.com/fsnotify/fsnotify" |
| 14 | + "github.com/ydb-platform/ydb-disk-manager/internal/hostdev" |
| 15 | + "github.com/ydb-platform/ydb-disk-manager/pkg/api" |
| 16 | + "gopkg.in/yaml.v2" |
| 17 | + "k8s.io/klog/v2" |
| 18 | + pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" |
| 19 | +) |
| 20 | + |
| 21 | +type Config struct { |
| 22 | + DiskMatch string `yaml:"diskMatch"` |
| 23 | + HostProcPath string `yaml:"hostProcPath"` |
| 24 | + UpdateInterval time.Duration `yaml:"updateInterval"` |
| 25 | +} |
| 26 | + |
| 27 | +var cfg Config |
| 28 | +var confFileName string |
| 29 | + |
| 30 | +func usage() { |
| 31 | + fmt.Fprintf(os.Stderr, "usage: ydb-disk-manager\n") |
| 32 | + flag.PrintDefaults() |
| 33 | + os.Exit(2) |
| 34 | +} |
| 35 | + |
| 36 | +func main() { |
| 37 | + klog.InitFlags(nil) |
| 38 | + flag.StringVar(&confFileName, "config", "config/conf.yaml", "set the configuration file to use") |
| 39 | + flag.Usage = usage |
| 40 | + flag.Parse() |
| 41 | + |
| 42 | + defer klog.Flush() |
| 43 | + klog.V(0).Info("Loading ydb-disk-manager") |
| 44 | + |
| 45 | + // Setting up the disks to check |
| 46 | + klog.V(0).Infof("Reading configuration file %s", confFileName) |
| 47 | + yamlFile, err := ioutil.ReadFile(confFileName) |
| 48 | + if err != nil { |
| 49 | + klog.Fatal("Reading configuration file failed with: %v", err) |
| 50 | + } |
| 51 | + err = yaml.Unmarshal(yamlFile, &cfg) |
| 52 | + if err != nil { |
| 53 | + klog.Fatal("Unmarshal: %v", err) |
| 54 | + os.Exit(-1) |
| 55 | + } |
| 56 | + klog.V(0).Infof("Applied configuration: %v", cfg) |
| 57 | + |
| 58 | + klog.V(0).Info("Starting FS watcher.") |
| 59 | + watcher, err := newFSWatcher(pluginapi.DevicePluginPath) |
| 60 | + if err != nil { |
| 61 | + klog.Error("Failed to create FS watcher.") |
| 62 | + os.Exit(1) |
| 63 | + } |
| 64 | + defer watcher.Close() |
| 65 | + |
| 66 | + klog.V(0).Info("Starting OS watcher.") |
| 67 | + sigs := newOSWatcher(syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) |
| 68 | + |
| 69 | + klog.V(0).Info("Starting /healthz HTTP handler on port :8080") |
| 70 | + http.HandleFunc("/healthz", healthHandler) |
| 71 | + go func() { |
| 72 | + if err = http.ListenAndServe(":8080", nil); err != nil { |
| 73 | + klog.Error("Failed to create HTTP health server") |
| 74 | + os.Exit(1) |
| 75 | + } |
| 76 | + }() |
| 77 | + |
| 78 | + runWatcherLoop(watcher, sigs) |
| 79 | +} |
| 80 | + |
| 81 | +func runWatcherLoop(watcher *fsnotify.Watcher, sigs chan os.Signal) { |
| 82 | + var diskManager *hostdev.DiskManager |
| 83 | + var devicePluginInstance *api.DevicePlugin |
| 84 | + ticker := time.NewTicker(cfg.UpdateInterval) |
| 85 | + tickerCh := make(chan bool) |
| 86 | + defer close(tickerCh) |
| 87 | + restart := true |
| 88 | + defer ticker.Stop() |
| 89 | + for { |
| 90 | + if restart { |
| 91 | + if devicePluginInstance != nil { |
| 92 | + devicePluginInstance.Stop() |
| 93 | + } |
| 94 | + |
| 95 | + diskManager = hostdev.NewDiskmanager(tickerCh) |
| 96 | + devicePluginInstance = api.NewDevicePlugin(diskManager) |
| 97 | + |
| 98 | + if err := diskManager.UpdateDisks(cfg.DiskMatch, "/dev"); err != nil { |
| 99 | + klog.V(0).Infof("Failed to update disks: %v", err) |
| 100 | + time.Sleep(cfg.UpdateInterval) |
| 101 | + continue |
| 102 | + } |
| 103 | + |
| 104 | + if err := devicePluginInstance.Serve(); err != nil { |
| 105 | + klog.V(0).Info("Could not contact Kubelet, retrying. Did you enable the device plugin feature gate?") |
| 106 | + time.Sleep(cfg.UpdateInterval) |
| 107 | + continue |
| 108 | + } |
| 109 | + |
| 110 | + if err := diskManager.UpdateLocks(cfg.HostProcPath); err != nil { |
| 111 | + klog.V(0).Infof("Failed to update locks: %v", err) |
| 112 | + time.Sleep(cfg.UpdateInterval) |
| 113 | + continue |
| 114 | + } |
| 115 | + |
| 116 | + restart = false |
| 117 | + } |
| 118 | + |
| 119 | + select { |
| 120 | + case <-tickerCh: |
| 121 | + klog.V(2).Infof("Resetting ticker to updateInterval: %s", cfg.UpdateInterval) |
| 122 | + ticker.Reset(cfg.UpdateInterval) |
| 123 | + |
| 124 | + case event := <-watcher.Events: |
| 125 | + if event.Name == pluginapi.KubeletSocket && event.Op&fsnotify.Create == fsnotify.Create { |
| 126 | + klog.V(0).Infof("inotify: %s created, restarting.", pluginapi.KubeletSocket) |
| 127 | + restart = true |
| 128 | + } |
| 129 | + |
| 130 | + case <-ticker.C: |
| 131 | + if err := diskManager.UpdateDisks(cfg.DiskMatch, "/dev"); err != nil { |
| 132 | + klog.V(0).Infof("Failed to update disks: %v", err) |
| 133 | + restart = true |
| 134 | + break |
| 135 | + } |
| 136 | + if err := diskManager.UpdateLocks(cfg.HostProcPath); err != nil { |
| 137 | + klog.V(0).Infof("Failed to update locks: %v", err) |
| 138 | + restart = true |
| 139 | + break |
| 140 | + } |
| 141 | + |
| 142 | + case err := <-watcher.Errors: |
| 143 | + klog.V(0).Infof("inotify: %s", err) |
| 144 | + |
| 145 | + case s := <-sigs: |
| 146 | + switch s { |
| 147 | + case syscall.SIGHUP: |
| 148 | + klog.V(0).Info("Received SIGHUP, restarting.") |
| 149 | + restart = true |
| 150 | + default: |
| 151 | + klog.V(0).Infof("Received signal \"%v\", shutting down.", s) |
| 152 | + if devicePluginInstance != nil { |
| 153 | + devicePluginInstance.Stop() |
| 154 | + } |
| 155 | + return |
| 156 | + } |
| 157 | + } |
| 158 | + } |
| 159 | +} |
| 160 | + |
| 161 | +func newFSWatcher(files ...string) (*fsnotify.Watcher, error) { |
| 162 | + watcher, err := fsnotify.NewWatcher() |
| 163 | + if err != nil { |
| 164 | + return nil, err |
| 165 | + } |
| 166 | + |
| 167 | + for _, f := range files { |
| 168 | + err = watcher.Add(f) |
| 169 | + if err != nil { |
| 170 | + watcher.Close() |
| 171 | + return nil, err |
| 172 | + } |
| 173 | + } |
| 174 | + |
| 175 | + return watcher, nil |
| 176 | +} |
| 177 | + |
| 178 | +func newOSWatcher(sigs ...os.Signal) chan os.Signal { |
| 179 | + sigChan := make(chan os.Signal, 1) |
| 180 | + signal.Notify(sigChan, sigs...) |
| 181 | + |
| 182 | + return sigChan |
| 183 | +} |
| 184 | + |
| 185 | +func healthHandler(w http.ResponseWriter, r *http.Request) { |
| 186 | + if err := api.LivenessProbe(api.SocketPath); err != nil { |
| 187 | + w.WriteHeader(http.StatusInternalServerError) |
| 188 | + fmt.Fprint(w, err) |
| 189 | + return |
| 190 | + } |
| 191 | + w.WriteHeader(http.StatusOK) |
| 192 | + fmt.Fprint(w, "Server is healthy") |
| 193 | +} |
0 commit comments