From 65a2fccd585c4e53b946d9e00d93b19e1423f2e7 Mon Sep 17 00:00:00 2001 From: liyonghua0910 Date: Tue, 16 Jun 2026 11:30:07 +0000 Subject: [PATCH 1/5] [BugFix] Separate prometheus multiproc dir for single-server multi-dp services --- fastdeploy/engine/common_engine.py | 7 +++ fastdeploy/engine/engine.py | 7 +++ .../entrypoints/openai/multi_api_server.py | 9 +-- .../metrics/prometheus_multiprocess_setup.py | 59 ++++++++++++++----- 4 files changed, 60 insertions(+), 22 deletions(-) diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py index 4997f16da1b..f5d676e22bb 100644 --- a/fastdeploy/engine/common_engine.py +++ b/fastdeploy/engine/common_engine.py @@ -67,6 +67,7 @@ ) from fastdeploy.inter_communicator.fmq import FMQ from fastdeploy.metrics.metrics import main_process_metrics +from fastdeploy.metrics.prometheus_multiprocess_setup import setup_dp_prometheus_dir, get_original_prom_dir from fastdeploy.model_executor.guided_decoding import schema_checker from fastdeploy.plugins.token_processor import load_token_processor_plugins from fastdeploy.spec_decode import SpecMethod @@ -2570,6 +2571,7 @@ def launch_components(self): self.launched_expert_service_signal.value[0] = 1 self.dp_processed = [] self.dp_engine_worker_queue_server = [] + base_prom_dir = get_original_prom_dir() for i in range( 1, self.cfg.parallel_config.data_parallel_size // self.cfg.nnode, @@ -2608,10 +2610,15 @@ def launch_components(self): f"Engine is initialized successfully with {self.cfg.parallel_config.tensor_parallel_size}" + f" data parallel id {i}" ) + if envs.FD_ENABLE_INTERNAL_ADAPTER: + setup_dp_prometheus_dir(i, base_prom_dir) self.dp_processed[-1].start() while self.launched_expert_service_signal.value[i] == 0: time.sleep(1) + if envs.FD_ENABLE_INTERNAL_ADAPTER: + setup_dp_prometheus_dir(0, base_prom_dir) + def check_worker_initialize_status(self): """ Check the initlialize status of workers by stdout logging diff --git a/fastdeploy/engine/engine.py 
b/fastdeploy/engine/engine.py index 85d7459815f..f33500a8993 100644 --- a/fastdeploy/engine/engine.py +++ b/fastdeploy/engine/engine.py @@ -45,6 +45,7 @@ from fastdeploy.engine.request import Request from fastdeploy.inter_communicator import EngineWorkerQueue, IPCSignal from fastdeploy.metrics.metrics import main_process_metrics +from fastdeploy.metrics.prometheus_multiprocess_setup import setup_dp_prometheus_dir, get_original_prom_dir from fastdeploy.platforms import current_platform from fastdeploy.utils import EngineError, console_logger, envs, llm_logger @@ -805,6 +806,7 @@ def launch_components(self): self.launched_expert_service_signal.value[0] = 1 self.dp_processed = [] self.dp_engine_worker_queue_server = [] + base_prom_dir = get_original_prom_dir() for i in range( 1, self.cfg.parallel_config.data_parallel_size // self.cfg.nnode, @@ -842,8 +844,13 @@ def launch_components(self): f"Engine is initialized successfully with {self.cfg.parallel_config.tensor_parallel_size}" + f" data parallel id {i}" ) + if envs.FD_ENABLE_INTERNAL_ADAPTER: + setup_dp_prometheus_dir(i, base_prom_dir) self.dp_processed[-1].start() + if envs.FD_ENABLE_INTERNAL_ADAPTER: + setup_dp_prometheus_dir(0, base_prom_dir) + for i in range( 1, self.cfg.parallel_config.data_parallel_size // self.cfg.nnode, diff --git a/fastdeploy/entrypoints/openai/multi_api_server.py b/fastdeploy/entrypoints/openai/multi_api_server.py index 545fe2b1fb1..dbef909bf2c 100644 --- a/fastdeploy/entrypoints/openai/multi_api_server.py +++ b/fastdeploy/entrypoints/openai/multi_api_server.py @@ -20,6 +20,7 @@ import sys import time +from fastdeploy.metrics.prometheus_multiprocess_setup import setup_dp_prometheus_dir from fastdeploy.platforms import current_platform from fastdeploy.utils import find_free_ports, get_logger, is_port_available @@ -108,13 +109,7 @@ def start_servers( env["FD_ENABLE_MULTI_API_SERVER"] = "1" env["FD_LOG_DIR"] = env.get("FD_LOG_DIR", "log") + f"/log_{i}" if "PROMETHEUS_MULTIPROC_DIR" in env: - 
prom_dir = env.get("PROMETHEUS_MULTIPROC_DIR") - prom_dir_i = os.path.join(os.path.dirname(prom_dir), os.path.basename(prom_dir) + f"_dp{i}") - # Create the directory if it doesn't exist - if not os.path.exists(prom_dir_i): - os.makedirs(prom_dir_i, exist_ok=True) - env["PROMETHEUS_MULTIPROC_DIR"] = prom_dir_i - logger.info(f"Set PROMETHEUS_MULTIPROC_DIR for DP {i}: {prom_dir_i}") + setup_dp_prometheus_dir(i, env["PROMETHEUS_MULTIPROC_DIR"], env) cmd = [ sys.executable, diff --git a/fastdeploy/metrics/prometheus_multiprocess_setup.py b/fastdeploy/metrics/prometheus_multiprocess_setup.py index 509fb7d158c..42b7723bf3e 100644 --- a/fastdeploy/metrics/prometheus_multiprocess_setup.py +++ b/fastdeploy/metrics/prometheus_multiprocess_setup.py @@ -20,27 +20,56 @@ from fastdeploy.utils import llm_logger +_original_prom_dir = None + + +def get_original_prom_dir(): + """Return the PROMETHEUS_MULTIPROC_DIR before any dp suffix was appended.""" + return _original_prom_dir + def setup_multiprocess_prometheus(): - """ - Cleans and recreates the Prometheus multiprocess directory. - """ + """Cleans and recreates the Prometheus multiprocess directory.""" + global _original_prom_dir if "PROMETHEUS_MULTIPROC_DIR" not in os.environ: - base_dir = "/tmp/prom_main" - instance_id = str(uuid.uuid4()) - prom_dir = f"{base_dir}_{instance_id}" + prom_dir = f"/tmp/prom_main_{uuid.uuid4()}" if os.path.exists(prom_dir): shutil.rmtree(prom_dir, ignore_errors=True) os.makedirs(prom_dir, exist_ok=True) - llm_logger.info(f"PROMETHEUS_MULTIPROC_DIR is set to be {prom_dir}") os.environ["PROMETHEUS_MULTIPROC_DIR"] = prom_dir + _original_prom_dir = prom_dir + llm_logger.info(f"PROMETHEUS_MULTIPROC_DIR is set to be {prom_dir}") return prom_dir - else: - prom_dir = os.environ["PROMETHEUS_MULTIPROC_DIR"] - llm_logger.warning( - f"Found PROMETHEUS_MULTIPROC_DIR:{prom_dir} was set by user. " - "you will find inaccurate metrics. Unset the variable " - "will properly handle cleanup." 
- ) - return os.environ["PROMETHEUS_MULTIPROC_DIR"] + + user_dir = os.environ["PROMETHEUS_MULTIPROC_DIR"] + _original_prom_dir = user_dir + os.makedirs(user_dir, exist_ok=True) + llm_logger.info(f"PROMETHEUS_MULTIPROC_DIR is set to {user_dir}") + return user_dir + + +def setup_dp_prometheus_dir(dp_id, base_dir, env_dict=None): + """Set up an isolated PROMETHEUS_MULTIPROC_DIR subdirectory for a DP rank. + + For DP0: moves existing .db files from base_dir into dp0/ and updates env. + mmap writes remain valid after rename on the same filesystem. + For DP1+: creates dp{i}/ subdirectory and updates env. Fork triggers PID + change → prometheus_client reset → new .db files in the subdirectory. + + Args: + dp_id: Data parallel rank id. + base_dir: Original PROMETHEUS_MULTIPROC_DIR (before any dp suffix). + env_dict: If provided, write to this dict instead of os.environ. + """ + prom_dir_dp = os.path.join(base_dir, f"dp{dp_id}") + os.makedirs(prom_dir_dp, exist_ok=True) + if dp_id == 0 and os.path.isdir(base_dir): + for fname in os.listdir(base_dir): + src = os.path.join(base_dir, fname) + if os.path.isfile(src) and fname.endswith(".db"): + os.rename(src, os.path.join(prom_dir_dp, fname)) + llm_logger.info(f"Moved {src} -> {prom_dir_dp}") + target = env_dict if env_dict is not None else os.environ + target["PROMETHEUS_MULTIPROC_DIR"] = prom_dir_dp + llm_logger.info(f"Set PROMETHEUS_MULTIPROC_DIR for DP {dp_id}: {prom_dir_dp}") From a30775d516882d11276759e4eb438c95aff71f98 Mon Sep 17 00:00:00 2001 From: liyonghua0910 Date: Mon, 22 Jun 2026 02:52:29 +0000 Subject: [PATCH 2/5] [chore] fix ci test and code style --- fastdeploy/engine/common_engine.py | 5 ++++- fastdeploy/engine/engine.py | 5 ++++- tests/entrypoints/openai/test_multi_api_server.py | 2 +- tests/metrics/test_prometheus_multiprocess_setup.py | 8 ++------ 4 files changed, 11 insertions(+), 9 deletions(-) diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py index 
f5d676e22bb..c0dc199a10b 100644 --- a/fastdeploy/engine/common_engine.py +++ b/fastdeploy/engine/common_engine.py @@ -67,7 +67,10 @@ ) from fastdeploy.inter_communicator.fmq import FMQ from fastdeploy.metrics.metrics import main_process_metrics -from fastdeploy.metrics.prometheus_multiprocess_setup import setup_dp_prometheus_dir, get_original_prom_dir +from fastdeploy.metrics.prometheus_multiprocess_setup import ( + get_original_prom_dir, + setup_dp_prometheus_dir, +) from fastdeploy.model_executor.guided_decoding import schema_checker from fastdeploy.plugins.token_processor import load_token_processor_plugins from fastdeploy.spec_decode import SpecMethod diff --git a/fastdeploy/engine/engine.py b/fastdeploy/engine/engine.py index f33500a8993..dd8d6bcea86 100644 --- a/fastdeploy/engine/engine.py +++ b/fastdeploy/engine/engine.py @@ -45,7 +45,10 @@ from fastdeploy.engine.request import Request from fastdeploy.inter_communicator import EngineWorkerQueue, IPCSignal from fastdeploy.metrics.metrics import main_process_metrics -from fastdeploy.metrics.prometheus_multiprocess_setup import setup_dp_prometheus_dir, get_original_prom_dir +from fastdeploy.metrics.prometheus_multiprocess_setup import ( + get_original_prom_dir, + setup_dp_prometheus_dir, +) from fastdeploy.platforms import current_platform from fastdeploy.utils import EngineError, console_logger, envs, llm_logger diff --git a/tests/entrypoints/openai/test_multi_api_server.py b/tests/entrypoints/openai/test_multi_api_server.py index a333ee6de0c..c039dcfbc65 100644 --- a/tests/entrypoints/openai/test_multi_api_server.py +++ b/tests/entrypoints/openai/test_multi_api_server.py @@ -231,7 +231,7 @@ def capture_popen(*args, **kwargs): for i, prom_dir in enumerate(prom_dirs): # The directory should contain the server index (0 or 1) # to uniquely identify each server's metrics directory - self.assertIn(f"_dp{i}", prom_dir, f"PROMETHEUS_MULTIPROC_DIR for server {i} should contain _dp{i}") + self.assertIn(f"/dp{i}", 
prom_dir, f"PROMETHEUS_MULTIPROC_DIR for server {i} should contain /dp{i}") if __name__ == "__main__": diff --git a/tests/metrics/test_prometheus_multiprocess_setup.py b/tests/metrics/test_prometheus_multiprocess_setup.py index 5ccc960d522..36508503150 100644 --- a/tests/metrics/test_prometheus_multiprocess_setup.py +++ b/tests/metrics/test_prometheus_multiprocess_setup.py @@ -51,15 +51,11 @@ def test_when_env_var_already_set(self): test_dir = "/tmp/existing_dir" os.environ["PROMETHEUS_MULTIPROC_DIR"] = test_dir - with patch("fastdeploy.utils.llm_logger.warning") as mock_logger: + with patch("fastdeploy.utils.llm_logger.info") as mock_logger: result = setup_multiprocess_prometheus() assert result == test_dir - mock_logger.assert_called_once_with( - "Found PROMETHEUS_MULTIPROC_DIR:/tmp/existing_dir was set by user. " - "you will find inaccurate metrics. Unset the variable " - "will properly handle cleanup." - ) + mock_logger.assert_called_once_with(f"PROMETHEUS_MULTIPROC_DIR is set to {test_dir}") def test_cleanup_failure_handling(self): """测试清理目录失败时的处理""" From 55849c9645bcf7812c65e176d330b31e2f45ddbc Mon Sep 17 00:00:00 2001 From: liyonghua0910 Date: Mon, 22 Jun 2026 07:30:35 +0000 Subject: [PATCH 3/5] [chore] fix ci test --- tests/cache_manager/test_cache_transfer_manager.py | 1 + tests/graph_optimization/test_graph_opt_backend.py | 6 +++++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/cache_manager/test_cache_transfer_manager.py b/tests/cache_manager/test_cache_transfer_manager.py index 76419eba8cd..bf55bc78296 100644 --- a/tests/cache_manager/test_cache_transfer_manager.py +++ b/tests/cache_manager/test_cache_transfer_manager.py @@ -663,6 +663,7 @@ def register_buffer(self, ptr, size): self.manager.head_dim = 2 self.manager.num_layers = 1 self.manager.num_extra_layers = 0 + self.manager.cache_scale_shape = [self.manager.num_gpu_blocks, self.manager.head_num, self.manager.block_size] with 
patch("fastdeploy.cache_manager.cache_transfer_manager.cuda_host_alloc", side_effect=[10, 20, 30, 40]): self.manager._init_storage_buffer(args) diff --git a/tests/graph_optimization/test_graph_opt_backend.py b/tests/graph_optimization/test_graph_opt_backend.py index f4b19c24bd0..ac7a055596a 100644 --- a/tests/graph_optimization/test_graph_opt_backend.py +++ b/tests/graph_optimization/test_graph_opt_backend.py @@ -111,7 +111,11 @@ def setUp(self): # Create input data self.input_tensor = paddle.randint(32, shape=self.input_shape, dtype=self.dtype) - self.forward_meta = ForwardMeta(ids_remove_padding=self.input_tensor, step_use_cudagraph=True) + self.forward_meta = ForwardMeta( + ids_remove_padding=self.input_tensor, + step_use_cudagraph=True, + seq_lens_kv=paddle.full(shape=[self.max_num_seqs], fill_value=8, dtype="int32"), + ) # Compute baseline result once baseline_model = Attention(fd_config=self.baseline_fd_config, **self.model_config) From dab1fce35b6ddb6bcb9ae0daa2998733de85526d Mon Sep 17 00:00:00 2001 From: liyonghua0910 Date: Mon, 22 Jun 2026 10:02:10 +0000 Subject: [PATCH 4/5] [chore] empty commit From c4788ec09784a88b9cf2788859e22f1329642541 Mon Sep 17 00:00:00 2001 From: liyonghua0910 Date: Tue, 23 Jun 2026 11:19:57 +0000 Subject: [PATCH 5/5] [fix] fix ci test --- tests/cache_manager/test_cache_transfer_manager.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/cache_manager/test_cache_transfer_manager.py b/tests/cache_manager/test_cache_transfer_manager.py index bf55bc78296..f968657bc28 100644 --- a/tests/cache_manager/test_cache_transfer_manager.py +++ b/tests/cache_manager/test_cache_transfer_manager.py @@ -497,6 +497,7 @@ def __init__(self): self.manager.has_cache_scale = True self.manager.swap_space_ready_signal = DummySignal() self.manager.value_cache_shape = [2, 1, 1, 1] + self.manager.cache_scale_shape = [self.manager.num_gpu_blocks, self.manager.head_num, self.manager.block_size] with ( patch(