From 693d90c49139bba9241f009ca093b02afe7c082a Mon Sep 17 00:00:00 2001 From: kevincheng2 Date: Wed, 10 Jun 2026 15:56:40 +0800 Subject: [PATCH 1/2] [BugFix] fix consume_signals barrier deadlock in PD separation In PD separation mode, different ranks may receive cache_info at different times. When consume_signals gets a layer0 signal, some ranks find the engine_idx already in idx_cache_task_dict (ready) while others don't (pending). This causes different ranks to put different batch_engine_signals into the queue, leading to mismatched finish_send_cache_barrier.wait() calls and deadlock. Fix: route all layer0 signals through pending_layer0_signals uniformly, then immediately recover any that already have cache_info registered. Each recovered signal is put into the queue individually (single-request batch) to ensure all ranks have identical batch granularity regardless of recovery timing. --- fastdeploy/cache_manager/cache_messager.py | 28 ++++++++++------------ 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/fastdeploy/cache_manager/cache_messager.py b/fastdeploy/cache_manager/cache_messager.py index 33407b785e5..90479509ad6 100644 --- a/fastdeploy/cache_manager/cache_messager.py +++ b/fastdeploy/cache_manager/cache_messager.py @@ -892,8 +892,6 @@ def consume_signals(self): layer_id = kv_signal_data[1].item() if layer_id == self.num_layers - 1: logger.info(f"tasks_count: {tasks_count}, layer_id: {layer_id} self.rank_id {self.rank_id}") - ready_engine_signals = [] - pending_engine_signals = [] # format for signal to put in cache_prefilled_engine_ids_queue: [(engine_idx1, prefilled_token_num1), (engine_idx2, prefilled_token_num2)] with self.engine_cache_task_thread_lock: for bi in range(tasks_count): @@ -904,21 +902,19 @@ def consume_signals(self): self.engine_cache_tasks[engine_idx]["prefilled_layer_idx"] = layer_id self.engine_cache_tasks[engine_idx]["prefilled_token_num"] = prefilled_token_num if layer_id == 0: - if engine_idx in self.idx_cache_task_dict: - ready_engine_signals.append((engine_idx, prefilled_token_num)) - else: - pending_engine_signals.append((engine_idx, prefilled_token_num)) - if pending_engine_signals: - with self.pending_layer0_signal_lock: - for engine_idx, prefilled_token_num in pending_engine_signals: + with self.pending_layer0_signal_lock: self.pending_layer0_signals[engine_idx] = (engine_idx, prefilled_token_num) - if pending_engine_signals: - logger.debug(f"cache_task_pending_layer0_signal: {pending_engine_signals}") - if ready_engine_signals: - logger.info( - f"Put batch_engine_signals {ready_engine_signals} into cache_prefilled_engine_ids_queue" - ) - self.cache_prefilled_engine_ids_queue.put(ready_engine_signals) + # Recover signals for engine_idxs that already have cache_info registered. + # This handles the case where cache_info arrives before layer0 signal. + recovered_signals = [] + with self.pending_layer0_signal_lock: + for engine_idx in list(self.pending_layer0_signals.keys()): + if engine_idx in self.idx_cache_task_dict: + recovered_signals.append(self.pending_layer0_signals.pop(engine_idx)) + if recovered_signals: + for signal in recovered_signals: + logger.info(f"consume_signals recovered signal: {signal}") + self.cache_prefilled_engine_ids_queue.put([signal]) except Exception as e: logger.error(f"Consume signals get exception: {e}") From c94fa98243fc97bf7c31596151f33eb116f35cf6 Mon Sep 17 00:00:00 2001 From: Sunny-bot1 <592045536@qq.com> Date: Fri, 26 Jun 2026 13:26:37 +0800 Subject: [PATCH 2/2] cp 7981 --- fastdeploy/engine/common_engine_prepare_mixin.py | 6 ++++++ fastdeploy/envs.py | 1 + 2 files changed, 7 insertions(+) diff --git a/fastdeploy/engine/common_engine_prepare_mixin.py b/fastdeploy/engine/common_engine_prepare_mixin.py index c6ea2f3ee9d..4129dfec664 100644 --- a/fastdeploy/engine/common_engine_prepare_mixin.py +++ b/fastdeploy/engine/common_engine_prepare_mixin.py @@ -89,9 +89,15 @@ def _fetch_request_decode(self) -> bool: def _fetch_request_prefill(self) -> bool: """Fetch and prepare requests for a prefill instance. Returns True if tasks were fetched.""" + max_inflight_prefill = envs.FD_MAX_INFLIGHT_PREFILL + inflight_prefill = len(self.resource_manager.running) + if inflight_prefill >= max_inflight_prefill: + return False + available_for_new = max_inflight_prefill - inflight_prefill num_prefill_batch = min( int(self.resource_manager.available_batch()), self.cfg.max_prefill_batch, + available_for_new, ) max_num_batched_tokens = self.cfg.scheduler_config.max_num_batched_tokens available_blocks = self.cfg.cache_config.max_block_num_per_seq diff --git a/fastdeploy/envs.py b/fastdeploy/envs.py index 61b6b1e31d1..0979159f339 100644 --- a/fastdeploy/envs.py +++ b/fastdeploy/envs.py @@ -196,6 +196,7 @@ def _validate_split_kv_size(value: int) -> int: # Number of worker threads for prepare requests in prefill instance "FD_PREFILL_PREPARE_REQ_THREAD_NUM": lambda: int(os.getenv("FD_PREFILL_PREPARE_REQ_THREAD_NUM", "3")), "FD_PREFILL_WAIT_DECODE_RESOURCE_SECONDS": lambda: int(os.getenv("FD_PREFILL_WAIT_DECODE_RESOURCE_SECONDS", "30")), + "FD_MAX_INFLIGHT_PREFILL": lambda: int(os.getenv("FD_MAX_INFLIGHT_PREFILL", "20")), # Timeout (seconds) for D to reclaim preallocated blocks if P never follows through. 0 to disable. "FD_DECODE_PREALLOC_BLOCK_TIMEOUT": lambda: int(os.getenv("FD_DECODE_PREALLOC_BLOCK_TIMEOUT", "1200")), "FD_ENABLE_REQUEST_DISCONNECT_STOP_INFERENCE": lambda: int(