diff --git a/appliance/postgresql/cmd/flynn-postgres/main.go b/appliance/postgresql/cmd/flynn-postgres/main.go index 2dadc9ab64..ed0aa0722d 100755 --- a/appliance/postgresql/cmd/flynn-postgres/main.go +++ b/appliance/postgresql/cmd/flynn-postgres/main.go @@ -66,8 +66,8 @@ func main() { BinDir: "/usr/lib/postgresql/11/bin/", Password: password, Logger: log.New("component", "postgres"), - TimescaleDB: true, - ExtWhitelist: true, + TimescaleDB: false, + ExtWhitelist: false, WaitUpstream: true, SHMType: "posix", }) diff --git a/appliance/postgresql/process.go b/appliance/postgresql/process.go index bf71cfd887..ab74f6d358 100755 --- a/appliance/postgresql/process.go +++ b/appliance/postgresql/process.go @@ -372,7 +372,13 @@ func (p *Process) assumePrimary(downstream *discoverd.Instance) (err error) { return err } - if err := p.writeConfig(configData{ReadOnly: downstream != nil}); err != nil { + // Start read-write during initial primary setup. The database was + // just created with initdb so there is no user data to protect. We + // need read-write access to create the superuser and install + // extensions. Once the sync standby catches up, waitForSync will + // rewrite the config with the sync standby name (which also drops + // ReadOnly, making default_transaction_read_only = off permanent). + if err := p.writeConfig(configData{}); err != nil { log.Error("error writing postgres.conf", "path", p.configPath(), "err", err) return err } @@ -399,10 +405,6 @@ func (p *Process) assumePrimary(downstream *discoverd.Instance) (err error) { log.Error("error acquiring connection", "err", err) return err } - if _, err := tx.Exec("SET TRANSACTION READ WRITE"); err != nil { - log.Error("error setting transaction read-write", "err", err) - return err - } if _, err := tx.Exec(fmt.Sprintf(` DO $body$ @@ -424,13 +426,66 @@ func (p *Process) assumePrimary(downstream *discoverd.Instance) (err error) { return err } + // Pre-install commonly needed extensions in template1 so they are + // inherited by all databases created via CREATE DATABASE. This is + // required because application database users are not superusers + // and cannot run CREATE EXTENSION themselves (pgextwlist is not + // available in the current packages layer). + if extErr := p.installExtensionsInTemplate(); extErr != nil { + log.Error("error installing extensions in template1", "err", extErr) + return extErr + } + if downstream != nil { + // Now that setup is complete, switch to read-only mode to + // prevent user writes before the sync standby catches up. + // waitForSync will rewrite the config with the sync name + // (which drops ReadOnly) once replication is caught up. + if err := p.writeConfig(configData{ReadOnly: true}); err != nil { + log.Error("error writing read-only config", "err", err) + return err + } + if err := p.sighup(); err != nil { + log.Error("error reloading config for read-only mode", "err", err) + return err + } p.waitForSync(downstream, true) } return nil } +// installExtensionsInTemplate pre-installs commonly needed PostgreSQL +// extensions in template1 so they are inherited by all databases +// created via CREATE DATABASE. 
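+//
+// As an illustrative (not verified) sketch of the effect: once template1 carries
+// the extensions, an ordinary non-superuser database created later can use them
+// without ever running CREATE EXTENSION itself ("myapp" is a placeholder name):
+//
+//	CREATE DATABASE myapp;                       -- inherits template1, extensions included
+//	\c myapp
+//	SELECT uuid_generate_v4();                   -- from uuid-ossp
+//	SELECT encode(gen_random_bytes(8), 'hex');   -- from pgcrypto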
+func (p *Process) installExtensionsInTemplate() error { + port, _ := strconv.Atoi(p.port) + templateDB, err := pgx.Connect(pgx.ConnConfig{ + Host: "127.0.0.1", + User: "postgres", + Port: uint16(port), + Database: "template1", + }) + if err != nil { + return fmt.Errorf("connecting to template1: %s", err) + } + defer templateDB.Close() + + // When the primary has a downstream sync replica, postgresql.conf sets + // default_transaction_read_only = on. We need to override that for + // this session so that CREATE EXTENSION can perform writes. + if _, err := templateDB.Exec("SET default_transaction_read_only = off"); err != nil { + return fmt.Errorf("setting read-write mode on template1 connection: %s", err) + } + + for _, ext := range []string{"uuid-ossp", "pgcrypto"} { + if _, err := templateDB.Exec(fmt.Sprintf(`CREATE EXTENSION IF NOT EXISTS "%s"`, ext)); err != nil { + return fmt.Errorf("creating extension %s in template1: %s", ext, err) + } + } + return nil +} + func (p *Process) assumeStandby(upstream, downstream *discoverd.Instance) error { log := p.log.New("fn", "assumeStandby", "upstream", upstream.Addr) log.Info("starting up as standby") diff --git a/bootstrap/sirenia_wait_action.go b/bootstrap/sirenia_wait_action.go index ccc85fbc48..8a25c95626 100755 --- a/bootstrap/sirenia_wait_action.go +++ b/bootstrap/sirenia_wait_action.go @@ -26,7 +26,7 @@ func (a *SireniaWaitAction) Run(s *State) error { var leader *discoverd.Instance err = attempt.Strategy{ Min: 5, - Total: 5 * time.Minute, + Total: 10 * time.Minute, Delay: 500 * time.Millisecond, }.Run(func() error { leader, err = d.Service(a.Service).Leader() @@ -37,5 +37,5 @@ func (a *SireniaWaitAction) Run(s *State) error { } // connect using sirenia client and wait until database reports read/write - return client.NewClient(leader.Addr).WaitForReadWrite(5 * time.Minute) + return client.NewClient(leader.Addr).WaitForReadWrite(10 * time.Minute) } diff --git a/bootstrap/wait_action.go b/bootstrap/wait_action.go index 0bb0405952..f6f2c3a9f9 100755 --- a/bootstrap/wait_action.go +++ b/bootstrap/wait_action.go @@ -11,9 +11,10 @@ import ( ) type WaitAction struct { - URL string `json:"url"` - Host string `json:"host"` - Status int `json:"status"` + URL string `json:"url"` + Host string `json:"host"` + Status int `json:"status"` + Timeout int `json:"timeout"` // seconds, 0 = use default (10 minutes) } func init() { @@ -21,9 +22,14 @@ func init() { } func (a *WaitAction) Run(s *State) error { - const waitMax = 5 * time.Minute + const defaultWaitMax = 10 * time.Minute const waitInterval = 500 * time.Millisecond + waitMax := defaultWaitMax + if a.Timeout > 0 { + waitMax = time.Duration(a.Timeout) * time.Second + } + if a.Status == 0 { a.Status = 200 } diff --git a/cli/main.go b/cli/main.go index 750bd9e9a6..423edd6508 100755 --- a/cli/main.go +++ b/cli/main.go @@ -134,7 +134,7 @@ See 'flynn help ' for more information on a specific command. return } - fmt.Fprintln(os.Stderr, "WARNING: Flynn is unmaintained and new installs will fail on June 1. See: https://github.com/flynn/flynn") + // Original unmaintained warning removed — this project is actively maintained. 
} type command struct { diff --git a/flannel/backend/vxlan/device.go b/flannel/backend/vxlan/device.go index f260b6d983..808c2eb696 100755 --- a/flannel/backend/vxlan/device.go +++ b/flannel/backend/vxlan/device.go @@ -40,6 +40,44 @@ func newVXLANDevice(devAttrs *vxlanDeviceAttrs) (*vxlanDevice, error) { return nil, err } + // Generate a unique MAC address derived from the VTEP IP to ensure + // each node's flannel.1 device has a distinct hardware address. + // Without this, nodes cloned from the same image get identical MACs + // which breaks VXLAN forwarding. + // + // IMPORTANT: Only set the MAC if it differs from the current one. + // Setting the MAC (even to the same value) causes the kernel to + // flush all ARP neighbor entries on the interface, which breaks + // overlay connectivity when flannel instances crash-loop and + // repeatedly reuse the existing device. + if ip4 := devAttrs.vtepAddr.To4(); ip4 != nil { + mac := net.HardwareAddr{0x02, 0x42, ip4[0], ip4[1], ip4[2], ip4[3]} + currentMAC := link.HardwareAddr + if !macEqual(currentMAC, mac) { + if err := netlink.LinkSetHardwareAddr(link, mac); err != nil { + // If setting MAC while UP fails, try bringing the link down first + log.Warningf("failed to set MAC on %s while UP: %v; retrying with link down", link.Name, err) + if err2 := netlink.LinkSetDown(link); err2 != nil { + log.Warningf("failed to bring %s down: %v", link.Name, err2) + } + if err2 := netlink.LinkSetHardwareAddr(link, mac); err2 != nil { + log.Errorf("failed to set unique MAC on %s even after link down: %v", link.Name, err2) + } else { + link.HardwareAddr = mac + log.Infof("set VXLAN device %s MAC to %s (after link down, derived from VTEP IP %s)", link.Name, mac, devAttrs.vtepAddr) + } + if err2 := netlink.LinkSetUp(link); err2 != nil { + log.Warningf("failed to bring %s back up: %v", link.Name, err2) + } + } else { + link.HardwareAddr = mac + log.Infof("set VXLAN device %s MAC to %s (derived from VTEP IP %s)", link.Name, mac, devAttrs.vtepAddr) + } + } else { + log.Infof("VXLAN device %s already has correct MAC %s, skipping set", link.Name, mac) + } + } + return &vxlanDevice{ link: link, }, nil @@ -56,6 +94,7 @@ func ensureLink(vxlan *netlink.Vxlan) (*netlink.Vxlan, error) { incompat := vxlanLinksIncompat(vxlan, existing) if incompat == "" { + log.Infof("reusing existing %q device", vxlan.Name) return existing.(*netlink.Vxlan), nil } @@ -113,6 +152,19 @@ func (dev *vxlanDevice) Destroy() { netlink.LinkDel(dev.link) } +// macEqual compares two hardware addresses for equality. 
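+// For example (illustrative), a VTEP IP of 10.3.22.1 yields the derived MAC
+// 02:42:0a:03:16:01; when the device already carries that address, newVXLANDevice
+// skips LinkSetHardwareAddr and thereby avoids the kernel's neighbor-entry flush
+// described above.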
+func macEqual(a, b net.HardwareAddr) bool { + if len(a) != len(b) { + return false + } + for i := range a { + if a[i] != b[i] { + return false + } + } + return true +} + func (dev *vxlanDevice) MACAddr() net.HardwareAddr { return dev.link.HardwareAddr } @@ -127,8 +179,8 @@ type neigh struct { } func (dev *vxlanDevice) AddL2(n neigh) error { - log.Infof("calling NeighAdd: %v, %v", n.IP, n.MAC) - return netlink.NeighAdd(&netlink.Neigh{ + log.Infof("calling NeighSet (L2/FDB): %v, %v", n.IP, n.MAC) + return netlink.NeighSet(&netlink.Neigh{ LinkIndex: dev.link.Index, State: netlink.NUD_PERMANENT, Family: syscall.AF_BRIDGE, diff --git a/host/libcontainer_backend.go b/host/libcontainer_backend.go index 91319804e0..04a4a5c644 100755 --- a/host/libcontainer_backend.go +++ b/host/libcontainer_backend.go @@ -265,12 +265,23 @@ func (l *LibcontainerBackend) ConfigureNetworking(config *host.NetworkConfig) er return err } - // Read DNS config, discoverd uses the nameservers + // Read DNS config, discoverd uses the nameservers as upstream recursors. + // On systems with systemd-resolved, /etc/resolv.conf points to the stub + // resolver at 127.0.0.53 which is only reachable from the host's network + // namespace. Discoverd runs in a container and needs the real upstream + // resolvers, which systemd-resolved exposes at /run/systemd/resolve/resolv.conf. dnsConf, err := dns.ClientConfigFromFile("/etc/resolv.conf") if err != nil { return err } config.Resolvers = dnsConf.Servers + if isLoopbackResolvers(config.Resolvers) { + if altConf, err := dns.ClientConfigFromFile("/run/systemd/resolve/resolv.conf"); err == nil { + if !isLoopbackResolvers(altConf.Servers) { + config.Resolvers = altConf.Servers + } + } + } // Write a resolv.conf to be bind-mounted into containers pointing at the // future discoverd DNS listener @@ -755,7 +766,12 @@ func (l *LibcontainerBackend) Run(job *host.Job, runConfig *RunConfig, rateLimit config.Cgroups.Resources.Memory = *spec.Limit } if spec, ok := job.Resources[resource.TypeCPU]; ok && spec.Limit != nil { - config.Cgroups.Resources.CpuShares = milliCPUToShares(uint64(*spec.Limit)) + shares := milliCPUToShares(uint64(*spec.Limit)) + config.Cgroups.Resources.CpuShares = shares + // On cgroups v2, CpuGroupV2 reads CpuWeight instead of CpuShares + if cgroups.IsCgroup2UnifiedMode() { + config.Cgroups.Resources.CpuWeight = uint64(cpuSharesToWeight(int64(shares))) + } } c, err := l.factory.Create(job.ID, config) @@ -1042,16 +1058,16 @@ func (c *Container) watch(ready chan<- error, buffer host.LogBuffer) error { notifyOOM, err := c.container.NotifyOOM() if err != nil { - log.Error("error subscribing to OOM notifications", "err", err) - return err + log.Warn("error subscribing to OOM notifications (non-fatal)", "err", err) + } else { + go func() { + logger := c.l.LogMux.Logger(logagg.MsgIDInit, c.MuxConfig, "component", "flynn-host") + defer logger.Close() + for range notifyOOM { + logger.Crit("FATAL: a container process was killed due to lack of available memory") + } + }() } - go func() { - logger := c.l.LogMux.Logger(logagg.MsgIDInit, c.MuxConfig, "component", "flynn-host") - defer logger.Close() - for range notifyOOM { - logger.Crit("FATAL: a container process was killed due to lack of available memory") - } - }() log.Info("watching for changes") for change := range c.Client.StreamState() { @@ -1449,10 +1465,10 @@ func (l *LibcontainerBackend) persistGlobalState() error { } /* - Loads a series of jobs, and reconstructs whatever additional backend state was saved. 
+Loads a series of jobs, and reconstructs whatever additional backend state was saved. - This may include reconnecting rpc systems and communicating with containers - (thus this may take a significant moment; it's not just deserializing). +This may include reconnecting rpc systems and communicating with containers +(thus this may take a significant moment; it's not just deserializing). */ func (l *LibcontainerBackend) UnmarshalState(jobs map[string]*host.ActiveJob, jobBackendStates map[string][]byte, backendGlobalState []byte, buffers host.LogBuffers) error { log := l.Logger.New("fn", "UnmarshalState") @@ -1629,6 +1645,14 @@ func milliCPUToShares(milliCPU uint64) uint64 { const cgroupRoot = "/sys/fs/cgroup" func setupCGroups(partitions map[string]int64) error { + if cgroups.IsCgroup2UnifiedMode() { + return setupCGroupsV2(partitions) + } + return setupCGroupsV1(partitions) +} + +// setupCGroupsV1 mounts individual cgroup v1 subsystems and creates partitions. +func setupCGroupsV1(partitions map[string]int64) error { subsystems, err := cgroups.GetAllSubsystems() if err != nil { return fmt.Errorf("error getting cgroup subsystems: %s", err) @@ -1651,14 +1675,77 @@ func setupCGroups(partitions map[string]int64) error { } for name, shares := range partitions { - if err := createCGroupPartition(name, shares); err != nil { + if err := createCGroupPartitionV1(name, shares); err != nil { + return err + } + } + return nil +} + +// setupCGroupsV2 prepares cgroup v2 unified hierarchy and creates partitions. +func setupCGroupsV2(partitions map[string]int64) error { + // On cgroups v2, the unified hierarchy is already mounted at /sys/fs/cgroup. + // We need to enable the controllers we need in the subtree_control file + // at each level of our hierarchy. + + // Enable controllers at the root level for our subtree + controllers := "+cpu +memory +io +pids +cpuset" + rootSubtreeControl := filepath.Join(cgroupRoot, "cgroup.subtree_control") + if err := enableCgroupV2Controllers(rootSubtreeControl, controllers); err != nil { + return fmt.Errorf("error enabling cgroup v2 controllers at root: %s", err) + } + + // Create the "flynn" parent cgroup + flynnDir := filepath.Join(cgroupRoot, "flynn") + if err := os.MkdirAll(flynnDir, 0755); err != nil { + return fmt.Errorf("error creating flynn cgroup: %s", err) + } + + // Enable controllers in the flynn subtree for partition children + flynnSubtreeControl := filepath.Join(flynnDir, "cgroup.subtree_control") + if err := enableCgroupV2Controllers(flynnSubtreeControl, controllers); err != nil { + return fmt.Errorf("error enabling cgroup v2 controllers in flynn cgroup: %s", err) + } + + for name, shares := range partitions { + if err := createCGroupPartitionV2(name, shares); err != nil { return err } } return nil } -func createCGroupPartition(name string, cpuShares int64) error { +// enableCgroupV2Controllers writes controller names to a subtree_control file. +// Each controller is prefixed with "+" to enable it (e.g., "+cpu +memory"). +// Errors from individual controllers are logged but not fatal, since not all +// controllers may be available. 
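+//
+// As a sketch of what this amounts to (paths assume the hierarchy built above),
+// enabling cpu and memory under the flynn cgroup is roughly equivalent to:
+//
+//	echo "+cpu"    > /sys/fs/cgroup/flynn/cgroup.subtree_control
+//	echo "+memory" > /sys/fs/cgroup/flynn/cgroup.subtree_control
+//
+// Controllers are written one at a time so that a single unavailable controller
+// (cpuset is a common case under systemd delegation) does not block the rest.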
+func enableCgroupV2Controllers(subtreeControlPath, controllers string) error { + for _, controller := range strings.Fields(controllers) { + if err := os.WriteFile(subtreeControlPath, []byte(controller), 0644); err != nil { + // Log but don't fail — some controllers (e.g., cpuset) may not + // be delegatable depending on systemd configuration + log15.Warn("failed to enable cgroup v2 controller", "controller", controller, "err", err) + } + } + return nil +} + +// cpuSharesToWeight converts cgroups v1 cpu.shares (2-262144, default 1024) +// to cgroups v2 cpu.weight (1-10000, default 100). +// Formula from kernel documentation: +// +// weight = 1 + ((shares - 2) * 9999) / 262142 +func cpuSharesToWeight(shares int64) int64 { + if shares < 2 { + shares = 2 + } + if shares > 262144 { + shares = 262144 + } + return 1 + ((shares-2)*9999)/262142 +} + +func createCGroupPartitionV1(name string, cpuShares int64) error { for _, group := range []string{"blkio", "cpu", "cpuacct", "cpuset", "devices", "freezer", "memory", "net_cls", "perf_event"} { if err := os.MkdirAll(filepath.Join(cgroupRoot, group, "flynn", name), 0755); err != nil { return fmt.Errorf("error creating partition cgroup: %s", err) @@ -1689,6 +1776,29 @@ func createCGroupPartition(name string, cpuShares int64) error { return nil } +func createCGroupPartitionV2(name string, cpuShares int64) error { + // On cgroups v2, all controllers share a single hierarchy + partDir := filepath.Join(cgroupRoot, "flynn", name) + if err := os.MkdirAll(partDir, 0755); err != nil { + return fmt.Errorf("error creating partition cgroup: %s", err) + } + + // Enable controllers in this partition for container children + controllers := "+cpu +memory +io +pids +cpuset" + subtreeControl := filepath.Join(partDir, "cgroup.subtree_control") + if err := enableCgroupV2Controllers(subtreeControl, controllers); err != nil { + return fmt.Errorf("error enabling controllers in partition: %s", err) + } + + // Set CPU weight (v2 equivalent of cpu.shares) + weight := cpuSharesToWeight(cpuShares) + if err := os.WriteFile(filepath.Join(partDir, "cpu.weight"), strconv.AppendInt(nil, weight, 10), 0644); err != nil { + return fmt.Errorf("error writing cpu.weight: %s", err) + } + + return nil +} + type Tmpfs struct { Path string Size int64 @@ -1717,6 +1827,23 @@ func createTmpfs(size int64) (*Tmpfs, error) { return &Tmpfs{Path: f.Name(), Size: size}, nil } +// isLoopbackResolvers returns true if all DNS servers in the list are +// loopback addresses (127.x.x.x or ::1). This indicates the system is using +// a local DNS stub (like systemd-resolved) that won't be reachable from +// containers running in separate network namespaces. 
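+//
+// For example (illustrative inputs): ["127.0.0.53"] -> true (the systemd-resolved
+// stub), ["127.0.0.53", "8.8.8.8"] -> false (a reachable upstream exists), and
+// [] -> false, so an empty resolv.conf never triggers the fallback path.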
+func isLoopbackResolvers(servers []string) bool { + if len(servers) == 0 { + return false + } + for _, s := range servers { + ip := net.ParseIP(s) + if ip == nil || !ip.IsLoopback() { + return false + } + } + return true +} + func forceMemoryOvercommit() error { path := "/proc/sys/vm/overcommit_memory" data, err := os.ReadFile(path) diff --git a/host/volume/zfs/zfs.go b/host/volume/zfs/zfs.go index c17042d3fd..b02b54c049 100755 --- a/host/volume/zfs/zfs.go +++ b/host/volume/zfs/zfs.go @@ -16,9 +16,9 @@ import ( "github.com/flynn/flynn/host/volume" "github.com/flynn/flynn/pkg/attempt" "github.com/flynn/flynn/pkg/random" + "github.com/inconshreveable/log15" zfs "github.com/mistifyio/go-zfs" "github.com/rancher/sparse-tools/sparse" - "github.com/inconshreveable/log15" ) // blockSize is the block size used when creating new zvols @@ -41,11 +41,11 @@ type Provider struct { } /* - Describes zfs config used at provider setup time. +Describes zfs config used at provider setup time. - `volume.ProviderSpec.Config` is deserialized to this for zfs. +`volume.ProviderSpec.Config` is deserialized to this for zfs. - Also is the output of `MarshalGlobalState`. +Also is the output of `MarshalGlobalState`. */ type ProviderConfig struct { // DatasetName specifies the zfs dataset this provider will create volumes under. @@ -63,12 +63,12 @@ type ProviderConfig struct { } /* - Describes parameters for creating a zpool. +Describes parameters for creating a zpool. - Currently this only supports file-type vdevs; be aware that these are - convenient, but may have limited performance. Advanced users should - consider configuring a zpool using block devices directly, and specifying - use of datasets in those zpools those rather than this fallback mechanism. +Currently this only supports file-type vdevs; be aware that these are +convenient, but may have limited performance. Advanced users should +consider configuring a zpool using block devices directly, and specifying +use of datasets in those zpools those rather than this fallback mechanism. */ type MakeDev struct { BackingFilename string `json:"filename"` @@ -267,8 +267,22 @@ func (p *Provider) ImportFilesystem(fs *volume.Filesystem) (volume.Volume, error if f, ok := fs.Data.(sparse.FileIoProcessor); ok { if err := p.copySparse(dev, f); err != nil { - p.destroy(v) - return nil, err + // FIEMAP may not be supported on all filesystems (e.g. tmpfs). + // Fall back to sequential copy if sparse copy fails. + log15.Warn("sparse copy failed, falling back to sequential copy", "err", err) + if _, seekErr := dev.Seek(0, io.SeekStart); seekErr != nil { + p.destroy(v) + return nil, fmt.Errorf("error seeking zvol after sparse copy failure: %s", seekErr) + } + srcFile := f.GetFile() + if _, seekErr := srcFile.Seek(0, io.SeekStart); seekErr != nil { + p.destroy(v) + return nil, fmt.Errorf("error seeking source after sparse copy failure: %s", seekErr) + } + if _, err := io.Copy(dev, srcFile); err != nil { + p.destroy(v) + return nil, err + } } } else { n, err := io.Copy(dev, fs.Data) @@ -484,7 +498,7 @@ type zfsHaves struct { } /* - Returns the set of snapshot UIDs available in this volume's backing dataset. +Returns the set of snapshot UIDs available in this volume's backing dataset. 
*/ func (p *Provider) ListHaves(vol volume.Volume) ([]json.RawMessage, error) { zvol, err := p.owns(vol) @@ -553,24 +567,24 @@ func (p *Provider) SendSnapshot(vol volume.Volume, haves []json.RawMessage, outp } /* - ReceiveSnapshot both accepts a snapshotted filesystem as a byte stream, - and applies that state to the given `vol` (i.e., if this were git, it's like - `git fetch && git pull` at the same time; regretably, it's pretty hard to get - zfs to separate those operations). If there are local working changes in - the volume, they will be overwritten. - - In addition to the given volume being mutated on disk, a reference to the - new snapshot will be returned (this can be used for cleanup, though be aware - that with zfs, removing snapshots may impact the ability to use incremental - deltas when receiving future snapshots). - - Also note that ZFS is *extremely* picky about receiving snapshots; in - addition to obvious failure modes like an incremental snapshot with - insufficient data, the following complications apply: - - Sending an incremental snapshot with too much history will fail. - - Sending a full snapshot to a volume with any other snapshots will fail. - In the former case, you can renegociate; in the latter, you will have to - either *destroy snapshots* or make a new volume. +ReceiveSnapshot both accepts a snapshotted filesystem as a byte stream, +and applies that state to the given `vol` (i.e., if this were git, it's like +`git fetch && git pull` at the same time; regretably, it's pretty hard to get +zfs to separate those operations). If there are local working changes in +the volume, they will be overwritten. + +In addition to the given volume being mutated on disk, a reference to the +new snapshot will be returned (this can be used for cleanup, though be aware +that with zfs, removing snapshots may impact the ability to use incremental +deltas when receiving future snapshots). + +Also note that ZFS is *extremely* picky about receiving snapshots; in +addition to obvious failure modes like an incremental snapshot with +insufficient data, the following complications apply: +- Sending an incremental snapshot with too much history will fail. +- Sending a full snapshot to a volume with any other snapshots will fail. +In the former case, you can renegociate; in the latter, you will have to +either *destroy snapshots* or make a new volume. */ func (p *Provider) ReceiveSnapshot(vol volume.Volume, input io.Reader) (volume.Volume, error) { zvol, err := p.owns(vol) diff --git a/opencode.json b/opencode.json index bee7680313..32e0021c0c 100755 --- a/opencode.json +++ b/opencode.json @@ -4,25 +4,5 @@ "*": { "*": "allow" } - }, - "provider": { - "vLLM": { - "npm": "@ai-sdk/openai-compatible", - "name": "vLLM", - "options": { - "baseURL": "http://localhost:8000/v1" - }, - "models": { - "google/gemma-4-26B-A4B-it": { - "name": "google/gemma-4-26B-A4B-it" - }, - "openai/gpt-oss-120b": { - "name": "openai/gpt-oss-120b" - }, - "Qwen/Qwen3-Coder-Next-FP8": { - "name": "Qwen/Qwen3-Coder-Next-FP8" - } - } - } } } diff --git a/router/server.go b/router/server.go index f3c7788326..aa0dadfa2f 100755 --- a/router/server.go +++ b/router/server.go @@ -253,10 +253,19 @@ func main() { shutdown.Fatal(listenErr{apiAddr, err}) } - httpAddr := net.JoinHostPort(os.Getenv("LISTEN_IP"), strconv.Itoa(httpPorts[0])) + // Use EXTERNAL_IP for discoverd registration so other services can reach + // this router instance. LISTEN_IP (typically 0.0.0.0) is for binding only + // and is not a routable address. 
If EXTERNAL_IP is not set, fall back to + // LISTEN_IP for backward compatibility (single-node/loopback setups). + regIP := os.Getenv("EXTERNAL_IP") + if regIP == "" { + regIP = os.Getenv("LISTEN_IP") + } + apiRegAddr := net.JoinHostPort(regIP, *apiPort) + httpRegAddr := net.JoinHostPort(regIP, strconv.Itoa(httpPorts[0])) services := map[string]string{ - "router-api": apiAddr, - "router-http": httpAddr, + "router-api": apiRegAddr, + "router-http": httpRegAddr, } for service, addr := range services { log.Info("registering service", "name", service, "addr", addr) diff --git a/test/apps/slugbuilder-limit/bin/compile b/test/apps/slugbuilder-limit/bin/compile index 7fc3fa5511..4cf8403149 100755 --- a/test/apps/slugbuilder-limit/bin/compile +++ b/test/apps/slugbuilder-limit/bin/compile @@ -2,4 +2,9 @@ set -e -cat /sys/fs/cgroup/memory/memory.limit_in_bytes +if [ -f /sys/fs/cgroup/memory/memory.limit_in_bytes ]; then + cat /sys/fs/cgroup/memory/memory.limit_in_bytes +else + CG=/sys/fs/cgroup$(cat /proc/1/cgroup | head -1 | cut -d: -f3) + cat "$CG/memory.max" +fi diff --git a/test/helper.go b/test/helper.go index 22e8945f7f..20db32323e 100755 --- a/test/helper.go +++ b/test/helper.go @@ -357,9 +357,33 @@ func (h *hostnames) Remove(t *c.C, ip string) { const ( resourceMem int64 = 256 * units.MiB resourceMaxFD int64 = 1024 - resourceCmd = "cat /sys/fs/cgroup/memory/memory.limit_in_bytes; cat /sys/fs/cgroup/cpu/cpu.shares; ulimit -n" + // resourceCmd auto-detects cgroups v1 vs v2 and reads the memory limit, + // CPU weight/shares, and max file descriptors from inside the container. + // On v1: reads /sys/fs/cgroup/memory/memory.limit_in_bytes and /sys/fs/cgroup/cpu/cpu.shares + // On v2: reads memory.max and cpu.weight from the container's own cgroup + // (discovered via /proc/1/cgroup, e.g. /sys/fs/cgroup/flynn/system//) + resourceCmd = "if [ -f /sys/fs/cgroup/memory/memory.limit_in_bytes ]; then cat /sys/fs/cgroup/memory/memory.limit_in_bytes; cat /sys/fs/cgroup/cpu/cpu.shares; else CG=/sys/fs/cgroup$(cat /proc/1/cgroup | head -1 | cut -d: -f3); cat $CG/memory.max; cat $CG/cpu.weight; fi; ulimit -n" ) +// cpuSharesToWeight converts cgroups v1 cpu.shares (2-262144, default 1024) +// to cgroups v2 cpu.weight (1-10000, default 100). +// This must match the conversion in host/libcontainer_backend.go. +func cpuSharesToWeight(shares int64) int64 { + if shares < 2 { + shares = 2 + } + if shares > 262144 { + shares = 262144 + } + return 1 + ((shares-2)*9999)/262142 +} + +// isCgroupV2 returns true if the system uses cgroups v2 (unified hierarchy). +func isCgroupV2() bool { + _, err := os.Stat("/sys/fs/cgroup/cgroup.controllers") + return err == nil +} + func testResources() resource.Resources { r := resource.Resources{ resource.TypeMemory: resource.Spec{Limit: typeconv.Int64Ptr(resourceMem)}, @@ -374,7 +398,13 @@ func assertResourceLimits(t *c.C, out string) { limits := strings.Split(strings.TrimSpace(out), "\n") t.Assert(limits, c.HasLen, 3) t.Assert(limits[0], c.Equals, strconv.FormatInt(resourceMem, 10)) - t.Assert(limits[1], c.Equals, strconv.FormatInt(768, 10)) + // On cgroups v2, cpu.weight is used instead of cpu.shares. + // 750 milliCPU -> 768 shares (v1) or cpuSharesToWeight(768) weight (v2). 
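+	// Working the formula through: cpuSharesToWeight(768) = 1 + ((768-2)*9999)/262142
+	// = 1 + 29 = 30 with integer division, so on a v2 host limits[1] should read "30".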
+ expectedCPU := int64(768) + if isCgroupV2() { + expectedCPU = cpuSharesToWeight(768) + } + t.Assert(limits[1], c.Equals, strconv.FormatInt(expectedCPU, 10)) t.Assert(limits[2], c.Equals, strconv.FormatInt(resourceMaxFD, 10)) } diff --git a/test/main.go b/test/main.go index 800401e25c..d3439fe1d1 100755 --- a/test/main.go +++ b/test/main.go @@ -51,9 +51,12 @@ func main() { } } - if err = setupGitreceive(); err != nil { - log.Println(err) - return + // Only setup gitreceive if we're running git-related tests or all tests + if args.Run == "" || regexp.MustCompile(`(?i)(git|GitDeploy|GitreceiveSuite|TaffyDeploy)`).MatchString(args.Run) { + if err = setupGitreceive(); err != nil { + log.Println(err) + return + } } res = check.RunAll(&check.RunConf{ diff --git a/test/test_cli.go b/test/test_cli.go index b27008a297..f2e05ec27d 100755 --- a/test/test_cli.go +++ b/test/test_cli.go @@ -942,14 +942,20 @@ func (s *CLISuite) TestRunLimits(t *c.C) { limits := strings.Split(strings.TrimSpace(cmd.Output), "\n") t.Assert(limits, c.HasLen, 3) t.Assert(limits[0], c.Equals, strconv.FormatInt(*defaults[resource.TypeMemory].Limit, 10)) - t.Assert(limits[1], c.Equals, strconv.FormatInt(1024, 10)) + // On cgroups v2, cpu.weight is used instead of cpu.shares. + // Default 1000 milliCPU -> 1024 shares (v1) or cpuSharesToWeight(1024) weight (v2). + expectedCPU := int64(1024) + if isCgroupV2() { + expectedCPU = cpuSharesToWeight(1024) + } + t.Assert(limits[1], c.Equals, strconv.FormatInt(expectedCPU, 10)) t.Assert(limits[2], c.Equals, strconv.FormatInt(*defaults[resource.TypeMaxFD].Limit, 10)) cmd = app.flynn("run", "--limits", "memory=200MB,max_fd=9000", "sh", "-c", resourceCmd) t.Assert(cmd, Succeeds) limits = strings.Split(strings.TrimSpace(cmd.Output), "\n") t.Assert(limits, c.HasLen, 3) t.Assert(limits[0], c.Equals, strconv.FormatInt(200*units.MiB, 10)) - t.Assert(limits[1], c.Equals, strconv.FormatInt(1024, 10)) + t.Assert(limits[1], c.Equals, strconv.FormatInt(expectedCPU, 10)) t.Assert(limits[2], c.Equals, strconv.FormatInt(9000, 10)) } diff --git a/test/test_controller.go b/test/test_controller.go index 0c06bbec21..51d962d197 100755 --- a/test/test_controller.go +++ b/test/test_controller.go @@ -212,9 +212,10 @@ func (s *ControllerSuite) TestResourceLimitsOneOffJob(t *c.C) { app, release := s.createApp(t) rwc, err := s.controllerClient(t).RunJobAttached(app.ID, &ct.NewJob{ - ReleaseID: release.ID, - Args: []string{"sh", "-c", resourceCmd}, - Resources: testResources(), + ReleaseID: release.ID, + Args: []string{"sh", "-c", resourceCmd}, + Resources: testResources(), + DisableLog: true, }) t.Assert(err, c.IsNil) attachClient := cluster.NewAttachClient(rwc) diff --git a/test/test_host.go b/test/test_host.go index 61e7ce08c5..b8310d7db2 100755 --- a/test/test_host.go +++ b/test/test_host.go @@ -220,10 +220,10 @@ func (a *IshApp) Cleanup() { } /* - Make an 'ish' application on the given host, returning it when - it has registered readiness with discoverd. +Make an 'ish' application on the given host, returning it when +it has registered readiness with discoverd. - User will want to defer a.Cleanup() to clean up. +User will want to defer a.Cleanup() to clean up. */ func (s *Helper) makeIshApp(t *c.C, a *IshApp) (*IshApp, error) { // pick a unique string to use as service name so this works with concurrent tests. 
@@ -430,7 +430,7 @@ func (s *HostSuite) TestResourceLimits(t *c.C) { s.clusterClient(t), s.createArtifact(t, "test-apps"), &host.Job{ - Config: host.ContainerConfig{Args: []string{"sh", "-c", resourceCmd}}, + Config: host.ContainerConfig{Args: []string{"sh", "-c", resourceCmd}, DisableLog: true}, Resources: testResources(), }, ) diff --git a/vendor/github.com/miekg/dns/clientconfig.go b/vendor/github.com/miekg/dns/clientconfig.go index cfa9ad0b22..cd25a7d7d6 100755 --- a/vendor/github.com/miekg/dns/clientconfig.go +++ b/vendor/github.com/miekg/dns/clientconfig.go @@ -83,7 +83,7 @@ func ClientConfigFromFile(resolvconf string) (*ClientConfig, error) { n = 1 } c.Timeout = n - case len(s) >= 8 && s[:9] == "attempts:": + case len(s) >= 9 && s[:9] == "attempts:": n, _ := strconv.Atoi(s[9:]) if n < 1 { n = 1 diff --git a/vendor/github.com/opencontainers/runc/libcontainer/cgroups/fs/apply_raw.go b/vendor/github.com/opencontainers/runc/libcontainer/cgroups/fs/apply_raw.go index 512fd70010..707ebfc319 100755 --- a/vendor/github.com/opencontainers/runc/libcontainer/cgroups/fs/apply_raw.go +++ b/vendor/github.com/opencontainers/runc/libcontainer/cgroups/fs/apply_raw.go @@ -1,3 +1,4 @@ +//go:build linux // +build linux package fs @@ -270,7 +271,9 @@ func (m *Manager) Set(container *configs.Config) error { } } - if m.Paths["cpu"] != "" { + // CheckCpushares reads cpu.shares which only exists on cgroups v1. + // On cgroups v2, cpu.weight is used instead and is already set by CpuGroupV2.Set(). + if m.Paths["cpu"] != "" && !cgroups.IsCgroup2UnifiedMode() { if err := CheckCpushares(m.Paths["cpu"], container.Cgroups.Resources.CpuShares); err != nil { return err } diff --git a/vendor/github.com/opencontainers/runc/libcontainer/notify_linux.go b/vendor/github.com/opencontainers/runc/libcontainer/notify_linux.go index 47a06783d6..b10666a6d9 100755 --- a/vendor/github.com/opencontainers/runc/libcontainer/notify_linux.go +++ b/vendor/github.com/opencontainers/runc/libcontainer/notify_linux.go @@ -1,13 +1,17 @@ +//go:build linux // +build linux package libcontainer import ( + "bufio" "fmt" "io/ioutil" "os" "path/filepath" + "strings" + "github.com/opencontainers/runc/libcontainer/cgroups" "golang.org/x/sys/unix" ) @@ -67,6 +71,10 @@ func registerMemoryEvent(cgDir string, evName string, arg string) (<-chan struct // notifyOnOOM returns channel on which you can expect event about OOM, // if process died without OOM this channel will be closed. func notifyOnOOM(paths map[string]string) (<-chan struct{}, error) { + if cgroups.IsCgroup2UnifiedMode() { + return notifyOnOOMV2(paths) + } + dir := paths[oomCgroupName] if dir == "" { return nil, fmt.Errorf("path %q missing", oomCgroupName) @@ -75,6 +83,78 @@ func notifyOnOOM(paths map[string]string) (<-chan struct{}, error) { return registerMemoryEvent(dir, "memory.oom_control", "") } +// notifyOnOOMV2 uses inotify on memory.events to detect OOM kills on cgroups v2. +// On v2, the memory.events file contains key-value pairs including "oom_kill N". +// When an OOM kill occurs, the kernel modifies this file. 
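+//
+// For reference, memory.events is a flat key/value file; a container that has been
+// OOM-killed once might read (illustrative values):
+//
+//	low 0
+//	high 0
+//	max 4
+//	oom 1
+//	oom_kill 1
+//
+// so a non-zero oom_kill counter after an IN_MODIFY event is what oomKillCount
+// checks for below.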
+func notifyOnOOMV2(paths map[string]string) (<-chan struct{}, error) { + dir := paths[oomCgroupName] + if dir == "" { + // On cgroups v2, paths are keyed by "" (unified) not "memory" + dir = paths[""] + } + if dir == "" { + return nil, fmt.Errorf("path for memory cgroup missing") + } + + eventsPath := filepath.Join(dir, "memory.events") + if _, err := os.Stat(eventsPath); err != nil { + return nil, fmt.Errorf("cannot access memory.events: %s", err) + } + + fd, err := unix.InotifyInit1(unix.IN_CLOEXEC) + if err != nil { + return nil, fmt.Errorf("inotify_init1: %s", err) + } + + _, err = unix.InotifyAddWatch(fd, eventsPath, unix.IN_MODIFY) + if err != nil { + unix.Close(fd) + return nil, fmt.Errorf("inotify_add_watch: %s", err) + } + + ch := make(chan struct{}) + go func() { + defer func() { + unix.Close(fd) + close(ch) + }() + buf := make([]byte, 4096) + for { + _, err := unix.Read(fd, buf) + if err != nil { + return + } + // Check if an OOM kill actually happened by reading memory.events + if oomKillCount(eventsPath) > 0 { + ch <- struct{}{} + } + } + }() + return ch, nil +} + +// oomKillCount reads memory.events and returns the oom_kill counter value. +func oomKillCount(eventsPath string) uint64 { + f, err := os.Open(eventsPath) + if err != nil { + return 0 + } + defer f.Close() + scanner := bufio.NewScanner(f) + for scanner.Scan() { + line := scanner.Text() + if strings.HasPrefix(line, "oom_kill ") { + parts := strings.Fields(line) + if len(parts) == 2 { + var n uint64 + fmt.Sscanf(parts[1], "%d", &n) + return n + } + } + } + return 0 +} + func notifyMemoryPressure(paths map[string]string, level PressureLevel) (<-chan struct{}, error) { dir := paths[oomCgroupName] if dir == "" {