diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..600d2d3 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.vscode \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 47107a4..838f1e3 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,14 +1,16 @@ -FROM golang:1.18 as build +FROM golang:1.24-bookworm as build WORKDIR /go/app COPY . /go/app -RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o ydb-disk-manager cmd/ydb-disk-manager/main.go +RUN apt update && apt install -y libudev-dev +RUN CGO_ENABLED=1 GOOS=linux GOARCH=amd64 go build -o ydb-disk-manager cmd/ydb-disk-manager/main.go -FROM ubuntu:20.04 +FROM debian:bookworm WORKDIR /root +RUN apt update && apt install -y libudev1 && apt clean COPY --from=build /go/app/ydb-disk-manager /usr/bin/ydb-disk-manager ENTRYPOINT ["/usr/bin/ydb-disk-manager"] diff --git a/cmd/udev-manager/main.go b/cmd/udev-manager/main.go new file mode 100644 index 0000000..45c9550 --- /dev/null +++ b/cmd/udev-manager/main.go @@ -0,0 +1,279 @@ +package main + +import ( + "context" + "errors" + "flag" + "fmt" + "io" + "net/http" + "os" + "os/signal" + "regexp" + "strings" + "sync" + "syscall" + + "gopkg.in/yaml.v3" + + "k8s.io/klog/v2" + + "github.com/ydb-platform/ydb-disk-manager/internal/mux" + "github.com/ydb-platform/ydb-disk-manager/internal/plugin" + "github.com/ydb-platform/ydb-disk-manager/internal/udev" +) + +func main() { + appContext, appCancel := context.WithCancel(context.Background()) + appWaitGroup := &sync.WaitGroup{} + defer appWaitGroup.Wait() + defer appCancel() + + flags := initFlags() + + // Registry creates a separate plugin for each resource registered + registry, err := plugin.NewRegistry(appContext, appWaitGroup) + if err != nil { + klog.Fatalf("failed to create plugin registry: %v", err) + os.Exit(1) + } + + // udev discovery looks up devices and listens for system events + devDiscovery, err := udev.NewDiscovery(appWaitGroup) + if err != nil { + klog.Fatalf("failed to start udev discovery: %v", err) + os.Exit(1) + } + defer devDiscovery.Close() + + domain := flags.config.DeviceDomain + + var cancel mux.CancelFunc = mux.CancelFunc(appCancel) + for _, partConfig := range flags.config.Partitions { + cancel = mux.ChainCancelFunc( + plugin.NewScatter( + devDiscovery, + registry, + domain, + "part-", + plugin.PartitionLabelMatcher(domain, partConfig.matcher), + ), + cancel, + ) + } + + klog.Info("Starting /healthz server on port :8080") + go func() { + http.HandleFunc("/healthz", registry.Healthz) + if err := http.ListenAndServe(":8080", nil); err != nil { + klog.Fatalf("failed to start /healthz server: %v", err) + } + }() + + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + defer cancel() + for signal := range sigs { + switch signal { + case syscall.SIGINT, syscall.SIGTERM: + klog.Infof("Received signal %q, shutting down", signal.String()) + return + } + } +} + +type configSource interface { + String() string + open() (io.Reader, func() error, error) +} + +type fileConfigSource struct { + path string +} + +func (fcs *fileConfigSource) open() (io.Reader, func() error, error) { + file, err := os.Open(fcs.path) + if err != nil { + return nil, nil, err + } + return file, file.Close, nil +} + +func (fcs *fileConfigSource) String() string { + return "file:" + fcs.path +} + +type envConfigSource struct { + variable string +} + +func (ecs *envConfigSource) open() (io.Reader, func() error, error) { + data := os.Getenv(ecs.variable) + if data == "" { + return nil, nil, fmt.Errorf("config: environment variable %s is not set", ecs.variable) + } + return strings.NewReader(data), func() error { return nil }, nil +} + +func (ecs *envConfigSource) String() string { + return "env:" + ecs.variable +} + +type stdinConfigSource struct{} + +func (scs *stdinConfigSource) open() (io.Reader, func() error, error) { + return os.Stdin, func() error { return nil }, nil +} + +func (scs *stdinConfigSource) String() string { + return "stdin" +} + +type ConfigFlag struct { + configSource +} + +func (cf *ConfigFlag) Set(value string) error { + if strings.HasPrefix(value, "file:") { + cf.configSource = &fileConfigSource{path: strings.TrimPrefix(value, "file:")} + } else if strings.HasPrefix(value, "env:") { + cf.configSource = &envConfigSource{variable: strings.TrimPrefix(value, "env:")} + } else if strings.HasPrefix(value, "stdin") { + cf.configSource = &stdinConfigSource{} + } else { + return fmt.Errorf("invalid config source: %s", value) + } + + return nil +} + +func (cf *ConfigFlag) String() string { + if cf.configSource == nil { + return "" + } + return cf.configSource.String() +} + +type FlagValues struct { + Config ConfigFlag + + config *Config +} + +func initFlags() FlagValues { + values := FlagValues{} + flags := flag.NewFlagSet("udev-manager", flag.ExitOnError) + klog.InitFlags(flags) + flags.Var(&values.Config, "config", `configuration source (in form "file:", "env:" or "stdin")`) + flags.Parse(os.Args[1:]) + if values.Config.configSource == nil { + flags.Output().Write([]byte("config flag is required\n")) + flags.Usage() + os.Exit(2) + } + configReader, configCloser, err := values.Config.open() + if err != nil { + klog.Fatalf("failed to open --config %q: %v", values.Config.String(), err) + os.Exit(1) + } + defer configCloser() + + config, err := parseConfig(configReader) + if err != nil { + klog.Fatalf("failed to parse --config %q: %v", values.Config.String(), err) + os.Exit(1) + } + + values.config = config + + return values +} + +type PartitionsConfig struct { + Matcher string `yaml:"matcher"` // matcher should be a valid regular expression + + matcher *regexp.Regexp // compiled matcher if the config is valid +} + +func (pc *PartitionsConfig) validate() error { + // Compile the regular expression + matcher, err := regexp.Compile(pc.Matcher) + if err != nil { + return fmt.Errorf(".matcher: %q must be a valid regexp: %w", pc.Matcher, err) + } + if matcher.NumSubexp() > 1 { + return fmt.Errorf(".matcher: %q must have at most one capturing group", pc.Matcher) + } + pc.matcher = matcher + return nil +} + +type HostDevConfig struct { + Matcher string `yaml:"matcher"` // matcher should be a valid regular expression + Prefix string `yaml:"prefix"` + + matcher *regexp.Regexp // compiled matcher if the config is valid +} + +func (hc *HostDevConfig) validate() error { + // Compile the regular expression + matcher, err := regexp.Compile(hc.Matcher) + if err != nil { + return fmt.Errorf(".matcher: %q must be a valid regexp: %w", hc.Matcher, err) + } + hc.matcher = matcher + return nil +} + +type Config struct { + DeviceDomain string `yaml:"domain"` + Partitions []PartitionsConfig `yaml:"partitions"` + HostDevs []HostDevConfig `yaml:"hostdevs"` +} + +var ( + deviceDomainRegex = regexp.MustCompile(`^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`) +) + +func (c *Config) validate() error { + var errs error + // Validate the device domain + if c.DeviceDomain == "" { + errs = errors.Join(errs, fmt.Errorf(".domain: must be set")) + } + if !deviceDomainRegex.MatchString(c.DeviceDomain) { + errs = errors.Join(errs, fmt.Errorf(".domain: %q must be a valid domain name", c.DeviceDomain)) + } + + // Validate partitions + for i := range c.Partitions { + if err := c.Partitions[i].validate(); err != nil { + errs = errors.Join(errs, fmt.Errorf(".partitions[%d]%w", i, err)) + } + } + + // Validate hostdevs + for i := range c.HostDevs { + if err := c.HostDevs[i].validate(); err != nil { + errs = errors.Join(errs, fmt.Errorf(".hostdevs[%d]%w", i, err)) + } + } + + return nil +} + +func parseConfig(reader io.Reader) (*Config, error) { + // Parse the config file + decoder := yaml.NewDecoder(reader) + config := &Config{} + if err := decoder.Decode(config); err != nil { + return nil, err + } + + // Validate the config + if err := config.validate(); err != nil { + return nil, err + } + + return config, nil +} \ No newline at end of file diff --git a/cmd/ydb-disk-manager/main.go b/cmd/ydb-disk-manager/main.go index 3f9469a..b50462c 100644 --- a/cmd/ydb-disk-manager/main.go +++ b/cmd/ydb-disk-manager/main.go @@ -3,7 +3,6 @@ package main import ( "flag" "fmt" - "io/ioutil" "net/http" "os" "os/signal" @@ -45,7 +44,7 @@ func main() { // Setting up the disks to check klog.V(0).Infof("Reading configuration file %s", confFileName) - yamlFile, err := ioutil.ReadFile(confFileName) + yamlFile, err := os.ReadFile(confFileName) if err != nil { klog.Fatalf("Reading configuration file failed with: %s", err) } diff --git a/go.mod b/go.mod index 9ecf7e3..76e3233 100644 --- a/go.mod +++ b/go.mod @@ -1,25 +1,37 @@ module github.com/ydb-platform/ydb-disk-manager -go 1.18 +go 1.23.0 + +toolchain go1.24.1 require ( github.com/fsnotify/fsnotify v1.6.0 - golang.org/x/net v0.12.0 + github.com/jochenvg/go-udev v0.0.0-20240801134859-b65ed646224b + github.com/kennygrant/sanitize v1.2.4 + github.com/onsi/ginkgo/v2 v2.23.0 + github.com/onsi/gomega v1.36.2 + golang.org/x/net v0.36.0 google.golang.org/grpc v1.56.2 - google.golang.org/protobuf v1.31.0 + google.golang.org/protobuf v1.36.1 gopkg.in/yaml.v2 v2.4.0 + gopkg.in/yaml.v3 v3.0.1 k8s.io/klog/v2 v2.70.1 k8s.io/kubelet v0.25.3 ) require ( - github.com/go-logr/logr v1.2.3 // indirect + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-task/slim-sprig/v3 v3.0.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.3 // indirect + github.com/google/go-cmp v0.6.0 // indirect + github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad // indirect + github.com/jkeiser/iter v0.0.0-20200628201005-c8aa0ae784d1 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/rogpeppe/go-internal v1.11.0 // indirect - golang.org/x/sys v0.10.0 // indirect - golang.org/x/text v0.11.0 // indirect + golang.org/x/sys v0.30.0 // indirect + golang.org/x/text v0.22.0 // indirect + golang.org/x/tools v0.30.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230724170836-66ad5b6ff146 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect ) diff --git a/go.sum b/go.sum index 7931c82..59ac7ab 100644 --- a/go.sum +++ b/go.sum @@ -1,16 +1,29 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= -github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= -github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= +github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad h1:a6HEuzUHeKH6hwfN/ZoQgRgVIWFJljSWa/zetS2WTvg= +github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144= +github.com/jkeiser/iter v0.0.0-20200628201005-c8aa0ae784d1 h1:smvLGU3obGU5kny71BtE/ibR0wIXRUiRFDmSn0Nxz1E= +github.com/jkeiser/iter v0.0.0-20200628201005-c8aa0ae784d1/go.mod h1:fP/NdyhRVOv09PLRbVXrSqHhrfQypdZwgE2L4h2U5C8= +github.com/jochenvg/go-udev v0.0.0-20240801134859-b65ed646224b h1:Pzf7tldbCVqwl3NnOnTamEWdh/rL41fsoYCn2HdHgRA= +github.com/jochenvg/go-udev v0.0.0-20240801134859-b65ed646224b/go.mod h1:IBDUGq30U56w969YNPomhMbRje1GrhUsCh7tHdwgLXA= +github.com/kennygrant/sanitize v1.2.4 h1:gN25/otpP5vAsO2djbMhF/LQX6R7+O1TB4yv8NzpJ3o= +github.com/kennygrant/sanitize v1.2.4/go.mod h1:LGsjYYtgxbetdg5owWB2mpgUL6e2nfw2eObZ0u0qvak= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= @@ -20,10 +33,18 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/onsi/ginkgo/v2 v2.23.0 h1:FA1xjp8ieYDzlgS5ABTpdUDB7wtngggONc8a7ku2NqQ= +github.com/onsi/ginkgo/v2 v2.23.0/go.mod h1:zXTP6xIp3U8aVuXN8ENK9IXRaTjFnpVB9mGmaSRvxnM= +github.com/onsi/gomega v1.36.2 h1:koNYke6TVk6ZmnyHrCXba/T/MoLBXFjeC1PtvYgw0A8= +github.com/onsi/gomega v1.36.2/go.mod h1:DdwyADRjrc825LhMEkD76cHR5+pUnjhUN8GlHlRPHzY= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= @@ -35,8 +56,8 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.12.0 h1:cfawfvKITfUsFCeJIHJrbSxpeu/E81khclypR0GVT50= -golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= +golang.org/x/net v0.36.0 h1:vWF2fRbw4qslQsQzgFqZff+BItCvGFQqKzKIzx1rmoA= +golang.org/x/net v0.36.0/go.mod h1:bFmbeoIPfrw4sMHNhb4J9f6+tPziuGjq7Jk/38fxi1I= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -44,16 +65,18 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA= -golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= +golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.11.0 h1:LAntKIrcmeSKERyiOh0XMV39LXS8IE9UL2yP7+f5ij4= -golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM= +golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.30.0 h1:BgcpHewrV5AUp2G9MebG4XPFI1E2W41zU1SaqVA9vJY= +golang.org/x/tools v0.30.0/go.mod h1:c347cR/OJfw5TI+GfX7RUPNMdDRRbjvYTS0jPyvsVtY= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -64,13 +87,15 @@ google.golang.org/grpc v1.56.2 h1:fVRFRnXvU+x6C4IlHZewvJOVHoOv1TUuQyoRsYnB4bI= google.golang.org/grpc v1.56.2/go.mod h1:I9bI3vqKfayGqPUAwGdOSu7kt6oIJLixfffKrpXqQ9s= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= -google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk= +google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= k8s.io/klog/v2 v2.70.1 h1:7aaoSdahviPmR+XkS7FyxlkkXs6tHISSG03RxleQAVQ= k8s.io/klog/v2 v2.70.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= k8s.io/kubelet v0.25.3 h1:PjT3Xo0VL1BpRilBpZrRN8pSy6w5pGQ0YDQQeQWSHvQ= diff --git a/internal/mux/filter.go b/internal/mux/filter.go new file mode 100644 index 0000000..f0b1fb8 --- /dev/null +++ b/internal/mux/filter.go @@ -0,0 +1,79 @@ +package mux + +import ( + "reflect" +) + +type FilterFunc[T any] func(T) bool + +type MapperFunc[T, U any] func(T) U + +func Filter[T any](in In[T], filter FilterFunc[T]) In[T] { + out := make(chan T) + + go func() { + defer close(out) + + for v := range in { + if filter(v) { + out <- v + } + } + }() + + return out +} + +func Any[T any]() FilterFunc[T] { + return func(T) bool { + return true + } +} + +func Not[T any](filter FilterFunc[T]) FilterFunc[T] { + return func(v T) bool { + return !filter(v) + } +} + +func Or[T any](filters ...FilterFunc[T]) FilterFunc[T] { + return func(v T) bool { + for _, filter := range filters { + if filter(v) { + return true + } + } + return false + } +} + +func And[T any](filters ...FilterFunc[T]) FilterFunc[T] { + return func(v T) bool { + for _, filter := range filters { + if !filter(v) { + return false + } + } + return true + } +} + +func IsNil[T any](v T) bool { + rv := reflect.ValueOf(v) + kind := rv.Kind() + // Check if the type can be nil and is nil + switch kind { + case reflect.Invalid: + return true + case reflect.Interface: + return !rv.IsValid() + case reflect.Chan, reflect.Func, reflect.Map, reflect.Ptr, reflect.UnsafePointer, reflect.Slice: + return rv.IsNil() + } + + return false +} + +func IsNotNil[T any](v T) bool { + return !IsNil(v) +} diff --git a/internal/mux/filter_spec_test.go b/internal/mux/filter_spec_test.go new file mode 100644 index 0000000..e37711f --- /dev/null +++ b/internal/mux/filter_spec_test.go @@ -0,0 +1,177 @@ +package mux_test + +import ( + "github.com/ydb-platform/ydb-disk-manager/internal/mux" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("Filter", func() { + It("should filter elements based on the provided condition", func() { + input := make(chan int) + go func() { + defer close(input) + for i := 0; i < 10; i++ { + input <- i + } + }() + + evenFilter := func(n int) bool { + return n%2 == 0 + } + + result := mux.Filter(input, evenFilter) + + var filtered []int + for val := range result { + filtered = append(filtered, val) + } + + Expect(filtered).To(Equal([]int{0, 2, 4, 6, 8})) + }) + + It("should return empty channel when filter rejects all", func() { + input := make(chan string) + go func() { + defer close(input) + input <- "hello" + input <- "world" + }() + + noneFilter := func(s string) bool { + return false + } + + result := mux.Filter(input, noneFilter) + + var filtered []string + for val := range result { + filtered = append(filtered, val) + } + + Expect(filtered).To(BeEmpty()) + }) +}) + +var _ = Describe("Not", func() { + It("should invert the filter condition", func() { + isEven := func(n int) bool { + return n%2 == 0 + } + + isOdd := mux.Not(isEven) + + Expect(isEven(2)).To(BeTrue()) + Expect(isOdd(2)).To(BeFalse()) + + Expect(isEven(3)).To(BeFalse()) + Expect(isOdd(3)).To(BeTrue()) + }) +}) + +var _ = Describe("Or", func() { + It("should return true if any filter returns true", func() { + isEven := func(n int) bool { return n%2 == 0 } + isDivisibleBy3 := func(n int) bool { return n%3 == 0 } + + combined := mux.Or(isEven, isDivisibleBy3) + + Expect(combined(1)).To(BeFalse()) + Expect(combined(2)).To(BeTrue()) // Even + Expect(combined(3)).To(BeTrue()) // Divisible by 3 + Expect(combined(4)).To(BeTrue()) // Even + Expect(combined(5)).To(BeFalse()) + Expect(combined(6)).To(BeTrue()) // Both even and divisible by 3 + }) + + It("should return false when no filters provided", func() { + combined := mux.Or[int]() + Expect(combined(42)).To(BeFalse()) + }) +}) + +var _ = Describe("And", func() { + It("should return true only if all filters return true", func() { + isEven := func(n int) bool { return n%2 == 0 } + isDivisibleBy3 := func(n int) bool { return n%3 == 0 } + + combined := mux.And(isEven, isDivisibleBy3) + + Expect(combined(1)).To(BeFalse()) + Expect(combined(2)).To(BeFalse()) // Only even + Expect(combined(3)).To(BeFalse()) // Only divisible by 3 + Expect(combined(6)).To(BeTrue()) // Both even and divisible by 3 + Expect(combined(12)).To(BeTrue()) // Both even and divisible by 3 + }) + + It("should return true when no filters provided", func() { + combined := mux.And[int]() + Expect(combined(42)).To(BeTrue()) + }) +}) + +var _ = Describe("IsNil", func() { + It("should return true for nil values", func() { + var nilPtr *int + var nilSlice []int + var nilMap map[string]int + var nilChan chan int + var nilFunc func() + var nilInterface interface{} + + Expect(mux.IsNil(nilPtr)).To(BeTrue()) + Expect(mux.IsNil(nilSlice)).To(BeTrue()) + Expect(mux.IsNil(nilMap)).To(BeTrue()) + Expect(mux.IsNil(nilChan)).To(BeTrue()) + Expect(mux.IsNil(nilFunc)).To(BeTrue()) + Expect(mux.IsNil(nilInterface)).To(BeTrue()) + }) + + It("should return false for non-nil values", func() { + ptr := new(int) + slice := []int{} + m := make(map[string]int) + ch := make(chan int) + fn := func() {} + var iface interface{} = 42 + + Expect(mux.IsNil(ptr)).To(BeFalse()) + Expect(mux.IsNil(slice)).To(BeFalse()) + Expect(mux.IsNil(m)).To(BeFalse()) + Expect(mux.IsNil(ch)).To(BeFalse()) + Expect(mux.IsNil(fn)).To(BeFalse()) + Expect(mux.IsNil(iface)).To(BeFalse()) + }) + + It("should return false for basic types", func() { + Expect(mux.IsNil(42)).To(BeFalse()) + Expect(mux.IsNil("hello")).To(BeFalse()) + Expect(mux.IsNil(true)).To(BeFalse()) + Expect(mux.IsNil(3.14)).To(BeFalse()) + }) +}) + +var _ = Describe("IsNotNil", func() { + It("should return false for nil values", func() { + var nilPtr *int + var nilSlice []int + var nilInterface interface{} + + Expect(mux.IsNotNil(nilPtr)).To(BeFalse()) + Expect(mux.IsNotNil(nilSlice)).To(BeFalse()) + Expect(mux.IsNotNil(nilInterface)).To(BeFalse()) + }) + + It("should return true for non-nil values", func() { + ptr := new(int) + slice := []int{} + var iface interface{} = 42 + + Expect(mux.IsNotNil(ptr)).To(BeTrue()) + Expect(mux.IsNotNil(slice)).To(BeTrue()) + Expect(mux.IsNotNil(iface)).To(BeTrue()) + Expect(mux.IsNotNil(42)).To(BeTrue()) + Expect(mux.IsNotNil("hello")).To(BeTrue()) + }) +}) \ No newline at end of file diff --git a/internal/mux/mux.go b/internal/mux/mux.go new file mode 100644 index 0000000..987ede2 --- /dev/null +++ b/internal/mux/mux.go @@ -0,0 +1,259 @@ +package mux + +import ( + "fmt" + "time" +) + +type In[T any] <-chan T +type Out[T any] chan<- T + +type Logger interface { + Info(format string, args ...interface{}) +} + +type AwaitReply[T, U any] struct { + value T + reply chan U +} + +func (ar AwaitReply[T, U]) Value() T { + return ar.value +} + +func (ar AwaitReply[T, U]) Reply(value U) { + ar.reply <- value + close(ar.reply) +} + +func (ar AwaitReply[T, U]) Await() U { + return <-ar.reply +} + +type AwaitDone[T any] struct { + AwaitReply[T, struct{}] +} + +func (ad AwaitDone[T]) Done() { + ad.Reply(struct{}{}) +} + +func (ad AwaitDone[T]) Wait() { + ad.Await() +} + +func NewAwaitReply[T, U any](value T) AwaitReply[T, U] { + return AwaitReply[T, U]{ + value: value, + reply: make(chan U), + } +} + +func NewAwaitDone[T any](value T) AwaitDone[T] { + return AwaitDone[T]{ + NewAwaitReply[T, struct{}](value), + } +} + +type Sink[T any] interface { + Submit(T) error + Close() +} + +type thenSink[U, T any] struct { + sink Sink[T] + contramap func(U) T +} + +func (c *thenSink[U, T]) Submit(v U) error { + return c.sink.Submit(c.contramap(v)) +} + +func (c *thenSink[U, T]) Close() { + c.sink.Close() +} + +func ThenSink[U, T any](sink Sink[T], f func(U) T) Sink[U] { + return &thenSink[U, T]{sink, f} +} + +type filterSink[T any] struct { + sink Sink[T] + f func(T) bool +} + +func (c *filterSink[T]) Submit(v T) error { + if c.f(v) { + return c.sink.Submit(v) + } + return nil +} + +func (c *filterSink[T]) Close() { + c.sink.Close() +} + +func FilterSink[T any](sink Sink[T], f func(T) bool) Sink[T] { + return &filterSink[T]{sink, f} +} + +type chanSink[T any] struct { + ch chan<- T +} + +func (c *chanSink[T]) Submit(v T) error { + c.ch <- v + return nil +} + +func (c *chanSink[T]) Close() { + close(c.ch) +} + +func SinkFromChan[T any](ch chan<- T) Sink[T] { + return &chanSink[T]{ch} +} + +type Source[T any] interface { + Subscribe(Sink[T]) CancelFunc +} + +type Mux[T any] struct { + input chan T + register chan AwaitDone[Sink[T]] + unregister chan AwaitDone[Sink[T]] + outputs map[Sink[T]]bool + + submitTimeout time.Duration + inBufSize int + logger Logger +} + +type Option[T any] interface { + apply(*Mux[T]) +} + +type buffered[T any] struct { + Size int +} + +func (b *buffered[T]) apply(m *Mux[T]) { + m.inBufSize = b.Size +} + +func Buffered[T any](size int) Option[T] { + return &buffered[T]{size} +} + +type withLogger[T any] struct { + Logger Logger +} + +func (l *withLogger[T]) apply(m *Mux[T]) { + m.logger = l.Logger +} + +func WithLogger[T any](logger Logger) Option[T] { + return &withLogger[T]{logger} +} + +func Make[T any](opts ...Option[T]) *Mux[T] { + mux := &Mux[T]{ + submitTimeout: 1 * time.Second, + } + + for _, opt := range opts { + if opt == nil { + continue + } + opt.apply(mux) + } + + mux.input = make(chan T, mux.inBufSize) + mux.register = make(chan AwaitDone[Sink[T]]) + mux.unregister = make(chan AwaitDone[Sink[T]]) + mux.outputs = make(map[Sink[T]]bool) + + go mux.run() + + return mux +} + +func (c *Mux[T]) run() { + defer func() { + for sub := range c.outputs { + delete(c.outputs, sub) + sub.Close() + } + }() + defer close(c.input) + + for { + select { + case v := <-c.input: + for out := range c.outputs { + if err := out.Submit(v); err != nil { + c.error("error submitting value %v: %v", v, err) + } + } + case ar, ok := <-c.register: + if !ok { + return + } + c.outputs[ar.value] = true + ar.reply <- struct{}{} + case ar := <-c.unregister: + sub := ar.value + delete(c.outputs, sub) + sub.Close() + ar.reply <- struct{}{} + } + } +} + +func (m *Mux[T]) error(format string, args ...any) error { + if m.logger != nil { + m.logger.Info(format, args...) + } + return fmt.Errorf(format, args...) +} + +func (c *Mux[T]) Close() { + close(c.register) +} + + +func (c *Mux[T]) Submit(v T) error { + select { + case c.input <- v: + return nil + case <-time.After(c.submitTimeout): + return c.error("timed out submitting value %v after %s", v, c.submitTimeout) + } +} + +type CancelFunc func() + +func (c *Mux[T]) Subscribe(sink Sink[T]) CancelFunc { + ar := NewAwaitDone(sink) + c.register <- ar + ar.Wait() + + return func() { + ar := NewAwaitDone(sink) + c.unregister <- ar + ar.Wait() + } +} + +func ChainCancelFunc(cf1, cf2 func(), cfs ...func()) CancelFunc { + return func() { + cf1() + cf2() + for _, cf := range cfs { + if cf != nil { + cf() + } + } + } +} diff --git a/internal/mux/mux_suite_test.go b/internal/mux/mux_suite_test.go new file mode 100644 index 0000000..9f862cd --- /dev/null +++ b/internal/mux/mux_suite_test.go @@ -0,0 +1,200 @@ +package mux_test + +import ( + "sync" + + "github.com/ydb-platform/ydb-disk-manager/internal/mux" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("Mux", func() { + Context("creation", func() { + It("should create a new Mux without buffer", func() { + mux := mux.Make[string]() + Expect(mux).NotTo(BeNil()) + mux.Close() + }) + + It("should create a new Mux with buffer", func() { + mux := mux.Make(mux.Buffered[string](2)) + Expect(mux).NotTo(BeNil()) + mux.Close() + }) + }) + + Context("registration", func() { + var m *mux.Mux[string] + + BeforeEach(func() { + m = mux.Make[string]() + }) + + AfterEach(func() { + m.Close() + }) + + It("should register a new output channel", func() { + in := make(chan string) + cancel := m.Subscribe(mux.SinkFromChan(in)) + Expect(in).NotTo(BeNil()) + Expect(cancel).NotTo(BeNil()) + cancel() + }) + + It("should register a buffered output channel", func() { + in := make(chan string) + cancel := m.Subscribe(mux.SinkFromChan(in)) + Expect(in).NotTo(BeNil()) + Expect(cancel).NotTo(BeNil()) + cancel() + }) + + It("should support multiple registrations", func() { + in1 := make(chan string) + in2 := make(chan string) + in3 := make(chan string) + cancel1 := m.Subscribe(mux.SinkFromChan(in1)) + cancel2 := m.Subscribe(mux.SinkFromChan(in2)) + cancel3 := m.Subscribe(mux.SinkFromChan(in3)) + + Expect(in1).NotTo(BeNil()) + Expect(in2).NotTo(BeNil()) + Expect(in3).NotTo(BeNil()) + + cancel1() + cancel2() + cancel3() + }) + + It("should allow unregistration using cancel function", func() { + in := make(chan string) + cancel := m.Subscribe(mux.SinkFromChan(in)) + Expect(in).NotTo(BeNil()) + + // Unregister + cancel() + + // Submit a value + m.Submit("test") + + // Verify channel doesn't receive the value + Consistently(in).ShouldNot(Receive()) + }) + }) + + Context("submission", func() { + var m *mux.Mux[string] + + BeforeEach(func() { + m = mux.Make(mux.WithLogger[string](GinkgoLogr)) + }) + + AfterEach(func() { + m.Close() + }) + + It("should distribute values to all registered outputs", func() { + in1 := make(chan string) + in2 := make(chan string) + cancel1 := m.Subscribe(mux.SinkFromChan(in1)) + cancel2 := m.Subscribe(mux.SinkFromChan(in2)) + defer cancel1() + defer cancel2() + + go func() { + m.Submit("hello") + }() + + Eventually(in1).Should(Receive(Equal("hello"))) + Eventually(in2).Should(Receive(Equal("hello"))) + }) + + It("should handle multiple submissions", func() { + in := make(chan string) + cancel := m.Subscribe(mux.SinkFromChan(in)) + defer cancel() + + go func() { + m.Submit("one") + m.Submit("two") + m.Submit("three") + }() + + Eventually(in).Should(Receive(Equal("one"))) + Eventually(in).Should(Receive(Equal("two"))) + Eventually(in).Should(Receive(Equal("three"))) + + }) + }) + + Context("buffering", func() { + It("should respect buffer size in RegisterWithBuffer", func() { + m := mux.Make[int]() + defer m.Close() + + in := make(chan int, 2) + cancel := m.Subscribe(mux.SinkFromChan(in)) + defer cancel() + + // These should succeed without blocking + m.Submit(1) + m.Submit(2) + + // Verify values + Eventually(in).Should(Receive(Equal(1))) + Eventually(in).Should(Receive(Equal(2))) + }) + + It("should respect buffer size in MakeWithBuffer", func() { + // Create mux with buffer of 2 + m := mux.Make(mux.Buffered[int](2)) + defer m.Close() + + in := make(chan int) + cancel := m.Subscribe(mux.SinkFromChan(in)) + defer cancel() + + var wg sync.WaitGroup + wg.Add(1) + // This goroutine will block after 2 submissions + go func() { + defer wg.Done() + for i := 0; i < 3; i++ { + m.Submit(i) + } + }() + + // Receive values + Eventually(in).Should(Receive(Equal(0))) + Eventually(in).Should(Receive(Equal(1))) + Eventually(in).Should(Receive(Equal(2))) + + wg.Wait() + }) + }) + + Context("closing", func() { + It("should properly clean up when closed", func() { + m := mux.Make[string]() + in1 := make(chan string) + in2 := make(chan string) + m.Subscribe(mux.SinkFromChan(in1)) + m.Subscribe(mux.SinkFromChan(in2)) + + m.Close() + + // Channels should eventually close + Eventually(func() bool { + _, ok := <-in1 + return ok + }).Should(BeFalse()) + + Eventually(func() bool { + _, ok := <-in2 + return ok + }).Should(BeFalse()) + }) + }) +}) \ No newline at end of file diff --git a/internal/mux/mux_test.go b/internal/mux/mux_test.go new file mode 100644 index 0000000..f8273d4 --- /dev/null +++ b/internal/mux/mux_test.go @@ -0,0 +1,13 @@ +package mux_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestMux(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Mux Suite") +} \ No newline at end of file diff --git a/internal/plugin/partition.go b/internal/plugin/partition.go new file mode 100644 index 0000000..4e74479 --- /dev/null +++ b/internal/plugin/partition.go @@ -0,0 +1,118 @@ +package plugin + +import ( + "context" + "path" + "regexp" + "strings" + + "github.com/ydb-platform/ydb-disk-manager/internal/udev" + + "k8s.io/klog/v2" + pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" +) + +// Partition represents a partition of a disk as a Resource. +type partition struct { + domain string + label string + dev udev.Device +} + +func (p *partition) Id() Id { + return Id(p.label) +} + +func (p *partition) Health() Health { + return Healthy{} //TODO: health check? +} + +func (p *partition) TopologyHints() *pluginapi.TopologyInfo { + numaNode := p.dev.NumaNode() + if numaNode < 0 { + return nil + } + + return &pluginapi.TopologyInfo{ + Nodes: []*pluginapi.NUMANode{ + { + ID: int64(numaNode), + }, + }, + } +} + +func sanitizeEnv(s string) string { + replacer := strings.NewReplacer(".", "_", "-", "_") + return strings.ToUpper(replacer.Replace(s)) +} + +func (p *partition) envName(env string) string { + return sanitizeEnv(p.domain) + "_PART_" + sanitizeEnv(p.label) + "_" + sanitizeEnv(env) +} + +func (p *partition) Allocate(context.Context) (*pluginapi.ContainerAllocateResponse, error) { + response := &pluginapi.ContainerAllocateResponse{} + + containerPath := path.Join("/dev", "allocated", p.domain, "part", p.label) + + response.Devices = append(response.Devices, &pluginapi.DeviceSpec{ + HostPath: p.dev.DevNode(), + ContainerPath: containerPath, + Permissions: "rw", + }) + + response.Envs = make(map[string]string) + response.Envs[p.envName("PATH")] = containerPath + response.Envs[p.envName("DISK_ID")] = + p.dev.SystemAttributeLookup(udev.SysAttrWWID) + response.Envs[p.envName("DISK_MODEL")] = + p.dev.SystemAttributeLookup(udev.SysAttrModel) + + serial := p.dev.SystemAttributeLookup(udev.SysAttrSerial) + if serial == "" { + serial = p.dev.PropertyLookup(udev.PropertyShortSerial) + } + if serial != "" { + response.Envs[p.envName("DISK_SERIAL")] = serial + } + + klog.Info("allocated partition: ", p.label) + klog.V(2).Infof("%+v", response) + + return response, nil +} + +func PartitionLabelMatcher(domain string, matcher *regexp.Regexp) FromDevice[*partition] { + return func(dev udev.Device) **partition { + var part *partition + if dev.Subsystem() != udev.BlockSubsystem { + return nil + } + if dev.DevType() != udev.DeviceTypePart { + return nil + } + + partlabel, found := dev.Properties()[udev.PropertyPartName] + if !found { + return nil + } + + matches := matcher.FindStringSubmatch(partlabel) + if len(matches) == 0 { + return nil + } + + if len(matches) > 1 { + partlabel = matches[1] + } + + part = &partition{ + label: partlabel, + domain: domain, + dev: dev, + } + + return &part + } +} \ No newline at end of file diff --git a/internal/plugin/plugin.go b/internal/plugin/plugin.go new file mode 100644 index 0000000..21f7bcf --- /dev/null +++ b/internal/plugin/plugin.go @@ -0,0 +1,191 @@ +package plugin + +import ( + "context" + "fmt" + "net" + "os" + "sync" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" + + "github.com/kennygrant/sanitize" + + "k8s.io/klog/v2" + pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" +) + +type plugin struct { + resource Resource + cancel context.CancelFunc +} + +func newPlugin(resource Resource, ctx context.Context, wg *sync.WaitGroup) (*plugin, error) { + ctx, cancel := context.WithCancel(ctx) + plugin := &plugin{ + resource: resource, + cancel: cancel, + } + + socketPath := pluginapi.DevicePluginPath + plugin.socketPath() + if err := os.Remove(socketPath); err != nil && !os.IsNotExist(err) { + klog.Errorf("%q: failed to remove socket file %q: %v", plugin.resource.Name(), socketPath, err) + return nil, fmt.Errorf("failed to remove socket file %s: %w", socketPath, err) + } + + server := grpc.NewServer() + pluginapi.RegisterDevicePluginServer(server, plugin) + + listener, err := net.Listen("unix", socketPath) + if err != nil { + klog.Errorf("%q: failed to listen on socket %q: %v", plugin.resource.Name(), socketPath, err) + return nil, fmt.Errorf("failed to listen on socket %s: %w", socketPath, err) + } + + go server.Serve(listener) + + wg.Add(1) + go func(ctx context.Context, wg *sync.WaitGroup, l net.Listener, s *grpc.Server) { + defer wg.Done() + defer l.Close() + defer s.Stop() + klog.Infof("Serving device plugin %q on socket %q", plugin.resource.Name(), socketPath) + <-ctx.Done() + }(ctx, wg, listener, server) + + return plugin, nil +} + +func (p *plugin) stop() { + if p.cancel != nil { + klog.Infof("%q: Stopping device plugin", p.resource.Name()) + p.cancel() + } +} + +func (p *plugin) GetDevicePluginOptions(context.Context, *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) { + return &pluginapi.DevicePluginOptions{}, nil +} + +func (p *plugin) GetPreferredAllocation(context.Context, *pluginapi.PreferredAllocationRequest) (*pluginapi.PreferredAllocationResponse, error) { + return &pluginapi.PreferredAllocationResponse{}, nil +} + +func (p *plugin) PreStartContainer(context.Context, *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) { + return &pluginapi.PreStartContainerResponse{}, nil +} + +func (p *plugin) ListAndWatch(empty *pluginapi.Empty, stream pluginapi.DevicePlugin_ListAndWatchServer) (err error) { + defer klog.Infof("%q: closing ListAndWatch connection, err = %v", p.resource.Name(), err) + + for instances := range p.resource.ListAndWatch(stream.Context()) { + devices := make([]*pluginapi.Device, len(instances)) + for i, instance := range instances { + devices[i] = &pluginapi.Device{ + ID: string(instance.Id()), + Health: instance.Health().String(), + Topology: instance.TopologyHints(), + } + } + klog.V(2).Infof("%q: sending devices to ListAndWatch stream: %+v", p.resource.Name(), devices) + if err := stream.Send(&pluginapi.ListAndWatchResponse{Devices: devices}); err != nil { + klog.Errorf("%q: failed to send devices to ListAndWatch stream: %v", p.resource.Name(), err) + return err + } + } + return nil +} + +func mergeResponses(responses ...*pluginapi.ContainerAllocateResponse) *pluginapi.ContainerAllocateResponse { + response := &pluginapi.ContainerAllocateResponse{} + for _, r := range responses { + if r == nil { + continue + } + response.Devices = append(response.Devices, r.Devices...) + response.Mounts = append(response.Mounts, r.Mounts...) + if len(r.Envs) > 0 { + if response.Envs == nil { + response.Envs = make(map[string]string) + } + for key, value := range r.Envs { + response.Envs[key] = value + } + } + if len(r.Annotations) > 0 { + if response.Annotations == nil { + response.Annotations = make(map[string]string) + } + for key, value := range r.Annotations { + response.Annotations[key] = value + } + } + } + return response +} + +func (p *plugin) Allocate(ctx context.Context, request *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) { + klog.Infof("%q: Received allocation request", p.resource.Name()) + klog.V(2).Infof("%+v", request) + + instances := p.resource.Instances() + response := &pluginapi.AllocateResponse{} + for _, containerRequest := range request.ContainerRequests { + containerResponse := &pluginapi.ContainerAllocateResponse{} + klog.V(2).Infof("%q: Processing container request: %+v", p.resource.Name(), containerRequest) + for _, id := range containerRequest.DevicesIDs { + instance, found := instances[Id(id)] + if !found { + klog.Errorf("%q: device with ID %q not found", p.resource.Name(), id) + return nil, status.Errorf(codes.NotFound, "device with ID %q not found", id) + } + allocateResponse, err := instance.Allocate(ctx) + if err != nil { + klog.Errorf("%q: failed to allocate device with ID %q: %v", p.resource.Name(), id, err) + return nil, status.Errorf(codes.Internal, "failed to allocate device with ID %q: %s", id, err.Error()) + } + containerResponse = mergeResponses(containerResponse, allocateResponse) + } + response.ContainerResponses = append(response.ContainerResponses, containerResponse) + } + + klog.V(2).Infof("%q: Responding to allocation request with: %+v", p.resource.Name(), response) + return response, nil +} + +func (p *plugin) socketPath() string { + return sanitize.BaseName(p.resource.Name()) + ".sock" +} + +func (p *plugin) probe(ctx context.Context) error { + ctx, cancel := context.WithTimeout(ctx, 1 * time.Second) + defer cancel() + + pluginAddr := "unix://" + pluginapi.DevicePluginPath + p.socketPath() + + conn, err := grpc.DialContext( + ctx, + pluginAddr, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithBlock(), + ) + if err != nil { + klog.Errorf("%q: failed to dial %q: %v", p.resource.Name(), pluginAddr, err) + return fmt.Errorf("failed to dial %q: %w", pluginAddr, err) + } + defer conn.Close() + + client := pluginapi.NewDevicePluginClient(conn) + _, err = client.GetDevicePluginOptions(context.Background(), &pluginapi.Empty{}) + + if err != nil { + klog.Errorf("%q: failed to get device plugin options: %v", p.resource.Name(), err) + return fmt.Errorf("plugin[%q]: failed to get device plugin options: %w", p.resource.Name(), err) + } + + return nil +} \ No newline at end of file diff --git a/internal/plugin/registry.go b/internal/plugin/registry.go new file mode 100644 index 0000000..47e4ef5 --- /dev/null +++ b/internal/plugin/registry.go @@ -0,0 +1,166 @@ +package plugin + +import ( + "context" + "fmt" + "net/http" + "path" + "sync" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + "github.com/fsnotify/fsnotify" + "k8s.io/klog/v2" + + pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" +) + +// Registry is a lifecycle manager for plugins. +// It is responsible for (re-)registering plugins with the kubelet. +type Registry struct { + plugins sync.Map + ctx context.Context + wg *sync.WaitGroup + watcher *fsnotify.Watcher +} + +const kubeletAddr = "unix://" + pluginapi.KubeletSocket + +// register advertises plugin socket to the kubelet. +func (r *Registry) register(plugin *plugin) error { + conn, err := grpc.Dial(kubeletAddr, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock()) + if err != nil { + klog.Errorf("failed to dial %q: %v", kubeletAddr, err) + return err + } + defer func() { + err := conn.Close() + if err != nil { + klog.Errorf("failed to close connection: %v", err) + } + }() + + client := pluginapi.NewRegistrationClient(conn) + + _, err = client.Register(context.Background(), &pluginapi.RegisterRequest{ + ResourceName: plugin.resource.Name(), + Version: pluginapi.Version, + Endpoint: plugin.socketPath(), + Options: &pluginapi.DevicePluginOptions{}, + }) + if err != nil { + klog.Infof("failed to register with kubelet: %v", err) + return fmt.Errorf("failed to register with kubelet: %w", err) + } + klog.Infof("registered device %s with kubelet", plugin.resource.Name()) + return nil +} + +// hup registers all plugins with the freshly kubelet. +// Newely started kubelet removes all socket files, so we need to re-register +// all plugins. See https://kubernetes.io/docs/concepts/extend-kubernetes/compute-storage-net/device-plugins/#handling-kubelet-restarts +func (r *Registry) hup() { + r.plugins.Range(func(_, p interface{}) bool { + plugin := p.(*plugin) + plugin.stop() + plugin, err := newPlugin(plugin.resource, r.ctx, r.wg) + if err != nil { + klog.Errorf("failed to create plugin for %s: %v", plugin.resource.Name(), err) + return true + } + if err := r.register(plugin); err != nil { + klog.Errorf("failed to register %s: %v", plugin.resource.Name(), err) + } + return true + }) +} + +// NewRegistry creates a new registry and starts goroutine +// that watches for kubelet restarts. Whenever kubelet restarts, +// the registry will re-register all plugins. +// `ctx`: context that controls the lifecycle of the registry. +// `wg`: wait group that would be waited on before the registry is stopped. +func NewRegistry(ctx context.Context, wg *sync.WaitGroup) (*Registry, error) { + watcher, err := fsnotify.NewWatcher() + if err != nil { + klog.Errorf("failed to create fsnotifier watcher: %v", err) + return nil, fmt.Errorf("failed to create fsnotifier watcher: %w", err) + } + + registry := &Registry{ + plugins: sync.Map{}, + wg: wg, + ctx: ctx, + watcher: watcher, + } + + watcher.Add(path.Join(pluginapi.KubeletSocket)) + + registry.wg.Add(1) + go func(r *Registry) { + defer r.wg.Done() + defer r.watcher.Close() + + for { + select { + case event := <-r.watcher.Events: + if event.Op & fsnotify.Create != 0 { + r.hup() + } + case <-r.ctx.Done(): + // Parent context is done, exit the goroutine. + return + } + } + }(registry) + + return registry, nil +} + +func (r *Registry) Healthz(resp http.ResponseWriter, req *http.Request) { + unhealthy := make([]string, 0) + r.plugins.Range(func(_, p interface{}) bool { + plugin := p.(*plugin) + if err := plugin.probe(req.Context()); err != nil { + klog.Errorf("probe failed for %s: %v", plugin.resource.Name(), err) + unhealthy = append(unhealthy, plugin.resource.Name()) + } else { + klog.V(2).Infof("probe succeeded for %s", plugin.resource.Name()) + } + return true + }) + + if len(unhealthy) == 0 { + resp.WriteHeader(http.StatusOK) + } else { + resp.WriteHeader(http.StatusInternalServerError) + for _, name := range unhealthy { + fmt.Fprintf(resp, "probe failed for device plugin %q\n", name) + } + } +} + +// Add creates a new plugin for given Resource and registers it with the +// kubelet. Attempts to register resource with the same name twice will result +// in an error. +func (r *Registry) Add(resource Resource) error { + plugin, err := newPlugin(resource, r.ctx, r.wg) + if err != nil { + klog.Errorf("failed to create plugin for resource %q Cause: %v", resource.Name(), err) + return err + } + + _, loaded := r.plugins.LoadOrStore(resource.Name(), plugin) + if loaded { + klog.Errorf("resource with name %q already exists", resource.Name()) + return fmt.Errorf("resource with name %q already exists", resource.Name()) + } + if err := r.register(plugin); err != nil { + klog.Errorf("failed to register resource %q Cause: %v", resource.Name(), err) + return err + } + return nil +} + + diff --git a/internal/plugin/resource.go b/internal/plugin/resource.go new file mode 100644 index 0000000..3a1875f --- /dev/null +++ b/internal/plugin/resource.go @@ -0,0 +1,129 @@ +package plugin + +import ( + "context" + + pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" + + "github.com/ydb-platform/ydb-disk-manager/internal/mux" + "github.com/ydb-platform/ydb-disk-manager/internal/udev" +) + +type Health interface { + String() string + sealed() +} + +type Healthy struct{} + +func (Healthy) sealed() {} + +func (Healthy) String() string { + return "Healthy" +} + +type Unhealthy struct{} + +func (Unhealthy) sealed() {} + +func (Unhealthy) String() string { + return "Unhealthy" +} + +type Id string + +type Instance interface { + Id() Id + Health() Health + TopologyHints() *pluginapi.TopologyInfo + Allocate(context.Context) (*pluginapi.ContainerAllocateResponse, error) +} + +type healthInstance struct { + Instance + health Health +} + +func (h *healthInstance) Id() Id { + return h.Instance.Id() +} + +func (h *healthInstance) Health() Health { + if _, ok := h.health.(Healthy); ok { + return h.Instance.Health() + } + return h.health +} + +func (h *healthInstance) TopologyHints() *pluginapi.TopologyInfo { + return h.Instance.TopologyHints() +} + +func (h *healthInstance) Allocate(ctx context.Context) (*pluginapi.ContainerAllocateResponse, error) { + return h.Instance.Allocate(ctx) +} + +type FromDevice[T Instance] func(dev udev.Device) *T + +func (f FromDevice[T]) AsFilter() mux.FilterFunc[udev.Device] { + return func(dev udev.Device) bool { + return f(dev) != nil + } +} + +type HealthEvent struct { + Instance + Health +} + +type Resource interface { + mux.Sink[HealthEvent] + Name() string + Instances() map[Id]Instance + ListAndWatch(context.Context) <-chan []Instance +} + +type singletonResource struct { + resourcePrefix string + instance Instance + healthCh chan HealthEvent +} + +func (r *singletonResource) Name() string { + return r.resourcePrefix + string(r.instance.Id()) +} + +func (r *singletonResource) Instances() map[Id]Instance { + return map[Id]Instance{ + r.instance.Id(): r.instance, + } +} + +func (r *singletonResource) Submit(ev HealthEvent) error { + r.healthCh <- ev + return nil +} + +func (r *singletonResource) run(instanceCh chan<- []Instance) { + defer close(instanceCh) + instanceCh <- []Instance{r.instance} + for ev := range r.healthCh { + r.instance = &healthInstance{ + Instance: r.instance, + health: ev.Health, + } + instanceCh <- []Instance{r.instance} + } +} + +func (r *singletonResource) ListAndWatch(ctx context.Context) <-chan []Instance { + ch := make(chan []Instance, 1) + + go r.run(ch) + + return ch +} + +func (r *singletonResource) Close() { + close(r.healthCh) +} \ No newline at end of file diff --git a/internal/plugin/scatter.go b/internal/plugin/scatter.go new file mode 100644 index 0000000..85e4fa3 --- /dev/null +++ b/internal/plugin/scatter.go @@ -0,0 +1,95 @@ +package plugin + +import ( + "k8s.io/klog/v2" + + "github.com/ydb-platform/ydb-disk-manager/internal/mux" + "github.com/ydb-platform/ydb-disk-manager/internal/udev" +) + +type Scatter[T Instance] struct { + mapper FromDevice[T] + registry *Registry + resourceDomain string + resourcePrefix string + routes map[Id]Resource +} + +func NewScatter[T Instance](d udev.Discovery, registry *Registry, resourceDomain, resourcePrefix string, mapper FromDevice[T]) mux.CancelFunc { + scatter := &Scatter[T]{ + mapper: mapper, + registry: registry, + resourceDomain: resourceDomain, + resourcePrefix: resourcePrefix, + routes: make(map[Id]Resource), + } + ch := make(chan udev.Event) + + go scatter.run(ch) + + return d.Subscribe(mux.SinkFromChan(ch)) +} + +func (s *Scatter[T]) add(id Id, resource Resource) { + err := s.registry.Add(resource) + if err != nil { + klog.Errorf("failed to add resource %s: %v", resource.Name, err) + return + } + s.routes[id] = resource +} + +func (s *Scatter[T]) run(evCh <-chan udev.Event) { + for ev := range evCh { + switch ev := ev.(type) { + case udev.Init: + for _, dev := range ev.Devices { + maybeInstance := s.mapper(dev) + if maybeInstance == nil { + continue + } + klog.V(5).Info("Init: Matched device: %s", dev.Debug()) + instance := *maybeInstance + id := instance.Id() + if _, ok := s.routes[id]; ok { + continue + } + resource := &singletonResource{ + resourcePrefix: s.resourceDomain + "/" + s.resourcePrefix, + instance: instance, + healthCh: make(chan HealthEvent), + } + s.add(id, resource) + } + case udev.Added: + maybeInstance := s.mapper(ev.Device) + if maybeInstance == nil { + continue + } + klog.V(5).Info("Added: Matched device: %s", ev.Device.Debug()) + instance := *maybeInstance + id := instance.Id() + if resource, ok := s.routes[id]; ok { + resource.Submit(HealthEvent{instance, Healthy{}}) + } else { + resource := &singletonResource{ + resourcePrefix: s.resourcePrefix, + instance: instance, + healthCh: make(chan HealthEvent), + } + s.add(id, resource) + } + case udev.Removed: + maybeInstance := s.mapper(ev.Device) + if maybeInstance == nil { + continue + } + klog.V(5).Info("Removed: Matched device: %s", ev.Device.Debug()) + instance := *maybeInstance + id := instance.Id() + if resource, ok := s.routes[id]; ok { + resource.Submit(HealthEvent{instance, Unhealthy{}}) + } + } + } +} diff --git a/internal/udev/discovery.go b/internal/udev/discovery.go new file mode 100644 index 0000000..6403778 --- /dev/null +++ b/internal/udev/discovery.go @@ -0,0 +1,61 @@ +package udev + +import ( + "github.com/ydb-platform/ydb-disk-manager/internal/mux" +) + +type Id string + +type Device interface { + Id() Id + Parent() Device + Subsystem() string + DevType() string + DevNode() string + DevLinks() []string + Properties() map[string]string + Property(string) string + PropertyLookup(string) string + SystemAttributes() map[string]string + SystemAttributeKeys() []string + SystemAttribute(string) string + SystemAttributeLookup(string) string + Tags() []string + NumaNode() int + + Debug() string +} + +type Event interface { + eventSealed() +} + +type Init struct { + Devices []Device +} + +func (Init) eventSealed() {} + +type Added struct { + Device +} + +func (Added) eventSealed() {} + +type Removed struct { + Device +} + +func (Removed) eventSealed() {} + +type Slice interface { + mux.Source[[]Device] +} + +type Discovery interface { + mux.Source[Event] + DeviceById(Id) Device + State(mux.FilterFunc[Device]) map[Id]Device + Slice(mux.FilterFunc[Device]) Slice + Close() +} diff --git a/internal/udev/udev.go b/internal/udev/udev.go new file mode 100644 index 0000000..7c73a7d --- /dev/null +++ b/internal/udev/udev.go @@ -0,0 +1,386 @@ +package udev + +import ( + "context" + "fmt" + "strconv" + "strings" + "sync" + "time" + + libudev "github.com/jochenvg/go-udev" + + "k8s.io/klog/v2" + + "github.com/ydb-platform/ydb-disk-manager/internal/mux" +) + +const ( + BlockSubsystem = "block" + + DeviceTypeKey = "DEVTYPE" + DeviceTypePart = "partition" + + PropertyPartName = "PARTNAME" + PropertyModel = "ID_MODEL" + PropertyShortSerial = "ID_SERIAL_SHORT" + + SysAttrWWID = "wwid" + SysAttrModel = "model" + SysAttrSerial = "serial" + + ActionAdd = "add" + ActionRemove = "remove" + ActionOffline = "offline" + ActionOnline = "online" +) + +type monitorRequest interface { + requestSealed() +} + +type stateRequest struct { + filter mux.FilterFunc[Device] +} + +func (r stateRequest) requestSealed() {} + +type stopRequest struct{} + +func (r stopRequest) requestSealed() {} + +type newSub struct { + sink mux.Sink[Event] +} + +func (n newSub) requestSealed() {} + +type generic struct { + udev Discovery + + dev *libudev.Device + parent Device +} + +func (g *generic) Id() Id { + return Id(g.dev.Syspath()) +} + +func (g *generic) Parent() Device { + if g.parent == nil { + g.parent = g.udev.DeviceById(Id(g.dev.Syspath())) + } + return g.parent +} + +func (g *generic) Subsystem() string { + return g.dev.Subsystem() +} + +func (g *generic) DevType() string { + return g.dev.Devtype() +} + +func (g *generic) DevNode() string { + return g.dev.Devnode() +} + +func (g *generic) DevLinks() []string { + devlinks := g.dev.Devlinks() + res := make([]string, 0, len(devlinks)) + for link := range devlinks { + res = append(res, link) + } + return res +} + +func (g *generic) Properties() map[string]string { + return g.dev.Properties() +} + +func (g *generic) Property(key string) string { + return strings.TrimSpace(g.dev.PropertyValue(key)) +} + +func (g *generic) PropertyLookup(key string) string { + value := g.Property(key) + if value == "" && g.parent != nil { + return g.parent.PropertyLookup(key) + } + + return value +} + +func (g *generic) SystemAttributeKeys() []string { + sysattrs := g.dev.Sysattrs() + res := make([]string, 0, len(sysattrs)) + for attr := range sysattrs { + res = append(res, attr) + } + return res +} + +func (g *generic) SystemAttribute(key string) string { + return strings.TrimSpace(g.dev.SysattrValue(key)) +} + +func (g *generic) SystemAttributes() map[string]string { + res := make(map[string]string) + for attr := range g.dev.Sysattrs() { + res[attr] = g.SystemAttribute(attr) + } + return res +} + +func (g *generic) SystemAttributeLookup(key string) string { + value := g.SystemAttribute(key) + if value == "" && g.parent != nil { + return g.parent.SystemAttributeLookup(key) + } + + return value +} + +func (g *generic) Tags() []string { + tags := g.dev.Tags() + res := make([]string, 0, len(tags)) + for tag := range tags { + res = append(res, tag) + } + return res +} + +func (g *generic) NumaNode() int { + numaNodeStr := g.SystemAttributeLookup("numa_node") + if numaNode, err := strconv.Atoi(numaNodeStr); err == nil { + return numaNode + } + return -1 +} + +func (g *generic) Debug() string { + return fmt.Sprintf("Device[ID=%s, Subsystem=%s, DevType=%s, DevNode=%s, NumaNode=%d, Links=%v, Tags=%v, Properties=%v, SysAttrs=%v]", + g.Id(), + g.Subsystem(), + g.DevType(), + g.DevNode(), + g.NumaNode(), + g.DevLinks(), + g.Tags(), + g.Properties(), + g.SystemAttributes(), + ) +} + + +type udevSlice struct { + discovery *udevDiscovery + state map[Id]Device + filter mux.FilterFunc[Device] + mux *mux.Mux[[]Device] + stop mux.CancelFunc +} + +func (s *udevSlice) Close() { + s.stop() +} + +func (s *udevSlice) Subscribe(sink mux.Sink[[]Device]) mux.CancelFunc { + return s.mux.Subscribe(sink) +} + +type udevDiscovery struct { + udev libudev.Udev + state map[Id]Device // should be accessed only by monitor goroutine + requests chan mux.AwaitReply[monitorRequest, any] + mux *mux.Mux[Event] + wg *sync.WaitGroup +} + +func NewDiscovery(wg *sync.WaitGroup) (Discovery, error) { + d := &udevDiscovery{ + state: make(map[Id]Device), + requests: make(chan mux.AwaitReply[monitorRequest, any]), + mux: mux.Make[Event](), + wg: wg, + } + enum := d.udev.NewEnumerate() + + devs, err := enum.Devices() + if err != nil { + klog.Errorf("Failed to enumerate devices: %v", err) + return nil, err + } + + for _, dev := range devs { + if dev == nil { + klog.Error("udev device is nil!") + continue + } + device := &generic{ + udev: d, + dev: dev, + } + if dev.Parent() != nil { + device.parent = d.state[Id(dev.Parent().Syspath())] + } + + d.state[Id(dev.Syspath())] = device + } + + wg.Add(1) + go d.monitor(wg) + + return d, nil +} + +func (d *udevDiscovery) Close() { + await := mux.NewAwaitReply[monitorRequest, any](stopRequest{}) + defer await.Await() + d.requests <- await +} + +// State returns the current state of the devices as seen by the monitor +func (d *udevDiscovery) State(filter mux.FilterFunc[Device]) map[Id]Device { + await := mux.NewAwaitReply[monitorRequest, any](stateRequest{filter: filter}) + d.requests <- await + return await.Await().(map[Id]Device) +} + +func (d *udevDiscovery) DeviceById(id Id) Device { + return d.state[id] // read-only access should be safe. worst case it would return stale data +} + +func (d *udevDiscovery) Slice(filter mux.FilterFunc[Device]) Slice { + slice := &udevSlice{ + discovery: d, + state: make(map[Id]Device), + filter: filter, + mux: mux.Make[[]Device](), + } + + evCh := make(chan Event) + + go func() { + defer slice.mux.Close() + for ev := range evCh { // exits on close + switch e := ev.(type) { + case Init: + for _, dev := range e.Devices { + if filter(dev) { + slice.state[dev.Id()] = dev + } + } + copy := make([]Device, 0, len(slice.state)) + for _, d := range slice.state { + copy = append(copy, d) + } + slice.mux.Submit(copy) + case Added: + if filter(e.Device) { + slice.state[e.Device.Id()] = e.Device + copy := make([]Device, 0, len(slice.state)) + for _, d := range slice.state { + copy = append(copy, d) + } + slice.mux.Submit(copy) + } + case Removed: + if _, found := slice.state[e.Device.Id()]; found { + delete(slice.state, e.Device.Id()) + copy := make([]Device, 0, len(slice.state)) + for _, d := range slice.state { + copy = append(copy, d) + } + slice.mux.Submit(copy) + } + } + } + }() + + evSink := mux.SinkFromChan(evCh) + cancelEvSub := d.mux.Subscribe(evSink) + slice.stop = cancelEvSub + + return slice +} + +func (d *udevDiscovery) monitor(wg *sync.WaitGroup) { + defer wg.Done() + defer d.mux.Close() + defer close(d.requests) + + mon := d.udev.NewMonitorFromNetlink("udev") + devChan, errChan, err := mon.DeviceChan(context.Background()) + if err != nil { + klog.Errorf("Failed to create device channel: %v", err) + return + } + + for { + select { + case dev := <-devChan: + klog.V(5).Infof("Received device event (%s): %s", dev.Action(), dev.Syspath()) + switch dev.Action() { + case ActionAdd, ActionOnline: + id := Id(dev.Syspath()) + dev := &generic{ + udev: d, + dev: dev, + } + d.state[id] = dev + d.mux.Submit(Added{dev}) + case ActionRemove, ActionOffline: + id := Id(dev.Syspath()) + dev := d.state[id] + delete(d.state, id) + d.mux.Submit(Removed{dev}) + } + case req := <-d.requests: + switch r := req.Value().(type) { + case stateRequest: + state := make(map[Id]Device) + for k, v := range d.state { + if r.filter(v) { + state[k] = v + } + } + req.Reply(state) + case newSub: + init := make([]Device, 0, len(d.state)) + for _, dev := range d.state { + init = append(init, dev) + } + err := r.sink.Submit(Init{init}) + if err != nil { + klog.Errorf("Failed to submit init event: %v", err) + } + cancel := d.mux.Subscribe(r.sink) + req.Reply(cancel) + case stopRequest: + req.Reply(nil) + return + } + case err := <-errChan: + klog.Errorf("Error from udev monitor, will try to retry connecting to udev: %v", err) + retry: mon = d.udev.NewMonitorFromNetlink("udev") + devChan, errChan, err = mon.DeviceChan(context.Background()) + if err != nil { + klog.Errorf("Failed to create device channel, retrying: %v", err) + time.Sleep(1 * time.Second) + goto retry + } + klog.Infof("Successfully reconnected to udev") + } + } +} + + +func (d *udevDiscovery) Subscribe(sink mux.Sink[Event]) mux.CancelFunc { + // here we're doing initialization in monitor goroutine + // to be able to pass consistent Init event to the sink + // before making fan out of udev events + await := mux.NewAwaitReply[monitorRequest, any](newSub{sink}) + d.requests <- await + return await.Await().(mux.CancelFunc) +} diff --git a/udev-manager.Dockerfile b/udev-manager.Dockerfile new file mode 100644 index 0000000..0a80638 --- /dev/null +++ b/udev-manager.Dockerfile @@ -0,0 +1,24 @@ +FROM golang:1.24-bookworm as build + +WORKDIR /go/app +COPY . /go/app + +RUN apt update && apt install -y libudev-dev +RUN CGO_ENABLED=1 GOOS=linux GOARCH=amd64 go build -o udev-manager cmd/udev-manager/main.go + +FROM debian:bookworm + +WORKDIR /root + +RUN apt update && apt install -y libudev1 && apt clean +RUN echo '#!/bin/bash\n\ +set -e\n\ +\n\ +# Add initialization logic here if needed\n\ +\n\ +exec /usr/bin/udev-manager --config file:/etc/udev-manager/config.yaml ${UDEV_MANAGER_ARGS[@]}' > /usr/local/bin/entrypoint.sh && \ + chmod +x /usr/local/bin/entrypoint.sh +COPY --from=build /go/app/udev-manager /usr/bin/udev-manager + +ENV UDEV_MANAGER_ARGS= +ENTRYPOINT ["/usr/local/bin/entrypoint.sh"] diff --git a/ydb-disk-manager.code-workspace b/ydb-disk-manager.code-workspace new file mode 100644 index 0000000..f760840 --- /dev/null +++ b/ydb-disk-manager.code-workspace @@ -0,0 +1,16 @@ +{ + "folders": [ + { + "path": "." + } + ], + "extensions": { + "recommendations": [ + "golang.go", + "fill-labs.dependi", + "ms-vscode.makefile-tools", + "redhat.vscode-yaml", + "usernamehw.errorlens", + ] + } +} \ No newline at end of file