feat(mcp): support dynamically authenticated connection management and improve runtime stability #749
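Following up on f51bae0: when SkillsMiddleware loads MCP tools, the BaseContext.mcp_user_id field is now uniformly read as work_id, consistent with runtime_config_middleware. The corresponding test fixtures and field names are updated to match.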
- run_queue_service.list_run_stream_events restores the exclusive after_seq cursor
(f"({after_seq}"), fixing an infinite loop in agent_run_service caused by
list_run_stream_events returning the same seq repeatedly while polling
- test_chat_service_langfuse_stream adds the current_user.user_id field and updates
the input_context assertions (work_id/department_id)
- test_chat_stream_attachment_materialize adds the required user_id argument of
_materialize_attachment_files
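
The exclusive-cursor fix in the first bullet can be illustrated with a small in-memory model. This is only a sketch, not the project's code: `Event`, `list_events`, and `drain` are hypothetical stand-ins for the stream and for `list_run_stream_events`, and it assumes the `(` prefix means "strictly greater than the cursor".

```python
from typing import NamedTuple


class Event(NamedTuple):
    seq: int
    payload: str


def list_events(events: list[Event], after_seq: int, *, exclusive: bool = True) -> list[Event]:
    """Return events past the cursor; an exclusive cursor skips `after_seq` itself."""
    if exclusive:
        return [e for e in events if e.seq > after_seq]
    return [e for e in events if e.seq >= after_seq]


def drain(events: list[Event], *, exclusive: bool, max_polls: int = 10) -> tuple[list[int], int]:
    """Poll until a batch comes back empty, or give up after `max_polls` rounds."""
    seen: list[int] = []
    cursor = 0
    polls = 0
    while polls < max_polls:
        polls += 1
        batch = list_events(events, cursor, exclusive=exclusive)
        if not batch:
            break
        seen.extend(e.seq for e in batch)
        cursor = batch[-1].seq  # advance the cursor to the last seq delivered
    return seen, polls


stream = [Event(1, "start"), Event(2, "token"), Event(3, "end")]
exclusive_seen, exclusive_polls = drain(stream, exclusive=True)
inclusive_seen, inclusive_polls = drain(stream, exclusive=False)
```

With the exclusive cursor the poller stops after one empty batch. With the inclusive one, the last event is redelivered on every poll, so only the `max_polls` guard ends the loop.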
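- Fix the argument-injection mismatch caused by retiring the McpConnectionService facade and calling proxy_service directly
- Fix and improve the StreamConsumed handling and retry simulation in the httpx mocks of test_mcp_auth_proxy_service
- Update the mocked AgentConfigRepository dependency injection in the test_chat_service_langfuse_stream tests
- Correct the mcp_internal_router integration-test assertion: a missing Header raises 401 Unauthorized by default, not 422
- Rename test_mcp_router.py in the integration test directory to test_integration_mcp_router.py, resolving a module-name conflict in full Pytest runs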
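1. To stop DynamicMCPTokenAuth from querying the database too often, add a 15-second TTL in-memory cache in client_pool.py, together with a linked invalidation interface;
2. Fix the bias in how _normalize_token_payload fills in a default time zone for naive datetime values, which caused tokens to auto-refresh endlessly;
3. Give _calculate_config_hash a fallback around json.dumps for objects that are not JSON-serializable, avoiding serialization crashes;
4. Add the missing unit tests for these features and fix the import formatting in some Module-layer files.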
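
Items 2 and 3 can be sketched roughly as follows. The function names are hypothetical, not the PR's `_normalize_token_payload` or `_calculate_config_hash`. The sketch assumes that naive datetimes are meant as UTC and that stringifying unknown objects is an acceptable fallback; the PR may make different choices.

```python
import hashlib
import json
from datetime import datetime, timedelta, timezone
from typing import Any


def normalize_expiry(value: datetime) -> datetime:
    """Return an aware UTC datetime; a naive value is assumed to already be UTC."""
    if value.tzinfo is None:
        return value.replace(tzinfo=timezone.utc)
    return value.astimezone(timezone.utc)


def calculate_config_hash(config: dict[str, Any]) -> str:
    """Hash a config dict deterministically, tolerating non-JSON values."""
    try:
        encoded = json.dumps(config, sort_keys=True)
    except TypeError:
        # Fallback: stringify whatever json cannot encode instead of raising.
        encoded = json.dumps(config, sort_keys=True, default=str)
    return hashlib.sha256(encoded.encode("utf-8")).hexdigest()


naive = datetime(2024, 1, 1, 12, 0)
shifted = datetime(2024, 1, 1, 20, 0, tzinfo=timezone(timedelta(hours=8)))
same_instant = normalize_expiry(naive) == normalize_expiry(shifted)

hash_a = calculate_config_hash({"url": "http://example.invalid", "secret": b"abc"})
hash_b = calculate_config_hash({"secret": b"abc", "url": "http://example.invalid"})
```

Comparing only aware UTC values keeps the "is this token expired?" check from drifting by the local UTC offset. That drift is one plausible way a token could look permanently expired and refresh on every request.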
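- Fix the leak of old session instances in `client_pool.py` caused by cache revision changes
- Make `clear_mcp_cache` an async function that calls `shutdown`, so clearing the cache does not leave orphaned child processes
- Add lazy cleanup of the `_resolved_headers_cache` dict to prevent unbounded growth
- Fix the uncaught uniqueness-constraint conflict in `connection_service.py` that resulted in HTTP 500; it now raises ValueError
- Fix the SSE read timeout configuration in `proxy_service.py` and the vulnerability where the Authorization header was overwritten
- Remove the leftover redundant scripts `fix_mcp_service_imports.py` and `fix_tests.py`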
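1. Cryptographic hardening: stop relying on plain SHA-256 and adopt V2 encryption with a salted, HKDF-derived key, while still parsing legacy V1 credentials.
2. Concurrency and connection-leak fixes: refactor proxy_service to reuse a shared httpx client, and replace the global coroutine lock in client_pool with a Future placeholder pattern to improve startup concurrency.
3. Cache optimization: introduce cachetools and replace the unbounded dicts with LRUCache and TTLCache to prevent memory leaks.
4. Database consistency fix: server_service clears the Redis cache before deleting an instance, so tracking handles are not lost after a cascading delete.
5. Test fixes: fix the test failures caused by httpx.Timeout, expired TTLs, and proxy environment variables.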
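
The Future placeholder pattern from item 2 can be sketched with plain asyncio. The names `_pool`, `get_or_create`, and `factory` are hypothetical, and this is a minimal illustration under those assumptions rather than the client_pool implementation.

```python
import asyncio
from typing import Awaitable, Callable

_pool: dict[str, asyncio.Future] = {}


async def get_or_create(key: str, factory: Callable[[], Awaitable[object]]) -> object:
    """Return the client for `key`, running `factory` at most once per key."""
    existing = _pool.get(key)
    if existing is not None:
        # shield() keeps one cancelled waiter from cancelling the shared future.
        return await asyncio.shield(existing)

    future: asyncio.Future = asyncio.get_running_loop().create_future()
    _pool[key] = future  # reserve the slot before the first await
    try:
        client = await factory()
    except BaseException as exc:
        _pool.pop(key, None)  # let later callers retry
        if isinstance(exc, asyncio.CancelledError):
            future.cancel()
        else:
            future.set_exception(exc)
            future.exception()  # mark as retrieved in case nobody is waiting
        raise
    future.set_result(client)
    return client


async def _demo() -> tuple[int, bool]:
    calls = 0

    async def factory() -> object:
        nonlocal calls
        calls += 1
        await asyncio.sleep(0.01)  # simulate a slow connection handshake
        return object()

    first, second = await asyncio.gather(
        get_or_create("server-a", factory),
        get_or_create("server-a", factory),
    )
    return calls, first is second


creations, same_client = asyncio.run(_demo())
```

Compared with one global lock, callers for different keys never wait on each other. Only callers racing for the same key share a single in-flight creation.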
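…skip connection auth validation directly, so it can be tested and used without configuring an empty connection
… connection reads failing because the user_id field did not match the id attribute
- Refactor the auth-connection validation in the test_mcp_server route to reuse the connection resolution from get_all_mcp_tools
- Catch ValueError('Active MCP connection not found') so that a missing connection returns a precise, friendly 400 error
- Add integration tests verifying that the test endpoint loads tools successfully when a connection is bound
…n tests pass work_id; client pool hash ignores the proxy JWT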
Code Review
This pull request introduces a comprehensive MCP multi-authentication orchestration and internal proxy system, adding support for various authentication providers and centralizing token retrieval, caching, and automatic 401 retry logic via an internal proxy route. It also implements a long-lived client session pool, Redis-based manifest caching, and fine-grained access control for user and department scopes. The review feedback highlights a critical bug in the 401 retry flow where the local memory cache is not cleared alongside the Redis token cache, which would cause retries to fail. Additionally, it identifies potential AttributeError exceptions in both RuntimeConfigMiddleware and SkillsMiddleware if the context object is None during tool loading.
    _mark_reauth_required(connection, "MCP upstream returned 401 after retry")
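When the upstream MCP service returns 401, the stale token is deleted from the Redis token_cache, but the local in-memory cache _resolved_headers_cache in client_pool.py (15-second TTL) still holds the old header. The retry that immediately follows hits the in-memory cache and resends the stale token, so the retry is bound to fail. Consider calling clear_server_resolved_headers_cache(server.name) to clear the local in-memory cache at the same time the Redis entry is deleted.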
    - _mark_reauth_required(connection, "MCP upstream returned 401 after retry")
    + from yuxi.services.mcp.client_pool import clear_server_resolved_headers_cache
    + clear_server_resolved_headers_cache(server.name)
    + if connection is not None and getattr(connection, "id", None) is not None:
    +     await token_cache.delete_access_token(connection.id)
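
The failure mode described in this comment can be reproduced with a toy model. `FakeProxy` and its dicts are hypothetical stand-ins for the Redis token cache and `_resolved_headers_cache`. The sketch only shows why both layers need invalidating before the retry, not how proxy_service is actually structured.

```python
import asyncio


class FakeProxy:
    """In-memory stand-ins for the Redis token cache and the local header cache."""

    def __init__(self) -> None:
        self.redis_tokens: dict[str, str] = {"conn-1": "stale"}
        self.local_headers: dict[str, dict[str, str]] = {}

    async def resolve_headers(self, server: str, conn_id: str) -> dict[str, str]:
        if server not in self.local_headers:  # local cache miss
            token = self.redis_tokens.get(conn_id)
            if token is None:  # Redis miss: pretend to fetch a fresh token
                token = self.redis_tokens[conn_id] = "fresh"
            self.local_headers[server] = {"Authorization": f"Bearer {token}"}
        return self.local_headers[server]

    @staticmethod
    async def upstream(headers: dict[str, str]) -> int:
        # The fake upstream only accepts the fresh token.
        return 200 if headers["Authorization"] == "Bearer fresh" else 401

    async def call(self, server: str, conn_id: str, *, clear_local: bool) -> int:
        status = await self.upstream(await self.resolve_headers(server, conn_id))
        if status == 401:
            self.redis_tokens.pop(conn_id, None)  # drop the stale Redis token
            if clear_local:
                self.local_headers.pop(server, None)  # drop the stale local headers too
            status = await self.upstream(await self.resolve_headers(server, conn_id))
        return status


without_clear = asyncio.run(FakeProxy().call("srv", "conn-1", clear_local=False))
with_clear = asyncio.run(FakeProxy().call("srv", "conn-1", clear_local=True))
```

Without the local invalidation the retry reuses the cached header and gets 401 again. With it, the retry resolves a fresh token and succeeds.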
    user_id = getattr(context, "user_id", None)
    work_id = getattr(context, "work_id", None)
    mcp_tools = await get_enabled_mcp_tools(
        server_name,
        auth_context=AuthContext(
            user_id=user_id,
            department_id=getattr(context, "department_id", None),
            work_id=work_id,
        ),
    )
At runtime, context may be None if runtime_context could not be resolved. Calling getattr directly on context would trigger an AttributeError. Consider adding a None guard before the call to improve runtime stability.
    - user_id = getattr(context, "user_id", None)
    - work_id = getattr(context, "work_id", None)
    - mcp_tools = await get_enabled_mcp_tools(
    -     server_name,
    -     auth_context=AuthContext(
    -         user_id=user_id,
    -         department_id=getattr(context, "department_id", None),
    -         work_id=work_id,
    -     ),
    - )
    + user_id = getattr(context, "user_id", None) if context is not None else None
    + work_id = getattr(context, "work_id", None) if context is not None else None
    + mcp_tools = await get_enabled_mcp_tools(
    +     server_name,
    +     auth_context=AuthContext(
    +         user_id=user_id,
    +         department_id=getattr(context, "department_id", None) if context is not None else None,
    +         work_id=work_id,
    +     ),
    + )
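
If the guard is wanted in both middlewares, it could live in one small helper instead of three repeated conditionals. The sketch below uses a hypothetical `AuthContext` dataclass and `build_auth_context` function, not the project's classes. One detail is easy to check in plain Python: three-argument `getattr` returns its default even when the object is `None`, so the explicit check mostly documents intent.

```python
from dataclasses import dataclass
from types import SimpleNamespace
from typing import Any, Optional


@dataclass
class AuthContext:  # hypothetical stand-in for the project's AuthContext
    user_id: Optional[str] = None
    department_id: Optional[str] = None
    work_id: Optional[str] = None


def build_auth_context(context: Any) -> AuthContext:
    """Build an AuthContext, tolerating a missing runtime context."""
    if context is None:
        return AuthContext()
    return AuthContext(
        user_id=getattr(context, "user_id", None),
        department_id=getattr(context, "department_id", None),
        work_id=getattr(context, "work_id", None),
    )


empty = build_auth_context(None)
partial = build_auth_context(SimpleNamespace(user_id="u1", work_id="w1"))
probe = getattr(None, "user_id", None)  # falls back to the default, no exception
```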
    user_id = getattr(context, "user_id", None)
    work_id = getattr(context, "work_id", None)
    mcp_tools = await get_enabled_mcp_tools(
        server_name,
        auth_context=AuthContext(
            user_id=user_id,
            department_id=getattr(context, "department_id", None),
            work_id=work_id,
        ),
    )
As with the runtime config middleware, context here may also be None in some call paths. Using getattr(context, ...) directly would crash with an AttributeError. Consider adding a None guard.
    - user_id = getattr(context, "user_id", None)
    - work_id = getattr(context, "work_id", None)
    - mcp_tools = await get_enabled_mcp_tools(
    -     server_name,
    -     auth_context=AuthContext(
    -         user_id=user_id,
    -         department_id=getattr(context, "department_id", None),
    -         work_id=work_id,
    -     ),
    - )
    + user_id = getattr(context, "user_id", None) if context is not None else None
    + work_id = getattr(context, "work_id", None) if context is not None else None
    + mcp_tools = await get_enabled_mcp_tools(
    +     server_name,
    +     auth_context=AuthContext(
    +         user_id=user_id,
    +         department_id=getattr(context, "department_id", None) if context is not None else None,
    +         work_id=work_id,
    +     ),
    + )
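15,000 lines. Honestly, there is no way to review this 😭
It is a bit awkward, I admit. I'll have the AI go over it again. Having looked, it is mostly that a lot of test code was added. The feature itself is fine, I think: the backend uses design patterns, plus a few frontend components. Current branch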
Change description
Briefly describe what this PR does
Change type
Testing
Relevant logs or screenshots

Notes
(Optional) Is there anything that needs special mention?
💡 Tip: before submitting, you can run `make lint` and `make format` to check code style