From c969160253ac9c0492d68a92ac7ec6246754454f Mon Sep 17 00:00:00 2001 From: Atlas Date: Fri, 27 Mar 2026 10:05:35 +0800 Subject: [PATCH 1/2] Keep provider-aware org sync independent from unrelated PR scope Narrow the branch back to directory sync so it can merge independently without dragging in unrelated WeCom runtime or channel behavior. The result keeps the provider-aware sync backend, enterprise org browser UI, migration, i18n, and regression coverage while preserving existing saved secrets for repeat sync operations. Constraint: PR must remain independent from forgot-password work and must not fork Alembic heads Rejected: Keep WeCom runtime fixes in this PR | mixes unrelated behavior and review scope Rejected: Force re-entry of stored secrets before every sync | breaks existing saved-config flow Confidence: high Scope-risk: moderate Reversibility: clean Directive: Keep channel runtime fixes in a separate PR; do not re-mix websocket or session changes into org sync Tested: cd backend && .venv/bin/python -m pytest tests/test_org_sync.py Tested: cd backend && .venv/bin/alembic heads Tested: cd frontend && npm run build Not-tested: Live Feishu or WeCom API sync against real tenants --- .../versions/add_wecom_org_sync_fields.py | 62 + backend/app/api/enterprise.py | 32 +- backend/app/models/org.py | 11 +- backend/app/services/org_sync_service.py | 1062 +++++++++++------ backend/tests/test_org_sync.py | 241 ++++ frontend/src/i18n/en.json | 22 +- frontend/src/i18n/zh.json | 20 + frontend/src/pages/EnterpriseSettings.tsx | 442 ++++++- 8 files changed, 1486 insertions(+), 406 deletions(-) create mode 100644 backend/alembic/versions/add_wecom_org_sync_fields.py create mode 100644 backend/tests/test_org_sync.py diff --git a/backend/alembic/versions/add_wecom_org_sync_fields.py b/backend/alembic/versions/add_wecom_org_sync_fields.py new file mode 100644 index 00000000..e7f0646a --- /dev/null +++ b/backend/alembic/versions/add_wecom_org_sync_fields.py @@ -0,0 +1,62 @@ +"""Add provider-aware org sync fields. + +Revision ID: add_wecom_org_sync_fields +Revises: add_daily_token_usage +""" + +from alembic import op + +revision = "add_wecom_org_sync_fields" +down_revision = "add_daily_token_usage" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.execute( + """ + ALTER TABLE org_departments + ADD COLUMN IF NOT EXISTS wecom_id VARCHAR(100) + """ + ) + op.execute( + """ + ALTER TABLE org_departments + ADD COLUMN IF NOT EXISTS sync_provider VARCHAR(20) NOT NULL DEFAULT 'feishu' + """ + ) + op.execute( + """ + ALTER TABLE org_members + ADD COLUMN IF NOT EXISTS wecom_user_id VARCHAR(100) + """ + ) + op.execute( + """ + ALTER TABLE org_members + ADD COLUMN IF NOT EXISTS sync_provider VARCHAR(20) NOT NULL DEFAULT 'feishu' + """ + ) + op.execute( + "CREATE INDEX IF NOT EXISTS ix_org_departments_wecom_id ON org_departments(wecom_id)" + ) + op.execute( + "CREATE INDEX IF NOT EXISTS ix_org_departments_sync_provider ON org_departments(sync_provider)" + ) + op.execute( + "CREATE INDEX IF NOT EXISTS ix_org_members_wecom_user_id ON org_members(wecom_user_id)" + ) + op.execute( + "CREATE INDEX IF NOT EXISTS ix_org_members_sync_provider ON org_members(sync_provider)" + ) + + +def downgrade() -> None: + op.execute("DROP INDEX IF EXISTS ix_org_members_sync_provider") + op.execute("DROP INDEX IF EXISTS ix_org_members_wecom_user_id") + op.execute("DROP INDEX IF EXISTS ix_org_departments_sync_provider") + op.execute("DROP INDEX IF EXISTS ix_org_departments_wecom_id") + op.execute("ALTER TABLE org_members DROP COLUMN IF EXISTS sync_provider") + op.execute("ALTER TABLE org_members DROP COLUMN IF EXISTS wecom_user_id") + op.execute("ALTER TABLE org_departments DROP COLUMN IF EXISTS sync_provider") + op.execute("ALTER TABLE org_departments DROP COLUMN IF EXISTS wecom_id") diff --git a/backend/app/api/enterprise.py b/backend/app/api/enterprise.py index 9df1eea8..f2973d8e 100644 --- a/backend/app/api/enterprise.py +++ b/backend/app/api/enterprise.py @@ -484,6 +484,14 @@ async def get_system_setting( db: AsyncSession = Depends(get_db), ): """Get a system setting by key.""" + if key == "org_sync": + if current_user.role not in ("platform_admin", "org_admin"): + raise HTTPException(status_code=403, detail="Admin access required") + from app.services.org_sync_service import org_sync_service + + value = await org_sync_service.get_public_config(db) + return {"key": key, "value": value} + result = await db.execute(select(SystemSetting).where(SystemSetting.key == key)) setting = result.scalar_one_or_none() if not setting: @@ -502,6 +510,12 @@ async def update_system_setting( # Platform-level settings (e.g. PUBLIC_BASE_URL) require platform_admin if key == "platform" and current_user.role != "platform_admin": raise HTTPException(status_code=403, detail="Only platform admin can modify platform settings") + if key == "org_sync": + from app.services.org_sync_service import org_sync_service + + value = await org_sync_service.save_config(db, data.value) + return {"key": key, "value": value} + result = await db.execute(select(SystemSetting).where(SystemSetting.key == key)) setting = result.scalar_one_or_none() if setting: @@ -525,7 +539,10 @@ async def list_org_departments( db: AsyncSession = Depends(get_db), ): """List all departments, optionally filtered by tenant.""" - query = select(OrgDepartment) + from app.services.org_sync_service import org_sync_service + + provider, _, _ = await org_sync_service.get_active_provider(db) + query = select(OrgDepartment).where(OrgDepartment.sync_provider == provider) if tenant_id: query = query.where(OrgDepartment.tenant_id == uuid.UUID(tenant_id)) result = await db.execute(query.order_by(OrgDepartment.name)) @@ -533,7 +550,9 @@ async def list_org_departments( return [ { "id": str(d.id), + "provider": d.sync_provider, "feishu_id": d.feishu_id, + "wecom_id": d.wecom_id, "name": d.name, "parent_id": str(d.parent_id) if d.parent_id else None, "path": d.path, @@ -554,7 +573,13 @@ async def list_org_members( db: AsyncSession = Depends(get_db), ): """List org members, optionally filtered by department, search, or tenant.""" - query = select(OrgMember).where(OrgMember.status == "active") + from app.services.org_sync_service import org_sync_service + + provider, _, _ = await org_sync_service.get_active_provider(db) + query = select(OrgMember).where( + OrgMember.status == "active", + OrgMember.sync_provider == provider, + ) if tenant_id: query = query.where(OrgMember.tenant_id == uuid.UUID(tenant_id)) if department_id: @@ -573,6 +598,7 @@ async def list_org_members( return [ { "id": str(m.id), + "provider": m.sync_provider, "name": m.name, "email": m.email, "title": m.title, @@ -587,7 +613,7 @@ async def list_org_members( async def trigger_org_sync( current_user: User = Depends(get_current_admin), ): - """Manually trigger org structure sync from Feishu.""" + """Manually trigger org structure sync from the active provider.""" from app.services.org_sync_service import org_sync_service result = await org_sync_service.full_sync() return result diff --git a/backend/app/models/org.py b/backend/app/models/org.py index bc31ef88..1b205012 100644 --- a/backend/app/models/org.py +++ b/backend/app/models/org.py @@ -1,4 +1,4 @@ -"""Organization structure models — departments and members synced from Feishu.""" +"""Organization structure models cached from external directory providers.""" import uuid from datetime import datetime @@ -8,15 +8,18 @@ from sqlalchemy.orm import Mapped, mapped_column, relationship from app.database import Base +from app.models.tenant import Tenant # noqa: F401 class OrgDepartment(Base): - """Department from Feishu org structure.""" + """Department synced from an external directory provider.""" __tablename__ = "org_departments" id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) feishu_id: Mapped[str | None] = mapped_column(String(100), unique=True) + wecom_id: Mapped[str | None] = mapped_column(String(100), index=True) + sync_provider: Mapped[str] = mapped_column(String(20), default="feishu", server_default="feishu", nullable=False, index=True) name: Mapped[str] = mapped_column(String(200), nullable=False) parent_id: Mapped[uuid.UUID | None] = mapped_column(UUID(as_uuid=True), ForeignKey("org_departments.id")) path: Mapped[str] = mapped_column(String(500), default="") @@ -28,13 +31,15 @@ class OrgDepartment(Base): class OrgMember(Base): - """Person from Feishu org structure.""" + """Person synced from an external directory provider.""" __tablename__ = "org_members" id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) feishu_open_id: Mapped[str | None] = mapped_column(String(100), unique=True) feishu_user_id: Mapped[str | None] = mapped_column(String(100)) + wecom_user_id: Mapped[str | None] = mapped_column(String(100), index=True) + sync_provider: Mapped[str] = mapped_column(String(20), default="feishu", server_default="feishu", nullable=False, index=True) name: Mapped[str] = mapped_column(String(100), nullable=False) name_translit_full: Mapped[str | None] = mapped_column(String(255), index=True) name_translit_initial: Mapped[str | None] = mapped_column(String(50), index=True) diff --git a/backend/app/services/org_sync_service.py b/backend/app/services/org_sync_service.py index fa79f7d9..ea0195d9 100644 --- a/backend/app/services/org_sync_service.py +++ b/backend/app/services/org_sync_service.py @@ -1,395 +1,785 @@ -"""Feishu organization structure sync service. - -Pulls departments and members from Feishu Contact API and upserts into local DB. -""" +"""Directory sync service for Feishu and WeCom organization structures.""" +import asyncio import uuid from datetime import datetime, timezone import httpx from loguru import logger -from sqlalchemy import select, delete +from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from pypinyin import pinyin, Style +from app.core.security import hash_password from app.database import async_session from app.models.org import OrgDepartment, OrgMember from app.models.system_settings import SystemSetting from app.models.user import User -from app.core.security import hash_password + +ORG_SYNC_KEY = "org_sync" +LEGACY_FEISHU_SYNC_KEY = "feishu_org_sync" FEISHU_APP_TOKEN_URL = "https://open.feishu.cn/open-apis/auth/v3/app_access_token/internal" FEISHU_DEPT_CHILDREN_URL = "https://open.feishu.cn/open-apis/contact/v3/departments" FEISHU_USERS_URL = "https://open.feishu.cn/open-apis/contact/v3/users/find_by_department" +WECOM_TOKEN_URL = "https://qyapi.weixin.qq.com/cgi-bin/gettoken" +WECOM_DEPARTMENTS_URL = "https://qyapi.weixin.qq.com/cgi-bin/department/list" +WECOM_USERS_URL = "https://qyapi.weixin.qq.com/cgi-bin/user/list" + class OrgSyncService: - """Sync org structure from Feishu into local database.""" + """Sync org structure from the configured directory provider into local DB.""" + + async def _get_stored_config(self, db: AsyncSession) -> dict: + """Return normalized org sync config including stored secrets.""" + result = await db.execute(select(SystemSetting).where(SystemSetting.key == ORG_SYNC_KEY)) + setting = result.scalar_one_or_none() + if setting and isinstance(setting.value, dict): + return self._normalize_setting_value(setting.value) - async def _get_feishu_config(self, db: AsyncSession) -> dict | None: - """Load Feishu org sync config from system_settings.""" - result = await db.execute( - select(SystemSetting).where(SystemSetting.key == "feishu_org_sync") + legacy_result = await db.execute( + select(SystemSetting).where(SystemSetting.key == LEGACY_FEISHU_SYNC_KEY) ) + legacy_setting = legacy_result.scalar_one_or_none() + if legacy_setting and isinstance(legacy_setting.value, dict): + return self._normalize_setting_value( + { + "provider": "feishu", + "feishu": legacy_setting.value, + "wecom": {}, + } + ) + + return self._normalize_setting_value({}) + + async def get_public_config(self, db: AsyncSession) -> dict: + """Return normalized org sync config for the UI without provider secrets.""" + return self._normalize_setting_value(await self._get_stored_config(db), include_secrets=False) + + def _normalize_setting_value(self, value: dict | None, include_secrets: bool = True) -> dict: + raw = value or {} + provider = raw.get("provider") + if provider not in {"feishu", "wecom"}: + provider = "feishu" + + feishu = raw.get("feishu") + wecom = raw.get("wecom") + + # Accept the old flat feishu shape transparently. + if not isinstance(feishu, dict) and any(k in raw for k in ("app_id", "app_secret", "last_synced_at")): + feishu = { + "app_id": raw.get("app_id", ""), + "app_secret": raw.get("app_secret", "") if include_secrets else "", + "last_synced_at": raw.get("last_synced_at"), + } + + return { + "provider": provider, + "feishu": { + "app_id": (feishu or {}).get("app_id", ""), + "app_secret": (feishu or {}).get("app_secret", "") if include_secrets else "", + "has_secret": bool((feishu or {}).get("app_secret")), + "last_synced_at": (feishu or {}).get("last_synced_at"), + }, + "wecom": { + "corp_id": (wecom or {}).get("corp_id", ""), + "corp_secret": (wecom or {}).get("corp_secret", "") if include_secrets else "", + "has_secret": bool((wecom or {}).get("corp_secret")), + "last_synced_at": (wecom or {}).get("last_synced_at"), + }, + } + + async def save_config(self, db: AsyncSession, value: dict) -> dict: + """Persist normalized org sync config while preserving existing secrets.""" + existing = await self._get_stored_config(db) + normalized = self._normalize_setting_value(value) + + existing_feishu = existing.get("feishu", {}) + existing_wecom = existing.get("wecom", {}) + + if not normalized["feishu"].get("app_secret"): + normalized["feishu"]["app_secret"] = existing_feishu.get("app_secret", "") + if not normalized["wecom"].get("corp_secret"): + normalized["wecom"]["corp_secret"] = existing_wecom.get("corp_secret", "") + if not normalized["feishu"].get("last_synced_at"): + normalized["feishu"]["last_synced_at"] = existing_feishu.get("last_synced_at") + if not normalized["wecom"].get("last_synced_at"): + normalized["wecom"]["last_synced_at"] = existing_wecom.get("last_synced_at") + + result = await db.execute(select(SystemSetting).where(SystemSetting.key == ORG_SYNC_KEY)) + setting = result.scalar_one_or_none() + if setting: + setting.value = normalized + else: + setting = SystemSetting(key=ORG_SYNC_KEY, value=normalized) + db.add(setting) + await db.commit() + return normalized + + async def get_active_provider(self, db: AsyncSession) -> tuple[str, dict, dict]: + """Return provider name, provider config, and full normalized setting.""" + setting = await self._get_stored_config(db) + provider = setting.get("provider", "feishu") + return provider, setting.get(provider, {}), setting + + async def _set_last_synced_at( + self, db: AsyncSession, setting_value: dict, provider: str, synced_at: str + ) -> None: + value = self._normalize_setting_value(setting_value) + value.setdefault(provider, {}) + value[provider]["last_synced_at"] = synced_at + + result = await db.execute(select(SystemSetting).where(SystemSetting.key == ORG_SYNC_KEY)) setting = result.scalar_one_or_none() - if not setting: - return None - return setting.value - - async def _get_app_token(self, app_id: str, app_secret: str) -> tuple[str, dict]: - """Get Feishu tenant_access_token. - Returns (token_string, raw_response_dict). - """ - async with httpx.AsyncClient() as client: - resp = await client.post(FEISHU_APP_TOKEN_URL, json={ - "app_id": app_id, - "app_secret": app_secret, - }) + if setting: + setting.value = value + else: + db.add(SystemSetting(key=ORG_SYNC_KEY, value=value)) + + async def _get_feishu_token(self, app_id: str, app_secret: str) -> tuple[str, dict]: + async with httpx.AsyncClient(timeout=20) as client: + resp = await client.post( + FEISHU_APP_TOKEN_URL, + json={"app_id": app_id, "app_secret": app_secret}, + ) data = resp.json() - logger.info(f"[OrgSync] Token response: code={data.get('code')}, msg={data.get('msg')}") token = data.get("tenant_access_token") or data.get("app_access_token") or "" return token, data - async def _fetch_departments(self, token: str, parent_id: str = "0") -> list[dict]: - """Recursively fetch all departments from Feishu.""" - all_depts = [] + async def _fetch_feishu_departments(self, token: str, parent_id: str = "0") -> list[dict]: + all_depts: list[dict] = [] page_token = "" while True: - async with httpx.AsyncClient() as client: - url = f"{FEISHU_DEPT_CHILDREN_URL}/{parent_id}/children" - params = { - "department_id_type": "open_department_id", - "page_size": "50", - "fetch_child": "true", - } - if page_token: - params["page_token"] = page_token - - logger.info(f"[OrgSync] GET {url} params={params}") - resp = await client.get(url, params=params, headers={"Authorization": f"Bearer {token}"}) - data = resp.json() - logger.info(f"[OrgSync] Dept response: code={data.get('code')}, msg={data.get('msg')}, items={len(data.get('data', {}).get('items', []))}") - - if data.get("code") != 0: - logger.error(f"[OrgSync] Dept API error: {data}") - break - - items = data.get("data", {}).get("items", []) - if items and not all_depts: # Print first raw item for debugging - logger.info(f"[OrgSync] RAW first dept item keys: {list(items[0].keys())}") - logger.info(f"[OrgSync] RAW first dept item: {items[0]}") - for item in items: - logger.info(f"[OrgSync] dept: {item.get('name')} (id={item.get('open_department_id')})") - all_depts.extend(items) - - if not data.get("data", {}).get("has_more"): - break - page_token = data["data"].get("page_token", "") - - # If fetch_child=true didn't work (no items), try without recursion + manual recurse + async with httpx.AsyncClient(timeout=20) as client: + resp = await client.get( + f"{FEISHU_DEPT_CHILDREN_URL}/{parent_id}/children", + params={ + "department_id_type": "open_department_id", + "page_size": "50", + "fetch_child": "true", + **({"page_token": page_token} if page_token else {}), + }, + headers={"Authorization": f"Bearer {token}"}, + ) + data = resp.json() + if data.get("code") != 0: + raise RuntimeError( + f"Feishu department API failed: code={data.get('code')} msg={data.get('msg')}" + ) + all_depts.extend(data.get("data", {}).get("items", [])) + if not data.get("data", {}).get("has_more"): + break + page_token = data.get("data", {}).get("page_token", "") if not all_depts: - logger.info("[OrgSync] fetch_child=true returned nothing, trying simple list...") - all_depts = await self._fetch_departments_simple(token, parent_id) - + return await self._fetch_feishu_departments_simple(token, parent_id) return all_depts - async def _fetch_departments_simple(self, token: str, parent_id: str = "0") -> list[dict]: - """Fetch departments without fetch_child, manually recurse.""" - all_depts = [] + async def _fetch_feishu_departments_simple( + self, token: str, parent_id: str = "0" + ) -> list[dict]: + all_depts: list[dict] = [] page_token = "" while True: - async with httpx.AsyncClient() as client: - url = f"{FEISHU_DEPT_CHILDREN_URL}/{parent_id}/children" - params = {"department_id_type": "open_department_id", "page_size": "50"} - if page_token: - params["page_token"] = page_token - resp = await client.get(url, params=params, headers={"Authorization": f"Bearer {token}"}) - data = resp.json() - logger.info(f"[OrgSync] Simple dept response (parent={parent_id}): code={data.get('code')}, items={len(data.get('data', {}).get('items', []))}") - - if data.get("code") != 0: - logger.error(f"[OrgSync] Simple dept error: {data}") - break - - items = data.get("data", {}).get("items", []) - all_depts.extend(items) - if not data.get("data", {}).get("has_more"): - break - page_token = data["data"].get("page_token", "") + async with httpx.AsyncClient(timeout=20) as client: + resp = await client.get( + f"{FEISHU_DEPT_CHILDREN_URL}/{parent_id}/children", + params={ + "department_id_type": "open_department_id", + "page_size": "50", + **({"page_token": page_token} if page_token else {}), + }, + headers={"Authorization": f"Bearer {token}"}, + ) + data = resp.json() + if data.get("code") != 0: + raise RuntimeError( + f"Feishu department API failed: code={data.get('code')} msg={data.get('msg')}" + ) + items = data.get("data", {}).get("items", []) + all_depts.extend(items) + if not data.get("data", {}).get("has_more"): + break + page_token = data.get("data", {}).get("page_token", "") - # Recurse into children and inject parent_department_id for dept in list(all_depts): - dept_id = dept.get("open_department_id", "") - if dept_id: - children = await self._fetch_departments_simple(token, dept_id) - # Set parent_department_id explicitly (API may not return it) - for child in children: - if not child.get("parent_department_id"): - child["parent_department_id"] = dept_id - all_depts.extend(children) - + dept_id = dept.get("open_department_id") + if not dept_id: + continue + children = await self._fetch_feishu_departments_simple(token, dept_id) + for child in children: + child.setdefault("parent_department_id", dept_id) + all_depts.extend(children) return all_depts - async def _fetch_department_users(self, token: str, dept_id: str) -> list[dict]: - """Fetch all users in a department.""" - all_users = [] + async def _fetch_feishu_department_users(self, token: str, department_id: str) -> list[dict]: + users: list[dict] = [] page_token = "" while True: - async with httpx.AsyncClient() as client: - params = { - "department_id_type": "open_department_id", - "department_id": dept_id, - "page_size": "50", - } - if page_token: - params["page_token"] = page_token + async with httpx.AsyncClient(timeout=20) as client: resp = await client.get( FEISHU_USERS_URL, - params=params, + params={ + "department_id_type": "open_department_id", + "department_id": department_id, + "page_size": "50", + **({"page_token": page_token} if page_token else {}), + }, headers={"Authorization": f"Bearer {token}"}, ) - data = resp.json() - logger.info(f"[OrgSync] Users response (dept={dept_id}): code={data.get('code')}, items={len(data.get('data', {}).get('items', []))}") - - if data.get("code") != 0: - logger.error(f"[OrgSync] Users API error: {data}") - break + data = resp.json() + if data.get("code") != 0: + raise RuntimeError( + f"Feishu user API failed: code={data.get('code')} msg={data.get('msg')}" + ) + users.extend(data.get("data", {}).get("items", [])) + if not data.get("data", {}).get("has_more"): + break + page_token = data.get("data", {}).get("page_token", "") + return users + + async def _get_wecom_token(self, corp_id: str, corp_secret: str) -> tuple[str, dict]: + async with httpx.AsyncClient(timeout=20) as client: + resp = await client.get( + WECOM_TOKEN_URL, + params={ + "corpid": corp_id, + "corpsecret": corp_secret, + }, + ) + data = resp.json() + token = data.get("access_token", "") + return token, data - items = data.get("data", {}).get("items", []) - if items and not all_users: # Print first raw user for debugging - logger.info(f"[OrgSync] RAW first user item keys: {list(items[0].keys())}") - logger.info(f"[OrgSync] RAW first user item: {items[0]}") - all_users.extend(items) - if not data.get("data", {}).get("has_more"): + async def _fetch_wecom_departments(self, token: str) -> list[dict]: + async with httpx.AsyncClient(timeout=20) as client: + resp = await client.get( + WECOM_DEPARTMENTS_URL, + params={"access_token": token}, + ) + data = resp.json() + if data.get("errcode") != 0: + raise RuntimeError( + f"WeCom department API failed: errcode={data.get('errcode')} errmsg={data.get('errmsg')}" + ) + return data.get("department", []) + + async def _fetch_wecom_department_users( + self, + token: str, + department_id: str, + client: httpx.AsyncClient | None = None, + ) -> list[dict]: + owned_client = client is None + if owned_client: + client = httpx.AsyncClient(timeout=20) + assert client is not None + try: + for attempt in range(4): + resp = await client.get( + WECOM_USERS_URL, + params={ + "access_token": token, + "department_id": department_id, + "fetch_child": 0, + }, + ) + data = resp.json() + if data.get("errcode") != 45033: break - page_token = data["data"].get("page_token", "") - return all_users - - async def full_sync(self) -> dict: - """Run a full org sync from Feishu. Returns stats.""" - async with async_session() as db: - config = await self._get_feishu_config(db) - if not config: - return {"error": "未配置飞书组织架构同步信息"} - - app_id = config.get("app_id") - app_secret = config.get("app_secret") - if not app_id or not app_secret: - return {"error": "缺少 App ID 或 App Secret"} - - try: - token, token_resp = await self._get_app_token(app_id, app_secret) - if not token: - feishu_code = token_resp.get("code", "?") - feishu_msg = token_resp.get("msg", "unknown") - return {"error": f"获取飞书 token 失败 (code={feishu_code}: {feishu_msg})"} - logger.info(f"[OrgSync] Got token: {token[:20]}...") - except Exception as e: - return {"error": f"连接飞书失败: {str(e)[:100]}"} + await asyncio.sleep(0.5 * (attempt + 1)) + finally: + if owned_client: + await client.aclose() + data = resp.json() + if data.get("errcode") != 0: + raise RuntimeError( + f"WeCom user API failed: errcode={data.get('errcode')} errmsg={data.get('errmsg')}" + ) + return data.get("userlist", []) + + async def _fetch_wecom_users_by_department( + self, + token: str, + department_ids: list[str], + concurrency: int = 4, + ) -> dict[str, list[dict]]: + semaphore = asyncio.Semaphore(concurrency) + results: dict[str, list[dict]] = {} + + async with httpx.AsyncClient(timeout=20) as client: + async def fetch_one(department_id: str) -> None: + async with semaphore: + results[department_id] = await self._fetch_wecom_department_users( + token, + department_id, + client=client, + ) - now = datetime.now(timezone.utc) - dept_count = 0 - member_count = 0 - user_count = 0 + await asyncio.gather(*(fetch_one(department_id) for department_id in department_ids)) + return results - # Resolve tenant_id from the first admin user - admin_result = await db.execute( - select(User).where(User.role == "platform_admin").limit(1) + async def _resolve_tenant_id(self, db: AsyncSession) -> uuid.UUID | None: + admin_result = await db.execute( + select(User).where(User.role == "platform_admin").limit(1) + ) + admin_user = admin_result.scalar_one_or_none() + return admin_user.tenant_id if admin_user else None + + async def _upsert_feishu_departments( + self, db: AsyncSession, departments: list[dict], tenant_id: uuid.UUID | None, now: datetime + ) -> tuple[int, dict[str, OrgDepartment]]: + provider = "feishu" + count = 0 + for item in departments: + external_id = item.get("open_department_id") + if not external_id: + continue + result = await db.execute( + select(OrgDepartment).where( + OrgDepartment.sync_provider == provider, + OrgDepartment.feishu_id == external_id, + ) ) - admin_user = admin_result.scalar_one_or_none() - tenant_id = admin_user.tenant_id if admin_user else None - - # --- Sync departments --- - try: - depts = await self._fetch_departments(token, "0") - logger.info(f"[OrgSync] Total departments fetched: {len(depts)}") - - for d in depts: - feishu_id = d.get("open_department_id", "") - if not feishu_id: - continue + dept = result.scalar_one_or_none() + if dept: + dept.name = item.get("name", dept.name) + dept.member_count = item.get("member_count", 0) + dept.synced_at = now + dept.tenant_id = dept.tenant_id or tenant_id + else: + dept = OrgDepartment( + feishu_id=external_id, + name=item.get("name", ""), + member_count=item.get("member_count", 0), + path=item.get("name", ""), + tenant_id=tenant_id, + synced_at=now, + sync_provider=provider, + ) + db.add(dept) + count += 1 + + await db.flush() + result = await db.execute(select(OrgDepartment).where(OrgDepartment.sync_provider == provider)) + dept_map = {dept.feishu_id: dept for dept in result.scalars().all() if dept.feishu_id} + for item in departments: + external_id = item.get("open_department_id") + parent_external_id = item.get("parent_department_id") + if external_id in dept_map and parent_external_id and parent_external_id in dept_map: + dept_map[external_id].parent_id = dept_map[parent_external_id].id + self._rebuild_department_paths(dept_map) + await db.flush() + return count, dept_map + + async def _upsert_wecom_departments( + self, db: AsyncSession, departments: list[dict], tenant_id: uuid.UUID | None, now: datetime + ) -> tuple[int, dict[str, OrgDepartment]]: + provider = "wecom" + existing_result = await db.execute( + select(OrgDepartment).where(OrgDepartment.sync_provider == provider) + ) + existing_departments = { + dept.wecom_id: dept + for dept in existing_result.scalars().all() + if dept.wecom_id + } + count = 0 + for item in departments: + external_id = str(item.get("id", "")).strip() + if not external_id: + continue + dept = existing_departments.get(external_id) + if dept: + dept.name = item.get("name", dept.name) + dept.member_count = item.get("member_num", dept.member_count) + dept.synced_at = now + dept.tenant_id = dept.tenant_id or tenant_id + else: + dept = OrgDepartment( + wecom_id=external_id, + name=item.get("name", ""), + member_count=item.get("member_num", 0), + path=item.get("name", ""), + tenant_id=tenant_id, + synced_at=now, + sync_provider=provider, + ) + db.add(dept) + existing_departments[external_id] = dept + count += 1 + + await db.flush() + dept_map = { + dept.wecom_id: dept + for dept in existing_departments.values() + if dept.wecom_id + } + for item in departments: + external_id = str(item.get("id", "")).strip() + parent_external_id = str(item.get("parentid", "")).strip() + if external_id in dept_map and parent_external_id and parent_external_id in dept_map: + dept_map[external_id].parent_id = dept_map[parent_external_id].id + self._rebuild_department_paths(dept_map) + await db.flush() + return count, dept_map + + def _rebuild_department_paths(self, dept_map: dict[str, OrgDepartment]) -> None: + by_db_id = {dept.id: dept for dept in dept_map.values()} + + def build_path(dept: OrgDepartment) -> str: + names = [dept.name] + current = dept + seen: set[uuid.UUID] = set() + while current.parent_id and current.parent_id not in seen and current.parent_id in by_db_id: + seen.add(current.parent_id) + current = by_db_id[current.parent_id] + names.append(current.name) + names.reverse() + return " / ".join([name for name in names if name]) + + for dept in dept_map.values(): + dept.path = build_path(dept) + + async def _upsert_feishu_members( + self, + db: AsyncSession, + token: str, + tenant_id: uuid.UUID | None, + departments: dict[str, OrgDepartment], + now: datetime, + ) -> tuple[int, int]: + provider = "feishu" + member_count = 0 + user_count = 0 + for external_dept_id, dept in departments.items(): + users = await self._fetch_feishu_department_users(token, external_dept_id) + dept.member_count = len(users) + for item in users: + open_id = item.get("open_id", "") + user_id = item.get("user_id", "") + if not open_id and not user_id: + continue + result = None + if user_id: result = await db.execute( - select(OrgDepartment).where(OrgDepartment.feishu_id == feishu_id) + select(OrgMember).where( + OrgMember.sync_provider == provider, + OrgMember.feishu_user_id == user_id, + ) ) - dept = result.scalar_one_or_none() - if dept: - dept.name = d.get("name", dept.name) - dept.member_count = d.get("member_count", 0) - dept.synced_at = now - else: - dept = OrgDepartment( - feishu_id=feishu_id, - name=d.get("name", ""), - member_count=d.get("member_count", 0), - path=d.get("name", ""), - tenant_id=tenant_id, - synced_at=now, + member = result.scalar_one_or_none() + else: + member = None + if not member and open_id: + result = await db.execute( + select(OrgMember).where( + OrgMember.sync_provider == provider, + OrgMember.feishu_open_id == open_id, ) - db.add(dept) - dept_count += 1 - - await db.flush() - - # Build feishu_id -> db_id + parent mapping - dept_map = {} - all_result = await db.execute(select(OrgDepartment)) - for dept in all_result.scalars().all(): - if dept.feishu_id: - dept_map[dept.feishu_id] = dept - - # Set parent_id from Feishu parent info - for d in depts: - feishu_id = d.get("open_department_id", "") - parent_feishu_id = d.get("parent_department_id", "") - if feishu_id in dept_map and parent_feishu_id and parent_feishu_id in dept_map: - dept_map[feishu_id].parent_id = dept_map[parent_feishu_id].id - - await db.flush() - except Exception as e: - import traceback - traceback.print_exc() - logger.error(f"[OrgSync] Department sync failed: {e}") - return {"error": f"部门同步失败: {str(e)[:200]}"} - - # --- Sync members --- - try: - all_dept_result = await db.execute(select(OrgDepartment)) - departments = all_dept_result.scalars().all() - - for dept in departments: - if not dept.feishu_id: - continue - users = await self._fetch_department_users(token, dept.feishu_id) - if users: - logger.info(f"[OrgSync] dept={dept.name} got {len(users)} users, first user keys={list(users[0].keys())}, open_id={users[0].get('open_id','')!r}, user_id={users[0].get('user_id','')!r}") - for u in users: - open_id = u.get("open_id", "") - user_id = u.get("user_id", "") - if not open_id and not user_id: - logger.warning(f"[OrgSync] Skipping user with no open_id and no user_id: {u.get('name','?')}") - continue - if not user_id: - logger.warning(f"[OrgSync] User {u.get('name','?')} has no user_id — App may lack contact:user.employee_id:readonly permission") - - # Find existing member: prefer user_id (tenant-stable), fallback open_id - member = None - if user_id: - result = await db.execute( - select(OrgMember).where(OrgMember.feishu_user_id == user_id) - ) - member = result.scalar_one_or_none() - if not member and open_id: - result = await db.execute( - select(OrgMember).where(OrgMember.feishu_open_id == open_id) - ) - member = result.scalar_one_or_none() - - if member: - member.name = u.get("name", member.name) - member.name_translit_full = "".join([i[0] for i in pinyin(member.name, style=Style.NORMAL)]) - member.name_translit_initial = "".join([i[0] for i in pinyin(member.name, style=Style.FIRST_LETTER)]) - member.email = u.get("email", member.email) - member.avatar_url = u.get("avatar", {}).get("avatar_origin", member.avatar_url) - member.title = (u.get("job_title") or u.get("description") or member.title or "")[:200] - member.department_id = dept.id - member.department_path = dept.path or dept.name - member.phone = u.get("mobile", member.phone) - # Always update IDs to latest values - if open_id: - member.feishu_open_id = open_id - if user_id: - member.feishu_user_id = user_id - member.synced_at = now - else: - new_name = u.get("name", "") - translit_full = "".join([i[0] for i in pinyin(new_name, style=Style.NORMAL)]) if new_name else "" - translit_initial = "".join([i[0] for i in pinyin(new_name, style=Style.FIRST_LETTER)]) if new_name else "" - member = OrgMember( - feishu_open_id=open_id or None, - feishu_user_id=user_id or None, - name=new_name, - name_translit_full=translit_full, - name_translit_initial=translit_initial, - email=u.get("email", ""), - avatar_url=u.get("avatar", {}).get("avatar_origin", ""), - title=(u.get("job_title") or u.get("description") or "")[:200], - department_id=dept.id, - department_path=dept.path or dept.name, - phone=u.get("mobile", ""), - tenant_id=tenant_id, - synced_at=now, - ) - db.add(member) - # Ensure tenant_id is set on existing members - if member.tenant_id is None and tenant_id: - member.tenant_id = tenant_id - member_count += 1 - - # --- Auto-create/update platform User --- - # Prefer user_id (tenant-stable), then open_id, then email - platform_user = None - if user_id: - pu_result = await db.execute( - select(User).where(User.feishu_user_id == user_id) - ) - platform_user = pu_result.scalar_one_or_none() - if not platform_user and open_id: - pu_result = await db.execute( - select(User).where(User.feishu_open_id == open_id) - ) - platform_user = pu_result.scalar_one_or_none() - # Fallback: match by real email (most reliable cross-app identifier) - member_email = u.get("email", "") - if not platform_user and member_email and "@" in member_email and not member_email.endswith("@feishu.local"): - pu_result = await db.execute( - select(User).where(User.email == member_email) - ) - platform_user = pu_result.scalar_one_or_none() - - member_name = u.get("name", "") - if platform_user: - # Update existing user info - platform_user.display_name = member_name or platform_user.display_name - # Always update feishu IDs to track the current app's values - if open_id: - platform_user.feishu_open_id = open_id - if user_id: - platform_user.feishu_user_id = user_id - if tenant_id and not platform_user.tenant_id: - platform_user.tenant_id = tenant_id - else: - # Create new user — prefer user_id in username - username_base = f"feishu_{user_id or (open_id[:16] if open_id else uuid.uuid4().hex[:8])}" - email = member_email or f"{username_base}@feishu.local" - platform_user = User( - username=username_base, - email=email, - password_hash=hash_password(uuid.uuid4().hex), - display_name=member_name, - role="member", - feishu_open_id=open_id or None, - feishu_user_id=user_id or None, - tenant_id=tenant_id, - ) - db.add(platform_user) - user_count += 1 - except Exception as e: - import traceback - traceback.print_exc() - logger.error(f"[OrgSync] Member sync failed: {e}") - return {"error": f"成员同步失败: {str(e)[:200]}", "departments": dept_count} - - # Update last sync time - result = await db.execute( - select(SystemSetting).where(SystemSetting.key == "feishu_org_sync") + ) + member = result.scalar_one_or_none() + + avatar = item.get("avatar", {}) if isinstance(item.get("avatar"), dict) else {} + member_email = item.get("email", "") + if member: + member.name = item.get("name", member.name) + member.email = member_email or member.email + member.avatar_url = avatar.get("avatar_origin", member.avatar_url) + member.title = (item.get("job_title") or item.get("description") or member.title or "")[:200] + member.department_id = dept.id + member.department_path = dept.path or dept.name + member.phone = item.get("mobile", member.phone) + member.synced_at = now + if open_id: + member.feishu_open_id = open_id + if user_id: + member.feishu_user_id = user_id + else: + member = OrgMember( + feishu_open_id=open_id or None, + feishu_user_id=user_id or None, + name=item.get("name", ""), + email=member_email or None, + avatar_url=avatar.get("avatar_origin", ""), + title=(item.get("job_title") or item.get("description") or "")[:200], + department_id=dept.id, + department_path=dept.path or dept.name, + phone=item.get("mobile", ""), + tenant_id=tenant_id, + synced_at=now, + sync_provider=provider, + ) + db.add(member) + if member.tenant_id is None and tenant_id: + member.tenant_id = tenant_id + member_count += 1 + + created = await self._upsert_platform_user_for_feishu(db, item, tenant_id, user_id, open_id) + user_count += int(created) + return member_count, user_count + + async def _upsert_wecom_members( + self, + db: AsyncSession, + token: str, + tenant_id: uuid.UUID | None, + departments: dict[str, OrgDepartment], + now: datetime, + ) -> tuple[int, int]: + provider = "wecom" + users_by_department = await self._fetch_wecom_users_by_department( + token, + list(departments.keys()), + ) + + existing_members_result = await db.execute( + select(OrgMember).where(OrgMember.sync_provider == provider) + ) + existing_members = { + member.wecom_user_id: member + for member in existing_members_result.scalars().all() + if member.wecom_user_id + } + + all_items = [ + item + for users in users_by_department.values() + for item in users + if str(item.get("userid", "")).strip() + ] + emails = { + str(item.get("email", "")).strip() + for item in all_items + if "@" in str(item.get("email", "")).strip() + } + usernames = { + f"wecom_{str(item.get('userid', '')).strip()}" + for item in all_items + if str(item.get("userid", "")).strip() + } + + existing_users_by_email: dict[str, User] = {} + if emails: + users_result = await db.execute(select(User).where(User.email.in_(sorted(emails)))) + existing_users_by_email = { + user.email: user + for user in users_result.scalars().all() + if user.email + } + + existing_users_by_username: dict[str, User] = {} + if usernames: + users_result = await db.execute(select(User).where(User.username.in_(sorted(usernames)))) + existing_users_by_username = { + user.username: user + for user in users_result.scalars().all() + } + + member_count = 0 + user_count = 0 + for external_dept_id, dept in departments.items(): + users = users_by_department.get(external_dept_id, []) + dept.member_count = len(users) + for item in users: + user_id = str(item.get("userid", "")).strip() + if not user_id: + continue + member = existing_members.get(user_id) + if member: + member.name = item.get("name", member.name) + member.email = item.get("email") or member.email + member.avatar_url = item.get("avatar") or member.avatar_url + member.title = (item.get("position") or member.title or "")[:200] + member.department_id = dept.id + member.department_path = dept.path or dept.name + member.phone = item.get("mobile") or member.phone + member.synced_at = now + else: + member = OrgMember( + wecom_user_id=user_id, + name=item.get("name", ""), + email=item.get("email") or None, + avatar_url=item.get("avatar") or "", + title=(item.get("position") or "")[:200], + department_id=dept.id, + department_path=dept.path or dept.name, + phone=item.get("mobile") or "", + tenant_id=tenant_id, + synced_at=now, + sync_provider=provider, + ) + db.add(member) + existing_members[user_id] = member + if member.tenant_id is None and tenant_id: + member.tenant_id = tenant_id + member_count += 1 + + created = self._upsert_platform_user_for_wecom( + db, + item, + tenant_id, + user_id, + existing_users_by_email, + existing_users_by_username, + ) + user_count += int(created) + return member_count, user_count + + async def _upsert_platform_user_for_feishu( + self, + db: AsyncSession, + item: dict, + tenant_id: uuid.UUID | None, + user_id: str, + open_id: str, + ) -> bool: + platform_user = None + if user_id: + result = await db.execute(select(User).where(User.feishu_user_id == user_id)) + platform_user = result.scalar_one_or_none() + if not platform_user and open_id: + result = await db.execute(select(User).where(User.feishu_open_id == open_id)) + platform_user = result.scalar_one_or_none() + email = item.get("email", "") + if not platform_user and email and "@" in email and not email.endswith("@feishu.local"): + result = await db.execute(select(User).where(User.email == email)) + platform_user = result.scalar_one_or_none() + + if platform_user: + platform_user.display_name = item.get("name", platform_user.display_name) + if open_id: + platform_user.feishu_open_id = open_id + if user_id: + platform_user.feishu_user_id = user_id + if tenant_id and not platform_user.tenant_id: + platform_user.tenant_id = tenant_id + return False + + username_base = f"feishu_{user_id or (open_id[:16] if open_id else uuid.uuid4().hex[:8])}" + db.add( + User( + username=username_base, + email=email or f"{username_base}@feishu.local", + password_hash=hash_password(uuid.uuid4().hex), + display_name=item.get("name", username_base), + role="member", + feishu_open_id=open_id or None, + feishu_user_id=user_id or None, + tenant_id=tenant_id, ) - setting = result.scalar_one_or_none() - if setting: - setting.value = {**setting.value, "last_synced_at": now.isoformat()} + ) + return True + + def _upsert_platform_user_for_wecom( + self, + db: AsyncSession, + item: dict, + tenant_id: uuid.UUID | None, + user_id: str, + existing_users_by_email: dict[str, User], + existing_users_by_username: dict[str, User], + ) -> bool: + email = str(item.get("email", "")).strip() + platform_user = None + if email and "@" in email: + platform_user = existing_users_by_email.get(email) + username_base = f"wecom_{user_id}" + if not platform_user: + platform_user = existing_users_by_username.get(username_base) + + if platform_user: + platform_user.display_name = item.get("name", platform_user.display_name) + platform_user.title = item.get("position") or platform_user.title + if tenant_id and not platform_user.tenant_id: + platform_user.tenant_id = tenant_id + return False + + new_user = User( + username=username_base, + email=email or f"{username_base}@wecom.local", + password_hash=hash_password(uuid.uuid4().hex), + display_name=item.get("name", username_base), + title=item.get("position") or None, + role="member", + tenant_id=tenant_id, + ) + db.add(new_user) + existing_users_by_username[new_user.username] = new_user + if new_user.email: + existing_users_by_email[new_user.email] = new_user + return True + + async def full_sync(self) -> dict: + """Run a full sync for the currently configured provider.""" + async with async_session() as db: + provider, provider_config, setting = await self.get_active_provider(db) + now = datetime.now(timezone.utc) + tenant_id = await self._resolve_tenant_id(db) + + if provider == "feishu": + app_id = provider_config.get("app_id", "").strip() + app_secret = provider_config.get("app_secret", "").strip() + if not app_id or not app_secret: + return {"error": "缺少飞书 App ID 或 App Secret", "provider": provider} + try: + token, token_resp = await self._get_feishu_token(app_id, app_secret) + except Exception as exc: + return {"error": f"连接飞书失败: {str(exc)[:100]}", "provider": provider} + if not token: + return { + "error": f"获取飞书 token 失败 (code={token_resp.get('code')}: {token_resp.get('msg')})", + "provider": provider, + } + + try: + raw_departments = await self._fetch_feishu_departments(token) + department_count, dept_map = await self._upsert_feishu_departments( + db, raw_departments, tenant_id, now + ) + member_count, user_count = await self._upsert_feishu_members( + db, token, tenant_id, dept_map, now + ) + except Exception as exc: + logger.exception("[OrgSync] Feishu sync failed") + return {"error": str(exc)[:200], "provider": provider} + else: + corp_id = provider_config.get("corp_id", "").strip() + corp_secret = provider_config.get("corp_secret", "").strip() + if not corp_id or not corp_secret: + return {"error": "缺少企微 Corp ID 或 Corp Secret", "provider": provider} + try: + token, token_resp = await self._get_wecom_token(corp_id, corp_secret) + except Exception as exc: + return {"error": f"连接企微失败: {str(exc)[:100]}", "provider": provider} + if not token: + return { + "error": f"获取企微 token 失败 (errcode={token_resp.get('errcode')}: {token_resp.get('errmsg')})", + "provider": provider, + } + + try: + raw_departments = await self._fetch_wecom_departments(token) + department_count, dept_map = await self._upsert_wecom_departments( + db, raw_departments, tenant_id, now + ) + member_count, user_count = await self._upsert_wecom_members( + db, token, tenant_id, dept_map, now + ) + except Exception as exc: + logger.exception("[OrgSync] WeCom sync failed") + return {"error": str(exc)[:200], "provider": provider} + await self._set_last_synced_at(db, setting, provider, now.isoformat()) await db.commit() - stats = {"departments": dept_count, "members": member_count, "users_created": user_count, "synced_at": now.isoformat()} + stats = { + "provider": provider, + "departments": department_count, + "members": member_count, + "users_created": user_count, + "synced_at": now.isoformat(), + } logger.info(f"[OrgSync] Complete: {stats}") return stats diff --git a/backend/tests/test_org_sync.py b/backend/tests/test_org_sync.py new file mode 100644 index 00000000..9ad29f66 --- /dev/null +++ b/backend/tests/test_org_sync.py @@ -0,0 +1,241 @@ +import uuid +from types import SimpleNamespace + +import pytest +from fastapi import HTTPException + +from app.api import enterprise as enterprise_api +from app.models.org import OrgDepartment, OrgMember +from app.models.user import User +from app.services import org_sync_service + + +class DummyScalars: + def __init__(self, values): + self._values = list(values) + + def all(self): + return list(self._values) + + +class DummyResult: + def __init__(self, value=None, values=None): + self._value = value + self._values = list(values or []) + + def scalar_one_or_none(self): + return self._value + + def scalars(self): + return DummyScalars(self._values) + + +class RecordingDB: + def __init__(self, responses=None): + self.responses = list(responses or []) + self.executed = [] + self.added = [] + self.committed = False + + async def execute(self, statement): + self.executed.append(statement) + if self.responses: + return self.responses.pop(0) + return DummyResult() + + def add(self, obj): + self.added.append(obj) + + async def commit(self): + self.committed = True + + +def make_user(**overrides): + values = { + "id": uuid.uuid4(), + "username": "alice", + "email": "alice@example.com", + "password_hash": "old-hash", + "display_name": "Alice", + "role": "member", + "tenant_id": uuid.uuid4(), + "is_active": True, + } + values.update(overrides) + return User(**values) + + +@pytest.mark.asyncio +async def test_org_sync_public_config_falls_back_to_legacy_feishu_setting(): + legacy_setting = SimpleNamespace( + value={ + "app_id": "cli_123", + "app_secret": "legacy-secret", + "last_synced_at": "2026-03-24T10:00:00+00:00", + } + ) + db = RecordingDB([DummyResult(None), DummyResult(legacy_setting)]) + + value = await org_sync_service.org_sync_service.get_public_config(db) + + assert value["provider"] == "feishu" + assert value["feishu"]["app_id"] == "cli_123" + assert value["feishu"]["app_secret"] == "" + assert value["feishu"]["has_secret"] is True + assert value["feishu"]["last_synced_at"] == "2026-03-24T10:00:00+00:00" + assert value["wecom"]["corp_id"] == "" + assert value["wecom"]["has_secret"] is False + + +@pytest.mark.asyncio +async def test_org_sync_public_config_redacts_provider_secrets(): + stored_setting = SimpleNamespace( + value={ + "provider": "wecom", + "feishu": {"app_id": "cli_123", "app_secret": "keep-feishu", "last_synced_at": None}, + "wecom": {"corp_id": "ww123", "corp_secret": "keep-wecom", "last_synced_at": None}, + } + ) + db = RecordingDB([DummyResult(stored_setting)]) + + value = await org_sync_service.org_sync_service.get_public_config(db) + + assert value["feishu"]["app_secret"] == "" + assert value["feishu"]["has_secret"] is True + assert value["wecom"]["corp_secret"] == "" + assert value["wecom"]["has_secret"] is True + + +@pytest.mark.asyncio +async def test_org_sync_save_config_preserves_existing_provider_secrets(): + existing_setting = SimpleNamespace( + key="org_sync", + value={ + "provider": "wecom", + "feishu": {"app_id": "cli_123", "app_secret": "keep-feishu", "last_synced_at": None}, + "wecom": {"corp_id": "ww123", "corp_secret": "keep-wecom", "last_synced_at": None}, + }, + ) + db = RecordingDB([DummyResult(existing_setting), DummyResult(existing_setting)]) + + saved = await org_sync_service.org_sync_service.save_config( + db, + { + "provider": "wecom", + "feishu": {"app_id": "cli_456", "app_secret": ""}, + "wecom": {"corp_id": "ww456", "corp_secret": ""}, + }, + ) + + assert saved["feishu"]["app_secret"] == "keep-feishu" + assert saved["wecom"]["corp_secret"] == "keep-wecom" + assert existing_setting.value["feishu"]["app_id"] == "cli_456" + assert existing_setting.value["wecom"]["corp_id"] == "ww456" + assert db.committed is True + + +@pytest.mark.asyncio +async def test_get_active_provider_uses_stored_secrets_for_sync(): + stored_setting = SimpleNamespace( + value={ + "provider": "wecom", + "feishu": {"app_id": "cli_123", "app_secret": "keep-feishu", "last_synced_at": None}, + "wecom": {"corp_id": "ww123", "corp_secret": "keep-wecom", "last_synced_at": None}, + } + ) + db = RecordingDB([DummyResult(stored_setting)]) + + provider, provider_config, setting = await org_sync_service.org_sync_service.get_active_provider(db) + + assert provider == "wecom" + assert provider_config["corp_id"] == "ww123" + assert provider_config["corp_secret"] == "keep-wecom" + assert setting["wecom"]["corp_secret"] == "keep-wecom" + + +@pytest.mark.asyncio +async def test_get_org_sync_setting_requires_admin(): + with pytest.raises(HTTPException) as excinfo: + await enterprise_api.get_system_setting( + key="org_sync", + current_user=make_user(role="member"), + db=RecordingDB(), + ) + + assert excinfo.value.status_code == 403 + assert excinfo.value.detail == "Admin access required" + + +@pytest.mark.asyncio +async def test_list_org_departments_filters_by_active_provider(monkeypatch): + wecom_dept = OrgDepartment( + wecom_id="2", + sync_provider="wecom", + name="Engineering", + member_count=3, + ) + db = RecordingDB([DummyResult(values=[wecom_dept])]) + + async def fake_get_active_provider(_db): + return "wecom", {"corp_id": "ww123"}, {"provider": "wecom"} + + monkeypatch.setattr(org_sync_service.org_sync_service, "get_active_provider", fake_get_active_provider) + + rows = await enterprise_api.list_org_departments( + tenant_id=None, + current_user=make_user(), + db=db, + ) + + assert rows == [ + { + "id": str(wecom_dept.id), + "provider": "wecom", + "feishu_id": None, + "wecom_id": "2", + "name": "Engineering", + "parent_id": None, + "path": None, + "member_count": 3, + } + ] + assert "sync_provider" in str(db.executed[0]) + + +@pytest.mark.asyncio +async def test_list_org_members_filters_by_active_provider(monkeypatch): + wecom_member = OrgMember( + wecom_user_id="zhangsan", + sync_provider="wecom", + name="张三", + email="zhangsan@example.com", + title="Engineer", + department_path="Root / Engineering", + ) + db = RecordingDB([DummyResult(values=[wecom_member])]) + + async def fake_get_active_provider(_db): + return "wecom", {"corp_id": "ww123"}, {"provider": "wecom"} + + monkeypatch.setattr(org_sync_service.org_sync_service, "get_active_provider", fake_get_active_provider) + + rows = await enterprise_api.list_org_members( + department_id=None, + search=None, + tenant_id=None, + current_user=make_user(), + db=db, + ) + + assert rows == [ + { + "id": str(wecom_member.id), + "provider": "wecom", + "name": "张三", + "email": "zhangsan@example.com", + "title": "Engineer", + "department_path": "Root / Engineering", + "avatar_url": None, + } + ] + assert "sync_provider" in str(db.executed[0]) diff --git a/frontend/src/i18n/en.json b/frontend/src/i18n/en.json index 51e9a1b5..03b010a8 100644 --- a/frontend/src/i18n/en.json +++ b/frontend/src/i18n/en.json @@ -732,16 +732,36 @@ "edit": "Edit" }, "org": { + "directorySync": "Directory Sync Configuration", "feishuSync": "Feishu Directory Sync", + "feishuDescription": "Use the Feishu contact API to sync departments and members into the platform.", + "wecomDescription": "Use the WeCom contact API to sync departments and members into the platform.", + "provider": "Sync Provider", + "providerFeishu": "Feishu", + "providerWecom": "WeCom", "appId": "App ID", "appSecret": "App Secret", + "corpId": "Corp ID", + "corpSecret": "Corp Secret", + "feishuConfigIncomplete": "Enter both App ID and App Secret before syncing.", + "wecomConfigIncomplete": "Enter both Corp ID and Corp Secret before syncing.", "syncing": "Syncing...", "syncNow": "Sync Now", + "showConfig": "Show config", + "hideConfig": "Hide config", "syncComplete": "Sync complete: {{departments}} departments, {{members}} members", + "lastSync": "Last sync: {{time}}", "orgBrowser": "Organization Browser", "allDepartments": "All", + "selectedDepartment": "Selected department: {{name}}", "searchMembers": "Search members...", "member": "Member", + "memberDetails": "Member Details", + "selectMemberHint": "Select a member to view their details here.", + "departmentLabel": "Department", + "emailLabel": "Email", + "titleLabel": "Title", + "noTitle": "No title", "admin": "Admin", "noMembers": "No members" }, @@ -921,4 +941,4 @@ "ws_note": "WebSocket mode requires no public IP, callback URL, or domain verification (ICP). The connection is managed automatically." } } -} \ No newline at end of file +} diff --git a/frontend/src/i18n/zh.json b/frontend/src/i18n/zh.json index 08bef963..ae311e1e 100644 --- a/frontend/src/i18n/zh.json +++ b/frontend/src/i18n/zh.json @@ -802,16 +802,36 @@ "edit": "编辑" }, "org": { + "directorySync": "组织通讯录同步配置", "feishuSync": "飞书通讯录同步配置", + "feishuDescription": "使用飞书开放平台通讯录 API 将部门和成员同步到平台。", + "wecomDescription": "使用企业微信通讯录 API 将部门和成员同步到平台。", + "provider": "同步来源", + "providerFeishu": "飞书", + "providerWecom": "企业微信", "appId": "App ID", "appSecret": "App Secret", + "corpId": "Corp ID", + "corpSecret": "Corp Secret", + "feishuConfigIncomplete": "请先填写 App ID 和 App Secret 再执行同步。", + "wecomConfigIncomplete": "请先填写 Corp ID 和 Corp Secret 再执行同步。", "syncing": "同步中...", "syncNow": "同步", + "showConfig": "展开配置", + "hideConfig": "收起配置", "syncComplete": "同步完成:{{departments}} 个部门,{{members}} 个成员", + "lastSync": "上次同步:{{time}}", "orgBrowser": "组织架构浏览", "allDepartments": "全部", + "selectedDepartment": "当前部门:{{name}}", "searchMembers": "搜索成员...", "member": "成员", + "memberDetails": "成员详情", + "selectMemberHint": "点击左侧成员查看详情卡片。", + "departmentLabel": "部门", + "emailLabel": "邮箱", + "titleLabel": "职位", + "noTitle": "未设置职位", "admin": "管理员", "noMembers": "暂无成员" }, diff --git a/frontend/src/pages/EnterpriseSettings.tsx b/frontend/src/pages/EnterpriseSettings.tsx index a6b1dad0..55dbc0b5 100644 --- a/frontend/src/pages/EnterpriseSettings.tsx +++ b/frontend/src/pages/EnterpriseSettings.tsx @@ -59,35 +59,111 @@ const FALLBACK_LLM_PROVIDERS: LLMProviderSpec[] = [ // ─── Department Tree ─────────────────────────────── -function DeptTree({ departments, parentId, selectedDept, onSelect, level }: { - departments: any[]; parentId: string | null; selectedDept: string | null; - onSelect: (id: string | null) => void; level: number; +interface OrgDepartmentNode { + id: string; + provider: string; + feishu_id?: string | null; + wecom_id?: string | null; + name: string; + parent_id: string | null; + path: string; + member_count: number; +} + +interface OrgMemberEntry { + id: string; + provider: string; + name: string; + email: string; + title: string; + department_path: string; + avatar_url?: string | null; +} + +interface OrgSyncProviderConfig { + app_id?: string; + app_secret?: string; + corp_id?: string; + corp_secret?: string; + has_secret?: boolean; + last_synced_at?: string | null; +} + +interface OrgSyncConfigResponse { + provider: 'feishu' | 'wecom'; + feishu?: OrgSyncProviderConfig; + wecom?: OrgSyncProviderConfig; +} + +function DeptTree({ childrenByParent, parentId, selectedDept, expandedDeptIds, onSelect, onToggle, level }: { + childrenByParent: Map; + parentId: string | null; + selectedDept: string | null; + expandedDeptIds: Record; + onSelect: (id: string | null) => void; + onToggle: (id: string) => void; + level: number; }) { - const children = departments.filter((d: any) => - parentId === null ? !d.parent_id : d.parent_id === parentId - ); + const children = childrenByParent.get(parentId) || []; if (children.length === 0) return null; return ( <> - {children.map((d: any) => ( + {children.map((d) => { + const hasChildren = (childrenByParent.get(d.id) || []).length > 0; + const isExpanded = !!expandedDeptIds[d.id]; + return (
onSelect(d.id)} > - - {departments.some((c: any) => c.parent_id === d.id) ? '▸' : '·'} - - {d.name} - {d.member_count > 0 && ({d.member_count})} + + {d.name} + {d.member_count > 0 && ({d.member_count})}
- + {(!hasChildren || isExpanded) && ( + + )}
- ))} + ); + })} ); } @@ -96,29 +172,45 @@ function DeptTree({ departments, parentId, selectedDept, onSelect, level }: { function OrgTab() { const { t } = useTranslation(); const qc = useQueryClient(); - const [syncForm, setSyncForm] = useState({ app_id: '', app_secret: '' }); + const [syncForm, setSyncForm] = useState({ + provider: 'feishu', + feishu: { app_id: '', app_secret: '' }, + wecom: { corp_id: '', corp_secret: '' }, + }); const [syncing, setSyncing] = useState(false); const [syncResult, setSyncResult] = useState(null); const [memberSearch, setMemberSearch] = useState(''); const [selectedDept, setSelectedDept] = useState(null); + const [selectedMemberId, setSelectedMemberId] = useState(null); + const [expandedDeptIds, setExpandedDeptIds] = useState>({}); + const [showSyncConfig, setShowSyncConfig] = useState(true); - const { data: config } = useQuery({ - queryKey: ['system-settings', 'feishu_org_sync'], - queryFn: () => fetchJson('/enterprise/system-settings/feishu_org_sync'), + const { data: config } = useQuery<{ value: OrgSyncConfigResponse }>({ + queryKey: ['system-settings', 'org_sync'], + queryFn: () => fetchJson('/enterprise/system-settings/org_sync'), }); useEffect(() => { - if (config?.value?.app_id) { - setSyncForm({ app_id: config.value.app_id, app_secret: '' }); - } + if (!config?.value) return; + setSyncForm({ + provider: config.value.provider || 'feishu', + feishu: { + app_id: config.value.feishu?.app_id || '', + app_secret: '', + }, + wecom: { + corp_id: config.value.wecom?.corp_id || '', + corp_secret: '', + }, + }); }, [config]); const currentTenantId = localStorage.getItem('current_tenant_id') || ''; - const { data: departments = [] } = useQuery({ + const { data: departments = [] } = useQuery({ queryKey: ['org-departments', currentTenantId], queryFn: () => fetchJson(`/enterprise/org/departments${currentTenantId ? `?tenant_id=${currentTenantId}` : ''}`), }); - const { data: members = [] } = useQuery({ + const { data: members = [] } = useQuery({ queryKey: ['org-members', selectedDept, memberSearch, currentTenantId], queryFn: () => { const params = new URLSearchParams(); @@ -129,54 +221,205 @@ function OrgTab() { }, }); + const childrenByParent = useMemo(() => { + const tree = new Map(); + for (const department of departments) { + const key = department.parent_id || null; + const siblings = tree.get(key) || []; + siblings.push(department); + tree.set(key, siblings); + } + for (const siblings of tree.values()) { + siblings.sort((a, b) => a.name.localeCompare(b.name)); + } + return tree; + }, [departments]); + + const selectedDepartment = useMemo( + () => departments.find((department) => department.id === selectedDept) || null, + [departments, selectedDept], + ); + + const selectedMember = useMemo( + () => members.find((member) => member.id === selectedMemberId) || null, + [members, selectedMemberId], + ); + + useEffect(() => { + if (departments.length === 0) return; + setExpandedDeptIds((prev) => { + const next = { ...prev }; + let changed = false; + for (const department of departments) { + if (!department.parent_id && next[department.id] === undefined) { + next[department.id] = true; + changed = true; + } + } + return changed ? next : prev; + }); + }, [departments]); + + useEffect(() => { + if (members.length === 0) { + setSelectedMemberId(null); + return; + } + if (!selectedMemberId || !members.some((member) => member.id === selectedMemberId)) { + setSelectedMemberId(members[0].id); + } + }, [members, selectedMemberId]); + const saveConfig = async () => { - await fetchJson('/enterprise/system-settings/feishu_org_sync', { + await fetchJson('/enterprise/system-settings/org_sync', { method: 'PUT', - body: JSON.stringify({ value: { app_id: syncForm.app_id, app_secret: syncForm.app_secret } }), + body: JSON.stringify({ + value: { + provider: syncForm.provider, + feishu: syncForm.feishu, + wecom: syncForm.wecom, + }, + }), }); - qc.invalidateQueries({ queryKey: ['system-settings', 'feishu_org_sync'] }); + qc.invalidateQueries({ queryKey: ['system-settings', 'org_sync'] }); }; const triggerSync = async () => { + if (!canSync) { + setSyncResult({ + error: activeProvider === 'wecom' + ? t('enterprise.org.wecomConfigIncomplete') + : t('enterprise.org.feishuConfigIncomplete'), + }); + return; + } + setSyncing(true); setSyncResult(null); try { - if (syncForm.app_secret) await saveConfig(); + await saveConfig(); const result = await fetchJson('/enterprise/org/sync', { method: 'POST' }); setSyncResult(result); qc.invalidateQueries({ queryKey: ['org-departments'] }); qc.invalidateQueries({ queryKey: ['org-members'] }); + qc.invalidateQueries({ queryKey: ['system-settings', 'org_sync'] }); } catch (e: any) { setSyncResult({ error: e.message }); } setSyncing(false); }; + const activeProvider = syncForm.provider; + const activeLastSyncedAt = activeProvider === 'wecom' + ? config?.value?.wecom?.last_synced_at + : config?.value?.feishu?.last_synced_at; + const activeIdentifier = activeProvider === 'wecom' + ? syncForm.wecom.corp_id + : syncForm.feishu.app_id; + const activeSecret = activeProvider === 'wecom' + ? syncForm.wecom.corp_secret + : syncForm.feishu.app_secret; + const storedActiveConfig = activeProvider === 'wecom' + ? config?.value?.wecom + : config?.value?.feishu; + const storedActiveIdentifier = activeProvider === 'wecom' + ? storedActiveConfig?.corp_id + : storedActiveConfig?.app_id; + const hasStoredSecretForCurrentIdentifier = Boolean( + storedActiveConfig?.has_secret && storedActiveIdentifier === activeIdentifier, + ); + const canSync = Boolean(activeIdentifier && (activeSecret || hasStoredSecretForCurrentIdentifier)); + + const toggleDepartment = (id: string) => { + setExpandedDeptIds((prev) => ({ ...prev, [id]: !prev[id] })); + }; + return (
{/* Sync Config */}
-

{t('enterprise.org.feishuSync')}

-

- {t('enterprise.org.feishuSync')} -

-
-
- - setSyncForm({ ...syncForm, app_id: e.target.value })} placeholder="cli_xxxxxxxx" /> -
-
- - setSyncForm({ ...syncForm, app_secret: e.target.value })} placeholder={config?.value?.app_id ? '' : ''} /> +
+
+

{t('enterprise.org.directorySync')}

+
+ {activeProvider === 'wecom' ? t('enterprise.org.providerWecom') : t('enterprise.org.providerFeishu')} + {activeLastSyncedAt && {t('enterprise.org.lastSync', { time: new Date(activeLastSyncedAt).toLocaleString() })}} +
+
-
- - {config?.value?.last_synced_at && ( + {!showSyncConfig && ( - Last sync: {new Date(config.value.last_synced_at).toLocaleString()} + {activeProvider === 'wecom' ? syncForm.wecom.corp_id : syncForm.feishu.app_id} )}
@@ -194,33 +437,106 @@ function OrgTab() {
{t('enterprise.org.allDepartments')}
setSelectedDept(null)} > {t('common.all')}
- + {departments.length === 0 &&
{t('common.noData')}
}
-
- setMemberSearch(e.target.value)} style={{ marginBottom: '12px', fontSize: '13px' }} /> -
- {members.map((m: any) => ( -
-
- {m.name?.[0] || '?'} -
-
-
{m.name}
-
- {m.title || '-'} · {m.department_path || '-'} - {m.email && ` · ${m.email}`} +
+
+ setMemberSearch(e.target.value)} style={{ flex: 1, minWidth: '220px', fontSize: '13px' }} /> +
+ {selectedDepartment ? t('enterprise.org.selectedDepartment', { name: selectedDepartment.name }) : t('enterprise.org.allDepartments')} +
+
+
+
+ {members.map((m) => ( +
+
+
{m.name}
+
+ {m.title || t('enterprise.org.noTitle')} +
+
+ {m.department_path || '-'} +
+
+ + ))} + {members.length === 0 &&
{t('enterprise.org.noMembers')}
} +
+
+
+ {t('enterprise.org.memberDetails')}
- ))} - {members.length === 0 &&
{t('enterprise.org.noMembers')}
} + {selectedMember ? ( + <> +
+
+ {selectedMember.name?.[0] || '?'} +
+
+
{selectedMember.name}
+
{selectedMember.title || t('enterprise.org.noTitle')}
+
+
+
+
+
{t('enterprise.org.departmentLabel')}
+
{selectedMember.department_path || '-'}
+
+
+
{t('enterprise.org.emailLabel')}
+
{selectedMember.email || '-'}
+
+
+
{t('enterprise.org.titleLabel')}
+
{selectedMember.title || t('enterprise.org.noTitle')}
+
+
+
{t('enterprise.org.provider')}
+
{selectedMember.provider === 'wecom' ? t('enterprise.org.providerWecom') : t('enterprise.org.providerFeishu')}
+
+
+ + ) : ( +
+ {t('enterprise.org.selectMemberHint')} +
+ )} +
From f22c17b8e82059ac35e649ecbca2c79dd1d32fde Mon Sep 17 00:00:00 2001 From: Atlas Date: Fri, 27 Mar 2026 17:02:47 +0800 Subject: [PATCH 2/2] Keep org sync migration on the latest mainline Alembic head Upstream/main gained a new OrgMember transliteration migration after the sync-only branch was originally cut. Retarget the provider-aware org sync migration so PR #183 continues to merge as a single-head schema change instead of reintroducing a forked upgrade path. Constraint: PR #183 must remain mergeable after upstream added be48e94fa052 Rejected: Leave dual heads and rely on a later merge migration | unnecessary schema complexity for a feature branch Confidence: high Scope-risk: narrow Reversibility: clean Directive: Recheck Alembic heads whenever upstream/main adds migrations before pushing long-lived PR branches Tested: cd backend && .venv/bin/alembic heads Tested: cd backend && .venv/bin/python -m pytest tests/test_org_sync.py Not-tested: Full alembic upgrade/downgrade against a live database snapshot --- backend/alembic/versions/add_wecom_org_sync_fields.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/alembic/versions/add_wecom_org_sync_fields.py b/backend/alembic/versions/add_wecom_org_sync_fields.py index e7f0646a..710159c8 100644 --- a/backend/alembic/versions/add_wecom_org_sync_fields.py +++ b/backend/alembic/versions/add_wecom_org_sync_fields.py @@ -1,13 +1,13 @@ """Add provider-aware org sync fields. Revision ID: add_wecom_org_sync_fields -Revises: add_daily_token_usage +Revises: be48e94fa052 """ from alembic import op revision = "add_wecom_org_sync_fields" -down_revision = "add_daily_token_usage" +down_revision = "be48e94fa052" branch_labels = None depends_on = None