Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions internal/schema/hash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,40 @@ func TestSchemaRefHash_PlannerBindsToSchemaContentHash(t *testing.T) {

}

// The volatile reference counters (DatabaseXid/DatabaseMxid) advance on every
// capture even when nothing about the schema/sizing changed. They MUST stay out
// of the planner content hash, or bundle dedup (filesystem_store keys on
// content_hash) would write a fresh bundle on every single capture. This guards
// that invariant directly: two snapshots differing only in the reference counters
// hash identically. The stable raw relfrozenxid/relminmxid, by contrast, DO belong
// in the hash and are exercised by the other tests via the Tables payload.
func TestPlannerContentHash_IgnoresReferenceCounters(t *testing.T) {
base := &PlannerStatsSnapshot{
SchemaRefHash: "ddl-hash",
Tables: []TableSizingEntry{{
Table: QualifiedName{Schema: "public", Name: "events"},
Sizing: TableSizing{Reltuples: 1_000_000, RelfrozenXid: 100, RelminMxid: 1},
}},
}
drifted := *base
drifted.DatabaseXid = base.DatabaseXid + 5_000_000
drifted.DatabaseMxid = base.DatabaseMxid + 5_000_000

if got, want := ComputePlannerContentHash(&drifted), ComputePlannerContentHash(base); got != want {
t.Errorf("reference counters leaked into planner content hash: %s vs %s", got, want)
}

// sanity: the stable frozen xids still move the hash, so they aren't silently dropped.
moved := *base
moved.Tables = []TableSizingEntry{{
Table: base.Tables[0].Table,
Sizing: TableSizing{Reltuples: 1_000_000, RelfrozenXid: 200, RelminMxid: 1},
}}
if ComputePlannerContentHash(&moved) == ComputePlannerContentHash(base) {
t.Errorf("relfrozenxid change did not affect planner content hash")
}
}

// Same invariant for activity snapshots. Two nodes producing different
// schema_ref values mean the cluster has drifted; under matched DDL the
// binding must be stable across nodes.
Expand Down
7 changes: 5 additions & 2 deletions internal/schema/sql/planner_stats.sql
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
-- name: fetch-planner-table-sizing
-- pg_class.reltuples + on-disk footprint (heap, total, indexes, toast)
-- pg_class.reltuples + on-disk footprint + freeze xids (0 for partitioned parents);
-- ages derived offline against database_xid/database_mxid to keep rows dedup-stable.
SELECT n.nspname AS schema_name,
c.relname AS table_name,
c.reltuples::float8 AS reltuples,
c.relpages::int8 AS relpages,
pg_catalog.pg_relation_size(c.oid)::int8 AS table_size,
pg_catalog.pg_total_relation_size(c.oid)::int8 AS total_relation_size,
pg_catalog.pg_indexes_size(c.oid)::int8 AS indexes_size,
COALESCE(pg_catalog.pg_total_relation_size(c.reltoastrelid), 0)::int8 AS toast_size
COALESCE(pg_catalog.pg_total_relation_size(c.reltoastrelid), 0)::int8 AS toast_size,
c.relfrozenxid::text::int8 AS relfrozenxid,
c.relminmxid::text::int8 AS relminmxid
FROM pg_catalog.pg_class c
JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
WHERE c.relkind IN ('r', 'p')
Expand Down
15 changes: 15 additions & 0 deletions internal/schema/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,18 @@ func CapturePlannerStats(ctx context.Context, pool Querier, schemaRefHash string
return nil, fmt.Errorf("query current_database: %w", err)
}

// Reference counters for ageing relfrozenxid/relminmxid offline.
// xmax = next xid (pg_current_snapshot doesn't consume one, safe in our read tx).
// mxid_age('1')+1 = next_multixact, avoiding superuser-gated pg_control_checkpoint;
// cast before +1 since mxid_age is int4 and nears INT_MAX at wraparound.
var databaseXid, databaseMxid int64
if err := pool.QueryRow(ctx,
"SELECT pg_catalog.pg_snapshot_xmax(pg_catalog.pg_current_snapshot())::text::int8, "+
"(pg_catalog.mxid_age('1'::xid)::int8 + 1)",
).Scan(&databaseXid, &databaseMxid); err != nil {
return nil, fmt.Errorf("query reference counters: %w", err)
}

tables, err := fetchPlannerTableSizing(ctx, pool)
if err != nil {
return nil, fmt.Errorf("fetch table sizing: %w", err)
Expand All @@ -32,6 +44,8 @@ func CapturePlannerStats(ctx context.Context, pool Querier, schemaRefHash string
SchemaRefHash: schemaRefHash,
Database: database,
Timestamp: time.Now().UTC(),
DatabaseXid: databaseXid,
DatabaseMxid: databaseMxid,
Tables: tables,
Indexes: indexes,
Columns: columns,
Expand Down Expand Up @@ -93,6 +107,7 @@ func fetchPlannerTableSizing(ctx context.Context, pool Querier) ([]TableSizingEn
&e.Sizing.Reltuples, &e.Sizing.Relpages,
&e.Sizing.TableSize, &e.Sizing.TotalRelationSize,
&e.Sizing.IndexesSize, &e.Sizing.ToastSize,
&e.Sizing.RelfrozenXid, &e.Sizing.RelminMxid,
)
return e, err
})
Expand Down
47 changes: 40 additions & 7 deletions internal/schema/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,35 @@ type TableSizing struct {
TotalRelationSize int64 `json:"total_relation_size"`
IndexesSize int64 `json:"indexes_size"`
ToastSize int64 `json:"toast_size,omitempty"`
// raw relfrozenxid/relminmxid (0 = partitioned parent); stable, so kept in the hash.
// ages are derived against DatabaseXid/DatabaseMxid, not stored, to stay dedup-stable.
RelfrozenXid int64 `json:"relfrozenxid,omitempty"`
RelminMxid int64 `json:"relminmxid,omitempty"`
}

// age(relfrozenxid) at capture, vs DatabaseXid (next-xid reference).
func (s TableSizing) FrozenXidAge(databaseXid int64) (int64, bool) {
return counterAge(s.RelfrozenXid, databaseXid)
}

// mxid_age(relminmxid) at capture, vs DatabaseMxid (next-multixact reference).
func (s TableSizing) MinMxidAge(databaseMxid int64) (int64, bool) {
return counterAge(s.RelminMxid, databaseMxid)
}

// Reproduces pg's age()/mxid_age(): wraparound-aware forward distance in 32-bit counter
// space. ok=false when value 0 (partitioned parent) or reference 0 (pre-feature snapshot).
// Guard stays at 0, not <=1: relminmxid 1 (FirstMultiXactId) is a real ageable value.
func counterAge(value, reference int64) (int64, bool) {
if value == 0 || reference == 0 {
return 0, false
}
const modulo = int64(1) << 32
age := (reference%modulo - value%modulo) % modulo
if age < 0 {
age += modulo
}
return age, true
}

// Counters and vacuum/analyze timestamps from pg_stat_user_tables
Expand Down Expand Up @@ -379,13 +408,17 @@ type IndexActivityEntry struct {

// Persisted planner inputs; schema_ref_hash binds rows to a SchemaSnapshot
type PlannerStatsSnapshot struct {
SchemaRefHash string `json:"schema_ref_hash"`
ContentHash string `json:"content_hash"`
Database string `json:"database"`
Timestamp time.Time `json:"timestamp"`
Tables []TableSizingEntry `json:"tables"`
Indexes []IndexSizingEntry `json:"indexes"`
Columns []ColumnStatsEntry `json:"columns"`
SchemaRefHash string `json:"schema_ref_hash"`
ContentHash string `json:"content_hash"`
Database string `json:"database"`
Timestamp time.Time `json:"timestamp"`
// next-xid/next-multixact at capture; reference points for deriving the per-table
// ages. volatile, so excluded from ComputePlannerContentHash to preserve dedup.
DatabaseXid int64 `json:"database_xid,omitempty"`
DatabaseMxid int64 `json:"database_mxid,omitempty"`
Tables []TableSizingEntry `json:"tables"`
Indexes []IndexSizingEntry `json:"indexes"`
Columns []ColumnStatsEntry `json:"columns"`
}

// Persisted per-node activity counters
Expand Down
70 changes: 59 additions & 11 deletions internal/schema/vacuum.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,25 @@ type (
}

VacuumHealth struct {
Schema string `json:"schema"`
Table string `json:"table"`
Reltuples float64 `json:"reltuples"`
DeadTuples int64 `json:"dead_tuples"`
VacuumTriggerAt float64 `json:"vacuum_trigger_at"`
VacuumProgress float64 `json:"vacuum_progress"`
HasOverrides bool `json:"has_overrides"`
EffectiveThreshold int64 `json:"effective_threshold"`
EffectiveScale float64 `json:"effective_scale_factor"`
EffectiveAnalyzeThreshold int64 `json:"effective_analyze_threshold"`
EffectiveAnalyzeScale float64 `json:"effective_analyze_scale_factor"`
Schema string `json:"schema"`
Table string `json:"table"`
Reltuples float64 `json:"reltuples"`
DeadTuples int64 `json:"dead_tuples"`
VacuumTriggerAt float64 `json:"vacuum_trigger_at"`
VacuumProgress float64 `json:"vacuum_progress"`
HasOverrides bool `json:"has_overrides"`
EffectiveThreshold int64 `json:"effective_threshold"`
EffectiveScale float64 `json:"effective_scale_factor"`
EffectiveAnalyzeThreshold int64 `json:"effective_analyze_threshold"`
EffectiveAnalyzeScale float64 `json:"effective_analyze_scale_factor"`
AnalyzeTriggerAt float64 `json:"analyze_trigger_at"`
AutovacuumEnabled bool `json:"autovacuum_enabled"`
XidAge int64 `json:"xid_age,omitempty"`
FreezeMaxAge int64 `json:"freeze_max_age,omitempty"`
FreezeProgress float64 `json:"freeze_progress,omitempty"`
MxidAge int64 `json:"mxid_age,omitempty"`
MultixactFreezeMaxAge int64 `json:"multixact_freeze_max_age,omitempty"`
MultixactFreezeProgress float64 `json:"multixact_freeze_progress,omitempty"`
LastVacuum *time.Time `json:"last_vacuum,omitempty"`
LastAutovacuum *time.Time `json:"last_autovacuum,omitempty"`
LastAnalyze *time.Time `json:"last_analyze,omitempty"`
Expand Down Expand Up @@ -196,6 +202,48 @@ func AnalyzeVacuumHealth(a *AnnotatedSchema) []VacuumHealth {
AnalyzeTriggerAt: analyzeTrigger,
AutovacuumEnabled: avEnabled,
}
// anti-wraparound: age(relfrozenxid) vs the (possibly overridden) freeze_max_age.
// ok=false (partitioned parents, pre-feature snapshots) skips freeze analysis.
freezeMaxAge := defaults.FreezeMaxAge
if v, ok := opts["autovacuum_freeze_max_age"]; ok {
if parsed, err := strconv.ParseInt(v, 10, 64); err == nil {
freezeMaxAge = parsed
}
}
if age, ok := sizing.FrozenXidAge(a.Planner.DatabaseXid); ok {
vh.XidAge = age
vh.FreezeMaxAge = freezeMaxAge
if freezeMaxAge > 0 {
vh.FreezeProgress = float64(age) / float64(freezeMaxAge)
if vh.FreezeProgress >= 0.9 {
vh.Recommendations = append(vh.Recommendations,
fmt.Sprintf("relfrozenxid age is %d, %.0f%% of autovacuum_freeze_max_age (%d); "+
"anti-wraparound autovacuum is imminent, make sure it can finish",
age, vh.FreezeProgress*100, freezeMaxAge))
}
}
}

mxidFreezeMaxAge := defaults.MultixactFreezeMaxAge
if v, ok := opts["autovacuum_multixact_freeze_max_age"]; ok {
if parsed, err := strconv.ParseInt(v, 10, 64); err == nil {
mxidFreezeMaxAge = parsed
}
}
if age, ok := sizing.MinMxidAge(a.Planner.DatabaseMxid); ok {
vh.MxidAge = age
vh.MultixactFreezeMaxAge = mxidFreezeMaxAge
if mxidFreezeMaxAge > 0 {
vh.MultixactFreezeProgress = float64(age) / float64(mxidFreezeMaxAge)
if vh.MultixactFreezeProgress >= 0.9 {
vh.Recommendations = append(vh.Recommendations,
fmt.Sprintf("relminmxid age is %d, %.0f%% of autovacuum_multixact_freeze_max_age (%d); "+
"anti-wraparound autovacuum is imminent, make sure it can finish",
age, vh.MultixactFreezeProgress*100, mxidFreezeMaxAge))
}
}
}

if activity != nil {
vh.LastVacuum = activity.LastVacuum
vh.LastAutovacuum = activity.LastAutovacuum
Expand Down
133 changes: 133 additions & 0 deletions internal/schema/vacuum_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package schema

import (
"strings"
"testing"
"time"
)
Expand Down Expand Up @@ -222,3 +223,135 @@ func TestAnalyzeVacuumHealth_HighTriggerThreshold(t *testing.T) {
t.Errorf("expected high-trigger-threshold recommendation, got %v", results[0].Recommendations)
}
}

// FrozenXidAge mirrors postgres' age(relfrozenxid): the forward distance in
// transactions between the table's frozen xid and the snapshot's next-xid
// reference. We exercise the simple case, the wraparound (modular) case, and
// the two "not applicable" sentinels so callers can trust the bool.
func TestFrozenXidAge(t *testing.T) {
const mod = int64(1) << 32

cases := []struct {
name string
frozen int64
databaseXid int64
wantAge int64
wantOK bool
}{
{name: "simple forward distance", frozen: 1_000, databaseXid: 201_000_000, wantAge: 200_999_000, wantOK: true},
{name: "freshly frozen, tiny age", frozen: 500_000, databaseXid: 500_050, wantAge: 50, wantOK: true},
// databaseXid wrapped past 2^32 (epoch bumped): current32 = 100, frozen at
// 4_000_000_000, so age = (100 - 4000000000) mod 2^32 = 294_967_396.
{name: "wraparound: current already past 2^32", frozen: 4_000_000_000, databaseXid: mod + 100, wantAge: 294_967_396, wantOK: true},
{name: "no frozen xid (partitioned parent)", frozen: 0, databaseXid: 12_345, wantAge: 0, wantOK: false},
{name: "no reference point", frozen: 999, databaseXid: 0, wantAge: 0, wantOK: false},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
s := TableSizing{RelfrozenXid: c.frozen}
age, ok := s.FrozenXidAge(c.databaseXid)
if ok != c.wantOK {
t.Fatalf("ok=%v want %v", ok, c.wantOK)
}
if ok && age != c.wantAge {
t.Errorf("age=%d want %d", age, c.wantAge)
}
})
}
}

// A table whose relfrozenxid age has reached ~95% of autovacuum_freeze_max_age
// must surface an anti-wraparound recommendation and populate the freeze fields.
func TestAnalyzeVacuumHealth_WraparoundImminent(t *testing.T) {
a := vacuumFixture("aging", 1_000_000, 0, nil)
// default freeze_max_age is 200M; put the frozen xid ~190M behind the ref.
a.Planner.DatabaseXid = 200_000_000
a.Planner.Tables[0].Sizing.RelfrozenXid = 10_000_000 // age = 190M = 95%

results := AnalyzeVacuumHealth(a)
if len(results) != 1 {
t.Fatalf("expected 1 result, got %d", len(results))
}
vh := results[0]
if vh.XidAge != 190_000_000 {
t.Errorf("XidAge=%d want 190000000", vh.XidAge)
}
if vh.FreezeMaxAge != 200_000_000 {
t.Errorf("FreezeMaxAge=%d want 200000000", vh.FreezeMaxAge)
}
if vh.FreezeProgress < 0.94 || vh.FreezeProgress > 0.96 {
t.Errorf("FreezeProgress=%f want ~0.95", vh.FreezeProgress)
}
hasRec := false
for _, r := range vh.Recommendations {
if strings.Contains(r, "anti-wraparound") {
hasRec = true
}
}
if !hasRec {
t.Errorf("expected anti-wraparound recommendation, got %v", vh.Recommendations)
}
}

// Multixact wraparound mirrors the xid path: relminmxid age at ~95% of
// autovacuum_multixact_freeze_max_age (default 400M) must surface a
// recommendation naming relminmxid and populate the multixact freeze fields.
func TestAnalyzeVacuumHealth_MultixactWraparoundImminent(t *testing.T) {
a := vacuumFixture("aging_mxid", 1_000_000, 0, nil)
a.Planner.DatabaseMxid = 400_000_000
a.Planner.Tables[0].Sizing.RelminMxid = 20_000_000 // age = 380M = 95%

vh := AnalyzeVacuumHealth(a)[0]
if vh.MxidAge != 380_000_000 {
t.Errorf("MxidAge=%d want 380000000", vh.MxidAge)
}
if vh.MultixactFreezeMaxAge != 400_000_000 {
t.Errorf("MultixactFreezeMaxAge=%d want 400000000", vh.MultixactFreezeMaxAge)
}
if vh.MultixactFreezeProgress < 0.94 || vh.MultixactFreezeProgress > 0.96 {
t.Errorf("MultixactFreezeProgress=%f want ~0.95", vh.MultixactFreezeProgress)
}
hasRec := false
for _, r := range vh.Recommendations {
if strings.Contains(r, "relminmxid") && strings.Contains(r, "anti-wraparound") {
hasRec = true
}
}
if !hasRec {
t.Errorf("expected multixact anti-wraparound recommendation, got %v", vh.Recommendations)
}
}

// Healthy frozen-xid age (well under freeze_max_age) and the partitioned-parent
// case (relfrozenxid 0) must NOT raise a wraparound recommendation, and the
// partitioned case must leave the freeze fields zeroed.
func TestAnalyzeVacuumHealth_WraparoundQuiet(t *testing.T) {
a := vacuumFixture("young", 1_000_000, 0, nil)
a.Planner.DatabaseXid = 200_000_000
a.Planner.DatabaseMxid = 400_000_000
a.Planner.Tables[0].Sizing.RelfrozenXid = 190_000_000 // xid age = 10M = 5%
a.Planner.Tables[0].Sizing.RelminMxid = 380_000_000 // mxid age = 20M = 5%

vh := AnalyzeVacuumHealth(a)[0]
for _, r := range vh.Recommendations {
if strings.Contains(r, "anti-wraparound") {
t.Errorf("did not expect anti-wraparound rec at 5%%, got %v", vh.Recommendations)
}
}

// partitioned parent: relfrozenxid/relminmxid 0 means nothing to age.
p := vacuumFixture("parent", 1_000_000, 0, nil)
p.Planner.DatabaseXid = 200_000_000
p.Planner.DatabaseMxid = 400_000_000
p.Planner.Tables[0].Sizing.RelfrozenXid = 0
p.Planner.Tables[0].Sizing.RelminMxid = 0

pvh := AnalyzeVacuumHealth(p)[0]
if pvh.XidAge != 0 || pvh.FreezeMaxAge != 0 || pvh.FreezeProgress != 0 {
t.Errorf("expected zeroed freeze fields for partitioned parent, got %+v", pvh)
}
if pvh.MxidAge != 0 || pvh.MultixactFreezeMaxAge != 0 || pvh.MultixactFreezeProgress != 0 {
t.Errorf("expected zeroed multixact freeze fields for partitioned parent, got %+v", pvh)
}
}