diff --git a/config_resolution.go b/config_resolution.go index 0baa73b..60d6dfc 100644 --- a/config_resolution.go +++ b/config_resolution.go @@ -21,6 +21,7 @@ type configCLIInputs struct { DataDir string CertFile string KeyFile string + FilePersistence bool ProcessIsolation bool IdleTimeout string MemoryLimit string @@ -225,6 +226,7 @@ func resolveEffectiveConfig(fileCfg *FileConfig, cli configCLIInputs, getenv fun cfg.DuckLake.S3Profile = fileCfg.DuckLake.S3Profile } + cfg.FilePersistence = fileCfg.FilePersistence cfg.ProcessIsolation = fileCfg.ProcessIsolation if fileCfg.IdleTimeout != "" { if d, err := time.ParseDuration(fileCfg.IdleTimeout); err == nil { @@ -435,6 +437,13 @@ func resolveEffectiveConfig(fileCfg *FileConfig, cli configCLIInputs, getenv fun if v := getenv("DUCKGRES_DUCKLAKE_S3_PROFILE"); v != "" { cfg.DuckLake.S3Profile = v } + if v := getenv("DUCKGRES_FILE_PERSISTENCE"); v != "" { + if b, err := strconv.ParseBool(v); err == nil { + cfg.FilePersistence = b + } else { + warn("Invalid DUCKGRES_FILE_PERSISTENCE: " + err.Error()) + } + } if v := getenv("DUCKGRES_PROCESS_ISOLATION"); v != "" { if b, err := strconv.ParseBool(v); err == nil { cfg.ProcessIsolation = b @@ -638,6 +647,9 @@ func resolveEffectiveConfig(fileCfg *FileConfig, cli configCLIInputs, getenv fun if cli.Set["key"] { cfg.TLSKeyFile = cli.KeyFile } + if cli.Set["file-persistence"] { + cfg.FilePersistence = cli.FilePersistence + } if cli.Set["process-isolation"] { cfg.ProcessIsolation = cli.ProcessIsolation } @@ -755,6 +767,12 @@ func resolveEffectiveConfig(fileCfg *FileConfig, cli configCLIInputs, getenv fun cfg.ACMEDNSZoneID = "" } + // Validate file persistence requires data_dir + if cfg.FilePersistence && cfg.DataDir == "" { + warn("file_persistence is enabled but data_dir is empty; disabling file persistence") + cfg.FilePersistence = false + } + // Validate memory_limit format if explicitly set if cfg.MemoryLimit != "" && !server.ValidateMemoryLimit(cfg.MemoryLimit) { warn("Invalid 
memory_limit format: " + cfg.MemoryLimit + " (expected e.g. '4GB', '512MB')") diff --git a/main.go b/main.go index 65ff577..bd9f5d0 100644 --- a/main.go +++ b/main.go @@ -35,6 +35,7 @@ type FileConfig struct { RateLimit RateLimitFileConfig `yaml:"rate_limit"` Extensions []string `yaml:"extensions"` DuckLake DuckLakeFileConfig `yaml:"ducklake"` + FilePersistence bool `yaml:"file_persistence"` // Persist DuckDB to <data_dir>/<username>.duckdb instead of :memory: ProcessIsolation bool `yaml:"process_isolation"` // Enable process isolation per connection IdleTimeout string `yaml:"idle_timeout"` // e.g., "24h", "1h", "-1" to disable MemoryLimit string `yaml:"memory_limit"` // DuckDB memory_limit per session (e.g., "4GB") @@ -194,6 +195,7 @@ func main() { dataDir := flag.String("data-dir", "", "Directory for DuckDB files (env: DUCKGRES_DATA_DIR)") certFile := flag.String("cert", "", "TLS certificate file (env: DUCKGRES_CERT)") keyFile := flag.String("key", "", "TLS private key file (env: DUCKGRES_KEY)") + filePersistence := flag.Bool("file-persistence", false, "Persist DuckDB to <data_dir>/<username>.duckdb instead of in-memory (env: DUCKGRES_FILE_PERSISTENCE)") processIsolation := flag.Bool("process-isolation", false, "Enable process isolation (spawn child process per connection)") idleTimeout := flag.String("idle-timeout", "", "Connection idle timeout (e.g., '30m', '1h', '-1' to disable) (env: DUCKGRES_IDLE_TIMEOUT)") memoryLimit := flag.String("memory-limit", "", "DuckDB memory_limit per session (e.g., '4GB') (env: DUCKGRES_MEMORY_LIMIT)") @@ -259,6 +261,7 @@ func main() { fmt.Fprintf(os.Stderr, " DUCKGRES_DATA_DIR Directory for DuckDB files (default: ./data)\n") fmt.Fprintf(os.Stderr, " DUCKGRES_CERT TLS certificate file (default: ./certs/server.crt)\n") fmt.Fprintf(os.Stderr, " DUCKGRES_KEY TLS private key file (default: ./certs/server.key)\n") + fmt.Fprintf(os.Stderr, " DUCKGRES_FILE_PERSISTENCE Persist DuckDB to <data_dir>/<username>.duckdb (1 or true)\n") fmt.Fprintf(os.Stderr, " DUCKGRES_PROCESS_ISOLATION Enable process 
isolation (1 or true)\n") fmt.Fprintf(os.Stderr, " DUCKGRES_IDLE_TIMEOUT Connection idle timeout (e.g., 30m, 1h, -1 to disable)\n") fmt.Fprintf(os.Stderr, " DUCKGRES_MEMORY_LIMIT DuckDB memory_limit per session (e.g., 4GB)\n") @@ -353,6 +356,7 @@ func main() { DataDir: *dataDir, CertFile: *certFile, KeyFile: *keyFile, + FilePersistence: *filePersistence, ProcessIsolation: *processIsolation, IdleTimeout: *idleTimeout, MemoryLimit: *memoryLimit, diff --git a/main_test.go b/main_test.go index 8999c59..a1fa003 100644 --- a/main_test.go +++ b/main_test.go @@ -684,6 +684,86 @@ func TestResolveEffectiveConfigACMEDNSProviderValidation(t *testing.T) { } } +func TestResolveEffectiveConfigFilePersistenceFromFile(t *testing.T) { + fileCfg := &FileConfig{ + FilePersistence: true, + DataDir: "/tmp/data", + } + resolved := resolveEffectiveConfig(fileCfg, configCLIInputs{}, envFromMap(nil), nil) + if !resolved.Server.FilePersistence { + t.Fatal("expected file_persistence from YAML to be true") + } +} + +func TestResolveEffectiveConfigFilePersistenceFromEnv(t *testing.T) { + env := map[string]string{ + "DUCKGRES_FILE_PERSISTENCE": "true", + } + resolved := resolveEffectiveConfig(nil, configCLIInputs{}, envFromMap(env), nil) + if !resolved.Server.FilePersistence { + t.Fatal("expected file_persistence from env to be true") + } +} + +func TestResolveEffectiveConfigFilePersistenceEnvOverridesFile(t *testing.T) { + fileCfg := &FileConfig{ + FilePersistence: true, + } + env := map[string]string{ + "DUCKGRES_FILE_PERSISTENCE": "false", + } + resolved := resolveEffectiveConfig(fileCfg, configCLIInputs{}, envFromMap(env), nil) + if resolved.Server.FilePersistence { + t.Fatal("expected env false to override file true") + } +} + +func TestResolveEffectiveConfigFilePersistenceCLIOverridesEnv(t *testing.T) { + env := map[string]string{ + "DUCKGRES_FILE_PERSISTENCE": "false", + } + resolved := resolveEffectiveConfig(nil, configCLIInputs{ + Set: map[string]bool{"file-persistence": true}, + 
FilePersistence: true, + }, envFromMap(env), nil) + if !resolved.Server.FilePersistence { + t.Fatal("expected CLI true to override env false") + } +} + +func TestResolveEffectiveConfigFilePersistenceDefaultFalse(t *testing.T) { + resolved := resolveEffectiveConfig(nil, configCLIInputs{}, envFromMap(nil), nil) + if resolved.Server.FilePersistence { + t.Fatal("expected file_persistence to default to false") + } +} + +func TestResolveEffectiveConfigFilePersistenceRequiresDataDir(t *testing.T) { + // Override data_dir to empty via CLI to test the validation guard. + var warnings []string + resolved := resolveEffectiveConfig(&FileConfig{ + FilePersistence: true, + }, configCLIInputs{ + Set: map[string]bool{"data-dir": true}, + DataDir: "", // explicitly empty + }, envFromMap(nil), func(msg string) { + warnings = append(warnings, msg) + }) + if resolved.Server.FilePersistence { + t.Fatal("expected file_persistence to be disabled when data_dir is empty") + } + found := false + for _, w := range warnings { + if strings.Contains(w, "file_persistence") && strings.Contains(w, "data_dir") { + found = true + break + } + } + if !found { + t.Fatalf("expected warning about file_persistence + empty data_dir, got: %v", warnings) + } +} + func TestResolveEffectiveConfigACMEDNSRequiresDomain(t *testing.T) { fileCfg := &FileConfig{ TLS: TLSConfig{ diff --git a/server/conn.go b/server/conn.go index 23f318e..677f56a 100644 --- a/server/conn.go +++ b/server/conn.go @@ -162,6 +162,7 @@ type clientConn struct { portals map[string]*portal // portals by name txStatus byte // current transaction status ('I', 'T', or 'E') passthrough bool // true for passthrough users (skip transpiler + pg_catalog) + sharedDB bool // true when using file persistence pool (don't close DB directly) cursors map[string]*cursorState // server-side cursor emulation ctx context.Context // connection context, cancelled when connection is closed cancel context.CancelFunc // cancels the connection context @@ -397,8 
+398,9 @@ func (c *clientConn) safeCleanupDB() { } } - // Detach DuckLake to release the RDS metadata connection (only if connection is healthy) - if connHealthy && c.server.cfg.DuckLake.MetadataStore != "" { + // Detach DuckLake to release the RDS metadata connection (only if connection is healthy). + // Skip for shared file-backed DBs — DuckLake stays attached for the pool lifetime. + if connHealthy && !c.sharedDB && c.server.cfg.DuckLake.MetadataStore != "" { // Must switch away from ducklake before detaching - DuckDB doesn't allow // detaching the default database ctx3, cancel3 := context.WithTimeout(context.Background(), cleanupTimeout) @@ -421,6 +423,13 @@ func (c *clientConn) safeCleanupDB() { } } + // For shared file-backed DBs, release our reference instead of closing. + // The pool will close the DB when the last reference is released. + if c.sharedDB { + c.server.releaseFileDB(c.username) + return + } + // Always attempt to close the database connection. // If the connection is broken, this may still throw, but we've done our best // to clean up the transaction state first. @@ -565,7 +574,7 @@ func (c *clientConn) serve() error { var db *sql.DB var err error if c.passthrough { - db, err = CreatePassthroughDBConnection(c.server.cfg, c.server.duckLakeSem, c.username, processStartTime, processVersion) + db, err = c.server.createPassthroughDBConnection(c.username) } else { db, err = c.server.createDBConnection(c.username) } @@ -573,6 +582,7 @@ func (c *clientConn) serve() error { c.sendError("FATAL", "28000", fmt.Sprintf("failed to open database: %v", err)) return err } + c.sharedDB = c.server.cfg.FilePersistence c.executor = NewLocalExecutor(db) // Start background credential refresh for long-lived connections. diff --git a/server/server.go b/server/server.go index 2a4bc31..87ed71b 100644 --- a/server/server.go +++ b/server/server.go @@ -154,6 +154,11 @@ type Config struct { // uncleanly. Default: 24 hours. Set to a negative value (e.g., -1) to disable. 
IdleTimeout time.Duration + // FilePersistence stores DuckDB data in <data_dir>/<username>.duckdb instead of :memory:. + // DuckDB memory-maps the file and serves queries from RAM, so performance is similar + // to in-memory mode while data persists across connections and restarts. + FilePersistence bool + // ProcessIsolation enables spawning each client connection in a separate OS process. // This prevents DuckDB C++ crashes from taking down the entire server. // When enabled, rate limiting and cancel requests are handled by the parent process, @@ -239,6 +244,14 @@ type DuckLakeConfig struct { S3Profile string // AWS profile name to use (for "config" chain) } +// fileDBEntry tracks a shared file-backed DuckDB connection with reference counting. +// Used when FilePersistence is enabled so multiple client connections for the same +// user share a single *sql.DB instead of fighting over the DuckDB file lock. +type fileDBEntry struct { + db *sql.DB + refs int +} + type Server struct { cfg Config listener net.Listener @@ -275,6 +288,12 @@ type Server struct { // Query logger for DuckLake system.query_log queryLogger *QueryLogger + + // fileDBs pools shared *sql.DB instances for file-backed persistence mode. + // Keyed by username. Multiple client connections for the same user share a + // single DuckDB file handle to avoid write-lock conflicts. + fileDBsMu sync.Mutex + fileDBs map[string]*fileDBEntry } func New(cfg Config) (*Server, error) { @@ -322,6 +341,7 @@ func New(cfg Config) (*Server, error) { activeQueries: make(map[BackendKey]context.CancelFunc), duckLakeSem: make(chan struct{}, 1), conns: make(map[int32]*clientConn), + fileDBs: make(map[string]*fileDBEntry), } // Configure TLS: ACME DNS-01, ACME HTTP-01, or static certificate files @@ -620,16 +640,99 @@ func (s *Server) listConns() []*clientConn { } // createDBConnection creates a DuckDB connection for a client session. -// This is a thin wrapper around CreateDBConnection using the server's config. 
+// When FilePersistence is enabled, connections are pooled per-user so multiple +// clients sharing the same username reuse a single DuckDB file handle. func (s *Server) createDBConnection(username string) (*sql.DB, error) { + if s.cfg.FilePersistence { + return s.acquireFileDB(username, false) + } return CreateDBConnection(s.cfg, s.duckLakeSem, username, processStartTime, processVersion) } -// openBaseDB creates and configures a bare DuckDB in-memory connection with -// threads, memory limit, temp directory, extensions, and cache_httpfs settings. +// createPassthroughDBConnection creates a passthrough DuckDB connection. +// When FilePersistence is enabled, connections are pooled per-user. +func (s *Server) createPassthroughDBConnection(username string) (*sql.DB, error) { + if s.cfg.FilePersistence { + return s.acquireFileDB(username, true) + } + return CreatePassthroughDBConnection(s.cfg, s.duckLakeSem, username, processStartTime, processVersion) +} + +// acquireFileDB returns a shared *sql.DB for the given user, creating one if needed. +// The caller must call releaseFileDB when the connection is done. +func (s *Server) acquireFileDB(username string, passthrough bool) (*sql.DB, error) { + s.fileDBsMu.Lock() + defer s.fileDBsMu.Unlock() + + if entry, ok := s.fileDBs[username]; ok { + entry.refs++ + slog.Debug("Reusing file-backed DuckDB.", "user", username, "refs", entry.refs) + return entry.db, nil + } + + // First connection for this user — create and configure the DB. + var db *sql.DB + var err error + if passthrough { + db, err = CreatePassthroughDBConnection(s.cfg, s.duckLakeSem, username, processStartTime, processVersion) + } else { + db, err = CreateDBConnection(s.cfg, s.duckLakeSem, username, processStartTime, processVersion) + } + if err != nil { + return nil, err + } + + // Override openBaseDB's single-connection limits. Multiple client connections + // share this *sql.DB, so we need enough pooled connections for concurrency. 
+ db.SetMaxOpenConns(0) // unlimited + db.SetMaxIdleConns(4) + + s.fileDBs[username] = &fileDBEntry{db: db, refs: 1} + return db, nil +} + +// releaseFileDB decrements the reference count for a shared file-backed DB. +// When the last reference is released, the DB is closed and removed from the pool. +func (s *Server) releaseFileDB(username string) { + s.fileDBsMu.Lock() + defer s.fileDBsMu.Unlock() + + entry, ok := s.fileDBs[username] + if !ok { + return + } + entry.refs-- + if entry.refs <= 0 { + if err := entry.db.Close(); err != nil { + slog.Warn("Failed to close shared file-backed DuckDB.", "user", username, "error", err) + } + delete(s.fileDBs, username) + slog.Debug("Closed file-backed DuckDB (last reference released).", "user", username) + } +} + +// openBaseDB creates and configures a DuckDB connection with threads, memory +// limit, temp directory, extensions, and cache_httpfs settings. // This shared setup is used by both regular and passthrough connections. +// +// When FilePersistence is enabled and DataDir is set, the database is file-backed +// at <data_dir>/<username>.duckdb. DuckDB memory-maps the file and serves queries +// from RAM, so performance is close to in-memory while data persists across restarts. func openBaseDB(cfg Config, username string) (*sql.DB, error) { - db, err := sql.Open("duckdb", ":memory:") + dsn := ":memory:" + if cfg.FilePersistence && cfg.DataDir != "" && username != "" { + // Reject usernames that could escape DataDir via path traversal. + if strings.ContainsAny(username, "/\\") || strings.Contains(username, "..") { + return nil, fmt.Errorf("invalid username for file persistence: %q", username) + } + dsn = filepath.Join(cfg.DataDir, username+".duckdb") + // Ensure the data directory exists before opening the file. 
+ if err := os.MkdirAll(cfg.DataDir, 0750); err != nil { + return nil, fmt.Errorf("failed to create data directory %s: %w", cfg.DataDir, err) + } + slog.Info("Opening file-backed DuckDB.", "path", dsn) + } + db, err := sql.Open("duckdb", dsn) if err != nil { return nil, fmt.Errorf("failed to open duckdb: %w", err) } diff --git a/server/server_test.go b/server/server_test.go index d688f53..3f2e933 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -309,6 +309,197 @@ func TestStartCredentialRefresh_RollbackAndRetryWhenNoActiveTransaction(t *testi } } +func TestOpenBaseDBInMemoryByDefault(t *testing.T) { + cfg := Config{} + db, err := openBaseDB(cfg, "testuser") + if err != nil { + t.Fatalf("openBaseDB failed: %v", err) + } + defer func() { _ = db.Close() }() + + var dbName string + err = db.QueryRow("SELECT current_database()").Scan(&dbName) + if err != nil { + t.Fatalf("failed to query current_database(): %v", err) + } + if dbName != "memory" { + t.Fatalf("expected in-memory database (current_database()='memory'), got %q", dbName) + } +} + +func TestOpenBaseDBFilePersistence(t *testing.T) { + dataDir := t.TempDir() + cfg := Config{ + FilePersistence: true, + DataDir: dataDir, + } + db, err := openBaseDB(cfg, "alice") + if err != nil { + t.Fatalf("openBaseDB failed: %v", err) + } + + // Write data + if _, err := db.Exec("CREATE TABLE test_persist (id INTEGER)"); err != nil { + t.Fatalf("failed to create table: %v", err) + } + if _, err := db.Exec("INSERT INTO test_persist VALUES (42)"); err != nil { + t.Fatalf("failed to insert: %v", err) + } + _ = db.Close() + + // Reopen the same file and verify data survives + db2, err := openBaseDB(cfg, "alice") + if err != nil { + t.Fatalf("openBaseDB (reopen) failed: %v", err) + } + defer func() { _ = db2.Close() }() + + var val int + err = db2.QueryRow("SELECT id FROM test_persist").Scan(&val) + if err != nil { + t.Fatalf("failed to read persisted data: %v", err) + } + if val != 42 { + t.Fatalf("expected 
persisted value 42, got %d", val) + } +} + +func TestOpenBaseDBFilePersistenceFallsBackWithoutDataDir(t *testing.T) { + cfg := Config{ + FilePersistence: true, + // DataDir intentionally empty — falls back to :memory: + } + db, err := openBaseDB(cfg, "testuser") + if err != nil { + t.Fatalf("openBaseDB failed: %v", err) + } + defer func() { _ = db.Close() }() + + var dbName string + err = db.QueryRow("SELECT current_database()").Scan(&dbName) + if err != nil { + t.Fatalf("failed to query current_database(): %v", err) + } + if dbName != "memory" { + t.Fatalf("expected fallback to in-memory when DataDir is empty, got %q", dbName) + } +} + +func TestOpenBaseDBFilePersistenceFallsBackWithoutUsername(t *testing.T) { + cfg := Config{ + FilePersistence: true, + DataDir: t.TempDir(), + } + db, err := openBaseDB(cfg, "") + if err != nil { + t.Fatalf("openBaseDB failed: %v", err) + } + defer func() { _ = db.Close() }() + + var dbName string + err = db.QueryRow("SELECT current_database()").Scan(&dbName) + if err != nil { + t.Fatalf("failed to query current_database(): %v", err) + } + if dbName != "memory" { + t.Fatalf("expected fallback to in-memory when username is empty, got %q", dbName) + } +} + +func TestOpenBaseDBFilePersistenceRejectsPathTraversal(t *testing.T) { + dataDir := t.TempDir() + cfg := Config{ + FilePersistence: true, + DataDir: dataDir, + } + + cases := []string{ + "../etc/evil", + "foo/bar", + "..\\windows", + "alice/../bob", + } + for _, username := range cases { + _, err := openBaseDB(cfg, username) + if err == nil { + t.Fatalf("expected error for username %q, got nil", username) + } + if !strings.Contains(err.Error(), "invalid username") { + t.Fatalf("expected 'invalid username' error for %q, got: %v", username, err) + } + } +} + +func TestFileDBPoolConcurrentConnections(t *testing.T) { + dataDir := t.TempDir() + s := &Server{ + cfg: Config{ + FilePersistence: true, + DataDir: dataDir, + }, + duckLakeSem: make(chan struct{}, 1), + fileDBs: 
make(map[string]*fileDBEntry), + } + + // First connection creates the DB + db1, err := s.acquireFileDB("pooluser", false) + if err != nil { + t.Fatalf("first acquireFileDB failed: %v", err) + } + + // Create a table via the first connection + if _, err := db1.Exec("CREATE TABLE pool_test (id INTEGER)"); err != nil { + t.Fatalf("failed to create table: %v", err) + } + + // Second connection should reuse the same DB + db2, err := s.acquireFileDB("pooluser", false) + if err != nil { + t.Fatalf("second acquireFileDB failed: %v", err) + } + if db1 != db2 { + t.Fatal("expected same *sql.DB instance for same user") + } + + // Verify both see the same table + var count int + if err := db2.QueryRow("SELECT count(*) FROM pool_test").Scan(&count); err != nil { + t.Fatalf("second connection can't see table: %v", err) + } + + // Release first reference — DB should stay open + s.releaseFileDB("pooluser") + if _, ok := s.fileDBs["pooluser"]; !ok { + t.Fatal("expected pool entry to survive with one reference remaining") + } + + // Release second reference — DB should be closed and removed + s.releaseFileDB("pooluser") + if _, ok := s.fileDBs["pooluser"]; ok { + t.Fatal("expected pool entry to be removed after last release") + } +} + +func TestOpenBaseDBCreatesDataDir(t *testing.T) { + // Use a nested path that doesn't exist yet + base := t.TempDir() + dataDir := base + "/nested/data" + cfg := Config{ + FilePersistence: true, + DataDir: dataDir, + } + db, err := openBaseDB(cfg, "alice") + if err != nil { + t.Fatalf("openBaseDB failed: %v", err) + } + defer func() { _ = db.Close() }() + + // Verify the data directory was created + if _, err := db.Exec("SELECT 1"); err != nil { + t.Fatalf("query failed after dir creation: %v", err) + } +} + func TestHasCacheHTTPFS(t *testing.T) { tests := []struct { name string