Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions fastdeploy/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ def __init__(
self.max_model_len = 0
self.dtype = "bfloat16"
self.enable_logprob = False
self.compute_logits_stats = False
self.max_logprobs = 20
self.logprobs_mode = "raw_logprobs"
self.redundant_experts_num = 0
Expand Down
12 changes: 12 additions & 0 deletions fastdeploy/engine/args_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,12 @@ class EngineArgs:
Must be explicitly enabled via the `--enable-logprob` startup parameter to output logprob values.
"""

compute_logits_stats: bool = False
"""
Flag to enable per-token logits statistics (min/max/mean/std) output.
Only effective when enable_logprob is True.
"""

max_logprobs: int = 20
"""
Maximum number of log probabilities to return when `enable_logprob` is True. The default value comes from the default for the
Expand Down Expand Up @@ -887,6 +893,12 @@ def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser:
default=EngineArgs.enable_logprob,
help="Enable output of token-level log probabilities.",
)
model_group.add_argument(
"--compute-logits-stats",
action="store_true",
default=EngineArgs.compute_logits_stats,
help="Enable per-token logits statistics (min/max/mean/std) output.",
)
Comment on lines 893 to +901
Copy link

Copilot AI Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR 标题目前为“【TI-Consisent】...”,不符合仓库要求的 [CLASS]Title 格式(模板里给出的 tag 列表如 [Feature] / [BugFix] 等)。建议将标题改为类似 [Feature] Add logits_stats metric for ZMQ logprobs,并修正 Consisent 的拼写以便后续检索与自动化流程识别。

Copilot uses AI. Check for mistakes.
model_group.add_argument(
"--max-logprobs",
type=int,
Expand Down
1 change: 1 addition & 0 deletions fastdeploy/engine/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,7 @@ def _start_worker_service(self):
"use_internode_ll_two_stage": self.cfg.parallel_config.use_internode_ll_two_stage,
"disable_sequence_parallel_moe": self.cfg.parallel_config.disable_sequence_parallel_moe,
"enable_logprob": self.cfg.model_config.enable_logprob,
"compute_logits_stats": self.cfg.model_config.compute_logits_stats,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

common_engine.py中也得加这个参数

"lm_head_fp32": self.cfg.model_config.lm_head_fp32,
"moe_gate_fp32": self.cfg.model_config.moe_gate_fp32,
"shutdown_comm_group_if_worker_idle": self.cfg.parallel_config.shutdown_comm_group_if_worker_idle,
Expand Down
3 changes: 3 additions & 0 deletions fastdeploy/engine/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,7 @@ class CompletionOutput:
delta_message: Optional[DeltaMessage] = None
multipart: Optional[list[Any]] = None
num_image_tokens: Optional[int] = None
logits_stats: Optional[dict[str, float]] = None

def to_dict(self):
"""
Expand All @@ -745,6 +746,7 @@ def to_dict(self):
"text": self.text,
"reasoning_content": self.reasoning_content,
"reasoning_token_num": self.reasoning_token_num,
"logits_stats": self.logits_stats,
}

@classmethod
Expand All @@ -770,6 +772,7 @@ def __repr__(self) -> str:
f"logprobs={self.logprobs}, "
f"top_logprobs={self.top_logprobs}, "
f"draft_top_logprobs={self.draft_top_logprobs}, "
f"logits_stats={self.logits_stats}, "
)

def get(self, key: str, default_value=None):
Expand Down
2 changes: 1 addition & 1 deletion fastdeploy/entrypoints/llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ def _build_prompt_logprobs(
tensors.
"""

token_ids, logprobs, ranks = prompt_logprobs_tensors
token_ids, logprobs, ranks = prompt_logprobs_tensors[:3]

# Detokenize non-incrementally.
# Output is flat: [num_tok, num_lps] -> [num_tok * num_lps]
Expand Down
1 change: 1 addition & 0 deletions fastdeploy/entrypoints/openai/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ class LogProbEntry(BaseModel):
logprob: float
bytes: Optional[List[int]] = None
top_logprobs: Optional[List[LogProbEntry]] = None
logits_stats: Optional[Dict[str, float]] = None


class LogProbs(BaseModel):
Expand Down
74 changes: 64 additions & 10 deletions fastdeploy/entrypoints/openai/serving_chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -817,22 +817,74 @@ def _create_chat_logprobs(
request_decode_flag: Optional[bool] = True,
) -> Optional[LogProbs]:
"""Create OpenAI-style logprobs for chat completions."""
if output_top_logprobs is None or len(output_top_logprobs) < 3 or any(not lst for lst in output_top_logprobs):
if (
output_top_logprobs is None
or len(output_top_logprobs) < 3
or any(not lst for lst in output_top_logprobs[:3])
):
return None
logprobs_res: Optional[LogProbs] = None
for logprob_token_ids, logprobs, sampled_token_ranks in zip(
output_top_logprobs[0], output_top_logprobs[1], output_top_logprobs[2]
):
top_logprobs = LogprobsLists(
logprob_token_ids=[logprob_token_ids],
logprobs=[logprobs],
sampled_token_ranks=[sampled_token_ranks],
)

# Check if output_top_logprobs is a LogprobsLists object(NamedTuple) or a list
is_logprobslists = hasattr(output_top_logprobs, "logprob_token_ids")

# Extract logits stats if available
if is_logprobslists:
# output_top_logprobs is LogprobsLists namedtuple
has_logits_stats = output_top_logprobs.logits_min is not None
else:
# list from msgpack: [logprob_token_ids, logprobs, sampled_token_ranks, logits_min, logits_max, logits_mean, logits_std]
has_logits_stats = len(output_top_logprobs) >= 7 and output_top_logprobs[3] is not None

if is_logprobslists:
num_tokens = len(output_top_logprobs.logprobs)
_tk_ids = lambda idx: output_top_logprobs.logprob_token_ids[idx]
_lps = lambda idx: output_top_logprobs.logprobs[idx]
_ranks = lambda idx: output_top_logprobs.sampled_token_ranks[idx]
_lmin = lambda idx: output_top_logprobs.logits_min[idx]
_lmax = lambda idx: output_top_logprobs.logits_max[idx]
_lmean = lambda idx: output_top_logprobs.logits_mean[idx]
_lstd = lambda idx: output_top_logprobs.logits_std[idx]
else:
num_tokens = len(output_top_logprobs[1])
_tk_ids = lambda idx: output_top_logprobs[0][idx]
_lps = lambda idx: output_top_logprobs[1][idx]
_ranks = lambda idx: output_top_logprobs[2][idx]
_lmin = lambda idx: output_top_logprobs[3][idx]
_lmax = lambda idx: output_top_logprobs[4][idx]
_lmean = lambda idx: output_top_logprobs[5][idx]
_lstd = lambda idx: output_top_logprobs[6][idx]

for idx in range(num_tokens):
logits_stats = None
if has_logits_stats:
top_logprobs = LogprobsLists(
logprob_token_ids=[_tk_ids(idx)],
logprobs=[_lps(idx)],
sampled_token_ranks=[_ranks(idx)],
logits_min=[_lmin(idx)],
logits_max=[_lmax(idx)],
logits_mean=[_lmean(idx)],
logits_std=[_lstd(idx)],
)
logits_stats = {
"min": float(_lmin(idx)),
"max": float(_lmax(idx)),
"mean": float(_lmean(idx)),
"std": float(_lstd(idx)),
}
else:
top_logprobs = LogprobsLists(
logprob_token_ids=[_tk_ids(idx)],
logprobs=[_lps(idx)],
sampled_token_ranks=[_ranks(idx)],
)
step_logprobs_res = self._build_logprobs_response(
request_logprobs=request_logprobs,
response_logprobs=top_logprobs,
request_top_logprobs=request_top_logprobs,
request_decode_flag=request_decode_flag,
logits_stats=logits_stats,
)
if logprobs_res is None:
logprobs_res = step_logprobs_res
Expand All @@ -846,6 +898,7 @@ def _build_logprobs_response(
response_logprobs: Optional[LogprobsLists],
request_top_logprobs: int,
request_decode_flag: bool,
logits_stats: Optional[dict[str, float]] = None,
) -> Optional[LogProbs]:
"""
Construct a logprobs response object in line with the OpenAI style.
Expand Down Expand Up @@ -893,6 +946,7 @@ def _build_logprobs_response(
logprob=top_logprob_entries[0].logprob,
bytes=top_logprob_entries[0].bytes,
top_logprobs=top_logprob_entries[1:], # Here are the complete topk candidates
logits_stats=logits_stats,
)

return LogProbs(content=[sampled_entry])
Expand All @@ -914,7 +968,7 @@ def _build_prompt_logprobs(
tensors.
"""

token_ids, logprobs, ranks = prompt_logprobs_tensors
token_ids, logprobs, ranks = prompt_logprobs_tensors[:3]

# Normalize to plain Python lists (support both Tensor and list inputs)
if hasattr(token_ids, "tolist"):
Expand Down
2 changes: 1 addition & 1 deletion fastdeploy/entrypoints/openai/serving_completion.py
Original file line number Diff line number Diff line change
Expand Up @@ -900,7 +900,7 @@ def _build_prompt_logprobs(
tensors.
"""

token_ids, logprobs, ranks = prompt_logprobs_tensors
token_ids, logprobs, ranks = prompt_logprobs_tensors[:3]

# Normalize to plain Python lists (support both Tensor and list inputs)
if hasattr(token_ids, "tolist"):
Expand Down
21 changes: 21 additions & 0 deletions fastdeploy/output/token_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ def __init__(self, cfg, cached_generated_tokens, engine_worker_queue, split_conn

self.speculative_decoding = self.cfg.speculative_config.method is not None
self.use_logprobs = self.cfg.model_config.enable_logprob
self.compute_logits_stats = self.cfg.model_config.compute_logits_stats
self.enable_draft_logprob = self.cfg.speculative_config.enable_draft_logprob

if self.speculative_decoding:
Expand Down Expand Up @@ -350,6 +351,26 @@ def _process_batch_output_use_zmq(self, receive_datas):
logprobs_list: LogprobsLists = stream_data.logprobs.tolists()
result.outputs.logprob = float(logprobs_list.logprobs[0][0])
result.outputs.top_logprobs = logprobs_list
# Extract logits statistics if available
if self.compute_logits_stats:
assert (
logprobs_list.logits_min is not None
), "logits_min is None when compute_logits_stats is enabled"
assert (
logprobs_list.logits_max is not None
), "logits_max is None when compute_logits_stats is enabled"
assert (
logprobs_list.logits_mean is not None
), "logits_mean is None when compute_logits_stats is enabled"
assert (
logprobs_list.logits_std is not None
), "logits_std is None when compute_logits_stats is enabled"
Comment on lines +356 to +367
Copy link

Copilot AI Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里用 assert ... is not None 来保证 logits_* 存在:
1)在 python -O 下 assert 会被跳过,可能导致后续 float(None) 等异常;
2)assert 触发后会被外层 except 吞掉,只打 warning,最终静默缺失 logits_stats,与 --compute-logits-stats 的预期不一致。
建议改成显式的条件判断:若缺字段则记录更明确的错误并决定是否直接报错/降级关闭 logits_stats 输出。

Suggested change
assert (
logprobs_list.logits_min is not None
), "logits_min is None when compute_logits_stats is enabled"
assert (
logprobs_list.logits_max is not None
), "logits_max is None when compute_logits_stats is enabled"
assert (
logprobs_list.logits_mean is not None
), "logits_mean is None when compute_logits_stats is enabled"
assert (
logprobs_list.logits_std is not None
), "logits_std is None when compute_logits_stats is enabled"
missing_fields = []
if logprobs_list.logits_min is None:
missing_fields.append("logits_min")
if logprobs_list.logits_max is None:
missing_fields.append("logits_max")
if logprobs_list.logits_mean is None:
missing_fields.append("logits_mean")
if logprobs_list.logits_std is None:
missing_fields.append("logits_std")
if missing_fields:
# When compute_logits_stats is enabled, all logits_* fields must be present
raise ValueError(
"Missing logits stats fields when compute_logits_stats is enabled: "
+ ", ".join(missing_fields)
)

Copilot uses AI. Check for mistakes.
result.outputs.logits_stats = {
"min": float(logprobs_list.logits_min[0]),
"max": float(logprobs_list.logits_max[0]),
"mean": float(logprobs_list.logits_mean[0]),
"std": float(logprobs_list.logits_std[0]),
}
except Exception as e:
llm_logger.warning(f"Failed to parse logprobs from StreamTransferData: {e}")
if getattr(stream_data, "prompt_logprobs", None) is not None:
Expand Down
5 changes: 2 additions & 3 deletions fastdeploy/worker/gpu_model_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2953,9 +2953,8 @@ def _get_prompt_logprobs_list(
raw_logprobs = self.sampler.compute_logprobs(logits)
elif logprobs_mode == "raw_logits":
raw_logprobs = logits
token_ids, logprobs, ranks = self.sampler.gather_logprobs(
raw_logprobs, num_prompt_logprobs, prompt_token_ids_tensor
)
gathered = self.sampler.gather_logprobs(raw_logprobs, num_prompt_logprobs, prompt_token_ids_tensor)
token_ids, logprobs, ranks = gathered.logprob_token_ids, gathered.logprobs, gathered.selected_token_ranks
# Synchronize before using token_ids, logprobs and ranks to ensure async copy are completed.
paddle.device.synchronize()
chunk_slice = slice(start_idx, start_idx + num_logits)
Expand Down
5 changes: 2 additions & 3 deletions fastdeploy/worker/metax_model_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2833,9 +2833,8 @@ def _get_prompt_logprobs_list(
raw_logprobs = self.sampler.compute_logprobs(logits)
elif logprobs_mode == "raw_logits":
raw_logprobs = logits
token_ids, logprobs, ranks = self.sampler.gather_logprobs(
raw_logprobs, num_prompt_logprobs, prompt_token_ids_tensor
)
gathered = self.sampler.gather_logprobs(raw_logprobs, num_prompt_logprobs, prompt_token_ids_tensor)
token_ids, logprobs, ranks = gathered.logprob_token_ids, gathered.logprobs, gathered.selected_token_ranks
chunk_slice = slice(start_idx, start_idx + num_logits)
logprobs_tensors.logprob_token_ids[chunk_slice].copy_(token_ids, False)
logprobs_tensors.logprobs[chunk_slice].copy_(logprobs, False)
Expand Down
54 changes: 54 additions & 0 deletions fastdeploy/worker/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ class LogprobsLists(NamedTuple):
logprobs: list[list[float]]
# [num_reqs]
sampled_token_ranks: list[int]
# Logits statistics for each sequence (optional)
logits_min: Optional[list[float]] = None # [num_reqs]
logits_max: Optional[list[float]] = None # [num_reqs]
logits_mean: Optional[list[float]] = None # [num_reqs]
logits_std: Optional[list[float]] = None # [num_reqs]
Comment on lines 44 to +51
Copy link

Copilot AI Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

本文件未启用 from __future__ import annotations,但新增的 Optional[list[float]] / list[list[int]] 等内置泛型注解在 Python 3.7/3.8 下会导致导入时异常;同时 setup.py 仍声明 python_requires=">=3.7"。建议:1)在文件顶部增加 from __future__ import annotations;或 2)把这些新增注解改为 Optional[List[float]] 等 typing 形式并补充导入,以保持与声明的 Python 版本兼容。

Copilot uses AI. Check for mistakes.
Comment on lines +48 to +51
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR里没有这些参数的计算逻辑?


def slice_columns(self, start: int, end: int):
"""
Expand All @@ -54,6 +59,14 @@ def slice_columns(self, start: int, end: int):
[row[start:end] for row in self.logprob_token_ids],
[row[start:end] for row in self.logprobs],
self.sampled_token_ranks, # unchanged
# [row[start:end] for row in self.logits_min],
# [row[start:end] for row in self.logits_max],
# [row[start:end] for row in self.logits_mean],
# [row[start:end] for row in self.logits_std],
self.logits_min, # unchanged
self.logits_max, # unchanged
self.logits_mean, # unchanged
self.logits_std, # unchanged
)

def slice_rows(self, start: int, end: int):
Expand All @@ -65,6 +78,10 @@ def slice_rows(self, start: int, end: int):
self.logprob_token_ids[start:end],
self.logprobs[start:end],
self.sampled_token_ranks[start:end],
self.logits_min[start:end] if self.logits_min is not None else None,
self.logits_max[start:end] if self.logits_max is not None else None,
self.logits_mean[start:end] if self.logits_mean is not None else None,
self.logits_std[start:end] if self.logits_std is not None else None,
)


Expand All @@ -77,13 +94,22 @@ class LogprobsTensors(NamedTuple):
logprobs: paddle.Tensor
# [num_reqs]
selected_token_ranks: paddle.Tensor
# Logits statistics for each sequence (optional)
logits_min: Optional[paddle.Tensor] = None # [num_reqs]
logits_max: Optional[paddle.Tensor] = None # [num_reqs]
logits_mean: Optional[paddle.Tensor] = None # [num_reqs]
logits_std: Optional[paddle.Tensor] = None

def tolists(self):
"""Convert to lists."""
return LogprobsLists(
self.logprob_token_ids.tolist(),
self.logprobs.tolist(),
self.selected_token_ranks.tolist(),
self.logits_min.tolist() if self.logits_min is not None else None,
self.logits_max.tolist() if self.logits_max is not None else None,
self.logits_mean.tolist() if self.logits_mean is not None else None,
self.logits_std.tolist() if self.logits_std is not None else None,
)

@staticmethod
Expand All @@ -97,6 +123,10 @@ def empty_cpu(num_positions: int, num_tokens_per_position: int) -> "LogprobsTens
logprob_token_ids=logprob_token_ids,
logprobs=logprobs,
selected_token_ranks=selected_token_ranks,
logits_min=None,
logits_max=None,
logits_mean=None,
logits_std=None,
)

@staticmethod
Expand All @@ -110,6 +140,10 @@ def empty(num_positions: int, num_tokens_per_position: int) -> "LogprobsTensors"
logprob_token_ids=logprob_token_ids,
logprobs=logprobs,
selected_token_ranks=selected_token_ranks,
logits_min=None,
logits_max=None,
logits_mean=None,
logits_std=None,
)

def slice_rows(self, start: int, end: int):
Expand All @@ -122,6 +156,26 @@ def slice_rows(self, start: int, end: int):
paddle.to_tensor(self.logprob_token_ids.cpu()[start:end], place="cpu"),
paddle.to_tensor(self.logprobs.cpu()[start:end], place="cpu"),
paddle.to_tensor(self.selected_token_ranks.cpu()[start:end], place="cpu"),
(
paddle.to_tensor(self.logits_min.cpu()[start:end], place="cpu")
if self.logits_min is not None
else None
),
(
paddle.to_tensor(self.logits_max.cpu()[start:end], place="cpu")
if self.logits_max is not None
else None
),
(
paddle.to_tensor(self.logits_mean.cpu()[start:end], place="cpu")
if self.logits_mean is not None
else None
),
(
paddle.to_tensor(self.logits_std.cpu()[start:end], place="cpu")
if self.logits_std is not None
else None
),
)


Expand Down
5 changes: 5 additions & 0 deletions fastdeploy/worker/worker_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -939,6 +939,11 @@ def parse_args():
action="store_true",
help="Enable output of token-level log probabilities.",
)
parser.add_argument(
"--compute_logits_stats",
action="store_true",
help="Enable per-token logits statistics (min/max/mean/std) output.",
)
parser.add_argument(
"--max_logprobs",
type=int,
Expand Down
5 changes: 2 additions & 3 deletions fastdeploy/worker/xpu_model_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,8 @@ def _get_prompt_logprobs_list(self, hidden_states: paddle.Tensor) -> list[Option
raw_logprobs = logits
else:
raw_logprobs = self.sampler.compute_logprobs(logits)
token_ids, logprobs, ranks = self.sampler.gather_logprobs(
raw_logprobs, num_prompt_logprobs, prompt_token_ids_tensor
)
gathered = self.sampler.gather_logprobs(raw_logprobs, num_prompt_logprobs, prompt_token_ids_tensor)
token_ids, logprobs, ranks = gathered.logprob_token_ids, gathered.logprobs, gathered.selected_token_ranks
chunk_slice = slice(start_idx, start_idx + num_logits)
logprobs_tensors.logprob_token_ids[chunk_slice].copy_(token_ids, False)
logprobs_tensors.logprobs[chunk_slice].copy_(logprobs, False)
Expand Down
Loading
Loading