Skip to content

Commit 86c363c

Browse files
authored
Remove unnecessary wait for MPP tunnel destruction before shutdown (#10313)
ref #10266 Remove unnecessary wait for MPP tunnel destruction during TiFlash shutdown Signed-off-by: gengliqi <gengliqiii@gmail.com>
1 parent dc33637 commit 86c363c

File tree

3 files changed

+20
-24
lines changed

3 files changed

+20
-24
lines changed

dbms/src/Flash/Mpp/MPPTaskManager.cpp

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -84,33 +84,41 @@ MPPGatherTaskSetPtr MPPQuery::addMPPGatherTaskSet(const MPPGatherId & gather_id)
8484
return ptr;
8585
}
8686

87-
void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr<Context> & global_context)
87+
void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr<Context> & context)
8888
{
8989
// The maximum seconds TiFlash will wait for all current MPP tasks to finish before shutting down
9090
static constexpr const char * GRACEFUL_WIAT_BEFORE_SHUTDOWN = "flash.graceful_wait_before_shutdown";
9191
// The default value of flash.graceful_wait_before_shutdown
9292
static constexpr UInt64 DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN = 600;
93-
auto graceful_wait_before_shutdown = global_context->getUsersConfig()->getUInt64(
94-
GRACEFUL_WIAT_BEFORE_SHUTDOWN,
95-
DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN);
93+
auto graceful_wait_before_shutdown
94+
= context->getUsersConfig()->getUInt64(GRACEFUL_WIAT_BEFORE_SHUTDOWN, DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN);
9695
LOG_INFO(log, "Start to wait all MPPTasks to finish, timeout={}s", graceful_wait_before_shutdown);
96+
UInt64 graceful_wait_before_shutdown_ms = graceful_wait_before_shutdown * 1000;
9797
Stopwatch watch;
9898
// The first sleep before checking to reduce the chance of missing MPP tasks that are still in the process of being dispatched
9999
std::this_thread::sleep_for(std::chrono::seconds(1));
100+
bool all_tasks_finished = false;
100101
while (true)
101102
{
102103
auto elapsed_ms = watch.elapsedMilliseconds();
104+
if (!all_tasks_finished)
103105
{
104106
std::unique_lock lock(mu);
105107
if (monitored_tasks.empty())
108+
all_tasks_finished = true;
109+
}
110+
if (all_tasks_finished)
111+
{
112+
// Also needs to check if all MPP gRPC connections are finished
113+
if (GET_METRIC(tiflash_coprocessor_handling_request_count, type_mpp_establish_conn).Value() == 0)
106114
{
107-
LOG_INFO(log, "All MPPTasks have finished after {}ms", elapsed_ms);
115+
LOG_INFO(log, "All MPP tasks have finished after {}ms", elapsed_ms);
108116
break;
109117
}
110118
}
111-
if (elapsed_ms >= graceful_wait_before_shutdown * 1000)
119+
if (elapsed_ms >= graceful_wait_before_shutdown_ms)
112120
{
113-
LOG_WARNING(log, "Timed out waiting for MPP tasks to finish after {}ms", elapsed_ms);
121+
LOG_WARNING(log, "Timed out waiting for all MPP tasks to finish after {}ms", elapsed_ms);
114122
break;
115123
}
116124
std::this_thread::sleep_for(std::chrono::milliseconds(200));

dbms/src/Flash/Mpp/MPPTaskManager.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ struct MPPTaskMonitor
194194
return monitored_tasks.find(task_unique_id) != monitored_tasks.end();
195195
}
196196

197-
void waitAllMPPTasksFinish(const std::unique_ptr<Context> & global_context);
197+
void waitAllMPPTasksFinish(const std::unique_ptr<Context> & context);
198198

199199
std::mutex mu;
200200
std::condition_variable cv;

dbms/src/Server/FlashGrpcServerHolder.cpp

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ void handleRpcs(grpc::ServerCompletionQueue * curcq, const LoggerPtr & log)
5252
// tells us whether there is any kind of event or cq is shutting down.
5353
if (!curcq->Next(&tag, &ok))
5454
{
55-
LOG_INFO(log, "CQ is fully drained and shut down");
55+
LOG_DEBUG(log, "CQ is fully drained and shut down");
5656
break;
5757
}
5858
GET_METRIC(tiflash_thread_count, type_active_rpc_async_worker).Increment();
@@ -220,21 +220,9 @@ FlashGrpcServerHolder::~FlashGrpcServerHolder()
220220
{
221221
/// Shut down grpc server.
222222
LOG_INFO(log, "Begin to shut down flash grpc server");
223-
flash_grpc_server->Shutdown();
223+
Stopwatch watch;
224224
*is_shutdown = true;
225-
// Wait all existed MPPTunnels done to prevent crash.
226-
// If all existed MPPTunnels are done, almost in all cases it means all existed MPPTasks and ExchangeReceivers are also done.
227-
const int max_wait_cnt = 300;
228-
int wait_cnt = 0;
229-
while (GET_METRIC(tiflash_object_count, type_count_of_mpptunnel).Value() >= 1 && (wait_cnt++ < max_wait_cnt))
230-
std::this_thread::sleep_for(std::chrono::seconds(1));
231-
if (GET_METRIC(tiflash_object_count, type_count_of_mpptunnel).Value() >= 1)
232-
LOG_WARNING(
233-
log,
234-
"Wait {} seconds for mpp tunnels shutdown, still some mpp tunnels are alive, potential resource leak",
235-
wait_cnt);
236-
else
237-
LOG_INFO(log, "Wait {} seconds for mpp tunnels shutdown, all finished", wait_cnt);
225+
flash_grpc_server->Shutdown();
238226

239227
for (auto & cq : cqs)
240228
cq->Shutdown();
@@ -252,7 +240,7 @@ FlashGrpcServerHolder::~FlashGrpcServerHolder()
252240
GRPCCompletionQueuePool::global_instance->markShutdown();
253241

254242
GRPCCompletionQueuePool::global_instance = nullptr;
255-
LOG_INFO(log, "Shut down flash grpc server");
243+
LOG_INFO(log, "Shut down flash grpc server after {}ms", watch.elapsedMilliseconds());
256244

257245
/// Close flash service.
258246
LOG_INFO(log, "Begin to shut down flash service");

0 commit comments

Comments
 (0)