From 220895b9271621e330fe4ea4ea9de4df5490ffaa Mon Sep 17 00:00:00 2001 From: Andrey Korzinev Date: Mon, 10 Mar 2025 14:34:31 +0100 Subject: [PATCH] udev-manager: new disk manager plugin based on udev This plugin was designed from ground-up to allow exposing distinct device plugins for identifiable devices on host. This enables enchanced scheduling capabilities, including conf-less startup and NUMA topology hinting for the requestor. In future work this plugin may be enchanced further, e.g. exposing disks by serial or WWID to the Kube resource topology, allowing tie specific YDB pods to specific discs regardless of where are those disks located. Signed-off-by: Andrey Korzinev --- .gitignore | 1 + Dockerfile | 8 +- cmd/udev-manager/main.go | 279 ++++++++++++++++++++++ cmd/ydb-disk-manager/main.go | 3 +- go.mod | 24 +- go.sum | 47 +++- internal/mux/filter.go | 79 +++++++ internal/mux/filter_spec_test.go | 177 ++++++++++++++ internal/mux/mux.go | 259 +++++++++++++++++++++ internal/mux/mux_suite_test.go | 200 ++++++++++++++++ internal/mux/mux_test.go | 13 ++ internal/plugin/partition.go | 118 ++++++++++ internal/plugin/plugin.go | 191 +++++++++++++++ internal/plugin/registry.go | 166 +++++++++++++ internal/plugin/resource.go | 129 +++++++++++ internal/plugin/scatter.go | 95 ++++++++ internal/udev/discovery.go | 61 +++++ internal/udev/udev.go | 386 +++++++++++++++++++++++++++++++ udev-manager.Dockerfile | 24 ++ ydb-disk-manager.code-workspace | 16 ++ 20 files changed, 2254 insertions(+), 22 deletions(-) create mode 100644 .gitignore create mode 100644 cmd/udev-manager/main.go create mode 100644 internal/mux/filter.go create mode 100644 internal/mux/filter_spec_test.go create mode 100644 internal/mux/mux.go create mode 100644 internal/mux/mux_suite_test.go create mode 100644 internal/mux/mux_test.go create mode 100644 internal/plugin/partition.go create mode 100644 internal/plugin/plugin.go create mode 100644 internal/plugin/registry.go create mode 100644 internal/plugin/resource.go create mode 100644 internal/plugin/scatter.go create mode 100644 internal/udev/discovery.go create mode 100644 internal/udev/udev.go create mode 100644 udev-manager.Dockerfile create mode 100644 ydb-disk-manager.code-workspace diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..600d2d3 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.vscode \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 47107a4..838f1e3 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,14 +1,16 @@ -FROM golang:1.18 as build +FROM golang:1.24-bookworm as build WORKDIR /go/app COPY . /go/app -RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o ydb-disk-manager cmd/ydb-disk-manager/main.go +RUN apt update && apt install -y libudev-dev +RUN CGO_ENABLED=1 GOOS=linux GOARCH=amd64 go build -o ydb-disk-manager cmd/ydb-disk-manager/main.go -FROM ubuntu:20.04 +FROM debian:bookworm WORKDIR /root +RUN apt update && apt install -y libudev1 && apt clean COPY --from=build /go/app/ydb-disk-manager /usr/bin/ydb-disk-manager ENTRYPOINT ["/usr/bin/ydb-disk-manager"] diff --git a/cmd/udev-manager/main.go b/cmd/udev-manager/main.go new file mode 100644 index 0000000..45c9550 --- /dev/null +++ b/cmd/udev-manager/main.go @@ -0,0 +1,279 @@ +package main + +import ( + "context" + "errors" + "flag" + "fmt" + "io" + "net/http" + "os" + "os/signal" + "regexp" + "strings" + "sync" + "syscall" + + "gopkg.in/yaml.v3" + + "k8s.io/klog/v2" + + "github.com/ydb-platform/ydb-disk-manager/internal/mux" + "github.com/ydb-platform/ydb-disk-manager/internal/plugin" + "github.com/ydb-platform/ydb-disk-manager/internal/udev" +) + +func main() { + appContext, appCancel := context.WithCancel(context.Background()) + appWaitGroup := &sync.WaitGroup{} + defer appWaitGroup.Wait() + defer appCancel() + + flags := initFlags() + + // Registry creates a separate plugin for each resource registered + registry, err := plugin.NewRegistry(appContext, appWaitGroup) + if err != nil { + klog.Fatalf("failed to create plugin registry: %v", err) + os.Exit(1) + } + + // udev discovery looks up devices and listens for system events + devDiscovery, err := udev.NewDiscovery(appWaitGroup) + if err != nil { + klog.Fatalf("failed to start udev discovery: %v", err) + os.Exit(1) + } + defer devDiscovery.Close() + + domain := flags.config.DeviceDomain + + var cancel mux.CancelFunc = mux.CancelFunc(appCancel) + for _, partConfig := range flags.config.Partitions { + cancel = mux.ChainCancelFunc( + plugin.NewScatter( + devDiscovery, + registry, + domain, + "part-", + plugin.PartitionLabelMatcher(domain, partConfig.matcher), + ), + cancel, + ) + } + + klog.Info("Starting /healthz server on port :8080") + go func() { + http.HandleFunc("/healthz", registry.Healthz) + if err := http.ListenAndServe(":8080", nil); err != nil { + klog.Fatalf("failed to start /healthz server: %v", err) + } + }() + + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + defer cancel() + for signal := range sigs { + switch signal { + case syscall.SIGINT, syscall.SIGTERM: + klog.Infof("Received signal %q, shutting down", signal.String()) + return + } + } +} + +type configSource interface { + String() string + open() (io.Reader, func() error, error) +} + +type fileConfigSource struct { + path string +} + +func (fcs *fileConfigSource) open() (io.Reader, func() error, error) { + file, err := os.Open(fcs.path) + if err != nil { + return nil, nil, err + } + return file, file.Close, nil +} + +func (fcs *fileConfigSource) String() string { + return "file:" + fcs.path +} + +type envConfigSource struct { + variable string +} + +func (ecs *envConfigSource) open() (io.Reader, func() error, error) { + data := os.Getenv(ecs.variable) + if data == "" { + return nil, nil, fmt.Errorf("config: environment variable %s is not set", ecs.variable) + } + return strings.NewReader(data), func() error { return nil }, nil +} + +func (ecs *envConfigSource) String() string { + return "env:" + ecs.variable +} + +type stdinConfigSource struct{} + +func (scs *stdinConfigSource) open() (io.Reader, func() error, error) { + return os.Stdin, func() error { return nil }, nil +} + +func (scs *stdinConfigSource) String() string { + return "stdin" +} + +type ConfigFlag struct { + configSource +} + +func (cf *ConfigFlag) Set(value string) error { + if strings.HasPrefix(value, "file:") { + cf.configSource = &fileConfigSource{path: strings.TrimPrefix(value, "file:")} + } else if strings.HasPrefix(value, "env:") { + cf.configSource = &envConfigSource{variable: strings.TrimPrefix(value, "env:")} + } else if strings.HasPrefix(value, "stdin") { + cf.configSource = &stdinConfigSource{} + } else { + return fmt.Errorf("invalid config source: %s", value) + } + + return nil +} + +func (cf *ConfigFlag) String() string { + if cf.configSource == nil { + return "" + } + return cf.configSource.String() +} + +type FlagValues struct { + Config ConfigFlag + + config *Config +} + +func initFlags() FlagValues { + values := FlagValues{} + flags := flag.NewFlagSet("udev-manager", flag.ExitOnError) + klog.InitFlags(flags) + flags.Var(&values.Config, "config", `configuration source (in form "file:", "env:" or "stdin")`) + flags.Parse(os.Args[1:]) + if values.Config.configSource == nil { + flags.Output().Write([]byte("config flag is required\n")) + flags.Usage() + os.Exit(2) + } + configReader, configCloser, err := values.Config.open() + if err != nil { + klog.Fatalf("failed to open --config %q: %v", values.Config.String(), err) + os.Exit(1) + } + defer configCloser() + + config, err := parseConfig(configReader) + if err != nil { + klog.Fatalf("failed to parse --config %q: %v", values.Config.String(), err) + os.Exit(1) + } + + values.config = config + + return values +} + +type PartitionsConfig struct { + Matcher string `yaml:"matcher"` // matcher should be a valid regular expression + + matcher *regexp.Regexp // compiled matcher if the config is valid +} + +func (pc *PartitionsConfig) validate() error { + // Compile the regular expression + matcher, err := regexp.Compile(pc.Matcher) + if err != nil { + return fmt.Errorf(".matcher: %q must be a valid regexp: %w", pc.Matcher, err) + } + if matcher.NumSubexp() > 1 { + return fmt.Errorf(".matcher: %q must have at most one capturing group", pc.Matcher) + } + pc.matcher = matcher + return nil +} + +type HostDevConfig struct { + Matcher string `yaml:"matcher"` // matcher should be a valid regular expression + Prefix string `yaml:"prefix"` + + matcher *regexp.Regexp // compiled matcher if the config is valid +} + +func (hc *HostDevConfig) validate() error { + // Compile the regular expression + matcher, err := regexp.Compile(hc.Matcher) + if err != nil { + return fmt.Errorf(".matcher: %q must be a valid regexp: %w", hc.Matcher, err) + } + hc.matcher = matcher + return nil +} + +type Config struct { + DeviceDomain string `yaml:"domain"` + Partitions []PartitionsConfig `yaml:"partitions"` + HostDevs []HostDevConfig `yaml:"hostdevs"` +} + +var ( + deviceDomainRegex = regexp.MustCompile(`^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`) +) + +func (c *Config) validate() error { + var errs error + // Validate the device domain + if c.DeviceDomain == "" { + errs = errors.Join(errs, fmt.Errorf(".domain: must be set")) + } + if !deviceDomainRegex.MatchString(c.DeviceDomain) { + errs = errors.Join(errs, fmt.Errorf(".domain: %q must be a valid domain name", c.DeviceDomain)) + } + + // Validate partitions + for i := range c.Partitions { + if err := c.Partitions[i].validate(); err != nil { + errs = errors.Join(errs, fmt.Errorf(".partitions[%d]%w", i, err)) + } + } + + // Validate hostdevs + for i := range c.HostDevs { + if err := c.HostDevs[i].validate(); err != nil { + errs = errors.Join(errs, fmt.Errorf(".hostdevs[%d]%w", i, err)) + } + } + + return nil +} + +func parseConfig(reader io.Reader) (*Config, error) { + // Parse the config file + decoder := yaml.NewDecoder(reader) + config := &Config{} + if err := decoder.Decode(config); err != nil { + return nil, err + } + + // Validate the config + if err := config.validate(); err != nil { + return nil, err + } + + return config, nil +} \ No newline at end of file diff --git a/cmd/ydb-disk-manager/main.go b/cmd/ydb-disk-manager/main.go index 3f9469a..b50462c 100644 --- a/cmd/ydb-disk-manager/main.go +++ b/cmd/ydb-disk-manager/main.go @@ -3,7 +3,6 @@ package main import ( "flag" "fmt" - "io/ioutil" "net/http" "os" "os/signal" @@ -45,7 +44,7 @@ func main() { // Setting up the disks to check klog.V(0).Infof("Reading configuration file %s", confFileName) - yamlFile, err := ioutil.ReadFile(confFileName) + yamlFile, err := os.ReadFile(confFileName) if err != nil { klog.Fatalf("Reading configuration file failed with: %s", err) } diff --git a/go.mod b/go.mod index 9ecf7e3..76e3233 100644 --- a/go.mod +++ b/go.mod @@ -1,25 +1,37 @@ module github.com/ydb-platform/ydb-disk-manager -go 1.18 +go 1.23.0 + +toolchain go1.24.1 require ( github.com/fsnotify/fsnotify v1.6.0 - golang.org/x/net v0.12.0 + github.com/jochenvg/go-udev v0.0.0-20240801134859-b65ed646224b + github.com/kennygrant/sanitize v1.2.4 + github.com/onsi/ginkgo/v2 v2.23.0 + github.com/onsi/gomega v1.36.2 + golang.org/x/net v0.36.0 google.golang.org/grpc v1.56.2 - google.golang.org/protobuf v1.31.0 + google.golang.org/protobuf v1.36.1 gopkg.in/yaml.v2 v2.4.0 + gopkg.in/yaml.v3 v3.0.1 k8s.io/klog/v2 v2.70.1 k8s.io/kubelet v0.25.3 ) require ( - github.com/go-logr/logr v1.2.3 // indirect + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-task/slim-sprig/v3 v3.0.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.3 // indirect + github.com/google/go-cmp v0.6.0 // indirect + github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad // indirect + github.com/jkeiser/iter v0.0.0-20200628201005-c8aa0ae784d1 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/rogpeppe/go-internal v1.11.0 // indirect - golang.org/x/sys v0.10.0 // indirect - golang.org/x/text v0.11.0 // indirect + golang.org/x/sys v0.30.0 // indirect + golang.org/x/text v0.22.0 // indirect + golang.org/x/tools v0.30.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230724170836-66ad5b6ff146 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect ) diff --git a/go.sum b/go.sum index 7931c82..59ac7ab 100644 --- a/go.sum +++ b/go.sum @@ -1,16 +1,29 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= -github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= -github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= +github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad h1:a6HEuzUHeKH6hwfN/ZoQgRgVIWFJljSWa/zetS2WTvg= +github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144= +github.com/jkeiser/iter v0.0.0-20200628201005-c8aa0ae784d1 h1:smvLGU3obGU5kny71BtE/ibR0wIXRUiRFDmSn0Nxz1E= +github.com/jkeiser/iter v0.0.0-20200628201005-c8aa0ae784d1/go.mod h1:fP/NdyhRVOv09PLRbVXrSqHhrfQypdZwgE2L4h2U5C8= +github.com/jochenvg/go-udev v0.0.0-20240801134859-b65ed646224b h1:Pzf7tldbCVqwl3NnOnTamEWdh/rL41fsoYCn2HdHgRA= +github.com/jochenvg/go-udev v0.0.0-20240801134859-b65ed646224b/go.mod h1:IBDUGq30U56w969YNPomhMbRje1GrhUsCh7tHdwgLXA= +github.com/kennygrant/sanitize v1.2.4 h1:gN25/otpP5vAsO2djbMhF/LQX6R7+O1TB4yv8NzpJ3o= +github.com/kennygrant/sanitize v1.2.4/go.mod h1:LGsjYYtgxbetdg5owWB2mpgUL6e2nfw2eObZ0u0qvak= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= @@ -20,10 +33,18 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/onsi/ginkgo/v2 v2.23.0 h1:FA1xjp8ieYDzlgS5ABTpdUDB7wtngggONc8a7ku2NqQ= +github.com/onsi/ginkgo/v2 v2.23.0/go.mod h1:zXTP6xIp3U8aVuXN8ENK9IXRaTjFnpVB9mGmaSRvxnM= +github.com/onsi/gomega v1.36.2 h1:koNYke6TVk6ZmnyHrCXba/T/MoLBXFjeC1PtvYgw0A8= +github.com/onsi/gomega v1.36.2/go.mod h1:DdwyADRjrc825LhMEkD76cHR5+pUnjhUN8GlHlRPHzY= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= @@ -35,8 +56,8 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.12.0 h1:cfawfvKITfUsFCeJIHJrbSxpeu/E81khclypR0GVT50= -golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= +golang.org/x/net v0.36.0 h1:vWF2fRbw4qslQsQzgFqZff+BItCvGFQqKzKIzx1rmoA= +golang.org/x/net v0.36.0/go.mod h1:bFmbeoIPfrw4sMHNhb4J9f6+tPziuGjq7Jk/38fxi1I= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -44,16 +65,18 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA= -golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= +golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.11.0 h1:LAntKIrcmeSKERyiOh0XMV39LXS8IE9UL2yP7+f5ij4= -golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM= +golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.30.0 h1:BgcpHewrV5AUp2G9MebG4XPFI1E2W41zU1SaqVA9vJY= +golang.org/x/tools v0.30.0/go.mod h1:c347cR/OJfw5TI+GfX7RUPNMdDRRbjvYTS0jPyvsVtY= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -64,13 +87,15 @@ google.golang.org/grpc v1.56.2 h1:fVRFRnXvU+x6C4IlHZewvJOVHoOv1TUuQyoRsYnB4bI= google.golang.org/grpc v1.56.2/go.mod h1:I9bI3vqKfayGqPUAwGdOSu7kt6oIJLixfffKrpXqQ9s= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= -google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk= +google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= k8s.io/klog/v2 v2.70.1 h1:7aaoSdahviPmR+XkS7FyxlkkXs6tHISSG03RxleQAVQ= k8s.io/klog/v2 v2.70.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= k8s.io/kubelet v0.25.3 h1:PjT3Xo0VL1BpRilBpZrRN8pSy6w5pGQ0YDQQeQWSHvQ= diff --git a/internal/mux/filter.go b/internal/mux/filter.go new file mode 100644 index 0000000..f0b1fb8 --- /dev/null +++ b/internal/mux/filter.go @@ -0,0 +1,79 @@ +package mux + +import ( + "reflect" +) + +type FilterFunc[T any] func(T) bool + +type MapperFunc[T, U any] func(T) U + +func Filter[T any](in In[T], filter FilterFunc[T]) In[T] { + out := make(chan T) + + go func() { + defer close(out) + + for v := range in { + if filter(v) { + out <- v + } + } + }() + + return out +} + +func Any[T any]() FilterFunc[T] { + return func(T) bool { + return true + } +} + +func Not[T any](filter FilterFunc[T]) FilterFunc[T] { + return func(v T) bool { + return !filter(v) + } +} + +func Or[T any](filters ...FilterFunc[T]) FilterFunc[T] { + return func(v T) bool { + for _, filter := range filters { + if filter(v) { + return true + } + } + return false + } +} + +func And[T any](filters ...FilterFunc[T]) FilterFunc[T] { + return func(v T) bool { + for _, filter := range filters { + if !filter(v) { + return false + } + } + return true + } +} + +func IsNil[T any](v T) bool { + rv := reflect.ValueOf(v) + kind := rv.Kind() + // Check if the type can be nil and is nil + switch kind { + case reflect.Invalid: + return true + case reflect.Interface: + return !rv.IsValid() + case reflect.Chan, reflect.Func, reflect.Map, reflect.Ptr, reflect.UnsafePointer, reflect.Slice: + return rv.IsNil() + } + + return false +} + +func IsNotNil[T any](v T) bool { + return !IsNil(v) +} diff --git a/internal/mux/filter_spec_test.go b/internal/mux/filter_spec_test.go new file mode 100644 index 0000000..e37711f --- /dev/null +++ b/internal/mux/filter_spec_test.go @@ -0,0 +1,177 @@ +package mux_test + +import ( + "github.com/ydb-platform/ydb-disk-manager/internal/mux" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("Filter", func() { + It("should filter elements based on the provided condition", func() { + input := make(chan int) + go func() { + defer close(input) + for i := 0; i < 10; i++ { + input <- i + } + }() + + evenFilter := func(n int) bool { + return n%2 == 0 + } + + result := mux.Filter(input, evenFilter) + + var filtered []int + for val := range result { + filtered = append(filtered, val) + } + + Expect(filtered).To(Equal([]int{0, 2, 4, 6, 8})) + }) + + It("should return empty channel when filter rejects all", func() { + input := make(chan string) + go func() { + defer close(input) + input <- "hello" + input <- "world" + }() + + noneFilter := func(s string) bool { + return false + } + + result := mux.Filter(input, noneFilter) + + var filtered []string + for val := range result { + filtered = append(filtered, val) + } + + Expect(filtered).To(BeEmpty()) + }) +}) + +var _ = Describe("Not", func() { + It("should invert the filter condition", func() { + isEven := func(n int) bool { + return n%2 == 0 + } + + isOdd := mux.Not(isEven) + + Expect(isEven(2)).To(BeTrue()) + Expect(isOdd(2)).To(BeFalse()) + + Expect(isEven(3)).To(BeFalse()) + Expect(isOdd(3)).To(BeTrue()) + }) +}) + +var _ = Describe("Or", func() { + It("should return true if any filter returns true", func() { + isEven := func(n int) bool { return n%2 == 0 } + isDivisibleBy3 := func(n int) bool { return n%3 == 0 } + + combined := mux.Or(isEven, isDivisibleBy3) + + Expect(combined(1)).To(BeFalse()) + Expect(combined(2)).To(BeTrue()) // Even + Expect(combined(3)).To(BeTrue()) // Divisible by 3 + Expect(combined(4)).To(BeTrue()) // Even + Expect(combined(5)).To(BeFalse()) + Expect(combined(6)).To(BeTrue()) // Both even and divisible by 3 + }) + + It("should return false when no filters provided", func() { + combined := mux.Or[int]() + Expect(combined(42)).To(BeFalse()) + }) +}) + +var _ = Describe("And", func() { + It("should return true only if all filters return true", func() { + isEven := func(n int) bool { return n%2 == 0 } + isDivisibleBy3 := func(n int) bool { return n%3 == 0 } + + combined := mux.And(isEven, isDivisibleBy3) + + Expect(combined(1)).To(BeFalse()) + Expect(combined(2)).To(BeFalse()) // Only even + Expect(combined(3)).To(BeFalse()) // Only divisible by 3 + Expect(combined(6)).To(BeTrue()) // Both even and divisible by 3 + Expect(combined(12)).To(BeTrue()) // Both even and divisible by 3 + }) + + It("should return true when no filters provided", func() { + combined := mux.And[int]() + Expect(combined(42)).To(BeTrue()) + }) +}) + +var _ = Describe("IsNil", func() { + It("should return true for nil values", func() { + var nilPtr *int + var nilSlice []int + var nilMap map[string]int + var nilChan chan int + var nilFunc func() + var nilInterface interface{} + + Expect(mux.IsNil(nilPtr)).To(BeTrue()) + Expect(mux.IsNil(nilSlice)).To(BeTrue()) + Expect(mux.IsNil(nilMap)).To(BeTrue()) + Expect(mux.IsNil(nilChan)).To(BeTrue()) + Expect(mux.IsNil(nilFunc)).To(BeTrue()) + Expect(mux.IsNil(nilInterface)).To(BeTrue()) + }) + + It("should return false for non-nil values", func() { + ptr := new(int) + slice := []int{} + m := make(map[string]int) + ch := make(chan int) + fn := func() {} + var iface interface{} = 42 + + Expect(mux.IsNil(ptr)).To(BeFalse()) + Expect(mux.IsNil(slice)).To(BeFalse()) + Expect(mux.IsNil(m)).To(BeFalse()) + Expect(mux.IsNil(ch)).To(BeFalse()) + Expect(mux.IsNil(fn)).To(BeFalse()) + Expect(mux.IsNil(iface)).To(BeFalse()) + }) + + It("should return false for basic types", func() { + Expect(mux.IsNil(42)).To(BeFalse()) + Expect(mux.IsNil("hello")).To(BeFalse()) + Expect(mux.IsNil(true)).To(BeFalse()) + Expect(mux.IsNil(3.14)).To(BeFalse()) + }) +}) + +var _ = Describe("IsNotNil", func() { + It("should return false for nil values", func() { + var nilPtr *int + var nilSlice []int + var nilInterface interface{} + + Expect(mux.IsNotNil(nilPtr)).To(BeFalse()) + Expect(mux.IsNotNil(nilSlice)).To(BeFalse()) + Expect(mux.IsNotNil(nilInterface)).To(BeFalse()) + }) + + It("should return true for non-nil values", func() { + ptr := new(int) + slice := []int{} + var iface interface{} = 42 + + Expect(mux.IsNotNil(ptr)).To(BeTrue()) + Expect(mux.IsNotNil(slice)).To(BeTrue()) + Expect(mux.IsNotNil(iface)).To(BeTrue()) + Expect(mux.IsNotNil(42)).To(BeTrue()) + Expect(mux.IsNotNil("hello")).To(BeTrue()) + }) +}) \ No newline at end of file diff --git a/internal/mux/mux.go b/internal/mux/mux.go new file mode 100644 index 0000000..987ede2 --- /dev/null +++ b/internal/mux/mux.go @@ -0,0 +1,259 @@ +package mux + +import ( + "fmt" + "time" +) + +type In[T any] <-chan T +type Out[T any] chan<- T + +type Logger interface { + Info(format string, args ...interface{}) +} + +type AwaitReply[T, U any] struct { + value T + reply chan U +} + +func (ar AwaitReply[T, U]) Value() T { + return ar.value +} + +func (ar AwaitReply[T, U]) Reply(value U) { + ar.reply <- value + close(ar.reply) +} + +func (ar AwaitReply[T, U]) Await() U { + return <-ar.reply +} + +type AwaitDone[T any] struct { + AwaitReply[T, struct{}] +} + +func (ad AwaitDone[T]) Done() { + ad.Reply(struct{}{}) +} + +func (ad AwaitDone[T]) Wait() { + ad.Await() +} + +func NewAwaitReply[T, U any](value T) AwaitReply[T, U] { + return AwaitReply[T, U]{ + value: value, + reply: make(chan U), + } +} + +func NewAwaitDone[T any](value T) AwaitDone[T] { + return AwaitDone[T]{ + NewAwaitReply[T, struct{}](value), + } +} + +type Sink[T any] interface { + Submit(T) error + Close() +} + +type thenSink[U, T any] struct { + sink Sink[T] + contramap func(U) T +} + +func (c *thenSink[U, T]) Submit(v U) error { + return c.sink.Submit(c.contramap(v)) +} + +func (c *thenSink[U, T]) Close() { + c.sink.Close() +} + +func ThenSink[U, T any](sink Sink[T], f func(U) T) Sink[U] { + return &thenSink[U, T]{sink, f} +} + +type filterSink[T any] struct { + sink Sink[T] + f func(T) bool +} + +func (c *filterSink[T]) Submit(v T) error { + if c.f(v) { + return c.sink.Submit(v) + } + return nil +} + +func (c *filterSink[T]) Close() { + c.sink.Close() +} + +func FilterSink[T any](sink Sink[T], f func(T) bool) Sink[T] { + return &filterSink[T]{sink, f} +} + +type chanSink[T any] struct { + ch chan<- T +} + +func (c *chanSink[T]) Submit(v T) error { + c.ch <- v + return nil +} + +func (c *chanSink[T]) Close() { + close(c.ch) +} + +func SinkFromChan[T any](ch chan<- T) Sink[T] { + return &chanSink[T]{ch} +} + +type Source[T any] interface { + Subscribe(Sink[T]) CancelFunc +} + +type Mux[T any] struct { + input chan T + register chan AwaitDone[Sink[T]] + unregister chan AwaitDone[Sink[T]] + outputs map[Sink[T]]bool + + submitTimeout time.Duration + inBufSize int + logger Logger +} + +type Option[T any] interface { + apply(*Mux[T]) +} + +type buffered[T any] struct { + Size int +} + +func (b *buffered[T]) apply(m *Mux[T]) { + m.inBufSize = b.Size +} + +func Buffered[T any](size int) Option[T] { + return &buffered[T]{size} +} + +type withLogger[T any] struct { + Logger Logger +} + +func (l *withLogger[T]) apply(m *Mux[T]) { + m.logger = l.Logger +} + +func WithLogger[T any](logger Logger) Option[T] { + return &withLogger[T]{logger} +} + +func Make[T any](opts ...Option[T]) *Mux[T] { + mux := &Mux[T]{ + submitTimeout: 1 * time.Second, + } + + for _, opt := range opts { + if opt == nil { + continue + } + opt.apply(mux) + } + + mux.input = make(chan T, mux.inBufSize) + mux.register = make(chan AwaitDone[Sink[T]]) + mux.unregister = make(chan AwaitDone[Sink[T]]) + mux.outputs = make(map[Sink[T]]bool) + + go mux.run() + + return mux +} + +func (c *Mux[T]) run() { + defer func() { + for sub := range c.outputs { + delete(c.outputs, sub) + sub.Close() + } + }() + defer close(c.input) + + for { + select { + case v := <-c.input: + for out := range c.outputs { + if err := out.Submit(v); err != nil { + c.error("error submitting value %v: %v", v, err) + } + } + case ar, ok := <-c.register: + if !ok { + return + } + c.outputs[ar.value] = true + ar.reply <- struct{}{} + case ar := <-c.unregister: + sub := ar.value + delete(c.outputs, sub) + sub.Close() + ar.reply <- struct{}{} + } + } +} + +func (m *Mux[T]) error(format string, args ...any) error { + if m.logger != nil { + m.logger.Info(format, args...) + } + return fmt.Errorf(format, args...) +} + +func (c *Mux[T]) Close() { + close(c.register) +} + + +func (c *Mux[T]) Submit(v T) error { + select { + case c.input <- v: + return nil + case <-time.After(c.submitTimeout): + return c.error("timed out submitting value %v after %s", v, c.submitTimeout) + } +} + +type CancelFunc func() + +func (c *Mux[T]) Subscribe(sink Sink[T]) CancelFunc { + ar := NewAwaitDone(sink) + c.register <- ar + ar.Wait() + + return func() { + ar := NewAwaitDone(sink) + c.unregister <- ar + ar.Wait() + } +} + +func ChainCancelFunc(cf1, cf2 func(), cfs ...func()) CancelFunc { + return func() { + cf1() + cf2() + for _, cf := range cfs { + if cf != nil { + cf() + } + } + } +} diff --git a/internal/mux/mux_suite_test.go b/internal/mux/mux_suite_test.go new file mode 100644 index 0000000..9f862cd --- /dev/null +++ b/internal/mux/mux_suite_test.go @@ -0,0 +1,200 @@ +package mux_test + +import ( + "sync" + + "github.com/ydb-platform/ydb-disk-manager/internal/mux" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("Mux", func() { + Context("creation", func() { + It("should create a new Mux without buffer", func() { + mux := mux.Make[string]() + Expect(mux).NotTo(BeNil()) + mux.Close() + }) + + It("should create a new Mux with buffer", func() { + mux := mux.Make(mux.Buffered[string](2)) + Expect(mux).NotTo(BeNil()) + mux.Close() + }) + }) + + Context("registration", func() { + var m *mux.Mux[string] + + BeforeEach(func() { + m = mux.Make[string]() + }) + + AfterEach(func() { + m.Close() + }) + + It("should register a new output channel", func() { + in := make(chan string) + cancel := m.Subscribe(mux.SinkFromChan(in)) + Expect(in).NotTo(BeNil()) + Expect(cancel).NotTo(BeNil()) + cancel() + }) + + It("should register a buffered output channel", func() { + in := make(chan string) + cancel := m.Subscribe(mux.SinkFromChan(in)) + Expect(in).NotTo(BeNil()) + Expect(cancel).NotTo(BeNil()) + cancel() + }) + + It("should support multiple registrations", func() { + in1 := make(chan string) + in2 := make(chan string) + in3 := make(chan string) + cancel1 := m.Subscribe(mux.SinkFromChan(in1)) + cancel2 := m.Subscribe(mux.SinkFromChan(in2)) + cancel3 := m.Subscribe(mux.SinkFromChan(in3)) + + Expect(in1).NotTo(BeNil()) + Expect(in2).NotTo(BeNil()) + Expect(in3).NotTo(BeNil()) + + cancel1() + cancel2() + cancel3() + }) + + It("should allow unregistration using cancel function", func() { + in := make(chan string) + cancel := m.Subscribe(mux.SinkFromChan(in)) + Expect(in).NotTo(BeNil()) + + // Unregister + cancel() + + // Submit a value + m.Submit("test") + + // Verify channel doesn't receive the value + Consistently(in).ShouldNot(Receive()) + }) + }) + + Context("submission", func() { + var m *mux.Mux[string] + + BeforeEach(func() { + m = mux.Make(mux.WithLogger[string](GinkgoLogr)) + }) + + AfterEach(func() { + m.Close() + }) + + It("should distribute values to all registered outputs", func() { + in1 := make(chan string) + in2 := make(chan string) + cancel1 := m.Subscribe(mux.SinkFromChan(in1)) + cancel2 := m.Subscribe(mux.SinkFromChan(in2)) + defer cancel1() + defer cancel2() + + go func() { + m.Submit("hello") + }() + + Eventually(in1).Should(Receive(Equal("hello"))) + Eventually(in2).Should(Receive(Equal("hello"))) + }) + + It("should handle multiple submissions", func() { + in := make(chan string) + cancel := m.Subscribe(mux.SinkFromChan(in)) + defer cancel() + + go func() { + m.Submit("one") + m.Submit("two") + m.Submit("three") + }() + + Eventually(in).Should(Receive(Equal("one"))) + Eventually(in).Should(Receive(Equal("two"))) + Eventually(in).Should(Receive(Equal("three"))) + + }) + }) + + Context("buffering", func() { + It("should respect buffer size in RegisterWithBuffer", func() { + m := mux.Make[int]() + defer m.Close() + + in := make(chan int, 2) + cancel := m.Subscribe(mux.SinkFromChan(in)) + defer cancel() + + // These should succeed without blocking + m.Submit(1) + m.Submit(2) + + // Verify values + Eventually(in).Should(Receive(Equal(1))) + Eventually(in).Should(Receive(Equal(2))) + }) + + It("should respect buffer size in MakeWithBuffer", func() { + // Create mux with buffer of 2 + m := mux.Make(mux.Buffered[int](2)) + defer m.Close() + + in := make(chan int) + cancel := m.Subscribe(mux.SinkFromChan(in)) + defer cancel() + + var wg sync.WaitGroup + wg.Add(1) + // This goroutine will block after 2 submissions + go func() { + defer wg.Done() + for i := 0; i < 3; i++ { + m.Submit(i) + } + }() + + // Receive values + Eventually(in).Should(Receive(Equal(0))) + Eventually(in).Should(Receive(Equal(1))) + Eventually(in).Should(Receive(Equal(2))) + + wg.Wait() + }) + }) + + Context("closing", func() { + It("should properly clean up when closed", func() { + m := mux.Make[string]() + in1 := make(chan string) + in2 := make(chan string) + m.Subscribe(mux.SinkFromChan(in1)) + m.Subscribe(mux.SinkFromChan(in2)) + + m.Close() + + // Channels should eventually close + Eventually(func() bool { + _, ok := <-in1 + return ok + }).Should(BeFalse()) + + Eventually(func() bool { + _, ok := <-in2 + return ok + }).Should(BeFalse()) + }) + }) +}) \ No newline at end of file diff --git a/internal/mux/mux_test.go b/internal/mux/mux_test.go new file mode 100644 index 0000000..f8273d4 --- /dev/null +++ b/internal/mux/mux_test.go @@ -0,0 +1,13 @@ +package mux_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestMux(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Mux Suite") +} \ No newline at end of file diff --git a/internal/plugin/partition.go b/internal/plugin/partition.go new file mode 100644 index 0000000..4e74479 --- /dev/null +++ b/internal/plugin/partition.go @@ -0,0 +1,118 @@ +package plugin + +import ( + "context" + "path" + "regexp" + "strings" + + "github.com/ydb-platform/ydb-disk-manager/internal/udev" + + "k8s.io/klog/v2" + pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" +) + +// Partition represents a partition of a disk as a Resource. +type partition struct { + domain string + label string + dev udev.Device +} + +func (p *partition) Id() Id { + return Id(p.label) +} + +func (p *partition) Health() Health { + return Healthy{} //TODO: health check? +} + +func (p *partition) TopologyHints() *pluginapi.TopologyInfo { + numaNode := p.dev.NumaNode() + if numaNode < 0 { + return nil + } + + return &pluginapi.TopologyInfo{ + Nodes: []*pluginapi.NUMANode{ + { + ID: int64(numaNode), + }, + }, + } +} + +func sanitizeEnv(s string) string { + replacer := strings.NewReplacer(".", "_", "-", "_") + return strings.ToUpper(replacer.Replace(s)) +} + +func (p *partition) envName(env string) string { + return sanitizeEnv(p.domain) + "_PART_" + sanitizeEnv(p.label) + "_" + sanitizeEnv(env) +} + +func (p *partition) Allocate(context.Context) (*pluginapi.ContainerAllocateResponse, error) { + response := &pluginapi.ContainerAllocateResponse{} + + containerPath := path.Join("/dev", "allocated", p.domain, "part", p.label) + + response.Devices = append(response.Devices, &pluginapi.DeviceSpec{ + HostPath: p.dev.DevNode(), + ContainerPath: containerPath, + Permissions: "rw", + }) + + response.Envs = make(map[string]string) + response.Envs[p.envName("PATH")] = containerPath + response.Envs[p.envName("DISK_ID")] = + p.dev.SystemAttributeLookup(udev.SysAttrWWID) + response.Envs[p.envName("DISK_MODEL")] = + p.dev.SystemAttributeLookup(udev.SysAttrModel) + + serial := p.dev.SystemAttributeLookup(udev.SysAttrSerial) + if serial == "" { + serial = p.dev.PropertyLookup(udev.PropertyShortSerial) + } + if serial != "" { + response.Envs[p.envName("DISK_SERIAL")] = serial + } + + klog.Info("allocated partition: ", p.label) + klog.V(2).Infof("%+v", response) + + return response, nil +} + +func PartitionLabelMatcher(domain string, matcher *regexp.Regexp) FromDevice[*partition] { + return func(dev udev.Device) **partition { + var part *partition + if dev.Subsystem() != udev.BlockSubsystem { + return nil + } + if dev.DevType() != udev.DeviceTypePart { + return nil + } + + partlabel, found := dev.Properties()[udev.PropertyPartName] + if !found { + return nil + } + + matches := matcher.FindStringSubmatch(partlabel) + if len(matches) == 0 { + return nil + } + + if len(matches) > 1 { + partlabel = matches[1] + } + + part = &partition{ + label: partlabel, + domain: domain, + dev: dev, + } + + return &part + } +} \ No newline at end of file diff --git a/internal/plugin/plugin.go b/internal/plugin/plugin.go new file mode 100644 index 0000000..21f7bcf --- /dev/null +++ b/internal/plugin/plugin.go @@ -0,0 +1,191 @@ +package plugin + +import ( + "context" + "fmt" + "net" + "os" + "sync" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" + + "github.com/kennygrant/sanitize" + + "k8s.io/klog/v2" + pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" +) + +type plugin struct { + resource Resource + cancel context.CancelFunc +} + +func newPlugin(resource Resource, ctx context.Context, wg *sync.WaitGroup) (*plugin, error) { + ctx, cancel := context.WithCancel(ctx) + plugin := &plugin{ + resource: resource, + cancel: cancel, + } + + socketPath := pluginapi.DevicePluginPath + plugin.socketPath() + if err := os.Remove(socketPath); err != nil && !os.IsNotExist(err) { + klog.Errorf("%q: failed to remove socket file %q: %v", plugin.resource.Name(), socketPath, err) + return nil, fmt.Errorf("failed to remove socket file %s: %w", socketPath, err) + } + + server := grpc.NewServer() + pluginapi.RegisterDevicePluginServer(server, plugin) + + listener, err := net.Listen("unix", socketPath) + if err != nil { + klog.Errorf("%q: failed to listen on socket %q: %v", plugin.resource.Name(), socketPath, err) + return nil, fmt.Errorf("failed to listen on socket %s: %w", socketPath, err) + } + + go server.Serve(listener) + + wg.Add(1) + go func(ctx context.Context, wg *sync.WaitGroup, l net.Listener, s *grpc.Server) { + defer wg.Done() + defer l.Close() + defer s.Stop() + klog.Infof("Serving device plugin %q on socket %q", plugin.resource.Name(), socketPath) + <-ctx.Done() + }(ctx, wg, listener, server) + + return plugin, nil +} + +func (p *plugin) stop() { + if p.cancel != nil { + klog.Infof("%q: Stopping device plugin", p.resource.Name()) + p.cancel() + } +} + +func (p *plugin) GetDevicePluginOptions(context.Context, *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) { + return &pluginapi.DevicePluginOptions{}, nil +} + +func (p *plugin) GetPreferredAllocation(context.Context, *pluginapi.PreferredAllocationRequest) (*pluginapi.PreferredAllocationResponse, error) { + return &pluginapi.PreferredAllocationResponse{}, nil +} + +func (p *plugin) PreStartContainer(context.Context, *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) { + return &pluginapi.PreStartContainerResponse{}, nil +} + +func (p *plugin) ListAndWatch(empty *pluginapi.Empty, stream pluginapi.DevicePlugin_ListAndWatchServer) (err error) { + defer klog.Infof("%q: closing ListAndWatch connection, err = %v", p.resource.Name(), err) + + for instances := range p.resource.ListAndWatch(stream.Context()) { + devices := make([]*pluginapi.Device, len(instances)) + for i, instance := range instances { + devices[i] = &pluginapi.Device{ + ID: string(instance.Id()), + Health: instance.Health().String(), + Topology: instance.TopologyHints(), + } + } + klog.V(2).Infof("%q: sending devices to ListAndWatch stream: %+v", p.resource.Name(), devices) + if err := stream.Send(&pluginapi.ListAndWatchResponse{Devices: devices}); err != nil { + klog.Errorf("%q: failed to send devices to ListAndWatch stream: %v", p.resource.Name(), err) + return err + } + } + return nil +} + +func mergeResponses(responses ...*pluginapi.ContainerAllocateResponse) *pluginapi.ContainerAllocateResponse { + response := &pluginapi.ContainerAllocateResponse{} + for _, r := range responses { + if r == nil { + continue + } + response.Devices = append(response.Devices, r.Devices...) + response.Mounts = append(response.Mounts, r.Mounts...) + if len(r.Envs) > 0 { + if response.Envs == nil { + response.Envs = make(map[string]string) + } + for key, value := range r.Envs { + response.Envs[key] = value + } + } + if len(r.Annotations) > 0 { + if response.Annotations == nil { + response.Annotations = make(map[string]string) + } + for key, value := range r.Annotations { + response.Annotations[key] = value + } + } + } + return response +} + +func (p *plugin) Allocate(ctx context.Context, request *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) { + klog.Infof("%q: Received allocation request", p.resource.Name()) + klog.V(2).Infof("%+v", request) + + instances := p.resource.Instances() + response := &pluginapi.AllocateResponse{} + for _, containerRequest := range request.ContainerRequests { + containerResponse := &pluginapi.ContainerAllocateResponse{} + klog.V(2).Infof("%q: Processing container request: %+v", p.resource.Name(), containerRequest) + for _, id := range containerRequest.DevicesIDs { + instance, found := instances[Id(id)] + if !found { + klog.Errorf("%q: device with ID %q not found", p.resource.Name(), id) + return nil, status.Errorf(codes.NotFound, "device with ID %q not found", id) + } + allocateResponse, err := instance.Allocate(ctx) + if err != nil { + klog.Errorf("%q: failed to allocate device with ID %q: %v", p.resource.Name(), id, err) + return nil, status.Errorf(codes.Internal, "failed to allocate device with ID %q: %s", id, err.Error()) + } + containerResponse = mergeResponses(containerResponse, allocateResponse) + } + response.ContainerResponses = append(response.ContainerResponses, containerResponse) + } + + klog.V(2).Infof("%q: Responding to allocation request with: %+v", p.resource.Name(), response) + return response, nil +} + +func (p *plugin) socketPath() string { + return sanitize.BaseName(p.resource.Name()) + ".sock" +} + +func (p *plugin) probe(ctx context.Context) error { + ctx, cancel := context.WithTimeout(ctx, 1 * time.Second) + defer cancel() + + pluginAddr := "unix://" + pluginapi.DevicePluginPath + p.socketPath() + + conn, err := grpc.DialContext( + ctx, + pluginAddr, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithBlock(), + ) + if err != nil { + klog.Errorf("%q: failed to dial %q: %v", p.resource.Name(), pluginAddr, err) + return fmt.Errorf("failed to dial %q: %w", pluginAddr, err) + } + defer conn.Close() + + client := pluginapi.NewDevicePluginClient(conn) + _, err = client.GetDevicePluginOptions(context.Background(), &pluginapi.Empty{}) + + if err != nil { + klog.Errorf("%q: failed to get device plugin options: %v", p.resource.Name(), err) + return fmt.Errorf("plugin[%q]: failed to get device plugin options: %w", p.resource.Name(), err) + } + + return nil +} \ No newline at end of file diff --git a/internal/plugin/registry.go b/internal/plugin/registry.go new file mode 100644 index 0000000..47e4ef5 --- /dev/null +++ b/internal/plugin/registry.go @@ -0,0 +1,166 @@ +package plugin + +import ( + "context" + "fmt" + "net/http" + "path" + "sync" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + "github.com/fsnotify/fsnotify" + "k8s.io/klog/v2" + + pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" +) + +// Registry is a lifecycle manager for plugins. +// It is responsible for (re-)registering plugins with the kubelet. +type Registry struct { + plugins sync.Map + ctx context.Context + wg *sync.WaitGroup + watcher *fsnotify.Watcher +} + +const kubeletAddr = "unix://" + pluginapi.KubeletSocket + +// register advertises plugin socket to the kubelet. +func (r *Registry) register(plugin *plugin) error { + conn, err := grpc.Dial(kubeletAddr, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock()) + if err != nil { + klog.Errorf("failed to dial %q: %v", kubeletAddr, err) + return err + } + defer func() { + err := conn.Close() + if err != nil { + klog.Errorf("failed to close connection: %v", err) + } + }() + + client := pluginapi.NewRegistrationClient(conn) + + _, err = client.Register(context.Background(), &pluginapi.RegisterRequest{ + ResourceName: plugin.resource.Name(), + Version: pluginapi.Version, + Endpoint: plugin.socketPath(), + Options: &pluginapi.DevicePluginOptions{}, + }) + if err != nil { + klog.Infof("failed to register with kubelet: %v", err) + return fmt.Errorf("failed to register with kubelet: %w", err) + } + klog.Infof("registered device %s with kubelet", plugin.resource.Name()) + return nil +} + +// hup registers all plugins with the freshly kubelet. +// Newely started kubelet removes all socket files, so we need to re-register +// all plugins. See https://kubernetes.io/docs/concepts/extend-kubernetes/compute-storage-net/device-plugins/#handling-kubelet-restarts +func (r *Registry) hup() { + r.plugins.Range(func(_, p interface{}) bool { + plugin := p.(*plugin) + plugin.stop() + plugin, err := newPlugin(plugin.resource, r.ctx, r.wg) + if err != nil { + klog.Errorf("failed to create plugin for %s: %v", plugin.resource.Name(), err) + return true + } + if err := r.register(plugin); err != nil { + klog.Errorf("failed to register %s: %v", plugin.resource.Name(), err) + } + return true + }) +} + +// NewRegistry creates a new registry and starts goroutine +// that watches for kubelet restarts. Whenever kubelet restarts, +// the registry will re-register all plugins. +// `ctx`: context that controls the lifecycle of the registry. +// `wg`: wait group that would be waited on before the registry is stopped. +func NewRegistry(ctx context.Context, wg *sync.WaitGroup) (*Registry, error) { + watcher, err := fsnotify.NewWatcher() + if err != nil { + klog.Errorf("failed to create fsnotifier watcher: %v", err) + return nil, fmt.Errorf("failed to create fsnotifier watcher: %w", err) + } + + registry := &Registry{ + plugins: sync.Map{}, + wg: wg, + ctx: ctx, + watcher: watcher, + } + + watcher.Add(path.Join(pluginapi.KubeletSocket)) + + registry.wg.Add(1) + go func(r *Registry) { + defer r.wg.Done() + defer r.watcher.Close() + + for { + select { + case event := <-r.watcher.Events: + if event.Op & fsnotify.Create != 0 { + r.hup() + } + case <-r.ctx.Done(): + // Parent context is done, exit the goroutine. + return + } + } + }(registry) + + return registry, nil +} + +func (r *Registry) Healthz(resp http.ResponseWriter, req *http.Request) { + unhealthy := make([]string, 0) + r.plugins.Range(func(_, p interface{}) bool { + plugin := p.(*plugin) + if err := plugin.probe(req.Context()); err != nil { + klog.Errorf("probe failed for %s: %v", plugin.resource.Name(), err) + unhealthy = append(unhealthy, plugin.resource.Name()) + } else { + klog.V(2).Infof("probe succeeded for %s", plugin.resource.Name()) + } + return true + }) + + if len(unhealthy) == 0 { + resp.WriteHeader(http.StatusOK) + } else { + resp.WriteHeader(http.StatusInternalServerError) + for _, name := range unhealthy { + fmt.Fprintf(resp, "probe failed for device plugin %q\n", name) + } + } +} + +// Add creates a new plugin for given Resource and registers it with the +// kubelet. Attempts to register resource with the same name twice will result +// in an error. +func (r *Registry) Add(resource Resource) error { + plugin, err := newPlugin(resource, r.ctx, r.wg) + if err != nil { + klog.Errorf("failed to create plugin for resource %q Cause: %v", resource.Name(), err) + return err + } + + _, loaded := r.plugins.LoadOrStore(resource.Name(), plugin) + if loaded { + klog.Errorf("resource with name %q already exists", resource.Name()) + return fmt.Errorf("resource with name %q already exists", resource.Name()) + } + if err := r.register(plugin); err != nil { + klog.Errorf("failed to register resource %q Cause: %v", resource.Name(), err) + return err + } + return nil +} + + diff --git a/internal/plugin/resource.go b/internal/plugin/resource.go new file mode 100644 index 0000000..3a1875f --- /dev/null +++ b/internal/plugin/resource.go @@ -0,0 +1,129 @@ +package plugin + +import ( + "context" + + pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" + + "github.com/ydb-platform/ydb-disk-manager/internal/mux" + "github.com/ydb-platform/ydb-disk-manager/internal/udev" +) + +type Health interface { + String() string + sealed() +} + +type Healthy struct{} + +func (Healthy) sealed() {} + +func (Healthy) String() string { + return "Healthy" +} + +type Unhealthy struct{} + +func (Unhealthy) sealed() {} + +func (Unhealthy) String() string { + return "Unhealthy" +} + +type Id string + +type Instance interface { + Id() Id + Health() Health + TopologyHints() *pluginapi.TopologyInfo + Allocate(context.Context) (*pluginapi.ContainerAllocateResponse, error) +} + +type healthInstance struct { + Instance + health Health +} + +func (h *healthInstance) Id() Id { + return h.Instance.Id() +} + +func (h *healthInstance) Health() Health { + if _, ok := h.health.(Healthy); ok { + return h.Instance.Health() + } + return h.health +} + +func (h *healthInstance) TopologyHints() *pluginapi.TopologyInfo { + return h.Instance.TopologyHints() +} + +func (h *healthInstance) Allocate(ctx context.Context) (*pluginapi.ContainerAllocateResponse, error) { + return h.Instance.Allocate(ctx) +} + +type FromDevice[T Instance] func(dev udev.Device) *T + +func (f FromDevice[T]) AsFilter() mux.FilterFunc[udev.Device] { + return func(dev udev.Device) bool { + return f(dev) != nil + } +} + +type HealthEvent struct { + Instance + Health +} + +type Resource interface { + mux.Sink[HealthEvent] + Name() string + Instances() map[Id]Instance + ListAndWatch(context.Context) <-chan []Instance +} + +type singletonResource struct { + resourcePrefix string + instance Instance + healthCh chan HealthEvent +} + +func (r *singletonResource) Name() string { + return r.resourcePrefix + string(r.instance.Id()) +} + +func (r *singletonResource) Instances() map[Id]Instance { + return map[Id]Instance{ + r.instance.Id(): r.instance, + } +} + +func (r *singletonResource) Submit(ev HealthEvent) error { + r.healthCh <- ev + return nil +} + +func (r *singletonResource) run(instanceCh chan<- []Instance) { + defer close(instanceCh) + instanceCh <- []Instance{r.instance} + for ev := range r.healthCh { + r.instance = &healthInstance{ + Instance: r.instance, + health: ev.Health, + } + instanceCh <- []Instance{r.instance} + } +} + +func (r *singletonResource) ListAndWatch(ctx context.Context) <-chan []Instance { + ch := make(chan []Instance, 1) + + go r.run(ch) + + return ch +} + +func (r *singletonResource) Close() { + close(r.healthCh) +} \ No newline at end of file diff --git a/internal/plugin/scatter.go b/internal/plugin/scatter.go new file mode 100644 index 0000000..85e4fa3 --- /dev/null +++ b/internal/plugin/scatter.go @@ -0,0 +1,95 @@ +package plugin + +import ( + "k8s.io/klog/v2" + + "github.com/ydb-platform/ydb-disk-manager/internal/mux" + "github.com/ydb-platform/ydb-disk-manager/internal/udev" +) + +type Scatter[T Instance] struct { + mapper FromDevice[T] + registry *Registry + resourceDomain string + resourcePrefix string + routes map[Id]Resource +} + +func NewScatter[T Instance](d udev.Discovery, registry *Registry, resourceDomain, resourcePrefix string, mapper FromDevice[T]) mux.CancelFunc { + scatter := &Scatter[T]{ + mapper: mapper, + registry: registry, + resourceDomain: resourceDomain, + resourcePrefix: resourcePrefix, + routes: make(map[Id]Resource), + } + ch := make(chan udev.Event) + + go scatter.run(ch) + + return d.Subscribe(mux.SinkFromChan(ch)) +} + +func (s *Scatter[T]) add(id Id, resource Resource) { + err := s.registry.Add(resource) + if err != nil { + klog.Errorf("failed to add resource %s: %v", resource.Name, err) + return + } + s.routes[id] = resource +} + +func (s *Scatter[T]) run(evCh <-chan udev.Event) { + for ev := range evCh { + switch ev := ev.(type) { + case udev.Init: + for _, dev := range ev.Devices { + maybeInstance := s.mapper(dev) + if maybeInstance == nil { + continue + } + klog.V(5).Info("Init: Matched device: %s", dev.Debug()) + instance := *maybeInstance + id := instance.Id() + if _, ok := s.routes[id]; ok { + continue + } + resource := &singletonResource{ + resourcePrefix: s.resourceDomain + "/" + s.resourcePrefix, + instance: instance, + healthCh: make(chan HealthEvent), + } + s.add(id, resource) + } + case udev.Added: + maybeInstance := s.mapper(ev.Device) + if maybeInstance == nil { + continue + } + klog.V(5).Info("Added: Matched device: %s", ev.Device.Debug()) + instance := *maybeInstance + id := instance.Id() + if resource, ok := s.routes[id]; ok { + resource.Submit(HealthEvent{instance, Healthy{}}) + } else { + resource := &singletonResource{ + resourcePrefix: s.resourcePrefix, + instance: instance, + healthCh: make(chan HealthEvent), + } + s.add(id, resource) + } + case udev.Removed: + maybeInstance := s.mapper(ev.Device) + if maybeInstance == nil { + continue + } + klog.V(5).Info("Removed: Matched device: %s", ev.Device.Debug()) + instance := *maybeInstance + id := instance.Id() + if resource, ok := s.routes[id]; ok { + resource.Submit(HealthEvent{instance, Unhealthy{}}) + } + } + } +} diff --git a/internal/udev/discovery.go b/internal/udev/discovery.go new file mode 100644 index 0000000..6403778 --- /dev/null +++ b/internal/udev/discovery.go @@ -0,0 +1,61 @@ +package udev + +import ( + "github.com/ydb-platform/ydb-disk-manager/internal/mux" +) + +type Id string + +type Device interface { + Id() Id + Parent() Device + Subsystem() string + DevType() string + DevNode() string + DevLinks() []string + Properties() map[string]string + Property(string) string + PropertyLookup(string) string + SystemAttributes() map[string]string + SystemAttributeKeys() []string + SystemAttribute(string) string + SystemAttributeLookup(string) string + Tags() []string + NumaNode() int + + Debug() string +} + +type Event interface { + eventSealed() +} + +type Init struct { + Devices []Device +} + +func (Init) eventSealed() {} + +type Added struct { + Device +} + +func (Added) eventSealed() {} + +type Removed struct { + Device +} + +func (Removed) eventSealed() {} + +type Slice interface { + mux.Source[[]Device] +} + +type Discovery interface { + mux.Source[Event] + DeviceById(Id) Device + State(mux.FilterFunc[Device]) map[Id]Device + Slice(mux.FilterFunc[Device]) Slice + Close() +} diff --git a/internal/udev/udev.go b/internal/udev/udev.go new file mode 100644 index 0000000..7c73a7d --- /dev/null +++ b/internal/udev/udev.go @@ -0,0 +1,386 @@ +package udev + +import ( + "context" + "fmt" + "strconv" + "strings" + "sync" + "time" + + libudev "github.com/jochenvg/go-udev" + + "k8s.io/klog/v2" + + "github.com/ydb-platform/ydb-disk-manager/internal/mux" +) + +const ( + BlockSubsystem = "block" + + DeviceTypeKey = "DEVTYPE" + DeviceTypePart = "partition" + + PropertyPartName = "PARTNAME" + PropertyModel = "ID_MODEL" + PropertyShortSerial = "ID_SERIAL_SHORT" + + SysAttrWWID = "wwid" + SysAttrModel = "model" + SysAttrSerial = "serial" + + ActionAdd = "add" + ActionRemove = "remove" + ActionOffline = "offline" + ActionOnline = "online" +) + +type monitorRequest interface { + requestSealed() +} + +type stateRequest struct { + filter mux.FilterFunc[Device] +} + +func (r stateRequest) requestSealed() {} + +type stopRequest struct{} + +func (r stopRequest) requestSealed() {} + +type newSub struct { + sink mux.Sink[Event] +} + +func (n newSub) requestSealed() {} + +type generic struct { + udev Discovery + + dev *libudev.Device + parent Device +} + +func (g *generic) Id() Id { + return Id(g.dev.Syspath()) +} + +func (g *generic) Parent() Device { + if g.parent == nil { + g.parent = g.udev.DeviceById(Id(g.dev.Syspath())) + } + return g.parent +} + +func (g *generic) Subsystem() string { + return g.dev.Subsystem() +} + +func (g *generic) DevType() string { + return g.dev.Devtype() +} + +func (g *generic) DevNode() string { + return g.dev.Devnode() +} + +func (g *generic) DevLinks() []string { + devlinks := g.dev.Devlinks() + res := make([]string, 0, len(devlinks)) + for link := range devlinks { + res = append(res, link) + } + return res +} + +func (g *generic) Properties() map[string]string { + return g.dev.Properties() +} + +func (g *generic) Property(key string) string { + return strings.TrimSpace(g.dev.PropertyValue(key)) +} + +func (g *generic) PropertyLookup(key string) string { + value := g.Property(key) + if value == "" && g.parent != nil { + return g.parent.PropertyLookup(key) + } + + return value +} + +func (g *generic) SystemAttributeKeys() []string { + sysattrs := g.dev.Sysattrs() + res := make([]string, 0, len(sysattrs)) + for attr := range sysattrs { + res = append(res, attr) + } + return res +} + +func (g *generic) SystemAttribute(key string) string { + return strings.TrimSpace(g.dev.SysattrValue(key)) +} + +func (g *generic) SystemAttributes() map[string]string { + res := make(map[string]string) + for attr := range g.dev.Sysattrs() { + res[attr] = g.SystemAttribute(attr) + } + return res +} + +func (g *generic) SystemAttributeLookup(key string) string { + value := g.SystemAttribute(key) + if value == "" && g.parent != nil { + return g.parent.SystemAttributeLookup(key) + } + + return value +} + +func (g *generic) Tags() []string { + tags := g.dev.Tags() + res := make([]string, 0, len(tags)) + for tag := range tags { + res = append(res, tag) + } + return res +} + +func (g *generic) NumaNode() int { + numaNodeStr := g.SystemAttributeLookup("numa_node") + if numaNode, err := strconv.Atoi(numaNodeStr); err == nil { + return numaNode + } + return -1 +} + +func (g *generic) Debug() string { + return fmt.Sprintf("Device[ID=%s, Subsystem=%s, DevType=%s, DevNode=%s, NumaNode=%d, Links=%v, Tags=%v, Properties=%v, SysAttrs=%v]", + g.Id(), + g.Subsystem(), + g.DevType(), + g.DevNode(), + g.NumaNode(), + g.DevLinks(), + g.Tags(), + g.Properties(), + g.SystemAttributes(), + ) +} + + +type udevSlice struct { + discovery *udevDiscovery + state map[Id]Device + filter mux.FilterFunc[Device] + mux *mux.Mux[[]Device] + stop mux.CancelFunc +} + +func (s *udevSlice) Close() { + s.stop() +} + +func (s *udevSlice) Subscribe(sink mux.Sink[[]Device]) mux.CancelFunc { + return s.mux.Subscribe(sink) +} + +type udevDiscovery struct { + udev libudev.Udev + state map[Id]Device // should be accessed only by monitor goroutine + requests chan mux.AwaitReply[monitorRequest, any] + mux *mux.Mux[Event] + wg *sync.WaitGroup +} + +func NewDiscovery(wg *sync.WaitGroup) (Discovery, error) { + d := &udevDiscovery{ + state: make(map[Id]Device), + requests: make(chan mux.AwaitReply[monitorRequest, any]), + mux: mux.Make[Event](), + wg: wg, + } + enum := d.udev.NewEnumerate() + + devs, err := enum.Devices() + if err != nil { + klog.Errorf("Failed to enumerate devices: %v", err) + return nil, err + } + + for _, dev := range devs { + if dev == nil { + klog.Error("udev device is nil!") + continue + } + device := &generic{ + udev: d, + dev: dev, + } + if dev.Parent() != nil { + device.parent = d.state[Id(dev.Parent().Syspath())] + } + + d.state[Id(dev.Syspath())] = device + } + + wg.Add(1) + go d.monitor(wg) + + return d, nil +} + +func (d *udevDiscovery) Close() { + await := mux.NewAwaitReply[monitorRequest, any](stopRequest{}) + defer await.Await() + d.requests <- await +} + +// State returns the current state of the devices as seen by the monitor +func (d *udevDiscovery) State(filter mux.FilterFunc[Device]) map[Id]Device { + await := mux.NewAwaitReply[monitorRequest, any](stateRequest{filter: filter}) + d.requests <- await + return await.Await().(map[Id]Device) +} + +func (d *udevDiscovery) DeviceById(id Id) Device { + return d.state[id] // read-only access should be safe. worst case it would return stale data +} + +func (d *udevDiscovery) Slice(filter mux.FilterFunc[Device]) Slice { + slice := &udevSlice{ + discovery: d, + state: make(map[Id]Device), + filter: filter, + mux: mux.Make[[]Device](), + } + + evCh := make(chan Event) + + go func() { + defer slice.mux.Close() + for ev := range evCh { // exits on close + switch e := ev.(type) { + case Init: + for _, dev := range e.Devices { + if filter(dev) { + slice.state[dev.Id()] = dev + } + } + copy := make([]Device, 0, len(slice.state)) + for _, d := range slice.state { + copy = append(copy, d) + } + slice.mux.Submit(copy) + case Added: + if filter(e.Device) { + slice.state[e.Device.Id()] = e.Device + copy := make([]Device, 0, len(slice.state)) + for _, d := range slice.state { + copy = append(copy, d) + } + slice.mux.Submit(copy) + } + case Removed: + if _, found := slice.state[e.Device.Id()]; found { + delete(slice.state, e.Device.Id()) + copy := make([]Device, 0, len(slice.state)) + for _, d := range slice.state { + copy = append(copy, d) + } + slice.mux.Submit(copy) + } + } + } + }() + + evSink := mux.SinkFromChan(evCh) + cancelEvSub := d.mux.Subscribe(evSink) + slice.stop = cancelEvSub + + return slice +} + +func (d *udevDiscovery) monitor(wg *sync.WaitGroup) { + defer wg.Done() + defer d.mux.Close() + defer close(d.requests) + + mon := d.udev.NewMonitorFromNetlink("udev") + devChan, errChan, err := mon.DeviceChan(context.Background()) + if err != nil { + klog.Errorf("Failed to create device channel: %v", err) + return + } + + for { + select { + case dev := <-devChan: + klog.V(5).Infof("Received device event (%s): %s", dev.Action(), dev.Syspath()) + switch dev.Action() { + case ActionAdd, ActionOnline: + id := Id(dev.Syspath()) + dev := &generic{ + udev: d, + dev: dev, + } + d.state[id] = dev + d.mux.Submit(Added{dev}) + case ActionRemove, ActionOffline: + id := Id(dev.Syspath()) + dev := d.state[id] + delete(d.state, id) + d.mux.Submit(Removed{dev}) + } + case req := <-d.requests: + switch r := req.Value().(type) { + case stateRequest: + state := make(map[Id]Device) + for k, v := range d.state { + if r.filter(v) { + state[k] = v + } + } + req.Reply(state) + case newSub: + init := make([]Device, 0, len(d.state)) + for _, dev := range d.state { + init = append(init, dev) + } + err := r.sink.Submit(Init{init}) + if err != nil { + klog.Errorf("Failed to submit init event: %v", err) + } + cancel := d.mux.Subscribe(r.sink) + req.Reply(cancel) + case stopRequest: + req.Reply(nil) + return + } + case err := <-errChan: + klog.Errorf("Error from udev monitor, will try to retry connecting to udev: %v", err) + retry: mon = d.udev.NewMonitorFromNetlink("udev") + devChan, errChan, err = mon.DeviceChan(context.Background()) + if err != nil { + klog.Errorf("Failed to create device channel, retrying: %v", err) + time.Sleep(1 * time.Second) + goto retry + } + klog.Infof("Successfully reconnected to udev") + } + } +} + + +func (d *udevDiscovery) Subscribe(sink mux.Sink[Event]) mux.CancelFunc { + // here we're doing initialization in monitor goroutine + // to be able to pass consistent Init event to the sink + // before making fan out of udev events + await := mux.NewAwaitReply[monitorRequest, any](newSub{sink}) + d.requests <- await + return await.Await().(mux.CancelFunc) +} diff --git a/udev-manager.Dockerfile b/udev-manager.Dockerfile new file mode 100644 index 0000000..0a80638 --- /dev/null +++ b/udev-manager.Dockerfile @@ -0,0 +1,24 @@ +FROM golang:1.24-bookworm as build + +WORKDIR /go/app +COPY . /go/app + +RUN apt update && apt install -y libudev-dev +RUN CGO_ENABLED=1 GOOS=linux GOARCH=amd64 go build -o udev-manager cmd/udev-manager/main.go + +FROM debian:bookworm + +WORKDIR /root + +RUN apt update && apt install -y libudev1 && apt clean +RUN echo '#!/bin/bash\n\ +set -e\n\ +\n\ +# Add initialization logic here if needed\n\ +\n\ +exec /usr/bin/udev-manager --config file:/etc/udev-manager/config.yaml ${UDEV_MANAGER_ARGS[@]}' > /usr/local/bin/entrypoint.sh && \ + chmod +x /usr/local/bin/entrypoint.sh +COPY --from=build /go/app/udev-manager /usr/bin/udev-manager + +ENV UDEV_MANAGER_ARGS= +ENTRYPOINT ["/usr/local/bin/entrypoint.sh"] diff --git a/ydb-disk-manager.code-workspace b/ydb-disk-manager.code-workspace new file mode 100644 index 0000000..f760840 --- /dev/null +++ b/ydb-disk-manager.code-workspace @@ -0,0 +1,16 @@ +{ + "folders": [ + { + "path": "." + } + ], + "extensions": { + "recommendations": [ + "golang.go", + "fill-labs.dependi", + "ms-vscode.makefile-tools", + "redhat.vscode-yaml", + "usernamehw.errorlens", + ] + } +} \ No newline at end of file