Skip to content

Commit f71fe6a

Browse files
committed
udev-manager: new disk manager plugin based on udev
This plugin was designed from the ground up to expose distinct device plugins for identifiable devices on the host. This enables enhanced scheduling capabilities, including conf-less startup and NUMA topology hinting for the requestor. In future work this plugin may be enhanced further, e.g. by exposing disks by serial or WWID to the Kube resource topology, allowing specific YDB pods to be tied to specific disks regardless of where those disks are located. Signed-off-by: Andrey Korzinev <ak@nebius.com>
1 parent be722de commit f71fe6a

File tree

22 files changed

+2298
-30
lines changed

22 files changed

+2298
-30
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
.vscode

Dockerfile

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
1-
FROM golang:1.18 as build
1+
FROM golang:1.24-bookworm as build
22

33
WORKDIR /go/app
44
COPY . /go/app
55

6-
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o ydb-disk-manager cmd/ydb-disk-manager/main.go
6+
RUN apt update && apt install -y libudev-dev
7+
RUN CGO_ENABLED=1 GOOS=linux GOARCH=amd64 go build -o ydb-disk-manager cmd/ydb-disk-manager/main.go
78

8-
FROM ubuntu:20.04
9+
FROM debian:bookworm
910

1011
WORKDIR /root
1112

13+
RUN apt update && apt install -y libudev1 && apt clean
1214
COPY --from=build /go/app/ydb-disk-manager /usr/bin/ydb-disk-manager
1315

1416
ENTRYPOINT ["/usr/bin/ydb-disk-manager"]

cmd/udev-manager/main.go

Lines changed: 276 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,276 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"errors"
6+
"flag"
7+
"fmt"
8+
"io"
9+
"net/http"
10+
"os"
11+
"os/signal"
12+
"regexp"
13+
"strings"
14+
"sync"
15+
"syscall"
16+
17+
"gopkg.in/yaml.v3"
18+
19+
"k8s.io/klog/v2"
20+
21+
"github.com/ydb-platform/ydb-disk-manager/internal/mux"
22+
"github.com/ydb-platform/ydb-disk-manager/internal/plugin"
23+
"github.com/ydb-platform/ydb-disk-manager/internal/udev"
24+
)
25+
26+
func main() {
27+
appContext, appCancel := context.WithCancel(context.Background())
28+
appWaitGroup := &sync.WaitGroup{}
29+
defer appWaitGroup.Wait()
30+
defer appCancel()
31+
32+
flags := initFlags()
33+
34+
// Registry creates a separate plugin for each resource registered
35+
registry, err := plugin.NewRegistry(appContext, appWaitGroup)
36+
if err != nil {
37+
klog.Fatalf("failed to create plugin registry: %v", err)
38+
os.Exit(1)
39+
}
40+
41+
// udev discovery looks up devices and listens for system events
42+
devDiscovery, err := udev.NewDiscovery(appWaitGroup)
43+
if err != nil {
44+
klog.Fatalf("failed to start udev discovery: %v", err)
45+
os.Exit(1)
46+
}
47+
defer devDiscovery.Close()
48+
49+
domain := flags.config.DeviceDomain
50+
51+
var cancel mux.CancelFunc = mux.CancelFunc(appCancel)
52+
for _, partConfig := range flags.config.Partitions {
53+
cancel = mux.ChainCancelFunc(
54+
plugin.NewScatter(
55+
devDiscovery,
56+
registry,
57+
domain,
58+
"part-",
59+
plugin.PartitionLabelMatcher(domain, partConfig.matcher),
60+
),
61+
cancel,
62+
)
63+
}
64+
65+
klog.Info("Starting /healthz server on port :8080")
66+
go func() {
67+
http.HandleFunc("/healthz", registry.Healthz)
68+
if err := http.ListenAndServe(":8080", nil); err != nil {
69+
klog.Fatalf("failed to start /healthz server: %v", err)
70+
}
71+
}()
72+
73+
sigs := make(chan os.Signal, 1)
74+
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
75+
defer cancel()
76+
for signal := range sigs {
77+
switch signal {
78+
case syscall.SIGINT, syscall.SIGTERM:
79+
klog.Infof("Received signal %q, shutting down", signal.String())
80+
return
81+
}
82+
}
83+
}
84+
85+
// configSource is a place the configuration can be read from.
type configSource interface {
	// String renders the source in the form accepted by the --config flag
	// ("file:<path>", "env:<ENV_VARIABLE>" or "stdin").
	String() string
	// open returns a reader with the configuration contents and a function
	// that releases the underlying resource once reading is done.
	open() (io.Reader, func() error, error)
}

// fileConfigSource reads the configuration from a file on disk.
type fileConfigSource struct {
	path string
}

// open opens the file at fcs.path; the returned closer closes that file.
func (fcs *fileConfigSource) open() (io.Reader, func() error, error) {
	file, err := os.Open(fcs.path)
	if err != nil {
		return nil, nil, err
	}
	return file, file.Close, nil
}

// String returns the source as "file:<path>".
func (fcs *fileConfigSource) String() string {
	return "file:" + fcs.path
}

// envConfigSource reads the configuration from an environment variable.
type envConfigSource struct {
	variable string
}

// open returns a reader over the value of the environment variable. An unset
// and an empty variable are treated the same way: both produce an error.
func (ecs *envConfigSource) open() (io.Reader, func() error, error) {
	data := os.Getenv(ecs.variable)
	if data == "" {
		return nil, nil, fmt.Errorf("config: environment variable %s is not set", ecs.variable)
	}
	return strings.NewReader(data), func() error { return nil }, nil
}

// String returns the source as "env:<ENV_VARIABLE>".
func (ecs *envConfigSource) String() string {
	return "env:" + ecs.variable
}

// stdinConfigSource reads the configuration from the standard input.
type stdinConfigSource struct{}

// open returns os.Stdin together with a no-op closer, so stdin is left open.
func (scs *stdinConfigSource) open() (io.Reader, func() error, error) {
	return os.Stdin, func() error { return nil }, nil
}

// String returns "stdin".
func (scs *stdinConfigSource) String() string {
	return "stdin"
}

// ConfigFlag is a flag.Value that selects where the configuration is read
// from. Its zero value has no source selected.
type ConfigFlag struct {
	configSource
}

// ConfigFlag must keep satisfying flag.Value to be usable with FlagSet.Var.
var _ flag.Value = (*ConfigFlag)(nil)

// Set parses value as "file:<path>", "env:<ENV_VARIABLE>" or exactly "stdin"
// and stores the matching source. Any other value is rejected with an error
// and the previously selected source, if any, is left untouched.
func (cf *ConfigFlag) Set(value string) error {
	switch {
	case strings.HasPrefix(value, "file:"):
		cf.configSource = &fileConfigSource{path: strings.TrimPrefix(value, "file:")}
	case strings.HasPrefix(value, "env:"):
		cf.configSource = &envConfigSource{variable: strings.TrimPrefix(value, "env:")}
	case value == "stdin":
		// Exact match only: "stdin" takes no argument, so values that merely
		// start with it (e.g. "stdinfoo") are typos rather than valid sources.
		cf.configSource = &stdinConfigSource{}
	default:
		return fmt.Errorf("invalid config source: %s", value)
	}

	return nil
}

// String returns the textual form of the selected source, or an empty string
// when no source has been set yet.
func (cf *ConfigFlag) String() string {
	if cf.configSource == nil {
		return ""
	}
	return cf.configSource.String()
}
156+
157+
type FlagValues struct {
158+
Config ConfigFlag
159+
160+
config *Config
161+
}
162+
163+
func initFlags() FlagValues {
164+
values := FlagValues{}
165+
flags := flag.NewFlagSet("udev-manager", flag.ExitOnError)
166+
klog.InitFlags(flags)
167+
flags.Var(&values.Config, "config", `configuration source (in form "file:<path>", "env:<ENV_VARIABLE>" or "stdin")`)
168+
flags.Parse(os.Args[1:])
169+
if values.Config.configSource == nil {
170+
flags.Output().Write([]byte("config flag is required\n"))
171+
flags.Usage()
172+
os.Exit(2)
173+
}
174+
configReader, configCloser, err := values.Config.open()
175+
if err != nil {
176+
klog.Fatalf("failed to open --config %q: %v", values.Config.String(), err)
177+
os.Exit(1)
178+
}
179+
defer configCloser()
180+
181+
config, err := parseConfig(configReader)
182+
if err != nil {
183+
klog.Fatalf("failed to parse --config %q: %v", values.Config.String(), err)
184+
os.Exit(1)
185+
}
186+
187+
values.config = config
188+
189+
return values
190+
}
191+
192+
// compileMatcher compiles expr and reports a config-style error (prefixed
// with the ".matcher" field path) when it is not a valid regular expression.
func compileMatcher(expr string) (*regexp.Regexp, error) {
	matcher, err := regexp.Compile(expr)
	if err != nil {
		return nil, fmt.Errorf(".matcher: %q must be a valid regexp: %w", expr, err)
	}
	return matcher, nil
}

// PartitionsConfig is a single entry of the "partitions" config section.
type PartitionsConfig struct {
	Matcher string `yaml:"matcher"` // matcher should be a valid regular expression

	matcher *regexp.Regexp // compiled matcher if the config is valid
}

// validate compiles Matcher and stores the result in pc.matcher.
// It returns an error when Matcher is not a valid regular expression.
func (pc *PartitionsConfig) validate() error {
	matcher, err := compileMatcher(pc.Matcher)
	if err != nil {
		return err
	}
	pc.matcher = matcher
	return nil
}

// HostDevConfig is a single entry of the "hostdevs" config section.
type HostDevConfig struct {
	Matcher string `yaml:"matcher"` // matcher should be a valid regular expression
	Prefix  string `yaml:"prefix"`

	matcher *regexp.Regexp // compiled matcher if the config is valid
}

// validate compiles Matcher and stores the result in hc.matcher.
// It returns an error when Matcher is not a valid regular expression.
func (hc *HostDevConfig) validate() error {
	matcher, err := compileMatcher(hc.Matcher)
	if err != nil {
		return err
	}
	hc.matcher = matcher
	return nil
}

// Config is the root of the YAML configuration.
type Config struct {
	DeviceDomain string             `yaml:"domain"`
	Partitions   []PartitionsConfig `yaml:"partitions"`
	HostDevs     []HostDevConfig    `yaml:"hostdevs"`
}

var (
	// deviceDomainRegex accepts dot-separated labels of letters, digits and
	// inner hyphens, each at most 63 characters long; at least two labels
	// are required.
	deviceDomainRegex = regexp.MustCompile(`^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`)
)

// validate checks the whole configuration and compiles every matcher.
// All problems found are collected and returned as a single joined error;
// nil is returned only when the configuration is valid.
func (c *Config) validate() error {
	var errs []error

	// Validate the device domain; an empty domain is reported once as
	// "must be set" rather than additionally as an invalid name.
	switch {
	case c.DeviceDomain == "":
		errs = append(errs, errors.New(".domain: must be set"))
	case !deviceDomainRegex.MatchString(c.DeviceDomain):
		errs = append(errs, fmt.Errorf(".domain: %q must be a valid domain name", c.DeviceDomain))
	}

	// Validate partitions. Elements are addressed by index so that the
	// compiled matcher is stored in the slice element, not in a copy.
	for i := range c.Partitions {
		if err := c.Partitions[i].validate(); err != nil {
			errs = append(errs, fmt.Errorf(".partitions[%d]%w", i, err))
		}
	}

	// Validate hostdevs
	for i := range c.HostDevs {
		if err := c.HostDevs[i].validate(); err != nil {
			errs = append(errs, fmt.Errorf(".hostdevs[%d]%w", i, err))
		}
	}

	// errors.Join returns nil when no errors were collected.
	return errors.Join(errs...)
}
261+
262+
func parseConfig(reader io.Reader) (*Config, error) {
263+
// Parse the config file
264+
decoder := yaml.NewDecoder(reader)
265+
config := &Config{}
266+
if err := decoder.Decode(config); err != nil {
267+
return nil, err
268+
}
269+
270+
// Validate the config
271+
if err := config.validate(); err != nil {
272+
return nil, err
273+
}
274+
275+
return config, nil
276+
}

cmd/ydb-disk-manager/main.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package main
33
import (
44
"flag"
55
"fmt"
6-
"io/ioutil"
76
"net/http"
87
"os"
98
"os/signal"
@@ -45,7 +44,7 @@ func main() {
4544

4645
// Setting up the disks to check
4746
klog.V(0).Infof("Reading configuration file %s", confFileName)
48-
yamlFile, err := ioutil.ReadFile(confFileName)
47+
yamlFile, err := os.ReadFile(confFileName)
4948
if err != nil {
5049
klog.Fatalf("Reading configuration file failed with: %s", err)
5150
}

go.mod

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,37 @@
11
module github.com/ydb-platform/ydb-disk-manager
22

3-
go 1.18
3+
go 1.23.0
4+
5+
toolchain go1.24.1
46

57
require (
68
github.com/fsnotify/fsnotify v1.6.0
7-
golang.org/x/net v0.12.0
9+
github.com/jochenvg/go-udev v0.0.0-20240801134859-b65ed646224b
10+
github.com/kennygrant/sanitize v1.2.4
11+
github.com/onsi/ginkgo/v2 v2.23.0
12+
github.com/onsi/gomega v1.36.2
13+
golang.org/x/net v0.36.0
814
google.golang.org/grpc v1.56.2
9-
google.golang.org/protobuf v1.31.0
15+
google.golang.org/protobuf v1.36.1
1016
gopkg.in/yaml.v2 v2.4.0
17+
gopkg.in/yaml.v3 v3.0.1
1118
k8s.io/klog/v2 v2.70.1
1219
k8s.io/kubelet v0.25.3
1320
)
1421

1522
require (
16-
github.com/go-logr/logr v1.2.3 // indirect
23+
github.com/go-logr/logr v1.4.2 // indirect
24+
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
1725
github.com/gogo/protobuf v1.3.2 // indirect
1826
github.com/golang/protobuf v1.5.3 // indirect
27+
github.com/google/go-cmp v0.6.0 // indirect
28+
github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad // indirect
29+
github.com/jkeiser/iter v0.0.0-20200628201005-c8aa0ae784d1 // indirect
1930
github.com/kr/pretty v0.3.1 // indirect
2031
github.com/rogpeppe/go-internal v1.11.0 // indirect
21-
golang.org/x/sys v0.10.0 // indirect
22-
golang.org/x/text v0.11.0 // indirect
32+
golang.org/x/sys v0.30.0 // indirect
33+
golang.org/x/text v0.22.0 // indirect
34+
golang.org/x/tools v0.30.0 // indirect
2335
google.golang.org/genproto/googleapis/rpc v0.0.0-20230724170836-66ad5b6ff146 // indirect
2436
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
2537
)

0 commit comments

Comments
 (0)