diff --git a/README.md b/README.md index b8d53ce..b3fdce1 100644 --- a/README.md +++ b/README.md @@ -200,7 +200,6 @@ Run with config file: | `DUCKGRES_PROCESS_ISOLATION` | Enable process isolation (`1` or `true`) | `false` | | `DUCKGRES_IDLE_TIMEOUT` | Connection idle timeout (e.g., `30m`, `1h`, `-1` to disable) | `24h` | | `DUCKGRES_K8S_SHARED_WARM_TARGET` | Neutral shared warm-worker target for K8s multi-tenant mode (`0` disables prewarm) | `0` | -| `DUCKGRES_K8S_SHARED_WARM_WORKERS` | Enable the reserve -> activate -> hot shared warm-worker path in K8s multi-tenant mode | `false` | | `DUCKGRES_DUCKLAKE_METADATA_STORE` | DuckLake metadata connection string | - | | `POSTHOG_API_KEY` | PostHog project API key (`phc_...`); enables log export | - | | `POSTHOG_HOST` | PostHog ingest host | `us.i.posthog.com` | @@ -601,7 +600,7 @@ kill -USR2 In Kubernetes environments, `--worker-backend remote` is now the multitenant path only. It requires `--config-store`, and the control plane then spawns worker pods via the Kubernetes API, communicates with them over gRPC (Arrow Flight SQL), and uses owner references for automatic garbage collection when the control plane pod is deleted. -The shared warm-worker activation path is gated by `--k8s-shared-warm-workers` / `k8s.shared_warm_workers`. Its default is `false`, which keeps the existing remote behavior; when enabled, newly reserved warm workers must receive tenant runtime over the activation RPC before they can serve sessions. +When a shared warm-worker target is configured (`--k8s-shared-warm-target`), the pool keeps workers neutral at startup, reserves them per org, activates tenant runtime over the activation RPC, and retires them after use. The full lifecycle is: idle → reserved → activating → hot → draining → retired. ```bash # Local multitenant K8s workflow @@ -612,7 +611,7 @@ See [`k8s/README.md`](k8s/README.md) for the full architecture, configuration re On the multi-tenant path, the config store now keeps per-team managed-warehouse metadata in addition to team/user auth and limits. That team-scoped contract is intended to become the source of truth for the tenant warehouse DB, the tenant DuckLake metadata store (which may live on shared Aurora or a dedicated RDS instance), object-store settings, worker identity, secret references, and provisioning state. The older config-store `DuckLakeConfig` singleton remains only as a legacy cluster-wide setting and should not be treated as authoritative for multi-tenant runtime wiring. -When `DUCKGRES_K8S_SHARED_WARM_WORKERS=true`, the shared K8s pool keeps workers neutral at startup, reserves them per team, activates tenant runtime over the control-plane RPC channel, and retires them after use. Leave it disabled to keep the compatibility path during rollout. +The shared K8s pool keeps workers neutral at startup, reserves them per org, activates tenant runtime over the control-plane RPC channel, and retires them after use. 
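To make the lifecycle above concrete, here is a minimal, self-contained Go sketch of the state machine it describes. The type name and transition table below are illustrative assumptions for this README only, not the control plane's actual `SharedWorkerState.Transition` implementation.

```go
package main

import "fmt"

// Lifecycle mirrors the warm-worker states described above. Illustrative only;
// the authoritative transition logic lives in the controlplane package.
type Lifecycle string

const (
	Idle       Lifecycle = "idle"
	Reserved   Lifecycle = "reserved"
	Activating Lifecycle = "activating"
	Hot        Lifecycle = "hot"
	Draining   Lifecycle = "draining"
	Retired    Lifecycle = "retired"
)

// allowedNext is an assumed transition table: any state may also fall back to
// Retired (activation failure, stuck-activation reaping, shutdown).
var allowedNext = map[Lifecycle][]Lifecycle{
	Idle:       {Reserved, Retired},
	Reserved:   {Activating, Retired},
	Activating: {Hot, Retired},
	Hot:        {Draining, Retired},
	Draining:   {Retired},
}

func canTransition(from, to Lifecycle) bool {
	for _, next := range allowedNext[from] {
		if next == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(canTransition(Reserved, Activating)) // true
	fmt.Println(canTransition(Idle, Hot))            // false: must be reserved and activated first
}
```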
Managed-warehouse contract notes: diff --git a/config_resolution.go b/config_resolution.go index a0a7b51..63fa1dd 100644 --- a/config_resolution.go +++ b/config_resolution.go @@ -52,7 +52,6 @@ type configCLIInputs struct { K8sWorkerServiceAccount string K8sMaxWorkers int K8sSharedWarmTarget int - K8sSharedWarmWorkers bool QueryLog bool } @@ -74,7 +73,6 @@ type resolvedConfig struct { K8sWorkerServiceAccount string K8sMaxWorkers int K8sSharedWarmTarget int - K8sSharedWarmWorkers bool ConfigStoreConn string ConfigPollInterval time.Duration AdminToken string @@ -131,7 +129,6 @@ func resolveEffectiveConfig(fileCfg *FileConfig, cli configCLIInputs, getenv fun var k8sWorkerPort int var k8sWorkerSecret, k8sWorkerConfigMap, k8sWorkerImagePullPolicy, k8sWorkerServiceAccount string var k8sMaxWorkers, k8sSharedWarmTarget int - var k8sSharedWarmWorkers bool var configStoreConn string var configPollInterval time.Duration var adminToken string @@ -385,9 +382,6 @@ func resolveEffectiveConfig(fileCfg *FileConfig, cli configCLIInputs, getenv fun if fileCfg.K8s.SharedWarmTarget != 0 { k8sSharedWarmTarget = fileCfg.K8s.SharedWarmTarget } - if fileCfg.K8s.SharedWarmWorkers { - k8sSharedWarmWorkers = true - } } if v := getenv("DUCKGRES_HOST"); v != "" { @@ -635,14 +629,6 @@ func resolveEffectiveConfig(fileCfg *FileConfig, cli configCLIInputs, getenv fun warn("Invalid DUCKGRES_K8S_SHARED_WARM_TARGET: " + err.Error()) } } - if v := getenv("DUCKGRES_K8S_SHARED_WARM_WORKERS"); v != "" { - if b, err := strconv.ParseBool(v); err == nil { - k8sSharedWarmWorkers = b - } else { - warn("Invalid DUCKGRES_K8S_SHARED_WARM_WORKERS: " + err.Error()) - } - } - // Query log env vars if v := getenv("DUCKGRES_QUERY_LOG_ENABLED"); v != "" { if b, err := strconv.ParseBool(v); err == nil { @@ -839,9 +825,6 @@ func resolveEffectiveConfig(fileCfg *FileConfig, cli configCLIInputs, getenv fun if cli.Set["k8s-shared-warm-target"] { k8sSharedWarmTarget = cli.K8sSharedWarmTarget } - if cli.Set["k8s-shared-warm-workers"] { - k8sSharedWarmWorkers = cli.K8sSharedWarmWorkers - } if cli.Set["query-log"] { cfg.QueryLog.Enabled = cli.QueryLog } @@ -911,7 +894,6 @@ func resolveEffectiveConfig(fileCfg *FileConfig, cli configCLIInputs, getenv fun K8sWorkerServiceAccount: k8sWorkerServiceAccount, K8sMaxWorkers: k8sMaxWorkers, K8sSharedWarmTarget: k8sSharedWarmTarget, - K8sSharedWarmWorkers: k8sSharedWarmWorkers, ConfigStoreConn: configStoreConn, ConfigPollInterval: configPollInterval, AdminToken: adminToken, diff --git a/controlplane/control.go b/controlplane/control.go index 22a2998..4d79bd2 100644 --- a/controlplane/control.go +++ b/controlplane/control.go @@ -80,7 +80,6 @@ type K8sConfig struct { ServiceAccount string // ServiceAccount name for worker pods (default: "default") MaxWorkers int // Global cap for the shared K8s worker pool (0 = auto-derived) SharedWarmTarget int // Neutral shared warm-worker target for K8s multi-tenant mode (0 = disabled) - SharedWarmWorkers bool // Enable reserve->activate->hot lifecycle on the shared warm pool } // ControlPlane manages the TCP listener and routes connections to Flight SQL workers. diff --git a/controlplane/k8s_pool.go b/controlplane/k8s_pool.go index 1c9de2e..a6c2e06 100644 --- a/controlplane/k8s_pool.go +++ b/controlplane/k8s_pool.go @@ -28,6 +28,8 @@ import ( "k8s.io/client-go/tools/cache" ) +const defaultActivatingTimeout = 2 * time.Minute + // K8sWorkerPool manages worker pods in Kubernetes. 
type K8sWorkerPool struct { mu sync.RWMutex @@ -54,7 +56,6 @@ type K8sWorkerPool struct { memoryBudget int64 // total memory budget in bytes orgID string // org ID for pod labels (multi-tenant mode) workerIDGenerator func() int // shared ID generator across orgs (nil = internal counter) - sharedWarmActivation bool cachedToken string // cached bearer token (immutable after setup) informer cache.SharedIndexInformer stopInform chan struct{} @@ -64,6 +65,8 @@ type K8sWorkerPool struct { spawnWarmWorkerFunc func(ctx context.Context, id int) error spawnWarmWorkerBackgroundFunc func(id int) activateTenantFunc func(ctx context.Context, worker *ManagedWorker, payload TenantActivationPayload) error + + activatingTimeout time.Duration // max time a worker can stay in reserved/activating before being reaped } // NewK8sWorkerPool creates a K8sWorkerPool using in-cluster credentials. @@ -128,7 +131,6 @@ func newK8sWorkerPool(cfg K8sWorkerPoolConfig, clientset kubernetes.Interface) ( memoryBudget: cfg.MemoryBudget, orgID: cfg.OrgID, workerIDGenerator: cfg.WorkerIDGenerator, - sharedWarmActivation: cfg.SharedWarmActivation, spawnSem: make(chan struct{}, spawnConcurrency), } @@ -429,12 +431,10 @@ func (p *K8sWorkerPool) SpawnWorker(ctx context.Context, id int) error { }, }, } - if p.sharedWarmActivation { - pod.Spec.Containers[0].Env = append(pod.Spec.Containers[0].Env, corev1.EnvVar{ - Name: "DUCKGRES_SHARED_WARM_WORKER", - Value: "true", - }) - } + pod.Spec.Containers[0].Env = append(pod.Spec.Containers[0].Env, corev1.EnvVar{ + Name: "DUCKGRES_SHARED_WARM_WORKER", + Value: "true", + }) // Add writable data directory for DuckDB databases pod.Spec.Volumes = append(pod.Spec.Volumes, corev1.Volume{ @@ -508,6 +508,7 @@ func (p *K8sWorkerPool) SpawnWorker(ctx context.Context, id int) error { p.mu.Lock() p.workers[id] = w workerCount := len(p.workers) + observeWarmPoolLifecycleGauges(p.workers) p.mu.Unlock() observeControlPlaneWorkers(workerCount) @@ -638,6 +639,9 @@ func (p *K8sWorkerPool) AcquireWorker(ctx context.Context) (*ManagedWorker, erro idle := p.findIdleWorkerLocked() if idle != nil { idle.activeSessions++ + if idle.activeSessions > idle.peakSessions { + idle.peakSessions = idle.activeSessions + } p.mu.Unlock() slog.Debug("Reusing idle worker.", "worker", idle.ID, "active_sessions", idle.activeSessions) return idle, nil @@ -653,6 +657,9 @@ func (p *K8sWorkerPool) AcquireWorker(ctx context.Context) (*ManagedWorker, erro w := p.leastLoadedWorkerLocked() if w != nil { w.activeSessions++ + if w.activeSessions > w.peakSessions { + w.peakSessions = w.activeSessions + } if canSpawn { id := p.allocateWorkerIDLocked() p.spawning++ @@ -692,6 +699,9 @@ func (p *K8sWorkerPool) AcquireWorker(ctx context.Context) (*ManagedWorker, erro } p.mu.Lock() w.activeSessions++ + if w.activeSessions > w.peakSessions { + w.peakSessions = w.activeSessions + } p.mu.Unlock() return w, nil } @@ -738,13 +748,17 @@ func (p *K8sWorkerPool) ReleaseWorker(id int) { // RetireWorker removes a worker from the pool and deletes its pod. 
func (p *K8sWorkerPool) RetireWorker(id int) { + p.retireWorkerWithReason(id, RetireReasonNormal) +} + +func (p *K8sWorkerPool) retireWorkerWithReason(id int, reason string) { p.mu.Lock() w, ok := p.workers[id] if !ok { p.mu.Unlock() return } - p.markWorkerRetiredLocked(w) + p.markWorkerRetiredLocked(w, reason) delete(p.workers, id) workerCount := len(p.workers) p.mu.Unlock() @@ -755,6 +769,10 @@ func (p *K8sWorkerPool) RetireWorker(id int) { // RetireWorkerIfNoSessions retires a worker only if it has no active sessions. func (p *K8sWorkerPool) RetireWorkerIfNoSessions(id int) bool { + return p.retireWorkerIfNoSessionsWithReason(id, RetireReasonNormal) +} + +func (p *K8sWorkerPool) retireWorkerIfNoSessionsWithReason(id int, reason string) bool { p.mu.Lock() w, ok := p.workers[id] if !ok { @@ -765,7 +783,7 @@ func (p *K8sWorkerPool) RetireWorkerIfNoSessions(id int) bool { w.activeSessions-- } if w.activeSessions == 0 { - p.markWorkerRetiredLocked(w) + p.markWorkerRetiredLocked(w, reason) delete(p.workers, id) workerCount := len(p.workers) p.mu.Unlock() @@ -819,7 +837,7 @@ func (p *K8sWorkerPool) ActivateReservedWorker(ctx context.Context, worker *Mana } if err := activate(ctx, worker, payload); err != nil { - p.RetireWorker(worker.ID) + p.retireWorkerWithReason(worker.ID, RetireReasonActivationFailure) return err } @@ -832,7 +850,11 @@ func (p *K8sWorkerPool) ActivateReservedWorker(ctx context.Context, worker *Mana if err != nil { return err } - return worker.SetSharedState(nextState) + if setErr := worker.SetSharedState(nextState); setErr != nil { + return setErr + } + observeWarmPoolLifecycleGauges(p.workers) + return nil } // ReserveSharedWorker reserves a neutral warm worker for later tenant activation. @@ -867,6 +889,8 @@ func (p *K8sWorkerPool) ReserveSharedWorker(ctx context.Context, assignment *Wor p.mu.Unlock() return nil, err } + idle.reservedAt = time.Now() + observeWarmPoolLifecycleGauges(p.workers) if p.shouldReplenishWarmCapacityLocked() { id := p.allocateWorkerIDLocked() @@ -889,6 +913,9 @@ func (p *K8sWorkerPool) ReserveSharedWorker(ctx context.Context, assignment *Wor p.mu.Lock() p.spawning-- + if err == nil { + observeWarmPoolLifecycleGauges(p.workers) + } p.mu.Unlock() if err != nil { @@ -934,17 +961,18 @@ func (p *K8sWorkerPool) SpawnMinWorkers(count int) error { ctx := context.Background() - for _, id := range ids { - if err := p.spawnWarmWorker(ctx, id); err != nil { + for _, id := range ids { + if err := p.spawnWarmWorker(ctx, id); err != nil { + p.mu.Lock() + p.spawning-- + p.mu.Unlock() + return err + } p.mu.Lock() p.spawning-- + observeWarmPoolLifecycleGauges(p.workers) p.mu.Unlock() - return err } - p.mu.Lock() - p.spawning-- - p.mu.Unlock() - } return nil } @@ -1089,6 +1117,7 @@ func (p *K8sWorkerPool) ShutdownAll() { p.shuttingDown = true workers := make([]*ManagedWorker, 0, len(p.workers)) for _, w := range p.workers { + p.markWorkerRetiredLocked(w, RetireReasonShutdown) workers = append(workers, w) } p.mu.Unlock() @@ -1129,11 +1158,9 @@ func (p *K8sWorkerPool) retireWorkerPod(id int, w *ManagedWorker) { }) } -// idleReaper periodically retires workers that have been idle too long. +// idleReaper periodically retires workers that have been idle too long and +// reaps stuck activating/reserved workers. 
func (p *K8sWorkerPool) idleReaper() { - if p.idleTimeout <= 0 { - return - } ticker := time.NewTicker(1 * time.Minute) defer ticker.Stop() @@ -1142,7 +1169,10 @@ func (p *K8sWorkerPool) idleReaper() { case <-p.shutdownCh: return case <-ticker.C: - p.reapIdleWorkers() + if p.idleTimeout > 0 { + p.reapIdleWorkers() + } + p.reapStuckActivatingWorkers() } } } @@ -1165,6 +1195,7 @@ func (p *K8sWorkerPool) reapIdleWorkers() { break } if p.isWarmIdleWorkerLocked(w) && !w.lastUsed.IsZero() && now.Sub(w.lastUsed) > p.idleTimeout { + p.markWorkerRetiredLocked(w, RetireReasonIdleTimeout) toRetire = append(toRetire, struct { id int w *ManagedWorker @@ -1185,6 +1216,61 @@ func (p *K8sWorkerPool) reapIdleWorkers() { } } +// reapStuckActivatingWorkers retires workers that have been in reserved or +// activating state for longer than the activating timeout. +func (p *K8sWorkerPool) reapStuckActivatingWorkers() { + timeout := p.activatingTimeout + if timeout <= 0 { + timeout = defaultActivatingTimeout + } + + p.mu.Lock() + var toRetire []struct { + id int + w *ManagedWorker + } + now := time.Now() + for id, w := range p.workers { + select { + case <-w.done: + continue + default: + } + lifecycle := w.SharedState().NormalizedLifecycle() + if (lifecycle == WorkerLifecycleReserved || lifecycle == WorkerLifecycleActivating) && + !w.reservedAt.IsZero() && now.Sub(w.reservedAt) > timeout { + p.markWorkerRetiredLocked(w, RetireReasonStuckActivating) + toRetire = append(toRetire, struct { + id int + w *ManagedWorker + }{id, w}) + delete(p.workers, id) + } + } + + var spawnIDs []int + for range toRetire { + if p.shouldReplenishWarmCapacityLocked() { + id := p.allocateWorkerIDLocked() + p.spawning++ + spawnIDs = append(spawnIDs, id) + } + } + workerCount := len(p.workers) + p.mu.Unlock() + + if len(toRetire) > 0 { + slog.Warn("Reaping stuck activating workers.", "count", len(toRetire)) + observeControlPlaneWorkers(workerCount) + for _, entry := range toRetire { + go p.retireWorkerPod(entry.id, entry.w) + } + for _, id := range spawnIDs { + p.spawnWarmWorkerBackground(id) + } + } +} + // --- Shared scheduling helpers (same logic as FlightWorkerPool) --- func (p *K8sWorkerPool) findIdleWorkerLocked() *ManagedWorker { @@ -1311,6 +1397,7 @@ func (p *K8sWorkerPool) removeWorkerLocked(id int) (*ManagedWorker, int, int, bo if !ok { return nil, len(p.workers), 0, false } + p.markWorkerRetiredLocked(w, RetireReasonCrash) delete(p.workers, id) workerCount := len(p.workers) if !p.shouldReplenishWarmCapacityLocked() { @@ -1384,12 +1471,17 @@ func (p *K8sWorkerPool) spawnWarmWorkerBackground(id int) { go p.spawnWorkerBackground(id) } -func (p *K8sWorkerPool) markWorkerRetiredLocked(w *ManagedWorker) { +func (p *K8sWorkerPool) markWorkerRetiredLocked(w *ManagedWorker, reason string) { + if w.SharedState().NormalizedLifecycle() == WorkerLifecycleHot { + observeHotWorkerSessions(w.peakSessions) + } nextState, err := w.SharedState().Transition(WorkerLifecycleRetired, nil) if err != nil { return } _ = w.SetSharedState(nextState) + observeWorkerRetirement(reason) + observeWarmPoolLifecycleGauges(p.workers) } // podNameForWorker returns the pod name for a given worker ID, diff --git a/controlplane/multitenant.go b/controlplane/multitenant.go index cd3dab5..8844b16 100644 --- a/controlplane/multitenant.go +++ b/controlplane/multitenant.go @@ -137,7 +137,6 @@ func SetupMultiTenant( ImagePullPolicy: cfg.K8s.ImagePullPolicy, ServiceAccount: cfg.K8s.ServiceAccount, MemoryBudget: int64(memBudget), - SharedWarmActivation: 
cfg.K8s.SharedWarmWorkers, } router, err := NewOrgRouter(store, baseCfg, cfg, srv) diff --git a/controlplane/org_reserved_pool.go b/controlplane/org_reserved_pool.go index b18202f..84043dc 100644 --- a/controlplane/org_reserved_pool.go +++ b/controlplane/org_reserved_pool.go @@ -5,6 +5,7 @@ package controlplane import ( "context" "fmt" + "log/slog" "time" "github.com/posthog/duckgres/controlplane/configstore" @@ -20,7 +21,6 @@ type OrgReservedPool struct { orgID string maxWorkers int leaseDuration time.Duration - sharedWarmWorkers bool resolveOrgConfig func() (*configstore.OrgConfig, error) activateReservedWorker func(context.Context, *ManagedWorker, *configstore.OrgConfig) error } @@ -54,6 +54,9 @@ func (p *OrgReservedPool) AcquireWorker(ctx context.Context) (*ManagedWorker, er if idle := p.findIdleAssignedWorkerLocked(); idle != nil { idle.activeSessions++ + if idle.activeSessions > idle.peakSessions { + idle.peakSessions = idle.activeSessions + } p.shared.mu.Unlock() return idle, nil } @@ -70,16 +73,19 @@ func (p *OrgReservedPool) AcquireWorker(ctx context.Context) (*ManagedWorker, er return nil, err } - if p.sharedWarmWorkers { - if err := p.activateWorkerForOrg(ctx, worker); err != nil { - p.shared.RetireWorker(worker.ID) - return nil, err - } + if err := p.activateWorkerForOrg(ctx, worker); err != nil { + slog.Warn("Worker activation failed.", "worker", worker.ID, "org", p.orgID, "error", err) + observeActivationFailure() + p.shared.retireWorkerWithReason(worker.ID, RetireReasonActivationFailure) + return nil, err } p.shared.mu.Lock() if owned := p.workerBelongsToOrgLocked(worker); owned { worker.activeSessions++ + if worker.activeSessions > worker.peakSessions { + worker.peakSessions = worker.activeSessions + } p.shared.mu.Unlock() return worker, nil } @@ -89,6 +95,9 @@ func (p *OrgReservedPool) AcquireWorker(ctx context.Context) (*ManagedWorker, er if w := p.leastLoadedAssignedWorkerLocked(); w != nil { w.activeSessions++ + if w.activeSessions > w.peakSessions { + w.peakSessions = w.activeSessions + } p.shared.mu.Unlock() return w, nil } @@ -143,12 +152,6 @@ func (p *OrgReservedPool) SetMaxWorkers(n int) { p.maxWorkers = n } -func (p *OrgReservedPool) EnableSharedWarmActivation(enabled bool) { - p.shared.mu.Lock() - defer p.shared.mu.Unlock() - p.sharedWarmWorkers = enabled -} - func (p *OrgReservedPool) ShutdownAll() { p.shared.mu.RLock() workers := make([]int, 0, len(p.shared.workers)) @@ -160,7 +163,7 @@ func (p *OrgReservedPool) ShutdownAll() { p.shared.mu.RUnlock() for _, id := range workers { - p.shared.RetireWorker(id) + p.shared.retireWorkerWithReason(id, RetireReasonShutdown) } } @@ -220,9 +223,6 @@ func (p *OrgReservedPool) workerReadyForSchedulingLocked(w *ManagedWorker) bool if !p.workerBelongsToOrgLocked(w) { return false } - if !p.sharedWarmWorkers { - return true - } return w.SharedState().NormalizedLifecycle() == WorkerLifecycleHot } @@ -239,6 +239,7 @@ func (p *OrgReservedPool) activateWorkerForOrg(ctx context.Context, worker *Mana p.shared.mu.Unlock() return err } + observeWarmPoolLifecycleGauges(p.shared.workers) } p.shared.mu.Unlock() @@ -259,8 +260,18 @@ func (p *OrgReservedPool) activateWorkerForOrg(ctx context.Context, worker *Mana if err != nil { return err } - return worker.SetSharedState(nextState) + if err := worker.SetSharedState(nextState); err != nil { + return err + } + if !worker.reservedAt.IsZero() { + observeActivationDuration(time.Since(worker.reservedAt)) + } + observeWarmPoolLifecycleGauges(p.shared.workers) + return nil case WorkerLifecycleHot: 
+ if !worker.reservedAt.IsZero() { + observeActivationDuration(time.Since(worker.reservedAt)) + } return nil default: return fmt.Errorf("worker %d finished activation in unexpected lifecycle %q", worker.ID, worker.SharedState().NormalizedLifecycle()) @@ -268,9 +279,6 @@ func (p *OrgReservedPool) activateWorkerForOrg(ctx context.Context, worker *Mana } func (p *OrgReservedPool) activateReservedWorkerDefault(ctx context.Context, worker *ManagedWorker, org *configstore.OrgConfig) error { - if !p.sharedWarmWorkers { - return nil - } if org == nil { return fmt.Errorf("org config is required for activation") } diff --git a/controlplane/org_reserved_pool_test.go b/controlplane/org_reserved_pool_test.go index c373ecc..9a7ac5d 100644 --- a/controlplane/org_reserved_pool_test.go +++ b/controlplane/org_reserved_pool_test.go @@ -20,6 +20,12 @@ func TestOrgReservedPoolAcquireReservesOrgWorker(t *testing.T) { } pool := NewOrgReservedPool(shared, "analytics", 2) + pool.resolveOrgConfig = func() (*configstore.OrgConfig, error) { + return &configstore.OrgConfig{Name: "analytics"}, nil + } + pool.activateReservedWorker = func(ctx context.Context, worker *ManagedWorker, org *configstore.OrgConfig) error { + return nil + } worker, err := pool.AcquireWorker(context.Background()) if err != nil { t.Fatalf("AcquireWorker: %v", err) @@ -32,8 +38,8 @@ func TestOrgReservedPoolAcquireReservesOrgWorker(t *testing.T) { if state.Assignment == nil || state.Assignment.OrgID != "analytics" { t.Fatalf("expected analytics assignment, got %#v", state.Assignment) } - if state.Lifecycle != WorkerLifecycleReserved { - t.Fatalf("expected reserved lifecycle, got %q", state.Lifecycle) + if state.Lifecycle != WorkerLifecycleHot { + t.Fatalf("expected hot lifecycle after activation, got %q", state.Lifecycle) } } @@ -59,6 +65,12 @@ func TestOrgReservedPoolAcquireSkipsOtherOrgsWorkers(t *testing.T) { } pool := NewOrgReservedPool(shared, "analytics", 2) + pool.resolveOrgConfig = func() (*configstore.OrgConfig, error) { + return &configstore.OrgConfig{Name: "analytics"}, nil + } + pool.activateReservedWorker = func(ctx context.Context, worker *ManagedWorker, org *configstore.OrgConfig) error { + return nil + } worker, err := pool.AcquireWorker(context.Background()) if err != nil { t.Fatalf("AcquireWorker: %v", err) @@ -75,7 +87,7 @@ func TestOrgReservedPoolReleaseWorkerRetiresOnLastSession(t *testing.T) { shared, _ := newTestK8sPool(t, 5) worker := &ManagedWorker{ID: 9, activeSessions: 1, done: make(chan struct{})} if err := worker.SetSharedState(SharedWorkerState{ - Lifecycle: WorkerLifecycleReserved, + Lifecycle: WorkerLifecycleHot, Assignment: &WorkerAssignment{ OrgID: "analytics", LeaseExpiresAt: time.Now().Add(time.Hour), @@ -105,7 +117,6 @@ func TestOrgReservedWorkerPoolAcquireActivatesReservedWorkerWhenEnabledWithOrgCo activated := false pool := NewOrgReservedPool(shared, "analytics", 2) - pool.sharedWarmWorkers = true pool.resolveOrgConfig = func() (*configstore.OrgConfig, error) { return &configstore.OrgConfig{ Name: "analytics", @@ -151,7 +162,6 @@ func TestOrgReservedWorkerPoolAcquireActivatesUsingLatestResolvedOrgConfig(t *te } pool := NewOrgReservedPool(shared, "analytics", 2) - pool.sharedWarmWorkers = true pool.resolveOrgConfig = func() (*configstore.OrgConfig, error) { return currentOrg, nil } diff --git a/controlplane/org_router.go b/controlplane/org_router.go index 04190bf..44e6ede 100644 --- a/controlplane/org_router.go +++ b/controlplane/org_router.go @@ -105,8 +105,6 @@ func (tr *OrgRouter) createOrgStack(tc 
*configstore.OrgConfig) (*OrgStack, error } return org, nil } - pool.EnableSharedWarmActivation(tr.globalCfg.K8s.SharedWarmWorkers) - rebalancer := NewMemoryRebalancer(uint64(memoryBudget), 0, nil, tr.globalCfg.MemoryRebalance) sessions := NewSessionManager(pool, rebalancer) rebalancer.SetSessionLister(sessions) @@ -216,9 +214,6 @@ func (tr *OrgRouter) HandleConfigChange(old, new *configstore.Snapshot) { } stack.Pool.SetMaxWorkers(maxWorkers) } - if reserved, ok := stack.Pool.(*OrgReservedPool); ok { - reserved.EnableSharedWarmActivation(tr.globalCfg.K8s.SharedWarmWorkers) - } } tr.mu.Unlock() } diff --git a/controlplane/warm_pool_metrics.go b/controlplane/warm_pool_metrics.go new file mode 100644 index 0000000..bb439fa --- /dev/null +++ b/controlplane/warm_pool_metrics.go @@ -0,0 +1,120 @@ +//go:build kubernetes + +package controlplane + +import ( + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +// --- Warm-worker lifecycle gauges --- + +var warmWorkersGauge = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "duckgres_warm_workers", + Help: "Number of idle (unassigned) warm workers in the shared pool", +}) + +var reservedWorkersGauge = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "duckgres_reserved_workers", + Help: "Number of workers reserved for an org but not yet activated", +}) + +var activatingWorkersGauge = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "duckgres_activating_workers", + Help: "Number of workers currently in the activating state", +}) + +var hotWorkersGauge = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "duckgres_hot_workers", + Help: "Number of activated, tenant-bound workers serving sessions", +}) + +var drainingWorkersGauge = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "duckgres_draining_workers", + Help: "Number of workers currently draining sessions before retirement", +}) + +// --- Activation latency and failure counters --- + +var activationDurationHistogram = promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "duckgres_activation_duration_seconds", + Help: "Time from worker reservation to the worker becoming hot", + Buckets: []float64{0.1, 0.25, 0.5, 1, 2.5, 5, 10, 30, 60}, +}) + +var activationFailuresCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "duckgres_activation_failures_total", + Help: "Total number of failed worker activations", +}) + +// --- Retirement metrics --- + +var workerRetirementsCounter = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "duckgres_worker_retirements_total", + Help: "Total number of worker retirements", +}, []string{"reason"}) + +var hotWorkerSessionsHistogram = promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "duckgres_hot_worker_sessions_total", + Help: "Number of sessions served per hot worker at retirement time", + Buckets: []float64{0, 1, 2, 5, 10, 25, 50, 100}, +}) + +// Retirement reason constants. +const ( + RetireReasonNormal = "normal" + RetireReasonActivationFailure = "activation_failure" + RetireReasonLeaseExpiry = "lease_expiry" + RetireReasonCrash = "crash" + RetireReasonShutdown = "shutdown" + RetireReasonIdleTimeout = "idle_timeout" + RetireReasonStuckActivating = "stuck_activating" +) + +// observeWarmPoolLifecycleGauges recalculates all lifecycle gauges from the +// current worker map. Must be called with p.mu held (at least RLock). 
+func observeWarmPoolLifecycleGauges(workers map[int]*ManagedWorker) { + var idle, reserved, activating, hot, draining int + for _, w := range workers { + select { + case <-w.done: + continue + default: + } + switch w.SharedState().NormalizedLifecycle() { + case WorkerLifecycleIdle: + idle++ + case WorkerLifecycleReserved: + reserved++ + case WorkerLifecycleActivating: + activating++ + case WorkerLifecycleHot: + hot++ + case WorkerLifecycleDraining: + draining++ + } + } + warmWorkersGauge.Set(float64(idle)) + reservedWorkersGauge.Set(float64(reserved)) + activatingWorkersGauge.Set(float64(activating)) + hotWorkersGauge.Set(float64(hot)) + drainingWorkersGauge.Set(float64(draining)) +} + +func observeActivationDuration(d time.Duration) { + activationDurationHistogram.Observe(d.Seconds()) +} + +func observeActivationFailure() { + activationFailuresCounter.Inc() +} + +func observeWorkerRetirement(reason string) { + workerRetirementsCounter.WithLabelValues(reason).Inc() +} + +func observeHotWorkerSessions(sessionCount int) { + hotWorkerSessionsHistogram.Observe(float64(sessionCount)) +} diff --git a/controlplane/warm_pool_metrics_test.go b/controlplane/warm_pool_metrics_test.go new file mode 100644 index 0000000..5412880 --- /dev/null +++ b/controlplane/warm_pool_metrics_test.go @@ -0,0 +1,308 @@ +//go:build kubernetes + +package controlplane + +import ( + "context" + "testing" + "time" + + "github.com/posthog/duckgres/controlplane/configstore" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" +) + +// resetMetrics resets all warm pool counters/histograms for test isolation. +func resetMetrics() { + workerRetirementsCounter.Reset() +} + +func TestObserveWarmPoolLifecycleGauges(t *testing.T) { + workers := map[int]*ManagedWorker{ + 1: makeTestWorker(WorkerLifecycleIdle, nil), + 2: makeTestWorker(WorkerLifecycleIdle, nil), + 3: makeTestWorker(WorkerLifecycleReserved, &WorkerAssignment{OrgID: "org-1", LeaseExpiresAt: time.Now().Add(time.Hour)}), + 4: makeTestWorker(WorkerLifecycleActivating, &WorkerAssignment{OrgID: "org-1", LeaseExpiresAt: time.Now().Add(time.Hour)}), + 5: makeTestWorker(WorkerLifecycleHot, &WorkerAssignment{OrgID: "org-2", LeaseExpiresAt: time.Now().Add(time.Hour)}), + 6: makeTestWorker(WorkerLifecycleHot, &WorkerAssignment{OrgID: "org-2", LeaseExpiresAt: time.Now().Add(time.Hour)}), + 7: makeTestWorker(WorkerLifecycleDraining, &WorkerAssignment{OrgID: "org-3", LeaseExpiresAt: time.Now().Add(time.Hour)}), + } + + observeWarmPoolLifecycleGauges(workers) + + assertGaugeValue(t, warmWorkersGauge, 2) + assertGaugeValue(t, reservedWorkersGauge, 1) + assertGaugeValue(t, activatingWorkersGauge, 1) + assertGaugeValue(t, hotWorkersGauge, 2) + assertGaugeValue(t, drainingWorkersGauge, 1) +} + +func TestObserveWarmPoolLifecycleGauges_SkipsDeadWorkers(t *testing.T) { + dead := makeTestWorker(WorkerLifecycleIdle, nil) + close(dead.done) // mark as dead + + workers := map[int]*ManagedWorker{ + 1: makeTestWorker(WorkerLifecycleIdle, nil), + 2: dead, + } + + observeWarmPoolLifecycleGauges(workers) + + assertGaugeValue(t, warmWorkersGauge, 1) +} + +func TestMarkWorkerRetiredLocked_RecordsRetirementMetric(t *testing.T) { + resetMetrics() + pool, _ := newTestK8sPool(t, 5) + + w := makeTestWorker(WorkerLifecycleIdle, nil) + pool.workers[1] = w + + pool.markWorkerRetiredLocked(w, RetireReasonIdleTimeout) + + if w.SharedState().NormalizedLifecycle() != WorkerLifecycleRetired { + t.Fatalf("expected retired, got %s", w.SharedState().NormalizedLifecycle()) 
+ } + + val := counterLabelValue(workerRetirementsCounter, RetireReasonIdleTimeout) + if val != 1 { + t.Fatalf("expected 1 retirement with reason idle_timeout, got %v", val) + } +} + +func TestMarkWorkerRetiredLocked_RecordsHotWorkerSessions(t *testing.T) { + resetMetrics() + pool, _ := newTestK8sPool(t, 5) + + w := makeTestWorker(WorkerLifecycleHot, &WorkerAssignment{OrgID: "org-1", LeaseExpiresAt: time.Now().Add(time.Hour)}) + w.peakSessions = 5 + pool.workers[1] = w + + pool.markWorkerRetiredLocked(w, RetireReasonNormal) + + if w.SharedState().NormalizedLifecycle() != WorkerLifecycleRetired { + t.Fatalf("expected retired, got %s", w.SharedState().NormalizedLifecycle()) + } +} + +func TestReservedAtTracking(t *testing.T) { + pool, _ := newTestK8sPool(t, 5) + + w := makeTestWorker(WorkerLifecycleIdle, nil) + pool.workers[1] = w + + before := time.Now() + _, err := pool.ReserveSharedWorker(context.Background(), &WorkerAssignment{ + OrgID: "org-1", + LeaseExpiresAt: time.Now().Add(time.Hour), + }) + if err != nil { + t.Fatalf("ReserveSharedWorker failed: %v", err) + } + after := time.Now() + + if w.reservedAt.Before(before) || w.reservedAt.After(after) { + t.Fatalf("reservedAt %v not between %v and %v", w.reservedAt, before, after) + } +} + +func TestPeakSessionsTracking(t *testing.T) { + pool, _ := newTestK8sPool(t, 5) + + w := makeTestWorker(WorkerLifecycleIdle, nil) + pool.workers[1] = w + + // Simulate session assignments + pool.mu.Lock() + w.activeSessions = 3 + if w.activeSessions > w.peakSessions { + w.peakSessions = w.activeSessions + } + pool.mu.Unlock() + + pool.mu.Lock() + w.activeSessions = 1 + pool.mu.Unlock() + + if w.peakSessions != 3 { + t.Fatalf("expected peakSessions=3, got %d", w.peakSessions) + } +} + +func TestSpawnMinWorkersUpdatesWarmWorkersGauge(t *testing.T) { + observeWarmPoolLifecycleGauges(map[int]*ManagedWorker{}) + pool, _ := newTestK8sPool(t, 5) + + pool.spawnWarmWorkerFunc = func(ctx context.Context, id int) error { + pool.mu.Lock() + pool.workers[id] = makeTestWorker(WorkerLifecycleIdle, nil) + pool.mu.Unlock() + return nil + } + + if err := pool.SpawnMinWorkers(1); err != nil { + t.Fatalf("SpawnMinWorkers failed: %v", err) + } + + assertGaugeValue(t, warmWorkersGauge, 1) +} + +func TestActivateWorkerForOrgUpdatesActivatingGauge(t *testing.T) { + observeWarmPoolLifecycleGauges(map[int]*ManagedWorker{}) + pool, _ := newTestK8sPool(t, 5) + worker := makeTestWorker(WorkerLifecycleReserved, &WorkerAssignment{ + OrgID: "org-1", + LeaseExpiresAt: time.Now().Add(time.Hour), + }) + worker.reservedAt = time.Now() + pool.workers[1] = worker + observeWarmPoolLifecycleGauges(pool.workers) + + orgPool := NewOrgReservedPool(pool, "org-1", 1) + orgPool.resolveOrgConfig = func() (*configstore.OrgConfig, error) { + return &configstore.OrgConfig{Name: "org-1"}, nil + } + orgPool.activateReservedWorker = func(ctx context.Context, worker *ManagedWorker, org *configstore.OrgConfig) error { + assertGaugeValue(t, reservedWorkersGauge, 0) + assertGaugeValue(t, activatingWorkersGauge, 1) + return nil + } + + if err := orgPool.activateWorkerForOrg(context.Background(), worker); err != nil { + t.Fatalf("activateWorkerForOrg failed: %v", err) + } +} + +func TestActivateWorkerForOrgRecordsActivationDurationWhenWorkerAlreadyHot(t *testing.T) { + observeWarmPoolLifecycleGauges(map[int]*ManagedWorker{}) + pool, _ := newTestK8sPool(t, 5) + worker := makeTestWorker(WorkerLifecycleReserved, &WorkerAssignment{ + OrgID: "org-1", + LeaseExpiresAt: time.Now().Add(time.Hour), + }) + 
worker.reservedAt = time.Now().Add(-2 * time.Second) + pool.workers[1] = worker + + orgPool := NewOrgReservedPool(pool, "org-1", 1) + orgPool.resolveOrgConfig = func() (*configstore.OrgConfig, error) { + return &configstore.OrgConfig{Name: "org-1"}, nil + } + orgPool.activateReservedWorker = func(ctx context.Context, worker *ManagedWorker, org *configstore.OrgConfig) error { + nextState, err := worker.SharedState().Transition(WorkerLifecycleHot, nil) + if err != nil { + return err + } + return worker.SetSharedState(nextState) + } + + before := metricHistogramCount(t, "duckgres_activation_duration_seconds") + if err := orgPool.activateWorkerForOrg(context.Background(), worker); err != nil { + t.Fatalf("activateWorkerForOrg failed: %v", err) + } + after := metricHistogramCount(t, "duckgres_activation_duration_seconds") + + if after-before != 1 { + t.Fatalf("expected activation duration histogram sample count delta 1, got %d", after-before) + } +} + +func TestReapStuckActivatingWorkers(t *testing.T) { + pool, _ := newTestK8sPool(t, 5) + pool.minWorkers = 2 + pool.activatingTimeout = 50 * time.Millisecond + + spawnedIDs := make(chan int, 10) + pool.spawnWarmWorkerBackgroundFunc = func(id int) { + spawnedIDs <- id + } + + // One idle worker (healthy), one stuck activating worker + idle := makeTestWorker(WorkerLifecycleIdle, nil) + pool.workers[1] = idle + + stuck := makeTestWorker(WorkerLifecycleActivating, &WorkerAssignment{ + OrgID: "org-1", + LeaseExpiresAt: time.Now().Add(time.Hour), + }) + stuck.reservedAt = time.Now().Add(-time.Minute) // reserved 1 minute ago + pool.workers[2] = stuck + + pool.reapStuckActivatingWorkers() + + // Stuck worker should be removed + pool.mu.RLock() + _, stuckExists := pool.workers[2] + _, idleExists := pool.workers[1] + pool.mu.RUnlock() + + if stuckExists { + t.Fatal("stuck worker should have been reaped") + } + if !idleExists { + t.Fatal("idle worker should not have been reaped") + } + + // Should have spawned a replacement since minWorkers=2 and only 1 remains + select { + case <-spawnedIDs: + // good + case <-time.After(time.Second): + t.Fatal("expected replacement worker to be spawned") + } +} + +func TestReapStuckActivatingWorkers_RecentlyReservedNotReaped(t *testing.T) { + pool, _ := newTestK8sPool(t, 5) + pool.activatingTimeout = 2 * time.Minute + + w := makeTestWorker(WorkerLifecycleActivating, &WorkerAssignment{ + OrgID: "org-1", + LeaseExpiresAt: time.Now().Add(time.Hour), + }) + w.reservedAt = time.Now() // just reserved + pool.workers[1] = w + + pool.reapStuckActivatingWorkers() + + pool.mu.RLock() + _, exists := pool.workers[1] + pool.mu.RUnlock() + + if !exists { + t.Fatal("recently reserved worker should not be reaped") + } +} + +// --- Helpers --- + +func makeTestWorker(lifecycle WorkerLifecycleState, assignment *WorkerAssignment) *ManagedWorker { + w := &ManagedWorker{ + done: make(chan struct{}), + } + state := SharedWorkerState{Lifecycle: lifecycle, Assignment: assignment} + _ = w.SetSharedState(state) + return w +} + +func assertGaugeValue(t *testing.T, gauge prometheus.Gauge, expected float64) { + t.Helper() + m := &dto.Metric{} + if err := gauge.Write(m); err != nil { + t.Fatalf("failed to read gauge: %v", err) + } + if got := m.GetGauge().GetValue(); got != expected { + t.Fatalf("expected gauge value %v, got %v", expected, got) + } +} + +func counterLabelValue(cv *prometheus.CounterVec, label string) float64 { + m := &dto.Metric{} + counter, err := cv.GetMetricWithLabelValues(label) + if err != nil { + return 0 + } + if err := 
counter.Write(m); err != nil { + return 0 + } + return m.GetCounter().GetValue() +} diff --git a/controlplane/worker_mgr.go b/controlplane/worker_mgr.go index 4acb25e..c941c8f 100644 --- a/controlplane/worker_mgr.go +++ b/controlplane/worker_mgr.go @@ -38,6 +38,8 @@ type ManagedWorker struct { activeSessions int // Number of sessions currently assigned to this worker lastUsed time.Time // Last time a session was destroyed on this worker sharedState SharedWorkerState + reservedAt time.Time //nolint:unused // only set in kubernetes warm-pool reservation path + peakSessions int // High-water mark of concurrent sessions (for retirement metrics) } // SharedState returns the additive shared warm-worker lifecycle metadata for @@ -541,6 +543,9 @@ func (p *FlightWorkerPool) AcquireWorker(ctx context.Context) (*ManagedWorker, e idle := p.findIdleWorkerLocked() if idle != nil { idle.activeSessions++ + if idle.activeSessions > idle.peakSessions { + idle.peakSessions = idle.activeSessions + } p.mu.Unlock() return idle, nil } @@ -570,6 +575,9 @@ func (p *FlightWorkerPool) AcquireWorker(ctx context.Context) (*ManagedWorker, e p.mu.Lock() w.activeSessions++ + if w.activeSessions > w.peakSessions { + w.peakSessions = w.activeSessions + } p.mu.Unlock() return w, nil } @@ -578,6 +586,9 @@ func (p *FlightWorkerPool) AcquireWorker(ctx context.Context) (*ManagedWorker, e w := p.leastLoadedWorkerLocked() if w != nil { w.activeSessions++ + if w.activeSessions > w.peakSessions { + w.peakSessions = w.activeSessions + } p.mu.Unlock() return w, nil } @@ -614,6 +625,9 @@ func (p *FlightWorkerPool) AcquireWorker(ctx context.Context) (*ManagedWorker, e p.mu.Lock() w.activeSessions++ + if w.activeSessions > w.peakSessions { + w.peakSessions = w.activeSessions + } p.mu.Unlock() return w, nil } diff --git a/controlplane/worker_pool.go b/controlplane/worker_pool.go index 0f09c69..76a9f69 100644 --- a/controlplane/worker_pool.go +++ b/controlplane/worker_pool.go @@ -60,7 +60,6 @@ type K8sWorkerPoolConfig struct { MemoryBudget int64 // Total memory budget in bytes; used to derive per-worker resource limits OrgID string // Org ID for pod labels (multi-tenant mode) WorkerIDGenerator func() int // Shared ID generator across orgs (nil = internal counter) - SharedWarmActivation bool } // K8sPoolFactory creates a K8sWorkerPool. Registered at init time by the diff --git a/docs/runbooks/drain-hot-workers.md b/docs/runbooks/drain-hot-workers.md new file mode 100644 index 0000000..68460a2 --- /dev/null +++ b/docs/runbooks/drain-hot-workers.md @@ -0,0 +1,33 @@ +# Runbook: Drain Hot Workers + +## When to use + +- Rolling out a new worker image or DuckDB version +- Reducing capacity for a specific org +- Investigating a misbehaving worker that is still serving sessions + +## Metrics to watch + +| Metric | Alert threshold | What it means | +|--------|----------------|---------------| +| `duckgres_hot_workers` | Dropping faster than expected | Workers are retiring before replacements come up | +| `duckgres_draining_workers` | > 0 sustained | Workers are stuck draining (sessions not ending) | +| `duckgres_warm_workers` | < `minWorkers` | Replacement pool is depleted | + +## Procedure + +1. **Identify the worker(s) to drain.** Use pod labels and the control-plane logs to identify the worker pod serving the sessions you want to replace. + +2. **Delete the worker pod.** The control plane treats the worker as retired/crashed and replaces it if the pool is below `minWorkers`. + +3. 
**Watch replacement capacity.** Monitor `duckgres_warm_workers` and `duckgres_hot_workers` while the replacement comes up. + +4. **Verify replacement capacity.** After the worker retires, confirm that `duckgres_warm_workers` returns to the expected level. The pool auto-replenishes when idle count drops below `minWorkers`. + +## Rollback + +If draining causes capacity issues: + +1. Check `duckgres_warm_workers` — if it's 0, the pool is depleted and new sessions will block. +2. Increase `minWorkers` temporarily to force more pre-warmed capacity. +3. If a drain is stuck (worker stays in draining > 5 minutes), force-retire it by deleting the pod. The control plane will detect the crash and replenish. diff --git a/docs/runbooks/replenish-capacity.md b/docs/runbooks/replenish-capacity.md new file mode 100644 index 0000000..ebdb7ba --- /dev/null +++ b/docs/runbooks/replenish-capacity.md @@ -0,0 +1,46 @@ +# Runbook: Replenish Warm Pool Capacity + +## When to use + +- `duckgres_warm_workers` is 0 and sessions are queuing +- After a mass retirement event (rolling update, crash storm) +- Scaling up for anticipated traffic + +## Background + +The shared warm pool maintains `minWorkers` idle workers at all times. When a worker is reserved, retired, or crashes, the pool automatically spawns a replacement if the idle count drops below `minWorkers`. If the Kubernetes cluster cannot schedule new pods (resource pressure, node failures), the pool can become depleted. + +## Metrics to watch + +| Metric | Alert threshold | What it means | +|--------|----------------|---------------| +| `duckgres_warm_workers` | < 1 for > 30s | No idle capacity, new sessions must wait for a spawn | +| `duckgres_hot_workers` | Near `maxWorkers` | Pool is at capacity, may need to scale `maxWorkers` | +| `duckgres_worker_retirements_total{reason="crash"}` | Spike | Workers are crashing, replacements may also crash | + +## Procedure + +1. **Check why capacity is low.** Look at retirement reasons: + - `crash` — workers are dying, check pod logs and OOM events + - `idle_timeout` — workers are being reaped, increase `minWorkers` or decrease `idleTimeout` + - `stuck_activating` — activation is broken, see [stuck-activating-workers](stuck-activating-workers.md) + +2. **Check Kubernetes scheduling.** If pods are Pending: + ```bash + kubectl get pods -l app=duckgres-worker --field-selector status.phase=Pending + kubectl describe pod + ``` + Common causes: insufficient memory/CPU, node pool at max size, resource quotas. + +3. **Scale up if needed.** + - Increase `minWorkers` in the control plane config to pre-warm more workers + - Increase `maxWorkers` if the pool is legitimately at capacity + - Scale the Kubernetes node pool if scheduling is the bottleneck + +4. **Verify recovery.** After scaling, confirm: + - `duckgres_warm_workers` returns to `minWorkers` + - New sessions are no longer queuing (check `duckgres_worker_acquire_duration_seconds` if available) + +## Emergency: Force-spawn workers + +If the auto-replenishment loop is stuck, restart the control plane pod. On startup it calls `SpawnMinWorkers(minWorkers)` which synchronously creates the minimum pool. 
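For reference, the auto-replenishment described above reduces to a capacity check each time a worker is reserved, retired, or crashes. The sketch below is an assumption of how that check behaves; the authoritative logic is `shouldReplenishWarmCapacityLocked` in `controlplane/k8s_pool.go`, and the field names and cap handling here are illustrative.

```go
package main

import "fmt"

// poolCounts is a hypothetical snapshot of the shared pool; the real check may
// weigh additional state (in-flight spawns per org, shutdown, etc.).
type poolCounts struct {
	idle, spawning, total, minWorkers, maxWorkers int
}

// shouldReplenish assumes the pool tops up warm capacity whenever the number of
// idle-or-inflight workers falls below minWorkers, without exceeding the global cap.
func shouldReplenish(c poolCounts) bool {
	if c.maxWorkers > 0 && c.total+c.spawning >= c.maxWorkers {
		return false
	}
	return c.idle+c.spawning < c.minWorkers
}

func main() {
	fmt.Println(shouldReplenish(poolCounts{idle: 0, spawning: 0, total: 3, minWorkers: 2, maxWorkers: 10})) // true: below minWorkers
	fmt.Println(shouldReplenish(poolCounts{idle: 2, spawning: 0, total: 3, minWorkers: 2, maxWorkers: 10})) // false: warm target met
}
```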
diff --git a/docs/runbooks/stuck-activating-workers.md b/docs/runbooks/stuck-activating-workers.md new file mode 100644 index 0000000..600d133 --- /dev/null +++ b/docs/runbooks/stuck-activating-workers.md @@ -0,0 +1,44 @@ +# Runbook: Recover Stuck Activating Workers + +## When to use + +- `duckgres_activating_workers` gauge is non-zero for more than 2 minutes +- `duckgres_activation_failures_total` is increasing +- Sessions are timing out because no hot workers are available + +## Background + +When an org requests a session and no hot worker is available, the control plane reserves an idle warm worker and activates it (loads the org's DuckLake catalog, configures tenant settings, etc.). If activation fails or hangs, the worker stays in `reserved` or `activating` state indefinitely. + +The automatic stuck-worker reaper runs every minute and retires workers that have been in `reserved` or `activating` state for longer than 2 minutes. Reaped workers are replaced automatically if the pool is below `minWorkers`. + +## Metrics to watch + +| Metric | What it means | +|--------|---------------| +| `duckgres_activating_workers` | Workers currently activating (should be 0 or briefly 1-2) | +| `duckgres_reserved_workers` | Workers reserved but not yet activating | +| `duckgres_activation_failures_total` | Total failed activations; use control-plane logs for failure details | +| `duckgres_worker_retirements_total{reason="stuck_activating"}` | How many workers have been auto-reaped | + +## Procedure + +1. **Check if the reaper is working.** Look for `duckgres_worker_retirements_total{reason="stuck_activating"}` increasing. If it is, the system is self-healing — focus on why activations are failing. + +2. **Diagnose activation failures.** Check control-plane logs for activation errors. Common causes: + - Org config resolver failing (missing config in configstore) + - DuckLake catalog unreachable (S3/Postgres connectivity) + - Worker pod OOMKilled during activation + +3. **If the reaper is not running** (e.g., idleReaper goroutine crashed), manually delete the stuck pods: + ```bash + kubectl get pods -l app=duckgres-worker --field-selector status.phase=Running | grep activating + kubectl delete pod --grace-period=10 + ``` + +4. **Verify recovery.** After stuck workers are cleaned up, check that `duckgres_warm_workers` replenishes to `minWorkers`. + +## Prevention + +- Ensure org configs are pre-validated before allowing session creation +- Monitor `duckgres_activation_duration_seconds` p99 to detect slow activations before they time out diff --git a/duckdbservice/flight_handler_test.go b/duckdbservice/flight_handler_test.go index 3903f24..5bf1bc1 100644 --- a/duckdbservice/flight_handler_test.go +++ b/duckdbservice/flight_handler_test.go @@ -107,7 +107,7 @@ func TestHealthCheckReturnsImmediatelyAfterWarmup(t *testing.T) { } } -func TestCreateSessionRequiresActivationForSharedWarmWorkers(t *testing.T) { +func TestCreateSessionRequiresActivationForSharedWarmMode(t *testing.T) { pool := &SessionPool{ sessions: make(map[string]*Session), stopRefresh: make(map[string]func()), diff --git a/duckgres.example.yaml b/duckgres.example.yaml index 1090eac..8b20bcd 100644 --- a/duckgres.example.yaml +++ b/duckgres.example.yaml @@ -100,15 +100,10 @@ ducklake: # k8s: # # Global cap for shared K8s workers. 0 auto-derives from memory_budget. # max_workers: 0 -# # Neutral shared warm-worker target. Separate from per-team limits and -# # process.min_workers. 0 disables shared prewarming. 
+# # Neutral shared warm-worker target. Separate from per-org limits and +# # process.min_workers. 0 disables shared prewarming. When > 0, the pool +# # maintains this many idle workers and activates them on-demand per org. # shared_warm_target: 0 -# # Enable the reserve -> activate -> hot shared warm-worker path. -# # Default false for rollout safety. -# shared_warm_workers: false -# # Enable reserve -> activate -> hot lifecycle for shared warm workers. -# # Default: false -# shared_warm_workers: false # Process isolation (default: true) # Each client connection spawns a separate OS process, so a DuckDB crash diff --git a/justfile b/justfile index d2a96ad..3f0a3ff 100644 --- a/justfile +++ b/justfile @@ -135,9 +135,20 @@ multitenant-config-store-down-kind: multitenant-seed-local: docker exec -i duckgres-config-store psql -v ON_ERROR_STOP=1 -U duckgres -d duckgres_config < k8s/local-config-store.seed.sql -# Seed a default local tenant/user into the config store for the kind-backed K8s flow +# Seed a default local tenant/user into the config store for the kind-backed K8s flow. +# Retries up to 30 seconds waiting for the control plane to finish migrating the schema. [group('dev')] multitenant-seed-kind: + #!/usr/bin/env bash + set -euo pipefail + for i in $(seq 1 30); do + if docker exec -i duckgres-config-store psql -v ON_ERROR_STOP=1 -U duckgres -d duckgres_config < k8s/kind/config-store.seed.sql 2>/dev/null; then + exit 0 + fi + echo "Waiting for config store schema (attempt $i/30)..." + sleep 1 + done + echo "Config store schema not ready after 30s, final attempt:" docker exec -i duckgres-config-store psql -v ON_ERROR_STOP=1 -U duckgres -d duckgres_config < k8s/kind/config-store.seed.sql # Deploy the local multi-tenant control plane to the optional OrbStack Kubernetes workflow diff --git a/k8s/README.md b/k8s/README.md index c877a24..bb91be1 100644 --- a/k8s/README.md +++ b/k8s/README.md @@ -67,7 +67,6 @@ Key flags for Kubernetes multitenant mode: | `--k8s-worker-secret` | `DUCKGRES_K8S_WORKER_SECRET` | K8s Secret name for bearer token | | `--k8s-worker-configmap` | `DUCKGRES_K8S_WORKER_CONFIGMAP` | ConfigMap name for worker config | | `--k8s-shared-warm-target` | `DUCKGRES_K8S_SHARED_WARM_TARGET` | Neutral shared warm-worker target for multi-tenant K8s mode (`0` disables prewarm) | -| `--k8s-shared-warm-workers` | `DUCKGRES_K8S_SHARED_WARM_WORKERS` | Enable reserve -> activate -> hot lifecycle for shared warm workers | The bearer token secret is used to authenticate gRPC connections between the control plane and workers. If the secret exists but is empty, the CP auto-generates a random token and populates it. 
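The token auto-generation step is not shown in this diff; as a rough illustration only, a control plane could mint a bearer token along the lines of the sketch below. The length, encoding, and the subsequent Secret write are assumptions, not the actual duckgres code.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// newBearerToken is an illustrative sketch of minting a worker bearer token.
// The real control-plane generation and Secret population may differ.
func newBearerToken() (string, error) {
	buf := make([]byte, 32)
	if _, err := rand.Read(buf); err != nil {
		return "", err
	}
	return hex.EncodeToString(buf), nil
}

func main() {
	tok, err := newBearerToken()
	if err != nil {
		panic(err)
	}
	fmt.Println("generated token:", tok)
}
```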
diff --git a/k8s/control-plane-deployment.yaml b/k8s/control-plane-deployment.yaml index aa0c3c0..c3bad17 100644 --- a/k8s/control-plane-deployment.yaml +++ b/k8s/control-plane-deployment.yaml @@ -43,8 +43,6 @@ spec: - "duckgres-config" - "--k8s-shared-warm-target" - "1" - - "--k8s-shared-warm-workers" - - "true" - "--cert" - "/certs/server.crt" - "--key" diff --git a/k8s/control-plane-multitenant-local.yaml b/k8s/control-plane-multitenant-local.yaml index 76d98d8..91d51e8 100644 --- a/k8s/control-plane-multitenant-local.yaml +++ b/k8s/control-plane-multitenant-local.yaml @@ -42,8 +42,6 @@ spec: - "duckgres-config" - "--k8s-shared-warm-target" - "1" - - "--k8s-shared-warm-workers" - - "true" - "--cert" - "/certs/server.crt" - "--key" diff --git a/k8s/kind/control-plane.yaml b/k8s/kind/control-plane.yaml index 093d71f..6a338a6 100644 --- a/k8s/kind/control-plane.yaml +++ b/k8s/kind/control-plane.yaml @@ -42,7 +42,6 @@ spec: - "duckgres-config" - "--k8s-shared-warm-target" - "1" - - "--k8s-shared-warm-workers" - "--cert" - "/certs/server.crt" - "--key" diff --git a/main.go b/main.go index fc38e66..a5ca480 100644 --- a/main.go +++ b/main.go @@ -71,7 +71,6 @@ type K8sFileConfig struct { WorkerServiceAccount string `yaml:"worker_service_account"` MaxWorkers int `yaml:"max_workers"` SharedWarmTarget int `yaml:"shared_warm_target"` - SharedWarmWorkers bool `yaml:"shared_warm_workers"` } type QueryLogFileConfig struct { @@ -240,7 +239,6 @@ func main() { k8sWorkerServiceAccount := flag.String("k8s-worker-service-account", "", "ServiceAccount name for K8s worker pods (env: DUCKGRES_K8S_WORKER_SERVICE_ACCOUNT)") k8sMaxWorkers := flag.Int("k8s-max-workers", 0, "Max K8s workers in the shared pool, 0=auto-derived (env: DUCKGRES_K8S_MAX_WORKERS)") k8sSharedWarmTarget := flag.Int("k8s-shared-warm-target", 0, "Neutral shared warm-worker target for K8s multi-tenant mode, 0=disabled (env: DUCKGRES_K8S_SHARED_WARM_TARGET)") - k8sSharedWarmWorkers := flag.Bool("k8s-shared-warm-workers", false, "Enable shared warm-worker activation path in K8s multi-tenant mode (env: DUCKGRES_K8S_SHARED_WARM_WORKERS)") // Config store flags (multi-tenant mode) configStore := flag.String("config-store", "", "PostgreSQL connection string for config store (env: DUCKGRES_CONFIG_STORE)") @@ -304,7 +302,6 @@ func main() { fmt.Fprintf(os.Stderr, " DUCKGRES_ADMIN_TOKEN Bearer token for admin API authentication\n") fmt.Fprintf(os.Stderr, " DUCKGRES_K8S_MAX_WORKERS Max K8s workers in the shared pool\n") fmt.Fprintf(os.Stderr, " DUCKGRES_K8S_SHARED_WARM_TARGET Neutral shared warm-worker target for K8s multi-tenant mode\n") - fmt.Fprintf(os.Stderr, " DUCKGRES_K8S_SHARED_WARM_WORKERS Enable shared warm-worker activation path for K8s multi-tenant mode\n") fmt.Fprintf(os.Stderr, " DUCKGRES_LOG_LEVEL Log level: debug, info, warn, error (default: info)\n") fmt.Fprintf(os.Stderr, "\nPrecedence: CLI flags > environment variables > config file > defaults\n") } @@ -417,7 +414,6 @@ func main() { K8sWorkerServiceAccount: *k8sWorkerServiceAccount, K8sMaxWorkers: *k8sMaxWorkers, K8sSharedWarmTarget: *k8sSharedWarmTarget, - K8sSharedWarmWorkers: *k8sSharedWarmWorkers, QueryLog: *queryLog, }, os.Getenv, func(msg string) { slog.Warn(msg) @@ -562,7 +558,6 @@ func main() { ServiceAccount: resolved.K8sWorkerServiceAccount, MaxWorkers: resolved.K8sMaxWorkers, SharedWarmTarget: resolved.K8sSharedWarmTarget, - SharedWarmWorkers: resolved.K8sSharedWarmWorkers, }, } controlplane.RunControlPlane(cpCfg) diff --git a/main_test.go b/main_test.go index 
dbdb5be..d1a5f78 100644 --- a/main_test.go +++ b/main_test.go @@ -418,35 +418,6 @@ func TestResolveEffectiveConfigK8sSharedWarmTarget(t *testing.T) { } } -func TestResolveEffectiveConfigK8sSharedWarmWorkers(t *testing.T) { - fileCfg := &FileConfig{ - K8s: K8sFileConfig{ - SharedWarmWorkers: true, - }, - } - - resolved := resolveEffectiveConfig(fileCfg, configCLIInputs{}, envFromMap(nil), nil) - if !resolved.K8sSharedWarmWorkers { - t.Fatal("expected k8s shared warm workers from file") - } - - env := map[string]string{ - "DUCKGRES_K8S_SHARED_WARM_WORKERS": "false", - } - resolved = resolveEffectiveConfig(fileCfg, configCLIInputs{}, envFromMap(env), nil) - if resolved.K8sSharedWarmWorkers { - t.Fatal("expected env to disable k8s shared warm workers") - } - - resolved = resolveEffectiveConfig(fileCfg, configCLIInputs{ - Set: map[string]bool{"k8s-shared-warm-workers": true}, - K8sSharedWarmWorkers: true, - }, envFromMap(env), nil) - if !resolved.K8sSharedWarmWorkers { - t.Fatal("expected CLI to enable k8s shared warm workers") - } -} - func TestResolveEffectiveConfigInvalidMemoryBudget(t *testing.T) { env := map[string]string{ "DUCKGRES_MEMORY_BUDGET": "lots-of-memory", diff --git a/tests/k8s/k8s_test.go b/tests/k8s/k8s_test.go index cc16115..ae9757f 100644 --- a/tests/k8s/k8s_test.go +++ b/tests/k8s/k8s_test.go @@ -25,11 +25,17 @@ import ( var ( clientset *kubernetes.Clientset namespace string + kubeconfig string pgPort int portFwdCmd *exec.Cmd testEnv k8sTestEnvironment ) +const ( + duckgresServiceTarget = "svc/duckgres" + duckgresServicePort = 5432 +) + func TestMain(m *testing.M) { var err error testEnv, err = loadK8sTestEnvironment(os.Getenv) @@ -48,7 +54,7 @@ func TestMain(m *testing.M) { } // Build kubeconfig clientset - kubeconfig := envOr("DUCKGRES_K8S_TEST_KUBECONFIG", filepath.Join(os.Getenv("HOME"), ".kube", "config")) + kubeconfig = envOr("DUCKGRES_K8S_TEST_KUBECONFIG", filepath.Join(os.Getenv("HOME"), ".kube", "config")) config, err := clientcmd.BuildConfigFromFlags("", kubeconfig) if err != nil { log.Fatalf("Failed to load kubeconfig: %v", err) @@ -62,14 +68,9 @@ func TestMain(m *testing.M) { log.Fatalf("Control-plane pod not ready: %v", err) } - pgPort, portFwdCmd, err = startPortForward(namespace, "svc/duckgres", 5432) - if err != nil { + if err := restartPortForward(); err != nil { log.Fatalf("Failed to start port-forward: %v", err) } - - if err := waitForPort(pgPort, 30*time.Second); err != nil { - log.Fatalf("Port-forward not ready: %v", err) - } if err := waitForDBReady(90 * time.Second); err != nil { log.Fatalf("Database not ready: %v", err) } @@ -77,10 +78,7 @@ func TestMain(m *testing.M) { code := m.Run() // Cleanup port-forward - if portFwdCmd != nil && portFwdCmd.Process != nil { - _ = portFwdCmd.Process.Kill() - _ = portFwdCmd.Wait() - } + closePortForward() // Cleanup K8s resources (unless skip_setup, meaning external management) if !skipSetup { @@ -96,11 +94,8 @@ func TestMain(m *testing.M) { // --- Test Cases --- func TestK8sBasicQuery(t *testing.T) { - db := openDB(t) - defer db.Close() - var result int - if err := db.QueryRow("SELECT 1").Scan(&result); err != nil { + if err := retryScanIntWithReconnect("SELECT 1", 30*time.Second, &result); err != nil { t.Fatalf("SELECT 1 failed: %v", err) } if result != 1 { @@ -110,11 +105,8 @@ func TestK8sBasicQuery(t *testing.T) { func TestK8sWorkerPodCreation(t *testing.T) { // Run a query first to ensure at least one worker is spawned - db := openDB(t) - defer db.Close() - var result int - if err := db.QueryRow("SELECT 
42").Scan(&result); err != nil { + if err := retryScanIntWithReconnect("SELECT 42", 30*time.Second, &result); err != nil { t.Fatalf("query failed: %v", err) } @@ -151,11 +143,8 @@ func TestK8sSharedWarmWorkerActivation(t *testing.T) { t.Fatalf("expected prewarmed worker before first query: %v", err) } - db := openDB(t) - defer db.Close() - var attached int - if err := retryScanInt(db, "SELECT COUNT(*) FROM duckdb_databases() WHERE database_name = 'ducklake'", 90*time.Second, &attached); err != nil { + if err := retryScanIntWithReconnect("SELECT COUNT(*) FROM duckdb_databases() WHERE database_name = 'ducklake'", 90*time.Second, &attached); err != nil { t.Fatalf("shared warm worker activation did not attach ducklake: %v", err) } if attached != 1 { @@ -165,11 +154,7 @@ func TestK8sSharedWarmWorkerActivation(t *testing.T) { func TestK8sWorkerCrashRecovery(t *testing.T) { // Run a query to ensure a worker exists - db := openDB(t) - defer db.Close() - - var result int - if err := db.QueryRow("SELECT 1").Scan(&result); err != nil { + if err := retryQueryWithReconnect("SELECT 1", 30*time.Second); err != nil { t.Fatalf("initial query failed: %v", err) } @@ -195,18 +180,7 @@ func TestK8sWorkerCrashRecovery(t *testing.T) { // Wait for the pod to actually disappear waitForPodGone(t, namespace, workerName, 60*time.Second) - // The old connection may be broken. Open a new one and retry queries. - db.Close() - - // Wait a bit for the CP to detect the crash and be ready for new connections - time.Sleep(5 * time.Second) - - // Open a new connection and verify queries work (CP should spawn a replacement) - db2 := openDB(t) - defer db2.Close() - - err = retryQuery(db2, "SELECT 1", 30*time.Second) - if err != nil { + if err := retryQueryWithReconnect("SELECT 1", 60*time.Second); err != nil { t.Fatalf("query failed after worker crash recovery: %v", err) } } @@ -245,11 +219,7 @@ func TestK8sMultipleConcurrentConnections(t *testing.T) { func TestK8sWorkerSecurityContext(t *testing.T) { // Ensure a worker exists - db := openDB(t) - defer db.Close() - - var result int - if err := db.QueryRow("SELECT 1").Scan(&result); err != nil { + if err := retryQueryWithReconnect("SELECT 1", 30*time.Second); err != nil { t.Fatalf("query failed: %v", err) } @@ -291,14 +261,9 @@ func TestK8sWorkerSecurityContext(t *testing.T) { func TestK8sCPDeletionGarbageCollects(t *testing.T) { // Ensure a worker exists - db := openDB(t) - defer db.Close() - - var result int - if err := db.QueryRow("SELECT 1").Scan(&result); err != nil { + if err := retryQueryWithReconnect("SELECT 1", 30*time.Second); err != nil { t.Fatalf("query failed: %v", err) } - db.Close() // List worker pods workerPods, err := clientset.CoreV1().Pods(namespace).List(context.Background(), metav1.ListOptions{ @@ -351,7 +316,13 @@ func TestK8sCPDeletionGarbageCollects(t *testing.T) { remaining := 0 for _, name := range workerNames { _, err := clientset.CoreV1().Pods(namespace).Get(context.Background(), name, metav1.GetOptions{}) - if err == nil { + switch { + case err == nil: + remaining++ + case isPodGoneError(err): + continue + default: + t.Logf("transient error checking worker pod %s deletion: %v", name, err) remaining++ } } @@ -371,24 +342,12 @@ func TestK8sCPDeletionGarbageCollects(t *testing.T) { } // Restart port-forward since the old CP pod is gone - if portFwdCmd != nil && portFwdCmd.Process != nil { - _ = portFwdCmd.Process.Kill() - _ = portFwdCmd.Wait() - } - var pfErr error - pgPort, portFwdCmd, pfErr = startPortForward(namespace, "svc/duckgres", 5432) - if 
diff --git a/tests/k8s/runtime_helper_test.go b/tests/k8s/runtime_helper_test.go
new file mode 100644
index 0000000..41cbb42
--- /dev/null
+++ b/tests/k8s/runtime_helper_test.go
@@ -0,0 +1,51 @@
+package k8s_test
+
+import (
+	"strings"
+
+	corev1 "k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+)
+
+func isTransientDBError(err error) bool {
+	if err == nil {
+		return false
+	}
+
+	msg := strings.ToLower(err.Error())
+	for _, fragment := range []string{
+		"eof",
+		"connection reset",
+		"broken pipe",
+		"bad connection",
+		"connection refused",
+		"lost connection to pod",
+		"i/o timeout",
+		"no route to host",
+	} {
+		if strings.Contains(msg, fragment) {
+			return true
+		}
+	}
+
+	return false
+}
+
+func findReadyPodName(pods []corev1.Pod) (string, bool) {
+	for _, pod := range pods {
+		if pod.DeletionTimestamp != nil || pod.Status.Phase != corev1.PodRunning {
+			continue
+		}
+		for _, cond := range pod.Status.Conditions {
+			if cond.Type == corev1.PodReady && cond.Status == corev1.ConditionTrue {
+				return pod.Name, true
+			}
+		}
+	}
+
+	return "", false
+}
+
+func isPodGoneError(err error) bool {
+	return apierrors.IsNotFound(err)
+}
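`isTransientDBError` matches substrings deliberately, since errors that cross the port-forward often arrive flattened to plain strings. Where typed errors survive, `errors.Is`/`errors.As` are sturdier; a hedged variant combining both (illustrative only, not what the suite uses) could be:

```go
package helpers

import (
	"database/sql/driver"
	"errors"
	"io"
	"net"
	"strings"
)

// isTransient prefers typed checks, falling back to the same kind of
// substring matching for errors that only carry a message.
func isTransient(err error) bool {
	if err == nil {
		return false
	}
	// driver.ErrBadConn and io.EOF survive %w wrapping, so check them first.
	if errors.Is(err, driver.ErrBadConn) || errors.Is(err, io.EOF) {
		return true
	}
	// Network timeouts expose a typed interface.
	var netErr net.Error
	if errors.As(err, &netErr) && netErr.Timeout() {
		return true
	}
	msg := strings.ToLower(err.Error())
	for _, fragment := range []string{"connection reset", "broken pipe", "connection refused"} {
		if strings.Contains(msg, fragment) {
			return true
		}
	}
	return false
}
```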
diff --git a/tests/k8s/runtime_helpers_test.go b/tests/k8s/runtime_helpers_test.go
new file mode 100644
index 0000000..59de96a
--- /dev/null
+++ b/tests/k8s/runtime_helpers_test.go
@@ -0,0 +1,85 @@
+package k8s_test
+
+import (
+	"errors"
+	"testing"
+
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+)
+
+func TestIsTransientDBError(t *testing.T) {
+	t.Parallel()
+
+	tests := []struct {
+		name string
+		err  error
+		want bool
+	}{
+		{name: "EOF", err: errors.New("EOF"), want: true},
+		{name: "connection reset", err: errors.New("read: connection reset by peer"), want: true},
+		{name: "broken pipe", err: errors.New("write: broken pipe"), want: true},
+		{name: "bad connection", err: errors.New("driver: bad connection"), want: true},
+		{name: "connection refused", err: errors.New("dial tcp 127.0.0.1:5432: connect: connection refused"), want: true},
+		{name: "not transient", err: errors.New("authentication failed"), want: false},
+		{name: "nil", err: nil, want: false},
+	}
+
+	for _, tt := range tests {
+		tt := tt
+		t.Run(tt.name, func(t *testing.T) {
+			t.Parallel()
+
+			if got := isTransientDBError(tt.err); got != tt.want {
+				t.Fatalf("isTransientDBError(%v) = %v, want %v", tt.err, got, tt.want)
+			}
+		})
+	}
+}
+
+func TestFindReadyPodName(t *testing.T) {
+	t.Parallel()
+
+	deleting := metav1.Now()
+	pods := []corev1.Pod{
+		{
+			ObjectMeta: metav1.ObjectMeta{Name: "terminating", DeletionTimestamp: &deleting},
+			Status:     corev1.PodStatus{Phase: corev1.PodRunning},
+		},
+		{
+			ObjectMeta: metav1.ObjectMeta{Name: "pending"},
+			Status:     corev1.PodStatus{Phase: corev1.PodPending},
+		},
+		{
+			ObjectMeta: metav1.ObjectMeta{Name: "ready"},
+			Status: corev1.PodStatus{
+				Phase: corev1.PodRunning,
+				Conditions: []corev1.PodCondition{
+					{Type: corev1.PodReady, Status: corev1.ConditionTrue},
+				},
+			},
+		},
+	}
+
+	name, ok := findReadyPodName(pods)
+	if !ok {
+		t.Fatal("expected to find a ready pod")
+	}
+	if name != "ready" {
+		t.Fatalf("expected ready pod name, got %q", name)
+	}
+}
+
+func TestIsPodGoneError(t *testing.T) {
+	t.Parallel()
+
+	notFound := apierrors.NewNotFound(schema.GroupResource{Group: "", Resource: "pods"}, "duckgres-worker-1")
+	if !isPodGoneError(notFound) {
+		t.Fatal("expected NotFound to count as pod gone")
+	}
+	if isPodGoneError(errors.New("dial tcp timeout")) {
+		t.Fatal("unexpectedly treated generic error as pod gone")
+	}
+}
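These helper tests run without a cluster because they build `corev1.Pod` fixtures directly. The same approach extends to API-call paths via client-go's fake clientset; a small sketch (the namespace and pod name are placeholders):

```go
package k8shelpers

import (
	"context"
	"testing"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

// TestPodGoneAgainstFakeClientset covers the NotFound branch that the real
// suite can only reach after a worker pod is actually deleted.
func TestPodGoneAgainstFakeClientset(t *testing.T) {
	clientset := fake.NewSimpleClientset() // starts with no objects

	_, err := clientset.CoreV1().Pods("duckgres").Get(
		context.Background(), "duckgres-worker-1", metav1.GetOptions{})
	if !apierrors.IsNotFound(err) {
		t.Fatalf("expected NotFound from empty fake clientset, got %v", err)
	}
}
```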
diff --git a/tests/k8s/setup_config_test.go b/tests/k8s/setup_config_test.go
index aa109bf..975ec49 100644
--- a/tests/k8s/setup_config_test.go
+++ b/tests/k8s/setup_config_test.go
@@ -72,7 +72,7 @@ func TestLoadK8sTestEnvironmentRejectsUnknownSetupMode(t *testing.T) {
 	}
 }
 
-func TestKindSetupArtifactsEnableSharedWarmWorkers(t *testing.T) {
+func TestKindSetupArtifactsEnableSharedWarmTarget(t *testing.T) {
 	root := findProjectRootForUnitTest(t)
 	manifestPath := filepath.Join(root, "k8s", "kind", "control-plane.yaml")
@@ -86,7 +86,6 @@
 		"postgres://duckgres:duckgres@duckgres-config-store:5432/duckgres_config?sslmode=disable",
 		"--k8s-shared-warm-target",
 		"1",
-		"--k8s-shared-warm-workers",
 	} {
 		if !strings.Contains(content, want) {
 			t.Fatalf("expected %q in %s", want, manifestPath)