diff --git a/fastdeploy/engine/common_engine_prepare_mixin.py b/fastdeploy/engine/common_engine_prepare_mixin.py index c6ea2f3ee9d..4129dfec664 100644 --- a/fastdeploy/engine/common_engine_prepare_mixin.py +++ b/fastdeploy/engine/common_engine_prepare_mixin.py @@ -89,9 +89,15 @@ def _fetch_request_decode(self) -> bool: def _fetch_request_prefill(self) -> bool: """Fetch and prepare requests for a prefill instance. Returns True if tasks were fetched.""" + max_inflight_prefill = envs.FD_MAX_INFLIGHT_PREFILL + inflight_prefill = len(self.resource_manager.running) + if inflight_prefill >= max_inflight_prefill: + return False + available_for_new = max_inflight_prefill - inflight_prefill num_prefill_batch = min( int(self.resource_manager.available_batch()), self.cfg.max_prefill_batch, + available_for_new, ) max_num_batched_tokens = self.cfg.scheduler_config.max_num_batched_tokens available_blocks = self.cfg.cache_config.max_block_num_per_seq diff --git a/fastdeploy/envs.py b/fastdeploy/envs.py index 61b6b1e31d1..0979159f339 100644 --- a/fastdeploy/envs.py +++ b/fastdeploy/envs.py @@ -196,6 +196,7 @@ def _validate_split_kv_size(value: int) -> int: # Number of worker threads for prepare requests in prefill instance "FD_PREFILL_PREPARE_REQ_THREAD_NUM": lambda: int(os.getenv("FD_PREFILL_PREPARE_REQ_THREAD_NUM", "3")), "FD_PREFILL_WAIT_DECODE_RESOURCE_SECONDS": lambda: int(os.getenv("FD_PREFILL_WAIT_DECODE_RESOURCE_SECONDS", "30")), + "FD_MAX_INFLIGHT_PREFILL": lambda: int(os.getenv("FD_MAX_INFLIGHT_PREFILL", "20")), # Timeout (seconds) for D to reclaim preallocated blocks if P never follows through. 0 to disable. "FD_DECODE_PREALLOC_BLOCK_TIMEOUT": lambda: int(os.getenv("FD_DECODE_PREALLOC_BLOCK_TIMEOUT", "1200")), "FD_ENABLE_REQUEST_DISCONNECT_STOP_INFERENCE": lambda: int(