Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1414,6 +1414,9 @@ DEFINE_mInt32(group_commit_queue_mem_limit, "67108864");
// group_commit_wal_max_disk_limit accepts either an absolute value (e.g. 1024) or a percentage (e.g. 10%); the format is detected automatically.
DEFINE_String(group_commit_wal_max_disk_limit, "10%");
DEFINE_Bool(group_commit_wait_replay_wal_finish, "false");
// Max time (ms) to wait for the group commit plan fragment to be created.
// 0 means no timeout; the default is 120000 ms (2 minutes).
DEFINE_mInt32(group_commit_create_plan_timeout_ms, "120000");

DEFINE_mInt32(scan_thread_nice_value, "0");
DEFINE_mInt32(tablet_schema_cache_recycle_interval, "3600");
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1501,6 +1501,8 @@ DECLARE_mInt32(group_commit_queue_mem_limit);
// group_commit_wal_max_disk_limit accepts either an absolute value (e.g. 1024) or a percentage (e.g. 10%); the format is detected automatically.
DECLARE_mString(group_commit_wal_max_disk_limit);
DECLARE_Bool(group_commit_wait_replay_wal_finish);
// Max time (ms) to wait for the group commit plan fragment to be created. 0 means no timeout.
DECLARE_mInt32(group_commit_create_plan_timeout_ms);

// The configuration item is used to lower the priority of the scanner thread,
// typically employed to ensure CPU scheduling for write operations.
Expand Down
37 changes: 19 additions & 18 deletions be/src/exec/operator/group_commit_block_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ Status GroupCommitBlockSinkLocalState::_initialize_load_queue() {
if (_state->exec_env()->wal_mgr()->is_running()) {
RETURN_IF_ERROR(_state->exec_env()->group_commit_mgr()->get_first_block_load_queue(
p._db_id, p._table_id, p._base_schema_version, p._schema->indexes().size(),
p._load_id, _load_block_queue, _state->be_exec_version(),
_state->query_mem_tracker(), _create_plan_dependency, _put_block_dependency));
p._load_id, _load_block_queue, _state->be_exec_version(), _create_plan_dependency,
_put_block_dependency));
_state->set_import_label(_load_block_queue->label);
_state->set_wal_id(_load_block_queue->txn_id); // wal_id is txn_id
return Status::OK();
Expand Down Expand Up @@ -141,17 +141,16 @@ Status GroupCommitBlockSinkLocalState::_add_block(RuntimeState* state,
block->get_by_position(i).column = make_nullable(block->get_by_position(i).column);
block->get_by_position(i).type = make_nullable(block->get_by_position(i).type);
}
// add block to queue
auto cur_mutable_block = MutableBlock::create_unique(block->clone_empty());
{
IColumn::Selector selector;
for (auto i = 0; i < block->rows(); i++) {
selector.emplace_back(i);
}
RETURN_IF_ERROR(block->append_to_block_by_selector(cur_mutable_block.get(), selector));
}
// Add block to queue. The block has already been converted to all-nullable columns above
// and contains exactly the rows to enqueue (filtering is done by the caller before
// _add_block), so move its columns into a standalone output_block instead of deep-copying
// them. The previous code appended every row through an identity selector, which duplicated
// all column data (notably ColumnString chars) and dominated group-commit load memory.
// Columns are COW shared_ptrs, so swapping into a fresh Block is O(1) and memory-safe: the
// queued block is a distinct object (consumers swap/mutate only that object) while the
// underlying data stays alive via reference counting and is only read downstream.
std::shared_ptr<Block> output_block = Block::create_shared();
output_block->swap(cur_mutable_block->to_block());
output_block->swap(*block);
if (!_is_block_appended && state->num_rows_load_total() + state->num_rows_load_unselected() +
state->num_rows_load_filtered() <=
config::group_commit_memory_rows_for_max_filter_ratio) {
Expand Down Expand Up @@ -338,10 +337,10 @@ Status GroupCommitBlockSinkOperatorX::sink(RuntimeState* state, Block* input_blo
};

auto rows = input_block->rows();
auto bytes = input_block->bytes();
if (UNLIKELY(rows == 0)) {
return wind_up();
}
auto bytes = input_block->bytes();

// update incrementally so that FE can get the progress.
// the real 'num_rows_load_total' will be set when sink being closed.
Expand All @@ -363,11 +362,9 @@ Status GroupCommitBlockSinkOperatorX::sink(RuntimeState* state, Block* input_blo
local_state._partitions.assign(rows, nullptr);
local_state._filter_bitmap.Reset(rows);

for (int index = 0; index < rows; index++) {
local_state._vpartition->find_partition(block.get(), index,
local_state._partitions[index]);
}
for (int row_index = 0; row_index < rows; row_index++) {
local_state._vpartition->find_partition(block.get(), row_index,
local_state._partitions[row_index]);
if (local_state._partitions[row_index] == nullptr) [[unlikely]] {
local_state._filter_bitmap.Set(row_index, true);
LOG(WARNING) << "no partition for this tuple. tuple="
Expand All @@ -394,15 +391,19 @@ Status GroupCommitBlockSinkOperatorX::sink(RuntimeState* state, Block* input_blo
local_state._has_filtered_rows) {
auto cloneBlock = block->clone_without_columns();
auto res_block = MutableBlock::build_mutable_block(std::move(cloneBlock));
std::vector<uint32_t> rows_to_keep;
rows_to_keep.reserve(rows);
for (int i = 0; i < rows; ++i) {
if (local_state._block_convertor->filter_map()[i]) {
continue;
}
if (local_state._filter_bitmap.Get(i)) {
continue;
}
res_block.add_row(block.get(), i);
rows_to_keep.emplace_back(i);
}
RETURN_IF_ERROR(res_block.add_rows(block.get(), rows_to_keep.data(),
rows_to_keep.data() + rows_to_keep.size()));
block->swap(res_block.to_block());
}
// add block into block queue
Expand Down
Loading
Loading