Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
41f11cd
Refactor team → org across multitenant control plane
EDsCODE Mar 23, 2026
c7efd6e
Add provisioning controller and REST API for managed warehouses
EDsCODE Mar 19, 2026
5837088
Merge main into eric/provisioning-controller
EDsCODE Mar 23, 2026
d387c5e
Merge branch 'main' into eric/provisioning-controller
EDsCODE Mar 24, 2026
5bd462d
Merge admin and provisioning APIs onto single :8080 port
EDsCODE Mar 24, 2026
e29904e
Fix Duckling CR spec to match XRD schema
EDsCODE Mar 24, 2026
2558a58
Switch API auth to X-Duckgres-Internal-Secret header
EDsCODE Mar 24, 2026
a688506
Align controller with current Duckling XRD/composition
EDsCODE Mar 24, 2026
88f36bf
Add STS AssumeRole credential brokering for multi-tenant S3 access
EDsCODE Mar 24, 2026
8c5b0ef
Fix provisioning controller review issues
EDsCODE Mar 24, 2026
d87999c
Add pod log diagnostics to K8s CI and fix remaining port refs
EDsCODE Mar 24, 2026
1b2333b
Fix Gin route conflict: align admin API org param :name → :id
EDsCODE Mar 24, 2026
2adc0de
Move provisioning warehouse status to /warehouse/status to avoid rout…
EDsCODE Mar 24, 2026
9a414ed
Add readiness probe to kind control-plane deployment
EDsCODE Mar 24, 2026
e104ebd
Merge main into eric/provisioning-controller
EDsCODE Mar 24, 2026
f0c4e6f
Merge main into eric/provisioning-controller
EDsCODE Mar 24, 2026
cd03472
Retrigger CI
EDsCODE Mar 24, 2026
1aa8597
Keep metrics on :9090, API + dashboard on :8080
EDsCODE Mar 24, 2026
003ed46
Merge main into eric/provisioning-controller
EDsCODE Mar 24, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 29 additions & 8 deletions config_resolution.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type configCLIInputs struct {
MaxConnections int
ConfigStoreConn string
ConfigPollInterval string
AdminToken string
InternalSecret string
WorkerBackend string
K8sWorkerImage string
K8sWorkerNamespace string
Expand All @@ -52,6 +52,8 @@ type configCLIInputs struct {
K8sWorkerServiceAccount string
K8sMaxWorkers int
K8sSharedWarmTarget int
AWSAccountID string
AWSRegion string
QueryLog bool
}

Expand All @@ -73,9 +75,11 @@ type resolvedConfig struct {
K8sWorkerServiceAccount string
K8sMaxWorkers int
K8sSharedWarmTarget int
AWSAccountID string
AWSRegion string
ConfigStoreConn string
ConfigPollInterval time.Duration
AdminToken string
InternalSecret string
}

func defaultServerConfig() server.Config {
Expand Down Expand Up @@ -129,9 +133,11 @@ func resolveEffectiveConfig(fileCfg *FileConfig, cli configCLIInputs, getenv fun
var k8sWorkerPort int
var k8sWorkerSecret, k8sWorkerConfigMap, k8sWorkerImagePullPolicy, k8sWorkerServiceAccount string
var k8sMaxWorkers, k8sSharedWarmTarget int
var awsAccountID string
var awsRegion string
var configStoreConn string
var configPollInterval time.Duration
var adminToken string
var internalSecret string

if fileCfg != nil {
if fileCfg.Host != "" {
Expand Down Expand Up @@ -581,8 +587,8 @@ func resolveEffectiveConfig(fileCfg *FileConfig, cli configCLIInputs, getenv fun
warn("Invalid DUCKGRES_CONFIG_POLL_INTERVAL duration: " + err.Error())
}
}
if v := getenv("DUCKGRES_ADMIN_TOKEN"); v != "" {
adminToken = v
if v := getenv("DUCKGRES_INTERNAL_SECRET"); v != "" {
internalSecret = v
}
if v := getenv("DUCKGRES_WORKER_BACKEND"); v != "" {
workerBackend = v
Expand Down Expand Up @@ -629,6 +635,13 @@ func resolveEffectiveConfig(fileCfg *FileConfig, cli configCLIInputs, getenv fun
warn("Invalid DUCKGRES_K8S_SHARED_WARM_TARGET: " + err.Error())
}
}
if v := getenv("DUCKGRES_AWS_ACCOUNT_ID"); v != "" {
awsAccountID = v
}
if v := getenv("DUCKGRES_AWS_REGION"); v != "" {
awsRegion = v
}

// Query log env vars
if v := getenv("DUCKGRES_QUERY_LOG_ENABLED"); v != "" {
if b, err := strconv.ParseBool(v); err == nil {
Expand Down Expand Up @@ -789,8 +802,8 @@ func resolveEffectiveConfig(fileCfg *FileConfig, cli configCLIInputs, getenv fun
warn("Invalid --config-poll-interval duration: " + err.Error())
}
}
if cli.Set["admin-token"] {
adminToken = cli.AdminToken
if cli.Set["internal-secret"] {
internalSecret = cli.InternalSecret
}
if cli.Set["worker-backend"] {
workerBackend = cli.WorkerBackend
Expand Down Expand Up @@ -825,6 +838,12 @@ func resolveEffectiveConfig(fileCfg *FileConfig, cli configCLIInputs, getenv fun
if cli.Set["k8s-shared-warm-target"] {
k8sSharedWarmTarget = cli.K8sSharedWarmTarget
}
if cli.Set["aws-account-id"] {
awsAccountID = cli.AWSAccountID
}
if cli.Set["aws-region"] {
awsRegion = cli.AWSRegion
}
if cli.Set["query-log"] {
cfg.QueryLog.Enabled = cli.QueryLog
}
Expand Down Expand Up @@ -894,8 +913,10 @@ func resolveEffectiveConfig(fileCfg *FileConfig, cli configCLIInputs, getenv fun
K8sWorkerServiceAccount: k8sWorkerServiceAccount,
K8sMaxWorkers: k8sMaxWorkers,
K8sSharedWarmTarget: k8sSharedWarmTarget,
AWSAccountID: awsAccountID,
AWSRegion: awsRegion,
ConfigStoreConn: configStoreConn,
ConfigPollInterval: configPollInterval,
AdminToken: adminToken,
InternalSecret: internalSecret,
}
}
2 changes: 1 addition & 1 deletion controlplane/activation_payload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestBuildTenantActivationPayloadBuildsDuckLakeRuntimeFromWarehouseSecrets(t
},
}

payload, err := BuildTenantActivationPayload(context.Background(), pool.clientset, pool.namespace, org)
payload, err := BuildTenantActivationPayload(context.Background(), pool.clientset, pool.namespace, org, nil)
if err != nil {
t.Fatalf("BuildTenantActivationPayload: %v", err)
}
Expand Down
25 changes: 14 additions & 11 deletions controlplane/admin/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"gorm.io/gorm/clause"
)

var errWarehousePayloadNotAllowed = errors.New("warehouse payload must be updated via /orgs/:name/warehouse")
var errWarehousePayloadNotAllowed = errors.New("warehouse payload must be updated via /orgs/:id/warehouse")

// WorkerStatus represents a worker's current status for the API.
type WorkerStatus struct {
Expand Down Expand Up @@ -71,11 +71,11 @@ func registerAPIWithStore(r *gin.RouterGroup, store apiStore, info OrgStackInfo)
// Orgs CRUD
r.GET("/orgs", h.listOrgs)
r.POST("/orgs", h.createOrg)
r.GET("/orgs/:name", h.getOrg)
r.PUT("/orgs/:name", h.updateOrg)
r.DELETE("/orgs/:name", h.deleteOrg)
r.GET("/orgs/:name/warehouse", h.getManagedWarehouse)
r.PUT("/orgs/:name/warehouse", h.putManagedWarehouse)
r.GET("/orgs/:id", h.getOrg)
r.PUT("/orgs/:id", h.updateOrg)
r.DELETE("/orgs/:id", h.deleteOrg)
r.GET("/orgs/:id/warehouse", h.getManagedWarehouse)
r.PUT("/orgs/:id/warehouse", h.putManagedWarehouse)

// Users CRUD
r.GET("/users", h.listUsers)
Expand Down Expand Up @@ -285,6 +285,9 @@ func (s *gormAPIStore) UpsertManagedWarehouse(orgID string, warehouse *configsto

func managedWarehouseUpsertColumns() []string {
return []string{
"image",
"aurora_min_acu",
"aurora_max_acu",
"warehouse_database_region",
"warehouse_database_endpoint",
"warehouse_database_port",
Expand Down Expand Up @@ -481,7 +484,7 @@ func (h *apiHandler) createOrg(c *gin.Context) {
}

func (h *apiHandler) getOrg(c *gin.Context) {
name := c.Param("name")
name := c.Param("id")
org, err := h.store.GetOrg(name)
if err != nil {
c.JSON(http.StatusNotFound, gin.H{"error": "org not found"})
Expand All @@ -491,7 +494,7 @@ func (h *apiHandler) getOrg(c *gin.Context) {
}

func (h *apiHandler) updateOrg(c *gin.Context) {
name := c.Param("name")
name := c.Param("id")
var updates configstore.Org
if err := c.ShouldBindJSON(&updates); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
Expand All @@ -514,7 +517,7 @@ func (h *apiHandler) updateOrg(c *gin.Context) {
}

func (h *apiHandler) deleteOrg(c *gin.Context) {
name := c.Param("name")
name := c.Param("id")
ok, err := h.store.DeleteOrg(name)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
Expand All @@ -535,7 +538,7 @@ func validateOrgMutationPayload(org *configstore.Org) error {
}

func (h *apiHandler) getManagedWarehouse(c *gin.Context) {
warehouse, err := h.store.GetManagedWarehouse(c.Param("name"))
warehouse, err := h.store.GetManagedWarehouse(c.Param("id"))
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
c.JSON(http.StatusNotFound, gin.H{"error": "managed warehouse not found"})
Expand All @@ -548,7 +551,7 @@ func (h *apiHandler) getManagedWarehouse(c *gin.Context) {
}

func (h *apiHandler) putManagedWarehouse(c *gin.Context) {
orgID := c.Param("name")
orgID := c.Param("id")
var req managedWarehouseRequest
if err := decodeStrictWarehouseRequest(c, &req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
Expand Down
7 changes: 4 additions & 3 deletions controlplane/admin/dashboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,11 @@ func renderLoginPage(c *gin.Context, next, errMsg string) {
}

func requestAdminToken(c *gin.Context) string {
auth := c.GetHeader("Authorization")
if strings.HasPrefix(auth, "Bearer ") {
return strings.TrimPrefix(auth, "Bearer ")
// Primary: X-Duckgres-Internal-Secret header (service-to-service)
if secret := c.GetHeader("X-Duckgres-Internal-Secret"); secret != "" {
return secret
}
// Fallback: cookie (dashboard UI)
if cookie, err := c.Cookie(adminTokenCookieName); err == nil {
return cookie
}
Expand Down
12 changes: 12 additions & 0 deletions controlplane/configstore/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ type ManagedWarehouseWorkerIdentity struct {
type ManagedWarehouse struct {
OrgID string `gorm:"primaryKey;size:255" json:"org_id"`

Image string `gorm:"size:512" json:"image"`
AuroraMinACU float64 `json:"aurora_min_acu"`
AuroraMaxACU float64 `json:"aurora_max_acu"`

WarehouseDatabase ManagedWarehouseDatabase `gorm:"embedded;embeddedPrefix:warehouse_database_" json:"warehouse_database"`
MetadataStore ManagedWarehouseMetadataStore `gorm:"embedded;embeddedPrefix:metadata_store_" json:"metadata_store"`
S3 ManagedWarehouseS3 `gorm:"embedded;embeddedPrefix:s3_" json:"s3"`
Expand All @@ -112,6 +116,7 @@ type ManagedWarehouse struct {
IdentityStatusMessage string `gorm:"size:1024" json:"identity_status_message"`
SecretsState ManagedWarehouseProvisioningState `gorm:"size:32" json:"secrets_state"`
SecretsStatusMessage string `gorm:"size:1024" json:"secrets_status_message"`
ProvisioningStartedAt *time.Time `json:"provisioning_started_at"`
ReadyAt *time.Time `json:"ready_at"`
FailedAt *time.Time `json:"failed_at"`
CreatedAt time.Time `json:"created_at"`
Expand Down Expand Up @@ -195,6 +200,10 @@ type OrgConfig struct {
type ManagedWarehouseConfig struct {
OrgID string

Image string
AuroraMinACU float64
AuroraMaxACU float64

WarehouseDatabase ManagedWarehouseDatabase
MetadataStore ManagedWarehouseMetadataStore
S3 ManagedWarehouseS3
Expand Down Expand Up @@ -228,6 +237,9 @@ func copyManagedWarehouseConfig(warehouse *ManagedWarehouse) *ManagedWarehouseCo

cfg := &ManagedWarehouseConfig{
OrgID: warehouse.OrgID,
Image: warehouse.Image,
AuroraMinACU: warehouse.AuroraMinACU,
AuroraMaxACU: warehouse.AuroraMaxACU,
WarehouseDatabase: warehouse.WarehouseDatabase,
MetadataStore: warehouse.MetadataStore,
S3: warehouse.S3,
Expand Down
25 changes: 25 additions & 0 deletions controlplane/configstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,31 @@ func (cs *ConfigStore) OnChange(fn func(old, new *Snapshot)) {
cs.onChange = append(cs.onChange, fn)
}

// ListWarehousesByStates fetches every ManagedWarehouse row whose state
// column equals one of the supplied states.
//
// The lookup goes straight to the database through cs.db rather than any
// cached snapshot; it is intended for the provisioning controller.
// On a query failure the error is wrapped and a nil slice is returned.
func (cs *ConfigStore) ListWarehousesByStates(states []ManagedWarehouseProvisioningState) ([]ManagedWarehouse, error) {
	var matched []ManagedWarehouse

	// Build the filter first, then execute it, so the failure path is a single check.
	query := cs.db.Where("state IN ?", states)
	if err := query.Find(&matched).Error; err != nil {
		return nil, fmt.Errorf("list warehouses by states: %w", err)
	}

	return matched, nil
}

// UpdateWarehouseState applies updates to the warehouse row for orgID, but
// only when that row's current state equals expectedState.
//
// The state check is part of the UPDATE's WHERE clause, so a row that has
// already moved to a different state is left untouched. An error is returned
// when the database call fails, or when no row was affected (no row matched
// both orgID and expectedState).
func (cs *ConfigStore) UpdateWarehouseState(orgID string, expectedState ManagedWarehouseProvisioningState, updates map[string]interface{}) error {
	guarded := cs.db.Model(&ManagedWarehouse{}).Where("org_id = ? AND state = ?", orgID, expectedState)
	res := guarded.Updates(updates)

	switch {
	case res.Error != nil:
		return fmt.Errorf("update warehouse state: %w", res.Error)
	case res.RowsAffected == 0:
		// Nothing matched the org_id/state pair, so the guarded update did not apply.
		return fmt.Errorf("warehouse %q not in expected state %q", orgID, expectedState)
	}
	return nil
}

// DB exposes the GORM database for direct CRUD operations (used by admin API).
func (cs *ConfigStore) DB() *gorm.DB {
return cs.db
Expand Down
27 changes: 16 additions & 11 deletions controlplane/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,10 @@ type ControlPlaneConfig struct {
// Default: 30s.
ConfigPollInterval time.Duration

// AdminToken is the bearer token required for admin API requests.
// When empty, a random token is generated and logged at startup.
AdminToken string
// InternalSecret is the shared secret for API authentication.
// When empty, a random secret is generated and logged at startup.
InternalSecret string

}

type ProcessConfig struct {
Expand All @@ -80,6 +81,8 @@ type K8sConfig struct {
ServiceAccount string // ServiceAccount name for worker pods (default: "default")
MaxWorkers int // Global cap for the shared K8s worker pool (0 = auto-derived)
SharedWarmTarget int // Neutral shared warm-worker target for K8s multi-tenant mode (0 = disabled)
AWSAccountID string // AWS account ID for constructing IAM role ARNs (STS credential brokering)
AWSRegion string // AWS region for STS client
}

// ControlPlane manages the TCP listener and routes connections to Flight SQL workers.
Expand Down Expand Up @@ -110,6 +113,7 @@ type ControlPlane struct {
// Multi-tenant fields (non-nil in remote multitenant mode)
orgRouter OrgRouterInterface
configStore ConfigStoreInterface
apiServer *http.Server // API server on :8080 (shut down on graceful exit)
}

// ConfigStoreInterface abstracts the config store for the control plane.
Expand Down Expand Up @@ -317,20 +321,14 @@ func RunControlPlane(cfg ControlPlaneConfig) {

// Multi-tenant mode: config store + per-org pools (K8s remote backend only)
if cfg.WorkerBackend == "remote" {
store, adapter, adminSrv, err := SetupMultiTenant(cfg, srv, memBudget, k8sMaxWorkers)
store, adapter, apiServer, err := SetupMultiTenant(cfg, srv, memBudget, k8sMaxWorkers)
if err != nil {
slog.Error("Failed to set up multi-tenant config store.", "error", err)
os.Exit(1)
}
cp.configStore = store
cp.orgRouter = adapter
// Replace the simple metrics server with the Gin admin server
if cfg.MetricsServer != nil {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
_ = cfg.MetricsServer.Shutdown(ctx)
cancel()
}
cfg.MetricsServer = adminSrv
cp.apiServer = apiServer
cp.cfg = cfg
_ = store // keep linter happy
} else {
Expand Down Expand Up @@ -961,6 +959,13 @@ func (cp *ControlPlane) handleUpgrade() {
}
cancel()
}
if cp.apiServer != nil {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
if err := cp.apiServer.Shutdown(ctx); err != nil {
slog.Warn("API server shutdown failed.", "error", err)
}
cancel()
}

// Stop ACME managers so the new CP can bind port 80 (HTTP-01) or
// manage DNS records. Nil out after close so drainAfterUpgrade
Expand Down
Loading
Loading