diff --git a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py index 8fc4a2d9..bd1841c6 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py @@ -60,6 +60,7 @@ from sentry_streams.rust_streams import ( ArroyoConsumer, InitialOffset, + PyDlqConfig, PyKafkaConsumerConfig, PyKafkaProducerConfig, PyMetricConfig, @@ -171,6 +172,25 @@ def build_kafka_producer_config( ) +def build_dlq_config( + step: StreamSource, +) -> PyDlqConfig | None: + """ + Build the DLQ configuration for the source. + Returns PyDlqConfig if step.dlq_config is set, otherwise None. + """ + if step.dlq_config is None: + return None + + return PyDlqConfig( + topic=step.dlq_config.topic, + producer_config=PyKafkaProducerConfig( + bootstrap_servers=step.dlq_config.bootstrap_servers, + override_params=None, + ), + ) + + def finalize_chain(chains: TransformChains, route: Route) -> RuntimeOperator: rust_route = RustRoute(route.source, route.waypoints) config, func = chains.finalize(route) @@ -270,6 +290,7 @@ def source(self, step: Source[Any]) -> Route: schema=schema_name, metric_config=self.__metric_config, write_healthcheck=self.__write_healthcheck, + dlq_config=build_dlq_config(step), ) return Route(source_name, []) diff --git a/sentry_streams/sentry_streams/config_types.py b/sentry_streams/sentry_streams/config_types.py index 110804c9..9f90f1f4 100644 --- a/sentry_streams/sentry_streams/config_types.py +++ b/sentry_streams/sentry_streams/config_types.py @@ -9,11 +9,22 @@ class StepConfig(TypedDict): starts_segment: Optional[bool] +class DlqConfig(TypedDict, total=False): + """ + Dead Letter Queue configuration for a StreamSource. + All fields are optional to allow for default behavior. + """ + + topic: str + producer_config: "KafkaProducerConfig" + + class KafkaConsumerConfig(TypedDict, StepConfig): bootstrap_servers: Sequence[str] auto_offset_reset: str consumer_group: NotRequired[str] additional_settings: Mapping[str, Any] + dlq: NotRequired[DlqConfig] class KafkaProducerConfig(TypedDict, StepConfig): diff --git a/sentry_streams/sentry_streams/pipeline/pipeline.py b/sentry_streams/sentry_streams/pipeline/pipeline.py index a52994f6..6a8779af 100644 --- a/sentry_streams/sentry_streams/pipeline/pipeline.py +++ b/sentry_streams/sentry_streams/pipeline/pipeline.py @@ -224,6 +224,19 @@ class Source(Step, Generic[TOut]): """ +@dataclass +class DlqConfig: + """ + Configuration for Dead Letter Queue (DLQ). + + When provided, invalid messages will be sent to the DLQ topic + instead of causing the consumer to stop processing. + """ + + topic: str + bootstrap_servers: Sequence[str] + + @dataclass class StreamSource(Source[bytes]): """ @@ -233,17 +246,35 @@ class StreamSource(Source[bytes]): stream_name: str header_filter: Optional[Tuple[str, bytes]] = None consumer_group: Optional[str] = None + # dlq_config is Optional for now for no-op, in a following PR I will add default values and remove Optional + dlq_config: Optional[DlqConfig] = None step_type: StepType = StepType.SOURCE def register(self, ctx: Pipeline[bytes], previous: Step) -> None: super().register(ctx, previous) def override_config(self, loaded_config: Mapping[str, Any]) -> None: - """Override topic and consumer_group from deployment configuration.""" + """Override topic, consumer_group, and dlq_config from deployment configuration.""" if loaded_config.get("topic"): self.stream_name = str(loaded_config.get("topic")) if loaded_config.get("consumer_group"): self.consumer_group = str(loaded_config.get("consumer_group")) + if loaded_config.get("dlq"): + loaded_dlq = loaded_config["dlq"] + topic = loaded_dlq.get("topic") or (self.dlq_config.topic if self.dlq_config else None) + # bootstrap_servers is nested under producer_config in config_types.DlqConfig + producer_config = loaded_dlq.get("producer_config", {}) + servers = producer_config.get("bootstrap_servers") or ( + self.dlq_config.bootstrap_servers if self.dlq_config else None + ) + + if topic is None or servers is None: + raise ValueError("DLQ config requires both 'topic' and 'bootstrap_servers'") + + self.dlq_config = DlqConfig( + topic=topic, + bootstrap_servers=servers, + ) @dataclass diff --git a/sentry_streams/sentry_streams/rust_streams.pyi b/sentry_streams/sentry_streams/rust_streams.pyi index 6f58ffbb..711895db 100644 --- a/sentry_streams/sentry_streams/rust_streams.pyi +++ b/sentry_streams/sentry_streams/rust_streams.pyi @@ -31,14 +31,27 @@ class PyKafkaConsumerConfig: auto_offset_reset: InitialOffset, strict_offset_reset: bool, max_poll_interval_ms: int, - override_params: Mapping[str, str], + override_params: Mapping[str, str] | None = None, ) -> None: ... class PyKafkaProducerConfig: + bootstrap_servers: Sequence[str] + override_params: Mapping[str, str] | None + def __init__( self, bootstrap_servers: Sequence[str], - override_params: Mapping[str, str], + override_params: Mapping[str, str] | None = None, + ) -> None: ... + +class PyDlqConfig: + topic: str + producer_config: PyKafkaProducerConfig + + def __init__( + self, + topic: str, + producer_config: PyKafkaProducerConfig, ) -> None: ... class PyMetricConfig: @@ -97,6 +110,8 @@ class RuntimeOperator: def PythonAdapter(cls, route: Route, delegate_Factory: RustOperatorFactory) -> Self: ... class ArroyoConsumer: + dlq_config: PyDlqConfig | None + def __init__( self, source: str, @@ -105,6 +120,7 @@ class ArroyoConsumer: schema: str | None, metric_config: PyMetricConfig | None = None, write_healthcheck: bool = False, + dlq_config: PyDlqConfig | None = None, ) -> None: ... def add_step(self, step: RuntimeOperator) -> None: ... def run(self) -> None: ... diff --git a/sentry_streams/src/consumer.rs b/sentry_streams/src/consumer.rs index 3b77dee7..a068b58e 100644 --- a/sentry_streams/src/consumer.rs +++ b/sentry_streams/src/consumer.rs @@ -6,7 +6,7 @@ //! The pipeline is built by adding RuntimeOperators to the consumer. use crate::commit_policy::WatermarkCommitOffsets; -use crate::kafka_config::PyKafkaConsumerConfig; +use crate::kafka_config::{PyKafkaConsumerConfig, PyKafkaProducerConfig}; use crate::messages::{into_pyraw, PyStreamingMessage, RawMessage, RoutedValuePayload}; use crate::metrics::configure_metrics; use crate::metrics_config::PyMetricConfig; @@ -18,7 +18,9 @@ use crate::utils::traced_with_gil; use crate::watermark::WatermarkEmitter; use pyo3::prelude::*; use rdkafka::message::{Header, Headers, OwnedHeaders}; +use sentry_arroyo::backends::kafka::producer::KafkaProducer; use sentry_arroyo::backends::kafka::types::KafkaPayload; +use sentry_arroyo::processing::dlq::{DlqLimit, DlqPolicy, KafkaDlqProducer}; use sentry_arroyo::processing::strategies::healthcheck::HealthCheck; use sentry_arroyo::processing::strategies::noop::Noop; use sentry_arroyo::processing::strategies::run_task::RunTask; @@ -34,6 +36,31 @@ use std::sync::Arc; /// Matches Arroyo docs for Kubernetes liveness probes. const HEALTHCHECK_PATH: &str = "/tmp/health.txt"; +/// Configuration for Dead Letter Queue (DLQ). +/// When provided, invalid messages will be sent to the DLQ topic. +#[pyclass] +#[derive(Debug, Clone)] +pub struct PyDlqConfig { + /// The Kafka topic name to send invalid messages to + #[pyo3(get)] + pub topic: String, + + /// The Kafka producer configuration for the DLQ + #[pyo3(get)] + pub producer_config: PyKafkaProducerConfig, +} + +#[pymethods] +impl PyDlqConfig { + #[new] + fn new(topic: String, producer_config: PyKafkaProducerConfig) -> Self { + PyDlqConfig { + topic, + producer_config, + } + } +} + /// The class that represent the consumer. /// This class is exposed to python and it is the main entry point /// used by the Python adapter to build a pipeline and run it. @@ -69,12 +96,18 @@ pub struct ArroyoConsumer { /// When true, wrap the strategy chain with HealthCheck to touch a file on poll for liveness. write_healthcheck: bool, + + /// DLQ (Dead Letter Queue) configuration. + /// If provided, invalid messages will be sent to the DLQ topic. + /// Otherwise, invalid messages will cause the consumer to stop processing. + #[pyo3(get)] + dlq_config: Option, } #[pymethods] impl ArroyoConsumer { #[new] - #[pyo3(signature = (source, kafka_config, topic, schema, metric_config=None, write_healthcheck=false))] + #[pyo3(signature = (source, kafka_config, topic, schema, metric_config=None, write_healthcheck=false, dlq_config=None))] fn new( source: String, kafka_config: PyKafkaConsumerConfig, @@ -82,6 +115,7 @@ impl ArroyoConsumer { schema: Option, metric_config: Option, write_healthcheck: bool, + dlq_config: Option, ) -> Self { ArroyoConsumer { consumer_config: kafka_config, @@ -93,6 +127,7 @@ impl ArroyoConsumer { concurrency_config: Arc::new(ConcurrencyConfig::new(1)), metric_config, write_healthcheck, + dlq_config, } } @@ -120,7 +155,12 @@ impl ArroyoConsumer { self.write_healthcheck, ); let config = self.consumer_config.clone().into(); - let processor = StreamProcessor::with_kafka(config, factory, Topic::new(&self.topic), None); + + // Build DLQ policy if configured + let dlq_policy = build_dlq_policy(&self.dlq_config, self.concurrency_config.handle()); + + let processor = + StreamProcessor::with_kafka(config, factory, Topic::new(&self.topic), dlq_policy); self.handle = Some(processor.get_handle()); let mut handle = processor.get_handle(); @@ -143,6 +183,40 @@ impl ArroyoConsumer { } } +/// Builds the DLQ policy if dlq_config is provided. +/// Returns None if DLQ is not configured. +pub fn build_dlq_policy( + dlq_config: &Option, + handle: tokio::runtime::Handle, +) -> Option> { + match dlq_config { + Some(dlq_config) => { + tracing::info!("Configuring DLQ with topic: {}", dlq_config.topic); + + // Create Kafka producer for DLQ + let producer_config = dlq_config.producer_config.clone().into(); + let kafka_producer = KafkaProducer::new(producer_config); + let dlq_producer = KafkaDlqProducer::new(kafka_producer, Topic::new(&dlq_config.topic)); + + // Use default DLQ limits (no limits) and no max buffered messages + // These can be made configurable in a future PR if needed + let dlq_limit = DlqLimit::default(); + let max_buffered_messages = None; + + Some(DlqPolicy::new( + handle, + Box::new(dlq_producer), + dlq_limit, + max_buffered_messages, + )) + } + None => { + tracing::info!("DLQ not configured, invalid messages will cause processing to stop"); + None + } + } +} + /// Converts a Message to a Message. /// /// The messages we send around between steps in the pipeline contain @@ -429,4 +503,52 @@ mod tests { let _ = std::fs::remove_file(healthcheck_path); }) } + + #[test] + fn test_build_dlq_policy_with_various_configs() { + // Define test cases: (test_name, dlq_bootstrap_servers, expected_some) + let test_cases = vec![ + ("without_dlq_config", None, false), + ( + "with_dlq_config_single_broker", + Some(vec!["localhost:9092".to_string()]), + true, + ), + ( + "with_dlq_config_multiple_brokers", + Some(vec![ + "broker1:9092".to_string(), + "broker2:9092".to_string(), + "broker3:9092".to_string(), + ]), + true, + ), + ]; + + // Create a tokio runtime to get a handle for testing + let runtime = tokio::runtime::Runtime::new().unwrap(); + let handle = runtime.handle().clone(); + + for (test_name, dlq_bootstrap_servers, expected_some) in test_cases { + // Create DLQ config if bootstrap servers are provided + let dlq_config = dlq_bootstrap_servers.map(|servers| { + let producer_config = PyKafkaProducerConfig::new(servers, None); + PyDlqConfig::new("test-dlq".to_string(), producer_config) + }); + + // Build DLQ policy and assert + let dlq_policy: Option> = + build_dlq_policy(&dlq_config, handle.clone()); + assert_eq!( + dlq_policy.is_some(), + expected_some, + "Test case '{}' failed: expected is_some() to be {}", + test_name, + expected_some + ); + } + } + + // Note: Asserting on inside properties of dlq_policy is tested through Python integration tests + // in tests/test_dlq.py, as the dlq_policy is an external crate and inner properties are private. } diff --git a/sentry_streams/src/kafka_config.rs b/sentry_streams/src/kafka_config.rs index d9bf9559..c58193b6 100644 --- a/sentry_streams/src/kafka_config.rs +++ b/sentry_streams/src/kafka_config.rs @@ -104,14 +104,16 @@ impl From for KafkaConfig { #[pyclass(from_py_object)] #[derive(Debug, Clone)] pub struct PyKafkaProducerConfig { + #[pyo3(get)] bootstrap_servers: Vec, + #[pyo3(get)] override_params: Option>, } #[pymethods] impl PyKafkaProducerConfig { #[new] - fn new( + pub fn new( bootstrap_servers: Vec, override_params: Option>, ) -> Self { diff --git a/sentry_streams/src/lib.rs b/sentry_streams/src/lib.rs index c8913a39..d5131492 100644 --- a/sentry_streams/src/lib.rs +++ b/sentry_streams/src/lib.rs @@ -42,6 +42,7 @@ fn rust_streams(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/sentry_streams/tests/test_dlq.py b/sentry_streams/tests/test_dlq.py new file mode 100644 index 00000000..27627da9 --- /dev/null +++ b/sentry_streams/tests/test_dlq.py @@ -0,0 +1,192 @@ +import pytest + +from sentry_streams.adapters.arroyo.rust_arroyo import build_dlq_config +from sentry_streams.pipeline.pipeline import DlqConfig, StreamSource +from sentry_streams.rust_streams import ( + ArroyoConsumer, + InitialOffset, + PyDlqConfig, + PyKafkaConsumerConfig, + PyKafkaProducerConfig, +) + + +@pytest.mark.parametrize( + "dlq_config_type, expected_topic, expected_bootstrap_servers", + [ + pytest.param("without_dlq", None, None, id="without_dlq"), + pytest.param("with_none", None, None, id="with_explicit_none"), + pytest.param("with_dlq_config", "test-dlq", ["localhost:9092"], id="with_full_config"), + ], +) +def test_consumer_creation( + dlq_config_type: str, + expected_topic: str | None, + expected_bootstrap_servers: list[str] | None, +) -> None: + """Test that Rust ArroyoConsumer correctly accepts and stores DLQ configuration. + + This parameterized test verifies: + 1. Backward compatibility when DLQ param is omitted + 2. Explicit None for DLQ config works + 3. Full DLQ configuration is accepted and stored correctly + """ + kafka_consumer_config = PyKafkaConsumerConfig( + bootstrap_servers=["localhost:9092"], + group_id="test-group", + auto_offset_reset=InitialOffset.latest, + strict_offset_reset=False, + max_poll_interval_ms=60000, + override_params=None, + ) + + consumer: ArroyoConsumer + if dlq_config_type == "without_dlq": + # Test backward compatibility: don't pass dlq_config at all + consumer = ArroyoConsumer( + source="test_source", + kafka_config=kafka_consumer_config, + topic="test-topic", + schema=None, + metric_config=None, + write_healthcheck=False, + ) + elif dlq_config_type == "with_none": + # Test explicit None + consumer = ArroyoConsumer( + source="test_source", + kafka_config=kafka_consumer_config, + topic="test-topic", + schema=None, + metric_config=None, + write_healthcheck=False, + dlq_config=None, + ) + elif dlq_config_type == "with_dlq_config": + # Test with full DLQ configuration + kafka_producer_config = PyKafkaProducerConfig( + bootstrap_servers=["localhost:9092"], + override_params=None, + ) + dlq_config = PyDlqConfig( + topic="test-dlq", + producer_config=kafka_producer_config, + ) + consumer = ArroyoConsumer( + source="test_source", + kafka_config=kafka_consumer_config, + topic="test-topic", + schema=None, + metric_config=None, + write_healthcheck=False, + dlq_config=dlq_config, + ) + else: + raise ValueError(f"Unknown dlq_config_type: {dlq_config_type}") + + # Verify the consumer was created and DLQ config is stored correctly + assert consumer is not None + if expected_topic is None: + assert consumer.dlq_config is None + else: + assert consumer.dlq_config is not None + assert consumer.dlq_config.topic == expected_topic + assert consumer.dlq_config.producer_config.bootstrap_servers == expected_bootstrap_servers + + +@pytest.mark.parametrize( + "dlq_config, expected_topic, expected_bootstrap_servers", + [ + pytest.param(None, None, None, id="no_dlq_config"), + pytest.param( + DlqConfig(topic="test-dlq", bootstrap_servers=["localhost:9092"]), + "test-dlq", + ["localhost:9092"], + id="single_bootstrap_server", + ), + pytest.param( + DlqConfig( + topic="my-dlq", bootstrap_servers=["broker1:9092", "broker2:9092", "broker3:9092"] + ), + "my-dlq", + ["broker1:9092", "broker2:9092", "broker3:9092"], + id="multiple_bootstrap_servers", + ), + ], +) +def test_build_dlq_config( + dlq_config: DlqConfig | None, + expected_topic: str | None, + expected_bootstrap_servers: list[str] | None, +) -> None: + """Test build_dlq_config returns correct PyDlqConfig for various inputs.""" + source = StreamSource( + name="test_source", + stream_name="test-topic", + dlq_config=dlq_config, + ) + + result = build_dlq_config(source) + + if expected_topic is None: + assert result is None + else: + assert result is not None + assert isinstance(result, PyDlqConfig) + assert result.topic == expected_topic + assert result.producer_config is not None + assert result.producer_config.bootstrap_servers == expected_bootstrap_servers + assert result.producer_config.override_params is None + + +@pytest.mark.parametrize( + "initial_dlq_config, override_dlq, expected_topic, expected_bootstrap_servers", + [ + pytest.param( + None, + {"topic": "new-dlq", "producer_config": {"bootstrap_servers": ["broker1:9092"]}}, + "new-dlq", + ["broker1:9092"], + id="create_new_config", + ), + pytest.param( + DlqConfig(topic="old-dlq", bootstrap_servers=["old-broker:9092"]), + {"topic": "new-dlq"}, + "new-dlq", + ["old-broker:9092"], + id="override_topic_only", + ), + pytest.param( + DlqConfig(topic="old-dlq", bootstrap_servers=["old-broker:9092"]), + {"producer_config": {"bootstrap_servers": ["new-broker:9092", "new-broker2:9092"]}}, + "old-dlq", + ["new-broker:9092", "new-broker2:9092"], + id="override_bootstrap_servers_only", + ), + pytest.param( + DlqConfig(topic="old-dlq", bootstrap_servers=["old-broker:9092"]), + {"topic": "new-dlq", "producer_config": {"bootstrap_servers": ["new-broker:9092"]}}, + "new-dlq", + ["new-broker:9092"], + id="override_both_fields", + ), + ], +) +def test_stream_source_override_config_dlq( + initial_dlq_config: DlqConfig | None, + override_dlq: dict[str, str | list[str]], + expected_topic: str, + expected_bootstrap_servers: list[str], +) -> None: + """Test that StreamSource.override_config correctly overrides DLQ settings.""" + source = StreamSource( + name="my_source", + stream_name="my-topic", + dlq_config=initial_dlq_config, + ) + + source.override_config({"dlq": override_dlq}) + + assert source.dlq_config is not None + assert source.dlq_config.topic == expected_topic + assert source.dlq_config.bootstrap_servers == expected_bootstrap_servers