Skip to content

instrumentation/aiokafka: add messaging.cluster.id to producer/consumer spans#4727

Draft
shashank-reddy-nr wants to merge 14 commits into
open-telemetry:mainfrom
shashank-reddy-nr:feature/kafka-cluster-id
Draft

instrumentation/aiokafka: add messaging.cluster.id to producer/consumer spans#4727
shashank-reddy-nr wants to merge 14 commits into
open-telemetry:mainfrom
shashank-reddy-nr:feature/kafka-cluster-id

Conversation

@shashank-reddy-nr

@shashank-reddy-nr shashank-reddy-nr commented Jun 22, 2026

Copy link
Copy Markdown

Description

Adds messaging.cluster.id attribute to producer (send) and consumer (getone/getmany) spans in the aiokafka instrumentation.

The attribute is populated from AIOKafkaClient.cluster.cluster_id, which aiokafka populates lazily after the first broker metadata response — no extra connections or background threads needed. For producers, a post-send retry is included to handle the case where the first send() itself triggers the metadata fetch.

Following the Java instrumentation's captureExperimentalSpanAttributes pattern, this attribute is opt-in and off by default because messaging.cluster.id is currently in the experimental semconv namespace. Users must explicitly enable it:

AIOKafkaInstrumentor().instrument(capture_experimental_span_attributes=True)

Type of change

  • New feature (non-breaking change which adds functionality)

How Has This Been Tested?

Unit tests added and updated in tests/test_utils.py:

  • test_cluster_id_attribute_set_on_send_span — verifies attribute is set when flag=True and cluster_id is available
  • test_cluster_id_attribute_absent_when_not_resolved — verifies no attribute when flag=True but metadata not yet received
  • test_cluster_id_attribute_absent_by_default — verifies no attribute emitted when flag=False (default), even when cluster_id is available

All 15 unit tests pass.

Does This PR Require a Core Repo Change?

  • No.

Checklist:

  • Followed the style guidelines of this project
  • Changelogs have been updated
  • Unit tests have been added
  • Documentation has been updated

@linux-foundation-easycla

linux-foundation-easycla Bot commented Jun 22, 2026

Copy link
Copy Markdown

CLA Signed
The committers listed above are authorized under a signed CLA.

  • ✅ login: shashank-reddy-nr / name: Pulipelly Shashank Reddy (3760932)

When a service connects to multiple Kafka clusters, or when the same topic name
exists across environments, Kafka spans carry no information about which cluster
a message came from. messaging.cluster.id is defined in the OTel messaging
semantic conventions for Kafka but was not previously set by this instrumentation.

Adds messaging.cluster.id as a span attribute on producer and consumer spans for
Confluent.Kafka (≥2.0.0). A one-time background AdminClient.DescribeClusterAsync()
call is made per unique bootstrap-servers string; security config (SASL/SSL) is
forwarded so authenticated clusters are supported. The attribute is omitted on
older library versions or if resolution has not yet completed — no overhead on
the hot path.

Assisted-by: Claude Sonnet 4.6 <noreply@anthropic.com>
@shashank-reddy-nr shashank-reddy-nr force-pushed the feature/kafka-cluster-id branch from 937e539 to 3760932 Compare June 22, 2026 13:05
@shashank-reddy-nr shashank-reddy-nr marked this pull request as draft June 22, 2026 20:24
Cache values change from a plain string to (cluster_id, timestamp) tuples.
On read, isinstance(val, tuple) distinguishes a resolved entry from the ""
in-flight sentinel. When the TTL has elapsed the guard falls through to
re-fetch; the stale tuple stays in cache until the background thread stores
a fresh one or removes the key on failure.

Also fix an indentation bug in aiokafka _wrap_send where the return was
outside the span context manager.

Assisted-by: Claude Sonnet 4.6
kafka-python (utils.py):
  t.start() was not guarded by try/except. If the OS failed to create the
  thread, the "" sentinel would remain in _kafka_cluster_id_cache permanently,
  blocking all future fetches for that broker key. Fix: wrap t.start() in
  try/except; remove the sentinel on failure (matching the confluent-kafka
  pattern already in place).

confluent-kafka (utils.py):
  _fetch_cluster_id_background was called from producer/consumer constructor
  interceptors but not from _enrich_span (the hot path). Long-lived clients
  never triggered TTL re-fetches after the initial fetch expired.
  Fix: call _fetch_cluster_id_background in _enrich_span before _get_cluster_id.
  The function is a no-op when the cached value is still fresh (TTL not yet
  expired), and starts a background re-fetch only when the TTL has lapsed.

  To preserve auth credentials for SSL/SASL clusters during TTL re-fetches:
  _fetch_cluster_id_background now stores the base_config from the first call
  (constructor path, which has the full producer/consumer config) in a separate
  _kafka_cluster_id_config_cache dict keyed by bootstrap_servers. When called
  from _enrich_span without credentials, it looks up the stored config and
  passes it to the AdminClient, so re-fetches succeed on authenticated clusters.

Assisted-by: Claude Sonnet 4.6
…inClient

Replace the separate AdminClient connection used to retrieve the Kafka
cluster UUID with a direct list_topics() call on the existing
Producer/Consumer instance, matching the approach already taken by the
NR Python agent.

- Add _get_real_instance() helper to unwrap ProxiedProducer/Consumer
- _fetch_cluster_id_background() accepts optional `instance` arg; uses
  instance.list_topics() when provided, falls back to AdminClient
- _enrich_span() accepts and forwards the instance
- AutoInstrumentedProducer/Consumer pass instance=self at init
- ProxiedProducer/Consumer pass instance=producer/consumer at init
- wrap_produce/poll/consume pass instance=_get_real_instance(instance)

Assisted-by: Claude Sonnet 4.6
Move _kafka_cluster_id_config_cache.setdefault into the existing
_kafka_cluster_id_lock block to prevent a data race under free-threaded
Python (PEP 703).

Assisted-by: Claude Sonnet 4.6
…adata read

Remove the background-thread AIOKafkaAdminClient machinery (_fetch_cluster_id_background,
_extract_security_kwargs, _normalize_bootstrap_servers, _get_cluster_id, and the
shared cache/lock). Replace with _extract_cluster_id_from_client(), which reads
AIOKafkaClient.cluster.cluster_id directly — this value is populated by the existing
producer/consumer client after its first broker metadata response, so no extra
connection, thread, or asyncio.run() is needed.

For the send path the attribute is set once at span start (may be None on the very
first send) and again after await func() returns (guaranteed populated after the
broker responds). Consumer spans are created after func() returns so cluster_id is
always available at creation time.

Assisted-by: Claude Sonnet 4.6
@shashank-reddy-nr shashank-reddy-nr changed the title Feature/kafka cluster instrumentation/aiokafka: add messaging.cluster.id to producer/consumer spans Jul 4, 2026
…perimental_span_attributes flag

Add a `capture_experimental_span_attributes: bool = False` kwarg to
`AIOKafkaInstrumentor().instrument()`, mirroring the Java instrumentation's
`captureExperimentalSpanAttributes` pattern.

`messaging.cluster.id` is in the experimental semconv namespace so it is
opt-in and off by default. The flag is threaded through `_wrap_send`,
`_wrap_getone`, and `_wrap_getmany` in utils.py. When False (default) the
cluster ID extraction is skipped entirely; when True the existing
lazy-populate-plus-post-send-retry logic runs as before.

Update unit tests: default assertions now expect cluster_id=None; the two
integration-style tests are updated to pass flag=True; a new test
`test_cluster_id_attribute_absent_by_default` verifies the flag-off path.

Assisted-by: Claude Sonnet 4.6
- Break long _wrap_send/getone/getmany call arguments across lines
  to satisfy ruff's 79-character line limit
- Shorten two comment lines in _wrap_send that exceeded 79 chars
- Add .changelog/4727.added fragment for the messaging.cluster.id feature

Assisted-by: Claude Sonnet 4.6
Remove unused import and fix import sort order in aiokafka test_utils.py;
apply ruff-format to confluent-kafka and kafka-python utils/__init__.py

Assisted-by: Claude Sonnet 4.6
- Add `# pylint: disable=import-outside-toplevel` alongside existing
  `# noqa: PLC0415` to suppress C0415 for lazy AdminClient imports
  (import is intentionally deferred to avoid hard dependency at module load)
- Rename thread variable `t` -> `thread` to satisfy C0103 naming pattern

Assisted-by: Claude Sonnet 4.6
ruff-isort (I001) wraps long import lines to fit within the 79-char
line-length setting. Move the `# noqa: PLC0415` comment to the
opening `from ... import (` line so ruff honours the suppression
after wrapping.

Assisted-by: Claude Sonnet 4.6
Re-add `# pylint: disable=import-outside-toplevel` to the lazy
`from ... import (` lines in both kafka and confluent-kafka utils.
The comment was accidentally dropped in the previous ruff-I001 fix
that moved the `# noqa: PLC0415` to the opening line of the
multi-line import form.

Assisted-by: Claude Sonnet 4.6
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

Status: No status

Development

Successfully merging this pull request may close these issues.

1 participant