Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/uffdpager/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.1.3
0.1.4
271 changes: 271 additions & 0 deletions lib/uffdpager/metrics_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
//go:build linux

package uffdpager

import (
"context"
"fmt"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

// RegisterMetrics creates the pager's observable instruments on meter and
// registers one callback that reports all of them.
//
// On every collection cycle the callback calls statsFn exactly once and
// reports each instrument from that single snapshot, tagged with a
// "version_key" attribute set to versionKey.
//
// It returns nil without creating anything when meter or statsFn is nil, so
// callers can wire this in unconditionally. Otherwise it returns the first
// error hit while creating an instrument or registering the callback, wrapped
// with the name of the step that failed.
//
// The registration handle returned by the meter is discarded, so this function
// gives callers no way to unregister the callback later.
func RegisterMetrics(meter metric.Meter, versionKey string, statsFn func() Stats) error {
	if meter == nil || statsFn == nil {
		return nil
	}

	// spec is one row of the instrument tables below. Keeping the metric
	// definition and the Stats field that feeds it side by side means adding a
	// metric is a single new row instead of three separate edits (create,
	// observe, register).
	type spec struct {
		// name is the exported metric name.
		name string
		// desc is the instrument description.
		desc string
		// unit is the instrument unit; when empty no unit option is passed.
		unit string
		// label is the noun used in the "create ..." error message.
		label string
		// read extracts this instrument's value from a stats snapshot.
		read func(s *Stats) int64
	}

	// Monotonic totals, exposed as observable counters.
	counters := []spec{
		{"hypeman_uffd_cache_hits_total", "UFFD page cache hits", "", "cache hits", func(s *Stats) int64 { return s.CacheHits }},
		{"hypeman_uffd_cache_misses_total", "UFFD page cache misses", "", "cache misses", func(s *Stats) int64 { return s.CacheMisses }},
		{"hypeman_uffd_faults_total", "UFFD page faults handled", "", "faults", func(s *Stats) int64 { return s.Faults }},
		{"hypeman_uffd_backing_bytes_read_total", "Bytes read from the backing memory file", "By", "backing bytes read", func(s *Stats) int64 { return s.BackingBytesRead }},
		{"hypeman_uffd_copies_total", "UFFDIO_COPY operations issued", "", "copies", func(s *Stats) int64 { return s.Copies }},
		{"hypeman_uffd_copy_errors_total", "UFFDIO_COPY operations that returned an error", "", "copy errors", func(s *Stats) int64 { return s.CopyErrors }},
		{"hypeman_uffd_cache_lookup_nanos_total", "Nanoseconds spent in cache lookups", "ns", "cache lookup nanos", func(s *Stats) int64 { return s.CacheLookupNanos }},
		{"hypeman_uffd_cache_add_nanos_total", "Nanoseconds spent inserting cache entries", "ns", "cache add nanos", func(s *Stats) int64 { return s.CacheAddNanos }},
		{"hypeman_uffd_fault_nanos_total", "Nanoseconds spent handling faults end-to-end", "ns", "fault nanos", func(s *Stats) int64 { return s.FaultNanos }},
		{"hypeman_uffd_read_page_nanos_total", "Nanoseconds spent reading pages", "ns", "read page nanos", func(s *Stats) int64 { return s.ReadPageNanos }},
		{"hypeman_uffd_backing_read_nanos_total", "Nanoseconds spent reading from the backing file", "ns", "backing read nanos", func(s *Stats) int64 { return s.BackingReadNanos }},
		{"hypeman_uffd_copy_nanos_total", "Nanoseconds spent in UFFDIO_COPY calls", "ns", "copy nanos", func(s *Stats) int64 { return s.CopyNanos }},
	}

	// Point-in-time readings, exposed as observable gauges.
	gauges := []spec{
		{"hypeman_uffd_cache_bytes", "Current UFFD page cache size", "By", "cache bytes", func(s *Stats) int64 { return s.CacheBytes }},
		{"hypeman_uffd_cache_max_bytes", "Configured UFFD page cache capacity", "By", "cache max bytes", func(s *Stats) int64 { return s.CacheMax }},
		{"hypeman_uffd_cache_items", "Pages currently held in the UFFD cache", "", "cache items", func(s *Stats) int64 { return int64(s.CacheItems) }},
		{"hypeman_uffd_cache_shards", "UFFD page cache shard count", "", "cache shards", func(s *Stats) int64 { return int64(s.CacheShards) }},
		{"hypeman_uffd_active_sessions", "Active pager sessions", "", "active sessions", func(s *Stats) int64 { return int64(s.ActiveSessions) }},
		{"hypeman_uffd_active_faults", "Faults currently in flight", "", "active faults", func(s *Stats) int64 { return s.ActiveFaults }},
		{"hypeman_uffd_max_concurrent_faults", "High-water mark of concurrent in-flight faults", "", "max concurrent faults", func(s *Stats) int64 { return s.MaxConcurrentFaults }},
		{"hypeman_uffd_draining", "1 if the pager is draining, 0 otherwise", "", "draining", func(s *Stats) int64 { return boolToInt64(s.Draining) }},
		{"hypeman_uffd_cache_lookup_max_nanos", "Max cache lookup latency", "ns", "cache lookup max nanos", func(s *Stats) int64 { return s.CacheLookupMaxNanos }},
		{"hypeman_uffd_cache_add_max_nanos", "Max cache add latency", "ns", "cache add max nanos", func(s *Stats) int64 { return s.CacheAddMaxNanos }},
		{"hypeman_uffd_fault_max_nanos", "Max fault handling latency", "ns", "fault max nanos", func(s *Stats) int64 { return s.FaultMaxNanos }},
		{"hypeman_uffd_read_page_max_nanos", "Max page read latency", "ns", "read page max nanos", func(s *Stats) int64 { return s.ReadPageMaxNanos }},
		{"hypeman_uffd_backing_read_max_nanos", "Max backing file read latency", "ns", "backing read max nanos", func(s *Stats) int64 { return s.BackingReadMaxNanos }},
		{"hypeman_uffd_copy_max_nanos", "Max UFFDIO_COPY latency", "ns", "copy max nanos", func(s *Stats) int64 { return s.CopyMaxNanos }},
	}

	// reading pairs a created instrument with the accessor that feeds it.
	type reading struct {
		inst metric.Int64Observable
		read func(s *Stats) int64
	}
	total := len(counters) + len(gauges)
	readings := make([]reading, 0, total)
	// observed is the same set of instruments in the form RegisterCallback
	// expects; the callback may only report instruments listed here.
	observed := make([]metric.Observable, 0, total)

	for _, c := range counters {
		opts := []metric.Int64ObservableCounterOption{metric.WithDescription(c.desc)}
		if c.unit != "" {
			opts = append(opts, metric.WithUnit(c.unit))
		}
		inst, err := meter.Int64ObservableCounter(c.name, opts...)
		if err != nil {
			return fmt.Errorf("create %s counter: %w", c.label, err)
		}
		readings = append(readings, reading{inst: inst, read: c.read})
		observed = append(observed, inst)
	}

	for _, g := range gauges {
		opts := []metric.Int64ObservableGaugeOption{metric.WithDescription(g.desc)}
		if g.unit != "" {
			opts = append(opts, metric.WithUnit(g.unit))
		}
		inst, err := meter.Int64ObservableGauge(g.name, opts...)
		if err != nil {
			return fmt.Errorf("create %s gauge: %w", g.label, err)
		}
		readings = append(readings, reading{inst: inst, read: g.read})
		observed = append(observed, inst)
	}

	attrs := metric.WithAttributes(attribute.String("version_key", versionKey))

	_, err := meter.RegisterCallback(
		func(_ context.Context, o metric.Observer) error {
			// Take one snapshot per collection so every instrument in this
			// cycle is reported from the same statsFn result.
			s := statsFn()
			for _, r := range readings {
				o.ObserveInt64(r.inst, r.read(&s), attrs)
			}
			return nil
		},
		observed...,
	)
	if err != nil {
		return fmt.Errorf("register uffd metrics callback: %w", err)
	}
	return nil
}

// boolToInt64 maps a boolean onto the 0/1 encoding used for flag-style
// gauges: true becomes 1 and false becomes 0.
func boolToInt64(v bool) int64 {
	var n int64
	if v {
		n = 1
	}
	return n
}
86 changes: 86 additions & 0 deletions lib/uffdpager/metrics_linux_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
//go:build linux

package uffdpager

import (
"context"
"testing"

"github.com/stretchr/testify/require"
otelmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// TestRegisterMetricsObservesStats registers the pager metrics against an
// in-memory SDK reader, runs one collection, and checks that every instrument
// is exported with the value taken from the stats snapshot.
func TestRegisterMetricsObservesStats(t *testing.T) {
	t.Parallel()

	reader := otelmetric.NewManualReader()
	provider := otelmetric.NewMeterProvider(otelmetric.WithReader(reader))
	meter := provider.Meter("hypeman-uffd-pager-test")

	// Every field gets a distinct non-zero value so that a metric wired to the
	// wrong Stats field shows up as a mismatch rather than an accidental match.
	stats := Stats{
		Version:             "0.1.2",
		Draining:            true,
		ActiveSessions:      3,
		CacheBytes:          1024,
		CacheMax:            4096,
		CacheItems:          8,
		CacheHits:           100,
		CacheMisses:         5,
		CacheShards:         4,
		CacheLookupNanos:    2000,
		CacheLookupMaxNanos: 500,
		CacheAddNanos:       3000,
		CacheAddMaxNanos:    900,
		Faults:              42,
		BackingBytesRead:    16384,
		Copies:              41,
		CopyErrors:          1,
		ActiveFaults:        2,
		MaxConcurrentFaults: 7,
		FaultNanos:          9999,
		FaultMaxNanos:       333,
		ReadPageNanos:       7000,
		ReadPageMaxNanos:    700,
		BackingReadNanos:    6000,
		BackingReadMaxNanos: 600,
		CopyNanos:           5000,
		CopyMaxNanos:        400,
	}
	require.NoError(t, RegisterMetrics(meter, "0.1.2", func() Stats { return stats }))

	var got metricdata.ResourceMetrics
	require.NoError(t, reader.Collect(context.Background(), &got))

	want := map[string]int64{
		"hypeman_uffd_cache_hits_total":         100,
		"hypeman_uffd_cache_misses_total":       5,
		"hypeman_uffd_faults_total":             42,
		"hypeman_uffd_backing_bytes_read_total": 16384,
		"hypeman_uffd_copies_total":             41,
		"hypeman_uffd_copy_errors_total":        1,
		"hypeman_uffd_cache_lookup_nanos_total": 2000,
		"hypeman_uffd_cache_add_nanos_total":    3000,
		"hypeman_uffd_fault_nanos_total":        9999,
		"hypeman_uffd_read_page_nanos_total":    7000,
		"hypeman_uffd_backing_read_nanos_total": 6000,
		"hypeman_uffd_copy_nanos_total":         5000,

		"hypeman_uffd_cache_bytes":            1024,
		"hypeman_uffd_cache_max_bytes":        4096,
		"hypeman_uffd_cache_items":            8,
		"hypeman_uffd_cache_shards":           4,
		"hypeman_uffd_active_sessions":        3,
		"hypeman_uffd_active_faults":          2,
		"hypeman_uffd_max_concurrent_faults":  7,
		"hypeman_uffd_draining":               1,
		"hypeman_uffd_cache_lookup_max_nanos": 500,
		"hypeman_uffd_cache_add_max_nanos":    900,
		"hypeman_uffd_fault_max_nanos":        333,
		"hypeman_uffd_read_page_max_nanos":    700,
		"hypeman_uffd_backing_read_max_nanos": 600,
		"hypeman_uffd_copy_max_nanos":         400,
	}

	values := collectInt64Values(t, got)
	for name, expected := range want {
		// Assert presence first: indexing a map with a missing key yields 0,
		// which would otherwise hide an instrument that was never exported.
		require.Contains(t, values, name)
		require.Equal(t, expected, values[name], "metric %s", name)
	}
}

func TestRegisterMetricsNilMeter(t *testing.T) {
t.Parallel()

require.NoError(t, RegisterMetrics(nil, "0.1.2", func() Stats { return Stats{} }))
}

// collectInt64Values flattens a collected metrics snapshot into a map from
// metric name to int64 value. Only int64 sums and int64 gauges are read; any
// other data type is ignored. If a metric carries several data points, the
// last one visited overwrites the earlier ones.
func collectInt64Values(t *testing.T, rm metricdata.ResourceMetrics) map[string]int64 {
	t.Helper()

	values := map[string]int64{}
	// record stores each point's value under name, so a later point replaces
	// an earlier one.
	record := func(name string, points []metricdata.DataPoint[int64]) {
		for i := range points {
			values[name] = points[i].Value
		}
	}

	for _, scope := range rm.ScopeMetrics {
		for _, m := range scope.Metrics {
			switch data := m.Data.(type) {
			case metricdata.Sum[int64]:
				record(m.Name, data.DataPoints)
			case metricdata.Gauge[int64]:
				record(m.Name, data.DataPoints)
			}
		}
	}
	return values
}
8 changes: 6 additions & 2 deletions lib/uffdpager/server_handlers_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@ func (s *server) handleHealth(w http.ResponseWriter, r *http.Request) {
}

// handleStats responds with the server's current stats snapshot encoded as
// JSON and an HTTP 200 status. The request itself is not inspected.
func (s *server) handleStats(w http.ResponseWriter, r *http.Request) {
	snapshot := s.stats()
	s.writeJSON(w, http.StatusOK, snapshot)
}

func (s *server) stats() Stats {
cacheBytes, cacheMax, cacheItems, hits, misses := s.cache.SnapshotStats()
cacheShards, cacheLookupNanos, cacheLookupMaxNanos, cacheAddNanos, cacheAddMaxNanos := s.cache.SnapshotTimingStats()
s.writeJSON(w, http.StatusOK, Stats{
return Stats{
Version: s.versionKey,
Draining: s.isDraining(),
ActiveSessions: s.activeSessions(),
Expand Down Expand Up @@ -51,7 +55,7 @@ func (s *server) handleStats(w http.ResponseWriter, r *http.Request) {
BackingReadMaxNanos: s.backingReadMaxNanos.Load(),
CopyNanos: s.copyNanos.Load(),
CopyMaxNanos: s.copyMaxNanos.Load(),
})
}
}

func (s *server) handleCreateSession(w http.ResponseWriter, r *http.Request) {
Expand Down
Loading
Loading