diff --git a/be/src/udf/python/python_server.cpp b/be/src/udf/python/python_server.cpp index 646e1e79039b5b..fad28294ccabf0 100644 --- a/be/src/udf/python/python_server.cpp +++ b/be/src/udf/python/python_server.cpp @@ -26,7 +26,10 @@ #include #include +#include #include +#include +#include #include "arrow/flight/client.h" #include "common/config.h" @@ -88,7 +91,21 @@ Status PythonServerManager::ensure_pool_initialized(const PythonVersion& version int success_count = 0; int failure_count = 0; + const auto init_start_time = std::chrono::steady_clock::now(); + constexpr auto progress_log_interval = std::chrono::seconds(20); for (int i = 0; i < max_pool_size; i++) { + // Print init log every 20s until the current slot is ready. + while (futures[i].wait_for(progress_log_interval) != std::future_status::ready) { + const auto now = std::chrono::steady_clock::now(); + const auto total_elapsed_ms = + std::chrono::duration_cast(now - init_start_time) + .count(); + LOG(INFO) << "Python process pool initialization progress for version " + << version.to_string() << ": waiting_slot=" << (i + 1) << "/" << max_pool_size + << ", success=" << success_count << ", failed=" << failure_count + << ", elapsed_ms=" << total_elapsed_ms; + } + Status s = futures[i].get(); if (s.ok() && temp_processes[i]) { pool.push_back(std::move(temp_processes[i])); @@ -106,9 +123,13 @@ Status PythonServerManager::ensure_pool_initialized(const PythonVersion& version max_pool_size); } + const auto total_elapsed_ms = std::chrono::duration_cast( + std::chrono::steady_clock::now() - init_start_time) + .count(); LOG(INFO) << "Python process pool initialized for version " << version.to_string() << ": created " << success_count << " processes" - << (failure_count > 0 ? fmt::format(" ({} failed)", failure_count) : ""); + << (failure_count > 0 ? fmt::format(" ({} failed)", failure_count) : "") + << ", elapsed_ms=" << total_elapsed_ms; _initialized_versions.insert(version); _start_health_check_thread(); diff --git a/be/test/udf/python/python_udf_runtime_test.cpp b/be/test/udf/python/python_udf_runtime_test.cpp index c6ee6f0047285d..99728c0500a8c9 100644 --- a/be/test/udf/python/python_udf_runtime_test.cpp +++ b/be/test/udf/python/python_udf_runtime_test.cpp @@ -205,9 +205,10 @@ TEST_F(PythonUDFRuntimeTest, FlightServerPathTemplate) { // ============================================================================ TEST_F(PythonUDFRuntimeTest, ShutdownTerminatesProcess) { - // Use /bin/cat which blocks waiting for stdin - reliable and fast + // Use sleep instead of a stdin-driven command. In CI, stdin may be closed and + // commands like cat can exit before running() is checked. bp::ipstream output; - bp::child child("/bin/cat", bp::std_out > output); + bp::child child("/bin/sleep", "60", bp::std_out > output); ASSERT_TRUE(child.valid()); ASSERT_TRUE(child.running()); @@ -229,7 +230,7 @@ TEST_F(PythonUDFRuntimeTest, ShutdownTerminatesProcess) { TEST_F(PythonUDFRuntimeTest, ShutdownIdempotent) { bp::ipstream output; - bp::child child("/bin/cat", bp::std_out > output); + bp::child child("/bin/sleep", "60", bp::std_out > output); PythonUDFProcess process(std::move(child), std::move(output)); @@ -247,7 +248,7 @@ TEST_F(PythonUDFRuntimeTest, ShutdownIdempotent) { TEST_F(PythonUDFRuntimeTest, ShutdownWithStubbornProcess) { // Create a process that ignores SIGTERM - tests the SIGKILL fallback path bp::ipstream output; - bp::child child("/bin/bash", "-c", "trap '' TERM; cat", bp::std_out > output); + bp::child child("/bin/bash", "-c", "trap '' TERM; exec sleep 60", bp::std_out > output); PythonUDFProcess process(std::move(child), std::move(output)); EXPECT_TRUE(process.is_alive()); @@ -265,7 +266,7 @@ TEST_F(PythonUDFRuntimeTest, ShutdownWithStubbornProcess) { TEST_F(PythonUDFRuntimeTest, RemoveUnixSocketExistingFile) { bp::ipstream output; - bp::child child("/bin/cat", bp::std_out > output); + bp::child child("/bin/sleep", "60", bp::std_out > output); pid_t child_pid = child.id(); PythonUDFProcess process(std::move(child), std::move(output)); @@ -284,7 +285,7 @@ TEST_F(PythonUDFRuntimeTest, RemoveUnixSocketExistingFile) { TEST_F(PythonUDFRuntimeTest, RemoveUnixSocketNonExistent) { bp::ipstream output; - bp::child child("/bin/cat", bp::std_out > output); + bp::child child("/bin/sleep", "60", bp::std_out > output); pid_t child_pid = child.id(); PythonUDFProcess process(std::move(child), std::move(output)); @@ -300,7 +301,7 @@ TEST_F(PythonUDFRuntimeTest, RemoveUnixSocketNonExistent) { TEST_F(PythonUDFRuntimeTest, RemoveUnixSocketIsDirectory) { bp::ipstream output; - bp::child child("/bin/cat", bp::std_out > output); + bp::child child("/bin/sleep", "60", bp::std_out > output); pid_t child_pid = child.id(); @@ -326,7 +327,7 @@ TEST_F(PythonUDFRuntimeTest, RemoveUnixSocketIsDirectory) { TEST_F(PythonUDFRuntimeTest, ToStringFormat) { bp::ipstream output; - bp::child child("/bin/cat", bp::std_out > output); + bp::child child("/bin/sleep", "60", bp::std_out > output); pid_t child_pid = child.id(); PythonUDFProcess process(std::move(child), std::move(output)); @@ -350,7 +351,7 @@ TEST_F(PythonUDFRuntimeTest, ToStringFormat) { TEST_F(PythonUDFRuntimeTest, GetUri) { bp::ipstream output; - bp::child child("/bin/cat", bp::std_out > output); + bp::child child("/bin/sleep", "60", bp::std_out > output); pid_t child_pid = child.id(); PythonUDFProcess process(std::move(child), std::move(output)); @@ -367,7 +368,7 @@ TEST_F(PythonUDFRuntimeTest, GetUri) { TEST_F(PythonUDFRuntimeTest, GetSocketFilePath) { bp::ipstream output; - bp::child child("/bin/cat", bp::std_out > output); + bp::child child("/bin/sleep", "60", bp::std_out > output); pid_t child_pid = child.id(); PythonUDFProcess process(std::move(child), std::move(output)); @@ -389,8 +390,8 @@ TEST_F(PythonUDFRuntimeTest, GetSocketFilePath) { TEST_F(PythonUDFRuntimeTest, ProcessEquality) { bp::ipstream output1, output2; - bp::child child1("/bin/cat", bp::std_out > output1); - bp::child child2("/bin/cat", bp::std_out > output2); + bp::child child1("/bin/sleep", "60", bp::std_out > output1); + bp::child child2("/bin/sleep", "60", bp::std_out > output2); PythonUDFProcess process1(std::move(child1), std::move(output1)); PythonUDFProcess process2(std::move(child2), std::move(output2)); @@ -410,7 +411,7 @@ TEST_F(PythonUDFRuntimeTest, DestructorCallsShutdown) { pid_t child_pid; { bp::ipstream output; - bp::child child("/bin/cat", bp::std_out > output); + bp::child child("/bin/sleep", "60", bp::std_out > output); child_pid = child.id(); PythonUDFProcess process(std::move(child), std::move(output)); @@ -431,7 +432,7 @@ TEST_F(PythonUDFRuntimeTest, DestructorCallsShutdown) { TEST_F(PythonUDFRuntimeTest, IsAliveReflectsState) { bp::ipstream output; - bp::child child("/bin/cat", bp::std_out > output); + bp::child child("/bin/sleep", "60", bp::std_out > output); PythonUDFProcess process(std::move(child), std::move(output)); diff --git a/regression-test/pipeline/cloud_p0/conf/be_custom.conf b/regression-test/pipeline/cloud_p0/conf/be_custom.conf index 01753481dc367a..4b7694bc286b34 100644 --- a/regression-test/pipeline/cloud_p0/conf/be_custom.conf +++ b/regression-test/pipeline/cloud_p0/conf/be_custom.conf @@ -63,7 +63,7 @@ fail_when_segment_rows_not_in_rowset_meta=true enable_python_udf_support=true python_env_mode=conda python_conda_root_path=/opt/miniconda3 -max_python_process_num=64 +max_python_process_num=16 enable_cloud_make_rs_visible_on_be=true cloud_mow_sync_rowsets_when_load_txn_begin=false diff --git a/regression-test/pipeline/cloud_p1/conf/be_custom.conf b/regression-test/pipeline/cloud_p1/conf/be_custom.conf index 0b9d27e98a7f78..ade5308ded62ed 100644 --- a/regression-test/pipeline/cloud_p1/conf/be_custom.conf +++ b/regression-test/pipeline/cloud_p1/conf/be_custom.conf @@ -45,7 +45,7 @@ enable_segment_rows_check_core=true enable_python_udf_support=true python_env_mode=conda python_conda_root_path=/opt/miniconda3 -max_python_process_num=64 +max_python_process_num=16 enable_cloud_make_rs_visible_on_be=true cloud_mow_sync_rowsets_when_load_txn_begin=false diff --git a/regression-test/pipeline/external/conf/be.conf b/regression-test/pipeline/external/conf/be.conf index 073c5d9990ddcd..f29f342366e273 100644 --- a/regression-test/pipeline/external/conf/be.conf +++ b/regression-test/pipeline/external/conf/be.conf @@ -77,4 +77,4 @@ enable_graceful_exit_check=true enable_python_udf_support=true python_env_mode=venv python_venv_interpreter_paths=/usr/bin/python -max_python_process_num=64 +max_python_process_num=16 diff --git a/regression-test/pipeline/nonConcurrent/conf/be.conf b/regression-test/pipeline/nonConcurrent/conf/be.conf index 2a3031651cfb6f..b308eb8ab62692 100644 --- a/regression-test/pipeline/nonConcurrent/conf/be.conf +++ b/regression-test/pipeline/nonConcurrent/conf/be.conf @@ -98,4 +98,4 @@ enable_segment_rows_check_core=true enable_python_udf_support=true python_env_mode=venv python_venv_interpreter_paths=/usr/bin/python -max_python_process_num=64 +max_python_process_num=16 diff --git a/regression-test/pipeline/p0/conf/be.conf b/regression-test/pipeline/p0/conf/be.conf index 0fff884d858341..b732395e7e449c 100644 --- a/regression-test/pipeline/p0/conf/be.conf +++ b/regression-test/pipeline/p0/conf/be.conf @@ -101,4 +101,4 @@ fail_when_segment_rows_not_in_rowset_meta=true enable_python_udf_support=true python_env_mode=venv python_venv_interpreter_paths=/usr/bin/python -max_python_process_num=64 +max_python_process_num=16 diff --git a/regression-test/pipeline/p1/conf/be.conf b/regression-test/pipeline/p1/conf/be.conf index 29646ae302557c..8f68115270c0fe 100644 --- a/regression-test/pipeline/p1/conf/be.conf +++ b/regression-test/pipeline/p1/conf/be.conf @@ -84,4 +84,4 @@ enable_segment_rows_check_core=true enable_python_udf_support=true python_env_mode=venv python_venv_interpreter_paths=/usr/bin/python -max_python_process_num=64 +max_python_process_num=16 diff --git a/regression-test/pipeline/vault_p0/conf/be_custom.conf b/regression-test/pipeline/vault_p0/conf/be_custom.conf index 85a86ea733e2fb..779eb5a77a5bcb 100644 --- a/regression-test/pipeline/vault_p0/conf/be_custom.conf +++ b/regression-test/pipeline/vault_p0/conf/be_custom.conf @@ -44,4 +44,4 @@ enable_brpc_connection_check=true enable_python_udf_support=true python_env_mode=venv python_venv_interpreter_paths=/usr/bin/python -max_python_process_num=64 +max_python_process_num=16