diff --git a/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp b/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp index 74bdeac114d..a920c1add19 100644 --- a/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp +++ b/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp @@ -141,6 +141,8 @@ double ResourceGroup::getAcquireRUNumWithoutLock(double speed, uint32_t n_sec, d if unlikely (acquire_num == 0.0 && remaining_ru == 0.0) acquire_num = DEFAULT_BUFFER_TOKENS; + // The purpose of subtracting remaining_ru is to try to ensure that the number of local tokens + // always stays the same as the amount consumed. acquire_num -= remaining_ru; acquire_num = (acquire_num > 0.0 ? acquire_num : 0.0); return acquire_num; @@ -364,11 +366,9 @@ void LocalAdmissionController::mainLoop() static_assert( tick_interval <= ResourceGroup::COMPUTE_RU_CONSUMPTION_SPEED_INTERVAL && tick_interval <= DEGRADE_MODE_DURATION && tick_interval <= DEFAULT_TARGET_PERIOD); - auto cur_tick_beg = current_tick; - auto cur_tick_end = cur_tick_beg + tick_interval; + auto cur_tick_end = current_tick + tick_interval; while (!stopped.load()) { - if (current_tick < cur_tick_end) { std::unique_lock lock(mu); if (keyspace_low_token_resource_groups.empty()) @@ -388,19 +388,16 @@ void LocalAdmissionController::mainLoop() try { while (current_tick >= cur_tick_end) - { - updateRUConsumptionSpeed(); - cur_tick_beg = cur_tick_end; cur_tick_end += tick_interval; - } + updateRUConsumptionSpeed(); if (const auto gac_req_opt = buildGACRequest(/*is_final_report=*/false); gac_req_opt.has_value()) { std::lock_guard lock(gac_requests_mu); gac_requests.push_back(gac_req_opt.value()); gac_requests_cv.notify_all(); } - clearCPUTimeWithoutLock(current_tick); + clearCPUTime(current_tick); checkDegradeMode(); } catch (...) 
@@ -437,13 +434,17 @@ std::optional LocalAdmissionController::b else { std::unordered_set, LACPairHash> local_keyspace_low_token_resource_groups; + std::unordered_map, ResourceGroupPtr, LACPairHash> + local_keyspace_resource_groups; { std::lock_guard lock(mu); local_keyspace_low_token_resource_groups = keyspace_low_token_resource_groups; keyspace_low_token_resource_groups.clear(); + + local_keyspace_resource_groups = keyspace_resource_groups; } - for (const auto & ele : keyspace_resource_groups) + for (const auto & ele : local_keyspace_resource_groups) { const bool need_fetch_token = local_keyspace_low_token_resource_groups.contains(ele.first); const bool need_report = ele.second->shouldReportRUConsumption(current_tick); @@ -644,7 +645,7 @@ std::vector> LocalAdmissionController::handle const String err_msg = fmt::format("handle acquire token resp failed: rg: {}(keyspace={})", name, keyspace_id); // It's possible for one_resp.granted_r_u_tokens() to be empty - // when the acquire_token_req is only for report RU consumption. + // when the acquire_token_req is only for reporting RU consumption, or when GAC hit an error (e.g. a NaN token). 
if (one_resp.granted_r_u_tokens().empty()) { resource_group->endRequest(); @@ -653,6 +654,7 @@ std::vector> LocalAdmissionController::handle if unlikely (one_resp.granted_r_u_tokens().size() != 1) { + resource_group->endRequest(); LOG_ERROR( log, "{} unexpected resp.granted_r_u_tokens().size(): {} one_resp: {}", @@ -665,6 +667,7 @@ std::vector> LocalAdmissionController::handle const resource_manager::GrantedRUTokenBucket & granted_token_bucket = one_resp.granted_r_u_tokens()[0]; if unlikely (granted_token_bucket.type() != resource_manager::RequestUnitType::RU) { + resource_group->endRequest(); LOG_ERROR(log, "{} unexpected request type, one_resp: {}", err_msg, one_resp.ShortDebugString()); continue; } @@ -672,6 +675,7 @@ std::vector> LocalAdmissionController::handle const auto trickle_ms = granted_token_bucket.trickle_time_ms(); if unlikely (trickle_ms < 0) { + resource_group->endRequest(); LOG_ERROR( log, "{} unexpected trickle_ms: {} one_resp: {}", @@ -686,6 +690,7 @@ std::vector> LocalAdmissionController::handle double added_tokens = granted_token_bucket.granted_tokens().tokens(); if unlikely (!std::isfinite(added_tokens) || added_tokens < 0.0) { + resource_group->endRequest(); LOG_ERROR( log, "{} invalid added_tokens: {} one_resp: {}", @@ -858,7 +863,7 @@ bool LocalAdmissionController::handleDeleteEvent( std::lock_guard lock(mu); erase_num = deleteResourceGroupWithoutLock(keyspace_id, name); } - LOG_DEBUG(log, "delete resource group {}(keyspace={}), erase_num: {}", name, keyspace_id, erase_num); + LOG_INFO(log, "delete resource group {}(keyspace={}), erase_num: {}", name, keyspace_id, erase_num); return true; } @@ -896,7 +901,7 @@ bool LocalAdmissionController::handlePutEvent( updateMaxRUPerSecAfterDeleteWithoutLock(rg->user_ru_per_sec); } } - LOG_DEBUG(log, "modify resource group {}(keyspace={}) to: {}", name, keyspace_id, group_pb.ShortDebugString()); + LOG_INFO(log, "modify resource group {}(keyspace={}) to: {}", name, keyspace_id, 
group_pb.ShortDebugString()); return true; } diff --git a/dbms/src/Flash/ResourceControl/LocalAdmissionController.h b/dbms/src/Flash/ResourceControl/LocalAdmissionController.h index 5319a37dcf7..889de84a3b2 100644 --- a/dbms/src/Flash/ResourceControl/LocalAdmissionController.h +++ b/dbms/src/Flash/ResourceControl/LocalAdmissionController.h @@ -614,9 +614,10 @@ class LocalAdmissionController final : private boost::noncopyable std::string & err_msg); void updateMaxRUPerSecAfterDeleteWithoutLock(uint64_t deleted_user_ru_per_sec); - void clearCPUTimeWithoutLock(const SteadyClock::time_point & now) + void clearCPUTime(const SteadyClock::time_point & now) { static_assert(CLEAR_CPU_TIME_DURATION > ResourceGroup::COMPUTE_RU_CONSUMPTION_SPEED_INTERVAL); + std::lock_guard lock(mu); if (now - last_clear_cpu_time >= CLEAR_CPU_TIME_DURATION) { for (auto & ele : keyspace_resource_groups)