Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions fastdeploy/engine/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,11 @@

class RequestStatus(Enum):
WAITING = 0
RUNNING = 1
PREEMPTED = 2
FINISHED = 3
ABORT = 4
RUNNING_PREFILL = 1
RUNNING_DECODE = 2
PREEMPTED = 3
FINISHED = 4
ABORT = 5


class RequestType(Enum):
Expand Down
211 changes: 146 additions & 65 deletions fastdeploy/engine/sched/resource_manager_v1.py

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions fastdeploy/envs.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,11 @@ def _validate_split_kv_size(value: int) -> int:
# Whether to enable low latency in mixed scenario
"FD_XPU_ENABLE_MIXED_EP_MODE": lambda: bool(int(os.getenv("FD_XPU_ENABLE_MIXED_EP_MODE", "0"))),
# Reserve output blocks for decoding requests when scheduling new prefill requests
"FD_INIT_NEW_TOKEN_RATIO": lambda: float(os.getenv("FD_INIT_NEW_TOKEN_RATIO", "0.7")),
"FD_MIN_NEW_TOKEN_RATIO": lambda: float(os.getenv("FD_MIN_NEW_TOKEN_RATIO", "0.1")),
"FD_NEW_TOKEN_RATIO_DECAY": lambda: float(os.getenv("FD_NEW_TOKEN_RATIO_DECAY", "0.001")),
"FD_CLIP_MAX_NEW_TOKENS": lambda: int(os.getenv("FD_CLIP_MAX_NEW_TOKENS", "4096")),
# Legacy reserve-block env vars (kept for backwards compatibility; they apply only to the legacy fixed-block strategy, which is the fallback when FD_USE_NEW_TOKEN_RATIO_RESERVE=0)
"FD_RESERVE_OUTPUT_BLOCK_NUM_FOR_DECODE_WHEN_SCHEDULE_NEW_PREFILL": lambda: int(
os.getenv("FD_RESERVE_OUTPUT_BLOCK_NUM_FOR_DECODE_WHEN_SCHEDULE_NEW_PREFILL", "16")
),
Expand All @@ -224,6 +229,9 @@ def _validate_split_kv_size(value: int) -> int:
"FD_RESERVE_MIN_OUTPUT_BLOCK_NUM_FOR_DECODE_WHEN_SCHEDULE_NEW_PREFILL": lambda: int(
os.getenv("FD_RESERVE_MIN_OUTPUT_BLOCK_NUM_FOR_DECODE_WHEN_SCHEDULE_NEW_PREFILL", "0")
),
# When True, use per-request new_token_ratio to estimate reserved blocks (SGLang-style).
# When False, fall back to the legacy fixed-block reservation strategy.
"FD_USE_NEW_TOKEN_RATIO_RESERVE": lambda: bool(int(os.getenv("FD_USE_NEW_TOKEN_RATIO_RESERVE", "1"))),
# Timeout for worker process health check in seconds
"FD_WORKER_ALIVE_TIMEOUT": lambda: int(os.getenv("FD_WORKER_ALIVE_TIMEOUT", "30")),
# File path for file storage backend
Expand Down
3 changes: 3 additions & 0 deletions fastdeploy/output/token_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
Request,
RequestMetrics,
RequestOutput,
RequestStatus,
SpeculateMetrics,
)
from fastdeploy.inter_communicator import ZmqIpcServer
Expand Down Expand Up @@ -950,6 +951,8 @@ def _process_batch_output(self):
continue

self.total_step += 1
if task.status == RequestStatus.RUNNING_PREFILL:
task.status = RequestStatus.RUNNING_DECODE
current_time = time.time()
trace_carrier = None
if self.tokens_counter[task_id] == 0:
Expand Down
4 changes: 2 additions & 2 deletions tests/engine/test_resource_manager_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,15 @@ def test_preempted_all_with_normal_requests(self):
req1 = Mock(spec=Request)
req1.request_id = "req1"
req1.use_extend_tables = False
req1.status = RequestStatus.RUNNING
req1.status = RequestStatus.RUNNING_DECODE
req1.block_tables = [1, 2, 3]
req1.num_cached_blocks = 0
req1.idx = 0

req2 = Mock(spec=Request)
req2.request_id = "req2"
req2.use_extend_tables = False
req2.status = RequestStatus.RUNNING
req2.status = RequestStatus.RUNNING_DECODE
req2.block_tables = [4, 5]
req2.num_cached_blocks = 0
req2.idx = 1
Expand Down
3 changes: 2 additions & 1 deletion tests/output/test_process_batch_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import paddle

from fastdeploy.engine.request import RequestMetrics, RequestOutput
from fastdeploy.engine.request import RequestMetrics, RequestOutput, RequestStatus
from fastdeploy.output.token_processor import TokenProcessor

paddle.set_device("cpu")
Expand Down Expand Up @@ -82,6 +82,7 @@ def __init__(self):
self.ic_req_data = {}
self.prompt_token_ids_len = 0
self.trace_carrier = {}
self.status = RequestStatus.RUNNING_DECODE

now = time.time()
self.metrics = RequestMetrics(
Expand Down
15 changes: 14 additions & 1 deletion tests/output/test_token_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@
import pytest

from fastdeploy import envs
from fastdeploy.engine.request import Request, RequestMetrics, RequestOutput
from fastdeploy.engine.request import (
Request,
RequestMetrics,
RequestOutput,
RequestStatus,
)
from fastdeploy.output import token_processor
from fastdeploy.output.token_processor import (
MAX_BSZ,
Expand Down Expand Up @@ -671,6 +676,7 @@ def test_process_batch_output_consumes_tokens_and_finishes_task():
prompt_token_ids_len=0,
num_total_tokens=1,
block_tables=[1],
status=RequestStatus.RUNNING_DECODE,
)
task.trace_carrier = None
task.get = lambda key, default=None: getattr(task, key, default)
Expand Down Expand Up @@ -708,6 +714,7 @@ def test_process_batch_output_logprob_records_topk_and_caching():
num_total_tokens=1,
block_tables=[1],
get=lambda key, default=None: None,
status=RequestStatus.RUNNING_DECODE,
)
task.trace_carrier = None
rm.tasks_list[0] = task
Expand Down Expand Up @@ -784,6 +791,7 @@ def test_process_batch_output_speculative_recovery_stop_finishes():
num_total_tokens=1,
block_tables=[1],
get=lambda key, default=None: None,
status=RequestStatus.RUNNING_DECODE,
)
task.trace_carrier = None
rm.tasks_list[0] = task
Expand Down Expand Up @@ -911,6 +919,7 @@ def test_process_batch_output_speculative_logprob_targets_topk_scores():
num_total_tokens=1,
block_tables=[1],
get=lambda key, default=None: None,
status=RequestStatus.RUNNING_DECODE,
)
task.trace_carrier = None
rm.tasks_list[0] = task
Expand Down Expand Up @@ -1076,6 +1085,7 @@ def test_process_batch_output_records_second_decode_token():
num_total_tokens=1,
block_tables=[1],
get=lambda key, default=None: None,
status=RequestStatus.RUNNING_DECODE,
)
task.trace_carrier = None
task.metrics.inference_start_time = time.time()
Expand Down Expand Up @@ -1145,6 +1155,7 @@ def test_process_batch_output_prefill_sets_draft_tokens():
num_total_tokens=1,
block_tables=[1],
get=lambda key, default=None: None,
status=RequestStatus.RUNNING_DECODE,
)
task.trace_carrier = None
rm.tasks_list[0] = task
Expand Down Expand Up @@ -1186,6 +1197,7 @@ def test_process_batch_output_logs_recovery_stop_for_non_speculative():
prompt_token_ids_len=0,
num_total_tokens=1,
block_tables=[1],
status=RequestStatus.RUNNING_DECODE,
)
task.trace_carrier = None
task.get = lambda k, d=None: getattr(task, k, d)
Expand Down Expand Up @@ -1223,6 +1235,7 @@ def test_process_batch_output_sets_multimodal_token_counts():
num_total_tokens=1,
block_tables=[1],
multimodal_inputs={"num_input_image_tokens": 4, "num_input_video_tokens": 5},
status=RequestStatus.RUNNING_DECODE,
)
task.trace_carrier = None
task.get = lambda key, default=None: getattr(task, key, default)
Expand Down
5 changes: 3 additions & 2 deletions tests/v1/test_resource_manager_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ def test_schedule_decode_and_waiting_prefill(self):

decode_request = _make_request(request_id="req-decode", prompt_token_ids=[1, 2])
decode_request.idx = 0
decode_request.status = RequestStatus.RUNNING
decode_request.status = RequestStatus.RUNNING_DECODE
decode_request.num_computed_tokens = 2
decode_request.output_token_ids = [99]
decode_request.block_tables = [1]
Expand All @@ -665,7 +665,7 @@ def test_schedule_decode_and_waiting_prefill(self):
self.assertGreaterEqual(len(scheduled_reqs), 2)
self.assertEqual(error_reqs, [])
self.assertIn(decode_request.request_id, manager.using_extend_tables_req_id)
self.assertEqual(waiting_request.status, RequestStatus.RUNNING)
self.assertEqual(waiting_request.status, RequestStatus.RUNNING_PREFILL)

def test_trigger_preempt_records_tasks(self):
manager = _build_manager()
Expand All @@ -678,6 +678,7 @@ def test_trigger_preempt_records_tasks(self):
preempted_req = _make_request(request_id="req-preempted")
preempted_req.idx = 0
preempted_req.use_extend_tables = False
preempted_req.status = RequestStatus.RUNNING_DECODE
request = _make_request(request_id="req-target")
request.idx = 1
manager.running = [request, preempted_req]
Expand Down
Loading