diff --git a/src/Storages/MergeTree/Compaction/MergePredicates/DistributedMergePredicate.h b/src/Storages/MergeTree/Compaction/MergePredicates/DistributedMergePredicate.h index 6bf691957a5a..b017ab03e481 100644 --- a/src/Storages/MergeTree/Compaction/MergePredicates/DistributedMergePredicate.h +++ b/src/Storages/MergeTree/Compaction/MergePredicates/DistributedMergePredicate.h @@ -76,6 +76,9 @@ class DistributedMergePredicate : public IMergePredicate if (left.info.partition_id != right.info.partition_id) return std::unexpected(PreformattedMessage::create("Parts {} and {} belong to different partitions", left.name, right.name)); + if (left.is_in_volume_where_merges_avoid || right.is_in_volume_where_merges_avoid) + return std::unexpected(PreformattedMessage::create("One of parts ({}, {}) lies on volume where merges should be avoided", left.name, right.name)); + int64_t left_max_block = left.info.max_block; int64_t right_min_block = right.info.min_block; chassert(left_max_block < right_min_block); diff --git a/src/Storages/MergeTree/Compaction/MergePredicates/MergeTreeMergePredicate.cpp b/src/Storages/MergeTree/Compaction/MergePredicates/MergeTreeMergePredicate.cpp index 7741e09c3903..58fbdce80039 100644 --- a/src/Storages/MergeTree/Compaction/MergePredicates/MergeTreeMergePredicate.cpp +++ b/src/Storages/MergeTree/Compaction/MergePredicates/MergeTreeMergePredicate.cpp @@ -16,6 +16,9 @@ std::expected MergeTreeMergePredicate::canMergeParts( if (left.info.partition_id != right.info.partition_id) return std::unexpected(PreformattedMessage::create("Parts {} and {} belong to different partitions", left.name, right.name)); + if (left.is_in_volume_where_merges_avoid || right.is_in_volume_where_merges_avoid) + return std::unexpected(PreformattedMessage::create("One of parts ({}, {}) lies on volume where merges should be avoided", left.name, right.name)); + if (left.projection_names != right.projection_names) return std::unexpected(PreformattedMessage::create( "Parts have different projection sets: {{}} in {} and {{}} in {}", diff --git a/src/Storages/MergeTree/Compaction/MergeSelectors/TTLMergeSelector.cpp b/src/Storages/MergeTree/Compaction/MergeSelectors/TTLMergeSelector.cpp index 743042e8e455..af8e91f66b0a 100644 --- a/src/Storages/MergeTree/Compaction/MergeSelectors/TTLMergeSelector.cpp +++ b/src/Storages/MergeTree/Compaction/MergeSelectors/TTLMergeSelector.cpp @@ -147,6 +147,9 @@ time_t TTLRowDeleteMergeSelector::getTTLForPart(const PartProperties & part) con bool TTLRowDeleteMergeSelector::canConsiderPart(const PartProperties & part) const { + if (part.is_in_volume_where_merges_avoid) + return false; + if (!part.general_ttl_info.has_value()) return false; @@ -165,6 +168,9 @@ time_t TTLRecompressMergeSelector::getTTLForPart(const PartProperties & part) co bool TTLRecompressMergeSelector::canConsiderPart(const PartProperties & part) const { + if (part.is_in_volume_where_merges_avoid) + return false; + if (!part.recompression_ttl_info.has_value()) return false; diff --git a/src/Storages/MergeTree/Compaction/PartProperties.cpp b/src/Storages/MergeTree/Compaction/PartProperties.cpp index 671dc6956a8d..a3c06b102231 100644 --- a/src/Storages/MergeTree/Compaction/PartProperties.cpp +++ b/src/Storages/MergeTree/Compaction/PartProperties.cpp @@ -66,6 +66,7 @@ std::set getCalculatedProjectionNames(const MergeTreeDataPartPtr & PartProperties buildPartProperties( const MergeTreeDataPartPtr & part, const StorageMetadataPtr & metadata_snapshot, + const StoragePolicyPtr & storage_policy, time_t current_time) { return PartProperties{ @@ -73,6 +74,7 @@ PartProperties buildPartProperties( .info = part->info, .projection_names = getCalculatedProjectionNames(part), .all_ttl_calculated_if_any = part->checkAllTTLCalculated(metadata_snapshot), + .is_in_volume_where_merges_avoid = !part->shallParticipateInMerges(storage_policy), .size = part->getExistingBytesOnDisk(), .age = current_time - part->modification_time, .general_ttl_info = buildGeneralTTLInfo(metadata_snapshot, part), diff --git a/src/Storages/MergeTree/Compaction/PartProperties.h b/src/Storages/MergeTree/Compaction/PartProperties.h index 9314debf8837..70530bcc6b0d 100644 --- a/src/Storages/MergeTree/Compaction/PartProperties.h +++ b/src/Storages/MergeTree/Compaction/PartProperties.h @@ -2,6 +2,8 @@ #include +#include + #include #include @@ -18,6 +20,7 @@ struct PartProperties const std::set projection_names = {}; const bool all_ttl_calculated_if_any = false; + const bool is_in_volume_where_merges_avoid = false; /// Size of data part in bytes. const size_t size = 0; @@ -50,6 +53,7 @@ using PartsRangeView = std::span; PartProperties buildPartProperties( const MergeTreeDataPartPtr & part, const StorageMetadataPtr & metadata_snapshot, + const StoragePolicyPtr & storage_policy, time_t current_time); } diff --git a/src/Storages/MergeTree/Compaction/PartsCollectors/Common.cpp b/src/Storages/MergeTree/Compaction/PartsCollectors/Common.cpp index 07cf85555713..aff33e6b982c 100644 --- a/src/Storages/MergeTree/Compaction/PartsCollectors/Common.cpp +++ b/src/Storages/MergeTree/Compaction/PartsCollectors/Common.cpp @@ -6,7 +6,10 @@ namespace DB { PartsRanges constructPartsRanges( - std::vector && ranges, const StorageMetadataPtr & metadata_snapshot, const time_t & current_time) + std::vector && ranges, + const StorageMetadataPtr & metadata_snapshot, + const StoragePolicyPtr & storage_policy, + const time_t & current_time) { PartsRanges properties_ranges; properties_ranges.reserve(ranges.size()); @@ -17,7 +20,7 @@ PartsRanges constructPartsRanges( properties_ranges.reserve(range.size()); for (const auto & part : range) - properties_range.push_back(buildPartProperties(part, metadata_snapshot, current_time)); + properties_range.push_back(buildPartProperties(part, metadata_snapshot, storage_policy, current_time)); properties_ranges.push_back(std::move(properties_range)); } diff --git a/src/Storages/MergeTree/Compaction/PartsCollectors/Common.h b/src/Storages/MergeTree/Compaction/PartsCollectors/Common.h index f13cef98a62a..a147394d0f83 100644 --- a/src/Storages/MergeTree/Compaction/PartsCollectors/Common.h +++ b/src/Storages/MergeTree/Compaction/PartsCollectors/Common.h @@ -55,7 +55,10 @@ std::expected checkAllPartsSatisfyPredicate(const std } PartsRanges constructPartsRanges( - std::vector && ranges, const StorageMetadataPtr & metadata_snapshot, const time_t & current_time); + std::vector && ranges, + const StorageMetadataPtr & metadata_snapshot, + const StoragePolicyPtr & storage_policy, + const time_t & current_time); MergeTreeDataPartsVector filterByPartitions( MergeTreeDataPartsVector && parts, const std::optional & partitions_to_keep); diff --git a/src/Storages/MergeTree/Compaction/PartsCollectors/MergeTreePartsCollector.cpp b/src/Storages/MergeTree/Compaction/PartsCollectors/MergeTreePartsCollector.cpp index d31d85b764cb..14f6ca7da90a 100644 --- a/src/Storages/MergeTree/Compaction/PartsCollectors/MergeTreePartsCollector.cpp +++ b/src/Storages/MergeTree/Compaction/PartsCollectors/MergeTreePartsCollector.cpp @@ -75,9 +75,7 @@ MergeTreeDataPartsVector collectInitial(const MergeTreeData & data, const MergeT auto constructPreconditionsPredicate(const StoragePolicyPtr & storage_policy, const MergeTreeTransactionPtr & tx, const MergeTreeMergePredicatePtr & merge_pred) { - bool has_volumes_with_disabled_merges = storage_policy->hasAnyVolumeWithDisabledMerges(); - - auto predicate = [storage_policy, tx, merge_pred, has_volumes_with_disabled_merges](const MergeTreeDataPartPtr & part) -> std::expected + auto predicate = [storage_policy, tx, merge_pred](const MergeTreeDataPartPtr & part) -> std::expected { if (tx) { @@ -91,9 +89,6 @@ auto constructPreconditionsPredicate(const StoragePolicyPtr & storage_policy, co return std::unexpected(PreformattedMessage::create("Part {} is locked for removal", part->name)); } - if (has_volumes_with_disabled_merges && !part->shallParticipateInMerges(storage_policy)) - return std::unexpected(PreformattedMessage::create("Merges for part's {} volume are disabled", part->name)); - chassert(merge_pred); return merge_pred->canUsePartInMerges(part); }; @@ -133,7 +128,7 @@ PartsRanges MergeTreePartsCollector::grabAllPossibleRanges( { auto parts = filterByPartitions(collectInitial(storage, tx), partitions_hint); auto ranges = splitPartsByPreconditions(std::move(parts), storage_policy, tx, merge_pred, series_log); - return constructPartsRanges(std::move(ranges), metadata_snapshot, current_time); + return constructPartsRanges(std::move(ranges), metadata_snapshot, storage_policy, current_time); } std::expected MergeTreePartsCollector::grabAllPartsInsidePartition( @@ -146,7 +141,7 @@ std::expected MergeTreePartsCollector::grabAllP if (auto result = checkAllParts(parts, storage_policy, tx, merge_pred); !result) return std::unexpected(std::move(result.error())); - auto ranges = constructPartsRanges({std::move(parts)}, metadata_snapshot, current_time); + auto ranges = constructPartsRanges({std::move(parts)}, metadata_snapshot, storage_policy, current_time); chassert(ranges.size() == 1); return std::move(ranges.front()); diff --git a/src/Storages/MergeTree/Compaction/PartsCollectors/ReplicatedMergeTreePartsCollector.cpp b/src/Storages/MergeTree/Compaction/PartsCollectors/ReplicatedMergeTreePartsCollector.cpp index 30f5f4dbacbf..727f2569cb7b 100644 --- a/src/Storages/MergeTree/Compaction/PartsCollectors/ReplicatedMergeTreePartsCollector.cpp +++ b/src/Storages/MergeTree/Compaction/PartsCollectors/ReplicatedMergeTreePartsCollector.cpp @@ -15,14 +15,8 @@ MergeTreeDataPartsVector collectInitial(const MergeTreeData & data) auto constructPreconditionsPredicate(const StoragePolicyPtr & storage_policy, const ReplicatedMergeTreeMergePredicatePtr & merge_pred) { - bool has_volumes_with_disabled_merges = storage_policy->hasAnyVolumeWithDisabledMerges(); - - auto predicate = [storage_policy, merge_pred, has_volumes_with_disabled_merges](const MergeTreeDataPartPtr & part) -> std::expected + auto predicate = [storage_policy, merge_pred](const MergeTreeDataPartPtr & part) -> std::expected { - if (has_volumes_with_disabled_merges && !part->shallParticipateInMerges(storage_policy)) - return std::unexpected(PreformattedMessage::create("Merges for part's {} volume are disabled", part->name)); - - chassert(merge_pred); return merge_pred->canUsePartInMerges(part); }; @@ -60,7 +54,7 @@ PartsRanges ReplicatedMergeTreePartsCollector::grabAllPossibleRanges( { auto parts = filterByPartitions(collectInitial(storage), partitions_hint); auto ranges = splitPartsByPreconditions(std::move(parts), storage_policy, merge_pred, series_log); - return constructPartsRanges(std::move(ranges), metadata_snapshot, current_time); + return constructPartsRanges(std::move(ranges), metadata_snapshot, storage_policy, current_time); } std::expected ReplicatedMergeTreePartsCollector::grabAllPartsInsidePartition( @@ -73,7 +67,7 @@ std::expected ReplicatedMergeTreePartsCollector if (auto result = checkAllParts(parts, storage_policy, merge_pred); !result) return std::unexpected(std::move(result.error())); - auto ranges = constructPartsRanges({std::move(parts)}, metadata_snapshot, current_time); + auto ranges = constructPartsRanges({std::move(parts)}, metadata_snapshot, storage_policy, current_time); chassert(ranges.size() == 1); return std::move(ranges.front()); diff --git a/tests/integration/test_filesystem_cache/config.d/filesystem_caches_path.xml b/tests/integration/test_filesystem_cache/config.d/filesystem_caches_path.xml new file mode 100644 index 000000000000..f5b70dd78d5d --- /dev/null +++ b/tests/integration/test_filesystem_cache/config.d/filesystem_caches_path.xml @@ -0,0 +1,3 @@ + + /var/log/clickhouse/fs-cache/ + diff --git a/tests/integration/test_no_merges_volume_ttl/__init__.py b/tests/integration/test_no_merges_volume_ttl/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tests/integration/test_no_merges_volume_ttl/configs/storage_config.xml b/tests/integration/test_no_merges_volume_ttl/configs/storage_config.xml new file mode 100644 index 000000000000..f6add9b5da63 --- /dev/null +++ b/tests/integration/test_no_merges_volume_ttl/configs/storage_config.xml @@ -0,0 +1,25 @@ + + + + + /with_merges/ + + + /no_merges/ + + + + + + + with_merges + + + no_merges + true + + + + + + diff --git a/tests/integration/test_no_merges_volume_ttl/test.py b/tests/integration/test_no_merges_volume_ttl/test.py new file mode 100644 index 000000000000..84463352e372 --- /dev/null +++ b/tests/integration/test_no_merges_volume_ttl/test.py @@ -0,0 +1,80 @@ +import pytest +import time + +from helpers.cluster import ClickHouseCluster + +cluster = ClickHouseCluster(__file__) +node = cluster.add_instance( + "node", + main_configs=["configs/storage_config.xml"], + tmpfs=["/with_merges:size=200M", "/no_merges:size=200M"], +) + +@pytest.fixture(scope="module") +def start_cluster(): + try: + cluster.start() + yield cluster + + finally: + cluster.shutdown() + + +def wait_parts_count(table, expected_number_of_parts): + for i in range(100): + print(f"Waiting {expected_number_of_parts} for table {table}. Iteration: {i}") + parts_count = int(node.query(f"select count() from system.parts where table = '{table}' and active")) + + if parts_count == expected_number_of_parts: + break + + time.sleep(1) + + assert int(node.query(f"select count() from system.parts where table = '{table}' and active")) == expected_number_of_parts + + +def test_no_merges_volume_ttl_merge(start_cluster): + node.query("create table t (time DateTime) engine = MergeTree order by tuple() ttl time settings storage_policy='hot_cold_separation_policy', merge_with_ttl_timeout=0") + table_uuid = node.query("select uuid from system.tables where table = 't'").strip() + + node.query("system stop merges t") + node.query("insert into t values (now() - interval 1 day)") + assert node.query("select path from system.parts where table = 't' and active").strip() == f"/with_merges/store/{table_uuid[:3]}/{table_uuid}/all_1_1_0/" + assert int(node.query("select count() from t").strip()) == 1 + + node.query("alter table t move partition () to volume 'no_merges'") + assert node.query("select path from system.parts where table = 't' and active").strip() == f"/no_merges/store/{table_uuid[:3]}/{table_uuid}/all_1_1_0/" + assert int(node.query("select count() from t").strip()) == 1 + + node.query("system start merges t") + wait_parts_count("t", 0) + + assert int(node.query("select count() from t").strip()) == 0 + node.query("drop table t sync") + + +def test_no_merges_volume_no_regular_merges(start_cluster): + node.query("create table t (a UInt64) engine = MergeTree order by tuple() settings storage_policy='hot_cold_separation_policy'") + + node.query("system stop merges t") + node.query("insert into t select number from numbers(50) settings max_block_size=1, min_insert_block_size_bytes=1") + assert int(node.query("select count() from t").strip()) == 50 + assert int(node.query("select count() from system.parts where table = 't' and active").strip()) == 50 + + node.query("alter table t move partition () to volume 'no_merges'") + assert int(node.query("select count() from t").strip()) == 50 + assert int(node.query("select count() from system.parts where table = 't' and active").strip()) == 50 + + node.query("system start merges t") + node.query("optimize table t") + node.query("optimize table t") + node.query("optimize table t") + node.query("optimize table t") + node.query("optimize table t final") + node.query("optimize table t final") + node.query("optimize table t final") + node.query("optimize table t final") + + assert int(node.query("select count() from t").strip()) == 50 + assert int(node.query("select count() from system.parts where table = 't' and active").strip()) == 50 + node.query("drop table t sync")