From ad310b66d77d75b864c09d84a61b2cd392ad662d Mon Sep 17 00:00:00 2001 From: zhangcaiji Date: Tue, 18 Nov 2025 14:02:41 +0000 Subject: [PATCH 01/17] experiment 2: something wrong with dispatch op syn in apply_ep_decode --- .../layers/backends/xpu/moe/fused_moe.py | 10 +-- fastdeploy/worker/xpu_model_runner.py | 63 +++++++++++++++++++ 2 files changed, 69 insertions(+), 4 deletions(-) diff --git a/fastdeploy/model_executor/layers/backends/xpu/moe/fused_moe.py b/fastdeploy/model_executor/layers/backends/xpu/moe/fused_moe.py index 2a7d4846084..af49fbba27d 100644 --- a/fastdeploy/model_executor/layers/backends/xpu/moe/fused_moe.py +++ b/fastdeploy/model_executor/layers/backends/xpu/moe/fused_moe.py @@ -451,11 +451,12 @@ def apply_ep_decode( """ Apply the EP decoder method. """ + print("Apply the EP decoder method.") gate_out = gate(x.cast("float32")) - + print("Select topk experts and weights") # 1. Select topk experts and weights topk_idx, topk_weights = self.ep_decoder_runner.moe_select(layer, gate_out) - + print("Dispatch") # 2. EP Dispatch expertwise_scale = None use_fp8 = False @@ -471,7 +472,7 @@ def apply_ep_decode( expertwise_scale=expertwise_scale, use_fp8=use_fp8, ) - + print("Compute ffn") # 3. Compute ffn ffn_out = self.compute_ffn( layer, @@ -479,7 +480,7 @@ def apply_ep_decode( token_nums_per_expert, valid_token_num, ) - + print("EP combine") # 4. EP combine return self.ep_decoder_runner.combine( ffn_out, @@ -498,6 +499,7 @@ def apply( compute Fused MoE. """ if layer.ep_size > 1: + print(f"layer.fd_config.model_config.moe_phase.phase {layer.fd_config.model_config.moe_phase.phase}") if layer.fd_config.model_config.moe_phase.phase == "prefill": return self.apply_ep_prefill(layer, x, gate) elif layer.fd_config.model_config.moe_phase.phase == "decode": diff --git a/fastdeploy/worker/xpu_model_runner.py b/fastdeploy/worker/xpu_model_runner.py index b60eb8cdfc9..7f801ed08c9 100644 --- a/fastdeploy/worker/xpu_model_runner.py +++ b/fastdeploy/worker/xpu_model_runner.py @@ -419,6 +419,56 @@ def exist_prefill(self): else: return 0 + def exist_decode(self): + """ + 单卡上的only decode判断 + """ + # 进入函数的已经不是空请求了 + # # 空请求返回true,非空闲状态下的空请求,说明部分卡没有在计算而是在空跑 + # if int(paddle.max(self.share_inputs["seq_lens_encoder"])) == 0 and int(paddle.max(self.share_inputs["seq_lens_decoder"])) == 0: + # return 1 + # if not self.not_need_stop():# 专家并行过程中,空的卡返回true + # return 1 + # 非空请求判断是否是decode only请求 + if ( + int(paddle.max(self.share_inputs["seq_lens_decoder"])) != 0 + and int(paddle.max(self.share_inputs["seq_lens_encoder"])) == 0 + ): + return 1 + else: + return 0 + + def only_decode(self): + """ + check whether decode only + """ + # Update Batch type for if_only_decode + if_only_decode = True + decoder_exists = None + # 在ep场景下no_need_stop如果都是F,返回false,走prefill分支模型 + # 否则,需要进一步判断 + # mix ep in single node + if self.fd_config.parallel_config.use_ep and self.fd_config.scheduler_config.splitwise_role == "mixed": + no_need_stop_list = [] + no_need_stops = self.not_need_stop() + paddle.distributed.all_gather_object(no_need_stop_list, not no_need_stops) + no_need_stop = all(no_need_stop_list) + if no_need_stop: + if_only_decode = False + else: + only_decode_batch_list = [] + decoder_exists = self.exist_decode() or not self.not_need_stop() + paddle.distributed.all_gather_object(only_decode_batch_list, decoder_exists) + if_only_decode = all(only_decode_batch_list) + + print( + f"only_decode_batch_list {only_decode_batch_list} decoder_exists {decoder_exists} self.exist_decode() {self.exist_decode()} if_only_decode 
{if_only_decode}" + ) + + if_only_decode = if_only_decode and (decoder_exists if decoder_exists is not None else self.exist_decode()) + + return if_only_decode + def insert_tasks_v1(self, req_dicts: List[Request]): """ Process scheduler output tasks, used when ENABLE_V1_KVCACHE_SCHEDULER=1 @@ -938,6 +988,19 @@ def _prepare_inputs(self, is_dummy_run=False) -> None: self.initialize_attention_backend() if self.pd_disaggregation_mode == "per_chunk" or self.pd_disaggregation_mode == "per_query": self.forward_meta.kv_signal_sender = self.kv_signal_sender + # if self.not_need_stop(): + if True: # 空闲状态的卡不进判断,忙碌卡的逻辑还是根据 + print("********************** ", self.not_need_stop()) + if_only_decode = self.only_decode() + # if if_only_decode: + # print("it is only decode step!") + # else: + # print("it is prefill step") + + if self.fd_config.scheduler_config.splitwise_role == "mixed": + self.fd_config.model_config.moe_phase.phase = "decode" if if_only_decode else "prefill" + print(f"moe_phase.phase {self.fd_config.model_config.moe_phase.phase}") + # Get sampling metadata # TODU(lilujia): sync with GPU self.sampling_metadata = SamplingMetadata( From 15a0d99d5adf2dc57af016a687bd6afee37999b8 Mon Sep 17 00:00:00 2001 From: zhangcaiji Date: Wed, 19 Nov 2025 13:18:38 +0000 Subject: [PATCH 02/17] success experience --- .../layers/backends/xpu/moe/ep.py | 43 +++++++++++++------ .../layers/backends/xpu/moe/fused_moe.py | 8 ++-- fastdeploy/model_executor/layers/moe/ep.py | 12 +++--- 3 files changed, 41 insertions(+), 22 deletions(-) diff --git a/fastdeploy/model_executor/layers/backends/xpu/moe/ep.py b/fastdeploy/model_executor/layers/backends/xpu/moe/ep.py index 71c2dd600ff..f07a1dcac87 100644 --- a/fastdeploy/model_executor/layers/backends/xpu/moe/ep.py +++ b/fastdeploy/model_executor/layers/backends/xpu/moe/ep.py @@ -67,11 +67,16 @@ def __init__( group = paddle.distributed.new_group(range(ep_size)) self.group = group self.num_local_experts = num_experts // ep_size - self.deepep_engine = None + self.deepep_engine = None # deepep_engine只调用dispatch, combine + self.deepep_engine_low_latency = ( + None # deepep_engine_low_latency只调用low_latency_dispatch,low_latency_combine + ) self.init_deepep_engine() def init_deepep_engine(self): - if self.splitwise_role == "mixed" or self.moe_phase.phase == "prefill": + if ( + self.splitwise_role == "mixed" or self.moe_phase.phase == "prefill" + ): # 集中式和分离式的P节点, 集中式的场景其实需要两种buffer并存,按需取用 self.deepep_engine = deep_ep.Buffer( self.group, int(1e9), @@ -80,7 +85,7 @@ def init_deepep_engine(self): low_latency_mode=False, num_qps_per_rank=1, ) - elif self.moe_phase.phase == "decode": + # elif self.moe_phase.phase == "decode": # 分离式的D节点 logger.info("Initializing Low Latency Buffer") self.get_low_latency_buffer() else: @@ -105,20 +110,20 @@ def get_low_latency_buffer(self): ) # Allocate a buffer if not existed or not enough buffer size if ( - self.deepep_engine is None - or self.deepep_engine.group != self.group - or not self.deepep_engine.low_latency_mode - or self.deepep_engine.num_rdma_bytes < num_rdma_bytes + self.deepep_engine_low_latency is None + or self.deepep_engine_low_latency.group != self.group + or not self.deepep_engine_low_latency.low_latency_mode + or self.deepep_engine_low_latency.num_rdma_bytes < num_rdma_bytes ): # NOTES: for best performance, the QP number **must** be equal to the number of the local experts assert self.num_experts % self.ep_size == 0 - self.deepep_engine = deep_ep.Buffer( + self.deepep_engine_low_latency = deep_ep.Buffer( self.group, 0, 
num_rdma_bytes, self.num_experts, low_latency_mode=True, - num_qps_per_rank=self.num_experts // self.num_ranks, + num_qps_per_rank=self.num_experts // self.ep_size, ) def low_latency_dispatch( @@ -151,7 +156,7 @@ def low_latency_dispatch( handle, dispatch_hook, valid_token_num, - ) = self.deepep_engine.low_latency_dispatch( + ) = self.deepep_engine_low_latency.low_latency_dispatch( hidden_states, moe_in_w4a8_scale, topk_idx, @@ -176,7 +181,7 @@ def low_latency_combine( Return: combined_hidden_states: [num_tokens, hidden_size] """ - combined_hidden_states, combine_hook = self.deepep_engine.low_latency_combine( + combined_hidden_states, combine_hook = self.deepep_engine_low_latency.low_latency_combine( hidden_states, topk_idx, topk_weights, @@ -196,7 +201,14 @@ def barrier_all(self): """ barrier_all """ - self.deepep_engine.barrier_all() + if self.deepep_engine is None and self.deepep_engine_low_latency is None: + raise ValueError("The DeepEP engine has not been initialized yet.") + + if self.deepep_engine is not None: + self.deepep_engine.barrier_all() + if self.deepep_engine_low_latency is not None: + self.deepep_engine_low_latency.barrier_all() + # self.deepep_engine.barrier_all() class XPUEPRunner: @@ -333,6 +345,7 @@ def dispatch( *args, **kwargs, ): + # print("============XPUEPPrefillRunner dispatch==========") self.num_combined_tokens = x.shape[0] x_scale_tensor = kwargs.get("x_scale_tensor", None) dispatch_args = { @@ -340,6 +353,7 @@ def dispatch( "topk_idx": topk_idx, "topk_weights": topk_weights, } + # print("============XPUEPPrefillRunner dispatch end==========") return self.ep_engine.deepep_engine.dispatch(**dispatch_args) def combine( @@ -397,6 +411,7 @@ def dispatch( *args, **kwargs, ): + print("===========XPUEPDecoderRunner dispatch start===========") expertwise_scale = kwargs.get("expertwise_scale", None) use_fp8 = expertwise_scale is not None @@ -410,8 +425,8 @@ def dispatch( # no need to call dispatch_hook here, because it has already been done in xDeepEP # if dispatch_hook is not None: # dispatch_hook() - - return recv_hidden_states, recv_expert_count, handle, valid_token_num + print("===========XPUEPDecoderRunner dispatch end===========") + return recv_hidden_states, recv_expert_count, handle, dispatch_hook, valid_token_num def combine(self, ffn_out, topk_idx, topk_weights, handle): combined_hidden_states, combine_hook = self.ep_engine.low_latency_combine( diff --git a/fastdeploy/model_executor/layers/backends/xpu/moe/fused_moe.py b/fastdeploy/model_executor/layers/backends/xpu/moe/fused_moe.py index af49fbba27d..dbdd880321c 100644 --- a/fastdeploy/model_executor/layers/backends/xpu/moe/fused_moe.py +++ b/fastdeploy/model_executor/layers/backends/xpu/moe/fused_moe.py @@ -464,6 +464,7 @@ def apply_ep_decode( permute_input, token_nums_per_expert, handle, + hook, valid_token_num, ) = self.ep_decoder_runner.dispatch( x, @@ -472,14 +473,15 @@ def apply_ep_decode( expertwise_scale=expertwise_scale, use_fp8=use_fp8, ) + print(valid_token_num) print("Compute ffn") # 3. Compute ffn ffn_out = self.compute_ffn( layer, permute_input, token_nums_per_expert, - valid_token_num, - ) + max(1, valid_token_num), + ) # 确保空跑时也不为0 print("EP combine") # 4. EP combine return self.ep_decoder_runner.combine( @@ -499,7 +501,7 @@ def apply( compute Fused MoE. 
""" if layer.ep_size > 1: - print(f"layer.fd_config.model_config.moe_phase.phase {layer.fd_config.model_config.moe_phase.phase}") + # print(f"layer.fd_config.model_config.moe_phase.phase {layer.fd_config.model_config.moe_phase.phase}") if layer.fd_config.model_config.moe_phase.phase == "prefill": return self.apply_ep_prefill(layer, x, gate) elif layer.fd_config.model_config.moe_phase.phase == "decode": diff --git a/fastdeploy/model_executor/layers/moe/ep.py b/fastdeploy/model_executor/layers/moe/ep.py index a1dcda67f7e..6130c8df198 100644 --- a/fastdeploy/model_executor/layers/moe/ep.py +++ b/fastdeploy/model_executor/layers/moe/ep.py @@ -279,7 +279,7 @@ def low_latency_dispatch( ): if self.deepep_engine is None: raise RuntimeError("DeepEP buffer not initialized!") - + print("==========low_latency_dispatch==========") ( packed_recv_x, recv_expert_count, @@ -297,7 +297,7 @@ def low_latency_dispatch( return_recv_hook=True, # num_per_channel=quant_group_size, ) - + print("==========low_latency_dispatch end==========") return packed_recv_x, recv_expert_count, handle, dispatch_hook def low_latency_dispatch_two_stage( @@ -310,7 +310,7 @@ def low_latency_dispatch_two_stage( ): if self.deepep_engine is None: raise RuntimeError("DeepEP buffer not initialized!") - + print("==========low_latency_dispatch_two_stage==========") ( packed_recv_x, packed_recv_count, @@ -328,7 +328,7 @@ def low_latency_dispatch_two_stage( async_finish=False, return_recv_hook=True, ) - + print("==========low_latency_dispatch_two_stage end==========") return packed_recv_x, packed_recv_count, handle, dispatch_hook def low_latency_combine( @@ -634,13 +634,15 @@ def dispatch( ): expertwise_scale = kwargs.get("expertwise_scale", None) use_fp8 = kwargs.get("use_fp8", False) - + print("========== decoderunner dispatch ============") if not self.use_internode_ll_two_stage: + print("========== not two stage ===============") recv_hidden_states, recv_expert_count, handle, dispatch_hook = self.ep_engine.low_latency_dispatch( x, topk_idx, expertwise_scale, use_fp8 ) else: # just supports dispatch_use_fp8 = True now! 
+ print("================ two stage ================") assert use_fp8 is True recv_hidden_states, recv_expert_count, handle, dispatch_hook = ( self.ep_engine.low_latency_dispatch_two_stage(x, topk_idx, topk_weights, expertwise_scale, use_fp8) From 13201950a72f28a305058de33d88ecd7a8c37c5a Mon Sep 17 00:00:00 2001 From: zhangcaiji Date: Thu, 20 Nov 2025 03:10:54 +0000 Subject: [PATCH 03/17] delete debug message printer --- fastdeploy/model_executor/layers/backends/xpu/moe/ep.py | 2 +- .../model_executor/layers/backends/xpu/moe/fused_moe.py | 6 ------ fastdeploy/model_executor/layers/moe/ep.py | 7 ------- 3 files changed, 1 insertion(+), 14 deletions(-) diff --git a/fastdeploy/model_executor/layers/backends/xpu/moe/ep.py b/fastdeploy/model_executor/layers/backends/xpu/moe/ep.py index f07a1dcac87..581529f4a31 100644 --- a/fastdeploy/model_executor/layers/backends/xpu/moe/ep.py +++ b/fastdeploy/model_executor/layers/backends/xpu/moe/ep.py @@ -426,7 +426,7 @@ def dispatch( # if dispatch_hook is not None: # dispatch_hook() print("===========XPUEPDecoderRunner dispatch end===========") - return recv_hidden_states, recv_expert_count, handle, dispatch_hook, valid_token_num + return recv_hidden_states, recv_expert_count, handle, valid_token_num def combine(self, ffn_out, topk_idx, topk_weights, handle): combined_hidden_states, combine_hook = self.ep_engine.low_latency_combine( diff --git a/fastdeploy/model_executor/layers/backends/xpu/moe/fused_moe.py b/fastdeploy/model_executor/layers/backends/xpu/moe/fused_moe.py index dbdd880321c..1850305aa5d 100644 --- a/fastdeploy/model_executor/layers/backends/xpu/moe/fused_moe.py +++ b/fastdeploy/model_executor/layers/backends/xpu/moe/fused_moe.py @@ -451,9 +451,7 @@ def apply_ep_decode( """ Apply the EP decoder method. """ - print("Apply the EP decoder method.") gate_out = gate(x.cast("float32")) - print("Select topk experts and weights") # 1. Select topk experts and weights topk_idx, topk_weights = self.ep_decoder_runner.moe_select(layer, gate_out) print("Dispatch") @@ -464,7 +462,6 @@ def apply_ep_decode( permute_input, token_nums_per_expert, handle, - hook, valid_token_num, ) = self.ep_decoder_runner.dispatch( x, @@ -473,8 +470,6 @@ def apply_ep_decode( expertwise_scale=expertwise_scale, use_fp8=use_fp8, ) - print(valid_token_num) - print("Compute ffn") # 3. Compute ffn ffn_out = self.compute_ffn( layer, @@ -482,7 +477,6 @@ def apply_ep_decode( token_nums_per_expert, max(1, valid_token_num), ) # 确保空跑时也不为0 - print("EP combine") # 4. 
EP combine return self.ep_decoder_runner.combine( ffn_out, diff --git a/fastdeploy/model_executor/layers/moe/ep.py b/fastdeploy/model_executor/layers/moe/ep.py index 6130c8df198..721fc80a883 100644 --- a/fastdeploy/model_executor/layers/moe/ep.py +++ b/fastdeploy/model_executor/layers/moe/ep.py @@ -279,7 +279,6 @@ def low_latency_dispatch( ): if self.deepep_engine is None: raise RuntimeError("DeepEP buffer not initialized!") - print("==========low_latency_dispatch==========") ( packed_recv_x, recv_expert_count, @@ -297,7 +296,6 @@ def low_latency_dispatch( return_recv_hook=True, # num_per_channel=quant_group_size, ) - print("==========low_latency_dispatch end==========") return packed_recv_x, recv_expert_count, handle, dispatch_hook def low_latency_dispatch_two_stage( @@ -310,7 +308,6 @@ def low_latency_dispatch_two_stage( ): if self.deepep_engine is None: raise RuntimeError("DeepEP buffer not initialized!") - print("==========low_latency_dispatch_two_stage==========") ( packed_recv_x, packed_recv_count, @@ -328,7 +325,6 @@ def low_latency_dispatch_two_stage( async_finish=False, return_recv_hook=True, ) - print("==========low_latency_dispatch_two_stage end==========") return packed_recv_x, packed_recv_count, handle, dispatch_hook def low_latency_combine( @@ -634,15 +630,12 @@ def dispatch( ): expertwise_scale = kwargs.get("expertwise_scale", None) use_fp8 = kwargs.get("use_fp8", False) - print("========== decoderunner dispatch ============") if not self.use_internode_ll_two_stage: - print("========== not two stage ===============") recv_hidden_states, recv_expert_count, handle, dispatch_hook = self.ep_engine.low_latency_dispatch( x, topk_idx, expertwise_scale, use_fp8 ) else: # just supports dispatch_use_fp8 = True now! - print("================ two stage ================") assert use_fp8 is True recv_hidden_states, recv_expert_count, handle, dispatch_hook = ( self.ep_engine.low_latency_dispatch_two_stage(x, topk_idx, topk_weights, expertwise_scale, use_fp8) From 2064c46cbefc10afbd783b03445210fcb5ba8cc7 Mon Sep 17 00:00:00 2001 From: zhangcaiji Date: Thu, 20 Nov 2025 03:22:19 +0000 Subject: [PATCH 04/17] delete useless printer --- .../model_executor/layers/backends/xpu/moe/ep.py | 5 +---- .../layers/backends/xpu/moe/fused_moe.py | 4 +++- fastdeploy/model_executor/layers/moe/ep.py | 5 +++++ fastdeploy/worker/xpu_model_runner.py | 15 +++------------ 4 files changed, 12 insertions(+), 17 deletions(-) diff --git a/fastdeploy/model_executor/layers/backends/xpu/moe/ep.py b/fastdeploy/model_executor/layers/backends/xpu/moe/ep.py index 581529f4a31..c675bdfb5a9 100644 --- a/fastdeploy/model_executor/layers/backends/xpu/moe/ep.py +++ b/fastdeploy/model_executor/layers/backends/xpu/moe/ep.py @@ -345,7 +345,6 @@ def dispatch( *args, **kwargs, ): - # print("============XPUEPPrefillRunner dispatch==========") self.num_combined_tokens = x.shape[0] x_scale_tensor = kwargs.get("x_scale_tensor", None) dispatch_args = { @@ -353,7 +352,6 @@ def dispatch( "topk_idx": topk_idx, "topk_weights": topk_weights, } - # print("============XPUEPPrefillRunner dispatch end==========") return self.ep_engine.deepep_engine.dispatch(**dispatch_args) def combine( @@ -411,7 +409,6 @@ def dispatch( *args, **kwargs, ): - print("===========XPUEPDecoderRunner dispatch start===========") expertwise_scale = kwargs.get("expertwise_scale", None) use_fp8 = expertwise_scale is not None @@ -425,7 +422,7 @@ def dispatch( # no need to call dispatch_hook here, because it has already been done in xDeepEP # if dispatch_hook is not None: # 
dispatch_hook() - print("===========XPUEPDecoderRunner dispatch end===========") + return recv_hidden_states, recv_expert_count, handle, valid_token_num def combine(self, ffn_out, topk_idx, topk_weights, handle): diff --git a/fastdeploy/model_executor/layers/backends/xpu/moe/fused_moe.py b/fastdeploy/model_executor/layers/backends/xpu/moe/fused_moe.py index 1850305aa5d..06105334e8c 100644 --- a/fastdeploy/model_executor/layers/backends/xpu/moe/fused_moe.py +++ b/fastdeploy/model_executor/layers/backends/xpu/moe/fused_moe.py @@ -452,6 +452,7 @@ def apply_ep_decode( Apply the EP decoder method. """ gate_out = gate(x.cast("float32")) + # 1. Select topk experts and weights topk_idx, topk_weights = self.ep_decoder_runner.moe_select(layer, gate_out) print("Dispatch") @@ -470,6 +471,7 @@ def apply_ep_decode( expertwise_scale=expertwise_scale, use_fp8=use_fp8, ) + # 3. Compute ffn ffn_out = self.compute_ffn( layer, @@ -477,6 +479,7 @@ def apply_ep_decode( token_nums_per_expert, max(1, valid_token_num), ) # 确保空跑时也不为0 + # 4. EP combine return self.ep_decoder_runner.combine( ffn_out, @@ -495,7 +498,6 @@ def apply( compute Fused MoE. """ if layer.ep_size > 1: - # print(f"layer.fd_config.model_config.moe_phase.phase {layer.fd_config.model_config.moe_phase.phase}") if layer.fd_config.model_config.moe_phase.phase == "prefill": return self.apply_ep_prefill(layer, x, gate) elif layer.fd_config.model_config.moe_phase.phase == "decode": diff --git a/fastdeploy/model_executor/layers/moe/ep.py b/fastdeploy/model_executor/layers/moe/ep.py index 721fc80a883..a1dcda67f7e 100644 --- a/fastdeploy/model_executor/layers/moe/ep.py +++ b/fastdeploy/model_executor/layers/moe/ep.py @@ -279,6 +279,7 @@ def low_latency_dispatch( ): if self.deepep_engine is None: raise RuntimeError("DeepEP buffer not initialized!") + ( packed_recv_x, recv_expert_count, @@ -296,6 +297,7 @@ def low_latency_dispatch( return_recv_hook=True, # num_per_channel=quant_group_size, ) + return packed_recv_x, recv_expert_count, handle, dispatch_hook def low_latency_dispatch_two_stage( @@ -308,6 +310,7 @@ def low_latency_dispatch_two_stage( ): if self.deepep_engine is None: raise RuntimeError("DeepEP buffer not initialized!") + ( packed_recv_x, packed_recv_count, @@ -325,6 +328,7 @@ def low_latency_dispatch_two_stage( async_finish=False, return_recv_hook=True, ) + return packed_recv_x, packed_recv_count, handle, dispatch_hook def low_latency_combine( @@ -630,6 +634,7 @@ def dispatch( ): expertwise_scale = kwargs.get("expertwise_scale", None) use_fp8 = kwargs.get("use_fp8", False) + if not self.use_internode_ll_two_stage: recv_hidden_states, recv_expert_count, handle, dispatch_hook = self.ep_engine.low_latency_dispatch( x, topk_idx, expertwise_scale, use_fp8 diff --git a/fastdeploy/worker/xpu_model_runner.py b/fastdeploy/worker/xpu_model_runner.py index 7f801ed08c9..cb0ba586138 100644 --- a/fastdeploy/worker/xpu_model_runner.py +++ b/fastdeploy/worker/xpu_model_runner.py @@ -988,18 +988,9 @@ def _prepare_inputs(self, is_dummy_run=False) -> None: self.initialize_attention_backend() if self.pd_disaggregation_mode == "per_chunk" or self.pd_disaggregation_mode == "per_query": self.forward_meta.kv_signal_sender = self.kv_signal_sender - # if self.not_need_stop(): - if True: # 空闲状态的卡不进判断,忙碌卡的逻辑还是根据 - print("********************** ", self.not_need_stop()) - if_only_decode = self.only_decode() - # if if_only_decode: - # print("it is only decode step!") - # else: - # print("it is prefill step") - - if self.fd_config.scheduler_config.splitwise_role == "mixed": - 
self.fd_config.model_config.moe_phase.phase = "decode" if if_only_decode else "prefill" - print(f"moe_phase.phase {self.fd_config.model_config.moe_phase.phase}") + if_only_decode = self.only_decode() + if self.fd_config.scheduler_config.splitwise_role == "mixed": # 混合式默认初始化为prefill + self.fd_config.model_config.moe_phase.phase = "decode" if if_only_decode else "prefill" # Get sampling metadata # TODU(lilujia): sync with GPU From 27c58435ffe56f9adbc49726f44c24dee75c35bd Mon Sep 17 00:00:00 2001 From: zhangcaiji Date: Thu, 20 Nov 2025 08:48:24 +0000 Subject: [PATCH 05/17] check only_decode by exist_prefill --- fastdeploy/worker/xpu_model_runner.py | 38 ++++++--------------------- 1 file changed, 8 insertions(+), 30 deletions(-) diff --git a/fastdeploy/worker/xpu_model_runner.py b/fastdeploy/worker/xpu_model_runner.py index cb0ba586138..6b94d0d3cd0 100644 --- a/fastdeploy/worker/xpu_model_runner.py +++ b/fastdeploy/worker/xpu_model_runner.py @@ -419,36 +419,16 @@ def exist_prefill(self): else: return 0 - def exist_decode(self): - """ - 单卡上的only decode判断 - """ - # 进入函数的已经不是空请求了 - # # 空请求返回true,非空闲状态下的空请求,说明部分卡没有在计算而是在空跑 - # if int(paddle.max(self.share_inputs["seq_lens_encoder"])) == 0 and int(paddle.max(self.share_inputs["seq_lens_decoder"])) == 0: - # return 1 - # if not self.not_need_stop():# 专家并行过程中,空的卡返回true - # return 1 - # 非空请求判断是否是decode only请求 - if ( - int(paddle.max(self.share_inputs["seq_lens_decoder"])) != 0 - and int(paddle.max(self.share_inputs["seq_lens_encoder"])) == 0 - ): - return 1 - else: - return 0 - def only_decode(self): """ check whether decode only """ - # Update Batch type for if_only_decode + # Update Batch type for cuda graph for if_only_decode if_only_decode = True - decoder_exists = None - # 在ep场景下no_need_stop如果都是F,返回false,走prefill分支模型 - # 否则,需要进一步判断 + prefill_exists = None # mix ep in single node if self.fd_config.parallel_config.use_ep and self.fd_config.scheduler_config.splitwise_role == "mixed": + # 在ep场景下no_need_stop如果都是F,表示全部卡空闲,返回false,走高吞吐分支,否则为部分卡空闲,需要进一步判断 no_need_stop_list = [] no_need_stops = self.not_need_stop() paddle.distributed.all_gather_object(no_need_stop_list, not no_need_stops) @@ -457,15 +437,13 @@ def only_decode(self): if_only_decode = False else: only_decode_batch_list = [] - decoder_exists = self.exist_decode() or not self.not_need_stop() - paddle.distributed.all_gather_object(only_decode_batch_list, decoder_exists) + prefill_exists = self.exist_prefill() + paddle.distributed.all_gather_object(only_decode_batch_list, not prefill_exists) if_only_decode = all(only_decode_batch_list) - print( - f"only_decode_batch_list {only_decode_batch_list} decoder_exists {decoder_exists} self.exist_decode() {self.exist_decode()} if_only_decode {if_only_decode}" - ) - - if_only_decode = if_only_decode and (decoder_exists if decoder_exists is not None else self.exist_decode()) + if_only_decode = if_only_decode and not ( + prefill_exists if prefill_exists is not None else self.exist_prefill() + ) return if_only_decode From 772fab3adcc925729d2a0662e4aed6565e74fd26 Mon Sep 17 00:00:00 2001 From: zhangcaiji Date: Thu, 20 Nov 2025 09:05:09 +0000 Subject: [PATCH 06/17] support both distribute and mixed infer --- .../layers/backends/xpu/moe/ep.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/fastdeploy/model_executor/layers/backends/xpu/moe/ep.py b/fastdeploy/model_executor/layers/backends/xpu/moe/ep.py index c675bdfb5a9..94111d70e69 100644 --- a/fastdeploy/model_executor/layers/backends/xpu/moe/ep.py +++ 
b/fastdeploy/model_executor/layers/backends/xpu/moe/ep.py @@ -74,9 +74,18 @@ def __init__( self.init_deepep_engine() def init_deepep_engine(self): - if ( - self.splitwise_role == "mixed" or self.moe_phase.phase == "prefill" - ): # 集中式和分离式的P节点, 集中式的场景其实需要两种buffer并存,按需取用 + if self.splitwise_role == "mixed": # 集中式场景需要初始化两种buffer,按需取用 + self.deepep_engine = deep_ep.Buffer( + self.group, + int(1e9), + 0, + num_experts=self.num_experts, + low_latency_mode=False, + num_qps_per_rank=1, + ) + logger.info("Initializing Low Latency Buffer") + self.get_low_latency_buffer() + elif self.moe_phase.phase == "prefill": # 分离式的P节点 self.deepep_engine = deep_ep.Buffer( self.group, int(1e9), @@ -85,7 +94,7 @@ def init_deepep_engine(self): low_latency_mode=False, num_qps_per_rank=1, ) - # elif self.moe_phase.phase == "decode": # 分离式的D节点 + elif self.moe_phase.phase == "decode": # 分离式的D节点 logger.info("Initializing Low Latency Buffer") self.get_low_latency_buffer() else: From 5409c6c8917d65c5512cee38dd8f0759bb0cdbc7 Mon Sep 17 00:00:00 2001 From: zhangcaiji Date: Thu, 20 Nov 2025 09:08:44 +0000 Subject: [PATCH 07/17] delete unused printer --- .../model_executor/layers/backends/xpu/moe/fused_moe.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/fastdeploy/model_executor/layers/backends/xpu/moe/fused_moe.py b/fastdeploy/model_executor/layers/backends/xpu/moe/fused_moe.py index 06105334e8c..965360686f3 100644 --- a/fastdeploy/model_executor/layers/backends/xpu/moe/fused_moe.py +++ b/fastdeploy/model_executor/layers/backends/xpu/moe/fused_moe.py @@ -455,7 +455,7 @@ def apply_ep_decode( # 1. Select topk experts and weights topk_idx, topk_weights = self.ep_decoder_runner.moe_select(layer, gate_out) - print("Dispatch") + # 2. EP Dispatch expertwise_scale = None use_fp8 = False @@ -477,8 +477,8 @@ def apply_ep_decode( layer, permute_input, token_nums_per_expert, - max(1, valid_token_num), - ) # 确保空跑时也不为0 + max(1, valid_token_num), # 确保空跑时也不为0 + ) # 4. 
EP combine return self.ep_decoder_runner.combine( From e70455b6fea3382ca72a71c88621e31419f9b8af Mon Sep 17 00:00:00 2001 From: zhangcaiji Date: Thu, 20 Nov 2025 09:16:09 +0000 Subject: [PATCH 08/17] revise note --- fastdeploy/worker/xpu_model_runner.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/fastdeploy/worker/xpu_model_runner.py b/fastdeploy/worker/xpu_model_runner.py index 6b94d0d3cd0..de4b392fa62 100644 --- a/fastdeploy/worker/xpu_model_runner.py +++ b/fastdeploy/worker/xpu_model_runner.py @@ -428,7 +428,7 @@ def only_decode(self): prefill_exists = None # mix ep in single node if self.fd_config.parallel_config.use_ep and self.fd_config.scheduler_config.splitwise_role == "mixed": - # 在ep场景下no_need_stop如果都是F,表示全部卡空闲,返回false,走高吞吐分支,否则为部分卡空闲,需要进一步判断 + # 在ep场景下no_need_stop如果都是false,表示全部卡空闲,返回false,走高吞吐分支,否则为部分卡空闲,需要进一步判断 no_need_stop_list = [] no_need_stops = self.not_need_stop() paddle.distributed.all_gather_object(no_need_stop_list, not no_need_stops) @@ -967,7 +967,9 @@ def _prepare_inputs(self, is_dummy_run=False) -> None: if self.pd_disaggregation_mode == "per_chunk" or self.pd_disaggregation_mode == "per_query": self.forward_meta.kv_signal_sender = self.kv_signal_sender if_only_decode = self.only_decode() - if self.fd_config.scheduler_config.splitwise_role == "mixed": # 混合式默认初始化为prefill + if ( + self.fd_config.scheduler_config.splitwise_role == "mixed" + ): # 集中式场景,phase默认初始化为prefill, 推理运行时不同类型的batch能够在此处实现phase切换 self.fd_config.model_config.moe_phase.phase = "decode" if if_only_decode else "prefill" # Get sampling metadata From 9fc6117e607363e0e76b2f795ba7edd1ee12de6f Mon Sep 17 00:00:00 2001 From: zhangcaiji Date: Thu, 20 Nov 2025 09:30:29 +0000 Subject: [PATCH 09/17] change some name of variable --- fastdeploy/worker/xpu_model_runner.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/fastdeploy/worker/xpu_model_runner.py b/fastdeploy/worker/xpu_model_runner.py index de4b392fa62..d9538e83e2c 100644 --- a/fastdeploy/worker/xpu_model_runner.py +++ b/fastdeploy/worker/xpu_model_runner.py @@ -430,10 +430,10 @@ def only_decode(self): if self.fd_config.parallel_config.use_ep and self.fd_config.scheduler_config.splitwise_role == "mixed": # 在ep场景下no_need_stop如果都是false,表示全部卡空闲,返回false,走高吞吐分支,否则为部分卡空闲,需要进一步判断 no_need_stop_list = [] - no_need_stops = self.not_need_stop() - paddle.distributed.all_gather_object(no_need_stop_list, not no_need_stops) - no_need_stop = all(no_need_stop_list) - if no_need_stop: + no_need_stop = self.not_need_stop() + paddle.distributed.all_gather_object(no_need_stop_list, not no_need_stop) + if_all_device_empty = all(no_need_stop_list) + if if_all_device_empty: if_only_decode = False else: only_decode_batch_list = [] From 2106861c0e4d916fb19c3df8b51bde8488a4e227 Mon Sep 17 00:00:00 2001 From: zhangcaiji Date: Thu, 20 Nov 2025 09:40:28 +0000 Subject: [PATCH 10/17] CI --- scripts/run_ci_xpu.sh | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/scripts/run_ci_xpu.sh b/scripts/run_ci_xpu.sh index f4c217b3543..1741bd871ee 100644 --- a/scripts/run_ci_xpu.sh +++ b/scripts/run_ci_xpu.sh @@ -317,6 +317,7 @@ export BKCL_PCIE_RING=1 export XSHMEM_MODE=1 export XSHMEM_QP_NUM_PER_RANK=32 export BKCL_RDMA_VERBS=1 +export MOE_FFN_USE_DENSE_INPUT=1 wget -q https://paddle-qa.bj.bcebos.com/xpu_third_party/xDeepEP.tar.gz tar -xzf xDeepEP.tar.gz @@ -383,6 +384,7 @@ unset BKCL_PCIE_RING unset XSHMEM_MODE unset XSHMEM_QP_NUM_PER_RANK unset BKCL_RDMA_VERBS +unset MOE_FFN_USE_DENSE_INPUT stop_processes >kill.log 2>&1 if [ 
${ep_online_exit_code} -ne 0 ]; then @@ -412,6 +414,7 @@ export BKCL_PCIE_RING=1 export XSHMEM_MODE=1 export XSHMEM_QP_NUM_PER_RANK=32 export BKCL_RDMA_VERBS=1 +export MOE_FFN_USE_DENSE_INPUT=1 export port_num=$((8188 + XPU_ID * 100)) # 启动服务 @@ -469,6 +472,7 @@ unset BKCL_PCIE_RING unset XSHMEM_MODE unset XSHMEM_QP_NUM_PER_RANK unset BKCL_RDMA_VERBS +unset MOE_FFN_USE_DENSE_INPUT stop_processes >kill.log 2>&1 if [ ${ep_online_exit_code} -ne 0 ]; then @@ -499,6 +503,7 @@ export BKCL_PCIE_RING=1 export XSHMEM_MODE=1 export XSHMEM_QP_NUM_PER_RANK=32 export BKCL_RDMA_VERBS=1 +export MOE_FFN_USE_DENSE_INPUT=1 export port_num=$((8188 + XPU_ID * 100)) # 启动服务 @@ -558,6 +563,7 @@ unset BKCL_PCIE_RING unset XSHMEM_MODE unset XSHMEM_QP_NUM_PER_RANK unset BKCL_RDMA_VERBS +unset MOE_FFN_USE_DENSE_INPUT stop_processes >kill.log 2>&1 if [ ${ep_online_exit_code} -ne 0 ]; then From 33d0f1d029fed47377ccebd1059ed641d434f465 Mon Sep 17 00:00:00 2001 From: zhangcaiji Date: Fri, 21 Nov 2025 03:11:38 +0000 Subject: [PATCH 11/17] refactor DeepEPEngine --- .../layers/backends/xpu/moe/ep.py | 158 +++++++++--------- 1 file changed, 78 insertions(+), 80 deletions(-) diff --git a/fastdeploy/model_executor/layers/backends/xpu/moe/ep.py b/fastdeploy/model_executor/layers/backends/xpu/moe/ep.py index 94111d70e69..bc04403dae0 100644 --- a/fastdeploy/model_executor/layers/backends/xpu/moe/ep.py +++ b/fastdeploy/model_executor/layers/backends/xpu/moe/ep.py @@ -19,17 +19,15 @@ import deep_ep import paddle from paddle import nn -from paddleformers.utils.log import logger import fastdeploy from fastdeploy.config import MoEPhase from fastdeploy.utils import singleton -@singleton -class DeepEPEngine: +class DeepEPEngineBase: """ - A wrapper class for DeepEP engine. + Base class for DeepEP engine implementations. """ def __init__( @@ -45,7 +43,7 @@ def __init__( group=None, ): """ - Initialize the DeepEP engine. + Initialize the DeepEP engine base. Args: group: The MPI group object. ep_size: The number of ranks. @@ -67,42 +65,47 @@ def __init__( group = paddle.distributed.new_group(range(ep_size)) self.group = group self.num_local_experts = num_experts // ep_size - self.deepep_engine = None # deepep_engine只调用dispatch, combine - self.deepep_engine_low_latency = ( - None # deepep_engine_low_latency只调用low_latency_dispatch,low_latency_combine + self.deepep_engine = None + + def barrier_all(self): + """ + barrier_all + """ + if self.deepep_engine is not None: + self.deepep_engine.barrier_all() + + +@singleton +class DeepEPEngineHighThroughput(DeepEPEngineBase): + """ + High throughput version of DeepEP engine for prefill phase. 
+ """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.deepep_engine = deep_ep.Buffer( + self.group, + int(1e9), + 0, + num_experts=self.num_experts, + low_latency_mode=False, + num_qps_per_rank=1, ) - self.init_deepep_engine() - - def init_deepep_engine(self): - if self.splitwise_role == "mixed": # 集中式场景需要初始化两种buffer,按需取用 - self.deepep_engine = deep_ep.Buffer( - self.group, - int(1e9), - 0, - num_experts=self.num_experts, - low_latency_mode=False, - num_qps_per_rank=1, - ) - logger.info("Initializing Low Latency Buffer") - self.get_low_latency_buffer() - elif self.moe_phase.phase == "prefill": # 分离式的P节点 - self.deepep_engine = deep_ep.Buffer( - self.group, - int(1e9), - 0, - num_experts=self.num_experts, - low_latency_mode=False, - num_qps_per_rank=1, - ) - elif self.moe_phase.phase == "decode": # 分离式的D节点 - logger.info("Initializing Low Latency Buffer") - self.get_low_latency_buffer() - else: - raise ValueError(f"Unknown generation phase {self.moe_phase}") + + +@singleton +class DeepEPEngineLowLatency(DeepEPEngineBase): + """ + Low latency version of DeepEP engine for decode phase. + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.get_low_latency_buffer() def get_low_latency_buffer(self): """ - Get the DeepEP buffer. + Initialize low latency buffer for decode phase. Args: group: The MPI group object. num_max_dispatch_tokens_per_rank: The maximum number of tokens per rank to dispatch. @@ -117,23 +120,16 @@ def get_low_latency_buffer(self): self.ep_size, self.num_experts, ) - # Allocate a buffer if not existed or not enough buffer size - if ( - self.deepep_engine_low_latency is None - or self.deepep_engine_low_latency.group != self.group - or not self.deepep_engine_low_latency.low_latency_mode - or self.deepep_engine_low_latency.num_rdma_bytes < num_rdma_bytes - ): - # NOTES: for best performance, the QP number **must** be equal to the number of the local experts - assert self.num_experts % self.ep_size == 0 - self.deepep_engine_low_latency = deep_ep.Buffer( - self.group, - 0, - num_rdma_bytes, - self.num_experts, - low_latency_mode=True, - num_qps_per_rank=self.num_experts // self.ep_size, - ) + # NOTES: for best performance, the QP number **must** be equal to the number of the local experts + assert self.num_experts % self.ep_size == 0 + self.deepep_engine = deep_ep.Buffer( + self.group, + 0, + num_rdma_bytes, + self.num_experts, + low_latency_mode=True, + num_qps_per_rank=self.num_experts // self.ep_size, + ) def low_latency_dispatch( self, @@ -165,7 +161,7 @@ def low_latency_dispatch( handle, dispatch_hook, valid_token_num, - ) = self.deepep_engine_low_latency.low_latency_dispatch( + ) = self.deepep_engine.low_latency_dispatch( hidden_states, moe_in_w4a8_scale, topk_idx, @@ -186,11 +182,10 @@ def low_latency_combine( handle, ): """ - Return: combined_hidden_states: [num_tokens, hidden_size] """ - combined_hidden_states, combine_hook = self.deepep_engine_low_latency.low_latency_combine( + combined_hidden_states, combine_hook = self.deepep_engine.low_latency_combine( hidden_states, topk_idx, topk_weights, @@ -206,25 +201,24 @@ def clean_low_latency_buffer(self): """ pass - def barrier_all(self): - """ - barrier_all - """ - if self.deepep_engine is None and self.deepep_engine_low_latency is None: - raise ValueError("The DeepEP engine has not been initialized yet.") - - if self.deepep_engine is not None: - self.deepep_engine.barrier_all() - if self.deepep_engine_low_latency is not None: - 
self.deepep_engine_low_latency.barrier_all() - # self.deepep_engine.barrier_all() - class XPUEPRunner: """ EPRunnerBase """ + def _init_ep_engine(self, engine_class): + self.ep_engine = engine_class( + num_max_dispatch_tokens_per_rank=self.num_max_dispatch_tokens_per_rank, + hidden_size=self.hidden_size, + num_experts=self.num_experts + self.redundant_experts_num, + ep_size=self.ep_size, + ep_rank=self.ep_rank, + splitwise_role=self.splitwise_role, + moe_phase=self.moe_phase, + group=self.ep_group, + ) + def __init__( self, top_k: int, @@ -248,19 +242,17 @@ def __init__( self.ep_rank = ep_rank self.redundant_experts_num = redundant_experts_num self.ep_group = ep_group + self.ep_engine = None self.init_ep_engine() def init_ep_engine(self): - self.ep_engine = DeepEPEngine( - num_max_dispatch_tokens_per_rank=self.num_max_dispatch_tokens_per_rank, - hidden_size=self.hidden_size, - num_experts=self.num_experts + self.redundant_experts_num, - ep_size=self.ep_size, - ep_rank=self.ep_rank, - splitwise_role=self.splitwise_role, - moe_phase=self.moe_phase, - group=self.ep_group, - ) + """Initialize the EP engine with default implementation""" + self._init_ep_engine(self._get_engine_class()) + + @abstractmethod + def _get_engine_class(self): + """Get the engine class to be initialized""" + raise NotImplementedError("Subclasses must implement this method") def moe_select(self, layer: nn.Layer, gate_out: paddle.Tensor): """ @@ -346,6 +338,9 @@ def __init__( ep_group=ep_group, ) + def _get_engine_class(self): + return DeepEPEngineHighThroughput + def dispatch( self, x: paddle.Tensor, @@ -410,6 +405,9 @@ def __init__( ep_group=ep_group, ) + def _get_engine_class(self): + return DeepEPEngineLowLatency + def dispatch( self, x: paddle.Tensor, From 7d64bc99af26fafa400406b87c8734221878ef03 Mon Sep 17 00:00:00 2001 From: zhangcaiji Date: Fri, 21 Nov 2025 03:19:14 +0000 Subject: [PATCH 12/17] change some methods'position --- .../layers/backends/xpu/moe/ep.py | 26 ++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/fastdeploy/model_executor/layers/backends/xpu/moe/ep.py b/fastdeploy/model_executor/layers/backends/xpu/moe/ep.py index bc04403dae0..b49c4240e9f 100644 --- a/fastdeploy/model_executor/layers/backends/xpu/moe/ep.py +++ b/fastdeploy/model_executor/layers/backends/xpu/moe/ep.py @@ -73,6 +73,8 @@ def barrier_all(self): """ if self.deepep_engine is not None: self.deepep_engine.barrier_all() + else: + raise RuntimeError("The deepep engine has not been initialized yet.") @singleton @@ -207,18 +209,6 @@ class XPUEPRunner: EPRunnerBase """ - def _init_ep_engine(self, engine_class): - self.ep_engine = engine_class( - num_max_dispatch_tokens_per_rank=self.num_max_dispatch_tokens_per_rank, - hidden_size=self.hidden_size, - num_experts=self.num_experts + self.redundant_experts_num, - ep_size=self.ep_size, - ep_rank=self.ep_rank, - splitwise_role=self.splitwise_role, - moe_phase=self.moe_phase, - group=self.ep_group, - ) - def __init__( self, top_k: int, @@ -249,6 +239,18 @@ def init_ep_engine(self): """Initialize the EP engine with default implementation""" self._init_ep_engine(self._get_engine_class()) + def _init_ep_engine(self, engine_class): + self.ep_engine = engine_class( + num_max_dispatch_tokens_per_rank=self.num_max_dispatch_tokens_per_rank, + hidden_size=self.hidden_size, + num_experts=self.num_experts + self.redundant_experts_num, + ep_size=self.ep_size, + ep_rank=self.ep_rank, + splitwise_role=self.splitwise_role, + moe_phase=self.moe_phase, + group=self.ep_group, + ) + 
@abstractmethod def _get_engine_class(self): """Get the engine class to be initialized""" From d165e58af16c57d3be3ce83e85805be164713f60 Mon Sep 17 00:00:00 2001 From: zhangcaiji Date: Fri, 21 Nov 2025 10:40:11 +0000 Subject: [PATCH 13/17] check whether decode only using shared memory and barrier --- fastdeploy/worker/xpu_model_runner.py | 49 ++++++++++++++++++++++++--- 1 file changed, 44 insertions(+), 5 deletions(-) diff --git a/fastdeploy/worker/xpu_model_runner.py b/fastdeploy/worker/xpu_model_runner.py index d9538e83e2c..d9e469b8f7d 100644 --- a/fastdeploy/worker/xpu_model_runner.py +++ b/fastdeploy/worker/xpu_model_runner.py @@ -408,6 +408,19 @@ def __init__( # Forward meta store the global meta information of the forward self.forward_meta: ForwardMeta = None + # Initialize shared memory and barrier for only_decode optimization + if self.fd_config.parallel_config.use_ep: + import multiprocessing + from threading import Barrier + + # Create shared memory list for all processes + group_size = self.parallel_config.expert_parallel_size + self.shared_only_decode_list = [multiprocessing.Value("i", 0) for _ in range(group_size)] + self.shared_not_need_stop_list = [multiprocessing.Value("i", 0) for _ in range(group_size)] + + # Create barrier for synchronization with timeout + self.decode_barrier = Barrier(group_size, timeout=10.0) + self.pd_disaggregation_mode: str = self.fd_config.parallel_config.pd_disaggregation_mode def exist_prefill(self): @@ -421,14 +434,38 @@ def exist_prefill(self): def only_decode(self): """ - check whether decode only + check whether decode only using shared memory and barrier for all devices """ - # Update Batch type for cuda graph for if_only_decode + # Use shared memory to avoid d2h copy + if hasattr(self, "shared_only_decode_list") and self.fd_config.parallel_config.use_ep: + try: + world_size = self.parallel_config.expert_parallel_size + rank = self.rank % world_size + + # First check if all devices are empty + no_need_stop = self.not_need_stop() + self.shared_not_need_stop_list[rank].value = 1 if not no_need_stop else 0 + self.decode_barrier.wait() + if_all_device_empty = all(p.value == 1 for p in self.shared_not_need_stop_list) + self.decode_barrier.wait() + + if if_all_device_empty: + return False + + # Then check if only decode + self.shared_only_decode_list[rank].value = self.forward_meta.len_info_cpu[0] <= 0 + self.decode_barrier.wait() + if_only_decode = all(p.value for p in self.shared_only_decode_list) + self.decode_barrier.wait() + + return if_only_decode + except Exception as e: + logger.warning(f"Shared memory only_decode failed: {e}, fallback to original implementation") + + # Fallback to original implementation if_only_decode = True prefill_exists = None - # mix ep in single node if self.fd_config.parallel_config.use_ep and self.fd_config.scheduler_config.splitwise_role == "mixed": - # 在ep场景下no_need_stop如果都是false,表示全部卡空闲,返回false,走高吞吐分支,否则为部分卡空闲,需要进一步判断 no_need_stop_list = [] no_need_stop = self.not_need_stop() paddle.distributed.all_gather_object(no_need_stop_list, not no_need_stop) @@ -444,7 +481,6 @@ def only_decode(self): if_only_decode = if_only_decode and not ( prefill_exists if prefill_exists is not None else self.exist_prefill() ) - return if_only_decode def insert_tasks_v1(self, req_dicts: List[Request]): @@ -964,9 +1000,12 @@ def _prepare_inputs(self, is_dummy_run=False) -> None: self.forward_meta.pos_emb_type = self.share_inputs["pos_emb_type"] self.forward_meta.attn_backend = self.attn_backends[0] self.initialize_attention_backend() + 
if self.pd_disaggregation_mode == "per_chunk" or self.pd_disaggregation_mode == "per_query": self.forward_meta.kv_signal_sender = self.kv_signal_sender + if_only_decode = self.only_decode() + print("if_only_decode: ", if_only_decode) if ( self.fd_config.scheduler_config.splitwise_role == "mixed" ): # 集中式场景,phase默认初始化为prefill, 推理运行时不同类型的batch能够在此处实现phase切换 From a52f9f93c3b75854f58683aee81c651b6653402a Mon Sep 17 00:00:00 2001 From: zhangcaiji Date: Fri, 21 Nov 2025 10:47:29 +0000 Subject: [PATCH 14/17] delete printer --- fastdeploy/worker/xpu_model_runner.py | 1 - 1 file changed, 1 deletion(-) diff --git a/fastdeploy/worker/xpu_model_runner.py b/fastdeploy/worker/xpu_model_runner.py index d9e469b8f7d..c32d21ae132 100644 --- a/fastdeploy/worker/xpu_model_runner.py +++ b/fastdeploy/worker/xpu_model_runner.py @@ -1005,7 +1005,6 @@ def _prepare_inputs(self, is_dummy_run=False) -> None: self.forward_meta.kv_signal_sender = self.kv_signal_sender if_only_decode = self.only_decode() - print("if_only_decode: ", if_only_decode) if ( self.fd_config.scheduler_config.splitwise_role == "mixed" ): # 集中式场景,phase默认初始化为prefill, 推理运行时不同类型的batch能够在此处实现phase切换 From 086f67b7e0c740d651b599cff49b6c9a7a704afc Mon Sep 17 00:00:00 2001 From: zhangcaiji Date: Mon, 24 Nov 2025 03:13:07 +0000 Subject: [PATCH 15/17] reduce barriers'num --- fastdeploy/worker/xpu_model_runner.py | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/fastdeploy/worker/xpu_model_runner.py b/fastdeploy/worker/xpu_model_runner.py index c32d21ae132..c880d2de85f 100644 --- a/fastdeploy/worker/xpu_model_runner.py +++ b/fastdeploy/worker/xpu_model_runner.py @@ -442,23 +442,21 @@ def only_decode(self): world_size = self.parallel_config.expert_parallel_size rank = self.rank % world_size - # First check if all devices are empty + # Combined check in one Barrier round no_need_stop = self.not_need_stop() self.shared_not_need_stop_list[rank].value = 1 if not no_need_stop else 0 - self.decode_barrier.wait() - if_all_device_empty = all(p.value == 1 for p in self.shared_not_need_stop_list) - self.decode_barrier.wait() - - if if_all_device_empty: - return False - - # Then check if only decode self.shared_only_decode_list[rank].value = self.forward_meta.len_info_cpu[0] <= 0 + + # Single Barrier for both checks self.decode_barrier.wait() + + if_all_device_empty = all(p.value == 1 for p in self.shared_not_need_stop_list) if_only_decode = all(p.value for p in self.shared_only_decode_list) + + # Single Barrier for reset self.decode_barrier.wait() - return if_only_decode + return False if if_all_device_empty else if_only_decode except Exception as e: logger.warning(f"Shared memory only_decode failed: {e}, fallback to original implementation") From ab23b3890b033972786a8679f87cb47c8e825ffa Mon Sep 17 00:00:00 2001 From: zhangcaiji Date: Tue, 25 Nov 2025 02:32:14 +0000 Subject: [PATCH 16/17] adapt to the op bugfix #5196 --- fastdeploy/model_executor/layers/backends/xpu/moe/fused_moe.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fastdeploy/model_executor/layers/backends/xpu/moe/fused_moe.py b/fastdeploy/model_executor/layers/backends/xpu/moe/fused_moe.py index 965360686f3..2a7d4846084 100644 --- a/fastdeploy/model_executor/layers/backends/xpu/moe/fused_moe.py +++ b/fastdeploy/model_executor/layers/backends/xpu/moe/fused_moe.py @@ -477,7 +477,7 @@ def apply_ep_decode( layer, permute_input, token_nums_per_expert, - max(1, valid_token_num), # 确保空跑时也不为0 + valid_token_num, ) # 4. 
EP combine From 5fbe28460c808784830df5aac8212bf73972ed96 Mon Sep 17 00:00:00 2001 From: zhangcaiji Date: Tue, 25 Nov 2025 02:48:50 +0000 Subject: [PATCH 17/17] translate Chinese comments into English --- fastdeploy/worker/xpu_model_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fastdeploy/worker/xpu_model_runner.py b/fastdeploy/worker/xpu_model_runner.py index c880d2de85f..f371ead5963 100644 --- a/fastdeploy/worker/xpu_model_runner.py +++ b/fastdeploy/worker/xpu_model_runner.py @@ -1005,7 +1005,7 @@ def _prepare_inputs(self, is_dummy_run=False) -> None: if_only_decode = self.only_decode() if ( self.fd_config.scheduler_config.splitwise_role == "mixed" - ): # 集中式场景,phase默认初始化为prefill, 推理运行时不同类型的batch能够在此处实现phase切换 + ): # Centralized scenario: the phase is initialized as "prefill" by default. During inference runtime, different types of batches can achieve phase switching at this point. self.fd_config.model_config.moe_phase.phase = "decode" if if_only_decode else "prefill" # Get sampling metadata
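
For reference, a minimal, self-contained sketch of the decode-only detection pattern the later patches (PATCH 13-15) converge on: each rank publishes two flags into per-rank shared slots, a single barrier round gives every rank a consistent snapshot, and the result decides whether the step runs the MoE "decode" (low-latency) or "prefill" (high-throughput) phase. The class name OnlyDecodeChecker and the single-process, one-thread-per-rank layout are assumptions for illustration only; the flag and barrier names mirror shared_not_need_stop_list, shared_only_decode_list and decode_barrier from the patch.

import multiprocessing
from threading import Barrier, BrokenBarrierError

class OnlyDecodeChecker:
    """Sketch of the barrier-based decode-only check (assumes all expert-parallel
    ranks run as threads inside one process, so the Value slots and the Barrier
    are visible to every rank)."""

    def __init__(self, group_size: int):
        # One slot per rank: "this rank is idle" and "this rank has no prefill tokens".
        self.need_stop_flags = [multiprocessing.Value("i", 0) for _ in range(group_size)]
        self.only_decode_flags = [multiprocessing.Value("i", 0) for _ in range(group_size)]
        self.barrier = Barrier(group_size, timeout=10.0)

    def check(self, rank: int, not_need_stop: bool, prefill_token_num: int) -> bool:
        """Return True only when every rank is in a decode-only step."""
        try:
            # Publish this rank's state.
            self.need_stop_flags[rank].value = 0 if not_need_stop else 1
            self.only_decode_flags[rank].value = 1 if prefill_token_num <= 0 else 0

            # First barrier: every rank now sees a consistent snapshot of the flags.
            self.barrier.wait()
            all_ranks_idle = all(f.value == 1 for f in self.need_stop_flags)
            all_ranks_decode_only = all(f.value == 1 for f in self.only_decode_flags)

            # Second barrier: keeps a fast rank from overwriting the flags before
            # a slow rank has finished reading them.
            self.barrier.wait()

            # If every rank is idle, fall back to the prefill (high-throughput) path.
            return False if all_ranks_idle else all_ranks_decode_only
        except BrokenBarrierError:
            # Synchronization failed; treating the step as prefill is the safe default.
            return False

At the call site (mirroring _prepare_inputs above), the result drives the per-step phase switch on a mixed splitwise_role deployment, roughly: moe_phase.phase = "decode" if checker.check(rank, not_need_stop, prefill_tokens) else "prefill", where prefill_tokens stands in for forward_meta.len_info_cpu[0].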