From ded45a91c817b07e6d8e78449aeda248ac804d40 Mon Sep 17 00:00:00 2001 From: gengliqi Date: Thu, 10 Jul 2025 14:49:17 +0800 Subject: [PATCH 1/9] graceful shutdown Signed-off-by: gengliqi --- .../src/Flash/Coprocessor/DAGStorageInterpreter.cpp | 2 -- dbms/src/Flash/FlashService.cpp | 2 +- dbms/src/Flash/Mpp/MPPTaskManager.h | 7 +++++++ dbms/src/Server/FlashGrpcServerHolder.cpp | 13 ++++++++----- dbms/src/Server/Server.cpp | 4 ++++ 5 files changed, 20 insertions(+), 8 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index a290ffe351b..fa6fa0abc49 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -879,8 +879,6 @@ LearnerReadSnapshot DAGStorageInterpreter::doBatchCopLearnerRead() if (context.getDAGContext()->is_disaggregated_task) throw; - if (tmt.checkShuttingDown()) - throw TiFlashException("TiFlash server is terminating", Errors::Coprocessor::Internal); // By now, RegionException will contain all region id of MvccQueryInfo, which is needed by CHSpark. // When meeting RegionException, we can let MakeRegionQueryInfos to check in next loop. force_retry.insert(e.unavailable_region.begin(), e.unavailable_region.end()); diff --git a/dbms/src/Flash/FlashService.cpp b/dbms/src/Flash/FlashService.cpp index e3b2c67e34b..aae09a1fdd8 100644 --- a/dbms/src/Flash/FlashService.cpp +++ b/dbms/src/Flash/FlashService.cpp @@ -563,7 +563,7 @@ grpc::Status FlashService::IsAlive( return check_result; auto & tmt_context = context->getTMTContext(); - response->set_available(tmt_context.checkRunning()); + response->set_available(tmt_context.checkRunning() && tmt_context.getMPPTaskManager()->isAvailable()); response->set_mpp_version(DB::GetMppVersion()); return grpc::Status::OK; } diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.h b/dbms/src/Flash/Mpp/MPPTaskManager.h index a46d33b2df2..aba270e5598 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.h +++ b/dbms/src/Flash/Mpp/MPPTaskManager.h @@ -193,6 +193,8 @@ struct MPPTaskMonitor return monitored_tasks.find(task_unique_id) != monitored_tasks.end(); } + void waitAllMPPTasksFinish(const std::unique_ptr & global_context); + std::mutex mu; std::condition_variable cv; bool is_shutdown = false; @@ -220,6 +222,8 @@ class MPPTaskManager : private boost::noncopyable std::shared_ptr monitor; + std::atomic is_available{true}; + public: explicit MPPTaskManager(MPPTaskSchedulerPtr scheduler); @@ -272,6 +276,9 @@ class MPPTaskManager : private boost::noncopyable bool isTaskExists(const MPPTaskId & id); + void setUnavailable() { is_available = false; } + bool isAvailable() { return is_available; } + private: MPPQueryPtr addMPPQuery( const MPPQueryId & query_id, diff --git a/dbms/src/Server/FlashGrpcServerHolder.cpp b/dbms/src/Server/FlashGrpcServerHolder.cpp index 95fd21ac9aa..d37fb60c890 100644 --- a/dbms/src/Server/FlashGrpcServerHolder.cpp +++ b/dbms/src/Server/FlashGrpcServerHolder.cpp @@ -224,17 +224,20 @@ FlashGrpcServerHolder::~FlashGrpcServerHolder() *is_shutdown = true; // Wait all existed MPPTunnels done to prevent crash. // If all existed MPPTunnels are done, almost in all cases it means all existed MPPTasks and ExchangeReceivers are also done. - const int max_wait_cnt = 300; + constexpr int wait_step = 200; + // Maximum wait for 3 minute + constexpr int max_wait_cnt = 180 * 1000 / wait_step; int wait_cnt = 0; while (GET_METRIC(tiflash_object_count, type_count_of_mpptunnel).Value() >= 1 && (wait_cnt++ < max_wait_cnt)) - std::this_thread::sleep_for(std::chrono::seconds(1)); + std::this_thread::sleep_for(std::chrono::milliseconds(wait_step)); if (GET_METRIC(tiflash_object_count, type_count_of_mpptunnel).Value() >= 1) LOG_WARNING( log, - "Wait {} seconds for mpp tunnels shutdown, still some mpp tunnels are alive, potential resource leak", - wait_cnt); + "Wait {} milliseconds for mpp tunnels shutdown, still some mpp tunnels are alive, potential resource " + "leak", + wait_cnt * wait_step); else - LOG_INFO(log, "Wait {} seconds for mpp tunnels shutdown, all finished", wait_cnt); + LOG_INFO(log, "Wait {} milliseconds for mpp tunnels shutdown, all finished", wait_cnt * wait_step); for (auto & cq : cqs) cq->Shutdown(); diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index bcbb9932992..6ba04d3d6fa 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -1262,6 +1262,10 @@ try LOG_INFO(log, "Start to wait for terminal signal"); waitForTerminationRequest(); + LOG_INFO(log, "Set unavailble for MPPTask"); + tmt_context.getMPPTaskManager()->setUnavailable(); + tmt_context.getMPPTaskManager()->getMPPTaskMonitor()->waitAllMPPTasksFinish(global_context); + { // Set limiters stopping and wakeup threads in waitting queue. global_context->getIORateLimiter().setStop(); From b1edac996fef67402580e56aa55feab2afe3a961 Mon Sep 17 00:00:00 2001 From: gengliqi Date: Thu, 10 Jul 2025 14:59:16 +0800 Subject: [PATCH 2/9] u Signed-off-by: gengliqi --- dbms/src/Flash/Mpp/MPPTaskManager.cpp | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.cpp b/dbms/src/Flash/Mpp/MPPTaskManager.cpp index da70b76e7f8..4c95c311838 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskManager.cpp @@ -82,6 +82,31 @@ MPPGatherTaskSetPtr MPPQuery::addMPPGatherTaskSet(const MPPGatherId & gather_id) return ptr; } +void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr & global_context) +{ + auto start = std::chrono::steady_clock::now(); + // The maximum seconds TiFlash will wait for all current MPP tasks to finish before shutting down + static constexpr const char * GRACEFUL_WIAT_BEFORE_SHUTDOWN = "flash.graceful_wait_before_shutdown"; + auto graceful_wait_before_shutdown = global_context->getUsersConfig()->getUInt64(GRACEFUL_WIAT_BEFORE_SHUTDOWN, 60); + auto max_wait_time = start + std::chrono::seconds(graceful_wait_before_shutdown); + while (true) + { + // The first sleep before checking to reduce the chance of missing MPP tasks that are still in the process of being dispatched + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + { + std::unique_lock lock(mu); + if (monitored_tasks.empty()) + break; + } + auto current_time = std::chrono::steady_clock::now(); + if (current_time >= max_wait_time) + { + LOG_WARNING(log, "Timed out waiting for MPP tasks to finish after {}s", graceful_wait_before_shutdown); + break; + } + } +} + MPPTaskManager::MPPTaskManager(MPPTaskSchedulerPtr scheduler_) : scheduler(std::move(scheduler_)) , aborted_query_gather_cache(ABORTED_MPPGATHER_CACHE_SIZE) From 967d5b9fe7056ea4fd1acef28c5301e490b5ce7d Mon Sep 17 00:00:00 2001 From: gengliqi Date: Thu, 10 Jul 2025 17:08:41 +0800 Subject: [PATCH 3/9] update config Signed-off-by: gengliqi --- dbms/src/Flash/Mpp/MPPTaskManager.cpp | 6 +++++- dbms/src/Server/FlashGrpcServerHolder.cpp | 4 ++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.cpp b/dbms/src/Flash/Mpp/MPPTaskManager.cpp index 4c95c311838..1e5f5218e38 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskManager.cpp @@ -87,7 +87,11 @@ void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr & glob auto start = std::chrono::steady_clock::now(); // The maximum seconds TiFlash will wait for all current MPP tasks to finish before shutting down static constexpr const char * GRACEFUL_WIAT_BEFORE_SHUTDOWN = "flash.graceful_wait_before_shutdown"; - auto graceful_wait_before_shutdown = global_context->getUsersConfig()->getUInt64(GRACEFUL_WIAT_BEFORE_SHUTDOWN, 60); + // The default value of flash.graceful_wait_before_shutdown + static constexpr UInt64 DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN = 600; + auto graceful_wait_before_shutdown = global_context->getUsersConfig()->getUInt64( + GRACEFUL_WIAT_BEFORE_SHUTDOWN, + DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN); auto max_wait_time = start + std::chrono::seconds(graceful_wait_before_shutdown); while (true) { diff --git a/dbms/src/Server/FlashGrpcServerHolder.cpp b/dbms/src/Server/FlashGrpcServerHolder.cpp index d37fb60c890..ef5eb9bf180 100644 --- a/dbms/src/Server/FlashGrpcServerHolder.cpp +++ b/dbms/src/Server/FlashGrpcServerHolder.cpp @@ -225,8 +225,8 @@ FlashGrpcServerHolder::~FlashGrpcServerHolder() // Wait all existed MPPTunnels done to prevent crash. // If all existed MPPTunnels are done, almost in all cases it means all existed MPPTasks and ExchangeReceivers are also done. constexpr int wait_step = 200; - // Maximum wait for 3 minute - constexpr int max_wait_cnt = 180 * 1000 / wait_step; + // Maximum wait for 1 minute + constexpr int max_wait_cnt = 60 * 1000 / wait_step; int wait_cnt = 0; while (GET_METRIC(tiflash_object_count, type_count_of_mpptunnel).Value() >= 1 && (wait_cnt++ < max_wait_cnt)) std::this_thread::sleep_for(std::chrono::milliseconds(wait_step)); From 9b2f87326a21161094b51d5e81c92c46cea67aae Mon Sep 17 00:00:00 2001 From: gengliqi Date: Thu, 10 Jul 2025 20:42:14 +0800 Subject: [PATCH 4/9] u Signed-off-by: gengliqi --- dbms/src/Server/Server.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 6ba04d3d6fa..ad2f318040c 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -1262,7 +1262,7 @@ try LOG_INFO(log, "Start to wait for terminal signal"); waitForTerminationRequest(); - LOG_INFO(log, "Set unavailble for MPPTask"); + LOG_INFO(log, "Set unavailable for MPPTask"); tmt_context.getMPPTaskManager()->setUnavailable(); tmt_context.getMPPTaskManager()->getMPPTaskMonitor()->waitAllMPPTasksFinish(global_context); From 710753a8c1bee17cdc7393809d212398053b0912 Mon Sep 17 00:00:00 2001 From: gengliqi Date: Fri, 11 Jul 2025 23:42:46 +0800 Subject: [PATCH 5/9] address comments Signed-off-by: gengliqi --- dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp | 3 +++ dbms/src/Flash/Mpp/MPPTaskManager.cpp | 1 + 2 files changed, 4 insertions(+) diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index fa6fa0abc49..0ba25ac14d3 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -879,6 +879,9 @@ LearnerReadSnapshot DAGStorageInterpreter::doBatchCopLearnerRead() if (context.getDAGContext()->is_disaggregated_task) throw; + if (tmt.checkShuttingDown()) + throw TiFlashException("TiFlash server is terminating", Errors::Coprocessor::Internal); + // By now, RegionException will contain all region id of MvccQueryInfo, which is needed by CHSpark. // When meeting RegionException, we can let MakeRegionQueryInfos to check in next loop. force_retry.insert(e.unavailable_region.begin(), e.unavailable_region.end()); diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.cpp b/dbms/src/Flash/Mpp/MPPTaskManager.cpp index 1e5f5218e38..1e3d3046ccf 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskManager.cpp @@ -92,6 +92,7 @@ void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr & glob auto graceful_wait_before_shutdown = global_context->getUsersConfig()->getUInt64( GRACEFUL_WIAT_BEFORE_SHUTDOWN, DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN); + LOG_INFO(log, "Start to wait all MPPTasks to finish, timeout={}s", graceful_wait_before_shutdown); auto max_wait_time = start + std::chrono::seconds(graceful_wait_before_shutdown); while (true) { From 7434f460e667594c3628a72ee451f895bdebcdbc Mon Sep 17 00:00:00 2001 From: gengliqi Date: Mon, 14 Jul 2025 17:02:43 +0800 Subject: [PATCH 6/9] u Signed-off-by: gengliqi --- dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index 0ba25ac14d3..a290ffe351b 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -881,7 +881,6 @@ LearnerReadSnapshot DAGStorageInterpreter::doBatchCopLearnerRead() if (tmt.checkShuttingDown()) throw TiFlashException("TiFlash server is terminating", Errors::Coprocessor::Internal); - // By now, RegionException will contain all region id of MvccQueryInfo, which is needed by CHSpark. // When meeting RegionException, we can let MakeRegionQueryInfos to check in next loop. force_retry.insert(e.unavailable_region.begin(), e.unavailable_region.end()); From 321e0eb644fb035e38eeabee18379be774e028c5 Mon Sep 17 00:00:00 2001 From: gengliqi Date: Mon, 14 Jul 2025 17:32:35 +0800 Subject: [PATCH 7/9] address comments Signed-off-by: gengliqi --- dbms/src/Server/Server.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index ad2f318040c..62061b13f59 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -1262,6 +1262,8 @@ try LOG_INFO(log, "Start to wait for terminal signal"); waitForTerminationRequest(); + // Note: `waitAllMPPTasksFinish` must be called before stopping the proxy. + // Otherwise, read index requests may fail, which can prevent TiFlash from shutting down gracefully. LOG_INFO(log, "Set unavailable for MPPTask"); tmt_context.getMPPTaskManager()->setUnavailable(); tmt_context.getMPPTaskManager()->getMPPTaskMonitor()->waitAllMPPTasksFinish(global_context); From d3d6c8f58d1ad39c7a48e3de89016f07d86bf83a Mon Sep 17 00:00:00 2001 From: gengliqi Date: Tue, 15 Jul 2025 11:05:44 +0800 Subject: [PATCH 8/9] u Signed-off-by: gengliqi --- dbms/src/Flash/Mpp/MPPTaskManager.cpp | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.cpp b/dbms/src/Flash/Mpp/MPPTaskManager.cpp index 1e3d3046ccf..c854b1c8162 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskManager.cpp @@ -14,6 +14,7 @@ #include #include +#include #include #include #include @@ -94,6 +95,7 @@ void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr & glob DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN); LOG_INFO(log, "Start to wait all MPPTasks to finish, timeout={}s", graceful_wait_before_shutdown); auto max_wait_time = start + std::chrono::seconds(graceful_wait_before_shutdown); + Stopwatch watch; while (true) { // The first sleep before checking to reduce the chance of missing MPP tasks that are still in the process of being dispatched @@ -101,12 +103,15 @@ void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr & glob { std::unique_lock lock(mu); if (monitored_tasks.empty()) + { + LOG_INFO(log, "All MPPTasks have finished after {}ms", watch.elapsedMilliseconds()); break; + } } auto current_time = std::chrono::steady_clock::now(); if (current_time >= max_wait_time) { - LOG_WARNING(log, "Timed out waiting for MPP tasks to finish after {}s", graceful_wait_before_shutdown); + LOG_WARNING(log, "Timed out waiting for MPP tasks to finish after {}ms", watch.elapsedMilliseconds()); break; } } From 0920739dfd0f9bcc072c5a93a1831996ed9d25a6 Mon Sep 17 00:00:00 2001 From: gengliqi Date: Tue, 15 Jul 2025 11:39:57 +0800 Subject: [PATCH 9/9] u Signed-off-by: gengliqi --- dbms/src/Flash/Mpp/MPPTaskManager.cpp | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.cpp b/dbms/src/Flash/Mpp/MPPTaskManager.cpp index c854b1c8162..bb3b08c4266 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskManager.cpp @@ -85,7 +85,6 @@ MPPGatherTaskSetPtr MPPQuery::addMPPGatherTaskSet(const MPPGatherId & gather_id) void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr & global_context) { - auto start = std::chrono::steady_clock::now(); // The maximum seconds TiFlash will wait for all current MPP tasks to finish before shutting down static constexpr const char * GRACEFUL_WIAT_BEFORE_SHUTDOWN = "flash.graceful_wait_before_shutdown"; // The default value of flash.graceful_wait_before_shutdown @@ -94,26 +93,26 @@ void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr & glob GRACEFUL_WIAT_BEFORE_SHUTDOWN, DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN); LOG_INFO(log, "Start to wait all MPPTasks to finish, timeout={}s", graceful_wait_before_shutdown); - auto max_wait_time = start + std::chrono::seconds(graceful_wait_before_shutdown); Stopwatch watch; + // The first sleep before checking to reduce the chance of missing MPP tasks that are still in the process of being dispatched + std::this_thread::sleep_for(std::chrono::seconds(1)); while (true) { - // The first sleep before checking to reduce the chance of missing MPP tasks that are still in the process of being dispatched - std::this_thread::sleep_for(std::chrono::milliseconds(200)); + auto elapsed_ms = watch.elapsedMilliseconds(); { std::unique_lock lock(mu); if (monitored_tasks.empty()) { - LOG_INFO(log, "All MPPTasks have finished after {}ms", watch.elapsedMilliseconds()); + LOG_INFO(log, "All MPPTasks have finished after {}ms", elapsed_ms); break; } } - auto current_time = std::chrono::steady_clock::now(); - if (current_time >= max_wait_time) + if (elapsed_ms >= graceful_wait_before_shutdown * 1000) { - LOG_WARNING(log, "Timed out waiting for MPP tasks to finish after {}ms", watch.elapsedMilliseconds()); + LOG_WARNING(log, "Timed out waiting for MPP tasks to finish after {}ms", elapsed_ms); break; } + std::this_thread::sleep_for(std::chrono::milliseconds(200)); } }