Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 17 additions & 12 deletions dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ double ResourceGroup::getAcquireRUNumWithoutLock(double speed, uint32_t n_sec, d
if unlikely (acquire_num == 0.0 && remaining_ru == 0.0)
acquire_num = DEFAULT_BUFFER_TOKENS;

// remaining_ru is subtracted to try to keep the number of local tokens
// equal to the amount consumed.
acquire_num -= remaining_ru;
acquire_num = (acquire_num > 0.0 ? acquire_num : 0.0);
return acquire_num;
Expand Down Expand Up @@ -364,11 +366,9 @@ void LocalAdmissionController::mainLoop()
static_assert(
tick_interval <= ResourceGroup::COMPUTE_RU_CONSUMPTION_SPEED_INTERVAL && tick_interval <= DEGRADE_MODE_DURATION
&& tick_interval <= DEFAULT_TARGET_PERIOD);
auto cur_tick_beg = current_tick;
auto cur_tick_end = cur_tick_beg + tick_interval;
auto cur_tick_end = current_tick + tick_interval;
while (!stopped.load())
{
if (current_tick < cur_tick_end)
{
std::unique_lock<std::mutex> lock(mu);
if (keyspace_low_token_resource_groups.empty())
Expand All @@ -388,19 +388,16 @@ void LocalAdmissionController::mainLoop()
try
{
while (current_tick >= cur_tick_end)
{
updateRUConsumptionSpeed();
cur_tick_beg = cur_tick_end;
cur_tick_end += tick_interval;
}

updateRUConsumptionSpeed();
if (const auto gac_req_opt = buildGACRequest(/*is_final_report=*/false); gac_req_opt.has_value())
{
std::lock_guard lock(gac_requests_mu);
gac_requests.push_back(gac_req_opt.value());
gac_requests_cv.notify_all();
}
clearCPUTimeWithoutLock(current_tick);
clearCPUTime(current_tick);
checkDegradeMode();
}
catch (...)
Expand Down Expand Up @@ -437,13 +434,17 @@ std::optional<resource_manager::TokenBucketsRequest> LocalAdmissionController::b
else
{
std::unordered_set<std::pair<KeyspaceID, std::string>, LACPairHash> local_keyspace_low_token_resource_groups;
std::unordered_map<std::pair<KeyspaceID, std::string>, ResourceGroupPtr, LACPairHash>
local_keyspace_resource_groups;
{
std::lock_guard lock(mu);
local_keyspace_low_token_resource_groups = keyspace_low_token_resource_groups;
keyspace_low_token_resource_groups.clear();

local_keyspace_resource_groups = keyspace_resource_groups;
}

for (const auto & ele : keyspace_resource_groups)
for (const auto & ele : local_keyspace_resource_groups)
{
const bool need_fetch_token = local_keyspace_low_token_resource_groups.contains(ele.first);
const bool need_report = ele.second->shouldReportRUConsumption(current_tick);
Expand Down Expand Up @@ -644,7 +645,7 @@ std::vector<std::pair<KeyspaceID, std::string>> LocalAdmissionController::handle

const String err_msg = fmt::format("handle acquire token resp failed: rg: {}(keyspace={})", name, keyspace_id);
// It's possible for one_resp.granted_r_u_tokens() to be empty
// when the acquire_token_req is only for report RU consumption.
// when the acquire_token_req only reports RU consumption or GAC returned an error (e.g. NaN tokens).
if (one_resp.granted_r_u_tokens().empty())
{
resource_group->endRequest();
Expand All @@ -653,6 +654,7 @@ std::vector<std::pair<KeyspaceID, std::string>> LocalAdmissionController::handle

if unlikely (one_resp.granted_r_u_tokens().size() != 1)
{
resource_group->endRequest();
LOG_ERROR(
log,
"{} unexpected resp.granted_r_u_tokens().size(): {} one_resp: {}",
Expand All @@ -665,13 +667,15 @@ std::vector<std::pair<KeyspaceID, std::string>> LocalAdmissionController::handle
const resource_manager::GrantedRUTokenBucket & granted_token_bucket = one_resp.granted_r_u_tokens()[0];
if unlikely (granted_token_bucket.type() != resource_manager::RequestUnitType::RU)
{
resource_group->endRequest();
LOG_ERROR(log, "{} unexpected request type, one_resp: {}", err_msg, one_resp.ShortDebugString());
continue;
}

const auto trickle_ms = granted_token_bucket.trickle_time_ms();
if unlikely (trickle_ms < 0)
{
resource_group->endRequest();
LOG_ERROR(
log,
"{} unexpected trickle_ms: {} one_resp: {}",
Expand All @@ -686,6 +690,7 @@ std::vector<std::pair<KeyspaceID, std::string>> LocalAdmissionController::handle
double added_tokens = granted_token_bucket.granted_tokens().tokens();
if unlikely (!std::isfinite(added_tokens) || added_tokens < 0.0)
{
resource_group->endRequest();
LOG_ERROR(
log,
"{} invalid added_tokens: {} one_resp: {}",
Expand Down Expand Up @@ -858,7 +863,7 @@ bool LocalAdmissionController::handleDeleteEvent(
std::lock_guard lock(mu);
erase_num = deleteResourceGroupWithoutLock(keyspace_id, name);
}
LOG_DEBUG(log, "delete resource group {}(keyspace={}), erase_num: {}", name, keyspace_id, erase_num);
LOG_INFO(log, "delete resource group {}(keyspace={}), erase_num: {}", name, keyspace_id, erase_num);
return true;
}

Expand Down Expand Up @@ -896,7 +901,7 @@ bool LocalAdmissionController::handlePutEvent(
updateMaxRUPerSecAfterDeleteWithoutLock(rg->user_ru_per_sec);
}
}
LOG_DEBUG(log, "modify resource group {}(keyspace={}) to: {}", name, keyspace_id, group_pb.ShortDebugString());
LOG_INFO(log, "modify resource group {}(keyspace={}) to: {}", name, keyspace_id, group_pb.ShortDebugString());
return true;
}

Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Flash/ResourceControl/LocalAdmissionController.h
Original file line number Diff line number Diff line change
Expand Up @@ -614,9 +614,10 @@ class LocalAdmissionController final : private boost::noncopyable
std::string & err_msg);
void updateMaxRUPerSecAfterDeleteWithoutLock(uint64_t deleted_user_ru_per_sec);

void clearCPUTimeWithoutLock(const SteadyClock::time_point & now)
void clearCPUTime(const SteadyClock::time_point & now)
{
static_assert(CLEAR_CPU_TIME_DURATION > ResourceGroup::COMPUTE_RU_CONSUMPTION_SPEED_INTERVAL);
std::lock_guard lock(mu);
if (now - last_clear_cpu_time >= CLEAR_CPU_TIME_DURATION)
{
for (auto & ele : keyspace_resource_groups)
Expand Down