Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 21 additions & 61 deletions controlplane/k8s_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,33 +643,10 @@ func (p *K8sWorkerPool) AcquireWorker(ctx context.Context) (*ManagedWorker, erro
return idle, nil
}

// 2. No idle worker — check if we have any live workers at all
// 2. No idle worker — spawn a new one if below capacity.
liveCount := p.liveWorkerCountLocked()
canSpawn := p.maxWorkers == 0 || liveCount < p.maxWorkers

if liveCount > 0 {
// We have live workers. Assign to the least-loaded one immediately
// and spawn a new worker in the background if below capacity.
w := p.leastLoadedWorkerLocked()
if w != nil {
w.activeSessions++
if canSpawn {
id := p.allocateWorkerIDLocked()
p.spawning++
p.mu.Unlock()
slog.Debug("Assigned to least-loaded worker, spawning new worker in background.",
"worker", w.ID, "active_sessions", w.activeSessions, "background_worker", id)
go p.spawnWorkerBackground(id)
} else {
p.mu.Unlock()
slog.Debug("Assigned to least-loaded worker (at capacity).",
"worker", w.ID, "active_sessions", w.activeSessions)
}
return w, nil
}
}

// 3. No live workers at all (cold start or all dead) — must block on spawn
if canSpawn {
id := p.allocateWorkerIDLocked()
p.spawning++
Expand All @@ -696,33 +673,17 @@ func (p *K8sWorkerPool) AcquireWorker(ctx context.Context) (*ManagedWorker, erro
return w, nil
}

// At capacity with all workers dead (spawning in progress) — wait and retry
// 3. At capacity — wait for a worker to become idle.
p.mu.Unlock()
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-time.After(100 * time.Millisecond):
case <-time.After(200 * time.Millisecond):
// Retry
}
}
}

// spawnWorkerBackground launches a worker pod asynchronously so that
// AcquireWorker never blocks on pod creation; once ready, the worker is
// picked up by future session acquisitions.
func (p *K8sWorkerPool) spawnWorkerBackground(id int) {
	// Bound the spawn attempt so a stuck pod creation cannot run forever.
	spawnCtx, cancelSpawn := context.WithTimeout(context.Background(), 3*time.Minute)
	defer cancelSpawn()

	spawnErr := p.SpawnWorker(spawnCtx, id)

	// Whether the spawn succeeded or not, it is no longer in flight.
	p.mu.Lock()
	p.spawning--
	p.mu.Unlock()

	if spawnErr != nil {
		slog.Warn("Background worker spawn failed.", "worker", id, "error", spawnErr)
	}
}

// ReleaseWorker decrements the active session count for a worker.
func (p *K8sWorkerPool) ReleaseWorker(id int) {
p.mu.Lock()
Expand Down Expand Up @@ -1201,24 +1162,6 @@ func (p *K8sWorkerPool) findIdleWorkerLocked() *ManagedWorker {
return nil
}

// leastLoadedWorkerLocked returns the schedulable live worker carrying the
// fewest active sessions, or nil when no worker qualifies.
// Caller must hold p.mu.
func (p *K8sWorkerPool) leastLoadedWorkerLocked() *ManagedWorker {
	var candidate *ManagedWorker
	for _, worker := range p.workers {
		// A closed done channel means the worker's process has exited.
		select {
		case <-worker.done:
			continue
		default:
		}
		if !p.isGenericSessionSchedulableWorkerLocked(worker) {
			continue
		}
		if candidate == nil || worker.activeSessions < candidate.activeSessions {
			candidate = worker
		}
	}
	return candidate
}

func (p *K8sWorkerPool) liveWorkerCountLocked() int {
count := p.spawning
for _, w := range p.workers {
Expand Down Expand Up @@ -1376,6 +1319,23 @@ func (p *K8sWorkerPool) spawnWarmWorker(ctx context.Context, id int) error {
return p.SpawnWorker(ctx, id)
}

// spawnWorkerBackground creates a worker pod asynchronously, without
// blocking the caller. Used for warm pool replenishment after a worker
// is reserved.
func (p *K8sWorkerPool) spawnWorkerBackground(id int) {
	// Cap the pod-creation attempt; a wedged spawn must not live forever.
	spawnCtx, cancelSpawn := context.WithTimeout(context.Background(), 3*time.Minute)
	defer cancelSpawn()

	spawnErr := p.SpawnWorker(spawnCtx, id)

	// The attempt has finished either way — drop it from the in-flight count.
	p.mu.Lock()
	p.spawning--
	p.mu.Unlock()

	if spawnErr != nil {
		slog.Warn("Background worker spawn failed.", "worker", id, "error", spawnErr)
	}
}

func (p *K8sWorkerPool) spawnWarmWorkerBackground(id int) {
if p.spawnWarmWorkerBackgroundFunc != nil {
p.spawnWarmWorkerBackgroundFunc(id)
Expand Down
14 changes: 0 additions & 14 deletions controlplane/k8s_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,20 +301,6 @@ func TestK8sPool_FindIdleWorker(t *testing.T) {
}
}

// TestK8sPool_LeastLoadedWorker verifies that the worker with the fewest
// active sessions is selected among several live workers.
func TestK8sPool_LeastLoadedWorker(t *testing.T) {
	pool, _ := newTestK8sPool(t, 5)

	alive := make(chan struct{})
	sessionsByID := map[int]int{1: 5, 2: 2, 3: 3}
	for id, sessions := range sessionsByID {
		pool.workers[id] = &ManagedWorker{ID: id, activeSessions: sessions, done: alive}
	}

	got := pool.leastLoadedWorkerLocked()
	if got == nil || got.ID != 2 {
		t.Fatalf("expected least loaded worker 2, got %v", got)
	}
}

func TestK8sPool_LiveWorkerCount(t *testing.T) {
pool, _ := newTestK8sPool(t, 5)

Expand Down
4 changes: 2 additions & 2 deletions controlplane/session_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (sm *SessionManager) DestroySession(pid int32) {
case <-worker.done:
// Worker already dead, skip RPC
default:
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
_ = worker.DestroySession(ctx, session.SessionToken)
cancel()
}
Expand All @@ -178,7 +178,7 @@ func (sm *SessionManager) DestroySession(pid int32) {
// Release the worker for reuse after cleanup is complete.
sm.pool.ReleaseWorker(session.WorkerID)

slog.Debug("Session destroyed.", "pid", pid, "worker", session.WorkerID)
slog.Info("Session destroyed, worker recycled.", "pid", pid, "worker", session.WorkerID)

// Rebalance remaining sessions
if sm.rebalancer != nil {
Expand Down
148 changes: 52 additions & 96 deletions controlplane/worker_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,108 +514,79 @@ func (p *FlightWorkerPool) SpawnMinWorkers(count int) error {

// AcquireWorker returns a worker for a new session.
//
// Strategy:
// Strategy (1:1 worker-to-session model):
// 1. Reuse an idle worker (0 active sessions) if available.
// 2. If the pool has fewer live workers than maxWorkers (or maxWorkers is 0),
// spawn a new worker process.
// 3. If the pool is at capacity, assign to the least-loaded live worker.
//
// This ensures the number of worker processes never exceeds maxWorkers while
// allowing unlimited concurrent sessions across the fixed pool.
// 3. If the pool is at capacity, wait with backoff until a worker becomes idle.
func (p *FlightWorkerPool) AcquireWorker(ctx context.Context) (*ManagedWorker, error) {
acquireStart := time.Now()
defer func() {
observeControlPlaneWorkerAcquire(time.Since(acquireStart))
}()

p.mu.Lock()
if p.shuttingDown {
p.mu.Unlock()
return nil, fmt.Errorf("pool is shutting down")
}

// Remove dead worker entries so they don't inflate the count.
p.cleanDeadWorkersLocked()

// 1. Try to claim an idle worker before spawning a new one.
idle := p.findIdleWorkerLocked()
if idle != nil {
idle.activeSessions++
p.mu.Unlock()
return idle, nil
}

// 2. If below the process cap (or unlimited), spawn a new worker.
liveCount := p.liveWorkerCountLocked()
if p.maxWorkers == 0 || liveCount < p.maxWorkers {
id := p.nextWorkerID
p.nextWorkerID++
p.spawning++
p.mu.Unlock()

err := p.SpawnWorker(id)
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}

p.mu.Lock()
p.spawning--
p.mu.Unlock()

if err != nil {
return nil, err
if p.shuttingDown {
p.mu.Unlock()
return nil, fmt.Errorf("pool is shutting down")
}

w, ok := p.Worker(id)
if !ok {
return nil, fmt.Errorf("worker %d not found after spawn", id)
// Remove dead worker entries so they don't inflate the count.
p.cleanDeadWorkersLocked()

// 1. Try to claim an idle worker before spawning a new one.
idle := p.findIdleWorkerLocked()
if idle != nil {
idle.activeSessions++
p.mu.Unlock()
return idle, nil
}

p.mu.Lock()
w.activeSessions++
p.mu.Unlock()
return w, nil
}
// 2. If below the process cap (or unlimited), spawn a new worker.
liveCount := p.liveWorkerCountLocked()
if p.maxWorkers == 0 || liveCount < p.maxWorkers {
id := p.nextWorkerID
p.nextWorkerID++
p.spawning++
p.mu.Unlock()

// 3. At capacity — assign to the least-loaded live worker.
w := p.leastLoadedWorkerLocked()
if w != nil {
w.activeSessions++
p.mu.Unlock()
return w, nil
}
err := p.SpawnWorker(id)

// All workers are dead (already cleaned above). Spawn a replacement.
// Still respect maxWorkers — another goroutine may already be spawning.
liveCount = p.liveWorkerCountLocked()
if p.maxWorkers > 0 && liveCount >= p.maxWorkers {
// A spawn is already in progress; wait for it to finish and use that worker.
p.mu.Unlock()
// Brief backoff then retry — the in-progress spawn will add a worker shortly.
time.Sleep(100 * time.Millisecond)
return p.AcquireWorker(ctx)
}
id := p.nextWorkerID
p.nextWorkerID++
p.spawning++
p.mu.Unlock()
p.mu.Lock()
p.spawning--
p.mu.Unlock()

err := p.SpawnWorker(id)
if err != nil {
return nil, err
}

p.mu.Lock()
p.spawning--
p.mu.Unlock()
w, ok := p.Worker(id)
if !ok {
return nil, fmt.Errorf("worker %d not found after spawn", id)
}

if err != nil {
return nil, err
}
p.mu.Lock()
w.activeSessions++
p.mu.Unlock()
return w, nil
}

w, ok := p.Worker(id)
if !ok {
return nil, fmt.Errorf("worker %d not found after spawn", id)
// 3. At capacity — wait for a worker to become idle.
p.mu.Unlock()
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-time.After(200 * time.Millisecond):
// Retry
}
}

p.mu.Lock()
w.activeSessions++
p.mu.Unlock()
return w, nil
}

// ReleaseWorker decrements the active session count for a worker and updates its lastUsed time.
Expand Down Expand Up @@ -704,22 +675,7 @@ func (p *FlightWorkerPool) findIdleWorkerLocked() *ManagedWorker {
return nil
}

// leastLoadedWorkerLocked returns the live worker with the fewest active
// sessions, or nil if all workers are dead. Caller must hold p.mu.
func (p *FlightWorkerPool) leastLoadedWorkerLocked() *ManagedWorker {
	var pick *ManagedWorker
	for _, cand := range p.workers {
		// Skip workers whose process has already exited (done is closed).
		select {
		case <-cand.done:
			continue
		default:
		}
		if pick == nil || cand.activeSessions < pick.activeSessions {
			pick = cand
		}
	}
	return pick
}


// liveWorkerCountLocked returns the number of workers whose process is still
// running (done channel not closed) plus workers currently being spawned.
Expand Down
Loading
Loading