From f8418961b5c3e5c3f3cda91e16002d516227c09c Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Fri, 20 Mar 2026 15:48:58 -0400 Subject: [PATCH 01/17] add --- sentry_streams/sentry_streams/config_types.py | 12 +++ sentry_streams/src/consumer.rs | 88 ++++++++++++++++++- sentry_streams/src/lib.rs | 1 + 3 files changed, 98 insertions(+), 3 deletions(-) diff --git a/sentry_streams/sentry_streams/config_types.py b/sentry_streams/sentry_streams/config_types.py index 110804c9..353e0160 100644 --- a/sentry_streams/sentry_streams/config_types.py +++ b/sentry_streams/sentry_streams/config_types.py @@ -9,11 +9,23 @@ class StepConfig(TypedDict): starts_segment: Optional[bool] +class DlqConfig(TypedDict, total=False): + """ + Dead Letter Queue configuration for a StreamSource. + All fields are optional to allow for default behavior. + """ + + enabled: bool + topic: str + producer_config: "KafkaProducerConfig" + + class KafkaConsumerConfig(TypedDict, StepConfig): bootstrap_servers: Sequence[str] auto_offset_reset: str consumer_group: NotRequired[str] additional_settings: Mapping[str, Any] + dlq: NotRequired[DlqConfig] class KafkaProducerConfig(TypedDict, StepConfig): diff --git a/sentry_streams/src/consumer.rs b/sentry_streams/src/consumer.rs index 3b77dee7..913eaa6c 100644 --- a/sentry_streams/src/consumer.rs +++ b/sentry_streams/src/consumer.rs @@ -6,7 +6,7 @@ //! The pipeline is built by adding RuntimeOperators to the consumer. use crate::commit_policy::WatermarkCommitOffsets; -use crate::kafka_config::PyKafkaConsumerConfig; +use crate::kafka_config::{PyKafkaConsumerConfig, PyKafkaProducerConfig}; use crate::messages::{into_pyraw, PyStreamingMessage, RawMessage, RoutedValuePayload}; use crate::metrics::configure_metrics; use crate::metrics_config::PyMetricConfig; @@ -18,7 +18,9 @@ use crate::utils::traced_with_gil; use crate::watermark::WatermarkEmitter; use pyo3::prelude::*; use rdkafka::message::{Header, Headers, OwnedHeaders}; +use sentry_arroyo::backends::kafka::producer::KafkaProducer; use sentry_arroyo::backends::kafka::types::KafkaPayload; +use sentry_arroyo::processing::dlq::{DlqLimit, DlqPolicy, KafkaDlqProducer}; use sentry_arroyo::processing::strategies::healthcheck::HealthCheck; use sentry_arroyo::processing::strategies::noop::Noop; use sentry_arroyo::processing::strategies::run_task::RunTask; @@ -34,6 +36,31 @@ use std::sync::Arc; /// Matches Arroyo docs for Kubernetes liveness probes. const HEALTHCHECK_PATH: &str = "/tmp/health.txt"; +/// Configuration for Dead Letter Queue (DLQ). +/// When provided, invalid messages will be sent to the DLQ topic. +#[pyclass] +#[derive(Debug, Clone)] +pub struct DlqConfig { + /// The Kafka topic name to send invalid messages to + #[pyo3(get)] + pub topic: String, + + /// The Kafka producer configuration for the DLQ + #[pyo3(get)] + pub producer_config: PyKafkaProducerConfig, +} + +#[pymethods] +impl DlqConfig { + #[new] + fn new(topic: String, producer_config: PyKafkaProducerConfig) -> Self { + DlqConfig { + topic, + producer_config, + } + } +} + /// The class that represent the consumer. /// This class is exposed to python and it is the main entry point /// used by the Python adapter to build a pipeline and run it. @@ -69,12 +96,17 @@ pub struct ArroyoConsumer { /// When true, wrap the strategy chain with HealthCheck to touch a file on poll for liveness. write_healthcheck: bool, + + /// DLQ (Dead Letter Queue) configuration. + /// If provided, invalid messages will be sent to the DLQ topic. + /// Otherwise, invalid messages will cause the consumer to stop processing. + dlq_config: Option, } #[pymethods] impl ArroyoConsumer { #[new] - #[pyo3(signature = (source, kafka_config, topic, schema, metric_config=None, write_healthcheck=false))] + #[pyo3(signature = (source, kafka_config, topic, schema, metric_config=None, write_healthcheck=false, dlq_config=None))] fn new( source: String, kafka_config: PyKafkaConsumerConfig, @@ -82,6 +114,7 @@ impl ArroyoConsumer { schema: Option, metric_config: Option, write_healthcheck: bool, + dlq_config: Option, ) -> Self { ArroyoConsumer { consumer_config: kafka_config, @@ -93,6 +126,7 @@ impl ArroyoConsumer { concurrency_config: Arc::new(ConcurrencyConfig::new(1)), metric_config, write_healthcheck, + dlq_config, } } @@ -120,7 +154,12 @@ impl ArroyoConsumer { self.write_healthcheck, ); let config = self.consumer_config.clone().into(); - let processor = StreamProcessor::with_kafka(config, factory, Topic::new(&self.topic), None); + + // Build DLQ policy if configured + let dlq_policy = self.build_dlq_policy(); + + let processor = + StreamProcessor::with_kafka(config, factory, Topic::new(&self.topic), dlq_policy); self.handle = Some(processor.get_handle()); let mut handle = processor.get_handle(); @@ -143,6 +182,46 @@ impl ArroyoConsumer { } } +// Internal implementation methods (not exposed to Python) +impl ArroyoConsumer { + /// Builds the DLQ policy if dlq_config is provided. + /// Returns None if DLQ is not configured. + fn build_dlq_policy(&self) -> Option> { + match &self.dlq_config { + Some(dlq_config) => { + tracing::info!("Configuring DLQ with topic: {}", dlq_config.topic); + + // Create Kafka producer for DLQ + let producer_config = dlq_config.producer_config.clone().into(); + let kafka_producer = KafkaProducer::new(producer_config); + let dlq_producer = + KafkaDlqProducer::new(kafka_producer, Topic::new(&dlq_config.topic)); + + // Get tokio runtime handle for async DLQ operations + let handle = self.concurrency_config.handle(); + + // Use default DLQ limits (no limits) and no max buffered messages + // These can be made configurable in a future PR if needed + let dlq_limit = DlqLimit::default(); + let max_buffered_messages = None; + + Some(DlqPolicy::new( + handle, + Box::new(dlq_producer), + dlq_limit, + max_buffered_messages, + )) + } + None => { + tracing::info!( + "DLQ not configured, invalid messages will cause processing to stop" + ); + None + } + } + } +} + /// Converts a Message to a Message. /// /// The messages we send around between steps in the pipeline contain @@ -429,4 +508,7 @@ mod tests { let _ = std::fs::remove_file(healthcheck_path); }) } + + // Note: DLQ functionality is primarily tested through Python integration tests + // in tests/test_dlq.py, as Rust unit tests require the full Python module to be built. } diff --git a/sentry_streams/src/lib.rs b/sentry_streams/src/lib.rs index c8913a39..d0171ab6 100644 --- a/sentry_streams/src/lib.rs +++ b/sentry_streams/src/lib.rs @@ -42,6 +42,7 @@ fn rust_streams(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; From 72716350adf04e8cf1d600d79ded4b67ecbd2031 Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Fri, 20 Mar 2026 16:17:16 -0400 Subject: [PATCH 02/17] make it part of StreamSource --- sentry_streams/sentry_streams/pipeline/pipeline.py | 14 ++++++++++++++ sentry_streams/src/consumer.rs | 10 +++++----- sentry_streams/src/lib.rs | 2 +- 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/sentry_streams/sentry_streams/pipeline/pipeline.py b/sentry_streams/sentry_streams/pipeline/pipeline.py index a52994f6..edd4a4bc 100644 --- a/sentry_streams/sentry_streams/pipeline/pipeline.py +++ b/sentry_streams/sentry_streams/pipeline/pipeline.py @@ -224,6 +224,19 @@ class Source(Step, Generic[TOut]): """ +@dataclass +class DlqConfig: + """ + Configuration for Dead Letter Queue (DLQ). + + When provided, invalid messages will be sent to the DLQ topic + instead of causing the consumer to stop processing. + """ + + topic: str + bootstrap_servers: Sequence[str] + + @dataclass class StreamSource(Source[bytes]): """ @@ -233,6 +246,7 @@ class StreamSource(Source[bytes]): stream_name: str header_filter: Optional[Tuple[str, bytes]] = None consumer_group: Optional[str] = None + dlq_config: Optional[DlqConfig] = None step_type: StepType = StepType.SOURCE def register(self, ctx: Pipeline[bytes], previous: Step) -> None: diff --git a/sentry_streams/src/consumer.rs b/sentry_streams/src/consumer.rs index 913eaa6c..26667351 100644 --- a/sentry_streams/src/consumer.rs +++ b/sentry_streams/src/consumer.rs @@ -40,7 +40,7 @@ const HEALTHCHECK_PATH: &str = "/tmp/health.txt"; /// When provided, invalid messages will be sent to the DLQ topic. #[pyclass] #[derive(Debug, Clone)] -pub struct DlqConfig { +pub struct PyDlqConfig { /// The Kafka topic name to send invalid messages to #[pyo3(get)] pub topic: String, @@ -51,10 +51,10 @@ pub struct DlqConfig { } #[pymethods] -impl DlqConfig { +impl PyDlqConfig { #[new] fn new(topic: String, producer_config: PyKafkaProducerConfig) -> Self { - DlqConfig { + PyDlqConfig { topic, producer_config, } @@ -100,7 +100,7 @@ pub struct ArroyoConsumer { /// DLQ (Dead Letter Queue) configuration. /// If provided, invalid messages will be sent to the DLQ topic. /// Otherwise, invalid messages will cause the consumer to stop processing. - dlq_config: Option, + dlq_config: Option, } #[pymethods] @@ -114,7 +114,7 @@ impl ArroyoConsumer { schema: Option, metric_config: Option, write_healthcheck: bool, - dlq_config: Option, + dlq_config: Option, ) -> Self { ArroyoConsumer { consumer_config: kafka_config, diff --git a/sentry_streams/src/lib.rs b/sentry_streams/src/lib.rs index d0171ab6..d5131492 100644 --- a/sentry_streams/src/lib.rs +++ b/sentry_streams/src/lib.rs @@ -42,7 +42,7 @@ fn rust_streams(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; - m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; From 2fe9bd89d7f07dc99be08b2547d8c97b3e81637b Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Mon, 23 Mar 2026 17:27:26 -0400 Subject: [PATCH 03/17] feat(streams): Add Dead Letter Queue (DLQ) support to StreamSource Adds DLQ configuration support throughout the streaming platform: - Add PyDlqConfig type stub and build_dlq_config() helper - Wire DLQ config from StreamSource to Rust ArroyoConsumer - Support DLQ override from YAML deployment configuration - Add comprehensive test coverage for DLQ functionality Co-Authored-By: Claude Sonnet 4.5 --- .../adapters/arroyo/rust_arroyo.py | 21 ++ .../sentry_streams/pipeline/pipeline.py | 8 +- .../sentry_streams/rust_streams.pyi | 15 +- sentry_streams/tests/test_dlq.py | 182 ++++++++++++++++++ 4 files changed, 223 insertions(+), 3 deletions(-) create mode 100644 sentry_streams/tests/test_dlq.py diff --git a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py index 8fc4a2d9..bd1841c6 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py @@ -60,6 +60,7 @@ from sentry_streams.rust_streams import ( ArroyoConsumer, InitialOffset, + PyDlqConfig, PyKafkaConsumerConfig, PyKafkaProducerConfig, PyMetricConfig, @@ -171,6 +172,25 @@ def build_kafka_producer_config( ) +def build_dlq_config( + step: StreamSource, +) -> PyDlqConfig | None: + """ + Build the DLQ configuration for the source. + Returns PyDlqConfig if step.dlq_config is set, otherwise None. + """ + if step.dlq_config is None: + return None + + return PyDlqConfig( + topic=step.dlq_config.topic, + producer_config=PyKafkaProducerConfig( + bootstrap_servers=step.dlq_config.bootstrap_servers, + override_params=None, + ), + ) + + def finalize_chain(chains: TransformChains, route: Route) -> RuntimeOperator: rust_route = RustRoute(route.source, route.waypoints) config, func = chains.finalize(route) @@ -270,6 +290,7 @@ def source(self, step: Source[Any]) -> Route: schema=schema_name, metric_config=self.__metric_config, write_healthcheck=self.__write_healthcheck, + dlq_config=build_dlq_config(step), ) return Route(source_name, []) diff --git a/sentry_streams/sentry_streams/pipeline/pipeline.py b/sentry_streams/sentry_streams/pipeline/pipeline.py index edd4a4bc..4e0aaf65 100644 --- a/sentry_streams/sentry_streams/pipeline/pipeline.py +++ b/sentry_streams/sentry_streams/pipeline/pipeline.py @@ -253,11 +253,17 @@ def register(self, ctx: Pipeline[bytes], previous: Step) -> None: super().register(ctx, previous) def override_config(self, loaded_config: Mapping[str, Any]) -> None: - """Override topic and consumer_group from deployment configuration.""" + """Override topic, consumer_group, and dlq_config from deployment configuration.""" if loaded_config.get("topic"): self.stream_name = str(loaded_config.get("topic")) if loaded_config.get("consumer_group"): self.consumer_group = str(loaded_config.get("consumer_group")) + if loaded_config.get("dlq"): + dlq_data = loaded_config["dlq"] + self.dlq_config = DlqConfig( + topic=dlq_data["topic"], + bootstrap_servers=dlq_data["bootstrap_servers"], + ) @dataclass diff --git a/sentry_streams/sentry_streams/rust_streams.pyi b/sentry_streams/sentry_streams/rust_streams.pyi index 6f58ffbb..2904135c 100644 --- a/sentry_streams/sentry_streams/rust_streams.pyi +++ b/sentry_streams/sentry_streams/rust_streams.pyi @@ -31,14 +31,24 @@ class PyKafkaConsumerConfig: auto_offset_reset: InitialOffset, strict_offset_reset: bool, max_poll_interval_ms: int, - override_params: Mapping[str, str], + override_params: Mapping[str, str] | None = None, ) -> None: ... class PyKafkaProducerConfig: def __init__( self, bootstrap_servers: Sequence[str], - override_params: Mapping[str, str], + override_params: Mapping[str, str] | None = None, + ) -> None: ... + +class PyDlqConfig: + topic: str + producer_config: PyKafkaProducerConfig + + def __init__( + self, + topic: str, + producer_config: PyKafkaProducerConfig, ) -> None: ... class PyMetricConfig: @@ -105,6 +115,7 @@ class ArroyoConsumer: schema: str | None, metric_config: PyMetricConfig | None = None, write_healthcheck: bool = False, + dlq_config: PyDlqConfig | None = None, ) -> None: ... def add_step(self, step: RuntimeOperator) -> None: ... def run(self) -> None: ... diff --git a/sentry_streams/tests/test_dlq.py b/sentry_streams/tests/test_dlq.py new file mode 100644 index 00000000..3517f897 --- /dev/null +++ b/sentry_streams/tests/test_dlq.py @@ -0,0 +1,182 @@ +""" +Tests for Dead Letter Queue (DLQ) functionality. + +These tests verify: +1. Python StreamSource can be configured with DLQ settings +2. Rust ArroyoConsumer correctly accepts DLQ configuration parameters +3. Backward compatibility is maintained +4. build_dlq_config() correctly converts Python DlqConfig to PyDlqConfig +5. StreamSource.override_config() correctly overrides DLQ from YAML +""" + +from sentry_streams.adapters.arroyo.rust_arroyo import build_dlq_config +from sentry_streams.pipeline.pipeline import DlqConfig, StreamSource +from sentry_streams.rust_streams import ( + ArroyoConsumer, + InitialOffset, + PyDlqConfig, + PyKafkaConsumerConfig, + PyKafkaProducerConfig, +) + + +def test_consumer_creation_without_dlq_params() -> None: + """Test backward compatibility: consumer works without DLQ params.""" + kafka_consumer_config = PyKafkaConsumerConfig( + bootstrap_servers=["localhost:9092"], + group_id="test-group", + auto_offset_reset=InitialOffset.latest, + strict_offset_reset=False, + max_poll_interval_ms=60000, + override_params=None, + ) + + consumer = ArroyoConsumer( + source="test_source", + kafka_config=kafka_consumer_config, + topic="test-topic", + schema=None, + metric_config=None, + write_healthcheck=False, + ) + + assert consumer is not None + + +def test_consumer_creation_with_explicit_none_dlq_config() -> None: + """Test that consumer can be created with explicit None for DLQ config.""" + kafka_consumer_config = PyKafkaConsumerConfig( + bootstrap_servers=["localhost:9092"], + group_id="test-group", + auto_offset_reset=InitialOffset.latest, + strict_offset_reset=False, + max_poll_interval_ms=60000, + override_params=None, + ) + + consumer = ArroyoConsumer( + source="test_source", + kafka_config=kafka_consumer_config, + topic="test-topic", + schema=None, + metric_config=None, + write_healthcheck=False, + dlq_config=None, + ) + + assert consumer is not None + + +def test_consumer_creation_with_dlq_config() -> None: + """Test that consumer can be created with Rust DLQ config.""" + kafka_consumer_config = PyKafkaConsumerConfig( + bootstrap_servers=["localhost:9092"], + group_id="test-group", + auto_offset_reset=InitialOffset.latest, + strict_offset_reset=False, + max_poll_interval_ms=60000, + override_params=None, + ) + + kafka_producer_config = PyKafkaProducerConfig( + bootstrap_servers=["localhost:9092"], + override_params=None, + ) + + dlq_config = PyDlqConfig( + topic="test-dlq", + producer_config=kafka_producer_config, + ) + + consumer = ArroyoConsumer( + source="test_source", + kafka_config=kafka_consumer_config, + topic="test-topic", + schema=None, + metric_config=None, + write_healthcheck=False, + dlq_config=dlq_config, + ) + + assert consumer is not None + + +def test_stream_source_creation_without_dlq() -> None: + """Test that StreamSource can be created without DLQ config.""" + source = StreamSource( + name="my_source", + stream_name="my-topic", + ) + + assert source.dlq_config is None + + +def test_stream_source_creation_with_dlq_config() -> None: + """Test that StreamSource can be created with DLQ config.""" + dlq_config = DlqConfig( + topic="my-source-dlq", + bootstrap_servers=["localhost:9092"], + ) + + source = StreamSource( + name="my_source", + stream_name="my-topic", + dlq_config=dlq_config, + ) + + assert source.dlq_config is not None + assert source.dlq_config.topic == "my-source-dlq" + assert source.dlq_config.bootstrap_servers == ["localhost:9092"] + + +def test_stream_source_with_all_optional_params_including_dlq() -> None: + """Test StreamSource with all optional parameters including DLQ.""" + dlq_config = DlqConfig( + topic="my-dlq", + bootstrap_servers=["localhost:9092"], + ) + + source = StreamSource( + name="my_source", + stream_name="my-topic", + header_filter=("my-header", b"my-value"), + consumer_group="my-group", + dlq_config=dlq_config, + ) + + assert source.name == "my_source" + assert source.stream_name == "my-topic" + assert source.consumer_group == "my-group" + assert source.dlq_config is not None + assert source.dlq_config.topic == "my-dlq" + + +def test_dlq_config_with_multiple_bootstrap_servers() -> None: + """Test that DlqConfig can have multiple bootstrap servers.""" + dlq_config = DlqConfig( + topic="my-dlq", + bootstrap_servers=["broker1:9092", "broker2:9092"], + ) + + assert len(dlq_config.bootstrap_servers) == 2 + assert dlq_config.bootstrap_servers[0] == "broker1:9092" + assert dlq_config.bootstrap_servers[1] == "broker2:9092" + + +def test_build_dlq_config_with_dlq_configured() -> None: + """Test build_dlq_config returns PyDlqConfig when step has dlq_config.""" + source = StreamSource( + name="test_source", + stream_name="test-topic", + dlq_config=DlqConfig( + topic="test-dlq", + bootstrap_servers=["localhost:9092", "localhost:9093"], + ), + ) + + result = build_dlq_config(source) + + assert result is not None + assert isinstance(result, PyDlqConfig) + assert result.topic == "test-dlq" + assert result.producer_config is not None From 592958da59408d34c25ba2378c49d5a69ccb5d7c Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Tue, 24 Mar 2026 11:29:21 -0400 Subject: [PATCH 04/17] remove one redundant test --- sentry_streams/tests/test_dlq.py | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/sentry_streams/tests/test_dlq.py b/sentry_streams/tests/test_dlq.py index 3517f897..4f6dfc7f 100644 --- a/sentry_streams/tests/test_dlq.py +++ b/sentry_streams/tests/test_dlq.py @@ -111,24 +111,6 @@ def test_stream_source_creation_without_dlq() -> None: assert source.dlq_config is None -def test_stream_source_creation_with_dlq_config() -> None: - """Test that StreamSource can be created with DLQ config.""" - dlq_config = DlqConfig( - topic="my-source-dlq", - bootstrap_servers=["localhost:9092"], - ) - - source = StreamSource( - name="my_source", - stream_name="my-topic", - dlq_config=dlq_config, - ) - - assert source.dlq_config is not None - assert source.dlq_config.topic == "my-source-dlq" - assert source.dlq_config.bootstrap_servers == ["localhost:9092"] - - def test_stream_source_with_all_optional_params_including_dlq() -> None: """Test StreamSource with all optional parameters including DLQ.""" dlq_config = DlqConfig( From 6e9693765ffe0a5c197b960ab5d16c0935e1be47 Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Wed, 25 Mar 2026 14:10:09 -0400 Subject: [PATCH 05/17] parameterized tests for creating consumer --- sentry_streams/tests/test_dlq.py | 129 +++++++++++++++---------------- 1 file changed, 61 insertions(+), 68 deletions(-) diff --git a/sentry_streams/tests/test_dlq.py b/sentry_streams/tests/test_dlq.py index 4f6dfc7f..12c40656 100644 --- a/sentry_streams/tests/test_dlq.py +++ b/sentry_streams/tests/test_dlq.py @@ -9,6 +9,8 @@ 5. StreamSource.override_config() correctly overrides DLQ from YAML """ +import pytest + from sentry_streams.adapters.arroyo.rust_arroyo import build_dlq_config from sentry_streams.pipeline.pipeline import DlqConfig, StreamSource from sentry_streams.rust_streams import ( @@ -20,55 +22,22 @@ ) -def test_consumer_creation_without_dlq_params() -> None: - """Test backward compatibility: consumer works without DLQ params.""" - kafka_consumer_config = PyKafkaConsumerConfig( - bootstrap_servers=["localhost:9092"], - group_id="test-group", - auto_offset_reset=InitialOffset.latest, - strict_offset_reset=False, - max_poll_interval_ms=60000, - override_params=None, - ) - - consumer = ArroyoConsumer( - source="test_source", - kafka_config=kafka_consumer_config, - topic="test-topic", - schema=None, - metric_config=None, - write_healthcheck=False, - ) - - assert consumer is not None - - -def test_consumer_creation_with_explicit_none_dlq_config() -> None: - """Test that consumer can be created with explicit None for DLQ config.""" - kafka_consumer_config = PyKafkaConsumerConfig( - bootstrap_servers=["localhost:9092"], - group_id="test-group", - auto_offset_reset=InitialOffset.latest, - strict_offset_reset=False, - max_poll_interval_ms=60000, - override_params=None, - ) - - consumer = ArroyoConsumer( - source="test_source", - kafka_config=kafka_consumer_config, - topic="test-topic", - schema=None, - metric_config=None, - write_healthcheck=False, - dlq_config=None, - ) - - assert consumer is not None - - -def test_consumer_creation_with_dlq_config() -> None: - """Test that consumer can be created with Rust DLQ config.""" +@pytest.mark.parametrize( + "dlq_config_type", + [ + "without_dlq", # Backward compatibility: no DLQ param + "with_none", # Explicit None for DLQ config + "with_dlq_config", # Full DLQ configuration + ], +) +def test_consumer_creation_with_various_dlq_configs(dlq_config_type: str) -> None: + """Test that consumer can be created with different DLQ configurations. + + This parameterized test verifies: + 1. Backward compatibility when DLQ param is omitted + 2. Explicit None for DLQ config works + 3. Full DLQ configuration is accepted + """ kafka_consumer_config = PyKafkaConsumerConfig( bootstrap_servers=["localhost:9092"], group_id="test-group", @@ -78,25 +47,49 @@ def test_consumer_creation_with_dlq_config() -> None: override_params=None, ) - kafka_producer_config = PyKafkaProducerConfig( - bootstrap_servers=["localhost:9092"], - override_params=None, - ) - - dlq_config = PyDlqConfig( - topic="test-dlq", - producer_config=kafka_producer_config, - ) - - consumer = ArroyoConsumer( - source="test_source", - kafka_config=kafka_consumer_config, - topic="test-topic", - schema=None, - metric_config=None, - write_healthcheck=False, - dlq_config=dlq_config, - ) + consumer: ArroyoConsumer + if dlq_config_type == "without_dlq": + # Test backward compatibility: don't pass dlq_config at all + consumer = ArroyoConsumer( + source="test_source", + kafka_config=kafka_consumer_config, + topic="test-topic", + schema=None, + metric_config=None, + write_healthcheck=False, + ) + elif dlq_config_type == "with_none": + # Test explicit None + consumer = ArroyoConsumer( + source="test_source", + kafka_config=kafka_consumer_config, + topic="test-topic", + schema=None, + metric_config=None, + write_healthcheck=False, + dlq_config=None, + ) + elif dlq_config_type == "with_dlq_config": + # Test with full DLQ configuration + kafka_producer_config = PyKafkaProducerConfig( + bootstrap_servers=["localhost:9092"], + override_params=None, + ) + dlq_config = PyDlqConfig( + topic="test-dlq", + producer_config=kafka_producer_config, + ) + consumer = ArroyoConsumer( + source="test_source", + kafka_config=kafka_consumer_config, + topic="test-topic", + schema=None, + metric_config=None, + write_healthcheck=False, + dlq_config=dlq_config, + ) + else: + raise ValueError(f"Unknown dlq_config_type: {dlq_config_type}") assert consumer is not None From 5e8c0456564481b2829b43b3909679b31304f186 Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Wed, 25 Mar 2026 14:56:05 -0400 Subject: [PATCH 06/17] add unit test for consumer.build_dlq_policy --- sentry_streams/src/consumer.rs | 71 +++++++++++++++++++++++++++++- sentry_streams/src/kafka_config.rs | 16 +++---- 2 files changed, 77 insertions(+), 10 deletions(-) diff --git a/sentry_streams/src/consumer.rs b/sentry_streams/src/consumer.rs index 26667351..79ba1a50 100644 --- a/sentry_streams/src/consumer.rs +++ b/sentry_streams/src/consumer.rs @@ -509,6 +509,73 @@ mod tests { }) } - // Note: DLQ functionality is primarily tested through Python integration tests - // in tests/test_dlq.py, as Rust unit tests require the full Python module to be built. + #[test] + fn test_build_dlq_policy_with_various_configs() { + crate::testutils::initialize_python(); + traced_with_gil!(|_py| { + // Define test cases: (test_name, dlq_bootstrap_servers, expected_some) + let test_cases = vec![ + ("without_dlq_config", None, false), + ( + "with_dlq_config_single_broker", + Some(vec!["localhost:9092".to_string()]), + true, + ), + ( + "with_dlq_config_multiple_brokers", + Some(vec![ + "broker1:9092".to_string(), + "broker2:9092".to_string(), + "broker3:9092".to_string(), + ]), + true, + ), + ]; + + for (test_name, dlq_bootstrap_servers, expected_some) in test_cases { + // Create consumer config + let consumer_config = PyKafkaConsumerConfig { + bootstrap_servers: vec!["localhost:9092".to_string()], + group_id: "test-group".to_string(), + auto_offset_reset: crate::kafka_config::InitialOffset::Latest, + strict_offset_reset: false, + max_poll_interval_ms: 60000, + override_params: None, + }; + + // Create DLQ config if bootstrap servers are provided + let dlq_config = dlq_bootstrap_servers.map(|servers| { + let producer_config = PyKafkaProducerConfig { + bootstrap_servers: servers, + override_params: None, + }; + PyDlqConfig::new("test-dlq".to_string(), producer_config) + }); + + // Create consumer + let consumer = ArroyoConsumer::new( + "test_source".to_string(), + consumer_config, + "test-topic".to_string(), + None, + None, + false, + dlq_config, + ); + + // Build DLQ policy and assert + let dlq_policy = consumer.build_dlq_policy(); + assert_eq!( + dlq_policy.is_some(), + expected_some, + "Test case '{}' failed: expected is_some() to be {}", + test_name, + expected_some + ); + } + }); + } + + // Note: Additional DLQ functionality is tested through Python integration tests + // in tests/test_dlq.py, as full end-to-end testing requires the Python module. } diff --git a/sentry_streams/src/kafka_config.rs b/sentry_streams/src/kafka_config.rs index d9bf9559..f7724401 100644 --- a/sentry_streams/src/kafka_config.rs +++ b/sentry_streams/src/kafka_config.rs @@ -58,12 +58,12 @@ impl OffsetResetConfig { #[pyclass(from_py_object)] #[derive(Debug, Clone)] pub struct PyKafkaConsumerConfig { - bootstrap_servers: Vec, - group_id: String, - auto_offset_reset: InitialOffset, - strict_offset_reset: bool, - max_poll_interval_ms: usize, - override_params: Option>, + pub bootstrap_servers: Vec, + pub group_id: String, + pub auto_offset_reset: InitialOffset, + pub strict_offset_reset: bool, + pub max_poll_interval_ms: usize, + pub override_params: Option>, } #[pymethods] @@ -104,8 +104,8 @@ impl From for KafkaConfig { #[pyclass(from_py_object)] #[derive(Debug, Clone)] pub struct PyKafkaProducerConfig { - bootstrap_servers: Vec, - override_params: Option>, + pub bootstrap_servers: Vec, + pub override_params: Option>, } #[pymethods] From 0023e3007141d4c9dae9aabea0490e9023701ff2 Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Wed, 25 Mar 2026 15:07:54 -0400 Subject: [PATCH 07/17] make new function pub --- sentry_streams/src/consumer.rs | 23 ++++++++++------------- sentry_streams/src/kafka_config.rs | 20 ++++++++++---------- 2 files changed, 20 insertions(+), 23 deletions(-) diff --git a/sentry_streams/src/consumer.rs b/sentry_streams/src/consumer.rs index 79ba1a50..0d1b63a3 100644 --- a/sentry_streams/src/consumer.rs +++ b/sentry_streams/src/consumer.rs @@ -533,22 +533,19 @@ mod tests { ]; for (test_name, dlq_bootstrap_servers, expected_some) in test_cases { - // Create consumer config - let consumer_config = PyKafkaConsumerConfig { - bootstrap_servers: vec!["localhost:9092".to_string()], - group_id: "test-group".to_string(), - auto_offset_reset: crate::kafka_config::InitialOffset::Latest, - strict_offset_reset: false, - max_poll_interval_ms: 60000, - override_params: None, - }; + // Create consumer config using the public new constructor + let consumer_config = PyKafkaConsumerConfig::new( + vec!["localhost:9092".to_string()], + "test-group".to_string(), + crate::kafka_config::InitialOffset::Latest, + false, + 60000, + None, + ); // Create DLQ config if bootstrap servers are provided let dlq_config = dlq_bootstrap_servers.map(|servers| { - let producer_config = PyKafkaProducerConfig { - bootstrap_servers: servers, - override_params: None, - }; + let producer_config = PyKafkaProducerConfig::new(servers, None); PyDlqConfig::new("test-dlq".to_string(), producer_config) }); diff --git a/sentry_streams/src/kafka_config.rs b/sentry_streams/src/kafka_config.rs index f7724401..09757e7b 100644 --- a/sentry_streams/src/kafka_config.rs +++ b/sentry_streams/src/kafka_config.rs @@ -58,18 +58,18 @@ impl OffsetResetConfig { #[pyclass(from_py_object)] #[derive(Debug, Clone)] pub struct PyKafkaConsumerConfig { - pub bootstrap_servers: Vec, - pub group_id: String, - pub auto_offset_reset: InitialOffset, - pub strict_offset_reset: bool, - pub max_poll_interval_ms: usize, - pub override_params: Option>, + bootstrap_servers: Vec, + group_id: String, + auto_offset_reset: InitialOffset, + strict_offset_reset: bool, + max_poll_interval_ms: usize, + override_params: Option>, } #[pymethods] impl PyKafkaConsumerConfig { #[new] - fn new( + pub fn new( bootstrap_servers: Vec, group_id: String, auto_offset_reset: InitialOffset, @@ -104,14 +104,14 @@ impl From for KafkaConfig { #[pyclass(from_py_object)] #[derive(Debug, Clone)] pub struct PyKafkaProducerConfig { - pub bootstrap_servers: Vec, - pub override_params: Option>, + bootstrap_servers: Vec, + override_params: Option>, } #[pymethods] impl PyKafkaProducerConfig { #[new] - fn new( + pub fn new( bootstrap_servers: Vec, override_params: Option>, ) -> Self { From e0275605dd931c414dcb6f4162309d73d13c524a Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Wed, 25 Mar 2026 18:38:57 -0400 Subject: [PATCH 08/17] add parameterized python test for build_dlq_config --- .../sentry_streams/rust_streams.pyi | 3 ++ sentry_streams/src/kafka_config.rs | 2 + sentry_streams/tests/test_dlq.py | 47 +++++++++++++++---- 3 files changed, 42 insertions(+), 10 deletions(-) diff --git a/sentry_streams/sentry_streams/rust_streams.pyi b/sentry_streams/sentry_streams/rust_streams.pyi index 2904135c..849cd7d6 100644 --- a/sentry_streams/sentry_streams/rust_streams.pyi +++ b/sentry_streams/sentry_streams/rust_streams.pyi @@ -35,6 +35,9 @@ class PyKafkaConsumerConfig: ) -> None: ... class PyKafkaProducerConfig: + bootstrap_servers: Sequence[str] + override_params: Mapping[str, str] | None + def __init__( self, bootstrap_servers: Sequence[str], diff --git a/sentry_streams/src/kafka_config.rs b/sentry_streams/src/kafka_config.rs index 09757e7b..540eb340 100644 --- a/sentry_streams/src/kafka_config.rs +++ b/sentry_streams/src/kafka_config.rs @@ -104,7 +104,9 @@ impl From for KafkaConfig { #[pyclass(from_py_object)] #[derive(Debug, Clone)] pub struct PyKafkaProducerConfig { + #[pyo3(get)] bootstrap_servers: Vec, + #[pyo3(get)] override_params: Option>, } diff --git a/sentry_streams/tests/test_dlq.py b/sentry_streams/tests/test_dlq.py index 12c40656..95562805 100644 --- a/sentry_streams/tests/test_dlq.py +++ b/sentry_streams/tests/test_dlq.py @@ -138,20 +138,47 @@ def test_dlq_config_with_multiple_bootstrap_servers() -> None: assert dlq_config.bootstrap_servers[1] == "broker2:9092" -def test_build_dlq_config_with_dlq_configured() -> None: - """Test build_dlq_config returns PyDlqConfig when step has dlq_config.""" +@pytest.mark.parametrize( + "dlq_config, expected_topic, expected_bootstrap_servers", + [ + # No DLQ config -> returns None + (None, None, None), + # Single bootstrap server + ( + DlqConfig(topic="test-dlq", bootstrap_servers=["localhost:9092"]), + "test-dlq", + ["localhost:9092"], + ), + # Multiple bootstrap servers + ( + DlqConfig( + topic="my-dlq", bootstrap_servers=["broker1:9092", "broker2:9092", "broker3:9092"] + ), + "my-dlq", + ["broker1:9092", "broker2:9092", "broker3:9092"], + ), + ], +) +def test_build_dlq_config_with_various_configs( + dlq_config: DlqConfig | None, + expected_topic: str | None, + expected_bootstrap_servers: list[str] | None, +) -> None: + """Test build_dlq_config returns correct PyDlqConfig for various inputs.""" source = StreamSource( name="test_source", stream_name="test-topic", - dlq_config=DlqConfig( - topic="test-dlq", - bootstrap_servers=["localhost:9092", "localhost:9093"], - ), + dlq_config=dlq_config, ) result = build_dlq_config(source) - assert result is not None - assert isinstance(result, PyDlqConfig) - assert result.topic == "test-dlq" - assert result.producer_config is not None + if expected_topic is None: + assert result is None + else: + assert result is not None + assert isinstance(result, PyDlqConfig) + assert result.topic == expected_topic + assert result.producer_config is not None + assert result.producer_config.bootstrap_servers == expected_bootstrap_servers + assert result.producer_config.override_params is None From e89789ded5fff7beeac882f4d61a8d0c31635392 Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Wed, 25 Mar 2026 19:13:50 -0400 Subject: [PATCH 09/17] update tests --- .../sentry_streams/rust_streams.pyi | 2 + sentry_streams/src/consumer.rs | 1 + sentry_streams/tests/test_dlq.py | 114 +++++++++--------- 3 files changed, 58 insertions(+), 59 deletions(-) diff --git a/sentry_streams/sentry_streams/rust_streams.pyi b/sentry_streams/sentry_streams/rust_streams.pyi index 849cd7d6..711895db 100644 --- a/sentry_streams/sentry_streams/rust_streams.pyi +++ b/sentry_streams/sentry_streams/rust_streams.pyi @@ -110,6 +110,8 @@ class RuntimeOperator: def PythonAdapter(cls, route: Route, delegate_Factory: RustOperatorFactory) -> Self: ... class ArroyoConsumer: + dlq_config: PyDlqConfig | None + def __init__( self, source: str, diff --git a/sentry_streams/src/consumer.rs b/sentry_streams/src/consumer.rs index 0d1b63a3..f9da2237 100644 --- a/sentry_streams/src/consumer.rs +++ b/sentry_streams/src/consumer.rs @@ -100,6 +100,7 @@ pub struct ArroyoConsumer { /// DLQ (Dead Letter Queue) configuration. /// If provided, invalid messages will be sent to the DLQ topic. /// Otherwise, invalid messages will cause the consumer to stop processing. + #[pyo3(get)] dlq_config: Option, } diff --git a/sentry_streams/tests/test_dlq.py b/sentry_streams/tests/test_dlq.py index 95562805..592f3ad6 100644 --- a/sentry_streams/tests/test_dlq.py +++ b/sentry_streams/tests/test_dlq.py @@ -1,14 +1,3 @@ -""" -Tests for Dead Letter Queue (DLQ) functionality. - -These tests verify: -1. Python StreamSource can be configured with DLQ settings -2. Rust ArroyoConsumer correctly accepts DLQ configuration parameters -3. Backward compatibility is maintained -4. build_dlq_config() correctly converts Python DlqConfig to PyDlqConfig -5. StreamSource.override_config() correctly overrides DLQ from YAML -""" - import pytest from sentry_streams.adapters.arroyo.rust_arroyo import build_dlq_config @@ -23,20 +12,24 @@ @pytest.mark.parametrize( - "dlq_config_type", + "dlq_config_type,expected_topic,expected_bootstrap_servers", [ - "without_dlq", # Backward compatibility: no DLQ param - "with_none", # Explicit None for DLQ config - "with_dlq_config", # Full DLQ configuration + ("without_dlq", None, None), # Backward compatibility: no DLQ param + ("with_none", None, None), # Explicit None for DLQ config + ("with_dlq_config", "test-dlq", ["localhost:9092"]), # Full DLQ configuration ], ) -def test_consumer_creation_with_various_dlq_configs(dlq_config_type: str) -> None: - """Test that consumer can be created with different DLQ configurations. +def test_consumer_creation_with_various_dlq_configs( + dlq_config_type: str, + expected_topic: str | None, + expected_bootstrap_servers: list[str] | None, +) -> None: + """Test that Rust ArroyoConsumer correctly accepts and stores DLQ configuration. This parameterized test verifies: 1. Backward compatibility when DLQ param is omitted 2. Explicit None for DLQ config works - 3. Full DLQ configuration is accepted + 3. Full DLQ configuration is accepted and stored correctly """ kafka_consumer_config = PyKafkaConsumerConfig( bootstrap_servers=["localhost:9092"], @@ -91,51 +84,54 @@ def test_consumer_creation_with_various_dlq_configs(dlq_config_type: str) -> Non else: raise ValueError(f"Unknown dlq_config_type: {dlq_config_type}") + # Verify the consumer was created and DLQ config is stored correctly assert consumer is not None + if expected_topic is None: + assert consumer.dlq_config is None + else: + assert consumer.dlq_config is not None + assert consumer.dlq_config.topic == expected_topic + assert consumer.dlq_config.producer_config.bootstrap_servers == expected_bootstrap_servers -def test_stream_source_creation_without_dlq() -> None: - """Test that StreamSource can be created without DLQ config.""" - source = StreamSource( - name="my_source", - stream_name="my-topic", - ) - - assert source.dlq_config is None - - -def test_stream_source_with_all_optional_params_including_dlq() -> None: - """Test StreamSource with all optional parameters including DLQ.""" - dlq_config = DlqConfig( - topic="my-dlq", - bootstrap_servers=["localhost:9092"], - ) - - source = StreamSource( - name="my_source", - stream_name="my-topic", - header_filter=("my-header", b"my-value"), - consumer_group="my-group", - dlq_config=dlq_config, - ) - - assert source.name == "my_source" - assert source.stream_name == "my-topic" - assert source.consumer_group == "my-group" - assert source.dlq_config is not None - assert source.dlq_config.topic == "my-dlq" - - -def test_dlq_config_with_multiple_bootstrap_servers() -> None: - """Test that DlqConfig can have multiple bootstrap servers.""" - dlq_config = DlqConfig( - topic="my-dlq", - bootstrap_servers=["broker1:9092", "broker2:9092"], - ) +@pytest.mark.parametrize( + "dlq_config_type,expected_topic", + [ + ("omitted", None), # Don't pass dlq_config at all + ("explicit_none", None), # Pass dlq_config=None explicitly + ("with_config", "my-dlq"), # Pass an actual DlqConfig + ], +) +def test_stream_source_creation_with_various_dlq_configs( + dlq_config_type: str, + expected_topic: str | None, +) -> None: + """Test that StreamSource can be created with different DLQ configurations.""" + if dlq_config_type == "omitted": + source = StreamSource( + name="my_source", + stream_name="my-topic", + ) + elif dlq_config_type == "explicit_none": + source = StreamSource( + name="my_source", + stream_name="my-topic", + dlq_config=None, + ) + elif dlq_config_type == "with_config": + source = StreamSource( + name="my_source", + stream_name="my-topic", + dlq_config=DlqConfig(topic="my-dlq", bootstrap_servers=["localhost:9092"]), + ) + else: + raise ValueError(f"Unknown dlq_config_type: {dlq_config_type}") - assert len(dlq_config.bootstrap_servers) == 2 - assert dlq_config.bootstrap_servers[0] == "broker1:9092" - assert dlq_config.bootstrap_servers[1] == "broker2:9092" + if expected_topic is None: + assert source.dlq_config is None + else: + assert source.dlq_config is not None + assert source.dlq_config.topic == expected_topic @pytest.mark.parametrize( From 2f3bbab612916e5270818e6072ece11e583f8610 Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Wed, 25 Mar 2026 20:20:09 -0400 Subject: [PATCH 10/17] standalone function --- sentry_streams/src/consumer.rs | 173 ++++++++++++++------------------- 1 file changed, 74 insertions(+), 99 deletions(-) diff --git a/sentry_streams/src/consumer.rs b/sentry_streams/src/consumer.rs index f9da2237..a068b58e 100644 --- a/sentry_streams/src/consumer.rs +++ b/sentry_streams/src/consumer.rs @@ -157,7 +157,7 @@ impl ArroyoConsumer { let config = self.consumer_config.clone().into(); // Build DLQ policy if configured - let dlq_policy = self.build_dlq_policy(); + let dlq_policy = build_dlq_policy(&self.dlq_config, self.concurrency_config.handle()); let processor = StreamProcessor::with_kafka(config, factory, Topic::new(&self.topic), dlq_policy); @@ -183,42 +183,36 @@ impl ArroyoConsumer { } } -// Internal implementation methods (not exposed to Python) -impl ArroyoConsumer { - /// Builds the DLQ policy if dlq_config is provided. - /// Returns None if DLQ is not configured. - fn build_dlq_policy(&self) -> Option> { - match &self.dlq_config { - Some(dlq_config) => { - tracing::info!("Configuring DLQ with topic: {}", dlq_config.topic); - - // Create Kafka producer for DLQ - let producer_config = dlq_config.producer_config.clone().into(); - let kafka_producer = KafkaProducer::new(producer_config); - let dlq_producer = - KafkaDlqProducer::new(kafka_producer, Topic::new(&dlq_config.topic)); - - // Get tokio runtime handle for async DLQ operations - let handle = self.concurrency_config.handle(); - - // Use default DLQ limits (no limits) and no max buffered messages - // These can be made configurable in a future PR if needed - let dlq_limit = DlqLimit::default(); - let max_buffered_messages = None; - - Some(DlqPolicy::new( - handle, - Box::new(dlq_producer), - dlq_limit, - max_buffered_messages, - )) - } - None => { - tracing::info!( - "DLQ not configured, invalid messages will cause processing to stop" - ); - None - } +/// Builds the DLQ policy if dlq_config is provided. +/// Returns None if DLQ is not configured. +pub fn build_dlq_policy( + dlq_config: &Option, + handle: tokio::runtime::Handle, +) -> Option> { + match dlq_config { + Some(dlq_config) => { + tracing::info!("Configuring DLQ with topic: {}", dlq_config.topic); + + // Create Kafka producer for DLQ + let producer_config = dlq_config.producer_config.clone().into(); + let kafka_producer = KafkaProducer::new(producer_config); + let dlq_producer = KafkaDlqProducer::new(kafka_producer, Topic::new(&dlq_config.topic)); + + // Use default DLQ limits (no limits) and no max buffered messages + // These can be made configurable in a future PR if needed + let dlq_limit = DlqLimit::default(); + let max_buffered_messages = None; + + Some(DlqPolicy::new( + handle, + Box::new(dlq_producer), + dlq_limit, + max_buffered_messages, + )) + } + None => { + tracing::info!("DLQ not configured, invalid messages will cause processing to stop"); + None } } } @@ -512,68 +506,49 @@ mod tests { #[test] fn test_build_dlq_policy_with_various_configs() { - crate::testutils::initialize_python(); - traced_with_gil!(|_py| { - // Define test cases: (test_name, dlq_bootstrap_servers, expected_some) - let test_cases = vec![ - ("without_dlq_config", None, false), - ( - "with_dlq_config_single_broker", - Some(vec!["localhost:9092".to_string()]), - true, - ), - ( - "with_dlq_config_multiple_brokers", - Some(vec![ - "broker1:9092".to_string(), - "broker2:9092".to_string(), - "broker3:9092".to_string(), - ]), - true, - ), - ]; - - for (test_name, dlq_bootstrap_servers, expected_some) in test_cases { - // Create consumer config using the public new constructor - let consumer_config = PyKafkaConsumerConfig::new( - vec!["localhost:9092".to_string()], - "test-group".to_string(), - crate::kafka_config::InitialOffset::Latest, - false, - 60000, - None, - ); - - // Create DLQ config if bootstrap servers are provided - let dlq_config = dlq_bootstrap_servers.map(|servers| { - let producer_config = PyKafkaProducerConfig::new(servers, None); - PyDlqConfig::new("test-dlq".to_string(), producer_config) - }); - - // Create consumer - let consumer = ArroyoConsumer::new( - "test_source".to_string(), - consumer_config, - "test-topic".to_string(), - None, - None, - false, - dlq_config, - ); - - // Build DLQ policy and assert - let dlq_policy = consumer.build_dlq_policy(); - assert_eq!( - dlq_policy.is_some(), - expected_some, - "Test case '{}' failed: expected is_some() to be {}", - test_name, - expected_some - ); - } - }); + // Define test cases: (test_name, dlq_bootstrap_servers, expected_some) + let test_cases = vec![ + ("without_dlq_config", None, false), + ( + "with_dlq_config_single_broker", + Some(vec!["localhost:9092".to_string()]), + true, + ), + ( + "with_dlq_config_multiple_brokers", + Some(vec![ + "broker1:9092".to_string(), + "broker2:9092".to_string(), + "broker3:9092".to_string(), + ]), + true, + ), + ]; + + // Create a tokio runtime to get a handle for testing + let runtime = tokio::runtime::Runtime::new().unwrap(); + let handle = runtime.handle().clone(); + + for (test_name, dlq_bootstrap_servers, expected_some) in test_cases { + // Create DLQ config if bootstrap servers are provided + let dlq_config = dlq_bootstrap_servers.map(|servers| { + let producer_config = PyKafkaProducerConfig::new(servers, None); + PyDlqConfig::new("test-dlq".to_string(), producer_config) + }); + + // Build DLQ policy and assert + let dlq_policy: Option> = + build_dlq_policy(&dlq_config, handle.clone()); + assert_eq!( + dlq_policy.is_some(), + expected_some, + "Test case '{}' failed: expected is_some() to be {}", + test_name, + expected_some + ); + } } - // Note: Additional DLQ functionality is tested through Python integration tests - // in tests/test_dlq.py, as full end-to-end testing requires the Python module. + // Note: Asserting on inside properties of dlq_policy is tested through Python integration tests + // in tests/test_dlq.py, as the dlq_policy is an external crate and inner properties are private. } From c6cd8078528e5907d57bb7788596d57d11d34d92 Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Wed, 25 Mar 2026 20:39:14 -0400 Subject: [PATCH 11/17] remove dlq enabled field --- sentry_streams/sentry_streams/config_types.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sentry_streams/sentry_streams/config_types.py b/sentry_streams/sentry_streams/config_types.py index 353e0160..9f90f1f4 100644 --- a/sentry_streams/sentry_streams/config_types.py +++ b/sentry_streams/sentry_streams/config_types.py @@ -15,7 +15,6 @@ class DlqConfig(TypedDict, total=False): All fields are optional to allow for default behavior. """ - enabled: bool topic: str producer_config: "KafkaProducerConfig" From 189f2b4ecdf62d8eda0fa41c1e61d0debc3e1eeb Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Wed, 25 Mar 2026 21:21:53 -0400 Subject: [PATCH 12/17] allow overriding dlq config field by field --- .../sentry_streams/pipeline/pipeline.py | 20 +++++-- sentry_streams/tests/test_dlq.py | 53 +++++++++++++++++++ 2 files changed, 68 insertions(+), 5 deletions(-) diff --git a/sentry_streams/sentry_streams/pipeline/pipeline.py b/sentry_streams/sentry_streams/pipeline/pipeline.py index 4e0aaf65..ba2d8d8a 100644 --- a/sentry_streams/sentry_streams/pipeline/pipeline.py +++ b/sentry_streams/sentry_streams/pipeline/pipeline.py @@ -246,6 +246,7 @@ class StreamSource(Source[bytes]): stream_name: str header_filter: Optional[Tuple[str, bytes]] = None consumer_group: Optional[str] = None + # dlq_config is Optional for now for no-op, in a following PR I will add default values and remove Optional dlq_config: Optional[DlqConfig] = None step_type: StepType = StepType.SOURCE @@ -259,11 +260,20 @@ def override_config(self, loaded_config: Mapping[str, Any]) -> None: if loaded_config.get("consumer_group"): self.consumer_group = str(loaded_config.get("consumer_group")) if loaded_config.get("dlq"): - dlq_data = loaded_config["dlq"] - self.dlq_config = DlqConfig( - topic=dlq_data["topic"], - bootstrap_servers=dlq_data["bootstrap_servers"], - ) + loaded_dlq = loaded_config["dlq"] + if "topic" in loaded_dlq: + topic = topic = loaded_dlq["topic"] + elif self.dlq_config: + topic = self.dlq_config.topic + + if "bootstrap_servers" in loaded_dlq: + servers = loaded_dlq["bootstrap_servers"] + elif self.dlq_config: + servers = self.dlq_config.bootstrap_servers + self.dlq_config = DlqConfig( + topic=topic, + bootstrap_servers=servers, + ) @dataclass diff --git a/sentry_streams/tests/test_dlq.py b/sentry_streams/tests/test_dlq.py index 592f3ad6..e6065018 100644 --- a/sentry_streams/tests/test_dlq.py +++ b/sentry_streams/tests/test_dlq.py @@ -178,3 +178,56 @@ def test_build_dlq_config_with_various_configs( assert result.producer_config is not None assert result.producer_config.bootstrap_servers == expected_bootstrap_servers assert result.producer_config.override_params is None + + +@pytest.mark.parametrize( + "initial_dlq_config,override_dlq,expected_topic,expected_bootstrap_servers", + [ + # No initial config, create new with both fields + ( + None, + {"topic": "new-dlq", "bootstrap_servers": ["broker1:9092"]}, + "new-dlq", + ["broker1:9092"], + ), + # Existing config, override only topic + ( + DlqConfig(topic="old-dlq", bootstrap_servers=["old-broker:9092"]), + {"topic": "new-dlq"}, + "new-dlq", + ["old-broker:9092"], + ), + # Existing config, override only bootstrap_servers + ( + DlqConfig(topic="old-dlq", bootstrap_servers=["old-broker:9092"]), + {"bootstrap_servers": ["new-broker:9092", "new-broker2:9092"]}, + "old-dlq", + ["new-broker:9092", "new-broker2:9092"], + ), + # Existing config, override both fields + ( + DlqConfig(topic="old-dlq", bootstrap_servers=["old-broker:9092"]), + {"topic": "new-dlq", "bootstrap_servers": ["new-broker:9092"]}, + "new-dlq", + ["new-broker:9092"], + ), + ], +) +def test_stream_source_override_config_dlq( + initial_dlq_config: DlqConfig | None, + override_dlq: dict[str, str | list[str]], + expected_topic: str, + expected_bootstrap_servers: list[str], +) -> None: + """Test that StreamSource.override_config correctly overrides DLQ settings.""" + source = StreamSource( + name="my_source", + stream_name="my-topic", + dlq_config=initial_dlq_config, + ) + + source.override_config({"dlq": override_dlq}) + + assert source.dlq_config is not None + assert source.dlq_config.topic == expected_topic + assert source.dlq_config.bootstrap_servers == expected_bootstrap_servers From b667a3c195421f6571bf21e2c7077d1b69d047ab Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Wed, 25 Mar 2026 21:25:59 -0400 Subject: [PATCH 13/17] comment --- sentry_streams/sentry_streams/pipeline/pipeline.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sentry_streams/sentry_streams/pipeline/pipeline.py b/sentry_streams/sentry_streams/pipeline/pipeline.py index ba2d8d8a..bcd05dcd 100644 --- a/sentry_streams/sentry_streams/pipeline/pipeline.py +++ b/sentry_streams/sentry_streams/pipeline/pipeline.py @@ -263,11 +263,13 @@ def override_config(self, loaded_config: Mapping[str, Any]) -> None: loaded_dlq = loaded_config["dlq"] if "topic" in loaded_dlq: topic = topic = loaded_dlq["topic"] + # this is for passing type checking, bc rn dlq_config is optional, see above comment elif self.dlq_config: topic = self.dlq_config.topic if "bootstrap_servers" in loaded_dlq: servers = loaded_dlq["bootstrap_servers"] + # this is for passing type checking, bc rn dlq_config is optional, see above comment elif self.dlq_config: servers = self.dlq_config.bootstrap_servers self.dlq_config = DlqConfig( From 8ef9a6ac5a0c24c4eca1e3b5262d934b321d8708 Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Wed, 25 Mar 2026 21:30:08 -0400 Subject: [PATCH 14/17] remove unused pub new --- sentry_streams/src/kafka_config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sentry_streams/src/kafka_config.rs b/sentry_streams/src/kafka_config.rs index 540eb340..c58193b6 100644 --- a/sentry_streams/src/kafka_config.rs +++ b/sentry_streams/src/kafka_config.rs @@ -69,7 +69,7 @@ pub struct PyKafkaConsumerConfig { #[pymethods] impl PyKafkaConsumerConfig { #[new] - pub fn new( + fn new( bootstrap_servers: Vec, group_id: String, auto_offset_reset: InitialOffset, From 5aee685e5ad2e36211194a10a52cdde266d4bfb3 Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Wed, 25 Mar 2026 21:53:15 -0400 Subject: [PATCH 15/17] finalize tests --- .../sentry_streams/pipeline/pipeline.py | 13 ++- sentry_streams/tests/test_dlq.py | 81 +++++-------------- 2 files changed, 26 insertions(+), 68 deletions(-) diff --git a/sentry_streams/sentry_streams/pipeline/pipeline.py b/sentry_streams/sentry_streams/pipeline/pipeline.py index bcd05dcd..d350602a 100644 --- a/sentry_streams/sentry_streams/pipeline/pipeline.py +++ b/sentry_streams/sentry_streams/pipeline/pipeline.py @@ -262,20 +262,19 @@ def override_config(self, loaded_config: Mapping[str, Any]) -> None: if loaded_config.get("dlq"): loaded_dlq = loaded_config["dlq"] if "topic" in loaded_dlq: - topic = topic = loaded_dlq["topic"] - # this is for passing type checking, bc rn dlq_config is optional, see above comment + topic = loaded_dlq["topic"] elif self.dlq_config: topic = self.dlq_config.topic if "bootstrap_servers" in loaded_dlq: servers = loaded_dlq["bootstrap_servers"] - # this is for passing type checking, bc rn dlq_config is optional, see above comment elif self.dlq_config: servers = self.dlq_config.bootstrap_servers - self.dlq_config = DlqConfig( - topic=topic, - bootstrap_servers=servers, - ) + + self.dlq_config = DlqConfig( + topic=topic, + bootstrap_servers=servers, + ) @dataclass diff --git a/sentry_streams/tests/test_dlq.py b/sentry_streams/tests/test_dlq.py index e6065018..fa857876 100644 --- a/sentry_streams/tests/test_dlq.py +++ b/sentry_streams/tests/test_dlq.py @@ -12,14 +12,14 @@ @pytest.mark.parametrize( - "dlq_config_type,expected_topic,expected_bootstrap_servers", + "dlq_config_type, expected_topic, expected_bootstrap_servers", [ - ("without_dlq", None, None), # Backward compatibility: no DLQ param - ("with_none", None, None), # Explicit None for DLQ config - ("with_dlq_config", "test-dlq", ["localhost:9092"]), # Full DLQ configuration + pytest.param("without_dlq", None, None, id="without_dlq"), + pytest.param("with_none", None, None, id="with_explicit_none"), + pytest.param("with_dlq_config", "test-dlq", ["localhost:9092"], id="with_full_config"), ], ) -def test_consumer_creation_with_various_dlq_configs( +def test_consumer_creation( dlq_config_type: str, expected_topic: str | None, expected_bootstrap_servers: list[str] | None, @@ -94,68 +94,27 @@ def test_consumer_creation_with_various_dlq_configs( assert consumer.dlq_config.producer_config.bootstrap_servers == expected_bootstrap_servers -@pytest.mark.parametrize( - "dlq_config_type,expected_topic", - [ - ("omitted", None), # Don't pass dlq_config at all - ("explicit_none", None), # Pass dlq_config=None explicitly - ("with_config", "my-dlq"), # Pass an actual DlqConfig - ], -) -def test_stream_source_creation_with_various_dlq_configs( - dlq_config_type: str, - expected_topic: str | None, -) -> None: - """Test that StreamSource can be created with different DLQ configurations.""" - if dlq_config_type == "omitted": - source = StreamSource( - name="my_source", - stream_name="my-topic", - ) - elif dlq_config_type == "explicit_none": - source = StreamSource( - name="my_source", - stream_name="my-topic", - dlq_config=None, - ) - elif dlq_config_type == "with_config": - source = StreamSource( - name="my_source", - stream_name="my-topic", - dlq_config=DlqConfig(topic="my-dlq", bootstrap_servers=["localhost:9092"]), - ) - else: - raise ValueError(f"Unknown dlq_config_type: {dlq_config_type}") - - if expected_topic is None: - assert source.dlq_config is None - else: - assert source.dlq_config is not None - assert source.dlq_config.topic == expected_topic - - @pytest.mark.parametrize( "dlq_config, expected_topic, expected_bootstrap_servers", [ - # No DLQ config -> returns None - (None, None, None), - # Single bootstrap server - ( + pytest.param(None, None, None, id="no_dlq_config"), + pytest.param( DlqConfig(topic="test-dlq", bootstrap_servers=["localhost:9092"]), "test-dlq", ["localhost:9092"], + id="single_bootstrap_server", ), - # Multiple bootstrap servers - ( + pytest.param( DlqConfig( topic="my-dlq", bootstrap_servers=["broker1:9092", "broker2:9092", "broker3:9092"] ), "my-dlq", ["broker1:9092", "broker2:9092", "broker3:9092"], + id="multiple_bootstrap_servers", ), ], ) -def test_build_dlq_config_with_various_configs( +def test_build_dlq_config( dlq_config: DlqConfig | None, expected_topic: str | None, expected_bootstrap_servers: list[str] | None, @@ -181,35 +140,35 @@ def test_build_dlq_config_with_various_configs( @pytest.mark.parametrize( - "initial_dlq_config,override_dlq,expected_topic,expected_bootstrap_servers", + "initial_dlq_config, override_dlq, expected_topic, expected_bootstrap_servers", [ - # No initial config, create new with both fields - ( + pytest.param( None, {"topic": "new-dlq", "bootstrap_servers": ["broker1:9092"]}, "new-dlq", ["broker1:9092"], + id="create_new_config", ), - # Existing config, override only topic - ( + pytest.param( DlqConfig(topic="old-dlq", bootstrap_servers=["old-broker:9092"]), {"topic": "new-dlq"}, "new-dlq", ["old-broker:9092"], + id="override_topic_only", ), - # Existing config, override only bootstrap_servers - ( + pytest.param( DlqConfig(topic="old-dlq", bootstrap_servers=["old-broker:9092"]), {"bootstrap_servers": ["new-broker:9092", "new-broker2:9092"]}, "old-dlq", ["new-broker:9092", "new-broker2:9092"], + id="override_bootstrap_servers_only", ), - # Existing config, override both fields - ( + pytest.param( DlqConfig(topic="old-dlq", bootstrap_servers=["old-broker:9092"]), {"topic": "new-dlq", "bootstrap_servers": ["new-broker:9092"]}, "new-dlq", ["new-broker:9092"], + id="override_both_fields", ), ], ) From 138898d153d8326677dcc3949de743d6d6171b25 Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Thu, 26 Mar 2026 11:09:43 -0400 Subject: [PATCH 16/17] simpify --- .../sentry_streams/pipeline/pipeline.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/sentry_streams/sentry_streams/pipeline/pipeline.py b/sentry_streams/sentry_streams/pipeline/pipeline.py index d350602a..0aae9a92 100644 --- a/sentry_streams/sentry_streams/pipeline/pipeline.py +++ b/sentry_streams/sentry_streams/pipeline/pipeline.py @@ -261,15 +261,13 @@ def override_config(self, loaded_config: Mapping[str, Any]) -> None: self.consumer_group = str(loaded_config.get("consumer_group")) if loaded_config.get("dlq"): loaded_dlq = loaded_config["dlq"] - if "topic" in loaded_dlq: - topic = loaded_dlq["topic"] - elif self.dlq_config: - topic = self.dlq_config.topic - - if "bootstrap_servers" in loaded_dlq: - servers = loaded_dlq["bootstrap_servers"] - elif self.dlq_config: - servers = self.dlq_config.bootstrap_servers + topic = loaded_dlq.get("topic") or (self.dlq_config.topic if self.dlq_config else None) + servers = loaded_dlq.get("bootstrap_servers") or ( + self.dlq_config.bootstrap_servers if self.dlq_config else None + ) + + if topic is None or servers is None: + raise ValueError("DLQ config requires both 'topic' and 'bootstrap_servers'") self.dlq_config = DlqConfig( topic=topic, From 22c97b6ee57ae03746f32552c032f2e31562d0ba Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Thu, 26 Mar 2026 11:44:00 -0400 Subject: [PATCH 17/17] fix schema --- sentry_streams/sentry_streams/pipeline/pipeline.py | 4 +++- sentry_streams/tests/test_dlq.py | 6 +++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/sentry_streams/sentry_streams/pipeline/pipeline.py b/sentry_streams/sentry_streams/pipeline/pipeline.py index 0aae9a92..6a8779af 100644 --- a/sentry_streams/sentry_streams/pipeline/pipeline.py +++ b/sentry_streams/sentry_streams/pipeline/pipeline.py @@ -262,7 +262,9 @@ def override_config(self, loaded_config: Mapping[str, Any]) -> None: if loaded_config.get("dlq"): loaded_dlq = loaded_config["dlq"] topic = loaded_dlq.get("topic") or (self.dlq_config.topic if self.dlq_config else None) - servers = loaded_dlq.get("bootstrap_servers") or ( + # bootstrap_servers is nested under producer_config in config_types.DlqConfig + producer_config = loaded_dlq.get("producer_config", {}) + servers = producer_config.get("bootstrap_servers") or ( self.dlq_config.bootstrap_servers if self.dlq_config else None ) diff --git a/sentry_streams/tests/test_dlq.py b/sentry_streams/tests/test_dlq.py index fa857876..27627da9 100644 --- a/sentry_streams/tests/test_dlq.py +++ b/sentry_streams/tests/test_dlq.py @@ -144,7 +144,7 @@ def test_build_dlq_config( [ pytest.param( None, - {"topic": "new-dlq", "bootstrap_servers": ["broker1:9092"]}, + {"topic": "new-dlq", "producer_config": {"bootstrap_servers": ["broker1:9092"]}}, "new-dlq", ["broker1:9092"], id="create_new_config", @@ -158,14 +158,14 @@ def test_build_dlq_config( ), pytest.param( DlqConfig(topic="old-dlq", bootstrap_servers=["old-broker:9092"]), - {"bootstrap_servers": ["new-broker:9092", "new-broker2:9092"]}, + {"producer_config": {"bootstrap_servers": ["new-broker:9092", "new-broker2:9092"]}}, "old-dlq", ["new-broker:9092", "new-broker2:9092"], id="override_bootstrap_servers_only", ), pytest.param( DlqConfig(topic="old-dlq", bootstrap_servers=["old-broker:9092"]), - {"topic": "new-dlq", "bootstrap_servers": ["new-broker:9092"]}, + {"topic": "new-dlq", "producer_config": {"bootstrap_servers": ["new-broker:9092"]}}, "new-dlq", ["new-broker:9092"], id="override_both_fields",