[Engine] Revert TTFT optimize (#6680) and add EP batched token scheduler#7791
Conversation
|
Thanks for your contribution! |
CI报告基于以下代码生成(30分钟更新一次): 1 任务总览CI 有 1 个 Required 任务失败,另有 1 个运行中、4 个等待中,合并被阻塞,请优先处理 Required 失败任务。
2 任务状态汇总2.1 Required任务 : 2/8 通过
2.2 可选任务 — 16/20 通过
3 失败详情(仅 required)Approval — 流程审批(置信度: 高)Approval
根因详情: 关键日志: 修复建议:
修复建议摘要: 请相关RD(jiangjiajun等/zhouchong等)在PR上审批 链接: 查看日志 |
PaddlePaddle-bot
left a comment
There was a problem hiding this comment.
🤖 Paddle-CI-Agent | pr_review |
2026-05-12 18:08:51
📋 Review 摘要
PR 概述:回滚 EP 模式 TTFT 优化(删除 engine_forward_signal 机制),并在 dp_scheduler 中引入基于超时的批量请求收集策略
变更范围:engine/common_engine.py、scheduler/dp_scheduler.py、worker/worker_process.py、splitwise/
影响面 Tag:[Engine] [Scheduler] [PD Disaggregation]
📝 PR 规范检查
标题 [for test] revert ttft optimize 中 [for test] 不属于 checklist §D1 官方 Tag 列表;PR 描述各必填段落均为空(仅保留模板注释)。
标题建议(可直接复制):
[Engine] Revert TTFT optimize and add EP batched token scheduler
PR 描述建议(可直接复制):
## Motivation
回滚 TTFT(Time to First Token)优化。该优化通过 `engine_forward_signal` IPC 信号在 EP(Expert Parallel)模式下同步调度器与 Worker 前向执行状态。本次回滚删除该信号机制,并在 `dp_scheduler` 中引入基于 `FD_EP_BATCHED_TOKEN_TIMEOUT` 超时的批量请求收集策略作为替代方案。
## Modifications
- `fastdeploy/engine/common_engine.py`:删除 `engine_forward_signal` IPCSignal;重构调度循环,先检查 `exist_tasks()` 再提交请求拉取;移除 EP 模式下发送空任务批到 Worker 的逻辑;`RuntimeError` 处理仅保留在 mixed 分支
- `fastdeploy/scheduler/dp_scheduler.py`:新增资源前置检查;将单请求拉取改为带超时的批量请求收集循环,引入 `FD_EP_BATCHED_TOKEN_TIMEOUT` 环境变量(默认 0.1s)
- `fastdeploy/envs.py` / `docs/`:新增 `FD_EP_BATCHED_TOKEN_TIMEOUT` 环境变量
- `fastdeploy/splitwise/internal_adapter_utils.py`:移除 `ENABLE_V1_KVCACHE_SCHEDULER` 条件分支,简化 `unhandled_request_num` 计算
- `fastdeploy/worker/worker_process.py`:删除 `engine_forward_signal`;将 `_run_eplb` 移至 event loop 顶部;删除 EP prefill barrier 及空任务 assert 断言
- `tests/`:同步更新 `test_common_engine.py`、`test_dp_scheduler.py`、`test_internal_adapter_utils.py`、`test_metrics.py`
## Usage or Command
N/A
## Accuracy Tests
N/A
## Checklist
- [x] Add at least a tag in the PR title.
- Tag list: [`[FDConfig]`,`[APIServer]`,`[Engine]`, `[Scheduler]`, `[PD Disaggregation]`, `[Executor]`, `[Graph Optimization]`, `[Speculative Decoding]`, `[RL]`, `[Models]`, `[Quantization]`, `[Loader]`, `[OP]`, `[KVCache]`, `[DataProcessor]`, `[BugFix]`, `[Docs]`, `[CI]`, `[Optimization]`, `[Feature]`, `[Benchmark]`, `[Others]`, `[XPU]`, `[HPU]`, `[GCU]`, `[DCU]`, `[Iluvatar]`, `[Metax]`]
- You can add new tags based on the PR content, but the semantics must be clear.
- [ ] Format your code, run `pre-commit` before commit.
- [x] Add unit tests. Please write the reason in this PR if no unit tests.
- [ ] Provide accuracy results.
- [ ] If the current PR is submitting to the `release` branch, make sure the PR has been submitted to the `develop` branch, then cherry-pick it to the `release` branch with the `[Cherry-Pick]` PR tag.问题
| 级别 | 文件 | 概述 |
|---|---|---|
| 🔴 Bug | fastdeploy/engine/common_engine.py:1091 |
非 mixed 模式下 get_request_pool.submit() 缺少 RuntimeError 捕获,线程池关闭时调度线程会崩溃 |
| 🔴 Bug | fastdeploy/scheduler/dp_scheduler.py:161 |
current_prefill_tokens / required_total_blocks 在请求被拒绝前已累加,外层 while 的 token 预算检查可能被误触发,导致后续可调度的小请求被跳过 |
| ❓ 疑问 | fastdeploy/scheduler/dp_scheduler.py:183 |
日志信息与实际语义相反:len(requests)==0 说明无任何请求入队,但日志显示"has put all just-pulled request into the queue" |
总体评价
此 PR 回滚了 EP 模式的 TTFT 优化并引入新的批量调度策略,核心逻辑变动较大。dp_scheduler.py 的计数器提前累加是正确性 Bug,可能导致部分有效请求被漏调度;common_engine.py 非 mixed 路径遗漏异常处理存在稳定性风险。建议修复两处 P1 问题后再合入。
| if self.cfg.scheduler_config.splitwise_role != "mixed": | ||
| if not is_fetching: | ||
| is_fetching = True | ||
| get_request_pool.submit(_fetch_request) |
There was a problem hiding this comment.
🔴 Bug 非 mixed 模式下 get_request_pool.submit(_fetch_request) 缺少 RuntimeError 捕获。
mixed 模式的 else 分支保留了完整的 try/except 处理(线程池关闭时优雅退出),但非 mixed 路径没有。当引擎关闭时,ThreadPoolExecutor 会抛出 RuntimeError: cannot schedule new futures after shutdown,将直接导致调度线程崩溃。
建议修复:
if self.cfg.scheduler_config.splitwise_role != "mixed":
if not is_fetching:
try:
is_fetching = True
get_request_pool.submit(_fetch_request)
except RuntimeError as e:
if "shutdown" in str(e):
self.llm_logger.info("Thread pool shutdown detected, exiting scheduler loop")
break
else:
raise| for request_id in batch_ids: | ||
| request = self.requests[request_id] | ||
| required_input_blocks = self.calc_required_blocks(request.prompt_tokens_ids_len, block_size) | ||
| current_prefill_tokens += request.prompt_tokens_ids_len |
There was a problem hiding this comment.
🔴 Bug current_prefill_tokens 和 required_total_blocks 在请求被拒绝 之前 已完成累加,当 required_total_blocks > available_blocks 触发 break 时,这两个计数器已包含被拒绝请求的数据。
外层 while 循环下一次迭代时,wait_for 仍会返回同一批 batch_ids(游标未推进),进入 for 循环后计数器会被再次累加,导致:
current_prefill_tokens虚高,可能误触发外层current_prefill_tokens > max_num_batched_tokens检查,提前退出循环,后续体积较小、本可调度的请求(如 req3)被跳过。- 外层循环只能靠超时(
FD_EP_BATCHED_TOKEN_TIMEOUT= 0.1s)退出,浪费等待时间。
建议在 break 之前检查而非之后累加:
if required_total_blocks + required_input_blocks + reserved_output_blocks > available_blocks:
break # 先判断,通过后再累加
current_prefill_tokens += request.prompt_tokens_ids_len
required_total_blocks += required_input_blocks + reserved_output_blocks| if batch_ids: | ||
| if len(batch_ids) > 0 and len(requests) == 0: | ||
| self.scheduler_logger.debug( | ||
| f"Scheduler has put all just-pulled request into the queue: {len(batch_ids)}" |
There was a problem hiding this comment.
❓ 疑问 日志信息语义与条件相反。
此处触发条件为 len(batch_ids) > 0 and len(requests) == 0,即拉取到了请求 ID 但 一个请求都没有入队(全部因资源不足被拒绝)。然而日志显示 "Scheduler has put all just-pulled request into the queue",容易误导排查方向。
建议改为:
self.scheduler_logger.debug(
f"No requests could be scheduled due to insufficient resources, pending={len(batch_ids)}"
)
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## develop #7791 +/- ##
==========================================
Coverage ? 63.83%
==========================================
Files ? 458
Lines ? 63515
Branches ? 9731
==========================================
Hits ? 40543
Misses ? 20201
Partials ? 2771
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
Motivation
通过实际测试,#6680 导致k3_kl_mean 相比branch release2.6上涨50倍;由于相关部分已经被多次修改,代码已经无法自动通过git revert
Modifications
Revert 结果
Usage or Command
Accuracy Tests
Checklist
[FDConfig],[APIServer],[Engine],[Scheduler],[PD Disaggregation],[Executor],[Graph Optimization],[Speculative Decoding],[RL],[Models],[Quantization],[Loader],[OP],[KVCache],[DataProcessor],[BugFix],[Docs],[CI],[Optimization],[Feature],[Benchmark],[Others],[XPU],[HPU],[GCU],[DCU],[Iluvatar],[Metax]]pre-commitbefore commit.releasebranch, make sure the PR has been submitted to thedevelopbranch, then cherry-pick it to thereleasebranch with the[Cherry-Pick]PR tag.