Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions fastdeploy/eplb/async_expert_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,24 @@
import paddle

try:
from cuda import cudart
except ImportError:
import cuda as _cuda_pkg

_cuda_ver = getattr(_cuda_pkg, "__version__", None)
if _cuda_ver is None:
# cuda-python >= 13.x does not expose a top-level __version__;
# detect the version via the cuda-bindings package.
import importlib.metadata as _meta

_cuda_ver = _meta.version("cuda-bindings")
_cuda_major = int(_cuda_ver.split(".")[0])
if _cuda_major >= 13:
from cuda.bindings import runtime as cudart
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 建议 except Exception 过于宽泛

当前将 except ImportError 改为 except Exception,会捕获所有异常(包括 AttributeError、NameError、TypeError 等非导入相关错误),可能让调试困难。例如 _cuda_ver.split(".")[0] 若返回非预期对象,int(...) 会抛 ValueError,被静默吞掉后 cudart = None,只留一行 warning,难以排查根因。

建议拆分为更精确的异常类型:

except (ImportError, AttributeError, ValueError) as _e:
    import warnings
    warnings.warn(f"cuda-python import failed, async_expert_loader will be unavailable: {_e}")
    cudart = None

else:
from cuda import cudart
except Exception as _e:
import warnings

warnings.warn(f"cuda-python import failed, async_expert_loader will be unavailable: {_e}")
cudart = None

from fastdeploy.config import EPLBConfig
Expand Down Expand Up @@ -98,6 +114,7 @@ def create_mmap(model_name: List, ep_rank: int, ep_size: int, shm_uuid: str, epl
raise ImportError(
"cuda-python not installed. Install the version matching your CUDA toolkit:\n"
" CUDA 12.x → pip install cuda-python==12.*\n"
" CUDA 13.x → pip install cuda-python cuda-bindings\n"
)

# Register memory with CUDA
Expand Down
43 changes: 20 additions & 23 deletions fastdeploy/model_executor/pre_and_post_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,36 +125,33 @@

DISABLE_RECOVER = envs.FD_DISABLED_RECOVER == "1"

if current_platform.is_cuda():

def async_set_value(tgt, src):
if isinstance(src, (int, float, bool)):
src = paddle.full(tgt.shape, fill_value=src, dtype=tgt.dtype)
elif isinstance(src, (list, np.array)):
dtype_str = str(tgt.dtype).split(".")[1]
if isinstance(src, list):
src = np.array(src, dtype=dtype_str if dtype_str != "bfloat16" else "float32")
def async_set_value(tgt, src):
if isinstance(src, (int, float, bool)):
src = paddle.full(tgt.shape, fill_value=src, dtype=tgt.dtype)
elif isinstance(src, (list, np.ndarray)):
dtype_str = str(tgt.dtype).split(".")[1]
if isinstance(src, list):
src = np.array(src, dtype=dtype_str if dtype_str != "bfloat16" else "float32")
if current_platform.is_cuda():
if str(src.dtype) != dtype_str:
srt_tensor = paddle.empty(tgt.shape, dtype=str(src.dtype))
src = custom_numpy_to_tensor(src, srt_tensor)
else:
return custom_numpy_to_tensor(src, tgt)
elif isinstance(src, paddle.Tensor):
pass
else:
raise ValueError("async_set_value unsupported src type: {}".format(type(src)))
if src.shape != tgt.shape:
src = src.reshape(tgt.shape)
if src.dtype != tgt.dtype:
src = src.cast(tgt.dtype)
if src.place != tgt.place:
src = src.to(tgt.place)
tgt.copy_(src, blocking=False)

else:

def async_set_value(*args, **kwargs):
raise RuntimeError("async_set_value is only available on CUDA")
src = paddle.to_tensor(src, dtype=tgt.dtype)
elif isinstance(src, paddle.Tensor):
pass
else:
raise ValueError("async_set_value unsupported src type: {}".format(type(src)))
if src.shape != tgt.shape:
src = src.reshape(tgt.shape)
if src.dtype != tgt.dtype:
src = src.cast(tgt.dtype)
if src.place != tgt.place:
src = src.to(tgt.place)
tgt.copy_(src, blocking=False)


def pre_process(
Expand Down
23 changes: 23 additions & 0 deletions fastdeploy/model_executor/xpu_pre_and_post_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,29 @@
DISABLE_RECOVER = envs.FD_DISABLED_RECOVER == "1"


def async_set_value(tgt, src):
    """Copy ``src`` into the pre-allocated tensor ``tgt`` without blocking.

    ``src`` may be a Python scalar (int/float/bool), a list, a numpy array,
    or a paddle.Tensor; anything else raises ``ValueError``.  Shape, dtype
    and placement are reconciled to match ``tgt`` before the async copy.
    """
    if isinstance(src, (int, float, bool)):
        # Scalar: materialize a full tensor matching tgt's shape and dtype.
        src = paddle.full(tgt.shape, fill_value=src, dtype=tgt.dtype)
    elif isinstance(src, (list, np.ndarray)):
        target_dtype = str(tgt.dtype).split(".")[1]
        # numpy has no bfloat16; stage those through float32.
        host_dtype = "float32" if target_dtype == "bfloat16" else target_dtype
        if isinstance(src, list):
            src = np.array(src, dtype=host_dtype)
        # TODO: support async_numpy_to_tensor
        src = paddle.to_tensor(src, dtype=tgt.dtype)
    elif not isinstance(src, paddle.Tensor):
        raise ValueError("async_set_value unsupported src type: {}".format(type(src)))
    # Reconcile shape / dtype / device with the destination tensor.
    if src.shape != tgt.shape:
        src = src.reshape(tgt.shape)
    if src.dtype != tgt.dtype:
        src = src.cast(tgt.dtype)
    if src.place != tgt.place:
        src = src.to(tgt.place)
    tgt.copy_(src, blocking=False)


def _build_stream_transfer_data(
output_tokens: paddle.Tensor,
pooler_outputs: List = None,
Expand Down
65 changes: 35 additions & 30 deletions fastdeploy/spec_decode/mtp.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@
share_external_data,
update_attn_mask_offsets,
)

# temporary solution
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 建议 async_set_value 跨模块重复定义

当前 async_set_value 在 pre_and_post_process.py(CUDA 路径)和 xpu_pre_and_post_process.py(XPU 路径)分别实现,注释已标注 # temporary solution。两个实现逻辑高度相似,仅 CUDA 分支有 custom_numpy_to_tensor 优化。

建议后续提独立 issue 跟踪:将两个实现统一到 fastdeploy/model_executor/pre_and_post_process.py 或新建 common_ops.py,通过 current_platform.is_cuda() 分支区分,避免后续维护两份代码。

from fastdeploy.model_executor.xpu_pre_and_post_process import (
async_set_value,
xpu_pre_process,
xpu_process_output,
)
Expand Down Expand Up @@ -483,28 +486,32 @@ def insert_tasks_v1(
input_ids = request.prompt_token_ids + request.output_token_ids

self.model_inputs["input_ids_len"][idx] = length - 1
self.model_inputs["pre_ids"][idx : idx + 1] = -1
async_set_value(self.model_inputs["pre_ids"][idx : idx + 1], -1)
self.model_inputs["input_ids"][idx : idx + 1, : length - 1] = self.target_model_inputs["input_ids"][
idx : idx + 1, 1:length
]
self.model_inputs["input_ids_cpu"][idx : idx + 1, : length - 1] = self.target_model_inputs[
"input_ids"
][idx : idx + 1, 1:length].cpu()
# TODO: use token_all_ids replace with input_ids_cpu
if getattr(self, "hybrid_mode", False) and "input_ids_cpu" in self.model_inputs:
self.model_inputs["input_ids_cpu"][idx : idx + 1, : length - 1] = self.target_model_inputs[
"input_ids"
][idx : idx + 1, 1:length].cpu()
encoder_block_num = len(request.block_tables)
self.model_inputs["encoder_block_lens"][idx : idx + 1] = encoder_block_num
self.model_inputs["block_tables"][idx : idx + 1, :] = -1
self.model_inputs["block_tables"][idx : idx + 1, :encoder_block_num] = np.array(
request.block_tables, dtype="int32"
async_set_value(self.model_inputs["encoder_block_lens"][idx : idx + 1], encoder_block_num)
async_set_value(self.model_inputs["block_tables"][idx : idx + 1, :], -1)
async_set_value(
self.model_inputs["block_tables"][idx : idx + 1, :encoder_block_num], request.block_tables
)
self.model_inputs["stop_flags"][idx : idx + 1] = False
self.model_inputs["batch_drop"][idx : idx + 1] = False

self.model_inputs["seq_lens_encoder"][idx : idx + 1] = length
async_set_value(self.model_inputs["stop_flags"][idx : idx + 1], False)
async_set_value(self.model_inputs["batch_drop"][idx : idx + 1], False)

async_set_value(self.model_inputs["seq_lens_encoder"][idx : idx + 1], length)
self.exist_prefill_flag = True
self.model_inputs["seq_lens_decoder"][idx : idx + 1] = prefill_start_index
self.model_inputs["seq_lens_this_time_buffer"][idx : idx + 1] = length
self.model_inputs["step_idx"][idx : idx + 1] = (
len(request.output_token_ids) if prefill_end_index >= len(input_ids) else 0
async_set_value(self.model_inputs["seq_lens_decoder"][idx : idx + 1], prefill_start_index)
async_set_value(self.model_inputs["seq_lens_this_time_buffer"][idx : idx + 1], length)
async_set_value(
self.model_inputs["step_idx"][idx : idx + 1],
len(request.output_token_ids) if prefill_end_index >= len(input_ids) else 0,
)
if self.use_attn_mask_offset:
inputs = request.multimodal_inputs
Expand All @@ -522,18 +529,19 @@ def insert_tasks_v1(
if (
self.fd_config.scheduler_config.splitwise_role == "decode"
): # In PD, we continue to decode after P generates first token
self.model_inputs["seq_lens_encoder"][idx : idx + 1] = 0
async_set_value(self.model_inputs["seq_lens_encoder"][idx : idx + 1], 0)
self.exist_prefill_flag = False
self.model_inputs["recompute_token_num"][idx : idx + 1] = 0
self.model_inputs["seq_lens_this_time_buffer"][idx : idx + 1] = length + 1
async_set_value(self.model_inputs["seq_lens_this_time_buffer"][idx : idx + 1], length + 1)
# NOTE(liuzichang):
# extra 1 : P-D split need rollback one step
self.model_inputs["mask_rollback"][idx : idx + 1] = 1

async_set_value(self.model_inputs["recompute_token_num"][idx : idx + 1], 0)
async_set_value(self.model_inputs["mask_rollback"][idx : idx + 1], 1)
# has_prefill_task = True
elif request.task_type.value == RequestType.DECODE.value: # decode task
encoder_block_num = len(request.block_tables)
self.model_inputs["encoder_block_lens"][idx : idx + 1] = encoder_block_num
self.model_inputs["block_tables"][idx : idx + 1, :] = -1
async_set_value(self.model_inputs["encoder_block_lens"][idx : idx + 1], encoder_block_num)
async_set_value(self.model_inputs["block_tables"][idx : idx + 1, :], -1)
if current_platform.is_cuda():
async_set_value(
self.model_inputs["block_tables"][idx : idx + 1, :encoder_block_num], request.block_tables
Expand All @@ -542,16 +550,13 @@ def insert_tasks_v1(
self.model_inputs["block_tables"][idx : idx + 1, :encoder_block_num] = np.array(
request.block_tables, dtype="int32"
)
# if self.model_inputs["is_block_step"][idx]: # has tasks to continue to decode
# has_decode_task = True
# continue
else:
self.model_inputs["block_tables"][idx : idx + 1, :] = -1
self.model_inputs["stop_flags"][idx : idx + 1] = True
self.model_inputs["seq_lens_this_time_buffer"][idx : idx + 1] = 0
self.model_inputs["seq_lens_decoder"][idx : idx + 1] = 0
self.model_inputs["seq_lens_encoder"][idx : idx + 1] = 0
self.model_inputs["is_block_step"][idx : idx + 1] = False
async_set_value(self.model_inputs["block_tables"][idx : idx + 1, :], -1)
async_set_value(self.model_inputs["stop_flags"][idx : idx + 1], True)
async_set_value(self.model_inputs["seq_lens_this_time_buffer"][idx : idx + 1], 0)
async_set_value(self.model_inputs["seq_lens_decoder"][idx : idx + 1], 0)
async_set_value(self.model_inputs["seq_lens_encoder"][idx : idx + 1], 0)
async_set_value(self.model_inputs["is_block_step"][idx : idx + 1], False)
continue

# TODO(liuzichang): Solve splitewise-p bug to restore
Expand Down
Loading
Loading