From 460dacc442e9671257ff32a96bcdf456c9905720 Mon Sep 17 00:00:00 2001 From: freeliuzc Date: Mon, 13 Apr 2026 19:41:17 +0800 Subject: [PATCH 1/2] [Speculative Decoding] Support mtp super ultra overlap in pd-split mode with insert_task overlap (#7323) * support mtp overlap in pd-split mode with insert_task overlap --- fastdeploy/eplb/async_expert_loader.py | 20 ++- .../model_executor/pre_and_post_process.py | 43 +++-- .../xpu_pre_and_post_process.py | 23 +++ fastdeploy/spec_decode/mtp.py | 65 +++---- fastdeploy/worker/gpu_model_runner.py | 160 +++++++++++------- tests/worker/test_gpu_model_runner.py | 159 ++++++++++++++++- 6 files changed, 349 insertions(+), 121 deletions(-) diff --git a/fastdeploy/eplb/async_expert_loader.py b/fastdeploy/eplb/async_expert_loader.py index 0cf9eb0453e..3672e69c2fd 100644 --- a/fastdeploy/eplb/async_expert_loader.py +++ b/fastdeploy/eplb/async_expert_loader.py @@ -24,8 +24,23 @@ import paddle try: - from cuda import cudart -except ImportError: + import cuda as _cuda_pkg + + _cuda_ver = getattr(_cuda_pkg, "__version__", None) + if _cuda_ver is None: + # cuda-python >= 13.x 无顶层 __version__,通过 cuda-bindings 子包判断 + import importlib.metadata as _meta + + _cuda_ver = _meta.version("cuda-bindings") + _cuda_major = int(_cuda_ver.split(".")[0]) + if _cuda_major >= 13: + from cuda.bindings import runtime as cudart + else: + from cuda import cudart +except Exception as _e: + import warnings + + warnings.warn(f"cuda-python import failed, async_expert_loader will be unavailable: {_e}") cudart = None from fastdeploy.config import EPLBConfig @@ -98,6 +113,7 @@ def create_mmap(model_name: List, ep_rank: int, ep_size: int, shm_uuid: str, epl raise ImportError( "cuda-python not installed. Install the version matching your CUDA toolkit:\n" " CUDA 12.x → pip install cuda-python==12.*\n" + " CUDA 13.x → pip install cuda-python cuda-bindings\n" ) # Register memory with CUDA diff --git a/fastdeploy/model_executor/pre_and_post_process.py b/fastdeploy/model_executor/pre_and_post_process.py index 4c7a09b93c0..2d9d9c121a5 100644 --- a/fastdeploy/model_executor/pre_and_post_process.py +++ b/fastdeploy/model_executor/pre_and_post_process.py @@ -125,36 +125,33 @@ DISABLE_RECOVER = envs.FD_DISABLED_RECOVER == "1" -if current_platform.is_cuda(): - def async_set_value(tgt, src): - if isinstance(src, (int, float, bool)): - src = paddle.full(tgt.shape, fill_value=src, dtype=tgt.dtype) - elif isinstance(src, (list, np.array)): - dtype_str = str(tgt.dtype).split(".")[1] - if isinstance(src, list): - src = np.array(src, dtype=dtype_str if dtype_str != "bfloat16" else "float32") +def async_set_value(tgt, src): + if isinstance(src, (int, float, bool)): + src = paddle.full(tgt.shape, fill_value=src, dtype=tgt.dtype) + elif isinstance(src, (list, np.ndarray)): + dtype_str = str(tgt.dtype).split(".")[1] + if isinstance(src, list): + src = np.array(src, dtype=dtype_str if dtype_str != "bfloat16" else "float32") + if current_platform.is_cuda(): if str(src.dtype) != dtype_str: srt_tensor = paddle.empty(tgt.shape, dtype=str(src.dtype)) src = custom_numpy_to_tensor(src, srt_tensor) else: return custom_numpy_to_tensor(src, tgt) - elif isinstance(src, paddle.Tensor): - pass else: - raise ValueError("async_set_value unsupported src type: {}".format(type(src))) - if src.shape != tgt.shape: - src = src.reshape(tgt.shape) - if src.dtype != tgt.dtype: - src = src.cast(tgt.dtype) - if src.place != tgt.place: - src = src.to(tgt.place) - tgt.copy_(src, blocking=False) - -else: - - def async_set_value(*args, **kwargs): - raise RuntimeError("async_set_value is only available on CUDA") + src = paddle.to_tensor(src, dtype=tgt.dtype) + elif isinstance(src, paddle.Tensor): + pass + else: + raise ValueError("async_set_value unsupported src type: {}".format(type(src))) + if src.shape != tgt.shape: + src = src.reshape(tgt.shape) + if src.dtype != tgt.dtype: + src = src.cast(tgt.dtype) + if src.place != tgt.place: + src = src.to(tgt.place) + tgt.copy_(src, blocking=False) def pre_process( diff --git a/fastdeploy/model_executor/xpu_pre_and_post_process.py b/fastdeploy/model_executor/xpu_pre_and_post_process.py index e5a1d9419c8..9232989dd48 100644 --- a/fastdeploy/model_executor/xpu_pre_and_post_process.py +++ b/fastdeploy/model_executor/xpu_pre_and_post_process.py @@ -55,6 +55,29 @@ DISABLE_RECOVER = envs.FD_DISABLED_RECOVER == "1" +def async_set_value(tgt, src): + if isinstance(src, (int, float, bool)): + src = paddle.full(tgt.shape, fill_value=src, dtype=tgt.dtype) + elif isinstance(src, (list, np.ndarray)): + dtype_str = str(tgt.dtype).split(".")[1] + np_dtype = dtype_str if dtype_str != "bfloat16" else "float32" + if isinstance(src, list): + src = np.array(src, dtype=np_dtype) + # TODO: support async_numpy_to_tensor + src = paddle.to_tensor(src, dtype=tgt.dtype) + elif isinstance(src, paddle.Tensor): + pass + else: + raise ValueError("async_set_value unsupported src type: {}".format(type(src))) + if src.shape != tgt.shape: + src = src.reshape(tgt.shape) + if src.dtype != tgt.dtype: + src = src.cast(tgt.dtype) + if src.place != tgt.place: + src = src.to(tgt.place) + tgt.copy_(src, blocking=False) + + def _build_stream_transfer_data( output_tokens: paddle.Tensor, pooler_outputs: List = None, diff --git a/fastdeploy/spec_decode/mtp.py b/fastdeploy/spec_decode/mtp.py index 2d8d310a469..70c626df28f 100644 --- a/fastdeploy/spec_decode/mtp.py +++ b/fastdeploy/spec_decode/mtp.py @@ -49,7 +49,10 @@ share_external_data, update_attn_mask_offsets, ) + + # temporary solution from fastdeploy.model_executor.xpu_pre_and_post_process import ( + async_set_value, xpu_pre_process, xpu_process_output, ) @@ -483,28 +486,32 @@ def insert_tasks_v1( input_ids = request.prompt_token_ids + request.output_token_ids self.model_inputs["input_ids_len"][idx] = length - 1 - self.model_inputs["pre_ids"][idx : idx + 1] = -1 + async_set_value(self.model_inputs["pre_ids"][idx : idx + 1], -1) self.model_inputs["input_ids"][idx : idx + 1, : length - 1] = self.target_model_inputs["input_ids"][ idx : idx + 1, 1:length ] - self.model_inputs["input_ids_cpu"][idx : idx + 1, : length - 1] = self.target_model_inputs[ - "input_ids" - ][idx : idx + 1, 1:length].cpu() + # TODO: use token_all_ids replace with input_ids_cpu + if getattr(self, "hybrid_mode", False) and "input_ids_cpu" in self.model_inputs: + self.model_inputs["input_ids_cpu"][idx : idx + 1, : length - 1] = self.target_model_inputs[ + "input_ids" + ][idx : idx + 1, 1:length].cpu() encoder_block_num = len(request.block_tables) - self.model_inputs["encoder_block_lens"][idx : idx + 1] = encoder_block_num - self.model_inputs["block_tables"][idx : idx + 1, :] = -1 - self.model_inputs["block_tables"][idx : idx + 1, :encoder_block_num] = np.array( - request.block_tables, dtype="int32" + async_set_value(self.model_inputs["encoder_block_lens"][idx : idx + 1], encoder_block_num) + async_set_value(self.model_inputs["block_tables"][idx : idx + 1, :], -1) + async_set_value( + self.model_inputs["block_tables"][idx : idx + 1, :encoder_block_num], request.block_tables ) - self.model_inputs["stop_flags"][idx : idx + 1] = False - self.model_inputs["batch_drop"][idx : idx + 1] = False - self.model_inputs["seq_lens_encoder"][idx : idx + 1] = length + async_set_value(self.model_inputs["stop_flags"][idx : idx + 1], False) + async_set_value(self.model_inputs["batch_drop"][idx : idx + 1], False) + + async_set_value(self.model_inputs["seq_lens_encoder"][idx : idx + 1], length) self.exist_prefill_flag = True - self.model_inputs["seq_lens_decoder"][idx : idx + 1] = prefill_start_index - self.model_inputs["seq_lens_this_time_buffer"][idx : idx + 1] = length - self.model_inputs["step_idx"][idx : idx + 1] = ( - len(request.output_token_ids) if prefill_end_index >= len(input_ids) else 0 + async_set_value(self.model_inputs["seq_lens_decoder"][idx : idx + 1], prefill_start_index) + async_set_value(self.model_inputs["seq_lens_this_time_buffer"][idx : idx + 1], length) + async_set_value( + self.model_inputs["step_idx"][idx : idx + 1], + len(request.output_token_ids) if prefill_end_index >= len(input_ids) else 0, ) if self.use_attn_mask_offset: inputs = request.multimodal_inputs @@ -522,18 +529,19 @@ def insert_tasks_v1( if ( self.fd_config.scheduler_config.splitwise_role == "decode" ): # In PD, we continue to decode after P generates first token - self.model_inputs["seq_lens_encoder"][idx : idx + 1] = 0 + async_set_value(self.model_inputs["seq_lens_encoder"][idx : idx + 1], 0) self.exist_prefill_flag = False - self.model_inputs["recompute_token_num"][idx : idx + 1] = 0 - self.model_inputs["seq_lens_this_time_buffer"][idx : idx + 1] = length + 1 + async_set_value(self.model_inputs["seq_lens_this_time_buffer"][idx : idx + 1], length + 1) # NOTE(liuzichang): # extra 1 : P-D split need rollback one step - self.model_inputs["mask_rollback"][idx : idx + 1] = 1 + + async_set_value(self.model_inputs["recompute_token_num"][idx : idx + 1], 0) + async_set_value(self.model_inputs["mask_rollback"][idx : idx + 1], 1) # has_prefill_task = True elif request.task_type.value == RequestType.DECODE.value: # decode task encoder_block_num = len(request.block_tables) - self.model_inputs["encoder_block_lens"][idx : idx + 1] = encoder_block_num - self.model_inputs["block_tables"][idx : idx + 1, :] = -1 + async_set_value(self.model_inputs["encoder_block_lens"][idx : idx + 1], encoder_block_num) + async_set_value(self.model_inputs["block_tables"][idx : idx + 1, :], -1) if current_platform.is_cuda(): async_set_value( self.model_inputs["block_tables"][idx : idx + 1, :encoder_block_num], request.block_tables @@ -542,16 +550,13 @@ def insert_tasks_v1( self.model_inputs["block_tables"][idx : idx + 1, :encoder_block_num] = np.array( request.block_tables, dtype="int32" ) - # if self.model_inputs["is_block_step"][idx]: # has tasks to continue to decode - # has_decode_task = True - # continue else: - self.model_inputs["block_tables"][idx : idx + 1, :] = -1 - self.model_inputs["stop_flags"][idx : idx + 1] = True - self.model_inputs["seq_lens_this_time_buffer"][idx : idx + 1] = 0 - self.model_inputs["seq_lens_decoder"][idx : idx + 1] = 0 - self.model_inputs["seq_lens_encoder"][idx : idx + 1] = 0 - self.model_inputs["is_block_step"][idx : idx + 1] = False + async_set_value(self.model_inputs["block_tables"][idx : idx + 1, :], -1) + async_set_value(self.model_inputs["stop_flags"][idx : idx + 1], True) + async_set_value(self.model_inputs["seq_lens_this_time_buffer"][idx : idx + 1], 0) + async_set_value(self.model_inputs["seq_lens_decoder"][idx : idx + 1], 0) + async_set_value(self.model_inputs["seq_lens_encoder"][idx : idx + 1], 0) + async_set_value(self.model_inputs["is_block_step"][idx : idx + 1], False) continue # TODO(liuzichang): Solve splitewise-p bug to restore diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py index bca07c82170..2feaaf4244e 100644 --- a/fastdeploy/worker/gpu_model_runner.py +++ b/fastdeploy/worker/gpu_model_runner.py @@ -872,9 +872,7 @@ def insert_tasks_v1(self, req_dicts: List[Request], num_running_requests: int = input_ids = prompt_token_ids + request.output_token_ids prompt_len = len(prompt_token_ids) # prompt_tokens - self.share_inputs["token_ids_all"][idx : idx + 1, :prompt_len] = np.array( - prompt_token_ids, dtype="int64" - ) + async_set_value(self.share_inputs["token_ids_all"][idx : idx + 1, :prompt_len], prompt_token_ids) # generated_token_ids fill -1 self.share_inputs["token_ids_all"][idx : idx + 1, prompt_len:] = -1 @@ -884,33 +882,39 @@ def insert_tasks_v1(self, req_dicts: List[Request], num_running_requests: int = self.deterministic_logger.log_prefill_input( request.request_id, idx, prefill_start_index, prefill_end_index, input_ids ) - logger.debug( f"Handle prefill request {request} at idx {idx}, " f"{prefill_start_index=}, {prefill_end_index=}, " f"need_prefilled_token_num={len(input_ids)}" f"prompt_len={prompt_len}" ) - self.share_inputs["input_ids"][idx : idx + 1, :length] = np.array( - input_ids[prefill_start_index:prefill_end_index] + async_set_value( + self.share_inputs["input_ids"][idx : idx + 1, :length], + input_ids[prefill_start_index:prefill_end_index], ) encoder_block_num = len(request.block_tables) - self.share_inputs["encoder_block_lens"][idx : idx + 1] = encoder_block_num - self.share_inputs["block_tables"][idx : idx + 1, :] = -1 - self.share_inputs["block_tables"][idx : idx + 1, :encoder_block_num] = np.array( - request.block_tables, dtype="int32" + async_set_value(self.share_inputs["encoder_block_lens"][idx : idx + 1], encoder_block_num) + + async_set_value(self.share_inputs["block_tables"][idx : idx + 1, :], -1) + + async_set_value( + self.share_inputs["block_tables"][idx : idx + 1, :encoder_block_num], request.block_tables ) - self.share_inputs["stop_flags"][idx : idx + 1] = False - self.share_inputs["seq_lens_decoder"][idx : idx + 1] = prefill_start_index - self.share_inputs["seq_lens_this_time_buffer"][idx : idx + 1] = length - self.share_inputs["seq_lens_encoder"][idx : idx + 1] = length + + async_set_value(self.share_inputs["stop_flags"][idx : idx + 1], False) + + async_set_value(self.share_inputs["seq_lens_decoder"][idx : idx + 1], prefill_start_index) + async_set_value(self.share_inputs["seq_lens_this_time_buffer"][idx : idx + 1], length) + async_set_value(self.share_inputs["seq_lens_encoder"][idx : idx + 1], length) self.exist_prefill_flag = True - self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = 0 - self.share_inputs["prompt_lens"][idx : idx + 1] = len(input_ids) - self.share_inputs["is_block_step"][idx : idx + 1] = False + async_set_value(self.share_inputs["step_seq_lens_decoder"][idx : idx + 1], 0) + async_set_value(self.share_inputs["prompt_lens"][idx : idx + 1], len(input_ids)) + + async_set_value(self.share_inputs["is_block_step"][idx : idx + 1], False) self.share_inputs["is_chunk_step"][idx : idx + 1] = prefill_end_index < len(input_ids) - self.share_inputs["step_idx"][idx : idx + 1] = ( - len(request.output_token_ids) if prefill_end_index >= len(input_ids) else 0 + async_set_value( + self.share_inputs["step_idx"][idx : idx + 1], + len(request.output_token_ids) if prefill_end_index >= len(input_ids) else 0, ) # pooling model request.sampling_params is None if request.sampling_params is not None and request.sampling_params.prompt_logprobs is not None: @@ -927,21 +931,37 @@ def insert_tasks_v1(self, req_dicts: List[Request], num_running_requests: int = if ( self.fd_config.scheduler_config.splitwise_role == "decode" ): # In PD, we continue to decode after P generate first token - self.share_inputs["seq_lens_encoder"][idx : idx + 1] = 0 + # TODO: delete useless operation like this + async_set_value(self.share_inputs["seq_lens_encoder"][idx : idx + 1], 0) self.exist_prefill_flag = False - self._cached_launch_token_num = -1 + if self._cached_launch_token_num != -1: + token_num_one_step = ( + (self.speculative_config.num_speculative_tokens + 1) if self.speculative_decoding else 1 + ) + self._cached_launch_token_num += token_num_one_step + self._cached_real_bsz += 1 if self.speculative_decoding: - # D speculate decode, seq_lens_this_time = length + 1 - self.share_inputs["seq_lens_this_time"][idx : idx + 1] = length + 1 - self.share_inputs["draft_tokens"][idx : idx + 1, 0 : length + 1] = paddle.to_tensor( - request.draft_token_ids[0 : length + 1], - dtype="int64", + # D first decode step, [Target first token, MTP first draft token] + # MTP in P only generate one draft token in any num_model_step config + draft_tokens_to_write = request.draft_token_ids[0:2] + if len(draft_tokens_to_write) != 2: + raise ValueError( + "Expected at least 2 draft tokens for speculative suffix decode, " + f"but got {len(draft_tokens_to_write)} for request {request.request_id}." + ) + async_set_value( + self.share_inputs["draft_tokens"][idx : idx + 1, 0:2], + draft_tokens_to_write, ) + async_set_value(self.share_inputs["seq_lens_this_time_buffer"][idx : idx + 1], 2) + logger.debug( + f"insert request {request.request_id} idx: {idx} suffix tokens {request.draft_token_ids}" + ) elif request.task_type.value == RequestType.DECODE.value: # decode task logger.debug(f"Handle decode request {request} at idx {idx}") encoder_block_num = len(request.block_tables) - self.share_inputs["encoder_block_lens"][idx : idx + 1] = encoder_block_num - self.share_inputs["block_tables"][idx : idx + 1, :] = -1 + async_set_value(self.share_inputs["encoder_block_lens"][idx : idx + 1], encoder_block_num) + async_set_value(self.share_inputs["block_tables"][idx : idx + 1, :], -1) if current_platform.is_cuda(): async_set_value( self.share_inputs["block_tables"][idx : idx + 1, :encoder_block_num], request.block_tables @@ -950,6 +970,7 @@ def insert_tasks_v1(self, req_dicts: List[Request], num_running_requests: int = self.share_inputs["block_tables"][idx : idx + 1, :encoder_block_num] = np.array( request.block_tables, dtype="int32" ) + # CPU Tensor self.share_inputs["preempted_idx"][idx : idx + 1, :] = 0 continue else: # preempted task @@ -958,12 +979,12 @@ def insert_tasks_v1(self, req_dicts: List[Request], num_running_requests: int = elif request.task_type.value == RequestType.ABORT.value: logger.info(f"Handle abort request {request} at idx {idx}") self.share_inputs["preempted_idx"][idx : idx + 1, :] = 1 - self.share_inputs["block_tables"][idx : idx + 1, :] = -1 - self.share_inputs["stop_flags"][idx : idx + 1] = True - self.share_inputs["seq_lens_this_time_buffer"][idx : idx + 1] = 0 - self.share_inputs["seq_lens_decoder"][idx : idx + 1] = 0 - self.share_inputs["seq_lens_encoder"][idx : idx + 1] = 0 - self.share_inputs["is_block_step"][idx : idx + 1] = False + async_set_value(self.share_inputs["block_tables"][idx : idx + 1, :], -1) + async_set_value(self.share_inputs["stop_flags"][idx : idx + 1], True) + async_set_value(self.share_inputs["seq_lens_this_time_buffer"][idx : idx + 1], 0) + async_set_value(self.share_inputs["seq_lens_decoder"][idx : idx + 1], 0) + async_set_value(self.share_inputs["seq_lens_encoder"][idx : idx + 1], 0) + async_set_value(self.share_inputs["is_block_step"][idx : idx + 1], False) self.prompt_logprobs_reqs.pop(request.request_id, None) self.in_progress_prompt_logprobs.pop(request.request_id, None) self.forward_batch_reqs_list[idx] = None @@ -971,53 +992,61 @@ def insert_tasks_v1(self, req_dicts: List[Request], num_running_requests: int = continue assert len(request.eos_token_ids) == self.model_config.eos_tokens_lens - self.share_inputs["eos_token_id"][:] = np.array(request.eos_token_ids, dtype="int64").reshape(-1, 1) - - self.share_inputs["top_p"][idx : idx + 1] = request.get("top_p", 0.7) - self.share_inputs["top_k"][idx : idx + 1] = request.get("top_k", 0) - self.share_inputs["top_k_list"][idx] = request.get("top_k", 0) - self.share_inputs["min_p"][idx : idx + 1] = request.get("min_p", 0.0) self.share_inputs["min_p_list"][idx] = request.get("min_p", 0.0) - self.share_inputs["temperature"][idx : idx + 1] = request.get("temperature", 0.95) - self.share_inputs["penalty_score"][idx : idx + 1] = request.get("repetition_penalty", 1.0) - self.share_inputs["frequency_score"][idx : idx + 1] = request.get("frequency_penalty", 0.0) - self.share_inputs["presence_score"][idx : idx + 1] = request.get("presence_penalty", 0.0) - self.share_inputs["temp_scaled_logprobs"][idx : idx + 1] = request.get("temp_scaled_logprobs", False) - self.share_inputs["top_p_normalized_logprobs"][idx : idx + 1] = request.get( - "top_p_normalized_logprobs", False + self.share_inputs["top_k_list"][idx] = request.get("top_k", 0) + async_set_value(self.share_inputs["eos_token_id"][:], request.eos_token_ids) + async_set_value(self.share_inputs["top_p"][idx : idx + 1], request.get("top_p", 0.7)) + async_set_value(self.share_inputs["top_k"][idx : idx + 1], request.get("top_k", 0)) + async_set_value(self.share_inputs["min_p"][idx : idx + 1], request.get("min_p", 0.0)) + async_set_value(self.share_inputs["temperature"][idx : idx + 1], request.get("temperature", 0.95)) + async_set_value(self.share_inputs["penalty_score"][idx : idx + 1], request.get("repetition_penalty", 1.0)) + async_set_value(self.share_inputs["frequency_score"][idx : idx + 1], request.get("frequency_penalty", 0.0)) + async_set_value(self.share_inputs["presence_score"][idx : idx + 1], request.get("presence_penalty", 0.0)) + async_set_value( + self.share_inputs["temp_scaled_logprobs"][idx : idx + 1], request.get("temp_scaled_logprobs", False) ) - self.share_inputs["generated_modality"][idx : idx + 1] = request.get("generated_modality", 0) - - self.share_inputs["min_dec_len"][idx : idx + 1] = request.get("min_tokens", 1) - self.share_inputs["max_dec_len"][idx : idx + 1] = request.get( - "max_tokens", self.model_config.max_model_len + async_set_value( + self.share_inputs["top_p_normalized_logprobs"][idx : idx + 1], + request.get("top_p_normalized_logprobs", False), + ) + async_set_value( + self.share_inputs["generated_modality"][idx : idx + 1], request.get("generated_modality", 0) + ) + async_set_value(self.share_inputs["min_dec_len"][idx : idx + 1], request.get("min_tokens", 1)) + async_set_value( + self.share_inputs["max_dec_len"][idx : idx + 1], + request.get("max_tokens", self.model_config.max_model_len), ) if request.get("seed") is not None: - self.share_inputs["infer_seed"][idx : idx + 1] = request.get("seed") + async_set_value(self.share_inputs["infer_seed"][idx : idx + 1], request.get("seed")) if request.get("bad_words_token_ids") is not None and len(request.get("bad_words_token_ids")) > 0: bad_words_len = len(request.get("bad_words_token_ids")) - self.share_inputs["bad_tokens_len"][idx] = bad_words_len - self.share_inputs["bad_tokens"][idx : idx + 1, :bad_words_len] = np.array( - request.get("bad_words_token_ids"), dtype="int64" + async_set_value(self.share_inputs["bad_tokens_len"][idx : idx + 1], bad_words_len) + async_set_value( + self.share_inputs["bad_tokens"][idx : idx + 1, :bad_words_len], request.get("bad_words_token_ids") ) else: - self.share_inputs["bad_tokens_len"][idx] = 1 - self.share_inputs["bad_tokens"][idx : idx + 1, :] = np.array([-1], dtype="int64") + async_set_value(self.share_inputs["bad_tokens_len"][idx : idx + 1], 1) + async_set_value(self.share_inputs["bad_tokens"][idx : idx + 1, :], -1) if request.get("stop_token_ids") is not None and request.get("stop_seqs_len") is not None: stop_seqs_num = len(request.get("stop_seqs_len")) for i in range(stop_seqs_num, self.model_config.max_stop_seqs_num): request.sampling_params.stop_seqs_len.append(0) - self.share_inputs["stop_seqs_len"][idx : idx + 1, :] = np.array( - request.sampling_params.stop_seqs_len, dtype="int32" + async_set_value( + self.share_inputs["stop_seqs_len"][idx : idx + 1, :], request.sampling_params.stop_seqs_len ) - self.share_inputs["stop_seqs"][ - idx : idx + 1, :stop_seqs_num, : len(request.get("stop_token_ids")[0]) - ] = np.array(request.get("stop_token_ids"), dtype="int64") + # 每条 stop sequence pad 到 stop_seqs_max_len,凑齐空行后整块写入 + # 避免对第 3 维做部分切片(非连续内存)导致 async_set_value stride 错位 + stop_token_ids = request.get("stop_token_ids") + max_len = self.model_config.stop_seqs_max_len + padded = [seq + [-1] * (max_len - len(seq)) for seq in stop_token_ids] + padded.extend([[-1] * max_len] * (self.model_config.max_stop_seqs_num - stop_seqs_num)) + async_set_value(self.share_inputs["stop_seqs"][idx : idx + 1, :, :], padded) else: - self.share_inputs["stop_seqs_len"][idx : idx + 1, :] = 0 + async_set_value(self.share_inputs["stop_seqs_len"][idx : idx + 1, :], 0) self.pooling_params = batch_pooling_params # For logits processors @@ -1026,7 +1055,8 @@ def insert_tasks_v1(self, req_dicts: List[Request], num_running_requests: int = self.sampler.apply_logits_processor(idx, logits_info, prefill_tokens) self._process_mm_features(req_dicts) - if len(rope_3d_position_ids["position_ids_idx"]) > 0: + + if len(rope_3d_position_ids["position_ids_idx"]) > 0 and self.enable_mm: packed_position_ids = paddle.to_tensor( np.concatenate(rope_3d_position_ids["position_ids_lst"]), dtype="int64" ) diff --git a/tests/worker/test_gpu_model_runner.py b/tests/worker/test_gpu_model_runner.py index 8400fb79271..9227074b445 100644 --- a/tests/worker/test_gpu_model_runner.py +++ b/tests/worker/test_gpu_model_runner.py @@ -14,7 +14,7 @@ import unittest from dataclasses import dataclass -from unittest.mock import Mock, patch +from unittest.mock import MagicMock, Mock, patch import numpy as np import paddle @@ -772,5 +772,162 @@ def test_execute_model_overlap_zero_output_flushes_preempted_batch(self): self.assertEqual(runner._cached_real_bsz, 22) +def _sync_async_set_value(tgt, src): + """Synchronous stand-in for async_set_value used in tests (no CUDA required). + + Writes to real numpy arrays; silently skips Mock objects (untracked share_inputs + fields whose values we do not assert on). + """ + from unittest.mock import MagicMock + + import numpy as np + + if isinstance(tgt, MagicMock): + return # untracked field — nothing to write + if isinstance(src, (int, float, bool)): + tgt[:] = src + elif isinstance(src, (list, np.ndarray)): + tgt[:] = np.array(src).reshape(tgt.shape) + elif hasattr(src, "numpy"): + tgt[:] = src.numpy() + else: + tgt[:] = src + + +class TestInsertTasksV1SplitwiseSuffix(unittest.TestCase): + """Tests for insert_tasks_v1 splitwise_role=\'decode\' + SpecMethod.SUFFIX branch.""" + + def _make_share_inputs(self, bsz=4, max_draft=6): + """Mock-backed share_inputs; only keys we assert on hold real numpy arrays.""" + import numpy as np + + # Keys whose values we want to inspect after the call + tracked = { + "seq_lens_encoder": np.zeros((bsz, 1), dtype=np.int32), + "draft_tokens": np.zeros((bsz, max_draft), dtype=np.int64), + "seq_lens_this_time_buffer": np.zeros((bsz, 1), dtype=np.int32), + "req_ids": [""] * bsz, + "preempted_idx": np.zeros((bsz, 1), dtype=np.int32), + "num_running_requests": 0, + "running_requests_ids": [], + } + + class _SI: + def get_index_by_batch_id(self, batch_id): + return batch_id + + def __getitem__(self, key): + # Return real array for tracked keys; Mock for everything else + if key in tracked: + return tracked[key] + return MagicMock() + + def __setitem__(self, key, value): + tracked[key] = value + + return _SI() + + def _make_runner(self, bsz=4, num_spec_tokens=3): + from unittest.mock import Mock + + from fastdeploy.spec_decode import SpecMethod + from fastdeploy.worker.gpu_model_runner import GPUModelRunner + + runner = GPUModelRunner.__new__(GPUModelRunner) + runner.enable_mm = False + runner.is_pooling_model = False + runner.speculative_decoding = True + runner.spec_method = SpecMethod.SUFFIX + runner.speculative_config = Mock(num_speculative_tokens=num_spec_tokens) + runner.deterministic_logger = None + runner.routing_replay_manager = Mock() + runner.prompt_logprobs_reqs = {} + runner.in_progress_prompt_logprobs = {} + runner.forward_batch_reqs_list = [None] * bsz + runner._cached_launch_token_num = -1 + runner._cached_real_bsz = 0 + runner.exist_prefill_flag = True + runner.proposer = Mock() + runner.sampler = Mock() + runner.model_config = Mock(eos_tokens_lens=1) + runner.share_inputs = self._make_share_inputs(bsz=bsz, max_draft=num_spec_tokens + 2) + + fd_config = Mock() + fd_config.scheduler_config.splitwise_role = "decode" + fd_config.routing_replay_config.enable_routing_replay = False + runner.fd_config = fd_config + runner.scheduler_config = fd_config.scheduler_config + return runner + + def _make_prefill_request(self, idx, draft_token_ids): + from unittest.mock import Mock + + from fastdeploy.engine.request import RequestType + + req = Mock() + req.task_type = Mock(value=RequestType.PREFILL.value) + req.idx = idx + req.request_id = f"req_{idx}" + req.prompt_token_ids = [10, 20, 30] + req.output_token_ids = [99] + req.draft_token_ids = draft_token_ids + req.pooling_params = None + req.guided_json = None + req.guided_regex = None + req.structural_tag = None + req.guided_grammar = None + req.prefill_start_index = 0 + req.prefill_end_index = 3 + req.multimodal_inputs = None + req.get = Mock(return_value=None) + req.eos_token_ids = [2] + req.block_tables = [] + return req + + @patch("fastdeploy.worker.gpu_model_runner.async_set_value", side_effect=_sync_async_set_value) + def test_draft_tokens_and_seq_lens_written(self, _mock_asv): + """draft_tokens[0:2] and seq_lens_this_time_buffer=2 are written.""" + runner = self._make_runner(num_spec_tokens=3) + req = self._make_prefill_request(idx=0, draft_token_ids=[101, 202, 303]) + runner.insert_tasks_v1([req], num_running_requests=1) + + self.assertEqual(runner.share_inputs["draft_tokens"][0, 0], 101) + self.assertEqual(runner.share_inputs["draft_tokens"][0, 1], 202) + self.assertEqual(runner.share_inputs["seq_lens_this_time_buffer"][0, 0], 2) + + @patch("fastdeploy.worker.gpu_model_runner.async_set_value", side_effect=_sync_async_set_value) + def test_exist_prefill_flag_cleared(self, _mock_asv): + runner = self._make_runner() + req = self._make_prefill_request(idx=0, draft_token_ids=[1, 2]) + runner.insert_tasks_v1([req], num_running_requests=1) + self.assertFalse(runner.exist_prefill_flag) + + @patch("fastdeploy.worker.gpu_model_runner.async_set_value", side_effect=_sync_async_set_value) + def test_cached_launch_token_num_incremented(self, _mock_asv): + runner = self._make_runner(num_spec_tokens=3) + runner._cached_launch_token_num = 10 + runner._cached_real_bsz = 2 + req = self._make_prefill_request(idx=0, draft_token_ids=[1, 2]) + runner.insert_tasks_v1([req], num_running_requests=1) + # token_num_one_step = num_speculative_tokens + 1 = 4 + self.assertEqual(runner._cached_launch_token_num, 14) + self.assertEqual(runner._cached_real_bsz, 3) + + @patch("fastdeploy.worker.gpu_model_runner.async_set_value", side_effect=_sync_async_set_value) + def test_cached_launch_token_num_skipped_when_negative_one(self, _mock_asv): + runner = self._make_runner(num_spec_tokens=3) + runner._cached_launch_token_num = -1 + req = self._make_prefill_request(idx=0, draft_token_ids=[1, 2]) + runner.insert_tasks_v1([req], num_running_requests=1) + self.assertEqual(runner._cached_launch_token_num, -1) + + @patch("fastdeploy.worker.gpu_model_runner.async_set_value", side_effect=_sync_async_set_value) + def test_raises_when_fewer_than_two_draft_tokens(self, _mock_asv): + runner = self._make_runner() + req = self._make_prefill_request(idx=0, draft_token_ids=[42]) + with self.assertRaises(ValueError): + runner.insert_tasks_v1([req], num_running_requests=1) + + if __name__ == "__main__": unittest.main() From 0f41924f0a90d8043857d4f220823b69464e0ff6 Mon Sep 17 00:00:00 2001 From: freeliuzc Date: Tue, 12 May 2026 18:28:27 +0800 Subject: [PATCH 2/2] fix note --- fastdeploy/eplb/async_expert_loader.py | 3 ++- fastdeploy/worker/gpu_model_runner.py | 6 ++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/fastdeploy/eplb/async_expert_loader.py b/fastdeploy/eplb/async_expert_loader.py index 3672e69c2fd..2832a7f635f 100644 --- a/fastdeploy/eplb/async_expert_loader.py +++ b/fastdeploy/eplb/async_expert_loader.py @@ -28,7 +28,8 @@ _cuda_ver = getattr(_cuda_pkg, "__version__", None) if _cuda_ver is None: - # cuda-python >= 13.x 无顶层 __version__,通过 cuda-bindings 子包判断 + # cuda-python >= 13.x does not expose a top-level __version__; + # detect the version via the cuda-bindings package. import importlib.metadata as _meta _cuda_ver = _meta.version("cuda-bindings") diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py index 2feaaf4244e..a632544095b 100644 --- a/fastdeploy/worker/gpu_model_runner.py +++ b/fastdeploy/worker/gpu_model_runner.py @@ -1038,8 +1038,10 @@ def insert_tasks_v1(self, req_dicts: List[Request], num_running_requests: int = async_set_value( self.share_inputs["stop_seqs_len"][idx : idx + 1, :], request.sampling_params.stop_seqs_len ) - # 每条 stop sequence pad 到 stop_seqs_max_len,凑齐空行后整块写入 - # 避免对第 3 维做部分切片(非连续内存)导致 async_set_value stride 错位 + # Pad each stop sequence to stop_seqs_max_len, then fill remaining rows + # and write the whole block at once to avoid partial slicing on the + # third dimension, which may cause async_set_value stride issues on + # non-contiguous memory. stop_token_ids = request.get("stop_token_ids") max_len = self.model_config.stop_seqs_max_len padded = [seq + [-1] * (max_len - len(seq)) for seq in stop_token_ids]