From 6bc1da53f6ed9736d7cd9f10d01bb8def59582ea Mon Sep 17 00:00:00 2001 From: Calin Martinconi Date: Sat, 13 Jun 2026 11:39:18 +0300 Subject: [PATCH 1/8] feat(tracing): enrich OTel resource, compress OTLP, annotate spans --- pkg/node/node.go | 31 +++++++--- pkg/pingpong/pingpong.go | 3 + pkg/storer/netstore.go | 2 + pkg/tracing/export_test.go | 5 +- pkg/tracing/resource_test.go | 108 +++++++++++++++++++++++++++++++++++ pkg/tracing/tracing.go | 46 +++++++++++++-- 6 files changed, 182 insertions(+), 13 deletions(-) create mode 100644 pkg/tracing/resource_test.go diff --git a/pkg/node/node.go b/pkg/node/node.go index afcf55e3cf3..221ed219f7a 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -26,6 +26,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + bee "github.com/ethersphere/bee/v2" "github.com/ethersphere/bee/v2/pkg/accesscontrol" "github.com/ethersphere/bee/v2/pkg/accounting" "github.com/ethersphere/bee/v2/pkg/addressbook" @@ -219,6 +220,20 @@ const ( maxAllowedDoubling = 1 ) +// tracingEnvironment maps a Swarm network id to the deployment.environment +// resource attribute reported on traces, so spans from different networks are +// distinguishable in an OTLP backend. Unknown ids are reported as "private". +func tracingEnvironment(networkID uint64) string { + switch networkID { + case config.Mainnet.NetworkID: + return "mainnet" + case config.Testnet.NetworkID: + return "testnet" + default: + return "private" + } +} + func NewBee( ctx context.Context, addr string, @@ -238,13 +253,15 @@ func NewBee( nodeMetrics := newMetrics() tracer, tracerCloser, err := tracing.NewTracer(&tracing.Options{ - Enabled: o.TracingEnabled, - Endpoint: o.TracingEndpoint, - Insecure: o.TracingInsecure, - CAFile: o.TracingCAFile, - Protocol: o.TracingProtocol, - SamplingRatio: o.TracingSamplingRatio, - ServiceName: o.TracingServiceName, + Enabled: o.TracingEnabled, + Endpoint: o.TracingEndpoint, + Insecure: o.TracingInsecure, + CAFile: o.TracingCAFile, + Protocol: o.TracingProtocol, + SamplingRatio: o.TracingSamplingRatio, + ServiceName: o.TracingServiceName, + ServiceVersion: bee.Version, + Environment: tracingEnvironment(networkID), }) if err != nil { return nil, fmt.Errorf("tracer: %w", err) diff --git a/pkg/pingpong/pingpong.go b/pkg/pingpong/pingpong.go index 7c90fe04dac..858ba4d4f39 100644 --- a/pkg/pingpong/pingpong.go +++ b/pkg/pingpong/pingpong.go @@ -19,6 +19,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/pingpong/pb" "github.com/ethersphere/bee/v2/pkg/swarm" "github.com/ethersphere/bee/v2/pkg/tracing" + "go.opentelemetry.io/otel/attribute" ) // loggerName is the tree path name of the logger for this package. @@ -65,6 +66,7 @@ func (s *Service) Protocol() p2p.ProtocolSpec { func (s *Service) Ping(ctx context.Context, address swarm.Address, msgs ...string) (rtt time.Duration, err error) { span, _, ctx := s.tracer.StartSpanFromContext(ctx, "pingpong-p2p-ping", s.logger) + span.SetAttributes(attribute.String("peer_address", address.String())) defer span.End() start := time.Now() @@ -104,6 +106,7 @@ func (s *Service) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) er defer stream.FullClose() span, _, ctx := s.tracer.StartSpanFromContext(ctx, "pingpong-p2p-handler", s.logger) + span.SetAttributes(attribute.String("peer_address", p.Address.String())) defer span.End() var ping pb.Ping diff --git a/pkg/storer/netstore.go b/pkg/storer/netstore.go index 1ec9e199fcf..f9e43ae59c4 100644 --- a/pkg/storer/netstore.go +++ b/pkg/storer/netstore.go @@ -32,6 +32,7 @@ func (db *DB) DirectUpload() PutterSession { defer func() { <-db.directUploadLimiter }() span, logger, ctx := db.tracer.FollowSpanFromContext(ctx, "put-direct-upload", db.logger) + span.SetAttributes(attribute.String("address", ch.Address().String())) defer func() { if err != nil { tracing.RecordError(span, err) @@ -83,6 +84,7 @@ func (db *DB) Download(cache bool) storage.Getter { return getterWithMetrics{ storage.GetterFunc(func(ctx context.Context, address swarm.Address) (ch swarm.Chunk, err error) { span, logger, ctx := db.tracer.StartSpanFromContext(ctx, "get-chunk", db.logger) + span.SetAttributes(attribute.String("address", address.String())) defer func() { if err != nil { tracing.RecordError(span, err) diff --git a/pkg/tracing/export_test.go b/pkg/tracing/export_test.go index 6a1c05f3546..d39d46ca0fe 100644 --- a/pkg/tracing/export_test.go +++ b/pkg/tracing/export_test.go @@ -8,4 +8,7 @@ const ( LogField = logField ) -var LoadCAFile = loadCAFile +var ( + LoadCAFile = loadCAFile + NewResource = newResource +) diff --git a/pkg/tracing/resource_test.go b/pkg/tracing/resource_test.go new file mode 100644 index 00000000000..4aa1058dbb4 --- /dev/null +++ b/pkg/tracing/resource_test.go @@ -0,0 +1,108 @@ +// Copyright 2026 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package tracing_test + +import ( + "testing" + + "github.com/ethersphere/bee/v2/pkg/tracing" + "go.opentelemetry.io/otel/attribute" + semconv "go.opentelemetry.io/otel/semconv/v1.25.0" +) + +// attrValue returns the string value of key in res, or ("", false) when absent. +func attrValue(t *testing.T, res interface{ Set() *attribute.Set }, key attribute.Key) (string, bool) { + t.Helper() + + v, ok := res.Set().Value(key) + if !ok { + return "", false + } + return v.AsString(), true +} + +func TestNewResource(t *testing.T) { + // Not parallel: a subtest uses t.Setenv, which forbids a parallel parent. + // The parallel subtests below still pause until this function returns, by + // which point the env-mutating subtest has run inline and restored the env. + + t.Run("service name and version", func(t *testing.T) { + t.Parallel() + + res, err := tracing.NewResource(&tracing.Options{ + ServiceName: "bee", + ServiceVersion: "2.8.0-abcdef", + Environment: "mainnet", + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if got, ok := attrValue(t, res, semconv.ServiceNameKey); !ok || got != "bee" { + t.Errorf("service.name = %q (present=%v), want %q", got, ok, "bee") + } + if got, ok := attrValue(t, res, semconv.ServiceVersionKey); !ok || got != "2.8.0-abcdef" { + t.Errorf("service.version = %q (present=%v), want %q", got, ok, "2.8.0-abcdef") + } + if got, ok := attrValue(t, res, semconv.DeploymentEnvironmentKey); !ok || got != "mainnet" { + t.Errorf("deployment.environment = %q (present=%v), want %q", got, ok, "mainnet") + } + // WithTelemetrySDK must contribute the SDK identity attributes. + if _, ok := attrValue(t, res, semconv.TelemetrySDKNameKey); !ok { + t.Error("telemetry.sdk.name is absent, WithTelemetrySDK not applied") + } + // WithHost must contribute host.name so spans are attributable per node. + if _, ok := attrValue(t, res, semconv.HostNameKey); !ok { + t.Error("host.name is absent, WithHost not applied") + } + }) + + t.Run("empty environment omits attribute", func(t *testing.T) { + t.Parallel() + + res, err := tracing.NewResource(&tracing.Options{ServiceName: "bee"}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if got, ok := attrValue(t, res, semconv.DeploymentEnvironmentKey); ok { + t.Errorf("deployment.environment = %q, want it to be absent", got) + } + }) + + t.Run("empty version omits attribute", func(t *testing.T) { + t.Parallel() + + res, err := tracing.NewResource(&tracing.Options{ServiceName: "bee"}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if got, ok := attrValue(t, res, semconv.ServiceVersionKey); ok { + t.Errorf("service.version = %q, want it to be absent", got) + } + }) + + // Configured service name/version are applied after WithFromEnv, so they + // take precedence over a colliding OTEL_SERVICE_NAME while unrelated + // environment attributes still enrich the resource. This subtest mutates the + // process environment, so it must not run in parallel. + t.Run("configured values win over env, env enriches", func(t *testing.T) { + t.Setenv("OTEL_SERVICE_NAME", "from-env") + t.Setenv("OTEL_RESOURCE_ATTRIBUTES", "deployment.environment=testnet") + + res, err := tracing.NewResource(&tracing.Options{ServiceName: "configured"}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if got, ok := attrValue(t, res, semconv.ServiceNameKey); !ok || got != "configured" { + t.Errorf("service.name = %q (present=%v), want configured value to win", got, ok) + } + if got, ok := attrValue(t, res, semconv.DeploymentEnvironmentKey); !ok || got != "testnet" { + t.Errorf("deployment.environment = %q (present=%v), want %q from env", got, ok, "testnet") + } + }) +} diff --git a/pkg/tracing/tracing.go b/pkg/tracing/tracing.go index 51f48d2ba28..678c3185be5 100644 --- a/pkg/tracing/tracing.go +++ b/pkg/tracing/tracing.go @@ -96,6 +96,12 @@ type Options struct { Endpoint string // ServiceName is reported as the OTel service.name resource attribute. ServiceName string + // ServiceVersion is reported as the OTel service.version resource + // attribute. When empty the attribute is omitted. + ServiceVersion string + // Environment is reported as the OTel deployment.environment resource + // attribute (e.g. "mainnet", "testnet"). When empty the attribute is omitted. + Environment string // Insecure disables TLS for the OTLP exporter (useful for a local collector). Insecure bool // CAFile is an optional path to a PEM-encoded CA bundle used to verify @@ -126,9 +132,7 @@ func NewTracer(o *Options) (*Tracer, io.Closer, error) { return nil, nil, errors.New("tracing-otlp-endpoint is required when tracing is enabled") } - res, err := resource.New(context.Background(), - resource.WithAttributes(semconv.ServiceName(o.ServiceName)), - ) + res, err := newResource(o) if err != nil { return nil, nil, fmt.Errorf("otel resource: %w", err) } @@ -152,18 +156,47 @@ func NewTracer(o *Options) (*Tracer, io.Closer, error) { tp := sdktrace.NewTracerProvider( sdktrace.WithResource(res), sdktrace.WithSampler(sdktrace.ParentBased(sdktrace.TraceIDRatioBased(ratio))), + // The batch span processor keeps the SDK defaults (queue size 2048, + // 5s schedule). Operators can tune them via the standard OTEL_BSP_* + // environment variables without a bee flag or rebuild. sdktrace.WithBatcher(exporter), ) return &Tracer{tracer: tp.Tracer(instrumentationName)}, providerCloser{tp: tp}, nil } +// newResource builds the OTel resource describing this node. WithFromEnv, +// WithTelemetrySDK and WithHost are applied before WithAttributes so the +// explicitly configured service name/version/environment win over any colliding +// values from OTEL_RESOURCE_ATTRIBUTES/OTEL_SERVICE_NAME, while operators can +// still enrich the resource (e.g. cluster, region) via the environment. WithHost +// adds host.name so spans are attributable to a node in multi-node backends. +func newResource(o *Options) (*resource.Resource, error) { + attrs := []attribute.KeyValue{semconv.ServiceName(o.ServiceName)} + if o.ServiceVersion != "" { + attrs = append(attrs, semconv.ServiceVersion(o.ServiceVersion)) + } + if o.Environment != "" { + attrs = append(attrs, semconv.DeploymentEnvironment(o.Environment)) + } + + return resource.New(context.Background(), + resource.WithFromEnv(), + resource.WithTelemetrySDK(), + resource.WithHost(), + resource.WithAttributes(attrs...), + ) +} + // newOTLPClient builds the OTLP client for the configured transport. An empty // Protocol defaults to HTTP for backward compatibility with the initial OTLP // rollout. func newOTLPClient(o *Options) (otlptrace.Client, error) { switch o.Protocol { case "", protocolHTTP: - opts := []otlptracehttp.Option{otlptracehttp.WithEndpoint(o.Endpoint)} + opts := []otlptracehttp.Option{ + otlptracehttp.WithEndpoint(o.Endpoint), + otlptracehttp.WithCompression(otlptracehttp.GzipCompression), + } if o.Insecure { opts = append(opts, otlptracehttp.WithInsecure()) } else if o.CAFile != "" { @@ -175,7 +208,10 @@ func newOTLPClient(o *Options) (otlptrace.Client, error) { } return otlptracehttp.NewClient(opts...), nil case protocolGRPC: - opts := []otlptracegrpc.Option{otlptracegrpc.WithEndpoint(o.Endpoint)} + opts := []otlptracegrpc.Option{ + otlptracegrpc.WithEndpoint(o.Endpoint), + otlptracegrpc.WithCompressor("gzip"), + } if o.Insecure { opts = append(opts, otlptracegrpc.WithInsecure()) } else if o.CAFile != "" { From 1db885e775e05bdb110950a9dd564d64850c1905 Mon Sep 17 00:00:00 2001 From: Calin Martinconi Date: Sat, 13 Jun 2026 12:12:28 +0300 Subject: [PATCH 2/8] feat(tracing): add service.instance.id from overlay address --- pkg/node/node.go | 38 ++++++++++++++++++------------------ pkg/tracing/resource_test.go | 4 ++++ pkg/tracing/tracing.go | 20 ++++++++++--------- 3 files changed, 34 insertions(+), 28 deletions(-) diff --git a/pkg/node/node.go b/pkg/node/node.go index 221ed219f7a..596ea0501fa 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -220,9 +220,8 @@ const ( maxAllowedDoubling = 1 ) -// tracingEnvironment maps a Swarm network id to the deployment.environment -// resource attribute reported on traces, so spans from different networks are -// distinguishable in an OTLP backend. Unknown ids are reported as "private". +// tracingEnvironment maps a network id to the deployment.environment trace +// attribute. Unknown ids are reported as "private". func tracingEnvironment(networkID uint64) string { switch networkID { case config.Mainnet.NetworkID: @@ -252,21 +251,6 @@ func NewBee( nodeMetrics := newMetrics() - tracer, tracerCloser, err := tracing.NewTracer(&tracing.Options{ - Enabled: o.TracingEnabled, - Endpoint: o.TracingEndpoint, - Insecure: o.TracingInsecure, - CAFile: o.TracingCAFile, - Protocol: o.TracingProtocol, - SamplingRatio: o.TracingSamplingRatio, - ServiceName: o.TracingServiceName, - ServiceVersion: bee.Version, - Environment: tracingEnvironment(networkID), - }) - if err != nil { - return nil, fmt.Errorf("tracer: %w", err) - } - if err := validatePublicAddress(o.NATAddr); err != nil { return nil, fmt.Errorf("invalid NAT address %s: %w", o.NATAddr, err) } @@ -300,7 +284,6 @@ func NewBee( logger: logger, ctxCancel: ctxCancel, errorLogWriter: sink, - tracerCloser: tracerCloser, syncingStopped: syncutil.NewSignaler(), } @@ -411,6 +394,23 @@ func NewBee( return nil, fmt.Errorf("check overlay address: %w", err) } + tracer, tracerCloser, err := tracing.NewTracer(&tracing.Options{ + Enabled: o.TracingEnabled, + Endpoint: o.TracingEndpoint, + Insecure: o.TracingInsecure, + CAFile: o.TracingCAFile, + Protocol: o.TracingProtocol, + SamplingRatio: o.TracingSamplingRatio, + ServiceName: o.TracingServiceName, + ServiceVersion: bee.Version, + Environment: tracingEnvironment(networkID), + InstanceID: swarmAddress.String(), + }) + if err != nil { + return nil, fmt.Errorf("tracer: %w", err) + } + b.tracerCloser = tracerCloser + var ( chequebookService chequebook.Service = new(noOpChequebookService) chequeStore chequebook.ChequeStore diff --git a/pkg/tracing/resource_test.go b/pkg/tracing/resource_test.go index 4aa1058dbb4..84fe283b2c5 100644 --- a/pkg/tracing/resource_test.go +++ b/pkg/tracing/resource_test.go @@ -35,6 +35,7 @@ func TestNewResource(t *testing.T) { ServiceName: "bee", ServiceVersion: "2.8.0-abcdef", Environment: "mainnet", + InstanceID: "overlay-abc123", }) if err != nil { t.Fatalf("unexpected error: %v", err) @@ -49,6 +50,9 @@ func TestNewResource(t *testing.T) { if got, ok := attrValue(t, res, semconv.DeploymentEnvironmentKey); !ok || got != "mainnet" { t.Errorf("deployment.environment = %q (present=%v), want %q", got, ok, "mainnet") } + if got, ok := attrValue(t, res, semconv.ServiceInstanceIDKey); !ok || got != "overlay-abc123" { + t.Errorf("service.instance.id = %q (present=%v), want %q", got, ok, "overlay-abc123") + } // WithTelemetrySDK must contribute the SDK identity attributes. if _, ok := attrValue(t, res, semconv.TelemetrySDKNameKey); !ok { t.Error("telemetry.sdk.name is absent, WithTelemetrySDK not applied") diff --git a/pkg/tracing/tracing.go b/pkg/tracing/tracing.go index 678c3185be5..4d92f4b4ed7 100644 --- a/pkg/tracing/tracing.go +++ b/pkg/tracing/tracing.go @@ -102,6 +102,9 @@ type Options struct { // Environment is reported as the OTel deployment.environment resource // attribute (e.g. "mainnet", "testnet"). When empty the attribute is omitted. Environment string + // InstanceID is reported as the OTel service.instance.id resource attribute + // (the node's overlay address). When empty the attribute is omitted. + InstanceID string // Insecure disables TLS for the OTLP exporter (useful for a local collector). Insecure bool // CAFile is an optional path to a PEM-encoded CA bundle used to verify @@ -156,20 +159,16 @@ func NewTracer(o *Options) (*Tracer, io.Closer, error) { tp := sdktrace.NewTracerProvider( sdktrace.WithResource(res), sdktrace.WithSampler(sdktrace.ParentBased(sdktrace.TraceIDRatioBased(ratio))), - // The batch span processor keeps the SDK defaults (queue size 2048, - // 5s schedule). Operators can tune them via the standard OTEL_BSP_* - // environment variables without a bee flag or rebuild. + // Batch processor keeps SDK defaults; tunable via OTEL_BSP_* env vars. sdktrace.WithBatcher(exporter), ) return &Tracer{tracer: tp.Tracer(instrumentationName)}, providerCloser{tp: tp}, nil } -// newResource builds the OTel resource describing this node. WithFromEnv, -// WithTelemetrySDK and WithHost are applied before WithAttributes so the -// explicitly configured service name/version/environment win over any colliding -// values from OTEL_RESOURCE_ATTRIBUTES/OTEL_SERVICE_NAME, while operators can -// still enrich the resource (e.g. cluster, region) via the environment. WithHost -// adds host.name so spans are attributable to a node in multi-node backends. +// newResource builds the OTel resource describing this node. The env options +// come before WithAttributes so the configured values win over colliding +// OTEL_SERVICE_NAME/OTEL_RESOURCE_ATTRIBUTES, while the environment can still +// add attributes (e.g. cluster, region). func newResource(o *Options) (*resource.Resource, error) { attrs := []attribute.KeyValue{semconv.ServiceName(o.ServiceName)} if o.ServiceVersion != "" { @@ -178,6 +177,9 @@ func newResource(o *Options) (*resource.Resource, error) { if o.Environment != "" { attrs = append(attrs, semconv.DeploymentEnvironment(o.Environment)) } + if o.InstanceID != "" { + attrs = append(attrs, semconv.ServiceInstanceID(o.InstanceID)) + } return resource.New(context.Background(), resource.WithFromEnv(), From 7c1326f1e4855fbb35cef2a50e9b8df42dd8a593 Mon Sep 17 00:00:00 2001 From: Calin Martinconi Date: Sat, 13 Jun 2026 12:27:35 +0300 Subject: [PATCH 3/8] feat(tracing): rename tracing-otlp-* flags to tracing-* and log on enable --- cmd/bee/cmd/cmd.go | 60 ++++++++++++++++--------------- cmd/bee/cmd/start.go | 8 ++--- packaging/bee.yaml | 12 +++---- packaging/homebrew-amd64/bee.yaml | 12 +++---- packaging/homebrew-arm64/bee.yaml | 12 +++---- packaging/scoop/bee.yaml | 12 +++---- pkg/node/node.go | 3 ++ pkg/tracing/tracing.go | 2 +- 8 files changed, 63 insertions(+), 58 deletions(-) diff --git a/cmd/bee/cmd/cmd.go b/cmd/bee/cmd/cmd.go index 0fe00baa922..212fcc41483 100644 --- a/cmd/bee/cmd/cmd.go +++ b/cmd/bee/cmd/cmd.go @@ -40,10 +40,10 @@ const ( optionWelcomeMessage = "welcome-message" optionCORSAllowedOrigins = "cors-allowed-origins" optionNameTracingEnabled = "tracing-enable" - optionNameTracingOTLPEndpoint = "tracing-otlp-endpoint" - optionNameTracingOTLPInsecure = "tracing-otlp-insecure" - optionNameTracingOTLPCAFile = "tracing-otlp-ca-file" - optionNameTracingOTLPProtocol = "tracing-otlp-protocol" + optionNameTracingEndpoint = "tracing-endpoint" + optionNameTracingInsecure = "tracing-insecure" + optionNameTracingCAFile = "tracing-ca-file" + optionNameTracingProtocol = "tracing-protocol" optionNameTracingSamplingRatio = "tracing-sampling-ratio" optionNameTracingServiceName = "tracing-service-name" optionNameVerbosity = "verbosity" @@ -108,10 +108,10 @@ const ( // tracing configKeyTracingEnabled = "tracing.enable" - configKeyTracingOTLPEndpoint = "tracing.otlp-endpoint" - configKeyTracingOTLPInsecure = "tracing.otlp-insecure" - configKeyTracingOTLPCAFile = "tracing.otlp-ca-file" - configKeyTracingOTLPProtocol = "tracing.otlp-protocol" + configKeyTracingEndpoint = "tracing.endpoint" + configKeyTracingInsecure = "tracing.insecure" + configKeyTracingCAFile = "tracing.ca-file" + configKeyTracingProtocol = "tracing.protocol" configKeyTracingSamplingRatio = "tracing.sampling-ratio" configKeyTracingServiceName = "tracing.service-name" ) @@ -126,10 +126,10 @@ var blockchainRpcConfigPairs = []struct{ flat, dotted string }{ var tracingConfigPairs = []struct{ flat, dotted string }{ {optionNameTracingEnabled, configKeyTracingEnabled}, - {optionNameTracingOTLPEndpoint, configKeyTracingOTLPEndpoint}, - {optionNameTracingOTLPInsecure, configKeyTracingOTLPInsecure}, - {optionNameTracingOTLPCAFile, configKeyTracingOTLPCAFile}, - {optionNameTracingOTLPProtocol, configKeyTracingOTLPProtocol}, + {optionNameTracingEndpoint, configKeyTracingEndpoint}, + {optionNameTracingInsecure, configKeyTracingInsecure}, + {optionNameTracingCAFile, configKeyTracingCAFile}, + {optionNameTracingProtocol, configKeyTracingProtocol}, {optionNameTracingSamplingRatio, configKeyTracingSamplingRatio}, {optionNameTracingServiceName, configKeyTracingServiceName}, } @@ -137,17 +137,19 @@ var tracingConfigPairs = []struct{ flat, dotted string }{ // Deprecated tracing options, removed in the OpenTelemetry migration. They are // kept only as hidden no-op flags so that existing configs do not break node // startup; their values are ignored (see deprecatedTracingKeys). +// +// Note: tracing-endpoint is intentionally NOT here — it is reused as the active +// OTLP endpoint flag (see optionNameTracingEndpoint). Its meaning changed from a +// Jaeger agent address to an OTLP collector endpoint. const ( - optionNameTracingEndpoint = "tracing-endpoint" - optionNameTracingHost = "tracing-host" - optionNameTracingPort = "tracing-port" + optionNameTracingHost = "tracing-host" + optionNameTracingPort = "tracing-port" ) // deprecatedTracingKeys are the pre-OpenTelemetry tracing options. They are // registered as hidden no-op flags so stale configs still start the node, but -// their values are ignored; operators should migrate to the tracing-otlp-* keys. +// their values are ignored; operators should migrate to the tracing-* keys. var deprecatedTracingKeys = []string{ - optionNameTracingEndpoint, optionNameTracingHost, optionNameTracingPort, } @@ -326,17 +328,17 @@ func (c *command) setAllFlags(cmd *cobra.Command) { cmd.Flags().Uint64(optionNameNetworkID, chaincfg.Mainnet.NetworkID, "ID of the Swarm network") cmd.Flags().StringSlice(optionCORSAllowedOrigins, []string{}, "origins with CORS headers enabled") cmd.Flags().Bool(optionNameTracingEnabled, false, "enable tracing") - cmd.Flags().String(optionNameTracingOTLPEndpoint, "127.0.0.1:4318", "OTLP endpoint to send tracing data (host:port); default port is 4318 for http, 4317 for grpc") - cmd.Flags().Bool(optionNameTracingOTLPInsecure, false, "disable TLS for the OTLP exporter (useful for a local collector); when false, set --tracing-otlp-ca-file to verify the collector certificate against a private CA") - cmd.Flags().String(optionNameTracingOTLPCAFile, "", "path to a PEM-encoded CA bundle used to verify the OTLP collector certificate; ignored when --tracing-otlp-insecure=true") - cmd.Flags().String(optionNameTracingOTLPProtocol, "http", "OTLP exporter transport: http or grpc") + cmd.Flags().String(optionNameTracingEndpoint, "127.0.0.1:4318", "OTLP collector endpoint to send tracing data (host:port); default port is 4318 for http, 4317 for grpc") + cmd.Flags().Bool(optionNameTracingInsecure, false, "disable TLS for the OTLP exporter (useful for a local collector); when false, set --tracing-ca-file to verify the collector certificate against a private CA") + cmd.Flags().String(optionNameTracingCAFile, "", "path to a PEM-encoded CA bundle used to verify the OTLP collector certificate; ignored when --tracing-insecure=true") + cmd.Flags().String(optionNameTracingProtocol, "http", "OTLP exporter transport: http or grpc") cmd.Flags().Float64(optionNameTracingSamplingRatio, 1.0, "head-based sampling ratio in [0,1]; 0 samples nothing, 1 samples everything") cmd.Flags().String(optionNameTracingServiceName, "bee", "service name identifier for tracing") // Deprecated, no-op tracing flags kept for backward compatibility so that // existing configs do not break node startup. Their values are ignored. for _, name := range deprecatedTracingKeys { - cmd.Flags().String(name, "", "deprecated and ignored; use the tracing-otlp-* options") - _ = cmd.Flags().MarkDeprecated(name, "no longer used after the OpenTelemetry migration; use the tracing-otlp-* options") + cmd.Flags().String(name, "", "deprecated and ignored; use the tracing-* options") + _ = cmd.Flags().MarkDeprecated(name, "no longer used after the OpenTelemetry migration; use the tracing-* options") } cmd.Flags().String(optionNameVerbosity, "info", "log verbosity level 0=silent, 1=error, 2=warn, 3=info, 4=debug, 5=trace") cmd.Flags().String(optionWelcomeMessage, "", "send a welcome message string during handshakes") @@ -423,8 +425,8 @@ func (c *command) bindBlockchainRpcConfig(cmd *cobra.Command) { c.bindNestedConfig(cmd, blockchainRpcConfigPairs) } -// bindTracingConfig supports both flat (tracing-otlp-endpoint) and nested -// (tracing.otlp-endpoint) YAML forms, with nested taking precedence. +// bindTracingConfig supports both flat (tracing-endpoint) and nested +// (tracing.endpoint) YAML forms, with nested taking precedence. func (c *command) bindTracingConfig(cmd *cobra.Command) { c.bindNestedConfig(cmd, tracingConfigPairs) } @@ -436,7 +438,7 @@ func (c *command) bindTracingConfig(cmd *cobra.Command) { func (c *command) warnDeprecatedTracingKeys() { for _, key := range deprecatedTracingKeys { if c.config.InConfig(key) { - c.logger.Warning("deprecated tracing config key is ignored; use the tracing-otlp-* options", "key", key) + c.logger.Warning("deprecated tracing config key is ignored; use the tracing-* options", "key", key) } } } @@ -445,9 +447,9 @@ func (c *command) warnDeprecatedTracingKeys() { // CA bundle, in which case the OTLP exporter falls back to the system root CAs. func (c *command) warnTracingTLSWithoutCA() { if c.config.GetBool(configKeyTracingEnabled) && - !c.config.GetBool(configKeyTracingOTLPInsecure) && - c.config.GetString(configKeyTracingOTLPCAFile) == "" { - c.logger.Warning("tracing: TLS is enabled but no CA bundle is configured; the OTLP exporter will rely on the system root CAs. Provide --tracing-otlp-ca-file, set --tracing-otlp-insecure=true for a plaintext local collector, or disable tracing.") + !c.config.GetBool(configKeyTracingInsecure) && + c.config.GetString(configKeyTracingCAFile) == "" { + c.logger.Warning("tracing: TLS is enabled but no CA bundle is configured; the OTLP exporter will rely on the system root CAs. Provide --tracing-ca-file, set --tracing-insecure=true for a plaintext local collector, or disable tracing.") } } diff --git a/cmd/bee/cmd/start.go b/cmd/bee/cmd/start.go index 4abcd178341..69ef9120835 100644 --- a/cmd/bee/cmd/start.go +++ b/cmd/bee/cmd/start.go @@ -343,10 +343,10 @@ func buildBeeNode(ctx context.Context, c *command, cmd *cobra.Command, logger lo SwapInitialDeposit: c.config.GetString(optionNameSwapInitialDeposit), TargetNeighborhood: c.config.GetString(optionNameTargetNeighborhood), TracingEnabled: c.config.GetBool(configKeyTracingEnabled), - TracingEndpoint: c.config.GetString(configKeyTracingOTLPEndpoint), - TracingInsecure: c.config.GetBool(configKeyTracingOTLPInsecure), - TracingCAFile: c.config.GetString(configKeyTracingOTLPCAFile), - TracingProtocol: c.config.GetString(configKeyTracingOTLPProtocol), + TracingEndpoint: c.config.GetString(configKeyTracingEndpoint), + TracingInsecure: c.config.GetBool(configKeyTracingInsecure), + TracingCAFile: c.config.GetString(configKeyTracingCAFile), + TracingProtocol: c.config.GetString(configKeyTracingProtocol), TracingSamplingRatio: c.config.GetFloat64(configKeyTracingSamplingRatio), TracingServiceName: c.config.GetString(configKeyTracingServiceName), TrxDebugMode: c.config.GetBool(optionNameTransactionDebugMode), diff --git a/packaging/bee.yaml b/packaging/bee.yaml index 27464575cad..69e70e37371 100644 --- a/packaging/bee.yaml +++ b/packaging/bee.yaml @@ -111,14 +111,14 @@ password-file: "/var/lib/bee/password" # tracing: # ## enable tracing # enable: false -# ## OTLP endpoint to send tracing data (host:port); default port is 4318 for http, 4317 for grpc -# otlp-endpoint: 127.0.0.1:4318 +# ## OTLP collector endpoint to send tracing data (host:port); default port is 4318 for http, 4317 for grpc +# endpoint: 127.0.0.1:4318 # ## disable TLS for the OTLP exporter (useful for a local collector) -# otlp-insecure: false -# ## path to a PEM-encoded CA bundle used to verify the OTLP collector certificate; ignored when otlp-insecure is true -# otlp-ca-file: "" +# insecure: false +# ## path to a PEM-encoded CA bundle used to verify the OTLP collector certificate; ignored when insecure is true +# ca-file: "" # ## OTLP exporter transport: http or grpc -# otlp-protocol: http +# protocol: http # ## head-based sampling ratio in [0,1]; 0 samples nothing, 1 samples everything # sampling-ratio: 1.0 # ## service name identifier for tracing diff --git a/packaging/homebrew-amd64/bee.yaml b/packaging/homebrew-amd64/bee.yaml index b08681d8af4..9230233ec99 100644 --- a/packaging/homebrew-amd64/bee.yaml +++ b/packaging/homebrew-amd64/bee.yaml @@ -111,14 +111,14 @@ password-file: "/usr/local/var/lib/swarm-bee/password" # tracing: # ## enable tracing # enable: false -# ## OTLP endpoint to send tracing data (host:port); default port is 4318 for http, 4317 for grpc -# otlp-endpoint: 127.0.0.1:4318 +# ## OTLP collector endpoint to send tracing data (host:port); default port is 4318 for http, 4317 for grpc +# endpoint: 127.0.0.1:4318 # ## disable TLS for the OTLP exporter (useful for a local collector) -# otlp-insecure: false -# ## path to a PEM-encoded CA bundle used to verify the OTLP collector certificate; ignored when otlp-insecure is true -# otlp-ca-file: "" +# insecure: false +# ## path to a PEM-encoded CA bundle used to verify the OTLP collector certificate; ignored when insecure is true +# ca-file: "" # ## OTLP exporter transport: http or grpc -# otlp-protocol: http +# protocol: http # ## head-based sampling ratio in [0,1]; 0 samples nothing, 1 samples everything # sampling-ratio: 1.0 # ## service name identifier for tracing diff --git a/packaging/homebrew-arm64/bee.yaml b/packaging/homebrew-arm64/bee.yaml index 8d796846ec9..721ffc0ab3a 100644 --- a/packaging/homebrew-arm64/bee.yaml +++ b/packaging/homebrew-arm64/bee.yaml @@ -111,14 +111,14 @@ password-file: "/opt/homebrew/var/lib/swarm-bee/password" # tracing: # ## enable tracing # enable: false -# ## OTLP endpoint to send tracing data (host:port); default port is 4318 for http, 4317 for grpc -# otlp-endpoint: 127.0.0.1:4318 +# ## OTLP collector endpoint to send tracing data (host:port); default port is 4318 for http, 4317 for grpc +# endpoint: 127.0.0.1:4318 # ## disable TLS for the OTLP exporter (useful for a local collector) -# otlp-insecure: false -# ## path to a PEM-encoded CA bundle used to verify the OTLP collector certificate; ignored when otlp-insecure is true -# otlp-ca-file: "" +# insecure: false +# ## path to a PEM-encoded CA bundle used to verify the OTLP collector certificate; ignored when insecure is true +# ca-file: "" # ## OTLP exporter transport: http or grpc -# otlp-protocol: http +# protocol: http # ## head-based sampling ratio in [0,1]; 0 samples nothing, 1 samples everything # sampling-ratio: 1.0 # ## service name identifier for tracing diff --git a/packaging/scoop/bee.yaml b/packaging/scoop/bee.yaml index 4029de2ad0d..88f8f0e4bf0 100644 --- a/packaging/scoop/bee.yaml +++ b/packaging/scoop/bee.yaml @@ -111,14 +111,14 @@ password-file: "./password" # tracing: # ## enable tracing # enable: false -# ## OTLP endpoint to send tracing data (host:port); default port is 4318 for http, 4317 for grpc -# otlp-endpoint: 127.0.0.1:4318 +# ## OTLP collector endpoint to send tracing data (host:port); default port is 4318 for http, 4317 for grpc +# endpoint: 127.0.0.1:4318 # ## disable TLS for the OTLP exporter (useful for a local collector) -# otlp-insecure: false -# ## path to a PEM-encoded CA bundle used to verify the OTLP collector certificate; ignored when otlp-insecure is true -# otlp-ca-file: "" +# insecure: false +# ## path to a PEM-encoded CA bundle used to verify the OTLP collector certificate; ignored when insecure is true +# ca-file: "" # ## OTLP exporter transport: http or grpc -# otlp-protocol: http +# protocol: http # ## head-based sampling ratio in [0,1]; 0 samples nothing, 1 samples everything # sampling-ratio: 1.0 # ## service name identifier for tracing diff --git a/pkg/node/node.go b/pkg/node/node.go index 596ea0501fa..10ebe368c85 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -410,6 +410,9 @@ func NewBee( return nil, fmt.Errorf("tracer: %w", err) } b.tracerCloser = tracerCloser + if o.TracingEnabled { + logger.Info("tracing enabled", "endpoint", o.TracingEndpoint, "protocol", o.TracingProtocol, "sampling_ratio", o.TracingSamplingRatio) + } var ( chequebookService chequebook.Service = new(noOpChequebookService) diff --git a/pkg/tracing/tracing.go b/pkg/tracing/tracing.go index 4d92f4b4ed7..5452eac0dd1 100644 --- a/pkg/tracing/tracing.go +++ b/pkg/tracing/tracing.go @@ -132,7 +132,7 @@ func NewTracer(o *Options) (*Tracer, io.Closer, error) { } if o.Endpoint == "" { - return nil, nil, errors.New("tracing-otlp-endpoint is required when tracing is enabled") + return nil, nil, errors.New("tracing-endpoint is required when tracing is enabled") } res, err := newResource(o) From 0b51f2abd2c540fb71f16de5505d97382511288a Mon Sep 17 00:00:00 2001 From: Calin Martinconi Date: Sat, 13 Jun 2026 13:08:24 +0300 Subject: [PATCH 4/8] feat(tracing): propagate baggage and add postage/kademlia/salud spans --- pkg/api/api.go | 3 + pkg/node/node.go | 8 +- pkg/p2p/p2p.go | 4 + pkg/postage/batchservice/batchservice.go | 10 +- pkg/postage/batchservice/batchservice_test.go | 14 +-- pkg/salud/salud.go | 9 +- pkg/salud/salud_test.go | 10 +- pkg/topology/kademlia/kademlia.go | 9 ++ pkg/topology/kademlia/kademlia_test.go | 4 +- pkg/tracing/baggage_test.go | 106 ++++++++++++++++++ pkg/tracing/tracing.go | 54 +++++++-- 11 files changed, 200 insertions(+), 31 deletions(-) create mode 100644 pkg/tracing/baggage_test.go diff --git a/pkg/api/api.go b/pkg/api/api.go index 83be07ecaeb..18086405020 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -772,6 +772,9 @@ func (p *putterSessionWrapper) Put(ctx context.Context, chunk swarm.Chunk) error if err != nil { return err } + // Attach the batch id as baggage so it follows the chunk across hops (e.g. + // direct upload -> pushsync). Best effort: a baggage error must not fail the put. + ctx, _ = tracing.WithBaggageMember(ctx, "batch_id", hex.EncodeToString(stamp.BatchID())) return p.PutterSession.Put(ctx, chunk.WithStamp(stamp)) } diff --git a/pkg/node/node.go b/pkg/node/node.go index 10ebe368c85..9820ae6ade5 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -815,7 +815,7 @@ func NewBee( eventListener = listener.New(b.syncingStopped, logger, chainBackend, postageStampContractAddress, postageStampContractABI, o.BlockTime, postageSyncingStallingTimeout, postageSyncingBackoffTimeout) b.listenerCloser = eventListener - batchSvc, err = batchservice.New(stateStore, batchStore, logger, eventListener, overlayEthAddress.Bytes(), post, sha3.New256, o.Resync) + batchSvc, err = batchservice.New(stateStore, batchStore, logger, eventListener, overlayEthAddress.Bytes(), post, sha3.New256, o.Resync, tracer) if err != nil { return nil, fmt.Errorf("init batch service: %w", err) } @@ -844,7 +844,7 @@ func NewBee( var swapService *swap.Service - kad, err := kademlia.New(swarmAddress, addressbook, hive, p2ps, detector, logger, + kad, err := kademlia.New(swarmAddress, addressbook, hive, p2ps, detector, logger, tracer, kademlia.Options{Bootnodes: bootnodes, BootnodeMode: o.BootnodeMode, StaticNodes: o.StaticNodes, DataDir: o.DataDir}) if err != nil { return nil, fmt.Errorf("unable to create kademlia: %w", err) @@ -925,7 +925,7 @@ func NewBee( snapshotEventListener := listener.New(b.syncingStopped, logger, chainBackend, postageStampContractAddress, postageStampContractABI, o.BlockTime, postageSyncingStallingTimeout, postageSyncingBackoffTimeout) - snapshotBatchSvc, err := batchservice.New(stateStore, batchStore, logger, snapshotEventListener, overlayEthAddress.Bytes(), post, sha3.New256, o.Resync) + snapshotBatchSvc, err := batchservice.New(stateStore, batchStore, logger, snapshotEventListener, overlayEthAddress.Bytes(), post, sha3.New256, o.Resync, tracer) if err != nil { logger.Error(err, "failed to initialize batch service from snapshot, continuing outside snapshot block...") } else { @@ -1133,7 +1133,7 @@ func NewBee( return nil, fmt.Errorf("status service: %w", err) } - saludService := salud.New(nodeStatus, kad, localStore, logger, detector, api.FullMode.String(), salud.DefaultDurPercentile, salud.DefaultConnsPercentile) + saludService := salud.New(nodeStatus, kad, localStore, logger, detector, api.FullMode.String(), salud.DefaultDurPercentile, salud.DefaultConnsPercentile, tracer) b.saludCloser = saludService rC, unsub := saludService.SubscribeNetworkStorageRadius() diff --git a/pkg/p2p/p2p.go b/pkg/p2p/p2p.go index 6fac584e81b..d00424f4402 100644 --- a/pkg/p2p/p2p.go +++ b/pkg/p2p/p2p.go @@ -226,6 +226,10 @@ const ( // from the legacy OpenTracing/Jaeger payload, so the distinct key prevents // mixed-version peers from decoding each other's incompatible payloads. HeaderNameTracingSpanContext = "tracing-span-context-v2" + // HeaderNameTracingBaggage carries W3C baggage (a string of key=value pairs) + // alongside the span context. It is additive: peers that do not understand it + // simply ignore the header, so it never breaks the stream. + HeaderNameTracingBaggage = "tracing-baggage" ) // NewSwarmStreamName constructs a libp2p compatible stream name out of diff --git a/pkg/postage/batchservice/batchservice.go b/pkg/postage/batchservice/batchservice.go index eb99d458421..9682c1db157 100644 --- a/pkg/postage/batchservice/batchservice.go +++ b/pkg/postage/batchservice/batchservice.go @@ -17,6 +17,8 @@ import ( "github.com/ethersphere/bee/v2/pkg/log" "github.com/ethersphere/bee/v2/pkg/postage" "github.com/ethersphere/bee/v2/pkg/storage" + "github.com/ethersphere/bee/v2/pkg/tracing" + "go.opentelemetry.io/otel/attribute" "golang.org/x/crypto/sha3" ) @@ -37,6 +39,7 @@ type batchService struct { listener postage.Listener owner []byte batchListener postage.BatchEventListener + tracer *tracing.Tracer checksum hash.Hash // checksum hasher resync bool @@ -56,6 +59,7 @@ func New( batchListener postage.BatchEventListener, checksumFunc func() hash.Hash, resync bool, + tracer *tracing.Tracer, ) (Interface, error) { if checksumFunc == nil { checksumFunc = sha3.New256 @@ -95,12 +99,16 @@ func New( } } - return &batchService{stateStore, storer, logger.WithName(loggerName).Register(), listener, owner, batchListener, sum, resync}, nil + return &batchService{stateStore, storer, logger.WithName(loggerName).Register(), listener, owner, batchListener, tracer, sum, resync}, nil } // Create will create a new batch with the given ID, owner value and depth and // stores it in the BatchedStore. func (svc *batchService) Create(id, owner []byte, totalAmout, normalisedBalance *big.Int, depth, bucketDepth uint8, immutable bool, txHash common.Hash) error { + span, _, _ := svc.tracer.StartSpanFromContext(context.Background(), "postage-batch-create", svc.logger) + span.SetAttributes(attribute.String("batch_id", hex.EncodeToString(id))) + defer span.End() + // don't add batches which have value which equals total cumulative // payout or that are going to expire already within the next couple of blocks val := big.NewInt(0).Add(svc.storer.GetChainState().TotalAmount, svc.storer.GetChainState().CurrentPrice) diff --git a/pkg/postage/batchservice/batchservice_test.go b/pkg/postage/batchservice/batchservice_test.go index 57a7e5aef82..a9d7c927181 100644 --- a/pkg/postage/batchservice/batchservice_test.go +++ b/pkg/postage/batchservice/batchservice_test.go @@ -525,7 +525,7 @@ func TestTransactionOk(t *testing.T) { t.Fatal(err) } - svc2, err := batchservice.New(s, store, testLog, newMockListener(), nil, nil, nil, false) + svc2, err := batchservice.New(s, store, testLog, newMockListener(), nil, nil, nil, false, nil) if err != nil { t.Fatal(err) } @@ -550,7 +550,7 @@ func TestTransactionError(t *testing.T) { t.Fatal(err) } - svc2, err := batchservice.New(s, store, testLog, newMockListener(), nil, nil, nil, false) + svc2, err := batchservice.New(s, store, testLog, newMockListener(), nil, nil, nil, false, nil) if err != nil { t.Fatal(err) } @@ -569,7 +569,7 @@ func TestChecksum(t *testing.T) { s := mocks.NewStateStore() store := mock.New() mockHash := &hs{} - svc, err := batchservice.New(s, store, testLog, newMockListener(), nil, nil, func() hash.Hash { return mockHash }, false) + svc, err := batchservice.New(s, store, testLog, newMockListener(), nil, nil, func() hash.Hash { return mockHash }, false, nil) if err != nil { t.Fatal(err) } @@ -592,7 +592,7 @@ func TestChecksumResync(t *testing.T) { s := mocks.NewStateStore() store := mock.New() mockHash := &hs{} - svc, err := batchservice.New(s, store, testLog, newMockListener(), nil, nil, func() hash.Hash { return mockHash }, true) + svc, err := batchservice.New(s, store, testLog, newMockListener(), nil, nil, func() hash.Hash { return mockHash }, true, nil) if err != nil { t.Fatal(err) } @@ -611,7 +611,7 @@ func TestChecksumResync(t *testing.T) { // now start a new instance and check that the value gets read from statestore store2 := mock.New() mockHash2 := &hs{} - _, err = batchservice.New(s, store2, testLog, newMockListener(), nil, nil, func() hash.Hash { return mockHash2 }, false) + _, err = batchservice.New(s, store2, testLog, newMockListener(), nil, nil, func() hash.Hash { return mockHash2 }, false, nil) if err != nil { t.Fatal(err) } @@ -623,7 +623,7 @@ func TestChecksumResync(t *testing.T) { // when resyncing store3 := mock.New() mockHash3 := &hs{} - _, err = batchservice.New(s, store3, testLog, newMockListener(), nil, nil, func() hash.Hash { return mockHash3 }, true) + _, err = batchservice.New(s, store3, testLog, newMockListener(), nil, nil, func() hash.Hash { return mockHash3 }, true, nil) if err != nil { t.Fatal(err) } @@ -641,7 +641,7 @@ func newTestStoreAndServiceWithListener( t.Helper() s := mocks.NewStateStore() store := mock.New(opts...) - svc, err := batchservice.New(s, store, testLog, newMockListener(), owner, batchListener, nil, false) + svc, err := batchservice.New(s, store, testLog, newMockListener(), owner, batchListener, nil, false, nil) if err != nil { t.Fatal(err) } diff --git a/pkg/salud/salud.go b/pkg/salud/salud.go index 4a3bebf76bf..b849d15b0aa 100644 --- a/pkg/salud/salud.go +++ b/pkg/salud/salud.go @@ -18,6 +18,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/storer" "github.com/ethersphere/bee/v2/pkg/swarm" "github.com/ethersphere/bee/v2/pkg/topology" + "github.com/ethersphere/bee/v2/pkg/tracing" "go.uber.org/atomic" ) @@ -51,6 +52,7 @@ type service struct { metrics metrics isSelfHealthy *atomic.Bool reserve storer.RadiusChecker + tracer *tracing.Tracer radiusSubsMtx sync.Mutex radiusC []chan uint8 @@ -65,6 +67,7 @@ func New( mode string, durPercentile float64, connsPercentile float64, + tracer *tracing.Tracer, ) *service { metrics := newMetrics() @@ -76,6 +79,7 @@ func New( metrics: metrics, isSelfHealthy: atomic.NewBool(true), reserve: reserve, + tracer: tracer, } s.wg.Add(1) @@ -133,6 +137,9 @@ type peer struct { // per count, the most common storage radius, and the batch commitment, and based on these values, marks peers as unhealhy that fall beyond // the allowed thresholds. func (s *service) salud(mode string, durPercentile float64, connsPercentile float64) { + span, _, spanCtx := s.tracer.StartSpanFromContext(context.Background(), "salud-round", s.logger) + defer span.End() + var ( mtx sync.Mutex wg sync.WaitGroup @@ -144,7 +151,7 @@ func (s *service) salud(mode string, durPercentile float64, connsPercentile floa err := s.topology.EachConnectedPeer(func(addr swarm.Address, bin uint8) (stop bool, jumpToNext bool, err error) { wg.Go(func() { - ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) + ctx, cancel := context.WithTimeout(spanCtx, requestTimeout) defer cancel() start := time.Now() diff --git a/pkg/salud/salud_test.go b/pkg/salud/salud_test.go index 4123702263e..7cea0d44356 100644 --- a/pkg/salud/salud_test.go +++ b/pkg/salud/salud_test.go @@ -72,7 +72,7 @@ func TestSalud(t *testing.T) { mockstorer.WithCapacityDoubling(2), ) - service := salud.New(statusM, topM, reserve, log.Noop, stabilmock.NewSubscriber(true), "full", 0.8, 0.8) + service := salud.New(statusM, topM, reserve, log.Noop, stabilmock.NewSubscriber(true), "full", 0.8, 0.8, nil) err := spinlock.Wait(time.Minute, func() bool { return len(topM.PeersHealth()) == len(peers) @@ -119,7 +119,7 @@ func TestSelfUnhealthyRadius(t *testing.T) { mockstorer.WithCapacityDoubling(0), ) - service := salud.New(statusM, topM, reserve, log.Noop, stabilmock.NewSubscriber(true), "full", 0.8, 0.8) + service := salud.New(statusM, topM, reserve, log.Noop, stabilmock.NewSubscriber(true), "full", 0.8, 0.8, nil) testutil.CleanupCloser(t, service) err := spinlock.Wait(time.Minute, func() bool { @@ -157,7 +157,7 @@ func TestSelfHealthyCapacityDoubling(t *testing.T) { mockstorer.WithCapacityDoubling(2), ) - service := salud.New(statusM, topM, reserve, log.Noop, stabilmock.NewSubscriber(true), "full", 0.8, 0.8) + service := salud.New(statusM, topM, reserve, log.Noop, stabilmock.NewSubscriber(true), "full", 0.8, 0.8, nil) testutil.CleanupCloser(t, service) err := spinlock.Wait(time.Minute, func() bool { @@ -187,7 +187,7 @@ func TestSubToRadius(t *testing.T) { topM := topMock.NewTopologyDriver(topMock.WithPeers(addrs...)) - service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, stabilmock.NewSubscriber(true), "full", 0.8, 0.8) + service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, stabilmock.NewSubscriber(true), "full", 0.8, 0.8, nil) c, unsub := service.SubscribeNetworkStorageRadius() t.Cleanup(unsub) @@ -220,7 +220,7 @@ func TestUnsub(t *testing.T) { topM := topMock.NewTopologyDriver(topMock.WithPeers(addrs...)) - service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, stabilmock.NewSubscriber(true), "full", 0.8, 0.8) + service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, stabilmock.NewSubscriber(true), "full", 0.8, 0.8, nil) testutil.CleanupCloser(t, service) c, unsub := service.SubscribeNetworkStorageRadius() diff --git a/pkg/topology/kademlia/kademlia.go b/pkg/topology/kademlia/kademlia.go index 57c4c41a950..09f8704bd03 100644 --- a/pkg/topology/kademlia/kademlia.go +++ b/pkg/topology/kademlia/kademlia.go @@ -27,8 +27,10 @@ import ( im "github.com/ethersphere/bee/v2/pkg/topology/kademlia/internal/metrics" "github.com/ethersphere/bee/v2/pkg/topology/kademlia/internal/waitnext" "github.com/ethersphere/bee/v2/pkg/topology/pslice" + "github.com/ethersphere/bee/v2/pkg/tracing" "github.com/ethersphere/bee/v2/pkg/util/ioutil" ma "github.com/multiformats/go-multiaddr" + "go.opentelemetry.io/otel/attribute" "golang.org/x/sync/errgroup" ) @@ -202,6 +204,7 @@ type Kad struct { bgBroadcastCancel context.CancelFunc reachability p2p.ReachabilityStatus detector *stabilization.Detector + tracer *tracing.Tracer } // New returns a new Kademlia. @@ -212,6 +215,7 @@ func New( p2pSvc p2p.Service, detector *stabilization.Detector, logger log.Logger, + tracer *tracing.Tracer, o Options, ) (*Kad, error) { var k *Kad @@ -253,6 +257,7 @@ func New( staticPeer: isStaticPeer(opt.StaticNodes), storageRadius: swarm.MaxPO, detector: detector, + tracer: tracer, } if k.opt.PruneFunc == nil { @@ -967,6 +972,10 @@ func (k *Kad) recalcDepth() { func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma []ma.Multiaddr) error { k.logger.Debug("attempting connect to peer", "peer_address", peer) + span, _, ctx := k.tracer.StartSpanFromContext(ctx, "kademlia-connect", k.logger) + span.SetAttributes(attribute.String("peer_address", peer.String())) + defer span.End() + ctx, cancel := context.WithTimeout(ctx, peerConnectionAttemptTimeout) defer cancel() diff --git a/pkg/topology/kademlia/kademlia_test.go b/pkg/topology/kademlia/kademlia_test.go index 1da2353bac5..c6fe9d87daa 100644 --- a/pkg/topology/kademlia/kademlia_test.go +++ b/pkg/topology/kademlia/kademlia_test.go @@ -965,7 +965,7 @@ func TestClosestPeer(t *testing.T) { t.Fatal(err) } - kad, err := kademlia.New(base, ab, disc, p2pMock(t, ab, nil, nil, nil), detector, logger, kademlia.Options{}) + kad, err := kademlia.New(base, ab, disc, p2pMock(t, ab, nil, nil, nil), detector, logger, nil, kademlia.Options{}) if err != nil { t.Fatal(err) } @@ -2087,7 +2087,7 @@ func newTestKademliaWithAddrDiscovery( p2p = p2pMock(t, ab, signer, connCounter, failedConnCounter) // p2p mock logger = log.Noop // logger ) - kad, err := kademlia.New(base, ab, disc, p2p, detector, logger, kadOpts) + kad, err := kademlia.New(base, ab, disc, p2p, detector, logger, nil, kadOpts) if err != nil { t.Fatal(err) } diff --git a/pkg/tracing/baggage_test.go b/pkg/tracing/baggage_test.go new file mode 100644 index 00000000000..640de831b72 --- /dev/null +++ b/pkg/tracing/baggage_test.go @@ -0,0 +1,106 @@ +// Copyright 2026 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package tracing_test + +import ( + "context" + "net/http" + "testing" + + "github.com/ethersphere/bee/v2/pkg/p2p" + "github.com/ethersphere/bee/v2/pkg/tracing" + "go.opentelemetry.io/otel/baggage" +) + +// baggageValue returns the value of the baggage member with the given key in +// ctx, or ("", false) when absent. +func baggageValue(ctx context.Context, key string) (string, bool) { + m := baggage.FromContext(ctx).Member(key) + if m.Key() == "" { + return "", false + } + return m.Value(), true +} + +func TestBaggageRoundTripP2P(t *testing.T) { + t.Parallel() + + tracer := newTracer(t) + + span, _, ctx := tracer.StartSpanFromContext(context.Background(), "some-operation", nil) + defer span.End() + + ctx, err := tracing.WithBaggageMember(ctx, "batch_id", "deadbeef") + if err != nil { + t.Fatal(err) + } + + headers := make(p2p.Headers) + if err := tracer.AddContextHeader(ctx, headers); err != nil { + t.Fatal(err) + } + if headers[p2p.HeaderNameTracingBaggage] == nil { + t.Fatal("baggage header was not set") + } + + got, err := tracer.WithContextFromHeaders(context.Background(), headers) + if err != nil { + t.Fatal(err) + } + if v, ok := baggageValue(got, "batch_id"); !ok || v != "deadbeef" { + t.Errorf("batch_id baggage = %q (present=%v), want %q", v, ok, "deadbeef") + } +} + +func TestBaggageRoundTripHTTP(t *testing.T) { + t.Parallel() + + tracer := newTracer(t) + + span, _, ctx := tracer.StartSpanFromContext(context.Background(), "some-operation", nil) + defer span.End() + + ctx, err := tracing.WithBaggageMember(ctx, "batch_id", "deadbeef") + if err != nil { + t.Fatal(err) + } + + headers := make(http.Header) + if err := tracer.AddContextHTTPHeader(ctx, headers); err != nil { + t.Fatal(err) + } + + got, err := tracer.WithContextFromHTTPHeaders(context.Background(), headers) + if err != nil { + t.Fatal(err) + } + if v, ok := baggageValue(got, "batch_id"); !ok || v != "deadbeef" { + t.Errorf("batch_id baggage = %q (present=%v), want %q", v, ok, "deadbeef") + } +} + +// TestBaggageOnlyWithoutSpanContext verifies that baggage is applied to the +// returned context even when no span context is present in the p2p headers. +func TestBaggageOnlyWithoutSpanContext(t *testing.T) { + t.Parallel() + + tracer := newTracer(t) + + bag, err := baggage.Parse("batch_id=deadbeef") + if err != nil { + t.Fatal(err) + } + headers := p2p.Headers{ + p2p.HeaderNameTracingBaggage: []byte(bag.String()), + } + + got, err := tracer.WithContextFromHeaders(context.Background(), headers) + if err == nil { + t.Fatal("expected ErrContextNotFound when no span context is present") + } + if v, ok := baggageValue(got, "batch_id"); !ok || v != "deadbeef" { + t.Errorf("batch_id baggage = %q (present=%v), want %q", v, ok, "deadbeef") + } +} diff --git a/pkg/tracing/tracing.go b/pkg/tracing/tracing.go index 5452eac0dd1..8aef4406ac5 100644 --- a/pkg/tracing/tracing.go +++ b/pkg/tracing/tracing.go @@ -21,6 +21,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/log" "github.com/ethersphere/bee/v2/pkg/p2p" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/baggage" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" @@ -67,9 +68,12 @@ const ( // always operate against a working trace.Tracer. var noopTracer = &Tracer{tracer: noop.NewTracerProvider().Tracer(instrumentationName)} -// httpPropagator carries trace context across HTTP via the W3C TraceContext -// standard headers (traceparent, tracestate). -var httpPropagator propagation.TextMapPropagator = propagation.TraceContext{} +// httpPropagator carries trace context and baggage across HTTP via the W3C +// TraceContext (traceparent, tracestate) and Baggage (baggage) standard headers. +var httpPropagator propagation.TextMapPropagator = propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, + propagation.Baggage{}, +) // Tracer wraps an OTel Tracer and provides p2p/HTTP carriers plus helpers // aligned with bee's tracing API. @@ -272,6 +276,9 @@ func (t *Tracer) AddContextHeader(ctx context.Context, headers p2p.Headers) erro } headers[p2p.HeaderNameTracingSpanContext] = encodeP2PSpanContext(sc) + if bag := baggage.FromContext(ctx); bag.Len() > 0 { + headers[p2p.HeaderNameTracingBaggage] = []byte(bag.String()) + } return nil } @@ -291,9 +298,16 @@ func (t *Tracer) FromHeaders(headers p2p.Headers) (trace.SpanContext, error) { return sc, nil } -// WithContextFromHeaders extracts a span context from the p2p header and -// returns a new context carrying it. Safe to call on a nil receiver. +// WithContextFromHeaders extracts a span context and any baggage from the p2p +// headers and returns a new context carrying them. Baggage is applied even when +// no span context is present. Safe to call on a nil receiver. func (t *Tracer) WithContextFromHeaders(ctx context.Context, headers p2p.Headers) (context.Context, error) { + if v := headers[p2p.HeaderNameTracingBaggage]; v != nil { + if bag, err := baggage.Parse(string(v)); err == nil { + ctx = baggage.ContextWithBaggage(ctx, bag) + } + } + sc, err := t.FromHeaders(headers) if err != nil { return ctx, err @@ -324,14 +338,15 @@ func (t *Tracer) FromHTTPHeaders(headers http.Header) (trace.SpanContext, error) return sc, nil } -// WithContextFromHTTPHeaders extracts a span context from HTTP headers and -// returns a new context carrying it. Safe to call on a nil receiver. +// WithContextFromHTTPHeaders extracts a span context and any baggage from HTTP +// headers and returns a new context carrying them. Baggage is applied even when +// no span context is present. Safe to call on a nil receiver. func (t *Tracer) WithContextFromHTTPHeaders(ctx context.Context, headers http.Header) (context.Context, error) { - sc, err := t.FromHTTPHeaders(headers) - if err != nil { - return ctx, err + ctx = httpPropagator.Extract(ctx, propagation.HeaderCarrier(headers)) + if sc := trace.SpanContextFromContext(ctx); !sc.IsValid() { + return ctx, ErrContextNotFound } - return WithContext(ctx, sc), nil + return ctx, nil } // WithContext stores a span context in ctx using the standard OTel context @@ -346,6 +361,23 @@ func FromContext(ctx context.Context) trace.SpanContext { return trace.SpanContextFromContext(ctx) } +// WithBaggageMember returns a context carrying an additional baggage member +// (key=value) on top of any baggage already present. Baggage propagates across +// HTTP and p2p hops alongside the trace, letting a value such as a batch id +// follow a request end to end. The original context is returned unchanged if the +// key or value is not a valid baggage member. +func WithBaggageMember(ctx context.Context, key, value string) (context.Context, error) { + member, err := baggage.NewMember(key, value) + if err != nil { + return ctx, err + } + bag, err := baggage.FromContext(ctx).SetMember(member) + if err != nil { + return ctx, err + } + return baggage.ContextWithBaggage(ctx, bag), nil +} + // RecordError attaches an error event to the span, marks the span status as // Error, and records the supplied attributes alongside the error event. It is // the OTel equivalent of the OpenTracing ext.LogError pattern bee used previously. From 07dbef61a35d4663917984ff0fb020e4d9c28715 Mon Sep 17 00:00:00 2001 From: Calin Martinconi Date: Sat, 13 Jun 2026 13:37:49 +0300 Subject: [PATCH 5/8] feat(tracing): log when tracing is enabled and surface exporter errors --- pkg/node/node.go | 4 +--- pkg/tracing/tracing.go | 14 ++++++++++++++ 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/pkg/node/node.go b/pkg/node/node.go index 9820ae6ade5..8419cffd65d 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -405,14 +405,12 @@ func NewBee( ServiceVersion: bee.Version, Environment: tracingEnvironment(networkID), InstanceID: swarmAddress.String(), + Logger: logger, }) if err != nil { return nil, fmt.Errorf("tracer: %w", err) } b.tracerCloser = tracerCloser - if o.TracingEnabled { - logger.Info("tracing enabled", "endpoint", o.TracingEndpoint, "protocol", o.TracingProtocol, "sampling_ratio", o.TracingSamplingRatio) - } var ( chequebookService chequebook.Service = new(noOpChequebookService) diff --git a/pkg/tracing/tracing.go b/pkg/tracing/tracing.go index 8aef4406ac5..314072d7a20 100644 --- a/pkg/tracing/tracing.go +++ b/pkg/tracing/tracing.go @@ -20,6 +20,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/log" "github.com/ethersphere/bee/v2/pkg/p2p" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/baggage" "go.opentelemetry.io/otel/codes" @@ -122,6 +123,10 @@ type Options struct { // Protocol selects the OTLP exporter transport: "http" or "grpc". Empty // defaults to "http". Protocol string + // Logger, when set, receives a confirmation line once tracing is wired up + // and OTLP exporter errors (e.g. an unreachable collector) via the OTel + // global error handler. Optional. + Logger log.Logger } // NewTracer creates a new Tracer and returns a closer that flushes pending @@ -166,6 +171,15 @@ func NewTracer(o *Options) (*Tracer, io.Closer, error) { // Batch processor keeps SDK defaults; tunable via OTEL_BSP_* env vars. sdktrace.WithBatcher(exporter), ) + if o.Logger != nil { + // Route async OTLP export failures (e.g. an unreachable collector) to the + // node logger so they are visible instead of silently dropped. + otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) { + o.Logger.Warning("tracing exporter error", "error", err) + })) + o.Logger.Info("tracing enabled", "endpoint", o.Endpoint, "protocol", o.Protocol, "sampling_ratio", ratio) + } + return &Tracer{tracer: tp.Tracer(instrumentationName)}, providerCloser{tp: tp}, nil } From ac56039fff9440a9149378929f5702fb2690ffe2 Mon Sep 17 00:00:00 2001 From: Calin Martinconi Date: Mon, 15 Jun 2026 11:18:43 +0300 Subject: [PATCH 6/8] docs(tracing): clarify root-span and untrusted-baggage intent --- pkg/postage/batchservice/batchservice.go | 3 +++ pkg/tracing/tracing.go | 4 ++++ 2 files changed, 7 insertions(+) diff --git a/pkg/postage/batchservice/batchservice.go b/pkg/postage/batchservice/batchservice.go index 9682c1db157..53380d25f88 100644 --- a/pkg/postage/batchservice/batchservice.go +++ b/pkg/postage/batchservice/batchservice.go @@ -105,6 +105,9 @@ func New( // Create will create a new batch with the given ID, owner value and depth and // stores it in the BatchedStore. func (svc *batchService) Create(id, owner []byte, totalAmout, normalisedBalance *big.Int, depth, bucketDepth uint8, immutable bool, txHash common.Hash) error { + // Batch creation is driven by on-chain postage events, not a request, so + // there is no caller context to parent under: this is intentionally a + // detached root span. span, _, _ := svc.tracer.StartSpanFromContext(context.Background(), "postage-batch-create", svc.logger) span.SetAttributes(attribute.String("batch_id", hex.EncodeToString(id))) defer span.End() diff --git a/pkg/tracing/tracing.go b/pkg/tracing/tracing.go index 314072d7a20..d59fc99245f 100644 --- a/pkg/tracing/tracing.go +++ b/pkg/tracing/tracing.go @@ -316,6 +316,10 @@ func (t *Tracer) FromHeaders(headers p2p.Headers) (trace.SpanContext, error) { // headers and returns a new context carrying them. Baggage is applied even when // no span context is present. Safe to call on a nil receiver. func (t *Tracer) WithContextFromHeaders(ctx context.Context, headers p2p.Headers) (context.Context, error) { + // Baggage arrives from an arbitrary remote peer, so treat it as untrusted: + // undecodable payloads are ignored, and callers must only ever surface its + // values as span attributes. Never promote baggage to metric labels or + // unbounded log fields — arbitrary peers could blow up cardinality. if v := headers[p2p.HeaderNameTracingBaggage]; v != nil { if bag, err := baggage.Parse(string(v)); err == nil { ctx = baggage.ContextWithBaggage(ctx, bag) From 505e1cceb10e5124c2f86161ee8d84d4a9559aae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ljubi=C5=A1a=20Ga=C4=8Devi=C4=87?= <35105035+gacevicljubisa@users.noreply.github.com> Date: Tue, 16 Jun 2026 15:12:33 +0200 Subject: [PATCH 7/8] feat(api): annotate HTTP spans with status and semantic attributes (#5503) --- pkg/api/api.go | 22 +++++- pkg/api/api_test.go | 15 ++-- pkg/api/bytes.go | 4 +- pkg/api/bzz.go | 6 +- pkg/api/tracing_test.go | 153 +++++++++++++++++++++++++++++++++++++++ pkg/pushsync/pushsync.go | 2 +- pkg/tracing/tracing.go | 7 ++ 7 files changed, 197 insertions(+), 12 deletions(-) create mode 100644 pkg/api/tracing_test.go diff --git a/pkg/api/api.go b/pkg/api/api.go index 18086405020..7490f16399e 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -63,6 +63,9 @@ import ( "github.com/gorilla/mux" "github.com/hashicorp/go-multierror" "github.com/prometheus/client_golang/prometheus" + "go.opentelemetry.io/otel/codes" + semconv "go.opentelemetry.io/otel/semconv/v1.25.0" + "go.opentelemetry.io/otel/trace" "golang.org/x/sync/semaphore" ) @@ -462,7 +465,7 @@ func (s *Service) newTracingHandler(spanName string) func(h http.Handler) http.H // ignore } - span, _, ctx := s.tracer.StartSpanFromContext(ctx, spanName, s.logger) + span, _, ctx := s.tracer.StartSpanFromContext(ctx, spanName, s.logger, trace.WithSpanKind(trace.SpanKindServer)) defer span.End() err = s.tracer.AddContextHTTPHeader(ctx, r.Header) @@ -472,6 +475,23 @@ func (s *Service) newTracingHandler(spanName string) func(h http.Handler) http.H } h.ServeHTTP(w, r.WithContext(ctx)) + + // The response status code is captured upstream by responseCodeMetricsHandler's + // *responseWriter, which exposes Status(). Read it to annotate the span with the + // standard HTTP server attributes so the request is queryable in Tempo/Grafana. + if sw, ok := w.(interface{ Status() int }); ok { + code := sw.Status() + span.SetAttributes( + semconv.HTTPRequestMethodKey.String(r.Method), + semconv.HTTPRoute(spanName), + semconv.HTTPResponseStatusCode(code), + ) + // OTel server-span convention: 5xx marks the span as Error; 4xx is a client + // error and stays Unset, still queryable via http.response.status_code. + if code >= 500 { + span.SetStatus(codes.Error, http.StatusText(code)) + } + } }) } } diff --git a/pkg/api/api_test.go b/pkg/api/api_test.go index babd816dd06..23782bcbc6b 100644 --- a/pkg/api/api_test.go +++ b/pkg/api/api_test.go @@ -90,6 +90,7 @@ func init() { type testServerOptions struct { Storer api.Storer + Tracer *tracing.Tracer StateStorer storage.StateStorer Resolver resolver.Interface Pss pss.Interface @@ -231,12 +232,16 @@ func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket. s.SetSwarmAddress(&o.Overlay) s.SetProbe(o.Probe) - noOpTracer, tracerCloser, _ := tracing.NewTracer(&tracing.Options{ - Enabled: false, - }) - testutil.CleanupCloser(t, tracerCloser) + tracer := o.Tracer + if tracer == nil { + noOpTracer, tracerCloser, _ := tracing.NewTracer(&tracing.Options{ + Enabled: false, + }) + testutil.CleanupCloser(t, tracerCloser) + tracer = noOpTracer + } - s.Configure(signer, noOpTracer, api.Options{ + s.Configure(signer, tracer, api.Options{ CORSAllowedOrigins: o.CORSAllowedOrigins, WsPingPeriod: o.WsPingPeriod, }, extraOpts, 1, erc20APIService) diff --git a/pkg/api/bytes.go b/pkg/api/bytes.go index 0855c3fba54..c8e0b318037 100644 --- a/pkg/api/bytes.go +++ b/pkg/api/bytes.go @@ -30,7 +30,7 @@ type bytesPostResponse struct { // bytesUploadHandler handles upload of raw binary data of arbitrary length. func (s *Service) bytesUploadHandler(w http.ResponseWriter, r *http.Request) { - span, logger, ctx := s.tracer.StartSpanFromContext(r.Context(), "post_bytes", s.logger.WithName("post_bytes").Build()) + span, logger, ctx := s.tracer.StartSpanFromContext(r.Context(), "bytes-post", s.logger.WithName("post_bytes").Build()) defer span.End() headers := struct { @@ -73,7 +73,7 @@ func (s *Service) bytesUploadHandler(w http.ResponseWriter, r *http.Request) { tracing.RecordError(span, err, attribute.String("action", "tag.create")) return } - span.SetAttributes(attribute.Int64("tagID", int64(tag))) + span.SetAttributes(attribute.Int64("tag_id", int64(tag))) } defer s.observeUploadSpeed(w, r, time.Now(), "bytes", deferred) diff --git a/pkg/api/bzz.go b/pkg/api/bzz.go index 9f62e6f4895..141b3b55d24 100644 --- a/pkg/api/bzz.go +++ b/pkg/api/bzz.go @@ -65,7 +65,7 @@ func lookaheadBufferSize(size int64) int { } func (s *Service) bzzUploadHandler(w http.ResponseWriter, r *http.Request) { - span, logger, ctx := s.tracer.StartSpanFromContext(r.Context(), "post_bzz", s.logger.WithName("post_bzz").Build()) + span, logger, ctx := s.tracer.StartSpanFromContext(r.Context(), "bzz-post", s.logger.WithName("post_bzz").Build()) defer span.End() headers := struct { @@ -107,7 +107,7 @@ func (s *Service) bzzUploadHandler(w http.ResponseWriter, r *http.Request) { tracing.RecordError(span, err, attribute.String("action", "tag.create")) return } - span.SetAttributes(attribute.Int64("tagID", int64(tag))) + span.SetAttributes(attribute.Int64("tag_id", int64(tag))) } putter, err := s.newStamperPutter(ctx, putterOptions{ @@ -340,7 +340,7 @@ func (s *Service) fileUploadHandler( if tagID != 0 { w.Header().Set(SwarmTagHeader, fmt.Sprint(tagID)) - span.SetAttributes(attribute.Int64("tagID", int64(tagID))) + span.SetAttributes(attribute.Int64("tag_id", int64(tagID))) } w.Header().Set(ETagHeader, fmt.Sprintf("%q", reference.String())) w.Header().Set(AccessControlExposeHeaders, SwarmTagHeader) diff --git a/pkg/api/tracing_test.go b/pkg/api/tracing_test.go new file mode 100644 index 00000000000..f07fb5565c7 --- /dev/null +++ b/pkg/api/tracing_test.go @@ -0,0 +1,153 @@ +// Copyright 2026 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package api_test + +import ( + "bytes" + "context" + "net/http" + "testing" + "time" + + "github.com/ethersphere/bee/v2/pkg/api" + "github.com/ethersphere/bee/v2/pkg/jsonhttp" + "github.com/ethersphere/bee/v2/pkg/jsonhttp/jsonhttptest" + "github.com/ethersphere/bee/v2/pkg/log" + mockpost "github.com/ethersphere/bee/v2/pkg/postage/mock" + "github.com/ethersphere/bee/v2/pkg/spinlock" + mockstorer "github.com/ethersphere/bee/v2/pkg/storer/mock" + "github.com/ethersphere/bee/v2/pkg/swarm" + "github.com/ethersphere/bee/v2/pkg/tracing" + "gitlab.com/nolash/go-mockbytes" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + semconv "go.opentelemetry.io/otel/semconv/v1.25.0" + "go.opentelemetry.io/otel/trace" +) + +// TestTracingHTTPSpan verifies that the HTTP tracing middleware annotates the +// span with the standard server attributes (method, route, status code), marks +// it as a server span, and maps a 5xx response to an Error status while a 2xx +// response leaves the status Unset. These are the fields used to query spans in +// Tempo/Grafana. +func TestTracingHTTPSpan(t *testing.T) { + t.Parallel() + + const resource = "/bytes" + + sr := tracetest.NewSpanRecorder() + tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) + t.Cleanup(func() { _ = tp.Shutdown(context.Background()) }) + + storerMock := mockstorer.New() + client, _, _, _ := newTestServer(t, testServerOptions{ + Storer: storerMock, + Tracer: tracing.NewTracerFromProvider(tp), + Logger: log.Noop, + Post: mockpost.New(mockpost.WithAcceptAll()), + }) + + // Upload some content so it can be downloaded with a 200 OK. + g := mockbytes.New(0, mockbytes.MockTypeStandard).WithModulus(255) + content, err := g.SequentialBytes(swarm.ChunkSize * 2) + if err != nil { + t.Fatal(err) + } + + var res api.BytesPostResponse + jsonhttptest.Request(t, client, http.MethodPost, resource, http.StatusCreated, + jsonhttptest.WithRequestHeader(api.SwarmDeferredUploadHeader, "true"), + jsonhttptest.WithRequestHeader(api.SwarmPostageBatchIdHeader, batchOkStr), + jsonhttptest.WithRequestBody(bytes.NewReader(content)), + jsonhttptest.WithUnmarshalJSONResponse(&res), + ) + + // Successful download -> 200 OK. + jsonhttptest.Request(t, client, http.MethodGet, resource+"/"+res.Reference.String(), http.StatusOK, + jsonhttptest.WithExpectedResponse(content), + ) + + // Invalid address triggers an internal error -> 500. + jsonhttptest.Request(t, client, http.MethodGet, resource+"/abcd", http.StatusInternalServerError, + jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{ + Message: "joiner failed", + Code: http.StatusInternalServerError, + }), + ) + + // span.End() runs in the middleware's deferred call, which can fire after the + // client has already received the response, so wait for both download spans. + var spans []sdktrace.ReadOnlySpan + if err := spinlock.Wait(time.Second, func() bool { + spans = endedSpansByName(sr, "bytes-download") + return len(spans) == 2 + }); err != nil { + t.Fatalf("expected 2 bytes-download spans, got %d", len(spans)) + } + + var ok200, err500 sdktrace.ReadOnlySpan + for _, s := range spans { + switch spanStatusCode(s) { + case http.StatusOK: + ok200 = s + case http.StatusInternalServerError: + err500 = s + } + } + if ok200 == nil || err500 == nil { + t.Fatalf("missing spans by status code: have ok=%t err=%t", ok200 != nil, err500 != nil) + } + + // Both spans carry the standard HTTP server attributes and are server-kind. + for _, s := range []sdktrace.ReadOnlySpan{ok200, err500} { + if s.SpanKind() != trace.SpanKindServer { + t.Errorf("span kind = %v, want server", s.SpanKind()) + } + if got := spanAttrString(s, semconv.HTTPRequestMethodKey); got != http.MethodGet { + t.Errorf("http.request.method = %q, want %q", got, http.MethodGet) + } + if got := spanAttrString(s, semconv.HTTPRouteKey); got != "bytes-download" { + t.Errorf("http.route = %q, want %q", got, "bytes-download") + } + } + + // A 5xx response marks the span as Error; a 2xx response stays Unset. + if got := err500.Status().Code; got != codes.Error { + t.Errorf("500 span status = %v, want %v", got, codes.Error) + } + if got := ok200.Status().Code; got != codes.Unset { + t.Errorf("200 span status = %v, want %v", got, codes.Unset) + } +} + +func endedSpansByName(sr *tracetest.SpanRecorder, name string) []sdktrace.ReadOnlySpan { + var out []sdktrace.ReadOnlySpan + for _, s := range sr.Ended() { + if s.Name() == name { + out = append(out, s) + } + } + return out +} + +func spanStatusCode(s sdktrace.ReadOnlySpan) int { + for _, a := range s.Attributes() { + if a.Key == semconv.HTTPResponseStatusCodeKey { + return int(a.Value.AsInt64()) + } + } + return 0 +} + +func spanAttrString(s sdktrace.ReadOnlySpan, key attribute.Key) string { + for _, a := range s.Attributes() { + if a.Key == key { + return a.Value.AsString() + } + } + return "" +} diff --git a/pkg/pushsync/pushsync.go b/pkg/pushsync/pushsync.go index 59a5e842afe..f11dfd55f9c 100644 --- a/pkg/pushsync/pushsync.go +++ b/pkg/pushsync/pushsync.go @@ -209,7 +209,7 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) span, _, ctx := ps.tracer.StartSpanFromContext(ctx, "pushsync-handler", ps.logger, trace.WithAttributes( attribute.String("address", chunkAddress.String()), - attribute.Int64("tagID", int64(chunk.TagID())), + attribute.Int64("tag_id", int64(chunk.TagID())), attribute.String("sender_address", p.Address.String()), )) diff --git a/pkg/tracing/tracing.go b/pkg/tracing/tracing.go index d59fc99245f..ccf0f596a27 100644 --- a/pkg/tracing/tracing.go +++ b/pkg/tracing/tracing.go @@ -183,6 +183,13 @@ func NewTracer(o *Options) (*Tracer, io.Closer, error) { return &Tracer{tracer: tp.Tracer(instrumentationName)}, providerCloser{tp: tp}, nil } +// NewTracerFromProvider wraps an existing OTel TracerProvider in a Tracer. It is +// primarily useful for tests that need a recording tracer (e.g. one backed by an +// in-memory span recorder) rather than the OTLP exporter pipeline NewTracer builds. +func NewTracerFromProvider(tp trace.TracerProvider) *Tracer { + return &Tracer{tracer: tp.Tracer(instrumentationName)} +} + // newResource builds the OTel resource describing this node. The env options // come before WithAttributes so the configured values win over colliding // OTEL_SERVICE_NAME/OTEL_RESOURCE_ATTRIBUTES, while the environment can still From 2b103abdeeb15734022bdcadeb83a2b24865b6f3 Mon Sep 17 00:00:00 2001 From: Calin Martinconi Date: Tue, 16 Jun 2026 16:38:32 +0300 Subject: [PATCH 8/8] revert(tracing): drop follow-up spans and baggage propagation --- pkg/api/api.go | 3 - pkg/node/node.go | 8 +- pkg/p2p/p2p.go | 4 - pkg/postage/batchservice/batchservice.go | 13 +-- pkg/postage/batchservice/batchservice_test.go | 14 +-- pkg/salud/salud.go | 9 +- pkg/salud/salud_test.go | 10 +- pkg/topology/kademlia/kademlia.go | 9 -- pkg/topology/kademlia/kademlia_test.go | 4 +- pkg/tracing/baggage_test.go | 106 ------------------ pkg/tracing/tracing.go | 58 ++-------- 11 files changed, 31 insertions(+), 207 deletions(-) delete mode 100644 pkg/tracing/baggage_test.go diff --git a/pkg/api/api.go b/pkg/api/api.go index 7490f16399e..28586141e75 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -792,9 +792,6 @@ func (p *putterSessionWrapper) Put(ctx context.Context, chunk swarm.Chunk) error if err != nil { return err } - // Attach the batch id as baggage so it follows the chunk across hops (e.g. - // direct upload -> pushsync). Best effort: a baggage error must not fail the put. - ctx, _ = tracing.WithBaggageMember(ctx, "batch_id", hex.EncodeToString(stamp.BatchID())) return p.PutterSession.Put(ctx, chunk.WithStamp(stamp)) } diff --git a/pkg/node/node.go b/pkg/node/node.go index 8419cffd65d..6b14a2c4112 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -813,7 +813,7 @@ func NewBee( eventListener = listener.New(b.syncingStopped, logger, chainBackend, postageStampContractAddress, postageStampContractABI, o.BlockTime, postageSyncingStallingTimeout, postageSyncingBackoffTimeout) b.listenerCloser = eventListener - batchSvc, err = batchservice.New(stateStore, batchStore, logger, eventListener, overlayEthAddress.Bytes(), post, sha3.New256, o.Resync, tracer) + batchSvc, err = batchservice.New(stateStore, batchStore, logger, eventListener, overlayEthAddress.Bytes(), post, sha3.New256, o.Resync) if err != nil { return nil, fmt.Errorf("init batch service: %w", err) } @@ -842,7 +842,7 @@ func NewBee( var swapService *swap.Service - kad, err := kademlia.New(swarmAddress, addressbook, hive, p2ps, detector, logger, tracer, + kad, err := kademlia.New(swarmAddress, addressbook, hive, p2ps, detector, logger, kademlia.Options{Bootnodes: bootnodes, BootnodeMode: o.BootnodeMode, StaticNodes: o.StaticNodes, DataDir: o.DataDir}) if err != nil { return nil, fmt.Errorf("unable to create kademlia: %w", err) @@ -923,7 +923,7 @@ func NewBee( snapshotEventListener := listener.New(b.syncingStopped, logger, chainBackend, postageStampContractAddress, postageStampContractABI, o.BlockTime, postageSyncingStallingTimeout, postageSyncingBackoffTimeout) - snapshotBatchSvc, err := batchservice.New(stateStore, batchStore, logger, snapshotEventListener, overlayEthAddress.Bytes(), post, sha3.New256, o.Resync, tracer) + snapshotBatchSvc, err := batchservice.New(stateStore, batchStore, logger, snapshotEventListener, overlayEthAddress.Bytes(), post, sha3.New256, o.Resync) if err != nil { logger.Error(err, "failed to initialize batch service from snapshot, continuing outside snapshot block...") } else { @@ -1131,7 +1131,7 @@ func NewBee( return nil, fmt.Errorf("status service: %w", err) } - saludService := salud.New(nodeStatus, kad, localStore, logger, detector, api.FullMode.String(), salud.DefaultDurPercentile, salud.DefaultConnsPercentile, tracer) + saludService := salud.New(nodeStatus, kad, localStore, logger, detector, api.FullMode.String(), salud.DefaultDurPercentile, salud.DefaultConnsPercentile) b.saludCloser = saludService rC, unsub := saludService.SubscribeNetworkStorageRadius() diff --git a/pkg/p2p/p2p.go b/pkg/p2p/p2p.go index d00424f4402..6fac584e81b 100644 --- a/pkg/p2p/p2p.go +++ b/pkg/p2p/p2p.go @@ -226,10 +226,6 @@ const ( // from the legacy OpenTracing/Jaeger payload, so the distinct key prevents // mixed-version peers from decoding each other's incompatible payloads. HeaderNameTracingSpanContext = "tracing-span-context-v2" - // HeaderNameTracingBaggage carries W3C baggage (a string of key=value pairs) - // alongside the span context. It is additive: peers that do not understand it - // simply ignore the header, so it never breaks the stream. - HeaderNameTracingBaggage = "tracing-baggage" ) // NewSwarmStreamName constructs a libp2p compatible stream name out of diff --git a/pkg/postage/batchservice/batchservice.go b/pkg/postage/batchservice/batchservice.go index 53380d25f88..eb99d458421 100644 --- a/pkg/postage/batchservice/batchservice.go +++ b/pkg/postage/batchservice/batchservice.go @@ -17,8 +17,6 @@ import ( "github.com/ethersphere/bee/v2/pkg/log" "github.com/ethersphere/bee/v2/pkg/postage" "github.com/ethersphere/bee/v2/pkg/storage" - "github.com/ethersphere/bee/v2/pkg/tracing" - "go.opentelemetry.io/otel/attribute" "golang.org/x/crypto/sha3" ) @@ -39,7 +37,6 @@ type batchService struct { listener postage.Listener owner []byte batchListener postage.BatchEventListener - tracer *tracing.Tracer checksum hash.Hash // checksum hasher resync bool @@ -59,7 +56,6 @@ func New( batchListener postage.BatchEventListener, checksumFunc func() hash.Hash, resync bool, - tracer *tracing.Tracer, ) (Interface, error) { if checksumFunc == nil { checksumFunc = sha3.New256 @@ -99,19 +95,12 @@ func New( } } - return &batchService{stateStore, storer, logger.WithName(loggerName).Register(), listener, owner, batchListener, tracer, sum, resync}, nil + return &batchService{stateStore, storer, logger.WithName(loggerName).Register(), listener, owner, batchListener, sum, resync}, nil } // Create will create a new batch with the given ID, owner value and depth and // stores it in the BatchedStore. func (svc *batchService) Create(id, owner []byte, totalAmout, normalisedBalance *big.Int, depth, bucketDepth uint8, immutable bool, txHash common.Hash) error { - // Batch creation is driven by on-chain postage events, not a request, so - // there is no caller context to parent under: this is intentionally a - // detached root span. - span, _, _ := svc.tracer.StartSpanFromContext(context.Background(), "postage-batch-create", svc.logger) - span.SetAttributes(attribute.String("batch_id", hex.EncodeToString(id))) - defer span.End() - // don't add batches which have value which equals total cumulative // payout or that are going to expire already within the next couple of blocks val := big.NewInt(0).Add(svc.storer.GetChainState().TotalAmount, svc.storer.GetChainState().CurrentPrice) diff --git a/pkg/postage/batchservice/batchservice_test.go b/pkg/postage/batchservice/batchservice_test.go index a9d7c927181..57a7e5aef82 100644 --- a/pkg/postage/batchservice/batchservice_test.go +++ b/pkg/postage/batchservice/batchservice_test.go @@ -525,7 +525,7 @@ func TestTransactionOk(t *testing.T) { t.Fatal(err) } - svc2, err := batchservice.New(s, store, testLog, newMockListener(), nil, nil, nil, false, nil) + svc2, err := batchservice.New(s, store, testLog, newMockListener(), nil, nil, nil, false) if err != nil { t.Fatal(err) } @@ -550,7 +550,7 @@ func TestTransactionError(t *testing.T) { t.Fatal(err) } - svc2, err := batchservice.New(s, store, testLog, newMockListener(), nil, nil, nil, false, nil) + svc2, err := batchservice.New(s, store, testLog, newMockListener(), nil, nil, nil, false) if err != nil { t.Fatal(err) } @@ -569,7 +569,7 @@ func TestChecksum(t *testing.T) { s := mocks.NewStateStore() store := mock.New() mockHash := &hs{} - svc, err := batchservice.New(s, store, testLog, newMockListener(), nil, nil, func() hash.Hash { return mockHash }, false, nil) + svc, err := batchservice.New(s, store, testLog, newMockListener(), nil, nil, func() hash.Hash { return mockHash }, false) if err != nil { t.Fatal(err) } @@ -592,7 +592,7 @@ func TestChecksumResync(t *testing.T) { s := mocks.NewStateStore() store := mock.New() mockHash := &hs{} - svc, err := batchservice.New(s, store, testLog, newMockListener(), nil, nil, func() hash.Hash { return mockHash }, true, nil) + svc, err := batchservice.New(s, store, testLog, newMockListener(), nil, nil, func() hash.Hash { return mockHash }, true) if err != nil { t.Fatal(err) } @@ -611,7 +611,7 @@ func TestChecksumResync(t *testing.T) { // now start a new instance and check that the value gets read from statestore store2 := mock.New() mockHash2 := &hs{} - _, err = batchservice.New(s, store2, testLog, newMockListener(), nil, nil, func() hash.Hash { return mockHash2 }, false, nil) + _, err = batchservice.New(s, store2, testLog, newMockListener(), nil, nil, func() hash.Hash { return mockHash2 }, false) if err != nil { t.Fatal(err) } @@ -623,7 +623,7 @@ func TestChecksumResync(t *testing.T) { // when resyncing store3 := mock.New() mockHash3 := &hs{} - _, err = batchservice.New(s, store3, testLog, newMockListener(), nil, nil, func() hash.Hash { return mockHash3 }, true, nil) + _, err = batchservice.New(s, store3, testLog, newMockListener(), nil, nil, func() hash.Hash { return mockHash3 }, true) if err != nil { t.Fatal(err) } @@ -641,7 +641,7 @@ func newTestStoreAndServiceWithListener( t.Helper() s := mocks.NewStateStore() store := mock.New(opts...) - svc, err := batchservice.New(s, store, testLog, newMockListener(), owner, batchListener, nil, false, nil) + svc, err := batchservice.New(s, store, testLog, newMockListener(), owner, batchListener, nil, false) if err != nil { t.Fatal(err) } diff --git a/pkg/salud/salud.go b/pkg/salud/salud.go index b849d15b0aa..4a3bebf76bf 100644 --- a/pkg/salud/salud.go +++ b/pkg/salud/salud.go @@ -18,7 +18,6 @@ import ( "github.com/ethersphere/bee/v2/pkg/storer" "github.com/ethersphere/bee/v2/pkg/swarm" "github.com/ethersphere/bee/v2/pkg/topology" - "github.com/ethersphere/bee/v2/pkg/tracing" "go.uber.org/atomic" ) @@ -52,7 +51,6 @@ type service struct { metrics metrics isSelfHealthy *atomic.Bool reserve storer.RadiusChecker - tracer *tracing.Tracer radiusSubsMtx sync.Mutex radiusC []chan uint8 @@ -67,7 +65,6 @@ func New( mode string, durPercentile float64, connsPercentile float64, - tracer *tracing.Tracer, ) *service { metrics := newMetrics() @@ -79,7 +76,6 @@ func New( metrics: metrics, isSelfHealthy: atomic.NewBool(true), reserve: reserve, - tracer: tracer, } s.wg.Add(1) @@ -137,9 +133,6 @@ type peer struct { // per count, the most common storage radius, and the batch commitment, and based on these values, marks peers as unhealhy that fall beyond // the allowed thresholds. func (s *service) salud(mode string, durPercentile float64, connsPercentile float64) { - span, _, spanCtx := s.tracer.StartSpanFromContext(context.Background(), "salud-round", s.logger) - defer span.End() - var ( mtx sync.Mutex wg sync.WaitGroup @@ -151,7 +144,7 @@ func (s *service) salud(mode string, durPercentile float64, connsPercentile floa err := s.topology.EachConnectedPeer(func(addr swarm.Address, bin uint8) (stop bool, jumpToNext bool, err error) { wg.Go(func() { - ctx, cancel := context.WithTimeout(spanCtx, requestTimeout) + ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) defer cancel() start := time.Now() diff --git a/pkg/salud/salud_test.go b/pkg/salud/salud_test.go index 7cea0d44356..4123702263e 100644 --- a/pkg/salud/salud_test.go +++ b/pkg/salud/salud_test.go @@ -72,7 +72,7 @@ func TestSalud(t *testing.T) { mockstorer.WithCapacityDoubling(2), ) - service := salud.New(statusM, topM, reserve, log.Noop, stabilmock.NewSubscriber(true), "full", 0.8, 0.8, nil) + service := salud.New(statusM, topM, reserve, log.Noop, stabilmock.NewSubscriber(true), "full", 0.8, 0.8) err := spinlock.Wait(time.Minute, func() bool { return len(topM.PeersHealth()) == len(peers) @@ -119,7 +119,7 @@ func TestSelfUnhealthyRadius(t *testing.T) { mockstorer.WithCapacityDoubling(0), ) - service := salud.New(statusM, topM, reserve, log.Noop, stabilmock.NewSubscriber(true), "full", 0.8, 0.8, nil) + service := salud.New(statusM, topM, reserve, log.Noop, stabilmock.NewSubscriber(true), "full", 0.8, 0.8) testutil.CleanupCloser(t, service) err := spinlock.Wait(time.Minute, func() bool { @@ -157,7 +157,7 @@ func TestSelfHealthyCapacityDoubling(t *testing.T) { mockstorer.WithCapacityDoubling(2), ) - service := salud.New(statusM, topM, reserve, log.Noop, stabilmock.NewSubscriber(true), "full", 0.8, 0.8, nil) + service := salud.New(statusM, topM, reserve, log.Noop, stabilmock.NewSubscriber(true), "full", 0.8, 0.8) testutil.CleanupCloser(t, service) err := spinlock.Wait(time.Minute, func() bool { @@ -187,7 +187,7 @@ func TestSubToRadius(t *testing.T) { topM := topMock.NewTopologyDriver(topMock.WithPeers(addrs...)) - service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, stabilmock.NewSubscriber(true), "full", 0.8, 0.8, nil) + service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, stabilmock.NewSubscriber(true), "full", 0.8, 0.8) c, unsub := service.SubscribeNetworkStorageRadius() t.Cleanup(unsub) @@ -220,7 +220,7 @@ func TestUnsub(t *testing.T) { topM := topMock.NewTopologyDriver(topMock.WithPeers(addrs...)) - service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, stabilmock.NewSubscriber(true), "full", 0.8, 0.8, nil) + service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, stabilmock.NewSubscriber(true), "full", 0.8, 0.8) testutil.CleanupCloser(t, service) c, unsub := service.SubscribeNetworkStorageRadius() diff --git a/pkg/topology/kademlia/kademlia.go b/pkg/topology/kademlia/kademlia.go index 09f8704bd03..57c4c41a950 100644 --- a/pkg/topology/kademlia/kademlia.go +++ b/pkg/topology/kademlia/kademlia.go @@ -27,10 +27,8 @@ import ( im "github.com/ethersphere/bee/v2/pkg/topology/kademlia/internal/metrics" "github.com/ethersphere/bee/v2/pkg/topology/kademlia/internal/waitnext" "github.com/ethersphere/bee/v2/pkg/topology/pslice" - "github.com/ethersphere/bee/v2/pkg/tracing" "github.com/ethersphere/bee/v2/pkg/util/ioutil" ma "github.com/multiformats/go-multiaddr" - "go.opentelemetry.io/otel/attribute" "golang.org/x/sync/errgroup" ) @@ -204,7 +202,6 @@ type Kad struct { bgBroadcastCancel context.CancelFunc reachability p2p.ReachabilityStatus detector *stabilization.Detector - tracer *tracing.Tracer } // New returns a new Kademlia. @@ -215,7 +212,6 @@ func New( p2pSvc p2p.Service, detector *stabilization.Detector, logger log.Logger, - tracer *tracing.Tracer, o Options, ) (*Kad, error) { var k *Kad @@ -257,7 +253,6 @@ func New( staticPeer: isStaticPeer(opt.StaticNodes), storageRadius: swarm.MaxPO, detector: detector, - tracer: tracer, } if k.opt.PruneFunc == nil { @@ -972,10 +967,6 @@ func (k *Kad) recalcDepth() { func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma []ma.Multiaddr) error { k.logger.Debug("attempting connect to peer", "peer_address", peer) - span, _, ctx := k.tracer.StartSpanFromContext(ctx, "kademlia-connect", k.logger) - span.SetAttributes(attribute.String("peer_address", peer.String())) - defer span.End() - ctx, cancel := context.WithTimeout(ctx, peerConnectionAttemptTimeout) defer cancel() diff --git a/pkg/topology/kademlia/kademlia_test.go b/pkg/topology/kademlia/kademlia_test.go index c6fe9d87daa..1da2353bac5 100644 --- a/pkg/topology/kademlia/kademlia_test.go +++ b/pkg/topology/kademlia/kademlia_test.go @@ -965,7 +965,7 @@ func TestClosestPeer(t *testing.T) { t.Fatal(err) } - kad, err := kademlia.New(base, ab, disc, p2pMock(t, ab, nil, nil, nil), detector, logger, nil, kademlia.Options{}) + kad, err := kademlia.New(base, ab, disc, p2pMock(t, ab, nil, nil, nil), detector, logger, kademlia.Options{}) if err != nil { t.Fatal(err) } @@ -2087,7 +2087,7 @@ func newTestKademliaWithAddrDiscovery( p2p = p2pMock(t, ab, signer, connCounter, failedConnCounter) // p2p mock logger = log.Noop // logger ) - kad, err := kademlia.New(base, ab, disc, p2p, detector, logger, nil, kadOpts) + kad, err := kademlia.New(base, ab, disc, p2p, detector, logger, kadOpts) if err != nil { t.Fatal(err) } diff --git a/pkg/tracing/baggage_test.go b/pkg/tracing/baggage_test.go deleted file mode 100644 index 640de831b72..00000000000 --- a/pkg/tracing/baggage_test.go +++ /dev/null @@ -1,106 +0,0 @@ -// Copyright 2026 The Swarm Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package tracing_test - -import ( - "context" - "net/http" - "testing" - - "github.com/ethersphere/bee/v2/pkg/p2p" - "github.com/ethersphere/bee/v2/pkg/tracing" - "go.opentelemetry.io/otel/baggage" -) - -// baggageValue returns the value of the baggage member with the given key in -// ctx, or ("", false) when absent. -func baggageValue(ctx context.Context, key string) (string, bool) { - m := baggage.FromContext(ctx).Member(key) - if m.Key() == "" { - return "", false - } - return m.Value(), true -} - -func TestBaggageRoundTripP2P(t *testing.T) { - t.Parallel() - - tracer := newTracer(t) - - span, _, ctx := tracer.StartSpanFromContext(context.Background(), "some-operation", nil) - defer span.End() - - ctx, err := tracing.WithBaggageMember(ctx, "batch_id", "deadbeef") - if err != nil { - t.Fatal(err) - } - - headers := make(p2p.Headers) - if err := tracer.AddContextHeader(ctx, headers); err != nil { - t.Fatal(err) - } - if headers[p2p.HeaderNameTracingBaggage] == nil { - t.Fatal("baggage header was not set") - } - - got, err := tracer.WithContextFromHeaders(context.Background(), headers) - if err != nil { - t.Fatal(err) - } - if v, ok := baggageValue(got, "batch_id"); !ok || v != "deadbeef" { - t.Errorf("batch_id baggage = %q (present=%v), want %q", v, ok, "deadbeef") - } -} - -func TestBaggageRoundTripHTTP(t *testing.T) { - t.Parallel() - - tracer := newTracer(t) - - span, _, ctx := tracer.StartSpanFromContext(context.Background(), "some-operation", nil) - defer span.End() - - ctx, err := tracing.WithBaggageMember(ctx, "batch_id", "deadbeef") - if err != nil { - t.Fatal(err) - } - - headers := make(http.Header) - if err := tracer.AddContextHTTPHeader(ctx, headers); err != nil { - t.Fatal(err) - } - - got, err := tracer.WithContextFromHTTPHeaders(context.Background(), headers) - if err != nil { - t.Fatal(err) - } - if v, ok := baggageValue(got, "batch_id"); !ok || v != "deadbeef" { - t.Errorf("batch_id baggage = %q (present=%v), want %q", v, ok, "deadbeef") - } -} - -// TestBaggageOnlyWithoutSpanContext verifies that baggage is applied to the -// returned context even when no span context is present in the p2p headers. -func TestBaggageOnlyWithoutSpanContext(t *testing.T) { - t.Parallel() - - tracer := newTracer(t) - - bag, err := baggage.Parse("batch_id=deadbeef") - if err != nil { - t.Fatal(err) - } - headers := p2p.Headers{ - p2p.HeaderNameTracingBaggage: []byte(bag.String()), - } - - got, err := tracer.WithContextFromHeaders(context.Background(), headers) - if err == nil { - t.Fatal("expected ErrContextNotFound when no span context is present") - } - if v, ok := baggageValue(got, "batch_id"); !ok || v != "deadbeef" { - t.Errorf("batch_id baggage = %q (present=%v), want %q", v, ok, "deadbeef") - } -} diff --git a/pkg/tracing/tracing.go b/pkg/tracing/tracing.go index ccf0f596a27..c1ef0fc614a 100644 --- a/pkg/tracing/tracing.go +++ b/pkg/tracing/tracing.go @@ -22,7 +22,6 @@ import ( "github.com/ethersphere/bee/v2/pkg/p2p" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/baggage" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" @@ -69,12 +68,9 @@ const ( // always operate against a working trace.Tracer. var noopTracer = &Tracer{tracer: noop.NewTracerProvider().Tracer(instrumentationName)} -// httpPropagator carries trace context and baggage across HTTP via the W3C -// TraceContext (traceparent, tracestate) and Baggage (baggage) standard headers. -var httpPropagator propagation.TextMapPropagator = propagation.NewCompositeTextMapPropagator( - propagation.TraceContext{}, - propagation.Baggage{}, -) +// httpPropagator carries trace context across HTTP via the W3C TraceContext +// standard headers (traceparent, tracestate). +var httpPropagator propagation.TextMapPropagator = propagation.TraceContext{} // Tracer wraps an OTel Tracer and provides p2p/HTTP carriers plus helpers // aligned with bee's tracing API. @@ -297,9 +293,6 @@ func (t *Tracer) AddContextHeader(ctx context.Context, headers p2p.Headers) erro } headers[p2p.HeaderNameTracingSpanContext] = encodeP2PSpanContext(sc) - if bag := baggage.FromContext(ctx); bag.Len() > 0 { - headers[p2p.HeaderNameTracingBaggage] = []byte(bag.String()) - } return nil } @@ -319,20 +312,9 @@ func (t *Tracer) FromHeaders(headers p2p.Headers) (trace.SpanContext, error) { return sc, nil } -// WithContextFromHeaders extracts a span context and any baggage from the p2p -// headers and returns a new context carrying them. Baggage is applied even when -// no span context is present. Safe to call on a nil receiver. +// WithContextFromHeaders extracts a span context from the p2p header and +// returns a new context carrying it. Safe to call on a nil receiver. func (t *Tracer) WithContextFromHeaders(ctx context.Context, headers p2p.Headers) (context.Context, error) { - // Baggage arrives from an arbitrary remote peer, so treat it as untrusted: - // undecodable payloads are ignored, and callers must only ever surface its - // values as span attributes. Never promote baggage to metric labels or - // unbounded log fields — arbitrary peers could blow up cardinality. - if v := headers[p2p.HeaderNameTracingBaggage]; v != nil { - if bag, err := baggage.Parse(string(v)); err == nil { - ctx = baggage.ContextWithBaggage(ctx, bag) - } - } - sc, err := t.FromHeaders(headers) if err != nil { return ctx, err @@ -363,15 +345,14 @@ func (t *Tracer) FromHTTPHeaders(headers http.Header) (trace.SpanContext, error) return sc, nil } -// WithContextFromHTTPHeaders extracts a span context and any baggage from HTTP -// headers and returns a new context carrying them. Baggage is applied even when -// no span context is present. Safe to call on a nil receiver. +// WithContextFromHTTPHeaders extracts a span context from HTTP headers and +// returns a new context carrying it. Safe to call on a nil receiver. func (t *Tracer) WithContextFromHTTPHeaders(ctx context.Context, headers http.Header) (context.Context, error) { - ctx = httpPropagator.Extract(ctx, propagation.HeaderCarrier(headers)) - if sc := trace.SpanContextFromContext(ctx); !sc.IsValid() { - return ctx, ErrContextNotFound + sc, err := t.FromHTTPHeaders(headers) + if err != nil { + return ctx, err } - return ctx, nil + return WithContext(ctx, sc), nil } // WithContext stores a span context in ctx using the standard OTel context @@ -386,23 +367,6 @@ func FromContext(ctx context.Context) trace.SpanContext { return trace.SpanContextFromContext(ctx) } -// WithBaggageMember returns a context carrying an additional baggage member -// (key=value) on top of any baggage already present. Baggage propagates across -// HTTP and p2p hops alongside the trace, letting a value such as a batch id -// follow a request end to end. The original context is returned unchanged if the -// key or value is not a valid baggage member. -func WithBaggageMember(ctx context.Context, key, value string) (context.Context, error) { - member, err := baggage.NewMember(key, value) - if err != nil { - return ctx, err - } - bag, err := baggage.FromContext(ctx).SetMember(member) - if err != nil { - return ctx, err - } - return baggage.ContextWithBaggage(ctx, bag), nil -} - // RecordError attaches an error event to the span, marks the span status as // Error, and records the supplied attributes alongside the error event. It is // the OTel equivalent of the OpenTracing ext.LogError pattern bee used previously.