feat(dlq): add dlq support (no-op)#277
Conversation
Semver Impact of This PR🟡 Minor (new features) 📋 Changelog PreviewThis is how your changes will appear in the changelog. New Features ✨
🤖 This preview updates automatically when you update the PR. |
|
|
||
| enabled: bool | ||
| topic: str | ||
| producer_config: "KafkaProducerConfig" | ||
|
|
There was a problem hiding this comment.
Bug: The Python adapter in rust_arroyo.py doesn't read the dlq configuration or pass it to the Rust ArroyoConsumer, rendering the DLQ feature non-functional.
Severity: CRITICAL
Suggested Fix
Update rust_arroyo.py to read the dlq configuration from the source config. If present, construct the corresponding Rust DlqConfig object and pass it as the dlq_config argument when initializing the ArroyoConsumer. Add integration tests to verify the DLQ functionality.
Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.
Location: sentry_streams/sentry_streams/config_types.py#L17-L21
Potential issue: The pull request adds Dead Letter Queue (DLQ) support, with a
`DlqConfig` defined in Python and the Rust consumer expecting a `dlq_config` parameter.
However, the Python adapter in `rust_arroyo.py` that instantiates the `ArroyoConsumer`
never reads the `dlq` key from the configuration dictionary and does not pass it during
initialization. As a result, any user-provided DLQ configuration will be silently
ignored, and the DLQ functionality will not work. Invalid messages will be dropped
instead of being routed to the configured dead-letter topic.
Did we get this right? 👍 / 👎 to inform future reviews.
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Autofix Details
Bugbot Autofix prepared a fix for the issue found in the latest run.
- ✅ Fixed: Python type stub missing new DLQ class and parameter
- Updated
rust_streams.pyito addPyDlqConfigand the optionaldlq_configargument onArroyoConsumer.__init__to match the Rust-exposed interface.
- Updated
Or push these changes by commenting:
@cursor push cb8d770063
Preview (cb8d770063)
diff --git a/sentry_streams/sentry_streams/rust_streams.pyi b/sentry_streams/sentry_streams/rust_streams.pyi
--- a/sentry_streams/sentry_streams/rust_streams.pyi
+++ b/sentry_streams/sentry_streams/rust_streams.pyi
@@ -41,6 +41,12 @@
override_params: Mapping[str, str],
) -> None: ...
+class PyDlqConfig:
+ topic: str
+ producer_config: PyKafkaProducerConfig
+
+ def __init__(self, topic: str, producer_config: PyKafkaProducerConfig) -> None: ...
+
class PyMetricConfig:
def __init__(
self,
@@ -105,6 +111,7 @@
schema: str | None,
metric_config: PyMetricConfig | None = None,
write_healthcheck: bool = False,
+ dlq_config: PyDlqConfig | None = None,
) -> None: ...
def add_step(self, step: RuntimeOperator) -> None: ...
def run(self) -> None: ...This Bugbot Autofix run was free. To enable autofix for future PRs, go to the Cursor dashboard.
Adds DLQ configuration support throughout the streaming platform: - Add PyDlqConfig type stub and build_dlq_config() helper - Wire DLQ config from StreamSource to Rust ArroyoConsumer - Support DLQ override from YAML deployment configuration - Add comprehensive test coverage for DLQ functionality Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
fpacifici
left a comment
There was a problem hiding this comment.
Please see my comments on the unit tests and on the defaults.
Also please adjust the example pipelines to set up the DLQ. Since this cannot be tested with unit test consider adapting https://github.com/getsentry/streams/blob/main/sentry_streams/integration_tests/test_example_pipelines.py
sentry_streams/src/consumer.rs
Outdated
| /// Builds the DLQ policy if dlq_config is provided. | ||
| /// Returns None if DLQ is not configured. | ||
| fn build_dlq_policy(&self) -> Option<DlqPolicy<KafkaPayload>> { | ||
| match &self.dlq_config { |
There was a problem hiding this comment.
Making this a standalone funciton also allows you not to have this case for when the config is not provided.
| /// When provided, invalid messages will be sent to the DLQ topic. | ||
| #[pyclass] | ||
| #[derive(Debug, Clone)] | ||
| pub struct PyDlqConfig { |
There was a problem hiding this comment.
Why PyDlqConfig rather than DlqConfig? Is there a rust version that we need to distinguish from ?
There was a problem hiding this comment.
I just saw that it's a naming convention we have in the repo, like PyKafkaConsumerConfig. I assume it means "this Rust struct is meant for coming from Python"
| topic=dlq_data["topic"], | ||
| bootstrap_servers=dlq_data["bootstrap_servers"], |
There was a problem hiding this comment.
What if the user wants to override only the topic name and leave the bootstrap_servers as they are?
Also should we make the bootstrap_servers default to the same one we use for the StreamingSource ?
There was a problem hiding this comment.
added individual field overriding ability
There was a problem hiding this comment.
default value i was planning on handling in the next PR, see my PR description
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
There are 3 total unresolved issues (including 2 from previous reviews).
Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.


ticket
PR1 (this): add arroyo dlq support that can reach rust-arroyo - noop
PR2: polish deployment config yaml (json validation), default config - noop
PR3: wire everything together, end to end testing
PR4: enable by default, deploy some default topics