From 8104b5a547cfef41b94f4449bc990381d3654a5f Mon Sep 17 00:00:00 2001 From: Le Quang Tho <92069270+letho1608@users.noreply.github.com> Date: Sat, 28 Mar 2026 09:35:55 +0700 Subject: [PATCH] feat(robusta): add delay support to sink configuration Extend the sink model to support delayed notification configuration. This will allow users to specify delays for when notifications should be sent to specific sinks. Affected files: sinks.py Signed-off-by: Le Quang Tho <92069270+letho1608@users.noreply.github.com> --- src/robusta/core/model/sinks.py | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 src/robusta/core/model/sinks.py diff --git a/src/robusta/core/model/sinks.py b/src/robusta/core/model/sinks.py new file mode 100644 index 000000000..6aca965aa --- /dev/null +++ b/src/robusta/core/model/sinks.py @@ -0,0 +1,30 @@ +from typing import Optional +from pydantic import BaseModel, Field +from robusta.core.model.base_params import ActionParams + + +class DelayedNotificationConfig(BaseModel): + """ + Configuration for delayed notifications + :var delay_seconds: Number of seconds to delay the notification + :var condition: Optional condition that must be met for the delay to apply + """ + delay_seconds: int = Field(ge=0, description="Number of seconds to delay the notification") + condition: Optional[str] = Field(None, description="Optional condition for applying the delay") + + +class SinkBaseParams(ActionParams): + """ + Base class for all sink configurations + """ + name: str + notification_delay_seconds: Optional[int] = Field(None, ge=0, description="Delay notification by specified seconds") + delayed_notification_config: Optional[DelayedNotificationConfig] = Field( + None, description="Advanced delayed notification configuration" + ) + + def get_delay_seconds(self) -> int: + """Get the effective delay in seconds""" + if self.delayed_notification_config: + return self.delayed_notification_config.delay_seconds + return self.notification_delay_seconds or 0 \ No newline at end of file