Merged
3 changes: 3 additions & 0 deletions docs/usage/environment_variables.md
@@ -156,6 +156,9 @@ environment_variables: dict[str, Callable[[], Any]] = {
# Whether to enable the decode caches requests for preallocating resource
"FD_ENABLE_CACHE_TASK": lambda: os.getenv("FD_ENABLE_CACHE_TASK", "0"),

# Batched token timeout in EP
"FD_EP_BATCHED_TOKEN_TIMEOUT": lambda: float(os.getenv("FD_EP_BATCHED_TOKEN_TIMEOUT", "0.1")),

# Max pre-fetch requests number in PD
"FD_EP_MAX_PREFETCH_TASK_NUM": lambda: int(os.getenv("FD_EP_MAX_PREFETCH_TASK_NUM", "8")),

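For reference, a minimal sketch of how such an entry resolves at runtime — the env table stores one lambda per variable, so (assuming access invokes the lambda, as in the snippet above) exported overrides take effect without code changes. The override value below is illustrative:

import os

# Same accessor pattern as fastdeploy/envs.py: a lambda per variable,
# evaluated when the entry is called rather than at import time.
environment_variables = {
    # Batched token timeout (seconds) for the EP scheduler's batching loop
    "FD_EP_BATCHED_TOKEN_TIMEOUT": lambda: float(os.getenv("FD_EP_BATCHED_TOKEN_TIMEOUT", "0.1")),
}

print(environment_variables["FD_EP_BATCHED_TOKEN_TIMEOUT"]())  # 0.1 (default)
os.environ["FD_EP_BATCHED_TOKEN_TIMEOUT"] = "0.25"             # hypothetical override
print(environment_variables["FD_EP_BATCHED_TOKEN_TIMEOUT"]())  # 0.25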
3 changes: 3 additions & 0 deletions docs/zh/usage/environment_variables.md
@@ -156,6 +156,9 @@ environment_variables: dict[str, Callable[[], Any]] = {
# 是否启用 decode 缓存请求以预分配资源
"FD_ENABLE_CACHE_TASK": lambda: os.getenv("FD_ENABLE_CACHE_TASK", "0"),

# EP 中批处理 token 的超时时间
"FD_EP_BATCHED_TOKEN_TIMEOUT": lambda: float(os.getenv("FD_EP_BATCHED_TOKEN_TIMEOUT", "0.1")),

# PD 中最大预取请求数量
"FD_EP_MAX_PREFETCH_TASK_NUM": lambda: int(os.getenv("FD_EP_MAX_PREFETCH_TASK_NUM", "8")),

53 changes: 17 additions & 36 deletions fastdeploy/engine/common_engine.py
@@ -392,15 +392,6 @@ def _init_worker_monitor_signals(self): # exist_task_signal 用于各worker进
create=True,
)

engine_forward_signal_data = np.zeros([1], dtype=np.int32)
self.engine_forward_signal = IPCSignal(
name="engine_forward_signal",
array=engine_forward_signal_data,
dtype=np.int32,
suffix=current_suffix,
create=True,
)

# worker_live_signal 用于engine感知各worker进程是否存活,记录每个step 时间
worker_healthy_live_recorded_time_array = np.zeros(
shape=[min(self.cfg.worker_num_per_node, self.cfg.parallel_config.tensor_parallel_size)], dtype=np.int32
@@ -1091,29 +1082,26 @@ def _fetch_request():
with self._pause_cond:
self._pause_cond.wait_for(lambda: not self.is_paused)
try:
if not is_fetching:
# Check if the thread pool is still available to avoid submitting tasks to a shutdown thread pool.
try:
if self.engine_worker_queue.exist_tasks():
time.sleep(0.001)
continue
if self.cfg.scheduler_config.splitwise_role != "mixed":
if not is_fetching:
is_fetching = True
get_request_pool.submit(_fetch_request)

🔴 Bug: in non-mixed mode, get_request_pool.submit(_fetch_request) lacks a RuntimeError catch.

The mixed-mode else branch keeps the full try/except handling (exiting gracefully when the thread pool shuts down), but the non-mixed path does not. When the engine shuts down, ThreadPoolExecutor raises RuntimeError: cannot schedule new futures after shutdown, which will crash the scheduler thread outright.

Suggested fix:

if self.cfg.scheduler_config.splitwise_role != "mixed":
    if not is_fetching:
        try:
            is_fetching = True
            get_request_pool.submit(_fetch_request)
        except RuntimeError as e:
            if "shutdown" in str(e):
                self.llm_logger.info("Thread pool shutdown detected, exiting scheduler loop")
                break
            else:
                raise

except RuntimeError as e:
if "shutdown" in str(e):
self.llm_logger.info("Thread pool shutdown detected, exiting scheduler loop")
break
else:
raise
if self.cfg.scheduler_config.splitwise_role != "mixed":
# Continue preprocessing incoming requests and accumulating them in the queue when forward pass not finished.
# Once the forward pass finishes, these accumulated requests can be scheduled in larger,
# more efficient batches.
if self.engine_worker_queue.exist_tasks() or self.engine_forward_signal.value[0] != 0:
time.sleep(0.001)
continue

else:
# In mixed, todo: optimize cache swap, to decouple swap from scheduler
if self.engine_worker_queue.exist_tasks():
time.sleep(0.001)
continue
if len(self.resource_manager.waiting) == 0 and (not is_fetching):
# Check if the thread pool is still available to avoid submitting tasks to a shutdown thread pool.
try:
is_fetching = True
get_request_pool.submit(_fetch_request)
except RuntimeError as e:
if "shutdown" in str(e):
self.llm_logger.info("Thread pool shutdown detected, exiting scheduler loop")
break
else:
raise

if hasattr(self.resource_manager, "scheduler_unhandled_request_num"):
self.resource_manager.scheduler_unhandled_request_num = self._get_scheduler_unhandled_request_num()
@@ -1178,13 +1166,6 @@ def _fetch_request():
elif not task.has_been_preempted_before:
task.metrics.inference_start_time = time.time()
self.engine_worker_queue.put_tasks((batch_request, self.resource_manager.real_bsz))

else:
# When there are no actual tasks to schedule, send an empty task batch to EP workers.
# This helps EP workers barrier for syncing tasks not hang.
if self.cfg.parallel_config.enable_expert_parallel:
self.engine_worker_queue.put_tasks(
(batch_request, self.resource_manager.real_bsz)
) # Empty (as idle tasks for ep)

# 4. Response error tasks
if error_tasks:
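To make the idle-batch branch above concrete: in expert parallelism every worker blocks on a collective barrier before each step, so a rank that never receives a (possibly empty) batch would stall all the others. A self-contained sketch of that pattern — all names here are hypothetical, with threads standing in for EP ranks:

import queue
import threading

NUM_EP_WORKERS = 2
step_barrier = threading.Barrier(NUM_EP_WORKERS)

def ep_worker(rank: int, inbox: queue.Queue) -> None:
    while True:
        batch, real_bsz = inbox.get()  # blocks until the scheduler sends something
        step_barrier.wait()            # every rank must arrive, even with an empty batch
        if batch is None:              # shutdown sentinel
            return
        # ... run one forward step over `batch` (omitted) ...

inboxes = [queue.Queue() for _ in range(NUM_EP_WORKERS)]
workers = [threading.Thread(target=ep_worker, args=(r, inboxes[r])) for r in range(NUM_EP_WORKERS)]
for w in workers:
    w.start()

# Scheduler side: only rank 0 has real work. Sending rank 1 an empty batch
# keeps the barrier from hanging, mirroring the put_tasks call above.
inboxes[0].put((["req1"], 1))
inboxes[1].put(([], 0))  # idle keepalive batch

for inbox in inboxes:
    inbox.put((None, 0))  # shut both workers down
for w in workers:
    w.join()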
2 changes: 2 additions & 0 deletions fastdeploy/envs.py
@@ -143,6 +143,8 @@ def _validate_split_kv_size(value: int) -> int:
"FD_ZMQ_CONTROL_CMD_SERVER_PORTS": lambda: os.getenv("FD_ZMQ_CONTROL_CMD_SERVER_PORTS", "8202"),
# Whether to enable the decode caches requests for preallocating resource
"FD_ENABLE_CACHE_TASK": lambda: os.getenv("FD_ENABLE_CACHE_TASK", "0"),
# Batched token timeout in EP
"FD_EP_BATCHED_TOKEN_TIMEOUT": lambda: float(os.getenv("FD_EP_BATCHED_TOKEN_TIMEOUT", "0.1")),
# Max pre-fetch requests number in PD
"FD_EP_MAX_PREFETCH_TASK_NUM": lambda: int(os.getenv("FD_EP_MAX_PREFETCH_TASK_NUM", "8")),
# Enable or disable model caching.
55 changes: 44 additions & 11 deletions fastdeploy/scheduler/dp_scheduler.py
@@ -24,7 +24,7 @@
from fastdeploy.logger.request_logger import RequestLogLevel, log_request
from fastdeploy.scheduler.data import ScheduledResponse
from fastdeploy.scheduler.local_scheduler import LocalScheduler
from fastdeploy.utils import get_logger
from fastdeploy.utils import envs, get_logger


class DPLocalScheduler(LocalScheduler):
@@ -136,19 +136,52 @@ def get_requests(
Returns:
List of Request objects ready for processing
"""
# DP scheduler is used in V1, there is no need to manage request fetching in the scheduler, resource_manager_v1 will do that.
if available_blocks <= reserved_output_blocks or batch < 1:
self.scheduler_logger.debug(
f"Scheduler's resource are insufficient: available_blocks={available_blocks} "
f"reserved_output_blocks={reserved_output_blocks} batch={batch} "
f"max_num_batched_tokens={max_num_batched_tokens}"
)
return []
required_total_blocks = 0
current_prefill_tokens = 0
start_batch_time = time.time()
requests: List[Request] = []

with self.requests_not_empty:
batch_ids = self.requests_not_empty.wait_for(
lambda: self.ids[self.ids_read_cursor : self.ids_read_cursor + 1],
0.005,
)
if batch_ids:
for request_id in batch_ids:
request = self.requests[request_id]
requests.append(request.raw)
self.ids_read_cursor += 1
while True:
batch_ids = self.requests_not_empty.wait_for(
lambda: self.ids[self.ids_read_cursor : self.ids_read_cursor + batch],
0.005,
)
if batch_ids:
for request_id in batch_ids:
request = self.requests[request_id]
required_input_blocks = self.calc_required_blocks(request.prompt_tokens_ids_len, block_size)
current_prefill_tokens += request.prompt_tokens_ids_len

🔴 Bug: current_prefill_tokens and required_total_blocks are accumulated *before* the request is rejected, so when required_total_blocks > available_blocks triggers the break, both counters already include the rejected request's data.

On the next iteration of the outer while loop, wait_for returns the same batch_ids (the cursor has not advanced), and the for loop accumulates the counters again, which means:

  1. current_prefill_tokens is inflated and can spuriously trip the outer current_prefill_tokens > max_num_batched_tokens check, exiting the loop early and skipping smaller requests (such as req3) that could still have been scheduled.
  2. The outer loop can then only exit via the timeout (FD_EP_BATCHED_TOKEN_TIMEOUT = 0.1 s), wasting wait time.

Suggested fix: perform the check before accumulating, not after:

if required_total_blocks + required_input_blocks + reserved_output_blocks > available_blocks:
    break  # check first; accumulate only after the check passes
current_prefill_tokens += request.prompt_tokens_ids_len
required_total_blocks += required_input_blocks + reserved_output_blocks

required_total_blocks += required_input_blocks + reserved_output_blocks
if required_total_blocks > available_blocks:
break

requests.append(request.raw)
self.ids_read_cursor += 1
start_batch_time = time.time()
if current_prefill_tokens > max_num_batched_tokens:
break
if len(requests) >= batch:
break
if (
(current_prefill_tokens > max_num_batched_tokens)
or (len(requests) >= batch)
or (time.time() - start_batch_time > envs.FD_EP_BATCHED_TOKEN_TIMEOUT)
):
break

if batch_ids:
if len(batch_ids) > 0 and len(requests) == 0:
self.scheduler_logger.debug(
f"Scheduler has put all just-pulled request into the queue: {len(batch_ids)}"

❓ Question: the log message says the opposite of what the condition means.

The trigger condition here is len(batch_ids) > 0 and len(requests) == 0, i.e. request IDs were pulled but none of them were enqueued (all were rejected for insufficient resources). Yet the log reads "Scheduler has put all just-pulled request into the queue", which can send an investigation in the wrong direction.

Suggested change:

self.scheduler_logger.debug(
    f"No requests could be scheduled due to insufficient resources, pending={len(batch_ids)}"
)

)

if len(requests) > 0:
log_request(
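For intuition about the new get_requests flow: it accumulates pulled requests until a token budget, a batch cap, or the FD_EP_BATCHED_TOKEN_TIMEOUT window is exhausted. A minimal, self-contained sketch of that collection loop — function and field names here are illustrative, not the scheduler's actual API:

import time
from typing import Callable, List, Optional

BATCH_TIMEOUT_S = 0.1  # stand-in for envs.FD_EP_BATCHED_TOKEN_TIMEOUT

def collect_batch(pull: Callable[[], Optional[dict]], max_tokens: int, max_batch: int) -> List[dict]:
    """Accumulate requests until a token budget, batch cap, or timeout is hit."""
    requests: List[dict] = []
    tokens = 0
    start = time.time()
    while True:
        req = pull()  # returns the next pending request, or None if none is waiting
        if req is not None:
            tokens += req["prompt_len"]
            requests.append(req)
        if tokens > max_tokens or len(requests) >= max_batch or time.time() - start > BATCH_TIMEOUT_S:
            return requests

# Toy source: two small requests, then nothing further arrives.
pending = [{"id": "a", "prompt_len": 8}, {"id": "b", "prompt_len": 8}]
batch = collect_batch(lambda: pending.pop(0) if pending else None, max_tokens=1024, max_batch=4)
print([r["id"] for r in batch])  # ['a', 'b'], returned once the timeout window closes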
5 changes: 1 addition & 4 deletions fastdeploy/splitwise/internal_adapter_utils.py
@@ -53,9 +53,6 @@ def _get_current_server_info(self):
available_batch_size = min(self.cfg.max_prefill_batch, self.engine.resource_manager.available_batch())

available_block_num = self.engine.resource_manager.available_block_num()
unhandled_request_num = self.engine.scheduler.get_unhandled_request_num()
if envs.ENABLE_V1_KVCACHE_SCHEDULER:
unhandled_request_num = max(unhandled_request_num, len(self.engine.resource_manager.waiting))
server_info = {
"splitwise_role": self.cfg.scheduler_config.splitwise_role,
"block_size": int(self.cfg.cache_config.block_size),
@@ -65,7 +62,7 @@ def _get_current_server_info(self):
"available_resource": float(1.0 * available_block_num / self.cfg.cache_config.total_block_num),
"max_batch_size": int(available_batch_size),
"max_input_token_num": self.cfg.model_config.max_model_len,
"unhandled_request_num": unhandled_request_num,
"unhandled_request_num": self.engine.scheduler.get_unhandled_request_num(),
"available_batch": int(self.engine.resource_manager.available_batch()),
}
return server_info
38 changes: 3 additions & 35 deletions fastdeploy/worker/worker_process.py
@@ -291,20 +291,6 @@ def init_health_status(self) -> None:
create=False,
)

# init engine forward signal
# If engine is being forward, engine_forward_signal_data should be 1.
# If engine is out of forward, engine_forward_signal_data should be 0.
# In pd disaggregation + EP parallel, only when engine is out of forward, scheduler send next batch to worker.
# When engine is out of forward, engine_forward_signal_data must be 0, otherwise scheduler will not schedule next batch.
engine_forward_signal_data = np.zeros([1], dtype=np.int32)
self.engine_forward_signal = IPCSignal(
name="engine_forward_signal",
array=engine_forward_signal_data,
dtype=np.int32,
suffix=self.parallel_config.local_engine_worker_queue_port,
create=False,
)

def update_weights_from_tensor(self, mmap_infos):
"""
update_weights_from_tensor
@@ -473,6 +459,9 @@ def event_loop_normal(self) -> None:
# TODO: Unify status variables model_weights_status (shared memory) and model_weights_signal (numpy array) to one
self.model_weights_signal = np.zeros([1], dtype=np.int32)
while True:
# run eplb
self._run_eplb(tp_rank)

if self.fd_config.load_config.dynamic_load_weight and not envs.FD_ENABLE_V1_UPDATE_WEIGHTS:
self.model_weights_signal[0] = int(self.model_weights_status.value[0])
if self.ranks > 1:
@@ -551,23 +540,12 @@ def event_loop_normal(self) -> None:

if self._get_exist_task_flag():
logger.debug(f"Rank: {self.local_rank} Detected new requests.")
self.engine_forward_signal.value[0] = 1
tasks, read_finish = self.task_queue.get_tasks()
# Only one of all tp_size client will get read_finish == True.
if read_finish:
self._update_exist_task_flag(False)
self._tp_barrier_wait() if tp_size > 1 else None

# In EP parallel(corresponing to dp attention), we need to barrier for prefill to prevent data imbalance due to inconsistent data arrival.
# Only EP + DP prefill should barrier for data arrival.
# In mixed mode and decoder in D, we should not barrier to influence decoding.
if self.parallel_config.use_ep and self.scheduler_config.splitwise_role == "prefill":
paddle.distributed.barrier(self.parallel_config.ep_group)

assert (
len(tasks) > 0
), f"task_queue.get_tasks() should contain at least one tuple, [([req1, ...] ,real_bsz)], but got len(tasks)={len(tasks)}"

batch_request, control_reqs, max_occupied_batch_index = BatchRequest.from_tasks(tasks)

if len(control_reqs) > 0:
@@ -594,12 +572,6 @@ def event_loop_normal(self) -> None:

# Process prefill inputs
self.worker.preprocess_new_task(batch_request, max_occupied_batch_index)
else:
if self.scheduler_config.splitwise_role == "prefill":
if tp_size > 1:
# Synchronize the signal for other workers
self._tp_barrier_wait()
continue

# Let the ep group run control method synchronically
if envs.FD_ENABLE_V1_UPDATE_WEIGHTS and self.parallel_config.use_ep:
Expand All @@ -614,7 +586,6 @@ def event_loop_normal(self) -> None:
and not self.worker.model_runner.not_need_stop()
):
self._tp_barrier_wait() if tp_size > 1 else None
self.engine_forward_signal.value[0] = 0
time.sleep(0.001)
continue

@@ -637,9 +608,6 @@ def event_loop_normal(self) -> None:
if not envs.ENABLE_V1_KVCACHE_SCHEDULER:
self.exist_prefill_task_signal.value[0] = self.worker.exist_prefill()
logger.debug(f"execute model cost: {time.time()-start_execute_time:.5f} s")
# run eplb
self._run_eplb(tp_rank)
self.engine_forward_signal.value[0] = 0

if (
not self.parallel_config.use_ep
37 changes: 18 additions & 19 deletions tests/ci_use/metrics/test_metrics.py
@@ -214,29 +214,28 @@ def test_metrics_with_clear_and_reset():
"""
Test the metrics monitoring endpoint.
"""
pass # not stable, uncomment after bug fix
# metrics_url = f"http://0.0.0.0:{FD_METRICS_PORT}/metrics"
metrics_url = f"http://0.0.0.0:{FD_METRICS_PORT}/metrics"

# async_concurrency(n=10)
async_concurrency(n=10)

# time.sleep(0.3)
time.sleep(0.3)

# ===== clear_load_weight =====
# clear_url = f"http://0.0.0.0:{FD_API_PORT}/clear_load_weight"
# print("Calling clear_load_weight...")
# r = requests.get(clear_url, timeout=30)
# assert r.status_code == 200, f"clear_load_weight failed: {r.status_code}"

# metrics = get_metrics_dict(metrics_url)
# running = metrics["fastdeploy:num_requests_running"]
# waiting = metrics["fastdeploy:num_requests_waiting"]

# print(
# "ASSERT after the clear_load_weight operation, the value is 0 (Request interruption stopped inference, and related requests were cleared):",
# running,
# "waiting:",
# waiting,
# )
clear_url = f"http://0.0.0.0:{FD_API_PORT}/clear_load_weight"
print("Calling clear_load_weight...")
r = requests.get(clear_url, timeout=30)
assert r.status_code == 200, f"clear_load_weight failed: {r.status_code}"

metrics = get_metrics_dict(metrics_url)
running = metrics["fastdeploy:num_requests_running"]
waiting = metrics["fastdeploy:num_requests_waiting"]

print(
"ASSERT after the clear_load_weight operation, the value is 0 (Request interruption stopped inference, and related requests were cleared):",
running,
"waiting:",
waiting,
)
# assert running == 0 and waiting == 0, "Expected both running and waiting to be 0 after clear_load_weight"


12 changes: 3 additions & 9 deletions tests/engine/test_common_engine.py
@@ -1477,9 +1477,7 @@ def test_schedule_request_to_worker_v1_decode_preempted_and_errors(self):
task.metrics.scheduler_recv_req_time = time.time()

eng.scheduler = Mock(get_requests=Mock(return_value=[]), put_results=Mock())
eng.engine_worker_queue = Mock(
exist_tasks=Mock(return_value=False), put_tasks=Mock(), num_tasks=Mock(return_value=0)
)
eng.engine_worker_queue = Mock(exist_tasks=Mock(return_value=False), put_tasks=Mock())
eng._send_error_response = Mock()

eng.resource_manager = self._make_v1_decode_rm(eng, ([task], [("rid_x", None), ("rid_y", "bad")]))
@@ -1513,9 +1511,7 @@ def test_schedule_request_to_worker_v1_decode_prefill_task_path(self):
task.metrics.scheduler_recv_req_time = time.time()

eng.scheduler = Mock(get_requests=Mock(return_value=[]), put_results=Mock())
eng.engine_worker_queue = Mock(
exist_tasks=Mock(return_value=False), put_tasks=Mock(), num_tasks=Mock(return_value=0)
)
eng.engine_worker_queue = Mock(exist_tasks=Mock(return_value=False), put_tasks=Mock())

eng.resource_manager = self._make_v1_decode_rm(eng, ([task], []))

@@ -1546,9 +1542,7 @@ def test_schedule_request_to_worker_v1_error_task_none_skips_send(self):
task.metrics.scheduler_recv_req_time = time.time()

eng.scheduler = Mock(get_requests=Mock(return_value=[]), put_results=Mock())
eng.engine_worker_queue = Mock(
exist_tasks=Mock(return_value=False), put_tasks=Mock(), num_tasks=Mock(return_value=0)
)
eng.engine_worker_queue = Mock(exist_tasks=Mock(return_value=False), put_tasks=Mock())
eng._send_error_response = Mock()

eng.resource_manager = self._make_v1_decode_rm(eng, ([task], [("rid_none", None)]))
26 changes: 26 additions & 0 deletions tests/scheduler/test_dp_scheduler.py
@@ -415,6 +415,32 @@ def test_recycle_expired_requests(self, mock_time):
self.assertEqual(scheduler.ids, ["fresh_req"])
self.assertEqual(scheduler.ids_read_cursor, 1)

def test_get_requests_insufficient_resources(self):
"""Test getting requests when resources are insufficient."""
mock_logger.reset_mock()

# Test with insufficient blocks - mock the condition variable to avoid threading issues
with patch.object(self.scheduler, "requests_not_empty"):
requests = self.scheduler.get_requests(
available_blocks=5, block_size=16, reserved_output_blocks=10, max_num_batched_tokens=1024, batch=1
)

self.assertEqual(requests, [])
# The logger should have been called for insufficient resources
self.assertTrue(mock_logger.debug.called)
# Check the message contains expected content
call_args = mock_logger.debug.call_args[0][0]
self.assertIn("insufficient", call_args.lower())

def test_get_requests_insufficient_batch(self):
"""Test getting requests when batch size is insufficient."""
with patch.object(self.scheduler, "requests_not_empty"):
requests = self.scheduler.get_requests(
available_blocks=20, block_size=16, reserved_output_blocks=10, max_num_batched_tokens=1024, batch=0
)

self.assertEqual(requests, [])

@patch("time.time")
@patch.object(dp_scheduler_module, "envs")
def test_get_requests_no_requests_available(self, mock_envs, mock_time):