1 change: 1 addition & 0 deletions fastdeploy/config.py
@@ -206,6 +206,7 @@ def __init__(
self.max_model_len = 0
self.dtype = "bfloat16"
self.enable_logprob = False
self.compute_logits_stats = False
self.max_logprobs = 20
self.logprobs_mode = "raw_logprobs"
self.redundant_experts_num = 0
12 changes: 12 additions & 0 deletions fastdeploy/engine/args_utils.py
@@ -459,6 +459,12 @@ class EngineArgs:
Must be explicitly enabled via the `--enable-logprob` startup parameter to output logprob values.
"""

compute_logits_stats: bool = False
"""
Flag to enable per-token logits statistics (min/max/mean/std) output.
Only effective when enable_logprob is True.
"""

max_logprobs: int = 20
"""
Maximum number of log probabilities to return when `enable_logprob` is True. The default value comes from the default for the
@@ -872,6 +878,12 @@ def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser:
default=EngineArgs.enable_logprob,
help="Enable output of token-level log probabilities.",
)
model_group.add_argument(
"--compute-logits-stats",
action="store_true",
default=EngineArgs.compute_logits_stats,
help="Enable per-token logits statistics (min/max/mean/std) output.",
)
model_group.add_argument(
"--max-logprobs",
type=int,
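For orientation, a minimal sketch of how the two flags combine: `compute_logits_stats` is a no-op unless `enable_logprob` is also set. The model path below is a placeholder, not something this PR prescribes.

```python
# Sketch: requesting per-token logits statistics alongside logprobs.
from fastdeploy.engine.args_utils import EngineArgs

args = EngineArgs(
    model="./my-model",          # placeholder path
    enable_logprob=True,         # prerequisite: stats are ignored without it
    compute_logits_stats=True,   # emit min/max/mean/std of the logits per token
)
```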
1 change: 1 addition & 0 deletions fastdeploy/engine/engine.py
@@ -617,6 +617,7 @@ def _start_worker_service(self):
"use_internode_ll_two_stage": self.cfg.parallel_config.use_internode_ll_two_stage,
"disable_sequence_parallel_moe": self.cfg.parallel_config.disable_sequence_parallel_moe,
"enable_logprob": self.cfg.model_config.enable_logprob,
"compute_logits_stats": self.cfg.model_config.compute_logits_stats,
"lm_head_fp32": self.cfg.model_config.lm_head_fp32,
"shutdown_comm_group_if_worker_idle": self.cfg.parallel_config.shutdown_comm_group_if_worker_idle,
"enable_entropy": self.cfg.model_config.enable_entropy,
3 changes: 3 additions & 0 deletions fastdeploy/engine/request.py
@@ -727,6 +727,7 @@ class CompletionOutput:
multipart: Optional[list[Any]] = None
num_image_tokens: Optional[int] = None
enable_parser: bool = False
logits_stats: Optional[dict[str, float]] = None

def to_dict(self):
"""
@@ -745,6 +746,7 @@ def to_dict(self):
"text": self.text,
"reasoning_content": self.reasoning_content,
"reasoning_token_num": self.reasoning_token_num,
"logits_stats": self.logits_stats,
}

@classmethod
@@ -770,6 +772,7 @@ def __repr__(self) -> str:
f"logprobs={self.logprobs}, "
f"top_logprobs={self.top_logprobs}, "
f"draft_top_logprobs={self.draft_top_logprobs}, "
f"logits_stats={self.logits_stats}, "
)

def get(self, key: str, default_value=None):
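Once populated, the new field is a plain dict of four floats that `to_dict()` passes through unchanged. An illustrative shape (values made up; the statistics are presumably taken over the vocabulary dimension at each decoding step):

```python
# Illustrative CompletionOutput.logits_stats payload (values are examples only):
logits_stats = {
    "min": -9.81,   # smallest logit at this step
    "max": 14.22,   # largest logit at this step
    "mean": 0.03,   # mean of the logits
    "std": 2.17,    # standard deviation of the logits
}
```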
2 changes: 1 addition & 1 deletion fastdeploy/entrypoints/llm.py
@@ -447,7 +447,7 @@ def _build_prompt_logprobs(
tensors.
"""

token_ids, logprobs, ranks = prompt_logprobs_tensors
token_ids, logprobs, ranks = prompt_logprobs_tensors[:3]

# Detokenize non-incrementally.
# Output is flat: [num_tok, num_lps] -> [num_tok * num_lps]
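The `[:3]` is the compatibility half of this PR: `prompt_logprobs_tensors` is a NamedTuple that grows from three to seven fields (see `fastdeploy/worker/output.py` below), and bare tuple unpacking must match the full length. A self-contained sketch of the failure mode:

```python
from typing import NamedTuple, Optional

class Tensors(NamedTuple):  # stand-in for the extended LogprobsTensors
    token_ids: list
    logprobs: list
    ranks: list
    logits_min: Optional[list] = None
    logits_max: Optional[list] = None
    logits_mean: Optional[list] = None
    logits_std: Optional[list] = None

t = Tensors([1], [-0.1], [0])
# token_ids, logprobs, ranks = t      # ValueError: too many values to unpack
token_ids, logprobs, ranks = t[:3]    # slicing works whatever the field count
```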
1 change: 1 addition & 0 deletions fastdeploy/entrypoints/openai/protocol.py
@@ -294,6 +294,7 @@ class LogProbEntry(BaseModel):
logprob: float
bytes: Optional[List[int]] = None
top_logprobs: Optional[List[LogProbEntry]] = None
logits_stats: Optional[Dict[str, float]] = None


class LogProbs(BaseModel):
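Serialized into the OpenAI-style response, the new optional field would appear roughly as below (a sketch; the field values are illustrative, not produced by this PR):

```python
# Illustrative LogProbEntry as it would serialize in a chat response:
entry = {
    "token": "Hello",
    "logprob": -0.12,
    "bytes": [72, 101, 108, 108, 111],
    "top_logprobs": [],
    "logits_stats": {"min": -9.8, "max": 12.4, "mean": 0.03, "std": 2.1},
}
```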
74 changes: 64 additions & 10 deletions fastdeploy/entrypoints/openai/serving_chat.py
@@ -811,22 +811,74 @@ def _create_chat_logprobs(
request_decode_flag: Optional[bool] = True,
) -> Optional[LogProbs]:
"""Create OpenAI-style logprobs for chat completions."""
if output_top_logprobs is None or len(output_top_logprobs) < 3 or any(not lst for lst in output_top_logprobs):
if (
output_top_logprobs is None
or len(output_top_logprobs) < 3
or any(not lst for lst in output_top_logprobs[:3])
):
return None
logprobs_res: Optional[LogProbs] = None
for logprob_token_ids, logprobs, sampled_token_ranks in zip(
output_top_logprobs[0], output_top_logprobs[1], output_top_logprobs[2]
):
top_logprobs = LogprobsLists(
logprob_token_ids=[logprob_token_ids],
logprobs=[logprobs],
sampled_token_ranks=[sampled_token_ranks],
)

# Check if output_top_logprobs is a LogprobsLists object (NamedTuple) or a plain list
is_logprobslists = hasattr(output_top_logprobs, "logprob_token_ids")

# Extract logits stats if available
if is_logprobslists:
# output_top_logprobs is LogprobsLists namedtuple
has_logits_stats = output_top_logprobs.logits_min is not None
else:
# list from msgpack: [logprob_token_ids, logprobs, sampled_token_ranks, logits_min, logits_max, logits_mean, logits_std]
has_logits_stats = len(output_top_logprobs) >= 7 and output_top_logprobs[3] is not None

if is_logprobslists:
num_tokens = len(output_top_logprobs.logprobs)
_tk_ids = lambda idx: output_top_logprobs.logprob_token_ids[idx]
_lps = lambda idx: output_top_logprobs.logprobs[idx]
_ranks = lambda idx: output_top_logprobs.sampled_token_ranks[idx]
_lmin = lambda idx: output_top_logprobs.logits_min[idx]
_lmax = lambda idx: output_top_logprobs.logits_max[idx]
_lmean = lambda idx: output_top_logprobs.logits_mean[idx]
_lstd = lambda idx: output_top_logprobs.logits_std[idx]
else:
num_tokens = len(output_top_logprobs[1])
_tk_ids = lambda idx: output_top_logprobs[0][idx]
_lps = lambda idx: output_top_logprobs[1][idx]
_ranks = lambda idx: output_top_logprobs[2][idx]
_lmin = lambda idx: output_top_logprobs[3][idx]
_lmax = lambda idx: output_top_logprobs[4][idx]
_lmean = lambda idx: output_top_logprobs[5][idx]
_lstd = lambda idx: output_top_logprobs[6][idx]

for idx in range(num_tokens):
logits_stats = None
if has_logits_stats:
top_logprobs = LogprobsLists(
logprob_token_ids=[_tk_ids(idx)],
logprobs=[_lps(idx)],
sampled_token_ranks=[_ranks(idx)],
logits_min=[_lmin(idx)],
logits_max=[_lmax(idx)],
logits_mean=[_lmean(idx)],
logits_std=[_lstd(idx)],
)
logits_stats = {
"min": float(_lmin(idx)),
"max": float(_lmax(idx)),
"mean": float(_lmean(idx)),
"std": float(_lstd(idx)),
}
else:
top_logprobs = LogprobsLists(
logprob_token_ids=[_tk_ids(idx)],
logprobs=[_lps(idx)],
sampled_token_ranks=[_ranks(idx)],
)
step_logprobs_res = self._build_logprobs_response(
request_logprobs=request_logprobs,
response_logprobs=top_logprobs,
request_top_logprobs=request_top_logprobs,
request_decode_flag=request_decode_flag,
logits_stats=logits_stats,
)
if logprobs_res is None:
logprobs_res = step_logprobs_res
@@ -840,6 +892,7 @@ def _build_logprobs_response(
response_logprobs: Optional[LogprobsLists],
request_top_logprobs: int,
request_decode_flag: bool,
logits_stats: Optional[dict[str, float]] = None,
) -> Optional[LogProbs]:
"""
Construct a logprobs response object in line with the OpenAI style.
Expand Down Expand Up @@ -887,6 +940,7 @@ def _build_logprobs_response(
logprob=top_logprob_entries[0].logprob,
bytes=top_logprob_entries[0].bytes,
top_logprobs=top_logprob_entries[1:], # Here are the complete topk candidates
logits_stats=logits_stats,
)

return LogProbs(content=[sampled_entry])
@@ -908,7 +962,7 @@ def _build_prompt_logprobs(
tensors.
"""

token_ids, logprobs, ranks = prompt_logprobs_tensors
token_ids, logprobs, ranks = prompt_logprobs_tensors[:3]

# Detokenize non-incrementally.
# Output is flat: [num_tok, num_lps] -> [num_tok * num_lps]
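The accessor lambdas above exist because `output_top_logprobs` arrives in two shapes: a `LogprobsLists` NamedTuple when passed in-process, or a plain list after a msgpack round trip (NamedTuples serialize as lists). A stripped-down sketch of the same normalization, under those assumptions:

```python
def _columns(top):
    """Return (num_tokens, token_ids, logprobs, ranks) for either shape."""
    if hasattr(top, "logprob_token_ids"):   # LogprobsLists NamedTuple
        cols = (top.logprob_token_ids, top.logprobs, top.sampled_token_ranks)
    else:                                   # msgpack-decoded plain list
        cols = (top[0], top[1], top[2])
    return (len(cols[1]), *cols)

# Usage sketch: one token with one top-logprob candidate.
num_tokens, tk_ids, lps, ranks = _columns([[[5]], [[-0.2]], [0]])
assert num_tokens == 1 and ranks[0] == 0
```

Since a NamedTuple is also indexable, the list branch alone would handle both shapes; the `hasattr` check just keeps the intent explicit.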
2 changes: 1 addition & 1 deletion fastdeploy/entrypoints/openai/serving_completion.py
@@ -898,7 +898,7 @@ def _build_prompt_logprobs(
tensors.
"""

token_ids, logprobs, ranks = prompt_logprobs_tensors
token_ids, logprobs, ranks = prompt_logprobs_tensors[:3]

# Detokenize non-incrementally.
# Output is flat: [num_tok, num_lps] -> [num_tok * num_lps]
21 changes: 21 additions & 0 deletions fastdeploy/output/token_processor.py
@@ -82,6 +82,7 @@ def __init__(self, cfg, cached_generated_tokens, engine_worker_queue, split_conn

self.speculative_decoding = self.cfg.speculative_config.method is not None
self.use_logprobs = self.cfg.model_config.enable_logprob
self.compute_logits_stats = self.cfg.model_config.compute_logits_stats
self.enable_draft_logprob = self.cfg.speculative_config.enable_draft_logprob

if self.speculative_decoding:
@@ -349,6 +350,26 @@ def _process_batch_output_use_zmq(self, receive_datas):
logprobs_list: LogprobsLists = stream_data.logprobs.tolists()
result.outputs.logprob = float(logprobs_list.logprobs[0][0])
result.outputs.top_logprobs = logprobs_list
# Extract logits statistics if available
if self.compute_logits_stats:
assert (
logprobs_list.logits_min is not None
), "logits_min is None when compute_logits_stats is enabled"
assert (
logprobs_list.logits_max is not None
), "logits_max is None when compute_logits_stats is enabled"
assert (
logprobs_list.logits_mean is not None
), "logits_mean is None when compute_logits_stats is enabled"
assert (
logprobs_list.logits_std is not None
), "logits_std is None when compute_logits_stats is enabled"
result.outputs.logits_stats = {
"min": float(logprobs_list.logits_min[0]),
"max": float(logprobs_list.logits_max[0]),
"mean": float(logprobs_list.logits_mean[0]),
"std": float(logprobs_list.logits_std[0]),
}
except Exception as e:
llm_logger.warning(f"Failed to parse logprobs from StreamTransferData: {e}")
if getattr(stream_data, "prompt_logprobs", None) is not None:
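The four assertions plus the dict build repeat the same field list four times; an equivalent, more compact formulation as a self-contained sketch (the helper name is hypothetical):

```python
def extract_logits_stats(logits_min, logits_max, logits_mean, logits_std):
    """Validate the four stat columns and return the per-token stats dict."""
    fields = {"min": logits_min, "max": logits_max, "mean": logits_mean, "std": logits_std}
    missing = [name for name, col in fields.items() if col is None]
    assert not missing, f"logits stats {missing} are None but compute_logits_stats is enabled"
    return {name: float(col[0]) for name, col in fields.items()}

# Usage sketch with made-up values:
stats = extract_logits_stats([-9.8], [12.4], [0.03], [2.1])
assert stats["max"] == 12.4
```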
5 changes: 2 additions & 3 deletions fastdeploy/worker/gpu_model_runner.py
@@ -3141,9 +3141,8 @@ def _get_prompt_logprobs_list(
raw_logprobs = self.sampler.compute_logprobs(logits)
elif logprobs_mode == "raw_logits":
raw_logprobs = logits
token_ids, logprobs, ranks = self.sampler.gather_logprobs(
raw_logprobs, num_prompt_logprobs, prompt_token_ids_tensor
)
gathered = self.sampler.gather_logprobs(raw_logprobs, num_prompt_logprobs, prompt_token_ids_tensor)
token_ids, logprobs, ranks = gathered.logprob_token_ids, gathered.logprobs, gathered.selected_token_ranks
# Synchronize before using token_ids, logprobs and ranks to ensure async copy are completed.
paddle.device.synchronize()
chunk_slice = slice(start_idx, start_idx + num_logits)
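Switching from three-way tuple unpacking to attribute access on `gather_logprobs`'s result is the same compatibility fix sketched under `llm.py` above: the returned `LogprobsTensors` now carries seven fields, so `a, b, c = gathered` would raise, while named attributes remain stable however many trailing optional fields are appended. The identical change is applied to the Metax and XPU runners below.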
5 changes: 2 additions & 3 deletions fastdeploy/worker/metax_model_runner.py
@@ -3052,9 +3052,8 @@ def _get_prompt_logprobs_list(
raw_logprobs = self.sampler.compute_logprobs(logits)
elif logprobs_mode == "raw_logits":
raw_logprobs = logits
token_ids, logprobs, ranks = self.sampler.gather_logprobs(
raw_logprobs, num_prompt_logprobs, prompt_token_ids_tensor
)
gathered = self.sampler.gather_logprobs(raw_logprobs, num_prompt_logprobs, prompt_token_ids_tensor)
token_ids, logprobs, ranks = gathered.logprob_token_ids, gathered.logprobs, gathered.selected_token_ranks
chunk_slice = slice(start_idx, start_idx + num_logits)
logprobs_tensors.logprob_token_ids[chunk_slice].copy_(token_ids, False)
logprobs_tensors.logprobs[chunk_slice].copy_(logprobs, False)
54 changes: 54 additions & 0 deletions fastdeploy/worker/output.py
@@ -44,6 +44,11 @@ class LogprobsLists(NamedTuple):
logprobs: list[list[float]]
# [num_reqs]
sampled_token_ranks: list[int]
# Logits statistics for each sequence (optional)
logits_min: Optional[list[float]] = None # [num_reqs]
logits_max: Optional[list[float]] = None # [num_reqs]
logits_mean: Optional[list[float]] = None # [num_reqs]
logits_std: Optional[list[float]] = None # [num_reqs]

def slice_columns(self, start: int, end: int):
"""
@@ -54,6 +59,14 @@ def slice_columns(self, start: int, end: int):
[row[start:end] for row in self.logprob_token_ids],
[row[start:end] for row in self.logprobs],
self.sampled_token_ranks, # unchanged
# [row[start:end] for row in self.logits_min],
# [row[start:end] for row in self.logits_max],
# [row[start:end] for row in self.logits_mean],
# [row[start:end] for row in self.logits_std],
self.logits_min, # unchanged
self.logits_max, # unchanged
self.logits_mean, # unchanged
self.logits_std, # unchanged
)

def slice_rows(self, start: int, end: int):
@@ -65,6 +78,10 @@ def slice_rows(self, start: int, end: int):
self.logprob_token_ids[start:end],
self.logprobs[start:end],
self.sampled_token_ranks[start:end],
self.logits_min[start:end] if self.logits_min is not None else None,
self.logits_max[start:end] if self.logits_max is not None else None,
self.logits_mean[start:end] if self.logits_mean is not None else None,
self.logits_std[start:end] if self.logits_std is not None else None,
)


@@ -77,13 +94,22 @@ class LogprobsTensors(NamedTuple):
logprobs: paddle.Tensor
# [num_reqs]
selected_token_ranks: paddle.Tensor
# Logits statistics for each sequence (optional)
logits_min: Optional[paddle.Tensor] = None # [num_reqs]
logits_max: Optional[paddle.Tensor] = None # [num_reqs]
logits_mean: Optional[paddle.Tensor] = None # [num_reqs]
logits_std: Optional[paddle.Tensor] = None  # [num_reqs]

def tolists(self):
"""Convert to lists."""
return LogprobsLists(
self.logprob_token_ids.tolist(),
self.logprobs.tolist(),
self.selected_token_ranks.tolist(),
self.logits_min.tolist() if self.logits_min is not None else None,
self.logits_max.tolist() if self.logits_max is not None else None,
self.logits_mean.tolist() if self.logits_mean is not None else None,
self.logits_std.tolist() if self.logits_std is not None else None,
)

@staticmethod
@@ -97,6 +123,10 @@ def empty_cpu(num_positions: int, num_tokens_per_position: int) -> "LogprobsTensors":
logprob_token_ids=logprob_token_ids,
logprobs=logprobs,
selected_token_ranks=selected_token_ranks,
logits_min=None,
logits_max=None,
logits_mean=None,
logits_std=None,
)

@staticmethod
@@ -110,6 +140,10 @@ def empty(num_positions: int, num_tokens_per_position: int) -> "LogprobsTensors":
logprob_token_ids=logprob_token_ids,
logprobs=logprobs,
selected_token_ranks=selected_token_ranks,
logits_min=None,
logits_max=None,
logits_mean=None,
logits_std=None,
)

def slice_rows(self, start: int, end: int):
@@ -122,6 +156,26 @@ def slice_rows(self, start: int, end: int):
paddle.to_tensor(self.logprob_token_ids.cpu()[start:end], place="cpu"),
paddle.to_tensor(self.logprobs.cpu()[start:end], place="cpu"),
paddle.to_tensor(self.selected_token_ranks.cpu()[start:end], place="cpu"),
(
paddle.to_tensor(self.logits_min.cpu()[start:end], place="cpu")
if self.logits_min is not None
else None
),
(
paddle.to_tensor(self.logits_max.cpu()[start:end], place="cpu")
if self.logits_max is not None
else None
),
(
paddle.to_tensor(self.logits_mean.cpu()[start:end], place="cpu")
if self.logits_mean is not None
else None
),
(
paddle.to_tensor(self.logits_std.cpu()[start:end], place="cpu")
if self.logits_std is not None
else None
),
)


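Because the four new fields are trailing `Optional`s defaulting to `None`, existing positional constructions keep working, and `tolists()`/`slice_rows()` simply propagate `None` when stats were never computed. A sketch of that backward compatibility (assumes a FastDeploy install, since the module imports paddle):

```python
from fastdeploy.worker.output import LogprobsLists

old_style = LogprobsLists([[5, 9]], [[-0.1, -2.3]], [0])  # 3 positional args still valid
assert old_style.logits_min is None

with_stats = LogprobsLists(
    [[5, 9]], [[-0.1, -2.3]], [0],
    logits_min=[-9.8], logits_max=[12.4], logits_mean=[0.03], logits_std=[2.1],
)
row = with_stats.slice_rows(0, 1)      # stats columns are sliced alongside
assert row.logits_max == [12.4]
```

Note that `slice_columns` intentionally passes the stats through unchanged: they are per-request scalars ([num_reqs]), so slicing over top-k candidate columns does not apply to them.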
5 changes: 5 additions & 0 deletions fastdeploy/worker/worker_process.py
@@ -930,6 +930,11 @@ def parse_args():
action="store_true",
help="Enable output of token-level log probabilities.",
)
parser.add_argument(
"--compute_logits_stats",
action="store_true",
help="Enable per-token logits statistics (min/max/mean/std) output.",
)
parser.add_argument(
"--max_logprobs",
type=int,
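One naming wrinkle worth noting: the public CLI flag added in `args_utils.py` is spelled `--compute-logits-stats`, while the internal worker-process parser uses `--compute_logits_stats` with underscores, consistent with its existing `--max_logprobs` style.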
5 changes: 2 additions & 3 deletions fastdeploy/worker/xpu_model_runner.py
@@ -271,9 +271,8 @@ def _get_prompt_logprobs_list(self, hidden_states: paddle.Tensor) -> list[Option
raw_logprobs = logits
else:
raw_logprobs = self.sampler.compute_logprobs(logits)
token_ids, logprobs, ranks = self.sampler.gather_logprobs(
raw_logprobs, num_prompt_logprobs, prompt_token_ids_tensor
)
gathered = self.sampler.gather_logprobs(raw_logprobs, num_prompt_logprobs, prompt_token_ids_tensor)
token_ids, logprobs, ranks = gathered.logprob_token_ids, gathered.logprobs, gathered.selected_token_ranks
chunk_slice = slice(start_idx, start_idx + num_logits)
logprobs_tensors.logprob_token_ids[chunk_slice].copy_(token_ids, False)
logprobs_tensors.logprobs[chunk_slice].copy_(logprobs, False)