From 1a48fc9870ee30551fcbe7da7eef1c99a9719bf7 Mon Sep 17 00:00:00 2001 From: James Greenhill Date: Mon, 23 Mar 2026 17:51:39 -0700 Subject: [PATCH 1/5] feat: auto-backup and migrate DuckLake metadata on version upgrade MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When DuckLake spec version is older than expected (e.g. 0.3 → 0.4), automatically backup all ducklake_* metadata tables to a SQL file before attaching with AUTOMATIC_MIGRATION TRUE. This ensures safe rollback if migration fails. - Add server/ducklake_migration.go with version detection, backup, and shared ATTACH statement builder - Centralize ATTACH statement construction (was duplicated in 3 places) - Backup is written to /ducklake-backup--v.sql - Migration check runs once per process via sync.Once - Backup failure blocks migration (fail-safe) Co-Authored-By: Claude Opus 4.6 (1M context) --- server/checkpoint.go | 12 +- server/ducklake_migration.go | 322 +++++++++++++++++++++++++++++++++++ server/querylog.go | 12 +- server/server.go | 36 ++-- 4 files changed, 343 insertions(+), 39 deletions(-) create mode 100644 server/ducklake_migration.go diff --git a/server/checkpoint.go b/server/checkpoint.go index d8de843..6dbad61 100644 --- a/server/checkpoint.go +++ b/server/checkpoint.go @@ -59,17 +59,7 @@ func NewDuckLakeCheckpointer(cfg Config) (*DuckLakeCheckpointer, error) { } } - dataPath := dlCfg.ObjectStore - if dataPath == "" { - dataPath = dlCfg.DataPath - } - var attachStmt string - if dataPath != "" { - attachStmt = fmt.Sprintf("ATTACH 'ducklake:%s' AS ducklake (DATA_PATH '%s')", - escapeSQLStringLiteral(dlCfg.MetadataStore), escapeSQLStringLiteral(dataPath)) - } else { - attachStmt = fmt.Sprintf("ATTACH 'ducklake:%s' AS ducklake", escapeSQLStringLiteral(dlCfg.MetadataStore)) - } + attachStmt := buildDuckLakeAttachStmt(dlCfg, duckLakeMigrationNeeded()) if _, err := db.Exec(attachStmt); err != nil { _ = db.Close() return nil, fmt.Errorf("checkpoint: attach ducklake: %w", err) diff --git a/server/ducklake_migration.go b/server/ducklake_migration.go new file mode 100644 index 0000000..83acc69 --- /dev/null +++ b/server/ducklake_migration.go @@ -0,0 +1,322 @@ +package server + +import ( + "context" + "database/sql" + "fmt" + "log/slog" + "os" + "path/filepath" + "strings" + "sync" + "time" +) + +// duckLakeSpecVersion is the DuckLake spec version that this build of duckgres expects. +// When the metadata store is at an older version, we backup and migrate automatically. +const duckLakeSpecVersion = "0.4" + +// dlMigration holds the result of the one-time migration check. +// The check runs at most once per process (sync.Once). +var dlMigration struct { + once sync.Once + needed bool // true if metadata store version < duckLakeSpecVersion + err error // non-nil if the check or backup failed + checkedV string // the version found in the metadata store +} + +// ensureDuckLakeMigrationCheck runs the migration check exactly once. +// If migration is needed, it backs up the metadata store before returning. +// The backup file is written to dataDir. +func ensureDuckLakeMigrationCheck(dlCfg DuckLakeConfig, dataDir string) { + dlMigration.once.Do(func() { + dlMigration.needed, dlMigration.checkedV, dlMigration.err = checkAndBackupIfNeeded(dlCfg, dataDir) + }) +} + +// duckLakeMigrationNeeded returns whether the ATTACH statement should include +// AUTOMATIC_MIGRATION TRUE. Safe to call after ensureDuckLakeMigrationCheck. +func duckLakeMigrationNeeded() bool { + return dlMigration.needed && dlMigration.err == nil +} + +// checkAndBackupIfNeeded connects to the metadata PostgreSQL store, checks the +// DuckLake spec version, and if migration is required, dumps all ducklake_* tables +// to a SQL backup file before returning. +func checkAndBackupIfNeeded(dlCfg DuckLakeConfig, dataDir string) (needed bool, version string, err error) { + if !strings.HasPrefix(dlCfg.MetadataStore, "postgres:") { + return false, "", nil + } + + connStr := strings.TrimPrefix(dlCfg.MetadataStore, "postgres:") + + pgDB, err := sql.Open("pgx", connStr) + if err != nil { + return false, "", fmt.Errorf("open metadata store: %w", err) + } + defer func() { _ = pgDB.Close() }() + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + if err := pgDB.PingContext(ctx); err != nil { + return false, "", fmt.Errorf("connect to metadata store: %w", err) + } + + // Check if ducklake_metadata table exists (fresh install has no tables yet). + var exists bool + err = pgDB.QueryRowContext(ctx, + "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'ducklake_metadata')").Scan(&exists) + if err != nil { + return false, "", fmt.Errorf("check ducklake_metadata existence: %w", err) + } + if !exists { + slog.Info("DuckLake metadata store has no ducklake_metadata table (fresh install), no migration needed.") + return false, "", nil + } + + // Read current spec version. + var ver string + err = pgDB.QueryRowContext(ctx, + "SELECT value FROM ducklake_metadata WHERE key = 'version'").Scan(&ver) + if err != nil { + return false, "", fmt.Errorf("read DuckLake spec version: %w", err) + } + + slog.Info("DuckLake metadata store version detected.", "version", ver, "expected", duckLakeSpecVersion) + + if ver >= duckLakeSpecVersion { + return false, ver, nil + } + + // Migration needed — backup first. + slog.Info("DuckLake metadata migration required. Backing up metadata store before upgrade.", + "from", ver, "to", duckLakeSpecVersion) + + if err := backupDuckLakeMetadata(pgDB, dataDir, ver); err != nil { + return true, ver, fmt.Errorf("backup metadata before migration: %w", err) + } + + return true, ver, nil +} + +// backupDuckLakeMetadata dumps all ducklake_* tables from the PostgreSQL metadata +// store to a SQL file. The file contains CREATE TABLE + INSERT statements that can +// be used to restore the metadata if the migration goes wrong. +func backupDuckLakeMetadata(pgDB *sql.DB, dataDir string, version string) error { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) + defer cancel() + + // Discover all ducklake_* tables. + rows, err := pgDB.QueryContext(ctx, + "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public' AND table_name LIKE 'ducklake_%' ORDER BY table_name") + if err != nil { + return fmt.Errorf("list ducklake tables: %w", err) + } + + var tables []string + for rows.Next() { + var name string + if err := rows.Scan(&name); err != nil { + rows.Close() + return fmt.Errorf("scan table name: %w", err) + } + tables = append(tables, name) + } + rows.Close() + if err := rows.Err(); err != nil { + return fmt.Errorf("iterate table names: %w", err) + } + + if len(tables) == 0 { + slog.Warn("No ducklake_* tables found in metadata store, nothing to back up.") + return nil + } + + // Create backup file. + timestamp := time.Now().UTC().Format("20060102T150405Z") + backupPath := filepath.Join(dataDir, fmt.Sprintf("ducklake-backup-%s-v%s.sql", timestamp, version)) + + f, err := os.Create(backupPath) + if err != nil { + return fmt.Errorf("create backup file %s: %w", backupPath, err) + } + defer func() { _ = f.Close() }() + + // Write header. + fmt.Fprintf(f, "-- DuckLake metadata backup before migration (v%s → v%s)\n", version, duckLakeSpecVersion) + fmt.Fprintf(f, "-- Generated: %s\n", time.Now().UTC().Format(time.RFC3339)) + fmt.Fprintf(f, "-- Tables: %d\n\n", len(tables)) + fmt.Fprintln(f, "BEGIN;") + + totalRows := 0 + for _, table := range tables { + count, err := backupTable(ctx, pgDB, f, table) + if err != nil { + return fmt.Errorf("backup table %s: %w", table, err) + } + totalRows += count + } + + fmt.Fprintln(f, "\nCOMMIT;") + + if err := f.Close(); err != nil { + return fmt.Errorf("close backup file: %w", err) + } + + info, _ := os.Stat(backupPath) + sizeMB := float64(0) + if info != nil { + sizeMB = float64(info.Size()) / 1024 / 1024 + } + + slog.Info("DuckLake metadata backup completed.", + "path", backupPath, + "tables", len(tables), + "rows", totalRows, + "size_mb", fmt.Sprintf("%.1f", sizeMB)) + + return nil +} + +// backupTable writes CREATE TABLE and INSERT statements for a single table. +// Returns the number of rows backed up. +func backupTable(ctx context.Context, pgDB *sql.DB, f *os.File, table string) (int, error) { + // Get column definitions. + colRows, err := pgDB.QueryContext(ctx, + "SELECT column_name, data_type, is_nullable FROM information_schema.columns WHERE table_schema = 'public' AND table_name = $1 ORDER BY ordinal_position", table) + if err != nil { + return 0, fmt.Errorf("get columns: %w", err) + } + + type colDef struct { + name string + dataType string + nullable string + } + var cols []colDef + for colRows.Next() { + var c colDef + if err := colRows.Scan(&c.name, &c.dataType, &c.nullable); err != nil { + colRows.Close() + return 0, fmt.Errorf("scan column: %w", err) + } + cols = append(cols, c) + } + colRows.Close() + if err := colRows.Err(); err != nil { + return 0, fmt.Errorf("iterate columns: %w", err) + } + + if len(cols) == 0 { + return 0, nil + } + + // Write CREATE TABLE. + fmt.Fprintf(f, "\n-- Table: %s\n", table) + fmt.Fprintf(f, "CREATE TABLE IF NOT EXISTS %s (\n", table) + for i, c := range cols { + nullStr := "" + if c.nullable == "NO" { + nullStr = " NOT NULL" + } + comma := "," + if i == len(cols)-1 { + comma = "" + } + fmt.Fprintf(f, " %s %s%s%s\n", c.name, c.dataType, nullStr, comma) + } + fmt.Fprintln(f, ");") + + // Build column name list for SELECT and INSERT. + colNames := make([]string, len(cols)) + for i, c := range cols { + colNames[i] = c.name + } + colList := strings.Join(colNames, ", ") + + // Query all rows. + dataRows, err := pgDB.QueryContext(ctx, fmt.Sprintf("SELECT %s FROM %s", colList, table)) + if err != nil { + return 0, fmt.Errorf("select data: %w", err) + } + defer dataRows.Close() + + count := 0 + scanDest := make([]any, len(cols)) + scanPtrs := make([]any, len(cols)) + for i := range scanDest { + scanPtrs[i] = &scanDest[i] + } + + for dataRows.Next() { + if err := dataRows.Scan(scanPtrs...); err != nil { + return count, fmt.Errorf("scan row: %w", err) + } + + vals := make([]string, len(cols)) + for i, v := range scanDest { + vals[i] = formatSQLValue(v) + } + + fmt.Fprintf(f, "INSERT INTO %s (%s) VALUES (%s);\n", + table, colList, strings.Join(vals, ", ")) + count++ + } + if err := dataRows.Err(); err != nil { + return count, fmt.Errorf("iterate rows: %w", err) + } + + return count, nil +} + +// formatSQLValue formats a Go value as a SQL literal for INSERT statements. +func formatSQLValue(v any) string { + if v == nil { + return "NULL" + } + switch val := v.(type) { + case bool: + if val { + return "TRUE" + } + return "FALSE" + case int64: + return fmt.Sprintf("%d", val) + case float64: + return fmt.Sprintf("%g", val) + case []byte: + return fmt.Sprintf("'%s'", strings.ReplaceAll(string(val), "'", "''")) + case string: + return fmt.Sprintf("'%s'", strings.ReplaceAll(val, "'", "''")) + case time.Time: + return fmt.Sprintf("'%s'", val.Format(time.RFC3339Nano)) + default: + s := fmt.Sprintf("%v", val) + return fmt.Sprintf("'%s'", strings.ReplaceAll(s, "'", "''")) + } +} + +// buildDuckLakeAttachStmt builds the ATTACH statement for DuckLake. +// If migrate is true, adds AUTOMATIC_MIGRATION TRUE to the options. +func buildDuckLakeAttachStmt(dlCfg DuckLakeConfig, migrate bool) string { + connStr := escapeSQLStringLiteral(dlCfg.MetadataStore) + dataPath := dlCfg.ObjectStore + if dataPath == "" { + dataPath = dlCfg.DataPath + } + + var options []string + if dataPath != "" { + options = append(options, fmt.Sprintf("DATA_PATH '%s'", escapeSQLStringLiteral(dataPath))) + } + if migrate { + options = append(options, "AUTOMATIC_MIGRATION TRUE") + } + + if len(options) > 0 { + return fmt.Sprintf("ATTACH 'ducklake:%s' AS ducklake (%s)", + connStr, strings.Join(options, ", ")) + } + return fmt.Sprintf("ATTACH 'ducklake:%s' AS ducklake", connStr) +} diff --git a/server/querylog.go b/server/querylog.go index dbe36f9..a74358d 100644 --- a/server/querylog.go +++ b/server/querylog.go @@ -95,17 +95,7 @@ func NewQueryLogger(cfg Config) (*QueryLogger, error) { } // Attach DuckLake - dataPath := dlCfg.ObjectStore - if dataPath == "" { - dataPath = dlCfg.DataPath - } - var attachStmt string - if dataPath != "" { - attachStmt = fmt.Sprintf("ATTACH 'ducklake:%s' AS ducklake (DATA_PATH '%s')", - escapeSQLStringLiteral(dlCfg.MetadataStore), escapeSQLStringLiteral(dataPath)) - } else { - attachStmt = fmt.Sprintf("ATTACH 'ducklake:%s' AS ducklake", escapeSQLStringLiteral(dlCfg.MetadataStore)) - } + attachStmt := buildDuckLakeAttachStmt(dlCfg, duckLakeMigrationNeeded()) if _, err := db.Exec(attachStmt); err != nil { _ = db.Close() return nil, fmt.Errorf("querylog: attach ducklake: %w", err) diff --git a/server/server.go b/server/server.go index 070a554..5a56e96 100644 --- a/server/server.go +++ b/server/server.go @@ -766,7 +766,7 @@ func ConfigureDBConnection(db *sql.DB, cfg Config, duckLakeSem chan struct{}, us // Attach DuckLake catalog if configured (but don't set as default yet) duckLakeMode := false - if err := AttachDuckLake(db, cfg.DuckLake, duckLakeSem); err != nil { + if err := AttachDuckLake(db, cfg.DuckLake, duckLakeSem, cfg.DataDir); err != nil { // If DuckLake was explicitly configured, fail the connection. // Silent fallback to local DB causes schema/table mismatches. if cfg.DuckLake.MetadataStore != "" { @@ -817,7 +817,7 @@ func ActivateDBConnection(db *sql.DB, cfg Config, duckLakeSem chan struct{}, use return fmt.Errorf("tenant activation requires ducklake metadata_store") } - if err := AttachDuckLake(db, cfg.DuckLake, duckLakeSem); err != nil { + if err := AttachDuckLake(db, cfg.DuckLake, duckLakeSem, cfg.DataDir); err != nil { return fmt.Errorf("DuckLake configured but attachment failed: %w", err) } @@ -854,7 +854,7 @@ func CreatePassthroughDBConnection(cfg Config, duckLakeSem chan struct{}, userna initClickHouseMacros(db) // Attach DuckLake catalog if configured (same data, no pg_catalog views) - if err := AttachDuckLake(db, cfg.DuckLake, duckLakeSem); err != nil { + if err := AttachDuckLake(db, cfg.DuckLake, duckLakeSem, cfg.DataDir); err != nil { if cfg.DuckLake.MetadataStore != "" { _ = db.Close() return nil, fmt.Errorf("DuckLake configured but attachment failed: %w", err) @@ -929,7 +929,8 @@ func hasCacheHTTPFS(extensions []string) bool { // AttachDuckLake attaches a DuckLake catalog if configured (but does NOT set it as default). // Call setDuckLakeDefault after creating per-connection views in memory.main. // This is a standalone function so it can be reused by control plane workers. -func AttachDuckLake(db *sql.DB, dlCfg DuckLakeConfig, sem chan struct{}) error { +// dataDir is used for writing migration backup files if a schema upgrade is needed. +func AttachDuckLake(db *sql.DB, dlCfg DuckLakeConfig, sem chan struct{}, dataDir string) error { if dlCfg.MetadataStore == "" { return nil // DuckLake not configured } @@ -984,21 +985,22 @@ func AttachDuckLake(db *sql.DB, dlCfg DuckLakeConfig, sem chan struct{}) error { "Consider connecting directly to PostgreSQL instead.") } - // Build the ATTACH statement - // Format without data path: ATTACH 'ducklake:' AS ducklake - // Format with data path: ATTACH 'ducklake:' AS ducklake (DATA_PATH '') + // Check if DuckLake metadata needs migration (runs once per process). + // If migration is needed, backs up all metadata tables before proceeding. + ensureDuckLakeMigrationCheck(dlCfg, dataDir) + if dlMigration.err != nil { + return fmt.Errorf("DuckLake migration check failed: %w", dlMigration.err) + } + + // Build the ATTACH statement. // See: https://ducklake.select/docs/stable/duckdb/usage/connecting - var attachStmt string - dataPath := dlCfg.ObjectStore - if dataPath == "" { - dataPath = dlCfg.DataPath - } - if dataPath != "" { - attachStmt = fmt.Sprintf("ATTACH 'ducklake:%s' AS ducklake (DATA_PATH '%s')", - dlCfg.MetadataStore, dataPath) - slog.Info("Attaching DuckLake catalog with data path.", "metadata", redactConnectionString(dlCfg.MetadataStore), "data", dataPath) + migrate := duckLakeMigrationNeeded() + attachStmt := buildDuckLakeAttachStmt(dlCfg, migrate) + if migrate { + slog.Info("Attaching DuckLake catalog with automatic migration.", + "from", dlMigration.checkedV, "to", duckLakeSpecVersion, + "metadata", redactConnectionString(dlCfg.MetadataStore)) } else { - attachStmt = fmt.Sprintf("ATTACH 'ducklake:%s' AS ducklake", dlCfg.MetadataStore) slog.Info("Attaching DuckLake catalog.", "metadata", redactConnectionString(dlCfg.MetadataStore)) } From 1572326d5681b40546f08759a404c2ba991e62a1 Mon Sep 17 00:00:00 2001 From: James Greenhill Date: Mon, 23 Mar 2026 17:57:02 -0700 Subject: [PATCH 2/5] fix: address code review feedback on DuckLake migration - Quote all SQL identifiers with double quotes to handle reserved words (e.g. "key", "value" in ducklake_metadata table) - Replace string comparison with numeric version parsing to handle versions like "0.10" correctly - Move migration check before the DuckLake semaphore so backup doesn't block other connections for up to 10 minutes - Add fsync before closing backup file for crash safety - Fix double-close on backup file using closed flag - Add comment explaining sync.Once is correct for multitenant mode (each worker process serves one tenant) - Add comment on []byte assumption in formatSQLValue - Log backup path at INFO before starting (not just after) - Add unit tests for buildDuckLakeAttachStmt, formatSQLValue, quoteIdent, versionLessThan, and duckLakeMigrationNeeded Co-Authored-By: Claude Opus 4.6 (1M context) --- server/ducklake_migration.go | 97 ++++++++++++-- server/ducklake_migration_test.go | 205 ++++++++++++++++++++++++++++++ server/server.go | 16 ++- 3 files changed, 297 insertions(+), 21 deletions(-) create mode 100644 server/ducklake_migration_test.go diff --git a/server/ducklake_migration.go b/server/ducklake_migration.go index 83acc69..6594b7c 100644 --- a/server/ducklake_migration.go +++ b/server/ducklake_migration.go @@ -7,6 +7,7 @@ import ( "log/slog" "os" "path/filepath" + "strconv" "strings" "sync" "time" @@ -18,16 +19,24 @@ const duckLakeSpecVersion = "0.4" // dlMigration holds the result of the one-time migration check. // The check runs at most once per process (sync.Once). +// +// In multitenant control-plane mode, each worker process serves a single tenant +// with its own metadata store, so the per-process sync.Once is correct. +// If this changes (multiple metadata stores per process), this must be replaced +// with a sync.Map keyed by metadata store connection string. var dlMigration struct { once sync.Once - needed bool // true if metadata store version < duckLakeSpecVersion - err error // non-nil if the check or backup failed + needed bool // true if metadata store version < duckLakeSpecVersion + err error // non-nil if the check or backup failed checkedV string // the version found in the metadata store } // ensureDuckLakeMigrationCheck runs the migration check exactly once. // If migration is needed, it backs up the metadata store before returning. // The backup file is written to dataDir. +// +// This should be called BEFORE acquiring the DuckLake attachment semaphore, +// since the backup can take minutes for large metadata stores. func ensureDuckLakeMigrationCheck(dlCfg DuckLakeConfig, dataDir string) { dlMigration.once.Do(func() { dlMigration.needed, dlMigration.checkedV, dlMigration.err = checkAndBackupIfNeeded(dlCfg, dataDir) @@ -40,6 +49,39 @@ func duckLakeMigrationNeeded() bool { return dlMigration.needed && dlMigration.err == nil } +// parseDuckLakeVersion parses a DuckLake version string like "0.3" into +// (major, minor) integers for reliable numeric comparison. +// Returns (0, 0, err) if the string cannot be parsed. +func parseDuckLakeVersion(ver string) (major, minor int, err error) { + parts := strings.SplitN(ver, ".", 2) + if len(parts) != 2 { + return 0, 0, fmt.Errorf("invalid version format: %q", ver) + } + major, err = strconv.Atoi(parts[0]) + if err != nil { + return 0, 0, fmt.Errorf("invalid major version in %q: %w", ver, err) + } + minor, err = strconv.Atoi(parts[1]) + if err != nil { + return 0, 0, fmt.Errorf("invalid minor version in %q: %w", ver, err) + } + return major, minor, nil +} + +// versionLessThan returns true if version a is strictly less than version b. +// Both must be in "major.minor" format (e.g., "0.3", "0.4", "0.10"). +func versionLessThan(a, b string) (bool, error) { + aMaj, aMin, err := parseDuckLakeVersion(a) + if err != nil { + return false, err + } + bMaj, bMin, err := parseDuckLakeVersion(b) + if err != nil { + return false, err + } + return aMaj < bMaj || (aMaj == bMaj && aMin < bMin), nil +} + // checkAndBackupIfNeeded connects to the metadata PostgreSQL store, checks the // DuckLake spec version, and if migration is required, dumps all ducklake_* tables // to a SQL backup file before returning. @@ -78,14 +120,18 @@ func checkAndBackupIfNeeded(dlCfg DuckLakeConfig, dataDir string) (needed bool, // Read current spec version. var ver string err = pgDB.QueryRowContext(ctx, - "SELECT value FROM ducklake_metadata WHERE key = 'version'").Scan(&ver) + `SELECT "value" FROM ducklake_metadata WHERE "key" = 'version'`).Scan(&ver) if err != nil { return false, "", fmt.Errorf("read DuckLake spec version: %w", err) } slog.Info("DuckLake metadata store version detected.", "version", ver, "expected", duckLakeSpecVersion) - if ver >= duckLakeSpecVersion { + less, err := versionLessThan(ver, duckLakeSpecVersion) + if err != nil { + return false, ver, fmt.Errorf("compare DuckLake versions: %w", err) + } + if !less { return false, ver, nil } @@ -137,11 +183,18 @@ func backupDuckLakeMetadata(pgDB *sql.DB, dataDir string, version string) error timestamp := time.Now().UTC().Format("20060102T150405Z") backupPath := filepath.Join(dataDir, fmt.Sprintf("ducklake-backup-%s-v%s.sql", timestamp, version)) + slog.Info("Starting DuckLake metadata backup.", "path", backupPath, "tables", len(tables)) + f, err := os.Create(backupPath) if err != nil { return fmt.Errorf("create backup file %s: %w", backupPath, err) } - defer func() { _ = f.Close() }() + closed := false + defer func() { + if !closed { + _ = f.Close() + } + }() // Write header. fmt.Fprintf(f, "-- DuckLake metadata backup before migration (v%s → v%s)\n", version, duckLakeSpecVersion) @@ -160,9 +213,15 @@ func backupDuckLakeMetadata(pgDB *sql.DB, dataDir string, version string) error fmt.Fprintln(f, "\nCOMMIT;") + // Flush to disk before closing — this is a critical safety net file. + if err := f.Sync(); err != nil { + return fmt.Errorf("fsync backup file: %w", err) + } + if err := f.Close(); err != nil { return fmt.Errorf("close backup file: %w", err) } + closed = true info, _ := os.Stat(backupPath) sizeMB := float64(0) @@ -179,6 +238,12 @@ func backupDuckLakeMetadata(pgDB *sql.DB, dataDir string, version string) error return nil } +// quoteIdent quotes a PostgreSQL identifier with double quotes. +// Any embedded double quotes are doubled per SQL standard. +func quoteIdent(name string) string { + return `"` + strings.ReplaceAll(name, `"`, `""`) + `"` +} + // backupTable writes CREATE TABLE and INSERT statements for a single table. // Returns the number of rows backed up. func backupTable(ctx context.Context, pgDB *sql.DB, f *os.File, table string) (int, error) { @@ -212,9 +277,10 @@ func backupTable(ctx context.Context, pgDB *sql.DB, f *os.File, table string) (i return 0, nil } - // Write CREATE TABLE. + // Write CREATE TABLE with quoted identifiers. + quotedTable := quoteIdent(table) fmt.Fprintf(f, "\n-- Table: %s\n", table) - fmt.Fprintf(f, "CREATE TABLE IF NOT EXISTS %s (\n", table) + fmt.Fprintf(f, "CREATE TABLE IF NOT EXISTS %s (\n", quotedTable) for i, c := range cols { nullStr := "" if c.nullable == "NO" { @@ -224,19 +290,20 @@ func backupTable(ctx context.Context, pgDB *sql.DB, f *os.File, table string) (i if i == len(cols)-1 { comma = "" } - fmt.Fprintf(f, " %s %s%s%s\n", c.name, c.dataType, nullStr, comma) + fmt.Fprintf(f, " %s %s%s%s\n", quoteIdent(c.name), c.dataType, nullStr, comma) } fmt.Fprintln(f, ");") - // Build column name list for SELECT and INSERT. - colNames := make([]string, len(cols)) + // Build quoted column name list for SELECT and INSERT. + quotedColNames := make([]string, len(cols)) for i, c := range cols { - colNames[i] = c.name + quotedColNames[i] = quoteIdent(c.name) } - colList := strings.Join(colNames, ", ") + quotedColList := strings.Join(quotedColNames, ", ") // Query all rows. - dataRows, err := pgDB.QueryContext(ctx, fmt.Sprintf("SELECT %s FROM %s", colList, table)) + dataRows, err := pgDB.QueryContext(ctx, + fmt.Sprintf("SELECT %s FROM %s", quotedColList, quotedTable)) if err != nil { return 0, fmt.Errorf("select data: %w", err) } @@ -260,7 +327,7 @@ func backupTable(ctx context.Context, pgDB *sql.DB, f *os.File, table string) (i } fmt.Fprintf(f, "INSERT INTO %s (%s) VALUES (%s);\n", - table, colList, strings.Join(vals, ", ")) + quotedTable, quotedColList, strings.Join(vals, ", ")) count++ } if err := dataRows.Err(); err != nil { @@ -271,6 +338,8 @@ func backupTable(ctx context.Context, pgDB *sql.DB, f *os.File, table string) (i } // formatSQLValue formats a Go value as a SQL literal for INSERT statements. +// Note: []byte is treated as UTF-8 text (fine for DuckLake metadata which stores +// only text, integers, and booleans — no bytea columns). func formatSQLValue(v any) string { if v == nil { return "NULL" diff --git a/server/ducklake_migration_test.go b/server/ducklake_migration_test.go new file mode 100644 index 0000000..ee414f9 --- /dev/null +++ b/server/ducklake_migration_test.go @@ -0,0 +1,205 @@ +package server + +import ( + "fmt" + "testing" + "time" +) + +func TestBuildDuckLakeAttachStmt(t *testing.T) { + tests := []struct { + name string + dlCfg DuckLakeConfig + migrate bool + want string + }{ + { + name: "basic without data path", + dlCfg: DuckLakeConfig{ + MetadataStore: "postgres:host=localhost dbname=dl", + }, + migrate: false, + want: "ATTACH 'ducklake:postgres:host=localhost dbname=dl' AS ducklake", + }, + { + name: "with object store", + dlCfg: DuckLakeConfig{ + MetadataStore: "postgres:host=localhost dbname=dl", + ObjectStore: "s3://bucket/path", + }, + migrate: false, + want: "ATTACH 'ducklake:postgres:host=localhost dbname=dl' AS ducklake (DATA_PATH 's3://bucket/path')", + }, + { + name: "with data path fallback", + dlCfg: DuckLakeConfig{ + MetadataStore: "postgres:host=localhost dbname=dl", + DataPath: "/local/data", + }, + migrate: false, + want: "ATTACH 'ducklake:postgres:host=localhost dbname=dl' AS ducklake (DATA_PATH '/local/data')", + }, + { + name: "object store takes precedence over data path", + dlCfg: DuckLakeConfig{ + MetadataStore: "postgres:host=localhost dbname=dl", + ObjectStore: "s3://bucket/path", + DataPath: "/local/data", + }, + migrate: false, + want: "ATTACH 'ducklake:postgres:host=localhost dbname=dl' AS ducklake (DATA_PATH 's3://bucket/path')", + }, + { + name: "with migration flag", + dlCfg: DuckLakeConfig{ + MetadataStore: "postgres:host=localhost dbname=dl", + ObjectStore: "s3://bucket/path", + }, + migrate: true, + want: "ATTACH 'ducklake:postgres:host=localhost dbname=dl' AS ducklake (DATA_PATH 's3://bucket/path', AUTOMATIC_MIGRATION TRUE)", + }, + { + name: "migration without data path", + dlCfg: DuckLakeConfig{ + MetadataStore: "postgres:host=localhost dbname=dl", + }, + migrate: true, + want: "ATTACH 'ducklake:postgres:host=localhost dbname=dl' AS ducklake (AUTOMATIC_MIGRATION TRUE)", + }, + { + name: "escapes single quotes in connection string", + dlCfg: DuckLakeConfig{ + MetadataStore: "postgres:host=localhost password=it's_secret", + }, + migrate: false, + want: "ATTACH 'ducklake:postgres:host=localhost password=it''s_secret' AS ducklake", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := buildDuckLakeAttachStmt(tt.dlCfg, tt.migrate) + if got != tt.want { + t.Errorf("buildDuckLakeAttachStmt() =\n %s\nwant:\n %s", got, tt.want) + } + }) + } +} + +func TestFormatSQLValue(t *testing.T) { + tests := []struct { + name string + v any + want string + }{ + {"nil", nil, "NULL"}, + {"true", true, "TRUE"}, + {"false", false, "FALSE"}, + {"int", int64(42), "42"}, + {"negative int", int64(-1), "-1"}, + {"float", float64(3.14), "3.14"}, + {"string", "hello", "'hello'"}, + {"string with quote", "it's", "'it''s'"}, + {"string with double quote", `say "hi"`, `'say "hi"'`}, + {"empty string", "", "''"}, + {"bytes", []byte("data"), "'data'"}, + {"bytes with quote", []byte("it's"), "'it''s'"}, + {"time", time.Date(2026, 3, 23, 12, 0, 0, 0, time.UTC), "'2026-03-23T12:00:00Z'"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := formatSQLValue(tt.v) + if got != tt.want { + t.Errorf("formatSQLValue(%v) = %s, want %s", tt.v, got, tt.want) + } + }) + } +} + +func TestQuoteIdent(t *testing.T) { + tests := []struct { + name string + in string + want string + }{ + {"simple", "table_name", `"table_name"`}, + {"reserved word", "key", `"key"`}, + {"reserved word value", "value", `"value"`}, + {"with double quote", `say"hi`, `"say""hi"`}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := quoteIdent(tt.in) + if got != tt.want { + t.Errorf("quoteIdent(%q) = %s, want %s", tt.in, got, tt.want) + } + }) + } +} + +func TestVersionLessThan(t *testing.T) { + tests := []struct { + a, b string + want bool + }{ + {"0.3", "0.4", true}, + {"0.4", "0.4", false}, + {"0.4", "0.3", false}, + {"0.4", "0.10", true}, // numeric, not lexicographic + {"0.10", "0.4", false}, // numeric, not lexicographic + {"0.9", "0.10", true}, + {"1.0", "0.10", false}, + {"0.3", "1.0", true}, + } + + for _, tt := range tests { + t.Run(tt.a+"_vs_"+tt.b, func(t *testing.T) { + got, err := versionLessThan(tt.a, tt.b) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got != tt.want { + t.Errorf("versionLessThan(%q, %q) = %v, want %v", tt.a, tt.b, got, tt.want) + } + }) + } +} + +func TestVersionLessThan_Invalid(t *testing.T) { + tests := []struct { + a, b string + }{ + {"abc", "0.4"}, + {"0.4", "abc"}, + {"0.4.1", "0.4"}, + {"", "0.4"}, + } + + for _, tt := range tests { + t.Run(tt.a+"_vs_"+tt.b, func(t *testing.T) { + _, err := versionLessThan(tt.a, tt.b) + if err == nil { + t.Errorf("versionLessThan(%q, %q) expected error, got nil", tt.a, tt.b) + } + }) + } +} + +func TestDuckLakeMigrationNeeded_FalseWhenError(t *testing.T) { + // Save and restore global state (skip sync.Once — it's not copyable). + origNeeded := dlMigration.needed + origErr := dlMigration.err + defer func() { + dlMigration.needed = origNeeded + dlMigration.err = origErr + }() + + dlMigration.needed = true + dlMigration.err = fmt.Errorf("backup failed") + + if duckLakeMigrationNeeded() { + t.Error("duckLakeMigrationNeeded() should return false when err is set") + } +} diff --git a/server/server.go b/server/server.go index 5a56e96..0e1cef0 100644 --- a/server/server.go +++ b/server/server.go @@ -935,6 +935,15 @@ func AttachDuckLake(db *sql.DB, dlCfg DuckLakeConfig, sem chan struct{}, dataDir return nil // DuckLake not configured } + // Check if DuckLake metadata needs migration (runs once per process). + // If migration is needed, backs up all metadata tables before proceeding. + // This runs BEFORE the semaphore because the backup can take minutes for + // large metadata stores, and we don't want to block other connections. + ensureDuckLakeMigrationCheck(dlCfg, dataDir) + if dlMigration.err != nil { + return fmt.Errorf("DuckLake migration check failed: %w", dlMigration.err) + } + // Serialize DuckLake attachment to avoid race conditions where multiple // connections try to attach simultaneously, causing errors like // "database with name '__ducklake_metadata_ducklake' already exists". @@ -985,13 +994,6 @@ func AttachDuckLake(db *sql.DB, dlCfg DuckLakeConfig, sem chan struct{}, dataDir "Consider connecting directly to PostgreSQL instead.") } - // Check if DuckLake metadata needs migration (runs once per process). - // If migration is needed, backs up all metadata tables before proceeding. - ensureDuckLakeMigrationCheck(dlCfg, dataDir) - if dlMigration.err != nil { - return fmt.Errorf("DuckLake migration check failed: %w", dlMigration.err) - } - // Build the ATTACH statement. // See: https://ducklake.select/docs/stable/duckdb/usage/connecting migrate := duckLakeMigrationNeeded() From f54f964297fafe1eace8a270153a2ac3c07efc5a Mon Sep 17 00:00:00 2001 From: James Greenhill Date: Mon, 23 Mar 2026 21:16:26 -0700 Subject: [PATCH 3/5] fix: check errcheck lint errors in ducklake_migration.go Handle return values from rows.Close(), fmt.Fprintf(), fmt.Fprintln(), and dataRows.Close() to satisfy the errcheck linter. Co-Authored-By: Claude Opus 4.6 (1M context) --- server/ducklake_migration.go | 52 +++++++++++++++++++++++++----------- 1 file changed, 36 insertions(+), 16 deletions(-) diff --git a/server/ducklake_migration.go b/server/ducklake_migration.go index 6594b7c..7dd6f7b 100644 --- a/server/ducklake_migration.go +++ b/server/ducklake_migration.go @@ -164,12 +164,12 @@ func backupDuckLakeMetadata(pgDB *sql.DB, dataDir string, version string) error for rows.Next() { var name string if err := rows.Scan(&name); err != nil { - rows.Close() + _ = rows.Close() return fmt.Errorf("scan table name: %w", err) } tables = append(tables, name) } - rows.Close() + _ = rows.Close() if err := rows.Err(); err != nil { return fmt.Errorf("iterate table names: %w", err) } @@ -197,10 +197,18 @@ func backupDuckLakeMetadata(pgDB *sql.DB, dataDir string, version string) error }() // Write header. - fmt.Fprintf(f, "-- DuckLake metadata backup before migration (v%s → v%s)\n", version, duckLakeSpecVersion) - fmt.Fprintf(f, "-- Generated: %s\n", time.Now().UTC().Format(time.RFC3339)) - fmt.Fprintf(f, "-- Tables: %d\n\n", len(tables)) - fmt.Fprintln(f, "BEGIN;") + if _, err := fmt.Fprintf(f, "-- DuckLake metadata backup before migration (v%s → v%s)\n", version, duckLakeSpecVersion); err != nil { + return fmt.Errorf("write backup header: %w", err) + } + if _, err := fmt.Fprintf(f, "-- Generated: %s\n", time.Now().UTC().Format(time.RFC3339)); err != nil { + return fmt.Errorf("write backup header: %w", err) + } + if _, err := fmt.Fprintf(f, "-- Tables: %d\n\n", len(tables)); err != nil { + return fmt.Errorf("write backup header: %w", err) + } + if _, err := fmt.Fprintln(f, "BEGIN;"); err != nil { + return fmt.Errorf("write backup header: %w", err) + } totalRows := 0 for _, table := range tables { @@ -211,7 +219,9 @@ func backupDuckLakeMetadata(pgDB *sql.DB, dataDir string, version string) error totalRows += count } - fmt.Fprintln(f, "\nCOMMIT;") + if _, err := fmt.Fprintln(f, "\nCOMMIT;"); err != nil { + return fmt.Errorf("write backup footer: %w", err) + } // Flush to disk before closing — this is a critical safety net file. if err := f.Sync(); err != nil { @@ -263,12 +273,12 @@ func backupTable(ctx context.Context, pgDB *sql.DB, f *os.File, table string) (i for colRows.Next() { var c colDef if err := colRows.Scan(&c.name, &c.dataType, &c.nullable); err != nil { - colRows.Close() + _ = colRows.Close() return 0, fmt.Errorf("scan column: %w", err) } cols = append(cols, c) } - colRows.Close() + _ = colRows.Close() if err := colRows.Err(); err != nil { return 0, fmt.Errorf("iterate columns: %w", err) } @@ -279,8 +289,12 @@ func backupTable(ctx context.Context, pgDB *sql.DB, f *os.File, table string) (i // Write CREATE TABLE with quoted identifiers. quotedTable := quoteIdent(table) - fmt.Fprintf(f, "\n-- Table: %s\n", table) - fmt.Fprintf(f, "CREATE TABLE IF NOT EXISTS %s (\n", quotedTable) + if _, err := fmt.Fprintf(f, "\n-- Table: %s\n", table); err != nil { + return 0, fmt.Errorf("write: %w", err) + } + if _, err := fmt.Fprintf(f, "CREATE TABLE IF NOT EXISTS %s (\n", quotedTable); err != nil { + return 0, fmt.Errorf("write: %w", err) + } for i, c := range cols { nullStr := "" if c.nullable == "NO" { @@ -290,9 +304,13 @@ func backupTable(ctx context.Context, pgDB *sql.DB, f *os.File, table string) (i if i == len(cols)-1 { comma = "" } - fmt.Fprintf(f, " %s %s%s%s\n", quoteIdent(c.name), c.dataType, nullStr, comma) + if _, err := fmt.Fprintf(f, " %s %s%s%s\n", quoteIdent(c.name), c.dataType, nullStr, comma); err != nil { + return 0, fmt.Errorf("write: %w", err) + } + } + if _, err := fmt.Fprintln(f, ");"); err != nil { + return 0, fmt.Errorf("write: %w", err) } - fmt.Fprintln(f, ");") // Build quoted column name list for SELECT and INSERT. quotedColNames := make([]string, len(cols)) @@ -307,7 +325,7 @@ func backupTable(ctx context.Context, pgDB *sql.DB, f *os.File, table string) (i if err != nil { return 0, fmt.Errorf("select data: %w", err) } - defer dataRows.Close() + defer func() { _ = dataRows.Close() }() count := 0 scanDest := make([]any, len(cols)) @@ -326,8 +344,10 @@ func backupTable(ctx context.Context, pgDB *sql.DB, f *os.File, table string) (i vals[i] = formatSQLValue(v) } - fmt.Fprintf(f, "INSERT INTO %s (%s) VALUES (%s);\n", - quotedTable, quotedColList, strings.Join(vals, ", ")) + if _, err := fmt.Fprintf(f, "INSERT INTO %s (%s) VALUES (%s);\n", + quotedTable, quotedColList, strings.Join(vals, ", ")); err != nil { + return count, fmt.Errorf("write: %w", err) + } count++ } if err := dataRows.Err(); err != nil { From 0a444b59a9b5bd5519b514146644cd0deb39f097 Mon Sep 17 00:00:00 2001 From: James Greenhill Date: Tue, 24 Mar 2026 16:49:36 -0700 Subject: [PATCH 4/5] fix: retry migration check on transient errors instead of failing permanently The sync.Once caused a transient failure (e.g., metadata store not yet reachable during k8s pod startup) to permanently block all connections. Replace with a sync.Mutex that locks in the result only on success, allowing retries on failure. Co-Authored-By: Claude Opus 4.6 (1M context) --- server/ducklake_migration.go | 40 ++++++++++++++++++++++--------- server/ducklake_migration_test.go | 10 +++++++- 2 files changed, 38 insertions(+), 12 deletions(-) diff --git a/server/ducklake_migration.go b/server/ducklake_migration.go index 7dd6f7b..203ebdc 100644 --- a/server/ducklake_migration.go +++ b/server/ducklake_migration.go @@ -17,30 +17,48 @@ import ( // When the metadata store is at an older version, we backup and migrate automatically. const duckLakeSpecVersion = "0.4" -// dlMigration holds the result of the one-time migration check. -// The check runs at most once per process (sync.Once). +// dlMigration holds the result of the migration check. +// The check retries on transient errors (e.g., metadata store not reachable yet) +// but locks in the result once it succeeds. // // In multitenant control-plane mode, each worker process serves a single tenant -// with its own metadata store, so the per-process sync.Once is correct. +// with its own metadata store, so the per-process state is correct. // If this changes (multiple metadata stores per process), this must be replaced // with a sync.Map keyed by metadata store connection string. var dlMigration struct { - once sync.Once + mu sync.Mutex + done bool // true once the check has completed successfully needed bool // true if metadata store version < duckLakeSpecVersion - err error // non-nil if the check or backup failed + err error // non-nil if the most recent check or backup failed checkedV string // the version found in the metadata store } -// ensureDuckLakeMigrationCheck runs the migration check exactly once. -// If migration is needed, it backs up the metadata store before returning. -// The backup file is written to dataDir. +// ensureDuckLakeMigrationCheck runs the migration check, retrying on transient errors. +// Once the check succeeds (regardless of whether migration is needed), the result +// is locked in and subsequent calls are no-ops. If the check fails, the error is +// stored but the next call will retry — this prevents a transient failure (e.g., +// metadata store not yet reachable during pod startup) from permanently blocking +// all connections. // +// The backup file is written to dataDir. // This should be called BEFORE acquiring the DuckLake attachment semaphore, // since the backup can take minutes for large metadata stores. func ensureDuckLakeMigrationCheck(dlCfg DuckLakeConfig, dataDir string) { - dlMigration.once.Do(func() { - dlMigration.needed, dlMigration.checkedV, dlMigration.err = checkAndBackupIfNeeded(dlCfg, dataDir) - }) + dlMigration.mu.Lock() + defer dlMigration.mu.Unlock() + + if dlMigration.done { + return + } + + needed, ver, err := checkAndBackupIfNeeded(dlCfg, dataDir) + dlMigration.needed = needed + dlMigration.checkedV = ver + dlMigration.err = err + + if err == nil { + dlMigration.done = true + } } // duckLakeMigrationNeeded returns whether the ATTACH statement should include diff --git a/server/ducklake_migration_test.go b/server/ducklake_migration_test.go index ee414f9..364fa59 100644 --- a/server/ducklake_migration_test.go +++ b/server/ducklake_migration_test.go @@ -188,16 +188,24 @@ func TestVersionLessThan_Invalid(t *testing.T) { } func TestDuckLakeMigrationNeeded_FalseWhenError(t *testing.T) { - // Save and restore global state (skip sync.Once — it's not copyable). + // Save and restore global state. + dlMigration.mu.Lock() + origDone := dlMigration.done origNeeded := dlMigration.needed origErr := dlMigration.err + dlMigration.mu.Unlock() defer func() { + dlMigration.mu.Lock() + dlMigration.done = origDone dlMigration.needed = origNeeded dlMigration.err = origErr + dlMigration.mu.Unlock() }() + dlMigration.mu.Lock() dlMigration.needed = true dlMigration.err = fmt.Errorf("backup failed") + dlMigration.mu.Unlock() if duckLakeMigrationNeeded() { t.Error("duckLakeMigrationNeeded() should return false when err is set") From 92ed9c5a6c22b97036fb3ee6b75991adfe67c747 Mon Sep 17 00:00:00 2001 From: James Greenhill Date: Tue, 24 Mar 2026 17:07:38 -0700 Subject: [PATCH 5/5] fix: set duckLakeSpecVersion to 0.3 to match current DuckDB driver The bundled DuckDB (duckdb-go v2.5.3) ships DuckLake 0.3, which does not support the AUTOMATIC_MIGRATION option. Setting the expected version to 0.4 caused migration to trigger on every fresh metadata store, failing with "Unsupported option automatic_migration for DuckLake". Bump to 0.4 only after upgrading to DuckDB 1.5.x. Co-Authored-By: Claude Opus 4.6 (1M context) --- server/ducklake_migration.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/ducklake_migration.go b/server/ducklake_migration.go index 203ebdc..2e2836d 100644 --- a/server/ducklake_migration.go +++ b/server/ducklake_migration.go @@ -15,7 +15,10 @@ import ( // duckLakeSpecVersion is the DuckLake spec version that this build of duckgres expects. // When the metadata store is at an older version, we backup and migrate automatically. -const duckLakeSpecVersion = "0.4" +// IMPORTANT: This must match the DuckLake version bundled with the current DuckDB driver. +// Bump this to "0.4" only after upgrading duckdb-go to a version that ships DuckLake 0.4 +// (DuckDB 1.5.x), since AUTOMATIC_MIGRATION is not supported by older DuckLake extensions. +const duckLakeSpecVersion = "0.3" // dlMigration holds the result of the migration check. // The check retries on transient errors (e.g., metadata store not reachable yet)