Skip to content

fix(kafka): [Queue Instrumentation 36] Avoid dropping customer interceptor#5341

Open
adinauer wants to merge 3 commits intofeat/kafka-producer-wrapperfrom
fix/kafka-consumer-interceptor-reflection
Open

fix(kafka): [Queue Instrumentation 36] Avoid dropping customer interceptor#5341
adinauer wants to merge 3 commits intofeat/kafka-producer-wrapperfrom
fix/kafka-consumer-interceptor-reflection

Conversation

@adinauer
Copy link
Copy Markdown
Member

@adinauer adinauer commented Apr 28, 2026

PR Stack (Queue Instrumentation)


📜 Description

Avoid installing SentryKafkaRecordInterceptor when the bean post-processor cannot safely read the existing recordInterceptor field from AbstractKafkaListenerContainerFactory.

Before this change, reflection failures fell back to existing = null, and the post-processor still called setRecordInterceptor(sentryInterceptor). That could silently overwrite a customer-configured interceptor.

After this change:

  • getExistingInterceptor throws a dedicated exception when reflection fails
  • postProcessAfterInitialization logs an error and leaves the bean untouched
  • Existing customer interceptors are still chained as the delegate when reflection succeeds
  • Tests cover both the chaining path and the "do not modify the bean" invariant

💡 Motivation and Context

Addresses review finding R10-F003: reflection fallback silently drops the customer's existing RecordInterceptor.

Customers often use RecordInterceptor for DLQ routing, auditing, or other message handling behavior. If Sentry cannot safely discover the existing interceptor, it must disable consumer tracing for that factory instead of overwriting customer behavior.

💚 How did you test it?

  • Ran ./gradlew spotlessApply apiDump
  • Ran ./gradlew ':sentry-spring-jakarta:test' --tests "*SentryKafkaConsumerBeanPostProcessor*"
  • Added tests covering customer interceptor chaining and the invariant that the customer interceptor is never dropped

📝 Checklist

  • I added GH Issue ID & Linear ID
  • I added tests to verify the changes.
  • No new PII added or SDK only sends newly added PII if sendDefaultPII is enabled.
  • I updated the docs if needed.
  • I updated the wizard if needed.
  • Review from the native team if needed.
  • No breaking change or entry added to the changelog.
  • No breaking change for hybrid SDKs or communicated to hybrid SDKs.

🔮 Next steps

Continue addressing the remaining Queue Instrumentation review findings.

#skip-changelog

⚠️ Merge this PR using a merge commit (not squash). Only the collection branch is squash-merged into main.

adinauer and others added 3 commits April 27, 2026 15:15
Replace the concrete `implements Producer<K,V>` class with a
`Proxy.newProxyInstance`-based wrapper that intercepts only the two
`send()` overloads and forwards every other method reflectively to
the delegate.

The concrete class required explicitly delegating every method on the
`Producer` interface, coupling the wrapper to a specific Kafka version:
`clientInstanceId(Duration)` was added in Kafka 3.7, and the deprecated
`sendOffsetsToTransaction(Map, String)` was removed in Kafka 4.0. The
dynamic proxy has no such coupling — new or removed interface methods are
handled automatically, giving full compatibility across all Kafka
client versions.

Public API change: `SentryKafkaProducer` is now a utility class with
static `wrap()` overloads instead of constructors. Callers wrap a
producer with `SentryKafkaProducer.wrap(producer)`. The Spring BPP and
console sample are updated accordingly.

Co-Authored-By: Claude <noreply@anthropic.com>
When ProducerFactory.addPostProcessor() is a no-op (the interface
default), the Sentry post-processor is silently dropped and the
customer gets zero producer tracing with no signal.

Verify registration succeeded via getPostProcessors() after each
addPostProcessor() call, and log a WARNING naming the factory bean
and pointing toward SentryKafkaProducer.wrap() as the manual fallback.

Co-Authored-By: Claude <noreply@anthropic.com>
If reading recordInterceptor via reflection fails, leave the container\nfactory untouched instead of installing Sentry's interceptor with a\nnull delegate. This avoids silently dropping customer-configured\ninterceptors for DLQ routing, auditing, or other message handling\nconcerns.\n\nAdd tests that preserve customer interceptors both when chaining\nsucceeds and when reflection cannot safely determine the existing\ninterceptor.\n\nCo-Authored-By: Claude <noreply@anthropic.com>
This was referenced Apr 28, 2026
@sentry
Copy link
Copy Markdown

sentry Bot commented Apr 28, 2026

📲 Install Builds

Android

🔗 App Name App ID Version Configuration
SDK Size io.sentry.tests.size 8.37.1 (1) release

⚙️ sentry-android Build Distribution Settings

@adinauer adinauer marked this pull request as ready for review April 28, 2026 13:21
Comment on lines +49 to +53
"Sentry Kafka consumer tracing disabled for factory '%s' \u2014 could not read "
+ "existing recordInterceptor via reflection. Refusing to install Sentry's "
+ "interceptor to avoid overwriting a customer-configured RecordInterceptor.",
e,
beanName);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: The log call in the catch block for InterceptorReadFailedException has its arguments in the wrong order, causing the exception's stack trace to be lost.
Severity: MEDIUM

Suggested Fix

Reorder the arguments in the .log() call to match the correct signature for logging an exception with a formatted message. The correct order should be log(SentryLevel.ERROR, e, "...message...", beanName).

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's
not valid.

Location:
sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessor.java#L49-L53

Potential issue: The logger call in `SentryKafkaConsumerBeanPostProcessor` uses the
`log(level, message, args...)` signature but provides arguments in the order expected by
`log(level, throwable, message, args...)`. When a reflection error occurs while
inspecting a Kafka consumer factory, the `InterceptorReadFailedException` is passed as a
format argument instead of a throwable. This results in the exception's `toString()`
value being logged in the message, the `beanName` being ignored, and the full stack
trace being lost, which hinders debugging.

Did we get this right? 👍 / 👎 to inform future reviews.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, should be .log(SentryLevel.ERROR, e, "Message", beanName)

Copy link
Copy Markdown

@cursor cursor Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Fix All in Cursor

❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.

Reviewed by Cursor Bugbot for commit c1ccbf7. Configure here.

+ "existing recordInterceptor via reflection. Refusing to install Sentry's "
+ "interceptor to avoid overwriting a customer-configured RecordInterceptor.",
e,
beanName);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logger arguments swapped: exception used as format arg

Medium Severity

The log call passes the exception e as the third argument and beanName as the fourth, matching the log(SentryLevel, String, Object...) overload. This causes the %s placeholder to be filled with e.toString() instead of the factory's beanName, and the exception stack trace is never logged. The intended overload is log(SentryLevel, Throwable, String, Object...), which requires the throwable to come before the message string: .log(SentryLevel.ERROR, e, "...factory '%s'...", beanName).

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit c1ccbf7. Configure here.


val installed = field.get(factory)
val effective =
if (installed is SentryKafkaRecordInterceptor<*, *>) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes the test succeed in any case, whether or not the interceptor was installed. The intention of the test, however, is to verify that the interceptor is not installed, correct?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants