From 20c034151794307ec4cc459d2118c55e6db7e214 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Fri, 6 Mar 2026 08:37:51 +0000 Subject: [PATCH 1/5] Merge pull request #91574 from amosbird/merge-tree-clean-up-thread --- src/Common/FailPoint.cpp | 10 +- .../MergeTree/IMergeTreeCleanupThread.cpp | 162 +++++++++++++ .../MergeTree/IMergeTreeCleanupThread.h | 56 +++++ .../MergeTree/MergeTreeCleanupThread.cpp | 63 +++++ .../MergeTree/MergeTreeCleanupThread.h | 31 +++ .../ReplicatedMergeTreeCleanupThread.cpp | 228 +++++------------- .../ReplicatedMergeTreeCleanupThread.h | 86 +++---- src/Storages/StorageMergeTree.cpp | 74 +++--- src/Storages/StorageMergeTree.h | 8 +- .../test_merge_tree_load_parts/test.py | 7 +- tests/integration/test_merge_tree_s3/test.py | 3 + .../test_s3_plain_rewritable/test.py | 2 +- .../0_stateless/00652_mergetree_mutations.sh | 10 +- .../01560_ttl_remove_empty_parts.sh | 2 +- .../02421_new_type_json_empty_parts.sh | 6 +- .../03008_s3_plain_rewritable_fault.sh | 2 +- .../03198_unload_primary_key_outdated.sh | 2 +- .../0_stateless/03305_mutations_counters.sh | 2 +- .../03305_rename_mutations_counter.sh | 2 +- .../03357_replacing_min_age_cleanup.sh | 5 +- ...2_clean_up_thread_for_merge_tree.reference | 1 + .../03742_clean_up_thread_for_merge_tree.sh | 59 +++++ 22 files changed, 547 insertions(+), 274 deletions(-) create mode 100644 src/Storages/MergeTree/IMergeTreeCleanupThread.cpp create mode 100644 src/Storages/MergeTree/IMergeTreeCleanupThread.h create mode 100644 src/Storages/MergeTree/MergeTreeCleanupThread.cpp create mode 100644 src/Storages/MergeTree/MergeTreeCleanupThread.h create mode 100644 tests/queries/0_stateless/03742_clean_up_thread_for_merge_tree.reference create mode 100755 tests/queries/0_stateless/03742_clean_up_thread_for_merge_tree.sh diff --git a/src/Common/FailPoint.cpp b/src/Common/FailPoint.cpp index 476738471079..6e9d53efe167 100644 --- a/src/Common/FailPoint.cpp +++ b/src/Common/FailPoint.cpp @@ -29,8 +29,8 @@ static struct 
InitFiu /// We should define different types of failpoints here. There are four types of them: /// - ONCE: the failpoint will only be triggered once. /// - REGULAR: the failpoint will always be triggered until disableFailPoint is called. -/// - PAUSEABLE_ONCE: the failpoint will be blocked one time when pauseFailPoint is called, util disableFailPoint is called. -/// - PAUSEABLE: the failpoint will be blocked every time when pauseFailPoint is called, util disableFailPoint is called. +/// - PAUSEABLE_ONCE: the failpoint will be blocked one time when pauseFailPoint is called, until disableFailPoint is called. +/// - PAUSEABLE: the failpoint will be blocked every time when pauseFailPoint is called, until disableFailPoint is called. #define APPLY_FOR_FAILPOINTS(ONCE, REGULAR, PAUSEABLE_ONCE, PAUSEABLE) \ ONCE(replicated_merge_tree_commit_zk_fail_after_op) \ @@ -131,9 +131,13 @@ static struct InitFiu REGULAR(rmt_delay_commit_part) \ ONCE(smt_commit_exception_before_op) \ ONCE(backup_add_empty_memory_table) \ + ONCE(local_object_storage_network_error_during_remove) \ + ONCE(parallel_replicas_check_read_mode_always) \ + REGULAR(lightweight_show_tables) \ + PAUSEABLE_ONCE(drop_database_before_exclusive_ddl_lock) \ + REGULAR(storage_merge_tree_background_schedule_merge_fail) \ REGULAR(refresh_task_stop_racing_for_running_refresh) - namespace FailPoints { #define M(NAME) extern const char(NAME)[] = #NAME ""; diff --git a/src/Storages/MergeTree/IMergeTreeCleanupThread.cpp b/src/Storages/MergeTree/IMergeTreeCleanupThread.cpp new file mode 100644 index 000000000000..701707ddafd0 --- /dev/null +++ b/src/Storages/MergeTree/IMergeTreeCleanupThread.cpp @@ -0,0 +1,162 @@ +#include + +#include +#include +#include +#include + +namespace DB +{ + +namespace MergeTreeSetting +{ + extern const MergeTreeSettingsUInt64 cleanup_delay_period; + extern const MergeTreeSettingsUInt64 cleanup_delay_period_random_add; + extern const MergeTreeSettingsUInt64 
cleanup_thread_preferred_points_per_iteration; + extern const MergeTreeSettingsUInt64 max_cleanup_delay_period; +} + +IMergeTreeCleanupThread::IMergeTreeCleanupThread(MergeTreeData & data_) + : data(data_) + , log_name(data.getStorageID().getFullTableName() + " (CleanupThread)") + , log(getLogger(log_name)) + , sleep_ms((*data.getSettings())[MergeTreeSetting::cleanup_delay_period] * 1000) +{ + task = data.getContext()->getSchedulePool().createTask(log_name, [this] { run(); }); +} + +IMergeTreeCleanupThread::~IMergeTreeCleanupThread() = default; + +void IMergeTreeCleanupThread::start() +{ + task->activateAndSchedule(); +} + +void IMergeTreeCleanupThread::wakeup() +{ + task->schedule(); +} + +void IMergeTreeCleanupThread::stop() +{ + task->deactivate(); +} + +void IMergeTreeCleanupThread::wakeupEarlierIfNeeded() +{ + /// It may happen that the tables was idle for a long time, but then a user started to aggressively insert (or mutate) data. + /// In this case, sleep_ms was set to the highest possible value, the task is not going to wake up soon, + /// but the number of objects to clean up is growing. We need to wakeup the task earlier. + auto storage_settings = data.getSettings(); + if (!(*storage_settings)[MergeTreeSetting::cleanup_thread_preferred_points_per_iteration]) + return; + + /// The number of other objects (logs, blocks, etc) is usually correlated with the number of Outdated parts. + /// Do not wake up unless we have too many. 
+ size_t number_of_outdated_objects = data.getOutdatedPartsCount(); + if (number_of_outdated_objects < (*storage_settings)[MergeTreeSetting::cleanup_thread_preferred_points_per_iteration] * 2) + return; + + /// A race condition is possible here, but it's okay + if (is_running.load(std::memory_order_relaxed)) + return; + + /// Do not re-check all parts too often (avoid constantly calling getNumberOfOutdatedPartsWithExpiredRemovalTime()) + if (!wakeup_check_timer.compareAndRestart(static_cast((*storage_settings)[MergeTreeSetting::cleanup_delay_period]) / 4.0)) + return; + + UInt64 prev_run_timestamp_ms = prev_cleanup_timestamp_ms.load(std::memory_order_relaxed); + UInt64 now_ms = clock_gettime_ns_adjusted(prev_run_timestamp_ms * 1'000'000) / 1'000'000; + if (!prev_run_timestamp_ms || now_ms <= prev_run_timestamp_ms) + return; + + /// Don't run it more often than cleanup_delay_period + UInt64 seconds_passed = (now_ms - prev_run_timestamp_ms) / 1000; + if (seconds_passed < (*storage_settings)[MergeTreeSetting::cleanup_delay_period]) + return; + + /// Do not count parts that cannot be removed anyway. Do not wake up unless we have too many. 
+ number_of_outdated_objects = data.getNumberOfOutdatedPartsWithExpiredRemovalTime(); + if (number_of_outdated_objects < (*storage_settings)[MergeTreeSetting::cleanup_thread_preferred_points_per_iteration] * 2) + return; + + LOG_TRACE( + log, + "Waking up cleanup thread because there are {} outdated objects and previous cleanup finished {}s ago", + number_of_outdated_objects, + seconds_passed); + + wakeup(); +} + +void IMergeTreeCleanupThread::run() +{ + if (cleanup_blocker.isCancelled()) + { + LOG_TRACE(LogFrequencyLimiter(log, 30), "Cleanup is cancelled, exiting"); + return; + } + + SCOPE_EXIT({ is_running.store(false, std::memory_order_relaxed); }); + is_running.store(true, std::memory_order_relaxed); + + auto storage_settings = data.getSettings(); + + Float32 cleanup_points = 0; + try + { + cleanup_points = iterate(); + } + catch (const Coordination::Exception & e) + { + tryLogCurrentException(log, __PRETTY_FUNCTION__); + + if (e.code == Coordination::Error::ZSESSIONEXPIRED) + return; + } + catch (...) + { + tryLogCurrentException(log, __PRETTY_FUNCTION__); + } + + UInt64 prev_timestamp = prev_cleanup_timestamp_ms.load(std::memory_order_relaxed); + UInt64 now_ms = clock_gettime_ns_adjusted(prev_timestamp * 1'000'000) / 1'000'000; + + /// Do not adjust sleep_ms on the first run after starting the server + if (prev_timestamp && (*storage_settings)[MergeTreeSetting::cleanup_thread_preferred_points_per_iteration]) + { + /// We don't want to run the task too often when the table was barely changed and there's almost nothing to cleanup. + /// But we cannot simply sleep max_cleanup_delay_period (300s) when nothing was cleaned up and cleanup_delay_period (30s) + /// when we removed something, because inserting one part per 30s will lead to running cleanup each 30s just to remove one part. + /// So we need some interpolation based on preferred batch size. 
+ auto expected_cleanup_points = (*storage_settings)[MergeTreeSetting::cleanup_thread_preferred_points_per_iteration]; + + /// How long should we sleep to remove cleanup_thread_preferred_points_per_iteration on the next iteration? + Float32 ratio = cleanup_points / static_cast(expected_cleanup_points); + if (ratio == 0) + sleep_ms = (*storage_settings)[MergeTreeSetting::max_cleanup_delay_period] * 1000; + else + sleep_ms = static_cast(static_cast(sleep_ms) / ratio); + + sleep_ms = std::clamp( + sleep_ms, + (*storage_settings)[MergeTreeSetting::cleanup_delay_period] * 1000, + (*storage_settings)[MergeTreeSetting::max_cleanup_delay_period] * 1000); + + UInt64 interval_ms = now_ms - prev_timestamp; + LOG_TRACE( + log, + "Scheduling next cleanup after {}ms (points: {}, interval: {}ms, ratio: {}, points per minute: {})", + sleep_ms, + cleanup_points, + interval_ms, + ratio, + cleanup_points / static_cast(interval_ms * 60'000)); + } + prev_cleanup_timestamp_ms.store(now_ms, std::memory_order_relaxed); + + sleep_ms += std::uniform_int_distribution(0, (*storage_settings)[MergeTreeSetting::cleanup_delay_period_random_add] * 1000)(rng); + task->scheduleAfter(sleep_ms); +} + +} diff --git a/src/Storages/MergeTree/IMergeTreeCleanupThread.h b/src/Storages/MergeTree/IMergeTreeCleanupThread.h new file mode 100644 index 000000000000..3621d9e177e7 --- /dev/null +++ b/src/Storages/MergeTree/IMergeTreeCleanupThread.h @@ -0,0 +1,56 @@ +#pragma once + +#include +#include +#include +#include + +#include + +namespace DB +{ + +class MergeTreeData; + +/// Removes obsolete data from a table of type [Replicated]MergeTree. 
+class IMergeTreeCleanupThread +{ +public: + explicit IMergeTreeCleanupThread(MergeTreeData & data_); + + virtual ~IMergeTreeCleanupThread(); + + void start(); + + void wakeup(); + + void stop(); + + void wakeupEarlierIfNeeded(); + + ActionLock getCleanupLock() { return cleanup_blocker.cancel(); } + +protected: + MergeTreeData & data; + + String log_name; + LoggerPtr log; + BackgroundSchedulePoolTaskHolder task; + pcg64 rng{randomSeed()}; + + UInt64 sleep_ms; + + std::atomic prev_cleanup_timestamp_ms = 0; + std::atomic is_running = false; + + AtomicStopwatch wakeup_check_timer; + + ActionBlocker cleanup_blocker; + + void run(); + + /// Returns a number this is directly proportional to the number of cleaned up blocks + virtual Float32 iterate() = 0; +}; + +} diff --git a/src/Storages/MergeTree/MergeTreeCleanupThread.cpp b/src/Storages/MergeTree/MergeTreeCleanupThread.cpp new file mode 100644 index 000000000000..a23e16a3db14 --- /dev/null +++ b/src/Storages/MergeTree/MergeTreeCleanupThread.cpp @@ -0,0 +1,63 @@ +#include + +#include +#include + +namespace DB +{ + +namespace MergeTreeSetting +{ + extern const MergeTreeSettingsSeconds lock_acquire_timeout_for_background_operations; + extern const MergeTreeSettingsUInt64 merge_tree_clear_old_parts_interval_seconds; + extern const MergeTreeSettingsUInt64 merge_tree_clear_old_temporary_directories_interval_seconds; + extern const MergeTreeSettingsSeconds temporary_directories_lifetime; +} + +MergeTreeCleanupThread::MergeTreeCleanupThread(StorageMergeTree & storage_) + : IMergeTreeCleanupThread(storage_) + , storage(storage_) +{ +} + +void MergeTreeCleanupThread::start() +{ + time_after_previous_cleanup_parts.restart(); + time_after_previous_cleanup_temporary_directories.restart(); + IMergeTreeCleanupThread::start(); +} + +Float32 MergeTreeCleanupThread::iterate() +{ + size_t cleaned_other = 0; + size_t cleaned_part_like = 0; + size_t cleaned_parts = 0; + + auto storage_settings = storage.getSettings(); + + auto shared_lock 
+ = storage.lockForShare(RWLockImpl::NO_QUERY, (*storage_settings)[MergeTreeSetting::lock_acquire_timeout_for_background_operations]); + if (auto lock = time_after_previous_cleanup_temporary_directories.compareAndRestartDeferred( + static_cast((*storage_settings)[MergeTreeSetting::merge_tree_clear_old_temporary_directories_interval_seconds]))) + { + /// Both use relative_data_path which changes during rename, so we do it under share lock + cleaned_part_like += storage.clearOldTemporaryDirectories( + (*storage.getSettings())[MergeTreeSetting::temporary_directories_lifetime].totalSeconds()); + } + + if (auto lock = time_after_previous_cleanup_parts.compareAndRestartDeferred( + static_cast((*storage_settings)[MergeTreeSetting::merge_tree_clear_old_parts_interval_seconds]))) + { + cleaned_parts += storage.clearOldPartsFromFilesystem(/* force */ false, /* with_pause_point */ true); + cleaned_other += storage.clearOldMutations(); + cleaned_part_like += storage.clearEmptyParts(); + cleaned_part_like += storage.clearUnusedPatchParts(); + cleaned_part_like += storage.unloadPrimaryKeysAndClearCachesOfOutdatedParts(); + } + + constexpr Float32 parts_number_amplification = 1.3f; /// Assuming we merge 4-5 parts each time + Float32 cleaned_inserted_parts = static_cast(cleaned_parts) / parts_number_amplification; + return cleaned_inserted_parts + static_cast(cleaned_part_like) + static_cast(cleaned_other); +} + +} diff --git a/src/Storages/MergeTree/MergeTreeCleanupThread.h b/src/Storages/MergeTree/MergeTreeCleanupThread.h new file mode 100644 index 000000000000..aa3ce0f2a684 --- /dev/null +++ b/src/Storages/MergeTree/MergeTreeCleanupThread.h @@ -0,0 +1,31 @@ +#pragma once + +#include +#include + +namespace DB +{ + +class StorageMergeTree; + +class MergeTreeCleanupThread : public IMergeTreeCleanupThread +{ +public: + explicit MergeTreeCleanupThread(StorageMergeTree & storage_); + + /// Shadows IMergeTreeCleanupThread::start() to restart cleanup timers + /// before activating the 
background task. This ensures the thread waits + /// a full interval after the manual cleanup done in startup(). + void start(); + +private: + StorageMergeTree & storage; + + AtomicStopwatch time_after_previous_cleanup_parts; + AtomicStopwatch time_after_previous_cleanup_temporary_directories; + + /// Returns a number that is directly proportional to the number of cleaned up objects + Float32 iterate() override; +}; + +} diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp index ae11b95ec511..50b123fb1a76 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp @@ -1,28 +1,21 @@ -#include -#include #include + +#include #include -#include -#include +#include -#include #include #include -#include - +#include namespace DB { namespace MergeTreeSetting { - extern const MergeTreeSettingsUInt64 cleanup_delay_period; - extern const MergeTreeSettingsUInt64 cleanup_delay_period_random_add; - extern const MergeTreeSettingsUInt64 cleanup_thread_preferred_points_per_iteration; extern const MergeTreeSettingsUInt64 finished_mutations_to_keep; extern const MergeTreeSettingsSeconds lock_acquire_timeout_for_background_operations; - extern const MergeTreeSettingsUInt64 max_cleanup_delay_period; extern const MergeTreeSettingsUInt64 max_replicated_logs_to_keep; extern const MergeTreeSettingsUInt64 min_replicated_logs_to_keep; extern const MergeTreeSettingsUInt64 replicated_deduplication_window; @@ -41,132 +34,9 @@ namespace ErrorCodes ReplicatedMergeTreeCleanupThread::ReplicatedMergeTreeCleanupThread(StorageReplicatedMergeTree & storage_) - : storage(storage_) - , log_name(storage.getStorageID().getFullTableName() + " (ReplicatedMergeTreeCleanupThread)") - , log(getLogger(log_name)) - , sleep_ms((*storage.getSettings())[MergeTreeSetting::cleanup_delay_period] * 1000) + : IMergeTreeCleanupThread(storage_) + , 
storage(storage_) { - task = storage.getContext()->getSchedulePool().createTask(log_name, [this]{ run(); }); -} - -void ReplicatedMergeTreeCleanupThread::run() -{ - if (cleanup_blocker.isCancelled()) - { - LOG_TRACE(LogFrequencyLimiter(log, 30), "Cleanup is cancelled, exiting"); - return; - } - - SCOPE_EXIT({ is_running.store(false, std::memory_order_relaxed); }); - is_running.store(true, std::memory_order_relaxed); - - auto storage_settings = storage.getSettings(); - - Float32 cleanup_points = 0; - try - { - cleanup_points = iterate(); - } - catch (const Coordination::Exception & e) - { - tryLogCurrentException(log, __PRETTY_FUNCTION__); - - if (e.code == Coordination::Error::ZSESSIONEXPIRED) - return; - } - catch (...) - { - tryLogCurrentException(log, __PRETTY_FUNCTION__); - } - - UInt64 prev_timestamp = prev_cleanup_timestamp_ms.load(std::memory_order_relaxed); - UInt64 now_ms = clock_gettime_ns_adjusted(prev_timestamp * 1'000'000) / 1'000'000; - - /// Do not adjust sleep_ms on the first run after starting the server - if (prev_timestamp && (*storage_settings)[MergeTreeSetting::cleanup_thread_preferred_points_per_iteration]) - { - /// We don't want to run the task too often when the table was barely changed and there's almost nothing to cleanup. - /// But we cannot simply sleep max_cleanup_delay_period (300s) when nothing was cleaned up and cleanup_delay_period (30s) - /// when we removed something, because inserting one part per 30s will lead to running cleanup each 30s just to remove one part. - /// So we need some interpolation based on preferred batch size. - auto expected_cleanup_points = (*storage_settings)[MergeTreeSetting::cleanup_thread_preferred_points_per_iteration]; - - /// How long should we sleep to remove cleanup_thread_preferred_points_per_iteration on the next iteration? 
- Float32 ratio = cleanup_points / expected_cleanup_points; - if (ratio == 0) - sleep_ms = (*storage_settings)[MergeTreeSetting::max_cleanup_delay_period] * 1000; - else - sleep_ms = static_cast(sleep_ms / ratio); - - sleep_ms = std::clamp(sleep_ms, (*storage_settings)[MergeTreeSetting::cleanup_delay_period] * 1000, (*storage_settings)[MergeTreeSetting::max_cleanup_delay_period] * 1000); - - UInt64 interval_ms = now_ms - prev_timestamp; - LOG_TRACE(log, "Scheduling next cleanup after {}ms (points: {}, interval: {}ms, ratio: {}, points per minute: {})", - sleep_ms, cleanup_points, interval_ms, ratio, cleanup_points / interval_ms * 60'000); - } - prev_cleanup_timestamp_ms.store(now_ms, std::memory_order_relaxed); - - sleep_ms += std::uniform_int_distribution(0, (*storage_settings)[MergeTreeSetting::cleanup_delay_period_random_add] * 1000)(rng); - task->scheduleAfter(sleep_ms); -} - -void ReplicatedMergeTreeCleanupThread::wakeupEarlierIfNeeded() -{ - /// It may happen that the tables was idle for a long time, but then a user started to aggressively insert (or mutate) data. - /// In this case, sleep_ms was set to the highest possible value, the task is not going to wake up soon, - /// but the number of objects to clean up is growing. We need to wakeup the task earlier. - auto storage_settings = storage.getSettings(); - if (!(*storage_settings)[MergeTreeSetting::cleanup_thread_preferred_points_per_iteration]) - return; - - /// The number of other objects (logs, blocks, etc) is usually correlated with the number of Outdated parts. - /// Do not wake up unless we have too many. 
- size_t number_of_outdated_objects = storage.getOutdatedPartsCount(); - if (number_of_outdated_objects < (*storage_settings)[MergeTreeSetting::cleanup_thread_preferred_points_per_iteration] * 2) - return; - - /// A race condition is possible here, but it's okay - if (is_running.load(std::memory_order_relaxed)) - return; - - /// Do not re-check all parts too often (avoid constantly calling getNumberOfOutdatedPartsWithExpiredRemovalTime()) - if (!wakeup_check_timer.compareAndRestart((*storage_settings)[MergeTreeSetting::cleanup_delay_period] / 4.0)) - return; - - UInt64 prev_run_timestamp_ms = prev_cleanup_timestamp_ms.load(std::memory_order_relaxed); - UInt64 now_ms = clock_gettime_ns_adjusted(prev_run_timestamp_ms * 1'000'000) / 1'000'000; - if (!prev_run_timestamp_ms || now_ms <= prev_run_timestamp_ms) - return; - - /// Don't run it more often than cleanup_delay_period - UInt64 seconds_passed = (now_ms - prev_run_timestamp_ms) / 1000; - if (seconds_passed < (*storage_settings)[MergeTreeSetting::cleanup_delay_period]) - return; - - /// Do not count parts that cannot be removed anyway. Do not wake up unless we have too many. 
- number_of_outdated_objects = storage.getNumberOfOutdatedPartsWithExpiredRemovalTime(); - if (number_of_outdated_objects < (*storage_settings)[MergeTreeSetting::cleanup_thread_preferred_points_per_iteration] * 2) - return; - - LOG_TRACE(log, "Waking up cleanup thread because there are {} outdated objects and previous cleanup finished {}s ago", - number_of_outdated_objects, seconds_passed); - - wakeup(); -} - -void ReplicatedMergeTreeCleanupThread::start() -{ - task->activateAndSchedule(); -} - -void ReplicatedMergeTreeCleanupThread::wakeup() -{ - task->schedule(); -} - -void ReplicatedMergeTreeCleanupThread::stop() -{ - task->deactivate(); } Float32 ReplicatedMergeTreeCleanupThread::iterate() @@ -191,18 +61,25 @@ Float32 ReplicatedMergeTreeCleanupThread::iterate() if (storage.is_leader) { cleaned_logs = clearOldLogs(); - size_t normal_blocks = clearOldBlocks("blocks", (*storage_settings)[MergeTreeSetting::replicated_deduplication_window_seconds], - (*storage_settings)[MergeTreeSetting::replicated_deduplication_window], cached_block_stats_for_sync_inserts); - size_t async_blocks = clearOldBlocks("async_blocks", - (*storage_settings)[MergeTreeSetting::replicated_deduplication_window_seconds_for_async_inserts], - (*storage_settings)[MergeTreeSetting::replicated_deduplication_window_for_async_inserts], - cached_block_stats_for_async_inserts); + auto zookeeper = storage.getZooKeeper(); + + size_t normal_blocks = clearOldBlocks(storage.zookeeper_path, "blocks", *zookeeper, + (*storage_settings)[MergeTreeSetting::replicated_deduplication_window_seconds], + (*storage_settings)[MergeTreeSetting::replicated_deduplication_window], + cached_block_stats_for_sync_inserts, + log); + + size_t async_blocks = clearOldBlocks(storage.zookeeper_path, "async_blocks", *zookeeper, + (*storage_settings)[MergeTreeSetting::replicated_deduplication_window_seconds_for_async_inserts], + (*storage_settings)[MergeTreeSetting::replicated_deduplication_window_for_async_inserts], + 
cached_block_stats_for_async_inserts, + log); /// Many async blocks are transformed into one ordinary block Float32 async_blocks_per_block = static_cast((*storage_settings)[MergeTreeSetting::replicated_deduplication_window]) / - ((*storage_settings)[MergeTreeSetting::replicated_deduplication_window_for_async_inserts] + 1); - cleaned_blocks = (normal_blocks + async_blocks * async_blocks_per_block) / 2; + static_cast((*storage_settings)[MergeTreeSetting::replicated_deduplication_window_for_async_inserts] + 1); + cleaned_blocks = (static_cast(normal_blocks) + static_cast(async_blocks) * async_blocks_per_block) / 2; cleaned_other += clearOldMutations(); cleaned_part_like += storage.clearEmptyParts(); @@ -222,8 +99,8 @@ Float32 ReplicatedMergeTreeCleanupThread::iterate() /// many Outdated parts, and WALs usually contain many parts too). We count then as one part for simplicity. constexpr Float32 parts_number_amplification = 1.3f; /// Assuming we merge 4-5 parts each time - Float32 cleaned_inserted_parts = (cleaned_blocks + (cleaned_logs + cleaned_parts) / parts_number_amplification) / 3; - return cleaned_inserted_parts + cleaned_part_like + cleaned_other; + Float32 cleaned_inserted_parts = (cleaned_blocks + static_cast(cleaned_logs + cleaned_parts) / parts_number_amplification) / 3; + return cleaned_inserted_parts + static_cast(cleaned_part_like + cleaned_other); } @@ -460,22 +337,31 @@ struct ReplicatedMergeTreeCleanupThread::NodeWithStat { String node; Int64 ctime = 0; + Int64 czxid = 0; Int32 version = 0; - NodeWithStat(String node_, Int64 ctime_, Int32 version_) : node(std::move(node_)), ctime(ctime_), version(version_) {} + NodeWithStat(String node_, Int64 ctime_, Int64 czxid_, Int32 version_) : node(std::move(node_)), ctime(ctime_), czxid(czxid_), version(version_) {} + /// Sort by (ctime, czxid) rather than (ctime, node_name) to ensure consistent ordering + /// across different deduplication directories that may contain entries created + /// by the same multi-op. 
static bool greaterByTime(const NodeWithStat & lhs, const NodeWithStat & rhs) { - return std::forward_as_tuple(lhs.ctime, lhs.node) > std::forward_as_tuple(rhs.ctime, rhs.node); + return std::forward_as_tuple(lhs.ctime, lhs.czxid) > std::forward_as_tuple(rhs.ctime, rhs.czxid); } }; -size_t ReplicatedMergeTreeCleanupThread::clearOldBlocks(const String & blocks_dir_name, UInt64 window_seconds, UInt64 window_size, NodeCTimeAndVersionCache & cached_block_stats) +size_t ReplicatedMergeTreeCleanupThread::clearOldBlocks( + const String & zookeeper_path, + const String & blocks_dir_name, + zkutil::ZooKeeper & zookeeper, + UInt64 window_seconds, + UInt64 window_size, + NodeCTimeAndVersionCache & cached_block_stats, + LoggerPtr log_) { - auto zookeeper = storage.getZooKeeper(); - std::vector timed_blocks; - getBlocksSortedByTime(blocks_dir_name, *zookeeper, timed_blocks, cached_block_stats); + getBlocksSortedByTime(zookeeper_path, blocks_dir_name, zookeeper, timed_blocks, cached_block_stats, log_); if (timed_blocks.empty()) return 0; @@ -487,7 +373,7 @@ size_t ReplicatedMergeTreeCleanupThread::clearOldBlocks(const String & blocks_di current_time - static_cast(1000 * window_seconds)); /// Virtual node, all nodes that are "greater" than this one will be deleted - NodeWithStat block_threshold{{}, time_threshold, 0}; + NodeWithStat block_threshold{{}, time_threshold, 0, 0}; size_t current_deduplication_window = std::min(timed_blocks.size(), window_size); auto first_outdated_block_fixed_threshold = timed_blocks.begin() + current_deduplication_window; @@ -500,15 +386,15 @@ size_t ReplicatedMergeTreeCleanupThread::clearOldBlocks(const String & blocks_di return 0; auto last_outdated_block = timed_blocks.end() - 1; - LOG_TRACE(log, "Will clear {} old blocks from {} (ctime {}) to {} (ctime {})", num_nodes_to_delete, + LOG_TRACE(log_, "Will clear {} old blocks from {} (ctime {}) to {} (ctime {})", num_nodes_to_delete, first_outdated_block->node, first_outdated_block->ctime, 
last_outdated_block->node, last_outdated_block->ctime); zkutil::AsyncResponses try_remove_futures; for (auto it = first_outdated_block; it != timed_blocks.end(); ++it) { - String path = storage.zookeeper_path + "/" + blocks_dir_name + "/" + it->node; - try_remove_futures.emplace_back(path, zookeeper->asyncTryRemove(path, it->version)); + String path = zookeeper_path + "/" + blocks_dir_name + "/" + it->node; + try_remove_futures.emplace_back(path, zookeeper.asyncTryRemove(path, it->version)); } for (auto & pair : try_remove_futures) @@ -518,7 +404,7 @@ size_t ReplicatedMergeTreeCleanupThread::clearOldBlocks(const String & blocks_di if (rc == Coordination::Error::ZNOTEMPTY) { /// Can happen if there are leftover block nodes with children created by previous server versions. - zookeeper->removeRecursive(path); + zookeeper.removeRecursive(path); cached_block_stats.erase(first_outdated_block->node); } else if (rc == Coordination::Error::ZOK || rc == Coordination::Error::ZNONODE || rc == Coordination::Error::ZBADVERSION) @@ -529,24 +415,30 @@ size_t ReplicatedMergeTreeCleanupThread::clearOldBlocks(const String & blocks_di } else { - LOG_WARNING(log, "Error while deleting ZooKeeper path `{}`: {}, ignoring.", path, rc); + LOG_WARNING(log_, "Error while deleting ZooKeeper path `{}`: {}, ignoring.", path, rc); } first_outdated_block++; } - LOG_TRACE(log, "Cleared {} old blocks from ZooKeeper", num_nodes_to_delete); + LOG_TRACE(log_, "Cleared {} old blocks from ZooKeeper", num_nodes_to_delete); return num_nodes_to_delete; } -void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(const String & blocks_dir_name, zkutil::ZooKeeper & zookeeper, std::vector & timed_blocks, NodeCTimeAndVersionCache & cached_block_stats) +void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime( + const String & zookeeper_path, + const String & blocks_dir_name, + zkutil::ZooKeeper & zookeeper, + std::vector & timed_blocks, + NodeCTimeAndVersionCache & cached_block_stats, + LoggerPtr log_) 
{ timed_blocks.clear(); Strings blocks; Coordination::Stat stat; - if (Coordination::Error::ZOK != zookeeper.tryGetChildren(storage.zookeeper_path + "/" + blocks_dir_name, blocks, &stat)) - throw Exception(ErrorCodes::NOT_FOUND_NODE, "{}/{} doesn't exist", storage.zookeeper_path, blocks_dir_name); + if (Coordination::Error::ZOK != zookeeper.tryGetChildren(zookeeper_path + "/" + blocks_dir_name, blocks, &stat)) + throw Exception(ErrorCodes::NOT_FOUND_NODE, "{}/{} doesn't exist", zookeeper_path, blocks_dir_name); /// Seems like this code is obsolete, because we delete blocks from cache /// when they are deleted from zookeeper. But we don't know about all (maybe future) places in code @@ -565,7 +457,7 @@ void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(const String & bloc auto not_cached_blocks = stat.numChildren - cached_block_stats.size(); if (not_cached_blocks) { - LOG_TRACE(log, "Checking {} {} ({} are not cached){}, path is {}", stat.numChildren, blocks_dir_name, not_cached_blocks, " to clear old ones from ZooKeeper.", storage.zookeeper_path + "/" + blocks_dir_name); + LOG_TRACE(log_, "Checking {} {} ({} are not cached) to clear old ones from ZooKeeper., path is {}/{}", stat.numChildren, blocks_dir_name, not_cached_blocks, zookeeper_path, blocks_dir_name); } std::vector exists_paths; @@ -575,13 +467,13 @@ void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(const String & bloc if (it == cached_block_stats.end()) { /// New block. Fetch its stat asynchronously. 
- exists_paths.emplace_back(storage.zookeeper_path + "/" + blocks_dir_name + "/" + block); + exists_paths.emplace_back(zookeeper_path + "/" + blocks_dir_name + "/" + block); } else { /// Cached block - const auto & ctime_and_version = it->second; - timed_blocks.emplace_back(block, ctime_and_version.first, ctime_and_version.second); + const auto & entry = it->second; + timed_blocks.emplace_back(block, entry.ctime, entry.czxid, entry.version); } } @@ -595,8 +487,8 @@ void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(const String & bloc if (status.error != Coordination::Error::ZNONODE) { auto node_name = fs::path(exists_paths[i]).filename(); - cached_block_stats.emplace(node_name, std::make_pair(status.stat.ctime, status.stat.version)); - timed_blocks.emplace_back(node_name, status.stat.ctime, status.stat.version); + cached_block_stats.emplace(node_name, NodeCacheEntry{status.stat.ctime, status.stat.czxid, status.stat.version}); + timed_blocks.emplace_back(node_name, status.stat.ctime, status.stat.czxid, status.stat.version); } } diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h b/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h index 268c9a08202c..92790dacbca9 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h @@ -1,61 +1,48 @@ #pragma once -#include -#include -#include -#include -#include -#include -#include +#include +#include -#include -#include -#include +namespace zkutil +{ + +class ZooKeeper; +using ZooKeeperPtr = std::shared_ptr; +} namespace DB { class StorageReplicatedMergeTree; - -/** Removes obsolete data from a table of type ReplicatedMergeTree. 
- */ -class ReplicatedMergeTreeCleanupThread +class ReplicatedMergeTreeCleanupThread : public IMergeTreeCleanupThread { public: explicit ReplicatedMergeTreeCleanupThread(StorageReplicatedMergeTree & storage_); - void start(); - - void wakeup(); - - void stop(); - - void wakeupEarlierIfNeeded(); - - ActionLock getCleanupLock() { return cleanup_blocker.cancel(); } + struct NodeCacheEntry + { + Int64 ctime = 0; + Int64 czxid = 0; + Int32 version = 0; + }; + using NodeCTimeAndVersionCache = std::map; + /// Remove old block hashes from ZooKeeper. This is done by the leader replica. Returns the number of removed blocks + static size_t clearOldBlocks( + const String & zookeeper_path, + const String & blocks_dir_name, + zkutil::ZooKeeper & zookeeper, + UInt64 window_seconds, + UInt64 window_size, + NodeCTimeAndVersionCache & cached_block_stats, + LoggerPtr log_); private: StorageReplicatedMergeTree & storage; - String log_name; - LoggerPtr log; - BackgroundSchedulePoolTaskHolder task; - pcg64 rng{randomSeed()}; - - UInt64 sleep_ms; - - std::atomic prev_cleanup_timestamp_ms = 0; - std::atomic is_running = false; - - AtomicStopwatch wakeup_check_timer; - - ActionBlocker cleanup_blocker; - - void run(); /// Returns a number this is directly proportional to the number of cleaned up blocks - Float32 iterate(); + Float32 iterate() override; /// Remove old records from ZooKeeper. Returns the number of removed logs size_t clearOldLogs(); @@ -63,13 +50,11 @@ class ReplicatedMergeTreeCleanupThread /// The replica is marked as "lost" if it is inactive and its log pointer /// is far behind and we are not going to keep logs for it. /// Lost replicas will use different strategy for repair. 
- void markLostReplicas(const std::unordered_map & host_versions_lost_replicas, - const std::unordered_map & log_pointers_candidate_lost_replicas, - size_t replicas_count, const zkutil::ZooKeeperPtr & zookeeper); - - using NodeCTimeAndVersionCache = std::map>; - /// Remove old block hashes from ZooKeeper. This is done by the leader replica. Returns the number of removed blocks - size_t clearOldBlocks(const String & blocks_dir_name, UInt64 window_seconds, UInt64 window_size, NodeCTimeAndVersionCache & cached_block_stats); + void markLostReplicas( + const std::unordered_map & host_versions_lost_replicas, + const std::unordered_map & log_pointers_candidate_lost_replicas, + size_t replicas_count, + const zkutil::ZooKeeperPtr & zookeeper); /// Remove old mutations that are done from ZooKeeper. This is done by the leader replica. Returns the number of removed mutations size_t clearOldMutations(); @@ -79,10 +64,15 @@ class ReplicatedMergeTreeCleanupThread struct NodeWithStat; /// Returns list of blocks (with their stat) sorted by ctime in descending order. 
- void getBlocksSortedByTime(const String & blocks_dir_name, zkutil::ZooKeeper & zookeeper, std::vector & timed_blocks, NodeCTimeAndVersionCache & cached_block_stats); + static void getBlocksSortedByTime( + const String & zookeeper_path, + const String & blocks_dir_name, + zkutil::ZooKeeper & zookeeper, + std::vector & timed_blocks, + NodeCTimeAndVersionCache & cached_block_stats, + LoggerPtr log_); /// TODO Removing old quorum/failed_parts }; - } diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index acfe2905f91d..7e37c6f2845b 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -64,6 +64,11 @@ namespace DB namespace FailPoints { extern const char storage_merge_tree_background_clear_old_parts_pause[]; + extern const char smt_merge_selecting_task_pause_when_scheduled[]; + extern const char mt_select_parts_to_mutate_no_free_threads[]; + extern const char mt_select_parts_to_mutate_max_part_size[]; + extern const char storage_shared_merge_tree_mutate_pause_before_wait[]; + extern const char storage_merge_tree_background_schedule_merge_fail[]; } namespace Setting @@ -127,6 +132,7 @@ namespace ActionLocks extern const StorageActionBlockType PartsMerge; extern const StorageActionBlockType PartsTTLMerge; extern const StorageActionBlockType PartsMove; + extern const StorageActionBlockType Cleanup; } static MergeTreeTransactionPtr tryGetTransactionForMutation(const MergeTreeMutationEntry & mutation, LoggerPtr log = nullptr) @@ -166,6 +172,7 @@ StorageMergeTree::StorageMergeTree( , reader(*this) , writer(*this) , merger_mutator(*this) + , cleanup_thread(*this) { initializeDirectoriesAndFormatVersion(relative_data_path_, LoadingStrictnessLevel::ATTACH <= mode, date_column_name); @@ -203,11 +210,10 @@ void StorageMergeTree::startup() clearOldTemporaryDirectories(0, {"tmp_", "delete_tmp_", "tmp-fetch_"}); /// NOTE background task will also do the above cleanups periodically. 
- time_after_previous_cleanup_parts.restart(); - time_after_previous_cleanup_temporary_directories.restart(); try { + cleanup_thread.start(); background_operations_assignee.start(); startBackgroundMovesIfNeeded(); startOutdatedAndUnexpectedDataPartsLoadingTask(); @@ -231,6 +237,23 @@ void StorageMergeTree::startup() } } +void StorageMergeTree::flushAndPrepareForShutdown() +{ + LOG_TRACE(log, "Start preparing for shutdown"); + + if (flush_called.exchange(true)) + return; + + merger_mutator.merges_blocker.cancelForever(); + parts_mover.moves_blocker.cancelForever(); + + background_operations_assignee.finish(); + background_moves_assignee.finish(); + + cleanup_thread.stop(); + + LOG_TRACE(log, "Finished preparing for shutdown"); +} void StorageMergeTree::shutdown(bool) { if (shutdown_called.exchange(true)) @@ -1505,6 +1528,9 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign assert(!isStaticStorage()); + FailPointInjection::pauseFailPoint(FailPoints::smt_merge_selecting_task_pause_when_scheduled); + + cleanup_thread.wakeupEarlierIfNeeded(); auto metadata_snapshot = getInMemoryMetadataPtr(); MergeMutateSelectedEntryPtr merge_entry; MergeMutateSelectedEntryPtr mutate_entry; @@ -1571,6 +1597,12 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign /// in MergePlainMergeTreeTask. So, this slot will never be freed. 
if (!scheduled && isTTLMergeType(merge_entry->future_part->merge_type)) getContext()->getMergeList().cancelMergeWithTTL(); + + fiu_do_on(FailPoints::storage_merge_tree_background_schedule_merge_fail, + { + scheduled = false; + }); + return scheduled; } if (mutate_entry) @@ -1592,39 +1624,7 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign mutation_wait_event.notify_all(); } - bool scheduled = false; - if (auto lock = time_after_previous_cleanup_temporary_directories.compareAndRestartDeferred( - (*getSettings())[MergeTreeSetting::merge_tree_clear_old_temporary_directories_interval_seconds])) - { - assignee.scheduleCommonTask(std::make_shared( - [this, shared_lock] () - { - return clearOldTemporaryDirectories((*getSettings())[MergeTreeSetting::temporary_directories_lifetime].totalSeconds()); - }, common_assignee_trigger, getStorageID()), /* need_trigger */ false); - scheduled = true; - } - - if (auto lock = time_after_previous_cleanup_parts.compareAndRestartDeferred( - (*getSettings())[MergeTreeSetting::merge_tree_clear_old_parts_interval_seconds])) - { - assignee.scheduleCommonTask(std::make_shared( - [this, shared_lock] () - { - /// All use relative_data_path which changes during rename - /// so execute under share lock. 
- size_t cleared_count = 0; - cleared_count += clearOldPartsFromFilesystem(/* force */ false, /* with_pause_point */true); - cleared_count += clearOldMutations(); - cleared_count += clearEmptyParts(); - cleared_count += clearUnusedPatchParts(); - cleared_count += unloadPrimaryKeysAndClearCachesOfOutdatedParts(); - return cleared_count; - /// TODO maybe take into account number of cleared objects when calculating backoff - }, common_assignee_trigger, getStorageID()), /* need_trigger */ false); - scheduled = true; - } - - return scheduled; + return false; } UInt64 StorageMergeTree::getCurrentMutationVersion(UInt64 data_version, std::unique_lock & /*currently_processing_in_background_mutex_lock*/) const @@ -2598,6 +2598,8 @@ ActionLock StorageMergeTree::getActionLock(StorageActionBlockType action_type) return merger_mutator.ttl_merges_blocker.cancel(); if (action_type == ActionLocks::PartsMove) return parts_mover.moves_blocker.cancel(); + if (action_type == ActionLocks::Cleanup) + return cleanup_thread.getCleanupLock(); return {}; } @@ -2608,6 +2610,8 @@ void StorageMergeTree::onActionLockRemove(StorageActionBlockType action_type) background_operations_assignee.trigger(); else if (action_type == ActionLocks::PartsMove) background_moves_assignee.trigger(); + else if (action_type == ActionLocks::Cleanup) + cleanup_thread.wakeup(); } IStorage::DataValidationTasksPtr StorageMergeTree::getCheckTaskList( diff --git a/src/Storages/StorageMergeTree.h b/src/Storages/StorageMergeTree.h index 00cbc7acdad8..b293bc823a09 100644 --- a/src/Storages/StorageMergeTree.h +++ b/src/Storages/StorageMergeTree.h @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -49,6 +50,7 @@ class StorageMergeTree final : public MergeTreeData std::unique_ptr settings_); void startup() override; + void flushAndPrepareForShutdown() override; void shutdown(bool is_drop) override; ~StorageMergeTree() override; @@ -131,16 +133,13 @@ class StorageMergeTree final : public 
MergeTreeData MergeTreeDataSelectExecutor reader; MergeTreeDataWriter writer; MergeTreeDataMergerMutator merger_mutator; + MergeTreeCleanupThread cleanup_thread; std::unique_ptr deduplication_log; /// For block numbers. SimpleIncrement increment; - /// For clearOldParts - AtomicStopwatch time_after_previous_cleanup_parts; - /// For clearOldTemporaryDirectories. - AtomicStopwatch time_after_previous_cleanup_temporary_directories; /// For clearOldBrokenDetachedParts AtomicStopwatch time_after_previous_cleanup_broken_detached_parts; @@ -304,6 +303,7 @@ class StorageMergeTree final : public MergeTreeData friend class MergeTreeData; friend class MergePlainMergeTreeTask; friend class MutatePlainMergeTreeTask; + friend class MergeTreeCleanupThread; struct DataValidationTasks : public IStorage::DataValidationTasksBase { diff --git a/tests/integration/test_merge_tree_load_parts/test.py b/tests/integration/test_merge_tree_load_parts/test.py index ebd68b70bf1b..9c1da7ef4a19 100644 --- a/tests/integration/test_merge_tree_load_parts/test.py +++ b/tests/integration/test_merge_tree_load_parts/test.py @@ -76,7 +76,12 @@ def test_merge_tree_load_parts(started_cluster): == "1\n" ) - node1.query("ALTER TABLE mt_load_parts MODIFY SETTING old_parts_lifetime = 1") + node1.query( + "ALTER TABLE mt_load_parts MODIFY SETTING " + "old_parts_lifetime = 1, cleanup_delay_period = 1, " + "cleanup_delay_period_random_add = 0, " + "cleanup_thread_preferred_points_per_iteration = 0" + ) node1.query("DETACH TABLE mt_load_parts") node1.query("ATTACH TABLE mt_load_parts") diff --git a/tests/integration/test_merge_tree_s3/test.py b/tests/integration/test_merge_tree_s3/test.py index bf28ae9b3ca2..ec0dd56f784a 100644 --- a/tests/integration/test_merge_tree_s3/test.py +++ b/tests/integration/test_merge_tree_s3/test.py @@ -95,6 +95,9 @@ def create_table(node, table_name, **additional_settings): "index_granularity": 512, "temporary_directories_lifetime": 1, "write_marks_for_substreams_in_compact_parts": 1, 
+ "cleanup_delay_period": 1, + "cleanup_delay_period_random_add": 0, + "cleanup_thread_preferred_points_per_iteration": 0, } settings.update(additional_settings) diff --git a/tests/integration/test_s3_plain_rewritable/test.py b/tests/integration/test_s3_plain_rewritable/test.py index f02f20d954ff..8b1730a23d69 100644 --- a/tests/integration/test_s3_plain_rewritable/test.py +++ b/tests/integration/test_s3_plain_rewritable/test.py @@ -63,7 +63,7 @@ def create_insert(node, table_name, insert_values): ) ENGINE=MergeTree() PARTITION BY id % 10 ORDER BY id - SETTINGS storage_policy='{}' + SETTINGS storage_policy='{}', cleanup_delay_period=1, cleanup_delay_period_random_add=0, cleanup_thread_preferred_points_per_iteration=0 """.format( table_name, storage_policy ) diff --git a/tests/queries/0_stateless/00652_mergetree_mutations.sh b/tests/queries/0_stateless/00652_mergetree_mutations.sh index edb306d38831..dec4adb720cf 100755 --- a/tests/queries/0_stateless/00652_mergetree_mutations.sh +++ b/tests/queries/0_stateless/00652_mergetree_mutations.sh @@ -53,7 +53,7 @@ ${CLICKHOUSE_CLIENT} --query="SELECT '*** Test mutations cleaner ***'" ${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS mutations_cleaner" # Create a table with finished_mutations_to_keep = 2 -${CLICKHOUSE_CLIENT} --query="CREATE TABLE mutations_cleaner(x UInt32) ENGINE MergeTree ORDER BY x SETTINGS finished_mutations_to_keep = 2" +${CLICKHOUSE_CLIENT} --query="CREATE TABLE mutations_cleaner(x UInt32) ENGINE MergeTree ORDER BY x SETTINGS finished_mutations_to_keep = 2, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0, cleanup_thread_preferred_points_per_iteration = 0" # Insert some data ${CLICKHOUSE_CLIENT} --query="INSERT INTO mutations_cleaner(x) VALUES (1), (2), (3), (4)" @@ -65,9 +65,9 @@ ${CLICKHOUSE_CLIENT} --query="ALTER TABLE mutations_cleaner DELETE WHERE x = 3" wait_for_mutation "mutations_cleaner" "mutation_4.txt" -# Sleep and then do an INSERT to wakeup the background task that will 
clean up the old mutations
+# Sleep and then wake up the background cleanup thread that will clean up the old mutations
 sleep 1
-${CLICKHOUSE_CLIENT} --query="INSERT INTO mutations_cleaner(x) VALUES (4)"
+${CLICKHOUSE_CLIENT} --query="SYSTEM START CLEANUP mutations_cleaner"
 sleep 0.1

 for i in {1..10}
do
        break
    fi

-    if [[ $i -eq 100 ]]; then
+    if [[ $i -eq 10 ]]; then
        echo "Timed out while waiting for outdated mutation record to be deleted!"
    fi

    sleep 1
-    ${CLICKHOUSE_CLIENT} --query="INSERT INTO mutations_cleaner(x) VALUES (4)"
+    ${CLICKHOUSE_CLIENT} --query="SYSTEM START CLEANUP mutations_cleaner"
done

# Check that the first mutation is cleaned
diff --git a/tests/queries/0_stateless/01560_ttl_remove_empty_parts.sh b/tests/queries/0_stateless/01560_ttl_remove_empty_parts.sh
index b65e6019a2a6..2d21f60d55ef 100755
--- a/tests/queries/0_stateless/01560_ttl_remove_empty_parts.sh
+++ b/tests/queries/0_stateless/01560_ttl_remove_empty_parts.sh
@@ -10,7 +10,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
 ${CLICKHOUSE_CLIENT} -q 'DROP TABLE IF EXISTS ttl_empty_parts'
 ${CLICKHOUSE_CLIENT} -q '
-    CREATE TABLE ttl_empty_parts (id UInt32, d Date) ENGINE = MergeTree ORDER BY tuple() PARTITION BY id SETTINGS old_parts_lifetime=5
+    CREATE TABLE ttl_empty_parts (id UInt32, d Date) ENGINE = MergeTree ORDER BY tuple() PARTITION BY id SETTINGS old_parts_lifetime = 5, merge_tree_clear_old_parts_interval_seconds = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0, cleanup_thread_preferred_points_per_iteration = 0
 '
 ${CLICKHOUSE_CLIENT} -q "INSERT INTO ttl_empty_parts SELECT 0, toDate('2005-01-01') + number from numbers(500);"
diff --git a/tests/queries/0_stateless/02421_new_type_json_empty_parts.sh b/tests/queries/0_stateless/02421_new_type_json_empty_parts.sh
index 1d94afa42949..aaa1d0cdafe7 100755
--- a/tests/queries/0_stateless/02421_new_type_json_empty_parts.sh
+++ b/tests/queries/0_stateless/02421_new_type_json_empty_parts.sh
@@ -11,7
+11,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) ${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS t_json_empty_parts;" ${CLICKHOUSE_CLIENT} -q "SELECT 'Collapsing';" -${CLICKHOUSE_CLIENT} -q "CREATE TABLE t_json_empty_parts (id UInt64, s Int8, data JSON) ENGINE = CollapsingMergeTree(s) ORDER BY id SETTINGS old_parts_lifetime=5;" --enable_json_type 1 +${CLICKHOUSE_CLIENT} -q "CREATE TABLE t_json_empty_parts (id UInt64, s Int8, data JSON) ENGINE = CollapsingMergeTree(s) ORDER BY id SETTINGS old_parts_lifetime=5, merge_tree_clear_old_parts_interval_seconds = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0, cleanup_thread_preferred_points_per_iteration = 0;" --enable_json_type 1 ${CLICKHOUSE_CLIENT} -q "INSERT INTO t_json_empty_parts VALUES (1, 1, '{\"k1\": \"aaa\"}') (1, -1, '{\"k2\": \"bbb\"}');" ${CLICKHOUSE_CLIENT} -q "SELECT count() FROM t_json_empty_parts;" ${CLICKHOUSE_CLIENT} -q "SELECT count() FROM system.parts WHERE table = 't_json_empty_parts' AND database = currentDatabase() AND active;" @@ -19,7 +19,7 @@ ${CLICKHOUSE_CLIENT} -q "SELECT DISTINCT arrayJoin(JSONAllPathsWithTypes(data)) ${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS t_json_empty_parts;" ${CLICKHOUSE_CLIENT} -q "SELECT 'DELETE all';" -${CLICKHOUSE_CLIENT} -q "CREATE TABLE t_json_empty_parts (id UInt64, data JSON) ENGINE = MergeTree ORDER BY id SETTINGS old_parts_lifetime=5;" --enable_json_type 1 +${CLICKHOUSE_CLIENT} -q "CREATE TABLE t_json_empty_parts (id UInt64, data JSON) ENGINE = MergeTree ORDER BY id SETTINGS old_parts_lifetime=5, merge_tree_clear_old_parts_interval_seconds = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0, cleanup_thread_preferred_points_per_iteration = 0;" --enable_json_type 1 ${CLICKHOUSE_CLIENT} -q "INSERT INTO t_json_empty_parts VALUES (1, '{\"k1\": \"aaa\"}') (1, '{\"k2\": \"bbb\"}');" ${CLICKHOUSE_CLIENT} -q "SELECT count() FROM t_json_empty_parts;" ${CLICKHOUSE_CLIENT} -q "SELECT count() FROM system.parts WHERE table = 
't_json_empty_parts' AND database = currentDatabase() AND active;" @@ -32,7 +32,7 @@ ${CLICKHOUSE_CLIENT} -q "SELECT DISTINCT arrayJoin(JSONAllPathsWithTypes(data)) ${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS t_json_empty_parts;" ${CLICKHOUSE_CLIENT} -q "SELECT 'TTL';" -${CLICKHOUSE_CLIENT} -q "CREATE TABLE t_json_empty_parts (id UInt64, d Date, data JSON) ENGINE = MergeTree ORDER BY id TTL d WHERE id % 2 = 1 SETTINGS old_parts_lifetime=5;" --enable_json_type 1 +${CLICKHOUSE_CLIENT} -q "CREATE TABLE t_json_empty_parts (id UInt64, d Date, data JSON) ENGINE = MergeTree ORDER BY id TTL d WHERE id % 2 = 1 SETTINGS old_parts_lifetime=5, merge_tree_clear_old_parts_interval_seconds = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0, cleanup_thread_preferred_points_per_iteration = 0;" --enable_json_type 1 ${CLICKHOUSE_CLIENT} -q "INSERT INTO t_json_empty_parts VALUES (1, '2000-01-01', '{\"k1\": \"aaa\"}') (2, '2000-01-01', '{\"k2\": \"bbb\"}');" ${CLICKHOUSE_CLIENT} -q "OPTIMIZE TABLE t_json_empty_parts FINAL;" ${CLICKHOUSE_CLIENT} -q "SELECT count() FROM t_json_empty_parts;" diff --git a/tests/queries/0_stateless/03008_s3_plain_rewritable_fault.sh b/tests/queries/0_stateless/03008_s3_plain_rewritable_fault.sh index 514092c40f54..e9d17d0bed3d 100755 --- a/tests/queries/0_stateless/03008_s3_plain_rewritable_fault.sh +++ b/tests/queries/0_stateless/03008_s3_plain_rewritable_fault.sh @@ -64,7 +64,7 @@ SETTINGS endpoint = 'http://localhost:11111/test/03008_test_s3_mt_fault/', access_key_id = clickhouse, secret_access_key = clickhouse), - old_parts_lifetime = 1; + old_parts_lifetime = 1, merge_tree_clear_old_parts_interval_seconds = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0, cleanup_thread_preferred_points_per_iteration = 0; " ${CLICKHOUSE_CLIENT} --query " diff --git a/tests/queries/0_stateless/03198_unload_primary_key_outdated.sh b/tests/queries/0_stateless/03198_unload_primary_key_outdated.sh index f43fd6bc310c..edcdaf873ada 100755 
--- a/tests/queries/0_stateless/03198_unload_primary_key_outdated.sh +++ b/tests/queries/0_stateless/03198_unload_primary_key_outdated.sh @@ -9,7 +9,7 @@ $CLICKHOUSE_CLIENT " CREATE TABLE t_unload_primary_key (a UInt64, b UInt64) ENGINE = MergeTree ORDER BY a - SETTINGS old_parts_lifetime = 10000, use_primary_key_cache = 0; + SETTINGS old_parts_lifetime = 10000, use_primary_key_cache = 0, merge_tree_clear_old_parts_interval_seconds = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0, cleanup_thread_preferred_points_per_iteration = 0; INSERT INTO t_unload_primary_key VALUES (1, 1); diff --git a/tests/queries/0_stateless/03305_mutations_counters.sh b/tests/queries/0_stateless/03305_mutations_counters.sh index 4b3bdd5f5e98..32b5d6ba49db 100755 --- a/tests/queries/0_stateless/03305_mutations_counters.sh +++ b/tests/queries/0_stateless/03305_mutations_counters.sh @@ -26,7 +26,7 @@ function wait_for_mutation_cleanup() $CLICKHOUSE_CLIENT --query " DROP TABLE IF EXISTS t_mutations_counters; - CREATE TABLE t_mutations_counters (a UInt64, b UInt64) ENGINE = MergeTree ORDER BY a; + CREATE TABLE t_mutations_counters (a UInt64, b UInt64) ENGINE = MergeTree ORDER BY a SETTINGS cleanup_delay_period = 1, cleanup_delay_period_random_add = 0, cleanup_thread_preferred_points_per_iteration = 0; INSERT INTO t_mutations_counters VALUES (1, 2) (2, 3); diff --git a/tests/queries/0_stateless/03305_rename_mutations_counter.sh b/tests/queries/0_stateless/03305_rename_mutations_counter.sh index 8f204453da46..cebc7fe55c5e 100755 --- a/tests/queries/0_stateless/03305_rename_mutations_counter.sh +++ b/tests/queries/0_stateless/03305_rename_mutations_counter.sh @@ -26,7 +26,7 @@ function wait_for_mutation_cleanup() $CLICKHOUSE_CLIENT --query " DROP TABLE IF EXISTS t_mutations_counters_rename; - CREATE TABLE t_mutations_counters_rename (a UInt64, b UInt64) ENGINE = MergeTree ORDER BY a; + CREATE TABLE t_mutations_counters_rename (a UInt64, b UInt64) ENGINE = MergeTree ORDER BY a 
SETTINGS cleanup_delay_period = 1, cleanup_delay_period_random_add = 0, cleanup_thread_preferred_points_per_iteration = 0; INSERT INTO t_mutations_counters_rename VALUES (1, 2) (2, 3); diff --git a/tests/queries/0_stateless/03357_replacing_min_age_cleanup.sh b/tests/queries/0_stateless/03357_replacing_min_age_cleanup.sh index 5c1557d8e580..831122d43a4a 100755 --- a/tests/queries/0_stateless/03357_replacing_min_age_cleanup.sh +++ b/tests/queries/0_stateless/03357_replacing_min_age_cleanup.sh @@ -26,7 +26,7 @@ SET alter_sync = 2; DROP TABLE IF EXISTS replacing; -CREATE TABLE replacing (key int, value int, version int, deleted UInt8) ENGINE = ReplacingMergeTree(version, deleted) ORDER BY key SETTINGS merge_tree_clear_old_parts_interval_seconds = 1; +CREATE TABLE replacing (key int, value int, version int, deleted UInt8) ENGINE = ReplacingMergeTree(version, deleted) ORDER BY key SETTINGS merge_tree_clear_old_parts_interval_seconds = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0, cleanup_thread_preferred_points_per_iteration = 0; INSERT INTO replacing VALUES (1, 1, 1, 0), (1, 1, 2, 1); @@ -59,6 +59,9 @@ CREATE TABLE replacing2 (key int, value int, version int, deleted UInt8) ENGINE SETTINGS allow_experimental_replacing_merge_with_cleanup = true, enable_replacing_merge_with_cleanup_for_min_age_to_force_merge = true, merge_tree_clear_old_parts_interval_seconds = 1, + cleanup_delay_period = 1, + cleanup_delay_period_random_add = 0, + cleanup_thread_preferred_points_per_iteration = 0, number_of_free_entries_in_pool_to_execute_optimize_entire_partition = 1, min_age_to_force_merge_on_partition_only = true, min_age_to_force_merge_seconds = 1, diff --git a/tests/queries/0_stateless/03742_clean_up_thread_for_merge_tree.reference b/tests/queries/0_stateless/03742_clean_up_thread_for_merge_tree.reference new file mode 100644 index 000000000000..f188ceda5ef4 --- /dev/null +++ b/tests/queries/0_stateless/03742_clean_up_thread_for_merge_tree.reference @@ -0,0 +1 @@ 
+OK: parts count reached 1 diff --git a/tests/queries/0_stateless/03742_clean_up_thread_for_merge_tree.sh b/tests/queries/0_stateless/03742_clean_up_thread_for_merge_tree.sh new file mode 100755 index 000000000000..a6509ef22390 --- /dev/null +++ b/tests/queries/0_stateless/03742_clean_up_thread_for_merge_tree.sh @@ -0,0 +1,59 @@ +#!/usr/bin/env bash +# Tags: no-parallel +# Tag no-parallel: Fails due to failpoint intersection + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CUR_DIR"/../shell_config.sh + +on_exit() { + $CLICKHOUSE_CLIENT --query "SYSTEM DISABLE FAILPOINT storage_merge_tree_background_schedule_merge_fail;" +} + +trap on_exit EXIT + +# Prepare +$CLICKHOUSE_CLIENT --query " + DROP TABLE IF EXISTS m; + + CREATE TABLE m ( + i Int32 + ) ENGINE = MergeTree + ORDER BY i + SETTINGS old_parts_lifetime = 1, merge_tree_clear_old_parts_interval_seconds = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0, cleanup_thread_preferred_points_per_iteration = 0; + + SYSTEM ENABLE FAILPOINT storage_merge_tree_background_schedule_merge_fail; + + INSERT INTO m VALUES (1); + INSERT INTO m VALUES (2); + + OPTIMIZE TABLE m FINAL; +" + +function parts_count() { + $CLICKHOUSE_CLIENT --query "SELECT count() FROM system.parts WHERE database = currentDatabase() AND table = 'm';" +} + +# Wait up to 60 seconds until count = 1 +ok=0 +for _ in $(seq 1 60); do + CNT=$(parts_count) + if [ "$CNT" -eq 1 ]; then + ok=1 + break + fi + + sleep 1 +done + +if [ "$ok" -eq 1 ]; then + echo "OK: parts count reached 1" +else + echo "FAIL: parts count never reached 1 within 60 seconds" +fi + +$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS m;" + +$CLICKHOUSE_CLIENT --query " + DROP TABLE IF EXISTS m; +" From 0020551fc77423a88738c7a1ebd226fa7ae5279e Mon Sep 17 00:00:00 2001 From: Ilya Golshtein Date: Fri, 17 Apr 2026 14:44:01 +0000 Subject: [PATCH 2/5] 91574: make integration test reenterable --- 
tests/integration/test_merge_tree_load_parts/test.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/integration/test_merge_tree_load_parts/test.py b/tests/integration/test_merge_tree_load_parts/test.py index 9c1da7ef4a19..494f4236e28e 100644 --- a/tests/integration/test_merge_tree_load_parts/test.py +++ b/tests/integration/test_merge_tree_load_parts/test.py @@ -42,6 +42,7 @@ def started_cluster(): def test_merge_tree_load_parts(started_cluster): node1.query( """ + DROP TABLE IF EXISTS mt_load_parts SYNC; CREATE TABLE mt_load_parts (pk UInt32, id UInt32, s String) ENGINE = MergeTree ORDER BY id PARTITION BY pk""" ) @@ -118,6 +119,7 @@ def test_merge_tree_load_parts_corrupted(started_cluster): for i, node in enumerate([node1, node2]): node.query( f""" + DROP TABLE IF EXISTS mt_load_parts_2 SYNC; CREATE TABLE mt_load_parts_2 (pk UInt32, id UInt32, s String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/mt_load_parts_2', '{i}') ORDER BY id PARTITION BY pk""" ) @@ -237,6 +239,7 @@ def test_merge_tree_load_parts_filesystem_error(started_cluster): node3.query( """ + DROP TABLE IF EXISTS mt_load_parts SYNC; CREATE TABLE mt_load_parts (id UInt32) ENGINE = MergeTree ORDER BY id SETTINGS index_granularity_bytes = 0""" From 44efe8ab1486b265d3a055ba4d8a053b36455d27 Mon Sep 17 00:00:00 2001 From: Ilya Golshtein Date: Wed, 13 May 2026 21:03:54 +0000 Subject: [PATCH 3/5] Call clearCaches --- src/Storages/MergeTree/IMergeTreeDataPart.cpp | 2 +- src/Storages/MergeTree/IMergeTreeDataPart.h | 2 +- src/Storages/MergeTree/MergeTreeData.cpp | 12 ++++++++++++ src/Storages/StorageMergeTree.cpp | 1 + 4 files changed, 15 insertions(+), 2 deletions(-) diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index bdffdf4f2d26..6244bf457ae3 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -661,7 +661,7 @@ bool IMergeTreeDataPart::isMovingPart() const return 
part_directory_path.parent_path().filename() == "moving"; } -void IMergeTreeDataPart::clearCaches() +void IMergeTreeDataPart::clearCaches() const { if (cleared_data_in_caches.exchange(true) || is_duplicate) return; diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.h b/src/Storages/MergeTree/IMergeTreeDataPart.h index dfb086929172..53d3a4ca208e 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.h +++ b/src/Storages/MergeTree/IMergeTreeDataPart.h @@ -177,7 +177,7 @@ class IMergeTreeDataPart : public std::enable_shared_from_thisclearCaches(); + for (const auto & [_, proj_part] : (*it_to_delete)->getProjectionParts()) + proj_part->clearCaches(); } } @@ -3723,6 +3729,9 @@ void MergeTreeData::dropAllData() continue; } modifyPartState(it, DataPartState::Deleting); + (*it)->clearCaches(); + for (const auto & [_, proj_part] : (*it)->getProjectionParts()) + proj_part->clearCaches(); all_parts.push_back(*it); } if (skipped_parts > 0) @@ -5632,6 +5641,9 @@ void MergeTreeData::swapActivePart(MergeTreeData::DataPartPtr part_copy, DataPar } modifyPartState(original_active_part, DataPartState::DeleteOnDestroy); + original_active_part->clearCaches(); + for (const auto & [_, proj_part] : original_active_part->getProjectionParts()) + proj_part->clearCaches(); LOG_TEST(log, "swapActivePart: removing {} from data_parts_indexes", (*active_part_it)->getNameWithState()); data_parts_indexes.erase(active_part_it); diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 7e37c6f2845b..dd2cc263059d 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -275,6 +275,7 @@ void StorageMergeTree::shutdown(bool) background_operations_assignee.finish(); background_moves_assignee.finish(); + cleanup_thread.stop(); if (deduplication_log) deduplication_log->shutdown(); From 14a50d4dd2810d657a95a0d18f606edb71fc4b63 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Sat, 27 Sep 2025 13:17:52 +0200 Subject: [PATCH 4/5] tests: make 
test test_merge_tree_load_parts retriable --- .../helpers/corrupt_part_data_on_disk.py | 2 + .../test_merge_tree_load_parts/test.py | 144 +++++++----------- 2 files changed, 60 insertions(+), 86 deletions(-) diff --git a/tests/integration/helpers/corrupt_part_data_on_disk.py b/tests/integration/helpers/corrupt_part_data_on_disk.py index a84a6e825e67..595c29b03dbb 100644 --- a/tests/integration/helpers/corrupt_part_data_on_disk.py +++ b/tests/integration/helpers/corrupt_part_data_on_disk.py @@ -1,4 +1,5 @@ def corrupt_part_data_on_disk(node, table, part_name, file_ext=".bin", database=None): + assert part_name part_path = node.query( "SELECT path FROM system.parts WHERE table = '{}' and name = '{}' {}".format( table, @@ -6,6 +7,7 @@ def corrupt_part_data_on_disk(node, table, part_name, file_ext=".bin", database= f"AND database = '{database}'" if database is not None else "", ) ).strip() + assert part_path corrupt_part_data_by_path(node, part_path, file_ext) diff --git a/tests/integration/test_merge_tree_load_parts/test.py b/tests/integration/test_merge_tree_load_parts/test.py index 494f4236e28e..24ad9fed2023 100644 --- a/tests/integration/test_merge_tree_load_parts/test.py +++ b/tests/integration/test_merge_tree_load_parts/test.py @@ -1,4 +1,6 @@ import time +import random +import string import pytest @@ -39,58 +41,48 @@ def started_cluster(): cluster.shutdown() +def random_string(length): + return "".join(random.choices(string.ascii_lowercase + string.digits, k=length)) + + def test_merge_tree_load_parts(started_cluster): + table = f"mt_load_parts_{random_string(6)}" node1.query( - """ - DROP TABLE IF EXISTS mt_load_parts SYNC; - CREATE TABLE mt_load_parts (pk UInt32, id UInt32, s String) + f""" + CREATE TABLE {table} (pk UInt32, id UInt32, s String) ENGINE = MergeTree ORDER BY id PARTITION BY pk""" ) - node1.query("SYSTEM STOP MERGES mt_load_parts") + node1.query(f"SYSTEM STOP MERGES {table}") for i in range(20): - node1.query( - f"INSERT INTO mt_load_parts VALUES 
(44, {i}, randomPrintableASCII(10))" - ) + node1.query(f"INSERT INTO {table} VALUES (44, {i}, randomPrintableASCII(10))") node1.restart_clickhouse(kill=True) for i in range(1, 21): assert node1.contains_in_log(f"Loading Active part 44_{i}_{i}_0") - node1.query("OPTIMIZE TABLE mt_load_parts FINAL") + node1.query(f"OPTIMIZE TABLE {table} FINAL") node1.restart_clickhouse(kill=True) - node1.query("SYSTEM WAIT LOADING PARTS mt_load_parts") + node1.query(f"SYSTEM WAIT LOADING PARTS {table}") assert node1.contains_in_log("Loading Active part 44_1_20") for i in range(1, 21): assert not node1.contains_in_log(f"Loading Active part 44_{i}_{i}_0") assert node1.contains_in_log(f"Loading Outdated part 44_{i}_{i}_0") - assert node1.query("SELECT count() FROM mt_load_parts") == "20\n" + assert node1.query(f"SELECT count() FROM {table}") == "20\n" - assert ( - node1.query( - "SELECT count() FROM system.parts WHERE table = 'mt_load_parts' AND active" - ) - == "1\n" - ) + assert node1.query(f"SELECT count() FROM system.parts WHERE table = '{table}' AND active") == "1\n" - node1.query( - "ALTER TABLE mt_load_parts MODIFY SETTING " - "old_parts_lifetime = 1, cleanup_delay_period = 1, " - "cleanup_delay_period_random_add = 0, " - "cleanup_thread_preferred_points_per_iteration = 0" - ) - node1.query("DETACH TABLE mt_load_parts") - node1.query("ATTACH TABLE mt_load_parts") + node1.query(f"ALTER TABLE {table} MODIFY SETTING old_parts_lifetime = 1, cleanup_delay_period=1, cleanup_delay_period_random_add=0, cleanup_thread_preferred_points_per_iteration=0;") + node1.query(f"DETACH TABLE {table}") + node1.query(f"ATTACH TABLE {table}") - node1.query("SYSTEM WAIT LOADING PARTS mt_load_parts") + node1.query(f"SYSTEM WAIT LOADING PARTS {table}") - table_path = node1.query( - "SELECT data_paths[1] FROM system.tables WHERE table = 'mt_load_parts'" - ).strip() + table_path = node1.query(f"SELECT data_paths[1] FROM system.tables WHERE table = '{table}'").strip() part_dirs = 
node1.exec_in_container(["bash", "-c", f"ls {table_path}"], user="root") @@ -116,52 +108,43 @@ def test_merge_tree_load_parts(started_cluster): def test_merge_tree_load_parts_corrupted(started_cluster): + table = f"mt_load_parts_2_{random_string(6)}" + for i, node in enumerate([node1, node2]): node.query( f""" - DROP TABLE IF EXISTS mt_load_parts_2 SYNC; - CREATE TABLE mt_load_parts_2 (pk UInt32, id UInt32, s String) - ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/mt_load_parts_2', '{i}') ORDER BY id PARTITION BY pk""" + CREATE TABLE {table} (pk UInt32, id UInt32, s String) + ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/{table}', '{i}') ORDER BY id PARTITION BY pk""" ) """min-max blocks in created parts: 1_1_0, 2_2_0, 1_2_1, 3_3_0, 1_3_2""" for partition in [111, 222, 333]: - node1.query( - f"INSERT INTO mt_load_parts_2 VALUES ({partition}, 0, randomPrintableASCII(10))" - ) - - node1.query( - f"INSERT INTO mt_load_parts_2 VALUES ({partition}, 1, randomPrintableASCII(10))" - ) - - node1.query(f"OPTIMIZE TABLE mt_load_parts_2 PARTITION {partition} FINAL") - - node1.query( - f"INSERT INTO mt_load_parts_2 VALUES ({partition}, 2, randomPrintableASCII(10))" - ) - - node1.query(f"OPTIMIZE TABLE mt_load_parts_2 PARTITION {partition} FINAL") + node1.query(f"INSERT INTO {table} VALUES ({partition}, 0, randomPrintableASCII(10))") + node1.query(f"INSERT INTO {table} VALUES ({partition}, 1, randomPrintableASCII(10))") + node1.query(f"OPTIMIZE TABLE {table} PARTITION {partition} FINAL") + node1.query(f"INSERT INTO {table} VALUES ({partition}, 2, randomPrintableASCII(10))") + node1.query(f"OPTIMIZE TABLE {table} PARTITION {partition} FINAL") - node2.query("SYSTEM SYNC REPLICA mt_load_parts_2", timeout=30) + node2.query(f"SYSTEM SYNC REPLICA {table}", timeout=30) def get_part_name(node, partition, min_block, max_block): return node.query( f""" SELECT name FROM system.parts - WHERE table = 'mt_load_parts_2' + WHERE table = '{table}' AND partition = '{partition}' AND 
min_block_number = {min_block} AND max_block_number = {max_block}""" ).strip() - corrupt_part_data_on_disk(node1, "mt_load_parts_2", get_part_name(node1, 111, 0, 2)) - corrupt_part_data_on_disk(node1, "mt_load_parts_2", get_part_name(node1, 222, 0, 2)) - corrupt_part_data_on_disk(node1, "mt_load_parts_2", get_part_name(node1, 222, 0, 1)) - corrupt_part_data_on_disk(node1, "mt_load_parts_2", get_part_name(node1, 333, 0, 1)) - corrupt_part_data_on_disk(node1, "mt_load_parts_2", get_part_name(node1, 333, 2, 2)) + corrupt_part_data_on_disk(node1, table, get_part_name(node1, 111, 0, 2)) + corrupt_part_data_on_disk(node1, table, get_part_name(node1, 222, 0, 2)) + corrupt_part_data_on_disk(node1, table, get_part_name(node1, 222, 0, 1)) + corrupt_part_data_on_disk(node1, table, get_part_name(node1, 333, 0, 1)) + corrupt_part_data_on_disk(node1, table, get_part_name(node1, 333, 2, 2)) node1.restart_clickhouse(kill=True) - node1.query("SYSTEM WAIT LOADING PARTS mt_load_parts_2") + node1.query(f"SYSTEM WAIT LOADING PARTS {table}") def check_parts_loading(node, partition, loaded, failed, skipped): # The whole test produces around 6-700 lines, so 2k is plenty enough. 
@@ -169,13 +152,8 @@ def check_parts_loading(node, partition, loaded, failed, skipped): look_behind_lines = 2000 for min_block, max_block in loaded: part_name = f"{partition}_{min_block}_{max_block}" - assert node.wait_for_log_line( - f"Loading Active part {part_name}", look_behind_lines=look_behind_lines - ) - assert node.wait_for_log_line( - f"Finished loading Active part {part_name}", - look_behind_lines=look_behind_lines, - ) + assert node.wait_for_log_line(f"{table}.*Loading Active part {part_name}") + assert node.wait_for_log_line(f"{table}.*Finished loading Active part {part_name}") failed_part_names = [] # Let's wait until there is some information about all expected parts, and only @@ -183,19 +161,15 @@ def check_parts_loading(node, partition, loaded, failed, skipped): for min_block, max_block in failed: part_name = f"{partition}_{min_block}_{max_block}" failed_part_names.append(part_name) - assert node.wait_for_log_line( - f"Loading Active part {part_name}", look_behind_lines=look_behind_lines - ) + assert node.wait_for_log_line(f"{table}.*Loading Active part {part_name}") for failed_part_name in failed_part_names: - assert not node.contains_in_log( - f"Finished loading Active part {failed_part_name}" - ) + assert not node.contains_in_log(f"{table}.*Finished loading Active part {failed_part_name}") for min_block, max_block in skipped: part_name = f"{partition}_{min_block}_{max_block}" - assert not node.contains_in_log(f"Loading Active part {part_name}") - assert not node.contains_in_log(f"Finished loading Active part {part_name}") + assert not node.contains_in_log(f"{table}.*Loading Active part {part_name}") + assert not node.contains_in_log(f"{table}.*Finished loading Active part {part_name}") check_parts_loading( node1, 111, loaded=[(0, 1), (2, 2)], failed=[(0, 2)], skipped=[(0, 0), (1, 1)] @@ -207,23 +181,23 @@ def check_parts_loading(node, partition, loaded, failed, skipped): node1, 333, loaded=[(0, 2)], failed=[], skipped=[(0, 0), (1, 1), (2, 2), (0, 
1)] ) - node1.query("SYSTEM SYNC REPLICA mt_load_parts_2", timeout=30) - node1.query("OPTIMIZE TABLE mt_load_parts_2 FINAL") - node1.query("SYSTEM SYNC REPLICA mt_load_parts_2", timeout=30) + node1.query(f"SYSTEM SYNC REPLICA {table}", timeout=30) + node1.query(f"OPTIMIZE TABLE {table} FINAL") + node1.query(f"SYSTEM SYNC REPLICA {table}", timeout=30) assert ( node1.query( - """ - SELECT pk, count() FROM mt_load_parts_2 + f""" + SELECT pk, count() FROM {table} GROUP BY pk ORDER BY pk""" ) == "111\t3\n222\t3\n333\t3\n" ) assert ( node1.query( - """ + f""" SELECT partition, count() - FROM system.parts WHERE table = 'mt_load_parts_2' AND active + FROM system.parts WHERE table = '{table}' AND active GROUP BY partition ORDER BY partition""" ) == "111\t1\n222\t1\n333\t1\n" @@ -231,6 +205,7 @@ def check_parts_loading(node, partition, loaded, failed, skipped): def test_merge_tree_load_parts_filesystem_error(started_cluster): + table = f"mt_load_parts_fs_error_{random_string(6)}" if node3.is_built_with_sanitizer() or node3.is_debug_build(): pytest.skip( "Skip with debug build and sanitizers. \ @@ -238,17 +213,16 @@ def test_merge_tree_load_parts_filesystem_error(started_cluster): ) node3.query( - """ - DROP TABLE IF EXISTS mt_load_parts SYNC; - CREATE TABLE mt_load_parts (id UInt32) + f""" + CREATE TABLE {table} (id UInt32) ENGINE = MergeTree ORDER BY id SETTINGS index_granularity_bytes = 0""" ) - node3.query("SYSTEM STOP MERGES mt_load_parts") + node3.query(f"SYSTEM STOP MERGES {table}") for i in range(2): - node3.query(f"INSERT INTO mt_load_parts VALUES ({i})") + node3.query(f"INSERT INTO {table} VALUES ({i})") # We want to somehow check that exception thrown on part creation is handled during part loading. 
# It can be a filesystem exception triggered at initialization of part storage but it is hard @@ -271,13 +245,11 @@ def corrupt_part(table, part_name): privileged=True, ) - corrupt_part("mt_load_parts", "all_1_1_0") + corrupt_part(table, "all_1_1_0") node3.restart_clickhouse(kill=True) - assert node3.query("SELECT * FROM mt_load_parts") == "1\n" + assert node3.query(f"SELECT * FROM {table}") == "1\n" assert ( - node3.query( - "SELECT name FROM system.detached_parts WHERE table = 'mt_load_parts'" - ) + node3.query(f"SELECT name FROM system.detached_parts WHERE table = '{table}'") == "broken-on-start_all_1_1_0\n" ) From 62f54b9630852f86bbc45b978e6b4de480293ffb Mon Sep 17 00:00:00 2001 From: Ilya Golshtein Date: Fri, 15 May 2026 14:58:56 +0000 Subject: [PATCH 5/5] look_behind_lines in test_merge_tree_load_parts --- .../test_merge_tree_load_parts/test.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/tests/integration/test_merge_tree_load_parts/test.py b/tests/integration/test_merge_tree_load_parts/test.py index 24ad9fed2023..a7d74b209a1d 100644 --- a/tests/integration/test_merge_tree_load_parts/test.py +++ b/tests/integration/test_merge_tree_load_parts/test.py @@ -152,8 +152,14 @@ def check_parts_loading(node, partition, loaded, failed, skipped): look_behind_lines = 2000 for min_block, max_block in loaded: part_name = f"{partition}_{min_block}_{max_block}" - assert node.wait_for_log_line(f"{table}.*Loading Active part {part_name}") - assert node.wait_for_log_line(f"{table}.*Finished loading Active part {part_name}") + assert node.wait_for_log_line( + f"{table}.*Loading Active part {part_name}", + look_behind_lines=look_behind_lines, + ) + assert node.wait_for_log_line( + f"{table}.*Finished loading Active part {part_name}", + look_behind_lines=look_behind_lines, + ) failed_part_names = [] # Let's wait until there is some information about all expected parts, and only @@ -161,7 +167,10 @@ def check_parts_loading(node, partition, loaded,
failed, skipped): for min_block, max_block in failed: part_name = f"{partition}_{min_block}_{max_block}" failed_part_names.append(part_name) - assert node.wait_for_log_line(f"{table}.*Loading Active part {part_name}") + assert node.wait_for_log_line( + f"{table}.*Loading Active part {part_name}", + look_behind_lines=look_behind_lines, + ) for failed_part_name in failed_part_names: assert not node.contains_in_log(f"{table}.*Finished loading Active part {failed_part_name}")