From 71979d36a45086f39ef1a3cedf0e667e337271ed Mon Sep 17 00:00:00 2001 From: Atlas Date: Tue, 24 Mar 2026 10:39:45 +0800 Subject: [PATCH 1/5] Add platform email delivery for password recovery and broadcast notices This change adds a platform-owned SMTP delivery path and wires it into forgot-password plus optional enterprise broadcast email delivery. The reset flow now issues single-use database-backed tokens, exposes public reset endpoints, and adds frontend pages for requesting and consuming reset links. The README and env example now document the required SMTP and public URL configuration for local and production setups. Constraint: SMTP configuration must come from environment variables rather than admin-managed secrets Constraint: Forgot-password responses must not reveal whether an email address exists Rejected: Reusing per-agent email tooling | wrong trust boundary for system-owned auth mail Rejected: Stateless reset JWTs | harder to revoke and audit than DB-backed single-use tokens Confidence: high Scope-risk: moderate Reversibility: clean Directive: Do not move reset-link generation away from PUBLIC_BASE_URL without verifying frontend route compatibility Tested: backend pytest tests/test_password_reset_and_notifications.py Tested: frontend npm run build Tested: manual local validation of forgot-password, reset-password, and SMTP delivery with Docker PostgreSQL/Redis Not-tested: full end-to-end browser automation for clicking the emailed reset link --- .env.example | 15 ++ README.md | 31 +++ .../versions/add_password_reset_tokens.py | 32 +++ backend/app/api/auth.py | 78 ++++++- backend/app/api/notification.py | 39 +++- backend/app/config.py | 11 + backend/app/main.py | 1 + backend/app/models/password_reset_token.py | 23 ++ backend/app/schemas/schemas.py | 10 +- .../app/services/password_reset_service.py | 78 +++++++ backend/app/services/system_email_service.py | 82 +++++++ backend/entrypoint.sh | 1 + .../test_password_reset_and_notifications.py | 211 ++++++++++++++++++ frontend/src/App.tsx | 4 + frontend/src/pages/EnterpriseSettings.tsx | 26 ++- frontend/src/pages/ForgotPassword.tsx | 83 +++++++ frontend/src/pages/Login.tsx | 13 +- frontend/src/pages/ResetPassword.tsx | 111 +++++++++ frontend/src/services/api.ts | 11 +- 19 files changed, 848 insertions(+), 12 deletions(-) create mode 100644 backend/alembic/versions/add_password_reset_tokens.py create mode 100644 backend/app/models/password_reset_token.py create mode 100644 backend/app/services/password_reset_service.py create mode 100644 backend/app/services/system_email_service.py create mode 100644 backend/tests/test_password_reset_and_notifications.py create mode 100644 frontend/src/pages/ForgotPassword.tsx create mode 100644 frontend/src/pages/ResetPassword.tsx diff --git a/.env.example b/.env.example index c27b9b06..029cca14 100644 --- a/.env.example +++ b/.env.example @@ -24,3 +24,18 @@ FEISHU_REDIRECT_URI=http://localhost:3000/auth/feishu/callback # Jina AI API key (for jina_search and jina_read tools — get one at https://jina.ai) # Without a key, the tools still work but with lower rate limits JINA_API_KEY= + +# Public app URL used in user-facing links, such as password reset emails +PUBLIC_BASE_URL=http://localhost:3008 + +# System email delivery (used for forgot-password and optional broadcast emails) +SYSTEM_EMAIL_FROM_ADDRESS= +SYSTEM_EMAIL_FROM_NAME=Clawith +SYSTEM_SMTP_HOST= +SYSTEM_SMTP_PORT=465 +SYSTEM_SMTP_USERNAME= +SYSTEM_SMTP_PASSWORD= +SYSTEM_SMTP_SSL=true + +# Password reset token lifetime in minutes +PASSWORD_RESET_TOKEN_EXPIRE_MINUTES=30 diff --git a/README.md b/README.md index ea7c0954..5e0e423b 100644 --- a/README.md +++ b/README.md @@ -155,6 +155,37 @@ Agent workspace files (soul.md, memory, skills, workspace files) are stored in ` The first user to register automatically becomes the **platform admin**. Open the app, click "Register", and create your account. +### System Email and Password Reset + +Clawith can send platform-owned emails for password reset and optional broadcast delivery. Configure SMTP in `.env`: + +```bash +PUBLIC_BASE_URL=http://localhost:3008 +SYSTEM_EMAIL_FROM_ADDRESS=bot@example.com +SYSTEM_EMAIL_FROM_NAME=Clawith +SYSTEM_SMTP_HOST=smtp.example.com +SYSTEM_SMTP_PORT=465 +SYSTEM_SMTP_USERNAME=bot@example.com +SYSTEM_SMTP_PASSWORD=your-app-password +SYSTEM_SMTP_SSL=true +PASSWORD_RESET_TOKEN_EXPIRE_MINUTES=30 +``` + +`PUBLIC_BASE_URL` must point to the user-facing frontend because reset links are generated as `/reset-password?token=...`. + +Quick local validation: + +```bash +cd backend && .venv/bin/python -m pytest tests/test_password_reset_and_notifications.py +cd frontend && npm run build +``` + +Manual flow: +1. Open `http://localhost:3008/login` +2. Click `Forgot password?` +3. Submit a registered email +4. Open the emailed reset link and set a new password + ### Network Troubleshooting If `git clone` is slow or times out: diff --git a/backend/alembic/versions/add_password_reset_tokens.py b/backend/alembic/versions/add_password_reset_tokens.py new file mode 100644 index 00000000..cd0fe56e --- /dev/null +++ b/backend/alembic/versions/add_password_reset_tokens.py @@ -0,0 +1,32 @@ +"""Add password_reset_tokens table. + +Revision ID: add_password_reset_tokens +Revises: multi_tenant_registration +""" + +from alembic import op + +revision = "add_password_reset_tokens" +down_revision = "multi_tenant_registration" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.execute(""" + CREATE TABLE IF NOT EXISTS password_reset_tokens ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE, + token_hash VARCHAR(128) NOT NULL UNIQUE, + expires_at TIMESTAMPTZ NOT NULL, + used_at TIMESTAMPTZ, + created_at TIMESTAMPTZ NOT NULL DEFAULT now() + ) + """) + op.execute("CREATE INDEX IF NOT EXISTS ix_password_reset_tokens_user_id ON password_reset_tokens(user_id)") + op.execute("CREATE INDEX IF NOT EXISTS ix_password_reset_tokens_token_hash ON password_reset_tokens(token_hash)") + op.execute("CREATE INDEX IF NOT EXISTS ix_password_reset_tokens_expires_at ON password_reset_tokens(expires_at)") + + +def downgrade() -> None: + op.execute("DROP TABLE IF EXISTS password_reset_tokens") diff --git a/backend/app/api/auth.py b/backend/app/api/auth.py index eb6b243b..1c722d10 100644 --- a/backend/app/api/auth.py +++ b/backend/app/api/auth.py @@ -1,6 +1,6 @@ """Authentication API routes.""" -import uuid +from datetime import datetime, timezone from fastapi import APIRouter, Depends, HTTPException, status from loguru import logger @@ -9,8 +9,17 @@ from app.core.security import create_access_token, get_current_user, hash_password, verify_password from app.database import get_db +from app.models.password_reset_token import PasswordResetToken from app.models.user import User -from app.schemas.schemas import TokenResponse, UserLogin, UserOut, UserRegister, UserUpdate +from app.schemas.schemas import ( + ForgotPasswordRequest, + ResetPasswordRequest, + TokenResponse, + UserLogin, + UserOut, + UserRegister, + UserUpdate, +) router = APIRouter(prefix="/auth", tags=["auth"]) @@ -141,6 +150,71 @@ async def login(data: UserLogin, db: AsyncSession = Depends(get_db)): ) +@router.post("/forgot-password") +async def forgot_password(data: ForgotPasswordRequest, db: AsyncSession = Depends(get_db)): + """Request a password reset link without revealing account existence.""" + generic_response = { + "ok": True, + "message": "If an account with that email exists, a password reset email has been sent.", + } + + result = await db.execute(select(User).where(User.email == data.email)) + user = result.scalar_one_or_none() + if not user or not user.is_active: + return generic_response + + try: + from app.services.password_reset_service import build_password_reset_url, create_password_reset_token + from app.services.system_email_service import send_system_email + + raw_token, expires_at = await create_password_reset_token(db, user.id) + await db.commit() + + reset_url = await build_password_reset_url(db, raw_token) + expiry_minutes = int((expires_at - datetime.now(timezone.utc)).total_seconds() // 60) + await send_system_email( + user.email, + "Reset your Clawith password", + ( + f"Hello {user.display_name or user.username},\n\n" + f"We received a request to reset your Clawith password.\n\n" + f"Reset link: {reset_url}\n\n" + f"This link expires in {expiry_minutes} minutes. If you did not request this, you can ignore this email." + ), + ) + except Exception as exc: + logger.warning(f"Failed to process password reset email for {data.email}: {exc}") + + return generic_response + + +@router.post("/reset-password") +async def reset_password(data: ResetPasswordRequest, db: AsyncSession = Depends(get_db)): + """Reset a password using a valid single-use token.""" + from app.services.password_reset_service import consume_password_reset_token + + token = await consume_password_reset_token(db, data.token) + if not token: + raise HTTPException(status_code=400, detail="Invalid or expired reset token") + + result = await db.execute(select(User).where(User.id == token.user_id)) + user = result.scalar_one_or_none() + if not user or not user.is_active: + raise HTTPException(status_code=400, detail="Invalid or expired reset token") + + user.password_hash = hash_password(data.new_password) + + # Invalidate any other older token rows for the same user. + other_tokens = await db.execute(select(PasswordResetToken).where(PasswordResetToken.user_id == user.id)) + now = datetime.now(timezone.utc) + for row in other_tokens.scalars().all(): + if row.id != token.id and row.used_at is None: + row.used_at = now + + await db.flush() + return {"ok": True} + + @router.get("/me", response_model=UserOut) async def get_me(current_user: User = Depends(get_current_user)): """Get current user profile.""" diff --git a/backend/app/api/notification.py b/backend/app/api/notification.py index 6e1828b8..3e8d31bf 100644 --- a/backend/app/api/notification.py +++ b/backend/app/api/notification.py @@ -116,6 +116,7 @@ async def mark_all_read( class BroadcastRequest(BaseModel): title: str = Field(..., max_length=200) body: str = Field("", max_length=1000) + send_email: bool = False @router.post("/notifications/broadcast") @@ -138,12 +139,22 @@ async def broadcast_notification( sender_name = current_user.display_name or current_user.username or "Admin" count_users = 0 count_agents = 0 + count_emails = 0 + + if req.send_email: + from app.services.system_email_service import get_system_email_config + + try: + get_system_email_config() + except Exception as exc: + raise HTTPException(400, f"System email is not configured: {exc}") # Notify all users in tenant users_result = await db.execute( select(User).where(User.tenant_id == tenant_id, User.id != current_user.id) ) - for user in users_result.scalars().all(): + users = users_result.scalars().all() + for user in users: await send_notification( db, user_id=user.id, type="broadcast", @@ -167,6 +178,28 @@ async def broadcast_notification( ) count_agents += 1 - await db.commit() - return {"ok": True, "users_notified": count_users, "agents_notified": count_agents} + if req.send_email: + from app.services.system_email_service import send_system_email + + for user in users: + if not user.email: + continue + await send_system_email( + user.email, + req.title, + ( + f"{req.body}\n\n" + f"Sent by: {sender_name}" + if req.body.strip() + else f"Sent by: {sender_name}" + ), + ) + count_emails += 1 + await db.commit() + return { + "ok": True, + "users_notified": count_users, + "agents_notified": count_agents, + "emails_sent": count_emails, + } diff --git a/backend/app/config.py b/backend/app/config.py index 701e2551..6e7399b3 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -62,11 +62,21 @@ class Settings(BaseSettings): JWT_SECRET_KEY: str = "change-me-jwt-secret" JWT_ALGORITHM: str = "HS256" JWT_ACCESS_TOKEN_EXPIRE_MINUTES: int = 60 * 24 # 24 hours + PASSWORD_RESET_TOKEN_EXPIRE_MINUTES: int = 60 # File Storage AGENT_DATA_DIR: str = _default_agent_data_dir() AGENT_TEMPLATE_DIR: str = "/app/agent_template" + # System email (platform-owned outbound mail) + SYSTEM_EMAIL_FROM_ADDRESS: str = "" + SYSTEM_EMAIL_FROM_NAME: str = "Clawith" + SYSTEM_SMTP_HOST: str = "" + SYSTEM_SMTP_PORT: int = 465 + SYSTEM_SMTP_USERNAME: str = "" + SYSTEM_SMTP_PASSWORD: str = "" + SYSTEM_SMTP_SSL: bool = True + # Docker (for Agent containers) DOCKER_NETWORK: str = "clawith_network" OPENCLAW_IMAGE: str = "openclaw:local" @@ -76,6 +86,7 @@ class Settings(BaseSettings): FEISHU_APP_ID: str = "" FEISHU_APP_SECRET: str = "" FEISHU_REDIRECT_URI: str = "" + PUBLIC_BASE_URL: str = "" # CORS CORS_ORIGINS: list[str] = ["http://localhost:3000", "http://localhost:5173"] diff --git a/backend/app/main.py b/backend/app/main.py index 7c0b6bd9..3df508a0 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -98,6 +98,7 @@ async def lifespan(app: FastAPI): import app.models.trigger # noqa import app.models.notification # noqa import app.models.gateway_message # noqa + import app.models.password_reset_token # noqa async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) # Add 'atlassian' to channel_type_enum if it doesn't exist yet (idempotent) diff --git a/backend/app/models/password_reset_token.py b/backend/app/models/password_reset_token.py new file mode 100644 index 00000000..737cc6bf --- /dev/null +++ b/backend/app/models/password_reset_token.py @@ -0,0 +1,23 @@ +"""Password reset token model.""" + +import uuid +from datetime import datetime + +from sqlalchemy import DateTime, ForeignKey, String, func +from sqlalchemy.dialects.postgresql import UUID +from sqlalchemy.orm import Mapped, mapped_column + +from app.database import Base + + +class PasswordResetToken(Base): + """Single-use password reset token.""" + + __tablename__ = "password_reset_tokens" + + id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + user_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True) + token_hash: Mapped[str] = mapped_column(String(128), nullable=False, unique=True, index=True) + expires_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False, index=True) + used_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) + created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), nullable=False) diff --git a/backend/app/schemas/schemas.py b/backend/app/schemas/schemas.py index d8d96dbb..8507ff2f 100644 --- a/backend/app/schemas/schemas.py +++ b/backend/app/schemas/schemas.py @@ -21,6 +21,15 @@ class UserLogin(BaseModel): password: str +class ForgotPasswordRequest(BaseModel): + email: EmailStr + + +class ResetPasswordRequest(BaseModel): + token: str = Field(min_length=20, max_length=512) + new_password: str = Field(min_length=6, max_length=128) + + class TokenResponse(BaseModel): access_token: str token_type: str = "bearer" @@ -440,4 +449,3 @@ class GatewaySendMessageRequest(BaseModel): target: str # Name of target person or agent content: str = Field(min_length=1) channel: str | None = None # Optional: "feishu", "agent", etc. Auto-detected if omitted. - diff --git a/backend/app/services/password_reset_service.py b/backend/app/services/password_reset_service.py new file mode 100644 index 00000000..b2c9085b --- /dev/null +++ b/backend/app/services/password_reset_service.py @@ -0,0 +1,78 @@ +"""Password reset token lifecycle helpers.""" + +from __future__ import annotations + +import hashlib +import secrets +import uuid +from datetime import datetime, timedelta, timezone + +from sqlalchemy import select, update +from sqlalchemy.ext.asyncio import AsyncSession + +from app.config import get_settings +from app.models.password_reset_token import PasswordResetToken +from app.models.system_settings import SystemSetting + + +def _hash_token(token: str) -> str: + """Hash a raw reset token before persistence or lookup.""" + return hashlib.sha256(token.encode("utf-8")).hexdigest() + + +async def create_password_reset_token(db: AsyncSession, user_id: uuid.UUID) -> tuple[str, datetime]: + """Create a new single-use token and invalidate older unused tokens.""" + now = datetime.now(timezone.utc) + await db.execute( + update(PasswordResetToken) + .where(PasswordResetToken.user_id == user_id, PasswordResetToken.used_at.is_(None)) + .values(used_at=now) + ) + + raw_token = secrets.token_urlsafe(32) + expires_at = now + timedelta(minutes=get_settings().PASSWORD_RESET_TOKEN_EXPIRE_MINUTES) + db.add( + PasswordResetToken( + user_id=user_id, + token_hash=_hash_token(raw_token), + expires_at=expires_at, + ) + ) + await db.flush() + return raw_token, expires_at + + +async def get_public_base_url(db: AsyncSession) -> str: + """Resolve the public base URL used for user-facing links.""" + result = await db.execute(select(SystemSetting).where(SystemSetting.key == "platform")) + setting = result.scalar_one_or_none() + if setting and setting.value and setting.value.get("public_base_url"): + return str(setting.value["public_base_url"]).strip().rstrip("/") + + env_value = getattr(get_settings(), "PUBLIC_BASE_URL", "") if hasattr(get_settings(), "PUBLIC_BASE_URL") else "" + env_value = str(env_value).strip().rstrip("/") + if env_value: + return env_value + + raise RuntimeError("Public base URL is not configured.") + + +async def build_password_reset_url(db: AsyncSession, raw_token: str) -> str: + """Build the user-facing reset URL.""" + base_url = await get_public_base_url(db) + return f"{base_url}/reset-password?token={raw_token}" + + +async def consume_password_reset_token(db: AsyncSession, raw_token: str) -> PasswordResetToken | None: + """Load a valid reset token and mark it used.""" + now = datetime.now(timezone.utc) + result = await db.execute( + select(PasswordResetToken).where(PasswordResetToken.token_hash == _hash_token(raw_token)) + ) + token = result.scalar_one_or_none() + if not token or token.used_at or token.expires_at <= now: + return None + + token.used_at = now + await db.flush() + return token diff --git a/backend/app/services/system_email_service.py b/backend/app/services/system_email_service.py new file mode 100644 index 00000000..9a78d146 --- /dev/null +++ b/backend/app/services/system_email_service.py @@ -0,0 +1,82 @@ +"""System-owned outbound email service.""" + +from __future__ import annotations + +import smtplib +import ssl +from dataclasses import dataclass +from datetime import datetime +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText +from email.utils import formataddr, make_msgid + +from app.config import get_settings +from app.services.email_service import _force_ipv4 + + +class SystemEmailConfigError(RuntimeError): + """Raised when system email configuration is missing or invalid.""" + + +@dataclass(slots=True) +class SystemEmailConfig: + """Resolved system email configuration.""" + + from_address: str + from_name: str + smtp_host: str + smtp_port: int + smtp_username: str + smtp_password: str + smtp_ssl: bool + + +def get_system_email_config() -> SystemEmailConfig: + """Resolve and validate the env-driven system email configuration.""" + settings = get_settings() + from_address = settings.SYSTEM_EMAIL_FROM_ADDRESS.strip() + smtp_host = settings.SYSTEM_SMTP_HOST.strip() + smtp_username = settings.SYSTEM_SMTP_USERNAME.strip() or from_address + smtp_password = settings.SYSTEM_SMTP_PASSWORD + + if not from_address or not smtp_host or not smtp_password: + raise SystemEmailConfigError( + "System email is not configured. Set SYSTEM_EMAIL_FROM_ADDRESS, SYSTEM_SMTP_HOST, and SYSTEM_SMTP_PASSWORD." + ) + + return SystemEmailConfig( + from_address=from_address, + from_name=settings.SYSTEM_EMAIL_FROM_NAME.strip() or "Clawith", + smtp_host=smtp_host, + smtp_port=settings.SYSTEM_SMTP_PORT, + smtp_username=smtp_username, + smtp_password=smtp_password, + smtp_ssl=settings.SYSTEM_SMTP_SSL, + ) + + +async def send_system_email(to: str, subject: str, body: str) -> None: + """Send a plain-text system email.""" + config = get_system_email_config() + + msg = MIMEMultipart() + msg["From"] = formataddr((config.from_name, config.from_address)) + msg["To"] = to + msg["Subject"] = subject + msg["Message-ID"] = make_msgid() + msg["Date"] = datetime.now().strftime("%a, %d %b %Y %H:%M:%S %z") + msg.attach(MIMEText(body, "plain", "utf-8")) + + with _force_ipv4(): + if config.smtp_ssl: + context = ssl.create_default_context() + with smtplib.SMTP_SSL(config.smtp_host, config.smtp_port, context=context, timeout=15) as server: + server.login(config.smtp_username, config.smtp_password) + server.sendmail(config.from_address, [to], msg.as_string()) + else: + with smtplib.SMTP(config.smtp_host, config.smtp_port, timeout=15) as server: + server.ehlo() + server.starttls(context=ssl.create_default_context()) + server.ehlo() + server.login(config.smtp_username, config.smtp_password) + server.sendmail(config.from_address, [to], msg.as_string()) diff --git a/backend/entrypoint.sh b/backend/entrypoint.sh index 8668d048..f944ddfb 100755 --- a/backend/entrypoint.sh +++ b/backend/entrypoint.sh @@ -47,6 +47,7 @@ async def main(): import app.models.trigger # noqa import app.models.notification # noqa import app.models.gateway_message # noqa + import app.models.password_reset_token # noqa # Create all tables that don't exist yet (safe to run on every startup) async with engine.begin() as conn: diff --git a/backend/tests/test_password_reset_and_notifications.py b/backend/tests/test_password_reset_and_notifications.py new file mode 100644 index 00000000..891afcf8 --- /dev/null +++ b/backend/tests/test_password_reset_and_notifications.py @@ -0,0 +1,211 @@ +import uuid +from datetime import datetime, timedelta, timezone +from types import SimpleNamespace + +import pytest +from fastapi import HTTPException + +from app.api import auth as auth_api +from app.api.notification import BroadcastRequest, broadcast_notification +from app.core.security import verify_password +from app.models.password_reset_token import PasswordResetToken +from app.models.user import User +from app.schemas.schemas import ForgotPasswordRequest, ResetPasswordRequest +from app.services import password_reset_service +from app.services.system_email_service import SystemEmailConfigError + + +class DummyScalars: + def __init__(self, values): + self._values = list(values) + + def all(self): + return list(self._values) + + +class DummyResult: + def __init__(self, value=None, values=None): + self._value = value + self._values = list(values or []) + + def scalar_one_or_none(self): + return self._value + + def scalars(self): + return DummyScalars(self._values) + + +class RecordingDB: + def __init__(self, responses=None): + self.responses = list(responses or []) + self.executed = [] + self.added = [] + self.flushed = False + self.committed = False + + async def execute(self, statement): + self.executed.append(statement) + if self.responses: + return self.responses.pop(0) + return DummyResult() + + def add(self, obj): + self.added.append(obj) + + async def flush(self): + self.flushed = True + + async def commit(self): + self.committed = True + + +def make_user(**overrides): + values = { + "id": uuid.uuid4(), + "username": "alice", + "email": "alice@example.com", + "password_hash": "old-hash", + "display_name": "Alice", + "role": "member", + "tenant_id": uuid.uuid4(), + "is_active": True, + } + values.update(overrides) + return User(**values) + + +@pytest.mark.asyncio +async def test_create_password_reset_token_invalidates_older_tokens(monkeypatch): + monkeypatch.setattr( + password_reset_service, + "get_settings", + lambda: SimpleNamespace(PASSWORD_RESET_TOKEN_EXPIRE_MINUTES=15, PUBLIC_BASE_URL=""), + ) + db = RecordingDB() + user_id = uuid.uuid4() + + raw_token, expires_at = await password_reset_service.create_password_reset_token(db, user_id) + + assert db.flushed is True + assert len(db.executed) == 1 + assert "UPDATE password_reset_tokens" in str(db.executed[0]) + assert len(db.added) == 1 + saved_token = db.added[0] + assert isinstance(saved_token, PasswordResetToken) + assert saved_token.user_id == user_id + assert saved_token.token_hash != raw_token + assert len(raw_token) >= 20 + assert expires_at > datetime.now(timezone.utc) + + +@pytest.mark.asyncio +async def test_build_password_reset_url_uses_env_public_base_url(monkeypatch): + monkeypatch.setattr( + password_reset_service, + "get_settings", + lambda: SimpleNamespace(PASSWORD_RESET_TOKEN_EXPIRE_MINUTES=30, PUBLIC_BASE_URL="https://app.example.com/"), + ) + db = RecordingDB([DummyResult(None)]) + + url = await password_reset_service.build_password_reset_url(db, "abc123") + + assert url == "https://app.example.com/reset-password?token=abc123" + + +@pytest.mark.asyncio +async def test_consume_password_reset_token_rejects_expired_tokens(): + expired = PasswordResetToken( + user_id=uuid.uuid4(), + token_hash="hashed", + expires_at=datetime.now(timezone.utc) - timedelta(minutes=1), + ) + db = RecordingDB([DummyResult(expired)]) + + token = await password_reset_service.consume_password_reset_token(db, "raw-token") + + assert token is None + assert expired.used_at is None + + +@pytest.mark.asyncio +async def test_forgot_password_returns_generic_response_for_unknown_email(): + db = RecordingDB([DummyResult(None)]) + + response = await auth_api.forgot_password(ForgotPasswordRequest(email="missing@example.com"), db) + + assert response == { + "ok": True, + "message": "If an account with that email exists, a password reset email has been sent.", + } + + +@pytest.mark.asyncio +async def test_forgot_password_hides_email_delivery_failures(monkeypatch): + user = make_user() + db = RecordingDB([DummyResult(user)]) + + async def fake_create_password_reset_token(*_args, **_kwargs): + raise RuntimeError("smtp failed") + + monkeypatch.setattr(password_reset_service, "create_password_reset_token", fake_create_password_reset_token) + + response = await auth_api.forgot_password(ForgotPasswordRequest(email=user.email), db) + + assert response["ok"] is True + assert "password reset email" in response["message"] + + +@pytest.mark.asyncio +async def test_reset_password_updates_user_and_invalidates_other_tokens(monkeypatch): + user = make_user(password_hash=auth_api.hash_password("old-password")) + consumed = PasswordResetToken( + id=uuid.uuid4(), + user_id=user.id, + token_hash="current", + expires_at=datetime.now(timezone.utc) + timedelta(minutes=30), + ) + older = PasswordResetToken( + id=uuid.uuid4(), + user_id=user.id, + token_hash="older", + expires_at=datetime.now(timezone.utc) + timedelta(minutes=30), + ) + db = RecordingDB([DummyResult(user), DummyResult(values=[consumed, older])]) + + async def fake_consume_password_reset_token(*_args, **_kwargs): + return consumed + + monkeypatch.setattr(password_reset_service, "consume_password_reset_token", fake_consume_password_reset_token) + + response = await auth_api.reset_password( + ResetPasswordRequest(token="t" * 20, new_password="new-password"), + db, + ) + + assert response == {"ok": True} + assert verify_password("new-password", user.password_hash) + assert older.used_at is not None + assert db.flushed is True + + +@pytest.mark.asyncio +async def test_broadcast_notification_rejects_missing_system_email_config(monkeypatch): + current_user = make_user(role="org_admin") + + def fake_get_system_email_config(): + raise SystemEmailConfigError("missing smtp host") + + monkeypatch.setattr( + "app.services.system_email_service.get_system_email_config", + fake_get_system_email_config, + ) + + with pytest.raises(HTTPException) as excinfo: + await broadcast_notification( + BroadcastRequest(title="Maintenance", body="Tonight", send_email=True), + current_user=current_user, + db=RecordingDB(), + ) + + assert excinfo.value.status_code == 400 + assert "System email is not configured" in excinfo.value.detail diff --git a/frontend/src/App.tsx b/frontend/src/App.tsx index 9531b273..3776dd07 100644 --- a/frontend/src/App.tsx +++ b/frontend/src/App.tsx @@ -3,6 +3,8 @@ import { useAuthStore } from './stores'; import { useEffect, useState } from 'react'; import { authApi } from './services/api'; import Login from './pages/Login'; +import ForgotPassword from './pages/ForgotPassword'; +import ResetPassword from './pages/ResetPassword'; import CompanySetup from './pages/CompanySetup'; import Layout from './pages/Layout'; import Dashboard from './pages/Dashboard'; @@ -104,6 +106,8 @@ export default function App() { } /> + } /> + } /> } /> }> } /> diff --git a/frontend/src/pages/EnterpriseSettings.tsx b/frontend/src/pages/EnterpriseSettings.tsx index 71c3b928..64300115 100644 --- a/frontend/src/pages/EnterpriseSettings.tsx +++ b/frontend/src/pages/EnterpriseSettings.tsx @@ -923,8 +923,9 @@ function BroadcastSection() { const { t } = useTranslation(); const [title, setTitle] = useState(''); const [body, setBody] = useState(''); + const [sendEmail, setSendEmail] = useState(false); const [sending, setSending] = useState(false); - const [result, setResult] = useState<{ users: number; agents: number } | null>(null); + const [result, setResult] = useState<{ users: number; agents: number; emails: number } | null>(null); const handleSend = async () => { if (!title.trim()) return; @@ -935,7 +936,7 @@ function BroadcastSection() { const res = await fetch('/api/notifications/broadcast', { method: 'POST', headers: { 'Content-Type': 'application/json', Authorization: `Bearer ${token}` }, - body: JSON.stringify({ title: title.trim(), body: body.trim() }), + body: JSON.stringify({ title: title.trim(), body: body.trim(), send_email: sendEmail }), }); if (!res.ok) { const err = await res.json().catch(() => ({})); @@ -944,9 +945,14 @@ function BroadcastSection() { return; } const data = await res.json(); - setResult({ users: data.users_notified, agents: data.agents_notified }); + setResult({ + users: data.users_notified, + agents: data.agents_notified, + emails: data.emails_sent || 0, + }); setTitle(''); setBody(''); + setSendEmail(false); } catch (e: any) { alert(e.message || 'Failed'); } @@ -977,13 +983,25 @@ function BroadcastSection() { rows={3} style={{ resize: 'vertical', fontSize: '13px', marginBottom: '12px' }} /> +
{result && ( - {t('enterprise.broadcast.sent', `Sent to ${result.users} users and ${result.agents} agents`, { users: result.users, agents: result.agents })} + {t( + 'enterprise.broadcast.sentWithEmail', + `Sent to ${result.users} users, ${result.agents} agents, and ${result.emails} email recipients`, + { users: result.users, agents: result.agents, emails: result.emails }, + )} )}
diff --git a/frontend/src/pages/ForgotPassword.tsx b/frontend/src/pages/ForgotPassword.tsx new file mode 100644 index 00000000..c74696c7 --- /dev/null +++ b/frontend/src/pages/ForgotPassword.tsx @@ -0,0 +1,83 @@ +import { useEffect, useState } from 'react'; +import { Link } from 'react-router-dom'; +import { authApi } from '../services/api'; + +export default function ForgotPassword() { + const [email, setEmail] = useState(''); + const [loading, setLoading] = useState(false); + const [error, setError] = useState(''); + const [message, setMessage] = useState(''); + + useEffect(() => { + document.documentElement.setAttribute('data-theme', 'dark'); + }, []); + + const handleSubmit = async (e: React.FormEvent) => { + e.preventDefault(); + setError(''); + setMessage(''); + setLoading(true); + + try { + const res = await authApi.forgotPassword({ email: email.trim() }); + setMessage(res.message); + } catch (err: any) { + setError(err.message || 'Failed to request password reset'); + } finally { + setLoading(false); + } + }; + + return ( +
+
+
+
+
+ + Clawith +
+

Forgot password

+

+ Enter your account email and we will send a reset link if the account exists. +

+
+ + {error && ( +
+ {error} +
+ )} + + {message && ( +
+ {message} +
+ )} + +
+
+ + setEmail(e.target.value)} + required + autoFocus + placeholder="name@company.com" + /> +
+ + +
+ +
+ Remembered your password? Back to login +
+
+
+
+ ); +} diff --git a/frontend/src/pages/Login.tsx b/frontend/src/pages/Login.tsx index caffef70..85e70347 100644 --- a/frontend/src/pages/Login.tsx +++ b/frontend/src/pages/Login.tsx @@ -1,5 +1,5 @@ import { useState, useEffect } from 'react'; -import { useNavigate } from 'react-router-dom'; +import { Link, useNavigate } from 'react-router-dom'; import { useTranslation } from 'react-i18next'; import { useAuthStore } from '../stores'; import { authApi } from '../services/api'; @@ -181,6 +181,17 @@ export default function Login() { /> + {!isRegister && ( +
+ + {t('auth.forgotPassword', 'Forgot password?')} + +
+ )} + + + +
+ Back to login +
+ + + + ); +} diff --git a/frontend/src/services/api.ts b/frontend/src/services/api.ts index 283f82e4..a62c168b 100644 --- a/frontend/src/services/api.ts +++ b/frontend/src/services/api.ts @@ -15,7 +15,10 @@ async function request(url: string, options: RequestInit = {}): Promise { if (!res.ok) { // Auto-logout on expired/invalid token (but not on auth endpoints — let them show errors) - const isAuthEndpoint = url.startsWith('/auth/login') || url.startsWith('/auth/register'); + const isAuthEndpoint = url.startsWith('/auth/login') + || url.startsWith('/auth/register') + || url.startsWith('/auth/forgot-password') + || url.startsWith('/auth/reset-password'); if (res.status === 401 && !isAuthEndpoint) { localStorage.removeItem('token'); localStorage.removeItem('user'); @@ -133,6 +136,12 @@ export const authApi = { login: (data: { username: string; password: string }) => request('/auth/login', { method: 'POST', body: JSON.stringify(data) }), + forgotPassword: (data: { email: string }) => + request<{ ok: boolean; message: string }>('/auth/forgot-password', { method: 'POST', body: JSON.stringify(data) }), + + resetPassword: (data: { token: string; new_password: string }) => + request<{ ok: boolean }>('/auth/reset-password', { method: 'POST', body: JSON.stringify(data) }), + me: () => request('/auth/me'), updateMe: (data: Partial) => From 8f74458343970fbd09229e2203d3f6aede6c2f99 Mon Sep 17 00:00:00 2001 From: Atlas Date: Tue, 24 Mar 2026 10:47:07 +0800 Subject: [PATCH 2/5] Localize password recovery and broadcast email UI copy This change removes English-only fallback text from the new password recovery flow and broadcast email controls so the feature matches the existing bilingual frontend behavior. The new auth pages now read from i18n, and both English and Simplified Chinese dictionaries include the new strings. Constraint: New user-facing copy must follow the existing frontend i18n structure instead of introducing page-local string tables Rejected: Leaving default English fallbacks in component code | inconsistent with the rest of the localized UI Confidence: high Scope-risk: narrow Reversibility: clean Directive: Add future auth and enterprise UI copy to locale files at the same time as the component change Tested: frontend npm run build Tested: manual browser check of /forgot-password Chinese rendering Not-tested: full language-switch regression across all newly added strings --- frontend/src/i18n/en.json | 33 +++++++++++++++++++++++++-- frontend/src/i18n/zh.json | 29 +++++++++++++++++++++++ frontend/src/pages/ForgotPassword.tsx | 16 +++++++------ frontend/src/pages/ResetPassword.tsx | 28 ++++++++++++----------- 4 files changed, 84 insertions(+), 22 deletions(-) diff --git a/frontend/src/i18n/en.json b/frontend/src/i18n/en.json index 51e9a1b5..ece9aa2e 100644 --- a/frontend/src/i18n/en.json +++ b/frontend/src/i18n/en.json @@ -79,7 +79,27 @@ "invitationHint": "Token consumption is significant, so invitation codes are required. We recommend deploying your own instance and configuring leading models for the best experience.", "usernamePlaceholder": "Enter username", "passwordPlaceholder": "Enter password", - "emailPlaceholder": "you@example.com" + "emailPlaceholder": "you@example.com", + "forgotPassword": "Forgot password?", + "forgotPasswordTitle": "Forgot password", + "forgotPasswordSubtitle": "Enter your account email and we will send a reset link if the account exists.", + "forgotPasswordRequestFailed": "Failed to request password reset", + "sendResetLink": "Send reset link", + "rememberedPassword": "Remembered your password?", + "backToLogin": "Back to login", + "resetPasswordTitle": "Reset password", + "resetPasswordSubtitle": "Choose a new password for your account.", + "resetPasswordMissingToken": "Reset token is missing from the link.", + "resetPasswordTooShort": "New password must be at least 6 characters.", + "resetPasswordMismatch": "Passwords do not match.", + "resetPasswordFailed": "Failed to reset password", + "resetPasswordSuccess": "Password updated. Redirecting to login...", + "newPassword": "New password", + "newPasswordPlaceholder": "At least 6 characters", + "confirmNewPassword": "Confirm new password", + "confirmNewPasswordPlaceholder": "Repeat your new password", + "updatePassword": "Update password", + "emailPlaceholderReset": "name@company.com" }, "roles": { "platformAdmin": "Platform Admin", @@ -755,6 +775,15 @@ "saved": "Saved", "themeColor": "Theme Color" }, + "broadcast": { + "title": "Broadcast Notification", + "description": "Send a notification to all users and agents in this company.", + "titlePlaceholder": "Notification title", + "bodyPlaceholder": "Optional details...", + "sendEmail": "Also send email to users with a configured address", + "send": "Send Broadcast", + "sentWithEmail": "Sent to {{users}} users, {{agents}} agents, and {{emails}} email recipients" + }, "kb": { "title": "Company Knowledge Base", "rootDir": "Root", @@ -921,4 +950,4 @@ "ws_note": "WebSocket mode requires no public IP, callback URL, or domain verification (ICP). The connection is managed automatically." } } -} \ No newline at end of file +} diff --git a/frontend/src/i18n/zh.json b/frontend/src/i18n/zh.json index 08bef963..9e8e2c77 100644 --- a/frontend/src/i18n/zh.json +++ b/frontend/src/i18n/zh.json @@ -86,6 +86,26 @@ "usernamePlaceholder": "请输入用户名", "passwordPlaceholder": "请输入密码", "emailPlaceholder": "your@email.com", + "forgotPassword": "忘记密码?", + "forgotPasswordTitle": "忘记密码", + "forgotPasswordSubtitle": "输入你的账号邮箱;如果该账号存在,系统会发送一封重置链接邮件。", + "forgotPasswordRequestFailed": "请求密码重置失败", + "sendResetLink": "发送重置链接", + "rememberedPassword": "想起密码了?", + "backToLogin": "返回登录", + "resetPasswordTitle": "重置密码", + "resetPasswordSubtitle": "为你的账号设置一个新密码。", + "resetPasswordMissingToken": "链接中缺少重置令牌。", + "resetPasswordTooShort": "新密码至少需要 6 个字符。", + "resetPasswordMismatch": "两次输入的密码不一致。", + "resetPasswordFailed": "重置密码失败", + "resetPasswordSuccess": "密码已更新,正在跳转到登录页...", + "newPassword": "新密码", + "newPasswordPlaceholder": "至少 6 个字符", + "confirmNewPassword": "确认新密码", + "confirmNewPasswordPlaceholder": "再次输入新密码", + "updatePassword": "更新密码", + "emailPlaceholderReset": "name@company.com", "companyDisabled": "你的公司已被停用,请联系平台管理员。", "invalidCredentials": "用户名或密码错误。", "accountDisabled": "你的账号已被停用。", @@ -826,6 +846,15 @@ "saved": "已保存", "themeColor": "主题色" }, + "broadcast": { + "title": "广播通知", + "description": "向本公司所有用户和数字员工发送通知。", + "titlePlaceholder": "通知标题", + "bodyPlaceholder": "可选补充说明...", + "sendEmail": "同时给已配置邮箱地址的用户发送邮件", + "send": "发送广播", + "sentWithEmail": "已发送给 {{users}} 位用户、{{agents}} 个数字员工,并投递到 {{emails}} 个邮箱" + }, "companyName": { "title": "公司名称", "placeholder": "输入公司名称" diff --git a/frontend/src/pages/ForgotPassword.tsx b/frontend/src/pages/ForgotPassword.tsx index c74696c7..26c05f92 100644 --- a/frontend/src/pages/ForgotPassword.tsx +++ b/frontend/src/pages/ForgotPassword.tsx @@ -1,8 +1,10 @@ import { useEffect, useState } from 'react'; import { Link } from 'react-router-dom'; +import { useTranslation } from 'react-i18next'; import { authApi } from '../services/api'; export default function ForgotPassword() { + const { t } = useTranslation(); const [email, setEmail] = useState(''); const [loading, setLoading] = useState(false); const [error, setError] = useState(''); @@ -22,7 +24,7 @@ export default function ForgotPassword() { const res = await authApi.forgotPassword({ email: email.trim() }); setMessage(res.message); } catch (err: any) { - setError(err.message || 'Failed to request password reset'); + setError(err.message || t('auth.forgotPasswordRequestFailed', 'Failed to request password reset')); } finally { setLoading(false); } @@ -37,9 +39,9 @@ export default function ForgotPassword() { Clawith -

Forgot password

+

{t('auth.forgotPasswordTitle', 'Forgot password')}

- Enter your account email and we will send a reset link if the account exists. + {t('auth.forgotPasswordSubtitle', 'Enter your account email and we will send a reset link if the account exists.')}

@@ -57,24 +59,24 @@ export default function ForgotPassword() {
- + setEmail(e.target.value)} required autoFocus - placeholder="name@company.com" + placeholder={t('auth.emailPlaceholderReset', 'name@company.com')} />
- Remembered your password? Back to login + {t('auth.rememberedPassword', 'Remembered your password?')} {t('auth.backToLogin', 'Back to login')}
diff --git a/frontend/src/pages/ResetPassword.tsx b/frontend/src/pages/ResetPassword.tsx index 674d86d4..a2432c5c 100644 --- a/frontend/src/pages/ResetPassword.tsx +++ b/frontend/src/pages/ResetPassword.tsx @@ -1,8 +1,10 @@ import { useEffect, useMemo, useState } from 'react'; import { Link, useNavigate, useSearchParams } from 'react-router-dom'; +import { useTranslation } from 'react-i18next'; import { authApi } from '../services/api'; export default function ResetPassword() { + const { t } = useTranslation(); const navigate = useNavigate(); const [params] = useSearchParams(); const token = useMemo(() => params.get('token') || '', [params]); @@ -21,15 +23,15 @@ export default function ResetPassword() { setError(''); if (!token) { - setError('Reset token is missing from the link.'); + setError(t('auth.resetPasswordMissingToken', 'Reset token is missing from the link.')); return; } if (password.length < 6) { - setError('New password must be at least 6 characters.'); + setError(t('auth.resetPasswordTooShort', 'New password must be at least 6 characters.')); return; } if (password !== confirmPassword) { - setError('Passwords do not match.'); + setError(t('auth.resetPasswordMismatch', 'Passwords do not match.')); return; } @@ -39,7 +41,7 @@ export default function ResetPassword() { setSuccess(true); window.setTimeout(() => navigate('/login'), 1200); } catch (err: any) { - setError(err.message || 'Failed to reset password'); + setError(err.message || t('auth.resetPasswordFailed', 'Failed to reset password')); } finally { setLoading(false); } @@ -54,9 +56,9 @@ export default function ResetPassword() { Clawith -

Reset password

+

{t('auth.resetPasswordTitle', 'Reset password')}

- Choose a new password for your account. + {t('auth.resetPasswordSubtitle', 'Choose a new password for your account.')}

@@ -68,41 +70,41 @@ export default function ResetPassword() { {success && (
- Password updated. Redirecting to login... + {t('auth.resetPasswordSuccess', 'Password updated. Redirecting to login...')}
)}
- + setPassword(e.target.value)} required autoFocus - placeholder="At least 6 characters" + placeholder={t('auth.newPasswordPlaceholder', 'At least 6 characters')} />
- + setConfirmPassword(e.target.value)} required - placeholder="Repeat your new password" + placeholder={t('auth.confirmNewPasswordPlaceholder', 'Repeat your new password')} />
- Back to login + {t('auth.backToLogin', 'Back to login')}
From 9e61dbbd7a04d6476b57d22434ff58d0fcfcf8bc Mon Sep 17 00:00:00 2001 From: Atlas Date: Tue, 24 Mar 2026 10:56:23 +0800 Subject: [PATCH 3/5] Prevent SMTP latency from blocking password recovery and broadcasts This change moves system email delivery off the main request path for forgot-password and broadcast email sends. Password recovery now queues email delivery after persisting the reset token, and broadcast email sends are isolated per recipient so one SMTP failure does not abort the entire operation. Constraint: Keep the current env-driven SMTP configuration and avoid adding queue infrastructure Constraint: Requests must remain responsive even when SMTP is slow or misconfigured Rejected: Leave SMTP on the request path | risks slow or stalled user-facing requests Rejected: Add a durable mail queue now | too much scope for a targeted hardening pass Confidence: high Scope-risk: narrow Reversibility: clean Directive: Treat emails_sent in broadcast responses as queued recipients, not guaranteed SMTP success Tested: backend pytest tests/test_password_reset_and_notifications.py Tested: backend ruff check app/api/auth.py app/api/notification.py app/services/system_email_service.py tests/test_password_reset_and_notifications.py Tested: manual browser E2E reset-password -> login flow after the hardening change Not-tested: process-crash behavior between response return and background email completion --- backend/app/api/auth.py | 29 +++-- backend/app/api/notification.py | 30 ++++-- backend/app/services/system_email_service.py | 72 ++++++++++++- .../test_password_reset_and_notifications.py | 100 +++++++++++++++++- 4 files changed, 204 insertions(+), 27 deletions(-) diff --git a/backend/app/api/auth.py b/backend/app/api/auth.py index 1c722d10..22d1c41e 100644 --- a/backend/app/api/auth.py +++ b/backend/app/api/auth.py @@ -2,7 +2,7 @@ from datetime import datetime, timezone -from fastapi import APIRouter, Depends, HTTPException, status +from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, status from loguru import logger from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession @@ -151,7 +151,11 @@ async def login(data: UserLogin, db: AsyncSession = Depends(get_db)): @router.post("/forgot-password") -async def forgot_password(data: ForgotPasswordRequest, db: AsyncSession = Depends(get_db)): +async def forgot_password( + data: ForgotPasswordRequest, + background_tasks: BackgroundTasks, + db: AsyncSession = Depends(get_db), +): """Request a password reset link without revealing account existence.""" generic_response = { "ok": True, @@ -165,22 +169,25 @@ async def forgot_password(data: ForgotPasswordRequest, db: AsyncSession = Depend try: from app.services.password_reset_service import build_password_reset_url, create_password_reset_token - from app.services.system_email_service import send_system_email + from app.services.system_email_service import ( + get_system_email_config, + run_background_email_job, + send_password_reset_email, + ) + get_system_email_config() raw_token, expires_at = await create_password_reset_token(db, user.id) await db.commit() reset_url = await build_password_reset_url(db, raw_token) expiry_minutes = int((expires_at - datetime.now(timezone.utc)).total_seconds() // 60) - await send_system_email( + background_tasks.add_task( + run_background_email_job, + send_password_reset_email, user.email, - "Reset your Clawith password", - ( - f"Hello {user.display_name or user.username},\n\n" - f"We received a request to reset your Clawith password.\n\n" - f"Reset link: {reset_url}\n\n" - f"This link expires in {expiry_minutes} minutes. If you did not request this, you can ignore this email." - ), + user.display_name or user.username, + reset_url, + expiry_minutes, ) except Exception as exc: logger.warning(f"Failed to process password reset email for {data.email}: {exc}") diff --git a/backend/app/api/notification.py b/backend/app/api/notification.py index 3e8d31bf..81cd47a5 100644 --- a/backend/app/api/notification.py +++ b/backend/app/api/notification.py @@ -3,7 +3,7 @@ import uuid from typing import Optional -from fastapi import APIRouter, Depends, HTTPException, Query +from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query from pydantic import BaseModel, Field from sqlalchemy import select, func, update from sqlalchemy.ext.asyncio import AsyncSession @@ -122,6 +122,7 @@ class BroadcastRequest(BaseModel): @router.post("/notifications/broadcast") async def broadcast_notification( req: BroadcastRequest, + background_tasks: BackgroundTasks, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): @@ -140,6 +141,7 @@ async def broadcast_notification( count_users = 0 count_agents = 0 count_emails = 0 + email_recipients = [] if req.send_email: from app.services.system_email_service import get_system_email_config @@ -179,24 +181,32 @@ async def broadcast_notification( count_agents += 1 if req.send_email: - from app.services.system_email_service import send_system_email + from app.services.system_email_service import ( + BroadcastEmailRecipient, + deliver_broadcast_emails, + run_background_email_job, + ) for user in users: if not user.email: continue - await send_system_email( - user.email, - req.title, - ( - f"{req.body}\n\n" - f"Sent by: {sender_name}" - if req.body.strip() - else f"Sent by: {sender_name}" + email_recipients.append( + BroadcastEmailRecipient( + email=user.email, + subject=req.title, + body=( + f"{req.body}\n\n" + f"Sent by: {sender_name}" + if req.body.strip() + else f"Sent by: {sender_name}" + ), ), ) count_emails += 1 await db.commit() + if email_recipients: + background_tasks.add_task(run_background_email_job, deliver_broadcast_emails, email_recipients) return { "ok": True, "users_notified": count_users, diff --git a/backend/app/services/system_email_service.py b/backend/app/services/system_email_service.py index 9a78d146..6ec47e99 100644 --- a/backend/app/services/system_email_service.py +++ b/backend/app/services/system_email_service.py @@ -2,8 +2,12 @@ from __future__ import annotations +import asyncio +import inspect +import logging import smtplib import ssl +from collections.abc import Iterable from dataclasses import dataclass from datetime import datetime from email.mime.multipart import MIMEMultipart @@ -13,6 +17,8 @@ from app.config import get_settings from app.services.email_service import _force_ipv4 +logger = logging.getLogger(__name__) + class SystemEmailConfigError(RuntimeError): """Raised when system email configuration is missing or invalid.""" @@ -31,6 +37,15 @@ class SystemEmailConfig: smtp_ssl: bool +@dataclass(slots=True) +class BroadcastEmailRecipient: + """Prepared broadcast recipient payload.""" + + email: str + subject: str + body: str + + def get_system_email_config() -> SystemEmailConfig: """Resolve and validate the env-driven system email configuration.""" settings = get_settings() @@ -55,8 +70,8 @@ def get_system_email_config() -> SystemEmailConfig: ) -async def send_system_email(to: str, subject: str, body: str) -> None: - """Send a plain-text system email.""" +def _send_system_email_sync(to: str, subject: str, body: str) -> None: + """Send a plain-text system email synchronously.""" config = get_system_email_config() msg = MIMEMultipart() @@ -80,3 +95,56 @@ async def send_system_email(to: str, subject: str, body: str) -> None: server.ehlo() server.login(config.smtp_username, config.smtp_password) server.sendmail(config.from_address, [to], msg.as_string()) + + +async def send_system_email(to: str, subject: str, body: str) -> None: + """Send a plain-text system email without blocking the event loop.""" + await asyncio.to_thread(_send_system_email_sync, to, subject, body) + + +async def send_password_reset_email( + to: str, + display_name: str, + reset_url: str, + expiry_minutes: int, +) -> None: + """Send a password reset email.""" + await send_system_email( + to, + "Reset your Clawith password", + ( + f"Hello {display_name},\n\n" + f"We received a request to reset your Clawith password.\n\n" + f"Reset link: {reset_url}\n\n" + f"This link expires in {expiry_minutes} minutes. If you did not request this, you can ignore this email." + ), + ) + + +async def deliver_broadcast_emails(recipients: Iterable[BroadcastEmailRecipient]) -> None: + """Deliver broadcast emails while isolating per-recipient failures.""" + for recipient in recipients: + try: + await send_system_email(recipient.email, recipient.subject, recipient.body) + except Exception as exc: + logger.warning("Failed to deliver broadcast email to %s: %s", recipient.email, exc) + + +def fire_and_forget(coro) -> None: + """Run an awaitable in the background without failing the request.""" + task = asyncio.create_task(coro) + + def _consume_task_result(done_task: asyncio.Task) -> None: + try: + done_task.result() + except Exception as exc: + logger.warning("Background email task failed: %s", exc) + + task.add_done_callback(_consume_task_result) + + +def run_background_email_job(job, *args, **kwargs) -> None: + """Bridge Starlette background tasks to async email jobs.""" + result = job(*args, **kwargs) + if inspect.isawaitable(result): + fire_and_forget(result) diff --git a/backend/tests/test_password_reset_and_notifications.py b/backend/tests/test_password_reset_and_notifications.py index 891afcf8..ddc457ad 100644 --- a/backend/tests/test_password_reset_and_notifications.py +++ b/backend/tests/test_password_reset_and_notifications.py @@ -4,6 +4,7 @@ import pytest from fastapi import HTTPException +from starlette.background import BackgroundTasks from app.api import auth as auth_api from app.api.notification import BroadcastRequest, broadcast_notification @@ -130,29 +131,63 @@ async def test_consume_password_reset_token_rejects_expired_tokens(): @pytest.mark.asyncio async def test_forgot_password_returns_generic_response_for_unknown_email(): db = RecordingDB([DummyResult(None)]) + background_tasks = BackgroundTasks() - response = await auth_api.forgot_password(ForgotPasswordRequest(email="missing@example.com"), db) + response = await auth_api.forgot_password( + ForgotPasswordRequest(email="missing@example.com"), + background_tasks, + db, + ) assert response == { "ok": True, "message": "If an account with that email exists, a password reset email has been sent.", } + assert background_tasks.tasks == [] @pytest.mark.asyncio async def test_forgot_password_hides_email_delivery_failures(monkeypatch): user = make_user() db = RecordingDB([DummyResult(user)]) + background_tasks = BackgroundTasks() - async def fake_create_password_reset_token(*_args, **_kwargs): + def fake_get_system_email_config(): raise RuntimeError("smtp failed") - monkeypatch.setattr(password_reset_service, "create_password_reset_token", fake_create_password_reset_token) + monkeypatch.setattr("app.services.system_email_service.get_system_email_config", fake_get_system_email_config) - response = await auth_api.forgot_password(ForgotPasswordRequest(email=user.email), db) + response = await auth_api.forgot_password(ForgotPasswordRequest(email=user.email), background_tasks, db) assert response["ok"] is True assert "password reset email" in response["message"] + assert background_tasks.tasks == [] + + +@pytest.mark.asyncio +async def test_forgot_password_queues_background_email(monkeypatch): + user = make_user() + db = RecordingDB([DummyResult(user)]) + background_tasks = BackgroundTasks() + + async def fake_create_password_reset_token(*_args, **_kwargs): + return "raw-token", datetime.now(timezone.utc) + timedelta(minutes=30) + + async def fake_build_password_reset_url(*_args, **_kwargs): + return "https://app.example.com/reset-password?token=raw-token" + + monkeypatch.setattr(password_reset_service, "create_password_reset_token", fake_create_password_reset_token) + monkeypatch.setattr(password_reset_service, "build_password_reset_url", fake_build_password_reset_url) + monkeypatch.setattr( + "app.services.system_email_service.get_system_email_config", + lambda: SimpleNamespace(from_address="bot@example.com"), + ) + + response = await auth_api.forgot_password(ForgotPasswordRequest(email=user.email), background_tasks, db) + + assert response["ok"] is True + assert db.committed is True + assert len(background_tasks.tasks) == 1 @pytest.mark.asyncio @@ -203,9 +238,66 @@ def fake_get_system_email_config(): with pytest.raises(HTTPException) as excinfo: await broadcast_notification( BroadcastRequest(title="Maintenance", body="Tonight", send_email=True), + background_tasks=BackgroundTasks(), current_user=current_user, db=RecordingDB(), ) assert excinfo.value.status_code == 400 assert "System email is not configured" in excinfo.value.detail + + +@pytest.mark.asyncio +async def test_broadcast_notification_queues_email_delivery(monkeypatch): + current_user = make_user(role="org_admin") + target_user = make_user(email="bob@example.com", tenant_id=current_user.tenant_id) + db = RecordingDB([ + DummyResult(values=[target_user]), + DummyResult(values=[]), + ]) + background_tasks = BackgroundTasks() + + monkeypatch.setattr( + "app.services.system_email_service.get_system_email_config", + lambda: SimpleNamespace(from_address="bot@example.com"), + ) + notifications = [] + + async def fake_send_notification(*_args, **kwargs): + notifications.append(kwargs) + + monkeypatch.setattr("app.services.notification_service.send_notification", fake_send_notification) + + response = await broadcast_notification( + BroadcastRequest(title="Maintenance", body="Tonight", send_email=True), + background_tasks=background_tasks, + current_user=current_user, + db=db, + ) + + assert response["ok"] is True + assert response["emails_sent"] == 1 + assert db.committed is True + assert len(notifications) == 1 + assert len(background_tasks.tasks) == 1 + + +@pytest.mark.asyncio +async def test_deliver_broadcast_emails_continues_after_single_failure(monkeypatch): + from app.services.system_email_service import BroadcastEmailRecipient, deliver_broadcast_emails + + delivered = [] + + async def fake_send_system_email(email: str, subject: str, body: str) -> None: + if email == "bad@example.com": + raise RuntimeError("smtp down") + delivered.append((email, subject, body)) + + monkeypatch.setattr("app.services.system_email_service.send_system_email", fake_send_system_email) + + await deliver_broadcast_emails([ + BroadcastEmailRecipient(email="bad@example.com", subject="s1", body="b1"), + BroadcastEmailRecipient(email="good@example.com", subject="s2", body="b2"), + ]) + + assert delivered == [("good@example.com", "s2", "b2")] From 4902e80160845a02f0d494080b0f957da8eb617f Mon Sep 17 00:00:00 2001 From: Atlas Date: Tue, 24 Mar 2026 14:07:36 +0800 Subject: [PATCH 4/5] Keep enterprise directory data in sync across Feishu and WeCom The org sync flow now supports a provider-aware configuration model, adds WeCom directory ingestion, and updates the enterprise settings UI so admins can configure sync, browse a collapsible org tree, and inspect member details without manual directory maintenance. The read path for org sync settings was tightened so only admins can fetch the config and provider secrets are redacted from API responses while still being preserved on save when the UI submits blank secret fields. Constraint: WeCom validation had to remain read-only against the live tenant Constraint: Existing Feishu org sync settings needed to keep working without a data migration Rejected: Add a separate WeCom-only settings key | would duplicate provider config paths and UI state Rejected: Store org sync secrets only in environment variables | conflicts with tenant-managed enterprise settings workflow Confidence: high Scope-risk: moderate Reversibility: clean Directive: Do not expose org_sync secrets in API responses or relax admin-only access without revisiting the trust boundary Tested: backend/.venv/bin/python -m ruff check backend/app/api/enterprise.py backend/app/models/org.py backend/app/services/org_sync_service.py backend/tests/test_password_reset_and_notifications.py Tested: DATABASE_URL=postgresql+asyncpg://postgres:QF20200610@localhost:25432/clawith REDIS_URL=redis://:difyai123456@localhost:16379/0 backend/.venv/bin/python -m pytest backend/tests/test_password_reset_and_notifications.py Tested: cd frontend && npm run build Not-tested: Durable background job processing for very large org syncs --- .../versions/add_wecom_org_sync_fields.py | 62 + backend/app/api/enterprise.py | 51 +- backend/app/models/org.py | 11 +- backend/app/services/org_sync_service.py | 1053 ++++++++++++----- .../test_password_reset_and_notifications.py | 159 ++- frontend/src/i18n/en.json | 18 + frontend/src/i18n/zh.json | 18 + frontend/src/pages/EnterpriseSettings.tsx | 403 ++++++- 8 files changed, 1364 insertions(+), 411 deletions(-) create mode 100644 backend/alembic/versions/add_wecom_org_sync_fields.py diff --git a/backend/alembic/versions/add_wecom_org_sync_fields.py b/backend/alembic/versions/add_wecom_org_sync_fields.py new file mode 100644 index 00000000..e6fb27a7 --- /dev/null +++ b/backend/alembic/versions/add_wecom_org_sync_fields.py @@ -0,0 +1,62 @@ +"""Add provider-aware org sync fields. + +Revision ID: add_wecom_org_sync_fields +Revises: add_password_reset_tokens +""" + +from alembic import op + +revision = "add_wecom_org_sync_fields" +down_revision = "add_password_reset_tokens" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.execute( + """ + ALTER TABLE org_departments + ADD COLUMN IF NOT EXISTS wecom_id VARCHAR(100) + """ + ) + op.execute( + """ + ALTER TABLE org_departments + ADD COLUMN IF NOT EXISTS sync_provider VARCHAR(20) NOT NULL DEFAULT 'feishu' + """ + ) + op.execute( + """ + ALTER TABLE org_members + ADD COLUMN IF NOT EXISTS wecom_user_id VARCHAR(100) + """ + ) + op.execute( + """ + ALTER TABLE org_members + ADD COLUMN IF NOT EXISTS sync_provider VARCHAR(20) NOT NULL DEFAULT 'feishu' + """ + ) + op.execute( + "CREATE INDEX IF NOT EXISTS ix_org_departments_wecom_id ON org_departments(wecom_id)" + ) + op.execute( + "CREATE INDEX IF NOT EXISTS ix_org_departments_sync_provider ON org_departments(sync_provider)" + ) + op.execute( + "CREATE INDEX IF NOT EXISTS ix_org_members_wecom_user_id ON org_members(wecom_user_id)" + ) + op.execute( + "CREATE INDEX IF NOT EXISTS ix_org_members_sync_provider ON org_members(sync_provider)" + ) + + +def downgrade() -> None: + op.execute("DROP INDEX IF EXISTS ix_org_members_sync_provider") + op.execute("DROP INDEX IF EXISTS ix_org_members_wecom_user_id") + op.execute("DROP INDEX IF EXISTS ix_org_departments_sync_provider") + op.execute("DROP INDEX IF EXISTS ix_org_departments_wecom_id") + op.execute("ALTER TABLE org_members DROP COLUMN IF EXISTS sync_provider") + op.execute("ALTER TABLE org_members DROP COLUMN IF EXISTS wecom_user_id") + op.execute("ALTER TABLE org_departments DROP COLUMN IF EXISTS sync_provider") + op.execute("ALTER TABLE org_departments DROP COLUMN IF EXISTS wecom_id") diff --git a/backend/app/api/enterprise.py b/backend/app/api/enterprise.py index bc0edb12..7f39df3e 100644 --- a/backend/app/api/enterprise.py +++ b/backend/app/api/enterprise.py @@ -5,13 +5,18 @@ from fastapi import APIRouter, Depends, HTTPException, status from pydantic import BaseModel from sqlalchemy import select, func +from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.ext.asyncio import AsyncSession -from app.core.security import get_current_admin, get_current_user, require_role +from app.core.security import get_current_admin, get_current_user from app.database import get_db from app.models.agent import Agent from app.models.audit import ApprovalRequest, AuditLog, EnterpriseInfo +from app.models.invitation_code import InvitationCode from app.models.llm import LLMModel +from app.models.org import OrgDepartment, OrgMember +from app.models.system_settings import SystemSetting +from app.models.tenant import Tenant from app.models.user import User from app.schemas.schemas import ( ApprovalAction, ApprovalRequestOut, AuditLogOut, EnterpriseInfoOut, @@ -209,7 +214,7 @@ async def update_llm_model( await db.commit() await db.refresh(model) return LLMModelOut.model_validate(model) - except SQLAlchemyError as e: + except SQLAlchemyError: await db.rollback() raise HTTPException(status_code=500, detail="Failed to update model") @@ -344,7 +349,7 @@ async def get_enterprise_stats( select(func.count(Agent.id)).where(Agent.tenant_id == tid, Agent.status == "running") ) total_users = await db.execute( - select(func.count(User.id)).where(User.tenant_id == tid, User.is_active == True) + select(func.count(User.id)).where(User.tenant_id == tid, User.is_active) ) pending_approvals = await db.execute( select(func.count(ApprovalRequest.id)).where(ApprovalRequest.status == "pending") @@ -360,8 +365,6 @@ async def get_enterprise_stats( # ─── Tenant Quota Settings ────────────────────────────── -from app.models.tenant import Tenant - class TenantQuotaUpdate(BaseModel): default_message_limit: int | None = None @@ -452,8 +455,6 @@ async def update_tenant_quotas( # ─── System Settings ─────────────────────────────────── -from app.models.system_settings import SystemSetting - class SettingUpdate(BaseModel): value: dict @@ -483,6 +484,14 @@ async def get_system_setting( db: AsyncSession = Depends(get_db), ): """Get a system setting by key.""" + if key == "org_sync": + if current_user.role not in ("platform_admin", "org_admin"): + raise HTTPException(status_code=403, detail="Admin access required") + from app.services.org_sync_service import org_sync_service + + value = await org_sync_service.get_public_config(db) + return {"key": key, "value": value} + result = await db.execute(select(SystemSetting).where(SystemSetting.key == key)) setting = result.scalar_one_or_none() if not setting: @@ -501,6 +510,12 @@ async def update_system_setting( # Platform-level settings (e.g. PUBLIC_BASE_URL) require platform_admin if key == "platform" and current_user.role != "platform_admin": raise HTTPException(status_code=403, detail="Only platform admin can modify platform settings") + if key == "org_sync": + from app.services.org_sync_service import org_sync_service + + value = await org_sync_service.save_config(db, data.value) + return {"key": key, "value": value} + result = await db.execute(select(SystemSetting).where(SystemSetting.key == key)) setting = result.scalar_one_or_none() if setting: @@ -514,8 +529,6 @@ async def update_system_setting( # ─── Org Structure ────────────────────────────────────── -from app.models.org import OrgDepartment, OrgMember - @router.get("/org/departments") async def list_org_departments( @@ -524,7 +537,10 @@ async def list_org_departments( db: AsyncSession = Depends(get_db), ): """List all departments, optionally filtered by tenant.""" - query = select(OrgDepartment) + from app.services.org_sync_service import org_sync_service + + provider, _, _ = await org_sync_service.get_active_provider(db) + query = select(OrgDepartment).where(OrgDepartment.sync_provider == provider) if tenant_id: query = query.where(OrgDepartment.tenant_id == uuid.UUID(tenant_id)) result = await db.execute(query.order_by(OrgDepartment.name)) @@ -532,7 +548,9 @@ async def list_org_departments( return [ { "id": str(d.id), + "provider": d.sync_provider, "feishu_id": d.feishu_id, + "wecom_id": d.wecom_id, "name": d.name, "parent_id": str(d.parent_id) if d.parent_id else None, "path": d.path, @@ -551,7 +569,13 @@ async def list_org_members( db: AsyncSession = Depends(get_db), ): """List org members, optionally filtered by department, search, or tenant.""" - query = select(OrgMember).where(OrgMember.status == "active") + from app.services.org_sync_service import org_sync_service + + provider, _, _ = await org_sync_service.get_active_provider(db) + query = select(OrgMember).where( + OrgMember.status == "active", + OrgMember.sync_provider == provider, + ) if tenant_id: query = query.where(OrgMember.tenant_id == uuid.UUID(tenant_id)) if department_id: @@ -564,6 +588,7 @@ async def list_org_members( return [ { "id": str(m.id), + "provider": m.sync_provider, "name": m.name, "email": m.email, "title": m.title, @@ -578,7 +603,7 @@ async def list_org_members( async def trigger_org_sync( current_user: User = Depends(get_current_admin), ): - """Manually trigger org structure sync from Feishu.""" + """Manually trigger org structure sync from the active provider.""" from app.services.org_sync_service import org_sync_service result = await org_sync_service.full_sync() return result @@ -586,8 +611,6 @@ async def trigger_org_sync( # ─── Invitation Codes ─────────────────────────────────── -from app.models.invitation_code import InvitationCode - class InvitationCodeCreate(BaseModel): count: int = 1 # how many codes to generate diff --git a/backend/app/models/org.py b/backend/app/models/org.py index 0a5e6fa6..5d24da9d 100644 --- a/backend/app/models/org.py +++ b/backend/app/models/org.py @@ -1,4 +1,4 @@ -"""Organization structure models — departments and members synced from Feishu.""" +"""Organization structure models cached from external directory providers.""" import uuid from datetime import datetime @@ -8,15 +8,18 @@ from sqlalchemy.orm import Mapped, mapped_column, relationship from app.database import Base +from app.models.tenant import Tenant # noqa: F401 class OrgDepartment(Base): - """Department from Feishu org structure.""" + """Department synced from an external directory provider.""" __tablename__ = "org_departments" id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) feishu_id: Mapped[str | None] = mapped_column(String(100), unique=True) + wecom_id: Mapped[str | None] = mapped_column(String(100), index=True) + sync_provider: Mapped[str] = mapped_column(String(20), default="feishu", server_default="feishu", nullable=False, index=True) name: Mapped[str] = mapped_column(String(200), nullable=False) parent_id: Mapped[uuid.UUID | None] = mapped_column(UUID(as_uuid=True), ForeignKey("org_departments.id")) path: Mapped[str] = mapped_column(String(500), default="") @@ -28,13 +31,15 @@ class OrgDepartment(Base): class OrgMember(Base): - """Person from Feishu org structure.""" + """Person synced from an external directory provider.""" __tablename__ = "org_members" id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) feishu_open_id: Mapped[str | None] = mapped_column(String(100), unique=True) feishu_user_id: Mapped[str | None] = mapped_column(String(100)) + wecom_user_id: Mapped[str | None] = mapped_column(String(100), index=True) + sync_provider: Mapped[str] = mapped_column(String(20), default="feishu", server_default="feishu", nullable=False, index=True) name: Mapped[str] = mapped_column(String(100), nullable=False) email: Mapped[str | None] = mapped_column(String(200)) avatar_url: Mapped[str | None] = mapped_column(String(500)) diff --git a/backend/app/services/org_sync_service.py b/backend/app/services/org_sync_service.py index e57c164f..b2b7d2ae 100644 --- a/backend/app/services/org_sync_service.py +++ b/backend/app/services/org_sync_service.py @@ -1,387 +1,782 @@ -"""Feishu organization structure sync service. - -Pulls departments and members from Feishu Contact API and upserts into local DB. -""" +"""Directory sync service for Feishu and WeCom organization structures.""" +import asyncio import uuid from datetime import datetime, timezone import httpx from loguru import logger -from sqlalchemy import select, delete +from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession +from app.core.security import hash_password from app.database import async_session from app.models.org import OrgDepartment, OrgMember from app.models.system_settings import SystemSetting from app.models.user import User -from app.core.security import hash_password + +ORG_SYNC_KEY = "org_sync" +LEGACY_FEISHU_SYNC_KEY = "feishu_org_sync" FEISHU_APP_TOKEN_URL = "https://open.feishu.cn/open-apis/auth/v3/app_access_token/internal" FEISHU_DEPT_CHILDREN_URL = "https://open.feishu.cn/open-apis/contact/v3/departments" FEISHU_USERS_URL = "https://open.feishu.cn/open-apis/contact/v3/users/find_by_department" +WECOM_TOKEN_URL = "https://qyapi.weixin.qq.com/cgi-bin/gettoken" +WECOM_DEPARTMENTS_URL = "https://qyapi.weixin.qq.com/cgi-bin/department/list" +WECOM_USERS_URL = "https://qyapi.weixin.qq.com/cgi-bin/user/list" + class OrgSyncService: - """Sync org structure from Feishu into local database.""" + """Sync org structure from the configured directory provider into local DB.""" + + async def _get_stored_config(self, db: AsyncSession) -> dict: + """Return normalized org sync config including stored secrets.""" + result = await db.execute(select(SystemSetting).where(SystemSetting.key == ORG_SYNC_KEY)) + setting = result.scalar_one_or_none() + if setting and isinstance(setting.value, dict): + return self._normalize_setting_value(setting.value) - async def _get_feishu_config(self, db: AsyncSession) -> dict | None: - """Load Feishu org sync config from system_settings.""" - result = await db.execute( - select(SystemSetting).where(SystemSetting.key == "feishu_org_sync") + legacy_result = await db.execute( + select(SystemSetting).where(SystemSetting.key == LEGACY_FEISHU_SYNC_KEY) ) + legacy_setting = legacy_result.scalar_one_or_none() + if legacy_setting and isinstance(legacy_setting.value, dict): + return self._normalize_setting_value( + { + "provider": "feishu", + "feishu": legacy_setting.value, + "wecom": {}, + } + ) + + return self._normalize_setting_value({}) + + async def get_public_config(self, db: AsyncSession) -> dict: + """Return normalized org sync config for the UI without provider secrets.""" + return self._normalize_setting_value(await self._get_stored_config(db), include_secrets=False) + + def _normalize_setting_value(self, value: dict | None, include_secrets: bool = True) -> dict: + raw = value or {} + provider = raw.get("provider") + if provider not in {"feishu", "wecom"}: + provider = "feishu" + + feishu = raw.get("feishu") + wecom = raw.get("wecom") + + # Accept the old flat feishu shape transparently. + if not isinstance(feishu, dict) and any(k in raw for k in ("app_id", "app_secret", "last_synced_at")): + feishu = { + "app_id": raw.get("app_id", ""), + "app_secret": raw.get("app_secret", "") if include_secrets else "", + "last_synced_at": raw.get("last_synced_at"), + } + + return { + "provider": provider, + "feishu": { + "app_id": (feishu or {}).get("app_id", ""), + "app_secret": (feishu or {}).get("app_secret", "") if include_secrets else "", + "last_synced_at": (feishu or {}).get("last_synced_at"), + }, + "wecom": { + "corp_id": (wecom or {}).get("corp_id", ""), + "corp_secret": (wecom or {}).get("corp_secret", "") if include_secrets else "", + "last_synced_at": (wecom or {}).get("last_synced_at"), + }, + } + + async def save_config(self, db: AsyncSession, value: dict) -> dict: + """Persist normalized org sync config while preserving existing secrets.""" + existing = await self._get_stored_config(db) + normalized = self._normalize_setting_value(value) + + existing_feishu = existing.get("feishu", {}) + existing_wecom = existing.get("wecom", {}) + + if not normalized["feishu"].get("app_secret"): + normalized["feishu"]["app_secret"] = existing_feishu.get("app_secret", "") + if not normalized["wecom"].get("corp_secret"): + normalized["wecom"]["corp_secret"] = existing_wecom.get("corp_secret", "") + if not normalized["feishu"].get("last_synced_at"): + normalized["feishu"]["last_synced_at"] = existing_feishu.get("last_synced_at") + if not normalized["wecom"].get("last_synced_at"): + normalized["wecom"]["last_synced_at"] = existing_wecom.get("last_synced_at") + + result = await db.execute(select(SystemSetting).where(SystemSetting.key == ORG_SYNC_KEY)) + setting = result.scalar_one_or_none() + if setting: + setting.value = normalized + else: + setting = SystemSetting(key=ORG_SYNC_KEY, value=normalized) + db.add(setting) + await db.commit() + return normalized + + async def get_active_provider(self, db: AsyncSession) -> tuple[str, dict, dict]: + """Return provider name, provider config, and full normalized setting.""" + setting = await self.get_public_config(db) + provider = setting.get("provider", "feishu") + return provider, setting.get(provider, {}), setting + + async def _set_last_synced_at( + self, db: AsyncSession, setting_value: dict, provider: str, synced_at: str + ) -> None: + value = self._normalize_setting_value(setting_value) + value.setdefault(provider, {}) + value[provider]["last_synced_at"] = synced_at + + result = await db.execute(select(SystemSetting).where(SystemSetting.key == ORG_SYNC_KEY)) setting = result.scalar_one_or_none() - if not setting: - return None - return setting.value - - async def _get_app_token(self, app_id: str, app_secret: str) -> tuple[str, dict]: - """Get Feishu tenant_access_token. - Returns (token_string, raw_response_dict). - """ - async with httpx.AsyncClient() as client: - resp = await client.post(FEISHU_APP_TOKEN_URL, json={ - "app_id": app_id, - "app_secret": app_secret, - }) + if setting: + setting.value = value + else: + db.add(SystemSetting(key=ORG_SYNC_KEY, value=value)) + + async def _get_feishu_token(self, app_id: str, app_secret: str) -> tuple[str, dict]: + async with httpx.AsyncClient(timeout=20) as client: + resp = await client.post( + FEISHU_APP_TOKEN_URL, + json={"app_id": app_id, "app_secret": app_secret}, + ) data = resp.json() - logger.info(f"[OrgSync] Token response: code={data.get('code')}, msg={data.get('msg')}") token = data.get("tenant_access_token") or data.get("app_access_token") or "" return token, data - async def _fetch_departments(self, token: str, parent_id: str = "0") -> list[dict]: - """Recursively fetch all departments from Feishu.""" - all_depts = [] + async def _fetch_feishu_departments(self, token: str, parent_id: str = "0") -> list[dict]: + all_depts: list[dict] = [] page_token = "" while True: - async with httpx.AsyncClient() as client: - url = f"{FEISHU_DEPT_CHILDREN_URL}/{parent_id}/children" - params = { - "department_id_type": "open_department_id", - "page_size": "50", - "fetch_child": "true", - } - if page_token: - params["page_token"] = page_token - - logger.info(f"[OrgSync] GET {url} params={params}") - resp = await client.get(url, params=params, headers={"Authorization": f"Bearer {token}"}) - data = resp.json() - logger.info(f"[OrgSync] Dept response: code={data.get('code')}, msg={data.get('msg')}, items={len(data.get('data', {}).get('items', []))}") - - if data.get("code") != 0: - logger.error(f"[OrgSync] Dept API error: {data}") - break - - items = data.get("data", {}).get("items", []) - if items and not all_depts: # Print first raw item for debugging - logger.info(f"[OrgSync] RAW first dept item keys: {list(items[0].keys())}") - logger.info(f"[OrgSync] RAW first dept item: {items[0]}") - for item in items: - logger.info(f"[OrgSync] dept: {item.get('name')} (id={item.get('open_department_id')})") - all_depts.extend(items) - - if not data.get("data", {}).get("has_more"): - break - page_token = data["data"].get("page_token", "") - - # If fetch_child=true didn't work (no items), try without recursion + manual recurse + async with httpx.AsyncClient(timeout=20) as client: + resp = await client.get( + f"{FEISHU_DEPT_CHILDREN_URL}/{parent_id}/children", + params={ + "department_id_type": "open_department_id", + "page_size": "50", + "fetch_child": "true", + **({"page_token": page_token} if page_token else {}), + }, + headers={"Authorization": f"Bearer {token}"}, + ) + data = resp.json() + if data.get("code") != 0: + raise RuntimeError( + f"Feishu department API failed: code={data.get('code')} msg={data.get('msg')}" + ) + all_depts.extend(data.get("data", {}).get("items", [])) + if not data.get("data", {}).get("has_more"): + break + page_token = data.get("data", {}).get("page_token", "") if not all_depts: - logger.info("[OrgSync] fetch_child=true returned nothing, trying simple list...") - all_depts = await self._fetch_departments_simple(token, parent_id) - + return await self._fetch_feishu_departments_simple(token, parent_id) return all_depts - async def _fetch_departments_simple(self, token: str, parent_id: str = "0") -> list[dict]: - """Fetch departments without fetch_child, manually recurse.""" - all_depts = [] + async def _fetch_feishu_departments_simple( + self, token: str, parent_id: str = "0" + ) -> list[dict]: + all_depts: list[dict] = [] page_token = "" while True: - async with httpx.AsyncClient() as client: - url = f"{FEISHU_DEPT_CHILDREN_URL}/{parent_id}/children" - params = {"department_id_type": "open_department_id", "page_size": "50"} - if page_token: - params["page_token"] = page_token - resp = await client.get(url, params=params, headers={"Authorization": f"Bearer {token}"}) - data = resp.json() - logger.info(f"[OrgSync] Simple dept response (parent={parent_id}): code={data.get('code')}, items={len(data.get('data', {}).get('items', []))}") - - if data.get("code") != 0: - logger.error(f"[OrgSync] Simple dept error: {data}") - break - - items = data.get("data", {}).get("items", []) - all_depts.extend(items) - if not data.get("data", {}).get("has_more"): - break - page_token = data["data"].get("page_token", "") + async with httpx.AsyncClient(timeout=20) as client: + resp = await client.get( + f"{FEISHU_DEPT_CHILDREN_URL}/{parent_id}/children", + params={ + "department_id_type": "open_department_id", + "page_size": "50", + **({"page_token": page_token} if page_token else {}), + }, + headers={"Authorization": f"Bearer {token}"}, + ) + data = resp.json() + if data.get("code") != 0: + raise RuntimeError( + f"Feishu department API failed: code={data.get('code')} msg={data.get('msg')}" + ) + items = data.get("data", {}).get("items", []) + all_depts.extend(items) + if not data.get("data", {}).get("has_more"): + break + page_token = data.get("data", {}).get("page_token", "") - # Recurse into children and inject parent_department_id for dept in list(all_depts): - dept_id = dept.get("open_department_id", "") - if dept_id: - children = await self._fetch_departments_simple(token, dept_id) - # Set parent_department_id explicitly (API may not return it) - for child in children: - if not child.get("parent_department_id"): - child["parent_department_id"] = dept_id - all_depts.extend(children) - + dept_id = dept.get("open_department_id") + if not dept_id: + continue + children = await self._fetch_feishu_departments_simple(token, dept_id) + for child in children: + child.setdefault("parent_department_id", dept_id) + all_depts.extend(children) return all_depts - async def _fetch_department_users(self, token: str, dept_id: str) -> list[dict]: - """Fetch all users in a department.""" - all_users = [] + async def _fetch_feishu_department_users(self, token: str, department_id: str) -> list[dict]: + users: list[dict] = [] page_token = "" while True: - async with httpx.AsyncClient() as client: - params = { - "department_id_type": "open_department_id", - "department_id": dept_id, - "page_size": "50", - } - if page_token: - params["page_token"] = page_token + async with httpx.AsyncClient(timeout=20) as client: resp = await client.get( FEISHU_USERS_URL, - params=params, + params={ + "department_id_type": "open_department_id", + "department_id": department_id, + "page_size": "50", + **({"page_token": page_token} if page_token else {}), + }, headers={"Authorization": f"Bearer {token}"}, ) - data = resp.json() - logger.info(f"[OrgSync] Users response (dept={dept_id}): code={data.get('code')}, items={len(data.get('data', {}).get('items', []))}") - - if data.get("code") != 0: - logger.error(f"[OrgSync] Users API error: {data}") - break + data = resp.json() + if data.get("code") != 0: + raise RuntimeError( + f"Feishu user API failed: code={data.get('code')} msg={data.get('msg')}" + ) + users.extend(data.get("data", {}).get("items", [])) + if not data.get("data", {}).get("has_more"): + break + page_token = data.get("data", {}).get("page_token", "") + return users + + async def _get_wecom_token(self, corp_id: str, corp_secret: str) -> tuple[str, dict]: + async with httpx.AsyncClient(timeout=20) as client: + resp = await client.get( + WECOM_TOKEN_URL, + params={ + "corpid": corp_id, + "corpsecret": corp_secret, + }, + ) + data = resp.json() + token = data.get("access_token", "") + return token, data - items = data.get("data", {}).get("items", []) - if items and not all_users: # Print first raw user for debugging - logger.info(f"[OrgSync] RAW first user item keys: {list(items[0].keys())}") - logger.info(f"[OrgSync] RAW first user item: {items[0]}") - all_users.extend(items) - if not data.get("data", {}).get("has_more"): + async def _fetch_wecom_departments(self, token: str) -> list[dict]: + async with httpx.AsyncClient(timeout=20) as client: + resp = await client.get( + WECOM_DEPARTMENTS_URL, + params={"access_token": token}, + ) + data = resp.json() + if data.get("errcode") != 0: + raise RuntimeError( + f"WeCom department API failed: errcode={data.get('errcode')} errmsg={data.get('errmsg')}" + ) + return data.get("department", []) + + async def _fetch_wecom_department_users( + self, + token: str, + department_id: str, + client: httpx.AsyncClient | None = None, + ) -> list[dict]: + owned_client = client is None + if owned_client: + client = httpx.AsyncClient(timeout=20) + assert client is not None + try: + for attempt in range(4): + resp = await client.get( + WECOM_USERS_URL, + params={ + "access_token": token, + "department_id": department_id, + "fetch_child": 0, + }, + ) + data = resp.json() + if data.get("errcode") != 45033: break - page_token = data["data"].get("page_token", "") - return all_users - - async def full_sync(self) -> dict: - """Run a full org sync from Feishu. Returns stats.""" - async with async_session() as db: - config = await self._get_feishu_config(db) - if not config: - return {"error": "未配置飞书组织架构同步信息"} - - app_id = config.get("app_id") - app_secret = config.get("app_secret") - if not app_id or not app_secret: - return {"error": "缺少 App ID 或 App Secret"} - - try: - token, token_resp = await self._get_app_token(app_id, app_secret) - if not token: - feishu_code = token_resp.get("code", "?") - feishu_msg = token_resp.get("msg", "unknown") - return {"error": f"获取飞书 token 失败 (code={feishu_code}: {feishu_msg})"} - logger.info(f"[OrgSync] Got token: {token[:20]}...") - except Exception as e: - return {"error": f"连接飞书失败: {str(e)[:100]}"} + await asyncio.sleep(0.5 * (attempt + 1)) + finally: + if owned_client: + await client.aclose() + data = resp.json() + if data.get("errcode") != 0: + raise RuntimeError( + f"WeCom user API failed: errcode={data.get('errcode')} errmsg={data.get('errmsg')}" + ) + return data.get("userlist", []) + + async def _fetch_wecom_users_by_department( + self, + token: str, + department_ids: list[str], + concurrency: int = 4, + ) -> dict[str, list[dict]]: + semaphore = asyncio.Semaphore(concurrency) + results: dict[str, list[dict]] = {} + + async with httpx.AsyncClient(timeout=20) as client: + async def fetch_one(department_id: str) -> None: + async with semaphore: + results[department_id] = await self._fetch_wecom_department_users( + token, + department_id, + client=client, + ) - now = datetime.now(timezone.utc) - dept_count = 0 - member_count = 0 - user_count = 0 + await asyncio.gather(*(fetch_one(department_id) for department_id in department_ids)) + return results - # Resolve tenant_id from the first admin user - admin_result = await db.execute( - select(User).where(User.role == "platform_admin").limit(1) + async def _resolve_tenant_id(self, db: AsyncSession) -> uuid.UUID | None: + admin_result = await db.execute( + select(User).where(User.role == "platform_admin").limit(1) + ) + admin_user = admin_result.scalar_one_or_none() + return admin_user.tenant_id if admin_user else None + + async def _upsert_feishu_departments( + self, db: AsyncSession, departments: list[dict], tenant_id: uuid.UUID | None, now: datetime + ) -> tuple[int, dict[str, OrgDepartment]]: + provider = "feishu" + count = 0 + for item in departments: + external_id = item.get("open_department_id") + if not external_id: + continue + result = await db.execute( + select(OrgDepartment).where( + OrgDepartment.sync_provider == provider, + OrgDepartment.feishu_id == external_id, + ) ) - admin_user = admin_result.scalar_one_or_none() - tenant_id = admin_user.tenant_id if admin_user else None - - # --- Sync departments --- - try: - depts = await self._fetch_departments(token, "0") - logger.info(f"[OrgSync] Total departments fetched: {len(depts)}") - - for d in depts: - feishu_id = d.get("open_department_id", "") - if not feishu_id: - continue + dept = result.scalar_one_or_none() + if dept: + dept.name = item.get("name", dept.name) + dept.member_count = item.get("member_count", 0) + dept.synced_at = now + dept.tenant_id = dept.tenant_id or tenant_id + else: + dept = OrgDepartment( + feishu_id=external_id, + name=item.get("name", ""), + member_count=item.get("member_count", 0), + path=item.get("name", ""), + tenant_id=tenant_id, + synced_at=now, + sync_provider=provider, + ) + db.add(dept) + count += 1 + + await db.flush() + result = await db.execute(select(OrgDepartment).where(OrgDepartment.sync_provider == provider)) + dept_map = {dept.feishu_id: dept for dept in result.scalars().all() if dept.feishu_id} + for item in departments: + external_id = item.get("open_department_id") + parent_external_id = item.get("parent_department_id") + if external_id in dept_map and parent_external_id and parent_external_id in dept_map: + dept_map[external_id].parent_id = dept_map[parent_external_id].id + self._rebuild_department_paths(dept_map) + await db.flush() + return count, dept_map + + async def _upsert_wecom_departments( + self, db: AsyncSession, departments: list[dict], tenant_id: uuid.UUID | None, now: datetime + ) -> tuple[int, dict[str, OrgDepartment]]: + provider = "wecom" + existing_result = await db.execute( + select(OrgDepartment).where(OrgDepartment.sync_provider == provider) + ) + existing_departments = { + dept.wecom_id: dept + for dept in existing_result.scalars().all() + if dept.wecom_id + } + count = 0 + for item in departments: + external_id = str(item.get("id", "")).strip() + if not external_id: + continue + dept = existing_departments.get(external_id) + if dept: + dept.name = item.get("name", dept.name) + dept.member_count = item.get("member_num", dept.member_count) + dept.synced_at = now + dept.tenant_id = dept.tenant_id or tenant_id + else: + dept = OrgDepartment( + wecom_id=external_id, + name=item.get("name", ""), + member_count=item.get("member_num", 0), + path=item.get("name", ""), + tenant_id=tenant_id, + synced_at=now, + sync_provider=provider, + ) + db.add(dept) + existing_departments[external_id] = dept + count += 1 + + await db.flush() + dept_map = { + dept.wecom_id: dept + for dept in existing_departments.values() + if dept.wecom_id + } + for item in departments: + external_id = str(item.get("id", "")).strip() + parent_external_id = str(item.get("parentid", "")).strip() + if external_id in dept_map and parent_external_id and parent_external_id in dept_map: + dept_map[external_id].parent_id = dept_map[parent_external_id].id + self._rebuild_department_paths(dept_map) + await db.flush() + return count, dept_map + + def _rebuild_department_paths(self, dept_map: dict[str, OrgDepartment]) -> None: + by_db_id = {dept.id: dept for dept in dept_map.values()} + + def build_path(dept: OrgDepartment) -> str: + names = [dept.name] + current = dept + seen: set[uuid.UUID] = set() + while current.parent_id and current.parent_id not in seen and current.parent_id in by_db_id: + seen.add(current.parent_id) + current = by_db_id[current.parent_id] + names.append(current.name) + names.reverse() + return " / ".join([name for name in names if name]) + + for dept in dept_map.values(): + dept.path = build_path(dept) + + async def _upsert_feishu_members( + self, + db: AsyncSession, + token: str, + tenant_id: uuid.UUID | None, + departments: dict[str, OrgDepartment], + now: datetime, + ) -> tuple[int, int]: + provider = "feishu" + member_count = 0 + user_count = 0 + for external_dept_id, dept in departments.items(): + users = await self._fetch_feishu_department_users(token, external_dept_id) + dept.member_count = len(users) + for item in users: + open_id = item.get("open_id", "") + user_id = item.get("user_id", "") + if not open_id and not user_id: + continue + result = None + if user_id: result = await db.execute( - select(OrgDepartment).where(OrgDepartment.feishu_id == feishu_id) + select(OrgMember).where( + OrgMember.sync_provider == provider, + OrgMember.feishu_user_id == user_id, + ) ) - dept = result.scalar_one_or_none() - if dept: - dept.name = d.get("name", dept.name) - dept.member_count = d.get("member_count", 0) - dept.synced_at = now - else: - dept = OrgDepartment( - feishu_id=feishu_id, - name=d.get("name", ""), - member_count=d.get("member_count", 0), - path=d.get("name", ""), - tenant_id=tenant_id, - synced_at=now, + member = result.scalar_one_or_none() + else: + member = None + if not member and open_id: + result = await db.execute( + select(OrgMember).where( + OrgMember.sync_provider == provider, + OrgMember.feishu_open_id == open_id, ) - db.add(dept) - dept_count += 1 - - await db.flush() - - # Build feishu_id -> db_id + parent mapping - dept_map = {} - all_result = await db.execute(select(OrgDepartment)) - for dept in all_result.scalars().all(): - if dept.feishu_id: - dept_map[dept.feishu_id] = dept - - # Set parent_id from Feishu parent info - for d in depts: - feishu_id = d.get("open_department_id", "") - parent_feishu_id = d.get("parent_department_id", "") - if feishu_id in dept_map and parent_feishu_id and parent_feishu_id in dept_map: - dept_map[feishu_id].parent_id = dept_map[parent_feishu_id].id - - await db.flush() - except Exception as e: - import traceback - traceback.print_exc() - logger.error(f"[OrgSync] Department sync failed: {e}") - return {"error": f"部门同步失败: {str(e)[:200]}"} - - # --- Sync members --- - try: - all_dept_result = await db.execute(select(OrgDepartment)) - departments = all_dept_result.scalars().all() - - for dept in departments: - if not dept.feishu_id: - continue - users = await self._fetch_department_users(token, dept.feishu_id) - if users: - logger.info(f"[OrgSync] dept={dept.name} got {len(users)} users, first user keys={list(users[0].keys())}, open_id={users[0].get('open_id','')!r}, user_id={users[0].get('user_id','')!r}") - for u in users: - open_id = u.get("open_id", "") - user_id = u.get("user_id", "") - if not open_id and not user_id: - logger.warning(f"[OrgSync] Skipping user with no open_id and no user_id: {u.get('name','?')}") - continue - if not user_id: - logger.warning(f"[OrgSync] User {u.get('name','?')} has no user_id — App may lack contact:user.employee_id:readonly permission") - - # Find existing member: prefer user_id (tenant-stable), fallback open_id - member = None - if user_id: - result = await db.execute( - select(OrgMember).where(OrgMember.feishu_user_id == user_id) - ) - member = result.scalar_one_or_none() - if not member and open_id: - result = await db.execute( - select(OrgMember).where(OrgMember.feishu_open_id == open_id) - ) - member = result.scalar_one_or_none() - - if member: - member.name = u.get("name", member.name) - member.email = u.get("email", member.email) - member.avatar_url = u.get("avatar", {}).get("avatar_origin", member.avatar_url) - member.title = (u.get("job_title") or u.get("description") or member.title or "")[:200] - member.department_id = dept.id - member.department_path = dept.path or dept.name - member.phone = u.get("mobile", member.phone) - # Always update IDs to latest values - if open_id: - member.feishu_open_id = open_id - if user_id: - member.feishu_user_id = user_id - member.synced_at = now - else: - member = OrgMember( - feishu_open_id=open_id or None, - feishu_user_id=user_id or None, - name=u.get("name", ""), - email=u.get("email", ""), - avatar_url=u.get("avatar", {}).get("avatar_origin", ""), - title=(u.get("job_title") or u.get("description") or "")[:200], - department_id=dept.id, - department_path=dept.path or dept.name, - phone=u.get("mobile", ""), - tenant_id=tenant_id, - synced_at=now, - ) - db.add(member) - # Ensure tenant_id is set on existing members - if member.tenant_id is None and tenant_id: - member.tenant_id = tenant_id - member_count += 1 - - # --- Auto-create/update platform User --- - # Prefer user_id (tenant-stable), then open_id, then email - platform_user = None - if user_id: - pu_result = await db.execute( - select(User).where(User.feishu_user_id == user_id) - ) - platform_user = pu_result.scalar_one_or_none() - if not platform_user and open_id: - pu_result = await db.execute( - select(User).where(User.feishu_open_id == open_id) - ) - platform_user = pu_result.scalar_one_or_none() - # Fallback: match by real email (most reliable cross-app identifier) - member_email = u.get("email", "") - if not platform_user and member_email and "@" in member_email and not member_email.endswith("@feishu.local"): - pu_result = await db.execute( - select(User).where(User.email == member_email) - ) - platform_user = pu_result.scalar_one_or_none() - - member_name = u.get("name", "") - if platform_user: - # Update existing user info - platform_user.display_name = member_name or platform_user.display_name - # Always update feishu IDs to track the current app's values - if open_id: - platform_user.feishu_open_id = open_id - if user_id: - platform_user.feishu_user_id = user_id - if tenant_id and not platform_user.tenant_id: - platform_user.tenant_id = tenant_id - else: - # Create new user — prefer user_id in username - username_base = f"feishu_{user_id or (open_id[:16] if open_id else uuid.uuid4().hex[:8])}" - email = member_email or f"{username_base}@feishu.local" - platform_user = User( - username=username_base, - email=email, - password_hash=hash_password(uuid.uuid4().hex), - display_name=member_name, - role="member", - feishu_open_id=open_id or None, - feishu_user_id=user_id or None, - tenant_id=tenant_id, - ) - db.add(platform_user) - user_count += 1 - except Exception as e: - import traceback - traceback.print_exc() - logger.error(f"[OrgSync] Member sync failed: {e}") - return {"error": f"成员同步失败: {str(e)[:200]}", "departments": dept_count} - - # Update last sync time - result = await db.execute( - select(SystemSetting).where(SystemSetting.key == "feishu_org_sync") + ) + member = result.scalar_one_or_none() + + avatar = item.get("avatar", {}) if isinstance(item.get("avatar"), dict) else {} + member_email = item.get("email", "") + if member: + member.name = item.get("name", member.name) + member.email = member_email or member.email + member.avatar_url = avatar.get("avatar_origin", member.avatar_url) + member.title = (item.get("job_title") or item.get("description") or member.title or "")[:200] + member.department_id = dept.id + member.department_path = dept.path or dept.name + member.phone = item.get("mobile", member.phone) + member.synced_at = now + if open_id: + member.feishu_open_id = open_id + if user_id: + member.feishu_user_id = user_id + else: + member = OrgMember( + feishu_open_id=open_id or None, + feishu_user_id=user_id or None, + name=item.get("name", ""), + email=member_email or None, + avatar_url=avatar.get("avatar_origin", ""), + title=(item.get("job_title") or item.get("description") or "")[:200], + department_id=dept.id, + department_path=dept.path or dept.name, + phone=item.get("mobile", ""), + tenant_id=tenant_id, + synced_at=now, + sync_provider=provider, + ) + db.add(member) + if member.tenant_id is None and tenant_id: + member.tenant_id = tenant_id + member_count += 1 + + created = await self._upsert_platform_user_for_feishu(db, item, tenant_id, user_id, open_id) + user_count += int(created) + return member_count, user_count + + async def _upsert_wecom_members( + self, + db: AsyncSession, + token: str, + tenant_id: uuid.UUID | None, + departments: dict[str, OrgDepartment], + now: datetime, + ) -> tuple[int, int]: + provider = "wecom" + users_by_department = await self._fetch_wecom_users_by_department( + token, + list(departments.keys()), + ) + + existing_members_result = await db.execute( + select(OrgMember).where(OrgMember.sync_provider == provider) + ) + existing_members = { + member.wecom_user_id: member + for member in existing_members_result.scalars().all() + if member.wecom_user_id + } + + all_items = [ + item + for users in users_by_department.values() + for item in users + if str(item.get("userid", "")).strip() + ] + emails = { + str(item.get("email", "")).strip() + for item in all_items + if "@" in str(item.get("email", "")).strip() + } + usernames = { + f"wecom_{str(item.get('userid', '')).strip()}" + for item in all_items + if str(item.get("userid", "")).strip() + } + + existing_users_by_email: dict[str, User] = {} + if emails: + users_result = await db.execute(select(User).where(User.email.in_(sorted(emails)))) + existing_users_by_email = { + user.email: user + for user in users_result.scalars().all() + if user.email + } + + existing_users_by_username: dict[str, User] = {} + if usernames: + users_result = await db.execute(select(User).where(User.username.in_(sorted(usernames)))) + existing_users_by_username = { + user.username: user + for user in users_result.scalars().all() + } + + member_count = 0 + user_count = 0 + for external_dept_id, dept in departments.items(): + users = users_by_department.get(external_dept_id, []) + dept.member_count = len(users) + for item in users: + user_id = str(item.get("userid", "")).strip() + if not user_id: + continue + member = existing_members.get(user_id) + if member: + member.name = item.get("name", member.name) + member.email = item.get("email") or member.email + member.avatar_url = item.get("avatar") or member.avatar_url + member.title = (item.get("position") or member.title or "")[:200] + member.department_id = dept.id + member.department_path = dept.path or dept.name + member.phone = item.get("mobile") or member.phone + member.synced_at = now + else: + member = OrgMember( + wecom_user_id=user_id, + name=item.get("name", ""), + email=item.get("email") or None, + avatar_url=item.get("avatar") or "", + title=(item.get("position") or "")[:200], + department_id=dept.id, + department_path=dept.path or dept.name, + phone=item.get("mobile") or "", + tenant_id=tenant_id, + synced_at=now, + sync_provider=provider, + ) + db.add(member) + existing_members[user_id] = member + if member.tenant_id is None and tenant_id: + member.tenant_id = tenant_id + member_count += 1 + + created = self._upsert_platform_user_for_wecom( + db, + item, + tenant_id, + user_id, + existing_users_by_email, + existing_users_by_username, + ) + user_count += int(created) + return member_count, user_count + + async def _upsert_platform_user_for_feishu( + self, + db: AsyncSession, + item: dict, + tenant_id: uuid.UUID | None, + user_id: str, + open_id: str, + ) -> bool: + platform_user = None + if user_id: + result = await db.execute(select(User).where(User.feishu_user_id == user_id)) + platform_user = result.scalar_one_or_none() + if not platform_user and open_id: + result = await db.execute(select(User).where(User.feishu_open_id == open_id)) + platform_user = result.scalar_one_or_none() + email = item.get("email", "") + if not platform_user and email and "@" in email and not email.endswith("@feishu.local"): + result = await db.execute(select(User).where(User.email == email)) + platform_user = result.scalar_one_or_none() + + if platform_user: + platform_user.display_name = item.get("name", platform_user.display_name) + if open_id: + platform_user.feishu_open_id = open_id + if user_id: + platform_user.feishu_user_id = user_id + if tenant_id and not platform_user.tenant_id: + platform_user.tenant_id = tenant_id + return False + + username_base = f"feishu_{user_id or (open_id[:16] if open_id else uuid.uuid4().hex[:8])}" + db.add( + User( + username=username_base, + email=email or f"{username_base}@feishu.local", + password_hash=hash_password(uuid.uuid4().hex), + display_name=item.get("name", username_base), + role="member", + feishu_open_id=open_id or None, + feishu_user_id=user_id or None, + tenant_id=tenant_id, ) - setting = result.scalar_one_or_none() - if setting: - setting.value = {**setting.value, "last_synced_at": now.isoformat()} + ) + return True + + def _upsert_platform_user_for_wecom( + self, + db: AsyncSession, + item: dict, + tenant_id: uuid.UUID | None, + user_id: str, + existing_users_by_email: dict[str, User], + existing_users_by_username: dict[str, User], + ) -> bool: + email = str(item.get("email", "")).strip() + platform_user = None + if email and "@" in email: + platform_user = existing_users_by_email.get(email) + username_base = f"wecom_{user_id}" + if not platform_user: + platform_user = existing_users_by_username.get(username_base) + + if platform_user: + platform_user.display_name = item.get("name", platform_user.display_name) + platform_user.title = item.get("position") or platform_user.title + if tenant_id and not platform_user.tenant_id: + platform_user.tenant_id = tenant_id + return False + + new_user = User( + username=username_base, + email=email or f"{username_base}@wecom.local", + password_hash=hash_password(uuid.uuid4().hex), + display_name=item.get("name", username_base), + title=item.get("position") or None, + role="member", + tenant_id=tenant_id, + ) + db.add(new_user) + existing_users_by_username[new_user.username] = new_user + if new_user.email: + existing_users_by_email[new_user.email] = new_user + return True + + async def full_sync(self) -> dict: + """Run a full sync for the currently configured provider.""" + async with async_session() as db: + provider, provider_config, setting = await self.get_active_provider(db) + now = datetime.now(timezone.utc) + tenant_id = await self._resolve_tenant_id(db) + + if provider == "feishu": + app_id = provider_config.get("app_id", "").strip() + app_secret = provider_config.get("app_secret", "").strip() + if not app_id or not app_secret: + return {"error": "缺少飞书 App ID 或 App Secret", "provider": provider} + try: + token, token_resp = await self._get_feishu_token(app_id, app_secret) + except Exception as exc: + return {"error": f"连接飞书失败: {str(exc)[:100]}", "provider": provider} + if not token: + return { + "error": f"获取飞书 token 失败 (code={token_resp.get('code')}: {token_resp.get('msg')})", + "provider": provider, + } + + try: + raw_departments = await self._fetch_feishu_departments(token) + department_count, dept_map = await self._upsert_feishu_departments( + db, raw_departments, tenant_id, now + ) + member_count, user_count = await self._upsert_feishu_members( + db, token, tenant_id, dept_map, now + ) + except Exception as exc: + logger.exception("[OrgSync] Feishu sync failed") + return {"error": str(exc)[:200], "provider": provider} + else: + corp_id = provider_config.get("corp_id", "").strip() + corp_secret = provider_config.get("corp_secret", "").strip() + if not corp_id or not corp_secret: + return {"error": "缺少企微 Corp ID 或 Corp Secret", "provider": provider} + try: + token, token_resp = await self._get_wecom_token(corp_id, corp_secret) + except Exception as exc: + return {"error": f"连接企微失败: {str(exc)[:100]}", "provider": provider} + if not token: + return { + "error": f"获取企微 token 失败 (errcode={token_resp.get('errcode')}: {token_resp.get('errmsg')})", + "provider": provider, + } + + try: + raw_departments = await self._fetch_wecom_departments(token) + department_count, dept_map = await self._upsert_wecom_departments( + db, raw_departments, tenant_id, now + ) + member_count, user_count = await self._upsert_wecom_members( + db, token, tenant_id, dept_map, now + ) + except Exception as exc: + logger.exception("[OrgSync] WeCom sync failed") + return {"error": str(exc)[:200], "provider": provider} + await self._set_last_synced_at(db, setting, provider, now.isoformat()) await db.commit() - stats = {"departments": dept_count, "members": member_count, "users_created": user_count, "synced_at": now.isoformat()} + stats = { + "provider": provider, + "departments": department_count, + "members": member_count, + "users_created": user_count, + "synced_at": now.isoformat(), + } logger.info(f"[OrgSync] Complete: {stats}") return stats diff --git a/backend/tests/test_password_reset_and_notifications.py b/backend/tests/test_password_reset_and_notifications.py index ddc457ad..972c63e5 100644 --- a/backend/tests/test_password_reset_and_notifications.py +++ b/backend/tests/test_password_reset_and_notifications.py @@ -7,12 +7,14 @@ from starlette.background import BackgroundTasks from app.api import auth as auth_api +from app.api import enterprise as enterprise_api from app.api.notification import BroadcastRequest, broadcast_notification from app.core.security import verify_password +from app.models.org import OrgDepartment, OrgMember from app.models.password_reset_token import PasswordResetToken from app.models.user import User from app.schemas.schemas import ForgotPasswordRequest, ResetPasswordRequest -from app.services import password_reset_service +from app.services import org_sync_service, password_reset_service from app.services.system_email_service import SystemEmailConfigError @@ -278,8 +280,159 @@ async def fake_send_notification(*_args, **kwargs): assert response["ok"] is True assert response["emails_sent"] == 1 assert db.committed is True - assert len(notifications) == 1 - assert len(background_tasks.tasks) == 1 + + +@pytest.mark.asyncio +async def test_org_sync_public_config_falls_back_to_legacy_feishu_setting(): + legacy_setting = SimpleNamespace( + value={ + "app_id": "cli_123", + "app_secret": "legacy-secret", + "last_synced_at": "2026-03-24T10:00:00+00:00", + } + ) + db = RecordingDB([DummyResult(None), DummyResult(legacy_setting)]) + + value = await org_sync_service.org_sync_service.get_public_config(db) + + assert value["provider"] == "feishu" + assert value["feishu"]["app_id"] == "cli_123" + assert value["feishu"]["app_secret"] == "" + assert value["feishu"]["last_synced_at"] == "2026-03-24T10:00:00+00:00" + assert value["wecom"]["corp_id"] == "" + + +@pytest.mark.asyncio +async def test_org_sync_public_config_redacts_provider_secrets(): + stored_setting = SimpleNamespace( + value={ + "provider": "wecom", + "feishu": {"app_id": "cli_123", "app_secret": "keep-feishu", "last_synced_at": None}, + "wecom": {"corp_id": "ww123", "corp_secret": "keep-wecom", "last_synced_at": None}, + } + ) + db = RecordingDB([DummyResult(stored_setting)]) + + value = await org_sync_service.org_sync_service.get_public_config(db) + + assert value["feishu"]["app_secret"] == "" + assert value["wecom"]["corp_secret"] == "" + + +@pytest.mark.asyncio +async def test_org_sync_save_config_preserves_existing_provider_secrets(): + existing_setting = SimpleNamespace( + key="org_sync", + value={ + "provider": "wecom", + "feishu": {"app_id": "cli_123", "app_secret": "keep-feishu", "last_synced_at": None}, + "wecom": {"corp_id": "ww123", "corp_secret": "keep-wecom", "last_synced_at": None}, + }, + ) + db = RecordingDB([DummyResult(existing_setting), DummyResult(existing_setting)]) + + saved = await org_sync_service.org_sync_service.save_config( + db, + { + "provider": "wecom", + "feishu": {"app_id": "cli_456", "app_secret": ""}, + "wecom": {"corp_id": "ww456", "corp_secret": ""}, + }, + ) + + assert saved["feishu"]["app_secret"] == "keep-feishu" + assert saved["wecom"]["corp_secret"] == "keep-wecom" + assert existing_setting.value["feishu"]["app_id"] == "cli_456" + assert existing_setting.value["wecom"]["corp_id"] == "ww456" + assert db.committed is True + + +@pytest.mark.asyncio +async def test_get_org_sync_setting_requires_admin(): + with pytest.raises(HTTPException) as excinfo: + await enterprise_api.get_system_setting( + key="org_sync", + current_user=make_user(role="member"), + db=RecordingDB(), + ) + + assert excinfo.value.status_code == 403 + assert excinfo.value.detail == "Admin access required" + + +@pytest.mark.asyncio +async def test_list_org_departments_filters_by_active_provider(monkeypatch): + wecom_dept = OrgDepartment( + wecom_id="2", + sync_provider="wecom", + name="Engineering", + member_count=3, + ) + db = RecordingDB([DummyResult(values=[wecom_dept])]) + + async def fake_get_active_provider(_db): + return "wecom", {"corp_id": "ww123"}, {"provider": "wecom"} + + monkeypatch.setattr(org_sync_service.org_sync_service, "get_active_provider", fake_get_active_provider) + + rows = await enterprise_api.list_org_departments( + tenant_id=None, + current_user=make_user(), + db=db, + ) + + assert rows == [ + { + "id": str(wecom_dept.id), + "provider": "wecom", + "feishu_id": None, + "wecom_id": "2", + "name": "Engineering", + "parent_id": None, + "path": None, + "member_count": 3, + } + ] + assert "sync_provider" in str(db.executed[0]) + + +@pytest.mark.asyncio +async def test_list_org_members_filters_by_active_provider(monkeypatch): + wecom_member = OrgMember( + wecom_user_id="zhangsan", + sync_provider="wecom", + name="张三", + email="zhangsan@example.com", + title="Engineer", + department_path="Root / Engineering", + ) + db = RecordingDB([DummyResult(values=[wecom_member])]) + + async def fake_get_active_provider(_db): + return "wecom", {"corp_id": "ww123"}, {"provider": "wecom"} + + monkeypatch.setattr(org_sync_service.org_sync_service, "get_active_provider", fake_get_active_provider) + + rows = await enterprise_api.list_org_members( + department_id=None, + search=None, + tenant_id=None, + current_user=make_user(), + db=db, + ) + + assert rows == [ + { + "id": str(wecom_member.id), + "provider": "wecom", + "name": "张三", + "email": "zhangsan@example.com", + "title": "Engineer", + "department_path": "Root / Engineering", + "avatar_url": None, + } + ] + assert "sync_provider" in str(db.executed[0]) @pytest.mark.asyncio diff --git a/frontend/src/i18n/en.json b/frontend/src/i18n/en.json index ece9aa2e..af15c99f 100644 --- a/frontend/src/i18n/en.json +++ b/frontend/src/i18n/en.json @@ -752,16 +752,34 @@ "edit": "Edit" }, "org": { + "directorySync": "Directory Sync Configuration", "feishuSync": "Feishu Directory Sync", + "feishuDescription": "Use the Feishu contact API to sync departments and members into the platform.", + "wecomDescription": "Use the WeCom contact API to sync departments and members into the platform.", + "provider": "Sync Provider", + "providerFeishu": "Feishu", + "providerWecom": "WeCom", "appId": "App ID", "appSecret": "App Secret", + "corpId": "Corp ID", + "corpSecret": "Corp Secret", "syncing": "Syncing...", "syncNow": "Sync Now", + "showConfig": "Show config", + "hideConfig": "Hide config", "syncComplete": "Sync complete: {{departments}} departments, {{members}} members", + "lastSync": "Last sync: {{time}}", "orgBrowser": "Organization Browser", "allDepartments": "All", + "selectedDepartment": "Selected department: {{name}}", "searchMembers": "Search members...", "member": "Member", + "memberDetails": "Member Details", + "selectMemberHint": "Select a member to view their details here.", + "departmentLabel": "Department", + "emailLabel": "Email", + "titleLabel": "Title", + "noTitle": "No title", "admin": "Admin", "noMembers": "No members" }, diff --git a/frontend/src/i18n/zh.json b/frontend/src/i18n/zh.json index 9e8e2c77..142e45a8 100644 --- a/frontend/src/i18n/zh.json +++ b/frontend/src/i18n/zh.json @@ -822,16 +822,34 @@ "edit": "编辑" }, "org": { + "directorySync": "组织通讯录同步配置", "feishuSync": "飞书通讯录同步配置", + "feishuDescription": "使用飞书开放平台通讯录 API 将部门和成员同步到平台。", + "wecomDescription": "使用企业微信通讯录 API 将部门和成员同步到平台。", + "provider": "同步来源", + "providerFeishu": "飞书", + "providerWecom": "企业微信", "appId": "App ID", "appSecret": "App Secret", + "corpId": "Corp ID", + "corpSecret": "Corp Secret", "syncing": "同步中...", "syncNow": "同步", + "showConfig": "展开配置", + "hideConfig": "收起配置", "syncComplete": "同步完成:{{departments}} 个部门,{{members}} 个成员", + "lastSync": "上次同步:{{time}}", "orgBrowser": "组织架构浏览", "allDepartments": "全部", + "selectedDepartment": "当前部门:{{name}}", "searchMembers": "搜索成员...", "member": "成员", + "memberDetails": "成员详情", + "selectMemberHint": "点击左侧成员查看详情卡片。", + "departmentLabel": "部门", + "emailLabel": "邮箱", + "titleLabel": "职位", + "noTitle": "未设置职位", "admin": "管理员", "noMembers": "暂无成员" }, diff --git a/frontend/src/pages/EnterpriseSettings.tsx b/frontend/src/pages/EnterpriseSettings.tsx index 64300115..a4c6a7ce 100644 --- a/frontend/src/pages/EnterpriseSettings.tsx +++ b/frontend/src/pages/EnterpriseSettings.tsx @@ -59,35 +59,96 @@ const FALLBACK_LLM_PROVIDERS: LLMProviderSpec[] = [ // ─── Department Tree ─────────────────────────────── -function DeptTree({ departments, parentId, selectedDept, onSelect, level }: { - departments: any[]; parentId: string | null; selectedDept: string | null; - onSelect: (id: string | null) => void; level: number; +interface OrgDepartmentNode { + id: string; + provider: string; + feishu_id?: string | null; + wecom_id?: string | null; + name: string; + parent_id: string | null; + path: string; + member_count: number; +} + +interface OrgMemberEntry { + id: string; + provider: string; + name: string; + email: string; + title: string; + department_path: string; + avatar_url?: string | null; +} + +function DeptTree({ childrenByParent, parentId, selectedDept, expandedDeptIds, onSelect, onToggle, level }: { + childrenByParent: Map; + parentId: string | null; + selectedDept: string | null; + expandedDeptIds: Record; + onSelect: (id: string | null) => void; + onToggle: (id: string) => void; + level: number; }) { - const children = departments.filter((d: any) => - parentId === null ? !d.parent_id : d.parent_id === parentId - ); + const children = childrenByParent.get(parentId) || []; if (children.length === 0) return null; return ( <> - {children.map((d: any) => ( + {children.map((d) => { + const hasChildren = (childrenByParent.get(d.id) || []).length > 0; + const isExpanded = !!expandedDeptIds[d.id]; + return (
onSelect(d.id)} > - - {departments.some((c: any) => c.parent_id === d.id) ? '▸' : '·'} - - {d.name} - {d.member_count > 0 && ({d.member_count})} + + {d.name} + {d.member_count > 0 && ({d.member_count})}
- + {(!hasChildren || isExpanded) && ( + + )}
- ))} + ); + })} ); } @@ -96,29 +157,45 @@ function DeptTree({ departments, parentId, selectedDept, onSelect, level }: { function OrgTab() { const { t } = useTranslation(); const qc = useQueryClient(); - const [syncForm, setSyncForm] = useState({ app_id: '', app_secret: '' }); + const [syncForm, setSyncForm] = useState({ + provider: 'feishu', + feishu: { app_id: '', app_secret: '' }, + wecom: { corp_id: '', corp_secret: '' }, + }); const [syncing, setSyncing] = useState(false); const [syncResult, setSyncResult] = useState(null); const [memberSearch, setMemberSearch] = useState(''); const [selectedDept, setSelectedDept] = useState(null); + const [selectedMemberId, setSelectedMemberId] = useState(null); + const [expandedDeptIds, setExpandedDeptIds] = useState>({}); + const [showSyncConfig, setShowSyncConfig] = useState(true); const { data: config } = useQuery({ - queryKey: ['system-settings', 'feishu_org_sync'], - queryFn: () => fetchJson('/enterprise/system-settings/feishu_org_sync'), + queryKey: ['system-settings', 'org_sync'], + queryFn: () => fetchJson('/enterprise/system-settings/org_sync'), }); useEffect(() => { - if (config?.value?.app_id) { - setSyncForm({ app_id: config.value.app_id, app_secret: '' }); - } + if (!config?.value) return; + setSyncForm({ + provider: config.value.provider || 'feishu', + feishu: { + app_id: config.value.feishu?.app_id || '', + app_secret: '', + }, + wecom: { + corp_id: config.value.wecom?.corp_id || '', + corp_secret: '', + }, + }); }, [config]); const currentTenantId = localStorage.getItem('current_tenant_id') || ''; - const { data: departments = [] } = useQuery({ + const { data: departments = [] } = useQuery({ queryKey: ['org-departments', currentTenantId], queryFn: () => fetchJson(`/enterprise/org/departments${currentTenantId ? `?tenant_id=${currentTenantId}` : ''}`), }); - const { data: members = [] } = useQuery({ + const { data: members = [] } = useQuery({ queryKey: ['org-members', selectedDept, memberSearch, currentTenantId], queryFn: () => { const params = new URLSearchParams(); @@ -129,54 +206,183 @@ function OrgTab() { }, }); + const childrenByParent = useMemo(() => { + const tree = new Map(); + for (const department of departments) { + const key = department.parent_id || null; + const siblings = tree.get(key) || []; + siblings.push(department); + tree.set(key, siblings); + } + for (const siblings of tree.values()) { + siblings.sort((a, b) => a.name.localeCompare(b.name)); + } + return tree; + }, [departments]); + + const selectedDepartment = useMemo( + () => departments.find((department) => department.id === selectedDept) || null, + [departments, selectedDept], + ); + + const selectedMember = useMemo( + () => members.find((member) => member.id === selectedMemberId) || null, + [members, selectedMemberId], + ); + + useEffect(() => { + if (departments.length === 0) return; + setExpandedDeptIds((prev) => { + const next = { ...prev }; + let changed = false; + for (const department of departments) { + if (!department.parent_id && next[department.id] === undefined) { + next[department.id] = true; + changed = true; + } + } + return changed ? next : prev; + }); + }, [departments]); + + useEffect(() => { + if (members.length === 0) { + setSelectedMemberId(null); + return; + } + if (!selectedMemberId || !members.some((member) => member.id === selectedMemberId)) { + setSelectedMemberId(members[0].id); + } + }, [members, selectedMemberId]); + const saveConfig = async () => { - await fetchJson('/enterprise/system-settings/feishu_org_sync', { + await fetchJson('/enterprise/system-settings/org_sync', { method: 'PUT', - body: JSON.stringify({ value: { app_id: syncForm.app_id, app_secret: syncForm.app_secret } }), + body: JSON.stringify({ + value: { + provider: syncForm.provider, + feishu: syncForm.feishu, + wecom: syncForm.wecom, + }, + }), }); - qc.invalidateQueries({ queryKey: ['system-settings', 'feishu_org_sync'] }); + qc.invalidateQueries({ queryKey: ['system-settings', 'org_sync'] }); }; const triggerSync = async () => { setSyncing(true); setSyncResult(null); try { - if (syncForm.app_secret) await saveConfig(); + await saveConfig(); const result = await fetchJson('/enterprise/org/sync', { method: 'POST' }); setSyncResult(result); qc.invalidateQueries({ queryKey: ['org-departments'] }); qc.invalidateQueries({ queryKey: ['org-members'] }); + qc.invalidateQueries({ queryKey: ['system-settings', 'org_sync'] }); } catch (e: any) { setSyncResult({ error: e.message }); } setSyncing(false); }; + const activeProvider = syncForm.provider; + const activeLastSyncedAt = activeProvider === 'wecom' + ? config?.value?.wecom?.last_synced_at + : config?.value?.feishu?.last_synced_at; + const activeIdentifier = activeProvider === 'wecom' + ? syncForm.wecom.corp_id + : syncForm.feishu.app_id; + + const toggleDepartment = (id: string) => { + setExpandedDeptIds((prev) => ({ ...prev, [id]: !prev[id] })); + }; + return (
{/* Sync Config */}
-

{t('enterprise.org.feishuSync')}

-

- {t('enterprise.org.feishuSync')} -

-
-
- - setSyncForm({ ...syncForm, app_id: e.target.value })} placeholder="cli_xxxxxxxx" /> -
-
- - setSyncForm({ ...syncForm, app_secret: e.target.value })} placeholder={config?.value?.app_id ? '' : ''} /> +
+
+

{t('enterprise.org.directorySync')}

+
+ {activeProvider === 'wecom' ? t('enterprise.org.providerWecom') : t('enterprise.org.providerFeishu')} + {activeLastSyncedAt && {t('enterprise.org.lastSync', { time: new Date(activeLastSyncedAt).toLocaleString() })}} +
+
-
- - {config?.value?.last_synced_at && ( + {!showSyncConfig && ( - Last sync: {new Date(config.value.last_synced_at).toLocaleString()} + {activeProvider === 'wecom' ? syncForm.wecom.corp_id : syncForm.feishu.app_id} )}
@@ -194,33 +400,106 @@ function OrgTab() {
{t('enterprise.org.allDepartments')}
setSelectedDept(null)} > {t('common.all')}
- + {departments.length === 0 &&
{t('common.noData')}
}
-
- setMemberSearch(e.target.value)} style={{ marginBottom: '12px', fontSize: '13px' }} /> -
- {members.map((m: any) => ( -
-
- {m.name?.[0] || '?'} -
-
-
{m.name}
-
- {m.title || '-'} · {m.department_path || '-'} - {m.email && ` · ${m.email}`} +
+
+ setMemberSearch(e.target.value)} style={{ flex: 1, minWidth: '220px', fontSize: '13px' }} /> +
+ {selectedDepartment ? t('enterprise.org.selectedDepartment', { name: selectedDepartment.name }) : t('enterprise.org.allDepartments')} +
+
+
+
+ {members.map((m) => ( +
+
+
{m.name}
+
+ {m.title || t('enterprise.org.noTitle')} +
+
+ {m.department_path || '-'} +
+
+ + ))} + {members.length === 0 &&
{t('enterprise.org.noMembers')}
} +
+
+
+ {t('enterprise.org.memberDetails')}
- ))} - {members.length === 0 &&
{t('enterprise.org.noMembers')}
} + {selectedMember ? ( + <> +
+
+ {selectedMember.name?.[0] || '?'} +
+
+
{selectedMember.name}
+
{selectedMember.title || t('enterprise.org.noTitle')}
+
+
+
+
+
{t('enterprise.org.departmentLabel')}
+
{selectedMember.department_path || '-'}
+
+
+
{t('enterprise.org.emailLabel')}
+
{selectedMember.email || '-'}
+
+
+
{t('enterprise.org.titleLabel')}
+
{selectedMember.title || t('enterprise.org.noTitle')}
+
+
+
{t('enterprise.org.provider')}
+
{selectedMember.provider === 'wecom' ? t('enterprise.org.providerWecom') : t('enterprise.org.providerFeishu')}
+
+
+ + ) : ( +
+ {t('enterprise.org.selectMemberHint')} +
+ )} +
From 36297d90918162512e7c60674e189ae5efdf7dd3 Mon Sep 17 00:00:00 2001 From: Atlas Date: Thu, 26 Mar 2026 13:18:39 +0800 Subject: [PATCH 5/5] Improve production readiness of password reset email delivery Address PR #178 review follow-ups by making system SMTP timeout configurable and clarifying PUBLIC_BASE_URL expectations for production reset links.\n\nConstraint: Keep SMTP env-driven and avoid introducing queue infrastructure or behavior changes to auth responses\nRejected: Leaving SMTP timeout hardcoded at 15s | blocks operator tuning for slow providers\nConfidence: high\nScope-risk: narrow\nReversibility: clean\nDirective: Keep reset-link generation on PUBLIC_BASE_URL/public_base_url and avoid localhost values in production\nTested: backend/.venv/bin/python -m pytest tests/test_password_reset_and_notifications.py\nTested: backend/.venv/bin/python -m ruff check app/config.py app/services/system_email_service.py app/services/password_reset_service.py tests/test_password_reset_and_notifications.py\nNot-tested: live SMTP provider behavior under high latency --- .env.example | 4 +- README.md | 2 + backend/app/config.py | 1 + .../app/services/password_reset_service.py | 5 +- backend/app/services/system_email_service.py | 13 ++- .../test_password_reset_and_notifications.py | 92 ++++++++++++++++++- 6 files changed, 112 insertions(+), 5 deletions(-) diff --git a/.env.example b/.env.example index 029cca14..4bd17f0a 100644 --- a/.env.example +++ b/.env.example @@ -25,7 +25,8 @@ FEISHU_REDIRECT_URI=http://localhost:3000/auth/feishu/callback # Without a key, the tools still work but with lower rate limits JINA_API_KEY= -# Public app URL used in user-facing links, such as password reset emails +# Public app URL used in user-facing links, such as password reset emails. +# Production must use your real public HTTPS domain (not localhost). PUBLIC_BASE_URL=http://localhost:3008 # System email delivery (used for forgot-password and optional broadcast emails) @@ -36,6 +37,7 @@ SYSTEM_SMTP_PORT=465 SYSTEM_SMTP_USERNAME= SYSTEM_SMTP_PASSWORD= SYSTEM_SMTP_SSL=true +SYSTEM_SMTP_TIMEOUT_SECONDS=15 # Password reset token lifetime in minutes PASSWORD_RESET_TOKEN_EXPIRE_MINUTES=30 diff --git a/README.md b/README.md index 5e0e423b..a35a8e0d 100644 --- a/README.md +++ b/README.md @@ -168,10 +168,12 @@ SYSTEM_SMTP_PORT=465 SYSTEM_SMTP_USERNAME=bot@example.com SYSTEM_SMTP_PASSWORD=your-app-password SYSTEM_SMTP_SSL=true +SYSTEM_SMTP_TIMEOUT_SECONDS=15 PASSWORD_RESET_TOKEN_EXPIRE_MINUTES=30 ``` `PUBLIC_BASE_URL` must point to the user-facing frontend because reset links are generated as `/reset-password?token=...`. +In production, set it to your public HTTPS domain (for example `https://app.example.com`), not a localhost address. Quick local validation: diff --git a/backend/app/config.py b/backend/app/config.py index 6e7399b3..508a7dfe 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -76,6 +76,7 @@ class Settings(BaseSettings): SYSTEM_SMTP_USERNAME: str = "" SYSTEM_SMTP_PASSWORD: str = "" SYSTEM_SMTP_SSL: bool = True + SYSTEM_SMTP_TIMEOUT_SECONDS: int = 15 # Docker (for Agent containers) DOCKER_NETWORK: str = "clawith_network" diff --git a/backend/app/services/password_reset_service.py b/backend/app/services/password_reset_service.py index b2c9085b..c7fa063d 100644 --- a/backend/app/services/password_reset_service.py +++ b/backend/app/services/password_reset_service.py @@ -54,7 +54,10 @@ async def get_public_base_url(db: AsyncSession) -> str: if env_value: return env_value - raise RuntimeError("Public base URL is not configured.") + raise RuntimeError( + "Public base URL is not configured. Set platform public_base_url or PUBLIC_BASE_URL " + "(required in production for reset links)." + ) async def build_password_reset_url(db: AsyncSession, raw_token: str) -> str: diff --git a/backend/app/services/system_email_service.py b/backend/app/services/system_email_service.py index 6ec47e99..a0f3573d 100644 --- a/backend/app/services/system_email_service.py +++ b/backend/app/services/system_email_service.py @@ -35,6 +35,7 @@ class SystemEmailConfig: smtp_username: str smtp_password: str smtp_ssl: bool + smtp_timeout_seconds: int @dataclass(slots=True) @@ -59,6 +60,8 @@ def get_system_email_config() -> SystemEmailConfig: "System email is not configured. Set SYSTEM_EMAIL_FROM_ADDRESS, SYSTEM_SMTP_HOST, and SYSTEM_SMTP_PASSWORD." ) + smtp_timeout_seconds = max(1, int(settings.SYSTEM_SMTP_TIMEOUT_SECONDS)) + return SystemEmailConfig( from_address=from_address, from_name=settings.SYSTEM_EMAIL_FROM_NAME.strip() or "Clawith", @@ -67,6 +70,7 @@ def get_system_email_config() -> SystemEmailConfig: smtp_username=smtp_username, smtp_password=smtp_password, smtp_ssl=settings.SYSTEM_SMTP_SSL, + smtp_timeout_seconds=smtp_timeout_seconds, ) @@ -85,11 +89,16 @@ def _send_system_email_sync(to: str, subject: str, body: str) -> None: with _force_ipv4(): if config.smtp_ssl: context = ssl.create_default_context() - with smtplib.SMTP_SSL(config.smtp_host, config.smtp_port, context=context, timeout=15) as server: + with smtplib.SMTP_SSL( + config.smtp_host, + config.smtp_port, + context=context, + timeout=config.smtp_timeout_seconds, + ) as server: server.login(config.smtp_username, config.smtp_password) server.sendmail(config.from_address, [to], msg.as_string()) else: - with smtplib.SMTP(config.smtp_host, config.smtp_port, timeout=15) as server: + with smtplib.SMTP(config.smtp_host, config.smtp_port, timeout=config.smtp_timeout_seconds) as server: server.ehlo() server.starttls(context=ssl.create_default_context()) server.ehlo() diff --git a/backend/tests/test_password_reset_and_notifications.py b/backend/tests/test_password_reset_and_notifications.py index 972c63e5..91e3e31c 100644 --- a/backend/tests/test_password_reset_and_notifications.py +++ b/backend/tests/test_password_reset_and_notifications.py @@ -1,3 +1,4 @@ +import contextlib import uuid from datetime import datetime, timedelta, timezone from types import SimpleNamespace @@ -14,7 +15,7 @@ from app.models.password_reset_token import PasswordResetToken from app.models.user import User from app.schemas.schemas import ForgotPasswordRequest, ResetPasswordRequest -from app.services import org_sync_service, password_reset_service +from app.services import org_sync_service, password_reset_service, system_email_service from app.services.system_email_service import SystemEmailConfigError @@ -192,6 +193,95 @@ async def fake_build_password_reset_url(*_args, **_kwargs): assert len(background_tasks.tasks) == 1 +def test_system_email_config_uses_configured_timeout(monkeypatch): + monkeypatch.setattr( + system_email_service, + "get_settings", + lambda: SimpleNamespace( + SYSTEM_EMAIL_FROM_ADDRESS="bot@example.com", + SYSTEM_EMAIL_FROM_NAME="Clawith", + SYSTEM_SMTP_HOST="smtp.example.com", + SYSTEM_SMTP_PORT=465, + SYSTEM_SMTP_USERNAME="", + SYSTEM_SMTP_PASSWORD="secret", + SYSTEM_SMTP_SSL=True, + SYSTEM_SMTP_TIMEOUT_SECONDS=42, + ), + ) + + config = system_email_service.get_system_email_config() + + assert config.smtp_timeout_seconds == 42 + + +def test_system_email_config_clamps_non_positive_timeout(monkeypatch): + monkeypatch.setattr( + system_email_service, + "get_settings", + lambda: SimpleNamespace( + SYSTEM_EMAIL_FROM_ADDRESS="bot@example.com", + SYSTEM_EMAIL_FROM_NAME="Clawith", + SYSTEM_SMTP_HOST="smtp.example.com", + SYSTEM_SMTP_PORT=465, + SYSTEM_SMTP_USERNAME="", + SYSTEM_SMTP_PASSWORD="secret", + SYSTEM_SMTP_SSL=True, + SYSTEM_SMTP_TIMEOUT_SECONDS=0, + ), + ) + + config = system_email_service.get_system_email_config() + + assert config.smtp_timeout_seconds == 1 + + +def test_send_system_email_uses_configured_timeout(monkeypatch): + captured = {} + + class DummySMTPSSL: + def __init__(self, host: str, port: int, context=None, timeout: int | None = None): + captured["host"] = host + captured["port"] = port + captured["timeout"] = timeout + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def login(self, username: str, password: str): + captured["username"] = username + captured["password"] = password + + def sendmail(self, from_address: str, to_addresses: list[str], message: str): + captured["from"] = from_address + captured["to"] = to_addresses + captured["has_message"] = bool(message) + + monkeypatch.setattr( + system_email_service, + "get_system_email_config", + lambda: system_email_service.SystemEmailConfig( + from_address="bot@example.com", + from_name="Clawith", + smtp_host="smtp.example.com", + smtp_port=465, + smtp_username="bot@example.com", + smtp_password="secret", + smtp_ssl=True, + smtp_timeout_seconds=27, + ), + ) + monkeypatch.setattr(system_email_service.smtplib, "SMTP_SSL", DummySMTPSSL) + monkeypatch.setattr(system_email_service, "_force_ipv4", lambda: contextlib.nullcontext()) + + system_email_service._send_system_email_sync("alice@example.com", "subject", "body") + + assert captured["timeout"] == 27 + assert captured["to"] == ["alice@example.com"] + + @pytest.mark.asyncio async def test_reset_password_updates_user_and_invalidates_other_tokens(monkeypatch): user = make_user(password_hash=auth_api.hash_password("old-password"))