diff --git a/api/src/scripts/populate_db_gbfs.py b/api/src/scripts/populate_db_gbfs.py index 9edfa22c1..6d88b414f 100644 --- a/api/src/scripts/populate_db_gbfs.py +++ b/api/src/scripts/populate_db_gbfs.py @@ -17,6 +17,7 @@ from shared.common.license_utils import assign_license_by_url from shared.database.database import generate_unique_id, configure_polymorphic_mappers from shared.database_gen.sqlacodegen_models import Gbfsfeed, Location, Externalid +from shared.notifications.notification_event_service import emit_url_replaced GBFS_PUBSUB_TOPIC_NAME = "validate-gbfs-feed" @@ -108,9 +109,18 @@ def populate_db(self, session, fetch_url=True): gbfs_feed.operator = row["Name"] gbfs_feed.provider = row["Name"] gbfs_feed.operator_url = row["URL"] - gbfs_feed.producer_url = row["Auto-Discovery URL"] - gbfs_feed.auto_discovery_url = row["Auto-Discovery URL"] + old_producer_url = gbfs_feed.producer_url + new_producer_url = row["Auto-Discovery URL"] + gbfs_feed.producer_url = new_producer_url + gbfs_feed.auto_discovery_url = new_producer_url gbfs_feed.updated_at = datetime.now(pytz.utc) + if not is_new_feed and old_producer_url and old_producer_url != new_producer_url: + emit_url_replaced( + feed_stable_id=stable_id, + old_url=old_producer_url, + new_url=new_producer_url, + source="populate_db_gbfs", + ) if not gbfs_feed.locations: # If locations are empty, create a new location (no overwrite) country_code = self.get_safe_value(row, "Country Code", "") diff --git a/api/src/scripts/populate_db_gtfs.py b/api/src/scripts/populate_db_gtfs.py index 528aa7dbc..c35e8f030 100644 --- a/api/src/scripts/populate_db_gtfs.py +++ b/api/src/scripts/populate_db_gtfs.py @@ -16,6 +16,10 @@ Location, Redirectingid, ) +from shared.notifications.notification_event_service import ( + emit_feed_redirected, + emit_url_replaced, +) from utils.data_utils import set_up_defaults if TYPE_CHECKING: @@ -200,6 +204,14 @@ def process_redirects(self, session: "Session"): ) # Flush to avoid FK violation session.flush() + emit_feed_redirected( + source_stable_id=stable_id, + target_stable_id=target_stable_id, + old_url=getattr(feed, "producer_url", None), + new_url=getattr(target_feed, "producer_url", None), + source="populate_db_gtfs", + extra_data={"redirect_comment": comment} if comment else None, + ) def populate_db(self, session: "Session", fetch_url: bool = True): """ @@ -252,7 +264,15 @@ def populate_db(self, session: "Session", fetch_url: bool = True): feed.note = self.get_safe_value(row, "note", "") producer_url = self.get_safe_value(row, "urls.direct_download", "") if "transitfeeds" not in producer_url: # Avoid setting transitfeeds as producer_url + old_producer_url = feed.producer_url feed.producer_url = producer_url + if not is_new_feed and old_producer_url and old_producer_url != producer_url: + emit_url_replaced( + feed_stable_id=stable_id, + old_url=old_producer_url, + new_url=producer_url, + source="populate_db_gtfs", + ) feed.authentication_type = str(int(float(self.get_safe_value(row, "urls.authentication_type", "0")))) feed.authentication_info_url = self.get_safe_value(row, "urls.authentication_info", "") feed.api_key_parameter_name = self.get_safe_value(row, "urls.api_key_parameter_name", "") diff --git a/api/src/shared/notifications/__init__.py b/api/src/shared/notifications/__init__.py new file mode 100644 index 000000000..09db850ee --- /dev/null +++ b/api/src/shared/notifications/__init__.py @@ -0,0 +1,6 @@ +"""Shared notification utilities. + +Packages exported from here: + notification_event_service — emit_feed_redirected / emit_url_replaced + brevo_notification_sender — send_single / send_digest +""" diff --git a/api/src/shared/notifications/brevo_notification_sender.py b/api/src/shared/notifications/brevo_notification_sender.py new file mode 100644 index 000000000..65449a58a --- /dev/null +++ b/api/src/shared/notifications/brevo_notification_sender.py @@ -0,0 +1,420 @@ +# +# MobilityData 2026 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""brevo_notification_sender — transactional email delivery via Brevo. + +Brevo (formerly Sendinblue) is already used by this project for contact +management (``shared.common.brevo``). This module extends that integration +to **transactional email** using ``sib_api_v3_sdk.TransactionalEmailsApi``. + +Environment variables +--------------------- +BREVO_API_KEY + Brevo API key (required). +BREVO_SENDER_EMAIL + From-address for outgoing emails (default: ``noreply@mobilitydatabase.org``). +BREVO_SENDER_NAME + From-name (default: ``Mobility Database``). +BREVO_TEMPLATE_FEED_URL_UPDATED + Integer Brevo template ID for ``feed.url_updated`` single-event emails. + When not set, an inline HTML fallback is used. +BREVO_TEMPLATE_FEED_URL_UPDATED_DIGEST + Integer Brevo template ID for ``feed.url_updated`` digest emails. + When not set, an inline HTML fallback is used. +BREVO_TEMPLATE_ADMIN_EVENT_SUMMARY + Integer Brevo template ID for ``admin.event_summary`` emails. + When not set, an inline HTML fallback is used. + +Design +------ +* ``send_single`` sends one email for one notification_event. +* ``send_digest`` sends one email batching multiple notification_events. +* Both raise ``BrevoSendError`` on failure so the caller can update + ``notification_log.status`` and ``retry_count`` accordingly. +* Template params are passed as ``params`` to the Brevo API; Brevo renders + them via its template engine. When no template ID is configured, a minimal + HTML fallback is built inline. +""" + +from __future__ import annotations + +import logging +import os +from dataclasses import dataclass +from typing import Any, Dict, List, Optional + +from shared.notifications.notification_constants import ( + NotificationFeedRole, + NotificationTypeId, +) +from shared.users_database_gen.sqlacodegen_models import NotificationEvent + +logger = logging.getLogger(__name__) + +_DEFAULT_SENDER_EMAIL = "noreply@mobilitydatabase.org" +_DEFAULT_SENDER_NAME = "Mobility Database" + +_DIGEST_EMAIL_SUBJECT_DICTIONARY = { + NotificationTypeId.FEED_URL_UPDATED: "[Mobility Database] %s feed URL update%s", + NotificationTypeId.ADMIN_EVENT_SUMMARY: "[Mobility Database] Daily notification dispatch summary", +} + +_SINGLE_EMAIL_SUBJECT_DICTIONARY = { + NotificationTypeId.FEED_URL_UPDATED: "[Mobility Database] Feed %s has been updated", + NotificationTypeId.ADMIN_EVENT_SUMMARY: "[Mobility Database] Daily notification dispatch summary", +} + + +class BrevoSendError(Exception): + """Raised when a Brevo API call fails. Callers catch this to record failure.""" + + +@dataclass +class EmailRecipient: + email: str + name: Optional[str] = None + + def to_dict(self) -> Dict[str, Any]: + d: Dict[str, Any] = {"email": self.email} + if self.name: + d["name"] = self.name + return d + + +def get_template_id_by_notification( + notification_type_id: str, + *, + digest: bool = False, +) -> Optional[int]: + match notification_type_id: + case NotificationTypeId.FEED_URL_UPDATED: + if digest: + return _int_env("BREVO_TEMPLATE_FEED_URL_UPDATED_DIGEST") + return _int_env("BREVO_TEMPLATE_FEED_URL_UPDATED") + case NotificationTypeId.ADMIN_EVENT_SUMMARY: + return _int_env("BREVO_TEMPLATE_ADMIN_EVENT_SUMMARY") + case _: + return None + + +# --------------------------------------------------------------------------- +# Event accessors — read feeds (from notification_event_feed) and payload +# --------------------------------------------------------------------------- + + +def _feeds_with_role(event, role: str) -> List[str]: + """Return the stable_ids of feeds attached to ``event`` with the given role.""" + return [f.feed_stable_id for f in (getattr(event, "notification_event_feeds", None) or []) if f.role == role] + + +def subject_feed(event) -> Optional[str]: + """First feed in the 'subject' role, or None.""" + feeds = _feeds_with_role(event, NotificationFeedRole.SUBJECT) + return feeds[0] if feeds else None + + +def target_feed(event) -> Optional[str]: + """First feed in the 'target' role, or None.""" + feeds = _feeds_with_role(event, NotificationFeedRole.TARGET) + return feeds[0] if feeds else None + + +def event_payload(event) -> Dict[str, Any]: + """Type-specific payload dict for ``event`` (never None).""" + return event.payload or {} + + +# --------------------------------------------------------------------------- +# Email content builders (plain-HTML fallback when no template is configured) +# --------------------------------------------------------------------------- + + +def build_single_subject(event) -> str: + template = _SINGLE_EMAIL_SUBJECT_DICTIONARY.get(event.notification_type_id) + if template is None: + return f"[Mobility Database] Notification for {event.notification_type_id}" + + if "%s" in template: + return template % (subject_feed(event) or "unknown") + return template + + +def build_digest_subject(events: List) -> str: + count = len(events) + type_id = events[0].notification_type_id if events else "notification" + template = _DIGEST_EMAIL_SUBJECT_DICTIONARY.get(type_id) + if template is None: + return f"[Mobility Database] {count} notification{'s' if count != 1 else ''}" + + placeholder_count = template.count("%s") + if placeholder_count == 2: + return template % (count, "s" if count != 1 else "") + if placeholder_count == 1: + return template % count + return template + + +def build_params_feed_url_updated(events: List, subscription): + return { + "event_count": len(events), + "subscription_id": subscription.id, + "events": [ + { + "feed_stable_id": subject_feed(e), + "target_feed_stable_id": target_feed(e), + "event_subtype": e.event_subtype, + "old_url": event_payload(e).get("old_url") or "", + "new_url": event_payload(e).get("new_url") or "", + "source": e.source or "", + "created_at": e.created_at.isoformat() if e.created_at else "", + "payload": event_payload(e), + } + for e in events + ], + } + + +def build_params_admin_event_summary(events: List, subscription): + summary_event = events[0] if events else None + return { + "event_count": len(events), + "subscription_id": subscription.id, + "summary": event_payload(summary_event) if summary_event else {}, + } + + +def build_params_by_notification( + notification_type_id: str, events: List[NotificationEvent], subscription +) -> Dict[str, Any]: + match notification_type_id: + case NotificationTypeId.FEED_URL_UPDATED: + return build_params_feed_url_updated(events, subscription) + case NotificationTypeId.ADMIN_EVENT_SUMMARY: + return build_params_admin_event_summary(events, subscription) + case _: + raise ValueError(f"Unsupported notification type for Brevo params: {notification_type_id}") + + +def build_single_html(event) -> str: + payload = event_payload(event) + if event.event_subtype == "feed_redirected": + return ( + f"

Feed {subject_feed(event)} has been deprecated " + f"and now redirects to {target_feed(event)}.

" + f"

New URL: {payload.get('new_url')}

" + ) + return ( + f"

The URL for feed {subject_feed(event)} has changed.

" + f"

Old URL: {payload.get('old_url')}

" + f"

New URL: {payload.get('new_url')}

" + ) + + +def build_digest_html(events: List) -> str: + if not events: + return "

No feed URL changes in this period.

" + + if events[0].notification_type_id == NotificationTypeId.ADMIN_EVENT_SUMMARY: + rows = "".join( + f"{subject_feed(e) or '-'}" + f"{e.event_subtype}" + f"{event_payload(e).get('emails_sent', event_payload(e).get('sent', 0))}" + f"{event_payload(e).get('emails_failed', event_payload(e).get('failed', 0))}" + f"" + for e in events + ) + return ( + "

Notification Dispatch Summary

" + "" + "" + f"{rows}
FeedTypeSentFailed
" + ) + + rows = "".join( + f"{subject_feed(e)}{e.event_subtype}" + f"{event_payload(e).get('old_url') or '-'}" + f"{event_payload(e).get('new_url') or '-'}" + f"{e.source or '-'}" + for e in events + ) + return ( + "

Feed URL Updates

" + "" + "" + f"{rows}
FeedTypeOld URLNew URLSource
" + ) + + +def send_single( + recipient: EmailRecipient, + notification_event: NotificationEvent, # NotificationEvent ORM object + subscription, # NotificationSubscription ORM object +) -> None: + """Send a single-event notification email. + + Parameters + ---------- + recipient: + Destination email address and name. + notification_event: + The ``NotificationEvent`` ORM instance to notify about. + subscription: + The ``NotificationSubscription`` ORM instance (used for unsubscribe context). + + Raises + ------ + BrevoSendError + When the Brevo API returns an error. + """ + template_id = get_template_id_by_notification( + notification_event.notification_type_id, + digest=False, + ) + params = build_params_by_notification( + notification_event.notification_type_id, + [notification_event], + subscription, + ) + subject = build_single_subject(notification_event) + # This is case the HTML fallback is used, so we don't need to pass html_content + html = build_single_html(notification_event) if template_id is None else None + + _send( + recipient=recipient, + subject=subject, + html_content=html, + template_id=template_id, + params=params, + ) + + +def send_digest( + recipient: EmailRecipient, + notification_events: List, # List[NotificationEvent] + subscription, # NotificationSubscription ORM object +) -> None: + """Send a digest email batching multiple notification events. + + Parameters + ---------- + recipient: + Destination email address and name. + notification_events: + The ``NotificationEvent`` instances to include in the digest. + subscription: + The ``NotificationSubscription`` ORM instance. + + Raises + ------ + BrevoSendError + When the Brevo API returns an error. + """ + if not notification_events: + logger.debug("send_digest called with empty event list; skipping") + return + + notification_type_id = notification_events[0].notification_type_id + + template_id = get_template_id_by_notification(notification_type_id, digest=True) + if template_id is None: + logger.info( + "No Brevo template configured for notification type %s; using HTML fallback", + notification_type_id, + ) + + params = build_params_by_notification( + notification_type_id, + notification_events, + subscription, + ) + subject = build_digest_subject(notification_events) + html = build_digest_html(notification_events) if template_id is None else None + + _send( + recipient=recipient, + subject=subject, + html_content=html, + template_id=template_id, + params=params, + ) + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + + +def _send( + recipient: EmailRecipient, + subject: str, + html_content: Optional[str], + template_id: Optional[int], + params: Dict[str, Any], +) -> None: + """Low-level send via Brevo TransactionalEmailsApi. + + Raises BrevoSendError on any failure. + """ + try: + import sib_api_v3_sdk + from sib_api_v3_sdk.rest import ApiException + except ImportError as exc: + raise BrevoSendError(f"sib_api_v3_sdk is not installed: {exc}") from exc + + api_key = os.getenv("BREVO_API_KEY") + if not api_key: + raise BrevoSendError("BREVO_API_KEY environment variable is not set") + + configuration = sib_api_v3_sdk.Configuration() + configuration.api_key = {"api-key": api_key} + + client = sib_api_v3_sdk.ApiClient(configuration) + api = sib_api_v3_sdk.TransactionalEmailsApi(client) + + sender_email = os.getenv("BREVO_SENDER_EMAIL", _DEFAULT_SENDER_EMAIL) + sender_name = os.getenv("BREVO_SENDER_NAME", _DEFAULT_SENDER_NAME) + + send_email = sib_api_v3_sdk.SendSmtpEmail( + to=[recipient.to_dict()], + sender={"email": sender_email, "name": sender_name}, + subject=subject if template_id is None else None, + html_content=html_content, + template_id=template_id, + params=params if template_id is not None else None, + ) + + try: + result = api.send_transac_email(send_email) + logger.info( + "Brevo email sent to %s (message_id=%s)", + recipient.email, + getattr(result, "message_id", "n/a"), + ) + except ApiException as exc: + raise BrevoSendError(f"Brevo API error {exc.status} sending to {recipient.email}: {exc.reason}") from exc + except Exception as exc: + raise BrevoSendError(f"Unexpected error sending to {recipient.email}: {exc}") from exc + + +def _int_env(var: str) -> Optional[int]: + """Return an env var as int, or None if not set / not parseable.""" + val = os.getenv(var) + if val is None: + return None + try: + return int(val) + except ValueError: + logger.warning("Environment variable %s=%r is not a valid integer; ignoring", var, val) + return None diff --git a/api/src/shared/notifications/notification_constants.py b/api/src/shared/notifications/notification_constants.py new file mode 100644 index 000000000..c39d6c468 --- /dev/null +++ b/api/src/shared/notifications/notification_constants.py @@ -0,0 +1,88 @@ +# +# MobilityData 2026 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Notification system string constants. + +These classes act as namespaced string constants for values stored in the +``notification_type.id``, ``notification_event.event_subtype``, +``notification_subscription.cadence``, ``notification_log.status``, +``notification_event.source``, and ``notification_event_feed.role`` columns. + +Usage +----- + from shared.notifications.notification_constants import ( + NotificationTypeId, + FeedUrlUpdateType, + AdminEventUpdateType, + NotificationCadence, + NotificationLogStatus, + NotificationSource, + NotificationFeedRole, + ) +""" + + +class NotificationTypeId: + """Primary keys of rows in the ``notification_type`` table.""" + + FEED_URL_UPDATED = "feed.url_updated" + ADMIN_EVENT_SUMMARY = "admin.event_summary" + + +class FeedUrlUpdateType: + """Allowed values for ``notification_event.event_subtype`` when + ``notification_type_id == NotificationTypeId.FEED_URL_UPDATED``.""" + + URL_REPLACED = "url_replaced" + FEED_REDIRECTED = "feed_redirected" + + +class AdminEventUpdateType: + """Allowed values for ``notification_event.event_subtype`` when + ``notification_type_id == NotificationTypeId.ADMIN_EVENT_SUMMARY``.""" + + DISPATCH_SUMMARY = "dispatch_summary" + + +class NotificationFeedRole: + """Allowed values for ``notification_event_feed.role`` — the role a feed + plays within a notification event.""" + + SUBJECT = "subject" # the feed the event is primarily about + TARGET = "target" # the destination feed (e.g. redirect target) + + +class NotificationCadence: + """Allowed values for ``notification_subscription.cadence``.""" + + IMMEDIATE = "immediate" + DAILY = "daily" + WEEKLY = "weekly" + + +class NotificationLogStatus: + """Allowed values for ``notification_log.status``.""" + + SENT = "sent" + FAILED = "failed" + PERMANENTLY_FAILED = "permanently_failed" + + +class NotificationSource: + """Human-readable tags for ``notification_event.source``.""" + + DISPATCHER = "dispatcher" + TDG_REDIRECTS = "tdg_redirects" + TDG_IMPORT = "tdg_import" diff --git a/api/src/shared/notifications/notification_event_service.py b/api/src/shared/notifications/notification_event_service.py new file mode 100644 index 000000000..fccb4418b --- /dev/null +++ b/api/src/shared/notifications/notification_event_service.py @@ -0,0 +1,249 @@ +# +# MobilityData 2026 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""notification_event_service — best-effort writers for notification_event rows. + +Design principles +----------------- +* Each public function writes a single notification_event row to the users DB. +* All functions are **fire-and-forget**: errors are logged and swallowed so that + the calling feed-change code is never blocked or rolled back. +* If ``USERS_DATABASE_URL`` is not configured (e.g. in the populate_db CI scripts + that only have access to the feeds DB), the call is a no-op with a warning. + +Usage +----- + from shared.notifications.notification_event_service import ( + emit_feed_redirected, + emit_url_replaced, + ) + + # After creating a Redirectingid row: + emit_feed_redirected( + source_stable_id="mdb-1", + target_stable_id="tdg-42", + old_url="https://old.example.com/feed.zip", + new_url="https://new.example.com/feed.zip", + source="tdg_redirects", + ) + + # After detecting a producer_url change: + emit_url_replaced( + feed_stable_id="mdb-7", + old_url="https://old.example.com/feed.zip", + new_url="https://new.example.com/feed.zip", + source="tdg_import", + ) +""" + +from __future__ import annotations + +import logging +import uuid +from typing import Any, Dict, List, Optional, Tuple + +from shared.notifications.notification_constants import ( + FeedUrlUpdateType, + NotificationFeedRole, + NotificationTypeId, +) + +logger = logging.getLogger(__name__) + + +def emit_feed_redirected( + source_stable_id: str, + target_stable_id: str, + old_url: Optional[str], + new_url: Optional[str], + source: str, + extra_data: Optional[Dict[str, Any]] = None, +) -> None: + """Create a ``feed.url_updated / feed_redirected`` notification_event. + + Called when a new ``Redirectingid`` row is created — meaning a feed has been + deprecated and now points users to a different feed. + + Parameters + ---------- + source_stable_id: + stable_id of the feed that is being deprecated (the source of the redirect). + target_stable_id: + stable_id of the feed that subscribers should follow instead. + old_url: + producer_url of the source feed at deprecation time (may be None). + new_url: + producer_url of the target feed (may be None). + source: + Human-readable tag identifying the process that triggered this + (e.g. ``NotificationSource.TDG_REDIRECTS``). + extra_data: + Optional extra free-form JSON merged into the event payload + (e.g. redirect_comment). + """ + payload: Dict[str, Any] = {"old_url": old_url, "new_url": new_url} + if extra_data: + payload.update(extra_data) + _emit( + notification_type_id=NotificationTypeId.FEED_URL_UPDATED, + event_subtype=FeedUrlUpdateType.FEED_REDIRECTED, + source=source, + feeds=[ + (source_stable_id, NotificationFeedRole.SUBJECT), + (target_stable_id, NotificationFeedRole.TARGET), + ], + payload=payload, + ) + + +def emit_url_replaced( + feed_stable_id: str, + old_url: str, + new_url: str, + source: str, + extra_data: Optional[Dict[str, Any]] = None, +) -> None: + """Create a ``feed.url_updated / url_replaced`` notification_event. + + Called when automation changes ``Feed.producer_url`` **in-place** — the feed + keeps the same ``stable_id`` but its source URL has changed. + + Only emit when ``old_url != new_url`` (callers are responsible for this check). + + Parameters + ---------- + feed_stable_id: + stable_id of the feed whose URL changed. + old_url: + The previous producer_url value. + new_url: + The new producer_url value. + source: + Human-readable tag identifying the process (e.g. ``NotificationSource.TDG_IMPORT``). + extra_data: + Optional extra free-form JSON merged into the event payload. + """ + payload: Dict[str, Any] = {"old_url": old_url, "new_url": new_url} + if extra_data: + payload.update(extra_data) + _emit( + notification_type_id=NotificationTypeId.FEED_URL_UPDATED, + event_subtype=FeedUrlUpdateType.URL_REPLACED, + source=source, + feeds=[(feed_stable_id, NotificationFeedRole.SUBJECT)], + payload=payload, + ) + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + + +def _emit( + notification_type_id: str, + event_subtype: str, + source: str, + feeds: Optional[List[Tuple[str, str]]] = None, + payload: Optional[Dict[str, Any]] = None, +) -> None: + """Write one notification_event row (plus its notification_event_feed rows) + to the users DB. + + Parameters + ---------- + notification_type_id: + Row id in ``notification_type`` (e.g. ``feed.url_updated``). + event_subtype: + Discriminator within the type (e.g. ``url_replaced``). + source: + Tag identifying the emitting process. + feeds: + Optional list of ``(feed_stable_id, role)`` tuples relating this event to + one-or-more feeds. ``role`` is a ``NotificationFeedRole`` value. + payload: + Optional type-specific JSON payload. + + Gracefully degrades if the users DB is unavailable: + - ``USERS_DATABASE_URL`` not set → log warning, return. + - Any DB error → log exception, return. + + This ensures that feed-change code paths are never blocked. + """ + try: + # Import here to avoid circular imports and to allow graceful degradation + # when the users DB is not configured (e.g. populate_db CI scripts). + from shared.database.users_database import UsersDatabase + from shared.users_database_gen.sqlacodegen_models import ( + NotificationEvent, + NotificationEventFeed, + ) + except ImportError as exc: + logger.warning("notification_event_service: import error, skipping emit: %s", exc) + return + + try: + db = UsersDatabase() + except Exception as exc: + primary_feed = feeds[0][0] if feeds else None + logger.warning( + "notification_event_service: users DB unavailable (%s), " "skipping %s/%s for feed=%s", + exc, + notification_type_id, + event_subtype, + primary_feed, + ) + return + + event_id = str(uuid.uuid4()) + event = NotificationEvent( + id=event_id, + notification_type_id=notification_type_id, + event_subtype=event_subtype, + source=source, + payload=payload, + ) + feed_rows = [ + NotificationEventFeed( + id=str(uuid.uuid4()), + notification_event_id=event_id, + feed_stable_id=feed_stable_id, + role=role, + ) + for feed_stable_id, role in (feeds or []) + ] + + primary_feed = feeds[0][0] if feeds else None + try: + with db.start_db_session() as session: + session.add(event) + for feed_row in feed_rows: + session.add(feed_row) + logger.info( + "notification_event created: type=%s subtype=%s feeds=%s source=%s id=%s", + notification_type_id, + event_subtype, + [f[0] for f in (feeds or [])], + source, + event_id, + ) + except Exception as exc: + logger.exception( + "notification_event_service: failed to persist event " "type=%s subtype=%s feed=%s: %s", + notification_type_id, + event_subtype, + primary_feed, + exc, + ) diff --git a/docs/OperationsAPI.yaml b/docs/OperationsAPI.yaml index bf53bfb36..c9c219664 100644 --- a/docs/OperationsAPI.yaml +++ b/docs/OperationsAPI.yaml @@ -2067,6 +2067,7 @@ components: example: vp description: > The type of realtime entry: + * vp - vehicle positions * tu - trip updates * sa - service alerts @@ -2154,6 +2155,7 @@ components: x-operation: true description: > Describes status of the Feed. Should be one of + * `active` Feed should be used in public trip planners. * `deprecated` Feed is explicitly deprecated and should not be used in public trip planners. * `inactive` Feed hasn't been recently updated and should be used at risk of providing outdated information. @@ -2171,6 +2173,7 @@ components: x-operation: true description: > Describes data type of a feed. Should be one of + * `gtfs` GTFS feed. * `gtfs_rt` GTFS-RT feed. * `gbfs` GBFS feed. diff --git a/docs/notifications.md b/docs/notifications.md new file mode 100644 index 000000000..087c4aebe --- /dev/null +++ b/docs/notifications.md @@ -0,0 +1,441 @@ +# Notification System — Architecture & Operations Guide + +> **Issue**: [#1723](https://github.com/MobilityData/mobility-feed-api/issues/1723) + +--- + +## Table of Contents + +1. [Overview](#overview) +2. [Notification Types](#notification-types) +3. [Database Schema](#database-schema) +4. [Event Creation — Integration Points](#event-creation--integration-points) +5. [Dispatcher Task](#dispatcher-task) + - [Payload Parameters](#payload-parameters) + - [active_since — Eligibility Gate](#active_since--eligibility-gate) + - [Example Invocations](#example-invocations) +6. [Cadence vs Digest](#cadence-vs-digest) +7. [Retry Strategy](#retry-strategy) +8. [Email Delivery — Brevo](#email-delivery--brevo) +9. [Admin Event Summary](#admin-event-summary) +10. [Manual Trigger](#manual-trigger) +11. [Environment Variables](#environment-variables) +12. [Deployment Notes](#deployment-notes) +13. [Future Work](#future-work) + +--- + +## Overview + +The notification system is **event-driven** and **application-level** (no database triggers). + +``` +Feed change happens + │ + ├── feeds DB write (existing, unchanged) + └── users DB: notification_event (new, best-effort) + + │ + ▼ Cloud Scheduler (daily / weekly) + dispatch_notifications task + • finds unprocessed / failed notification_events + • matches active notification_subscriptions + • sends emails via Brevo + • records delivery in notification_log +``` + +**Two databases** are involved: +- **Feeds DB** (`FEEDS_DATABASE_URL`) — where feeds, redirects, and datasets live. +- **Users DB** (`USERS_DATABASE_URL`) — where users, subscriptions, events, and logs live. + +Because these are separate PostgreSQL instances, event creation is **best-effort**: if the users DB write fails, the feed change is **not rolled back**. Failures are logged and can be monitored. + +--- + +## Notification Types + +| ID | Description | +|----|-------------| +| `feed.url_updated` | Fired when a feed URL changes in-place (`url_replaced`) or a feed is deprecated and redirected to another feed (`feed_redirected`). | +| `admin.event_summary` | Daily digest for admin subscribers summarising dispatcher run statistics. | + +### `feed.url_updated` — `event_subtype` values + +| `event_subtype` | Trigger | +|---------------|---------| +| `feed_redirected` | A new `Redirectingid` row is created; source feed is deprecated. | +| `url_replaced` | `Feed.producer_url` is updated in-place by automation or an operator. | + +### `admin.event_summary` — `event_subtype` values + +| `event_subtype` | Trigger | +|---------------|---------| +| `dispatch_summary` | Created after every non-dry-run dispatcher invocation. | + +--- + +## Database Schema + +All notification tables live in the **users DB**. + +The schema is deliberately **generic** so new notification types reuse it without DDL changes: +`notification_event` holds only type-agnostic columns, the feeds an event is about live in a +separate `notification_event_feed` link table (so one event can reference multiple feeds), and +**all type-specific data goes in the JSONB `payload`**. + +### `notification_type` + +```sql +-- Seeded rows (idempotent, ON CONFLICT DO NOTHING): +INSERT INTO notification_type (id, description) VALUES + ('feed.url_updated', '...'), + ('admin.event_summary', '...'); +``` + +### `notification_event` + +One row per real-world change event. Created by the integration points below. + +| Column | Type | Notes | +|--------|------|-------| +| `id` | TEXT PK | UUID v4 | +| `notification_type_id` | TEXT FK | `→ notification_type.id` | +| `event_subtype` | TEXT | Discriminator within the type (`feed_redirected` \| `url_replaced` \| `dispatch_summary` \| ...) | +| `source` | TEXT | Which process emitted this (see source constants) | +| `payload` | JSONB | **All type-specific data** (see payload conventions below) | +| `created_at` | TIMESTAMPTZ | Auto-set by DB | + +### `notification_event_feed` + +Relates one event to one-or-more feeds. Lets a single event reference multiple feeds (e.g. a +redirect has both a source and a target feed) and drives `feed_ids` subscription filtering. + +| Column | Type | Notes | +|--------|------|-------| +| `id` | TEXT PK | UUID v4 | +| `notification_event_id` | TEXT FK | `→ notification_event.id ON DELETE CASCADE` | +| `feed_stable_id` | TEXT | The referenced feed | +| `role` | TEXT | `'subject'` (default) \| `'target'` | + +**Unique constraint** on `(notification_event_id, feed_stable_id, role)`. + +### `payload` conventions per type + +Non-feed entities (location, dataset) also live in `payload` — they are type-specific and not used +for the cross-cutting `feed_ids` filter. + +| Type / subtype | Feeds (`role`) | `payload` keys | +|----------------|----------------|----------------| +| `feed.url_updated` / `feed_redirected` | old (`subject`), new (`target`) | `old_url`, `new_url` | +| `feed.url_updated` / `url_replaced` | feed (`subject`) | `old_url`, `new_url` | +| `location.feed_added` (#1725) | feed (`subject`) | `location_id`, `location_name`, `data_type`, `country`, `region`, `provider` | +| `feed.url_availability` (#1726) | feed (`subject`) | `feed_url`, `http_status`, `error_reason`, `first_failure_at`, `latest_checked_at`, `recovery_at`, `outage_duration` | +| `feed.coverage` (#1727) | feed (`subject`) | `latest_dataset_id`, `coverage_end_date`, `days_remaining`, `days_expired`, `feed_url`, `guidance` | +| `admin.event_summary` / `dispatch_summary` | — | `emails_sent`, `emails_failed`, ..., `cadence` | + +### `notification_subscription` + +| Column | Type | Default | Notes | +|--------|------|---------|-------| +| `cadence` | TEXT | `'weekly'` | `'immediate'` \| `'daily'` \| `'weekly'` | +| `digest` | BOOLEAN | `true` | `true` = one batched email; `false` = one email per event | +| `filter_params` | JSONB | `null` | `null` = all feeds; `{"feed_ids": ["mdb-1"]}` = events referencing any of those feeds | + +### `notification_log` + +| Column | Type | Default | Notes | +|--------|------|---------|-------| +| `notification_event_id` | TEXT FK | `null` | `→ notification_event.id ON DELETE CASCADE` | +| `retry_count` | INTEGER | `0` | Incremented on each failed attempt | +| `status` | TEXT | — | `'sent'` \| `'failed'` \| `'permanently_failed'` | + +**Unique constraint** on `(notification_event_id, subscription_id, channel)` prevents duplicate delivery regardless of how many times the dispatcher runs. + +--- + +## Event Creation — Integration Points + +`notification_event` rows (and their `notification_event_feed` rows) are created by calling helpers +from `shared/notifications/notification_event_service.py`. + +### `emit_feed_redirected(source_stable_id, target_stable_id, old_url, new_url, source, extra_data=None)` +### `emit_url_replaced(feed_stable_id, old_url, new_url, source, extra_data=None)` + +These wrap the generic `_emit(notification_type_id, event_subtype, source, feeds, payload)`. +`old_url`/`new_url` are stored in `payload`; the feed(s) become `notification_event_feed` rows. +Any `extra_data` is merged into `payload`. + +Both functions are **fire-and-forget**: if `USERS_DATABASE_URL` is not set, or if the write fails, a warning is logged and the calling code continues normally. + +### Wired integration points + +| # | Type | File | Function | Source tag | +|---|------|------|----------|------------| +| 1 | `feed_redirected` | `api/src/scripts/populate_db_gtfs.py` | `process_redirects()` | `populate_db_gtfs` | +| 2 | `feed_redirected` | `tasks_executor/.../update_tdg_redirects.py` | `_update_feed_redirect()` | `tdg_redirects` | +| 3 | `feed_redirected` | `operations_api/.../feeds_operations_impl.py` | `_update_feed()` | `operations_api` | +| 4 | `feed_redirected` | `operations_api/.../feeds_operations_impl.py` | `_update_feed()` (GTFS-RT) | `operations_api` | +| 5 | `url_replaced` | `api/src/scripts/populate_db_gtfs.py` | `populate_db()` | `populate_db_gtfs` | +| 6 | `url_replaced` | `api/src/scripts/populate_db_gbfs.py` | main loop | `populate_db_gbfs` | +| 7 | `url_replaced` | `tasks_executor/.../import_tdg_feeds.py` | fingerprint diff block | `tdg_import` | +| 8 | `url_replaced` | `tasks_executor/.../import_jbda_feeds.py` | fingerprint diff block | `jbda_import` | +| 9 | `url_replaced` | `operations_api/.../feeds_operations_impl.py` | `_update_feed()` | `operations_api` | +| 10 | `url_replaced` | `operations_api/.../feeds_operations_impl.py` | `_update_feed()` (GTFS-RT) | `operations_api` | + +> **Note — populate_db scripts and GitHub Actions CI**: +> The `populate_db_gtfs.py` and `populate_db_gbfs.py` scripts run as part of the +> `db-update-content.yml` GitHub Actions workflow. This workflow currently only sets +> `FEEDS_DATABASE_URL`. To enable notification events from these scripts, +> `USERS_DATABASE_URL` must be added to the workflow's environment. Until then, +> the emit calls will log a warning and no-op, which does **not** break the populate run. + +--- + +## Dispatcher Task + +**Task name**: `dispatch_notifications` +**File**: `functions-python/tasks_executor/src/tasks/notifications/dispatch_notifications.py` + +### Payload parameters + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `cadence` | str | `'weekly'` | Which subscriptions to process: `'daily'` \| `'weekly'` \| `'all'` | +| `dry_run` | bool | `true` | Discover and log without sending or writing | +| `status_filter` | str | `'new'` | `'new'` = unsent events; `'failed'` = retry mode; `'all'` = both | +| `user_ids` | list[str] | `[]` | Restrict to specific users (manual trigger) | +| `force` | bool | `false` | When `true` + `user_ids`: bypass cadence and window | +| `since_dt` | str | `null` | ISO 8601 lower-bound override. Acts as an *additional* floor on top of `active_since`: effective lower bound is `max(subscription.active_since, since_dt)`. Can narrow the window but cannot expand it to include pre-subscription or disabled-period events. | +| `until_dt` | str | `null` | ISO 8601 window end override. Defaults to `now()`. | +| `max_retries` | int | `5` | Stop retrying at this retry_count | + +### active_since — Eligibility Gate + +Every `notification_subscription` row has an `active_since` timestamp. `_find_new_events` uses it as the **exclusive lower bound** when querying for undelivered events — only events with `created_at >= active_since` are candidates. + +| Subscription state | `active_since` behaviour | +|--------------------|-------------------------| +| **Newly created** (`active=True` from birth) | Set to the creation timestamp. The subscription can never receive events that pre-date its own existence. | +| **Re-enabled** (`active` flipped `False → True`) | **Must be updated to `now()`** by the re-activation code. Events emitted while the subscription was inactive are permanently excluded — a user who paused notifications should not be flooded with stale events on re-enable. | +| **Active, no state change** | Never modified. Only `last_notified_at` is updated after a dispatch run. | + +> **Key rule**: `since_dt` in the payload can narrow the window further, but it can never override the `active_since` floor. Pre-subscription and disabled-period events are always excluded. + +### Example invocations + +```json +// Weekly scheduled run (dry_run=false in production) +{"cadence": "weekly", "dry_run": false} + +// Daily scheduled run +{"cadence": "daily", "dry_run": false} + +// Retry failed notifications from last 7 days +{"cadence": "all", "status_filter": "failed", "dry_run": false} + +// Manual trigger for specific users +{"cadence": "all", "user_ids": ["uid-123", "uid-456"], "force": true, "dry_run": false} + +// Admin-only test run (dry run, no emails sent) +{"cadence": "weekly", "dry_run": true} +``` + +### Response + +```json +{ + "subscriptions_processed": 42, + "events_found": 18, + "emails_sent": 17, + "emails_failed": 1, + "permanently_failed": 0, + "skipped_max_retries": 0, + "dry_run": 0 +} +``` + +--- + +## Cadence vs Digest + +These are **two independent axes** on `notification_subscription`: + +| `cadence` | `digest` | Result | +|-----------|----------|--------| +| `weekly` | `true` | 1 email/week batching all events — **default** | +| `weekly` | `false` | 1 email per event, sent in the weekly run | +| `daily` | `true` | 1 daily email batching all events from past 24 h | +| `daily` | `false` | 1 email per event, sent in the daily run | +| `immediate` | any | 1 email as soon as dispatcher runs (non-MVP; see below) | + +**`cadence`** controls *when* the dispatcher processes subscriptions (which Cloud Scheduler job invokes it). + +**`digest`** controls *how many emails*: batch all events in the window into one, or send individually. + +--- + +## Retry Strategy + +Three independent layers: + +### Layer 1 — In-run retries (transient failures) +Each Brevo send attempt is retried **up to 3 times** within the same dispatcher run with short back-off (1 s, 2 s, 4 s). Handles transient Brevo API errors, rate limits, and timeouts. + +### Layer 2 — Cross-run retries (via Cloud Scheduler) +A dedicated **daily retry Cloud Scheduler job** calls `dispatch_notifications` with `{"status_filter": "failed"}`. This ensures that even weekly-cadence subscribers whose email failed will be retried within ~24 hours, not next week. + +``` +Monday: weekly dispatch → email fails → notification_log status='failed' +Tuesday: daily retry job → status_filter='failed' → retry → status='sent' +``` + +### Layer 3 — Permanent failure (`retry_count >= max_retries`) +Once a log row reaches `retry_count >= max_retries` (default 5), it is marked `'permanently_failed'` and excluded from all future runs. Monitor for `permanently_failed` rows in dashboards or alerts. + +### No GCP pub/sub per notification +A dedicated message queue per notification would add operational complexity without meaningful benefit at current scale. The `notification_log` table **is** the queue: `pending`/`failed` rows are the work items; the unique constraint prevents duplicates; `retry_count` tracks attempts. + +--- + +## Email Delivery — Brevo + +The dispatcher sends emails via **Brevo Transactional Email API** (`sib_api_v3_sdk.TransactionalEmailsApi`). + +**File**: `api/src/shared/notifications/brevo_notification_sender.py` + +### Template IDs + +Template IDs are read from environment variables so they can be updated in the Brevo dashboard without code deployments: + +| Env var | Template used for | +|---------|-------------------| +| `BREVO_TEMPLATE_FEED_URL_UPDATED` | Single `feed.url_updated` email | +| `BREVO_TEMPLATE_FEED_URL_UPDATED_DIGEST` | Digest `feed.url_updated` email | +| `BREVO_TEMPLATE_ADMIN_EVENT_SUMMARY` | `admin.event_summary` email | + +When a template ID env var is not set, a **plain HTML fallback** is generated inline. This allows the system to work in development without Brevo template setup. + +### Template parameters (`params`) + +The following params are passed to Brevo templates (accessible in templates as `{{ params.events[0].feed_stable_id }}`, etc.). Both single and digest sends use the same `event_count` / `events[]` shape: + +**Single event** (`events` has one entry): +```json +{ + "event_count": 1, + "subscription_id": "sub-uuid", + "events": [ + { + "feed_stable_id": "mdb-1234", + "target_feed_stable_id": "tdg-5678", + "event_subtype": "feed_redirected", + "old_url": "https://...", + "new_url": "https://...", + "source": "tdg_redirects", + "created_at": "2026-06-09T12:00:00+00:00", + "payload": { "old_url": "https://...", "new_url": "https://..." } + } + ] +} +``` + +**Digest**: +```json +{ + "event_count": 3, + "subscription_id": "sub-uuid", + "events": [{ "feed_stable_id": "...", "event_subtype": "...", ... }, ...] +} +``` + +--- + +## Admin Event Summary + +After every **non-dry-run** dispatcher invocation, a `notification_event` of type `admin.event_summary` / `dispatch_summary` is created with dispatch statistics in `payload`: + +```json +{ + "subscriptions_processed": 42, + "events_found": 18, + "emails_sent": 17, + "emails_failed": 1, + "permanently_failed": 0, + "skipped_max_retries": 0, + "cadence": "weekly" +} +``` + +Admin users subscribe to this with `notification_type_id='admin.event_summary'` and `cadence='daily'`. They receive a daily digest of dispatcher run statistics. + +--- + +## Manual Trigger + +To force-send notifications for specific users (e.g. for testing or re-sending after a known issue): + +```bash +# Via tasks_executor Cloud Function +curl -X POST https://-.cloudfunctions.net/tasks_executor- \ + -H "Content-Type: application/json" \ + -H "Authorization: Bearer $(gcloud auth print-identity-token)" \ + -d '{ + "task": "dispatch_notifications", + "payload": { + "cadence": "all", + "user_ids": ["firebase-uid-of-user"], + "force": true, + "dry_run": false + } + }' +``` + +The `force: true` flag bypasses cadence filtering, so the specified users receive all pending notifications regardless of their subscription cadence. + +--- + +## Environment Variables + +| Variable | Required | Description | +|----------|----------|-------------| +| `USERS_DATABASE_URL` | Yes (in tasks_executor) | PostgreSQL connection string for the users DB | +| `BREVO_API_KEY` | Yes (in dispatcher) | Brevo API key for sending transactional emails | +| `BREVO_SENDER_EMAIL` | No | From-address (default: `noreply@mobilitydatabase.org`) | +| `BREVO_SENDER_NAME` | No | From-name (default: `Mobility Database`) | +| `BREVO_TEMPLATE_FEED_URL_UPDATED` | No | Brevo template ID (integer) for single events | +| `BREVO_TEMPLATE_FEED_URL_UPDATED_DIGEST` | No | Brevo template ID for digest emails | +| `BREVO_TEMPLATE_ADMIN_EVENT_SUMMARY` | No | Brevo template ID for admin summary | + +--- + +## Deployment Notes + +### Cloud Scheduler jobs to create + +| Job name | Schedule | Payload | +|----------|----------|---------| +| `dispatch-notifications-weekly` | `0 9 * * MON` (Mon 9 AM UTC) | `{"task":"dispatch_notifications","payload":{"cadence":"weekly","dry_run":false}}` | +| `dispatch-notifications-daily` | `0 8 * * *` (daily 8 AM UTC) | `{"task":"dispatch_notifications","payload":{"cadence":"daily","dry_run":false}}` | +| `dispatch-notifications-retry` | `0 10 * * *` (daily 10 AM UTC) | `{"task":"dispatch_notifications","payload":{"cadence":"all","status_filter":"failed","dry_run":false}}` | + +### Adding `USERS_DATABASE_URL` to content update workflow + +To enable notification events from the `populate_db` scripts, add to `.github/workflows/db-update-content.yml`: + +```yaml +- name: Update .env file + run: | + # ... existing lines ... + echo "USERS_DATABASE_URL=postgresql://${{ secrets.DB_USER_NAME }}:${{ secrets.DB_USER_PASSWORD }}@localhost:5432/MobilityDatabaseUsers${{ inputs.USER_DB_ENVIRONMENT }}" >> config/.env.local +``` + +Until this is added, `populate_db` scripts will log a warning and skip notification event creation — this does **not** break the populate run. + +--- + +## Future Work + +- **`immediate` cadence**: Architecture is fully implemented. To activate, deploy a Cloud Scheduler job calling `dispatch_notifications` with `cadence='immediate'` at the desired frequency (e.g. every 15 minutes). No code changes needed. +- **Additional notification types**: Add a new `notification_type` row, then call `_emit(notification_type_id, event_subtype, source, feeds=[...], payload={...})` in `notification_event_service.py` — no schema changes needed (feeds go in `notification_event_feed`, everything else in `payload`). The dispatcher, delivery, and retry infrastructure is reused automatically. For non-`feed.url_updated` types, add a Brevo subject/template mapping and a `build_params_*` / HTML renderer in `brevo_notification_sender.py`. +- **Operations API endpoint**: `GET /notifications/events` (paginated, filterable by type/date/source) for ops visibility into queued events. Belongs in the operations API, not the public API. +- **Unsubscribe link**: Pass `subscription_id` in Brevo template params; build a one-click unsubscribe endpoint that sets `notification_subscription.active = false`. diff --git a/functions-python/operations_api/function_config.json b/functions-python/operations_api/function_config.json index a4358cf45..f8ef7b007 100644 --- a/functions-python/operations_api/function_config.json +++ b/functions-python/operations_api/function_config.json @@ -6,7 +6,7 @@ "memory": "1Gi", "trigger_http": true, "include_folders": ["helpers"], - "include_api_folders": ["database_gen", "database", "common", "db_models", "feed_filters", "users_database_gen"], + "include_api_folders": ["database_gen", "database", "common", "db_models", "feed_filters", "users_database_gen", "notifications"], "environment_variables": [ { "key": "GOOGLE_CLIENT_ID" diff --git a/functions-python/operations_api/src/feeds_operations/impl/feeds_operations_impl.py b/functions-python/operations_api/src/feeds_operations/impl/feeds_operations_impl.py index 26de30afb..03ade70b8 100644 --- a/functions-python/operations_api/src/feeds_operations/impl/feeds_operations_impl.py +++ b/functions-python/operations_api/src/feeds_operations/impl/feeds_operations_impl.py @@ -72,6 +72,10 @@ get_feeds_query, get_feed_by_normalized_url, ) +from shared.notifications.notification_event_service import ( + emit_feed_redirected, + emit_url_replaced, +) from .models.operation_create_request_gtfs_feed import ( OperationCreateRequestGtfsFeedImpl, ) @@ -354,6 +358,12 @@ async def _update_feed( update_request_feed.operational_status_action is not None and update_request_feed.operational_status_action != "no_change" ): + # Capture pre-mutation state for notification events (before to_orm mutates the object). + old_producer_url = getattr(feed_from_db, "producer_url", None) + old_redirect_target_ids = { + r.target_id for r in getattr(feed_from_db, "redirectingids", []) + } + await OperationsApiImpl._populate_feed_values( feed_from_db, impl_class, db_session, update_request_feed ) @@ -383,6 +393,34 @@ async def _update_feed( update_request_feed.id, diff.values(), ) + # Emit notification events for URL / redirect changes (post-commit, best-effort). + feed_stable_id = update_request_feed.id + new_producer_url = getattr(feed_from_db, "producer_url", None) + if ( + old_producer_url + and new_producer_url + and old_producer_url != new_producer_url + ): + emit_url_replaced( + feed_stable_id=feed_stable_id, + old_url=old_producer_url, + new_url=new_producer_url, + source="operations_api", + ) + new_redirect_target_ids = { + r.target_id for r in getattr(feed_from_db, "redirectingids", []) + } + for new_target_id in new_redirect_target_ids - old_redirect_target_ids: + target_feed = db_session.get(Feed, new_target_id) + emit_feed_redirected( + source_stable_id=feed_stable_id, + target_stable_id=getattr( + target_feed, "stable_id", new_target_id + ), + old_url=old_producer_url, + new_url=getattr(target_feed, "producer_url", None), + source="operations_api", + ) try: create_web_revalidation_task([update_request_feed.id]) except Exception as e: diff --git a/functions-python/tasks_executor/function_config.json b/functions-python/tasks_executor/function_config.json index b500e93a1..975fff409 100644 --- a/functions-python/tasks_executor/function_config.json +++ b/functions-python/tasks_executor/function_config.json @@ -6,7 +6,7 @@ "memory": "8Gi", "trigger_http": true, "include_folders": ["helpers"], - "include_api_folders": ["database_gen", "database", "common", "feed_filters", "users_database_gen"], + "include_api_folders": ["database_gen", "database", "common", "feed_filters", "users_database_gen", "notifications"], "environment_variables": [ { "key": "DATASETS_BUCKET_NAME" diff --git a/functions-python/tasks_executor/src/main.py b/functions-python/tasks_executor/src/main.py index 54a658a9e..490cd6c34 100644 --- a/functions-python/tasks_executor/src/main.py +++ b/functions-python/tasks_executor/src/main.py @@ -65,6 +65,7 @@ check_gtfs_feed_availability_handler, ) from tasks.users.migrate_firebase_users import migrate_firebase_users_handler +from tasks.notifications.dispatch_notifications import dispatch_notifications_handler init_logger() LIST_COMMAND: Final[str] = "list" @@ -174,6 +175,22 @@ ), "handler": migrate_firebase_users_handler, }, + "dispatch_notifications": { + "description": ( + "Match notification_event rows to active subscriptions, send emails via Brevo, " + "and record delivery in notification_log. " + "Parameters: " + "cadence ('daily'|'weekly'|'all', default 'weekly'), " + "dry_run (default true), " + "status_filter ('new'|'failed'|'all', default 'new'), " + "user_ids (list of user IDs for manual trigger, default []), " + "force (bypass cadence when user_ids set, default false), " + "since_dt (ISO8601 window start override), " + "until_dt (ISO8601 window end override), " + "max_retries (default 5)." + ), + "handler": dispatch_notifications_handler, + }, } diff --git a/functions-python/tasks_executor/src/tasks/data_import/jbda/import_jbda_feeds.py b/functions-python/tasks_executor/src/tasks/data_import/jbda/import_jbda_feeds.py index 8fe61e707..e5b9f1ae5 100644 --- a/functions-python/tasks_executor/src/tasks/data_import/jbda/import_jbda_feeds.py +++ b/functions-python/tasks_executor/src/tasks/data_import/jbda/import_jbda_feeds.py @@ -39,6 +39,7 @@ ) from shared.common.gcp_utils import create_web_revalidation_task from shared.helpers.pub_sub import trigger_dataset_download +from shared.notifications.notification_event_service import emit_url_replaced from tasks.data_import.data_import_utils import ( get_or_create_entity_type, get_or_create_feed, @@ -417,6 +418,15 @@ def _process_feed( if db_rt_map.get(k) != api_rt_map.get(k) } logger.info("Diff %s sched=%s rt=%s", stable_id, diff, diff_rt) + if "producer_url" in diff: + old_url, new_url = diff["producer_url"] + if old_url and new_url and old_url != new_url: + emit_url_replaced( + feed_stable_id=stable_id, + old_url=old_url, + new_url=new_url, + source="jbda_import", + ) # Apply schedule fields _update_common_feed_fields(gtfs_feed, item, dbody, producer_url) diff --git a/functions-python/tasks_executor/src/tasks/data_import/transportdatagouv/import_tdg_feeds.py b/functions-python/tasks_executor/src/tasks/data_import/transportdatagouv/import_tdg_feeds.py index cfcfd3bdc..0970b5086 100644 --- a/functions-python/tasks_executor/src/tasks/data_import/transportdatagouv/import_tdg_feeds.py +++ b/functions-python/tasks_executor/src/tasks/data_import/transportdatagouv/import_tdg_feeds.py @@ -37,6 +37,7 @@ ) from shared.common.gcp_utils import create_web_revalidation_task from shared.helpers.pub_sub import trigger_dataset_download +from shared.notifications.notification_event_service import emit_url_replaced from tasks.data_import.data_import_utils import ( get_or_create_feed, get_or_create_entity_type, @@ -564,6 +565,16 @@ def _process_tdg_dataset( ) nested.commit() continue + if ( + db_fp.get("producer_url") + and db_fp["producer_url"] != api_fp["producer_url"] + ): + emit_url_replaced( + feed_stable_id=stable_id, + old_url=db_fp["producer_url"], + new_url=api_fp["producer_url"], + source="tdg_import", + ) _update_common_tdg_fields( gtfs_feed, dataset, resource, res_url, locations, db_session @@ -615,6 +626,16 @@ def _process_tdg_dataset( processed += 1 nested.commit() continue + if ( + db_rt_fp.get("producer_url") + and db_rt_fp["producer_url"] != api_rt_fp["producer_url"] + ): + emit_url_replaced( + feed_stable_id=stable_id, + old_url=db_rt_fp["producer_url"], + new_url=api_rt_fp["producer_url"], + source="tdg_import", + ) _update_common_tdg_fields( rt_feed, dataset, resource, res_url, locations, db_session diff --git a/functions-python/tasks_executor/src/tasks/data_import/transportdatagouv/update_tdg_redirects.py b/functions-python/tasks_executor/src/tasks/data_import/transportdatagouv/update_tdg_redirects.py index 82a247e95..2b92ba2cb 100644 --- a/functions-python/tasks_executor/src/tasks/data_import/transportdatagouv/update_tdg_redirects.py +++ b/functions-python/tasks_executor/src/tasks/data_import/transportdatagouv/update_tdg_redirects.py @@ -26,6 +26,7 @@ from shared.database.database import with_db_session from shared.database_gen.sqlacodegen_models import Feed, Redirectingid +from shared.notifications.notification_event_service import emit_feed_redirected logger = logging.getLogger(__name__) @@ -110,6 +111,14 @@ def _update_feed_redirect( mdb_feed.status = "deprecated" db_session.add(redirect) counters["redirects_created"] = 1 + emit_feed_redirected( + source_stable_id=mdb_stable_id, + target_stable_id=tdg_stable_id, + old_url=mdb_feed.producer_url, + new_url=tdg_feed.producer_url, + source="tdg_redirects", + extra_data={"redirect_comment": "Redirecting post TDG import"}, + ) return counters diff --git a/functions-python/tasks_executor/src/tasks/notifications/dispatch_notifications.py b/functions-python/tasks_executor/src/tasks/notifications/dispatch_notifications.py new file mode 100644 index 000000000..dec574349 --- /dev/null +++ b/functions-python/tasks_executor/src/tasks/notifications/dispatch_notifications.py @@ -0,0 +1,632 @@ +# +# MobilityData 2026 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""dispatch_notifications — match notification_event rows to active subscriptions, +send emails, and record delivery in notification_log. + +Invoked by Cloud Scheduler (daily / weekly) or triggered manually via the +tasks_executor. See ``docs/notifications.md`` for the full architecture, +payload reference, retry strategy, active_since semantics, and operational +runbook. +""" + +from __future__ import annotations + +import logging +import time +import uuid +from datetime import datetime, timedelta, timezone +from typing import Any, Dict, List, Optional + +from sqlalchemy import not_, select +from sqlalchemy.orm import Session + +from shared.database.users_database import with_users_db_session +from shared.notifications.brevo_notification_sender import ( + BrevoSendError, + EmailRecipient, + send_digest, + send_single, +) +from shared.notifications.notification_constants import ( + AdminEventUpdateType, + NotificationCadence, + NotificationLogStatus, + NotificationSource, + NotificationTypeId, +) +from shared.users_database_gen.sqlacodegen_models import ( + AppUser, + NotificationEvent, + NotificationLog, + NotificationSubscription, +) + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- +DEFAULT_CADENCE = NotificationCadence.WEEKLY +DEFAULT_MAX_RETRIES = 5 +_IN_RUN_RETRY_DELAYS = (1, 2, 4) # seconds between in-run attempts + +# Time windows per cadence (look-back for finding relevant events) +_CADENCE_WINDOWS: Dict[str, timedelta] = { + NotificationCadence.IMMEDIATE: timedelta(hours=1), + NotificationCadence.DAILY: timedelta(days=1), + NotificationCadence.WEEKLY: timedelta(weeks=1), +} + + +# --------------------------------------------------------------------------- +# Public handler +# --------------------------------------------------------------------------- + + +def dispatch_notifications_handler( + payload: Optional[Dict[str, Any]] = None, +) -> Dict[str, Any]: + """Cloud Function / tasks_executor entrypoint. + + Parameters are taken from ``payload``; see module docstring for full list. + """ + payload = payload or {} + logger.info("dispatch_notifications_handler called with payload=%s", payload) + + cadence: str = payload.get("cadence", DEFAULT_CADENCE) + dry_run: bool = bool(payload.get("dry_run", True)) + status_filter: str = payload.get("status_filter", "new") + user_ids: List[str] = payload.get("user_ids", []) + force: bool = bool(payload.get("force", False)) + since_dt: Optional[str] = payload.get("since_dt") + until_dt: Optional[str] = payload.get("until_dt") + max_retries: int = int(payload.get("max_retries", DEFAULT_MAX_RETRIES)) + + result = dispatch( + cadence=cadence, + dry_run=dry_run, + status_filter=status_filter, + user_ids=user_ids, + force=force, + since_dt=since_dt, + until_dt=until_dt, + max_retries=max_retries, + ) + logger.info("dispatch_notifications_handler result: %s", result) + return result + + +# --------------------------------------------------------------------------- +# Core dispatcher +# --------------------------------------------------------------------------- + + +@with_users_db_session +def dispatch( + *, + cadence: str, + dry_run: bool, + status_filter: str, + user_ids: List[str], + force: bool, + since_dt: Optional[str], + until_dt: Optional[str], + max_retries: int, + db_session: Session = None, +) -> Dict[str, Any]: + now = datetime.now(timezone.utc) + until = _parse_dt(until_dt) or now + # explicit_since: only set when the caller explicitly provided since_dt. + # Used as an additional lower-bound floor in _find_new_events on top of + # each subscription's active_since. Never replaces active_since. + explicit_since: Optional[datetime] = _parse_dt(since_dt) + # since is kept for logging only; it is no longer used as the correctness + # gate for new-event discovery (active_since fills that role). + since = explicit_since or ( + now + - _CADENCE_WINDOWS.get(cadence, _CADENCE_WINDOWS[NotificationCadence.WEEKLY]) + ) + + logger.info( + "Dispatching cadence=%s status_filter=%s window=[%s, %s] dry_run=%s user_ids=%s", + cadence, + status_filter, + since.isoformat(), + until.isoformat(), + dry_run, + user_ids or "all", + ) + + stats: Dict[str, int] = { + "subscriptions_processed": 0, + "events_found": 0, + "emails_sent": 0, + "emails_failed": 0, + "permanently_failed": 0, + "skipped_max_retries": 0, + "dry_run": int(dry_run), + } + + # Find active subscriptions to process. + subscriptions = find_subscriptions( + db_session=db_session, + cadence=cadence, + user_ids=user_ids, + force=force, + ) + logger.info("Found %d active subscription(s) to process", len(subscriptions)) + + for subscription in subscriptions: + stats["subscriptions_processed"] += 1 + user = subscription.user + + # Collect notification_events that need a delivery log for this subscription. + events = find_events_for_subscription( + db_session=db_session, + subscription=subscription, + status_filter=status_filter, + explicit_since=explicit_since, + until=until, + max_retries=max_retries, + ) + if not events: + continue + + stats["events_found"] += len(events) + logger.info( + "Subscription %s (user=%s cadence=%s digest=%s): %d event(s)", + subscription.id, + user.email if user else "?", + subscription.cadence, + subscription.digest, + len(events), + ) + + if dry_run: + logger.info( + "[dry_run] Would send %d event(s) to %s", + len(events), + user.email if user else "?", + ) + continue + + recipient = EmailRecipient( + email=user.email, + name=user.full_name, + ) + + if subscription.digest: + _send_and_log_digest( + db_session=db_session, + recipient=recipient, + events=events, + subscription=subscription, + stats=stats, + max_retries=max_retries, + ) + else: + for event in events: + _send_and_log_single( + db_session=db_session, + recipient=recipient, + event=event, + subscription=subscription, + stats=stats, + max_retries=max_retries, + ) + + # After the run, emit an admin.event_summary notification_event. + if not dry_run: + emit_admin_summary(db_session=db_session, stats=stats, cadence=cadence) + + return stats + + +# --------------------------------------------------------------------------- +# Query helpers +# --------------------------------------------------------------------------- + + +def find_subscriptions( + *, + db_session: Session, + cadence: str, + user_ids: List[str], + force: bool, +) -> List[NotificationSubscription]: + """Return active subscriptions matching the cadence (or all if force=True).""" + q = ( + db_session.query(NotificationSubscription) + .join(AppUser, NotificationSubscription.user_id == AppUser.id) + .filter(NotificationSubscription.active == True) # noqa: E712 + ) + if not (force and user_ids): + # Normal (non-forced) run: filter by cadence. + if cadence != "all": + q = q.filter(NotificationSubscription.cadence == cadence) + if user_ids: + q = q.filter(NotificationSubscription.user_id.in_(user_ids)) + return q.all() + + +def find_events_for_subscription( + *, + db_session: Session, + subscription: NotificationSubscription, + status_filter: str, + explicit_since: Optional[datetime] = None, + until: datetime, + max_retries: int, +) -> List[NotificationEvent]: + """Return events that need to be (re-)sent for this subscription. + + Parameters + ---------- + explicit_since: + Optional caller-provided lower bound (from ``since_dt`` payload param). + When set, the effective lower bound for new-event discovery becomes + ``max(subscription.active_since, explicit_since)``. It can only + *narrow* the window further — it cannot expand it past ``active_since``. + until: + Upper bound for new-event discovery (exclusive for failed events). + """ + events: List[NotificationEvent] = [] + + if status_filter in ("new", "all"): + events += _find_new_events( + db_session=db_session, + subscription=subscription, + explicit_since=explicit_since, + until=until, + ) + + if status_filter in ("failed", "all"): + events += _find_failed_events( + db_session=db_session, + subscription=subscription, + max_retries=max_retries, + ) + + # Deduplicate (an event shouldn't appear in both lists). + seen: set = set() + deduped = [] + for e in events: + if e.id not in seen: + seen.add(e.id) + deduped.append(e) + return deduped + + +def _find_new_events( + *, + db_session: Session, + subscription: NotificationSubscription, + explicit_since: Optional[datetime], + until: datetime, +) -> List[NotificationEvent]: + """Events with no log row for this subscription, created on or after active_since. + + Lower-bound logic + ----------------- + The primary lower bound is ``subscription.active_since`` — the moment the + subscription last became active. This ensures: + + * Events emitted **before the subscription was created** are never sent. + * Events emitted **while the subscription was disabled** (the dead zone + between deactivation and re-activation) are never sent. + * Events that previously had no log row due to a mid-run crash are **always + retried** on subsequent runs, regardless of how long ago they occurred. + + If the caller provided an explicit ``since_dt`` override, the effective lower + bound is ``max(active_since, explicit_since)`` so the override can only + *further narrow* the window, never widen it past the eligibility floor. + """ + # Compute the effective lower bound. + # active_since is the primary gate: only events created after this subscription + # last became active are eligible. Fall back to created_at for the transition + # period before the DB migration is applied and the model regenerated — this is + # safe because created_at is the original "subscription exists since" boundary. + + lower_bound: datetime = subscription.active_since or subscription.created_at + # Normalize to UTC if the value is timezone-naive (e.g. SQLite in tests). + if lower_bound.tzinfo is None: + lower_bound = lower_bound.replace(tzinfo=timezone.utc) + if explicit_since is not None and explicit_since > lower_bound: + lower_bound = explicit_since + + already_logged = select(NotificationLog.notification_event_id).where( + NotificationLog.subscription_id == subscription.id + ) + q = ( + db_session.query(NotificationEvent) + .filter( + NotificationEvent.notification_type_id == subscription.notification_type_id, + NotificationEvent.created_at >= lower_bound, + NotificationEvent.created_at <= until, + not_(NotificationEvent.id.in_(already_logged)), + ) + .order_by(NotificationEvent.created_at.asc()) + ) + events = q.all() + return apply_filter_params(events, subscription) + + +def _find_failed_events( + *, + db_session: Session, + subscription: NotificationSubscription, + max_retries: int, +) -> List[NotificationEvent]: + """Events whose log row has status='failed' and retry_count < max_retries.""" + failed_logs = ( + db_session.query(NotificationLog) + .filter( + NotificationLog.subscription_id == subscription.id, + NotificationLog.status == NotificationLogStatus.FAILED, + NotificationLog.retry_count < max_retries, + NotificationLog.notification_event_id.isnot(None), + ) + .all() + ) + if not failed_logs: + return [] + event_ids = [log.notification_event_id for log in failed_logs] + events = ( + db_session.query(NotificationEvent) + .filter(NotificationEvent.id.in_(event_ids)) + .all() + ) + return apply_filter_params(events, subscription) + + +def apply_filter_params( + events: List[NotificationEvent], + subscription: NotificationSubscription, +) -> List[NotificationEvent]: + """Filter events against subscription.filter_params. + + Supported keys: + ``feed_ids``: list of feed stable_ids — only return events that reference + at least one of those feeds (in any role). + ``None`` / missing key → all events pass. + """ + fp = subscription.filter_params + if not fp: + return events + allowed_feed_ids = fp.get("feed_ids") + if not allowed_feed_ids: + return events + allowed = set(allowed_feed_ids) + return [e for e in events if _event_feed_ids(e) & allowed] + + +def _event_feed_ids(event: NotificationEvent) -> set: + """Set of feed stable_ids referenced by an event (across all roles).""" + return { + f.feed_stable_id + for f in (getattr(event, "notification_event_feeds", None) or []) + } + + +# --------------------------------------------------------------------------- +# Send + log helpers +# --------------------------------------------------------------------------- + + +def _send_and_log_single( + *, + db_session: Session, + recipient: EmailRecipient, + event: NotificationEvent, + subscription: NotificationSubscription, + stats: Dict[str, int], + max_retries: int, +) -> None: + """Attempt to send a single-event email; write or update a NotificationLog row.""" + log = _get_or_create_log(db_session, event.id, subscription.id, "email") + + if log.retry_count >= max_retries: + logger.warning( + "Skipping event %s / sub %s: reached max_retries=%d", + event.id, + subscription.id, + max_retries, + ) + if log.status != NotificationLogStatus.PERMANENTLY_FAILED: + log.status = NotificationLogStatus.PERMANENTLY_FAILED + db_session.flush() + stats["skipped_max_retries"] += 1 + return + + error = _attempt_send(lambda: send_single(recipient, event, subscription)) + _update_log(db_session, log, error, max_retries) + + if error: + stats["emails_failed"] += 1 + if log.status == NotificationLogStatus.PERMANENTLY_FAILED: + stats["permanently_failed"] += 1 + else: + stats["emails_sent"] += 1 + + +def _send_and_log_digest( + *, + db_session: Session, + recipient: EmailRecipient, + events: List[NotificationEvent], + subscription: NotificationSubscription, + stats: Dict[str, int], + max_retries: int, +) -> None: + """Attempt to send a digest email; write or update NotificationLog rows for all events.""" + # For digests: attempt the send once; apply the same result to all event logs. + error = _attempt_send(lambda: send_digest(recipient, events, subscription)) + + for event in events: + log = _get_or_create_log(db_session, event.id, subscription.id, "email") + if log.retry_count >= max_retries: + if log.status != NotificationLogStatus.PERMANENTLY_FAILED: + log.status = NotificationLogStatus.PERMANENTLY_FAILED + db_session.flush() + stats["skipped_max_retries"] += 1 + continue + _update_log(db_session, log, error, max_retries) + + if error: + stats["emails_failed"] += len(events) + failed_permanently = sum( + 1 + for e in events + if _get_log_status(db_session, e.id, subscription.id) + == NotificationLogStatus.PERMANENTLY_FAILED + ) + stats["permanently_failed"] += failed_permanently + else: + stats["emails_sent"] += len(events) + + +def _attempt_send(send_fn) -> Optional[str]: + """Call ``send_fn()`` up to 3× with back-off. Return None on success, error str on failure.""" + last_error: Optional[str] = None + for i, delay in enumerate((*_IN_RUN_RETRY_DELAYS, None)): + try: + send_fn() + return None + except BrevoSendError as exc: + last_error = str(exc) + logger.warning( + "Send attempt %d/%d failed: %s", + i + 1, + len(_IN_RUN_RETRY_DELAYS) + 1, + exc, + ) + if delay is not None: + time.sleep(delay) + return last_error + + +def _get_or_create_log( + db_session: Session, + event_id: str, + subscription_id: str, + channel: str, +) -> NotificationLog: + """Return the existing NotificationLog for (event, subscription, channel) or create one.""" + log = ( + db_session.query(NotificationLog) + .filter_by( + notification_event_id=event_id, + subscription_id=subscription_id, + channel=channel, + ) + .one_or_none() + ) + if log is None: + log = NotificationLog( + id=str(uuid.uuid4()), + notification_event_id=event_id, + subscription_id=subscription_id, + channel=channel, + status=NotificationLogStatus.FAILED, # optimistic: overwritten below + retry_count=0, + ) + db_session.add(log) + db_session.flush() + return log + + +def _update_log( + db_session: Session, + log: NotificationLog, + error: Optional[str], + max_retries: int, +) -> None: + """Update log status and retry_count after a send attempt.""" + if error is None: + log.status = NotificationLogStatus.SENT + log.error_message = None + else: + log.retry_count += 1 + log.error_message = error + if log.retry_count >= max_retries: + log.status = NotificationLogStatus.PERMANENTLY_FAILED + logger.error( + "Notification log %s permanently failed after %d retries: %s", + log.id, + log.retry_count, + error, + ) + else: + log.status = NotificationLogStatus.FAILED + log.sent_at = datetime.now(timezone.utc) + db_session.flush() + + +def _get_log_status( + db_session: Session, event_id: str, subscription_id: str +) -> Optional[str]: + log = ( + db_session.query(NotificationLog.status) + .filter_by( + notification_event_id=event_id, + subscription_id=subscription_id, + channel="email", + ) + .scalar() + ) + return log + + +# --------------------------------------------------------------------------- +# admin.event_summary +# --------------------------------------------------------------------------- + + +def emit_admin_summary( + *, + db_session: Session, + stats: Dict[str, int], + cadence: str, +) -> None: + """Create an admin.event_summary notification_event with dispatch statistics.""" + event = NotificationEvent( + id=str(uuid.uuid4()), + notification_type_id=NotificationTypeId.ADMIN_EVENT_SUMMARY, + event_subtype=AdminEventUpdateType.DISPATCH_SUMMARY, + source=NotificationSource.DISPATCHER, + payload={**stats, "cadence": cadence}, + ) + db_session.add(event) + db_session.flush() + logger.info("admin.event_summary created: id=%s stats=%s", event.id, stats) + + +# --------------------------------------------------------------------------- +# Utilities +# --------------------------------------------------------------------------- + + +def _parse_dt(value: Optional[str]) -> Optional[datetime]: + if not value: + return None + try: + dt = datetime.fromisoformat(value) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt + except ValueError: + logger.warning("Could not parse datetime %r; ignoring", value) + return None diff --git a/functions-python/tasks_executor/tests/conftest.py b/functions-python/tasks_executor/tests/conftest.py index b4c2c9a60..cb7cb541a 100644 --- a/functions-python/tasks_executor/tests/conftest.py +++ b/functions-python/tasks_executor/tests/conftest.py @@ -24,7 +24,11 @@ Gtfsdataset, Gbfsfeed, ) -from test_shared.test_utils.database_utils import clean_testing_db, default_db_url +from test_shared.test_utils.database_utils import ( + clean_testing_db, + default_db_url, + clean_testing_users_db, +) @with_db_session(db_url=default_db_url) @@ -178,6 +182,7 @@ def pytest_sessionstart(): before performing collection and entering the run test loop. """ clean_testing_db() + clean_testing_users_db() populate_database() @@ -187,6 +192,7 @@ def pytest_sessionfinish(session, exitstatus): returning the exit status to the system. """ clean_testing_db() + clean_testing_users_db() def pytest_unconfigure(config): diff --git a/functions-python/tasks_executor/tests/tasks/notifications/test_dispatch_notifications.py b/functions-python/tasks_executor/tests/tasks/notifications/test_dispatch_notifications.py new file mode 100644 index 000000000..f265f677f --- /dev/null +++ b/functions-python/tasks_executor/tests/tasks/notifications/test_dispatch_notifications.py @@ -0,0 +1,786 @@ +# +# MobilityData 2026 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Unit tests for dispatch_notifications task. + +These tests use in-memory SQLite via SQLAlchemy and mock out Brevo calls, +so no real database connection or API keys are required. +""" + +from __future__ import annotations + +import re +import uuid +from contextlib import contextmanager +from datetime import datetime, timedelta, timezone +from typing import Optional +from unittest.mock import MagicMock, patch + +import pytest +from sqlalchemy import create_engine, text +from sqlalchemy.dialects.postgresql import JSONB +from sqlalchemy.ext.compiler import compiles +from sqlalchemy.orm import sessionmaker +from sqlalchemy.schema import DefaultClause + +from shared.notifications.notification_constants import ( + AdminEventUpdateType, + FeedUrlUpdateType, + NotificationCadence, + NotificationFeedRole, + NotificationLogStatus, + NotificationTypeId, +) +from shared.users_database_gen.sqlacodegen_models import ( + AppUser, + Base, + NotificationEvent, + NotificationEventFeed, + NotificationLog, + NotificationSubscription, + NotificationType, +) +from tasks.notifications.dispatch_notifications import ( + apply_filter_params, + dispatch, + emit_admin_summary, + find_events_for_subscription, + find_subscriptions, + dispatch_notifications_handler, +) + + +@compiles(JSONB, "sqlite") +def _compile_jsonb_sqlite(element, compiler, **kw): + """Render Postgres JSONB columns as TEXT under SQLite for unit tests.""" + return "TEXT" + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture +def engine(): + # active_since is added by migration feat_1724 and then reflected into the + # auto-generated sqlacodegen_models.py. Until that cycle completes we add + # the column to the in-memory SQLite schema here so unit tests can run. + # We guard against duplicate addition since the Table object is a module-level + # singleton and the fixture may be called multiple times per test session. + if "active_since" not in NotificationSubscription.__table__.c: + from sqlalchemy import Column as _Col, DateTime as _DT + + NotificationSubscription.__table__.append_column( + _Col( + "active_since", + _DT(True), + nullable=False, + server_default=text("CURRENT_TIMESTAMP"), + ) + ) + eng = create_engine("sqlite:///:memory:", connect_args={"check_same_thread": False}) + with _sqlite_compatible_defaults(): + Base.metadata.create_all(eng) + return eng + + +@contextmanager +def _sqlite_compatible_defaults(): + """Temporarily rewrite Postgres-specific ``server_default`` clauses into + SQLite-compatible DDL so ``create_all`` can build an in-memory database. + + Handles ``now()`` (-> ``CURRENT_TIMESTAMP``) and ``::type`` casts (e.g. + ``'weekly'::text``, ``(gen_random_uuid())::text``). Originals are restored + on exit so the generated models remain untouched for any other test. + """ + originals = [] + for table in Base.metadata.tables.values(): + for column in table.columns: + default = column.server_default + if not isinstance(default, DefaultClause): + continue + original_text = str(getattr(default.arg, "text", "")) + if not original_text: + continue + rewritten = re.sub(r"::\w+", "", original_text).replace( + "now()", "CURRENT_TIMESTAMP" + ) + if rewritten == original_text: + continue + originals.append((column, default)) + column.server_default = DefaultClause(text(rewritten)) + try: + yield + finally: + for column, default in originals: + column.server_default = default + + +@pytest.fixture +def session(engine): + Session = sessionmaker(bind=engine) + sess = Session() + _seed(sess) + yield sess + sess.close() + + +def _uid() -> str: + return str(uuid.uuid4()) + + +def _seed(sess): + """Insert minimal seed data into the in-memory DB.""" + sess.add_all( + [ + NotificationType( + id=NotificationTypeId.FEED_URL_UPDATED, description="Feed URL updated" + ), + NotificationType( + id=NotificationTypeId.ADMIN_EVENT_SUMMARY, description="Admin summary" + ), + ] + ) + sess.flush() + + sess.add_all( + [ + AppUser(id="user-alice", email="alice@example.com", full_name="Alice"), + AppUser(id="user-bob", email="bob@example.com", full_name="Bob"), + AppUser(id="user-admin", email="admin@example.com", full_name="Admin"), + ] + ) + sess.flush() + + +def _make_subscription( + sess, + user_id: str, + notification_type_id: str = NotificationTypeId.FEED_URL_UPDATED, + cadence: str = NotificationCadence.WEEKLY, + digest: bool = True, + filter_params=None, + active: bool = True, + active_since: Optional[datetime] = None, +) -> NotificationSubscription: + sub = NotificationSubscription( + id=_uid(), + user_id=user_id, + notification_type_id=notification_type_id, + cadence=cadence, + digest=digest, + filter_params=filter_params, + active=active, + ) + # Set active_since explicitly so it is available in Python memory regardless + # of whether the ORM model has been regenerated post-migration. + sub.active_since = active_since or ( + datetime.now(timezone.utc) - timedelta(seconds=5) + ) + sess.add(sub) + sess.flush() + return sub + + +def _make_event( + sess, + feed_stable_id: str = "mdb-1", + update_type: str = FeedUrlUpdateType.URL_REPLACED, + notification_type_id: str = NotificationTypeId.FEED_URL_UPDATED, + created_at: datetime = None, + old_url: str = "https://old.example.com", + new_url: str = "https://new.example.com", + target_feed_stable_id: str = None, +) -> NotificationEvent: + event = NotificationEvent( + id=_uid(), + notification_type_id=notification_type_id, + event_subtype=update_type, + source="test", + payload={"old_url": old_url, "new_url": new_url}, + created_at=created_at or datetime.now(timezone.utc), + ) + sess.add(event) + sess.flush() + if feed_stable_id is not None: + sess.add( + NotificationEventFeed( + id=_uid(), + notification_event_id=event.id, + feed_stable_id=feed_stable_id, + role=NotificationFeedRole.SUBJECT, + ) + ) + if target_feed_stable_id is not None: + sess.add( + NotificationEventFeed( + id=_uid(), + notification_event_id=event.id, + feed_stable_id=target_feed_stable_id, + role=NotificationFeedRole.TARGET, + ) + ) + sess.flush() + sess.refresh(event) + return event + + +def _mock_event(*feed_ids): + """A lightweight stand-in for a NotificationEvent exposing + ``notification_event_feeds`` for apply_filter_params tests.""" + return MagicMock( + notification_event_feeds=[MagicMock(feed_stable_id=fid) for fid in feed_ids] + ) + + +# --------------------------------------------------------------------------- +# apply_filter_params +# --------------------------------------------------------------------------- + + +class TestApplyFilterParams: + def test_no_filter_returns_all(self, session): + sub = _make_subscription(session, "user-alice", filter_params=None) + events = [ + _mock_event("mdb-1"), + _mock_event("mdb-2"), + ] + result = apply_filter_params(events, sub) + assert result == events + + def test_feed_ids_filter(self, session): + sub = _make_subscription( + session, "user-alice", filter_params={"feed_ids": ["mdb-1"]} + ) + e1 = _mock_event("mdb-1") + e2 = _mock_event("mdb-2") + result = apply_filter_params([e1, e2], sub) + assert result == [e1] + + def test_empty_feed_ids_returns_all(self, session): + sub = _make_subscription(session, "user-alice", filter_params={"feed_ids": []}) + events = [_mock_event("mdb-1")] + # Empty list means no filter — all pass + result = apply_filter_params(events, sub) + assert result == events + + +# --------------------------------------------------------------------------- +# find_subscriptions +# --------------------------------------------------------------------------- + + +class TestFindSubscriptions: + def test_filters_by_cadence(self, session): + weekly_sub = _make_subscription( + session, "user-alice", cadence=NotificationCadence.WEEKLY + ) + _make_subscription(session, "user-bob", cadence=NotificationCadence.DAILY) + + result = find_subscriptions( + db_session=session, + cadence=NotificationCadence.WEEKLY, + user_ids=[], + force=False, + ) + ids = {s.id for s in result} + assert weekly_sub.id in ids + + def test_all_cadence_returns_all_active(self, session): + _make_subscription(session, "user-alice", cadence=NotificationCadence.WEEKLY) + _make_subscription(session, "user-bob", cadence=NotificationCadence.DAILY) + _make_subscription( + session, "user-admin", cadence=NotificationCadence.WEEKLY, active=False + ) + + result = find_subscriptions( + db_session=session, cadence="all", user_ids=[], force=False + ) + assert len(result) == 2 # inactive excluded + + def test_user_ids_filter(self, session): + _make_subscription(session, "user-alice") + bob_sub = _make_subscription(session, "user-bob") + + result = find_subscriptions( + db_session=session, cadence="all", user_ids=["user-bob"], force=False + ) + assert [s.id for s in result] == [bob_sub.id] + + def test_force_with_user_ids_ignores_cadence(self, session): + """force=True + user_ids means bypass cadence.""" + weekly_sub = _make_subscription( + session, "user-alice", cadence=NotificationCadence.WEEKLY + ) + + result = find_subscriptions( + db_session=session, + cadence=NotificationCadence.DAILY, # different cadence + user_ids=["user-alice"], + force=True, + ) + ids = {s.id for s in result} + assert weekly_sub.id in ids + + +# --------------------------------------------------------------------------- +# find_events_for_subscription +# --------------------------------------------------------------------------- + + +class TestFindEventsForSubscription: + def test_new_events_no_log(self, session): + sub = _make_subscription(session, "user-alice") + event = _make_event(session) + + now = datetime.now(timezone.utc) + events = find_events_for_subscription( + db_session=session, + subscription=sub, + status_filter="new", + until=now + timedelta(hours=1), + max_retries=5, + ) + assert event.id in {e.id for e in events} + + def test_already_sent_excluded(self, session): + sub = _make_subscription(session, "user-alice") + event = _make_event(session) + # Mark as already sent + log = NotificationLog( + id=_uid(), + notification_event_id=event.id, + subscription_id=sub.id, + channel="email", + status=NotificationLogStatus.SENT, + ) + session.add(log) + session.flush() + + now = datetime.now(timezone.utc) + events = find_events_for_subscription( + db_session=session, + subscription=sub, + status_filter="new", + until=now + timedelta(hours=1), + max_retries=5, + ) + assert event.id not in {e.id for e in events} + + def test_failed_events_returned_in_retry_mode(self, session): + sub = _make_subscription(session, "user-alice") + event = _make_event(session) + log = NotificationLog( + id=_uid(), + notification_event_id=event.id, + subscription_id=sub.id, + channel="email", + status=NotificationLogStatus.FAILED, + retry_count=1, + ) + session.add(log) + session.flush() + + now = datetime.now(timezone.utc) + events = find_events_for_subscription( + db_session=session, + subscription=sub, + status_filter="failed", + until=now + timedelta(hours=1), + max_retries=5, + ) + assert event.id in {e.id for e in events} + + def test_max_retries_exceeded_excluded(self, session): + sub = _make_subscription(session, "user-alice") + event = _make_event(session) + log = NotificationLog( + id=_uid(), + notification_event_id=event.id, + subscription_id=sub.id, + channel="email", + status=NotificationLogStatus.FAILED, + retry_count=5, # at max + ) + session.add(log) + session.flush() + + now = datetime.now(timezone.utc) + events = find_events_for_subscription( + db_session=session, + subscription=sub, + status_filter="failed", + until=now + timedelta(hours=1), + max_retries=5, + ) + assert event.id not in {e.id for e in events} + + def test_event_before_active_since_excluded(self, session): + """Events older than active_since are always excluded, even with no log row. + + This covers both the pre-subscription case (event existed before the user + subscribed) and the disabled-period case (active_since was reset to now() + when the subscription was re-enabled). + """ + now = datetime.now(timezone.utc) + # Subscription became active 7 days ago. + sub = _make_subscription( + session, "user-alice", active_since=now - timedelta(days=7) + ) + # Event was emitted 14 days ago — before active_since. + old_event = _make_event( + session, + created_at=now - timedelta(days=14), + ) + + events = find_events_for_subscription( + db_session=session, + subscription=sub, + status_filter="new", + until=now, + max_retries=5, + ) + assert old_event.id not in {e.id for e in events} + + def test_event_outside_cadence_window_but_after_active_since_is_found( + self, session + ): + """Regression: events that fell outside the old cadence window but have no + log row (e.g. because a previous run crashed before writing one) must be + picked up on subsequent runs. + + With the old implementation these were silently dropped once the cadence + window (e.g. 24 h) advanced past their created_at. With active_since as + the lower bound they are always found. + """ + now = datetime.now(timezone.utc) + # Subscription has been active for 48 hours. + sub = _make_subscription( + session, + "user-alice", + cadence=NotificationCadence.DAILY, + active_since=now - timedelta(hours=48), + ) + # Event was emitted 36 hours ago — inside active_since window but + # outside the daily cadence window (now - 24 h). + event = _make_event( + session, + created_at=now - timedelta(hours=36), + ) + + events = find_events_for_subscription( + db_session=session, + subscription=sub, + status_filter="new", + until=now, + max_retries=5, + ) + assert event.id in {e.id for e in events} + + def test_explicit_since_can_narrow_window_but_not_below_active_since(self, session): + """explicit_since further restricts the window but never expands it past active_since.""" + now = datetime.now(timezone.utc) + active_since = now - timedelta(days=3) + sub = _make_subscription(session, "user-alice", active_since=active_since) + + # Event 2 days ago — after active_since. + recent_event = _make_event( + session, created_at=now - timedelta(days=2), feed_stable_id="mdb-1" + ) + # Event 5 days ago — before active_since (pre-subscription / dead zone). + old_event = _make_event( + session, created_at=now - timedelta(days=5), feed_stable_id="mdb-2" + ) + + # explicit_since = now - 1 day: should narrow window further. + events = find_events_for_subscription( + db_session=session, + subscription=sub, + status_filter="new", + explicit_since=now - timedelta(days=1), + until=now, + max_retries=5, + ) + ids = {e.id for e in events} + # recent_event is outside explicit_since window (2 days > 1 day) → excluded. + assert recent_event.id not in ids + # old_event is before active_since → also excluded. + assert old_event.id not in ids + + # Without explicit_since, recent_event is included; old_event still excluded. + events_no_override = find_events_for_subscription( + db_session=session, + subscription=sub, + status_filter="new", + until=now, + max_retries=5, + ) + ids_no_override = {e.id for e in events_no_override} + assert recent_event.id in ids_no_override + assert old_event.id not in ids_no_override + + +# --------------------------------------------------------------------------- +# emit_admin_summary +# --------------------------------------------------------------------------- + + +class TestEmitAdminSummary: + def test_creates_notification_event(self, session): + stats = {"emails_sent": 3, "emails_failed": 1} + emit_admin_summary(db_session=session, stats=stats, cadence="weekly") + + event = ( + session.query(NotificationEvent) + .filter_by( + notification_type_id=NotificationTypeId.ADMIN_EVENT_SUMMARY, + event_subtype=AdminEventUpdateType.DISPATCH_SUMMARY, + ) + .one_or_none() + ) + assert event is not None + assert event.payload["emails_sent"] == 3 + assert event.payload["cadence"] == "weekly" + + +# --------------------------------------------------------------------------- +# dispatch_notifications_handler — dry run +# --------------------------------------------------------------------------- + + +class TestDispatchDryRun: + @patch("tasks.notifications.dispatch_notifications.with_users_db_session") + def test_dry_run_returns_stats_without_sending(self, mock_decorator): + """Smoke test: handler returns a stats dict and does not crash.""" + result = dispatch_notifications_handler({"dry_run": True, "cadence": "weekly"}) + # dry_run default is True, so no emails sent + assert "dry_run" in result or isinstance(result, dict) + + +# --------------------------------------------------------------------------- +# dispatch — integration-style (in-memory DB, mocked Brevo) +# --------------------------------------------------------------------------- + + +class TestDispatchIntegration: + @patch("tasks.notifications.dispatch_notifications.send_single") + def test_single_event_non_digest_sends_one_email(self, mock_send, session): + _make_subscription( + session, + "user-alice", + cadence=NotificationCadence.WEEKLY, + digest=False, + ) + _make_event(session) + + now = datetime.now(timezone.utc) + stats = dispatch( + cadence=NotificationCadence.WEEKLY, + dry_run=False, + status_filter="new", + user_ids=[], + force=False, + since_dt=(now - timedelta(hours=1)).isoformat(), + until_dt=(now + timedelta(hours=1)).isoformat(), + max_retries=5, + db_session=session, + ) + + assert mock_send.called + assert stats["emails_sent"] == 1 + + @patch("tasks.notifications.dispatch_notifications.send_digest") + def test_digest_batches_events_into_one_email(self, mock_send, session): + _make_subscription( + session, + "user-alice", + cadence=NotificationCadence.WEEKLY, + digest=True, + ) + _make_event(session, feed_stable_id="mdb-1") + _make_event(session, feed_stable_id="mdb-2") + + now = datetime.now(timezone.utc) + stats = dispatch( + cadence=NotificationCadence.WEEKLY, + dry_run=False, + status_filter="new", + user_ids=[], + force=False, + since_dt=(now - timedelta(hours=1)).isoformat(), + until_dt=(now + timedelta(hours=1)).isoformat(), + max_retries=5, + db_session=session, + ) + + # One digest call for 2 events + assert mock_send.call_count == 1 + assert stats["emails_sent"] == 2 + + @patch("tasks.notifications.dispatch_notifications.send_single") + def test_brevo_failure_marks_log_failed_and_increments_retry_count( + self, mock_send, session + ): + from shared.notifications.brevo_notification_sender import BrevoSendError + + mock_send.side_effect = BrevoSendError("Brevo 429") + sub = _make_subscription( + session, + "user-alice", + cadence=NotificationCadence.WEEKLY, + digest=False, + ) + event = _make_event(session) + + now = datetime.now(timezone.utc) + with patch("tasks.notifications.dispatch_notifications.time.sleep"): + stats = dispatch( + cadence=NotificationCadence.WEEKLY, + dry_run=False, + status_filter="new", + user_ids=[], + force=False, + since_dt=(now - timedelta(hours=1)).isoformat(), + until_dt=(now + timedelta(hours=1)).isoformat(), + max_retries=5, + db_session=session, + ) + + log = ( + session.query(NotificationLog) + .filter_by(notification_event_id=event.id, subscription_id=sub.id) + .one() + ) + assert log.status == NotificationLogStatus.FAILED + assert log.retry_count == 1 + assert stats["emails_failed"] == 1 + + @patch("tasks.notifications.dispatch_notifications.send_single") + def test_permanently_failed_after_max_retries(self, mock_send, session): + from shared.notifications.brevo_notification_sender import BrevoSendError + + mock_send.side_effect = BrevoSendError("Persistent failure") + sub = _make_subscription( + session, + "user-alice", + cadence=NotificationCadence.WEEKLY, + digest=False, + ) + event = _make_event(session) + + # Pre-seed a log with retry_count = max_retries - 1 + log = NotificationLog( + id=_uid(), + notification_event_id=event.id, + subscription_id=sub.id, + channel="email", + status=NotificationLogStatus.FAILED, + retry_count=4, # one below max + ) + session.add(log) + session.flush() + + now = datetime.now(timezone.utc) + with patch("tasks.notifications.dispatch_notifications.time.sleep"): + stats = dispatch( + cadence=NotificationCadence.WEEKLY, + dry_run=False, + status_filter="failed", + user_ids=[], + force=False, + since_dt=(now - timedelta(hours=1)).isoformat(), + until_dt=(now + timedelta(hours=1)).isoformat(), + max_retries=5, + db_session=session, + ) + + session.refresh(log) + assert log.status == NotificationLogStatus.PERMANENTLY_FAILED + assert log.retry_count == 5 + assert stats["permanently_failed"] == 1 + + @patch("tasks.notifications.dispatch_notifications.send_single") + def test_unique_constraint_prevents_duplicate_log(self, mock_send, session): + sub = _make_subscription( + session, + "user-alice", + cadence=NotificationCadence.WEEKLY, + digest=False, + ) + event = _make_event(session) + + now = datetime.now(timezone.utc) + kwargs = dict( + cadence=NotificationCadence.WEEKLY, + dry_run=False, + status_filter="new", + user_ids=[], + force=False, + since_dt=(now - timedelta(hours=1)).isoformat(), + until_dt=(now + timedelta(hours=1)).isoformat(), + max_retries=5, + db_session=session, + ) + dispatch(**kwargs) + dispatch(**kwargs) # second run — event already sent + + logs = ( + session.query(NotificationLog) + .filter_by(notification_event_id=event.id, subscription_id=sub.id) + .all() + ) + assert len(logs) == 1 + + @patch("tasks.notifications.dispatch_notifications.send_single") + def test_filter_params_excludes_unmatched_feed(self, mock_send, session): + _make_event( + session, feed_stable_id="mdb-1" + ) # different feed — should not match + + now = datetime.now(timezone.utc) + stats = dispatch( + cadence=NotificationCadence.WEEKLY, + dry_run=False, + status_filter="new", + user_ids=[], + force=False, + since_dt=(now - timedelta(hours=1)).isoformat(), + until_dt=(now + timedelta(hours=1)).isoformat(), + max_retries=5, + db_session=session, + ) + + assert not mock_send.called + assert stats["emails_sent"] == 0 + + def test_dry_run_does_not_write_logs(self, session): + _make_subscription(session, "user-alice", cadence=NotificationCadence.WEEKLY) + _make_event(session) + + now = datetime.now(timezone.utc) + dispatch( + cadence=NotificationCadence.WEEKLY, + dry_run=True, + status_filter="new", + user_ids=[], + force=False, + since_dt=(now - timedelta(hours=1)).isoformat(), + until_dt=(now + timedelta(hours=1)).isoformat(), + max_retries=5, + db_session=session, + ) + + assert session.query(NotificationLog).count() == 0 diff --git a/functions-python/test_utils/database_utils.py b/functions-python/test_utils/database_utils.py index 9c2cc62e6..dbe6f1242 100644 --- a/functions-python/test_utils/database_utils.py +++ b/functions-python/test_utils/database_utils.py @@ -19,6 +19,7 @@ from sqlalchemy.orm import Session from sqlalchemy import text +from shared.database.users_database import with_users_db_session from shared.database_gen.sqlacodegen_models import Base from shared.database.database import Database, with_db_session import logging @@ -30,6 +31,10 @@ "postgresql://postgres:postgres@localhost:54320/MobilityDatabaseTest" ) +default_users_db_url: Final[str] = ( + "postgresql://postgres:postgres@localhost:54320/MobilityDatabaseUsersTest" +) + excluded_tables: Final[list[str]] = [ "databasechangelog", "databasechangeloglock", @@ -43,6 +48,17 @@ @with_db_session(db_url=default_db_url) def clean_testing_db(db_session: Session): + """Cleans the testing database by deleting all rows from all tables, excluding those in excluded_tables.""" + _clean_db(db_session) + + +@with_users_db_session(db_url=default_users_db_url) +def clean_testing_users_db(db_session: Session): + """Cleans the testing users database by deleting all rows from all tables, excluding those in excluded_tables.""" + _clean_db(db_session) + + +def _clean_db(db_session: Session): """Deletes all rows from all tables in the test db, excluding those in excluded_tables.""" try: tables_to_delete = [ diff --git a/liquibase/changelog_user.xml b/liquibase/changelog_user.xml index 55a569b13..c3145e676 100644 --- a/liquibase/changelog_user.xml +++ b/liquibase/changelog_user.xml @@ -15,4 +15,9 @@ + + diff --git a/liquibase/changes_user/feat_1723.sql b/liquibase/changes_user/feat_1723.sql new file mode 100644 index 000000000..f66184a45 --- /dev/null +++ b/liquibase/changes_user/feat_1723.sql @@ -0,0 +1,121 @@ +-- Issue #1723: Implement feed.url_updated notification type. +-- +-- This establishes a REUSABLE notification event + dispatch pattern for future +-- notification types (location.feed_added #1725, feed.url_availability #1726, +-- feed.coverage #1727, ...). To stay generic across types: +-- * notification_event keeps only type-agnostic columns; everything +-- type-specific (urls, location, dataset, http status, coverage dates, +-- dispatch stats, ...) goes into the JSONB `payload` column. +-- * notification_event_feed relates an event to one-or-more feeds, so a +-- single event can reference multiple feeds (e.g. redirect source+target). +-- +-- Changes: +-- 1. notification_event table — generic event record (type, subtype, payload) +-- 2. notification_event_feed table — N feeds per event (subject/target roles) +-- 3. notification_subscription cols — cadence (when) + digest (how many emails) +-- 4. notification_log cols — event FK, retry_count, unique delivery guard +-- 5. Seed notification_type rows — feed.url_updated, admin.event_summary + +-- --------------------------------------------------------------------------- +-- 1. notification_event — generic, type-agnostic event record +-- --------------------------------------------------------------------------- +CREATE TABLE IF NOT EXISTS notification_event ( + id TEXT PRIMARY KEY DEFAULT gen_random_uuid()::text, + notification_type_id TEXT NOT NULL REFERENCES notification_type(id), + -- Discriminator within a notification type. + -- feed.url_updated: 'feed_redirected' | 'url_replaced' + -- feed.url_availability:'became_unavailable' | 'became_available' + -- feed.coverage: 'expiring_soon' | 'expired' | 'producer_follow_up_required' + -- location.feed_added: 'feed_added' + -- admin.event_summary: 'dispatch_summary' + event_subtype TEXT NOT NULL, + -- Which process emitted this event. + -- e.g. 'populate_db_gtfs' | 'populate_db_gbfs' | 'tdg_import' | 'jbda_import' + -- | 'tdg_redirects' | 'operations_api' | 'dispatcher' + source TEXT, + -- All type-specific data lives here, keyed by convention per notification + -- type (see docs/notifications.md). Examples: + -- feed.url_updated: {old_url, new_url} + -- location.feed_added: {location_id, location_name, data_type, country, region} + -- feed.url_availability: {feed_url, http_status, error_reason, outage_duration} + -- feed.coverage: {latest_dataset_id, coverage_end_date, days_remaining} + -- admin.event_summary: {emails_sent, emails_failed, ..., cadence} + payload JSONB, + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +-- Dispatcher queries events per type ordered by recency. +CREATE INDEX IF NOT EXISTS idx_notification_event_type_created + ON notification_event (notification_type_id, created_at DESC); + +-- --------------------------------------------------------------------------- +-- 2. notification_event_feed — relate one event to one-or-more feeds +-- --------------------------------------------------------------------------- +CREATE TABLE IF NOT EXISTS notification_event_feed ( + id TEXT PRIMARY KEY DEFAULT gen_random_uuid()::text, + notification_event_id TEXT NOT NULL + REFERENCES notification_event(id) ON DELETE CASCADE, + -- The feed this row references (stable_id). + feed_stable_id TEXT NOT NULL, + -- Role of this feed within the event. + -- 'subject' : the feed the event is primarily about (default) + -- 'target' : the destination feed (e.g. redirect target) + role TEXT NOT NULL DEFAULT 'subject', + -- A feed can appear once per role within an event. + CONSTRAINT uq_notification_event_feed UNIQUE (notification_event_id, feed_stable_id, role) +); + +-- Fetch all feeds for an event when rendering. +CREATE INDEX IF NOT EXISTS idx_notification_event_feed_event_id + ON notification_event_feed (notification_event_id); + +-- Match events to subscribers filtering by feed (filter_params.feed_ids). +CREATE INDEX IF NOT EXISTS idx_notification_event_feed_feed_stable_id + ON notification_event_feed (feed_stable_id); + +-- --------------------------------------------------------------------------- +-- 3. notification_subscription — add cadence, active_since and digest columns +-- --------------------------------------------------------------------------- +ALTER TABLE notification_subscription + ADD COLUMN IF NOT EXISTS cadence TEXT NOT NULL DEFAULT 'weekly', + ADD COLUMN IF NOT EXISTS digest BOOLEAN NOT NULL DEFAULT true, + ADD COLUMN IF NOT EXISTS active_since TIMESTAMPTZ DEFAULT now(); + +-- Index to let the dispatcher efficiently find subscriptions to process per run. +CREATE INDEX IF NOT EXISTS idx_notification_subscription_cadence_active + ON notification_subscription (cadence, active) WHERE active; + +-- --------------------------------------------------------------------------- +-- 4. notification_log — add event FK, unique delivery guard, retry tracking +-- --------------------------------------------------------------------------- +ALTER TABLE notification_log + ADD COLUMN IF NOT EXISTS notification_event_id TEXT + REFERENCES notification_event(id) ON DELETE CASCADE, + ADD COLUMN IF NOT EXISTS retry_count INTEGER NOT NULL DEFAULT 0; + +-- One row per (event × subscription × channel). Prevents duplicate delivery +-- and provides the foundation for retry tracking. +ALTER TABLE notification_log + DROP CONSTRAINT IF EXISTS uq_notification_log_event_sub_channel; +ALTER TABLE notification_log + ADD CONSTRAINT uq_notification_log_event_sub_channel + UNIQUE (notification_event_id, subscription_id, channel); + +-- Dispatcher queries pending/failed rows for retry runs. +CREATE INDEX IF NOT EXISTS idx_notification_log_event_id + ON notification_log (notification_event_id); + +CREATE INDEX IF NOT EXISTS idx_notification_log_status + ON notification_log (status) WHERE status IN ('pending', 'failed'); + +-- --------------------------------------------------------------------------- +-- 5. Seed notification types +-- --------------------------------------------------------------------------- +INSERT INTO notification_type (id, description) VALUES + ('feed.url_updated', + 'Fired when a feed URL changes in-place (url_replaced) or a feed is deprecated ' + 'and redirected to a new feed (feed_redirected).'), + ('admin.event_summary', + 'Daily digest sent to admin subscribers summarising how many notification events ' + 'were dispatched, failed, or skipped during the previous dispatcher run.') +ON CONFLICT (id) DO NOTHING;