Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions k8s/control-plane-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,16 @@ spec:
- name: pg
containerPort: 5432
protocol: TCP
- name: admin
containerPort: 9090
protocol: TCP
readinessProbe:
httpGet:
path: /health
port: admin
initialDelaySeconds: 2
periodSeconds: 2
failureThreshold: 15
volumeMounts:
- name: config
mountPath: /etc/duckgres
Expand Down
7 changes: 7 additions & 0 deletions k8s/control-plane-multitenant-local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ spec:
mountPath: /certs
- name: data
mountPath: /data
readinessProbe:
httpGet:
path: /health
port: admin
initialDelaySeconds: 2
periodSeconds: 2
failureThreshold: 15
securityContext:
allowPrivilegeEscalation: false
resources:
Expand Down
7 changes: 7 additions & 0 deletions k8s/kind/control-plane.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ spec:
mountPath: /certs
- name: data
mountPath: /data
readinessProbe:
httpGet:
path: /health
port: admin
initialDelaySeconds: 2
periodSeconds: 2
failureThreshold: 15
securityContext:
allowPrivilegeEscalation: false
resources:
Expand Down
36 changes: 34 additions & 2 deletions server/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,23 @@ func NewDuckLakeCheckpointer(cfg Config) (*DuckLakeCheckpointer, error) {
return nil, fmt.Errorf("checkpoint: attach ducklake: %w", err)
}

// Create system.checkpoints table to record checkpoint history
if _, err := db.Exec("CREATE SCHEMA IF NOT EXISTS ducklake.system"); err != nil {
_ = db.Close()
return nil, fmt.Errorf("checkpoint: create schema: %w", err)
}
createTable := `CREATE TABLE IF NOT EXISTS ducklake.system.checkpoints (
started_at TIMESTAMPTZ NOT NULL,
finished_at TIMESTAMPTZ NOT NULL,
duration_ms BIGINT NOT NULL,
status VARCHAR NOT NULL,
error VARCHAR
)`
if _, err := db.Exec(createTable); err != nil {
_ = db.Close()
return nil, fmt.Errorf("checkpoint: create table: %w", err)
}

c := &DuckLakeCheckpointer{
db: db,
interval: cfg.DuckLake.CheckpointInterval,
Expand Down Expand Up @@ -122,9 +139,24 @@ func (c *DuckLakeCheckpointer) run() {
slog.Info("DuckLake checkpoint starting.")
start := time.Now()
_, err := c.db.Exec("CHECKPOINT ducklake")
finished := time.Now()
duration := finished.Sub(start)

status := "success"
var errMsg *string
if err != nil {
status = "failed"
s := err.Error()
errMsg = &s
slog.Warn("DuckLake checkpoint failed.", "error", err)
return
} else {
slog.Info("DuckLake checkpoint complete.", "duration", duration.Round(time.Millisecond))
}

if _, logErr := c.db.Exec(
"INSERT INTO ducklake.system.checkpoints (started_at, finished_at, duration_ms, status, error) VALUES ($1, $2, $3, $4, $5)",
start, finished, duration.Milliseconds(), status, errMsg,
); logErr != nil {
slog.Warn("Failed to log checkpoint to system.checkpoints.", "error", logErr)
}
slog.Info("DuckLake checkpoint complete.", "duration", time.Since(start).Round(time.Millisecond))
}
124 changes: 124 additions & 0 deletions server/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,130 @@ func TestDuckLakeCheckpointerDisabledWhenIntervalZero(t *testing.T) {
}
}

func TestCheckpointerRunRecordsSuccess(t *testing.T) {
db, err := sql.Open("duckdb", ":memory:")
if err != nil {
t.Fatalf("open duckdb: %v", err)
}
defer func() { _ = db.Close() }()

// Attach an in-memory database as "ducklake" so CHECKPOINT succeeds
if _, err := db.Exec("ATTACH ':memory:' AS ducklake"); err != nil {
t.Fatalf("attach: %v", err)
}
if _, err := db.Exec("CREATE SCHEMA ducklake.system"); err != nil {
t.Fatalf("create schema: %v", err)
}
createTable := `CREATE TABLE ducklake.system.checkpoints (
started_at TIMESTAMPTZ NOT NULL,
finished_at TIMESTAMPTZ NOT NULL,
duration_ms BIGINT NOT NULL,
status VARCHAR NOT NULL,
error VARCHAR
)`
if _, err := db.Exec(createTable); err != nil {
t.Fatalf("create table: %v", err)
}

c := &DuckLakeCheckpointer{db: db}
c.run()

var startedAt, finishedAt time.Time
var durationMs int64
var status string
var errMsg *string
err = db.QueryRow("SELECT started_at, finished_at, duration_ms, status, error FROM ducklake.system.checkpoints").
Scan(&startedAt, &finishedAt, &durationMs, &status, &errMsg)
if err != nil {
t.Fatalf("query row: %v", err)
}
if status != "success" {
t.Errorf("expected status 'success', got %q", status)
}
if errMsg != nil {
t.Errorf("expected nil error, got %q", *errMsg)
}
if durationMs < 0 {
t.Errorf("expected non-negative duration_ms, got %d", durationMs)
}
if !finishedAt.After(startedAt) && !finishedAt.Equal(startedAt) {
t.Errorf("expected finished_at >= started_at, got started=%v finished=%v", startedAt, finishedAt)
}

// Run again and verify accumulation
c.run()
var count int
if err := db.QueryRow("SELECT COUNT(*) FROM ducklake.system.checkpoints").Scan(&count); err != nil {
t.Fatalf("count: %v", err)
}
if count != 2 {
t.Errorf("expected 2 rows after 2 runs, got %d", count)
}
}

func TestCheckpointerRunRecordsFailure(t *testing.T) {
db, err := sql.Open("duckdb", ":memory:")
if err != nil {
t.Fatalf("open duckdb: %v", err)
}
defer func() { _ = db.Close() }()

// Attach an in-memory database as "ducklake" and create the table,
// then detach and reattach as read-only so CHECKPOINT fails.
if _, err := db.Exec("ATTACH ':memory:' AS ducklake"); err != nil {
t.Fatalf("attach: %v", err)
}
if _, err := db.Exec("CREATE SCHEMA ducklake.system"); err != nil {
t.Fatalf("create schema: %v", err)
}
createTable := `CREATE TABLE ducklake.system.checkpoints (
started_at TIMESTAMPTZ NOT NULL,
finished_at TIMESTAMPTZ NOT NULL,
duration_ms BIGINT NOT NULL,
status VARCHAR NOT NULL,
error VARCHAR
)`
if _, err := db.Exec(createTable); err != nil {
t.Fatalf("create table: %v", err)
}

// Write a temp file so we can reattach read-only
tmpDir := t.TempDir()
tmpDB := tmpDir + "/test.db"
if _, err := db.Exec("DETACH ducklake"); err != nil {
t.Fatalf("detach: %v", err)
}
// Create a persistent DB, set up the table, detach, reattach read-only
if _, err := db.Exec("ATTACH '" + tmpDB + "' AS ducklake"); err != nil {
t.Fatalf("attach persistent: %v", err)
}
if _, err := db.Exec("CREATE SCHEMA ducklake.system"); err != nil {
t.Fatalf("create schema persistent: %v", err)
}
if _, err := db.Exec(createTable); err != nil {
t.Fatalf("create table persistent: %v", err)
}
if _, err := db.Exec("DETACH ducklake"); err != nil {
t.Fatalf("detach persistent: %v", err)
}
if _, err := db.Exec("ATTACH '" + tmpDB + "' AS ducklake (READ_ONLY)"); err != nil {
t.Fatalf("attach read-only: %v", err)
}

c := &DuckLakeCheckpointer{db: db}
c.run()

// On a read-only DB the INSERT fails — 0 rows expected.
// The test validates that run() handles INSERT errors gracefully without panicking.
var count int
if err := db.QueryRow("SELECT COUNT(*) FROM ducklake.system.checkpoints").Scan(&count); err != nil {
t.Fatalf("count: %v", err)
}
if count != 0 {
t.Errorf("expected 0 rows on read-only DB, got %d", count)
}
}

func TestDuckLakeCheckpointerStopWaitsForLoop(t *testing.T) {
db, err := sql.Open("duckdb", ":memory:")
if err != nil {
Expand Down
Loading