Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/.openapi-generator/FILES
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,5 @@ src/user_service_gen/models/update_notification_subscription_request.py
src/user_service_gen/models/update_user_request.py
src/user_service_gen/models/user_profile.py
src/user_service_gen/security_api.py
src/user_service_gen/apis/subscriptions_api.py
src/user_service_gen/apis/subscriptions_api_base.py
2 changes: 2 additions & 0 deletions api/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from feeds_gen.apis.licenses_api import router as LicensesApiRouter
from user_service_gen.apis.users_api import router as UsersApiRouter
from user_service_gen.apis.notifications_api import router as NotificationsApiRouter
from user_service_gen.apis.subscriptions_api import router as SubscriptionsApiRouter

# Using the starlettte implementaiton as fastapi implementation generates errors with CORS in certain situations and
# returns 200 in the method response. More info, https://github.com/tiangolo/fastapi/issues/1663#issuecomment-730362611
Expand Down Expand Up @@ -59,6 +60,7 @@
app.include_router(LicensesApiRouter)
app.include_router(UsersApiRouter)
app.include_router(NotificationsApiRouter)
app.include_router(SubscriptionsApiRouter)


@app.on_event("startup")
Expand Down
56 changes: 49 additions & 7 deletions api/src/shared/common/brevo.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,54 @@ class BrevoSubscriptionStatus(Enum):
NOT_FOUND = "not_found"


def _get_contacts_api() -> "sib_api_v3_sdk.ContactsApi":
"""Build a Brevo ContactsApi client. Raises RuntimeError if BREVO_API_KEY is unset."""
api_key = os.getenv("BREVO_API_KEY")
if not api_key:
raise RuntimeError("BREVO_API_KEY environment variable is not set")
configuration = sib_api_v3_sdk.Configuration()
configuration.api_key["api-key"] = api_key
return sib_api_v3_sdk.ContactsApi(sib_api_v3_sdk.ApiClient(configuration))


def get_announcements_list_id() -> int:
"""Return the Brevo API-announcements list id from BREVO_API_ANNOUNCEMENTS_LIST_ID."""
raw = os.getenv("BREVO_API_ANNOUNCEMENTS_LIST_ID")
if not raw:
raise RuntimeError("BREVO_API_ANNOUNCEMENTS_LIST_ID environment variable is not set")
return int(raw)


def add_contact_to_list(email: str, list_id: int, subscription_id: str) -> None:
"""Create/update a Brevo contact, add it to the list, and set MDB_SUBSCRIPTION_ID.

Uses create_contact with update_enabled so it works whether or not the
contact already exists.
"""
api = _get_contacts_api()
api.create_contact(
sib_api_v3_sdk.CreateContact(
email=email,
attributes={"MDB_SUBSCRIPTION_ID": subscription_id},
list_ids=[list_id],

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[question]: Does the update_enabled add the list_id to the lists, or does it reset the list to only one list?

update_enabled=True,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

)
)


def remove_contact_from_list(email: str, list_id: int) -> None:
"""Remove a Brevo contact from the list. No-op if the contact is not on the list."""
api = _get_contacts_api()
try:
api.remove_contact_from_list(list_id, sib_api_v3_sdk.RemoveContactFromList(emails=[email]))
except sib_api_v3_sdk.rest.ApiException as exc:
# 400 "Contact already removed from list" / 404 contact-not-found are idempotent no-ops.
if exc.status in (400, 404):
logger.info("Contact %s not on list %s, nothing to remove", email, list_id)
return
raise


def get_contact_subscription_status(
email: str,
list_id: int | None = None,
Expand All @@ -56,13 +104,7 @@ def get_contact_subscription_status(
Raises RuntimeError if BREVO_API_KEY is not set.
Raises sib_api_v3_sdk.rest.ApiException on unexpected API errors.
"""
api_key = os.getenv("BREVO_API_KEY")
if not api_key:
raise RuntimeError("BREVO_API_KEY environment variable is not set")

configuration = sib_api_v3_sdk.Configuration()
configuration.api_key["api-key"] = api_key
api = sib_api_v3_sdk.ContactsApi(sib_api_v3_sdk.ApiClient(configuration))
api = _get_contacts_api()

try:
contact = api.get_contact_info(email)
Expand Down
24 changes: 24 additions & 0 deletions api/src/shared/db_models/notification_subscription_impl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from shared.users_database_gen.sqlacodegen_models import NotificationSubscription as NotificationSubscriptionOrm
from user_service_gen.models.notification_subscription import NotificationSubscription


class NotificationSubscriptionImpl(NotificationSubscription):
"""Implementation of the NotificationSubscription model.
Converts a SQLAlchemy NotificationSubscription ORM object to a Pydantic NotificationSubscription model.
"""

class Config:
from_attributes = True

@classmethod
def from_orm(cls, sub: NotificationSubscriptionOrm | None) -> NotificationSubscription | None:
if not sub:
return None
return cls(
id=sub.id,
user_id=sub.user_id,
notification_id=sub.notification_type_id,
active=sub.active,
last_notified_at=sub.last_notified_at,
created_at=sub.created_at,
)
39 changes: 39 additions & 0 deletions api/src/user_service/impl/subscription_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#
# MobilityData 2026
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Helpers shared between the authenticated (users) and public (subscriptions) APIs."""

import logging

from fastapi import HTTPException

import sib_api_v3_sdk
from shared.common.brevo import add_contact_to_list, get_announcements_list_id, remove_contact_from_list

logger = logging.getLogger(__name__)

ANNOUNCEMENTS_NOTIFICATION_TYPE_ID = "api.announcements"


def sync_announcements(email: str, subscribe: bool, subscription_id: str | None = None) -> None:
"""Sync an api.announcements subscription with Brevo, mapping provider errors to 502."""
try:
if subscribe:
add_contact_to_list(email, get_announcements_list_id(), subscription_id)
else:
remove_contact_from_list(email, get_announcements_list_id())
except (RuntimeError, sib_api_v3_sdk.rest.ApiException) as exc:
logger.error("Brevo sync failed for %s: %s", email, exc)
raise HTTPException(status_code=502, detail="Failed to sync subscription with email provider.")
55 changes: 55 additions & 0 deletions api/src/user_service/impl/subscriptions_api_impl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#
# MobilityData 2026
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from fastapi import HTTPException

from shared.database.users_database import with_users_db_session
from shared.db_models.notification_subscription_impl import NotificationSubscriptionImpl
from shared.users_database_gen.sqlacodegen_models import (
AppUser,
NotificationSubscription as NotificationSubscriptionOrm,
)
from user_service.impl.subscription_helpers import ANNOUNCEMENTS_NOTIFICATION_TYPE_ID, sync_announcements
from user_service_gen.apis.subscriptions_api_base import BaseSubscriptionsApi
from user_service_gen.models.notification_subscription import NotificationSubscription


class SubscriptionsApiImpl(BaseSubscriptionsApi):
"""Public, unauthenticated subscription management.

The subscription UUID is the access capability
"""

@with_users_db_session
def get_subscription(self, id: str, db_session=None) -> NotificationSubscription:
sub = db_session.get(NotificationSubscriptionOrm, id)
if sub is None:
raise HTTPException(status_code=404, detail="Subscription not found.")
return NotificationSubscriptionImpl.from_orm(sub)

@with_users_db_session
def delete_subscription(self, id: str, db_session=None) -> None:
sub = db_session.get(NotificationSubscriptionOrm, id)
if sub is None:
raise HTTPException(status_code=404, detail="Subscription not found.")

if sub.notification_type_id == ANNOUNCEMENTS_NOTIFICATION_TYPE_ID:
user = db_session.get(AppUser, sub.user_id)
if user is not None:
sync_announcements(user.email, subscribe=False)

db_session.delete(sub)
db_session.flush()
128 changes: 105 additions & 23 deletions api/src/user_service/impl/users_api_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,18 @@
from sqlalchemy.orm import selectinload

from middleware.request_context import get_request_context
from shared.database.database import generate_unique_id
from shared.database.users_database import with_users_db_session
from shared.db_models.app_user_impl import AppUserImpl
from shared.users_database_gen.sqlacodegen_models import AppUser, FeatureFlag, UserFeatureFlag
from shared.db_models.notification_subscription_impl import NotificationSubscriptionImpl
from shared.users_database_gen.sqlacodegen_models import (
AppUser,
FeatureFlag,
UserFeatureFlag,
NotificationSubscription as NotificationSubscriptionOrm,
NotificationType,
)
from user_service.impl.subscription_helpers import ANNOUNCEMENTS_NOTIFICATION_TYPE_ID, sync_announcements
from user_service_gen.apis.users_api_base import BaseUsersApi
from user_service_gen.models.create_notification_subscription_request import (
CreateNotificationSubscriptionRequest,
Expand All @@ -39,8 +48,6 @@

logger = logging.getLogger(__name__)

_NOT_IMPLEMENTED = "Not yet implemented."


class UsersApiImpl(BaseUsersApi):
"""Implementation of the User Service users API."""
Expand Down Expand Up @@ -92,14 +99,7 @@ def update_user(self, update_user_request: UpdateUserRequest, db_session=None) -
Email is intentionally excluded (requires re-verification).
Guest users cannot update their profile.
"""
context = get_request_context()
user_id: str | None = context.get("user_id")

if not user_id:
raise HTTPException(status_code=401, detail="Unable to determine user identity from token.")

if context.get("is_guest"):
raise HTTPException(status_code=403, detail="Guest users cannot update a profile.")
user_id = self._require_user_id()

user = (
db_session.query(AppUser)
Expand All @@ -119,23 +119,105 @@ def update_user(self, update_user_request: UpdateUserRequest, db_session=None) -
all_flags = db_session.query(FeatureFlag).filter(FeatureFlag.disabled.is_(False)).order_by(FeatureFlag.id).all()
return AppUserImpl.from_orm(user, all_flags)

# ── Subscription stubs — implemented in a follow-up issue ────────────────
# ── Subscriptions ────────────────────────────────────────────────────────

def get_user_subscriptions(self) -> List[NotificationSubscription]:
raise HTTPException(status_code=501, detail=_NOT_IMPLEMENTED)
@with_users_db_session
def get_user_subscriptions(self, db_session=None) -> List[NotificationSubscription]:
"""Returns all notification subscriptions for the authenticated user."""
user_id = self._require_user_id()
subs = (
db_session.query(NotificationSubscriptionOrm)
.filter(NotificationSubscriptionOrm.user_id == user_id)
.order_by(NotificationSubscriptionOrm.created_at)
.all()
)
return [NotificationSubscriptionImpl.from_orm(s) for s in subs]

@with_users_db_session
def create_user_subscription(
self,
create_notification_subscription_request: CreateNotificationSubscriptionRequest,
self, create_notification_subscription_request: CreateNotificationSubscriptionRequest, db_session=None
) -> NotificationSubscription:
raise HTTPException(status_code=501, detail=_NOT_IMPLEMENTED)
"""Subscribes the authenticated user to a notification type (idempotent)."""
user_id = self._require_user_id()
notification_id = create_notification_subscription_request.notification_id

if db_session.get(NotificationType, notification_id) is None:
raise HTTPException(status_code=400, detail=f"Unknown notification type '{notification_id}'.")

user = db_session.get(AppUser, user_id)
if user is None:
raise HTTPException(status_code=404, detail="User not found.")

# Idempotent: reuse an existing subscription, reactivating if needed.
existing = (
db_session.query(NotificationSubscriptionOrm)
.filter(
NotificationSubscriptionOrm.user_id == user_id,
NotificationSubscriptionOrm.notification_type_id == notification_id,
)
.one_or_none()
)
sub = existing or NotificationSubscriptionOrm(
id=generate_unique_id(),
user_id=user_id,
notification_type_id=notification_id,
created_at=datetime.now(timezone.utc),
)
sub.active = True

if notification_id == ANNOUNCEMENTS_NOTIFICATION_TYPE_ID:
sync_announcements(user.email, subscribe=True, subscription_id=sub.id)

if existing is None:
db_session.add(sub)
db_session.flush()
return NotificationSubscriptionImpl.from_orm(sub)

@with_users_db_session
def update_user_subscription(
self,
id: str,
update_notification_subscription_request: UpdateNotificationSubscriptionRequest,
self, id: str, update_notification_subscription_request: UpdateNotificationSubscriptionRequest, db_session=None
) -> NotificationSubscription:
raise HTTPException(status_code=501, detail=_NOT_IMPLEMENTED)
"""Activates or deactivates a notification subscription by ID."""
user_id = self._require_user_id()
sub = self._get_owned_subscription(db_session, id, user_id)

active = update_notification_subscription_request.active
if sub.notification_type_id == ANNOUNCEMENTS_NOTIFICATION_TYPE_ID:
user = db_session.get(AppUser, user_id)
sync_announcements(user.email, subscribe=active, subscription_id=sub.id)

sub.active = active
db_session.flush()
return NotificationSubscriptionImpl.from_orm(sub)

@with_users_db_session
def delete_user_subscription(self, id: str, db_session=None) -> None:
"""Removes a notification subscription by ID."""
user_id = self._require_user_id()
sub = self._get_owned_subscription(db_session, id, user_id)

if sub.notification_type_id == ANNOUNCEMENTS_NOTIFICATION_TYPE_ID:
user = db_session.get(AppUser, user_id)
sync_announcements(user.email, subscribe=False)

db_session.delete(sub)
db_session.flush()

# ── Helpers ──────────────────────────────────────────────────────────────

def delete_user_subscription(self, id: str) -> None:
raise HTTPException(status_code=501, detail=_NOT_IMPLEMENTED)
@staticmethod
def _require_user_id() -> str:
context = get_request_context()
user_id = context.get("user_id")
if not user_id:
raise HTTPException(status_code=401, detail="Unable to determine user identity from token.")
if context.get("is_guest"):
raise HTTPException(status_code=403, detail="Guest users cannot perform this action.")
return user_id

@staticmethod
def _get_owned_subscription(db_session, sub_id: str, user_id: str) -> NotificationSubscriptionOrm:
sub = db_session.get(NotificationSubscriptionOrm, sub_id)
if sub is None or sub.user_id != user_id:
raise HTTPException(status_code=404, detail="Subscription not found.")
return sub
Loading
Loading