diff --git a/server/checkpoint.go b/server/checkpoint.go
index d8de843..6dbad61 100644
--- a/server/checkpoint.go
+++ b/server/checkpoint.go
@@ -59,17 +59,7 @@ func NewDuckLakeCheckpointer(cfg Config) (*DuckLakeCheckpointer, error) {
 		}
 	}
 
-	dataPath := dlCfg.ObjectStore
-	if dataPath == "" {
-		dataPath = dlCfg.DataPath
-	}
-	var attachStmt string
-	if dataPath != "" {
-		attachStmt = fmt.Sprintf("ATTACH 'ducklake:%s' AS ducklake (DATA_PATH '%s')",
-			escapeSQLStringLiteral(dlCfg.MetadataStore), escapeSQLStringLiteral(dataPath))
-	} else {
-		attachStmt = fmt.Sprintf("ATTACH 'ducklake:%s' AS ducklake", escapeSQLStringLiteral(dlCfg.MetadataStore))
-	}
+	attachStmt := buildDuckLakeAttachStmt(dlCfg, duckLakeMigrationNeeded())
 	if _, err := db.Exec(attachStmt); err != nil {
 		_ = db.Close()
 		return nil, fmt.Errorf("checkpoint: attach ducklake: %w", err)
diff --git a/server/ducklake_migration.go b/server/ducklake_migration.go
new file mode 100644
index 0000000..2e2836d
--- /dev/null
+++ b/server/ducklake_migration.go
@@ -0,0 +1,432 @@
+package server
+
+import (
+	"bufio"
+	"context"
+	"database/sql"
+	"fmt"
+	"io"
+	"log/slog"
+	"os"
+	"path/filepath"
+	"strconv"
+	"strings"
+	"sync"
+	"time"
+)
+
+// duckLakeSpecVersion is the DuckLake spec version that this build of duckgres expects.
+// When the metadata store is at an older version, we backup and migrate automatically.
+// IMPORTANT: This must match the DuckLake version bundled with the current DuckDB driver.
+// Bump this to "0.4" only after upgrading duckdb-go to a version that ships DuckLake 0.4
+// (DuckDB 1.5.x), since AUTOMATIC_MIGRATION is not supported by older DuckLake extensions.
+const duckLakeSpecVersion = "0.3"
+
+// dlMigration holds the result of the migration check.
+// The check retries on transient errors (e.g., metadata store not reachable yet)
+// but locks in the result once it succeeds.
+//
+// In multitenant control-plane mode, each worker process serves a single tenant
+// with its own metadata store, so the per-process state is correct.
+// If this changes (multiple metadata stores per process), this must be replaced
+// with a sync.Map keyed by metadata store connection string.
+//
+// All fields are guarded by mu; read or write them only while holding the lock
+// (the check may be retried concurrently by other connections until it succeeds).
+var dlMigration struct {
+	mu       sync.Mutex
+	done     bool   // true once the check has completed successfully
+	needed   bool   // true if metadata store version < duckLakeSpecVersion
+	err      error  // non-nil if the most recent check or backup failed
+	checkedV string // the version found in the metadata store
+}
+
+// ensureDuckLakeMigrationCheck runs the migration check, retrying on transient errors.
+// Once the check succeeds (regardless of whether migration is needed), the result
+// is locked in and subsequent calls are no-ops. If the check fails, the error is
+// stored but the next call will retry — this prevents a transient failure (e.g.,
+// metadata store not yet reachable during pod startup) from permanently blocking
+// all connections.
+//
+// The backup file is written to dataDir.
+// This should be called BEFORE acquiring the DuckLake attachment semaphore,
+// since the backup can take minutes for large metadata stores.
+// (Note: mu is held for the duration of the check/backup, so concurrent callers
+// queue here until the first one finishes — that serialization is intentional.)
+func ensureDuckLakeMigrationCheck(dlCfg DuckLakeConfig, dataDir string) {
+	dlMigration.mu.Lock()
+	defer dlMigration.mu.Unlock()
+
+	// A successful check (needed or not) is final for this process.
+	if dlMigration.done {
+		return
+	}
+
+	needed, ver, err := checkAndBackupIfNeeded(dlCfg, dataDir)
+	dlMigration.needed = needed
+	dlMigration.checkedV = ver
+	dlMigration.err = err
+
+	// Only a clean check locks in the result; errors leave done=false so the
+	// next caller retries.
+	if err == nil {
+		dlMigration.done = true
+	}
+}
+
+// duckLakeMigrationNeeded returns whether the ATTACH statement should include
+// AUTOMATIC_MIGRATION TRUE. Safe to call after ensureDuckLakeMigrationCheck.
+func duckLakeMigrationNeeded() bool {
+	// Read under the lock: ensureDuckLakeMigrationCheck writes needed/err while
+	// holding mu, and a failed check may be retried concurrently by another
+	// connection. An unlocked read here is a data race under the Go memory model.
+	dlMigration.mu.Lock()
+	defer dlMigration.mu.Unlock()
+	return dlMigration.needed && dlMigration.err == nil
+}
+
+// parseDuckLakeVersion parses a DuckLake version string like "0.3" into
+// (major, minor) integers for reliable numeric comparison.
+// Returns (0, 0, err) if the string cannot be parsed.
+func parseDuckLakeVersion(ver string) (major, minor int, err error) {
+	parts := strings.SplitN(ver, ".", 2)
+	if len(parts) != 2 {
+		return 0, 0, fmt.Errorf("invalid version format: %q", ver)
+	}
+	major, err = strconv.Atoi(parts[0])
+	if err != nil {
+		return 0, 0, fmt.Errorf("invalid major version in %q: %w", ver, err)
+	}
+	// Atoi also rejects trailing components such as "4.1", so "0.4.1" fails here.
+	minor, err = strconv.Atoi(parts[1])
+	if err != nil {
+		return 0, 0, fmt.Errorf("invalid minor version in %q: %w", ver, err)
+	}
+	return major, minor, nil
+}
+
+// versionLessThan returns true if version a is strictly less than version b.
+// Both must be in "major.minor" format (e.g., "0.3", "0.4", "0.10").
+// Comparison is numeric, not lexicographic: "0.9" < "0.10".
+func versionLessThan(a, b string) (bool, error) {
+	aMaj, aMin, err := parseDuckLakeVersion(a)
+	if err != nil {
+		return false, err
+	}
+	bMaj, bMin, err := parseDuckLakeVersion(b)
+	if err != nil {
+		return false, err
+	}
+	return aMaj < bMaj || (aMaj == bMaj && aMin < bMin), nil
+}
+
+// checkAndBackupIfNeeded connects to the metadata PostgreSQL store, checks the
+// DuckLake spec version, and if migration is required, dumps all ducklake_* tables
+// to a SQL backup file before returning.
+func checkAndBackupIfNeeded(dlCfg DuckLakeConfig, dataDir string) (needed bool, version string, err error) {
+	// Only PostgreSQL-backed metadata stores are version-checked; anything else
+	// (e.g., a local DuckDB file) never triggers an automatic migration here.
+	if !strings.HasPrefix(dlCfg.MetadataStore, "postgres:") {
+		return false, "", nil
+	}
+
+	connStr := strings.TrimPrefix(dlCfg.MetadataStore, "postgres:")
+
+	pgDB, err := sql.Open("pgx", connStr)
+	if err != nil {
+		return false, "", fmt.Errorf("open metadata store: %w", err)
+	}
+	defer func() { _ = pgDB.Close() }()
+
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	if err := pgDB.PingContext(ctx); err != nil {
+		return false, "", fmt.Errorf("connect to metadata store: %w", err)
+	}
+
+	// Check if ducklake_metadata table exists (fresh install has no tables yet).
+	var exists bool
+	err = pgDB.QueryRowContext(ctx,
+		"SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'ducklake_metadata')").Scan(&exists)
+	if err != nil {
+		return false, "", fmt.Errorf("check ducklake_metadata existence: %w", err)
+	}
+	if !exists {
+		slog.Info("DuckLake metadata store has no ducklake_metadata table (fresh install), no migration needed.")
+		return false, "", nil
+	}
+
+	// Read current spec version. "key"/"value" are quoted: both are reserved-ish
+	// identifiers in PostgreSQL.
+	var ver string
+	err = pgDB.QueryRowContext(ctx,
+		`SELECT "value" FROM ducklake_metadata WHERE "key" = 'version'`).Scan(&ver)
+	if err != nil {
+		return false, "", fmt.Errorf("read DuckLake spec version: %w", err)
+	}
+
+	slog.Info("DuckLake metadata store version detected.", "version", ver, "expected", duckLakeSpecVersion)
+
+	less, err := versionLessThan(ver, duckLakeSpecVersion)
+	if err != nil {
+		return false, ver, fmt.Errorf("compare DuckLake versions: %w", err)
+	}
+	if !less {
+		return false, ver, nil
+	}
+
+	// Migration needed — backup first.
+	slog.Info("DuckLake metadata migration required. Backing up metadata store before upgrade.",
+		"from", ver, "to", duckLakeSpecVersion)
+
+	if err := backupDuckLakeMetadata(pgDB, dataDir, ver); err != nil {
+		return true, ver, fmt.Errorf("backup metadata before migration: %w", err)
+	}
+
+	return true, ver, nil
+}
+
+// backupDuckLakeMetadata dumps all ducklake_* tables from the PostgreSQL metadata
+// store to a SQL file. The file contains CREATE TABLE + INSERT statements that can
+// be used to restore the metadata if the migration goes wrong.
+func backupDuckLakeMetadata(pgDB *sql.DB, dataDir string, version string) error {
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
+	defer cancel()
+
+	// Discover all ducklake_* tables.
+	rows, err := pgDB.QueryContext(ctx,
+		"SELECT table_name FROM information_schema.tables WHERE table_schema = 'public' AND table_name LIKE 'ducklake_%' ORDER BY table_name")
+	if err != nil {
+		return fmt.Errorf("list ducklake tables: %w", err)
+	}
+
+	var tables []string
+	for rows.Next() {
+		var name string
+		if err := rows.Scan(&name); err != nil {
+			_ = rows.Close()
+			return fmt.Errorf("scan table name: %w", err)
+		}
+		tables = append(tables, name)
+	}
+	_ = rows.Close()
+	if err := rows.Err(); err != nil {
+		return fmt.Errorf("iterate table names: %w", err)
+	}
+
+	if len(tables) == 0 {
+		slog.Warn("No ducklake_* tables found in metadata store, nothing to back up.")
+		return nil
+	}
+
+	// Create backup file.
+	timestamp := time.Now().UTC().Format("20060102T150405Z")
+	backupPath := filepath.Join(dataDir, fmt.Sprintf("ducklake-backup-%s-v%s.sql", timestamp, version))
+
+	slog.Info("Starting DuckLake metadata backup.", "path", backupPath, "tables", len(tables))
+
+	f, err := os.Create(backupPath)
+	if err != nil {
+		return fmt.Errorf("create backup file %s: %w", backupPath, err)
+	}
+	closed := false
+	defer func() {
+		if !closed {
+			_ = f.Close()
+		}
+	}()
+
+	// Buffer all writes: the dump emits one small Fprintf per row, and writing
+	// those straight to *os.File costs a syscall each — large metadata stores
+	// have millions of rows. The buffer is flushed before fsync below.
+	w := bufio.NewWriter(f)
+
+	// Write header.
+	if _, err := fmt.Fprintf(w, "-- DuckLake metadata backup before migration (v%s → v%s)\n", version, duckLakeSpecVersion); err != nil {
+		return fmt.Errorf("write backup header: %w", err)
+	}
+	if _, err := fmt.Fprintf(w, "-- Generated: %s\n", time.Now().UTC().Format(time.RFC3339)); err != nil {
+		return fmt.Errorf("write backup header: %w", err)
+	}
+	if _, err := fmt.Fprintf(w, "-- Tables: %d\n\n", len(tables)); err != nil {
+		return fmt.Errorf("write backup header: %w", err)
+	}
+	if _, err := fmt.Fprintln(w, "BEGIN;"); err != nil {
+		return fmt.Errorf("write backup header: %w", err)
+	}
+
+	totalRows := 0
+	for _, table := range tables {
+		count, err := backupTable(ctx, pgDB, w, table)
+		if err != nil {
+			return fmt.Errorf("backup table %s: %w", table, err)
+		}
+		totalRows += count
+	}
+
+	if _, err := fmt.Fprintln(w, "\nCOMMIT;"); err != nil {
+		return fmt.Errorf("write backup footer: %w", err)
+	}
+
+	// Flush the buffer, then fsync to disk before closing — this is a critical
+	// safety net file.
+	if err := w.Flush(); err != nil {
+		return fmt.Errorf("flush backup file: %w", err)
+	}
+	if err := f.Sync(); err != nil {
+		return fmt.Errorf("fsync backup file: %w", err)
+	}
+
+	if err := f.Close(); err != nil {
+		return fmt.Errorf("close backup file: %w", err)
+	}
+	closed = true
+
+	// Size is logged best-effort only; a Stat failure just reports 0.
+	info, _ := os.Stat(backupPath)
+	sizeMB := float64(0)
+	if info != nil {
+		sizeMB = float64(info.Size()) / 1024 / 1024
+	}
+
+	slog.Info("DuckLake metadata backup completed.",
+		"path", backupPath,
+		"tables", len(tables),
+		"rows", totalRows,
+		"size_mb", fmt.Sprintf("%.1f", sizeMB))
+
+	return nil
+}
+
+// quoteIdent quotes a PostgreSQL identifier with double quotes.
+// Any embedded double quotes are doubled per SQL standard.
+func quoteIdent(name string) string {
+	return `"` + strings.ReplaceAll(name, `"`, `""`) + `"`
+}
+
+// backupTable writes CREATE TABLE and INSERT statements for a single table to w
+// (typically a buffered writer over the backup file; *os.File also satisfies it).
+// Returns the number of rows backed up.
+func backupTable(ctx context.Context, pgDB *sql.DB, w io.Writer, table string) (int, error) {
+	// Get column definitions.
+	colRows, err := pgDB.QueryContext(ctx,
+		"SELECT column_name, data_type, is_nullable FROM information_schema.columns WHERE table_schema = 'public' AND table_name = $1 ORDER BY ordinal_position", table)
+	if err != nil {
+		return 0, fmt.Errorf("get columns: %w", err)
+	}
+
+	type colDef struct {
+		name     string
+		dataType string
+		nullable string
+	}
+	var cols []colDef
+	for colRows.Next() {
+		var c colDef
+		if err := colRows.Scan(&c.name, &c.dataType, &c.nullable); err != nil {
+			_ = colRows.Close()
+			return 0, fmt.Errorf("scan column: %w", err)
+		}
+		cols = append(cols, c)
+	}
+	_ = colRows.Close()
+	if err := colRows.Err(); err != nil {
+		return 0, fmt.Errorf("iterate columns: %w", err)
+	}
+
+	if len(cols) == 0 {
+		return 0, nil
+	}
+
+	// Write CREATE TABLE with quoted identifiers.
+	// NOTE(review): information_schema data_type is lossy for some types
+	// (e.g. "ARRAY", "USER-DEFINED"); assumed acceptable for a safety-net dump.
+	quotedTable := quoteIdent(table)
+	if _, err := fmt.Fprintf(w, "\n-- Table: %s\n", table); err != nil {
+		return 0, fmt.Errorf("write: %w", err)
+	}
+	if _, err := fmt.Fprintf(w, "CREATE TABLE IF NOT EXISTS %s (\n", quotedTable); err != nil {
+		return 0, fmt.Errorf("write: %w", err)
+	}
+	for i, c := range cols {
+		nullStr := ""
+		if c.nullable == "NO" {
+			nullStr = " NOT NULL"
+		}
+		comma := ","
+		if i == len(cols)-1 {
+			comma = ""
+		}
+		if _, err := fmt.Fprintf(w, "    %s %s%s%s\n", quoteIdent(c.name), c.dataType, nullStr, comma); err != nil {
+			return 0, fmt.Errorf("write: %w", err)
+		}
+	}
+	if _, err := fmt.Fprintln(w, ");"); err != nil {
+		return 0, fmt.Errorf("write: %w", err)
+	}
+
+	// Build quoted column name list for SELECT and INSERT.
+	quotedColNames := make([]string, len(cols))
+	for i, c := range cols {
+		quotedColNames[i] = quoteIdent(c.name)
+	}
+	quotedColList := strings.Join(quotedColNames, ", ")
+
+	// Query all rows.
+	dataRows, err := pgDB.QueryContext(ctx,
+		fmt.Sprintf("SELECT %s FROM %s", quotedColList, quotedTable))
+	if err != nil {
+		return 0, fmt.Errorf("select data: %w", err)
+	}
+	defer func() { _ = dataRows.Close() }()
+
+	count := 0
+	scanDest := make([]any, len(cols))
+	scanPtrs := make([]any, len(cols))
+	for i := range scanDest {
+		scanPtrs[i] = &scanDest[i]
+	}
+	// Reused across rows: every index is overwritten before each write.
+	vals := make([]string, len(cols))
+
+	for dataRows.Next() {
+		if err := dataRows.Scan(scanPtrs...); err != nil {
+			return count, fmt.Errorf("scan row: %w", err)
+		}
+
+		for i, v := range scanDest {
+			vals[i] = formatSQLValue(v)
+		}
+
+		if _, err := fmt.Fprintf(w, "INSERT INTO %s (%s) VALUES (%s);\n",
+			quotedTable, quotedColList, strings.Join(vals, ", ")); err != nil {
+			return count, fmt.Errorf("write: %w", err)
+		}
+		count++
+	}
+	if err := dataRows.Err(); err != nil {
+		return count, fmt.Errorf("iterate rows: %w", err)
+	}
+
+	return count, nil
+}
+
+// formatSQLValue formats a Go value as a SQL literal for INSERT statements.
+// Note: []byte is treated as UTF-8 text (fine for DuckLake metadata which stores
+// only text, integers, and booleans — no bytea columns).
+func formatSQLValue(v any) string {
+	if v == nil {
+		return "NULL"
+	}
+	switch val := v.(type) {
+	case bool:
+		if val {
+			return "TRUE"
+		}
+		return "FALSE"
+	case int64:
+		return strconv.FormatInt(val, 10)
+	case float64:
+		// 'g' with precision -1 matches fmt's %v/%g shortest round-trip form.
+		return strconv.FormatFloat(val, 'g', -1, 64)
+	case []byte:
+		return fmt.Sprintf("'%s'", strings.ReplaceAll(string(val), "'", "''"))
+	case string:
+		return fmt.Sprintf("'%s'", strings.ReplaceAll(val, "'", "''"))
+	case time.Time:
+		return fmt.Sprintf("'%s'", val.Format(time.RFC3339Nano))
+	default:
+		// Fallback for driver-specific types; rendered via %v and quoted.
+		s := fmt.Sprintf("%v", val)
+		return fmt.Sprintf("'%s'", strings.ReplaceAll(s, "'", "''"))
+	}
+}
+
+// buildDuckLakeAttachStmt builds the ATTACH statement for DuckLake.
+// If migrate is true, adds AUTOMATIC_MIGRATION TRUE to the options.
+func buildDuckLakeAttachStmt(dlCfg DuckLakeConfig, migrate bool) string {
+	// Connection string and data path are escaped as SQL string literals to
+	// guard against quote injection via config values.
+	connStr := escapeSQLStringLiteral(dlCfg.MetadataStore)
+	// ObjectStore takes precedence; DataPath is the fallback.
+	dataPath := dlCfg.ObjectStore
+	if dataPath == "" {
+		dataPath = dlCfg.DataPath
+	}
+
+	var options []string
+	if dataPath != "" {
+		options = append(options, fmt.Sprintf("DATA_PATH '%s'", escapeSQLStringLiteral(dataPath)))
+	}
+	if migrate {
+		options = append(options, "AUTOMATIC_MIGRATION TRUE")
+	}
+
+	if len(options) > 0 {
+		return fmt.Sprintf("ATTACH 'ducklake:%s' AS ducklake (%s)",
+			connStr, strings.Join(options, ", "))
+	}
+	return fmt.Sprintf("ATTACH 'ducklake:%s' AS ducklake", connStr)
+}
diff --git a/server/ducklake_migration_test.go b/server/ducklake_migration_test.go
new file mode 100644
index 0000000..364fa59
--- /dev/null
+++ b/server/ducklake_migration_test.go
@@ -0,0 +1,213 @@
+package server
+
+import (
+	"fmt"
+	"testing"
+	"time"
+)
+
+// TestBuildDuckLakeAttachStmt covers option combinations (data path, migration
+// flag) and quote escaping in the connection string.
+func TestBuildDuckLakeAttachStmt(t *testing.T) {
+	tests := []struct {
+		name    string
+		dlCfg   DuckLakeConfig
+		migrate bool
+		want    string
+	}{
+		{
+			name: "basic without data path",
+			dlCfg: DuckLakeConfig{
+				MetadataStore: "postgres:host=localhost dbname=dl",
+			},
+			migrate: false,
+			want:    "ATTACH 'ducklake:postgres:host=localhost dbname=dl' AS ducklake",
+		},
+		{
+			name: "with object store",
+			dlCfg: DuckLakeConfig{
+				MetadataStore: "postgres:host=localhost dbname=dl",
+				ObjectStore:   "s3://bucket/path",
+			},
+			migrate: false,
+			want:    "ATTACH 'ducklake:postgres:host=localhost dbname=dl' AS ducklake (DATA_PATH 's3://bucket/path')",
+		},
+		{
+			name: "with data path fallback",
+			dlCfg: DuckLakeConfig{
+				MetadataStore: "postgres:host=localhost dbname=dl",
+				DataPath:      "/local/data",
+			},
+			migrate: false,
+			want:    "ATTACH 'ducklake:postgres:host=localhost dbname=dl' AS ducklake (DATA_PATH '/local/data')",
+		},
+		{
+			name: "object store takes precedence over data path",
+			dlCfg: DuckLakeConfig{
+				MetadataStore: "postgres:host=localhost dbname=dl",
+				ObjectStore:   "s3://bucket/path",
+				DataPath:      "/local/data",
+			},
+			migrate: false,
+			want:    "ATTACH 'ducklake:postgres:host=localhost dbname=dl' AS ducklake (DATA_PATH 's3://bucket/path')",
+		},
+		{
+			name: "with migration flag",
+			dlCfg: DuckLakeConfig{
+				MetadataStore: "postgres:host=localhost dbname=dl",
+				ObjectStore:   "s3://bucket/path",
+			},
+			migrate: true,
+			want:    "ATTACH 'ducklake:postgres:host=localhost dbname=dl' AS ducklake (DATA_PATH 's3://bucket/path', AUTOMATIC_MIGRATION TRUE)",
+		},
+		{
+			name: "migration without data path",
+			dlCfg: DuckLakeConfig{
+				MetadataStore: "postgres:host=localhost dbname=dl",
+			},
+			migrate: true,
+			want:    "ATTACH 'ducklake:postgres:host=localhost dbname=dl' AS ducklake (AUTOMATIC_MIGRATION TRUE)",
+		},
+		{
+			name: "escapes single quotes in connection string",
+			dlCfg: DuckLakeConfig{
+				MetadataStore: "postgres:host=localhost password=it's_secret",
+			},
+			migrate: false,
+			want:    "ATTACH 'ducklake:postgres:host=localhost password=it''s_secret' AS ducklake",
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got := buildDuckLakeAttachStmt(tt.dlCfg, tt.migrate)
+			if got != tt.want {
+				t.Errorf("buildDuckLakeAttachStmt() =\n %s\nwant:\n %s", got, tt.want)
+			}
+		})
+	}
+}
+
+// TestFormatSQLValue pins the SQL-literal rendering for every supported Go type,
+// including quote doubling and NULL handling.
+func TestFormatSQLValue(t *testing.T) {
+	tests := []struct {
+		name string
+		v    any
+		want string
+	}{
+		{"nil", nil, "NULL"},
+		{"true", true, "TRUE"},
+		{"false", false, "FALSE"},
+		{"int", int64(42), "42"},
+		{"negative int", int64(-1), "-1"},
+		{"float", float64(3.14), "3.14"},
+		{"string", "hello", "'hello'"},
+		{"string with quote", "it's", "'it''s'"},
+		{"string with double quote", `say "hi"`, `'say "hi"'`},
+		{"empty string", "", "''"},
+		{"bytes", []byte("data"), "'data'"},
+		{"bytes with quote", []byte("it's"), "'it''s'"},
+		{"time", time.Date(2026, 3, 23, 12, 0, 0, 0, time.UTC), "'2026-03-23T12:00:00Z'"},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got := formatSQLValue(tt.v)
+			if got != tt.want {
+				t.Errorf("formatSQLValue(%v) = %s, want %s", tt.v, got, tt.want)
+			}
+		})
+	}
+}
+
+// TestQuoteIdent verifies PostgreSQL identifier quoting, including doubling of
+// embedded double quotes.
+func TestQuoteIdent(t *testing.T) {
+	tests := []struct {
+		name string
+		in   string
+		want string
+	}{
+		{"simple", "table_name", `"table_name"`},
+		{"reserved word", "key", `"key"`},
+		{"reserved word value", "value", `"value"`},
+		{"with double quote", `say"hi`, `"say""hi"`},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got := quoteIdent(tt.in)
+			if got != tt.want {
+				t.Errorf("quoteIdent(%q) = %s, want %s", tt.in, got, tt.want)
+			}
+		})
+	}
+}
+
+// TestVersionLessThan checks numeric (not lexicographic) ordering of
+// "major.minor" versions.
+func TestVersionLessThan(t *testing.T) {
+	tests := []struct {
+		a, b string
+		want bool
+	}{
+		{"0.3", "0.4", true},
+		{"0.4", "0.4", false},
+		{"0.4", "0.3", false},
+		{"0.4", "0.10", true},  // numeric, not lexicographic
+		{"0.10", "0.4", false}, // numeric, not lexicographic
+		{"0.9", "0.10", true},
+		{"1.0", "0.10", false},
+		{"0.3", "1.0", true},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.a+"_vs_"+tt.b, func(t *testing.T) {
+			got, err := versionLessThan(tt.a, tt.b)
+			if err != nil {
+				t.Fatalf("unexpected error: %v", err)
+			}
+			if got != tt.want {
+				t.Errorf("versionLessThan(%q, %q) = %v, want %v", tt.a, tt.b, got, tt.want)
+			}
+		})
+	}
+}
+
+// TestVersionLessThan_Invalid checks that malformed version strings are rejected.
+func TestVersionLessThan_Invalid(t *testing.T) {
+	tests := []struct {
+		a, b string
+	}{
+		{"abc", "0.4"},
+		{"0.4", "abc"},
+		{"0.4.1", "0.4"},
+		{"", "0.4"},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.a+"_vs_"+tt.b, func(t *testing.T) {
+			_, err := versionLessThan(tt.a, tt.b)
+			if err == nil {
+				t.Errorf("versionLessThan(%q, %q) expected error, got nil", tt.a, tt.b)
+			}
+		})
+	}
+}
+
+// TestDuckLakeMigrationNeeded_FalseWhenError verifies that a stored check error
+// suppresses the migration flag.
+func TestDuckLakeMigrationNeeded_FalseWhenError(t *testing.T) {
+	// Save and restore global state.
+	dlMigration.mu.Lock()
+	origDone := dlMigration.done
+	origNeeded := dlMigration.needed
+	origErr := dlMigration.err
+	dlMigration.mu.Unlock()
+	defer func() {
+		dlMigration.mu.Lock()
+		dlMigration.done = origDone
+		dlMigration.needed = origNeeded
+		dlMigration.err = origErr
+		dlMigration.mu.Unlock()
+	}()
+
+	dlMigration.mu.Lock()
+	dlMigration.needed = true
+	dlMigration.err = fmt.Errorf("backup failed")
+	dlMigration.mu.Unlock()
+
+	if duckLakeMigrationNeeded() {
+		t.Error("duckLakeMigrationNeeded() should return false when err is set")
+	}
+}
diff --git a/server/querylog.go b/server/querylog.go
index dbe36f9..a74358d 100644
--- a/server/querylog.go
+++ b/server/querylog.go
@@ -95,17 +95,7 @@ func NewQueryLogger(cfg Config) (*QueryLogger, error) {
 	}
 
 	// Attach DuckLake
-	dataPath := dlCfg.ObjectStore
-	if dataPath == "" {
-		dataPath = dlCfg.DataPath
-	}
-	var attachStmt string
-	if dataPath != "" {
-		attachStmt = fmt.Sprintf("ATTACH 'ducklake:%s' AS ducklake (DATA_PATH '%s')",
-			escapeSQLStringLiteral(dlCfg.MetadataStore), escapeSQLStringLiteral(dataPath))
-	} else {
-		attachStmt = fmt.Sprintf("ATTACH 'ducklake:%s' AS ducklake", escapeSQLStringLiteral(dlCfg.MetadataStore))
-	}
+	attachStmt := buildDuckLakeAttachStmt(dlCfg, duckLakeMigrationNeeded())
 	if _, err := db.Exec(attachStmt); err != nil {
 		_ = db.Close()
 		return nil, fmt.Errorf("querylog: attach ducklake: %w", err)
diff --git a/server/server.go b/server/server.go
index 070a554..0e1cef0 100644
--- a/server/server.go
+++ b/server/server.go
@@ -766,7 +766,7 @@ func ConfigureDBConnection(db *sql.DB, cfg Config, duckLakeSem chan struct{}, us
 	// Attach DuckLake catalog if configured (but don't set as default yet)
 	duckLakeMode := false
-	if err := AttachDuckLake(db, cfg.DuckLake, duckLakeSem); err != nil {
+	if err := AttachDuckLake(db, cfg.DuckLake, duckLakeSem, cfg.DataDir); err != nil {
 		// If DuckLake was explicitly configured, fail the connection.
 		// Silent fallback to local DB causes schema/table mismatches.
if cfg.DuckLake.MetadataStore != "" { @@ -817,7 +817,7 @@ func ActivateDBConnection(db *sql.DB, cfg Config, duckLakeSem chan struct{}, use return fmt.Errorf("tenant activation requires ducklake metadata_store") } - if err := AttachDuckLake(db, cfg.DuckLake, duckLakeSem); err != nil { + if err := AttachDuckLake(db, cfg.DuckLake, duckLakeSem, cfg.DataDir); err != nil { return fmt.Errorf("DuckLake configured but attachment failed: %w", err) } @@ -854,7 +854,7 @@ func CreatePassthroughDBConnection(cfg Config, duckLakeSem chan struct{}, userna initClickHouseMacros(db) // Attach DuckLake catalog if configured (same data, no pg_catalog views) - if err := AttachDuckLake(db, cfg.DuckLake, duckLakeSem); err != nil { + if err := AttachDuckLake(db, cfg.DuckLake, duckLakeSem, cfg.DataDir); err != nil { if cfg.DuckLake.MetadataStore != "" { _ = db.Close() return nil, fmt.Errorf("DuckLake configured but attachment failed: %w", err) @@ -929,11 +929,21 @@ func hasCacheHTTPFS(extensions []string) bool { // AttachDuckLake attaches a DuckLake catalog if configured (but does NOT set it as default). // Call setDuckLakeDefault after creating per-connection views in memory.main. // This is a standalone function so it can be reused by control plane workers. -func AttachDuckLake(db *sql.DB, dlCfg DuckLakeConfig, sem chan struct{}) error { +// dataDir is used for writing migration backup files if a schema upgrade is needed. +func AttachDuckLake(db *sql.DB, dlCfg DuckLakeConfig, sem chan struct{}, dataDir string) error { if dlCfg.MetadataStore == "" { return nil // DuckLake not configured } + // Check if DuckLake metadata needs migration (runs once per process). + // If migration is needed, backs up all metadata tables before proceeding. + // This runs BEFORE the semaphore because the backup can take minutes for + // large metadata stores, and we don't want to block other connections. 
+ ensureDuckLakeMigrationCheck(dlCfg, dataDir) + if dlMigration.err != nil { + return fmt.Errorf("DuckLake migration check failed: %w", dlMigration.err) + } + // Serialize DuckLake attachment to avoid race conditions where multiple // connections try to attach simultaneously, causing errors like // "database with name '__ducklake_metadata_ducklake' already exists". @@ -984,21 +994,15 @@ func AttachDuckLake(db *sql.DB, dlCfg DuckLakeConfig, sem chan struct{}) error { "Consider connecting directly to PostgreSQL instead.") } - // Build the ATTACH statement - // Format without data path: ATTACH 'ducklake:' AS ducklake - // Format with data path: ATTACH 'ducklake:' AS ducklake (DATA_PATH '') + // Build the ATTACH statement. // See: https://ducklake.select/docs/stable/duckdb/usage/connecting - var attachStmt string - dataPath := dlCfg.ObjectStore - if dataPath == "" { - dataPath = dlCfg.DataPath - } - if dataPath != "" { - attachStmt = fmt.Sprintf("ATTACH 'ducklake:%s' AS ducklake (DATA_PATH '%s')", - dlCfg.MetadataStore, dataPath) - slog.Info("Attaching DuckLake catalog with data path.", "metadata", redactConnectionString(dlCfg.MetadataStore), "data", dataPath) + migrate := duckLakeMigrationNeeded() + attachStmt := buildDuckLakeAttachStmt(dlCfg, migrate) + if migrate { + slog.Info("Attaching DuckLake catalog with automatic migration.", + "from", dlMigration.checkedV, "to", duckLakeSpecVersion, + "metadata", redactConnectionString(dlCfg.MetadataStore)) } else { - attachStmt = fmt.Sprintf("ATTACH 'ducklake:%s' AS ducklake", dlCfg.MetadataStore) slog.Info("Attaching DuckLake catalog.", "metadata", redactConnectionString(dlCfg.MetadataStore)) }