From eaf714d162b7cefde74448fc1a3403f8b4ca82d1 Mon Sep 17 00:00:00 2001 From: meiyi Date: Wed, 1 Jul 2026 14:37:19 +0800 Subject: [PATCH] [fix](group commit) fix can not get a block queue (#63722) Under high-concurrency async stream load, group commit may fail to get a block queue when creating the group commit plan fragment fails or leaves pending load requests waiting for a queue that is no longer usable. This change tracks pending create-plan requests per table, adds a background worker to resubmit group commit plan creation, and introduces group_commit_create_plan_timeout_ms so waiting requests can be released after a bounded time. --- be/src/common/config.cpp | 3 + be/src/common/config.h | 2 + .../group_commit_block_sink_operator.cpp | 37 +- be/src/load/group_commit/group_commit_mgr.cpp | 329 ++++++++++++++---- be/src/load/group_commit/group_commit_mgr.h | 41 ++- be/src/load/group_commit/wal/wal_table.cpp | 11 +- .../test_group_commit_error.groovy | 46 ++- ..._stream_load_high_concurrency_async.groovy | 138 ++++++++ 8 files changed, 511 insertions(+), 96 deletions(-) create mode 100644 regression-test/suites/load_p0/stream_load/test_group_commit_stream_load_high_concurrency_async.groovy diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 02f1d145f25471..2b4e9056460d41 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1414,6 +1414,9 @@ DEFINE_mInt32(group_commit_queue_mem_limit, "67108864"); // group_commit_wal_max_disk_limit=1024 or group_commit_wal_max_disk_limit=10% can be automatically identified. DEFINE_String(group_commit_wal_max_disk_limit, "10%"); DEFINE_Bool(group_commit_wait_replay_wal_finish, "false"); +// Max time(ms) to wait for creating group commit plan fragment. +// 0 means no timeout, default 2min. +DEFINE_mInt32(group_commit_create_plan_timeout_ms, "120000"); DEFINE_mInt32(scan_thread_nice_value, "0"); DEFINE_mInt32(tablet_schema_cache_recycle_interval, "3600"); diff --git a/be/src/common/config.h b/be/src/common/config.h index d1c509ff35398e..fd236881fc0866 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1501,6 +1501,8 @@ DECLARE_mInt32(group_commit_queue_mem_limit); // group_commit_wal_max_disk_limit=1024 or group_commit_wal_max_disk_limit=10% can be automatically identified. DECLARE_mString(group_commit_wal_max_disk_limit); DECLARE_Bool(group_commit_wait_replay_wal_finish); +// Max time(ms) to wait for creating group commit plan fragment. 0 means no timeout. +DECLARE_mInt32(group_commit_create_plan_timeout_ms); // The configuration item is used to lower the priority of the scanner thread, // typically employed to ensure CPU scheduling for write operations. diff --git a/be/src/exec/operator/group_commit_block_sink_operator.cpp b/be/src/exec/operator/group_commit_block_sink_operator.cpp index fd44a9e74a627d..7a71866148bc3e 100644 --- a/be/src/exec/operator/group_commit_block_sink_operator.cpp +++ b/be/src/exec/operator/group_commit_block_sink_operator.cpp @@ -87,8 +87,8 @@ Status GroupCommitBlockSinkLocalState::_initialize_load_queue() { if (_state->exec_env()->wal_mgr()->is_running()) { RETURN_IF_ERROR(_state->exec_env()->group_commit_mgr()->get_first_block_load_queue( p._db_id, p._table_id, p._base_schema_version, p._schema->indexes().size(), - p._load_id, _load_block_queue, _state->be_exec_version(), - _state->query_mem_tracker(), _create_plan_dependency, _put_block_dependency)); + p._load_id, _load_block_queue, _state->be_exec_version(), _create_plan_dependency, + _put_block_dependency)); _state->set_import_label(_load_block_queue->label); _state->set_wal_id(_load_block_queue->txn_id); // wal_id is txn_id return Status::OK(); @@ -141,17 +141,16 @@ Status GroupCommitBlockSinkLocalState::_add_block(RuntimeState* state, block->get_by_position(i).column = make_nullable(block->get_by_position(i).column); block->get_by_position(i).type = make_nullable(block->get_by_position(i).type); } - // add block to queue - auto cur_mutable_block = MutableBlock::create_unique(block->clone_empty()); - { - IColumn::Selector selector; - for (auto i = 0; i < block->rows(); i++) { - selector.emplace_back(i); - } - RETURN_IF_ERROR(block->append_to_block_by_selector(cur_mutable_block.get(), selector)); - } + // Add block to queue. The block has already been converted to all-nullable columns above + // and contains exactly the rows to enqueue (filtering is done by the caller before + // _add_block), so move its columns into a standalone output_block instead of deep-copying + // them. The previous code appended every row through an identity selector, which duplicated + // all column data (notably ColumnString chars) and dominated group-commit load memory. + // Columns are COW shared_ptrs, so swapping into a fresh Block is O(1) and memory-safe: the + // queued block is a distinct object (consumers swap/mutate only that object) while the + // underlying data stays alive via reference counting and is only read downstream. std::shared_ptr output_block = Block::create_shared(); - output_block->swap(cur_mutable_block->to_block()); + output_block->swap(*block); if (!_is_block_appended && state->num_rows_load_total() + state->num_rows_load_unselected() + state->num_rows_load_filtered() <= config::group_commit_memory_rows_for_max_filter_ratio) { @@ -338,10 +337,10 @@ Status GroupCommitBlockSinkOperatorX::sink(RuntimeState* state, Block* input_blo }; auto rows = input_block->rows(); - auto bytes = input_block->bytes(); if (UNLIKELY(rows == 0)) { return wind_up(); } + auto bytes = input_block->bytes(); // update incrementally so that FE can get the progress. // the real 'num_rows_load_total' will be set when sink being closed. @@ -363,11 +362,9 @@ Status GroupCommitBlockSinkOperatorX::sink(RuntimeState* state, Block* input_blo local_state._partitions.assign(rows, nullptr); local_state._filter_bitmap.Reset(rows); - for (int index = 0; index < rows; index++) { - local_state._vpartition->find_partition(block.get(), index, - local_state._partitions[index]); - } for (int row_index = 0; row_index < rows; row_index++) { + local_state._vpartition->find_partition(block.get(), row_index, + local_state._partitions[row_index]); if (local_state._partitions[row_index] == nullptr) [[unlikely]] { local_state._filter_bitmap.Set(row_index, true); LOG(WARNING) << "no partition for this tuple. tuple=" @@ -394,6 +391,8 @@ Status GroupCommitBlockSinkOperatorX::sink(RuntimeState* state, Block* input_blo local_state._has_filtered_rows) { auto cloneBlock = block->clone_without_columns(); auto res_block = MutableBlock::build_mutable_block(std::move(cloneBlock)); + std::vector rows_to_keep; + rows_to_keep.reserve(rows); for (int i = 0; i < rows; ++i) { if (local_state._block_convertor->filter_map()[i]) { continue; @@ -401,8 +400,10 @@ Status GroupCommitBlockSinkOperatorX::sink(RuntimeState* state, Block* input_blo if (local_state._filter_bitmap.Get(i)) { continue; } - res_block.add_row(block.get(), i); + rows_to_keep.emplace_back(i); } + RETURN_IF_ERROR(res_block.add_rows(block.get(), rows_to_keep.data(), + rows_to_keep.data() + rows_to_keep.size())); block->swap(res_block.to_block()); } // add block into block queue diff --git a/be/src/load/group_commit/group_commit_mgr.cpp b/be/src/load/group_commit/group_commit_mgr.cpp index e75d09141a24e9..3a152a25b16730 100644 --- a/be/src/load/group_commit/group_commit_mgr.cpp +++ b/be/src/load/group_commit/group_commit_mgr.cpp @@ -29,9 +29,12 @@ #include "exec/pipeline/dependency.h" #include "runtime/exec_env.h" #include "runtime/fragment_mgr.h" +#include "runtime/memory/mem_tracker_limiter.h" +#include "runtime/thread_context.h" #include "util/client_cache.h" #include "util/debug_points.h" #include "util/thrift_rpc_helper.h" +#include "util/time.h" namespace doris { #include "common/compile_check_begin.h" @@ -100,18 +103,21 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state, std::shared_ptrload(std::memory_order_relaxed) >= config::group_commit_queue_mem_limit) { group_commit_block_by_memory_counter << 1; - DCHECK(_load_ids_to_write_dep.find(load_id) != _load_ids_to_write_dep.end()); - _load_ids_to_write_dep[load_id]->block(); - VLOG_DEBUG << "block add_block for load_id=" << load_id - << ", memory=" << _all_block_queues_bytes->load(std::memory_order_relaxed) - << ". inner load_id=" << load_instance_id << ", label=" << label; + auto dep_it = _load_ids_to_write_dep.find(load_id); + DCHECK(dep_it != _load_ids_to_write_dep.end()); + if (dep_it != _load_ids_to_write_dep.end() && dep_it->second) { + dep_it->second->block(); + VLOG_DEBUG << "block add_block for load_id=" << load_id << ", memory=" + << _all_block_queues_bytes->load(std::memory_order_relaxed) + << ". inner load_id=" << load_instance_id << ", label=" << label; + } } } - if (!_need_commit) { + if (!_need_commit.load()) { if (_data_bytes >= _group_commit_data_bytes) { VLOG_DEBUG << "group commit meets commit condition for data size, label=" << label << ", instance_id=" << load_instance_id << ", data_bytes=" << _data_bytes; - _need_commit = true; + _need_commit.store(true); data_size_condition = true; } if (std::chrono::duration_cast(std::chrono::steady_clock::now() - @@ -119,7 +125,7 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state, std::shared_ptr= _group_commit_interval_ms) { VLOG_DEBUG << "group commit meets commit condition for time interval, label=" << label << ", instance_id=" << load_instance_id << ", data_bytes=" << _data_bytes; - _need_commit = true; + _need_commit.store(true); } } for (auto read_dep : _read_deps) { @@ -142,11 +148,11 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, Block* block, bool auto duration = std::chrono::duration_cast( std::chrono::steady_clock::now() - _start_time) .count(); - if (!_need_commit && duration >= _group_commit_interval_ms) { - _need_commit = true; + if (!_need_commit.load() && duration >= _group_commit_interval_ms) { + _need_commit.store(true); } if (_block_queue.empty()) { - if (_need_commit && duration >= 10 * _group_commit_interval_ms) { + if (_need_commit.load() && duration >= 10 * _group_commit_interval_ms) { auto last_print_duration = std::chrono::duration_cast( std::chrono::steady_clock::now() - _last_print_time) .count(); @@ -158,7 +164,7 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, Block* block, bool } } VLOG_DEBUG << "get_block for inner load_id=" << load_instance_id << ", but queue is empty"; - if (!_need_commit) { + if (!_need_commit.load()) { get_block_dep->block(); VLOG_DEBUG << "block get_block for inner load_id=" << load_instance_id; } @@ -176,7 +182,7 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, Block* block, bool << ". txn_id=" << txn_id << ", label=" << label << ", instance_id=" << load_instance_id << ", load_ids=" << _get_load_ids(); } - if (_block_queue.empty() && _need_commit && _load_ids_to_write_dep.empty()) { + if (_block_queue.empty() && _need_commit.load() && _load_ids_to_write_dep.empty()) { *eos = true; } else { *eos = false; @@ -215,9 +221,12 @@ bool LoadBlockQueue::contain_load_id(const UniqueId& load_id) { Status LoadBlockQueue::add_load_id(const UniqueId& load_id, const std::shared_ptr put_block_dep) { std::unique_lock l(mutex); - if (_need_commit) { - return Status::InternalError("block queue is set need commit, id=" + - load_instance_id.to_string()); + if (_need_commit.load() || !status.ok() || process_finish.load()) { + return Status::InternalError( + "block queue cannot add load id, id=" + load_instance_id.to_string() + + ", need_commit=" + (_need_commit.load() ? "true" : "false") + + ", process_finish=" + (process_finish.load() ? "true" : "false") + + ", queue_status=" + status.to_string()); } _load_ids_to_write_dep[load_id] = put_block_dep; group_commit_load_count.fetch_add(1); @@ -237,7 +246,7 @@ void LoadBlockQueue::_cancel_without_lock(const Status& st) { Status::Cancelled("cancel group_commit, label=" + label + ", status=" + st.to_string()); size_t before_block_queues_bytes = _all_block_queues_bytes->load(); while (!_block_queue.empty()) { - const BlockData& block_data = _block_queue.front().block; + const BlockData& block_data = _block_queue.front(); _all_block_queues_bytes->fetch_sub(block_data.block_bytes, std::memory_order_relaxed); _block_queue.pop_front(); } @@ -259,8 +268,7 @@ void LoadBlockQueue::_cancel_without_lock(const Status& st) { Status GroupCommitTable::get_first_block_load_queue( int64_t table_id, int64_t base_schema_version, int64_t index_size, const UniqueId& load_id, std::shared_ptr& load_block_queue, int be_exe_version, - std::shared_ptr mem_tracker, std::shared_ptr create_plan_dep, - std::shared_ptr put_block_dep) { + std::shared_ptr create_plan_dep, std::shared_ptr put_block_dep) { DCHECK(table_id == _table_id); std::unique_lock l(_lock); auto try_to_get_matched_queue = [&]() -> Status { @@ -293,31 +301,131 @@ Status GroupCommitTable::get_first_block_load_queue( return Status::OK(); } create_plan_dep->block(); + _create_plan_be_exe_version = be_exe_version; + if (_create_plan_deps.empty()) { + _create_plan_start_time_ms = MonotonicMillis(); + } _create_plan_deps.emplace(load_id, std::make_tuple(create_plan_dep, put_block_dep, base_schema_version, index_size)); - if (!_is_creating_plan_fragment) { - _is_creating_plan_fragment = true; - RETURN_IF_ERROR( - _thread_pool->submit_func([&, be_exe_version, mem_tracker, dep = create_plan_dep] { - Defer defer {[&, dep = dep]() { - std::unique_lock l(_lock); - for (auto it : _create_plan_deps) { - std::get<0>(it.second)->set_ready(); + [[maybe_unused]] auto submit_st = _submit_create_group_commit_load(); + return try_to_get_matched_queue(); +} + +Status GroupCommitTable::submit_create_group_commit_load() { + std::unique_lock l(_lock); + if (_create_plan_deps.empty()) { + return Status::OK(); + } + return _submit_create_group_commit_load(); +} + +Status GroupCommitTable::_submit_create_group_commit_load() { + if (_is_creating_plan_fragment) { + return Status::OK(); + } + + int64_t timeout_ms = config::group_commit_create_plan_timeout_ms; + if (timeout_ms > 0 && !_create_plan_deps.empty()) { + int64_t now_ms = MonotonicMillis(); + if (_create_plan_start_time_ms > 0 && now_ms - _create_plan_start_time_ms > timeout_ms) { + std::string last_create_plan_failed_reason = _create_plan_failed_reason; + _create_plan_failed_reason = + ". group commit create plan timeout after " + std::to_string(timeout_ms) + "ms"; + if (!last_create_plan_failed_reason.empty()) { + _create_plan_failed_reason += + ", last create plan error: " + last_create_plan_failed_reason; + } + for (const auto& [id, load_info] : _create_plan_deps) { + std::get<0>(load_info)->set_ready(); + } + _create_plan_deps.clear(); + _create_plan_start_time_ms = 0; + return Status::OK(); + } + } + + auto mem_tracker = _group_commit_mgr->group_commit_mem_tracker(); + int be_exe_version = _create_plan_be_exe_version; + _is_creating_plan_fragment = true; + auto submit_st = _thread_pool->submit_func([&, be_exe_version, mem_tracker] { + std::shared_ptr created_load_block_queue; + Status create_group_commit_st = Status::OK(); + std::string create_plan_failed_reason; + Defer defer {[&]() { + bool need_resubmit = !create_group_commit_st.ok(); + std::unique_lock l(_lock); + _is_creating_plan_fragment = false; + _create_plan_failed_reason = create_plan_failed_reason; + if (created_load_block_queue && create_group_commit_st.ok() && + !created_load_block_queue->need_commit()) { + std::vector success_load_ids; + for (const auto& [id, load_info] : _create_plan_deps) { + auto create_dep = std::get<0>(load_info); + auto put_dep = std::get<1>(load_info); + if (created_load_block_queue->schema_version == std::get<2>(load_info) && + created_load_block_queue->index_size == std::get<3>(load_info)) { + auto st = created_load_block_queue->add_load_id(id, put_dep); + if (!st.ok()) { + LOG(WARNING) << "failed to add pending load_id into created " + "group commit queue, load_id=" + << id << ", label=" << created_load_block_queue->label + << ", status=" << st.to_string(); + need_resubmit = true; + } else { + create_dep->set_ready(); + success_load_ids.emplace_back(id); } - _create_plan_deps.clear(); - _is_creating_plan_fragment = false; - }}; - auto st = _create_group_commit_load(be_exe_version, mem_tracker); - if (!st.ok()) { - LOG(WARNING) << "create group commit load error: " << st.to_string(); - _create_plan_failed_reason = ". create group commit load error: " + - st.to_string().substr(0, 300); - } else { - _create_plan_failed_reason = ""; + } else if (created_load_block_queue->schema_version > std::get<2>(load_info) || + (created_load_block_queue->schema_version == + std::get<2>(load_info) && + created_load_block_queue->index_size != std::get<3>(load_info))) { + // schema version mismatch: + // 1. the schema version of created load block queue is newer than the load request + // 2. the index size is not equal + // set ready for the load request to let it fail + create_dep->set_ready(); + success_load_ids.emplace_back(id); } - })); - } - return try_to_get_matched_queue(); + } + for (const auto& id : success_load_ids) { + _create_plan_deps.erase(id); + } + if (_create_plan_deps.empty()) { + _create_plan_start_time_ms = 0; + } + } + if (!_create_plan_deps.empty()) { + need_resubmit = true; + } + if (need_resubmit && _group_commit_mgr) { + LOG(INFO) << "resubmit create group commit load task for table: " << _table_id; + _group_commit_mgr->add_need_create_plan_table(_table_id); + } + }}; + create_group_commit_st = + _create_group_commit_load(be_exe_version, mem_tracker, created_load_block_queue); + if (!create_group_commit_st.ok()) { + LOG(WARNING) << "create group commit load error: " + << create_group_commit_st.to_string(); + create_plan_failed_reason = ". create group commit load error: " + + create_group_commit_st.to_string().substr(0, 300); + } else { + create_plan_failed_reason = ""; + } + }); + if (!submit_st.ok()) { + _is_creating_plan_fragment = false; + _create_plan_failed_reason = + ". create group commit load error: submit create group commit load task failed: " + + submit_st.to_string().substr(0, 300); + for (const auto& [id, load_info] : _create_plan_deps) { + std::get<0>(load_info)->set_ready(); + } + _create_plan_deps.clear(); + LOG(WARNING) << "submit create group commit load task for table: " << _table_id + << ", error: " << submit_st.to_string(); + } + return submit_st; } void GroupCommitTable::remove_load_id(const UniqueId& load_id) { @@ -333,8 +441,9 @@ void GroupCommitTable::remove_load_id(const UniqueId& load_id) { } } -Status GroupCommitTable::_create_group_commit_load(int be_exe_version, - std::shared_ptr mem_tracker) { +Status GroupCommitTable::_create_group_commit_load( + int be_exe_version, const std::shared_ptr& mem_tracker, + std::shared_ptr& created_load_block_queue) { Status st = Status::OK(); TStreamLoadPutResult result; std::string label; @@ -406,6 +515,7 @@ Status GroupCommitTable::_create_group_commit_load(int be_exe_version, _db_id, _table_id, txn_id, label, _exec_env->wal_mgr(), pipeline_params.fragment.output_sink.olap_table_sink.schema.slot_descs, be_exe_version)); + created_load_block_queue = load_block_queue; std::unique_lock l(_lock); _load_block_queues.emplace(instance_id, load_block_queue); @@ -419,11 +529,23 @@ Status GroupCommitTable::_create_group_commit_load(int be_exe_version, create_dep->set_ready(); success_load_ids.emplace_back(id); } + } else if (load_block_queue->schema_version > std::get<2>(load_info) || + (load_block_queue->schema_version == std::get<2>(load_info) && + load_block_queue->index_size != std::get<3>(load_info))) { + // schema version mismatch: + // 1. the schema version of created load block queue is newer than the load request + // 2. the index size is not equal + // set ready for the load request to let it fail + create_dep->set_ready(); + success_load_ids.emplace_back(id); } } for (const auto& id : success_load_ids) { _create_plan_deps.erase(id); } + if (_create_plan_deps.empty()) { + _create_plan_start_time_ms = 0; + } } } st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, result.pipeline_params); @@ -483,16 +605,25 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_ client->loadTxnCommit(result, request); }, config::txn_commit_rpc_timeout_ms); - result_status = Status::create(result.status); - // DELETE_BITMAP_LOCK_ERROR will be retried - if (result_status.ok() || !result_status.is()) { - break; + if (st.ok()) { + result_status = Status::create(result.status); + // DELETE_BITMAP_LOCK_ERROR will be retried + if (result_status.ok() || + !result_status.is()) { + break; + } + LOG_WARNING("Failed to commit txn on group commit") + .tag("label", label) + .tag("txn_id", txn_id) + .tag("retry_times", retry_times) + .error(result_status); + } else { + LOG_WARNING("Failed to commit txn on group commit") + .tag("label", label) + .tag("txn_id", txn_id) + .tag("retry_times", retry_times) + .error(st); } - LOG_WARNING("Failed to commit txn on group commit") - .tag("label", label) - .tag("txn_id", txn_id) - .tag("retry_times", retry_times) - .error(result_status); retry_times++; } DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.commit_success_and_rpc_error", @@ -513,7 +644,9 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_ [&request, &result](FrontendServiceConnection& client) { client->loadTxnRollback(result, request); }); - result_status = Status::create(result.status); + if (st.ok()) { + result_status = Status::create(result.status); + } DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.err_status", { std ::string msg = "abort txn"; LOG(INFO) << "debug promise set: " << msg; @@ -543,6 +676,10 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_ } _load_block_queues.erase(instance_id); } + if (!load_block_queue) { + LOG(WARNING) << "finish group commit can not find load block queue, label=" << label + << ", txn_id=" << txn_id << ", instance_id=" << print_id(instance_id); + } // status: exec_plan_fragment result // st: commit txn rpc status // result_status: commit txn result @@ -567,11 +704,15 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_ << ", txn_id=" << txn_id << ", instance_id=" << print_id(instance_id) << ", exec_plan_fragment status=" << status.to_string() << ", commit/abort txn rpc status=" << st.to_string() - << ", commit/abort txn status=" << result_status.to_string() - << ", this group commit includes " << load_block_queue->group_commit_load_count << " loads" - << ", flush because meet " - << (load_block_queue->data_size_condition ? "data size " : "time ") << "condition" - << ", wal space info:" << ExecEnv::GetInstance()->wal_mgr()->get_wal_dirs_info_string(); + << ", commit/abort txn status=" << result_status.to_string(); + if (load_block_queue) { + ss << ", this group commit includes " << load_block_queue->group_commit_load_count + << " loads, flush because meet " + << (load_block_queue->data_size_condition ? "data size " : "time ") << "condition"; + } else { + ss << ", load block queue is missing when finishing group commit"; + } + ss << ", wal space info:" << ExecEnv::GetInstance()->wal_mgr()->get_wal_dirs_info_string(); if (state) { if (!state->get_error_log_file_path().empty()) { ss << ", error_url=" << state->get_error_log_file_path(); @@ -631,35 +772,95 @@ GroupCommitMgr::GroupCommitMgr(ExecEnv* exec_env) : _exec_env(exec_env) { .set_max_threads(config::group_commit_insert_threads) .build(&_thread_pool)); _all_block_queues_bytes = std::make_shared(0); + _group_commit_mem_tracker = + MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::LOAD, "GroupCommit"); + _create_plan_thread = std::thread(&GroupCommitMgr::_create_plan_worker, this); } GroupCommitMgr::~GroupCommitMgr() { + stop(); LOG(INFO) << "GroupCommitMgr is destoried"; } void GroupCommitMgr::stop() { + { + std::lock_guard l(_need_create_plan_lock); + if (_stopped) { + return; + } + _stopped = true; + } + _need_create_plan_cv.notify_all(); + if (_create_plan_thread.joinable()) { + _create_plan_thread.join(); + } _thread_pool->shutdown(); LOG(INFO) << "GroupCommitMgr is stopped"; } -Status GroupCommitMgr::get_first_block_load_queue( - int64_t db_id, int64_t table_id, int64_t base_schema_version, int64_t index_size, - const UniqueId& load_id, std::shared_ptr& load_block_queue, - int be_exe_version, std::shared_ptr mem_tracker, - std::shared_ptr create_plan_dep, std::shared_ptr put_block_dep) { +void GroupCommitMgr::add_need_create_plan_table(int64_t table_id) { + { + std::lock_guard l(_need_create_plan_lock); + if (_stopped) { + return; + } + _need_create_plan_tables.insert(table_id); + } + _need_create_plan_cv.notify_one(); +} + +void GroupCommitMgr::_create_plan_worker() { + SCOPED_INIT_THREAD_CONTEXT(); + while (true) { + std::set need_create_plan_tables; + { + std::unique_lock l(_need_create_plan_lock); + _need_create_plan_cv.wait( + l, [this] { return _stopped || !_need_create_plan_tables.empty(); }); + if (_stopped && _need_create_plan_tables.empty()) { + return; + } + need_create_plan_tables.swap(_need_create_plan_tables); + } + for (const auto table_id : need_create_plan_tables) { + std::shared_ptr group_commit_table; + { + std::lock_guard l(_lock); + auto it = _table_map.find(table_id); + if (it == _table_map.end()) { + continue; + } + group_commit_table = it->second; + } + auto st = group_commit_table->submit_create_group_commit_load(); + if (!st.ok()) { + LOG(WARNING) << "submit create group commit load task from worker for table: " + << table_id << ", error: " << st.to_string(); + } + } + } +} + +Status GroupCommitMgr::get_first_block_load_queue(int64_t db_id, int64_t table_id, + int64_t base_schema_version, int64_t index_size, + const UniqueId& load_id, + std::shared_ptr& load_block_queue, + int be_exe_version, + std::shared_ptr create_plan_dep, + std::shared_ptr put_block_dep) { std::shared_ptr group_commit_table; { std::lock_guard wlock(_lock); if (_table_map.find(table_id) == _table_map.end()) { _table_map.emplace(table_id, std::make_shared( _exec_env, _thread_pool.get(), db_id, table_id, - _all_block_queues_bytes)); + _all_block_queues_bytes, this)); } group_commit_table = _table_map[table_id]; } RETURN_IF_ERROR(group_commit_table->get_first_block_load_queue( table_id, base_schema_version, index_size, load_id, load_block_queue, be_exe_version, - mem_tracker, create_plan_dep, put_block_dep)); + create_plan_dep, put_block_dep)); return Status::OK(); } diff --git a/be/src/load/group_commit/group_commit_mgr.h b/be/src/load/group_commit/group_commit_mgr.h index bf892a4302c364..5ab3831e4f5cc4 100644 --- a/be/src/load/group_commit/group_commit_mgr.h +++ b/be/src/load/group_commit/group_commit_mgr.h @@ -25,7 +25,9 @@ #include #include #include +#include #include +#include #include #include @@ -38,6 +40,7 @@ namespace doris { class ExecEnv; +class GroupCommitMgr; class TUniqueId; class RuntimeState; @@ -76,7 +79,7 @@ class LoadBlockQueue { Status add_load_id(const UniqueId& load_id, const std::shared_ptr put_block_dep); Status remove_load_id(const UniqueId& load_id); void cancel(const Status& st); - bool need_commit() { return _need_commit; } + bool need_commit() const { return _need_commit.load(); } Status create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id, const std::string& import_label, WalManager* wal_manager, std::vector& slot_desc, @@ -95,8 +98,8 @@ class LoadBlockQueue { "wait_internal_group_commit_finish={}, data_size_condition={}, " "group_commit_load_count={}, process_finish={}, _need_commit={}, schema_version={}", load_instance_id.to_string(), label, txn_id, wait_internal_group_commit_finish, - data_size_condition, group_commit_load_count, process_finish.load(), _need_commit, - schema_version); + data_size_condition, group_commit_load_count, process_finish.load(), + _need_commit.load(), schema_version); return fmt::to_string(debug_string_buffer); } @@ -135,7 +138,7 @@ class LoadBlockQueue { std::shared_ptr _v_wal_writer; // commit - bool _need_commit = false; + std::atomic _need_commit = false; // commit by time interval, can be changed by 'ALTER TABLE my_table SET ("group_commit_interval_ms"="1000");' int64_t _group_commit_interval_ms; std::chrono::steady_clock::time_point _start_time; @@ -152,27 +155,31 @@ class LoadBlockQueue { class GroupCommitTable { public: GroupCommitTable(ExecEnv* exec_env, doris::ThreadPool* thread_pool, int64_t db_id, - int64_t table_id, std::shared_ptr all_block_queue_bytes) + int64_t table_id, std::shared_ptr all_block_queue_bytes, + GroupCommitMgr* group_commit_mgr) : _exec_env(exec_env), _thread_pool(thread_pool), _all_block_queues_bytes(all_block_queue_bytes), _db_id(db_id), - _table_id(table_id) {}; + _table_id(table_id), + _group_commit_mgr(group_commit_mgr) {}; Status get_first_block_load_queue(int64_t table_id, int64_t base_schema_version, int64_t index_size, const UniqueId& load_id, std::shared_ptr& load_block_queue, int be_exe_version, - std::shared_ptr mem_tracker, std::shared_ptr create_plan_dep, std::shared_ptr put_block_dep); Status get_load_block_queue(const TUniqueId& instance_id, std::shared_ptr& load_block_queue, std::shared_ptr get_block_dep); void remove_load_id(const UniqueId& load_id); + Status submit_create_group_commit_load(); private: + Status _submit_create_group_commit_load(); Status _create_group_commit_load(int be_exe_version, - std::shared_ptr mem_tracker); + const std::shared_ptr& mem_tracker, + std::shared_ptr& created_load_block_queue); Status _exec_plan_fragment(int64_t db_id, int64_t table_id, const std::string& label, int64_t txn_id, const TPipelineFragmentParams& pipeline_params); Status _finish_group_commit_load(int64_t db_id, int64_t table_id, const std::string& label, @@ -186,6 +193,7 @@ class GroupCommitTable { int64_t _db_id; int64_t _table_id; + GroupCommitMgr* _group_commit_mgr = nullptr; std::mutex _lock; // fragment_instance_id to load_block_queue @@ -196,6 +204,8 @@ class GroupCommitTable { std::shared_ptr, int64_t, int64_t>> _create_plan_deps; std::string _create_plan_failed_reason; + int _create_plan_be_exe_version = 0; + int64_t _create_plan_start_time_ms = 0; }; class GroupCommitMgr { @@ -213,14 +223,19 @@ class GroupCommitMgr { int64_t index_size, const UniqueId& load_id, std::shared_ptr& load_block_queue, int be_exe_version, - std::shared_ptr mem_tracker, std::shared_ptr create_plan_dep, std::shared_ptr put_block_dep); void remove_load_id(int64_t table_id, const UniqueId& load_id); std::promise debug_promise; std::future debug_future = debug_promise.get_future(); + void add_need_create_plan_table(int64_t table_id); + std::shared_ptr group_commit_mem_tracker() { + return _group_commit_mem_tracker; + } private: + void _create_plan_worker(); + ExecEnv* _exec_env = nullptr; std::unique_ptr _thread_pool; // memory consumption of all tables' load block queues, used for memory back pressure. @@ -229,6 +244,12 @@ class GroupCommitMgr { std::mutex _lock; // TODO remove table when unused std::unordered_map> _table_map; + std::mutex _need_create_plan_lock; + std::condition_variable _need_create_plan_cv; + std::set _need_create_plan_tables; + bool _stopped = false; + std::thread _create_plan_thread; + std::shared_ptr _group_commit_mem_tracker; }; -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/load/group_commit/wal/wal_table.cpp b/be/src/load/group_commit/wal/wal_table.cpp index c3b82e16377751..8edb1d938c5238 100644 --- a/be/src/load/group_commit/wal/wal_table.cpp +++ b/be/src/load/group_commit/wal/wal_table.cpp @@ -183,9 +183,14 @@ Status WalTable::_try_abort_txn(int64_t db_id, std::string& label) { [&request, &result](FrontendServiceConnection& client) { client->loadTxnRollback(result, request); }); - auto result_status = Status::create(result.status); - LOG(INFO) << "abort label " << label << ", st:" << st << ", result_status:" << result_status; - return result_status; + if (st.ok()) { + auto result_status = Status::create(result.status); + LOG(INFO) << "abort label " << label << ", result_status:" << result_status; + return result_status; + } else { + LOG(WARNING) << "abort label " << label << ", rpc error:" << st; + return st; + } } Status WalTable::_replay_wal_internal(const std::string& wal) { diff --git a/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy b/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy index cef9bbdbf27df4..f2797ff7a72edd 100644 --- a/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy +++ b/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy @@ -17,6 +17,50 @@ suite("test_group_commit_error", "nonConcurrent") { def tableName = "test_group_commit_error" + def beConfigName = "group_commit_create_plan_timeout_ms" + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + def get_be_config = { String backend_id, String key -> + def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id)) + logger.info("show config: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def configList = parseJson(out.trim()) + assert configList instanceof List + for (Object ele in (List) configList) { + assert ele instanceof List + if (((List) ele)[0] == key) { + return ((List) ele)[2] + } + } + assertTrue(false, "Failed to find BE config: " + key) + } + + def set_be_config = { String key, String value -> + for (String backend_id : backendId_to_backendIP.keySet()) { + def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value) + logger.info("update config: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + } + } + + def originBeConfig = [:] + for (String backend_id : backendId_to_backendIP.keySet()) { + originBeConfig[backend_id] = get_be_config(backend_id, beConfigName) + } + + onFinish { + for (String backend_id : originBeConfig.keySet()) { + def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), + backendId_to_backendHttpPort.get(backend_id), beConfigName, originBeConfig[backend_id]) + logger.info("restore config: code=" + code + ", out=" + out + ", err=" + err) + } + GetDebugPoint().clearDebugPointsForAllBEs() + GetDebugPoint().clearDebugPointsForAllFEs() + } + + set_be_config(beConfigName, "20000") sql """ DROP TABLE IF EXISTS ${tableName} """ sql """ @@ -104,4 +148,4 @@ suite("test_group_commit_error", "nonConcurrent") { } finally { GetDebugPoint().clearDebugPointsForAllBEs() } -} \ No newline at end of file +} diff --git a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load_high_concurrency_async.groovy b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load_high_concurrency_async.groovy new file mode 100644 index 00000000000000..237d7fc649c81a --- /dev/null +++ b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load_high_concurrency_async.groovy @@ -0,0 +1,138 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +import java.util.Collections +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicInteger + +suite("test_group_commit_stream_load_high_concurrency_async", "p0") { + def tableName = "test_group_commit_stream_load_high_concurrency_async" + int concurrentClients = 20 + int loadsPerClient = 20 + int expectedRows = concurrentClients * loadsPerClient + def errors = Collections.synchronizedList(new ArrayList()) + def stopRequested = new AtomicBoolean(false) + def successLoads = new AtomicInteger(0) + def getProperty = { property, userName -> + def result = sql_return_maparray """SHOW PROPERTY FOR '${userName}'""" + result.find { + it.Key == property as String + } + } + def originMaxUserConnections = getProperty("max_user_connections", "root").Value as long + + def waitRowCount = { expected -> + Awaitility.await().atMost(300, SECONDS).pollInterval(2, SECONDS).until({ + def result = sql "select count(*) from ${tableName}" + logger.info("table: ${tableName}, rowCount: ${result}, expected: ${expected}") + return result[0][0] == expected + }) + } + + def checkStreamLoadResult = { loadId, result, exception -> + if (exception != null) { + stopRequested.set(true) + errors.add("load ${loadId} exception: ${exception.getMessage()}") + return + } + def json = parseJson(result) + if (!"success".equalsIgnoreCase(json.Status?.toString())) { + stopRequested.set(true) + errors.add("load ${loadId} status=${json.Status}, msg=${json.Message}") + return + } + if (json.GroupCommit != true) { + stopRequested.set(true) + errors.add("load ${loadId} is not group commit: ${result}") + return + } + if (json.NumberTotalRows != 1 || json.NumberLoadedRows != 1 || + json.NumberFilteredRows != 0 || json.NumberUnselectedRows != 0) { + stopRequested.set(true) + errors.add("load ${loadId} unexpected counters: ${result}") + return + } + successLoads.incrementAndGet() + } + + try { + sql """SET PROPERTY FOR 'root' 'max_user_connections' = '1024'""" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE ${tableName} ( + id BIGINT NOT NULL, + name STRING NULL + ) ENGINE=OLAP + DUPLICATE KEY(id) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "group_commit_interval_ms" = "5" + ); + """ + + def threads = [] + for (int client = 0; client < concurrentClients; client++) { + int clientId = client + threads.add(Thread.startDaemon("group-commit-stream-load-${clientId}") { + for (int round = 0; round < loadsPerClient; round++) { + if (stopRequested.get()) { + break + } + long loadId = clientId * loadsPerClient + round + try { + streamLoad { + table "${tableName}" + set 'column_separator', ',' + set 'group_commit', 'async_mode' + unset 'label' + inputText "${loadId},name_${loadId}\n" + time 60000 + + check { result, exception, startTime, endTime -> + checkStreamLoadResult(loadId, result, exception) + } + } + } catch (Exception e) { + stopRequested.set(true) + errors.add("load ${loadId} streamLoad throws: ${e.getMessage()}") + break + } + } + }) + } + + threads.each { it.join() } + + assertTrue(errors.isEmpty(), + "group commit stream load failures: " + + errors.subList(0, Math.min(errors.size(), 10))) + assertEquals(expectedRows, successLoads.get()) + + sql "sync" + waitRowCount(expectedRows) + + def result = sql "select count(*) from ${tableName}" + assertEquals(expectedRows, result[0][0]) + } finally { + sql """SET PROPERTY FOR 'root' 'max_user_connections' = '${originMaxUserConnections}'""" + } +}