From ac9751fc792bbd8ea7a1ef1497791cf2266bb816 Mon Sep 17 00:00:00 2001
From: Wanglongzhi2001 <583087864@qq.com>
Date: Thu, 20 Nov 2025 13:56:20 +0800
Subject: [PATCH 1/7] [Models] Add forward_meta to moe models' forward
 function

---
 .../layers/moe/fused_moe_backend_base.py      |  4 +++-
 fastdeploy/model_executor/layers/moe/moe.py   |  3 ++-
 .../model_executor/models/deepseek_v3.py      |  7 +++----
 .../model_executor/models/ernie4_5_moe.py     | 21 ++++++++++++++-----
 .../model_executor/models/ernie4_5_mtp.py     |  4 ++--
 .../models/ernie4_5_vl/ernie4_5_vl_moe.py     | 18 ++++++++--------
 fastdeploy/model_executor/models/glm4_moe.py  | 11 ++++++----
 fastdeploy/model_executor/models/gpt_oss.py   |  6 +++---
 fastdeploy/model_executor/models/qwen2.py     |  4 ++--
 fastdeploy/model_executor/models/qwen3moe.py  | 10 ++++-----
 fastdeploy/spec_decode/mtp.py                 |  2 +-
 11 files changed, 53 insertions(+), 37 deletions(-)

diff --git a/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py b/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py
index b34291a96f4..e51e995987d 100644
--- a/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py
+++ b/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py
@@ -19,6 +19,7 @@
 import paddle
 from paddle import nn
 
+from fastdeploy.model_executor.forward_meta import ForwardMeta
 from fastdeploy.model_executor.utils import (
     TensorTracker,
     default_weight_loader,
@@ -198,13 +199,14 @@ def apply(
         layer: nn.Layer,
         x: paddle.Tensor,
         gate: nn.Layer,
+        forward_meta: ForwardMeta,
     ) -> paddle.Tensor:
         """
         Paddle Cutlass compute Fused MoE.
         """
         if layer.ep_size > 1:
             is_moe_start_layer = layer.layer_idx == layer.fd_config.model_config.moe_layer_start_index
-            if layer.fd_config.model_config.moe_phase.phase == "prefill":
+            if forward_meta.moe_phase.phase == "prefill":
                 if layer.fd_config.scheduler_config.splitwise_role == "mixed" and is_moe_start_layer:
                     self.ep_prefill_runner.clean_low_latency_buffer()
                 return self.apply_ep_prefill(layer, x, gate)
diff --git a/fastdeploy/model_executor/layers/moe/moe.py b/fastdeploy/model_executor/layers/moe/moe.py
index 13d0f88319d..83b3d304dcb 100644
--- a/fastdeploy/model_executor/layers/moe/moe.py
+++ b/fastdeploy/model_executor/layers/moe/moe.py
@@ -22,6 +22,7 @@
 
 from fastdeploy import envs
 from fastdeploy.distributed.communication import tensor_model_parallel_all_reduce
+from fastdeploy.model_executor.forward_meta import ForwardMeta
 from fastdeploy.model_executor.layers.utils import get_tensor
 from fastdeploy.model_executor.utils import h2d_copy, slice_fn
 from fastdeploy.platforms import current_platform
@@ -615,7 +616,7 @@ def forward_split_allgather(self, x: paddle.Tensor, gate: nn.Layer):
 
         return out
 
-    def forward(self, x: paddle.Tensor, gate: nn.Layer):
+    def forward(self, x: paddle.Tensor, gate: nn.Layer, forward_meta: ForwardMeta):
         """
         Defines the forward computation of the moe layer.
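Note: the change above means the EP backend now reads the prefill/decode phase from the per-step ForwardMeta instead of from the shared model config. A minimal runnable sketch of that dispatch — SimpleNamespace is a stand-in for the real ForwardMeta and MoEPhase types, an assumption for illustration only, not code from this series:

    from types import SimpleNamespace

    # Stand-in for ForwardMeta: only the field the backend reads here is mocked.
    forward_meta = SimpleNamespace(moe_phase=SimpleNamespace(phase="prefill"))

    # Mirrors the branch in apply() above: dispatch keys off the per-step phase.
    if forward_meta.moe_phase.phase == "prefill":
        print("would run apply_ep_prefill")
    else:
        print("would run apply_ep_decode")
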
diff --git a/fastdeploy/model_executor/models/deepseek_v3.py b/fastdeploy/model_executor/models/deepseek_v3.py
index dd19b50f71b..01a070804a2 100644
--- a/fastdeploy/model_executor/models/deepseek_v3.py
+++ b/fastdeploy/model_executor/models/deepseek_v3.py
@@ -104,7 +104,7 @@ def load_state_dict(self, state_dict):
         self.up_gate_proj.load_state_dict(state_dict)
         self.down_proj.load_state_dict(state_dict)
 
-    def forward(self, x):
+    def forward(self, x, forward_meta):
         """ """
         gate_up_out = self.up_gate_proj(x)
         act_out = self.act_fn(gate_up_out)
@@ -187,7 +187,7 @@ def load_state_dict(self, state_dict):
         self.experts.load_state_dict(state_dict)
         self.shared_experts.load_state_dict(state_dict)
 
-    def forward(self, hidden_states: paddle.Tensor):
+    def forward(self, hidden_states: paddle.Tensor, forward_meta: ForwardMeta):
         """ """
         shared_experts_out = self.shared_experts(hidden_states)
         moe_out = self.experts(hidden_states, self.gate)
@@ -517,8 +517,7 @@ def forward(
         hidden_states = self.self_attn(forward_meta, hidden_states, position_ids, mask_encoder_batch)
 
         hidden_states, residual = self.post_attention_layernorm(hidden_states, residual)
-        hidden_states = self.mlp(hidden_states)
-
+        hidden_states = self.mlp(hidden_states, forward_meta)
         return hidden_states, residual
 
diff --git a/fastdeploy/model_executor/models/ernie4_5_moe.py b/fastdeploy/model_executor/models/ernie4_5_moe.py
index 5140d963201..41c6d8a57f3 100644
--- a/fastdeploy/model_executor/models/ernie4_5_moe.py
+++ b/fastdeploy/model_executor/models/ernie4_5_moe.py
@@ -94,7 +94,7 @@ def load_state_dict(self, state_dict):
         self.up_gate_proj.load_state_dict(state_dict)
         self.down_proj.load_state_dict(state_dict)
 
-    def forward(self, hidden_states: paddle.Tensor):
+    def forward(self, hidden_states: paddle.Tensor, forward_meta: ForwardMeta):
         gate_up_out = self.up_gate_proj(hidden_states)
         act_out = self.act_fn(gate_up_out)
         down_out = self.down_proj(act_out)
@@ -213,8 +213,16 @@ def load_state_dict(self, state_dict):
     def update_state_dict(self, state_dict):
         self.experts.load_state_dict(state_dict, True)
 
-    def forward(self, hidden_states: paddle.Tensor):
-        out = self.experts(hidden_states, self.gate)
+    def forward(
+        self,
+        hidden_states: paddle.Tensor,
+        forward_meta: ForwardMeta,
+    ):
+        out = self.experts(
+            x=hidden_states,
+            gate=self.gate,
+            forward_meta=forward_meta,
+        )
         if self.num_shared_experts > 0:
             s_x = self.shared_experts(hidden_states)
             out = out + s_x
@@ -344,7 +352,10 @@ def forward(
             residual,
         )
 
-        hidden_states = self.mlp(hidden_states)
+        hidden_states = self.mlp(
+            forward_meta=forward_meta,
+            hidden_states=hidden_states,
+        )
 
         return hidden_states, residual
 
@@ -623,7 +634,7 @@ def empty_input_forward(self):
             self.fd_config.model_config.moe_layer_start_index,
             self.fd_config.model_config.num_hidden_layers,
         ):
-            self.ernie.layers[i].mlp.experts(fake_hidden_states, self.ernie.layers[i].mlp.gate)
+            self.ernie.layers[i].mlp.experts(fake_hidden_states, self.ernie.layers[i].mlp.gate, self.forward_meta)
 
     def forward(
         self,
diff --git a/fastdeploy/model_executor/models/ernie4_5_mtp.py b/fastdeploy/model_executor/models/ernie4_5_mtp.py
index 0aedb040062..2d57ed504cb 100644
--- a/fastdeploy/model_executor/models/ernie4_5_mtp.py
+++ b/fastdeploy/model_executor/models/ernie4_5_mtp.py
@@ -436,7 +436,7 @@ def compute_logits(self, hidden_states: paddle.Tensor):
 
         return logits
 
-    def empty_input_forward(self):
+    def empty_input_forward(self, forward_meta):
         """
         empty_input_forward
         """
@@ -448,7 +448,7 @@
             self.fd_config.model_config.moe_layer_start_index,
             self.fd_config.model_config.num_hidden_layers,
         ):
-            self.ernie.layers[i].mlp.fused_moe(fake_hidden_states)
+            self.ernie.layers[i].mlp.fused_moe(hidden_states=fake_hidden_states, forward_meta=forward_meta)
 
     def forward(
         self,
diff --git a/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py b/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py
index c8b414bcdf7..95a53b184b1 100644
--- a/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py
+++ b/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py
@@ -170,8 +170,8 @@ def __init__(
             model_format="",
         )
 
-    def forward(self, hidden_states: paddle.Tensor):
-        out = self.experts(hidden_states, self.gate)
+    def forward(self, hidden_states: paddle.Tensor, forward_meta: ForwardMeta):
+        out = self.experts(hidden_states, self.gate, forward_meta)
         return out
 
     def load_state_dict(self, state_dict):
@@ -270,7 +270,7 @@ def load_state_dict(self, state_dict):
         if self.num_shared_experts > 0:
             self.shared_experts.load_state_dict(state_dict)
 
-    def forward(self, hidden_states: paddle.Tensor, vl_moe_meta: VLMoEMeta):
+    def forward(self, hidden_states: paddle.Tensor, forward_meta: ForwardMeta, vl_moe_meta: VLMoEMeta):
         if self.num_shared_experts > 0:
             shared_experts_out = self.shared_experts(hidden_states)
         hidden_states, text_input, image_input = text_image_gather_scatter(
             hidden_states,
             vl_moe_meta.text_input,
@@ -282,8 +282,8 @@ def forward(self, hidden_states: paddle.Tensor, vl_moe_meta: VLMoEMeta):
             vl_moe_meta.image_index,
             True,
         )
-        text_out = self.text_fused_moe(text_input)
-        image_out = self.image_fused_moe(image_input)
+        text_out = self.text_fused_moe(text_input, forward_meta)
+        image_out = self.image_fused_moe(image_input, forward_meta)
         hidden_states, _, _ = text_image_gather_scatter(
             hidden_states,
             text_out,
@@ -395,9 +395,9 @@ def forward(
 
         hidden_states, residual = self.post_attention_layernorm(hidden_states, residual)
         if isinstance(self.mlp, Ernie4_5_VLMoE):
-            hidden_states = self.mlp(hidden_states, vl_moe_meta)
+            hidden_states = self.mlp(hidden_states, forward_meta, vl_moe_meta)
         else:
-            hidden_states = self.mlp(hidden_states)
+            hidden_states = self.mlp(hidden_states, forward_meta)
 
         return hidden_states, residual
 
@@ -766,8 +766,8 @@ def empty_input_forward(self):
             self.fd_config.model_config.moe_layer_start_index,
             self.fd_config.model_config.num_hidden_layers,
         ):
-            self.ernie.layers[i].mlp.text_fused_moe(fake_hidden_states)
-            self.ernie.layers[i].mlp.image_fused_moe(fake_hidden_states)
+            self.ernie.layers[i].mlp.text_fused_moe(fake_hidden_states, self.forward_meta)
+            self.ernie.layers[i].mlp.image_fused_moe(fake_hidden_states, self.forward_meta)
 
     def get_input_embeddings(
         self,
diff --git a/fastdeploy/model_executor/models/glm4_moe.py b/fastdeploy/model_executor/models/glm4_moe.py
index 9cd0c7003f3..9c3b4791a09 100644
--- a/fastdeploy/model_executor/models/glm4_moe.py
+++ b/fastdeploy/model_executor/models/glm4_moe.py
@@ -85,7 +85,7 @@ def __init__(
             act_method=fd_config.model_config.hidden_act,
         )
 
-    def forward(self, x):
+    def forward(self, x, forward_meta):
         """ """
         gate_up_out = self.up_gate_proj(x)
         act_out = self.act_fn(gate_up_out)
@@ -161,9 +161,9 @@ def __init__(
             reduce_results=False,
         )
 
-    def forward(self, x):
+    def forward(self, x, forward_meta):
         shared_experts_out = self.shared_experts(x)
-        out = self.experts(x, self.gate)
+        out = self.experts(x, self.gate, forward_meta)
         out = out + shared_experts_out
         # We do the TP all reduce after the sum of experts.
         if self.tensor_parallel_size > 1:
@@ -306,7 +306,10 @@ def forward(
 
         # Fully Connected
         hidden_states, residual = self.post_attention_layernorm(hidden_states, residual)
-        hidden_states = self.mlp(hidden_states)
+        hidden_states = self.mlp(
+            hidden_states,
+            forward_meta,
+        )
 
         return hidden_states, residual
 
diff --git a/fastdeploy/model_executor/models/gpt_oss.py b/fastdeploy/model_executor/models/gpt_oss.py
index e951fff92f5..682c9f5f1ec 100644
--- a/fastdeploy/model_executor/models/gpt_oss.py
+++ b/fastdeploy/model_executor/models/gpt_oss.py
@@ -124,8 +124,8 @@ def __init__(self, fd_config: FDConfig, layer_id: int, prefix: str = ""):
             model_format="",
         )
 
-    def forward(self, hidden_states: paddle.Tensor):
-        expert_output = self.experts(hidden_states, self.router)
+    def forward(self, hidden_states: paddle.Tensor, forward_meta: ForwardMeta):
+        expert_output = self.experts(hidden_states, self.router, forward_meta)
         return expert_output
 
 
@@ -173,7 +173,7 @@ def forward(
 
         # Fully Connected
         hidden_states, residual = self.post_attention_layernorm(hidden_states, residual)
-        hidden_states = self.mlp(hidden_states)
+        hidden_states = self.mlp(hidden_states, forward_meta)
 
         return hidden_states, residual
 
diff --git a/fastdeploy/model_executor/models/qwen2.py b/fastdeploy/model_executor/models/qwen2.py
index 3b3baee6222..59164985c8f 100644
--- a/fastdeploy/model_executor/models/qwen2.py
+++ b/fastdeploy/model_executor/models/qwen2.py
@@ -89,7 +89,7 @@ def load_state_dict(self, state_dict):
         self.up_gate_proj.load_state_dict(state_dict)
         self.down_proj.load_state_dict(state_dict)
 
-    def forward(self, x):
+    def forward(self, x, forward_meta):
         """ """
         gate_up_out = self.up_gate_proj(x)
         act_out = self.act_fn(gate_up_out)
@@ -205,7 +205,7 @@ def forward(
 
         # Fully Connected
         hidden_states, residual = self.post_attention_layernorm(hidden_states, residual)
-        hidden_states = self.mlp(hidden_states)
+        hidden_states = self.mlp(hidden_states, forward_meta)
 
         return hidden_states, residual
 
diff --git a/fastdeploy/model_executor/models/qwen3moe.py b/fastdeploy/model_executor/models/qwen3moe.py
index 74adb5cc3b9..591bd31dfb5 100644
--- a/fastdeploy/model_executor/models/qwen3moe.py
+++ b/fastdeploy/model_executor/models/qwen3moe.py
@@ -79,8 +79,8 @@ def __init__(
             weight_dtype="float32",
         )
 
-    def forward(self, x):
-        return self.experts(x, self.gate)
+    def forward(self, x, forward_meta):
+        return self.experts(x, self.gate, forward_meta)
 
     def load_state_dict(self, state_dict):
         """ """
@@ -125,7 +125,7 @@ def load_state_dict(self, state_dict):
         self.up_gate_proj.load_state_dict(state_dict)
         self.down_proj.load_state_dict(state_dict)
 
-    def forward(self, x):
+    def forward(self, x, forward_meta):
         """ """
         gate_up_out = self.up_gate_proj(x)
         act_out = self.act_fn(gate_up_out)
@@ -204,7 +204,7 @@ def forward(
 
         # Fully Connected
         hidden_states, residual = self.post_attention_layernorm(hidden_states, residual)
-        hidden_states = self.mlp(hidden_states)
+        hidden_states = self.mlp(hidden_states, forward_meta)
 
         return hidden_states, residual
 
@@ -428,7 +428,7 @@ def empty_input_forward(self):
             self.fd_config.model_config.moe_layer_start_index,
             self.fd_config.model_config.num_hidden_layers,
         ):
-            self.model.layers[i].mlp.experts(fake_hidden_states, self.model.layers[i].mlp.gate)
+            self.model.layers[i].mlp.experts(fake_hidden_states, self.model.layers[i].mlp.gate, self.forward_meta)
 
     def forward(
         self,
diff --git a/fastdeploy/spec_decode/mtp.py b/fastdeploy/spec_decode/mtp.py
index 5553c1047fd..0e3541ae27f 100644
--- a/fastdeploy/spec_decode/mtp.py
+++ b/fastdeploy/spec_decode/mtp.py
@@ -1078,7 +1078,7 @@ def _propose_xpu(self, step_use_cudagraph: bool = False, is_dummy_run: bool = Fa
                 self._get_self_hidden_states(hidden_states)
             else:
                 if hasattr(self.model, "empty_input_forward"):
-                    self.model.empty_input_forward()
+                    self.model.empty_input_forward(self.forward_meta)
 
     def _get_self_hidden_states(self, hidden_states):
         target_hidden_states = eagle_get_self_hidden_states(

From 96c733b2c00cb0b417e1b9b4d8de25433bd0ffec Mon Sep 17 00:00:00 2001
From: Wanglongzhi2001 <583087864@qq.com>
Date: Thu, 20 Nov 2025 14:34:04 +0800
Subject: [PATCH 2/7] fix missing param

---
 fastdeploy/model_executor/layers/moe/moe.py               | 4 ++--
 fastdeploy/model_executor/models/deepseek_v3.py           | 4 ++--
 fastdeploy/model_executor/models/ernie4_5_moe.py          | 4 ++--
 .../model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py  | 2 +-
 4 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/fastdeploy/model_executor/layers/moe/moe.py b/fastdeploy/model_executor/layers/moe/moe.py
index 83b3d304dcb..c8fb5b060a6 100644
--- a/fastdeploy/model_executor/layers/moe/moe.py
+++ b/fastdeploy/model_executor/layers/moe/moe.py
@@ -594,7 +594,7 @@ def load_state_dict(self, state_dict, is_rearrange: bool = False):
         else:
             self.quant_method.process_loaded_weights(self, state_dict)
 
-    def forward_split_allgather(self, x: paddle.Tensor, gate: nn.Layer):
+    def forward_split_allgather(self, x: paddle.Tensor, gate: nn.Layer, forward_meta: ForwardMeta):
         """
         Forward split allgather function.
         """
@@ -609,7 +609,7 @@ def forward_split_allgather(self, x: paddle.Tensor, gate: nn.Layer, forward_meta
         if end_offset > token_num:
             end_offset = token_num
         part_x[: (end_offset - start_offset), :] = x[start_offset:end_offset, :]
-        out = self.quant_method.apply(self, part_x, gate)
+        out = self.quant_method.apply(self, part_x, gate, forward_meta)
         multi_outs = paddle.zeros([token_num_per_rank * self.tp_size, x.shape[1]], dtype=x.dtype)
         paddle.distributed.all_gather(multi_outs, out, self.tp_group)
         out = multi_outs[:token_num, :]
diff --git a/fastdeploy/model_executor/models/deepseek_v3.py b/fastdeploy/model_executor/models/deepseek_v3.py
index 01a070804a2..abd589e3651 100644
--- a/fastdeploy/model_executor/models/deepseek_v3.py
+++ b/fastdeploy/model_executor/models/deepseek_v3.py
@@ -189,8 +189,8 @@ def load_state_dict(self, state_dict):
 
     def forward(self, hidden_states: paddle.Tensor, forward_meta: ForwardMeta):
         """ """
-        shared_experts_out = self.shared_experts(hidden_states)
-        moe_out = self.experts(hidden_states, self.gate)
+        shared_experts_out = self.shared_experts(hidden_states, forward_meta)
+        moe_out = self.experts(hidden_states, self.gate, forward_meta)
         moe_out = moe_out + shared_experts_out
         # We do the TP all reduce after the sum of experts.
         if self.tp_size > 1:
diff --git a/fastdeploy/model_executor/models/ernie4_5_moe.py b/fastdeploy/model_executor/models/ernie4_5_moe.py
index 41c6d8a57f3..12780e4a284 100644
--- a/fastdeploy/model_executor/models/ernie4_5_moe.py
+++ b/fastdeploy/model_executor/models/ernie4_5_moe.py
@@ -224,7 +224,7 @@ def forward(
             forward_meta=forward_meta,
         )
         if self.num_shared_experts > 0:
-            s_x = self.shared_experts(hidden_states)
+            s_x = self.shared_experts(hidden_states, forward_meta)
             out = out + s_x
         return out
 
@@ -353,8 +353,8 @@ def forward(
         )
 
         hidden_states = self.mlp(
-            forward_meta=forward_meta,
             hidden_states=hidden_states,
+            forward_meta=forward_meta,
         )
 
         return hidden_states, residual
diff --git a/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py b/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py
index 95a53b184b1..6e504773ce9 100644
--- a/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py
+++ b/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py
@@ -272,7 +272,7 @@ def load_state_dict(self, state_dict):
 
     def forward(self, hidden_states: paddle.Tensor, forward_meta: ForwardMeta, vl_moe_meta: VLMoEMeta):
         if self.num_shared_experts > 0:
-            shared_experts_out = self.shared_experts(hidden_states)
+            shared_experts_out = self.shared_experts(hidden_states, forward_meta)
         hidden_states, text_input, image_input = text_image_gather_scatter(
             hidden_states,
             vl_moe_meta.text_input,

From 61b6fcdff37c129e30067b1a180142bf20ed7100 Mon Sep 17 00:00:00 2001
From: Wanglongzhi2001 <583087864@qq.com>
Date: Thu, 20 Nov 2025 14:58:56 +0800
Subject: [PATCH 3/7] fix

---
 fastdeploy/model_executor/layers/moe/moe.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/fastdeploy/model_executor/layers/moe/moe.py b/fastdeploy/model_executor/layers/moe/moe.py
index c8fb5b060a6..83b3d304dcb 100644
--- a/fastdeploy/model_executor/layers/moe/moe.py
+++ b/fastdeploy/model_executor/layers/moe/moe.py
@@ -594,7 +594,7 @@ def load_state_dict(self, state_dict, is_rearrange: bool = False):
         else:
             self.quant_method.process_loaded_weights(self, state_dict)
 
-    def forward_split_allgather(self, x: paddle.Tensor, gate: nn.Layer, forward_meta: ForwardMeta):
+    def forward_split_allgather(self, x: paddle.Tensor, gate: nn.Layer):
         """
         Forward split allgather function.
""" @@ -609,7 +609,7 @@ def forward_split_allgather(self, x: paddle.Tensor, gate: nn.Layer, forward_meta if end_offset > token_num: end_offset = token_num part_x[: (end_offset - start_offset), :] = x[start_offset:end_offset, :] - out = self.quant_method.apply(self, part_x, gate, forward_meta) + out = self.quant_method.apply(self, part_x, gate) multi_outs = paddle.zeros([token_num_per_rank * self.tp_size, x.shape[1]], dtype=x.dtype) paddle.distributed.all_gather(multi_outs, out, self.tp_group) out = multi_outs[:token_num, :] From 7a1bdb065538556ed63c0cc837cbf7e762ac05cb Mon Sep 17 00:00:00 2001 From: Wanglongzhi2001 <583087864@qq.com> Date: Thu, 20 Nov 2025 15:00:47 +0800 Subject: [PATCH 4/7] fix --- .../model_executor/layers/moe/fused_moe_backend_base.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py b/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py index e51e995987d..b34291a96f4 100644 --- a/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py +++ b/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py @@ -19,7 +19,6 @@ import paddle from paddle import nn -from fastdeploy.model_executor.forward_meta import ForwardMeta from fastdeploy.model_executor.utils import ( TensorTracker, default_weight_loader, @@ -199,14 +198,13 @@ def apply( layer: nn.Layer, x: paddle.Tensor, gate: nn.Layer, - forward_meta: ForwardMeta, ) -> paddle.Tensor: """ Paddle Cutlass compute Fused MoE. """ if layer.ep_size > 1: is_moe_start_layer = layer.layer_idx == layer.fd_config.model_config.moe_layer_start_index - if forward_meta.moe_phase.phase == "prefill": + if layer.fd_config.model_config.moe_phase.phase == "prefill": if layer.fd_config.scheduler_config.splitwise_role == "mixed" and is_moe_start_layer: self.ep_prefill_runner.clean_low_latency_buffer() return self.apply_ep_prefill(layer, x, gate) From 80e09f1e111b97bfddd476acc0ee7a3c1491839c Mon Sep 17 00:00:00 2001 From: Wanglongzhi2001 <583087864@qq.com> Date: Wed, 26 Nov 2025 10:39:19 +0800 Subject: [PATCH 5/7] fix forward_meta --- fastdeploy/model_executor/models/ernie4_5_moe.py | 4 ++-- .../model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py | 6 +++--- fastdeploy/model_executor/models/qwen3moe.py | 4 ++-- fastdeploy/worker/gcu_model_runner.py | 6 +++--- fastdeploy/worker/gpu_model_runner.py | 6 +++--- fastdeploy/worker/hpu_model_runner.py | 4 ++-- fastdeploy/worker/metax_model_runner.py | 6 +++--- fastdeploy/worker/xpu_model_runner.py | 6 +++--- 8 files changed, 21 insertions(+), 21 deletions(-) diff --git a/fastdeploy/model_executor/models/ernie4_5_moe.py b/fastdeploy/model_executor/models/ernie4_5_moe.py index 12780e4a284..54d15c2839a 100644 --- a/fastdeploy/model_executor/models/ernie4_5_moe.py +++ b/fastdeploy/model_executor/models/ernie4_5_moe.py @@ -622,7 +622,7 @@ def compute_logits(self, hidden_states: paddle.Tensor): return logits - def empty_input_forward(self): + def empty_input_forward(self, forward_meta): """ empty_input_forward """ @@ -634,7 +634,7 @@ def empty_input_forward(self): self.fd_config.model_config.moe_layer_start_index, self.fd_config.model_config.num_hidden_layers, ): - self.ernie.layers[i].mlp.experts(fake_hidden_states, self.ernie.layers[i].mlp.gate, self.forward_meta) + self.ernie.layers[i].mlp.experts(fake_hidden_states, self.ernie.layers[i].mlp.gate, forward_meta) def forward( self, diff --git a/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py 
index 6e504773ce9..eda2e85278b 100644
--- a/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py
+++ b/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py
@@ -754,7 +754,7 @@ def compute_logits(self, hidden_states: paddle.Tensor):
 
         return logits
 
-    def empty_input_forward(self):
+    def empty_input_forward(self, forward_meta):
         """
         empty_input_forward
         """
@@ -766,8 +766,8 @@ def empty_input_forward(self):
             self.fd_config.model_config.moe_layer_start_index,
             self.fd_config.model_config.num_hidden_layers,
         ):
-            self.ernie.layers[i].mlp.text_fused_moe(fake_hidden_states, self.forward_meta)
-            self.ernie.layers[i].mlp.image_fused_moe(fake_hidden_states, self.forward_meta)
+            self.ernie.layers[i].mlp.text_fused_moe(fake_hidden_states, forward_meta)
+            self.ernie.layers[i].mlp.image_fused_moe(fake_hidden_states, forward_meta)
 
     def get_input_embeddings(
         self,
diff --git a/fastdeploy/model_executor/models/qwen3moe.py b/fastdeploy/model_executor/models/qwen3moe.py
index 591bd31dfb5..0e7f26f9dda 100644
--- a/fastdeploy/model_executor/models/qwen3moe.py
+++ b/fastdeploy/model_executor/models/qwen3moe.py
@@ -416,7 +416,7 @@ def compute_logits(self, hidden_states: paddle.Tensor):
 
         return logits
 
-    def empty_input_forward(self):
+    def empty_input_forward(self, forward_meta):
         """
         empty_input_forward
         """
@@ -428,7 +428,7 @@ def empty_input_forward(self):
             self.fd_config.model_config.moe_layer_start_index,
             self.fd_config.model_config.num_hidden_layers,
         ):
-            self.model.layers[i].mlp.experts(fake_hidden_states, self.model.layers[i].mlp.gate, self.forward_meta)
+            self.model.layers[i].mlp.experts(fake_hidden_states, self.model.layers[i].mlp.gate, forward_meta)
 
     def forward(
         self,
diff --git a/fastdeploy/worker/gcu_model_runner.py b/fastdeploy/worker/gcu_model_runner.py
index 3444cc7dd1f..6bd8da02b24 100644
--- a/fastdeploy/worker/gcu_model_runner.py
+++ b/fastdeploy/worker/gcu_model_runner.py
@@ -971,7 +971,7 @@ class at the server level, which is too granular for ModelRunner.
         # This logic is not used in TP (Tensor Parallelism) mode. However, in EP (Expert Parallelism) mode,
         # when there is data on other runners, the current runner is required to execute part of the model.
         if not self.not_need_stop():
-            self._execute_empty_input()
+            self._execute_empty_input(self.forward_meta)
             return None
 
         # 1. Prepare inputs of model and sampler.
@@ -1088,14 +1088,14 @@ class at the server level, which is too granular for ModelRunner.
         self.seq_lens_this_time_buffer.copy_(self.share_inputs["seq_lens_this_time"], False)
         return None
 
-    def _execute_empty_input(self) -> None:
+    def _execute_empty_input(self, forward_meta) -> None:
         """
        In certain scenarios, such as during EP, the runner needs to execute
        partial modules of the model without input data.
        This requires the model to implement the `empty_input_forward` method.
        """
        if hasattr(self.model, "empty_input_forward"):
-            self.model.empty_input_forward()
+            self.model.empty_input_forward(forward_meta)
        else:
            raise ValueError(f"{type(self.model)} has no attribute 'empty_input_forward'")
diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py
index d5be2801fab..505f09efb8f 100644
--- a/fastdeploy/worker/gpu_model_runner.py
+++ b/fastdeploy/worker/gpu_model_runner.py
@@ -2202,7 +2202,7 @@ class at the server level, which is too granular for ModelRunner.
         # This logic is not used in TP (Tensor Parallelism) mode. However, in EP (Expert Parallelism) mode,
         # when there is data on other runners, the current runner is required to execute part of the model.
         if not self.not_need_stop():
-            self._execute_empty_input()
+            self._execute_empty_input(self.forward_meta)
             return None
 
         # 2. Padding inputs for cuda graph
@@ -2473,14 +2473,14 @@ def _pool(self, hidden_states: paddle.Tensor, num_running_requests: int) -> Opti
 
         return pooler_output
 
-    def _execute_empty_input(self) -> None:
+    def _execute_empty_input(self, forward_meta) -> None:
         """
         In certain scenarios, such as during EP, the runner needs to execute
         partial modules of the model without input data.
         This requires the model to implement the `empty_input_forward` method.
         """
         if hasattr(self.model, "empty_input_forward"):
-            self.model.empty_input_forward()
+            self.model.empty_input_forward(forward_meta)
         else:
             raise ValueError(f"{type(self.model)} has no attribute 'empty_input_forward'")
diff --git a/fastdeploy/worker/hpu_model_runner.py b/fastdeploy/worker/hpu_model_runner.py
index 1146e4db8d0..afa89ff4ae9 100644
--- a/fastdeploy/worker/hpu_model_runner.py
+++ b/fastdeploy/worker/hpu_model_runner.py
@@ -1358,14 +1358,14 @@ class at the server level, which is too granular for ModelRunner.
             self.prof.step()
         return None
 
-    def _execute_empty_input(self) -> None:
+    def _execute_empty_input(self, forward_meta) -> None:
         """
         In certain scenarios, such as during EP, the runner needs to execute
         partial modules of the model without input data.
         This requires the model to implement the `empty_input_forward` method.
         """
         if hasattr(self.model, "empty_input_forward"):
-            self.model.empty_input_forward()
+            self.model.empty_input_forward(forward_meta)
         else:
             raise ValueError(f"{type(self.model)} has no attribute 'empty_input_forward'")
diff --git a/fastdeploy/worker/metax_model_runner.py b/fastdeploy/worker/metax_model_runner.py
index b346f7be6ab..3038a34fc2b 100644
--- a/fastdeploy/worker/metax_model_runner.py
+++ b/fastdeploy/worker/metax_model_runner.py
@@ -1812,7 +1812,7 @@ class at the server level, which is too granular for ModelRunner.
         # This logic is not used in TP (Tensor Parallelism) mode. However, in EP (Expert Parallelism) mode,
         # when there is data on other runners, the current runner is required to execute part of the model.
         if not self.not_need_stop():
-            self._execute_empty_input()
+            self._execute_empty_input(self.forward_meta)
             return None
 
         # 2. Padding inputs for cuda graph
@@ -1998,14 +1998,14 @@ class at the server level, which is too granular for ModelRunner.
         )
         return None
 
-    def _execute_empty_input(self) -> None:
+    def _execute_empty_input(self, forward_meta) -> None:
         """
         In certain scenarios, such as during EP, the runner needs to execute
         partial modules of the model without input data.
         This requires the model to implement the `empty_input_forward` method.
         """
         if hasattr(self.model, "empty_input_forward"):
-            self.model.empty_input_forward()
+            self.model.empty_input_forward(forward_meta)
         else:
             raise ValueError(f"{type(self.model)} has no attribute 'empty_input_forward'")
diff --git a/fastdeploy/worker/xpu_model_runner.py b/fastdeploy/worker/xpu_model_runner.py
index c5ade2af728..af9dc2a8706 100644
--- a/fastdeploy/worker/xpu_model_runner.py
+++ b/fastdeploy/worker/xpu_model_runner.py
@@ -1090,7 +1090,7 @@ class at the server level, which is too granular for ModelRunner.
         # This logic is not used in TP (Tensor Parallelism) mode. However, in EP (Expert Parallelism) mode,
         # when there is data on other runners, the current runner is required to execute part of the model.
         if not self.not_need_stop() and not is_dummy_run:
-            self._execute_empty_input()
+            self._execute_empty_input(self.forward_meta)
             return None
 
         # 2. Padding inputs for cuda graph
@@ -1186,14 +1186,14 @@ class at the server level, which is too granular for ModelRunner.
             destroy_kv_signal_sender(self.kv_signal_sender)
         return None
 
-    def _execute_empty_input(self) -> None:
+    def _execute_empty_input(self, forward_meta) -> None:
         """
         In certain scenarios, such as during EP, the runner needs to execute
         partial modules of the model without input data.
         This requires the model to implement the `empty_input_forward` method.
         """
         if hasattr(self.model, "empty_input_forward"):
-            self.model.empty_input_forward()
+            self.model.empty_input_forward(forward_meta)
         else:
             raise ValueError(f"{type(self.model)} has no attribute 'empty_input_forward'")

From 67dd8a877c74673ddf8f5cb8a31a10edab6bbc9e Mon Sep 17 00:00:00 2001
From: Wanglongzhi2001 <583087864@qq.com>
Date: Tue, 2 Dec 2025 18:53:04 +0800
Subject: [PATCH 6/7] fix test and remove chunked MoE related fields from
 config

---
 fastdeploy/config.py                        |  2 --
 fastdeploy/model_executor/forward_meta.py   |  4 ++++
 fastdeploy/model_executor/layers/moe/moe.py | 14 +++++++-------
 fastdeploy/worker/gpu_model_runner.py       |  6 +++---
 tests/layers/test_ffn.py                    | 11 +++++++++--
 tests/layers/test_fusedmoe.py               | 11 ++++++++++-
 tests/utils.py                              |  9 ++++++++-
 7 files changed, 41 insertions(+), 16 deletions(-)

diff --git a/fastdeploy/config.py b/fastdeploy/config.py
index e99d8f53109..209c2333c52 100644
--- a/fastdeploy/config.py
+++ b/fastdeploy/config.py
@@ -549,8 +549,6 @@ def __init__(
         self.enable_expert_parallel = False
         self.enable_chunked_moe = False
         self.chunked_moe_size = 256
-        self.max_moe_num_chunk = 1
-        self.moe_num_chunk = 1
         self.local_data_parallel_id = 0
 
         # Engine worker queue port
diff --git a/fastdeploy/model_executor/forward_meta.py b/fastdeploy/model_executor/forward_meta.py
index e38cf3ad381..4e9df0d3ce3 100644
--- a/fastdeploy/model_executor/forward_meta.py
+++ b/fastdeploy/model_executor/forward_meta.py
@@ -143,6 +143,10 @@ class ForwardMeta:
     # Flag of profile run
     is_dummy_or_profile_run: bool = False
 
+    # chunked MoE related
+    moe_num_chunk: int = 1
+    max_moe_num_chunk: int = 1
+
     def clear_caches(self):
         """Safely clean up the caches"""
         if self.caches:
diff --git a/fastdeploy/model_executor/layers/moe/moe.py b/fastdeploy/model_executor/layers/moe/moe.py
index 83b3d304dcb..893224b20ab 100644
--- a/fastdeploy/model_executor/layers/moe/moe.py
+++ b/fastdeploy/model_executor/layers/moe/moe.py
@@ -636,7 +636,7 @@ def forward(self, x: paddle.Tensor, gate: nn.Layer, forward_meta: ForwardMeta):
         ):
             out = self.forward_split_allgather(x, gate)
         elif self.fd_config.parallel_config.use_ep and self.fd_config.parallel_config.enable_chunked_moe:
-            out = self.forward_chunked_moe(x, gate)
+            out = self.forward_chunked_moe(x, gate, forward_meta)
         else:
             out = self.forward_normal(x, gate)
 
@@ -644,7 +644,7 @@ def forward(self, x: paddle.Tensor, gate: nn.Layer, forward_meta: ForwardMeta):
             out = tensor_model_parallel_all_reduce(out, self.tp_group)
         return out
 
-    def forward_chunked_moe(self, x: paddle.Tensor, gate: nn.Layer):
+    def forward_chunked_moe(self, x: paddle.Tensor, gate: nn.Layer, forward_meta: ForwardMeta):
         """
         Split the input into multiple chunks to reduce the memory usage of MoE.
 
@@ -663,11 +663,11 @@ def forward_chunked_moe(self, x: paddle.Tensor, gate: nn.Layer):
         # Inputs smaller than one chunk, smaller than the largest input, or empty
         # must keep running until the rank with the most chunks has finished its MoE inference.
         if token_num > chunk_size:  # chunked moe
-            x_split_list = paddle.tensor_split(x, self.fd_config.parallel_config.moe_num_chunk, axis=0)
-            out_split_list = [None] * self.fd_config.parallel_config.moe_num_chunk
+            x_split_list = paddle.tensor_split(x, forward_meta.moe_num_chunk, axis=0)
+            out_split_list = [None] * forward_meta.moe_num_chunk
 
-            for i in range(self.fd_config.parallel_config.max_moe_num_chunk):
-                if i < self.fd_config.parallel_config.moe_num_chunk:
+            for i in range(forward_meta.max_moe_num_chunk):
+                if i < forward_meta.moe_num_chunk:
                     out_split_list[i] = self.quant_method.apply(self, x_split_list[i], gate)
                 else:
                     # beyond the real chunks, run inference on fake data so that every rank completes max_moe_num_chunk steps.
@@ -677,7 +677,7 @@ def forward_chunked_moe(self, x: paddle.Tensor, gate: nn.Layer):
         else:
             # with only one chunk, the real data only needs to run once.
             out = self.quant_method.apply(self, x, gate)
-            for i in range(self.fd_config.parallel_config.max_moe_num_chunk - 1):
+            for i in range(forward_meta.max_moe_num_chunk - 1):
                 self.quant_method.apply(self, fake_x, gate)
 
         return out
diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py
index 505f09efb8f..a2d4773487c 100644
--- a/fastdeploy/worker/gpu_model_runner.py
+++ b/fastdeploy/worker/gpu_model_runner.py
@@ -275,11 +275,11 @@ def collect_distributed_status(self):
             token_num = self.share_inputs["ids_remove_padding"].shape[0]
 
             if token_num > chunk_size:
-                self.fd_config.parallel_config.moe_num_chunk = (token_num + chunk_size - 1) // chunk_size
+                self.forward_meta.moe_num_chunk = (token_num + chunk_size - 1) // chunk_size
             else:
-                self.fd_config.parallel_config.moe_num_chunk = 1
+                self.forward_meta.moe_num_chunk = 1
 
-            dist_status_obj.moe_num_chunk = self.fd_config.parallel_config.moe_num_chunk
+            dist_status_obj.moe_num_chunk = self.forward_meta.moe_num_chunk
 
         # only EP needs to collect and sync distributed status
         if self.fd_config.parallel_config.use_ep and self.fd_config.scheduler_config.splitwise_role == "mixed":
diff --git a/tests/layers/test_ffn.py b/tests/layers/test_ffn.py
index eb551fd9eb1..5704cccdcdf 100644
--- a/tests/layers/test_ffn.py
+++ b/tests/layers/test_ffn.py
@@ -44,6 +44,13 @@
 os.environ.setdefault("DG_NVCC_OVERRIDE_CPP_STANDARD", "17")
 
 
+class MockForwardMeta:
+    def __init__(self):
+        # chunked MoE related.
+        self.moe_num_chunk = 1
+        self.max_moe_num_chunk = 1
+
+
 class FFNWrapper(paddle.nn.Layer):
     def __init__(self, model_config: ModelConfig):
         super().__init__()
@@ -134,7 +141,7 @@ def test_ffn(self):
         init_distributed_environment()
 
         ffn = FFNWrapper(self.model_config)
-
+        forward_meta = MockForwardMeta()
         moe_cuda_graphs = [None] * 100
         cache_hidden_states = [None] * 100
         test_token_nums = [10, 20, 40, 60, 80, 100, 128, 160, 192, 256, 4096, 4096 * 4]
@@ -147,7 +154,7 @@ def test_ffn(self):
             num_layers = self.num_layers
 
             for _ in range(num_layers):
-                out = ffn.ffn(cache_hidden_states[idx])
+                out = ffn.ffn(cache_hidden_states[idx], forward_meta=forward_meta)
 
             moe_cuda_graphs[idx].capture_end()
 
diff --git a/tests/layers/test_fusedmoe.py b/tests/layers/test_fusedmoe.py
index 8a603427876..ed4fe5b28b6 100644
--- a/tests/layers/test_fusedmoe.py
+++ b/tests/layers/test_fusedmoe.py
@@ -432,6 +432,13 @@
 )
 
 
+class MockForwardMeta:
+    def __init__(self):
+        # chunked MoE related.
+        self.moe_num_chunk = 1
+        self.max_moe_num_chunk = 1
+
+
 class FuseMoEWrapper(paddle.nn.Layer):
     def __init__(
         self,
@@ -607,7 +614,9 @@ def test_fused_moe(self):
 
         def fake_model_run():
             for j in range(num_layers):
-                out = fused_moe[j % real_weight_layers].fused_moe(cache_hidden_states[idx], gating)
+                out = fused_moe[j % real_weight_layers].fused_moe(
+                    cache_hidden_states[idx], gating, forward_meta=MockForwardMeta()
+                )
 
             return out
 
diff --git a/tests/utils.py b/tests/utils.py
index bfc0651575e..b6bf65317a8 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -29,6 +29,13 @@
 )
 
 
+class MockForwardMeta:
+    def __init__(self):
+        # chunked MoE related.
+        self.moe_num_chunk = 1
+        self.max_moe_num_chunk = 1
+
+
 class FakeModelConfig:
     def __init__(self):
         self.hidden_size = 768
@@ -85,7 +92,7 @@ def __init__(self, op_name, op_fn, num_layers=20, weight_size=None, gate=None):
 
     def _fake_model_run(self, x):
         for j in range(self.num_layers):
             if self.gate:
-                out = self.op_fn(x, self.gate)
+                out = self.op_fn(x, self.gate, forward_meta=MockForwardMeta())
             else:
                 out = self.op_fn(x)
         return out

From 34b3be140b52abf14c86fb86c646dad52f84addb Mon Sep 17 00:00:00 2001
From: Wanglongzhi2001 <583087864@qq.com>
Date: Tue, 2 Dec 2025 21:21:01 +0800
Subject: [PATCH 7/7] fix test

---
 fastdeploy/worker/gpu_model_runner.py |  2 +-
 tests/distributed/chunked_moe.py      | 27 ++++++++++++++-----------
 2 files changed, 17 insertions(+), 12 deletions(-)

diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py
index a2d4773487c..01ee60ed6d6 100644
--- a/fastdeploy/worker/gpu_model_runner.py
+++ b/fastdeploy/worker/gpu_model_runner.py
@@ -1448,7 +1448,7 @@ def initialize_forward_meta(self, is_dummy_or_profile_run=False):
             if_only_decode = dist_status.if_only_decode
 
             if self.fd_config.parallel_config.enable_chunked_moe:
-                self.fd_config.parallel_config.max_moe_num_chunk = dist_status.max_moe_num_chunk
+                self.forward_meta.max_moe_num_chunk = dist_status.max_moe_num_chunk
 
         only_decode_use_cudagraph = self.use_cudagraph and if_only_decode
 
diff --git a/tests/distributed/chunked_moe.py b/tests/distributed/chunked_moe.py
index 86adce5b534..fa78ba22bf5 100644
--- a/tests/distributed/chunked_moe.py
+++ b/tests/distributed/chunked_moe.py
@@ -28,6 +28,13 @@ class MockStructuredOutputsConfig:
     logits_processors = []
 
 
+class MockForwardMeta:
+    def __init__(self):
+        # chunked MoE related.
+        self.moe_num_chunk = 1
+        self.max_moe_num_chunk = 1
+
+
 class MockModelConfig:
     max_model_len = 10
     pad_token_id = 0
@@ -60,8 +67,6 @@ class ParallelConfig:
     enable_expert_parallel = True
     enable_chunked_moe = True
     chunked_moe_size = 2
-    max_moe_num_chunk = 1
-    moe_num_chunk = 1
     use_ep = True
     use_sequence_parallel_moe = False
 
@@ -147,19 +152,19 @@ def setup_fused_moe(self):
 
     def run_model_runner(self):
         self.model_runner.initialize_forward_meta()
-        assert self.model_runner.fd_config.parallel_config.max_moe_num_chunk == 5, (
+        assert self.model_runner.forward_meta.max_moe_num_chunk == 5, (
             f"chunk size is 2, max token_num is 10, max_moe_num_chunk should be 5, "
-            f"but got {self.model_runner.fd_config.parallel_config.max_moe_num_chunk}"
+            f"but got {self.model_runner.forward_meta.max_moe_num_chunk}"
         )
         if dist.get_rank() == 0:
-            assert self.model_runner.fd_config.parallel_config.moe_num_chunk == 5, (
-                f"chunk size is 2, token_num is 10, moe_num_chunk in rank 0 should be 5"
-                f"but got {self.model_runner.fd_config.parallel_config.moe_num_chunk}"
+            assert self.model_runner.forward_meta.moe_num_chunk == 5, (
+                f"chunk size is 2, token_num is 10, moe_num_chunk in rank 0 should be 5, "
+                f"but got {self.model_runner.forward_meta.moe_num_chunk}"
            )
         else:
-            assert self.model_runner.fd_config.parallel_config.moe_num_chunk == 1, (
-                f"chunk size is 2, token_num is 1, moe_num_chunk in rank 1 should be 1"
-                f", but got {self.model_runner.fd_config.parallel_config.moe_num_chunk}"
+            assert self.model_runner.forward_meta.moe_num_chunk == 1, (
+                f"chunk size is 2, token_num is 1, moe_num_chunk in rank 1 should be 1, "
+                f"but got {self.model_runner.forward_meta.moe_num_chunk}"
             )
 
     def run_fused_moe(self):
 ... gate ...
         else:
             x = paddle.ones([1])
 
-        out = self.fused_moe.forward(x, gate)
+        out = self.fused_moe.forward(x, gate, MockForwardMeta())
         assert out.shape == x.shape
 
     def test_case(self):
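Note: taken together, patches 6 and 7 move the chunked-MoE bookkeeping from ParallelConfig onto the per-step ForwardMeta. A runnable sketch of the chunk-count arithmetic the runner and the distributed test rely on — the helper name is an assumption for illustration, not FastDeploy API:

    def num_moe_chunks(token_num: int, chunk_size: int) -> int:
        # Ceil division, as in gpu_model_runner.collect_distributed_status;
        # inputs at or below one chunk always count as a single chunk.
        if token_num > chunk_size:
            return (token_num + chunk_size - 1) // chunk_size
        return 1

    # Matches tests/distributed/chunked_moe.py with chunked_moe_size = 2:
    assert num_moe_chunks(10, 2) == 5  # rank 0: ten tokens -> five chunks
    assert num_moe_chunks(1, 2) == 1   # rank 1: one token -> one chunk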