From 9bf0041aa3b8ee3bde2f937848090191dcbebbfe Mon Sep 17 00:00:00 2001 From: Lina Date: Thu, 6 Nov 2025 15:19:44 +0100 Subject: [PATCH 01/16] Inbox mail table --- enums.py | 1 + models.py | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/enums.py b/enums.py index 128b8e9..5c01ec7 100644 --- a/enums.py +++ b/enums.py @@ -178,6 +178,7 @@ class Tablenames(Enum): ADMIN_QUERY_MESSAGE_SUMMARY = "admin_query_message_summary" RELEASE_NOTIFICATION = "release_notification" TIMED_EXECUTIONS = "timed_executions" + INBOX_MAIL = "inbox_mail" def snake_case_to_pascal_case(self): # the type name (written in PascalCase) of a table is needed to create backrefs diff --git a/models.py b/models.py index 00e624e..ef72581 100644 --- a/models.py +++ b/models.py @@ -2512,3 +2512,35 @@ class TimedExecutions(Base): __table_args__ = {"schema": "global"} time_key = Column(String, unique=True, primary_key=True) # enums.TimedExecutionKey last_executed_at = Column(DateTime) + + +class InboxMail(Base): + __tablename__ = Tablenames.INBOX_MAIL.value + __table_args__ = {"schema": "global"} + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + organization_id = Column( + UUID(as_uuid=True), + ForeignKey(f"{Tablenames.ORGANIZATION.value}.id", ondelete="CASCADE"), + index=True, + ) + created_at = Column(DateTime, default=sql.func.now()) + send_from = Column(String) + send_to = Column(JSON) + subject = Column(String) + content = Column(String) + mark_as_important = Column(Boolean, default=False) + meta_data = Column(JSON) + is_seen = Column(Boolean, default=False) + being_worked_on = Column(Boolean, default=False) + parent_id = Column( + UUID(as_uuid=True), + ForeignKey(f"global.{Tablenames.INBOX_MAIL.value}.id", ondelete="SET NULL"), + index=True, + nullable=True, + ) + child_id = Column( + UUID(as_uuid=True), + ForeignKey(f"global.{Tablenames.INBOX_MAIL.value}.id", ondelete="SET NULL"), + index=True, + nullable=True, + ) From b3f8d919b4a1008817759fff3440971261ffd38f Mon Sep 17 00:00:00 2001 From: Lina Date: Thu, 6 Nov 2025 17:01:15 +0100 Subject: [PATCH 02/16] Create and get requests for inbox mail --- global_objects/inbox_mail.py | 47 ++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) create mode 100644 global_objects/inbox_mail.py diff --git a/global_objects/inbox_mail.py b/global_objects/inbox_mail.py new file mode 100644 index 0000000..6b707fb --- /dev/null +++ b/global_objects/inbox_mail.py @@ -0,0 +1,47 @@ +from typing import Dict, List + +from ..session import session +from sqlalchemy import cast, String + +from submodules.model.business_objects import general +from submodules.model.models import InboxMail + + +def get_inbox_mail( + org_id: str, + user_email: str, +) -> List[InboxMail]: + return ( + session.query(InboxMail) + .filter(InboxMail.organization_id == org_id) + .filter(cast(InboxMail.send_to, String).like(f"%{user_email}%")) + .all() + ) + + +def create( + org_id: str, + send_from: str, + send_to: Dict, + subject: str, + content: str, + mark_as_important: bool, + meta_data: Dict, + parent_id: str = None, + child_id: str = None, + with_commit: bool = True, +) -> InboxMail: + obj = InboxMail( + organization_id=org_id, + send_from=send_from, + send_to=send_to, + subject=subject, + content=content, + mark_as_important=mark_as_important, + meta_data=meta_data, + parent_id=parent_id, + child_id=child_id, + ) + general.add(obj, with_commit) + + return obj From 662835912546e75a73be751ab23c62e95935409d Mon Sep 17 00:00:00 2001 From: LennartSchmidtKern Date: Wed, 12 Nov 2025 11:00:56 +0100 Subject: [PATCH 03/16] new model --- global_objects/inbox_mail.py | 118 +++++++++++++++++++++++++++-------- models.py | 31 +++++---- 2 files changed, 109 insertions(+), 40 deletions(-) diff --git a/global_objects/inbox_mail.py b/global_objects/inbox_mail.py index 6b707fb..c9faf55 100644 --- a/global_objects/inbox_mail.py +++ b/global_objects/inbox_mail.py @@ -1,47 +1,111 @@ -from typing import Dict, List +from typing import Dict, List, Optional, Any from ..session import session -from sqlalchemy import cast, String +from sqlalchemy import cast, String, func, desc from submodules.model.business_objects import general from submodules.model.models import InboxMail +from sqlalchemy import or_ -def get_inbox_mail( +def get_by_thread( org_id: str, - user_email: str, + user_id: str, + thread_id: str, ) -> List[InboxMail]: + return ( session.query(InboxMail) - .filter(InboxMail.organization_id == org_id) - .filter(cast(InboxMail.send_to, String).like(f"%{user_email}%")) + .filter( + InboxMail.organization_id == org_id, + InboxMail.thread_id == thread_id, + or_( + InboxMail.recipient_id == user_id, + InboxMail.sender_id == user_id, + ), + ) + .order_by(desc(InboxMail.created_at)) .all() ) -def create( +def get_overview_by_threads( + org_id: str, + user_id: str, + page: int = 1, + limit: int = 10, +) -> Dict[str, Any]: + subquery = ( + session.query( + InboxMail.thread_id, + func.max(InboxMail.created_at).label("latest_mail_time"), + ) + .filter( + InboxMail.organization_id == org_id, + or_( + InboxMail.recipient_id == user_id, + InboxMail.sender_id == user_id, + ), + ) + .group_by(InboxMail.thread_id) + .subquery() + ) + + query = ( + session.query(InboxMail) + .join( + subquery, + (InboxMail.thread_id == subquery.c.thread_id) + & (InboxMail.created_at == subquery.c.latest_mail_time), + ) + .order_by(desc(InboxMail.created_at)) + ) + + total_threads = query.count() + mails = query.offset((page - 1) * limit).limit(limit).all() + + return { + "totalThreads": total_threads, + "page": page, + "limit": limit, + "mails": mails, + } + + +def create_by_thread( org_id: str, - send_from: str, - send_to: Dict, + sender_id: str, + recipient_ids: List[str], subject: str, content: str, - mark_as_important: bool, - meta_data: Dict, - parent_id: str = None, - child_id: str = None, + meta_data: Optional[Dict] = None, + parent_id: Optional[str] = None, + thread_id: Optional[str] = None, + is_important: bool = False, with_commit: bool = True, -) -> InboxMail: - obj = InboxMail( - organization_id=org_id, - send_from=send_from, - send_to=send_to, - subject=subject, - content=content, - mark_as_important=mark_as_important, - meta_data=meta_data, - parent_id=parent_id, - child_id=child_id, - ) - general.add(obj, with_commit) +) -> List[InboxMail]: + mail_entities: List[InboxMail] = [] + + for rid in recipient_ids: + other_recipient_ids = [r for r in recipient_ids if r != rid] + + mail_entity = InboxMail( + organization_id=org_id, + sender_id=sender_id, + recipient_id=rid, + other_recipient_ids=other_recipient_ids, + subject=subject, + content=content, + meta_data=meta_data or {}, + thread_id=thread_id, + parent_id=parent_id, + is_important=is_important, + ) + + mail_entities.append(mail_entity) + + general.add_all(mail_entities) + if with_commit: + general.commit() - return obj + return mail_entities diff --git a/models.py b/models.py index 14d6dbd..faec1f4 100644 --- a/models.py +++ b/models.py @@ -2573,23 +2573,28 @@ class InboxMail(Base): index=True, ) created_at = Column(DateTime, default=sql.func.now()) - send_from = Column(String) - send_to = Column(JSON) - subject = Column(String) - content = Column(String) - mark_as_important = Column(Boolean, default=False) - meta_data = Column(JSON) - is_seen = Column(Boolean, default=False) - being_worked_on = Column(Boolean, default=False) - parent_id = Column( + sender_id = Column( UUID(as_uuid=True), - ForeignKey(f"global.{Tablenames.INBOX_MAIL.value}.id", ondelete="SET NULL"), + ForeignKey(f"{Tablenames.USER.value}.id", ondelete="SET NULL"), index=True, - nullable=True, ) - child_id = Column( + recipient_id = Column( UUID(as_uuid=True), - ForeignKey(f"global.{Tablenames.INBOX_MAIL.value}.id", ondelete="SET NULL"), + ForeignKey(f"{Tablenames.USER.value}.id", ondelete="CASCADE"), index=True, + ) + other_recipient_ids = Column(JSON) + thread_id = Column(UUID(as_uuid=True), index=True, default=uuid.uuid4()) + parent_id = Column( + UUID(as_uuid=True), + ForeignKey("global.inbox_mail.id", ondelete="SET NULL"), nullable=True, + index=True, ) + subject = Column(String) + content = Column(String) + meta_data = Column(JSON) + + is_seen = Column(Boolean, default=False) + is_important = Column(Boolean, default=False) + being_working_on = Column(Boolean, default=False) From 7809311ddef8fe9af2fbd04c9bb09d4bba40149d Mon Sep 17 00:00:00 2001 From: LennartSchmidtKern Date: Wed, 12 Nov 2025 17:53:17 +0100 Subject: [PATCH 04/16] mail collection by threads --- global_objects/inbox_mail.py | 29 ++++++++++++++++++++++++----- models.py | 2 +- 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/global_objects/inbox_mail.py b/global_objects/inbox_mail.py index c9faf55..db42ec7 100644 --- a/global_objects/inbox_mail.py +++ b/global_objects/inbox_mail.py @@ -1,4 +1,5 @@ from typing import Dict, List, Optional, Any +from submodules.model.util import sql_alchemy_to_dict from ..session import session from sqlalchemy import cast, String, func, desc @@ -51,7 +52,7 @@ def get_overview_by_threads( .subquery() ) - query = ( + latest_mails_query = ( session.query(InboxMail) .join( subquery, @@ -61,14 +62,33 @@ def get_overview_by_threads( .order_by(desc(InboxMail.created_at)) ) - total_threads = query.count() - mails = query.offset((page - 1) * limit).limit(limit).all() + total_threads = latest_mails_query.count() + latest_mails = latest_mails_query.offset((page - 1) * limit).limit(limit).all() + + threads = [] + for mail in latest_mails: + total_mails = ( + session.query(func.count(InboxMail.id)) + .filter( + InboxMail.thread_id == mail.thread_id, + InboxMail.organization_id == org_id, + ) + .scalar() + ) + + threads.append( + { + "threadId": mail.thread_id, + "latestMail": sql_alchemy_to_dict(mail), + "totalMails": total_mails, + } + ) return { "totalThreads": total_threads, "page": page, "limit": limit, - "mails": mails, + "threads": threads, } @@ -85,7 +105,6 @@ def create_by_thread( with_commit: bool = True, ) -> List[InboxMail]: mail_entities: List[InboxMail] = [] - for rid in recipient_ids: other_recipient_ids = [r for r in recipient_ids if r != rid] diff --git a/models.py b/models.py index faec1f4..b572b63 100644 --- a/models.py +++ b/models.py @@ -2584,7 +2584,7 @@ class InboxMail(Base): index=True, ) other_recipient_ids = Column(JSON) - thread_id = Column(UUID(as_uuid=True), index=True, default=uuid.uuid4()) + thread_id = Column(UUID(as_uuid=True), index=True, unique=True, default=uuid.uuid4) parent_id = Column( UUID(as_uuid=True), ForeignKey("global.inbox_mail.id", ondelete="SET NULL"), From 3640ac3e481aca5800cbe35b65d5209fc6d91e8e Mon Sep 17 00:00:00 2001 From: LennartSchmidtKern Date: Thu, 13 Nov 2025 14:36:30 +0100 Subject: [PATCH 05/16] enums inboxmail reference --- enums.py | 6 +++ global_objects/inbox_mail.py | 102 +++++++++++++++++------------------ models.py | 29 +++++++--- 3 files changed, 75 insertions(+), 62 deletions(-) diff --git a/enums.py b/enums.py index d16180f..6cacac8 100644 --- a/enums.py +++ b/enums.py @@ -181,6 +181,7 @@ class Tablenames(Enum): CONVERSATION_SHARE = "conversation_share" CONVERSATION_GLOBAL_SHARE = "conversation_global_share" INBOX_MAIL = "inbox_mail" + INBOX_MAIL_REFERENCE = "inbox_mail_reference" def snake_case_to_pascal_case(self): # the type name (written in PascalCase) of a table is needed to create backrefs @@ -1026,3 +1027,8 @@ class MessageType(Enum): class TimedExecutionKey(Enum): LAST_RESET_USER_MESSAGE_COUNT = "LAST_RESET_USER_MESSAGE_COUNT" + + +class InboxMailReferenceScope(Enum): + SENDER = "SENDER" + RECIPIENT = "RECIPIENT" diff --git a/global_objects/inbox_mail.py b/global_objects/inbox_mail.py index db42ec7..fc51677 100644 --- a/global_objects/inbox_mail.py +++ b/global_objects/inbox_mail.py @@ -5,8 +5,9 @@ from sqlalchemy import cast, String, func, desc from submodules.model.business_objects import general -from submodules.model.models import InboxMail +from submodules.model.models import InboxMail, InboxMailReference from sqlalchemy import or_ +from enums import InboxMailReferenceScope def get_by_thread( @@ -15,20 +16,20 @@ def get_by_thread( thread_id: str, ) -> List[InboxMail]: - return ( + inbox_mail_entities = ( session.query(InboxMail) + .join(InboxMailReference, InboxMail.id == InboxMailReference.inbox_mail_id) .filter( InboxMail.organization_id == org_id, InboxMail.thread_id == thread_id, - or_( - InboxMail.recipient_id == user_id, - InboxMail.sender_id == user_id, - ), + InboxMailReference.user_id == user_id, ) .order_by(desc(InboxMail.created_at)) .all() ) + return inbox_mail_entities + def get_overview_by_threads( org_id: str, @@ -39,50 +40,30 @@ def get_overview_by_threads( subquery = ( session.query( InboxMail.thread_id, - func.max(InboxMail.created_at).label("latest_mail_time"), + func.max(InboxMail.created_at).label("latest_created_at"), ) + .join(InboxMailReference, InboxMail.id == InboxMailReference.inbox_mail_id) .filter( InboxMail.organization_id == org_id, - or_( - InboxMail.recipient_id == user_id, - InboxMail.sender_id == user_id, - ), + InboxMailReference.user_id == user_id, ) .group_by(InboxMail.thread_id) .subquery() ) - - latest_mails_query = ( + total_threads = session.query(func.count()).select_from(subquery).scalar() + thread_summaries = ( session.query(InboxMail) .join( subquery, (InboxMail.thread_id == subquery.c.thread_id) - & (InboxMail.created_at == subquery.c.latest_mail_time), + & (InboxMail.created_at == subquery.c.latest_created_at), ) .order_by(desc(InboxMail.created_at)) + .offset((page - 1) * limit) + .limit(limit) + .all() ) - - total_threads = latest_mails_query.count() - latest_mails = latest_mails_query.offset((page - 1) * limit).limit(limit).all() - - threads = [] - for mail in latest_mails: - total_mails = ( - session.query(func.count(InboxMail.id)) - .filter( - InboxMail.thread_id == mail.thread_id, - InboxMail.organization_id == org_id, - ) - .scalar() - ) - - threads.append( - { - "threadId": mail.thread_id, - "latestMail": sql_alchemy_to_dict(mail), - "totalMails": total_mails, - } - ) + threads = [sql_alchemy_to_dict(mail) for mail in thread_summaries] return { "totalThreads": total_threads, @@ -104,27 +85,40 @@ def create_by_thread( is_important: bool = False, with_commit: bool = True, ) -> List[InboxMail]: - mail_entities: List[InboxMail] = [] + inbox_mail_entitiy = InboxMail( + organization_id=org_id, + sender_id=sender_id, + original_recipient_ids=recipient_ids, + subject=subject, + content=content, + meta_data=meta_data or {}, + parent_id=parent_id, + thread_id=thread_id, + is_important=is_important, + ) + + inbox_mail_references = [] + + inbox_mail_sender_reference = InboxMailReference( + inbox_mail_id=inbox_mail_entitiy.id, + user_id=sender_id, + scope=InboxMailReferenceScope.SENDER.value, + is_seen=True, + ) + inbox_mail_references.append(inbox_mail_sender_reference) + for rid in recipient_ids: - other_recipient_ids = [r for r in recipient_ids if r != rid] - - mail_entity = InboxMail( - organization_id=org_id, - sender_id=sender_id, - recipient_id=rid, - other_recipient_ids=other_recipient_ids, - subject=subject, - content=content, - meta_data=meta_data or {}, - thread_id=thread_id, - parent_id=parent_id, - is_important=is_important, + inbox_mail_references.append( + InboxMailReference( + inbox_mail_id=inbox_mail_entitiy.id, + user_id=rid, + scope=InboxMailReferenceScope.RECIPIENT.value, + ) ) + general.add(inbox_mail_entitiy) + general.add_all(inbox_mail_references) - mail_entities.append(mail_entity) - - general.add_all(mail_entities) if with_commit: general.commit() - return mail_entities + return inbox_mail_entitiy diff --git a/models.py b/models.py index b572b63..a198558 100644 --- a/models.py +++ b/models.py @@ -2578,12 +2578,7 @@ class InboxMail(Base): ForeignKey(f"{Tablenames.USER.value}.id", ondelete="SET NULL"), index=True, ) - recipient_id = Column( - UUID(as_uuid=True), - ForeignKey(f"{Tablenames.USER.value}.id", ondelete="CASCADE"), - index=True, - ) - other_recipient_ids = Column(JSON) + original_recipient_ids = Column(JSON) thread_id = Column(UUID(as_uuid=True), index=True, unique=True, default=uuid.uuid4) parent_id = Column( UUID(as_uuid=True), @@ -2594,7 +2589,25 @@ class InboxMail(Base): subject = Column(String) content = Column(String) meta_data = Column(JSON) - - is_seen = Column(Boolean, default=False) is_important = Column(Boolean, default=False) being_working_on = Column(Boolean, default=False) + + +class InboxMailReference(Base): + __tablename__ = Tablenames.INBOX_MAIL_REFERENCE.value + __table_args__ = {"schema": "global"} + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + inbox_mail_id = Column( + UUID(as_uuid=True), + ForeignKey(f"global.{Tablenames.INBOX_MAIL.value}.id", ondelete="CASCADE"), + index=True, + nullable=False, + ) + scope = Column(String, nullable=False) # enums.InboxMailReferenceScope + user_id = Column( + UUID(as_uuid=True), + ForeignKey(f"{Tablenames.USER.value}.id", ondelete="CASCADE"), + index=True, + nullable=False, + ) + is_seen = Column(Boolean, default=False) From 1f96ec8bd5f4a78b0c17de5552dec1144dba6c32 Mon Sep 17 00:00:00 2001 From: LennartSchmidtKern Date: Thu, 13 Nov 2025 17:08:55 +0100 Subject: [PATCH 06/16] restructure return inbox mails --- global_objects/inbox_mail.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/global_objects/inbox_mail.py b/global_objects/inbox_mail.py index fc51677..94aee44 100644 --- a/global_objects/inbox_mail.py +++ b/global_objects/inbox_mail.py @@ -2,12 +2,12 @@ from submodules.model.util import sql_alchemy_to_dict from ..session import session -from sqlalchemy import cast, String, func, desc +from sqlalchemy import cast, String, func, desc, asc from submodules.model.business_objects import general from submodules.model.models import InboxMail, InboxMailReference from sqlalchemy import or_ -from enums import InboxMailReferenceScope +from submodules.model.enums import InboxMailReferenceScope def get_by_thread( @@ -24,7 +24,7 @@ def get_by_thread( InboxMail.thread_id == thread_id, InboxMailReference.user_id == user_id, ) - .order_by(desc(InboxMail.created_at)) + .order_by(asc(InboxMail.created_at)) .all() ) @@ -63,7 +63,11 @@ def get_overview_by_threads( .limit(limit) .all() ) - threads = [sql_alchemy_to_dict(mail) for mail in thread_summaries] + + threads = [ + {"id": str(mail.thread_id), "latest_mail": {**sql_alchemy_to_dict(mail)}} + for mail in thread_summaries + ] return { "totalThreads": total_threads, @@ -97,6 +101,8 @@ def create_by_thread( is_important=is_important, ) + general.add(inbox_mail_entitiy) + inbox_mail_references = [] inbox_mail_sender_reference = InboxMailReference( @@ -115,7 +121,6 @@ def create_by_thread( scope=InboxMailReferenceScope.RECIPIENT.value, ) ) - general.add(inbox_mail_entitiy) general.add_all(inbox_mail_references) if with_commit: From 3f79d9ab69f981143cee0db92f3bc5b4b8df24f3 Mon Sep 17 00:00:00 2001 From: LennartSchmidtKern Date: Thu, 13 Nov 2025 17:19:31 +0100 Subject: [PATCH 07/16] is admin notificaiton --- models.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/models.py b/models.py index a198558..7c7f10f 100644 --- a/models.py +++ b/models.py @@ -2579,7 +2579,7 @@ class InboxMail(Base): index=True, ) original_recipient_ids = Column(JSON) - thread_id = Column(UUID(as_uuid=True), index=True, unique=True, default=uuid.uuid4) + thread_id = Column(UUID(as_uuid=True), index=True, default=uuid.uuid4) parent_id = Column( UUID(as_uuid=True), ForeignKey("global.inbox_mail.id", ondelete="SET NULL"), @@ -2591,6 +2591,7 @@ class InboxMail(Base): meta_data = Column(JSON) is_important = Column(Boolean, default=False) being_working_on = Column(Boolean, default=False) + is_admin_notification = Column(Boolean, default=False) class InboxMailReference(Base): From b9997e1ac0d16f6d4c319c43ee5b89c336dfb8ad Mon Sep 17 00:00:00 2001 From: LennartSchmidtKern Date: Thu, 13 Nov 2025 17:19:49 +0100 Subject: [PATCH 08/16] create admin notification --- global_objects/inbox_mail.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/global_objects/inbox_mail.py b/global_objects/inbox_mail.py index 94aee44..f455267 100644 --- a/global_objects/inbox_mail.py +++ b/global_objects/inbox_mail.py @@ -87,6 +87,7 @@ def create_by_thread( parent_id: Optional[str] = None, thread_id: Optional[str] = None, is_important: bool = False, + is_admin_notification: bool = False, with_commit: bool = True, ) -> List[InboxMail]: inbox_mail_entitiy = InboxMail( @@ -99,6 +100,7 @@ def create_by_thread( parent_id=parent_id, thread_id=thread_id, is_important=is_important, + is_admin_notification=is_admin_notification, ) general.add(inbox_mail_entitiy) From 721f939f660b80cb45bba2fce72f7f1875da0e02 Mon Sep 17 00:00:00 2001 From: LennartSchmidtKern Date: Thu, 13 Nov 2025 23:53:05 +0100 Subject: [PATCH 09/16] kratos --- kratos.py | 342 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 342 insertions(+) create mode 100644 kratos.py diff --git a/kratos.py b/kratos.py new file mode 100644 index 0000000..473ec36 --- /dev/null +++ b/kratos.py @@ -0,0 +1,342 @@ +from email.mime.text import MIMEText +import smtplib +from typing import Union, Any, List, Dict +import os +import requests +import logging +from datetime import datetime, timedelta +from urllib.parse import quote + +from submodules.model import daemon +from submodules.model.business_objects import general, user + + +logging.basicConfig(level=logging.INFO) +logger: logging.Logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) + +KRATOS_ADMIN_URL = os.getenv("KRATOS_ADMIN_URL") +SMTP_HOST = os.getenv("SMTP_HOST") +SMTP_PORT = os.getenv("SMTP_PORT") +SMTP_USER = os.getenv("SMTP_USER") +SMTP_PASSWORD = os.getenv("SMTP_PASSWORD") + +# user_id -> {"identity" -> full identity, "simple" -> {"id": str, "mail": str, "firstName": str, "lastName": str}} +# "collected" -> timestamp +KRATOS_IDENTITY_CACHE: Dict[str, Any] = {} +KRATOS_IDENTITY_CACHE_TIMEOUT = timedelta(minutes=30) + +LANGUAGE_MESSAGES = { + "en": "Hello!\n\nClick the link to complete your account setup:\n\n", + "de": "Hallo!\n\nKlicken Sie auf den Link, um Ihre Kontoeinrichtung abzuschließen:\n\n", +} + +INVITATION_SUBJECT = "Sie sind zu unserer app eingeladen/You are invited to our app" + +LANGUAGE_EXPIRATION_INFO = { + "en": "This link can only be clicked once and is valid for 2 days. Contact your system admin if you have issues.", + "de": "Dieser Link kann nur einmal angeklickt werden und ist 2 Tage lang gültig. Kontaktieren Sie Ihren Systemadministrator, wenn Sie Probleme haben.", +} + + +def get_cached_values(update_db_users: bool = True) -> Dict[str, Dict[str, Any]]: + global KRATOS_IDENTITY_CACHE + if not KRATOS_IDENTITY_CACHE or len(KRATOS_IDENTITY_CACHE) == 0: + __refresh_identity_cache(update_db_users) + elif ( + KRATOS_IDENTITY_CACHE["collected"] + KRATOS_IDENTITY_CACHE_TIMEOUT + < datetime.now() + ): + __refresh_identity_cache(update_db_users) + return KRATOS_IDENTITY_CACHE + + +def __refresh_identity_cache(update_db_users: bool = True) -> None: + global KRATOS_IDENTITY_CACHE + request = requests.get(f"{KRATOS_ADMIN_URL}/identities") + if request.ok: + collected = datetime.now() + identities = request.json() + + # maybe more pages https://www.ory.sh/docs/ecosystem/api-design#pagination + while next_link := __get_link_from_kratos_request(request): + request = requests.get(next_link) + if request.ok: + identities.extend(request.json()) + + KRATOS_IDENTITY_CACHE = { + identity["id"]: { + "identity": identity, + "simple": __parse_identity_to_simple(identity), + } + for identity in identities + } + + KRATOS_IDENTITY_CACHE["collected"] = collected + else: + KRATOS_IDENTITY_CACHE = {} + + if update_db_users: + migrate_kratos_users() + + +def __get_link_from_kratos_request(request: requests.Response) -> str: + # rel=next only if there is more than 1 page + # ; rel="first",; rel="next" + link = request.headers.get("Link") + if link: + if 'rel="next"' in link: + parts = link.split("<") + for part in parts: + if 'rel="next"' in part: + return part.split(">")[0].replace("/admin", KRATOS_ADMIN_URL) + return None + + +def __get_identity(user_id: str, only_simple: bool = True) -> Dict[str, Any]: + if not isinstance(user_id, str): + user_id = str(user_id) + cache = get_cached_values() + if user_id in cache: + if only_simple: + return cache[user_id]["simple"] + return cache[user_id] + + if len(user_id) == 36: + # check not new entry outside cache + request = requests.get(f"{KRATOS_ADMIN_URL}/identities/{user_id}") + if request.ok: + identity = request.json() + if identity["id"] == user_id: + KRATOS_IDENTITY_CACHE[user_id] = { + "identity": identity, + "simple": __parse_identity_to_simple(identity), + } + if only_simple: + return KRATOS_IDENTITY_CACHE[user_id]["simple"] + return KRATOS_IDENTITY_CACHE[user_id] + # e.g. if id "GOLD_STAR" is requested => wont be in cache but expects a dummy dict + if only_simple: + return __parse_identity_to_simple({"id": user_id}) + return { + "identity": { + "id": user_id, + "traits": {"email": None, "name": {"first": None, "last": None}}, + } + } + + +def __parse_identity_to_simple(identity: Dict[str, Any]) -> Dict[str, str]: + r = { + "id": identity["id"], + "mail": None, + "firstName": None, + "lastName": None, + } + if "traits" in identity: + r["mail"] = identity["traits"]["email"] + if "name" in identity["traits"]: + r["firstName"] = identity["traits"]["name"]["first"] + r["lastName"] = identity["traits"]["name"]["last"] + return r + + +def get_userid_from_mail(user_mail: str) -> str: + values = get_cached_values() + for key in values: + if key == "collected": + continue + if values[key]["simple"]["mail"] == user_mail: + return key + # not in cached values, try search kratos + return __search_kratos_for_user_mail(user_mail)["id"] + + +def __search_kratos_for_user_mail(user_mail: str) -> str: + request = requests.get( + f"{KRATOS_ADMIN_URL}/identities?preview_credentials_identifier_similar={quote(user_mail)}" + ) + if request.ok: + identities = request.json() + for i in identities: + if i["traits"]["email"].lower() == user_mail.lower(): + return i + return None + + +def resolve_user_mail_by_id(user_id: str) -> str: + i = __get_identity(user_id) + if i: + return i["mail"] + return None + + +def resolve_user_name_by_id(user_id: str) -> Dict[str, str]: + i = __get_identity(user_id, False) + if i: + i = i["identity"] + return i["traits"]["name"] if "name" in i["traits"] else None + return None + + +def resolve_all_user_ids( + relevant_ids: List[str], as_list: bool = True +) -> Union[Dict[str, Dict[str, str]], List[Dict[str, str]]]: + final = [] if as_list else {} + for id in relevant_ids: + i = __get_identity(id) + if as_list: + final.append(i) + else: + final[id] = i + return final + + +def expand_user_mail_name( + users: List[Dict[str, str]], user_id_key="id" +) -> List[Dict[str, str]]: + final = [] + for user in users: + i = __get_identity(user[user_id_key]) + user = {**user, **i} + final.append(user) + return final + + +def resolve_user_name_and_email_by_id(user_id: str) -> dict: + i = __get_identity(user_id, False) + if i: + i = i["identity"] + if i and "traits" in i and i["traits"]: + return i["traits"]["name"], i["traits"]["email"] + return None + + +def create_user_kratos(email: str, provider: str = None): + payload_registration = { + "schema_id": "default", + "traits": {"email": email}, + } + if provider: + payload_registration["metadata_public"] = { + "registration_scope": { + "provider_id": provider, + "invitation_sso": True, + } + } + response_create = requests.post( + f"{KRATOS_ADMIN_URL}/identities", + json=payload_registration, + ) + return response_create.json() if response_create.ok else None + + +def delete_user_kratos(user_id: str) -> bool: + response_delete = requests.delete(f"{KRATOS_ADMIN_URL}/identities/{user_id}") + if response_delete.ok: + del KRATOS_IDENTITY_CACHE[user_id] + return True + return False + + +def get_recovery_link(user_id: str) -> str: + payload_recovery_link = { + "expires_in": "48h", + "identity_id": user_id, + } + response_link = requests.post( + f"{KRATOS_ADMIN_URL}/recovery/link", json=payload_recovery_link + ) + return response_link.json() if response_link.ok else None + + +def email_with_link(to_email: str, recovery_link: str) -> None: + msg = MIMEText( + f"{LANGUAGE_MESSAGES['de']}{recovery_link}\n\n{LANGUAGE_EXPIRATION_INFO['de']}\n\n\n------\n\n{LANGUAGE_MESSAGES['en']}{recovery_link}\n\n{LANGUAGE_EXPIRATION_INFO['en']}", + ) + msg["Subject"] = INVITATION_SUBJECT + msg["From"] = "signup@kern.ai" + msg["To"] = to_email + + with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as server: + if SMTP_USER and SMTP_PASSWORD: + server.ehlo() + server.starttls() + server.login(SMTP_USER, SMTP_PASSWORD) + server.send_message(msg) + + +def send_bulk_emails(emails: List[str], recovery_links: List[str]) -> None: + + with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as server: + if SMTP_USER and SMTP_PASSWORD: + server.ehlo() + server.starttls() + server.login(SMTP_USER, SMTP_PASSWORD) + + for to_email, recovery_link in zip(emails, recovery_links): + msg = MIMEText( + f"{LANGUAGE_MESSAGES['de']}{recovery_link}\n\n{LANGUAGE_EXPIRATION_INFO['de']}\n\n\n------\n\n{LANGUAGE_MESSAGES['en']}{recovery_link}\n\n{LANGUAGE_EXPIRATION_INFO['en']}", + ) + msg["Subject"] = INVITATION_SUBJECT + msg["From"] = "signup@kern.ai" + msg["To"] = to_email + server.send_message(msg) + + +def check_user_exists(email: str) -> bool: + request = requests.get( + f"{KRATOS_ADMIN_URL}/identities?preview_credentials_identifier_similar={quote(email)}" + ) + if request.ok: + identities = request.json() + for i in identities: + if i["traits"]["email"].lower() == email.lower(): + return True + return False + + +def migrate_kratos_users() -> None: + daemon.run_with_db_token(__migrate_kratos_users) + + +def __migrate_kratos_users(): + users_kratos = get_cached_values(False) + users_database = user.get_all() + + for user_database in users_database: + user_id = str(user_database.id) + if user_id not in users_kratos or users_kratos[user_id] is None: + continue + user_identity = users_kratos[user_id]["identity"] + if user_database.email != user_identity["traits"]["email"]: + user_database.email = user_identity["traits"]["email"] + if ( + user_database.verified + != user_identity["verifiable_addresses"][0]["verified"] + ): + user_database.verified = user_identity["verifiable_addresses"][0][ + "verified" + ] + if ( + user_database.created_at + != user_identity["verifiable_addresses"][0]["created_at"] + ): + user_database.created_at = user_identity["verifiable_addresses"][0][ + "created_at" + ] + if user_database.metadata_public != user_identity["metadata_public"]: + user_database.metadata_public = user_identity["metadata_public"] + sso_provider = ( + ( + user_identity["metadata_public"] + .get("registration_scope", {}) + .get("provider_id", None) + ) + if user_identity["metadata_public"] + else None + ) + if user_database.sso_provider != sso_provider: + user_database.sso_provider = sso_provider + + general.commit() From 38647f6fd11f7de085aba5450c5d52e357272cb6 Mon Sep 17 00:00:00 2001 From: LennartSchmidtKern Date: Mon, 17 Nov 2025 09:34:30 +0100 Subject: [PATCH 10/16] inbox mail threads --- business_objects/user.py | 26 ++++ enums.py | 8 +- global_objects/inbox_mail.py | 231 ++++++++++++++++++++++++----------- models.py | 49 +++++--- 4 files changed, 214 insertions(+), 100 deletions(-) diff --git a/business_objects/user.py b/business_objects/user.py index 8fa65dd..9f8d027 100644 --- a/business_objects/user.py +++ b/business_objects/user.py @@ -26,6 +26,32 @@ def get_user_cached_if_not_admin(user_id: str) -> Optional[User]: return user +# TODO use new roles form kratos +def get_admin_users() -> List[User]: + kernai_admins = ( + session.query(User) + .filter(User.email.ilike("%@kern.ai"), User.verified == True) + .all() + ) + + query = """ + SELECT email FROM global.full_admin_access + """ + + result = general.execute_all(query) + full_admin_emails = [row[0].lower() for row in result] if result else [] + + if full_admin_emails: + full_admins = ( + session.query(User).filter(User.email.in_(full_admin_emails)).all() + ) + else: + full_admins = [] + + admin_users = {user.id: user for user in kernai_admins + full_admins} + return list(admin_users.values()) + + @TTLCacheDecorator(CacheEnum.USER, 5, "user_id") def get_user_cached(user_id: str) -> User: user = get(user_id) diff --git a/enums.py b/enums.py index 6cacac8..1483601 100644 --- a/enums.py +++ b/enums.py @@ -181,7 +181,8 @@ class Tablenames(Enum): CONVERSATION_SHARE = "conversation_share" CONVERSATION_GLOBAL_SHARE = "conversation_global_share" INBOX_MAIL = "inbox_mail" - INBOX_MAIL_REFERENCE = "inbox_mail_reference" + INBOX_MAIL_THREAD = "inbox_mail_thread" + INBOX_MAIL_THREAD_ASSOCIATION = "inbox_mail_thread_association" def snake_case_to_pascal_case(self): # the type name (written in PascalCase) of a table is needed to create backrefs @@ -1027,8 +1028,3 @@ class MessageType(Enum): class TimedExecutionKey(Enum): LAST_RESET_USER_MESSAGE_COUNT = "LAST_RESET_USER_MESSAGE_COUNT" - - -class InboxMailReferenceScope(Enum): - SENDER = "SENDER" - RECIPIENT = "RECIPIENT" diff --git a/global_objects/inbox_mail.py b/global_objects/inbox_mail.py index f455267..a966f77 100644 --- a/global_objects/inbox_mail.py +++ b/global_objects/inbox_mail.py @@ -1,33 +1,53 @@ +from concurrent.futures import thread from typing import Dict, List, Optional, Any from submodules.model.util import sql_alchemy_to_dict from ..session import session from sqlalchemy import cast, String, func, desc, asc -from submodules.model.business_objects import general -from submodules.model.models import InboxMail, InboxMailReference +from submodules.model.business_objects import general, user as user_bo +from submodules.model.models import ( + InboxMail, + InboxMailThread, + InboxMailThreadAssociation, +) from sqlalchemy import or_ -from submodules.model.enums import InboxMailReferenceScope def get_by_thread( - org_id: str, - user_id: str, - thread_id: str, + org_id: str, user_id: str, thread_id: str, user_is_admin: bool = False ) -> List[InboxMail]: - inbox_mail_entities = ( - session.query(InboxMail) - .join(InboxMailReference, InboxMail.id == InboxMailReference.inbox_mail_id) - .filter( - InboxMail.organization_id == org_id, - InboxMail.thread_id == thread_id, - InboxMailReference.user_id == user_id, - ) - .order_by(asc(InboxMail.created_at)) - .all() + # Regular users: only threads they participate in. + # Admins: threads they participate in (normal), plus all admin support threads. + query = session.query(InboxMail).join( + InboxMailThread, InboxMail.thread_id == InboxMailThread.id ) + if user_is_admin: + participant_thread_ids = ( + session.query(InboxMailThreadAssociation.thread_id) + .filter(InboxMailThreadAssociation.user_id == str(user_id)) + .subquery() + ) + query = query.filter( + (InboxMailThread.id == thread_id) + & ( + (InboxMailThread.id.in_(participant_thread_ids)) + | (InboxMailThread.is_admin_support_thread == True) + ) + ) + else: + query = query.join( + InboxMailThreadAssociation, + InboxMailThreadAssociation.thread_id == InboxMailThread.id, + ).filter( + InboxMailThreadAssociation.user_id == str(user_id), + InboxMailThread.id == thread_id, + InboxMailThread.organization_id == org_id, + ) + + inbox_mail_entities = query.order_by(asc(InboxMail.created_at)).all() return inbox_mail_entities @@ -36,96 +56,159 @@ def get_overview_by_threads( user_id: str, page: int = 1, limit: int = 10, + user_is_admin: bool = False, ) -> Dict[str, Any]: - subquery = ( - session.query( - InboxMail.thread_id, - func.max(InboxMail.created_at).label("latest_created_at"), + + # Regular users: only threads they participate in. + # Admins: threads they participate in (normal), plus all admin support threads. + + base_query = session.query(InboxMailThread) + + if user_is_admin: + participant_thread_ids = ( + session.query(InboxMailThreadAssociation.thread_id) + .filter(InboxMailThreadAssociation.user_id == str(user_id)) + .subquery() ) - .join(InboxMailReference, InboxMail.id == InboxMailReference.inbox_mail_id) - .filter( - InboxMail.organization_id == org_id, - InboxMailReference.user_id == user_id, + query = base_query.filter( + (InboxMailThread.id.in_(participant_thread_ids)) + | (InboxMailThread.is_admin_support_thread == True) ) - .group_by(InboxMail.thread_id) - .subquery() - ) - total_threads = session.query(func.count()).select_from(subquery).scalar() - thread_summaries = ( - session.query(InboxMail) - .join( - subquery, - (InboxMail.thread_id == subquery.c.thread_id) - & (InboxMail.created_at == subquery.c.latest_created_at), + else: + query = base_query.join( + InboxMailThreadAssociation, + InboxMailThreadAssociation.thread_id == InboxMailThread.id, + ).filter( + InboxMailThreadAssociation.user_id == str(user_id), + InboxMailThread.organization_id == org_id, ) - .order_by(desc(InboxMail.created_at)) + + total_threads = query.count() + + threads = ( + query.order_by(desc(InboxMailThread.created_at)) .offset((page - 1) * limit) .limit(limit) .all() ) - threads = [ - {"id": str(mail.thread_id), "latest_mail": {**sql_alchemy_to_dict(mail)}} - for mail in thread_summaries + thread_ids = [t.id for t in threads] + + participants = ( + session.query( + InboxMailThreadAssociation.thread_id, InboxMailThreadAssociation.user_id + ) + .filter(InboxMailThreadAssociation.thread_id.in_(thread_ids)) + .all() + ) + + participants_map = {} + for thread_id, participant_user_id in participants: + participants_map.setdefault(str(thread_id), []).append(str(participant_user_id)) + + thread_dicts = [ + { + **sql_alchemy_to_dict(thread), + "latest_mail": sql_alchemy_to_dict(get_first_in_thread(thread.id)), + "participant_ids": participants_map.get(str(thread.id), []), + } + for thread in threads ] return { - "totalThreads": total_threads, + "total_threads": total_threads, "page": page, "limit": limit, - "threads": threads, + "threads": thread_dicts, } +def get_inbox_mail_thread_by_id(thread_id: str) -> InboxMailThread: + thread_entity = ( + session.query(InboxMailThread).filter(InboxMailThread.id == thread_id).first() + ) + if not thread_entity: + raise ValueError("Inbox mail thread not found") + return thread_entity + + +def get_inbox_mail_thread_association_by_thread_id_and_user_id( + thread_id: str, user_id: str +) -> Optional[InboxMailThreadAssociation]: + association_entity = ( + session.query(InboxMailThreadAssociation) + .filter( + InboxMailThreadAssociation.thread_id == thread_id, + InboxMailThreadAssociation.user_id == user_id, + ) + .first() + ) + return association_entity + + def create_by_thread( org_id: str, sender_id: str, - recipient_ids: List[str], - subject: str, content: str, + recipient_ids: Optional[List[str]] = None, + subject: Optional[str] = None, meta_data: Optional[Dict] = None, - parent_id: Optional[str] = None, thread_id: Optional[str] = None, is_important: bool = False, - is_admin_notification: bool = False, + is_admin_support_thread: bool = False, with_commit: bool = True, ) -> List[InboxMail]: + + if thread_id is None: + thread_entity = InboxMailThread( + created_by=sender_id, + organization_id=org_id, + subject=subject, + meta_data=meta_data or {}, + is_important=is_important, + is_admin_support_thread=is_admin_support_thread, + ) + general.add(thread_entity) + + participant_ids = [sender_id] + recipient_ids + thread_association_entities = [] + for user_id in participant_ids: + thread_association_entity = InboxMailThreadAssociation( + thread_id=thread_entity.id, + user_id=user_id, + ) + thread_association_entities.append(thread_association_entity) + general.add_all(thread_association_entities) + else: + thread_entity = get_inbox_mail_thread_by_id(thread_id) + inbox_mail_entitiy = InboxMail( - organization_id=org_id, - sender_id=sender_id, - original_recipient_ids=recipient_ids, - subject=subject, - content=content, - meta_data=meta_data or {}, - parent_id=parent_id, - thread_id=thread_id, - is_important=is_important, - is_admin_notification=is_admin_notification, + content=content, sender_id=sender_id, thread_id=thread_entity.id ) general.add(inbox_mail_entitiy) - inbox_mail_references = [] - - inbox_mail_sender_reference = InboxMailReference( - inbox_mail_id=inbox_mail_entitiy.id, - user_id=sender_id, - scope=InboxMailReferenceScope.SENDER.value, - is_seen=True, - ) - inbox_mail_references.append(inbox_mail_sender_reference) - - for rid in recipient_ids: - inbox_mail_references.append( - InboxMailReference( - inbox_mail_id=inbox_mail_entitiy.id, - user_id=rid, - scope=InboxMailReferenceScope.RECIPIENT.value, - ) - ) - general.add_all(inbox_mail_references) - if with_commit: general.commit() return inbox_mail_entitiy + + +def get_first_in_thread(thread_id: str) -> Optional[InboxMail]: + inbox_mail_entity = ( + session.query(InboxMail) + .filter(InboxMail.thread_id == thread_id) + .order_by(asc(InboxMail.created_at)) + .first() + ) + return inbox_mail_entity + + +def get_participant_ids_by_thread_id(thread_id: str) -> List[str]: + associations = ( + session.query(InboxMailThreadAssociation) + .filter(InboxMailThreadAssociation.thread_id == thread_id) + .all() + ) + participant_ids = [assoc.user_id for assoc in associations] + return participant_ids diff --git a/models.py b/models.py index 7c7f10f..4765d70 100644 --- a/models.py +++ b/models.py @@ -2563,52 +2563,61 @@ class ConversationGlobalShare(Base): created_at = Column(DateTime, default=sql.func.now()) -class InboxMail(Base): - __tablename__ = Tablenames.INBOX_MAIL.value +class InboxMailThread(Base): + __tablename__ = Tablenames.INBOX_MAIL_THREAD.value __table_args__ = {"schema": "global"} id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + created_by = Column( + UUID(as_uuid=True), + ForeignKey(f"{Tablenames.USER.value}.id", ondelete="CASCADE"), + index=True, + ) organization_id = Column( UUID(as_uuid=True), ForeignKey(f"{Tablenames.ORGANIZATION.value}.id", ondelete="CASCADE"), index=True, ) created_at = Column(DateTime, default=sql.func.now()) + subject = Column(String) + meta_data = Column(JSON) + is_important = Column(Boolean, default=False) + being_working_on = Column(Boolean, default=False) + is_admin_support_thread = Column(Boolean, default=False) + + +class InboxMail(Base): + __tablename__ = Tablenames.INBOX_MAIL.value + __table_args__ = {"schema": "global"} + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + created_at = Column(DateTime, default=sql.func.now()) sender_id = Column( UUID(as_uuid=True), ForeignKey(f"{Tablenames.USER.value}.id", ondelete="SET NULL"), index=True, ) - original_recipient_ids = Column(JSON) - thread_id = Column(UUID(as_uuid=True), index=True, default=uuid.uuid4) - parent_id = Column( + thread_id = Column( UUID(as_uuid=True), - ForeignKey("global.inbox_mail.id", ondelete="SET NULL"), - nullable=True, + ForeignKey( + f"global.{Tablenames.INBOX_MAIL_THREAD.value}.id", ondelete="CASCADE" + ), index=True, ) - subject = Column(String) content = Column(String) - meta_data = Column(JSON) - is_important = Column(Boolean, default=False) - being_working_on = Column(Boolean, default=False) - is_admin_notification = Column(Boolean, default=False) -class InboxMailReference(Base): - __tablename__ = Tablenames.INBOX_MAIL_REFERENCE.value +class InboxMailThreadAssociation(Base): + __tablename__ = Tablenames.INBOX_MAIL_THREAD_ASSOCIATION.value __table_args__ = {"schema": "global"} id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) - inbox_mail_id = Column( + thread_id = Column( UUID(as_uuid=True), - ForeignKey(f"global.{Tablenames.INBOX_MAIL.value}.id", ondelete="CASCADE"), + ForeignKey( + f"global.{Tablenames.INBOX_MAIL_THREAD.value}.id", ondelete="CASCADE" + ), index=True, - nullable=False, ) - scope = Column(String, nullable=False) # enums.InboxMailReferenceScope user_id = Column( UUID(as_uuid=True), ForeignKey(f"{Tablenames.USER.value}.id", ondelete="CASCADE"), index=True, - nullable=False, ) - is_seen = Column(Boolean, default=False) From a817046f94317ad2d32931782f63e5b6c7df35ab Mon Sep 17 00:00:00 2001 From: LennartSchmidtKern Date: Mon, 17 Nov 2025 09:41:23 +0100 Subject: [PATCH 11/16] fix warning subquery --- global_objects/inbox_mail.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/global_objects/inbox_mail.py b/global_objects/inbox_mail.py index a966f77..05b63a5 100644 --- a/global_objects/inbox_mail.py +++ b/global_objects/inbox_mail.py @@ -33,7 +33,7 @@ def get_by_thread( query = query.filter( (InboxMailThread.id == thread_id) & ( - (InboxMailThread.id.in_(participant_thread_ids)) + (InboxMailThread.id.in_(participant_thread_ids.select())) | (InboxMailThread.is_admin_support_thread == True) ) ) @@ -71,7 +71,7 @@ def get_overview_by_threads( .subquery() ) query = base_query.filter( - (InboxMailThread.id.in_(participant_thread_ids)) + (InboxMailThread.id.in_(participant_thread_ids.select())) | (InboxMailThread.is_admin_support_thread == True) ) else: From a90d2c81bb971b39f13b7383e6d209890593b75b Mon Sep 17 00:00:00 2001 From: LennartSchmidtKern Date: Mon, 17 Nov 2025 12:06:29 +0100 Subject: [PATCH 12/16] is in progress inbox mail thread --- models.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/models.py b/models.py index 4765d70..3adf6a5 100644 --- a/models.py +++ b/models.py @@ -2581,7 +2581,7 @@ class InboxMailThread(Base): subject = Column(String) meta_data = Column(JSON) is_important = Column(Boolean, default=False) - being_working_on = Column(Boolean, default=False) + is_in_progress = Column(Boolean, default=False) is_admin_support_thread = Column(Boolean, default=False) From de62400ecaf9219e5ab17fa14728fe8a7d31ecf9 Mon Sep 17 00:00:00 2001 From: LennartSchmidtKern Date: Mon, 17 Nov 2025 12:07:16 +0100 Subject: [PATCH 13/16] update progress --- global_objects/inbox_mail.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/global_objects/inbox_mail.py b/global_objects/inbox_mail.py index 05b63a5..fbb78de 100644 --- a/global_objects/inbox_mail.py +++ b/global_objects/inbox_mail.py @@ -156,12 +156,13 @@ def create_by_thread( thread_id: Optional[str] = None, is_important: bool = False, is_admin_support_thread: bool = False, + created_by: Optional[str] = None, with_commit: bool = True, ) -> List[InboxMail]: if thread_id is None: thread_entity = InboxMailThread( - created_by=sender_id, + created_by=sender_id if not created_by else created_by, organization_id=org_id, subject=subject, meta_data=meta_data or {}, @@ -212,3 +213,11 @@ def get_participant_ids_by_thread_id(thread_id: str) -> List[str]: ) participant_ids = [assoc.user_id for assoc in associations] return participant_ids + + +def update_thread_progress( + thread_id: str, is_in_progress: bool, with_commit: bool = True +) -> Dict[str, Any]: + thread_entity = get_inbox_mail_thread_by_id(thread_id) + thread_entity.is_in_progress = is_in_progress + general.flush_or_commit(with_commit) From e4e444ade476f5d8d5b14bbf7af4a0c1a1e9c057 Mon Sep 17 00:00:00 2001 From: LennartSchmidtKern Date: Mon, 17 Nov 2025 15:28:24 +0100 Subject: [PATCH 14/16] unread mail count --- models.py | 1 + 1 file changed, 1 insertion(+) diff --git a/models.py b/models.py index 3adf6a5..fa87f10 100644 --- a/models.py +++ b/models.py @@ -2621,3 +2621,4 @@ class InboxMailThreadAssociation(Base): ForeignKey(f"{Tablenames.USER.value}.id", ondelete="CASCADE"), index=True, ) + unread_mail_count = Column(Integer, default=0) From 2aed3c1347d2868974cb5e2d1e93753d459092a1 Mon Sep 17 00:00:00 2001 From: LennartSchmidtKern Date: Tue, 18 Nov 2025 10:05:55 +0100 Subject: [PATCH 15/16] unread mails --- global_objects/inbox_mail.py | 112 +++++++++++++++++++++++++++++++++++ 1 file changed, 112 insertions(+) diff --git a/global_objects/inbox_mail.py b/global_objects/inbox_mail.py index fbb78de..a25c5df 100644 --- a/global_objects/inbox_mail.py +++ b/global_objects/inbox_mail.py @@ -14,6 +14,53 @@ from sqlalchemy import or_ +def get(inbox_mail_id: str) -> InboxMail: + return session.query(InboxMail).filter(InboxMail.id == inbox_mail_id).first() + + +def get_new_inbox_mails( + org_id: str, user_id: str, user_is_admin: bool +) -> Dict[str, int]: + count_query = ( + session.query( + InboxMailThreadAssociation.thread_id, + func.sum(InboxMailThreadAssociation.unread_mail_count).label( + "total_unread_count" + ), + ) + .join( + InboxMailThread, + InboxMailThread.id == InboxMailThreadAssociation.thread_id, + ) + .filter( + InboxMailThread.organization_id == org_id, + InboxMailThreadAssociation.user_id == str(user_id), + ) + .group_by(InboxMailThreadAssociation.thread_id) + ) + + total_unread_count = 0 + for _, thread_unread_count in count_query: + total_unread_count += thread_unread_count + + if user_is_admin: + # Include admin support thread unread counts + admin_unread_count = 0 + admin_threads = ( + session.query(InboxMailThread) + .filter( + InboxMailThread.organization_id == org_id, + InboxMailThread.is_admin_support_thread == True, + ) + .all() + ) + for thread in admin_threads: + meta_data = thread.meta_data or {} + admin_unread_count += meta_data.get("unreadMailCountAdmin", 0) + total_unread_count += admin_unread_count + return total_unread_count + + def get_by_thread( org_id: str, user_id: str, thread_id: str, user_is_admin: bool = False ) -> List[InboxMail]: @@ -51,6 +98,21 @@ def get_by_thread( return inbox_mail_entities +def get_inbox_mail_thread_length(thread_id: str) -> int: + count = session.query(InboxMail).filter(InboxMail.thread_id == thread_id).count() + return count + + +def delete(inbox_mail_id: str, with_commit: bool = True) -> None: + session.query(InboxMail).filter(InboxMail.id == inbox_mail_id).delete() + general.flush_or_commit(with_commit) + + +def delete_thread_by_id(thread_id: str, with_commit: bool = True) -> None: + session.query(InboxMailThread).filter(InboxMailThread.id == thread_id).delete() + general.flush_or_commit(with_commit) + + def get_overview_by_threads( org_id: str, user_id: str, @@ -94,6 +156,7 @@ def get_overview_by_threads( thread_ids = [t.id for t in threads] + # Get participants for each thread participants = ( session.query( InboxMailThreadAssociation.thread_id, InboxMailThreadAssociation.user_id @@ -106,11 +169,28 @@ def get_overview_by_threads( for thread_id, participant_user_id in participants: participants_map.setdefault(str(thread_id), []).append(str(participant_user_id)) + unread_counts = ( + session.query( + InboxMailThreadAssociation.thread_id, + InboxMailThreadAssociation.unread_mail_count, + ) + .filter( + InboxMailThreadAssociation.thread_id.in_(thread_ids), + InboxMailThreadAssociation.user_id == str(user_id), + ) + .all() + ) + unread_count_map = { + str(thread_id): unread_mail_count + for thread_id, unread_mail_count in unread_counts + } + thread_dicts = [ { **sql_alchemy_to_dict(thread), "latest_mail": sql_alchemy_to_dict(get_first_in_thread(thread.id)), "participant_ids": participants_map.get(str(thread.id), []), + "unread_mail_count": unread_count_map.get(str(thread.id), 0), } for thread in threads ] @@ -146,6 +226,19 @@ def get_inbox_mail_thread_association_by_thread_id_and_user_id( return association_entity +def get_inbox_mail_thread_associations_by_thread_id( + thread_id: str, +) -> List[InboxMailThreadAssociation]: + association_entities = ( + session.query(InboxMailThreadAssociation) + .filter( + InboxMailThreadAssociation.thread_id == thread_id, + ) + .all() + ) + return association_entities + + def create_by_thread( org_id: str, sender_id: str, @@ -160,7 +253,11 @@ def create_by_thread( with_commit: bool = True, ) -> List[InboxMail]: + print(meta_data, flush=True) if thread_id is None: + if is_admin_support_thread: + meta_data = meta_data or {} + meta_data["unreadMailCountAdmin"] = 1 thread_entity = InboxMailThread( created_by=sender_id if not created_by else created_by, organization_id=org_id, @@ -177,11 +274,26 @@ def create_by_thread( thread_association_entity = InboxMailThreadAssociation( thread_id=thread_entity.id, user_id=user_id, + unread_mail_count=1 if user_id != sender_id else 0, ) thread_association_entities.append(thread_association_entity) general.add_all(thread_association_entities) else: thread_entity = get_inbox_mail_thread_by_id(thread_id) + # Only update unread counts if the sender is also the issue creator + if is_admin_support_thread and thread_entity.created_by == sender_id: + meta_data = thread_entity.meta_data or {} + meta_data["unreadMailCountAdmin"] = ( + meta_data.get("unreadMailCountAdmin", 0) + 1 + ) + thread_entity.meta_data = meta_data + association_entities = get_inbox_mail_thread_associations_by_thread_id( + thread_id + ) + for assoc in association_entities: + if assoc.user_id != sender_id: + assoc.unread_mail_count += 1 + general.flush_or_commit(with_commit) inbox_mail_entitiy = InboxMail( content=content, sender_id=sender_id, thread_id=thread_entity.id From b186fc81c08576cc82423969517ee4c4ebac48a5 Mon Sep 17 00:00:00 2001 From: LennartSchmidtKern Date: Tue, 18 Nov 2025 10:06:13 +0100 Subject: [PATCH 16/16] remove kratos --- kratos.py | 342 ------------------------------------------------------ 1 file changed, 342 deletions(-) delete mode 100644 kratos.py diff --git a/kratos.py b/kratos.py deleted file mode 100644 index 473ec36..0000000 --- a/kratos.py +++ /dev/null @@ -1,342 +0,0 @@ -from email.mime.text import MIMEText -import smtplib -from typing import Union, Any, List, Dict -import os -import requests -import logging -from datetime import datetime, timedelta -from urllib.parse import quote - -from submodules.model import daemon -from submodules.model.business_objects import general, user - - -logging.basicConfig(level=logging.INFO) -logger: logging.Logger = logging.getLogger(__name__) -logger.setLevel(logging.DEBUG) - -KRATOS_ADMIN_URL = os.getenv("KRATOS_ADMIN_URL") -SMTP_HOST = os.getenv("SMTP_HOST") -SMTP_PORT = os.getenv("SMTP_PORT") -SMTP_USER = os.getenv("SMTP_USER") -SMTP_PASSWORD = os.getenv("SMTP_PASSWORD") - -# user_id -> {"identity" -> full identity, "simple" -> {"id": str, "mail": str, "firstName": str, "lastName": str}} -# "collected" -> timestamp -KRATOS_IDENTITY_CACHE: Dict[str, Any] = {} -KRATOS_IDENTITY_CACHE_TIMEOUT = timedelta(minutes=30) - -LANGUAGE_MESSAGES = { - "en": "Hello!\n\nClick the link to complete your account setup:\n\n", - "de": "Hallo!\n\nKlicken Sie auf den Link, um Ihre Kontoeinrichtung abzuschließen:\n\n", -} - -INVITATION_SUBJECT = "Sie sind zu unserer app eingeladen/You are invited to our app" - -LANGUAGE_EXPIRATION_INFO = { - "en": "This link can only be clicked once and is valid for 2 days. Contact your system admin if you have issues.", - "de": "Dieser Link kann nur einmal angeklickt werden und ist 2 Tage lang gültig. Kontaktieren Sie Ihren Systemadministrator, wenn Sie Probleme haben.", -} - - -def get_cached_values(update_db_users: bool = True) -> Dict[str, Dict[str, Any]]: - global KRATOS_IDENTITY_CACHE - if not KRATOS_IDENTITY_CACHE or len(KRATOS_IDENTITY_CACHE) == 0: - __refresh_identity_cache(update_db_users) - elif ( - KRATOS_IDENTITY_CACHE["collected"] + KRATOS_IDENTITY_CACHE_TIMEOUT - < datetime.now() - ): - __refresh_identity_cache(update_db_users) - return KRATOS_IDENTITY_CACHE - - -def __refresh_identity_cache(update_db_users: bool = True) -> None: - global KRATOS_IDENTITY_CACHE - request = requests.get(f"{KRATOS_ADMIN_URL}/identities") - if request.ok: - collected = datetime.now() - identities = request.json() - - # maybe more pages https://www.ory.sh/docs/ecosystem/api-design#pagination - while next_link := __get_link_from_kratos_request(request): - request = requests.get(next_link) - if request.ok: - identities.extend(request.json()) - - KRATOS_IDENTITY_CACHE = { - identity["id"]: { - "identity": identity, - "simple": __parse_identity_to_simple(identity), - } - for identity in identities - } - - KRATOS_IDENTITY_CACHE["collected"] = collected - else: - KRATOS_IDENTITY_CACHE = {} - - if update_db_users: - migrate_kratos_users() - - -def __get_link_from_kratos_request(request: requests.Response) -> str: - # rel=next only if there is more than 1 page - # ; rel="first",; rel="next" - link = request.headers.get("Link") - if link: - if 'rel="next"' in link: - parts = link.split("<") - for part in parts: - if 'rel="next"' in part: - return part.split(">")[0].replace("/admin", KRATOS_ADMIN_URL) - return None - - -def __get_identity(user_id: str, only_simple: bool = True) -> Dict[str, Any]: - if not isinstance(user_id, str): - user_id = str(user_id) - cache = get_cached_values() - if user_id in cache: - if only_simple: - return cache[user_id]["simple"] - return cache[user_id] - - if len(user_id) == 36: - # check not new entry outside cache - request = requests.get(f"{KRATOS_ADMIN_URL}/identities/{user_id}") - if request.ok: - identity = request.json() - if identity["id"] == user_id: - KRATOS_IDENTITY_CACHE[user_id] = { - "identity": identity, - "simple": __parse_identity_to_simple(identity), - } - if only_simple: - return KRATOS_IDENTITY_CACHE[user_id]["simple"] - return KRATOS_IDENTITY_CACHE[user_id] - # e.g. if id "GOLD_STAR" is requested => wont be in cache but expects a dummy dict - if only_simple: - return __parse_identity_to_simple({"id": user_id}) - return { - "identity": { - "id": user_id, - "traits": {"email": None, "name": {"first": None, "last": None}}, - } - } - - -def __parse_identity_to_simple(identity: Dict[str, Any]) -> Dict[str, str]: - r = { - "id": identity["id"], - "mail": None, - "firstName": None, - "lastName": None, - } - if "traits" in identity: - r["mail"] = identity["traits"]["email"] - if "name" in identity["traits"]: - r["firstName"] = identity["traits"]["name"]["first"] - r["lastName"] = identity["traits"]["name"]["last"] - return r - - -def get_userid_from_mail(user_mail: str) -> str: - values = get_cached_values() - for key in values: - if key == "collected": - continue - if values[key]["simple"]["mail"] == user_mail: - return key - # not in cached values, try search kratos - return __search_kratos_for_user_mail(user_mail)["id"] - - -def __search_kratos_for_user_mail(user_mail: str) -> str: - request = requests.get( - f"{KRATOS_ADMIN_URL}/identities?preview_credentials_identifier_similar={quote(user_mail)}" - ) - if request.ok: - identities = request.json() - for i in identities: - if i["traits"]["email"].lower() == user_mail.lower(): - return i - return None - - -def resolve_user_mail_by_id(user_id: str) -> str: - i = __get_identity(user_id) - if i: - return i["mail"] - return None - - -def resolve_user_name_by_id(user_id: str) -> Dict[str, str]: - i = __get_identity(user_id, False) - if i: - i = i["identity"] - return i["traits"]["name"] if "name" in i["traits"] else None - return None - - -def resolve_all_user_ids( - relevant_ids: List[str], as_list: bool = True -) -> Union[Dict[str, Dict[str, str]], List[Dict[str, str]]]: - final = [] if as_list else {} - for id in relevant_ids: - i = __get_identity(id) - if as_list: - final.append(i) - else: - final[id] = i - return final - - -def expand_user_mail_name( - users: List[Dict[str, str]], user_id_key="id" -) -> List[Dict[str, str]]: - final = [] - for user in users: - i = __get_identity(user[user_id_key]) - user = {**user, **i} - final.append(user) - return final - - -def resolve_user_name_and_email_by_id(user_id: str) -> dict: - i = __get_identity(user_id, False) - if i: - i = i["identity"] - if i and "traits" in i and i["traits"]: - return i["traits"]["name"], i["traits"]["email"] - return None - - -def create_user_kratos(email: str, provider: str = None): - payload_registration = { - "schema_id": "default", - "traits": {"email": email}, - } - if provider: - payload_registration["metadata_public"] = { - "registration_scope": { - "provider_id": provider, - "invitation_sso": True, - } - } - response_create = requests.post( - f"{KRATOS_ADMIN_URL}/identities", - json=payload_registration, - ) - return response_create.json() if response_create.ok else None - - -def delete_user_kratos(user_id: str) -> bool: - response_delete = requests.delete(f"{KRATOS_ADMIN_URL}/identities/{user_id}") - if response_delete.ok: - del KRATOS_IDENTITY_CACHE[user_id] - return True - return False - - -def get_recovery_link(user_id: str) -> str: - payload_recovery_link = { - "expires_in": "48h", - "identity_id": user_id, - } - response_link = requests.post( - f"{KRATOS_ADMIN_URL}/recovery/link", json=payload_recovery_link - ) - return response_link.json() if response_link.ok else None - - -def email_with_link(to_email: str, recovery_link: str) -> None: - msg = MIMEText( - f"{LANGUAGE_MESSAGES['de']}{recovery_link}\n\n{LANGUAGE_EXPIRATION_INFO['de']}\n\n\n------\n\n{LANGUAGE_MESSAGES['en']}{recovery_link}\n\n{LANGUAGE_EXPIRATION_INFO['en']}", - ) - msg["Subject"] = INVITATION_SUBJECT - msg["From"] = "signup@kern.ai" - msg["To"] = to_email - - with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as server: - if SMTP_USER and SMTP_PASSWORD: - server.ehlo() - server.starttls() - server.login(SMTP_USER, SMTP_PASSWORD) - server.send_message(msg) - - -def send_bulk_emails(emails: List[str], recovery_links: List[str]) -> None: - - with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as server: - if SMTP_USER and SMTP_PASSWORD: - server.ehlo() - server.starttls() - server.login(SMTP_USER, SMTP_PASSWORD) - - for to_email, recovery_link in zip(emails, recovery_links): - msg = MIMEText( - f"{LANGUAGE_MESSAGES['de']}{recovery_link}\n\n{LANGUAGE_EXPIRATION_INFO['de']}\n\n\n------\n\n{LANGUAGE_MESSAGES['en']}{recovery_link}\n\n{LANGUAGE_EXPIRATION_INFO['en']}", - ) - msg["Subject"] = INVITATION_SUBJECT - msg["From"] = "signup@kern.ai" - msg["To"] = to_email - server.send_message(msg) - - -def check_user_exists(email: str) -> bool: - request = requests.get( - f"{KRATOS_ADMIN_URL}/identities?preview_credentials_identifier_similar={quote(email)}" - ) - if request.ok: - identities = request.json() - for i in identities: - if i["traits"]["email"].lower() == email.lower(): - return True - return False - - -def migrate_kratos_users() -> None: - daemon.run_with_db_token(__migrate_kratos_users) - - -def __migrate_kratos_users(): - users_kratos = get_cached_values(False) - users_database = user.get_all() - - for user_database in users_database: - user_id = str(user_database.id) - if user_id not in users_kratos or users_kratos[user_id] is None: - continue - user_identity = users_kratos[user_id]["identity"] - if user_database.email != user_identity["traits"]["email"]: - user_database.email = user_identity["traits"]["email"] - if ( - user_database.verified - != user_identity["verifiable_addresses"][0]["verified"] - ): - user_database.verified = user_identity["verifiable_addresses"][0][ - "verified" - ] - if ( - user_database.created_at - != user_identity["verifiable_addresses"][0]["created_at"] - ): - user_database.created_at = user_identity["verifiable_addresses"][0][ - "created_at" - ] - if user_database.metadata_public != user_identity["metadata_public"]: - user_database.metadata_public = user_identity["metadata_public"] - sso_provider = ( - ( - user_identity["metadata_public"] - .get("registration_scope", {}) - .get("provider_id", None) - ) - if user_identity["metadata_public"] - else None - ) - if user_database.sso_provider != sso_provider: - user_database.sso_provider = sso_provider - - general.commit()