Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions fastdeploy/engine/common_engine_prepare_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,15 @@ def _fetch_request_decode(self) -> bool:

def _fetch_request_prefill(self) -> bool:
"""Fetch and prepare requests for a prefill instance. Returns True if tasks were fetched."""
max_inflight_prefill = envs.FD_MAX_INFLIGHT_PREFILL
inflight_prefill = len(self.resource_manager.running)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Bug 这里用 len(self.resource_manager.running) 计算剩余额度,无法限制默认的多 fetch 线程场景。

_prepare_request_v1() 在 prefill 角色会按 FD_PREFILL_PREPARE_REQ_THREAD_NUM 启动多个 _fetch_loop(默认 3 个线程)。每个线程都会在请求还没有执行到 add_request_in_p() 之前读取同一个 running 长度并调用 scheduler.get_requests(batch=available_for_new);而这些已出队、正在申请 D 侧资源或异步预处理的请求尚未进入 running。因此 FD_MAX_INFLIGHT_PREFILL=20 时,3 个线程可以同时各拉 20 个请求,实际 inflight 变成 60,限制失效并继续放大 PD 侧资源压力。

建议修复方式:
把“检查剩余额度”和“登记已占用额度”放到同一个共享临界区,例如在 prefill fetch 入口维护一个受锁保护的 pending 计数,出队前先 reserve available_for_new,请求失败或完成后再释放;或者将 FD_MAX_INFLIGHT_PREFILL 的判断移动到 ResourceManagerV1 内部,与 preallocate_resource_in_p() / add_request_in_p() 共用锁,并把已出队但未进入 running 的 pending 请求也计入上限。

if inflight_prefill >= max_inflight_prefill:
return False
available_for_new = max_inflight_prefill - inflight_prefill
num_prefill_batch = min(
int(self.resource_manager.available_batch()),
self.cfg.max_prefill_batch,
available_for_new,
)
max_num_batched_tokens = self.cfg.scheduler_config.max_num_batched_tokens
available_blocks = self.cfg.cache_config.max_block_num_per_seq
Expand Down
1 change: 1 addition & 0 deletions fastdeploy/envs.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ def _validate_split_kv_size(value: int) -> int:
# Number of worker threads for prepare requests in prefill instance
"FD_PREFILL_PREPARE_REQ_THREAD_NUM": lambda: int(os.getenv("FD_PREFILL_PREPARE_REQ_THREAD_NUM", "3")),
"FD_PREFILL_WAIT_DECODE_RESOURCE_SECONDS": lambda: int(os.getenv("FD_PREFILL_WAIT_DECODE_RESOURCE_SECONDS", "30")),
"FD_MAX_INFLIGHT_PREFILL": lambda: int(os.getenv("FD_MAX_INFLIGHT_PREFILL", "20")),
# Timeout (seconds) for D to reclaim preallocated blocks if P never follows through. 0 to disable.
"FD_DECODE_PREALLOC_BLOCK_TIMEOUT": lambda: int(os.getenv("FD_DECODE_PREALLOC_BLOCK_TIMEOUT", "1200")),
"FD_ENABLE_REQUEST_DISCONNECT_STOP_INFERENCE": lambda: int(
Expand Down
Loading