Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ grpc::Status FlashService::IsAlive(
return check_result;

auto & tmt_context = context->getTMTContext();
response->set_available(tmt_context.checkRunning());
response->set_available(tmt_context.checkRunning() && tmt_context.getMPPTaskManager()->isAvailable());
response->set_mpp_version(DB::GetMppVersion());
return grpc::Status::OK;
}
Expand Down
34 changes: 34 additions & 0 deletions dbms/src/Flash/Mpp/MPPTaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <Common/FailPoint.h>
#include <Common/FmtUtils.h>
#include <Common/Stopwatch.h>
#include <Common/TiFlashMetrics.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Mpp/MPPTask.h>
Expand Down Expand Up @@ -82,6 +83,39 @@ MPPGatherTaskSetPtr MPPQuery::addMPPGatherTaskSet(const MPPGatherId & gather_id)
return ptr;
}

void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr<Context> & global_context)
{
// The maximum seconds TiFlash will wait for all current MPP tasks to finish before shutting down
static constexpr const char * GRACEFUL_WIAT_BEFORE_SHUTDOWN = "flash.graceful_wait_before_shutdown";
// The default value of flash.graceful_wait_before_shutdown
static constexpr UInt64 DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN = 600;
auto graceful_wait_before_shutdown = global_context->getUsersConfig()->getUInt64(
GRACEFUL_WIAT_BEFORE_SHUTDOWN,
DEFAULT_GRACEFUL_WAIT_BEFORE_SHUTDOWN);
LOG_INFO(log, "Start to wait all MPPTasks to finish, timeout={}s", graceful_wait_before_shutdown);
Stopwatch watch;
// The first sleep before checking to reduce the chance of missing MPP tasks that are still in the process of being dispatched
std::this_thread::sleep_for(std::chrono::seconds(1));
while (true)
{
auto elapsed_ms = watch.elapsedMilliseconds();
{
std::unique_lock lock(mu);
if (monitored_tasks.empty())
{
LOG_INFO(log, "All MPPTasks have finished after {}ms", elapsed_ms);
break;
}
}
if (elapsed_ms >= graceful_wait_before_shutdown * 1000)
{
LOG_WARNING(log, "Timed out waiting for MPP tasks to finish after {}ms", elapsed_ms);
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
}

MPPTaskManager::MPPTaskManager(MPPTaskSchedulerPtr scheduler_)
: scheduler(std::move(scheduler_))
, aborted_query_gather_cache(ABORTED_MPPGATHER_CACHE_SIZE)
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Flash/Mpp/MPPTaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ struct MPPTaskMonitor
return monitored_tasks.find(task_unique_id) != monitored_tasks.end();
}

/// Blocks until `monitored_tasks` is empty or the graceful-shutdown timeout
/// (users config `flash.graceful_wait_before_shutdown`, default 600 seconds) has elapsed.
/// Sleeps 1 second before the first check, then polls every 200 milliseconds.
void waitAllMPPTasksFinish(const std::unique_ptr<Context> & global_context);

std::mutex mu;
std::condition_variable cv;
bool is_shutdown = false;
Expand Down Expand Up @@ -220,6 +222,8 @@ class MPPTaskManager : private boost::noncopyable

std::shared_ptr<MPPTaskMonitor> monitor;

/// Availability flag of this manager: starts as true, cleared by setUnavailable(),
/// read by isAvailable().
std::atomic<bool> is_available{true};

public:
explicit MPPTaskManager(MPPTaskSchedulerPtr scheduler);

Expand Down Expand Up @@ -272,6 +276,9 @@ class MPPTaskManager : private boost::noncopyable

bool isTaskExists(const MPPTaskId & id);

/// Clears the availability flag; afterwards isAvailable() returns false.
void setUnavailable()
{
    is_available.store(false);
}
/// Returns the current value of the availability flag.
bool isAvailable()
{
    return is_available.load();
}

private:
MPPQueryPtr addMPPQuery(
const MPPQueryId & query_id,
Expand Down
13 changes: 8 additions & 5 deletions dbms/src/Server/FlashGrpcServerHolder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,17 +224,20 @@ FlashGrpcServerHolder::~FlashGrpcServerHolder()
*is_shutdown = true;
// Wait for all existing MPPTunnels to finish to prevent a crash.
// If all existing MPPTunnels are done, in almost all cases it means all existing MPPTasks and ExchangeReceivers are also done.
const int max_wait_cnt = 300;
constexpr int wait_step = 200;
// Maximum wait for 1 minute
constexpr int max_wait_cnt = 60 * 1000 / wait_step;
int wait_cnt = 0;
while (GET_METRIC(tiflash_object_count, type_count_of_mpptunnel).Value() >= 1 && (wait_cnt++ < max_wait_cnt))
std::this_thread::sleep_for(std::chrono::seconds(1));
std::this_thread::sleep_for(std::chrono::milliseconds(wait_step));
if (GET_METRIC(tiflash_object_count, type_count_of_mpptunnel).Value() >= 1)
LOG_WARNING(
log,
"Wait {} seconds for mpp tunnels shutdown, still some mpp tunnels are alive, potential resource leak",
wait_cnt);
"Wait {} milliseconds for mpp tunnels shutdown, still some mpp tunnels are alive, potential resource "
"leak",
wait_cnt * wait_step);
else
LOG_INFO(log, "Wait {} seconds for mpp tunnels shutdown, all finished", wait_cnt);
LOG_INFO(log, "Wait {} milliseconds for mpp tunnels shutdown, all finished", wait_cnt * wait_step);

for (auto & cq : cqs)
cq->Shutdown();
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1262,6 +1262,12 @@ try
LOG_INFO(log, "Start to wait for terminal signal");
waitForTerminationRequest();

// Note: `waitAllMPPTasksFinish` must be called before stopping the proxy.
// Otherwise, read index requests may fail, which can prevent TiFlash from shutting down gracefully.
LOG_INFO(log, "Set unavailable for MPPTask");
tmt_context.getMPPTaskManager()->setUnavailable();
tmt_context.getMPPTaskManager()->getMPPTaskMonitor()->waitAllMPPTasksFinish(global_context);

{
// Set limiters to stopping and wake up threads in the waiting queue.
global_context->getIORateLimiter().setStop();
Expand Down