4 changes: 2 additions & 2 deletions fastdeploy/config.py
@@ -227,8 +227,8 @@ def __init__(
self.think_end_id = args.get("think_end_id", -1)
self.im_patch_id = args.get("image_patch_id", -1)
self.line_break_id = args.get("line_break_id", -1)
if self.max_logprobs == -1 and hasattr(self, "vocab_size"):
self.max_logprobs = self.vocab_size
if self.max_logprobs < -1:
raise ValueError(" The possible values for max_logprobs can't be less than -1 ")

self._post_init()

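For reference, a minimal standalone sketch of the rule these new config checks implement — -1 expands to the full vocabulary size, anything below -1 is rejected (the helper name and numbers are illustrative, not FastDeploy API):

def resolve_max_logprobs(max_logprobs: int, vocab_size: int) -> int:
    # -1 is shorthand for "all logprobs", i.e. the full vocabulary.
    if max_logprobs == -1:
        return vocab_size
    if max_logprobs < -1:
        raise ValueError("max_logprobs can't be less than -1")
    return max_logprobs

assert resolve_max_logprobs(-1, 32000) == 32000  # 32000 is an arbitrary example vocab size
assert resolve_max_logprobs(20, 32000) == 20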
13 changes: 12 additions & 1 deletion fastdeploy/engine/request.py
@@ -29,7 +29,12 @@
from fastdeploy.engine.sampling_params import SamplingParams
from fastdeploy.entrypoints.openai.protocol import ToolCall
from fastdeploy.utils import data_processor_logger
from fastdeploy.worker.output import LogprobsLists, SampleLogprobs
from fastdeploy.worker.output import (
LogprobsLists,
LogprobsTensors,
PromptLogprobs,
SampleLogprobs,
)


class RequestStatus(Enum):
@@ -462,6 +467,8 @@ def __init__(
request_id: str,
prompt: Optional[str] = None,
prompt_token_ids: Optional[list[int]] = None,
prompt_logprobs: Optional[PromptLogprobs] = None,
prompt_logprobs_tensors: Optional[LogprobsTensors] = None,
output_type: Optional[int] = 3,
outputs: CompletionOutput = None,
finished: bool = False,
@@ -475,6 +482,8 @@
self.request_id = request_id
self.prompt = prompt
self.prompt_token_ids = prompt_token_ids
self.prompt_logprobs = prompt_logprobs
self.prompt_logprobs_tensors = prompt_logprobs_tensors
self.output_type = output_type
self.outputs = outputs
self.finished = finished
@@ -520,6 +529,7 @@ def __repr__(self) -> str:
f"RequestOutput(request_id={self.request_id}, "
f"prompt={self.prompt!r}, "
f"prompt_token_ids={self.prompt_token_ids}, "
f"prompt_logprobs={self.prompt_logprobs}, "
f"output_type={self.output_type}, "
f"outputs={self.outputs}, "
f"finished={self.finished}, "
@@ -545,6 +555,7 @@ def to_dict(self):
"request_id": self.request_id,
"prompt": self.prompt,
"prompt_token_ids": self.prompt_token_ids,
"prompt_logprobs": self.prompt_logprobs,
"output_type": self.output_type,
"outputs": None if self.outputs is None else self.outputs.to_dict(),
"metrics": None if self.metrics is None else self.metrics.to_dict(),
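A rough usage sketch of the new RequestOutput fields added above (the import path and keyword construction are assumed from this diff, not verified against the released API):

# Illustrative only: the two new fields ride along on RequestOutput and are
# surfaced by __repr__/to_dict. Import path and keyword construction are assumed.
from fastdeploy.engine.request import RequestOutput

out = RequestOutput(
    request_id="req-0",
    prompt="Hello",
    prompt_token_ids=[101, 102],
    prompt_logprobs=None,            # later filled by LLM._build_prompt_logprobs
    prompt_logprobs_tensors=None,    # raw tensors attached by the token processor
)
print(out.prompt_logprobs, out.prompt_logprobs_tensors)  # None None until populated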
9 changes: 6 additions & 3 deletions fastdeploy/engine/sampling_params.py
@@ -16,6 +16,7 @@

from __future__ import annotations

import os
import random
from dataclasses import dataclass, fields
from enum import Enum
@@ -204,10 +205,12 @@ def _verify_args(self) -> None:
raise ValueError(
f"min_tokens must be less than or equal to " f"max_tokens={self.max_tokens}, got {self.min_tokens}."
)
if self.logprobs is not None and self.logprobs < 0:
raise ValueError(f"logprobs must be non-negative, got {self.logprobs}.")
if self.logprobs is not None and self.logprobs > 20:
if self.logprobs is not None and self.logprobs < -1:
raise ValueError(f"logprobs must be greater than -1, got {self.logprobs}.")
if self.logprobs is not None and self.logprobs > 20 and os.getenv("FD_USE_GET_SAVE_OUTPUT_V1", "0") == "0":
raise ValueError("Invalid value for 'top_logprobs': must be less than or equal to 20.")
if self.prompt_logprobs is not None and self.prompt_logprobs < -1:
raise ValueError(f"prompt_logprobs must be greater than or equal to -1, got {self.prompt_logprobs}.")

if not 0 <= self.seed <= 922337203685477580:
raise ValueError("seed must be in [0, 922337203685477580], got " f"{self.seed}.")
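In plain terms, the new rules are: logprobs=-1 and prompt_logprobs=-1 mean "over the full vocabulary", anything below -1 is rejected, and the old hard cap of 20 only applies when FD_USE_GET_SAVE_OUTPUT_V1 is unset or "0". A standalone mirror of those checks (illustrative helper, not part of the module):

import os

def check_logprob_params(logprobs, prompt_logprobs):
    # Mirrors the new _verify_args checks in SamplingParams.
    if logprobs is not None and logprobs < -1:
        raise ValueError(f"logprobs must be greater than or equal to -1, got {logprobs}.")
    if logprobs is not None and logprobs > 20 and os.getenv("FD_USE_GET_SAVE_OUTPUT_V1", "0") == "0":
        raise ValueError("Invalid value for 'top_logprobs': must be less than or equal to 20.")
    if prompt_logprobs is not None and prompt_logprobs < -1:
        raise ValueError(f"prompt_logprobs must be greater than or equal to -1, got {prompt_logprobs}.")

check_logprob_params(-1, -1)   # OK: -1 means "full vocabulary"
check_logprob_params(5, None)  # OK
# check_logprob_params(-2, None) would raise ValueError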
135 changes: 132 additions & 3 deletions fastdeploy/entrypoints/llm.py
@@ -16,11 +16,13 @@

from __future__ import annotations

import itertools
import logging
import threading
import time
import traceback
import uuid
from collections.abc import Iterable
from typing import Any, Optional, Union

from pydantic import ValidationError
@@ -37,13 +39,20 @@
llm_logger,
retrive_model_from_server,
)
from fastdeploy.worker.output import Logprob, LogprobsLists
from fastdeploy.worker.output import (
Logprob,
LogprobsLists,
LogprobsTensors,
PromptLogprobs,
)

root_logger = logging.getLogger()
for handler in root_logger.handlers[:]:
if isinstance(handler, logging.StreamHandler):
root_logger.removeHandler(handler)

NONES = itertools.repeat(None)


class LLM:
"""
@@ -189,12 +198,17 @@ def generate(
req_ids = self._add_request(prompts=prompts, sampling_params=sampling_params)

topk_logprobs = sampling_params[0].logprobs if sampling_params_len > 1 else sampling_params.logprobs
num_prompt_logprobs = (
sampling_params[0].prompt_logprobs if sampling_params_len > 1 else sampling_params.prompt_logprobs
)

# get output
if stream:
return self._run_engine_stream(req_ids, prompts, use_tqdm=use_tqdm, topk_logprobs=topk_logprobs)
else:
outputs = self._run_engine(req_ids, use_tqdm=use_tqdm, topk_logprobs=topk_logprobs)
outputs = self._run_engine(
req_ids, use_tqdm=use_tqdm, topk_logprobs=topk_logprobs, num_prompt_logprobs=num_prompt_logprobs
)
for i in range(len(outputs)):
outputs[i].prompt = prompts[i]
return outputs
@@ -321,6 +335,27 @@ def _add_request(
current_sampling_params = sampling_params[i]
else:
current_sampling_params = sampling_params
if kwargs.get("stream") and current_sampling_params.prompt_logprobs is not None:
raise ValueError("prompt_logprobs is not supported with streaming.")
max_logprobs = self.llm_engine.cfg.model_config.max_logprobs
if max_logprobs == -1:
max_logprobs = self.llm_engine.cfg.model_config.ori_vocab_size
if current_sampling_params.logprobs is not None:
num_logprobs = current_sampling_params.logprobs
if num_logprobs == -1:
num_logprobs = self.llm_engine.cfg.model_config.ori_vocab_size
if num_logprobs > max_logprobs:
raise ValueError(
f"Number of logprobs requested ({num_logprobs}) exceeds maximum allowed value ({max_logprobs})."
)
if current_sampling_params.prompt_logprobs is not None:
num_prompt_logprobs = current_sampling_params.prompt_logprobs
if num_prompt_logprobs == -1:
num_prompt_logprobs = self.llm_engine.cfg.model_config.ori_vocab_size
if num_prompt_logprobs > max_logprobs:
raise ValueError(
f"Number of logprobs requested ({num_prompt_logprobs}) exceeds maximum allowed value ({max_logprobs})."
)
if current_sampling_params.guided_decoding is not None:
guided_decoding_dict = current_sampling_params.guided_decoding.to_dict()
tasks.update(guided_decoding_dict)
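In short, each request's logprobs/prompt_logprobs value of -1 is expanded to the model's ori_vocab_size and then compared against max_logprobs (itself expanded from -1). A standalone sketch of that resolution with illustrative numbers:

def resolve_num_logprobs(requested, max_logprobs, ori_vocab_size):
    # Illustrative mirror of the per-request check in LLM._add_request.
    if requested is None:
        return None
    if max_logprobs == -1:
        max_logprobs = ori_vocab_size
    num = ori_vocab_size if requested == -1 else requested
    if num > max_logprobs:
        raise ValueError(
            f"Number of logprobs requested ({num}) exceeds maximum allowed value ({max_logprobs})."
        )
    return num

print(resolve_num_logprobs(-1, -1, 32000))  # 32000: full vocabulary allowed
print(resolve_num_logprobs(10, 20, 32000))  # 10
# resolve_num_logprobs(50, 20, 32000) would raise ValueError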
@@ -377,7 +412,93 @@ def _build_sample_logprobs(self, logprobs_lists: LogprobsLists, topk_logprobs: i
except Exception as e:
llm_logger.error(f"Error building sample logprobs from LogprobsLists: {e}, {str(traceback.format_exc())}")

def _run_engine(self, req_ids: list[str], use_tqdm: bool, topk_logprobs: Optional[int] = None):
def _build_prompt_logprobs(
self,
prompt_logprobs_tensors: LogprobsTensors,
num_prompt_logprobs: int,
):
"""Update with prompt logprobs from worker.
Args:
prompt_logprobs_tensors: tuple containing the prompt logprobs
tensors.
"""

token_ids, logprobs, ranks = prompt_logprobs_tensors

# Detokenize non-incrementally.
# Output is flat: [num_tok, num_lps] -> [num_tok * num_lps]
decoded_tokens = [self._decode_token(token_id) for token_id in token_ids.flatten().tolist()]

# Recover shapes.
num_prompt_tokens, num_logprobs = logprobs.shape

# Pythonize the paddle tensors.
prompt_token_ranks = ranks.tolist()
prompt_logprobs = logprobs.tolist()
token_ids = token_ids.tolist()
result: Optional[PromptLogprobs] = []
# Make Logprob for each position.
for pos in range(num_prompt_tokens):
# Handle flattening.
offset = pos * num_logprobs
offset_end = offset + num_logprobs
decoded_tokens_for_pos = NONES if decoded_tokens is None else decoded_tokens[offset:offset_end]

# Update with the Logprob dictionary for this pos.
result.append(
self._make_logprob_dict(
prompt_logprobs[pos],
token_ids[pos],
decoded_tokens_for_pos,
prompt_token_ranks[pos],
num_prompt_logprobs,
)
)
return result
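The decoded tokens arrive flattened as [num_prompt_tokens * num_logprobs], and the offset arithmetic above recovers each position's slice; a tiny worked example with made-up shapes:

# 2 prompt tokens, 3 logprobs per token -> 6 decoded strings, laid out row-major.
decoded_tokens = ["a", "b", "c", "d", "e", "f"]
num_logprobs = 3
for pos in range(2):
    offset = pos * num_logprobs
    print(decoded_tokens[offset:offset + num_logprobs])
# ['a', 'b', 'c']
# ['d', 'e', 'f']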

@staticmethod
def _make_logprob_dict(
logprobs: list[float],
logprob_token_ids: list[int],
decoded_tokens: Iterable[str | None],
rank: int,
num_logprobs: int,
) -> dict[int, Logprob]:
"""Make a Logprob dictionary for a position.
Args:
logprobs: list of log probabilities
logprob_token_ids: list of top token ids
decoded_tokens: list of decoded top tokens
rank: rank of the sampled token
num_logprobs: number of logprobs requested
by the user (in addition to sampled logprob)
Returns:
dict[token id, Logprob]
"""
if num_logprobs == -1:
num_logprobs = len(logprobs)
# We do not need a special case for the sampled token
# being in the topk, since inserting duplicated data
# into a dictionary twice is the same as doing it once.
topk_ranks = range(1, num_logprobs + 1)
ranks = itertools.chain((rank,), topk_ranks)

return {
token_id: Logprob(
logprob=logprob,
rank=rank,
decoded_token=token,
)
for token_id, logprob, rank, token in zip(logprob_token_ids, logprobs, ranks, decoded_tokens)
}

Comment on lines +489 to +492

Copilot AI (Nov 7, 2025):

Variable name shadowing in dictionary comprehension. The parameter rank on line 465 is shadowed by the loop variable rank on line 493. This causes the function to use the loop variable instead of the parameter value, which is incorrect. The loop variable should be renamed (e.g., token_rank) to avoid shadowing the parameter.

Suggested change:
- rank=rank,
- decoded_token=token,
- )
- for token_id, logprob, rank, token in zip(logprob_token_ids, logprobs, ranks, decoded_tokens)
+ rank=token_rank,
+ decoded_token=token,
+ )
+ for token_id, logprob, token_rank, token in zip(logprob_token_ids, logprobs, ranks, decoded_tokens)
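For context on the suggestion above: in Python 3 a comprehension has its own scope, so the loop variable shadows the rank parameter only inside the comprehension, and each entry still receives its value from the chained ranks iterator; the rename is primarily a readability improvement. A minimal illustration (the helper below is purely for demonstration):

import itertools

def make_entries(rank, ranks):
    # Inside the comprehension, the loop variable "rank" shadows the parameter;
    # each tuple receives the value yielded by "ranks", which is the intent.
    return [("entry", rank) for rank in ranks]

print(make_entries(7, itertools.chain((7,), range(1, 4))))
# [('entry', 7), ('entry', 1), ('entry', 2), ('entry', 3)]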

def _run_engine(
self,
req_ids: list[str],
use_tqdm: bool,
topk_logprobs: Optional[int] = None,
num_prompt_logprobs: Optional[int] = None,
):
"""
Run the engine and return the list of results.
@@ -422,9 +543,17 @@

# filter logprobs
if result.outputs.top_logprobs and topk_logprobs:
if topk_logprobs == -1:
topk_logprobs = self.llm_engine.cfg.model_config.ori_vocab_size
result.outputs.logprobs = self._build_sample_logprobs(
result.outputs.top_logprobs, topk_logprobs
)
if result.prompt_logprobs_tensors and num_prompt_logprobs:
if num_prompt_logprobs == -1:
num_prompt_logprobs = self.llm_engine.cfg.model_config.ori_vocab_size
result.prompt_logprobs = self._build_prompt_logprobs(
result.prompt_logprobs_tensors, num_prompt_logprobs
)

output[pos] = result
finished.append(i)
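Putting the llm.py changes together, a hedged end-to-end sketch of offline use (the model path, LLM constructor arguments, and exact output shapes are assumptions based on this diff, not a verified API reference):

from fastdeploy.entrypoints.llm import LLM  # import path assumed from this file
from fastdeploy.engine.sampling_params import SamplingParams

llm = LLM(model="path/to/model")  # placeholder model path; constructor args assumed
params = SamplingParams(max_tokens=16, logprobs=5, prompt_logprobs=2)

# Note: per _add_request above, prompt_logprobs is rejected when stream=True.
outputs = llm.generate(prompts=["Hello, world"], sampling_params=params)
for out in outputs:
    print(out.outputs.logprobs)  # sample logprobs built by _build_sample_logprobs
    print(out.prompt_logprobs)   # prompt logprobs built by _build_prompt_logprobs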
13 changes: 13 additions & 0 deletions fastdeploy/output/token_processor.py
Original file line number Diff line number Diff line change
@@ -285,6 +285,19 @@ def _process_batch_output_use_zmq(self, receive_datas):
finished=False,
metrics=metrics,
)
if self.use_logprobs:
if getattr(stream_data, "logprobs", None) is not None:
try:
logprobs_list: LogprobsLists = stream_data.logprobs.tolists()
result.outputs.logprob = float(logprobs_list.logprobs[0][0])
result.outputs.top_logprobs = logprobs_list
except Exception as e:
llm_logger.warning(f"Failed to parse logprobs from StreamTransferData: {e}")
if getattr(stream_data, "prompt_logprobs", None) is not None:
try:
result.prompt_logprobs_tensors = stream_data.prompt_logprobs
except Exception as e:
llm_logger.warning(f"Failed to parse prompt_logprobs from StreamTransferData: {e}")
Comment on lines +297 to +300

Copilot AI (Nov 7, 2025):

Overly broad exception handling. The try-except block on lines 297-300 catches any Exception when trying to assign stream_data.prompt_logprobs to result.prompt_logprobs_tensors, but this is a simple assignment that shouldn't fail. If an exception occurs here, it likely indicates a deeper issue that should be investigated rather than silently logged. Consider either removing the try-except or being more specific about what exceptions are expected.

Suggested change:
- try:
-     result.prompt_logprobs_tensors = stream_data.prompt_logprobs
- except Exception as e:
-     llm_logger.warning(f"Failed to parse prompt_logprobs from StreamTransferData: {e}")
+ result.prompt_logprobs_tensors = stream_data.prompt_logprobs
if self.tokens_counter[task_id] == 0:
if task.messages is not None:
result.prompt = task.messages
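Relating to the review comment above, a minimal sketch of the same attachment logic with narrower error handling (field names follow this diff; the helper itself is illustrative, not part of token_processor.py):

def attach_logprobs(result, stream_data, logger):
    # Attach sample logprobs and raw prompt-logprob tensors from a stream payload.
    logprobs = getattr(stream_data, "logprobs", None)
    if logprobs is not None:
        try:
            logprobs_list = logprobs.tolists()
            result.outputs.logprob = float(logprobs_list.logprobs[0][0])
            result.outputs.top_logprobs = logprobs_list
        except (AttributeError, IndexError, TypeError, ValueError) as e:
            logger.warning(f"Failed to parse logprobs from StreamTransferData: {e}")
    prompt_logprobs = getattr(stream_data, "prompt_logprobs", None)
    if prompt_logprobs is not None:
        # Plain assignment; per the reviewer, no try/except is needed here.
        result.prompt_logprobs_tensors = prompt_logprobs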
1 change: 1 addition & 0 deletions fastdeploy/worker/output.py
@@ -30,6 +30,7 @@ class Logprob(NamedTuple):
decoded_token: Optional[str] = None


PromptLogprobs = list[dict[int, Logprob] | None]
# [{token_id, logprob}] for tokens sampled from the top-k
SampleLogprobs = list[dict[int, Logprob]]
