Skip to content

Commit 8987c32

Browse files
authored
add archiver lag collector (#11)
Track WAL archiving lag by comparing current WAL position to last archived segment position, providing visibility into archival delays. This works well with planetscale/hzdb-operator#1052 - we can/should expose this metric on single node branches I think, and then users can think about these cases themselves correctly and see e.g. historical spikes etc.
1 parent 8d826f8 commit 8987c32

File tree

2 files changed

+481
-0
lines changed

2 files changed

+481
-0
lines changed

collector/pg_stat_archiver_lag.go

Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
// Copyright 2025 PlanetScale Inc.
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package collector
15+
16+
import (
17+
"context"
18+
"database/sql"
19+
"fmt"
20+
"strconv"
21+
"strings"
22+
23+
"github.com/prometheus/client_golang/prometheus"
24+
)
25+
26+
const archiverLagSubsystem = "stat_archiver"
27+
28+
func init() {
29+
registerCollector(archiverLagSubsystem, defaultEnabled, NewPGStatArchiverLagCollector)
30+
}
31+
32+
type PGStatArchiverLagCollector struct{}
33+
34+
func NewPGStatArchiverLagCollector(collectorConfig) (Collector, error) {
35+
return &PGStatArchiverLagCollector{}, nil
36+
}
37+
38+
var (
39+
statArchiverLagBytesDesc = prometheus.NewDesc(
40+
prometheus.BuildFQName(namespace, archiverLagSubsystem, "lag_bytes"),
41+
"Archiver lag in bytes (difference between current WAL position and last archived WAL)",
42+
[]string{},
43+
prometheus.Labels{},
44+
)
45+
46+
statArchiverLagQuery = `
47+
SELECT
48+
last_archived_wal,
49+
pg_current_wal_lsn() AS current_lsn
50+
FROM pg_stat_archiver
51+
WHERE last_archived_wal IS NOT NULL
52+
AND last_archived_wal != ''
53+
`
54+
)
55+
56+
// LSN represents a PostgreSQL Log Sequence Number, a 64-bit unsigned integer
57+
// representing a byte position in the WAL.
58+
type LSN uint64
59+
60+
const (
61+
// walSegmentSizeBytes is the size of a WAL segment in bytes (16MB)
62+
walSegmentSizeBytes = 16 * 1024 * 1024 // 16777216
63+
// segmentsPerLogID is the number of segments per log ID (256)
64+
segmentsPerLogID = 256
65+
)
66+
67+
// parseLSNFromWalFile parses a WAL file name (e.g., "000000010000000000000001") and returns
68+
// the LSN position in bytes. The WAL file format is:
69+
// - Positions 1-8: timeline ID (8 hex chars)
70+
// - Positions 9-16: log ID (8 hex chars)
71+
// - Positions 17-24: segment ID (8 hex chars)
72+
// Returns LSN = logID * 256 segments * 16MB + segmentID * 16MB
73+
//
74+
// Handles WAL files with suffixes like .backup, .history, .partial by stripping them first.
75+
func parseLSNFromWalFile(walFile string) (LSN, error) {
76+
// Strip suffix if present (e.g., .backup, .history, .partial)
77+
if idx := strings.Index(walFile, "."); idx != -1 {
78+
walFile = walFile[:idx]
79+
}
80+
81+
if len(walFile) != 24 {
82+
return 0, fmt.Errorf("WAL file name must be exactly 24 hex chars, got %d: %q", len(walFile), walFile)
83+
}
84+
85+
// Validate all characters are hex
86+
for i, r := range walFile {
87+
if (r < '0' || r > '9') && (r < 'A' || r > 'F') && (r < 'a' || r > 'f') {
88+
return 0, fmt.Errorf("WAL file name contains invalid hex character at position %d: %q", i+1, string(r))
89+
}
90+
}
91+
92+
// Extract log ID (positions 9-16, 0-indexed: 8-15)
93+
logIDHex := walFile[8:16]
94+
logID, err := parseHexUint32(logIDHex)
95+
if err != nil {
96+
return 0, fmt.Errorf("parse log ID from %q: %w", logIDHex, err)
97+
}
98+
99+
// Extract segment ID (positions 17-24, 0-indexed: 16-23)
100+
segIDHex := walFile[16:24]
101+
segID, err := parseHexUint32(segIDHex)
102+
if err != nil {
103+
return 0, fmt.Errorf("parse segment ID from %q: %w", segIDHex, err)
104+
}
105+
106+
// Calculate LSN: logID * 256 segments * 16MB + segmentID * 16MB
107+
lsnBytes := LSN(logID)*segmentsPerLogID*walSegmentSizeBytes + LSN(segID)*walSegmentSizeBytes
108+
return lsnBytes, nil
109+
}
110+
111+
// parseLSNFromLSNString parses a PostgreSQL LSN string (e.g., "0/12345678") and returns
112+
// the LSN position in bytes. PostgreSQL LSNs represent byte positions in the WAL.
113+
// The format is "high/low" where both are hex numbers representing a 64-bit byte offset:
114+
// LSN = (high << 32) | low
115+
func parseLSNFromLSNString(lsnStr string) (LSN, error) {
116+
parts := strings.Split(lsnStr, "/")
117+
if len(parts) != 2 {
118+
return 0, fmt.Errorf("LSN string must be in format 'high/low', got: %q", lsnStr)
119+
}
120+
121+
highStr, lowStr := parts[0], parts[1]
122+
if highStr == "" || lowStr == "" {
123+
return 0, fmt.Errorf("LSN string parts cannot be empty: %q", lsnStr)
124+
}
125+
126+
high, err := strconv.ParseUint(highStr, 16, 64)
127+
if err != nil {
128+
return 0, fmt.Errorf("parse high part %q of LSN string %q: %w", highStr, lsnStr, err)
129+
}
130+
131+
low, err := strconv.ParseUint(lowStr, 16, 64)
132+
if err != nil {
133+
return 0, fmt.Errorf("parse low part %q of LSN string %q: %w", lowStr, lsnStr, err)
134+
}
135+
136+
// LSN = (high << 32) | low
137+
return LSN(high<<32 | low), nil
138+
}
139+
140+
// parseHexUint32 parses a hex string (8 hex chars) and returns a uint32.
141+
func parseHexUint32(hexStr string) (uint32, error) {
142+
if len(hexStr) != 8 {
143+
return 0, fmt.Errorf("hex string must be exactly 8 chars, got %d: %q", len(hexStr), hexStr)
144+
}
145+
146+
val, err := strconv.ParseUint(hexStr, 16, 32)
147+
if err != nil {
148+
return 0, fmt.Errorf("parse hex %q: %w", hexStr, err)
149+
}
150+
return uint32(val), nil
151+
}
152+
153+
// bytesBetweenLSN calculates the difference in bytes between two LSN positions.
154+
// Returns the difference, clamped to 0 if currentLSN is less than archivedLSN
155+
// (which can happen during wraparound or timeline switches).
156+
func bytesBetweenLSN(currentLSN, archivedLSN LSN) LSN {
157+
if currentLSN < archivedLSN {
158+
return 0
159+
}
160+
return currentLSN - archivedLSN
161+
}
162+
163+
func (PGStatArchiverLagCollector) Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error {
164+
db := instance.getDB()
165+
row := db.QueryRowContext(ctx, statArchiverLagQuery)
166+
167+
var lastArchivedWal sql.NullString
168+
var currentLSN sql.NullString
169+
170+
err := row.Scan(&lastArchivedWal, &currentLSN)
171+
if err != nil {
172+
// If no rows found (no WAL segments archived yet), return 0 lag
173+
if err == sql.ErrNoRows {
174+
ch <- prometheus.MustNewConstMetric(
175+
statArchiverLagBytesDesc,
176+
prometheus.GaugeValue,
177+
0,
178+
)
179+
return nil
180+
}
181+
return err
182+
}
183+
184+
// If either value is null, return 0 lag
185+
if !lastArchivedWal.Valid || !currentLSN.Valid {
186+
ch <- prometheus.MustNewConstMetric(
187+
statArchiverLagBytesDesc,
188+
prometheus.GaugeValue,
189+
0,
190+
)
191+
return nil
192+
}
193+
194+
// Parse LSN from WAL file name
195+
archivedLSN, err := parseLSNFromWalFile(lastArchivedWal.String)
196+
if err != nil {
197+
return fmt.Errorf("parse archived WAL file %q: %w", lastArchivedWal.String, err)
198+
}
199+
200+
// Parse current LSN from PostgreSQL LSN string format
201+
currentLSNBytes, err := parseLSNFromLSNString(currentLSN.String)
202+
if err != nil {
203+
return fmt.Errorf("parse current LSN %q: %w", currentLSN.String, err)
204+
}
205+
206+
// Calculate lag
207+
lagBytes := bytesBetweenLSN(currentLSNBytes, archivedLSN)
208+
209+
ch <- prometheus.MustNewConstMetric(
210+
statArchiverLagBytesDesc,
211+
prometheus.GaugeValue,
212+
float64(lagBytes),
213+
)
214+
215+
return nil
216+
}

0 commit comments

Comments
 (0)