Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 28 additions & 30 deletions fastdeploy/engine/common_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ def __init__(self, cfg: FDConfig, start_queue=True, use_async_llm=False):

self.is_paused = False # pause request generation
self._pause_cond = threading.Condition()
self._rejecting_new_requests = False # blocks new requests during abort drain

self._ctrl_output_queues = {}
self._ctrl_response_mailboxes = collections.defaultdict(collections.OrderedDict)
Expand Down Expand Up @@ -1325,7 +1326,7 @@ def _insert_zmq_task_to_scheduler(self):
trace_print(LoggingEventName.REQUEST_QUEUE_START, data["request_id"], data.get("user", ""))
self.llm_logger.debug(f"Receive request from api server: {request}")

if self.is_paused:
if self.is_paused or self._rejecting_new_requests:
self.llm_logger.warning(f"Engine is paused, drop request: {request}")
self._send_error_response(
request.request_id,
Expand Down Expand Up @@ -1445,39 +1446,20 @@ def _control_pause(self, control_request: ControlRequest):
if self.is_paused:
self.llm_logger.info("Engine is already paused, no need to pause again.")
return
self.is_paused = True
self._rejecting_new_requests = True
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❓ 疑问 _rejecting_new_requests = Trueis_paused = True 之间存在异常安全性隐患。

resource_manager.add_abort_req_ids()log_status() 抛出异常,此时:

  • _rejecting_new_requests = True(已设置)
  • is_paused = False(尚未设置)

_control_resume()is_paused=False 时会提前返回,不会重置 _rejecting_new_requests,导致引擎永久拒绝所有新请求。建议使用 try/finally 保证异常时清理状态:

try:
    ...
except Exception:
    with self._pause_cond:
        self._rejecting_new_requests = False
    raise

self.resource_manager.log_status()

self.llm_logger.info("Abort running requests.")
# Scheduling loop picks them up via _trigger_abort when they enter resource_manager
all_req_ids = list(set(self.resource_manager.requests.keys()) | set(self.scheduler.requests.keys()))
self.llm_logger.info(f"Pause: aborting {len(all_req_ids)} total requests.")
if all_req_ids:
self.resource_manager.add_abort_req_ids(all_req_ids)
self._wait_inflight_drained()

self.resource_manager.log_status()
# preempted all running reqs. preempted reqs will be append to ResourceManager.waiting queue
timeout, count = 60, 0
while self.engine_worker_queue.exist_tasks():
time.sleep(0.001)
count += 1
if count >= timeout * 1000:
break
if count >= timeout * 1000:
error_msg = f"Emptying engine worker queue timed out after {timeout} seconds, worker may hanged!"
self.llm_logger.error(error_msg)
raise Exception(error_msg)
running_reqs = self.resource_manager.preempted_all()
if len(running_reqs) > 0:
self.llm_logger.info(f"Total {len(running_reqs)} requests need to be aborted.")
self.resource_manager.get_real_bsz()
self.engine_worker_queue.put_tasks((running_reqs, self.resource_manager.real_bsz))
self.resource_manager.wait_worker_inflight_requests_finish(timeout=60)
# self.engine_worker_queue.clear_data()
self.token_processor.clear_data()
with self._pause_cond:
self.is_paused = True

This comment was marked as outdated.

self.resource_manager.log_status()

# abort inflight requests to user
inflight_requests = self.scheduler.get_inflight_requests()
self.llm_logger.info(f"Abort inflight requests (total {len(inflight_requests)}).")
for req in inflight_requests:
self._send_error_response(req.request_id, "Request is aborted since engine is paused.")
self.scheduler.reset()

if envs.ENABLE_V1_KVCACHE_MANAGER:
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❓ 疑问 PR 执行流描述了 scheduler.reset(),但此处代码缺失

PR 描述的执行流程末尾明确写有 scheduler.reset() + cache reset,但实际只有 cache reset,self.scheduler.reset() 未被调用。

查看 local_scheduler.reset()(line 115-119)的实现,它会清空:

  • ids_read_cursor(重置为 0)
  • ids(所有历史请求 ID 列表)
  • requests(待处理请求字典)
  • responses(已接收响应字典)

_wait_inflight_drained() 只检查 requests 和 abort 队列为空,不检查 responses。若 scheduler.responses 中有残留未消费数据,resume 后可能产生状态不一致。

请确认:省略 scheduler.reset() 是有意为之(abort pipeline 的正常路径已保证 responses 被消费完),还是遗漏实现?

self.resource_manager.cache_manager.reset_cache()
else:
Expand All @@ -1500,6 +1482,21 @@ def _control_pause(self, control_request: ControlRequest):
self.llm_logger.info("Successfully paused request generation.")
return None

def _wait_inflight_drained(self):
"""
Wait until resource_manager.requests is completely empty.
No timeout — abort pipeline will complete. Aligned with SGLang's poll-until-drained.

This comment was marked as outdated.

"""
start_time = time.time()
while (
Comment thread
jackyYang6 marked this conversation as resolved.
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 建议 当前 _wait_inflight_drained() 无任何超时机制,若 abort pipeline 因 bug 或硬件异常卡住,调用 pause 的线程将永久阻塞,引擎无法恢复。

旧代码对 worker queue 有 60s 超时并抛出异常,是有意为之的安全网。建议至少引入可配置的超时参数:

def _wait_inflight_drained(self, timeout: float = 120.0):
    start_time = time.time()
    while (
        self.resource_manager.requests
        or self.scheduler.requests
        or self.resource_manager.waiting_abort_req_id_set
        or self.resource_manager.to_be_aborted_req_id_set
    ):
        elapsed = time.time() - start_time
        if elapsed > timeout:
            raise TimeoutError(f"_wait_inflight_drained timed out after {timeout}s")
        time.sleep(0.005)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 建议 _wait_inflight_drained() 无超时机制,可能导致永久阻塞

原代码对 worker queue 等待有 60s 超时并 raise Exception,新设计完全去掉了超时保护。注释说 "No timeout — abort pipeline will complete",但若 abort pipeline 因 bug 或异常卡住(如 worker hang、ZMQ 消息丢失),_control_pause() 将永远阻塞,上游 RL 框架调用方无法感知,造成静默挂起。

建议添加兜底超时:

DRAIN_TIMEOUT = 120
start_time = time.time()
while (self.resource_manager.requests or self.scheduler.requests
       or self.resource_manager.waiting_abort_req_id_set
       or self.resource_manager.to_be_aborted_req_id_set):
    if time.time() - start_time > DRAIN_TIMEOUT:
        self.llm_logger.error(f"Drain timed out after {DRAIN_TIMEOUT}s, abort pipeline may have stalled!")
        raise TimeoutError(f"_wait_inflight_drained timed out after {DRAIN_TIMEOUT}s")
    time.sleep(0.005)

self.resource_manager.requests
or self.scheduler.requests
or self.resource_manager.waiting_abort_req_id_set
or self.resource_manager.to_be_aborted_req_id_set
):
time.sleep(0.005)
self.llm_logger.info(f"All inflight requests drained, take time: {time.time() - start_time:.3f} seconds")

def _control_resume(self, control_request: ControlRequest) -> Optional[dict]:
"""Control function for resuming request generation.

Expand All @@ -1515,6 +1512,7 @@ def _control_resume(self, control_request: ControlRequest) -> Optional[dict]:
self.llm_logger.info("Engine is not paused, no need to resume.")
return None
self.is_paused = False
self._rejecting_new_requests = False
self._pause_cond.notify_all()

# resume cache transfer
Expand Down
19 changes: 13 additions & 6 deletions tests/engine/test_common_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -1137,22 +1137,29 @@ def test_control_pause_and_resume_paths(self):
eng = self._make_mixed_engine()
eng.is_paused = False
eng._pause_cond = threading.Condition()
eng.engine_worker_queue = Mock(exist_tasks=Mock(return_value=False), put_tasks=Mock())
eng.engine_worker_queue = Mock(exist_tasks=Mock(return_value=False))
eng.resource_manager = Mock(
preempted_all=Mock(return_value=[Request(request_id="r1", prompt_token_ids=[1], prompt_token_ids_len=1)]),
get_real_bsz=Mock(),
wait_worker_inflight_requests_finish=Mock(),
requests={"r1": Mock(output_token_ids=[1, 2, 3])},
waiting_abort_req_id_set=set(),
to_be_aborted_req_id_set=set(),
add_abort_req_ids=Mock(),
log_status=Mock(),
cache_manager=Mock(reset=Mock()),
real_bsz=1,
)
eng.token_processor = Mock(clear_data=Mock())
eng.scheduler = Mock(get_inflight_requests=Mock(return_value=[]), reset=Mock())
mock_scheduler = Mock(reset=Mock())
mock_scheduler.requests = {}
mock_scheduler.mutex = threading.Lock()
mock_scheduler.responses = {}
mock_scheduler.batch_responses_per_step = []
eng.scheduler = mock_scheduler
eng._send_error_response = Mock()
eng._wait_inflight_drained = Mock()

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 建议 测试中添加了 eng._wait_output_queue_empty = Mock(),并且为 mock_scheduler 设置了 responses={}batch_responses_per_step=[] 等属性(这些字段形似 _wait_output_queue_empty 所需的条件变量),但 _control_pause() 实现中从未定义也未调用该方法。

PR 描述的 Modifications 表格和执行流均明确列出:

_wait_output_queue_empty() — New method: polls until scheduler output queue is consumed (no timeout)

此方法的用途是在 cache reset 前确认所有 ZMQ 响应已发出。请确认:

  1. 该方法是否遗漏了实现?
  2. 或者该步骤留待后续 PR?如是,建议在 PR 描述中说明。

with patch("fastdeploy.engine.common_engine.envs.ENABLE_V1_KVCACHE_SCHEDULER", True):
eng._control_pause(ControlRequest(request_id="ctrl1", method="pause"))
self.assertTrue(eng.is_paused)
eng.resource_manager.add_abort_req_ids.assert_called_once()

eng._control_resume(ControlRequest(request_id="ctrl2", method="resume"))
self.assertFalse(eng.is_paused)
Expand Down
Loading