Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions xllm/core/distributed_runtime/worker_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ limitations under the License.
#include <torch/torch.h>
#include <unistd.h>

#include <cstdlib>
#include <memory>
#include <optional>
#include <utility>
Expand All @@ -50,6 +51,9 @@ limitations under the License.
extern char** environ;

namespace xllm {
namespace {

// Signal handler: ends the process immediately with exit status 0.
// The signal number is not inspected.
void handle_signal(int /*signum*/) {
  ::_exit(0);
}

}  // namespace

void WorkerServer::create_server(
const runtime::Options& options,
Expand Down Expand Up @@ -217,6 +221,10 @@ WorkerServer::WorkerServer(int local_worker_idx,
local_worker_idx, master_node_addr, done, parallel_args, d, options);
return;
} else {
// Worker process should handle SIGTERM and SIGINT signals.
signal(SIGINT, handle_signal);
signal(SIGTERM, handle_signal);

std::unique_ptr<ForwardSharedMemoryManager> input_shm_manager = nullptr;
std::unique_ptr<ForwardSharedMemoryManager> output_shm_manager = nullptr;
prepare_shm(
Expand Down
9 changes: 9 additions & 0 deletions xllm/core/runtime/master.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,20 @@ limitations under the License.
#include <pybind11/pybind11.h>
#endif

namespace brpc {
DECLARE_bool(graceful_quit_on_sigterm);
DECLARE_bool(graceful_quit_on_sighup);
} // namespace brpc

namespace xllm {

Master::Master(const Options& options, EngineType type) : options_(options) {
LOG(INFO) << "Master init options: " << options.to_string();

// Allow brpc to quit gracefully on SIGTERM and SIGHUP signals.
brpc::FLAGS_graceful_quit_on_sigterm = true;
brpc::FLAGS_graceful_quit_on_sighup = true;

#if defined(USE_NPU)
if (options.rank_tablefile().has_value()) {
FLAGS_rank_tablefile = options.rank_tablefile().value();
Expand Down
10 changes: 10 additions & 0 deletions xllm/core/runtime/vlm_engine.cpp
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ limitations under the License.

#include <algorithm>
#include <boost/algorithm/string.hpp>
#include <cstdlib>
#include <memory>

#include "common/metrics.h"
Expand All @@ -33,7 +34,16 @@ limitations under the License.

namespace xllm {

namespace {

// Signal handler: ends the process immediately with exit status 0.
// The signal number is not inspected.
void handle_signal(int /*signum*/) {
  ::_exit(0);
}

}  // namespace

VLMEngine::VLMEngine(const runtime::Options& options) : options_(options) {
// Worker process should handle SIGTERM and SIGINT signals.
// TODO: delete this code once the multi-process implementation is supported.
signal(SIGINT, handle_signal);
signal(SIGTERM, handle_signal);

const auto& devices = options_.devices();
CHECK_GT(devices.size(), 0) << "At least one device is required";

Expand Down
6 changes: 5 additions & 1 deletion xllm/pybind/embedding.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ def __init__(
is_local: bool = True,
**kwargs,
) -> None:
signal.signal(signal.SIGTERM, lambda s, f: sys.exit(0))
signal.signal(signal.SIGINT, lambda s, f: sys.exit(0))

if not os.path.exists(model):
raise ValueError(f"model {model} not exists")

Expand Down Expand Up @@ -79,7 +82,8 @@ def __init__(
def finish(self):
    """Best-effort shutdown of the current process.

    Hands this process's pid to ``util.terminate_process``. Any ``Exception``
    raised while doing so is swallowed, so this method itself never raises
    one.
    """
    try:
        own_pid = os.getpid()
        util.terminate_process(own_pid)
    except Exception:
        # Failures during shutdown are deliberately ignored.
        return

Expand Down
5 changes: 4 additions & 1 deletion xllm/pybind/llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ def __init__(
is_local: bool = True,
**kwargs,
) -> None:
signal.signal(signal.SIGTERM, lambda s, f: sys.exit(0))
signal.signal(signal.SIGINT, lambda s, f: sys.exit(0))

if not os.path.exists(model):
raise ValueError(f"model {model} not exists")
Expand Down Expand Up @@ -102,7 +104,8 @@ def __init__(
def finish(self):
    """Best-effort shutdown of the current process.

    Hands this process's pid to ``util.terminate_process``. Any ``Exception``
    raised while doing so is swallowed, so this method itself never raises
    one.
    """
    try:
        own_pid = os.getpid()
        util.terminate_process(own_pid)
    except Exception:
        # Failures during shutdown are deliberately ignored.
        return

Expand Down
26 changes: 26 additions & 0 deletions xllm/pybind/util.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,30 @@
import os
import psutil
import signal
import socket
import sys

def _terminate_then_kill(procs, timeout):
    """Call ``terminate()`` on every process in ``procs``, wait, then ``kill()`` stragglers.

    Args:
        procs: list of ``psutil.Process`` objects; an empty list is a no-op.
        timeout: seconds passed to ``psutil.wait_procs`` before escalating.
    """
    if not procs:
        return

    for proc in procs:
        try:
            proc.terminate()
        except psutil.NoSuchProcess:
            # Already exited between discovery and signalling.
            pass

    _, survivors = psutil.wait_procs(procs, timeout=timeout)
    for proc in survivors:
        try:
            proc.kill()
        except psutil.NoSuchProcess:
            pass


def terminate_process(pid, timeout=30):
    """Terminate process ``pid`` and all of its descendants.

    Each process first receives ``terminate()``; any that are still alive
    after ``timeout`` seconds receive ``kill()``.

    When ``pid`` is the calling process, the descendants are terminated,
    waited for and (if needed) killed *before* the caller signals itself.
    Signalling itself first could end the caller before it reaches the wait
    and kill steps, leaving stubborn children running.

    Args:
        pid: id of the root process of the tree to stop.
        timeout: seconds to wait for graceful exit before killing.

    Returns:
        None. Returns silently if ``pid`` does not exist.
    """
    try:
        parent = psutil.Process(pid)
        descendants = parent.children(recursive=True)
    except psutil.NoSuchProcess:
        return

    if pid == os.getpid():
        # Finish off the children while this process is still alive to do so.
        _terminate_then_kill(descendants, timeout)
        _terminate_then_kill([parent], timeout)
    else:
        _terminate_then_kill(descendants + [parent], timeout)

def get_free_port():
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
Expand Down
5 changes: 4 additions & 1 deletion xllm/pybind/vlm.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ def __init__(
is_local: bool = True,
**kwargs,
) -> None:
signal.signal(signal.SIGTERM, lambda s, f: sys.exit(0))
signal.signal(signal.SIGINT, lambda s, f: sys.exit(0))

if not os.path.exists(model):
raise ValueError(f"model {model} not exists")
Expand Down Expand Up @@ -97,7 +99,8 @@ def __init__(
def finish(self):
    """Best-effort shutdown of the current process.

    Hands this process's pid to ``util.terminate_process``. Any ``Exception``
    raised while doing so is swallowed, so this method itself never raises
    one.
    """
    try:
        own_pid = os.getpid()
        util.terminate_process(own_pid)
    except Exception:
        # Failures during shutdown are deliberately ignored.
        return

Expand Down
Loading