Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
from sentry_streams.rust_streams import (
ArroyoConsumer,
InitialOffset,
PyDlqConfig,
PyKafkaConsumerConfig,
PyKafkaProducerConfig,
PyMetricConfig,
Expand Down Expand Up @@ -171,6 +172,25 @@ def build_kafka_producer_config(
)


def build_dlq_config(
    step: StreamSource,
) -> PyDlqConfig | None:
    """
    Translate the source step's DLQ settings into a Rust-side ``PyDlqConfig``.

    Returns ``None`` when ``step.dlq_config`` is unset, meaning no dead-letter
    topic is configured for this source.
    """
    dlq = step.dlq_config
    if dlq is None:
        return None

    # The DLQ producer currently uses default params; only the bootstrap
    # servers are taken from the pipeline-level DLQ configuration.
    producer = PyKafkaProducerConfig(
        bootstrap_servers=dlq.bootstrap_servers,
        override_params=None,
    )
    return PyDlqConfig(topic=dlq.topic, producer_config=producer)


def finalize_chain(chains: TransformChains, route: Route) -> RuntimeOperator:
rust_route = RustRoute(route.source, route.waypoints)
config, func = chains.finalize(route)
Expand Down Expand Up @@ -270,6 +290,7 @@ def source(self, step: Source[Any]) -> Route:
schema=schema_name,
metric_config=self.__metric_config,
write_healthcheck=self.__write_healthcheck,
dlq_config=build_dlq_config(step),
)
return Route(source_name, [])

Expand Down
11 changes: 11 additions & 0 deletions sentry_streams/sentry_streams/config_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,22 @@ class StepConfig(TypedDict):
starts_segment: Optional[bool]


class DlqConfig(TypedDict, total=False):
    """
    Dead Letter Queue configuration for a StreamSource.

    ``total=False`` makes every key optional so deployments may supply only
    the fields they want to override, falling back to defaults elsewhere.
    """

    # Kafka topic that invalid messages are routed to.
    topic: str
    # Producer settings for the DLQ writer; bootstrap_servers lives here.
    producer_config: "KafkaProducerConfig"

Comment on lines +17 to +20
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: The Python adapter in rust_arroyo.py doesn't read the dlq configuration or pass it to the Rust ArroyoConsumer, rendering the DLQ feature non-functional.
Severity: CRITICAL

Suggested Fix

Update rust_arroyo.py to read the dlq configuration from the source config. If present, construct the corresponding Rust DlqConfig object and pass it as the dlq_config argument when initializing the ArroyoConsumer. Add integration tests to verify the DLQ functionality.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.

Location: sentry_streams/sentry_streams/config_types.py#L17-L21

Potential issue: The pull request adds Dead Letter Queue (DLQ) support, with a
`DlqConfig` defined in Python and the Rust consumer expecting a `dlq_config` parameter.
However, the Python adapter in `rust_arroyo.py` that instantiates the `ArroyoConsumer`
never reads the `dlq` key from the configuration dictionary and does not pass it during
initialization. As a result, any user-provided DLQ configuration will be silently
ignored, and the DLQ functionality will not work. Invalid messages will be dropped
instead of being routed to the configured dead-letter topic.

Did we get this right? 👍 / 👎 to inform future reviews.


class KafkaConsumerConfig(TypedDict, StepConfig):
bootstrap_servers: Sequence[str]
auto_offset_reset: str
consumer_group: NotRequired[str]
additional_settings: Mapping[str, Any]
dlq: NotRequired[DlqConfig]


class KafkaProducerConfig(TypedDict, StepConfig):
Expand Down
33 changes: 32 additions & 1 deletion sentry_streams/sentry_streams/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,19 @@ class Source(Step, Generic[TOut]):
"""


@dataclass
class DlqConfig:
    """
    Configuration for Dead Letter Queue (DLQ).

    When provided, invalid messages will be sent to the DLQ topic
    instead of causing the consumer to stop processing.
    """

    # Kafka topic that receives messages which failed processing.
    topic: str
    # Brokers used to bootstrap the DLQ producer connection.
    bootstrap_servers: Sequence[str]


@dataclass
class StreamSource(Source[bytes]):
"""
Expand All @@ -233,17 +246,35 @@ class StreamSource(Source[bytes]):
stream_name: str
header_filter: Optional[Tuple[str, bytes]] = None
consumer_group: Optional[str] = None
# dlq_config is Optional for now (no-op when unset); a follow-up PR will add default values and remove the Optional
dlq_config: Optional[DlqConfig] = None
step_type: StepType = StepType.SOURCE

def register(self, ctx: Pipeline[bytes], previous: Step) -> None:
    """Register this source with the pipeline; delegates to the base Source."""
    super().register(ctx, previous)

def override_config(self, loaded_config: Mapping[str, Any]) -> None:
    """Apply deployment-configuration overrides for topic, consumer_group, and dlq_config."""
    topic_override = loaded_config.get("topic")
    if topic_override:
        self.stream_name = str(topic_override)

    group_override = loaded_config.get("consumer_group")
    if group_override:
        self.consumer_group = str(group_override)

    loaded_dlq = loaded_config.get("dlq")
    if not loaded_dlq:
        return

    current = self.dlq_config
    # Any field missing from the loaded config falls back to the existing
    # dlq_config on this step, when one is present.
    topic = loaded_dlq.get("topic") or (current.topic if current else None)
    # bootstrap_servers is nested under producer_config in config_types.DlqConfig
    servers = loaded_dlq.get("producer_config", {}).get("bootstrap_servers") or (
        current.bootstrap_servers if current else None
    )

    if topic is None or servers is None:
        raise ValueError("DLQ config requires both 'topic' and 'bootstrap_servers'")

    self.dlq_config = DlqConfig(
        topic=topic,
        bootstrap_servers=servers,
    )


@dataclass
Expand Down
20 changes: 18 additions & 2 deletions sentry_streams/sentry_streams/rust_streams.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,27 @@ class PyKafkaConsumerConfig:
auto_offset_reset: InitialOffset,
strict_offset_reset: bool,
max_poll_interval_ms: int,
override_params: Mapping[str, str],
override_params: Mapping[str, str] | None = None,
) -> None: ...

class PyKafkaProducerConfig:
    """Stub for the Rust-backed Kafka producer configuration."""

    # Brokers used to bootstrap the producer connection.
    bootstrap_servers: Sequence[str]
    # Optional extra producer settings; None means library defaults.
    override_params: Mapping[str, str] | None

    def __init__(
        self,
        bootstrap_servers: Sequence[str],
        override_params: Mapping[str, str] | None = None,
    ) -> None: ...

class PyDlqConfig:
    """Stub for the Rust-backed Dead Letter Queue configuration."""

    # Kafka topic that invalid messages are routed to.
    topic: str
    # Producer configuration used to write to the DLQ topic.
    producer_config: PyKafkaProducerConfig

    def __init__(
        self,
        topic: str,
        producer_config: PyKafkaProducerConfig,
    ) -> None: ...

class PyMetricConfig:
Expand Down Expand Up @@ -97,6 +110,8 @@ class RuntimeOperator:
def PythonAdapter(cls, route: Route, delegate_Factory: RustOperatorFactory) -> Self: ...

class ArroyoConsumer:
    """Stub for the Rust consumer that runs a streaming pipeline."""

    # DLQ configuration; None means invalid messages are not dead-lettered.
    dlq_config: PyDlqConfig | None

    def __init__(
        self,
        source: str,
        kafka_config: PyKafkaConsumerConfig,
        topic: str,
        schema: str | None,
        metric_config: PyMetricConfig | None = None,
        write_healthcheck: bool = False,
        dlq_config: PyDlqConfig | None = None,
    ) -> None: ...
    def add_step(self, step: RuntimeOperator) -> None: ...
    def run(self) -> None: ...
Expand Down
128 changes: 125 additions & 3 deletions sentry_streams/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
//! The pipeline is built by adding RuntimeOperators to the consumer.

use crate::commit_policy::WatermarkCommitOffsets;
use crate::kafka_config::PyKafkaConsumerConfig;
use crate::kafka_config::{PyKafkaConsumerConfig, PyKafkaProducerConfig};
use crate::messages::{into_pyraw, PyStreamingMessage, RawMessage, RoutedValuePayload};
use crate::metrics::configure_metrics;
use crate::metrics_config::PyMetricConfig;
Expand All @@ -18,7 +18,9 @@ use crate::utils::traced_with_gil;
use crate::watermark::WatermarkEmitter;
use pyo3::prelude::*;
use rdkafka::message::{Header, Headers, OwnedHeaders};
use sentry_arroyo::backends::kafka::producer::KafkaProducer;
use sentry_arroyo::backends::kafka::types::KafkaPayload;
use sentry_arroyo::processing::dlq::{DlqLimit, DlqPolicy, KafkaDlqProducer};
use sentry_arroyo::processing::strategies::healthcheck::HealthCheck;
use sentry_arroyo::processing::strategies::noop::Noop;
use sentry_arroyo::processing::strategies::run_task::RunTask;
Expand All @@ -34,6 +36,31 @@ use std::sync::Arc;
/// Matches Arroyo docs for Kubernetes liveness probes.
const HEALTHCHECK_PATH: &str = "/tmp/health.txt";

/// Configuration for Dead Letter Queue (DLQ).
/// When provided, invalid messages will be sent to the DLQ topic.
///
/// Naming note: the `Py` prefix follows the repo convention (e.g.
/// `PyKafkaConsumerConfig`) for Rust structs constructed from Python.
#[pyclass]
#[derive(Debug, Clone)]
pub struct PyDlqConfig {
    /// The Kafka topic name to send invalid messages to
    #[pyo3(get)]
    pub topic: String,

    /// The Kafka producer configuration for the DLQ
    #[pyo3(get)]
    pub producer_config: PyKafkaProducerConfig,
}

#[pymethods]
impl PyDlqConfig {
    /// Python-visible constructor: `PyDlqConfig(topic, producer_config)`.
    #[new]
    fn new(topic: String, producer_config: PyKafkaProducerConfig) -> Self {
        PyDlqConfig {
            topic,
            producer_config,
        }
    }
}

/// The class that represent the consumer.
/// This class is exposed to python and it is the main entry point
/// used by the Python adapter to build a pipeline and run it.
Expand Down Expand Up @@ -69,19 +96,26 @@ pub struct ArroyoConsumer {

/// When true, wrap the strategy chain with HealthCheck to touch a file on poll for liveness.
write_healthcheck: bool,

/// DLQ (Dead Letter Queue) configuration.
/// If provided, invalid messages will be sent to the DLQ topic.
/// Otherwise, invalid messages will cause the consumer to stop processing.
#[pyo3(get)]
dlq_config: Option<PyDlqConfig>,
}

#[pymethods]
impl ArroyoConsumer {
#[new]
#[pyo3(signature = (source, kafka_config, topic, schema, metric_config=None, write_healthcheck=false))]
#[pyo3(signature = (source, kafka_config, topic, schema, metric_config=None, write_healthcheck=false, dlq_config=None))]
fn new(
source: String,
kafka_config: PyKafkaConsumerConfig,
topic: String,
schema: Option<String>,
metric_config: Option<PyMetricConfig>,
write_healthcheck: bool,
dlq_config: Option<PyDlqConfig>,
) -> Self {
ArroyoConsumer {
consumer_config: kafka_config,
Expand All @@ -93,6 +127,7 @@ impl ArroyoConsumer {
concurrency_config: Arc::new(ConcurrencyConfig::new(1)),
metric_config,
write_healthcheck,
dlq_config,
}
}

Expand Down Expand Up @@ -120,7 +155,12 @@ impl ArroyoConsumer {
self.write_healthcheck,
);
let config = self.consumer_config.clone().into();
let processor = StreamProcessor::with_kafka(config, factory, Topic::new(&self.topic), None);

// Build DLQ policy if configured
let dlq_policy = build_dlq_policy(&self.dlq_config, self.concurrency_config.handle());

let processor =
StreamProcessor::with_kafka(config, factory, Topic::new(&self.topic), dlq_policy);
self.handle = Some(processor.get_handle());

let mut handle = processor.get_handle();
Expand All @@ -143,6 +183,40 @@ impl ArroyoConsumer {
}
}

/// Builds the DLQ policy if dlq_config is provided.
///
/// Returns `None` when DLQ is not configured, in which case invalid
/// messages will stop processing instead of being dead-lettered.
pub fn build_dlq_policy(
    dlq_config: &Option<PyDlqConfig>,
    handle: tokio::runtime::Handle,
) -> Option<DlqPolicy<KafkaPayload>> {
    let Some(config) = dlq_config else {
        tracing::info!("DLQ not configured, invalid messages will cause processing to stop");
        return None;
    };

    tracing::info!("Configuring DLQ with topic: {}", config.topic);

    // Producer that writes rejected messages to the DLQ topic.
    let producer = KafkaProducer::new(config.producer_config.clone().into());
    let dlq_producer = KafkaDlqProducer::new(producer, Topic::new(&config.topic));

    // Use default DLQ limits (no limits) and no max buffered messages
    // These can be made configurable in a future PR if needed
    Some(DlqPolicy::new(
        handle,
        Box::new(dlq_producer),
        DlqLimit::default(),
        None,
    ))
}

/// Converts a Message<KafkaPayload> to a Message<RoutedValue>.
///
/// The messages we send around between steps in the pipeline contain
Expand Down Expand Up @@ -429,4 +503,52 @@ mod tests {
let _ = std::fs::remove_file(healthcheck_path);
})
}

#[test]
fn test_build_dlq_policy_with_various_configs() {
    // Each case: (name, optional DLQ bootstrap servers, whether a policy is expected).
    let cases: Vec<(&str, Option<Vec<String>>, bool)> = vec![
        ("without_dlq_config", None, false),
        (
            "with_dlq_config_single_broker",
            Some(vec!["localhost:9092".to_string()]),
            true,
        ),
        (
            "with_dlq_config_multiple_brokers",
            Some(vec![
                "broker1:9092".to_string(),
                "broker2:9092".to_string(),
                "broker3:9092".to_string(),
            ]),
            true,
        ),
    ];

    // build_dlq_policy needs a tokio runtime handle.
    let runtime = tokio::runtime::Runtime::new().unwrap();

    for (test_name, servers, expected_some) in cases {
        // Only build a DLQ config when the case supplies bootstrap servers.
        let dlq_config = servers.map(|bootstrap| {
            let producer_config = PyKafkaProducerConfig::new(bootstrap, None);
            PyDlqConfig::new("test-dlq".to_string(), producer_config)
        });

        let dlq_policy: Option<DlqPolicy<KafkaPayload>> =
            build_dlq_policy(&dlq_config, runtime.handle().clone());
        assert_eq!(
            dlq_policy.is_some(),
            expected_some,
            "Test case '{}' failed: expected is_some() to be {}",
            test_name,
            expected_some
        );
    }
}

// Note: Asserting on inside properties of dlq_policy is tested through Python integration tests
// in tests/test_dlq.py, as the dlq_policy is an external crate and inner properties are private.
}
4 changes: 3 additions & 1 deletion sentry_streams/src/kafka_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,16 @@ impl From<PyKafkaConsumerConfig> for KafkaConfig {
/// Kafka producer configuration passed in from Python.
#[pyclass(from_py_object)]
#[derive(Debug, Clone)]
pub struct PyKafkaProducerConfig {
    /// Brokers used to bootstrap the producer connection.
    #[pyo3(get)]
    bootstrap_servers: Vec<String>,
    /// Optional extra producer settings; `None` means defaults only.
    #[pyo3(get)]
    override_params: Option<HashMap<String, String>>,
}

#[pymethods]
impl PyKafkaProducerConfig {
#[new]
fn new(
pub fn new(
bootstrap_servers: Vec<String>,
override_params: Option<HashMap<String, String>>,
) -> Self {
Expand Down
1 change: 1 addition & 0 deletions sentry_streams/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ fn rust_streams(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<kafka_config::PyKafkaProducerConfig>()?;
m.add_class::<kafka_config::InitialOffset>()?;
m.add_class::<consumer::ArroyoConsumer>()?;
m.add_class::<consumer::PyDlqConfig>()?;
m.add_class::<metrics_config::PyMetricConfig>()?;
m.add_class::<messages::PyAnyMessage>()?;
m.add_class::<messages::RawMessage>()?;
Expand Down
Loading
Loading