diff --git a/lib/uffdpager/VERSION b/lib/uffdpager/VERSION index b1e80bb2..845639ee 100644 --- a/lib/uffdpager/VERSION +++ b/lib/uffdpager/VERSION @@ -1 +1 @@ -0.1.3 +0.1.4 diff --git a/lib/uffdpager/metrics_linux.go b/lib/uffdpager/metrics_linux.go new file mode 100644 index 00000000..f17dcb39 --- /dev/null +++ b/lib/uffdpager/metrics_linux.go @@ -0,0 +1,271 @@ +//go:build linux + +package uffdpager + +import ( + "context" + "fmt" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +// RegisterMetrics creates observable pager instruments whose callbacks read +// statsFn on each collection cycle. Returns nil when the meter or statsFn is +// nil so callers can wire this in unconditionally. +func RegisterMetrics(meter metric.Meter, versionKey string, statsFn func() Stats) error { + if meter == nil || statsFn == nil { + return nil + } + + cacheHits, err := meter.Int64ObservableCounter( + "hypeman_uffd_cache_hits_total", + metric.WithDescription("UFFD page cache hits"), + ) + if err != nil { + return fmt.Errorf("create cache hits counter: %w", err) + } + cacheMisses, err := meter.Int64ObservableCounter( + "hypeman_uffd_cache_misses_total", + metric.WithDescription("UFFD page cache misses"), + ) + if err != nil { + return fmt.Errorf("create cache misses counter: %w", err) + } + faults, err := meter.Int64ObservableCounter( + "hypeman_uffd_faults_total", + metric.WithDescription("UFFD page faults handled"), + ) + if err != nil { + return fmt.Errorf("create faults counter: %w", err) + } + backingBytesRead, err := meter.Int64ObservableCounter( + "hypeman_uffd_backing_bytes_read_total", + metric.WithDescription("Bytes read from the backing memory file"), + metric.WithUnit("By"), + ) + if err != nil { + return fmt.Errorf("create backing bytes read counter: %w", err) + } + copies, err := meter.Int64ObservableCounter( + "hypeman_uffd_copies_total", + metric.WithDescription("UFFDIO_COPY operations issued"), + ) + if err != nil { + return fmt.Errorf("create copies counter: %w", err) + } + copyErrors, err := meter.Int64ObservableCounter( + "hypeman_uffd_copy_errors_total", + metric.WithDescription("UFFDIO_COPY operations that returned an error"), + ) + if err != nil { + return fmt.Errorf("create copy errors counter: %w", err) + } + cacheLookupNanos, err := meter.Int64ObservableCounter( + "hypeman_uffd_cache_lookup_nanos_total", + metric.WithDescription("Nanoseconds spent in cache lookups"), + metric.WithUnit("ns"), + ) + if err != nil { + return fmt.Errorf("create cache lookup nanos counter: %w", err) + } + cacheAddNanos, err := meter.Int64ObservableCounter( + "hypeman_uffd_cache_add_nanos_total", + metric.WithDescription("Nanoseconds spent inserting cache entries"), + metric.WithUnit("ns"), + ) + if err != nil { + return fmt.Errorf("create cache add nanos counter: %w", err) + } + faultNanos, err := meter.Int64ObservableCounter( + "hypeman_uffd_fault_nanos_total", + metric.WithDescription("Nanoseconds spent handling faults end-to-end"), + metric.WithUnit("ns"), + ) + if err != nil { + return fmt.Errorf("create fault nanos counter: %w", err) + } + readPageNanos, err := meter.Int64ObservableCounter( + "hypeman_uffd_read_page_nanos_total", + metric.WithDescription("Nanoseconds spent reading pages"), + metric.WithUnit("ns"), + ) + if err != nil { + return fmt.Errorf("create read page nanos counter: %w", err) + } + backingReadNanos, err := meter.Int64ObservableCounter( + "hypeman_uffd_backing_read_nanos_total", + metric.WithDescription("Nanoseconds spent reading from the backing file"), + metric.WithUnit("ns"), + ) + if err != nil { + return fmt.Errorf("create backing read nanos counter: %w", err) + } + copyNanos, err := meter.Int64ObservableCounter( + "hypeman_uffd_copy_nanos_total", + metric.WithDescription("Nanoseconds spent in UFFDIO_COPY calls"), + metric.WithUnit("ns"), + ) + if err != nil { + return fmt.Errorf("create copy nanos counter: %w", err) + } + + cacheBytes, err := meter.Int64ObservableGauge( + "hypeman_uffd_cache_bytes", + metric.WithDescription("Current UFFD page cache size"), + metric.WithUnit("By"), + ) + if err != nil { + return fmt.Errorf("create cache bytes gauge: %w", err) + } + cacheMaxBytes, err := meter.Int64ObservableGauge( + "hypeman_uffd_cache_max_bytes", + metric.WithDescription("Configured UFFD page cache capacity"), + metric.WithUnit("By"), + ) + if err != nil { + return fmt.Errorf("create cache max bytes gauge: %w", err) + } + cacheItems, err := meter.Int64ObservableGauge( + "hypeman_uffd_cache_items", + metric.WithDescription("Pages currently held in the UFFD cache"), + ) + if err != nil { + return fmt.Errorf("create cache items gauge: %w", err) + } + cacheShards, err := meter.Int64ObservableGauge( + "hypeman_uffd_cache_shards", + metric.WithDescription("UFFD page cache shard count"), + ) + if err != nil { + return fmt.Errorf("create cache shards gauge: %w", err) + } + activeSessions, err := meter.Int64ObservableGauge( + "hypeman_uffd_active_sessions", + metric.WithDescription("Active pager sessions"), + ) + if err != nil { + return fmt.Errorf("create active sessions gauge: %w", err) + } + activeFaults, err := meter.Int64ObservableGauge( + "hypeman_uffd_active_faults", + metric.WithDescription("Faults currently in flight"), + ) + if err != nil { + return fmt.Errorf("create active faults gauge: %w", err) + } + maxConcurrentFaults, err := meter.Int64ObservableGauge( + "hypeman_uffd_max_concurrent_faults", + metric.WithDescription("High-water mark of concurrent in-flight faults"), + ) + if err != nil { + return fmt.Errorf("create max concurrent faults gauge: %w", err) + } + draining, err := meter.Int64ObservableGauge( + "hypeman_uffd_draining", + metric.WithDescription("1 if the pager is draining, 0 otherwise"), + ) + if err != nil { + return fmt.Errorf("create draining gauge: %w", err) + } + cacheLookupMaxNanos, err := meter.Int64ObservableGauge( + "hypeman_uffd_cache_lookup_max_nanos", + metric.WithDescription("Max cache lookup latency"), + metric.WithUnit("ns"), + ) + if err != nil { + return fmt.Errorf("create cache lookup max nanos gauge: %w", err) + } + cacheAddMaxNanos, err := meter.Int64ObservableGauge( + "hypeman_uffd_cache_add_max_nanos", + metric.WithDescription("Max cache add latency"), + metric.WithUnit("ns"), + ) + if err != nil { + return fmt.Errorf("create cache add max nanos gauge: %w", err) + } + faultMaxNanos, err := meter.Int64ObservableGauge( + "hypeman_uffd_fault_max_nanos", + metric.WithDescription("Max fault handling latency"), + metric.WithUnit("ns"), + ) + if err != nil { + return fmt.Errorf("create fault max nanos gauge: %w", err) + } + readPageMaxNanos, err := meter.Int64ObservableGauge( + "hypeman_uffd_read_page_max_nanos", + metric.WithDescription("Max page read latency"), + metric.WithUnit("ns"), + ) + if err != nil { + return fmt.Errorf("create read page max nanos gauge: %w", err) + } + backingReadMaxNanos, err := meter.Int64ObservableGauge( + "hypeman_uffd_backing_read_max_nanos", + metric.WithDescription("Max backing file read latency"), + metric.WithUnit("ns"), + ) + if err != nil { + return fmt.Errorf("create backing read max nanos gauge: %w", err) + } + copyMaxNanos, err := meter.Int64ObservableGauge( + "hypeman_uffd_copy_max_nanos", + metric.WithDescription("Max UFFDIO_COPY latency"), + metric.WithUnit("ns"), + ) + if err != nil { + return fmt.Errorf("create copy max nanos gauge: %w", err) + } + + attrs := metric.WithAttributes(attribute.String("version_key", versionKey)) + + _, err = meter.RegisterCallback( + func(_ context.Context, o metric.Observer) error { + s := statsFn() + o.ObserveInt64(cacheHits, s.CacheHits, attrs) + o.ObserveInt64(cacheMisses, s.CacheMisses, attrs) + o.ObserveInt64(faults, s.Faults, attrs) + o.ObserveInt64(backingBytesRead, s.BackingBytesRead, attrs) + o.ObserveInt64(copies, s.Copies, attrs) + o.ObserveInt64(copyErrors, s.CopyErrors, attrs) + o.ObserveInt64(cacheLookupNanos, s.CacheLookupNanos, attrs) + o.ObserveInt64(cacheAddNanos, s.CacheAddNanos, attrs) + o.ObserveInt64(faultNanos, s.FaultNanos, attrs) + o.ObserveInt64(readPageNanos, s.ReadPageNanos, attrs) + o.ObserveInt64(backingReadNanos, s.BackingReadNanos, attrs) + o.ObserveInt64(copyNanos, s.CopyNanos, attrs) + + o.ObserveInt64(cacheBytes, s.CacheBytes, attrs) + o.ObserveInt64(cacheMaxBytes, s.CacheMax, attrs) + o.ObserveInt64(cacheItems, int64(s.CacheItems), attrs) + o.ObserveInt64(cacheShards, int64(s.CacheShards), attrs) + o.ObserveInt64(activeSessions, int64(s.ActiveSessions), attrs) + o.ObserveInt64(activeFaults, s.ActiveFaults, attrs) + o.ObserveInt64(maxConcurrentFaults, s.MaxConcurrentFaults, attrs) + o.ObserveInt64(draining, boolToInt64(s.Draining), attrs) + o.ObserveInt64(cacheLookupMaxNanos, s.CacheLookupMaxNanos, attrs) + o.ObserveInt64(cacheAddMaxNanos, s.CacheAddMaxNanos, attrs) + o.ObserveInt64(faultMaxNanos, s.FaultMaxNanos, attrs) + o.ObserveInt64(readPageMaxNanos, s.ReadPageMaxNanos, attrs) + o.ObserveInt64(backingReadMaxNanos, s.BackingReadMaxNanos, attrs) + o.ObserveInt64(copyMaxNanos, s.CopyMaxNanos, attrs) + return nil + }, + cacheHits, cacheMisses, faults, backingBytesRead, copies, copyErrors, + cacheLookupNanos, cacheAddNanos, faultNanos, readPageNanos, backingReadNanos, copyNanos, + cacheBytes, cacheMaxBytes, cacheItems, cacheShards, activeSessions, activeFaults, + maxConcurrentFaults, draining, + cacheLookupMaxNanos, cacheAddMaxNanos, faultMaxNanos, readPageMaxNanos, backingReadMaxNanos, copyMaxNanos, + ) + if err != nil { + return fmt.Errorf("register uffd metrics callback: %w", err) + } + return nil +} + +func boolToInt64(v bool) int64 { + if v { + return 1 + } + return 0 +} diff --git a/lib/uffdpager/metrics_linux_test.go b/lib/uffdpager/metrics_linux_test.go new file mode 100644 index 00000000..1576bf4f --- /dev/null +++ b/lib/uffdpager/metrics_linux_test.go @@ -0,0 +1,86 @@ +//go:build linux + +package uffdpager + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + otelmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" +) + +func TestRegisterMetricsObservesStats(t *testing.T) { + t.Parallel() + + reader := otelmetric.NewManualReader() + provider := otelmetric.NewMeterProvider(otelmetric.WithReader(reader)) + meter := provider.Meter("hypeman-uffd-pager-test") + + stats := Stats{ + Version: "0.1.2", + Draining: true, + ActiveSessions: 3, + CacheBytes: 1024, + CacheMax: 4096, + CacheItems: 8, + CacheHits: 100, + CacheMisses: 5, + CacheShards: 4, + CacheLookupNanos: 2000, + CacheLookupMaxNanos: 500, + CacheAddNanos: 3000, + CacheAddMaxNanos: 900, + Faults: 42, + BackingBytesRead: 16384, + Copies: 41, + CopyErrors: 1, + ActiveFaults: 2, + MaxConcurrentFaults: 7, + FaultNanos: 9999, + FaultMaxNanos: 333, + } + require.NoError(t, RegisterMetrics(meter, "0.1.2", func() Stats { return stats })) + + var got metricdata.ResourceMetrics + require.NoError(t, reader.Collect(context.Background(), &got)) + + values := collectInt64Values(t, got) + require.Equal(t, int64(100), values["hypeman_uffd_cache_hits_total"]) + require.Equal(t, int64(5), values["hypeman_uffd_cache_misses_total"]) + require.Equal(t, int64(42), values["hypeman_uffd_faults_total"]) + require.Equal(t, int64(1024), values["hypeman_uffd_cache_bytes"]) + require.Equal(t, int64(4096), values["hypeman_uffd_cache_max_bytes"]) + require.Equal(t, int64(8), values["hypeman_uffd_cache_items"]) + require.Equal(t, int64(3), values["hypeman_uffd_active_sessions"]) + require.Equal(t, int64(7), values["hypeman_uffd_max_concurrent_faults"]) + require.Equal(t, int64(1), values["hypeman_uffd_draining"]) + require.Equal(t, int64(500), values["hypeman_uffd_cache_lookup_max_nanos"]) +} + +func TestRegisterMetricsNilMeter(t *testing.T) { + t.Parallel() + + require.NoError(t, RegisterMetrics(nil, "0.1.2", func() Stats { return Stats{} })) +} + +func collectInt64Values(t *testing.T, rm metricdata.ResourceMetrics) map[string]int64 { + t.Helper() + out := make(map[string]int64) + for _, sm := range rm.ScopeMetrics { + for _, m := range sm.Metrics { + switch d := m.Data.(type) { + case metricdata.Sum[int64]: + for _, dp := range d.DataPoints { + out[m.Name] = dp.Value + } + case metricdata.Gauge[int64]: + for _, dp := range d.DataPoints { + out[m.Name] = dp.Value + } + } + } + } + return out +} diff --git a/lib/uffdpager/server_handlers_linux.go b/lib/uffdpager/server_handlers_linux.go index f2d604dd..fcc81cec 100644 --- a/lib/uffdpager/server_handlers_linux.go +++ b/lib/uffdpager/server_handlers_linux.go @@ -21,9 +21,13 @@ func (s *server) handleHealth(w http.ResponseWriter, r *http.Request) { } func (s *server) handleStats(w http.ResponseWriter, r *http.Request) { + s.writeJSON(w, http.StatusOK, s.stats()) +} + +func (s *server) stats() Stats { cacheBytes, cacheMax, cacheItems, hits, misses := s.cache.SnapshotStats() cacheShards, cacheLookupNanos, cacheLookupMaxNanos, cacheAddNanos, cacheAddMaxNanos := s.cache.SnapshotTimingStats() - s.writeJSON(w, http.StatusOK, Stats{ + return Stats{ Version: s.versionKey, Draining: s.isDraining(), ActiveSessions: s.activeSessions(), @@ -51,7 +55,7 @@ func (s *server) handleStats(w http.ResponseWriter, r *http.Request) { BackingReadMaxNanos: s.backingReadMaxNanos.Load(), CopyNanos: s.copyNanos.Load(), CopyMaxNanos: s.copyMaxNanos.Load(), - }) + } } func (s *server) handleCreateSession(w http.ResponseWriter, r *http.Request) { diff --git a/lib/uffdpager/server_linux.go b/lib/uffdpager/server_linux.go index d007ee6a..7252c775 100644 --- a/lib/uffdpager/server_linux.go +++ b/lib/uffdpager/server_linux.go @@ -3,9 +3,11 @@ package uffdpager import ( + "context" "errors" "flag" "fmt" + "log/slog" "net" "net/http" "os" @@ -13,8 +15,10 @@ import ( "strings" "sync" "sync/atomic" + "time" "github.com/go-chi/chi/v5" + hypotel "github.com/kernel/hypeman/lib/otel" ) type server struct { @@ -66,6 +70,10 @@ func Main(args []string) error { dataDir := fs.String("data-dir", "", "hypeman data directory") versionKey := fs.String("version-key", "", "pager version key") cacheMaxBytes := fs.Int64("cache-max-bytes", defaultCacheMaxBytes, "maximum shared page cache bytes") + metricsAddr := fs.String("metrics-addr", "", "prometheus /metrics listen address (empty disables)") + otelEndpoint := fs.String("otel-endpoint", "", "OTLP push endpoint (empty disables push)") + otelInsecure := fs.Bool("otel-insecure", true, "use insecure transport for OTLP push") + metricExportInterval := fs.String("otel-metric-export-interval", "", "OTLP metric export interval, e.g. 30s") if err := fs.Parse(args); err != nil { return err } @@ -77,9 +85,70 @@ func Main(args []string) error { } s := newServer(*dataDir, *versionKey, *cacheMaxBytes) + + metricsShutdown, err := s.startMetrics(*metricsAddr, *otelEndpoint, *otelInsecure, *metricExportInterval) + if err != nil { + return fmt.Errorf("start metrics: %w", err) + } + defer metricsShutdown() + return s.run() } +func (s *server) startMetrics(metricsAddr, otelEndpoint string, otelInsecure bool, metricExportInterval string) (func(), error) { + if strings.TrimSpace(metricsAddr) == "" && strings.TrimSpace(otelEndpoint) == "" { + return func() {}, nil + } + + ctx := context.Background() + provider, otelShutdown, err := hypotel.Init(ctx, hypotel.Config{ + Enabled: strings.TrimSpace(otelEndpoint) != "", + Endpoint: otelEndpoint, + Insecure: otelInsecure, + ServiceName: "hypeman-uffd-pager", + ServiceInstanceID: s.versionKey, + MetricExportInterval: metricExportInterval, + }) + if err != nil { + return nil, fmt.Errorf("initialize telemetry: %w", err) + } + + if err := RegisterMetrics(provider.MeterFor("hypeman-uffd-pager"), s.versionKey, s.stats); err != nil { + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _ = otelShutdown(shutdownCtx) + return nil, fmt.Errorf("register uffd metrics: %w", err) + } + + var metricsSrv *http.Server + if strings.TrimSpace(metricsAddr) != "" { + mux := http.NewServeMux() + mux.Handle("/metrics", provider.MetricsHandler) + metricsSrv = &http.Server{Addr: metricsAddr, Handler: mux} + go func() { + slog.Info("serving uffd pager metrics", "addr", metricsAddr, "path", "/metrics", "version_key", s.versionKey) + if err := metricsSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + slog.Error("uffd metrics server error", "error", err) + } + }() + } + + return func() { + if metricsSrv != nil { + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := metricsSrv.Shutdown(shutdownCtx); err != nil && !errors.Is(err, http.ErrServerClosed) { + slog.Warn("uffd metrics server shutdown error", "error", err) + } + } + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := otelShutdown(shutdownCtx); err != nil { + slog.Warn("uffd otel shutdown error", "error", err) + } + }, nil +} + func newServer(dataDir, versionKey string, cacheMaxBytes int64) *server { dir := pagerVersionDir(dataDir, versionKey) return &server{ diff --git a/scripts/install.sh b/scripts/install.sh index c6d8013b..f43dd4e4 100755 --- a/scripts/install.sh +++ b/scripts/install.sh @@ -623,8 +623,9 @@ Environment="HYPEMAN_UFFD_BINARY=${INSTALL_DIR}/${UFFD_PAGER_BINARY_NAME}" Environment="HYPEMAN_UFFD_DATA_DIR=${DATA_DIR}" Environment="HYPEMAN_UFFD_VERSION_KEY=%i" Environment="HYPEMAN_UFFD_CACHE_MAX_BYTES=4294967296" +Environment="HYPEMAN_UFFD_METRICS_ADDR=" EnvironmentFile=-/run/hypeman/uffd/%i.env -ExecStart=/bin/sh -c 'exec "\${HYPEMAN_UFFD_BINARY}" --data-dir "\${HYPEMAN_UFFD_DATA_DIR}" --version-key "\${HYPEMAN_UFFD_VERSION_KEY}" --cache-max-bytes "\${HYPEMAN_UFFD_CACHE_MAX_BYTES}"' +ExecStart=/bin/sh -c 'exec "\${HYPEMAN_UFFD_BINARY}" --data-dir "\${HYPEMAN_UFFD_DATA_DIR}" --version-key "\${HYPEMAN_UFFD_VERSION_KEY}" --cache-max-bytes "\${HYPEMAN_UFFD_CACHE_MAX_BYTES}" --metrics-addr "\${HYPEMAN_UFFD_METRICS_ADDR}"' Restart=on-failure RestartSec=5 KillMode=process