diff --git a/controlplane/activation_payload_test.go b/controlplane/activation_payload_test.go index efb283a..be72889 100644 --- a/controlplane/activation_payload_test.go +++ b/controlplane/activation_payload_test.go @@ -30,7 +30,7 @@ func TestBuildTenantActivationPayloadBuildsDuckLakeRuntimeFromWarehouseSecrets(t t.Fatalf("create s3 secret: %v", err) } - team := &configstore.TeamConfig{ + org := &configstore.OrgConfig{ Name: "analytics", Users: map[string]string{ "alice": "ignored", @@ -62,13 +62,13 @@ func TestBuildTenantActivationPayloadBuildsDuckLakeRuntimeFromWarehouseSecrets(t }, } - payload, err := BuildTenantActivationPayload(context.Background(), pool.clientset, pool.namespace, team) + payload, err := BuildTenantActivationPayload(context.Background(), pool.clientset, pool.namespace, org) if err != nil { t.Fatalf("BuildTenantActivationPayload: %v", err) } - if payload.TeamName != "analytics" { - t.Fatalf("expected team analytics, got %q", payload.TeamName) + if payload.OrgID != "analytics" { + t.Fatalf("expected org analytics, got %q", payload.OrgID) } if len(payload.Usernames) != 2 { t.Fatalf("expected two users, got %v", payload.Usernames) diff --git a/controlplane/admin/api.go b/controlplane/admin/api.go index 0df1dd8..2648242 100644 --- a/controlplane/admin/api.go +++ b/controlplane/admin/api.go @@ -14,12 +14,12 @@ import ( "gorm.io/gorm/clause" ) -var errWarehousePayloadNotAllowed = errors.New("warehouse payload must be updated via /teams/:name/warehouse") +var errWarehousePayloadNotAllowed = errors.New("warehouse payload must be updated via /orgs/:name/warehouse") // WorkerStatus represents a worker's current status for the API. 
type WorkerStatus struct { ID int `json:"id"` - Team string `json:"team"` + Org string `json:"org"` ActiveSessions int `json:"active_sessions"` Status string `json:"status"` } @@ -28,19 +28,19 @@ type WorkerStatus struct { type SessionStatus struct { PID int32 `json:"pid"` WorkerID int `json:"worker_id"` - Team string `json:"team"` + Org string `json:"org"` } // ClusterStatus aggregates cluster state for the dashboard. type ClusterStatus struct { - TotalTeams int `json:"total_teams"` - TotalWorkers int `json:"total_workers"` - TotalSessions int `json:"total_sessions"` - Teams []TeamStatus `json:"teams"` + TotalOrgs int `json:"total_orgs"` + TotalWorkers int `json:"total_workers"` + TotalSessions int `json:"total_sessions"` + Orgs []OrgStatus `json:"orgs"` } -// TeamStatus is a per-team summary. -type TeamStatus struct { +// OrgStatus is a per-org summary. +type OrgStatus struct { Name string `json:"name"` Workers int `json:"workers"` ActiveSessions int `json:"active_sessions"` @@ -48,33 +48,33 @@ type TeamStatus struct { MemoryBudget string `json:"memory_budget"` } -// TeamStackInfo provides info about a team's live state. -// Implemented by the controlplane.TeamRouter via adapter. -type TeamStackInfo interface { - // AllTeamStats returns per-team worker and session counts. - AllTeamStats() []TeamStatus - // AllWorkerStatuses returns all workers across teams. +// OrgStackInfo provides info about an org's live state. +// Implemented by the controlplane.OrgRouter via adapter. +type OrgStackInfo interface { + // AllOrgStats returns per-org worker and session counts. + AllOrgStats() []OrgStatus + // AllWorkerStatuses returns all workers across orgs. AllWorkerStatuses() []WorkerStatus - // AllSessionStatuses returns all active sessions across teams. + // AllSessionStatuses returns all active sessions across orgs. AllSessionStatuses() []SessionStatus } // RegisterAPI registers all admin REST endpoints on the given router group. 
-func RegisterAPI(r *gin.RouterGroup, store *configstore.ConfigStore, info TeamStackInfo) { +func RegisterAPI(r *gin.RouterGroup, store *configstore.ConfigStore, info OrgStackInfo) { registerAPIWithStore(r, newGormAPIStore(store), info) } -func registerAPIWithStore(r *gin.RouterGroup, store apiStore, info TeamStackInfo) { +func registerAPIWithStore(r *gin.RouterGroup, store apiStore, info OrgStackInfo) { h := &apiHandler{store: store, info: info} - // Teams CRUD - r.GET("/teams", h.listTeams) - r.POST("/teams", h.createTeam) - r.GET("/teams/:name", h.getTeam) - r.PUT("/teams/:name", h.updateTeam) - r.DELETE("/teams/:name", h.deleteTeam) - r.GET("/teams/:name/warehouse", h.getManagedWarehouse) - r.PUT("/teams/:name/warehouse", h.putManagedWarehouse) + // Orgs CRUD + r.GET("/orgs", h.listOrgs) + r.POST("/orgs", h.createOrg) + r.GET("/orgs/:name", h.getOrg) + r.PUT("/orgs/:name", h.updateOrg) + r.DELETE("/orgs/:name", h.deleteOrg) + r.GET("/orgs/:name/warehouse", h.getManagedWarehouse) + r.PUT("/orgs/:name/warehouse", h.putManagedWarehouse) // Users CRUD r.GET("/users", h.listUsers) @@ -104,20 +104,20 @@ func registerAPIWithStore(r *gin.RouterGroup, store apiStore, info TeamStackInfo } type apiStore interface { - ListTeams() ([]configstore.Team, error) - CreateTeam(team *configstore.Team) error - GetTeam(name string) (*configstore.Team, error) - UpdateTeam(name string, updates configstore.Team) (*configstore.Team, bool, error) - DeleteTeam(name string) (bool, error) - - ListUsers() ([]configstore.TeamUser, error) - CreateUser(user *configstore.TeamUser) error - GetUser(username string) (*configstore.TeamUser, error) - UpdateUser(username, passwordHash, teamName string) (*configstore.TeamUser, bool, error) + ListOrgs() ([]configstore.Org, error) + CreateOrg(org *configstore.Org) error + GetOrg(name string) (*configstore.Org, error) + UpdateOrg(name string, updates configstore.Org) (*configstore.Org, bool, error) + DeleteOrg(name string) (bool, error) + + ListUsers() 
([]configstore.OrgUser, error) + CreateUser(user *configstore.OrgUser) error + GetUser(username string) (*configstore.OrgUser, error) + UpdateUser(username, passwordHash, orgID string) (*configstore.OrgUser, bool, error) DeleteUser(username string) (bool, error) - GetManagedWarehouse(teamName string) (*configstore.ManagedWarehouse, error) - UpsertManagedWarehouse(teamName string, warehouse *configstore.ManagedWarehouse) (*configstore.ManagedWarehouse, bool, error) + GetManagedWarehouse(orgID string) (*configstore.ManagedWarehouse, error) + UpsertManagedWarehouse(orgID string, warehouse *configstore.ManagedWarehouse) (*configstore.ManagedWarehouse, bool, error) GetGlobalConfig() (configstore.GlobalConfig, error) SaveGlobalConfig(cfg *configstore.GlobalConfig) error @@ -141,29 +141,29 @@ func (s *gormAPIStore) db() *gorm.DB { return s.store.DB() } -func (s *gormAPIStore) ListTeams() ([]configstore.Team, error) { - var teams []configstore.Team - if err := s.db().Preload("Users").Preload("Warehouse").Find(&teams).Error; err != nil { +func (s *gormAPIStore) ListOrgs() ([]configstore.Org, error) { + var orgs []configstore.Org + if err := s.db().Preload("Users").Preload("Warehouse").Find(&orgs).Error; err != nil { return nil, err } - return teams, nil + return orgs, nil } -func (s *gormAPIStore) CreateTeam(team *configstore.Team) error { - team.Warehouse = nil - return s.db().Omit("Warehouse").Create(team).Error +func (s *gormAPIStore) CreateOrg(org *configstore.Org) error { + org.Warehouse = nil + return s.db().Omit("Warehouse").Create(org).Error } -func (s *gormAPIStore) GetTeam(name string) (*configstore.Team, error) { - var team configstore.Team - if err := s.db().Preload("Users").Preload("Warehouse").First(&team, "name = ?", name).Error; err != nil { +func (s *gormAPIStore) GetOrg(name string) (*configstore.Org, error) { + var org configstore.Org + if err := s.db().Preload("Users").Preload("Warehouse").First(&org, "name = ?", name).Error; err != nil { return nil, err 
} - return &team, nil + return &org, nil } -func (s *gormAPIStore) UpdateTeam(name string, updates configstore.Team) (*configstore.Team, bool, error) { - result := s.db().Model(&configstore.Team{}).Where("name = ?", name).Updates(map[string]interface{}{ +func (s *gormAPIStore) UpdateOrg(name string, updates configstore.Org) (*configstore.Org, bool, error) { + result := s.db().Model(&configstore.Org{}).Where("name = ?", name).Updates(map[string]interface{}{ "max_workers": updates.MaxWorkers, "memory_budget": updates.MemoryBudget, "idle_timeout_s": updates.IdleTimeoutS, @@ -174,20 +174,20 @@ func (s *gormAPIStore) UpdateTeam(name string, updates configstore.Team) (*confi if result.RowsAffected == 0 { return nil, false, nil } - team, err := s.GetTeam(name) + org, err := s.GetOrg(name) if err != nil { return nil, true, err } - return team, true, nil + return org, true, nil } -func (s *gormAPIStore) DeleteTeam(name string) (bool, error) { +func (s *gormAPIStore) DeleteOrg(name string) (bool, error) { returnRows := int64(0) err := s.db().Transaction(func(tx *gorm.DB) error { - if err := tx.Where("team_name = ?", name).Delete(&configstore.TeamUser{}).Error; err != nil { + if err := tx.Where("org_id = ?", name).Delete(&configstore.OrgUser{}).Error; err != nil { return err } - result := tx.Where("name = ?", name).Delete(&configstore.Team{}) + result := tx.Where("name = ?", name).Delete(&configstore.Org{}) if result.Error != nil { return result.Error } @@ -200,35 +200,35 @@ func (s *gormAPIStore) DeleteTeam(name string) (bool, error) { return returnRows > 0, nil } -func (s *gormAPIStore) ListUsers() ([]configstore.TeamUser, error) { - var users []configstore.TeamUser +func (s *gormAPIStore) ListUsers() ([]configstore.OrgUser, error) { + var users []configstore.OrgUser if err := s.db().Find(&users).Error; err != nil { return nil, err } return users, nil } -func (s *gormAPIStore) CreateUser(user *configstore.TeamUser) error { +func (s *gormAPIStore) CreateUser(user 
*configstore.OrgUser) error { return s.db().Create(user).Error } -func (s *gormAPIStore) GetUser(username string) (*configstore.TeamUser, error) { - var user configstore.TeamUser +func (s *gormAPIStore) GetUser(username string) (*configstore.OrgUser, error) { + var user configstore.OrgUser if err := s.db().First(&user, "username = ?", username).Error; err != nil { return nil, err } return &user, nil } -func (s *gormAPIStore) UpdateUser(username, passwordHash, teamName string) (*configstore.TeamUser, bool, error) { +func (s *gormAPIStore) UpdateUser(username, passwordHash, orgID string) (*configstore.OrgUser, bool, error) { updates := map[string]interface{}{} if passwordHash != "" { updates["password"] = passwordHash } - if teamName != "" { - updates["team_name"] = teamName + if orgID != "" { + updates["org_id"] = orgID } - result := s.db().Model(&configstore.TeamUser{}).Where("username = ?", username).Updates(updates) + result := s.db().Model(&configstore.OrgUser{}).Where("username = ?", username).Updates(updates) if result.Error != nil { return nil, false, result.Error } @@ -243,39 +243,39 @@ func (s *gormAPIStore) UpdateUser(username, passwordHash, teamName string) (*con } func (s *gormAPIStore) DeleteUser(username string) (bool, error) { - result := s.db().Where("username = ?", username).Delete(&configstore.TeamUser{}) + result := s.db().Where("username = ?", username).Delete(&configstore.OrgUser{}) if result.Error != nil { return false, result.Error } return result.RowsAffected > 0, nil } -func (s *gormAPIStore) GetManagedWarehouse(teamName string) (*configstore.ManagedWarehouse, error) { +func (s *gormAPIStore) GetManagedWarehouse(orgID string) (*configstore.ManagedWarehouse, error) { var warehouse configstore.ManagedWarehouse - if err := s.db().First(&warehouse, "team_name = ?", teamName).Error; err != nil { + if err := s.db().First(&warehouse, "org_id = ?", orgID).Error; err != nil { return nil, err } return &warehouse, nil } -func (s *gormAPIStore) 
UpsertManagedWarehouse(teamName string, warehouse *configstore.ManagedWarehouse) (*configstore.ManagedWarehouse, bool, error) { +func (s *gormAPIStore) UpsertManagedWarehouse(orgID string, warehouse *configstore.ManagedWarehouse) (*configstore.ManagedWarehouse, bool, error) { var count int64 - if err := s.db().Model(&configstore.Team{}).Where("name = ?", teamName).Count(&count).Error; err != nil { + if err := s.db().Model(&configstore.Org{}).Where("name = ?", orgID).Count(&count).Error; err != nil { return nil, false, err } if count == 0 { return nil, false, nil } - warehouse.TeamName = teamName + warehouse.OrgID = orgID warehouse.UpdatedAt = time.Now().UTC() if err := s.db().Clauses(clause.OnConflict{ - Columns: []clause.Column{{Name: "team_name"}}, + Columns: []clause.Column{{Name: "org_id"}}, DoUpdates: clause.AssignmentColumns(managedWarehouseUpsertColumns()), }).Create(warehouse).Error; err != nil { return nil, true, err } - stored, err := s.GetManagedWarehouse(teamName) + stored, err := s.GetManagedWarehouse(orgID) if err != nil { return nil, true, err } @@ -386,7 +386,7 @@ func (s *gormAPIStore) SaveQueryLogConfig(cfg *configstore.QueryLogConfig) error type apiHandler struct { store apiStore - info TeamStackInfo + info OrgStackInfo } type managedWarehouseRequest struct { @@ -447,87 +447,87 @@ func decodeStrictWarehouseRequest(c *gin.Context, dst *managedWarehouseRequest) return dec.Decode(dst) } -// --- Teams --- +// --- Orgs --- -func (h *apiHandler) listTeams(c *gin.Context) { - teams, err := h.store.ListTeams() +func (h *apiHandler) listOrgs(c *gin.Context) { + orgs, err := h.store.ListOrgs() if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) return } - c.JSON(http.StatusOK, teams) + c.JSON(http.StatusOK, orgs) } -func (h *apiHandler) createTeam(c *gin.Context) { - var team configstore.Team - if err := c.ShouldBindJSON(&team); err != nil { +func (h *apiHandler) createOrg(c *gin.Context) { + var org configstore.Org + if err 
:= c.ShouldBindJSON(&org); err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) return } - if err := validateTeamMutationPayload(&team); err != nil { + if err := validateOrgMutationPayload(&org); err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) return } - if team.Name == "" { + if org.Name == "" { c.JSON(http.StatusBadRequest, gin.H{"error": "name is required"}) return } - if err := h.store.CreateTeam(&team); err != nil { + if err := h.store.CreateOrg(&org); err != nil { c.JSON(http.StatusConflict, gin.H{"error": err.Error()}) return } - c.JSON(http.StatusCreated, team) + c.JSON(http.StatusCreated, org) } -func (h *apiHandler) getTeam(c *gin.Context) { +func (h *apiHandler) getOrg(c *gin.Context) { name := c.Param("name") - team, err := h.store.GetTeam(name) + org, err := h.store.GetOrg(name) if err != nil { - c.JSON(http.StatusNotFound, gin.H{"error": "team not found"}) + c.JSON(http.StatusNotFound, gin.H{"error": "org not found"}) return } - c.JSON(http.StatusOK, team) + c.JSON(http.StatusOK, org) } -func (h *apiHandler) updateTeam(c *gin.Context) { +func (h *apiHandler) updateOrg(c *gin.Context) { name := c.Param("name") - var updates configstore.Team + var updates configstore.Org if err := c.ShouldBindJSON(&updates); err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) return } - if err := validateTeamMutationPayload(&updates); err != nil { + if err := validateOrgMutationPayload(&updates); err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) return } - team, ok, err := h.store.UpdateTeam(name, updates) + org, ok, err := h.store.UpdateOrg(name, updates) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) return } if !ok { - c.JSON(http.StatusNotFound, gin.H{"error": "team not found"}) + c.JSON(http.StatusNotFound, gin.H{"error": "org not found"}) return } - c.JSON(http.StatusOK, team) + c.JSON(http.StatusOK, org) } -func (h *apiHandler) deleteTeam(c 
*gin.Context) { +func (h *apiHandler) deleteOrg(c *gin.Context) { name := c.Param("name") - ok, err := h.store.DeleteTeam(name) + ok, err := h.store.DeleteOrg(name) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) return } if !ok { - c.JSON(http.StatusNotFound, gin.H{"error": "team not found"}) + c.JSON(http.StatusNotFound, gin.H{"error": "org not found"}) return } c.JSON(http.StatusOK, gin.H{"deleted": name}) } -func validateTeamMutationPayload(team *configstore.Team) error { - if team != nil && team.Warehouse != nil { +func validateOrgMutationPayload(org *configstore.Org) error { + if org != nil && org.Warehouse != nil { return errWarehousePayloadNotAllowed } return nil @@ -547,20 +547,20 @@ func (h *apiHandler) getManagedWarehouse(c *gin.Context) { } func (h *apiHandler) putManagedWarehouse(c *gin.Context) { - teamName := c.Param("name") + orgID := c.Param("name") var req managedWarehouseRequest if err := decodeStrictWarehouseRequest(c, &req); err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) return } warehouse := req.toManagedWarehouse() - stored, ok, err := h.store.UpsertManagedWarehouse(teamName, &warehouse) + stored, ok, err := h.store.UpsertManagedWarehouse(orgID, &warehouse) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) return } if !ok { - c.JSON(http.StatusNotFound, gin.H{"error": "team not found"}) + c.JSON(http.StatusNotFound, gin.H{"error": "org not found"}) return } c.JSON(http.StatusOK, stored) @@ -578,18 +578,18 @@ func (h *apiHandler) listUsers(c *gin.Context) { } func (h *apiHandler) createUser(c *gin.Context) { - // Use a raw struct because TeamUser.Password has json:"-" + // Use a raw struct because OrgUser.Password has json:"-" var raw struct { Username string `json:"username"` Password string `json:"password"` - TeamName string `json:"team_name"` + OrgID string `json:"org_id"` } if err := c.ShouldBindJSON(&raw); err != nil { 
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) return } - if raw.Username == "" || raw.TeamName == "" { - c.JSON(http.StatusBadRequest, gin.H{"error": "username and team_name are required"}) + if raw.Username == "" || raw.OrgID == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "username and org_id are required"}) return } if raw.Password == "" { @@ -601,10 +601,10 @@ func (h *apiHandler) createUser(c *gin.Context) { c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to hash password"}) return } - user := configstore.TeamUser{ + user := configstore.OrgUser{ Username: raw.Username, Password: hash, - TeamName: raw.TeamName, + OrgID: raw.OrgID, } if err := h.store.CreateUser(&user); err != nil { c.JSON(http.StatusConflict, gin.H{"error": err.Error()}) @@ -627,7 +627,7 @@ func (h *apiHandler) updateUser(c *gin.Context) { username := c.Param("username") var raw struct { Password string `json:"password"` - TeamName string `json:"team_name"` + OrgID string `json:"org_id"` } if err := c.ShouldBindJSON(&raw); err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) return @@ -642,7 +642,7 @@ func (h *apiHandler) updateUser(c *gin.Context) { } passwordHash = hash } - user, ok, err := h.store.UpdateUser(username, passwordHash, raw.TeamName) + user, ok, err := h.store.UpdateUser(username, passwordHash, raw.OrgID) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) return @@ -790,18 +790,18 @@ func (h *apiHandler) getClusterStatus(c *gin.Context) { return } - teamStats := h.info.AllTeamStats() + orgStats := h.info.AllOrgStats() totalWorkers := 0 totalSessions := 0 - for _, ts := range teamStats { - totalWorkers += ts.Workers - totalSessions += ts.ActiveSessions + for _, stat := range orgStats { + totalWorkers += stat.Workers + totalSessions += stat.ActiveSessions } c.JSON(http.StatusOK, ClusterStatus{ - TotalTeams: len(teamStats), + TotalOrgs: len(orgStats), TotalWorkers: totalWorkers, TotalSessions: totalSessions, - 
Teams: teamStats, + Orgs: orgStats, }) } diff --git a/controlplane/admin/api_postgres_test.go b/controlplane/admin/api_postgres_test.go index bec0f10..d8d4380 100644 --- a/controlplane/admin/api_postgres_test.go +++ b/controlplane/admin/api_postgres_test.go @@ -87,8 +87,8 @@ func resetConfigStoreTables(t *testing.T, db *gorm.DB) { for _, model := range []any{ &configstore.ManagedWarehouse{}, - &configstore.TeamUser{}, - &configstore.Team{}, + &configstore.OrgUser{}, + &configstore.Org{}, } { if err := db.Session(&gorm.Session{AllowGlobalUpdate: true}).Delete(model).Error; err != nil { t.Fatalf("delete %T: %v", model, err) @@ -100,13 +100,13 @@ func TestUpsertManagedWarehousePreservesCreatedAt(t *testing.T) { store := newPostgresConfigStore(t) apiStore := newGormAPIStore(store).(*gormAPIStore) - if err := store.DB().Create(&configstore.Team{Name: "analytics"}).Error; err != nil { - t.Fatalf("create team: %v", err) + if err := store.DB().Create(&configstore.Org{Name: "analytics"}).Error; err != nil { + t.Fatalf("create org: %v", err) } createdAt := time.Date(2024, time.January, 2, 3, 4, 5, 0, time.UTC) original := &configstore.ManagedWarehouse{ - TeamName: "analytics", + OrgID: "analytics", State: configstore.ManagedWarehouseStatePending, CreatedAt: createdAt, UpdatedAt: createdAt, diff --git a/controlplane/admin/api_test.go b/controlplane/admin/api_test.go index 537fe5e..64d08b2 100644 --- a/controlplane/admin/api_test.go +++ b/controlplane/admin/api_test.go @@ -18,67 +18,67 @@ import ( ) type fakeAPIStore struct { - teams map[string]*configstore.Team - users map[string]*configstore.TeamUser + orgs map[string]*configstore.Org + users map[string]*configstore.OrgUser warehouses map[string]*configstore.ManagedWarehouse } func newFakeAPIStore() *fakeAPIStore { return &fakeAPIStore{ - teams: make(map[string]*configstore.Team), - users: make(map[string]*configstore.TeamUser), + orgs: make(map[string]*configstore.Org), + users: make(map[string]*configstore.OrgUser), 
warehouses: make(map[string]*configstore.ManagedWarehouse), } } -func (s *fakeAPIStore) ListTeams() ([]configstore.Team, error) { - teams := make([]configstore.Team, 0, len(s.teams)) - for _, team := range s.teams { - teams = append(teams, *copyTeam(team)) +func (s *fakeAPIStore) ListOrgs() ([]configstore.Org, error) { + orgs := make([]configstore.Org, 0, len(s.orgs)) + for _, org := range s.orgs { + orgs = append(orgs, *copyOrg(org)) } - return teams, nil + return orgs, nil } -func (s *fakeAPIStore) CreateTeam(team *configstore.Team) error { - if _, ok := s.teams[team.Name]; ok { - return errors.New("duplicate team") +func (s *fakeAPIStore) CreateOrg(org *configstore.Org) error { + if _, ok := s.orgs[org.Name]; ok { + return errors.New("duplicate org") } - clone := copyTeam(team) + clone := copyOrg(org) clone.Warehouse = nil - s.teams[team.Name] = clone + s.orgs[org.Name] = clone return nil } -func (s *fakeAPIStore) GetTeam(name string) (*configstore.Team, error) { - team, ok := s.teams[name] +func (s *fakeAPIStore) GetOrg(name string) (*configstore.Org, error) { + org, ok := s.orgs[name] if !ok { return nil, gorm.ErrRecordNotFound } - return copyTeam(team), nil + return copyOrg(org), nil } -func (s *fakeAPIStore) UpdateTeam(name string, updates configstore.Team) (*configstore.Team, bool, error) { - team, ok := s.teams[name] +func (s *fakeAPIStore) UpdateOrg(name string, updates configstore.Org) (*configstore.Org, bool, error) { + org, ok := s.orgs[name] if !ok { return nil, false, nil } - team.MaxWorkers = updates.MaxWorkers - team.MemoryBudget = updates.MemoryBudget - team.IdleTimeoutS = updates.IdleTimeoutS - return copyTeam(team), true, nil + org.MaxWorkers = updates.MaxWorkers + org.MemoryBudget = updates.MemoryBudget + org.IdleTimeoutS = updates.IdleTimeoutS + return copyOrg(org), true, nil } -func (s *fakeAPIStore) DeleteTeam(name string) (bool, error) { - if _, ok := s.teams[name]; !ok { +func (s *fakeAPIStore) DeleteOrg(name string) (bool, error) { + if 
_, ok := s.orgs[name]; !ok { return false, nil } - delete(s.teams, name) + delete(s.orgs, name) delete(s.warehouses, name) return true, nil } -func (s *fakeAPIStore) ListUsers() ([]configstore.TeamUser, error) { - users := make([]configstore.TeamUser, 0, len(s.users)) +func (s *fakeAPIStore) ListUsers() ([]configstore.OrgUser, error) { + users := make([]configstore.OrgUser, 0, len(s.users)) for _, user := range s.users { clone := *user users = append(users, clone) @@ -86,7 +86,7 @@ func (s *fakeAPIStore) ListUsers() ([]configstore.TeamUser, error) { return users, nil } -func (s *fakeAPIStore) CreateUser(user *configstore.TeamUser) error { +func (s *fakeAPIStore) CreateUser(user *configstore.OrgUser) error { if _, ok := s.users[user.Username]; ok { return errors.New("duplicate user") } @@ -95,7 +95,7 @@ func (s *fakeAPIStore) CreateUser(user *configstore.TeamUser) error { return nil } -func (s *fakeAPIStore) GetUser(username string) (*configstore.TeamUser, error) { +func (s *fakeAPIStore) GetUser(username string) (*configstore.OrgUser, error) { user, ok := s.users[username] if !ok { return nil, gorm.ErrRecordNotFound @@ -104,7 +104,7 @@ func (s *fakeAPIStore) GetUser(username string) (*configstore.TeamUser, error) { return &clone, nil } -func (s *fakeAPIStore) UpdateUser(username, passwordHash, teamName string) (*configstore.TeamUser, bool, error) { +func (s *fakeAPIStore) UpdateUser(username, passwordHash, orgID string) (*configstore.OrgUser, bool, error) { user, ok := s.users[username] if !ok { return nil, false, nil @@ -112,8 +112,8 @@ func (s *fakeAPIStore) UpdateUser(username, passwordHash, teamName string) (*con if passwordHash != "" { user.Password = passwordHash } - if teamName != "" { - user.TeamName = teamName + if orgID != "" { + user.OrgID = orgID } clone := *user return &clone, true, nil @@ -127,23 +127,23 @@ func (s *fakeAPIStore) DeleteUser(username string) (bool, error) { return true, nil } -func (s *fakeAPIStore) GetManagedWarehouse(teamName string) 
(*configstore.ManagedWarehouse, error) { - warehouse, ok := s.warehouses[teamName] +func (s *fakeAPIStore) GetManagedWarehouse(orgID string) (*configstore.ManagedWarehouse, error) { + warehouse, ok := s.warehouses[orgID] if !ok { return nil, gorm.ErrRecordNotFound } return copyWarehouse(warehouse), nil } -func (s *fakeAPIStore) UpsertManagedWarehouse(teamName string, warehouse *configstore.ManagedWarehouse) (*configstore.ManagedWarehouse, bool, error) { - team, ok := s.teams[teamName] +func (s *fakeAPIStore) UpsertManagedWarehouse(orgID string, warehouse *configstore.ManagedWarehouse) (*configstore.ManagedWarehouse, bool, error) { + org, ok := s.orgs[orgID] if !ok { return nil, false, nil } clone := copyWarehouse(warehouse) - clone.TeamName = teamName - s.warehouses[teamName] = clone - team.Warehouse = copyWarehouse(clone) + clone.OrgID = orgID + s.warehouses[orgID] = clone + org.Warehouse = copyWarehouse(clone) return copyWarehouse(clone), true, nil } @@ -187,17 +187,17 @@ func copyWarehouse(warehouse *configstore.ManagedWarehouse) *configstore.Managed return &clone } -func copyTeam(team *configstore.Team) *configstore.Team { - if team == nil { +func copyOrg(org *configstore.Org) *configstore.Org { + if org == nil { return nil } - clone := *team - if team.Warehouse != nil { - clone.Warehouse = copyWarehouse(team.Warehouse) + clone := *org + if org.Warehouse != nil { + clone.Warehouse = copyWarehouse(org.Warehouse) } - if len(team.Users) > 0 { - clone.Users = make([]configstore.TeamUser, len(team.Users)) - copy(clone.Users, team.Users) + if len(org.Users) > 0 { + clone.Users = make([]configstore.OrgUser, len(org.Users)) + copy(clone.Users, org.Users) } return &clone } @@ -209,9 +209,9 @@ func newTestAPIRouter(store apiStore) *gin.Engine { return r } -func seedTeamWithWarehouse(store *fakeAPIStore, name string) { +func seedOrgWithWarehouse(store *fakeAPIStore, name string) { warehouse := &configstore.ManagedWarehouse{ - TeamName: name, + OrgID: name, 
WarehouseDatabase: configstore.ManagedWarehouseDatabase{ Region: "us-east-1", Endpoint: fmt.Sprintf("%s.cluster.example", name), @@ -266,19 +266,19 @@ func seedTeamWithWarehouse(store *fakeAPIStore, name string) { IdentityState: configstore.ManagedWarehouseStateReady, SecretsState: configstore.ManagedWarehouseStateReady, } - store.teams[name] = &configstore.Team{ + store.orgs[name] = &configstore.Org{ Name: name, Warehouse: copyWarehouse(warehouse), } store.warehouses[name] = warehouse } -func TestGetTeamIncludesWarehouse(t *testing.T) { +func TestGetOrgIncludesWarehouse(t *testing.T) { store := newFakeAPIStore() - seedTeamWithWarehouse(store, "analytics") + seedOrgWithWarehouse(store, "analytics") router := newTestAPIRouter(store) - req := httptest.NewRequest(http.MethodGet, "/api/v1/teams/analytics", nil) + req := httptest.NewRequest(http.MethodGet, "/api/v1/orgs/analytics", nil) rec := httptest.NewRecorder() router.ServeHTTP(rec, req) @@ -286,27 +286,27 @@ func TestGetTeamIncludesWarehouse(t *testing.T) { t.Fatalf("status = %d, want %d: %s", rec.Code, http.StatusOK, rec.Body.String()) } - var team configstore.Team - if err := json.Unmarshal(rec.Body.Bytes(), &team); err != nil { - t.Fatalf("unmarshal team: %v", err) + var org configstore.Org + if err := json.Unmarshal(rec.Body.Bytes(), &org); err != nil { + t.Fatalf("unmarshal org: %v", err) } - if team.Warehouse == nil { - t.Fatal("expected warehouse in team response") + if org.Warehouse == nil { + t.Fatal("expected warehouse in org response") } - if team.Warehouse.WarehouseDatabase.DatabaseName != "analytics_warehouse" { - t.Fatalf("expected analytics_warehouse, got %q", team.Warehouse.WarehouseDatabase.DatabaseName) + if org.Warehouse.WarehouseDatabase.DatabaseName != "analytics_warehouse" { + t.Fatalf("expected analytics_warehouse, got %q", org.Warehouse.WarehouseDatabase.DatabaseName) } - if team.Warehouse.MetadataStore.Kind != "dedicated_rds" { - t.Fatalf("expected metadata store kind dedicated_rds, got 
%q", team.Warehouse.MetadataStore.Kind) + if org.Warehouse.MetadataStore.Kind != "dedicated_rds" { + t.Fatalf("expected metadata store kind dedicated_rds, got %q", org.Warehouse.MetadataStore.Kind) } } -func TestListTeamsIncludesWarehouse(t *testing.T) { +func TestListOrgsIncludesWarehouse(t *testing.T) { store := newFakeAPIStore() - seedTeamWithWarehouse(store, "analytics") + seedOrgWithWarehouse(store, "analytics") router := newTestAPIRouter(store) - req := httptest.NewRequest(http.MethodGet, "/api/v1/teams", nil) + req := httptest.NewRequest(http.MethodGet, "/api/v1/orgs", nil) rec := httptest.NewRecorder() router.ServeHTTP(rec, req) @@ -314,24 +314,24 @@ func TestListTeamsIncludesWarehouse(t *testing.T) { t.Fatalf("status = %d, want %d: %s", rec.Code, http.StatusOK, rec.Body.String()) } - var teams []configstore.Team - if err := json.Unmarshal(rec.Body.Bytes(), &teams); err != nil { - t.Fatalf("unmarshal teams: %v", err) + var orgs []configstore.Org + if err := json.Unmarshal(rec.Body.Bytes(), &orgs); err != nil { + t.Fatalf("unmarshal orgs: %v", err) } - if len(teams) != 1 { - t.Fatalf("expected 1 team, got %d", len(teams)) + if len(orgs) != 1 { + t.Fatalf("expected 1 org, got %d", len(orgs)) } - if teams[0].Warehouse == nil { - t.Fatal("expected nested warehouse in team list response") + if orgs[0].Warehouse == nil { + t.Fatal("expected nested warehouse in org list response") } } func TestGetWarehouseReturnsNotFoundWhenMissing(t *testing.T) { store := newFakeAPIStore() - store.teams["analytics"] = &configstore.Team{Name: "analytics"} + store.orgs["analytics"] = &configstore.Org{Name: "analytics"} router := newTestAPIRouter(store) - req := httptest.NewRequest(http.MethodGet, "/api/v1/teams/analytics/warehouse", nil) + req := httptest.NewRequest(http.MethodGet, "/api/v1/orgs/analytics/warehouse", nil) rec := httptest.NewRecorder() router.ServeHTTP(rec, req) @@ -340,9 +340,9 @@ func TestGetWarehouseReturnsNotFoundWhenMissing(t *testing.T) { } } -func 
TestPutWarehouseUpsertsForExistingTeam(t *testing.T) { +func TestPutWarehouseUpsertsForExistingOrg(t *testing.T) { store := newFakeAPIStore() - store.teams["analytics"] = &configstore.Team{Name: "analytics"} + store.orgs["analytics"] = &configstore.Org{Name: "analytics"} router := newTestAPIRouter(store) body := []byte(`{ @@ -405,7 +405,7 @@ func TestPutWarehouseUpsertsForExistingTeam(t *testing.T) { "secrets_state": "ready" }`) - req := httptest.NewRequest(http.MethodPut, "/api/v1/teams/analytics/warehouse", bytes.NewReader(body)) + req := httptest.NewRequest(http.MethodPut, "/api/v1/orgs/analytics/warehouse", bytes.NewReader(body)) req.Header.Set("Content-Type", "application/json") rec := httptest.NewRecorder() router.ServeHTTP(rec, req) @@ -418,8 +418,8 @@ func TestPutWarehouseUpsertsForExistingTeam(t *testing.T) { if warehouse == nil { t.Fatal("expected stored warehouse") } - if warehouse.TeamName != "analytics" { - t.Fatalf("expected team_name analytics, got %q", warehouse.TeamName) + if warehouse.OrgID != "analytics" { + t.Fatalf("expected org_id analytics, got %q", warehouse.OrgID) } if warehouse.RuntimeConfig.Name != "analytics-runtime" { t.Fatalf("expected runtime secret analytics-runtime, got %q", warehouse.RuntimeConfig.Name) @@ -432,11 +432,11 @@ func TestPutWarehouseUpsertsForExistingTeam(t *testing.T) { } } -func TestPutWarehouseRejectsUnknownTeam(t *testing.T) { +func TestPutWarehouseRejectsUnknownOrg(t *testing.T) { store := newFakeAPIStore() router := newTestAPIRouter(store) - req := httptest.NewRequest(http.MethodPut, "/api/v1/teams/unknown/warehouse", bytes.NewReader([]byte(`{"state":"ready"}`))) + req := httptest.NewRequest(http.MethodPut, "/api/v1/orgs/unknown/warehouse", bytes.NewReader([]byte(`{"state":"ready"}`))) req.Header.Set("Content-Type", "application/json") rec := httptest.NewRecorder() router.ServeHTTP(rec, req) @@ -448,18 +448,18 @@ func TestPutWarehouseRejectsUnknownTeam(t *testing.T) { func 
TestPutWarehouseRejectsServerManagedFields(t *testing.T) { store := newFakeAPIStore() - store.teams["analytics"] = &configstore.Team{Name: "analytics"} + store.orgs["analytics"] = &configstore.Org{Name: "analytics"} router := newTestAPIRouter(store) body := []byte(`{ - "team_name": "wrong-team", + "org_id": "wrong-org", "created_at": "2026-03-18T10:00:00Z", "warehouse_database": { "database_name": "analytics_warehouse" } }`) - req := httptest.NewRequest(http.MethodPut, "/api/v1/teams/analytics/warehouse", bytes.NewReader(body)) + req := httptest.NewRequest(http.MethodPut, "/api/v1/orgs/analytics/warehouse", bytes.NewReader(body)) req.Header.Set("Content-Type", "application/json") rec := httptest.NewRecorder() router.ServeHTTP(rec, req) @@ -471,7 +471,7 @@ func TestPutWarehouseRejectsServerManagedFields(t *testing.T) { func TestPutWarehouseAllowsCustomProvisioningStates(t *testing.T) { store := newFakeAPIStore() - store.teams["analytics"] = &configstore.Team{Name: "analytics"} + store.orgs["analytics"] = &configstore.Org{Name: "analytics"} router := newTestAPIRouter(store) body := []byte(`{ @@ -483,7 +483,7 @@ func TestPutWarehouseAllowsCustomProvisioningStates(t *testing.T) { "secrets_state": "waiting-external-secret" }`) - req := httptest.NewRequest(http.MethodPut, "/api/v1/teams/analytics/warehouse", bytes.NewReader(body)) + req := httptest.NewRequest(http.MethodPut, "/api/v1/orgs/analytics/warehouse", bytes.NewReader(body)) req.Header.Set("Content-Type", "application/json") rec := httptest.NewRecorder() router.ServeHTTP(rec, req) @@ -507,7 +507,7 @@ func TestPutWarehouseAllowsCustomProvisioningStates(t *testing.T) { } } -func TestCreateTeamRejectsNestedWarehousePayload(t *testing.T) { +func TestCreateOrgRejectsNestedWarehousePayload(t *testing.T) { store := newFakeAPIStore() router := newTestAPIRouter(store) @@ -519,7 +519,7 @@ func TestCreateTeamRejectsNestedWarehousePayload(t *testing.T) { } }`) - req := httptest.NewRequest(http.MethodPost, "/api/v1/teams", 
bytes.NewReader(body)) + req := httptest.NewRequest(http.MethodPost, "/api/v1/orgs", bytes.NewReader(body)) req.Header.Set("Content-Type", "application/json") rec := httptest.NewRecorder() router.ServeHTTP(rec, req) @@ -527,14 +527,14 @@ func TestCreateTeamRejectsNestedWarehousePayload(t *testing.T) { if rec.Code != http.StatusBadRequest { t.Fatalf("status = %d, want %d: %s", rec.Code, http.StatusBadRequest, rec.Body.String()) } - if _, ok := store.teams["analytics"]; ok { - t.Fatal("expected team create to be rejected when warehouse payload is present") + if _, ok := store.orgs["analytics"]; ok { + t.Fatal("expected org create to be rejected when warehouse payload is present") } } -func TestUpdateTeamRejectsNestedWarehousePayload(t *testing.T) { +func TestUpdateOrgRejectsNestedWarehousePayload(t *testing.T) { store := newFakeAPIStore() - store.teams["analytics"] = &configstore.Team{Name: "analytics", MaxWorkers: 2} + store.orgs["analytics"] = &configstore.Org{Name: "analytics", MaxWorkers: 2} router := newTestAPIRouter(store) body := []byte(`{ @@ -544,7 +544,7 @@ func TestUpdateTeamRejectsNestedWarehousePayload(t *testing.T) { } }`) - req := httptest.NewRequest(http.MethodPut, "/api/v1/teams/analytics", bytes.NewReader(body)) + req := httptest.NewRequest(http.MethodPut, "/api/v1/orgs/analytics", bytes.NewReader(body)) req.Header.Set("Content-Type", "application/json") rec := httptest.NewRecorder() router.ServeHTTP(rec, req) @@ -552,20 +552,20 @@ func TestUpdateTeamRejectsNestedWarehousePayload(t *testing.T) { if rec.Code != http.StatusBadRequest { t.Fatalf("status = %d, want %d: %s", rec.Code, http.StatusBadRequest, rec.Body.String()) } - if store.teams["analytics"].MaxWorkers != 2 { - t.Fatalf("expected team update to be rejected, max_workers = %d", store.teams["analytics"].MaxWorkers) + if store.orgs["analytics"].MaxWorkers != 2 { + t.Fatalf("expected org update to be rejected, max_workers = %d", store.orgs["analytics"].MaxWorkers) } } -func 
TestGetTeamOmitsMinWorkers(t *testing.T) { +func TestGetOrgOmitsMinWorkers(t *testing.T) { store := newFakeAPIStore() - store.teams["analytics"] = &configstore.Team{ + store.orgs["analytics"] = &configstore.Org{ Name: "analytics", MaxWorkers: 2, } router := newTestAPIRouter(store) - req := httptest.NewRequest(http.MethodGet, "/api/v1/teams/analytics", nil) + req := httptest.NewRequest(http.MethodGet, "/api/v1/orgs/analytics", nil) rec := httptest.NewRecorder() router.ServeHTTP(rec, req) @@ -573,7 +573,7 @@ func TestGetTeamOmitsMinWorkers(t *testing.T) { t.Fatalf("status = %d, want %d: %s", rec.Code, http.StatusOK, rec.Body.String()) } if bytes.Contains(rec.Body.Bytes(), []byte(`"min_workers"`)) { - t.Fatalf("expected team response to omit min_workers, got %s", rec.Body.String()) + t.Fatalf("expected org response to omit min_workers, got %s", rec.Body.String()) } } @@ -583,8 +583,8 @@ func TestManagedWarehouseUpsertColumnsExcludeCreatedAt(t *testing.T) { if slices.Contains(columns, "created_at") { t.Fatal("expected created_at to be excluded from managed warehouse upserts") } - if slices.Contains(columns, "team_name") { - t.Fatal("expected team_name to be excluded from managed warehouse upserts") + if slices.Contains(columns, "org_id") { + t.Fatal("expected org_id to be excluded from managed warehouse upserts") } if !slices.Contains(columns, "updated_at") { t.Fatal("expected updated_at to be included in managed warehouse upserts") diff --git a/controlplane/admin/dashboard.go b/controlplane/admin/dashboard.go index 16ebecc..6984663 100644 --- a/controlplane/admin/dashboard.go +++ b/controlplane/admin/dashboard.go @@ -37,7 +37,7 @@ func APIAuthMiddleware(adminToken string) gin.HandlerFunc { // RegisterDashboard serves the admin dashboard on the Gin engine. 
func RegisterDashboard(r *gin.Engine, adminToken string) { r.GET("/", dashboardPageHandler("index.html", adminToken)) - r.GET("/teams", dashboardPageHandler("teams.html", adminToken)) + r.GET("/orgs", dashboardPageHandler("orgs.html", adminToken)) r.GET("/workers", dashboardPageHandler("workers.html", adminToken)) r.GET("/sessions", dashboardPageHandler("sessions.html", adminToken)) r.GET("/settings", dashboardPageHandler("settings.html", adminToken)) diff --git a/controlplane/admin/static/index.html b/controlplane/admin/static/index.html index 62668d9..1745ab5 100644 --- a/controlplane/admin/static/index.html +++ b/controlplane/admin/static/index.html @@ -16,7 +16,7 @@
Overview - Teams + Orgs Ducklings Sessions Settings @@ -34,10 +34,10 @@

Cluster Overview

-

Teams

-
Orgs +
-
Loading teams...
+
Loading orgs...
@@ -54,8 +54,8 @@

Teams

const data = JSON.parse(evt.detail.serverResponse); evt.detail.serverResponse = `
-
${data.total_teams}
-
Teams
+
${data.total_orgs}
+
Orgs
${data.total_workers}
@@ -67,16 +67,16 @@

Teams

`; } catch(e) {} } - if (evt.detail.target.id === 'team-list') { + if (evt.detail.target.id === 'org-list') { try { - const teams = JSON.parse(evt.detail.serverResponse); - if (!teams || teams.length === 0) { - evt.detail.serverResponse = '
No teams configured
'; + const orgs = JSON.parse(evt.detail.serverResponse); + if (!orgs || orgs.length === 0) { + evt.detail.serverResponse = '
No orgs configured
'; return; } let html = '' + ''; - teams.forEach(t => { + orgs.forEach(t => { html += ` diff --git a/controlplane/admin/static/teams.html b/controlplane/admin/static/orgs.html similarity index 80% rename from controlplane/admin/static/teams.html rename to controlplane/admin/static/orgs.html index f36fa3e..d114b0b 100644 --- a/controlplane/admin/static/teams.html +++ b/controlplane/admin/static/orgs.html @@ -3,7 +3,7 @@ - Duckgres - Teams + Duckgres - Orgs @@ -16,7 +16,7 @@
Overview - Teams + Orgs Ducklings Sessions Settings @@ -26,19 +26,19 @@
-

Teams

+

Orgs

NameUsersMax WorkersMemory Budget
${esc(t.name)} ${(t.users || []).length}
' + '' + ''; - teams.forEach(t => { + orgs.forEach(t => { const safeName = esc(t.name); html += ` @@ -82,8 +82,8 @@

Create Team

`; }); diff --git a/controlplane/admin/static/sessions.html b/controlplane/admin/static/sessions.html index 0b59e8d..d7d308f 100644 --- a/controlplane/admin/static/sessions.html +++ b/controlplane/admin/static/sessions.html @@ -16,7 +16,7 @@
Overview - Teams + Orgs Ducklings Sessions Settings @@ -48,11 +48,11 @@

Active Sessions

return; } let html = '
NameUsersMax WorkersMemory BudgetActions
${safeName} ${t.max_workers || 'unlimited'} ${esc(t.memory_budget || 'default')} -
' + - ''; + ''; sessions.forEach(s => { html += ` - + `; }); html += '
PIDTeamWorker
PIDOrgWorker
${s.pid}${esc(s.team)}${esc(s.org)} ${s.worker_id}
'; diff --git a/controlplane/admin/static/settings.html b/controlplane/admin/static/settings.html index 1100c03..540cf62 100644 --- a/controlplane/admin/static/settings.html +++ b/controlplane/admin/static/settings.html @@ -16,7 +16,7 @@
Overview - Teams + Orgs Ducklings Sessions Settings diff --git a/controlplane/admin/static/workers.html b/controlplane/admin/static/workers.html index a9c0a67..7104aa2 100644 --- a/controlplane/admin/static/workers.html +++ b/controlplane/admin/static/workers.html @@ -16,7 +16,7 @@
Overview - Teams + Orgs Ducklings Sessions Settings @@ -48,12 +48,12 @@

Ducklings (Workers)

return; } let html = '' + - ''; + ''; workers.forEach(w => { const statusColor = w.status === 'active' ? 'text-green-400' : 'text-yellow-400'; html += ` - + `; }); diff --git a/controlplane/configstore/models.go b/controlplane/configstore/models.go index 5a07575..8215ee7 100644 --- a/controlplane/configstore/models.go +++ b/controlplane/configstore/models.go @@ -2,30 +2,30 @@ package configstore import "time" -// Team represents a tenant with per-team resource limits. -type Team struct { +// Org represents a tenant with per-org resource limits. +type Org struct { Name string `gorm:"primaryKey;size:255" json:"name"` MaxWorkers int `gorm:"default:0" json:"max_workers"` MemoryBudget string `gorm:"size:32" json:"memory_budget"` IdleTimeoutS int `gorm:"default:0" json:"idle_timeout_s"` - Users []TeamUser `gorm:"foreignKey:TeamName;references:Name" json:"users,omitempty"` - Warehouse *ManagedWarehouse `gorm:"foreignKey:TeamName;references:Name;constraint:OnDelete:CASCADE" json:"warehouse,omitempty"` + Users []OrgUser `gorm:"foreignKey:OrgID;references:Name" json:"users,omitempty"` + Warehouse *ManagedWarehouse `gorm:"foreignKey:OrgID;references:Name;constraint:OnDelete:CASCADE" json:"warehouse,omitempty"` CreatedAt time.Time `json:"created_at"` UpdatedAt time.Time `json:"updated_at"` } -func (Team) TableName() string { return "duckgres_teams" } +func (Org) TableName() string { return "duckgres_orgs" } -// TeamUser maps a username to a team with credentials. -type TeamUser struct { +// OrgUser maps a username to an org with credentials. 
+type OrgUser struct { Username string `gorm:"primaryKey;size:255" json:"username"` Password string `gorm:"size:255;not null" json:"-"` - TeamName string `gorm:"size:255;not null;index" json:"team_name"` + OrgID string `gorm:"size:255;not null;index" json:"org_id"` CreatedAt time.Time `json:"created_at"` UpdatedAt time.Time `json:"updated_at"` } -func (TeamUser) TableName() string { return "duckgres_team_users" } +func (OrgUser) TableName() string { return "duckgres_org_users" } // ManagedWarehouseProvisioningState is an open string used for warehouse lifecycle status. // The constants below are the canonical values used by current tooling, but callers may @@ -48,7 +48,7 @@ type SecretRef struct { Key string `gorm:"size:255" json:"key"` } -// ManagedWarehouseDatabase stores primary warehouse DB metadata for a team. +// ManagedWarehouseDatabase stores primary warehouse DB metadata for an org. type ManagedWarehouseDatabase struct { Region string `gorm:"size:64" json:"region"` Endpoint string `gorm:"size:512" json:"endpoint"` @@ -57,7 +57,7 @@ type ManagedWarehouseDatabase struct { Username string `gorm:"size:255" json:"username"` } -// ManagedWarehouseMetadataStore stores team-scoped DuckLake metadata DB info. +// ManagedWarehouseMetadataStore stores org-scoped DuckLake metadata DB info. type ManagedWarehouseMetadataStore struct { Kind string `gorm:"size:64" json:"kind"` Engine string `gorm:"size:64" json:"engine"` @@ -68,7 +68,7 @@ type ManagedWarehouseMetadataStore struct { Username string `gorm:"size:255" json:"username"` } -// ManagedWarehouseS3 stores object-store metadata for a team's warehouse. +// ManagedWarehouseS3 stores object-store metadata for an org's warehouse. 
type ManagedWarehouseS3 struct { Provider string `gorm:"size:64" json:"provider"` Region string `gorm:"size:64" json:"region"` @@ -79,16 +79,16 @@ type ManagedWarehouseS3 struct { URLStyle string `gorm:"size:16" json:"url_style"` } -// ManagedWarehouseWorkerIdentity stores team-scoped worker identity metadata. +// ManagedWarehouseWorkerIdentity stores org-scoped worker identity metadata. type ManagedWarehouseWorkerIdentity struct { Namespace string `gorm:"size:255" json:"namespace"` ServiceAccountName string `gorm:"size:255" json:"service_account_name"` IAMRoleARN string `gorm:"size:512" json:"iam_role_arn"` } -// ManagedWarehouse is the config-store source of truth for a team's managed warehouse metadata. +// ManagedWarehouse is the config-store source of truth for an org's managed warehouse metadata. type ManagedWarehouse struct { - TeamName string `gorm:"primaryKey;size:255" json:"team_name"` + OrgID string `gorm:"primaryKey;size:255" json:"org_id"` WarehouseDatabase ManagedWarehouseDatabase `gorm:"embedded;embeddedPrefix:warehouse_database_" json:"warehouse_database"` MetadataStore ManagedWarehouseMetadataStore `gorm:"embedded;embeddedPrefix:metadata_store_" json:"metadata_store"` @@ -136,7 +136,7 @@ type GlobalConfig struct { func (GlobalConfig) TableName() string { return "duckgres_global_config" } // DuckLakeConfig is a singleton row (ID=1) for legacy cluster-wide DuckLake settings. -// In multi-tenant mode, the managed-warehouse contract is the intended per-team source of truth. +// In multi-tenant mode, the managed-warehouse contract is the intended per-org source of truth. type DuckLakeConfig struct { ID uint `gorm:"primaryKey" json:"-"` MetadataStore string `gorm:"size:1024" json:"metadata_store"` @@ -181,8 +181,8 @@ type QueryLogConfig struct { func (QueryLogConfig) TableName() string { return "duckgres_query_log_config" } -// TeamConfig is a convenience view combining team metadata with resource limits. 
-type TeamConfig struct { +// OrgConfig is a convenience view combining org metadata with resource limits. +type OrgConfig struct { Name string MaxWorkers int MemoryBudget string @@ -191,9 +191,9 @@ type TeamConfig struct { Warehouse *ManagedWarehouseConfig } -// ManagedWarehouseConfig is the in-memory snapshot view of a team's warehouse metadata. +// ManagedWarehouseConfig is the in-memory snapshot view of an org's warehouse metadata. type ManagedWarehouseConfig struct { - TeamName string + OrgID string WarehouseDatabase ManagedWarehouseDatabase MetadataStore ManagedWarehouseMetadataStore @@ -227,7 +227,7 @@ func copyManagedWarehouseConfig(warehouse *ManagedWarehouse) *ManagedWarehouseCo } cfg := &ManagedWarehouseConfig{ - TeamName: warehouse.TeamName, + OrgID: warehouse.OrgID, WarehouseDatabase: warehouse.WarehouseDatabase, MetadataStore: warehouse.MetadataStore, S3: warehouse.S3, diff --git a/controlplane/configstore/store.go b/controlplane/configstore/store.go index 40b05f6..a50aaf5 100644 --- a/controlplane/configstore/store.go +++ b/controlplane/configstore/store.go @@ -15,8 +15,8 @@ import ( // Snapshot holds a point-in-time copy of all config data for fast lookups. 
type Snapshot struct { - Teams map[string]*TeamConfig - UserTeam map[string]string // username -> team name + Orgs map[string]*OrgConfig + UserOrg map[string]string // username -> org name UserPassword map[string]string // username -> password Global GlobalConfig DuckLake DuckLakeConfig @@ -49,9 +49,9 @@ func NewConfigStore(connStr string, pollInterval time.Duration) (*ConfigStore, e // Auto-migrate all models if err := db.AutoMigrate( - &Team{}, + &Org{}, &ManagedWarehouse{}, - &TeamUser{}, + &OrgUser{}, &GlobalConfig{}, &DuckLakeConfig{}, &RateLimitConfig{}, @@ -78,7 +78,7 @@ func NewConfigStore(connStr string, pollInterval time.Duration) (*ConfigStore, e } cs.snapshot = snap - slog.Info("Config store connected.", "teams", len(snap.Teams), "users", len(snap.UserTeam)) + slog.Info("Config store connected.", "orgs", len(snap.Orgs), "users", len(snap.UserOrg)) return cs, nil } @@ -117,9 +117,9 @@ func (cs *ConfigStore) Start(ctx context.Context) { // load fetches all config from the database and builds a Snapshot. 
func (cs *ConfigStore) load() (*Snapshot, error) { - var teams []Team - if err := cs.db.Preload("Users").Preload("Warehouse").Find(&teams).Error; err != nil { - return nil, fmt.Errorf("load teams: %w", err) + var orgs []Org + if err := cs.db.Preload("Users").Preload("Warehouse").Find(&orgs).Error; err != nil { + return nil, fmt.Errorf("load orgs: %w", err) } var global GlobalConfig @@ -135,8 +135,8 @@ func (cs *ConfigStore) load() (*Snapshot, error) { cs.db.First(&queryLog, 1) snap := &Snapshot{ - Teams: make(map[string]*TeamConfig), - UserTeam: make(map[string]string), + Orgs: make(map[string]*OrgConfig), + UserOrg: make(map[string]string), UserPassword: make(map[string]string), Global: global, DuckLake: duckLake, @@ -144,21 +144,21 @@ func (cs *ConfigStore) load() (*Snapshot, error) { QueryLog: queryLog, } - for _, t := range teams { - tc := &TeamConfig{ - Name: t.Name, - MaxWorkers: t.MaxWorkers, - MemoryBudget: t.MemoryBudget, - IdleTimeoutS: t.IdleTimeoutS, + for _, o := range orgs { + oc := &OrgConfig{ + Name: o.Name, + MaxWorkers: o.MaxWorkers, + MemoryBudget: o.MemoryBudget, + IdleTimeoutS: o.IdleTimeoutS, Users: make(map[string]string), - Warehouse: copyManagedWarehouseConfig(t.Warehouse), + Warehouse: copyManagedWarehouseConfig(o.Warehouse), } - for _, u := range t.Users { - tc.Users[u.Username] = u.Password - snap.UserTeam[u.Username] = t.Name + for _, u := range o.Users { + oc.Users[u.Username] = u.Password + snap.UserOrg[u.Username] = o.Name snap.UserPassword[u.Username] = u.Password } - snap.Teams[t.Name] = tc + snap.Orgs[o.Name] = oc } return snap, nil @@ -172,7 +172,7 @@ func (cs *ConfigStore) Snapshot() *Snapshot { } // ValidateUser checks username/password against the cached snapshot. -// Passwords are compared using bcrypt. Returns the team name and whether auth succeeded. +// Passwords are compared using bcrypt. Returns the org name and whether auth succeeded. 
func (cs *ConfigStore) ValidateUser(username, password string) (string, bool) { cs.mu.RLock() defer cs.mu.RUnlock() @@ -188,7 +188,7 @@ func (cs *ConfigStore) ValidateUser(username, password string) (string, bool) { if err := bcrypt.CompareHashAndPassword([]byte(storedHash), []byte(password)); err != nil { return "", false } - return cs.snapshot.UserTeam[username], true + return cs.snapshot.UserOrg[username], true } // HashPassword hashes a plaintext password using bcrypt. @@ -200,14 +200,14 @@ func HashPassword(password string) (string, error) { return string(hash), nil } -// TeamForUser returns the team name for a user, or "" if not found. -func (cs *ConfigStore) TeamForUser(username string) string { +// OrgForUser returns the org name for a user, or "" if not found. +func (cs *ConfigStore) OrgForUser(username string) string { cs.mu.RLock() defer cs.mu.RUnlock() if cs.snapshot == nil { return "" } - return cs.snapshot.UserTeam[username] + return cs.snapshot.UserOrg[username] } // OnChange registers a callback that fires when the config snapshot changes. 
diff --git a/controlplane/configstore/store_test.go b/controlplane/configstore/store_test.go index f1c3094..118ba25 100644 --- a/controlplane/configstore/store_test.go +++ b/controlplane/configstore/store_test.go @@ -17,19 +17,19 @@ func mustHash(t *testing.T, password string) string { } func TestSnapshotBuild(t *testing.T) { - // Verify TeamConfig construction from models + // Verify OrgConfig construction from models hash1 := mustHash(t, "secret1") hash2 := mustHash(t, "secret2") hash3 := mustHash(t, "secret3") readyAt := time.Date(2026, time.March, 17, 12, 0, 0, 0, time.UTC) - teams := []Team{ + orgs := []Org{ { Name: "analytics", MaxWorkers: 4, MemoryBudget: "8GB", Warehouse: &ManagedWarehouse{ - TeamName: "analytics", + OrgID: "analytics", WarehouseDatabase: ManagedWarehouseDatabase{ Region: "us-east-1", Endpoint: "analytics.cluster-xyz.us-east-1.rds.amazonaws.com", @@ -50,15 +50,15 @@ func TestSnapshotBuild(t *testing.T) { Provider: "aws", Region: "us-east-1", Bucket: "analytics-bucket", - PathPrefix: "ducklake/team-analytics/", + PathPrefix: "ducklake/org-analytics/", Endpoint: "s3.us-east-1.amazonaws.com", UseSSL: true, URLStyle: "vhost", }, WorkerIdentity: ManagedWarehouseWorkerIdentity{ Namespace: "duckgres", - ServiceAccountName: "team-analytics-worker", - IAMRoleARN: "arn:aws:iam::123456789012:role/team-analytics-worker", + ServiceAccountName: "org-analytics-worker", + IAMRoleARN: "arn:aws:iam::123456789012:role/org-analytics-worker", }, WarehouseDatabaseCredentials: SecretRef{ Namespace: "duckgres", @@ -89,86 +89,86 @@ func TestSnapshotBuild(t *testing.T) { SecretsState: ManagedWarehouseStateReady, ReadyAt: &readyAt, }, - Users: []TeamUser{ - {Username: "alice", Password: hash1, TeamName: "analytics"}, - {Username: "bob", Password: hash2, TeamName: "analytics"}, + Users: []OrgUser{ + {Username: "alice", Password: hash1, OrgID: "analytics"}, + {Username: "bob", Password: hash2, OrgID: "analytics"}, }, }, { Name: "ingestion", MaxWorkers: 2, - Users: 
[]TeamUser{ - {Username: "charlie", Password: hash3, TeamName: "ingestion"}, + Users: []OrgUser{ + {Username: "charlie", Password: hash3, OrgID: "ingestion"}, }, }, } snap := &Snapshot{ - Teams: make(map[string]*TeamConfig), - UserTeam: make(map[string]string), + Orgs: make(map[string]*OrgConfig), + UserOrg: make(map[string]string), UserPassword: make(map[string]string), } - for _, t2 := range teams { - tc := &TeamConfig{ - Name: t2.Name, - MaxWorkers: t2.MaxWorkers, - MemoryBudget: t2.MemoryBudget, - IdleTimeoutS: t2.IdleTimeoutS, + for _, o := range orgs { + oc := &OrgConfig{ + Name: o.Name, + MaxWorkers: o.MaxWorkers, + MemoryBudget: o.MemoryBudget, + IdleTimeoutS: o.IdleTimeoutS, Users: make(map[string]string), } - if t2.Warehouse != nil { - tc.Warehouse = copyManagedWarehouseConfig(t2.Warehouse) + if o.Warehouse != nil { + oc.Warehouse = copyManagedWarehouseConfig(o.Warehouse) } - for _, u := range t2.Users { - tc.Users[u.Username] = u.Password - snap.UserTeam[u.Username] = t2.Name + for _, u := range o.Users { + oc.Users[u.Username] = u.Password + snap.UserOrg[u.Username] = o.Name snap.UserPassword[u.Username] = u.Password } - snap.Teams[t2.Name] = tc + snap.Orgs[o.Name] = oc } - // Verify team config - if len(snap.Teams) != 2 { - t.Fatalf("expected 2 teams, got %d", len(snap.Teams)) + // Verify org config + if len(snap.Orgs) != 2 { + t.Fatalf("expected 2 orgs, got %d", len(snap.Orgs)) } - if snap.Teams["analytics"].MaxWorkers != 4 { - t.Errorf("expected analytics max_workers=4, got %d", snap.Teams["analytics"].MaxWorkers) + if snap.Orgs["analytics"].MaxWorkers != 4 { + t.Errorf("expected analytics max_workers=4, got %d", snap.Orgs["analytics"].MaxWorkers) } - if snap.Teams["analytics"].MemoryBudget != "8GB" { - t.Errorf("expected analytics memory_budget=8GB, got %s", snap.Teams["analytics"].MemoryBudget) + if snap.Orgs["analytics"].MemoryBudget != "8GB" { + t.Errorf("expected analytics memory_budget=8GB, got %s", snap.Orgs["analytics"].MemoryBudget) } - if 
len(snap.Teams["analytics"].Users) != 2 { - t.Errorf("expected 2 analytics users, got %d", len(snap.Teams["analytics"].Users)) + if len(snap.Orgs["analytics"].Users) != 2 { + t.Errorf("expected 2 analytics users, got %d", len(snap.Orgs["analytics"].Users)) } - if snap.Teams["analytics"].Warehouse == nil { + if snap.Orgs["analytics"].Warehouse == nil { t.Fatal("expected analytics warehouse to be present") } - if snap.Teams["analytics"].Warehouse.WarehouseDatabase.DatabaseName != "analytics_wh" { - t.Fatalf("expected analytics warehouse db name analytics_wh, got %q", snap.Teams["analytics"].Warehouse.WarehouseDatabase.DatabaseName) + if snap.Orgs["analytics"].Warehouse.WarehouseDatabase.DatabaseName != "analytics_wh" { + t.Fatalf("expected analytics warehouse db name analytics_wh, got %q", snap.Orgs["analytics"].Warehouse.WarehouseDatabase.DatabaseName) } - if snap.Teams["analytics"].Warehouse.MetadataStore.Kind != "dedicated_rds" { - t.Fatalf("expected metadata store kind dedicated_rds, got %q", snap.Teams["analytics"].Warehouse.MetadataStore.Kind) + if snap.Orgs["analytics"].Warehouse.MetadataStore.Kind != "dedicated_rds" { + t.Fatalf("expected metadata store kind dedicated_rds, got %q", snap.Orgs["analytics"].Warehouse.MetadataStore.Kind) } - if snap.Teams["analytics"].Warehouse.MetadataStoreCredentials.Name != "analytics-metadata" { - t.Fatalf("expected metadata secret analytics-metadata, got %q", snap.Teams["analytics"].Warehouse.MetadataStoreCredentials.Name) + if snap.Orgs["analytics"].Warehouse.MetadataStoreCredentials.Name != "analytics-metadata" { + t.Fatalf("expected metadata secret analytics-metadata, got %q", snap.Orgs["analytics"].Warehouse.MetadataStoreCredentials.Name) } - if snap.Teams["analytics"].Warehouse.RuntimeConfig.Name != "analytics-runtime" { - t.Fatalf("expected runtime config secret analytics-runtime, got %q", snap.Teams["analytics"].Warehouse.RuntimeConfig.Name) + if snap.Orgs["analytics"].Warehouse.RuntimeConfig.Name != 
"analytics-runtime" { + t.Fatalf("expected runtime config secret analytics-runtime, got %q", snap.Orgs["analytics"].Warehouse.RuntimeConfig.Name) } - if snap.Teams["analytics"].Warehouse.ReadyAt == nil || !snap.Teams["analytics"].Warehouse.ReadyAt.Equal(readyAt) { - t.Fatalf("expected ready_at %v, got %v", readyAt, snap.Teams["analytics"].Warehouse.ReadyAt) + if snap.Orgs["analytics"].Warehouse.ReadyAt == nil || !snap.Orgs["analytics"].Warehouse.ReadyAt.Equal(readyAt) { + t.Fatalf("expected ready_at %v, got %v", readyAt, snap.Orgs["analytics"].Warehouse.ReadyAt) } - if snap.Teams["ingestion"].Warehouse != nil { + if snap.Orgs["ingestion"].Warehouse != nil { t.Fatal("expected ingestion warehouse to be nil") } - // Verify user → team mapping - if snap.UserTeam["alice"] != "analytics" { - t.Errorf("expected alice in analytics, got %s", snap.UserTeam["alice"]) + // Verify user -> org mapping + if snap.UserOrg["alice"] != "analytics" { + t.Errorf("expected alice in analytics, got %s", snap.UserOrg["alice"]) } - if snap.UserTeam["charlie"] != "ingestion" { - t.Errorf("expected charlie in ingestion, got %s", snap.UserTeam["charlie"]) + if snap.UserOrg["charlie"] != "ingestion" { + t.Errorf("expected charlie in ingestion, got %s", snap.UserOrg["charlie"]) } // Verify bcrypt password hashes are stored (not plaintext) @@ -196,8 +196,8 @@ func TestTableNames(t *testing.T) { model interface{ TableName() string } want string }{ - {Team{}, "duckgres_teams"}, - {TeamUser{}, "duckgres_team_users"}, + {Org{}, "duckgres_orgs"}, + {OrgUser{}, "duckgres_org_users"}, {ManagedWarehouse{}, "duckgres_managed_warehouses"}, {GlobalConfig{}, "duckgres_global_config"}, {DuckLakeConfig{}, "duckgres_ducklake_config"}, diff --git a/controlplane/control.go b/controlplane/control.go index 37ed4aa..ae29138 100644 --- a/controlplane/control.go +++ b/controlplane/control.go @@ -108,19 +108,19 @@ type ControlPlane struct { acmeDNSManager *server.ACMEDNSManager // ACME manager for DNS-01 (nil when not 
using DNS challenges) // Multi-tenant fields (non-nil in remote multitenant mode) - teamRouter TeamRouterInterface + orgRouter OrgRouterInterface configStore ConfigStoreInterface } // ConfigStoreInterface abstracts the config store for the control plane. // Defined here to avoid circular imports with the configstore package. type ConfigStoreInterface interface { - ValidateUser(username, password string) (teamName string, ok bool) - TeamForUser(username string) string + ValidateUser(username, password string) (orgID string, ok bool) + OrgForUser(username string) string } -// TeamRouterInterface abstracts the team router for the control plane. -type TeamRouterInterface interface { +// OrgRouterInterface abstracts the org router for the control plane. +type OrgRouterInterface interface { StackForUser(username string) (pool WorkerPool, sessions *SessionManager, rebalancer *MemoryRebalancer, ok bool) ShutdownAll() } @@ -315,7 +315,7 @@ func RunControlPlane(cfg ControlPlaneConfig) { acmeDNSManager: acmeDNSMgr, } - // Multi-tenant mode: config store + per-team pools (K8s remote backend only) + // Multi-tenant mode: config store + per-org pools (K8s remote backend only) if cfg.WorkerBackend == "remote" { store, adapter, adminSrv, err := SetupMultiTenant(cfg, srv, memBudget, k8sMaxWorkers) if err != nil { @@ -323,7 +323,7 @@ func RunControlPlane(cfg ControlPlaneConfig) { os.Exit(1) } cp.configStore = store - cp.teamRouter = adapter + cp.orgRouter = adapter // Replace the simple metrics server with the Gin admin server if cfg.MetricsServer != nil { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) @@ -659,7 +659,7 @@ func (cp *ControlPlane) handleConnection(conn net.Conn) { // Authenticate: use config store (multi-tenant) or YAML users (single-tenant) if cp.configStore != nil { - teamName, ok := cp.configStore.ValidateUser(username, password) + orgID, ok := cp.configStore.ValidateUser(username, password) if !ok { slog.Warn("Authentication failed.", 
"user", username, "remote_addr", remoteAddr) banned := server.RecordFailedAuthAttempt(cp.rateLimiter, remoteAddr) @@ -670,7 +670,7 @@ func (cp *ControlPlane) handleConnection(conn net.Conn) { _ = writer.Flush() return } - _ = teamName // used for routing below + _ = orgID // used for routing below } else { if !server.ValidateUserPassword(cp.cfg.Users, username, password) { slog.Warn("Authentication failed.", "user", username, "remote_addr", remoteAddr) @@ -694,13 +694,13 @@ func (cp *ControlPlane) handleConnection(conn net.Conn) { slog.Info("User authenticated.", "user", username, "remote_addr", remoteAddr) // Resolve the session manager and rebalancer for this connection. - // In multi-tenant mode, each team has its own stack. + // In multi-tenant mode, each org has its own stack. var sessions *SessionManager var rebalancer *MemoryRebalancer - if cp.teamRouter != nil { - _, sess, rebal, ok := cp.teamRouter.StackForUser(username) + if cp.orgRouter != nil { + _, sess, rebal, ok := cp.orgRouter.StackForUser(username) if !ok { - _ = server.WriteErrorResponse(writer, "FATAL", "28000", "no team configured for user") + _ = server.WriteErrorResponse(writer, "FATAL", "28000", "no org configured for user") _ = writer.Flush() return } @@ -900,8 +900,8 @@ func (cp *ControlPlane) shutdown() { cp.wg.Wait() slog.Info("Shutting down workers...") - if cp.teamRouter != nil { - cp.teamRouter.ShutdownAll() + if cp.orgRouter != nil { + cp.orgRouter.ShutdownAll() } else if cp.pool != nil { cp.pool.ShutdownAll() } @@ -1078,8 +1078,8 @@ func (cp *ControlPlane) drainAfterUpgrade() { } // Shut down workers - if cp.teamRouter != nil { - cp.teamRouter.ShutdownAll() + if cp.orgRouter != nil { + cp.orgRouter.ShutdownAll() } else if cp.pool != nil { cp.pool.ShutdownAll() } diff --git a/controlplane/flight_ingress_metrics_k8s.go b/controlplane/flight_ingress_metrics_k8s.go index 9179d3e..b37a1b2 100644 --- a/controlplane/flight_ingress_metrics_k8s.go +++ 
b/controlplane/flight_ingress_metrics_k8s.go @@ -7,49 +7,49 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" ) -// --- Per-team metrics (multi-tenant mode) --- - -var teamWorkersActiveGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{ - Name: "duckgres_team_workers_active", - Help: "Number of active workers per team", -}, []string{"team"}) - -var teamWorkersIdleGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{ - Name: "duckgres_team_workers_idle", - Help: "Number of idle workers per team", -}, []string{"team"}) - -var teamSessionsActiveGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{ - Name: "duckgres_team_sessions_active", - Help: "Number of active sessions per team", -}, []string{"team"}) - -var teamWorkerSpawnsCounter = promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "duckgres_team_worker_spawns_total", - Help: "Total worker spawns per team", -}, []string{"team"}) - -var teamWorkerCrashesCounter = promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "duckgres_team_worker_crashes_total", - Help: "Total worker crashes per team", -}, []string{"team"}) - -func observeTeamWorkersActive(team string, count int) { - teamWorkersActiveGauge.WithLabelValues(team).Set(float64(count)) +// --- Per-org metrics (multi-tenant mode) --- + +var orgWorkersActiveGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "duckgres_org_workers_active", + Help: "Number of active workers per org", +}, []string{"org"}) + +var orgWorkersIdleGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "duckgres_org_workers_idle", + Help: "Number of idle workers per org", +}, []string{"org"}) + +var orgSessionsActiveGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "duckgres_org_sessions_active", + Help: "Number of active sessions per org", +}, []string{"org"}) + +var orgWorkerSpawnsCounter = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "duckgres_org_worker_spawns_total", + Help: "Total worker spawns per org", +}, []string{"org"}) + +var 
orgWorkerCrashesCounter = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "duckgres_org_worker_crashes_total", + Help: "Total worker crashes per org", +}, []string{"org"}) + +func observeOrgWorkersActive(org string, count int) { + orgWorkersActiveGauge.WithLabelValues(org).Set(float64(count)) } -func observeTeamWorkersIdle(team string, count int) { - teamWorkersIdleGauge.WithLabelValues(team).Set(float64(count)) +func observeOrgWorkersIdle(org string, count int) { + orgWorkersIdleGauge.WithLabelValues(org).Set(float64(count)) } -func observeTeamSessionsActive(team string, count int) { - teamSessionsActiveGauge.WithLabelValues(team).Set(float64(count)) +func observeOrgSessionsActive(org string, count int) { + orgSessionsActiveGauge.WithLabelValues(org).Set(float64(count)) } -func observeTeamWorkerSpawn(team string) { - teamWorkerSpawnsCounter.WithLabelValues(team).Inc() +func observeOrgWorkerSpawn(org string) { + orgWorkerSpawnsCounter.WithLabelValues(org).Inc() } -func observeTeamWorkerCrash(team string) { - teamWorkerCrashesCounter.WithLabelValues(team).Inc() +func observeOrgWorkerCrash(org string) { + orgWorkerCrashesCounter.WithLabelValues(org).Inc() } diff --git a/controlplane/k8s_pool.go b/controlplane/k8s_pool.go index a875bdb..1c9de2e 100644 --- a/controlplane/k8s_pool.go +++ b/controlplane/k8s_pool.go @@ -52,8 +52,8 @@ type K8sWorkerPool struct { imagePullPolicy corev1.PullPolicy serviceAccount string memoryBudget int64 // total memory budget in bytes - teamName string // team name for pod labels (multi-tenant mode) - workerIDGenerator func() int // shared ID generator across teams (nil = internal counter) + orgID string // org ID for pod labels (multi-tenant mode) + workerIDGenerator func() int // shared ID generator across orgs (nil = internal counter) sharedWarmActivation bool cachedToken string // cached bearer token (immutable after setup) informer cache.SharedIndexInformer @@ -126,7 +126,7 @@ func newK8sWorkerPool(cfg K8sWorkerPoolConfig, 
clientset kubernetes.Interface) ( imagePullPolicy: corev1.PullPolicy(cfg.ImagePullPolicy), serviceAccount: cfg.ServiceAccount, memoryBudget: cfg.MemoryBudget, - teamName: cfg.TeamName, + orgID: cfg.OrgID, workerIDGenerator: cfg.WorkerIDGenerator, sharedWarmActivation: cfg.SharedWarmActivation, spawnSem: make(chan struct{}, spawnConcurrency), @@ -246,8 +246,8 @@ func (p *K8sWorkerPool) readBearerToken(ctx context.Context) (string, error) { // startInformer starts a SharedIndexInformer to watch worker pods. func (p *K8sWorkerPool) startInformer() { labelSelector := fmt.Sprintf("duckgres/control-plane=%s", p.cpID) - if p.teamName != "" { - labelSelector += fmt.Sprintf(",duckgres/team=%s", p.teamName) + if p.orgID != "" { + labelSelector += fmt.Sprintf(",duckgres/org=%s", p.orgID) } factory := informers.NewSharedInformerFactoryWithOptions( p.clientset, @@ -371,8 +371,8 @@ func (p *K8sWorkerPool) SpawnWorker(ctx context.Context, id int) error { "duckgres/control-plane": p.cpID, "duckgres/worker-id": strconv.Itoa(id), } - if p.teamName != "" { - podLabels["duckgres/team"] = p.teamName + if p.orgID != "" { + podLabels["duckgres/org"] = p.orgID } // Build pod spec @@ -811,7 +811,7 @@ func (p *K8sWorkerPool) ActivateReservedWorker(ctx context.Context, worker *Mana if activate == nil { activate = func(ctx context.Context, worker *ManagedWorker, payload TenantActivationPayload) error { return worker.ActivateTenant(ctx, server.WorkerActivationPayload{ - TeamName: payload.TeamName, + OrgID: payload.OrgID, LeaseExpiresAt: payload.LeaseExpiresAt, DuckLake: payload.DuckLake, }) @@ -1393,10 +1393,10 @@ func (p *K8sWorkerPool) markWorkerRetiredLocked(w *ManagedWorker) { } // podNameForWorker returns the pod name for a given worker ID, -// including the team name if set (multi-tenant mode). +// including the org ID if set (multi-tenant mode). 
func (p *K8sWorkerPool) podNameForWorker(id int) string { - if p.teamName != "" { - return fmt.Sprintf("duckgres-worker-%s-%s-%d", p.cpID, p.teamName, id) + if p.orgID != "" { + return fmt.Sprintf("duckgres-worker-%s-%s-%d", p.cpID, p.orgID, id) } return fmt.Sprintf("duckgres-worker-%s-%d", p.cpID, id) } diff --git a/controlplane/k8s_pool_test.go b/controlplane/k8s_pool_test.go index 392c64a..706b565 100644 --- a/controlplane/k8s_pool_test.go +++ b/controlplane/k8s_pool_test.go @@ -207,7 +207,7 @@ func TestK8sPoolActivateReservedWorkerTransitionsToHot(t *testing.T) { if err := worker.SetSharedState(SharedWorkerState{ Lifecycle: WorkerLifecycleReserved, Assignment: &WorkerAssignment{ - TeamName: "analytics", + OrgID: "analytics", LeaseExpiresAt: time.Now().Add(time.Hour), }, }); err != nil { @@ -218,14 +218,14 @@ func TestK8sPoolActivateReservedWorkerTransitionsToHot(t *testing.T) { if got.ID != worker.ID { t.Fatalf("expected worker %d, got %d", worker.ID, got.ID) } - if payload.TeamName != "analytics" { + if payload.OrgID != "analytics" { t.Fatalf("expected analytics payload, got %#v", payload) } return nil } err := pool.ActivateReservedWorker(context.Background(), worker, TenantActivationPayload{ - TeamName: "analytics", + OrgID: "analytics", Usernames: []string{"alice"}, }) if err != nil { @@ -243,7 +243,7 @@ func TestK8sPoolActivateReservedWorkerRetiresOnFailure(t *testing.T) { if err := worker.SetSharedState(SharedWorkerState{ Lifecycle: WorkerLifecycleReserved, Assignment: &WorkerAssignment{ - TeamName: "analytics", + OrgID: "analytics", LeaseExpiresAt: time.Now().Add(time.Hour), }, }); err != nil { @@ -255,7 +255,7 @@ func TestK8sPoolActivateReservedWorkerRetiresOnFailure(t *testing.T) { } err := pool.ActivateReservedWorker(context.Background(), worker, TenantActivationPayload{ - TeamName: "analytics", + OrgID: "analytics", Usernames: []string{"alice"}, }) if err == nil { @@ -369,7 +369,7 @@ func 
TestK8sPoolSpawnMinWorkersCountsOnlyNeutralIdleWorkersAsWarmCapacity(t *tes if err := worker.SetSharedState(SharedWorkerState{ Lifecycle: WorkerLifecycleReserved, Assignment: &WorkerAssignment{ - TeamName: "analytics", + OrgID: "analytics", LeaseExpiresAt: time.Now().Add(time.Hour), }, }); err != nil { @@ -403,7 +403,7 @@ func TestK8sPoolFindIdleWorkerSkipsReservedSharedWorker(t *testing.T) { if err := reserved.SetSharedState(SharedWorkerState{ Lifecycle: WorkerLifecycleReserved, Assignment: &WorkerAssignment{ - TeamName: "analytics", + OrgID: "analytics", LeaseExpiresAt: time.Now().Add(time.Hour), }, }); err != nil { @@ -438,7 +438,7 @@ func TestK8sPoolReserveSharedWorkerReservesIdleWorkerAndReplenishesWarmCapacity( leaseExpiry := time.Date(2026, time.March, 20, 16, 0, 0, 0, time.UTC) worker, err := pool.ReserveSharedWorker(context.Background(), &WorkerAssignment{ - TeamName: "analytics", + OrgID: "analytics", LeaseExpiresAt: leaseExpiry, }) if err != nil { @@ -452,7 +452,7 @@ func TestK8sPoolReserveSharedWorkerReservesIdleWorkerAndReplenishesWarmCapacity( if state.Lifecycle != WorkerLifecycleReserved { t.Fatalf("expected reserved lifecycle, got %q", state.Lifecycle) } - if state.Assignment == nil || state.Assignment.TeamName != "analytics" { + if state.Assignment == nil || state.Assignment.OrgID != "analytics" { t.Fatalf("expected analytics assignment, got %#v", state.Assignment) } if !state.Assignment.LeaseExpiresAt.Equal(leaseExpiry) { @@ -506,7 +506,7 @@ func TestK8sPoolReserveSharedWorkerSpawnsWhenPoolIsCold(t *testing.T) { } worker, err := pool.ReserveSharedWorker(context.Background(), &WorkerAssignment{ - TeamName: "billing", + OrgID: "billing", LeaseExpiresAt: time.Now().Add(time.Hour), }) if err != nil { @@ -532,7 +532,7 @@ func TestK8sPoolIdleReaperSkipsReservedSharedWorker(t *testing.T) { if err := reserved.SetSharedState(SharedWorkerState{ Lifecycle: WorkerLifecycleReserved, Assignment: &WorkerAssignment{ - TeamName: "analytics", + OrgID: "analytics", 
LeaseExpiresAt: time.Now().Add(time.Hour), }, }); err != nil { diff --git a/controlplane/multitenant.go b/controlplane/multitenant.go index eaa9023..f7c78da 100644 --- a/controlplane/multitenant.go +++ b/controlplane/multitenant.go @@ -18,13 +18,13 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" ) -// teamRouterAdapter wraps TeamRouter to implement both TeamRouterInterface -// (for the control plane) and admin.TeamStackInfo (for the admin API). -type teamRouterAdapter struct { - router *TeamRouter +// orgRouterAdapter wraps OrgRouter to implement both OrgRouterInterface +// (for the control plane) and admin.OrgStackInfo (for the admin API). +type orgRouterAdapter struct { + router *OrgRouter } -func (a *teamRouterAdapter) StackForUser(username string) (WorkerPool, *SessionManager, *MemoryRebalancer, bool) { +func (a *orgRouterAdapter) StackForUser(username string) (WorkerPool, *SessionManager, *MemoryRebalancer, bool) { stack, ok := a.router.StackForUser(username) if !ok { return nil, nil, nil, false @@ -32,28 +32,28 @@ func (a *teamRouterAdapter) StackForUser(username string) (WorkerPool, *SessionM return stack.Pool, stack.Sessions, stack.Rebalancer, true } -func (a *teamRouterAdapter) ShutdownAll() { +func (a *orgRouterAdapter) ShutdownAll() { a.router.ShutdownAll() } -func (a *teamRouterAdapter) AllTeamStats() []admin.TeamStatus { +func (a *orgRouterAdapter) AllOrgStats() []admin.OrgStatus { stacks := a.router.AllStacks() - stats := make([]admin.TeamStatus, 0, len(stacks)) + stats := make([]admin.OrgStatus, 0, len(stacks)) for name, stack := range stacks { sessionCount := stack.Sessions.SessionCount() - stats = append(stats, admin.TeamStatus{ + stats = append(stats, admin.OrgStatus{ Name: name, ActiveSessions: sessionCount, MaxWorkers: stack.Config.MaxWorkers, MemoryBudget: stack.Config.MemoryBudget, }) - // Emit per-team Prometheus metrics - observeTeamSessionsActive(name, sessionCount) + // Emit per-org Prometheus metrics + 
observeOrgSessionsActive(name, sessionCount) } return stats } -func (a *teamRouterAdapter) AllWorkerStatuses() []admin.WorkerStatus { +func (a *orgRouterAdapter) AllWorkerStatuses() []admin.WorkerStatus { stacks := a.router.AllStacks() var result []admin.WorkerStatus for name, stack := range stacks { @@ -74,19 +74,19 @@ func (a *teamRouterAdapter) AllWorkerStatuses() []admin.WorkerStatus { } result = append(result, admin.WorkerStatus{ ID: wID, - Team: name, + Org: name, ActiveSessions: count, Status: status, }) } - // Emit per-team worker Prometheus metrics - observeTeamWorkersActive(name, activeCount) - observeTeamWorkersIdle(name, idleCount) + // Emit per-org worker Prometheus metrics + observeOrgWorkersActive(name, activeCount) + observeOrgWorkersIdle(name, idleCount) } return result } -func (a *teamRouterAdapter) AllSessionStatuses() []admin.SessionStatus { +func (a *orgRouterAdapter) AllSessionStatuses() []admin.SessionStatus { stacks := a.router.AllStacks() var result []admin.SessionStatus for name, stack := range stacks { @@ -94,7 +94,7 @@ func (a *teamRouterAdapter) AllSessionStatuses() []admin.SessionStatus { result = append(result, admin.SessionStatus{ PID: s.PID, WorkerID: s.WorkerID, - Team: name, + Org: name, }) } } @@ -102,17 +102,17 @@ func (a *teamRouterAdapter) AllSessionStatuses() []admin.SessionStatus { } // Compile-time checks. -var _ TeamRouterInterface = (*teamRouterAdapter)(nil) -var _ admin.TeamStackInfo = (*teamRouterAdapter)(nil) +var _ OrgRouterInterface = (*orgRouterAdapter)(nil) +var _ admin.OrgStackInfo = (*orgRouterAdapter)(nil) -// SetupMultiTenant initializes the config store, team router, and Gin admin server. +// SetupMultiTenant initializes the config store, org router, and Gin admin server. // Called from RunControlPlane when --config-store is set with remote backend. 
func SetupMultiTenant( cfg ControlPlaneConfig, srv *server.Server, memBudget uint64, maxWorkers int, -) (ConfigStoreInterface, TeamRouterInterface, *http.Server, error) { +) (ConfigStoreInterface, OrgRouterInterface, *http.Server, error) { pollInterval := cfg.ConfigPollInterval if pollInterval <= 0 { pollInterval = 30 * time.Second @@ -139,12 +139,12 @@ func SetupMultiTenant( SharedWarmActivation: cfg.K8s.SharedWarmWorkers, } - router, err := NewTeamRouter(store, baseCfg, cfg, srv) + router, err := NewOrgRouter(store, baseCfg, cfg, srv) if err != nil { return nil, nil, nil, err } - adpt := &teamRouterAdapter{router: router} + adpt := &orgRouterAdapter{router: router} // Register config change handler store.OnChange(router.HandleConfigChange) diff --git a/controlplane/multitenant_stub.go b/controlplane/multitenant_stub.go index f6a0c11..9efb937 100644 --- a/controlplane/multitenant_stub.go +++ b/controlplane/multitenant_stub.go @@ -15,6 +15,6 @@ func SetupMultiTenant( srv *server.Server, memBudget uint64, maxWorkers int, -) (ConfigStoreInterface, TeamRouterInterface, *http.Server, error) { +) (ConfigStoreInterface, OrgRouterInterface, *http.Server, error) { return nil, nil, nil, fmt.Errorf("multi-tenant mode requires -tags kubernetes build") } diff --git a/controlplane/team_activation_test.go b/controlplane/org_activation_test.go similarity index 91% rename from controlplane/team_activation_test.go rename to controlplane/org_activation_test.go index db211e9..555a9f9 100644 --- a/controlplane/team_activation_test.go +++ b/controlplane/org_activation_test.go @@ -40,7 +40,7 @@ func TestSharedWorkerActivatorBuildsActivationRequestFromManagedWarehouse(t *tes defaultNamespace: "duckgres-workers", } - team := &configstore.TeamConfig{ + org := &configstore.OrgConfig{ Name: "analytics", Warehouse: &configstore.ManagedWarehouseConfig{ MetadataStore: configstore.ManagedWarehouseMetadataStore{ @@ -72,17 +72,17 @@ func 
TestSharedWorkerActivatorBuildsActivationRequestFromManagedWarehouse(t *tes } assignment := &WorkerAssignment{ - TeamName: "analytics", + OrgID: "analytics", LeaseExpiresAt: time.Date(2026, time.March, 22, 12, 0, 0, 0, time.UTC), } - req, err := activator.BuildActivationRequest(context.Background(), team, assignment) + req, err := activator.BuildActivationRequest(context.Background(), org, assignment) if err != nil { t.Fatalf("BuildActivationRequest: %v", err) } - if req.TeamName != "analytics" { - t.Fatalf("expected team analytics, got %q", req.TeamName) + if req.OrgID != "analytics" { + t.Fatalf("expected org analytics, got %q", req.OrgID) } if !req.LeaseExpiresAt.Equal(assignment.LeaseExpiresAt) { t.Fatalf("expected lease expiry %v, got %v", assignment.LeaseExpiresAt, req.LeaseExpiresAt) @@ -107,8 +107,8 @@ func TestSharedWorkerActivatorRequiresManagedWarehouse(t *testing.T) { defaultNamespace: "duckgres-workers", } - _, err := activator.BuildActivationRequest(context.Background(), &configstore.TeamConfig{Name: "analytics"}, &WorkerAssignment{ - TeamName: "analytics", + _, err := activator.BuildActivationRequest(context.Background(), &configstore.OrgConfig{Name: "analytics"}, &WorkerAssignment{ + OrgID: "analytics", LeaseExpiresAt: time.Now().Add(time.Hour), }) if err == nil { diff --git a/controlplane/team_reserved_pool.go b/controlplane/org_reserved_pool.go similarity index 61% rename from controlplane/team_reserved_pool.go rename to controlplane/org_reserved_pool.go index edf9b31..b18202f 100644 --- a/controlplane/team_reserved_pool.go +++ b/controlplane/org_reserved_pool.go @@ -12,23 +12,23 @@ import ( const defaultSharedWorkerReservationLease = 24 * time.Hour -// TeamReservedWorkerPool presents one team's reserved slice of a shared K8s warm pool. +// OrgReservedPool presents one org's reserved slice of a shared K8s warm pool. 
// It preserves the existing WorkerPool contract for SessionManager while ensuring -// workers are reserved to a single team for their lifetime and retired after use. -type TeamReservedWorkerPool struct { +// workers are reserved to a single org for their lifetime and retired after use. +type OrgReservedPool struct { shared *K8sWorkerPool - teamName string + orgID string maxWorkers int leaseDuration time.Duration sharedWarmWorkers bool - resolveTeamConfig func() (*configstore.TeamConfig, error) - activateReservedWorker func(context.Context, *ManagedWorker, *configstore.TeamConfig) error + resolveOrgConfig func() (*configstore.OrgConfig, error) + activateReservedWorker func(context.Context, *ManagedWorker, *configstore.OrgConfig) error } -func NewTeamReservedWorkerPool(shared *K8sWorkerPool, teamName string, maxWorkers int) *TeamReservedWorkerPool { - pool := &TeamReservedWorkerPool{ +func NewOrgReservedPool(shared *K8sWorkerPool, orgID string, maxWorkers int) *OrgReservedPool { + pool := &OrgReservedPool{ shared: shared, - teamName: teamName, + orgID: orgID, maxWorkers: maxWorkers, leaseDuration: defaultSharedWorkerReservationLease, } @@ -36,7 +36,7 @@ func NewTeamReservedWorkerPool(shared *K8sWorkerPool, teamName string, maxWorker return pool } -func (p *TeamReservedWorkerPool) AcquireWorker(ctx context.Context) (*ManagedWorker, error) { +func (p *OrgReservedPool) AcquireWorker(ctx context.Context) (*ManagedWorker, error) { for { select { case <-ctx.Done(): @@ -63,7 +63,7 @@ func (p *TeamReservedWorkerPool) AcquireWorker(ctx context.Context) (*ManagedWor p.shared.mu.Unlock() worker, err := p.shared.ReserveSharedWorker(ctx, &WorkerAssignment{ - TeamName: p.teamName, + OrgID: p.orgID, LeaseExpiresAt: time.Now().Add(p.leaseDuration), }) if err != nil { @@ -71,14 +71,14 @@ func (p *TeamReservedWorkerPool) AcquireWorker(ctx context.Context) (*ManagedWor } if p.sharedWarmWorkers { - if err := p.activateWorkerForTeam(ctx, worker); err != nil { + if err := 
p.activateWorkerForOrg(ctx, worker); err != nil { p.shared.RetireWorker(worker.ID) return nil, err } } p.shared.mu.Lock() - if owned := p.workerBelongsToTeamLocked(worker); owned { + if owned := p.workerBelongsToOrgLocked(worker); owned { worker.activeSessions++ p.shared.mu.Unlock() return worker, nil @@ -102,58 +102,58 @@ func (p *TeamReservedWorkerPool) AcquireWorker(ctx context.Context) (*ManagedWor } } -func (p *TeamReservedWorkerPool) ReleaseWorker(id int) { +func (p *OrgReservedPool) ReleaseWorker(id int) { _ = p.RetireWorkerIfNoSessions(id) } -func (p *TeamReservedWorkerPool) RetireWorker(id int) { +func (p *OrgReservedPool) RetireWorker(id int) { if _, ok := p.Worker(id); !ok { return } p.shared.RetireWorker(id) } -func (p *TeamReservedWorkerPool) RetireWorkerIfNoSessions(id int) bool { +func (p *OrgReservedPool) RetireWorkerIfNoSessions(id int) bool { if _, ok := p.Worker(id); !ok { return false } return p.shared.RetireWorkerIfNoSessions(id) } -func (p *TeamReservedWorkerPool) Worker(id int) (*ManagedWorker, bool) { +func (p *OrgReservedPool) Worker(id int) (*ManagedWorker, bool) { p.shared.mu.RLock() defer p.shared.mu.RUnlock() w, ok := p.shared.workers[id] - if !ok || !p.workerBelongsToTeamLocked(w) { + if !ok || !p.workerBelongsToOrgLocked(w) { return nil, false } return w, true } -func (p *TeamReservedWorkerPool) SpawnMinWorkers(count int) error { +func (p *OrgReservedPool) SpawnMinWorkers(count int) error { return nil } -func (p *TeamReservedWorkerPool) HealthCheckLoop(ctx context.Context, interval time.Duration, onCrash WorkerCrashHandler, onProgress ProgressHandler) { +func (p *OrgReservedPool) HealthCheckLoop(ctx context.Context, interval time.Duration, onCrash WorkerCrashHandler, onProgress ProgressHandler) { } -func (p *TeamReservedWorkerPool) SetMaxWorkers(n int) { +func (p *OrgReservedPool) SetMaxWorkers(n int) { p.shared.mu.Lock() defer p.shared.mu.Unlock() p.maxWorkers = n } -func (p *TeamReservedWorkerPool) EnableSharedWarmActivation(enabled 
bool) { +func (p *OrgReservedPool) EnableSharedWarmActivation(enabled bool) { p.shared.mu.Lock() defer p.shared.mu.Unlock() p.sharedWarmWorkers = enabled } -func (p *TeamReservedWorkerPool) ShutdownAll() { +func (p *OrgReservedPool) ShutdownAll() { p.shared.mu.RLock() workers := make([]int, 0, len(p.shared.workers)) for id, w := range p.shared.workers { - if p.workerBelongsToTeamLocked(w) { + if p.workerBelongsToOrgLocked(w) { workers = append(workers, id) } } @@ -164,7 +164,7 @@ func (p *TeamReservedWorkerPool) ShutdownAll() { } } -func (p *TeamReservedWorkerPool) findIdleAssignedWorkerLocked() *ManagedWorker { +func (p *OrgReservedPool) findIdleAssignedWorkerLocked() *ManagedWorker { for _, w := range p.shared.workers { select { case <-w.done: @@ -178,7 +178,7 @@ func (p *TeamReservedWorkerPool) findIdleAssignedWorkerLocked() *ManagedWorker { return nil } -func (p *TeamReservedWorkerPool) leastLoadedAssignedWorkerLocked() *ManagedWorker { +func (p *OrgReservedPool) leastLoadedAssignedWorkerLocked() *ManagedWorker { var best *ManagedWorker for _, w := range p.shared.workers { select { @@ -196,7 +196,7 @@ func (p *TeamReservedWorkerPool) leastLoadedAssignedWorkerLocked() *ManagedWorke return best } -func (p *TeamReservedWorkerPool) assignedWorkerCountLocked() int { +func (p *OrgReservedPool) assignedWorkerCountLocked() int { count := 0 for _, w := range p.shared.workers { select { @@ -204,20 +204,20 @@ func (p *TeamReservedWorkerPool) assignedWorkerCountLocked() int { continue default: } - if p.workerBelongsToTeamLocked(w) { + if p.workerBelongsToOrgLocked(w) { count++ } } return count } -func (p *TeamReservedWorkerPool) workerBelongsToTeamLocked(w *ManagedWorker) bool { +func (p *OrgReservedPool) workerBelongsToOrgLocked(w *ManagedWorker) bool { state := w.SharedState() - return state.Assignment != nil && state.Assignment.TeamName == p.teamName && state.NormalizedLifecycle() != WorkerLifecycleRetired + return state.Assignment != nil && state.Assignment.OrgID == 
p.orgID && state.NormalizedLifecycle() != WorkerLifecycleRetired } -func (p *TeamReservedWorkerPool) workerReadyForSchedulingLocked(w *ManagedWorker) bool { - if !p.workerBelongsToTeamLocked(w) { +func (p *OrgReservedPool) workerReadyForSchedulingLocked(w *ManagedWorker) bool { + if !p.workerBelongsToOrgLocked(w) { return false } if !p.sharedWarmWorkers { @@ -226,7 +226,7 @@ func (p *TeamReservedWorkerPool) workerReadyForSchedulingLocked(w *ManagedWorker return w.SharedState().NormalizedLifecycle() == WorkerLifecycleHot } -func (p *TeamReservedWorkerPool) activateWorkerForTeam(ctx context.Context, worker *ManagedWorker) error { +func (p *OrgReservedPool) activateWorkerForOrg(ctx context.Context, worker *ManagedWorker) error { p.shared.mu.Lock() state := worker.SharedState().NormalizedLifecycle() if state == WorkerLifecycleReserved { @@ -242,12 +242,12 @@ func (p *TeamReservedWorkerPool) activateWorkerForTeam(ctx context.Context, work } p.shared.mu.Unlock() - team, err := p.lookupTeamConfig() + org, err := p.lookupOrgConfig() if err != nil { return err } - if err := p.activateReservedWorker(ctx, worker, team); err != nil { + if err := p.activateReservedWorker(ctx, worker, org); err != nil { return err } @@ -267,14 +267,14 @@ func (p *TeamReservedWorkerPool) activateWorkerForTeam(ctx context.Context, work } } -func (p *TeamReservedWorkerPool) activateReservedWorkerDefault(ctx context.Context, worker *ManagedWorker, team *configstore.TeamConfig) error { +func (p *OrgReservedPool) activateReservedWorkerDefault(ctx context.Context, worker *ManagedWorker, org *configstore.OrgConfig) error { if !p.sharedWarmWorkers { return nil } - if team == nil { - return fmt.Errorf("team config is required for activation") + if org == nil { + return fmt.Errorf("org config is required for activation") } - payload, err := BuildTenantActivationPayload(ctx, p.shared.clientset, p.shared.namespace, team) + payload, err := BuildTenantActivationPayload(ctx, p.shared.clientset, 
p.shared.namespace, org) if err != nil { return err } @@ -284,16 +284,16 @@ func (p *TeamReservedWorkerPool) activateReservedWorkerDefault(ctx context.Conte return p.shared.ActivateReservedWorker(ctx, worker, payload) } -func (p *TeamReservedWorkerPool) lookupTeamConfig() (*configstore.TeamConfig, error) { - if p.resolveTeamConfig == nil { - return nil, fmt.Errorf("team config resolver is not configured for team %s", p.teamName) +func (p *OrgReservedPool) lookupOrgConfig() (*configstore.OrgConfig, error) { + if p.resolveOrgConfig == nil { + return nil, fmt.Errorf("org config resolver is not configured for org %s", p.orgID) } - team, err := p.resolveTeamConfig() + org, err := p.resolveOrgConfig() if err != nil { return nil, err } - if team == nil { - return nil, fmt.Errorf("team config resolver returned nil for team %s", p.teamName) + if org == nil { + return nil, fmt.Errorf("org config resolver returned nil for org %s", p.orgID) } - return team, nil + return org, nil } diff --git a/controlplane/team_reserved_pool_test.go b/controlplane/org_reserved_pool_test.go similarity index 69% rename from controlplane/team_reserved_pool_test.go rename to controlplane/org_reserved_pool_test.go index e071175..c373ecc 100644 --- a/controlplane/team_reserved_pool_test.go +++ b/controlplane/org_reserved_pool_test.go @@ -10,7 +10,7 @@ import ( "github.com/posthog/duckgres/controlplane/configstore" ) -func TestTeamReservedWorkerPoolAcquireReservesTeamWorker(t *testing.T) { +func TestOrgReservedPoolAcquireReservesOrgWorker(t *testing.T) { shared, _ := newTestK8sPool(t, 5) shared.spawnWarmWorkerFunc = func(ctx context.Context, id int) error { shared.mu.Lock() @@ -19,7 +19,7 @@ func TestTeamReservedWorkerPoolAcquireReservesTeamWorker(t *testing.T) { return nil } - pool := NewTeamReservedWorkerPool(shared, "analytics", 2) + pool := NewOrgReservedPool(shared, "analytics", 2) worker, err := pool.AcquireWorker(context.Background()) if err != nil { t.Fatalf("AcquireWorker: %v", err) @@ -29,7 
+29,7 @@ func TestTeamReservedWorkerPoolAcquireReservesTeamWorker(t *testing.T) { } state := worker.SharedState() - if state.Assignment == nil || state.Assignment.TeamName != "analytics" { + if state.Assignment == nil || state.Assignment.OrgID != "analytics" { t.Fatalf("expected analytics assignment, got %#v", state.Assignment) } if state.Lifecycle != WorkerLifecycleReserved { @@ -37,13 +37,13 @@ func TestTeamReservedWorkerPoolAcquireReservesTeamWorker(t *testing.T) { } } -func TestTeamReservedWorkerPoolAcquireSkipsOtherTeamsWorkers(t *testing.T) { +func TestOrgReservedPoolAcquireSkipsOtherOrgsWorkers(t *testing.T) { shared, _ := newTestK8sPool(t, 5) other := &ManagedWorker{ID: 1, done: make(chan struct{})} if err := other.SetSharedState(SharedWorkerState{ Lifecycle: WorkerLifecycleReserved, Assignment: &WorkerAssignment{ - TeamName: "billing", + OrgID: "billing", LeaseExpiresAt: time.Now().Add(time.Hour), }, }); err != nil { @@ -58,26 +58,26 @@ func TestTeamReservedWorkerPoolAcquireSkipsOtherTeamsWorkers(t *testing.T) { return nil } - pool := NewTeamReservedWorkerPool(shared, "analytics", 2) + pool := NewOrgReservedPool(shared, "analytics", 2) worker, err := pool.AcquireWorker(context.Background()) if err != nil { t.Fatalf("AcquireWorker: %v", err) } if worker.ID == other.ID { - t.Fatal("expected analytics pool to reserve its own worker, not borrow another team's worker") + t.Fatal("expected analytics pool to reserve its own worker, not borrow another org's worker") } - if state := worker.SharedState(); state.Assignment == nil || state.Assignment.TeamName != "analytics" { + if state := worker.SharedState(); state.Assignment == nil || state.Assignment.OrgID != "analytics" { t.Fatalf("expected analytics assignment, got %#v", state.Assignment) } } -func TestTeamReservedWorkerPoolReleaseWorkerRetiresOnLastSession(t *testing.T) { +func TestOrgReservedPoolReleaseWorkerRetiresOnLastSession(t *testing.T) { shared, _ := newTestK8sPool(t, 5) worker := &ManagedWorker{ID: 9, 
activeSessions: 1, done: make(chan struct{})} if err := worker.SetSharedState(SharedWorkerState{ Lifecycle: WorkerLifecycleReserved, Assignment: &WorkerAssignment{ - TeamName: "analytics", + OrgID: "analytics", LeaseExpiresAt: time.Now().Add(time.Hour), }, }); err != nil { @@ -85,7 +85,7 @@ func TestTeamReservedWorkerPoolReleaseWorkerRetiresOnLastSession(t *testing.T) { } shared.workers[worker.ID] = worker - pool := NewTeamReservedWorkerPool(shared, "analytics", 1) + pool := NewOrgReservedPool(shared, "analytics", 1) pool.ReleaseWorker(worker.ID) time.Sleep(100 * time.Millisecond) @@ -94,7 +94,7 @@ func TestTeamReservedWorkerPoolReleaseWorkerRetiresOnLastSession(t *testing.T) { } } -func TestTeamReservedWorkerPoolAcquireActivatesReservedWorkerWhenEnabledWithTeamConfig(t *testing.T) { +func TestOrgReservedPoolAcquireActivatesReservedWorkerWhenEnabledWithOrgConfig(t *testing.T) { shared, _ := newTestK8sPool(t, 5) shared.spawnWarmWorkerFunc = func(ctx context.Context, id int) error { shared.mu.Lock() @@ -104,19 +104,19 @@ func TestTeamReservedWorkerPoolAcquireActivatesReservedWorkerWhenEnabledWithTeam } activated := false - pool := NewTeamReservedWorkerPool(shared, "analytics", 2) + pool := NewOrgReservedPool(shared, "analytics", 2) pool.sharedWarmWorkers = true - pool.resolveTeamConfig = func() (*configstore.TeamConfig, error) { - return &configstore.TeamConfig{ + pool.resolveOrgConfig = func() (*configstore.OrgConfig, error) { + return &configstore.OrgConfig{ Name: "analytics", Users: map[string]string{ "alice": "ignored", }, }, nil } - pool.activateReservedWorker = func(ctx context.Context, worker *ManagedWorker, team *configstore.TeamConfig) error { - if team == nil || team.Name != "analytics" { - t.Fatalf("expected analytics team config, got %#v", team) + pool.activateReservedWorker = func(ctx context.Context, worker *ManagedWorker, org *configstore.OrgConfig) error { + if org == nil || org.Name != "analytics" { + t.Fatalf("expected analytics org config, 
got %#v", org) } activated = true return nil @@ -134,7 +134,7 @@ func TestTeamReservedWorkerPoolAcquireActivatesReservedWorkerWhenEnabledWithTeam } } -func TestTeamReservedWorkerPoolAcquireActivatesUsingLatestResolvedTeamConfig(t *testing.T) { +func TestOrgReservedPoolAcquireActivatesUsingLatestResolvedOrgConfig(t *testing.T) { shared, _ := newTestK8sPool(t, 5) shared.spawnWarmWorkerFunc = func(ctx context.Context, id int) error { shared.mu.Lock() @@ -143,25 +143,25 @@ func TestTeamReservedWorkerPoolAcquireActivatesUsingLatestResolvedTeamConfig(t * return nil } - currentTeam := &configstore.TeamConfig{ + currentOrg := &configstore.OrgConfig{ Name: "analytics", Users: map[string]string{ "alice": "ignored", }, } - pool := NewTeamReservedWorkerPool(shared, "analytics", 2) + pool := NewOrgReservedPool(shared, "analytics", 2) pool.sharedWarmWorkers = true - pool.resolveTeamConfig = func() (*configstore.TeamConfig, error) { - return currentTeam, nil + pool.resolveOrgConfig = func() (*configstore.OrgConfig, error) { + return currentOrg, nil } - var capturedTeam *configstore.TeamConfig - pool.activateReservedWorker = func(ctx context.Context, worker *ManagedWorker, team *configstore.TeamConfig) error { - capturedTeam = team + var capturedOrg *configstore.OrgConfig + pool.activateReservedWorker = func(ctx context.Context, worker *ManagedWorker, org *configstore.OrgConfig) error { + capturedOrg = org return nil } - currentTeam = &configstore.TeamConfig{ + currentOrg = &configstore.OrgConfig{ Name: "analytics", Users: map[string]string{ "bob": "ignored", @@ -175,10 +175,10 @@ func TestTeamReservedWorkerPoolAcquireActivatesUsingLatestResolvedTeamConfig(t * if got := worker.SharedState().Lifecycle; got != WorkerLifecycleHot { t.Fatalf("expected hot lifecycle after activation, got %q", got) } - if capturedTeam == nil { - t.Fatal("expected activation to receive resolved team config") + if capturedOrg == nil { + t.Fatal("expected activation to receive resolved org config") } 
- if len(capturedTeam.Users) != 1 || capturedTeam.Users["bob"] != "ignored" { - t.Fatalf("expected latest resolved team config to be used, got %#v", capturedTeam.Users) + if len(capturedOrg.Users) != 1 || capturedOrg.Users["bob"] != "ignored" { + t.Fatalf("expected latest resolved org config to be used, got %#v", capturedOrg.Users) } } diff --git a/controlplane/team_router.go b/controlplane/org_router.go similarity index 51% rename from controlplane/team_router.go rename to controlplane/org_router.go index 6329467..04190bf 100644 --- a/controlplane/team_router.go +++ b/controlplane/org_router.go @@ -14,19 +14,19 @@ import ( "github.com/posthog/duckgres/server" ) -// TeamStack holds the isolated worker pool and session manager for a team. -type TeamStack struct { - Config *configstore.TeamConfig +// OrgStack holds the isolated worker pool and session manager for an org. +type OrgStack struct { + Config *configstore.OrgConfig Pool WorkerPool Sessions *SessionManager Rebalancer *MemoryRebalancer cancel context.CancelFunc } -// TeamRouter manages per-team stacks, creating/destroying them as config changes. -type TeamRouter struct { +// OrgRouter manages per-org stacks, creating/destroying them as config changes. +type OrgRouter struct { mu sync.RWMutex - teams map[string]*TeamStack + orgs map[string]*OrgStack configStore *configstore.ConfigStore baseCfg K8sWorkerPoolConfig sharedPool *K8sWorkerPool @@ -36,10 +36,10 @@ type TeamRouter struct { sharedCancel context.CancelFunc } -// NewTeamRouter creates a TeamRouter from the initial config snapshot. -func NewTeamRouter(store *configstore.ConfigStore, baseCfg K8sWorkerPoolConfig, globalCfg ControlPlaneConfig, srv *server.Server) (*TeamRouter, error) { - tr := &TeamRouter{ - teams: make(map[string]*TeamStack), +// NewOrgRouter creates an OrgRouter from the initial config snapshot. 
+func NewOrgRouter(store *configstore.ConfigStore, baseCfg K8sWorkerPoolConfig, globalCfg ControlPlaneConfig, srv *server.Server) (*OrgRouter, error) { + tr := &OrgRouter{ + orgs: make(map[string]*OrgStack), configStore: store, baseCfg: baseCfg, globalCfg: globalCfg, @@ -47,7 +47,7 @@ func NewTeamRouter(store *configstore.ConfigStore, baseCfg K8sWorkerPoolConfig, } sharedCfg := baseCfg - sharedCfg.TeamName = "" + sharedCfg.OrgID = "" sharedCfg.WorkerIDGenerator = func() int { return int(tr.nextWorkerID.Add(1)) } @@ -67,9 +67,9 @@ func NewTeamRouter(store *configstore.ConfigStore, baseCfg K8sWorkerPoolConfig, go tr.sharedPool.HealthCheckLoop(sharedCtx, tr.globalCfg.HealthCheckInterval, tr.onSharedWorkerCrash, tr.onSharedWorkerProgress) snap := store.Snapshot() - for _, tc := range snap.Teams { - if _, err := tr.createTeamStack(tc); err != nil { - slog.Error("Failed to create team stack.", "team", tc.Name, "error", err) + for _, tc := range snap.Orgs { + if _, err := tr.createOrgStack(tc); err != nil { + slog.Error("Failed to create org stack.", "org", tc.Name, "error", err) continue } } @@ -79,8 +79,8 @@ func NewTeamRouter(store *configstore.ConfigStore, baseCfg K8sWorkerPoolConfig, return tr, nil } -// createTeamStack creates an isolated pool + session manager for a team. -func (tr *TeamRouter) createTeamStack(tc *configstore.TeamConfig) (*TeamStack, error) { +// createOrgStack creates an isolated pool + session manager for an org. 
+func (tr *OrgRouter) createOrgStack(tc *configstore.OrgConfig) (*OrgStack, error) { ctx, cancel := context.WithCancel(context.Background()) maxWorkers := tc.MaxWorkers @@ -93,17 +93,17 @@ func (tr *TeamRouter) createTeamStack(tc *configstore.TeamConfig) (*TeamStack, e memoryBudget = int64(server.ParseMemoryBytes(tc.MemoryBudget)) } - pool := NewTeamReservedWorkerPool(tr.sharedPool, tc.Name, maxWorkers) - pool.resolveTeamConfig = func() (*configstore.TeamConfig, error) { + pool := NewOrgReservedPool(tr.sharedPool, tc.Name, maxWorkers) + pool.resolveOrgConfig = func() (*configstore.OrgConfig, error) { snap := tr.configStore.Snapshot() if snap == nil { - return nil, fmt.Errorf("config snapshot unavailable for team %s", tc.Name) + return nil, fmt.Errorf("config snapshot unavailable for org %s", tc.Name) } - team, ok := snap.Teams[tc.Name] + org, ok := snap.Orgs[tc.Name] if !ok { - return nil, fmt.Errorf("team %s not found in config snapshot", tc.Name) + return nil, fmt.Errorf("org %s not found in config snapshot", tc.Name) } - return team, nil + return org, nil } pool.EnableSharedWarmActivation(tr.globalCfg.K8s.SharedWarmWorkers) @@ -111,8 +111,8 @@ func (tr *TeamRouter) createTeamStack(tc *configstore.TeamConfig) (*TeamStack, e sessions := NewSessionManager(pool, rebalancer) rebalancer.SetSessionLister(sessions) - // Periodic per-team metrics emission - teamName := tc.Name + // Periodic per-org metrics emission + orgID := tc.Name go func() { ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() @@ -122,12 +122,12 @@ func (tr *TeamRouter) createTeamStack(tc *configstore.TeamConfig) (*TeamStack, e return case <-ticker.C: sessionCount := sessions.SessionCount() - observeTeamSessionsActive(teamName, sessionCount) + observeOrgSessionsActive(orgID, sessionCount) } } }() - stack := &TeamStack{ + stack := &OrgStack{ Config: tc, Pool: pool, Sessions: sessions, @@ -136,26 +136,26 @@ func (tr *TeamRouter) createTeamStack(tc *configstore.TeamConfig) (*TeamStack, e } 
tr.mu.Lock() - tr.teams[tc.Name] = stack + tr.orgs[tc.Name] = stack tr.mu.Unlock() - slog.Info("Team stack created.", "team", tc.Name, "max_workers", maxWorkers) + slog.Info("Org stack created.", "org", tc.Name, "max_workers", maxWorkers) _ = ctx // keep linter happy return stack, nil } -// DestroyTeamStack drains and cleans up a team's resources. -func (tr *TeamRouter) DestroyTeamStack(teamName string) { +// DestroyOrgStack drains and cleans up an org's resources. +func (tr *OrgRouter) DestroyOrgStack(orgID string) { tr.mu.Lock() - stack, ok := tr.teams[teamName] + stack, ok := tr.orgs[orgID] if !ok { tr.mu.Unlock() return } - delete(tr.teams, teamName) + delete(tr.orgs, orgID) tr.mu.Unlock() - slog.Info("Destroying team stack.", "team", teamName) + slog.Info("Destroying org stack.", "org", orgID) stack.cancel() stack.Pool.ShutdownAll() if stack.Rebalancer != nil { @@ -163,52 +163,52 @@ func (tr *TeamRouter) DestroyTeamStack(teamName string) { } } -// StackForUser resolves a username to its team stack. -func (tr *TeamRouter) StackForUser(username string) (*TeamStack, bool) { - teamName := tr.configStore.TeamForUser(username) - if teamName == "" { +// StackForUser resolves a username to its org stack. +func (tr *OrgRouter) StackForUser(username string) (*OrgStack, bool) { + orgID := tr.configStore.OrgForUser(username) + if orgID == "" { return nil, false } tr.mu.RLock() - stack, ok := tr.teams[teamName] + stack, ok := tr.orgs[orgID] tr.mu.RUnlock() return stack, ok } -// HandleConfigChange reconciles team stacks when the config snapshot changes. 
-func (tr *TeamRouter) HandleConfigChange(old, new *configstore.Snapshot) { - // Detect new teams - for name, tc := range new.Teams { - if _, existed := old.Teams[name]; !existed { - slog.Info("New team detected, creating stack.", "team", name) - if _, err := tr.createTeamStack(tc); err != nil { - slog.Error("Failed to create team stack on config change.", "team", name, "error", err) +// HandleConfigChange reconciles org stacks when the config snapshot changes. +func (tr *OrgRouter) HandleConfigChange(old, new *configstore.Snapshot) { + // Detect new orgs + for name, tc := range new.Orgs { + if _, existed := old.Orgs[name]; !existed { + slog.Info("New org detected, creating stack.", "org", name) + if _, err := tr.createOrgStack(tc); err != nil { + slog.Error("Failed to create org stack on config change.", "org", name, "error", err) } } } - // Detect removed teams - for name := range old.Teams { - if _, exists := new.Teams[name]; !exists { - slog.Info("Team removed, destroying stack.", "team", name) - tr.DestroyTeamStack(name) + // Detect removed orgs + for name := range old.Orgs { + if _, exists := new.Orgs[name]; !exists { + slog.Info("Org removed, destroying stack.", "org", name) + tr.DestroyOrgStack(name) } } - // Refresh existing team stacks and update worker limits when needed. - for name, newTC := range new.Teams { - oldTC, existed := old.Teams[name] + // Refresh existing org stacks and update worker limits when needed. 
+ for name, newTC := range new.Orgs { + oldTC, existed := old.Orgs[name] if !existed { continue } limitsChanged := oldTC.MaxWorkers != newTC.MaxWorkers || oldTC.MemoryBudget != newTC.MemoryBudget tr.mu.Lock() - if stack, ok := tr.teams[name]; ok { + if stack, ok := tr.orgs[name]; ok { stack.Config = newTC if limitsChanged { - slog.Info("Team config changed.", "team", name, + slog.Info("Org config changed.", "org", name, "old_max_workers", oldTC.MaxWorkers, "new_max_workers", newTC.MaxWorkers) maxWorkers := newTC.MaxWorkers if maxWorkers == 0 { @@ -216,7 +216,7 @@ func (tr *TeamRouter) HandleConfigChange(old, new *configstore.Snapshot) { } stack.Pool.SetMaxWorkers(maxWorkers) } - if reserved, ok := stack.Pool.(*TeamReservedWorkerPool); ok { + if reserved, ok := stack.Pool.(*OrgReservedPool); ok { reserved.EnableSharedWarmActivation(tr.globalCfg.K8s.SharedWarmWorkers) } } @@ -226,29 +226,29 @@ func (tr *TeamRouter) HandleConfigChange(old, new *configstore.Snapshot) { tr.reconcileWarmCapacity(new) } -// AllStacks returns a snapshot of all team stacks for admin API usage. -func (tr *TeamRouter) AllStacks() map[string]*TeamStack { +// AllStacks returns a snapshot of all org stacks for admin API usage. +func (tr *OrgRouter) AllStacks() map[string]*OrgStack { tr.mu.RLock() defer tr.mu.RUnlock() - result := make(map[string]*TeamStack, len(tr.teams)) - for k, v := range tr.teams { + result := make(map[string]*OrgStack, len(tr.orgs)) + for k, v := range tr.orgs { result[k] = v } return result } -// ShutdownAll shuts down all team stacks. -func (tr *TeamRouter) ShutdownAll() { +// ShutdownAll shuts down all org stacks. 
+func (tr *OrgRouter) ShutdownAll() { tr.mu.Lock() - teams := make(map[string]*TeamStack, len(tr.teams)) - for k, v := range tr.teams { - teams[k] = v + orgs := make(map[string]*OrgStack, len(tr.orgs)) + for k, v := range tr.orgs { + orgs[k] = v } - tr.teams = make(map[string]*TeamStack) + tr.orgs = make(map[string]*OrgStack) tr.mu.Unlock() - for name, stack := range teams { - slog.Info("Shutting down team stack.", "team", name) + for name, stack := range orgs { + slog.Info("Shutting down org stack.", "org", name) stack.cancel() stack.Pool.ShutdownAll() if stack.Rebalancer != nil { @@ -264,7 +264,7 @@ func (tr *TeamRouter) ShutdownAll() { } } -func (tr *TeamRouter) reconcileWarmCapacity(snap *configstore.Snapshot) { +func (tr *OrgRouter) reconcileWarmCapacity(snap *configstore.Snapshot) { if tr.sharedPool == nil || snap == nil { return } @@ -276,26 +276,26 @@ func (tr *TeamRouter) reconcileWarmCapacity(snap *configstore.Snapshot) { tr.sharedPool.SetWarmCapacityTarget(target) if target > 0 { - observeTeamWorkerSpawn("shared") + observeOrgWorkerSpawn("shared") if err := tr.sharedPool.SpawnMinWorkers(target); err != nil { slog.Warn("Failed to reconcile shared warm capacity.", "target", target, "error", err) } } } -func (tr *TeamRouter) onSharedWorkerCrash(workerID int) { - stack, teamName, ok := tr.stackForWorker(workerID) +func (tr *OrgRouter) onSharedWorkerCrash(workerID int) { + stack, orgID, ok := tr.stackForWorker(workerID) if !ok { return } - observeTeamWorkerCrash(teamName) + observeOrgWorkerCrash(orgID) stack.Sessions.OnWorkerCrash(workerID, func(pid int32) { - slog.Warn("Session orphaned by worker crash.", "team", teamName, "pid", pid, "worker", workerID) + slog.Warn("Session orphaned by worker crash.", "org", orgID, "pid", pid, "worker", workerID) }) } -func (tr *TeamRouter) onSharedWorkerProgress(workerID int, progress map[string]*SessionProgress) { +func (tr *OrgRouter) onSharedWorkerProgress(workerID int, progress map[string]*SessionProgress) { stack, _, 
ok := tr.stackForWorker(workerID) if !ok { return @@ -303,13 +303,13 @@ func (tr *TeamRouter) onSharedWorkerProgress(workerID int, progress map[string]* stack.Sessions.UpdateProgress(workerID, progress) } -func (tr *TeamRouter) stackForWorker(workerID int) (*TeamStack, string, bool) { +func (tr *OrgRouter) stackForWorker(workerID int) (*OrgStack, string, bool) { tr.mu.RLock() defer tr.mu.RUnlock() - for teamName, stack := range tr.teams { + for orgID, stack := range tr.orgs { if stack.Sessions.SessionCountForWorker(workerID) > 0 { - return stack, teamName, true + return stack, orgID, true } } return nil, "", false diff --git a/controlplane/team_router_test.go b/controlplane/org_router_test.go similarity index 67% rename from controlplane/team_router_test.go rename to controlplane/org_router_test.go index 8281e54..72339df 100644 --- a/controlplane/team_router_test.go +++ b/controlplane/org_router_test.go @@ -9,7 +9,7 @@ import ( "github.com/posthog/duckgres/controlplane/configstore" ) -func TestTeamRouterReconcileWarmCapacityUsesExplicitSharedWarmTarget(t *testing.T) { +func TestOrgRouterReconcileWarmCapacityUsesExplicitSharedWarmTarget(t *testing.T) { sharedPool, _ := newTestK8sPool(t, 10) sharedPool.spawnWarmWorkerFunc = func(ctx context.Context, id int) error { sharedPool.mu.Lock() @@ -17,7 +17,7 @@ func TestTeamRouterReconcileWarmCapacityUsesExplicitSharedWarmTarget(t *testing. sharedPool.workers[id] = &ManagedWorker{ID: id, done: make(chan struct{})} return nil } - tr := &TeamRouter{ + tr := &OrgRouter{ sharedPool: sharedPool, globalCfg: ControlPlaneConfig{ K8s: K8sConfig{ @@ -27,7 +27,7 @@ func TestTeamRouterReconcileWarmCapacityUsesExplicitSharedWarmTarget(t *testing. } snap := &configstore.Snapshot{ - Teams: map[string]*configstore.TeamConfig{ + Orgs: map[string]*configstore.OrgConfig{ "analytics": {Name: "analytics"}, "billing": {Name: "billing"}, }, @@ -40,11 +40,11 @@ func TestTeamRouterReconcileWarmCapacityUsesExplicitSharedWarmTarget(t *testing. 
} } -func TestTeamRouterHandleConfigChangeRefreshesRuntimeOnlyUpdates(t *testing.T) { +func TestOrgRouterHandleConfigChangeRefreshesRuntimeOnlyUpdates(t *testing.T) { sharedPool, _ := newTestK8sPool(t, 10) - pool := NewTeamReservedWorkerPool(sharedPool, "analytics", 2) + pool := NewOrgReservedPool(sharedPool, "analytics", 2) - oldTC := &configstore.TeamConfig{ + oldTC := &configstore.OrgConfig{ Name: "analytics", Warehouse: &configstore.ManagedWarehouseConfig{ MetadataStore: configstore.ManagedWarehouseMetadataStore{ @@ -52,7 +52,7 @@ func TestTeamRouterHandleConfigChangeRefreshesRuntimeOnlyUpdates(t *testing.T) { }, }, } - newTC := &configstore.TeamConfig{ + newTC := &configstore.OrgConfig{ Name: "analytics", Warehouse: &configstore.ManagedWarehouseConfig{ MetadataStore: configstore.ManagedWarehouseMetadataStore{ @@ -61,8 +61,8 @@ func TestTeamRouterHandleConfigChangeRefreshesRuntimeOnlyUpdates(t *testing.T) { }, } - tr := &TeamRouter{ - teams: map[string]*TeamStack{ + tr := &OrgRouter{ + orgs: map[string]*OrgStack{ "analytics": { Config: oldTC, Pool: pool, @@ -73,11 +73,11 @@ func TestTeamRouterHandleConfigChangeRefreshesRuntimeOnlyUpdates(t *testing.T) { } tr.HandleConfigChange( - &configstore.Snapshot{Teams: map[string]*configstore.TeamConfig{"analytics": oldTC}}, - &configstore.Snapshot{Teams: map[string]*configstore.TeamConfig{"analytics": newTC}}, + &configstore.Snapshot{Orgs: map[string]*configstore.OrgConfig{"analytics": oldTC}}, + &configstore.Snapshot{Orgs: map[string]*configstore.OrgConfig{"analytics": newTC}}, ) - if got := tr.teams["analytics"].Config.Warehouse.MetadataStore.Endpoint; got != "new-metadata.internal" { + if got := tr.orgs["analytics"].Config.Warehouse.MetadataStore.Endpoint; got != "new-metadata.internal" { t.Fatalf("expected runtime-only update to refresh stack config, got %q", got) } } diff --git a/controlplane/shared_worker_activator.go b/controlplane/shared_worker_activator.go index 8efe28a..4db84dc 100644 --- 
a/controlplane/shared_worker_activator.go +++ b/controlplane/shared_worker_activator.go @@ -22,7 +22,7 @@ type SharedWorkerActivator struct { } type TenantActivationPayload struct { - TeamName string `json:"team_name"` + OrgID string `json:"org_id"` Usernames []string `json:"usernames,omitempty"` LeaseExpiresAt time.Time `json:"lease_expires_at,omitempty"` DuckLake server.DuckLakeConfig `json:"ducklake"` @@ -38,35 +38,35 @@ func NewSharedWorkerActivator(shared *K8sWorkerPool) *SharedWorkerActivator { } } -func (a *SharedWorkerActivator) ActivateReservedWorker(ctx context.Context, worker *ManagedWorker, team *configstore.TeamConfig) error { - if team == nil { - return fmt.Errorf("team config is required for activation") +func (a *SharedWorkerActivator) ActivateReservedWorker(ctx context.Context, worker *ManagedWorker, org *configstore.OrgConfig) error { + if org == nil { + return fmt.Errorf("org config is required for activation") } state := worker.SharedState() if state.Assignment == nil { - return fmt.Errorf("worker %d has no team assignment", worker.ID) + return fmt.Errorf("worker %d has no org assignment", worker.ID) } - payload, err := a.BuildActivationRequest(ctx, team, state.Assignment) + payload, err := a.BuildActivationRequest(ctx, org, state.Assignment) if err != nil { return err } return worker.ActivateTenant(ctx, server.WorkerActivationPayload{ - TeamName: payload.TeamName, + OrgID: payload.OrgID, LeaseExpiresAt: payload.LeaseExpiresAt, DuckLake: payload.DuckLake, }) } -func (a *SharedWorkerActivator) BuildActivationRequest(ctx context.Context, team *configstore.TeamConfig, assignment *WorkerAssignment) (TenantActivationPayload, error) { - if team == nil || team.Warehouse == nil { - return TenantActivationPayload{}, fmt.Errorf("team %q has no managed warehouse runtime", teamName(team)) +func (a *SharedWorkerActivator) BuildActivationRequest(ctx context.Context, org *configstore.OrgConfig, assignment *WorkerAssignment) (TenantActivationPayload, error) { + 
if org == nil || org.Warehouse == nil { + return TenantActivationPayload{}, fmt.Errorf("org %q has no managed warehouse runtime", orgName(org)) } if assignment == nil { return TenantActivationPayload{}, fmt.Errorf("worker assignment is required") } - warehouse := team.Warehouse + warehouse := org.Warehouse metadataPassword, err := a.readSecretValue(ctx, warehouse.MetadataStoreCredentials) if err != nil { return TenantActivationPayload{}, fmt.Errorf("metadata store credentials: %w", err) @@ -100,29 +100,29 @@ func (a *SharedWorkerActivator) BuildActivationRequest(ctx context.Context, team dl.S3Provider = "aws_sdk" } - usernames := make([]string, 0, len(team.Users)) - for username := range team.Users { + usernames := make([]string, 0, len(org.Users)) + for username := range org.Users { usernames = append(usernames, username) } slices.Sort(usernames) return TenantActivationPayload{ - TeamName: assignment.TeamName, + OrgID: assignment.OrgID, Usernames: usernames, LeaseExpiresAt: assignment.LeaseExpiresAt, DuckLake: dl, }, nil } -func BuildTenantActivationPayload(ctx context.Context, clientset kubernetes.Interface, defaultNamespace string, team *configstore.TeamConfig) (TenantActivationPayload, error) { +func BuildTenantActivationPayload(ctx context.Context, clientset kubernetes.Interface, defaultNamespace string, org *configstore.OrgConfig) (TenantActivationPayload, error) { activator := &SharedWorkerActivator{ clientset: clientset, defaultNamespace: defaultNamespace, } assignment := &WorkerAssignment{ - TeamName: teamName(team), + OrgID: orgName(org), } - return activator.BuildActivationRequest(ctx, team, assignment) + return activator.BuildActivationRequest(ctx, org, assignment) } func (a *SharedWorkerActivator) readSecretValue(ctx context.Context, ref configstore.SecretRef) (string, error) { @@ -204,9 +204,9 @@ func secretNamespace(ref configstore.SecretRef, fallback string) string { return fallback } -func teamName(team *configstore.TeamConfig) string { - if team 
== nil { +func orgName(org *configstore.OrgConfig) string { + if org == nil { return "" } - return team.Name + return org.Name } diff --git a/controlplane/worker_mgr_test.go b/controlplane/worker_mgr_test.go index 2042968..fda7db3 100644 --- a/controlplane/worker_mgr_test.go +++ b/controlplane/worker_mgr_test.go @@ -1140,7 +1140,7 @@ func TestManagedWorkerSetSharedStateClonesAssignment(t *testing.T) { input := SharedWorkerState{ Lifecycle: WorkerLifecycleReserved, Assignment: &WorkerAssignment{ - TeamName: "analytics", + OrgID: "analytics", LeaseExpiresAt: leaseExpiry, }, } @@ -1150,7 +1150,7 @@ func TestManagedWorkerSetSharedStateClonesAssignment(t *testing.T) { t.Fatalf("SetSharedState: %v", err) } - input.Assignment.TeamName = "mutated" + input.Assignment.OrgID = "mutated" input.Assignment.LeaseExpiresAt = leaseExpiry.Add(time.Hour) got := w.SharedState() @@ -1160,19 +1160,19 @@ func TestManagedWorkerSetSharedStateClonesAssignment(t *testing.T) { if got.Assignment == input.Assignment { t.Fatal("expected worker state assignment to be cloned") } - if got.Assignment.TeamName != "analytics" { - t.Fatalf("expected stored team name analytics, got %q", got.Assignment.TeamName) + if got.Assignment.OrgID != "analytics" { + t.Fatalf("expected stored org ID analytics, got %q", got.Assignment.OrgID) } if !got.Assignment.LeaseExpiresAt.Equal(leaseExpiry) { t.Fatalf("expected stored lease expiry %v, got %v", leaseExpiry, got.Assignment.LeaseExpiresAt) } - got.Assignment.TeamName = "leaked" + got.Assignment.OrgID = "leaked" fresh := w.SharedState() if fresh.Assignment == nil { t.Fatal("expected stored assignment on subsequent read") } - if fresh.Assignment.TeamName != "analytics" { - t.Fatalf("expected readback clone to protect stored team name, got %q", fresh.Assignment.TeamName) + if fresh.Assignment.OrgID != "analytics" { + t.Fatalf("expected readback clone to protect stored org ID, got %q", fresh.Assignment.OrgID) } } diff --git a/controlplane/worker_pool.go 
b/controlplane/worker_pool.go index 81ec902..0f09c69 100644 --- a/controlplane/worker_pool.go +++ b/controlplane/worker_pool.go @@ -58,8 +58,8 @@ type K8sWorkerPoolConfig struct { ImagePullPolicy string // Image pull policy for worker pods (e.g., "Never", "IfNotPresent", "Always") ServiceAccount string // ServiceAccount name for worker pods (default: "default") MemoryBudget int64 // Total memory budget in bytes; used to derive per-worker resource limits - TeamName string // Team name for pod labels (multi-tenant mode) - WorkerIDGenerator func() int // Shared ID generator across teams (nil = internal counter) + OrgID string // Org ID for pod labels (multi-tenant mode) + WorkerIDGenerator func() int // Shared ID generator across orgs (nil = internal counter) SharedWarmActivation bool } diff --git a/controlplane/worker_state.go b/controlplane/worker_state.go index 16fef04..a9c129b 100644 --- a/controlplane/worker_state.go +++ b/controlplane/worker_state.go @@ -20,9 +20,9 @@ const ( ) // WorkerAssignment carries tenant-specific metadata once a shared worker has -// been reserved for a team. +// been reserved for an org. 
type WorkerAssignment struct { - TeamName string + OrgID string LeaseExpiresAt time.Time } @@ -150,8 +150,8 @@ func resolveWorkerAssignment(current, proposed *WorkerAssignment) (*WorkerAssign return cloneWorkerAssignment(proposed), nil case proposed == nil: return cloneWorkerAssignment(current), nil - case current.TeamName != proposed.TeamName: - return nil, fmt.Errorf("assignment team cannot change from %q to %q", current.TeamName, proposed.TeamName) + case current.OrgID != proposed.OrgID: + return nil, fmt.Errorf("assignment org cannot change from %q to %q", current.OrgID, proposed.OrgID) default: return cloneWorkerAssignment(proposed), nil } @@ -161,8 +161,8 @@ func validateWorkerAssignment(assignment *WorkerAssignment) error { if assignment == nil { return fmt.Errorf("missing assignment") } - if assignment.TeamName == "" { - return fmt.Errorf("missing team name") + if assignment.OrgID == "" { + return fmt.Errorf("missing org ID") } if assignment.LeaseExpiresAt.IsZero() { return fmt.Errorf("missing lease expiry") diff --git a/controlplane/worker_state_test.go b/controlplane/worker_state_test.go index e33e9b9..475ab1d 100644 --- a/controlplane/worker_state_test.go +++ b/controlplane/worker_state_test.go @@ -23,7 +23,7 @@ func TestSharedWorkerStateTransitionLifecycle(t *testing.T) { leaseExpiry := time.Date(2026, time.March, 20, 16, 0, 0, 0, time.UTC) state, err := (SharedWorkerState{}).Transition(WorkerLifecycleReserved, &WorkerAssignment{ - TeamName: "analytics", + OrgID: "analytics", LeaseExpiresAt: leaseExpiry, }) if err != nil { @@ -32,7 +32,7 @@ func TestSharedWorkerStateTransitionLifecycle(t *testing.T) { if got := state.NormalizedLifecycle(); got != WorkerLifecycleReserved { t.Fatalf("expected reserved lifecycle, got %q", got) } - if state.Assignment == nil || state.Assignment.TeamName != "analytics" { + if state.Assignment == nil || state.Assignment.OrgID != "analytics" { t.Fatalf("expected analytics assignment, got %#v", state.Assignment) } if 
!state.Assignment.LeaseExpiresAt.Equal(leaseExpiry) { @@ -54,7 +54,7 @@ func TestSharedWorkerStateTransitionLifecycle(t *testing.T) { if got := state.NormalizedLifecycle(); got != WorkerLifecycleRetired { t.Fatalf("expected retired lifecycle, got %q", got) } - if state.Assignment == nil || state.Assignment.TeamName != "analytics" { + if state.Assignment == nil || state.Assignment.OrgID != "analytics" { t.Fatalf("expected retired worker to retain last assignment metadata, got %#v", state.Assignment) } } @@ -67,11 +67,11 @@ func TestSharedWorkerStateTransitionRejectsMissingOrInvalidAssignment(t *testing if _, err := (SharedWorkerState{}).Transition(WorkerLifecycleReserved, &WorkerAssignment{ LeaseExpiresAt: time.Now().Add(time.Hour), }); err == nil { - t.Fatal("expected reserve transition without team name to fail") + t.Fatal("expected reserve transition without org ID to fail") } if _, err := (SharedWorkerState{}).Transition(WorkerLifecycleReserved, &WorkerAssignment{ - TeamName: "analytics", + OrgID: "analytics", }); err == nil { t.Fatal("expected reserve transition without lease expiry to fail") } @@ -80,7 +80,7 @@ func TestSharedWorkerStateTransitionRejectsMissingOrInvalidAssignment(t *testing func TestSharedWorkerStateTransitionRejectsInvalidLifecycleMoves(t *testing.T) { leaseExpiry := time.Date(2026, time.March, 20, 16, 0, 0, 0, time.UTC) state, err := (SharedWorkerState{}).Transition(WorkerLifecycleReserved, &WorkerAssignment{ - TeamName: "analytics", + OrgID: "analytics", LeaseExpiresAt: leaseExpiry, }) if err != nil { @@ -97,7 +97,7 @@ func TestSharedWorkerStateTransitionRejectsInvalidLifecycleMoves(t *testing.T) { } if _, err := state.Transition(WorkerLifecycleHot, &WorkerAssignment{ - TeamName: "billing", + OrgID: "billing", LeaseExpiresAt: leaseExpiry.Add(time.Hour), }); err == nil { t.Fatal("expected activating -> hot transition to reject assignment changes") diff --git a/duckdbservice/activation.go b/duckdbservice/activation.go index 7b4a604..c08cb9a 
100644 --- a/duckdbservice/activation.go +++ b/duckdbservice/activation.go @@ -14,7 +14,7 @@ import ( // ActivationPayload carries the tenant-specific runtime that is delivered to a // neutral shared warm worker over the control-plane RPC channel. type ActivationPayload struct { - TeamName string `json:"team_name"` + OrgID string `json:"org_id"` LeaseExpiresAt time.Time `json:"lease_expires_at"` DuckLake server.DuckLakeConfig `json:"ducklake"` } @@ -38,8 +38,8 @@ func (p *SessionPool) activateTenant(payload ActivationPayload) error { if !p.sharedWarmMode { return fmt.Errorf("tenant activation is not enabled for this worker") } - if strings.TrimSpace(payload.TeamName) == "" { - return fmt.Errorf("team_name is required") + if strings.TrimSpace(payload.OrgID) == "" { + return fmt.Errorf("org_id is required") } p.mu.RLock() @@ -49,7 +49,7 @@ func (p *SessionPool) activateTenant(payload ActivationPayload) error { if reflect.DeepEqual(current.payload, payload) { return nil } - return fmt.Errorf("worker already activated for team %q", current.payload.TeamName) + return fmt.Errorf("worker already activated for org %q", current.payload.OrgID) } cfg := p.cfg @@ -83,7 +83,7 @@ func (p *SessionPool) activateTenant(payload ActivationPayload) error { if reflect.DeepEqual(p.activation.payload, payload) { return nil } - return fmt.Errorf("worker already activated for team %q", p.activation.payload.TeamName) + return fmt.Errorf("worker already activated for org %q", p.activation.payload.OrgID) } p.activation = &activatedTenantRuntime{ diff --git a/duckdbservice/activation_test.go b/duckdbservice/activation_test.go index 9aae2b3..b5f58c5 100644 --- a/duckdbservice/activation_test.go +++ b/duckdbservice/activation_test.go @@ -44,7 +44,7 @@ func TestSessionPoolActivateTenantConfiguresTenantRuntime(t *testing.T) { }() err := pool.activateTenant(ActivationPayload{ - TeamName: "analytics", + OrgID: "analytics", DuckLake: server.DuckLakeConfig{ MetadataStore: 
"postgres:host=metadata.internal port=5432 user=ducklake password=secret dbname=ducklake", ObjectStore: "s3://analytics/warehouse/", @@ -55,8 +55,8 @@ func TestSessionPoolActivateTenantConfiguresTenantRuntime(t *testing.T) { } current := pool.currentActivation() - if current == nil || current.payload.TeamName != "analytics" { - t.Fatalf("expected activated team analytics, got %#v", current) + if current == nil || current.payload.OrgID != "analytics" { + t.Fatalf("expected activated org analytics, got %#v", current) } if captured.DuckLake.MetadataStore == "" || captured.DuckLake.ObjectStore == "" { t.Fatalf("expected activated DuckLake config to be applied, got %#v", captured.DuckLake) @@ -86,7 +86,7 @@ func TestSessionPoolActivateTenantRejectsSecondActivation(t *testing.T) { } if err := pool.activateTenant(ActivationPayload{ - TeamName: "analytics", + OrgID: "analytics", DuckLake: server.DuckLakeConfig{ MetadataStore: "postgres:host=metadata.internal port=5432 user=ducklake password=secret dbname=ducklake", }, @@ -95,7 +95,7 @@ func TestSessionPoolActivateTenantRejectsSecondActivation(t *testing.T) { } if err := pool.activateTenant(ActivationPayload{ - TeamName: "billing", + OrgID: "billing", DuckLake: server.DuckLakeConfig{ MetadataStore: "postgres:host=metadata.internal port=5432 user=ducklake password=secret dbname=ducklake", }, diff --git a/duckdbservice/flight_handler.go b/duckdbservice/flight_handler.go index 881f57a..b9768dd 100644 --- a/duckdbservice/flight_handler.go +++ b/duckdbservice/flight_handler.go @@ -95,11 +95,11 @@ func (h *FlightSQLHandler) doActivateTenant(body []byte, stream flight.FlightSer if err := json.Unmarshal(body, &payload); err != nil { return status.Errorf(codes.InvalidArgument, "invalid ActivateTenant request: %v", err) } - if strings.TrimSpace(payload.TeamName) == "" { - return status.Error(codes.InvalidArgument, "team_name is required") + if strings.TrimSpace(payload.OrgID) == "" { + return status.Error(codes.InvalidArgument, 
"org_id is required") } - if current := h.pool.currentActivation(); current != nil && current.payload.TeamName != payload.TeamName { - return status.Errorf(codes.FailedPrecondition, "activate tenant: worker already activated for team %q", current.payload.TeamName) + if current := h.pool.currentActivation(); current != nil && current.payload.OrgID != payload.OrgID { + return status.Errorf(codes.FailedPrecondition, "activate tenant: worker already activated for org %q", current.payload.OrgID) } if err := h.pool.activateTenantFunc(payload); err != nil { diff --git a/duckdbservice/flight_handler_test.go b/duckdbservice/flight_handler_test.go index 6044ec9..3903f24 100644 --- a/duckdbservice/flight_handler_test.go +++ b/duckdbservice/flight_handler_test.go @@ -157,7 +157,7 @@ func TestActivateTenantRejectsDifferentTenantAfterActivation(t *testing.T) { stream := &mockDoActionStream{} firstBody, err := json.Marshal(ActivationPayload{ - TeamName: "analytics", + OrgID: "analytics", }) if err != nil { t.Fatalf("marshal first request: %v", err) @@ -167,7 +167,7 @@ func TestActivateTenantRejectsDifferentTenantAfterActivation(t *testing.T) { } secondBody, err := json.Marshal(ActivationPayload{ - TeamName: "billing", + OrgID: "billing", }) if err != nil { t.Fatalf("marshal second request: %v", err) diff --git a/k8s/kind/config-store-seed.sql b/k8s/kind/config-store-seed.sql index d541d02..6be803c 100644 --- a/k8s/kind/config-store-seed.sql +++ b/k8s/kind/config-store-seed.sql @@ -1,10 +1,10 @@ -INSERT INTO duckgres_teams (name, max_workers, memory_budget, idle_timeout_s, created_at, updated_at) +INSERT INTO duckgres_orgs (name, max_workers, memory_budget, idle_timeout_s, created_at, updated_at) VALUES ('local', 0, '', 0, NOW(), NOW()) ON CONFLICT (name) DO UPDATE SET updated_at = NOW(); INSERT INTO duckgres_managed_warehouses ( - team_name, + org_id, warehouse_database_region, warehouse_database_endpoint, warehouse_database_port, @@ -73,7 +73,7 @@ VALUES ( 'minio', 'us-east-1', 
'duckgres-local', - 'teams/local/', + 'orgs/local/', 'duckgres-local-minio:9000', false, 'path', @@ -109,7 +109,7 @@ VALUES ( NOW(), NOW() ) -ON CONFLICT (team_name) DO UPDATE +ON CONFLICT (org_id) DO UPDATE SET warehouse_database_region = EXCLUDED.warehouse_database_region, warehouse_database_endpoint = EXCLUDED.warehouse_database_endpoint, warehouse_database_port = EXCLUDED.warehouse_database_port, @@ -160,9 +160,9 @@ SET warehouse_database_region = EXCLUDED.warehouse_database_region, failed_at = EXCLUDED.failed_at, updated_at = NOW(); -INSERT INTO duckgres_team_users (username, password, team_name, created_at, updated_at) +INSERT INTO duckgres_org_users (username, password, org_id, created_at, updated_at) VALUES ('postgres', '$2a$10$TQyt73Vw91Q1d7YcE86EVuhms/0u4qBydMDyVvZYlqDwc3/VtQAbm', 'local', NOW(), NOW()) ON CONFLICT (username) DO UPDATE SET password = EXCLUDED.password, - team_name = EXCLUDED.team_name, + org_id = EXCLUDED.org_id, updated_at = NOW(); diff --git a/k8s/kind/config-store.seed.sql b/k8s/kind/config-store.seed.sql index 6253f1f..1dc6d0f 100644 --- a/k8s/kind/config-store.seed.sql +++ b/k8s/kind/config-store.seed.sql @@ -1,10 +1,10 @@ -INSERT INTO duckgres_teams (name, max_workers, memory_budget, idle_timeout_s, created_at, updated_at) +INSERT INTO duckgres_orgs (name, max_workers, memory_budget, idle_timeout_s, created_at, updated_at) VALUES ('local', 0, '', 0, NOW(), NOW()) ON CONFLICT (name) DO UPDATE SET updated_at = NOW(); INSERT INTO duckgres_managed_warehouses ( - team_name, + org_id, warehouse_database_region, warehouse_database_endpoint, warehouse_database_port, @@ -73,7 +73,7 @@ VALUES ( 'minio', 'us-east-1', 'duckgres-local', - 'teams/local/', + 'orgs/local/', 'duckgres-local-minio:9000', false, 'path', @@ -109,7 +109,7 @@ VALUES ( NOW(), NOW() ) -ON CONFLICT (team_name) DO UPDATE +ON CONFLICT (org_id) DO UPDATE SET warehouse_database_region = EXCLUDED.warehouse_database_region, warehouse_database_endpoint = 
EXCLUDED.warehouse_database_endpoint, warehouse_database_port = EXCLUDED.warehouse_database_port, @@ -160,9 +160,9 @@ SET warehouse_database_region = EXCLUDED.warehouse_database_region, failed_at = EXCLUDED.failed_at, updated_at = NOW(); -INSERT INTO duckgres_team_users (username, password, team_name, created_at, updated_at) +INSERT INTO duckgres_org_users (username, password, org_id, created_at, updated_at) VALUES ('postgres', '$2a$10$TQyt73Vw91Q1d7YcE86EVuhms/0u4qBydMDyVvZYlqDwc3/VtQAbm', 'local', NOW(), NOW()) ON CONFLICT (username) DO UPDATE SET password = EXCLUDED.password, - team_name = EXCLUDED.team_name, + org_id = EXCLUDED.org_id, updated_at = NOW(); diff --git a/k8s/local-config-store.seed.sql b/k8s/local-config-store.seed.sql index 4dca6e4..65c3706 100644 --- a/k8s/local-config-store.seed.sql +++ b/k8s/local-config-store.seed.sql @@ -1,10 +1,10 @@ -INSERT INTO duckgres_teams (name, max_workers, memory_budget, idle_timeout_s, created_at, updated_at) +INSERT INTO duckgres_orgs (name, max_workers, memory_budget, idle_timeout_s, created_at, updated_at) VALUES ('local', 0, '', 0, NOW(), NOW()) ON CONFLICT (name) DO UPDATE SET updated_at = NOW(); INSERT INTO duckgres_managed_warehouses ( - team_name, + org_id, warehouse_database_region, warehouse_database_endpoint, warehouse_database_port, @@ -73,7 +73,7 @@ VALUES ( 'minio', 'us-east-1', 'duckgres-local', - 'teams/local/', + 'orgs/local/', 'host.docker.internal:39000', false, 'path', @@ -109,7 +109,7 @@ VALUES ( NOW(), NOW() ) -ON CONFLICT (team_name) DO UPDATE +ON CONFLICT (org_id) DO UPDATE SET warehouse_database_region = EXCLUDED.warehouse_database_region, warehouse_database_endpoint = EXCLUDED.warehouse_database_endpoint, warehouse_database_port = EXCLUDED.warehouse_database_port, @@ -160,9 +160,9 @@ SET warehouse_database_region = EXCLUDED.warehouse_database_region, failed_at = EXCLUDED.failed_at, updated_at = NOW(); -INSERT INTO duckgres_team_users (username, password, team_name, created_at, updated_at) 
+INSERT INTO duckgres_org_users (username, password, org_id, created_at, updated_at) VALUES ('postgres', '$2a$10$TQyt73Vw91Q1d7YcE86EVuhms/0u4qBydMDyVvZYlqDwc3/VtQAbm', 'local', NOW(), NOW()) ON CONFLICT (username) DO UPDATE SET password = EXCLUDED.password, - team_name = EXCLUDED.team_name, + org_id = EXCLUDED.org_id, updated_at = NOW(); diff --git a/server/worker_activation.go b/server/worker_activation.go index fe2a0fe..fb48433 100644 --- a/server/worker_activation.go +++ b/server/worker_activation.go @@ -5,7 +5,7 @@ import "time" // WorkerActivationPayload is the tenant runtime material delivered to a shared // warm worker over the control-plane RPC path. type WorkerActivationPayload struct { - TeamName string `json:"team_name"` + OrgID string `json:"org_id"` LeaseExpiresAt time.Time `json:"lease_expires_at"` DuckLake DuckLakeConfig `json:"ducklake"` } diff --git a/tests/controlplane/kind_config_store_seed_test.go b/tests/controlplane/kind_config_store_seed_test.go index f3bf4bf..d6c3979 100644 --- a/tests/controlplane/kind_config_store_seed_test.go +++ b/tests/controlplane/kind_config_store_seed_test.go @@ -23,29 +23,29 @@ func TestKindConfigStoreSeedSQL(t *testing.T) { } snap := store.Snapshot() - teamCfg := snap.Teams["local"] - if teamCfg == nil { - t.Fatal("expected local team from kind seed") + orgCfg := snap.Orgs["local"] + if orgCfg == nil { + t.Fatal("expected local org from kind seed") } - if teamCfg.Warehouse == nil { + if orgCfg.Warehouse == nil { t.Fatal("expected local warehouse from kind seed") } - if got := teamCfg.Warehouse.MetadataStore.Endpoint; got != "duckgres-local-ducklake-metadata" { + if got := orgCfg.Warehouse.MetadataStore.Endpoint; got != "duckgres-local-ducklake-metadata" { t.Fatalf("expected kind metadata endpoint, got %q", got) } - if got := teamCfg.Warehouse.MetadataStore.Port; got != 5432 { + if got := orgCfg.Warehouse.MetadataStore.Port; got != 5432 { t.Fatalf("expected kind metadata port 5432, got %d", got) } - if got := 
teamCfg.Warehouse.S3.Endpoint; got != "duckgres-local-minio:9000" { + if got := orgCfg.Warehouse.S3.Endpoint; got != "duckgres-local-minio:9000" { t.Fatalf("expected kind s3 endpoint, got %q", got) } - if got := teamCfg.Warehouse.S3.Bucket; got != "duckgres-local" { + if got := orgCfg.Warehouse.S3.Bucket; got != "duckgres-local" { t.Fatalf("expected duckgres-local bucket, got %q", got) } - if got := teamCfg.Warehouse.MetadataStoreCredentials.Name; got != "duckgres-local-metadata" { + if got := orgCfg.Warehouse.MetadataStoreCredentials.Name; got != "duckgres-local-metadata" { t.Fatalf("expected kind metadata secret ref, got %q", got) } - if got := teamCfg.Warehouse.S3Credentials.Name; got != "duckgres-local-s3" { + if got := orgCfg.Warehouse.S3Credentials.Name; got != "duckgres-local-s3" { t.Fatalf("expected kind s3 secret ref, got %q", got) } } diff --git a/tests/controlplane/managed_warehouse_postgres_test.go b/tests/controlplane/managed_warehouse_postgres_test.go index 8accbbf..620b0e7 100644 --- a/tests/controlplane/managed_warehouse_postgres_test.go +++ b/tests/controlplane/managed_warehouse_postgres_test.go @@ -29,18 +29,18 @@ func TestManagedWarehouseConfigStorePostgres(t *testing.T) { t.Fatalf("hash password: %v", err) } - if err := store.DB().Create(&configstore.Team{Name: "analytics"}).Error; err != nil { - t.Fatalf("create team: %v", err) + if err := store.DB().Create(&configstore.Org{Name: "analytics"}).Error; err != nil { + t.Fatalf("create org: %v", err) } - if err := store.DB().Create(&configstore.TeamUser{ + if err := store.DB().Create(&configstore.OrgUser{ Username: "alice", Password: passwordHash, - TeamName: "analytics", + OrgID: "analytics", }).Error; err != nil { t.Fatalf("create user: %v", err) } if err := store.DB().Create(&configstore.ManagedWarehouse{ - TeamName: "analytics", + OrgID: "analytics", WarehouseDatabase: configstore.ManagedWarehouseDatabase{ Region: "us-east-1", Endpoint: "analytics.cluster.example", @@ -72,42 +72,42 @@ func 
TestManagedWarehouseConfigStorePostgres(t *testing.T) { t.Fatalf("reload store: %v", err) } - teamCfg := store.Snapshot().Teams["analytics"] - if teamCfg == nil { - t.Fatal("expected analytics team in snapshot") + orgCfg := store.Snapshot().Orgs["analytics"] + if orgCfg == nil { + t.Fatal("expected analytics org in snapshot") } - if teamCfg.Warehouse == nil { + if orgCfg.Warehouse == nil { t.Fatal("expected warehouse to be preloaded into snapshot") } - if teamCfg.Warehouse.WarehouseDatabase.DatabaseName != "analytics_wh" { - t.Fatalf("expected analytics_wh, got %q", teamCfg.Warehouse.WarehouseDatabase.DatabaseName) + if orgCfg.Warehouse.WarehouseDatabase.DatabaseName != "analytics_wh" { + t.Fatalf("expected analytics_wh, got %q", orgCfg.Warehouse.WarehouseDatabase.DatabaseName) } - if teamCfg.Warehouse.MetadataStore.Kind != "dedicated_rds" { - t.Fatalf("expected metadata store kind dedicated_rds, got %q", teamCfg.Warehouse.MetadataStore.Kind) + if orgCfg.Warehouse.MetadataStore.Kind != "dedicated_rds" { + t.Fatalf("expected metadata store kind dedicated_rds, got %q", orgCfg.Warehouse.MetadataStore.Kind) } - if teamCfg.Warehouse.MetadataStore.DatabaseName != "ducklake_metadata" { - t.Fatalf("expected ducklake_metadata, got %q", teamCfg.Warehouse.MetadataStore.DatabaseName) + if orgCfg.Warehouse.MetadataStore.DatabaseName != "ducklake_metadata" { + t.Fatalf("expected ducklake_metadata, got %q", orgCfg.Warehouse.MetadataStore.DatabaseName) } - if teamCfg.Users["alice"] != passwordHash { + if orgCfg.Users["alice"] != passwordHash { t.Fatal("expected user credentials to remain loaded in snapshot") } - if err := store.DB().Create(&configstore.Team{Name: "cleanup"}).Error; err != nil { - t.Fatalf("create cleanup team: %v", err) + if err := store.DB().Create(&configstore.Org{Name: "cleanup"}).Error; err != nil { + t.Fatalf("create cleanup org: %v", err) } if err := store.DB().Create(&configstore.ManagedWarehouse{ - TeamName: "cleanup", - State: 
configstore.ManagedWarehouseStateReady, + OrgID: "cleanup", + State: configstore.ManagedWarehouseStateReady, }).Error; err != nil { t.Fatalf("create cleanup warehouse: %v", err) } - if err := store.DB().Delete(&configstore.Team{Name: "cleanup"}).Error; err != nil { - t.Fatalf("delete team: %v", err) + if err := store.DB().Delete(&configstore.Org{Name: "cleanup"}).Error; err != nil { + t.Fatalf("delete org: %v", err) } var count int64 - if err := store.DB().Model(&configstore.ManagedWarehouse{}).Where("team_name = ?", "cleanup").Count(&count).Error; err != nil { + if err := store.DB().Model(&configstore.ManagedWarehouse{}).Where("org_id = ?", "cleanup").Count(&count).Error; err != nil { t.Fatalf("count warehouses: %v", err) } if count != 0 { @@ -127,30 +127,30 @@ func TestLocalConfigStoreSeedSQL(t *testing.T) { } snap := store.Snapshot() - teamCfg := snap.Teams["local"] - if teamCfg == nil { - t.Fatal("expected local team from seed") + orgCfg := snap.Orgs["local"] + if orgCfg == nil { + t.Fatal("expected local org from seed") } - if teamCfg.Warehouse == nil { + if orgCfg.Warehouse == nil { t.Fatal("expected local warehouse from seed") } - if teamCfg.Warehouse.WarehouseDatabase.DatabaseName != "duckgres_local" { - t.Fatalf("expected duckgres_local warehouse db, got %q", teamCfg.Warehouse.WarehouseDatabase.DatabaseName) + if orgCfg.Warehouse.WarehouseDatabase.DatabaseName != "duckgres_local" { + t.Fatalf("expected duckgres_local warehouse db, got %q", orgCfg.Warehouse.WarehouseDatabase.DatabaseName) } - if teamCfg.Warehouse.MetadataStore.DatabaseName != "ducklake_metadata_local" { - t.Fatalf("expected ducklake_metadata_local metadata db, got %q", teamCfg.Warehouse.MetadataStore.DatabaseName) + if orgCfg.Warehouse.MetadataStore.DatabaseName != "ducklake_metadata_local" { + t.Fatalf("expected ducklake_metadata_local metadata db, got %q", orgCfg.Warehouse.MetadataStore.DatabaseName) } - if teamCfg.Warehouse.WarehouseDatabaseCredentials.Name != 
"duckgres-local-warehouse-db" { - t.Fatalf("expected duckgres-local-warehouse-db secret ref, got %q", teamCfg.Warehouse.WarehouseDatabaseCredentials.Name) + if orgCfg.Warehouse.WarehouseDatabaseCredentials.Name != "duckgres-local-warehouse-db" { + t.Fatalf("expected duckgres-local-warehouse-db secret ref, got %q", orgCfg.Warehouse.WarehouseDatabaseCredentials.Name) } - if teamCfg.Warehouse.State != configstore.ManagedWarehouseStateReady { - t.Fatalf("expected ready warehouse state, got %q", teamCfg.Warehouse.State) + if orgCfg.Warehouse.State != configstore.ManagedWarehouseStateReady { + t.Fatalf("expected ready warehouse state, got %q", orgCfg.Warehouse.State) } - if teamCfg.Warehouse.MetadataStoreState != configstore.ManagedWarehouseStateReady { - t.Fatalf("expected ready metadata store state, got %q", teamCfg.Warehouse.MetadataStoreState) + if orgCfg.Warehouse.MetadataStoreState != configstore.ManagedWarehouseStateReady { + t.Fatalf("expected ready metadata store state, got %q", orgCfg.Warehouse.MetadataStoreState) } - if _, ok := teamCfg.Users["postgres"]; !ok { - t.Fatal("expected seeded postgres user to belong to local team") + if _, ok := orgCfg.Users["postgres"]; !ok { + t.Fatal("expected seeded postgres user to belong to local org") } }
IDTeamStatusSessions
IDOrgStatusSessions
${w.id}${esc(w.team)} ${esc(w.status)} ${w.active_sessions}
${w.id}${esc(w.org)} ${esc(w.status)} ${w.active_sessions}