From 4b7334a35e55a3b8fafaf8e85f8ee8fe9130af27 Mon Sep 17 00:00:00 2001 From: Nicolas Sterchele Date: Tue, 24 Mar 2026 10:28:42 +0100 Subject: [PATCH 1/2] feat(sdk): add service.instance.id to default resource The Python SDK did not auto-generate service.instance.id, unlike the Java SDK and the stable semantic convention recommendation. Add it to _DEFAULT_RESOURCE so every process gets a unique instance identity at startup without any user configuration. --- .../src/opentelemetry/sdk/resources/__init__.py | 2 ++ opentelemetry-sdk/tests/resources/test_resources.py | 7 +++++++ 2 files changed, 9 insertions(+) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py index a04d27e9ab1..addb321be79 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py @@ -72,6 +72,7 @@ from types import ModuleType from typing import List, Optional, Set, cast from urllib import parse +from uuid import uuid4 from opentelemetry.attributes import BoundedAttributes from opentelemetry.sdk.environment_variables import ( @@ -323,6 +324,7 @@ def to_json(self, indent: Optional[int] = 4) -> str: TELEMETRY_SDK_LANGUAGE: "python", TELEMETRY_SDK_NAME: "opentelemetry", TELEMETRY_SDK_VERSION: _OPENTELEMETRY_SDK_VERSION, + SERVICE_INSTANCE_ID: str(uuid4()), } ) diff --git a/opentelemetry-sdk/tests/resources/test_resources.py b/opentelemetry-sdk/tests/resources/test_resources.py index c083eff1460..ea1d02afab8 100644 --- a/opentelemetry-sdk/tests/resources/test_resources.py +++ b/opentelemetry-sdk/tests/resources/test_resources.py @@ -45,6 +45,7 @@ PROCESS_RUNTIME_DESCRIPTION, PROCESS_RUNTIME_NAME, PROCESS_RUNTIME_VERSION, + SERVICE_INSTANCE_ID, SERVICE_NAME, TELEMETRY_SDK_LANGUAGE, TELEMETRY_SDK_NAME, @@ -88,6 +89,9 @@ def test_create(self): TELEMETRY_SDK_LANGUAGE: "python", TELEMETRY_SDK_VERSION: _OPENTELEMETRY_SDK_VERSION, SERVICE_NAME: "unknown_service", + SERVICE_INSTANCE_ID: _DEFAULT_RESOURCE.attributes[ + SERVICE_INSTANCE_ID + ], } resource = Resource.create(attributes) @@ -211,6 +215,9 @@ def test_immutability(self): TELEMETRY_SDK_LANGUAGE: "python", TELEMETRY_SDK_VERSION: _OPENTELEMETRY_SDK_VERSION, SERVICE_NAME: "unknown_service", + SERVICE_INSTANCE_ID: _DEFAULT_RESOURCE.attributes[ + SERVICE_INSTANCE_ID + ], } attributes_copy = attributes.copy() From 3256bf01d45d886df4cba6cc117b14664046bc0b Mon Sep 17 00:00:00 2001 From: Nicolas Sterchele Date: Tue, 24 Mar 2026 10:28:42 +0100 Subject: [PATCH 2/2] fix(sdk): regenerate service.instance.id post-fork in MeterProvider and TracerProvider When a prefork server (e.g. gunicorn) forks workers, all workers inherit the same Resource from the master process, including the same service.instance.id. Register an os.register_at_fork(after_in_child=...) hook on both MeterProvider and TracerProvider that replaces service.instance.id with a fresh UUID in each forked worker, ensuring distinct resource identities without any user configuration. Resource.merge() preserves all other resource attributes. WeakMethod is used for the hook reference, consistent with the existing pattern in PeriodicExportingMetricReader and BatchSpanProcessor. Fixes: https://github.com/open-telemetry/opentelemetry-python/issues/4390 Related: https://github.com/open-telemetry/opentelemetry-python/issues/3885 --- CHANGELOG.md | 4 + .../sdk/metrics/_internal/__init__.py | 24 ++++ .../src/opentelemetry/sdk/trace/__init__.py | 23 ++++ .../tests/metrics/test_meter_provider_fork.py | 125 ++++++++++++++++++ .../tests/trace/test_tracer_provider_fork.py | 113 ++++++++++++++++ 5 files changed, 289 insertions(+) create mode 100644 opentelemetry-sdk/tests/metrics/test_meter_provider_fork.py create mode 100644 opentelemetry-sdk/tests/trace/test_tracer_provider_fork.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 540f7b9d347..8ccd4055801 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +- `opentelemetry-sdk`: Add `service.instance.id` to default resource so every process gets a unique instance identity at startup + ([#5000](https://github.com/open-telemetry/opentelemetry-python/pull/5000)) +- `opentelemetry-sdk`: Regenerate `service.instance.id` post-fork in `MeterProvider` and `TracerProvider` to ensure distinct resource identities across prefork workers + ([#5000](https://github.com/open-telemetry/opentelemetry-python/pull/5000)) - `opentelemetry-sdk`: Add file configuration support with YAML/JSON loading, environment variable substitution, and schema validation against the vendored OTel config JSON schema ([#4898](https://github.com/open-telemetry/opentelemetry-python/pull/4898)) - Fix intermittent CI failures in `getting-started` and `tracecontext` jobs caused by GitHub git CDN SHA propagation lag by installing contrib packages from the already-checked-out local copy instead of a second git clone diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py index a2adaa36a98..cbdde4e2491 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import os import weakref from atexit import register, unregister from logging import getLogger @@ -19,6 +20,7 @@ from threading import Lock from time import time_ns from typing import Optional, Sequence +from uuid import uuid4 # This kind of import is needed to avoid Sphinx errors. import opentelemetry.sdk.metrics @@ -456,6 +458,12 @@ def __init__( self._shutdown_once = Once() self._shutdown = False + if hasattr(os, "register_at_fork"): + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + os.register_at_fork( + after_in_child=lambda: weak_reinit()() # pylint: disable=unnecessary-lambda + ) + for metric_reader in self._sdk_config.metric_readers: with self._all_metric_readers_lock: if metric_reader in self._all_metric_readers: @@ -471,6 +479,22 @@ def __init__( self._measurement_consumer.collect ) + def _at_fork_reinit(self) -> None: + """Update the resource with a new unique service.instance.id after a fork. + + When gunicorn (or any other prefork server) forks workers, all workers + inherit the same Resource, including the same service.instance.id. This + causes metric collisions in backends like Datadog where multiple workers + exporting with the same resource identity result in last-write-wins + instead of correct aggregation. + + This hook runs post-fork in each worker and replaces service.instance.id + with a fresh UUID, ensuring each worker is a distinct instance. + """ + self._sdk_config.resource = self._sdk_config.resource.merge( + Resource({"service.instance.id": str(uuid4())}) + ) + def force_flush(self, timeout_millis: float = 10_000) -> bool: deadline_ns = time_ns() + timeout_millis * 10**6 diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py index e0b639d81cf..d8d919126ea 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py @@ -42,6 +42,7 @@ Type, Union, ) +from uuid import uuid4 from warnings import filterwarnings from typing_extensions import deprecated @@ -1366,6 +1367,28 @@ def __init__( _tracer_configurator or _default_tracer_configurator ) + if hasattr(os, "register_at_fork"): + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + os.register_at_fork( + after_in_child=lambda: weak_reinit()() # pylint: disable=unnecessary-lambda + ) + + def _at_fork_reinit(self) -> None: + """Update the resource with a new unique service.instance.id after a fork. + + When gunicorn (or any other prefork server) forks workers, all workers + inherit the same Resource, including the same service.instance.id. This + causes metric collisions in backends like Datadog where multiple workers + exporting with the same resource identity result in last-write-wins + instead of correct aggregation. + + This hook runs post-fork in each worker and replaces service.instance.id + with a fresh UUID, ensuring each worker is a distinct instance. + """ + self._resource = self._resource.merge( + Resource({"service.instance.id": str(uuid4())}) + ) + def _set_tracer_configurator( self, *, tracer_configurator: _TracerConfiguratorT ): diff --git a/opentelemetry-sdk/tests/metrics/test_meter_provider_fork.py b/opentelemetry-sdk/tests/metrics/test_meter_provider_fork.py new file mode 100644 index 00000000000..942182cc5af --- /dev/null +++ b/opentelemetry-sdk/tests/metrics/test_meter_provider_fork.py @@ -0,0 +1,125 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint: disable=protected-access + +import multiprocessing +import os +import unittest +from platform import system + +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.resources import Resource + +_fork_ctx = ( + multiprocessing.get_context("fork") if system() != "Windows" else None +) + + +@unittest.skipUnless( + hasattr(os, "fork"), + "needs *nix", +) +class TestMeterProviderFork(unittest.TestCase): + def test_at_fork_reinit_changes_service_instance_id(self): + """_at_fork_reinit should assign a new service.instance.id.""" + resource = Resource({"service.instance.id": "original-id"}) + provider = MeterProvider(resource=resource) + + original_id = provider._sdk_config.resource.attributes.get( + "service.instance.id" + ) + self.assertEqual(original_id, "original-id") + + provider._at_fork_reinit() + + new_id = provider._sdk_config.resource.attributes.get( + "service.instance.id" + ) + self.assertNotEqual(new_id, "original-id") + self.assertIsNotNone(new_id) + + def test_at_fork_reinit_preserves_other_resource_attributes(self): + """_at_fork_reinit should not affect other resource attributes.""" + resource = Resource( + { + "service.name": "my-service", + "service.instance.id": "original-id", + "deployment.environment": "production", + } + ) + provider = MeterProvider(resource=resource) + + provider._at_fork_reinit() + + attrs = provider._sdk_config.resource.attributes + self.assertEqual(attrs.get("service.name"), "my-service") + self.assertEqual(attrs.get("deployment.environment"), "production") + + def test_fork_produces_unique_service_instance_ids(self): + """Each forked worker should get a distinct service.instance.id.""" + provider = MeterProvider() + + parent_id = provider._sdk_config.resource.attributes.get( + "service.instance.id" + ) + self.assertIsNotNone(parent_id) + + def child(conn): + child_id = provider._sdk_config.resource.attributes.get( + "service.instance.id" + ) + conn.send(child_id) + conn.close() + + parent_conn, child_conn = _fork_ctx.Pipe() + process = _fork_ctx.Process(target=child, args=(child_conn,)) + process.start() + child_id = parent_conn.recv() + process.join() + + # Child should have a different service.instance.id than parent + self.assertNotEqual(parent_id, child_id) + self.assertIsNotNone(child_id) + + def test_multiple_forks_produce_unique_service_instance_ids(self): + """Each of N forked workers should have a distinct service.instance.id.""" + provider = MeterProvider() + + def child(conn): + child_id = provider._sdk_config.resource.attributes.get( + "service.instance.id" + ) + conn.send(child_id) + conn.close() + + ids = set() + processes = [] + conns = [] + + for _ in range(4): + parent_conn, child_conn = _fork_ctx.Pipe() + process = _fork_ctx.Process(target=child, args=(child_conn,)) + processes.append(process) + conns.append(parent_conn) + process.start() + + for conn in conns: + ids.add(conn.recv()) + + for process in processes: + process.join() + + # All 4 workers should have distinct IDs + self.assertEqual(len(ids), 4) diff --git a/opentelemetry-sdk/tests/trace/test_tracer_provider_fork.py b/opentelemetry-sdk/tests/trace/test_tracer_provider_fork.py new file mode 100644 index 00000000000..748ca10c8ff --- /dev/null +++ b/opentelemetry-sdk/tests/trace/test_tracer_provider_fork.py @@ -0,0 +1,113 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint: disable=protected-access + +import multiprocessing +import os +import unittest +from platform import system + +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace import TracerProvider + +_fork_ctx = ( + multiprocessing.get_context("fork") if system() != "Windows" else None +) + + +@unittest.skipUnless( + hasattr(os, "fork"), + "needs *nix", +) +class TestTracerProviderFork(unittest.TestCase): + def test_at_fork_reinit_changes_service_instance_id(self): + """_at_fork_reinit should assign a new service.instance.id.""" + resource = Resource({"service.instance.id": "original-id"}) + provider = TracerProvider(resource=resource) + + original_id = provider._resource.attributes.get("service.instance.id") + self.assertEqual(original_id, "original-id") + + provider._at_fork_reinit() + + new_id = provider._resource.attributes.get("service.instance.id") + self.assertNotEqual(new_id, "original-id") + self.assertIsNotNone(new_id) + + def test_at_fork_reinit_preserves_other_resource_attributes(self): + """_at_fork_reinit should not affect other resource attributes.""" + resource = Resource( + { + "service.name": "my-service", + "service.instance.id": "original-id", + "deployment.environment": "production", + } + ) + provider = TracerProvider(resource=resource) + + provider._at_fork_reinit() + + attrs = provider._resource.attributes + self.assertEqual(attrs.get("service.name"), "my-service") + self.assertEqual(attrs.get("deployment.environment"), "production") + + def test_fork_produces_unique_service_instance_ids(self): + """Each forked worker should get a distinct service.instance.id.""" + provider = TracerProvider() + + parent_id = provider._resource.attributes.get("service.instance.id") + self.assertIsNotNone(parent_id) + + def child(conn): + child_id = provider._resource.attributes.get("service.instance.id") + conn.send(child_id) + conn.close() + + parent_conn, child_conn = _fork_ctx.Pipe() + process = _fork_ctx.Process(target=child, args=(child_conn,)) + process.start() + child_id = parent_conn.recv() + process.join() + + self.assertNotEqual(parent_id, child_id) + self.assertIsNotNone(child_id) + + def test_multiple_forks_produce_unique_service_instance_ids(self): + """Each of N forked workers should have a distinct service.instance.id.""" + provider = TracerProvider() + + def child(conn): + child_id = provider._resource.attributes.get("service.instance.id") + conn.send(child_id) + conn.close() + + ids = set() + processes = [] + conns = [] + + for _ in range(4): + parent_conn, child_conn = _fork_ctx.Pipe() + process = _fork_ctx.Process(target=child, args=(child_conn,)) + processes.append(process) + conns.append(parent_conn) + process.start() + + for conn in conns: + ids.add(conn.recv()) + + for process in processes: + process.join() + + self.assertEqual(len(ids), 4)