Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions fastdeploy/engine/common_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -1166,6 +1166,10 @@ def _fetch_request():
except Exception as e:
err_msg = "Error happened while insert task to engine: {}, {}.".format(e, str(traceback.format_exc()))
self.llm_logger.error(err_msg)
# Failed to connect to engine worker queue, retry after 5 seconds
if self.engine_worker_queue.is_broken():
self.llm_logger.error("Failed to connect to engine worker queue, retry after 5 seconds")
time.sleep(5)

def _get_scheduler_unhandled_request_num(self) -> int:
"""
Expand Down
4 changes: 2 additions & 2 deletions fastdeploy/envs.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@ def _validate_split_kv_size(value: int) -> int:
"PREFILL_CONTINUOUS_REQUEST_DECODE_RESOURCES": lambda: int(
os.getenv("PREFILL_CONTINUOUS_REQUEST_DECODE_RESOURCES", "1")
),
"FD_ENABLE_E2W_TENSOR_CONVERT": lambda: int(os.getenv("FD_ENABLE_E2W_TENSOR_CONVERT", "0")),
"FD_ENGINE_TASK_QUEUE_WITH_SHM": lambda: int(os.getenv("FD_ENGINE_TASK_QUEUE_WITH_SHM", "0")),
"FD_ENABLE_E2W_TENSOR_CONVERT": lambda: int(os.getenv("FD_ENABLE_E2W_TENSOR_CONVERT", "1")),
"FD_ENGINE_TASK_QUEUE_WITH_SHM": lambda: int(os.getenv("FD_ENGINE_TASK_QUEUE_WITH_SHM", "1")),
"FD_FILL_BITMASK_BATCH": lambda: int(os.getenv("FD_FILL_BITMASK_BATCH", "4")),
"FD_ENABLE_PDL": lambda: int(os.getenv("FD_ENABLE_PDL", "1")),
"FD_ENABLE_ASYNC_LLM": lambda: int(os.getenv("FD_ENABLE_ASYNC_LLM", "0")),
Expand Down
10 changes: 10 additions & 0 deletions fastdeploy/inter_communicator/engine_worker_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -848,3 +848,13 @@ def cleanup(self):
"""
if self.manager is not None and self.is_server:
self.manager.shutdown()

def is_broken(self):
try:
self.manager.connect()
return False
except (ConnectionRefusedError, ConnectionResetError, BrokenPipeError, EOFError, OSError):
llm_logger.error("Failed to connect to engine worker queue")
return True
except Exception:
return False
35 changes: 35 additions & 0 deletions fastdeploy/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,12 @@ def is_port_available(host, port):
import errno
import socket

# If FD_ENGINE_TASK_QUEUE_WITH_SHM is enabled, then check the file socket is available
if envs.FD_ENGINE_TASK_QUEUE_WITH_SHM:
socket_path = f"/dev/shm/fd_task_queue_{port}.sock"
if not is_file_socket_available(socket_path):
return False

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
try:
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
Expand All @@ -637,6 +643,35 @@ def is_port_available(host, port):
return True


def is_file_socket_available(socket_path):
"""
Check the Unix domain socket (file socket) is available.

Args:
socket_path: Path to the socket file, e.g. /dev/shm/fd_task_queue_8000.sock

Returns:
True if the socket is available (not in use), False otherwise.
"""
import errno
import os
import socket

if not os.path.exists(socket_path):
return True

# File exists, try to connect to see if someone is listening
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
try:
s.connect(socket_path)
return False
except OSError as e:
if e.errno in (errno.ECONNREFUSED, errno.ENOENT):
# Stale socket file: exists but nobody is listening
return True
return False
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 建议 is_file_socket_available() 中,对于非 ECONNREFUSED/ENOENTOSError(如 EACCES 权限拒绝、ECONNABORTED 等)直接返回 False,将导致端口被误判为「不可用」,最终 find_free_ports 可能无法找到可用端口。

建议修复方式:对非预期错误记录日志并返回 True(保守策略,让 TCP 层绑定去兜底),或显式列举应返回 False 的错误码:

except OSError as e:
    if e.errno in (errno.ECONNREFUSED, errno.ENOENT):
        return True
    # 其他 OSError(如 EACCES)视为无法判断,保守返回 True
    llm_logger.warning(f"Unexpected OSError when checking socket {socket_path}: {e}")
    return True



def find_free_ports(
port_range: tuple[int, int] = (8000, 65535),
num_ports: int = 1,
Expand Down
95 changes: 10 additions & 85 deletions tests/ci_use/EB_Lite_with_adapter/test_eblite_serving.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import queue
import shutil
import signal
import socket
import subprocess
import sys
import time
Expand All @@ -30,6 +29,7 @@
sys.path.insert(0, project_root)

from ci_use.EB_Lite_with_adapter.zmq_client import LLMControlClient, LLMReqClient
from e2e.utils.serving_utils import clean_ports, is_port_open

env = os.environ.copy()

Expand Down Expand Up @@ -79,88 +79,6 @@ def zmq_control_client():
return client


def is_port_open(host: str, port: int, timeout=1.0):
"""
Check if a TCP port is open on the given host.
Returns True if connection succeeds, False otherwise.
"""
try:
with socket.create_connection((host, port), timeout):
return True
except Exception:
return False


def kill_process_on_port(port: int):
"""
Kill processes that are listening on the given port.
Uses multiple methods to ensure thorough cleanup.
"""
current_pid = os.getpid()
parent_pid = os.getppid()

# Method 1: Use lsof to find processes
try:
output = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip()
for pid in output.splitlines():
pid = int(pid)
if pid in (current_pid, parent_pid):
print(f"Skip killing current process (pid={pid}) on port {port}")
continue
try:
# First try SIGTERM for graceful shutdown
os.kill(pid, signal.SIGTERM)
time.sleep(1)
# Then SIGKILL if still running
os.kill(pid, signal.SIGKILL)
print(f"Killed process on port {port}, pid={pid}")
except ProcessLookupError:
pass # Process already terminated
except subprocess.CalledProcessError:
pass

# Method 2: Use netstat and fuser as backup
try:
# Find processes using netstat and awk
cmd = f"netstat -tulpn 2>/dev/null | grep :{port} | awk '{{print $7}}' | cut -d'/' -f1"
output = subprocess.check_output(cmd, shell=True).decode().strip()
for pid in output.splitlines():
if pid and pid.isdigit():
pid = int(pid)
if pid in (current_pid, parent_pid):
continue
try:
os.kill(pid, signal.SIGKILL)
print(f"Killed process (netstat) on port {port}, pid={pid}")
except ProcessLookupError:
pass
except (subprocess.CalledProcessError, FileNotFoundError):
pass

# Method 3: Use fuser if available
try:
subprocess.run(f"fuser -k {port}/tcp", shell=True, timeout=5)
except (subprocess.TimeoutExpired, subprocess.CalledProcessError, FileNotFoundError):
pass


def clean_ports():
"""
Kill all processes occupying the ports listed in PORTS_TO_CLEAN.
"""
print(f"Cleaning ports: {PORTS_TO_CLEAN}")
for port in PORTS_TO_CLEAN:
kill_process_on_port(port)

# Double check and retry if ports are still in use
time.sleep(2)
for port in PORTS_TO_CLEAN:
if is_port_open("127.0.0.1", port, timeout=0.1):
print(f"Port {port} still in use, retrying cleanup...")
kill_process_on_port(port)
time.sleep(1)


@pytest.fixture(scope="session", autouse=True)
def setup_and_run_server():
"""
Expand All @@ -170,8 +88,15 @@ def setup_and_run_server():
- Waits for server port to open (up to 30 seconds)
- Tears down server after all tests finish
"""
# 清理/dev/shm中的临时文件
try:
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 建议 rm -rf /dev/shm/* 会清除 /dev/shm 下所有文件,在共享 CI 环境中可能误删其他并发测试或进程(如其他实例的 socket 文件、POSIX 共享内存等),导致不相关任务失败。

建议只清理本次测试已知的文件(例如 fd_task_queue_*.sock),或仅在独占 CI 机器上执行全量清理:

import glob
for f in glob.glob("/dev/shm/fd_task_queue_*.sock"):
    try:
        os.remove(f)
    except Exception:
        pass

subprocess.run("rm -rf /dev/shm/*", shell=True)
print("Successfully cleaned up /dev/shm.")
except Exception as e:
print(f"Failed to cleanup /dev/shm: {e}")

print("Pre-test port cleanup...")
clean_ports()
clean_ports(PORTS_TO_CLEAN)

base_path = os.getenv("MODEL_PATH")
if base_path:
Expand Down Expand Up @@ -236,7 +161,7 @@ def setup_and_run_server():
print("\n===== Post-test server cleanup... =====")
try:
os.killpg(process.pid, signal.SIGTERM)
clean_ports()
clean_ports(PORTS_TO_CLEAN)
print(f"API server (pid={process.pid}) terminated")
except Exception as e:
print(f"Failed to terminate API server: {e}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,25 @@
# limitations under the License.

import os
import signal
import socket
import subprocess
import sys
import time
import traceback

import pytest

from fastdeploy import LLM, SamplingParams

FD_ENGINE_QUEUE_PORT = int(os.getenv("FD_ENGINE_QUEUE_PORT", 8313))
FD_CACHE_QUEUE_PORT = int(os.getenv("FD_CACHE_QUEUE_PORT", 8333))
MAX_WAIT_SECONDS = 60

current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.abspath(os.path.join(current_dir, "..", ".."))
sys.path.insert(0, project_root)
from e2e.utils.serving_utils import (
FD_API_PORT,
FD_CACHE_QUEUE_PORT,
FD_ENGINE_QUEUE_PORT,
clean_ports,
)

def is_port_open(host: str, port: int, timeout=1.0):
"""
Check if a TCP port is open on the given host.
Returns True if connection succeeds, False otherwise.
"""
try:
with socket.create_connection((host, port), timeout):
return True
except Exception:
return False
MAX_WAIT_SECONDS = 60


def format_chat_prompt(messages):
Expand Down Expand Up @@ -74,35 +68,23 @@ def llm(model_path):
"""
Fixture to initialize the LLM model with a given model path
"""
try:
output = subprocess.check_output(f"lsof -i:{FD_ENGINE_QUEUE_PORT} -t", shell=True).decode().strip()
for pid in output.splitlines():
os.kill(int(pid), signal.SIGKILL)
print(f"Killed process on port {FD_ENGINE_QUEUE_PORT}, pid={pid}")
except subprocess.CalledProcessError:
pass
# Clean ports before starting the test
clean_ports()

try:
start = time.time()
llm = LLM(
model=model_path,
tensor_parallel_size=1,
port=FD_API_PORT,
engine_worker_queue_port=FD_ENGINE_QUEUE_PORT,
cache_queue_port=FD_CACHE_QUEUE_PORT,
max_model_len=32768,
quantization="wint8",
logits_processors=["LogitBiasLogitsProcessor"],
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 建议 time.sleep(2) 替换了原有「等待端口就绪」的超时轮询逻辑,在慢速机器或负载高的 CI 环境中 2 秒可能不够,导致后续推理调用失败,出现偶发性测试不稳定(flaky test)。

建议恢复轮询或使用更可靠的健康检查:

wait_start = time.time()
while not is_port_open("127.0.0.1", FD_ENGINE_QUEUE_PORT):
    if time.time() - wait_start > MAX_WAIT_SECONDS:
        pytest.fail(f"Engine did not start within {MAX_WAIT_SECONDS}s")
    time.sleep(1)

)

# Wait for the port to be open
wait_start = time.time()
while not is_port_open("127.0.0.1", FD_ENGINE_QUEUE_PORT):
if time.time() - wait_start > MAX_WAIT_SECONDS:
pytest.fail(
f"Model engine did not start within {MAX_WAIT_SECONDS} seconds on port {FD_ENGINE_QUEUE_PORT}"
)
time.sleep(1)

time.sleep(2)
print(f"Model loaded successfully from {model_path} in {time.time() - start:.2f}s.")
yield llm
except Exception:
Expand Down
59 changes: 59 additions & 0 deletions tests/e2e/utils/serving_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,60 @@ def kill_process_on_port(port: int):
pass


def kill_process_by_unix_socket(
socket_path: str,
force: bool = True,
):
"""
根据 unix socket 文件路径杀掉对应进程
cmd: ss -xlpn | grep /dev/shm/fd_task_queue_8664.sock
Args:
socket_path: 例如 /dev/shm/fd_task_queue_8664.sock
force:
True -> SIGKILL
False -> SIGTERM
Returns:
pid 或 None
"""
try:
output = subprocess.check_output(
["ss", "-xlpn"],
text=True,
)
for line in output.splitlines():
if socket_path not in line:
continue
m = re.search(r"pid=(\d+)", line)
if not m:
continue
pid = int(m.group(1))
os.kill(
pid,
signal.SIGKILL if force else signal.SIGTERM,
)
return pid
except Exception:
pass
return None


def cleanup_unix_socket(socket_path: str):
if not os.path.exists(socket_path):
return
try:
pid = kill_process_by_unix_socket(socket_path)
print(f"Killed process by unix socket: {socket_path}, pid={pid}")
except Exception as e:
print(f"Failed to kill process by unix socket: {socket_path}, error={e}")
finally:
try:
if os.path.exists(socket_path):
os.remove(socket_path)
print(f"Cleaned unix socket: {socket_path}")
except Exception:
pass


def clean_ports(ports=None):
"""
Kill all processes occupying the ports
Expand All @@ -117,6 +171,11 @@ def clean_ports(ports=None):
kill_process_on_port(port)
time.sleep(1)

# Clean unix socket, fd_task_queue_*.sock, for FD_ENGINE_TASK_QUEUE_WITH_SHM = 1
print("Cleaning unix socket")
for port in ports:
cleanup_unix_socket(f"/dev/shm/fd_task_queue_{port}.sock")


def clean(ports=None):
"""
Expand Down
Loading
Loading