From 12578e3e87858e522180671f82b975a65ddb0543 Mon Sep 17 00:00:00 2001 From: Alexander Gololobov Date: Tue, 9 Jun 2026 15:25:51 +0000 Subject: [PATCH 1/2] Merge pull request #106020 from ClickHouse/multistage_distributed_queries Multi-stage distributed queries --- programs/server/Server.cpp | 79 + programs/server/config.xml | 19 + src/Analyzer/TableExpressionModifiers.cpp | 16 + src/Analyzer/TableExpressionModifiers.h | 6 + src/CMakeLists.txt | 2 + .../BuzzHouse/Generator/SessionSettings.cpp | 2 +- src/Common/CurrentMetrics.cpp | 3 + src/Common/setThreadName.h | 1 + src/Core/ProtocolDefines.h | 4 +- src/Core/Settings.cpp | 2 +- .../QueryPlan/BroadcastExchangeStep.cpp | 26 + .../QueryPlan/BroadcastExchangeStep.h | 46 + .../QueryPlan/BroadcastReceiveStep.cpp | 65 + .../QueryPlan/BroadcastReceiveStep.h | 35 + .../QueryPlan/BroadcastSendStep.cpp | 79 + src/Processors/QueryPlan/BroadcastSendStep.h | 40 + .../QueryPlan/BuildQueryPipelineSettings.h | 7 + src/Processors/QueryPlan/ExchangeLookup.h | 54 + .../QueryPlan/GatherExchangeStep.cpp | 17 + src/Processors/QueryPlan/GatherExchangeStep.h | 49 + .../QueryPlan/GatherReceiveStep.cpp | 88 + src/Processors/QueryPlan/GatherReceiveStep.h | 37 + src/Processors/QueryPlan/GatherSendStep.cpp | 60 + src/Processors/QueryPlan/GatherSendStep.h | 36 + src/Processors/QueryPlan/IParameterLookup.h | 19 + src/Processors/QueryPlan/JoinStepLogical.h | 1 + .../QueryPlan/LogicalExchangeStep.h | 47 + src/Processors/QueryPlan/ObjectFilterStep.h | 1 + .../QueryPlan/Optimizations/Optimizations.h | 7 + .../QueryPlanOptimizationSettings.cpp | 35 +- .../QueryPlanOptimizationSettings.h | 3 +- .../Optimizations/makeDistributed.cpp | 1053 ++++++++++++ .../Optimizations/optimizeExtended.cpp | 14 - .../QueryPlan/Optimizations/optimizeJoin.cpp | 13 +- .../QueryPlan/Optimizations/optimizeTopK.cpp | 10 + .../QueryPlan/Optimizations/optimizeTree.cpp | 29 +- src/Processors/QueryPlan/QueryPlan.cpp | 192 +++ src/Processors/QueryPlan/QueryPlan.h | 54 + .../QueryPlan/QueryPlanStepRegistry.cpp | 22 + .../QueryPlan/ReadFromMergeTree.cpp | 355 +++- src/Processors/QueryPlan/ReadFromMergeTree.h | 12 + .../QueryPlan/ReadFromObjectStorageStep.h | 1 + .../QueryPlan/ReadFromPreparedSource.cpp | 53 + .../QueryPlan/ReadFromPreparedSource.h | 6 + .../QueryPlan/ReadFromTableFunctionStep.cpp | 14 - .../QueryPlan/ReadFromTableFunctionStep.h | 1 + .../QueryPlan/ReadFromTableStep.cpp | 14 - src/Processors/QueryPlan/ReadFromTableStep.h | 1 + .../QueryPlan/ScatterExchangeStep.cpp | 28 + .../QueryPlan/ScatterExchangeStep.h | 64 + .../QueryPlan/ShuffleExchangeStep.cpp | 18 + .../QueryPlan/ShuffleExchangeStep.h | 62 + .../QueryPlan/ShuffleReceiveStep.cpp | 64 + src/Processors/QueryPlan/ShuffleReceiveStep.h | 34 + src/Processors/QueryPlan/ShuffleSendStep.cpp | 121 ++ src/Processors/QueryPlan/ShuffleSendStep.h | 48 + src/Processors/Sinks/NativeCompressedSink.cpp | 84 + src/Processors/Sinks/NativeCompressedSink.h | 43 + .../Sources/NativeCompressedSource.cpp | 63 + .../Sources/NativeCompressedSource.h | 34 + .../Sources/ReadFromDistributedPlanSource.cpp | 75 + .../Sources/ReadFromDistributedPlanSource.h | 57 + .../ScatterByPartitionTransform.cpp | 35 +- .../Transforms/ScatterByPartitionTransform.h | 7 +- src/QueryPipeline/DistributedPlanExecutor.cpp | 1523 +++++++++++++++++ src/QueryPipeline/DistributedPlanExecutor.h | 138 ++ .../DistributedQuery/ExchangeConnections.cpp | 188 ++ .../DistributedQuery/ExchangeConnections.h | 93 + .../DistributedQuery/ExchangeServer.cpp | 263 +++ src/Server/DistributedQuery/ExchangeServer.h | 45 + .../DistributedQuery/FutureConnection.cpp | 95 + .../DistributedQuery/FutureConnection.h | 60 + .../StreamingExchangeLookup.cpp | 69 + .../StreamingExchangeLookup.h | 20 + .../StreamingExchangeProtocol.cpp | 72 + .../StreamingExchangeProtocol.h | 85 + .../StreamingExchangeSink.cpp | 449 +++++ .../DistributedQuery/StreamingExchangeSink.h | 93 + .../StreamingExchangeSource.cpp | 340 ++++ .../StreamingExchangeSource.h | 83 + .../tests/gtest_distributed_query.cpp | 573 +++++++ .../tests/gtest_exchange_server_handshake.cpp | 203 +++ .../StatelessWorker/StatelessTaskExecutor.cpp | 209 +++ .../StatelessWorker/StatelessTaskExecutor.h | 68 + .../StatelessWorker/StatelessWorkerClient.cpp | 211 +++ .../StatelessWorker/StatelessWorkerClient.h | 22 + .../StatelessWorkerEndpoint.cpp | 335 ++++ .../StatelessWorker/StatelessWorkerEndpoint.h | 28 + .../StatelessWorkerProtocol.cpp | 24 + .../StatelessWorker/StatelessWorkerProtocol.h | 21 + src/Storages/SelectQueryInfo.cpp | 3 +- tests/config/config.d/distributed_query.xml | 39 + tests/config/install.sh | 6 + .../test_distributed_plan_cancel/__init__.py | 0 .../configs/config.d/stateless_worker.xml | 36 + .../test_distributed_plan_cancel/test.py | 87 + .../__init__.py | 0 .../configs/config.d/stateless_worker.xml | 46 + .../test.py | 444 +++++ ...03394_distributed_broadcast_join.reference | 57 + .../03394_distributed_broadcast_join.sql | 57 + .../03394_distributed_shuffle_join.reference | 3 + .../03394_distributed_shuffle_join.sql | 27 + ...ed_shuffle_join_early_close_sink.reference | 1 + ...tributed_shuffle_join_early_close_sink.sql | 17 + ...ed_shuffle_join_with_aggregation.reference | 54 + ...tributed_shuffle_join_with_aggregation.sql | 68 + ...ributed_shuffle_join_with_filter.reference | 36 + ...4_distributed_shuffle_join_with_filter.sql | 40 + ...distributed_shuffle_join_with_in.reference | 24 + ...03394_distributed_shuffle_join_with_in.sql | 51 + ...buted_shuffle_join_with_prewhere.reference | 12 + ...distributed_shuffle_join_with_prewhere.sql | 40 + .../03394_distributed_sort.reference | 19 + .../0_stateless/03394_distributed_sort.sh | 36 + .../04097_distributed_join_kinds.reference | 195 +++ .../04097_distributed_join_kinds.sql | 242 +++ ...4105_distributed_final_replacing.reference | 4 + .../04105_distributed_final_replacing.sql | 24 + ...tributed_shuffle_join_type_mixed.reference | 17 + ...05_distributed_shuffle_join_type_mixed.sql | 43 + ...d_aggregation_correctness_guards.reference | 2 + ...ributed_aggregation_correctness_guards.sql | 25 + ...istributed_read_error_terminates.reference | 0 ...4307_distributed_read_error_terminates.sql | 20 + ...tals_rollup_cube_not_distributed.reference | 3 + ...ted_totals_rollup_cube_not_distributed.sql | 25 + ...d_aggregation_persisted_exchange.reference | 5 + ...ributed_aggregation_persisted_exchange.sql | 18 + ...ted_unserializable_step_rejected.reference | 0 ...stributed_unserializable_step_rejected.sql | 17 + ...plan_set_operation_const_columns.reference | 1 + ...buted_plan_set_operation_const_columns.sql | 44 + ...20_distributed_plan_read_rejects.reference | 2 + .../04320_distributed_plan_read_rejects.sql | 26 + ...d_plan_count_implicit_projection.reference | 3 + ...ributed_plan_count_implicit_projection.sql | 32 + 137 files changed, 10772 insertions(+), 56 deletions(-) create mode 100644 src/Processors/QueryPlan/BroadcastExchangeStep.cpp create mode 100644 src/Processors/QueryPlan/BroadcastExchangeStep.h create mode 100644 src/Processors/QueryPlan/BroadcastReceiveStep.cpp create mode 100644 src/Processors/QueryPlan/BroadcastReceiveStep.h create mode 100644 src/Processors/QueryPlan/BroadcastSendStep.cpp create mode 100644 src/Processors/QueryPlan/BroadcastSendStep.h create mode 100644 src/Processors/QueryPlan/ExchangeLookup.h create mode 100644 src/Processors/QueryPlan/GatherExchangeStep.cpp create mode 100644 src/Processors/QueryPlan/GatherExchangeStep.h create mode 100644 src/Processors/QueryPlan/GatherReceiveStep.cpp create mode 100644 src/Processors/QueryPlan/GatherReceiveStep.h create mode 100644 src/Processors/QueryPlan/GatherSendStep.cpp create mode 100644 src/Processors/QueryPlan/GatherSendStep.h create mode 100644 src/Processors/QueryPlan/IParameterLookup.h create mode 100644 src/Processors/QueryPlan/LogicalExchangeStep.h create mode 100644 src/Processors/QueryPlan/Optimizations/makeDistributed.cpp delete mode 100644 src/Processors/QueryPlan/Optimizations/optimizeExtended.cpp create mode 100644 src/Processors/QueryPlan/ScatterExchangeStep.cpp create mode 100644 src/Processors/QueryPlan/ScatterExchangeStep.h create mode 100644 src/Processors/QueryPlan/ShuffleExchangeStep.cpp create mode 100644 src/Processors/QueryPlan/ShuffleExchangeStep.h create mode 100644 src/Processors/QueryPlan/ShuffleReceiveStep.cpp create mode 100644 src/Processors/QueryPlan/ShuffleReceiveStep.h create mode 100644 src/Processors/QueryPlan/ShuffleSendStep.cpp create mode 100644 src/Processors/QueryPlan/ShuffleSendStep.h create mode 100644 src/Processors/Sinks/NativeCompressedSink.cpp create mode 100644 src/Processors/Sinks/NativeCompressedSink.h create mode 100644 src/Processors/Sources/NativeCompressedSource.cpp create mode 100644 src/Processors/Sources/NativeCompressedSource.h create mode 100644 src/Processors/Sources/ReadFromDistributedPlanSource.cpp create mode 100644 src/Processors/Sources/ReadFromDistributedPlanSource.h create mode 100644 src/QueryPipeline/DistributedPlanExecutor.cpp create mode 100644 src/QueryPipeline/DistributedPlanExecutor.h create mode 100644 src/Server/DistributedQuery/ExchangeConnections.cpp create mode 100644 src/Server/DistributedQuery/ExchangeConnections.h create mode 100644 src/Server/DistributedQuery/ExchangeServer.cpp create mode 100644 src/Server/DistributedQuery/ExchangeServer.h create mode 100644 src/Server/DistributedQuery/FutureConnection.cpp create mode 100644 src/Server/DistributedQuery/FutureConnection.h create mode 100644 src/Server/DistributedQuery/StreamingExchangeLookup.cpp create mode 100644 src/Server/DistributedQuery/StreamingExchangeLookup.h create mode 100644 src/Server/DistributedQuery/StreamingExchangeProtocol.cpp create mode 100644 src/Server/DistributedQuery/StreamingExchangeProtocol.h create mode 100644 src/Server/DistributedQuery/StreamingExchangeSink.cpp create mode 100644 src/Server/DistributedQuery/StreamingExchangeSink.h create mode 100644 src/Server/DistributedQuery/StreamingExchangeSource.cpp create mode 100644 src/Server/DistributedQuery/StreamingExchangeSource.h create mode 100644 src/Server/DistributedQuery/tests/gtest_distributed_query.cpp create mode 100644 src/Server/DistributedQuery/tests/gtest_exchange_server_handshake.cpp create mode 100644 src/Server/StatelessWorker/StatelessTaskExecutor.cpp create mode 100644 src/Server/StatelessWorker/StatelessTaskExecutor.h create mode 100644 src/Server/StatelessWorker/StatelessWorkerClient.cpp create mode 100644 src/Server/StatelessWorker/StatelessWorkerClient.h create mode 100644 src/Server/StatelessWorker/StatelessWorkerEndpoint.cpp create mode 100644 src/Server/StatelessWorker/StatelessWorkerEndpoint.h create mode 100644 src/Server/StatelessWorker/StatelessWorkerProtocol.cpp create mode 100644 src/Server/StatelessWorker/StatelessWorkerProtocol.h create mode 100644 tests/config/config.d/distributed_query.xml create mode 100644 tests/integration/test_distributed_plan_cancel/__init__.py create mode 100644 tests/integration/test_distributed_plan_cancel/configs/config.d/stateless_worker.xml create mode 100644 tests/integration/test_distributed_plan_cancel/test.py create mode 100644 tests/integration/test_distributed_plan_replicated_merge_tree/__init__.py create mode 100644 tests/integration/test_distributed_plan_replicated_merge_tree/configs/config.d/stateless_worker.xml create mode 100644 tests/integration/test_distributed_plan_replicated_merge_tree/test.py create mode 100644 tests/queries/0_stateless/03394_distributed_broadcast_join.reference create mode 100644 tests/queries/0_stateless/03394_distributed_broadcast_join.sql create mode 100644 tests/queries/0_stateless/03394_distributed_shuffle_join.reference create mode 100644 tests/queries/0_stateless/03394_distributed_shuffle_join.sql create mode 100644 tests/queries/0_stateless/03394_distributed_shuffle_join_early_close_sink.reference create mode 100644 tests/queries/0_stateless/03394_distributed_shuffle_join_early_close_sink.sql create mode 100644 tests/queries/0_stateless/03394_distributed_shuffle_join_with_aggregation.reference create mode 100644 tests/queries/0_stateless/03394_distributed_shuffle_join_with_aggregation.sql create mode 100644 tests/queries/0_stateless/03394_distributed_shuffle_join_with_filter.reference create mode 100644 tests/queries/0_stateless/03394_distributed_shuffle_join_with_filter.sql create mode 100644 tests/queries/0_stateless/03394_distributed_shuffle_join_with_in.reference create mode 100644 tests/queries/0_stateless/03394_distributed_shuffle_join_with_in.sql create mode 100644 tests/queries/0_stateless/03394_distributed_shuffle_join_with_prewhere.reference create mode 100644 tests/queries/0_stateless/03394_distributed_shuffle_join_with_prewhere.sql create mode 100644 tests/queries/0_stateless/03394_distributed_sort.reference create mode 100755 tests/queries/0_stateless/03394_distributed_sort.sh create mode 100644 tests/queries/0_stateless/04097_distributed_join_kinds.reference create mode 100644 tests/queries/0_stateless/04097_distributed_join_kinds.sql create mode 100644 tests/queries/0_stateless/04105_distributed_final_replacing.reference create mode 100644 tests/queries/0_stateless/04105_distributed_final_replacing.sql create mode 100644 tests/queries/0_stateless/04305_distributed_shuffle_join_type_mixed.reference create mode 100644 tests/queries/0_stateless/04305_distributed_shuffle_join_type_mixed.sql create mode 100644 tests/queries/0_stateless/04306_distributed_aggregation_correctness_guards.reference create mode 100644 tests/queries/0_stateless/04306_distributed_aggregation_correctness_guards.sql create mode 100644 tests/queries/0_stateless/04307_distributed_read_error_terminates.reference create mode 100644 tests/queries/0_stateless/04307_distributed_read_error_terminates.sql create mode 100644 tests/queries/0_stateless/04308_distributed_totals_rollup_cube_not_distributed.reference create mode 100644 tests/queries/0_stateless/04308_distributed_totals_rollup_cube_not_distributed.sql create mode 100644 tests/queries/0_stateless/04309_distributed_aggregation_persisted_exchange.reference create mode 100644 tests/queries/0_stateless/04309_distributed_aggregation_persisted_exchange.sql create mode 100644 tests/queries/0_stateless/04310_distributed_unserializable_step_rejected.reference create mode 100644 tests/queries/0_stateless/04310_distributed_unserializable_step_rejected.sql create mode 100644 tests/queries/0_stateless/04319_distributed_plan_set_operation_const_columns.reference create mode 100644 tests/queries/0_stateless/04319_distributed_plan_set_operation_const_columns.sql create mode 100644 tests/queries/0_stateless/04320_distributed_plan_read_rejects.reference create mode 100644 tests/queries/0_stateless/04320_distributed_plan_read_rejects.sql create mode 100644 tests/queries/0_stateless/04321_distributed_plan_count_implicit_projection.reference create mode 100644 tests/queries/0_stateless/04321_distributed_plan_count_implicit_projection.sql diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index 774c6d9f479b..35b816e78b4b 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -115,8 +115,11 @@ #include #include #include +#include +#include #include #include +#include #include #include #include @@ -1938,6 +1941,82 @@ try LOG_DEBUG(log, "Initializing interserver credentials."); global_context->updateInterserverCredentials(config()); + std::shared_ptr stateless_worker_endpoint_ptr{nullptr}; + String stateless_worker_endpoint_name; + if (config().getBool("stateless_worker_server.enabled", false)) + { + String stateless_worker_endpoint = config().getString("stateless_worker_server.endpoint", "localhost"); + stateless_worker_endpoint_ptr = std::make_shared(); + stateless_worker_endpoint_name = stateless_worker_endpoint_ptr->getId(stateless_worker_endpoint); + global_context->getInterserverIOHandler().addEndpoint(stateless_worker_endpoint_name, stateless_worker_endpoint_ptr); + LOG_DEBUG(log, "Added stateless worker endpoint '{}'.", stateless_worker_endpoint_name); + } + + SCOPE_EXIT({ + if (stateless_worker_endpoint_ptr) + { + /// Remove the same endpoint that was registered (the configured name may differ from "localhost"). + LOG_DEBUG(log, "Shutting down stateless worker endpoint '{}'.", stateless_worker_endpoint_name); + global_context->getInterserverIOHandler().removeEndpointIfExists(stateless_worker_endpoint_name); + + stateless_worker_endpoint_ptr->blocker.cancelForever(); + stateless_worker_endpoint_ptr->shutdown(); + /// Acquire the lock to wait for all in-flight requests to finish. + std::lock_guard lock(stateless_worker_endpoint_ptr->rwlock); + } + stateless_worker_endpoint_ptr.reset(); + }); + + #ifdef OS_LINUX + ExchangeConnectionsPtr exchange_connections_ptr = ExchangeConnections::instance(); + std::vector> exchange_servers; + if (auto streaming_exchange_port = config().getUInt("distributed_query.streaming_exchange_port", 0)) + { + if (streaming_exchange_port > 65535) + throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, + "`distributed_query.streaming_exchange_port` must be in range 1..65535, got {}", streaming_exchange_port); + + /// The exchange handshake is unauthenticated, so the listener is never bound to all interfaces + /// implicitly: the streaming exchange is enabled only when explicit listen host(s) are given. + Strings exchange_listen_hosts = DB::getMultipleValuesFromConfig(config(), "distributed_query", "streaming_exchange_listen_host"); + if (exchange_listen_hosts.empty()) + { + LOG_ERROR(log, "`distributed_query.streaming_exchange_port` is set but no " + "`distributed_query.streaming_exchange_listen_host` is configured; the streaming exchange " + "server is not started. Specify a listen host to enable it."); + } + else + { + for (const auto & listen_host : exchange_listen_hosts) + { + try + { + exchange_servers.emplace_back(std::make_shared(listen_host, streaming_exchange_port, exchange_connections_ptr)); + exchange_servers.back()->start(); + } + catch (Poco::Exception & e) + { + LOG_INFO(log, "Failed to start exchange server on {}:{}: {}", + listen_host, streaming_exchange_port, e.displayText()); + } + } + if (exchange_servers.empty()) + throw Exception(ErrorCodes::NETWORK_ERROR, "Failed to start ExchangeServer on port {}", streaming_exchange_port); + } + } + + SCOPE_EXIT({ + for (auto & exchange_server_ptr : exchange_servers) + { + exchange_server_ptr->stop(); + exchange_server_ptr.reset(); + } + }); + #else + if (config().getUInt("distributed_query.streaming_exchange_port", 0)) + throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "ExchangeServer is not supported on non-linux platform"); + #endif + /// Set up caches. const size_t max_cache_size = static_cast(static_cast(physical_server_memory) * server_settings[ServerSetting::cache_size_to_ram_max_ratio]); diff --git a/programs/server/config.xml b/programs/server/config.xml index 8c16f317b320..319abd1bf44f 100644 --- a/programs/server/config.xml +++ b/programs/server/config.xml @@ -280,6 +280,25 @@ --> + + + + + 1 + localhost + + + + 1 + + test_cluster_one_shard_two_replicas + + + + 9223 + 0.0.0.0 + :: + + + local + ./local_object_storage/ + tmp_sub_path/ + + + + + + + diff --git a/tests/config/install.sh b/tests/config/install.sh index f5b5e383ed0c..f08faa656e57 100755 --- a/tests/config/install.sh +++ b/tests/config/install.sh @@ -158,6 +158,12 @@ cp $SRC_PATH/config.d/storage_conf_backups.xml $DEST_SERVER_PATH/config.d/ cp $SRC_PATH/config.d/backups.xml $DEST_SERVER_PATH/config.d/ cp $SRC_PATH/config.d/filesystem_caches_path.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/validate_tcp_client_information.xml $DEST_SERVER_PATH/config.d/ +# distributed_query.xml sets distributed_query.streaming_exchange_port, which the server rejects on +# non-Linux builds; only install it where the streaming exchange is supported. +if [ "$(uname -s)" = "Linux" ]; then + ln -sf $SRC_PATH/config.d/distributed_query.xml $DEST_SERVER_PATH/config.d/ +fi + ln -sf $SRC_PATH/config.d/zero_copy_destructive_operations.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/handlers.yaml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/threadpool_writer_pool_size.yaml $DEST_SERVER_PATH/config.d/ diff --git a/tests/integration/test_distributed_plan_cancel/__init__.py b/tests/integration/test_distributed_plan_cancel/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tests/integration/test_distributed_plan_cancel/configs/config.d/stateless_worker.xml b/tests/integration/test_distributed_plan_cancel/configs/config.d/stateless_worker.xml new file mode 100644 index 000000000000..922c3fd2fbec --- /dev/null +++ b/tests/integration/test_distributed_plan_cancel/configs/config.d/stateless_worker.xml @@ -0,0 +1,36 @@ + + + 1 + localhost + + + + 1 + default + + + + 9223 + 0.0.0.0 + + local + ./local_object_storage/ + tmp_sub_path/ + + + + + + + + node1 + 9000 + + + node2 + 9000 + + + + + diff --git a/tests/integration/test_distributed_plan_cancel/test.py b/tests/integration/test_distributed_plan_cancel/test.py new file mode 100644 index 000000000000..64d442baacf2 --- /dev/null +++ b/tests/integration/test_distributed_plan_cancel/test.py @@ -0,0 +1,87 @@ +""" +Test that worker task failures in distributed plan queries are propagated +to the initiator with the correct error message instead of QUERY_WAS_CANCELLED. + +Reproduces https://github.com/ClickHouse/clickhouse-private/issues/40546 +(error-propagation part). +""" + +import logging +import pytest + +from helpers.cluster import ClickHouseCluster +from helpers.client import QueryRuntimeException + +cluster = ClickHouseCluster(__file__) + +node1 = cluster.add_instance( + "node1", + main_configs=["configs/config.d/stateless_worker.xml"], + stay_alive=True, +) +node2 = cluster.add_instance( + "node2", + main_configs=["configs/config.d/stateless_worker.xml"], + stay_alive=True, +) + + +@pytest.fixture(scope="module") +def started_cluster(): + try: + cluster.start() + yield cluster + finally: + cluster.shutdown() + + +DISTRIBUTED_SETTINGS = ( + "make_distributed_plan = 1, " + "enable_parallel_replicas = 0, " + "distributed_plan_default_shuffle_join_bucket_count = 2, " + "distributed_plan_default_reader_bucket_count = 2, " + "distributed_plan_max_rows_to_broadcast = 0" +) + + +def test_worker_error_not_masked(started_cluster): + """A worker task error must surface to the initiator with the real error, not QUERY_WAS_CANCELLED. + + The initiator sets max_rows_to_read = 5, which is propagated to the workers; each worker reads + more than 5 rows and fails with TOO_MANY_ROWS, and the initiator must report that. This also + covers settings propagation: without it the workers would not enforce the limit and the query + would silently succeed. + """ + for node in [node1, node2]: + node.query( + "CREATE TABLE IF NOT EXISTS test_err_mask (id UInt64) " + "ENGINE = MergeTree() ORDER BY id" + ) + node.query("INSERT INTO test_err_mask SELECT number FROM numbers(10000)") + + with pytest.raises(QueryRuntimeException) as exc_info: + node1.query( + f""" + SELECT sum(id) + FROM test_err_mask + SETTINGS {DISTRIBUTED_SETTINGS}, + max_rows_to_read = 5 + """, + timeout=30, + ) + + error_msg = str(exc_info.value) + logging.info(f"Query failed with: {error_msg}") + + # The error should be the worker's limit failure, not QUERY_WAS_CANCELLED. + assert "QUERY_WAS_CANCELLED" not in error_msg, ( + f"Initiator masked the worker error with QUERY_WAS_CANCELLED: {error_msg}" + ) + assert ( + "TOO_MANY_ROWS" in error_msg + or "max_rows_to_read" in error_msg + or "Limit for rows" in error_msg + ), f"Expected the propagated max_rows_to_read limit to be enforced on the worker, got: {error_msg}" + + for node in [node1, node2]: + node.query("DROP TABLE IF EXISTS test_err_mask") diff --git a/tests/integration/test_distributed_plan_replicated_merge_tree/__init__.py b/tests/integration/test_distributed_plan_replicated_merge_tree/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tests/integration/test_distributed_plan_replicated_merge_tree/configs/config.d/stateless_worker.xml b/tests/integration/test_distributed_plan_replicated_merge_tree/configs/config.d/stateless_worker.xml new file mode 100644 index 000000000000..5fb07caa730f --- /dev/null +++ b/tests/integration/test_distributed_plan_replicated_merge_tree/configs/config.d/stateless_worker.xml @@ -0,0 +1,46 @@ + + + 1 + localhost + + + + 1 + default + + + + 9223 + 0.0.0.0 + + + object_storage + s3 + http://minio1:9001/root/distributed_query_tmp/ + minio + ClickHouse_Minio_P@ssw0rd + tmp_sub_path/ + + + + + + + + node1 + 9000 + + + node2 + 9000 + + + node3 + 9000 + + + + + diff --git a/tests/integration/test_distributed_plan_replicated_merge_tree/test.py b/tests/integration/test_distributed_plan_replicated_merge_tree/test.py new file mode 100644 index 000000000000..328a361f9044 --- /dev/null +++ b/tests/integration/test_distributed_plan_replicated_merge_tree/test.py @@ -0,0 +1,444 @@ +""" +Distributed query execution against ReplicatedMergeTree tables. + +Spins up a 3-node cluster, creates several ReplicatedMergeTree tables (filled +with multiple parts on each replica), and exercises: + - parallel read of an RMT table, + - shuffle hash join, + - broadcast join, + - shuffle aggregation, + - distributed sort, + - aggregation feeding a join. + +Each distributed-plan query is compared against the same query without +make_distributed_plan to confirm the distributed path produces matching results. +The EXPLAIN PLAN of the distributed query is also compared against a baked-in +reference; if optimizer changes reshape a plan the test fails with a diff so +the reference can be updated deliberately. +""" + +import logging +import textwrap +from typing import Optional + +import pytest + +from helpers.client import QueryRuntimeException +from helpers.cluster import ClickHouseCluster + +pytestmark = pytest.mark.timeout(300) + +cluster = ClickHouseCluster(__file__) + +node1 = cluster.add_instance( + "node1", + main_configs=["configs/config.d/stateless_worker.xml"], + with_zookeeper=True, + with_minio=True, + stay_alive=True, + macros={"shard": 1, "replica": 1}, +) +node2 = cluster.add_instance( + "node2", + main_configs=["configs/config.d/stateless_worker.xml"], + with_zookeeper=True, + with_minio=True, + stay_alive=True, + macros={"shard": 1, "replica": 2}, +) +node3 = cluster.add_instance( + "node3", + main_configs=["configs/config.d/stateless_worker.xml"], + with_zookeeper=True, + with_minio=True, + stay_alive=True, + macros={"shard": 1, "replica": 3}, +) + +NODES = [node1, node2, node3] +INITIATOR = node1 + +# Settings common to all distributed-plan queries in this test. The 3 buckets +# match the 3-node cluster size. distributed_plan_max_rows_to_broadcast = 0 +# prevents the optimizer from broadcasting the join's right side by default; +# individual tests override it when broadcast is the path under test. +# +# Several settings are pinned to keep the EXPLAIN snapshot stable against +# unrelated default-value changes elsewhere in the codebase: anything that +# can swap join sides, change join-side conversion, or fuse/split exchange +# steps would otherwise produce a different plan shape from one ClickHouse +# version to the next without indicating a real regression in this feature. +DISTRIBUTED_SETTINGS = ", ".join([ + "make_distributed_plan = 1", + "enable_parallel_replicas = 0", + "distributed_plan_default_shuffle_join_bucket_count = 3", + "distributed_plan_default_reader_bucket_count = 3", + "distributed_plan_max_rows_to_broadcast = 0", + "distributed_plan_optimize_exchanges = 1", + "query_plan_join_swap_table = 'false'", + "query_plan_optimize_join_order_limit = 0", + "query_plan_use_new_logical_join_step = 1", + "query_plan_convert_join_to_in = 0", + "query_plan_convert_outer_join_to_inner_join = 0", + "query_plan_convert_any_join_to_semi_or_anti_join = 0", + # Runtime filters are not yet implemented for distributed queries. + "enable_join_runtime_filters = 0", +]) + + +@pytest.fixture(scope="module") +def started_cluster(): + try: + cluster.start() + _create_tables_and_load_data() + yield cluster + finally: + cluster.shutdown() + + +def _create_tables_and_load_data(): + """Create RMT tables on every node and insert data in multiple batches so + each replica ends up with several data parts.""" + for node in NODES: + node.query( + """ + CREATE TABLE big (id UInt64, group_key UInt32, payload String) + ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/big', '{replica}') + ORDER BY id + """ + ) + node.query( + """ + CREATE TABLE small (id UInt64, label String) + ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/small', '{replica}') + ORDER BY id + """ + ) + + # Stop background merges so the inserted parts are not collapsed before the + # sanity check below — that check expects each replica to see > 1 part. + for node in NODES: + node.query("SYSTEM STOP MERGES big") + node.query("SYSTEM STOP MERGES small") + + # Load data on a single replica; replication propagates it to the others. + # Multiple inserts produce multiple data parts so the parallel read path + # has more than one part to split per worker bucket. + for batch in range(4): + offset = batch * 25_000 + INITIATOR.query( + f""" + INSERT INTO big + SELECT + number + {offset} AS id, + (number + {offset}) % 100 AS group_key, + concat('p_', toString(number + {offset})) AS payload + FROM numbers(25000) + """ + ) + + INITIATOR.query( + """ + INSERT INTO small + SELECT number AS id, concat('lbl_', toString(number)) AS label + FROM numbers(500) + """ + ) + + # Wait for replication so all replicas see the full data set. + for node in NODES: + node.query("SYSTEM SYNC REPLICA big") + node.query("SYSTEM SYNC REPLICA small") + + # Sanity-check: every replica sees all rows and more than one part on big. + for node in NODES: + assert int(node.query("SELECT count() FROM big").strip()) == 100_000 + assert int(node.query("SELECT count() FROM small").strip()) == 500 + parts_count = int( + node.query( + "SELECT count() FROM system.parts " + "WHERE table = 'big' AND active = 1" + ).strip() + ) + assert parts_count >= 2, ( + f"expected >= 2 active parts on {node.name}, got {parts_count}" + ) + + +def _explain_and_check(query: str, settings: str, expected_plan: str): + """Run EXPLAIN PLAN on the query and compare its output to the expected + plan, line-by-line after stripping the common indentation. On mismatch + the failure message includes both plans so the test author can update + the reference. + """ + actual = INITIATOR.query(f"EXPLAIN PLAN {query} SETTINGS {settings}").strip("\n") + expected = textwrap.dedent(expected_plan).strip("\n") + if actual != expected: + pytest.fail( + "distributed plan does not match the expected reference\n" + "--- ACTUAL PLAN ---\n" + actual + + "\n--- EXPECTED PLAN ---\n" + expected + ) + return actual + + +def _run_both_ways( + query: str, + settings_override: str = "", + expected_plan: Optional[str] = None, +): + """Run the query in distributed and non-distributed modes and return both + results for the caller to compare. If expected_plan is provided, also + EXPLAIN PLAN the distributed query and assert the dumped plan matches. + """ + settings = DISTRIBUTED_SETTINGS + if settings_override: + settings = settings + ", " + settings_override + if expected_plan is not None: + _explain_and_check(query, settings, expected_plan) + distributed = INITIATOR.query(f"{query} SETTINGS {settings}") + baseline = INITIATOR.query(f"{query} SETTINGS make_distributed_plan = 0") + logging.info("distributed result:\n%s", distributed) + logging.info("baseline result:\n%s", baseline) + return distributed, baseline + + +EXCHANGE_KINDS = pytest.mark.parametrize("exchange_kind", ["Streaming", "Persisted"]) + + +def _override(exchange_kind: str, *extra: str) -> str: + parts = [f"distributed_plan_force_exchange_kind = '{exchange_kind}'"] + parts.extend(extra) + return ", ".join(parts) + + +@EXCHANGE_KINDS +def test_parallel_read(started_cluster, exchange_kind): + """A scan of the RMT table goes through the distributed-read path with + one task per reader bucket and a GatherExchange feeding the initiator.""" + distributed, baseline = _run_both_ways( + "SELECT count(), sum(id), sum(group_key) FROM big", + settings_override=_override(exchange_kind), + expected_plan="""\ + Expression ((Project names + Projection)) + MergingAggregated (merge) + GatherExchange + Aggregating (partial) + Expression ((Before GROUP BY + Change column names to column identifiers)) + ReadFromMergeTree (default.big) + """, + ) + assert distributed == baseline + + +def test_parallel_read_missing_part_on_worker_errors(started_cluster): + """A distributed read buckets over the parts the coordinator selected. If a + worker replica is missing one of those parts (replication lag), the read must + fail cleanly instead of silently returning a divergent slice of the data. + + Stop fetches on node2/node3 and add a new part only on the coordinator + (node1); the read assigns a bucket to a lagging replica, which cannot find + the coordinator-selected part and raises NO_SUCH_DATA_PART.""" + table = "big_lagging" + for node in NODES: + node.query(f"DROP TABLE IF EXISTS {table} SYNC") + node.query( + f""" + CREATE TABLE {table} (id UInt64, group_key UInt32, payload String) + ENGINE = ReplicatedMergeTree('/clickhouse/tables/{{shard}}/{table}', '{{replica}}') + ORDER BY id + """ + ) + node.query(f"SYSTEM STOP MERGES {table}") + + # Initial data, replicated to every node (multiple parts). + for batch in range(2): + offset = batch * 25_000 + INITIATOR.query( + f"INSERT INTO {table} SELECT number + {offset}, (number + {offset}) % 100, " + f"concat('p_', toString(number + {offset})) FROM numbers(25000)" + ) + for node in NODES: + node.query(f"SYSTEM SYNC REPLICA {table}") + + node2.query(f"SYSTEM STOP FETCHES {table}") + node3.query(f"SYSTEM STOP FETCHES {table}") + try: + # New part lands only on the coordinator; node2/node3 stay behind. + INITIATOR.query( + f"INSERT INTO {table} SELECT number + 50000, (number + 50000) % 100, " + f"concat('p_', toString(number + 50000)) FROM numbers(25000)" + ) + assert int(INITIATOR.query(f"SELECT count() FROM {table}").strip()) == 75_000 + assert int(node2.query(f"SELECT count() FROM {table}").strip()) == 50_000 + assert int(node3.query(f"SELECT count() FROM {table}").strip()) == 50_000 + + settings = ( + DISTRIBUTED_SETTINGS + + ", distributed_plan_force_exchange_kind = 'Persisted'" + + ", distributed_plan_prefer_replicas_over_workers = 1" + ) + with pytest.raises(QueryRuntimeException) as exc: + INITIATOR.query(f"SELECT count(), sum(id) FROM {table} SETTINGS {settings}") + assert "is not available on this replica" in str(exc.value) + finally: + node2.query(f"SYSTEM START FETCHES {table}") + node3.query(f"SYSTEM START FETCHES {table}") + for node in NODES: + node.query(f"SYSTEM SYNC REPLICA {table}") + node.query(f"DROP TABLE IF EXISTS {table} SYNC") + + +@EXCHANGE_KINDS +def test_shuffle_hash_join(started_cluster, exchange_kind): + """Self-join on a non-key expression so the optimizer cannot reuse the + primary-key sort order and must shuffle both inputs by the join key.""" + distributed, baseline = _run_both_ways( + """ + SELECT count() + FROM big AS a + INNER JOIN big AS b ON a.id = b.id + 1 + WHERE a.group_key < 5 + """, + settings_override=_override(exchange_kind), + expected_plan="""\ + Expression ((Project names + Projection)) + MergingAggregated (merge) + GatherExchange + Aggregating (partial) + Expression ((Before GROUP BY + )) + JoinLogical + ShuffleExchange (by hash([__table1.id])) + Expression ((WHERE + Change column names to column identifiers)) + ReadFromMergeTree (default.big) + ShuffleExchange (by hash([plus(__table2.id, 1_UInt8)])) + Expression (Calculate right join keys) + Expression (Change column names to column identifiers) + ReadFromMergeTree (default.big) + """, + ) + assert distributed == baseline + + +@EXCHANGE_KINDS +def test_broadcast_join(started_cluster, exchange_kind): + """Join a large left side with a small right side; the small side is + broadcast to every worker when the estimator returns its row count.""" + distributed, baseline = _run_both_ways( + """ + SELECT count(), sum(b.id) + FROM big AS b + INNER JOIN small AS s ON b.id = s.id + """, + # Raise the broadcast threshold high enough to include `small`. + settings_override=_override( + exchange_kind, + "distributed_plan_max_rows_to_broadcast = 10000", + ), + expected_plan="""\ + Expression ((Project names + Projection)) + MergingAggregated (merge) + GatherExchange + Aggregating (partial) + Expression (Before GROUP BY) + JoinLogical + Expression (Change column names to column identifiers) + ReadFromMergeTree (default.big) + BroadcastExchange + Expression (Change column names to column identifiers) + ReadFromMergeTree (default.small) + """, + ) + assert distributed == baseline + + +@EXCHANGE_KINDS +def test_shuffle_aggregation(started_cluster, exchange_kind): + """GROUP BY with a moderate number of groups so the optimizer picks the + one-stage shuffle path (scatter by hash → aggregate → gather).""" + distributed, baseline = _run_both_ways( + """ + SELECT group_key, count(), sum(id) + FROM big + GROUP BY group_key + ORDER BY group_key + """, + settings_override=_override( + exchange_kind, + "distributed_plan_force_shuffle_aggregation = 1", + ), + expected_plan="""\ + Expression (Project names) + GatherExchange (sorted by (__table1.group_key ASC)) + Sorting (Sorting for ORDER BY) + Expression ((Before ORDER BY + Projection)) + Aggregating + ShuffleExchange (by hash([__table1.group_key])) + Expression ((Before GROUP BY + Change column names to column identifiers)) + ReadFromMergeTree (default.big) + """, + ) + assert distributed == baseline + + +@EXCHANGE_KINDS +def test_distributed_sort(started_cluster, exchange_kind): + """ORDER BY ... LIMIT exercises the distributed sort path: each worker + performs a partial sort, results are gathered and merged on the initiator.""" + distributed, baseline = _run_both_ways( + """ + SELECT id, group_key + FROM big + WHERE group_key > 10 + ORDER BY id DESC, group_key ASC + LIMIT 50 + """, + settings_override=_override(exchange_kind), + expected_plan="""\ + Expression (Project names) + Limit (preliminary LIMIT) + GatherExchange (sorted by (__table1.id DESC, __table1.group_key ASC)) + Sorting (Sorting for ORDER BY) + Expression ((Before ORDER BY + Projection)) + Expression ((WHERE + Change column names to column identifiers)) + ReadFromMergeTree (default.big) + """, + ) + assert distributed == baseline + + +@EXCHANGE_KINDS +def test_join_with_aggregation(started_cluster, exchange_kind): + """Combined shuffle: aggregate one side, broadcast the small side, then + join. Verifies that exchanges around aggregation and join compose.""" + distributed, baseline = _run_both_ways( + """ + SELECT g.group_key, g.cnt, s.label + FROM ( + SELECT group_key, count() AS cnt FROM big GROUP BY group_key + ) AS g + INNER JOIN small AS s ON g.group_key = s.id + ORDER BY g.group_key + """, + settings_override=_override( + exchange_kind, + "distributed_plan_max_rows_to_broadcast = 10000", + "distributed_plan_force_shuffle_aggregation = 1", + ), + expected_plan="""\ + Expression (Project names) + GatherExchange (sorted by (__table1.group_key ASC)) + Sorting (Sorting for ORDER BY) + Expression ((Before ORDER BY + Projection)) + JoinLogical + Expression ((Change column names to column identifiers + (Project names + Projection))) + Aggregating + ShuffleExchange (by hash([__table2.group_key])) + Expression ((Before GROUP BY + Change column names to column identifiers)) + ReadFromMergeTree (default.big) + BroadcastExchange + Expression (Change column names to column identifiers) + ReadFromMergeTree (default.small) + """, + ) + assert distributed == baseline diff --git a/tests/queries/0_stateless/03394_distributed_broadcast_join.reference b/tests/queries/0_stateless/03394_distributed_broadcast_join.reference new file mode 100644 index 000000000000..c6c027e85739 --- /dev/null +++ b/tests/queries/0_stateless/03394_distributed_broadcast_join.reference @@ -0,0 +1,57 @@ +Expression ((Project names + Projection)) + MergingAggregated (merge) + GatherExchange + Aggregating (partial) + Expression (Before GROUP BY) + JoinLogical + Expression (Change column names to column identifiers) + ReadFromMergeTree (default.big) + BroadcastExchange + Expression (Change column names to column identifiers) + ReadFromMergeTree (default.small) +20000 +------------ +Expression ((Project names + Projection)) + MergingAggregated (merge) + GatherExchange + Aggregating (partial) + ScatterExchange (any) + Expression (Before GROUP BY) + GatherExchange + JoinLogical + ScatterExchange (any scatter) + Expression (Change column names to column identifiers) + GatherExchange + ReadFromMergeTree (default.big) + BroadcastExchange + Expression (Change column names to column identifiers) + ReadFromMergeTree (default.small) +------------ +Expression ((Project names + Projection)) + MergingAggregated (merge) + GatherExchange + Aggregating (partial) + Expression (Before GROUP BY) + JoinLogical + ScatterExchange (by hash([__table1.sid])) + Expression (Change column names to column identifiers) + ReadFromMergeTree (default.small) + ShuffleExchange (by hash([modulo(plus(__table2.bid, 1_UInt8), 5000_UInt16)])) + Expression (Calculate right join keys) + Expression (Change column names to column identifiers) + ReadFromMergeTree (default.big) +20000 +------------ +Expression ((Project names + Projection)) + MergingAggregated (merge) + GatherExchange + Aggregating (partial) + Expression (Before GROUP BY) + JoinLogical + ShuffleExchange (any scatter) + Expression (Change column names to column identifiers) + ReadFromMergeTree (default.big) + BroadcastExchange + Expression (Change column names to column identifiers) + ReadFromMergeTree (default.small) +20000 diff --git a/tests/queries/0_stateless/03394_distributed_broadcast_join.sql b/tests/queries/0_stateless/03394_distributed_broadcast_join.sql new file mode 100644 index 000000000000..80e4cc56f05c --- /dev/null +++ b/tests/queries/0_stateless/03394_distributed_broadcast_join.sql @@ -0,0 +1,57 @@ +-- Tags: no-fasttest, no-old-analyzer +-- no-fasttest: requires object storage + +CREATE TABLE small(sid UInt64, s Array(Int64)) ENGINE = MergeTree ORDER BY sid; +CREATE TABLE big(bid UInt64, b Array(Int64)) ENGINE = MergeTree ORDER BY bid; + +insert into small select number, [number] from numbers(0, 1000); +insert into big select number, [number] from numbers(0, 100000); + +SET query_plan_join_swap_table = 0; +-- Distributed aggregation cannot enforce a global max_rows_to_group_by, so pin it to 0. +SET max_rows_to_group_by = 0; + +SET + make_distributed_plan=1, + enable_parallel_replicas=0, + enable_join_runtime_filters=0, + use_statistics=1, + distributed_plan_optimize_exchanges=1; + +EXPLAIN SELECT count() +FROM big, small +WHERE (small.sid = (big.bid + 1) % 5000); + +SELECT count() +FROM big, small +WHERE (small.sid = (big.bid + 1) % 5000); + +SELECT '------------'; + +EXPLAIN SELECT count() +FROM big, small +WHERE (small.sid = (big.bid + 1) % 5000) +SETTINGS distributed_plan_optimize_exchanges=0; + +SELECT '------------'; + +EXPLAIN SELECT count() +FROM small, big +WHERE (small.sid = (big.bid + 1) % 5000); + +SELECT count() +FROM small, big +WHERE (small.sid = (big.bid + 1) % 5000); + +SELECT '------------'; + +-- Check with big table read bucket count not matching join bucket count +EXPLAIN SELECT count() +FROM big, small +WHERE (small.sid = (big.bid + 1) % 5000) +SETTINGS distributed_plan_default_shuffle_join_bucket_count=5, distributed_plan_default_reader_bucket_count=2; + +SELECT count() +FROM big, small +WHERE (small.sid = (big.bid + 1) % 5000) +SETTINGS distributed_plan_default_shuffle_join_bucket_count=5, distributed_plan_default_reader_bucket_count=2; diff --git a/tests/queries/0_stateless/03394_distributed_shuffle_join.reference b/tests/queries/0_stateless/03394_distributed_shuffle_join.reference new file mode 100644 index 000000000000..d189ba3e0870 --- /dev/null +++ b/tests/queries/0_stateless/03394_distributed_shuffle_join.reference @@ -0,0 +1,3 @@ +809128 +809128 +809128 diff --git a/tests/queries/0_stateless/03394_distributed_shuffle_join.sql b/tests/queries/0_stateless/03394_distributed_shuffle_join.sql new file mode 100644 index 000000000000..66f295c4b4cd --- /dev/null +++ b/tests/queries/0_stateless/03394_distributed_shuffle_join.sql @@ -0,0 +1,27 @@ +-- Tags: long, no-fasttest, no-old-analyzer +-- no-fasttest: requires object storage + +DROP TABLE IF EXISTS test_3; +CREATE TABLE test_3(id UInt64, a Array(Int64)) ENGINE = MergeTree ORDER BY id; + +insert into test_3 select number, [number] from numbers(0, 100000); + +-- Distributed aggregation cannot enforce a global max_rows_to_group_by, so pin it to 0. +SET max_rows_to_group_by = 0; + +SELECT count() +FROM test_3 AS a, test_3 AS b, test_3 AS c, test_3 AS d +WHERE (a.id = (b.id + 1)) AND (b.id = (c.id + 100)) AND ((c.id % 11111) = ((d.id % 12345) + 17)); + + +SELECT count() +FROM test_3 AS a, test_3 AS b, test_3 AS c, test_3 AS d +WHERE (a.id = (b.id + 1)) AND (b.id = (c.id + 100)) AND ((c.id % 11111) = ((d.id % 12345) + 17)) +SETTINGS make_distributed_plan = 1, enable_parallel_replicas = 0, distributed_plan_default_shuffle_join_bucket_count = 5, + query_plan_use_new_logical_join_step=1, distributed_plan_force_exchange_kind='Persisted'; + +SELECT count() +FROM test_3 AS a, test_3 AS b, test_3 AS c, test_3 AS d +WHERE (a.id = (b.id + 1)) AND (b.id = (c.id + 100)) AND ((c.id % 11111) = ((d.id % 12345) + 17)) +SETTINGS make_distributed_plan = 1, enable_parallel_replicas = 0, distributed_plan_default_shuffle_join_bucket_count = 3, + query_plan_use_new_logical_join_step=1, distributed_plan_force_exchange_kind='Streaming'; diff --git a/tests/queries/0_stateless/03394_distributed_shuffle_join_early_close_sink.reference b/tests/queries/0_stateless/03394_distributed_shuffle_join_early_close_sink.reference new file mode 100644 index 000000000000..f40d4e97d177 --- /dev/null +++ b/tests/queries/0_stateless/03394_distributed_shuffle_join_early_close_sink.reference @@ -0,0 +1 @@ +50000000 diff --git a/tests/queries/0_stateless/03394_distributed_shuffle_join_early_close_sink.sql b/tests/queries/0_stateless/03394_distributed_shuffle_join_early_close_sink.sql new file mode 100644 index 000000000000..11e262385a74 --- /dev/null +++ b/tests/queries/0_stateless/03394_distributed_shuffle_join_early_close_sink.sql @@ -0,0 +1,17 @@ +-- Tags: long, no-fasttest, no-old-analyzer + +SET query_plan_join_swap_table = 0; +-- Distributed aggregation cannot enforce a global max_rows_to_group_by, so pin it to 0. +SET max_rows_to_group_by = 0; + +CREATE TABLE test(id UInt64, data String) ENGINE=MergeTree() ORDER BY id SETTINGS index_granularity=10000; + +INSERT INTO test SELECT number, '' FROM numbers(10000000); + +-- Joining on a.id%1 so that only one bucket is not empty +-- After building hash table for an empty bucket the pipeline will stop reading from probing input and will close Exchange stream before reading all the data from it +SELECT count() +FROM test AS b JOIN test AS a ON a.id%1 = b.id%2 +WHERE a.id < 10 AND b.id < 10000000 AND NOT sleepEachRow(0.000001) +SETTINGS make_distributed_plan=1, enable_parallel_replicas=0, distributed_plan_default_shuffle_join_bucket_count=4, distributed_plan_default_reader_bucket_count=5, distributed_plan_max_rows_to_broadcast=0; + diff --git a/tests/queries/0_stateless/03394_distributed_shuffle_join_with_aggregation.reference b/tests/queries/0_stateless/03394_distributed_shuffle_join_with_aggregation.reference new file mode 100644 index 000000000000..5ccb82979bb8 --- /dev/null +++ b/tests/queries/0_stateless/03394_distributed_shuffle_join_with_aggregation.reference @@ -0,0 +1,54 @@ +---------- +Expression (Project names) + GatherExchange (sorted by (__table1.path ASC, __table1.hits ASC, __table3.path ASC, __table3.hits ASC)) + Sorting (Sorting for ORDER BY) + ScatterExchange (any scatter) + Expression ((Before ORDER BY + Projection)) + GatherExchange + JoinLogical + ScatterExchange (by hash([__table1.path])) + Expression + GatherExchange + Aggregating + ScatterExchange (by hash([__table2.path])) + Expression + GatherExchange + ReadFromMergeTree (default.test) + ScatterExchange (by hash([__table3.path])) + BuildRuntimeFilter (Build runtime join filter on __table3.path) + Expression ((Change column names to column identifiers + (Project names + Projection))) + GatherExchange + Aggregating + ScatterExchange (by hash([__table4.path])) + Expression (Before GROUP BY) + Expression ((WHERE + Change column names to column identifiers)) + GatherExchange + ReadFromMergeTree (default.test) +---------- +Expression (Project names) + GatherExchange (sorted by (__table1.path ASC, __table1.hits ASC, __table3.path ASC, __table3.hits ASC)) + Sorting (Sorting for ORDER BY) + Expression ((Before ORDER BY + Projection)) + JoinLogical + ShuffleExchange (by hash([__table1.path])) + Expression + Aggregating + ShuffleExchange (by hash([__table2.path])) + Expression + ReadFromMergeTree (default.test) + ShuffleExchange (by hash([__table3.path])) + BuildRuntimeFilter (Build runtime join filter on __table3.path) + Expression ((Change column names to column identifiers + (Project names + Projection))) + Aggregating + ShuffleExchange (by hash([__table4.path])) + Expression (Before GROUP BY) + Expression ((WHERE + Change column names to column identifiers)) + ReadFromMergeTree (default.test) +---------- +path_0 0 path_0 12 +path_1 2 path_1 8 +path_2 4 path_2 6 +---------- +path_0 16 0 24 +path_1 12 12 16 +path_2 12 24 12 diff --git a/tests/queries/0_stateless/03394_distributed_shuffle_join_with_aggregation.sql b/tests/queries/0_stateless/03394_distributed_shuffle_join_with_aggregation.sql new file mode 100644 index 000000000000..cdf868a409fc --- /dev/null +++ b/tests/queries/0_stateless/03394_distributed_shuffle_join_with_aggregation.sql @@ -0,0 +1,68 @@ +-- Tags: no-old-analyzer + +-- Reset the global max_rows_to_group_by; distributed aggregation rejects a nonzero limit. +SET max_rows_to_group_by = 0; +SET distributed_plan_optimize_exchanges = 1; + +DROP TABLE IF EXISTS test; +CREATE TABLE test(path String, lang String, hits UInt64) ENGINE MergeTree() ORDER BY tuple(); + +INSERT INTO test SELECT 'path_' || number::String, 'en', number FROM numbers(5); +INSERT INTO test SELECT 'path_' || (number%3)::String, 'de', number%4 FROM numbers(10); + +INSERT INTO test SELECT 'path_' || number::String, 'en', number FROM numbers(5); +INSERT INTO test SELECT 'path_' || (number%3)::String, 'de', number%4 FROM numbers(10); + +SET query_plan_join_swap_table = 0; + +SET + optimize_move_to_prewhere = 1, + query_plan_optimize_prewhere = 1, + make_distributed_plan = 1, + enable_parallel_replicas = 0, + enable_join_runtime_filters=1, + distributed_plan_default_shuffle_join_bucket_count=3, + distributed_plan_default_reader_bucket_count=3, + distributed_plan_force_exchange_kind='Streaming', + distributed_plan_max_rows_to_broadcast=0; + +SELECT '----------'; + +EXPLAIN SELECT * +FROM + (SELECT path, sum(hits) as hits FROM test WHERE lang = 'en' GROUP BY path) AS en, + (SELECT path, sum(hits) as hits FROM test WHERE lang = 'de' GROUP BY path) AS de +WHERE (en.path = de.path) +ORDER BY ALL +SETTINGS distributed_plan_optimize_exchanges=0; + + +SELECT '----------'; + +EXPLAIN SELECT * +FROM + (SELECT path, sum(hits) as hits FROM test WHERE lang = 'en' GROUP BY path) AS en, + (SELECT path, sum(hits) as hits FROM test WHERE lang = 'de' GROUP BY path) AS de +WHERE (en.path = de.path) +ORDER BY ALL; + + +SELECT '----------'; + +SELECT * +FROM + (SELECT path, sum(hits) as hits FROM test WHERE lang = 'en' GROUP BY path) AS en, + (SELECT path, sum(hits) as hits FROM test WHERE lang = 'de' GROUP BY path) AS de +WHERE (en.path = de.path) +ORDER BY ALL; + + +SELECT '----------'; + +SELECT en.path, count(), sum(en.hits), sum(de.hits) +FROM + (SELECT * FROM test WHERE lang = 'en') AS en, + (SELECT * FROM test WHERE lang = 'de') AS de +WHERE (en.path = de.path) +GROUP BY en.path +ORDER BY ALL; diff --git a/tests/queries/0_stateless/03394_distributed_shuffle_join_with_filter.reference b/tests/queries/0_stateless/03394_distributed_shuffle_join_with_filter.reference new file mode 100644 index 000000000000..4bf486e52820 --- /dev/null +++ b/tests/queries/0_stateless/03394_distributed_shuffle_join_with_filter.reference @@ -0,0 +1,36 @@ +------------------------- +Expression ((Project names + Projection)) + MergingAggregated (merge) + GatherExchange + Aggregating (partial) + ScatterExchange (any) + Expression ((Before GROUP BY + )) + GatherExchange + JoinLogical + ScatterExchange (by hash([__table1.src_ip])) + Expression + GatherExchange + ReadFromMergeTree (default.test) + ScatterExchange (by hash([__table2.dst_ip])) + BuildRuntimeFilter (Build runtime join filter on __table2.dst_ip) + Filter ((WHERE + Change column names to column identifiers)) + GatherExchange + ReadFromMergeTree (default.test) +------------------------- +Expression ((Project names + Projection)) + MergingAggregated (merge) + GatherExchange + Aggregating (partial) + Expression ((Before GROUP BY + )) + JoinLogical + ShuffleExchange (by hash([__table1.src_ip])) + Expression + ReadFromMergeTree (default.test) + ShuffleExchange (by hash([__table2.dst_ip])) + BuildRuntimeFilter (Build runtime join filter on __table2.dst_ip) + Filter ((WHERE + Change column names to column identifiers)) + ReadFromMergeTree (default.test) +------------------------- +1019 +1019 +1019 diff --git a/tests/queries/0_stateless/03394_distributed_shuffle_join_with_filter.sql b/tests/queries/0_stateless/03394_distributed_shuffle_join_with_filter.sql new file mode 100644 index 000000000000..41738ea60884 --- /dev/null +++ b/tests/queries/0_stateless/03394_distributed_shuffle_join_with_filter.sql @@ -0,0 +1,40 @@ +-- Tags: no-old-analyzer + +-- Reset the global max_rows_to_group_by; distributed aggregation rejects a nonzero limit. +SET max_rows_to_group_by = 0; +SET distributed_plan_optimize_exchanges = 1; +SET optimize_move_to_prewhere = 1; +SET query_plan_optimize_prewhere = 1; +SET query_plan_remove_unused_columns = 1; +CREATE TABLE test(src_ip UInt32, dst_ip UInt32, bytes UInt64) ENGINE MergeTree() ORDER BY src_ip settings auto_statistics_types=''; + +INSERT INTO test SELECT number%30, (number+10)%30, number%50 FROM numbers(100); +INSERT INTO test SELECT number%30, (number+10)%30, number%50 FROM numbers(100, 100); + +-- t1.src_ip!=0 condition is not moved to prewhere because src_ip is in primary key + +SET query_plan_join_swap_table = 0; +SET enable_join_runtime_filters = 1; + +SELECT '-------------------------'; + +EXPLAIN +SELECT count() FROM test AS t1 JOIN test AS t2 ON t1.src_ip = t2.dst_ip WHERE t1.src_ip != 0 AND t1.bytes > 10 +SETTINGS make_distributed_plan=1, enable_parallel_replicas=0, distributed_plan_optimize_exchanges=0, distributed_plan_max_rows_to_broadcast=0; + +SELECT '-------------------------'; + +EXPLAIN +SELECT count() FROM test AS t1 JOIN test AS t2 ON t1.src_ip = t2.dst_ip WHERE t1.src_ip != 0 AND t1.bytes > 10 +SETTINGS make_distributed_plan=1, enable_parallel_replicas=0, distributed_plan_max_rows_to_broadcast=0; + +SELECT '-------------------------'; + +SELECT count() FROM test AS t1 JOIN test AS t2 ON t1.src_ip = t2.dst_ip WHERE t1.src_ip != 0 AND t1.bytes > 10 +SETTINGS make_distributed_plan=1, enable_parallel_replicas=0, distributed_plan_optimize_exchanges=0, distributed_plan_default_shuffle_join_bucket_count = 3, distributed_plan_default_reader_bucket_count = 3; + +SELECT count() FROM test AS t1 JOIN test AS t2 ON t1.src_ip = t2.dst_ip WHERE t1.src_ip != 0 AND t1.bytes > 10 +SETTINGS make_distributed_plan=1, enable_parallel_replicas=0, distributed_plan_default_shuffle_join_bucket_count = 3, distributed_plan_default_reader_bucket_count = 3; + +SELECT count() FROM test AS t1 JOIN test AS t2 ON t1.src_ip = t2.dst_ip WHERE t1.src_ip != 0 AND t1.bytes > 10 +SETTINGS make_distributed_plan=0; diff --git a/tests/queries/0_stateless/03394_distributed_shuffle_join_with_in.reference b/tests/queries/0_stateless/03394_distributed_shuffle_join_with_in.reference new file mode 100644 index 000000000000..e2c93545e130 --- /dev/null +++ b/tests/queries/0_stateless/03394_distributed_shuffle_join_with_in.reference @@ -0,0 +1,24 @@ +---------- +Expression (Project names) + GatherExchange (sorted by (__table1.path ASC, __table1.hits ASC, __table3.path ASC, __table3.hits ASC)) + Sorting (Sorting for ORDER BY) + Expression ((Before ORDER BY + Projection)) + JoinLogical + ShuffleExchange (by hash([__table1.path])) + Expression + Aggregating + ShuffleExchange (by hash([__table2.path])) + Expression + ReadFromMergeTree (default.test) + ShuffleExchange (by hash([__table3.path])) + BuildRuntimeFilter (Build runtime join filter on __table3.path) + Expression ((Change column names to column identifiers + (Project names + Projection))) + Aggregating + ShuffleExchange (by hash([__table4.path])) + Expression (Before GROUP BY) + Expression ((WHERE + Change column names to column identifiers)) + ReadFromMergeTree (default.test) +---------- +path_0 12 path_0 12 +path_1 10 path_1 8 +path_2 10 path_2 6 diff --git a/tests/queries/0_stateless/03394_distributed_shuffle_join_with_in.sql b/tests/queries/0_stateless/03394_distributed_shuffle_join_with_in.sql new file mode 100644 index 000000000000..6e7680024dc9 --- /dev/null +++ b/tests/queries/0_stateless/03394_distributed_shuffle_join_with_in.sql @@ -0,0 +1,51 @@ +-- Tags: no-old-analyzer + +-- Reset the global max_rows_to_group_by; distributed aggregation rejects a nonzero limit. +SET max_rows_to_group_by = 0; + +DROP TABLE IF EXISTS test; + +CREATE TABLE test(path String, lang String, hits UInt64) ENGINE MergeTree() ORDER BY tuple(); + +INSERT INTO test SELECT 'path_' || number::String, 'en', number FROM numbers(5); +INSERT INTO test SELECT 'path_' || (number%3)::String, 'de', number%4 FROM numbers(10); + +INSERT INTO test SELECT 'path_' || number::String, 'en', number FROM numbers(5); +INSERT INTO test SELECT 'path_' || (number%3)::String, 'de', number%4 FROM numbers(10); + +SET query_plan_join_swap_table = 0; + + +SET + optimize_move_to_prewhere = 1, + query_plan_optimize_prewhere = 1, + make_distributed_plan = 1, + enable_parallel_replicas = 0, + enable_join_runtime_filters = 1, + distributed_plan_default_shuffle_join_bucket_count=3, + distributed_plan_default_reader_bucket_count=3, + distributed_plan_force_exchange_kind='Streaming', + distributed_plan_optimize_exchanges = 1, + distributed_plan_max_rows_to_broadcast=0; + +SELECT '----------'; + +-- Query with col IN (val1, val2, ...) +-- It passes the set corresponding to IN conditions as ColumnSet +EXPLAIN SELECT * +FROM + (SELECT path, sum(hits) as hits FROM test WHERE lang IN ('en', 'de') GROUP BY path) AS en, + (SELECT path, sum(hits) as hits FROM test WHERE lang = 'de' GROUP BY path) AS de +WHERE (en.path = de.path) +ORDER BY ALL; + +SELECT '----------'; + +SELECT * +FROM + (SELECT path, sum(hits) as hits FROM test WHERE lang IN ('en', 'de') GROUP BY path) AS en, + (SELECT path, sum(hits) as hits FROM test WHERE lang = 'de' GROUP BY path) AS de +WHERE (en.path = de.path) +ORDER BY ALL; + +DROP TABLE test; diff --git a/tests/queries/0_stateless/03394_distributed_shuffle_join_with_prewhere.reference b/tests/queries/0_stateless/03394_distributed_shuffle_join_with_prewhere.reference new file mode 100644 index 000000000000..31ba7f564164 --- /dev/null +++ b/tests/queries/0_stateless/03394_distributed_shuffle_join_with_prewhere.reference @@ -0,0 +1,12 @@ +5 + MergingAggregated (merge) + GatherExchange + Aggregating (partial) + JoinLogical + Join: en[N] ⋈ de[N] + ShuffleExchange (by hash([__table1.path])) + ReadFromMergeTree (default.test) + Prewhere filter column: and(equals(__table1.lang, \'en\'_String), __applyFilter(_runtime_filter_UNIQ_ID_0, __table1.path)) (removed) + ShuffleExchange (by hash([__table2.path])) + ReadFromMergeTree (default.test) + Prewhere filter column: equals(__table2.lang, \'de\'_String) (removed) diff --git a/tests/queries/0_stateless/03394_distributed_shuffle_join_with_prewhere.sql b/tests/queries/0_stateless/03394_distributed_shuffle_join_with_prewhere.sql new file mode 100644 index 000000000000..f5ad87493bb1 --- /dev/null +++ b/tests/queries/0_stateless/03394_distributed_shuffle_join_with_prewhere.sql @@ -0,0 +1,40 @@ +-- Tags: no-old-analyzer + +-- Reset the global max_rows_to_group_by; distributed aggregation rejects a nonzero limit. +SET max_rows_to_group_by = 0; +SET distributed_plan_optimize_exchanges = 1; + +CREATE TABLE test(path String, lang String, hits UInt64) ENGINE MergeTree() +ORDER BY tuple() +SETTINGS auto_statistics_types = 'tdigest,uniq,minmax'; + +SET materialize_statistics_on_insert = 1; + +INSERT INTO test SELECT 'path' || number::String, 'en', number FROM numbers(5); +INSERT INTO test SELECT 'path' || number::String, 'de', number FROM numbers(10); +INSERT INTO test SELECT 'path' || number::String, 'ua', number FROM numbers(15); +INSERT INTO test SELECT 'path' || number::String, 'jp', number FROM numbers(20); + +SET query_plan_join_swap_table = 0; + +SET + make_distributed_plan = 1, + enable_parallel_replicas = 0, + distributed_plan_default_shuffle_join_bucket_count=3, + distributed_plan_default_reader_bucket_count=3, + distributed_plan_force_exchange_kind='Streaming', + distributed_plan_max_rows_to_broadcast=0; + +SET enable_join_runtime_filters=1; +SET query_plan_optimize_prewhere = 1; +SET optimize_move_to_prewhere = 1; +SET query_plan_optimize_join_order_limit = 10; +SET use_statistics = 1, use_statistics_cache = 1; + +SELECT count() FROM test AS en, test AS de WHERE (en.path = de.path) AND (en.lang = 'en') AND (de.lang = 'de'); + +SELECT REGEXP_REPLACE(REGEXP_REPLACE(explain, '_runtime_filter_\\d+', '_runtime_filter_UNIQ_ID'), '\\[\\d+\\]', '[N]') AS explain FROM ( + EXPLAIN actions = 1 SELECT count() FROM test AS en, test AS de WHERE (en.path = de.path) AND (en.lang = 'en') AND (de.lang = 'de') +) WHERE + explain LIKE '%Join%' OR explain LIKE '%ReadFrom%' OR explain LIKE '%Aggregating%' OR explain LIKE '%Merging%' OR explain LIKE '%filter column%' + OR explain LIKE '%Shuffle%' OR explain LIKE '%Broadcast%' OR explain LIKE '%Scatter%' OR explain LIKE '%Gather%'; diff --git a/tests/queries/0_stateless/03394_distributed_sort.reference b/tests/queries/0_stateless/03394_distributed_sort.reference new file mode 100644 index 000000000000..aea91a2d6337 --- /dev/null +++ b/tests/queries/0_stateless/03394_distributed_sort.reference @@ -0,0 +1,19 @@ +0 3 18 +1 3 13 +1 4 19 +2 4 14 +Expression (Project names) + GatherExchange (sorted by (__table1.dst_ip ASC, __table1.src_ip ASC, __table1.bytes ASC)) + Sorting (Sorting for ORDER BY) + ScatterExchange (any scatter) + Expression ((Before ORDER BY + Projection)) + Expression ((WHERE + Change column names to column identifiers)) + ReadFromMergeTree (default.test) +------------------ +Expression (Project names) + GatherExchange (sorted by (__table1.dst_ip ASC, __table1.src_ip ASC, __table1.bytes ASC)) + Sorting (Sorting for ORDER BY) + ScatterExchange (any scatter) + Expression ((Before ORDER BY + Projection)) + Expression ((WHERE + Change column names to column identifiers)) + ReadFromMergeTree (default.test) diff --git a/tests/queries/0_stateless/03394_distributed_sort.sh b/tests/queries/0_stateless/03394_distributed_sort.sh new file mode 100755 index 000000000000..71c55a1122f5 --- /dev/null +++ b/tests/queries/0_stateless/03394_distributed_sort.sh @@ -0,0 +1,36 @@ +#!/usr/bin/env bash +# Tags: no-old-analyzer + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +$CLICKHOUSE_CLIENT -q "CREATE TABLE test(src_ip UInt32, dst_ip UInt32, bytes UInt64) ENGINE MergeTree() ORDER BY src_ip" + +$CLICKHOUSE_CLIENT -q "INSERT INTO test SELECT number%3, number%4, number FROM numbers(10)" +$CLICKHOUSE_CLIENT -q "INSERT INTO test SELECT number%5, number%3, number FROM numbers(10, 10)" + +$CLICKHOUSE_CLIENT -q " +SELECT dst_ip, src_ip, bytes +FROM test +WHERE bytes > 5 AND src_ip > 2 +ORDER BY dst_ip, src_ip, bytes +SETTINGS make_distributed_plan=1, enable_parallel_replicas=0" + +# The WHERE step may appear as either Expression or Filter depending on optimizer settings. +# Normalize to Expression so the test is deterministic with randomized settings. +$CLICKHOUSE_CLIENT -q " +EXPLAIN SELECT dst_ip, src_ip, bytes +FROM test +WHERE bytes > 5 AND src_ip > 2 +ORDER BY dst_ip, src_ip, bytes +SETTINGS make_distributed_plan=1, enable_parallel_replicas=0, distributed_plan_optimize_exchanges=0" | sed 's/Filter ((WHERE/Expression ((WHERE/' + +echo '------------------' + +$CLICKHOUSE_CLIENT -q " +EXPLAIN SELECT dst_ip, src_ip, bytes +FROM test +WHERE bytes > 5 AND src_ip > 2 +ORDER BY dst_ip, src_ip, bytes +SETTINGS make_distributed_plan=1, enable_parallel_replicas=0, distributed_plan_optimize_exchanges=1" | sed 's/Filter ((WHERE/Expression ((WHERE/' diff --git a/tests/queries/0_stateless/04097_distributed_join_kinds.reference b/tests/queries/0_stateless/04097_distributed_join_kinds.reference new file mode 100644 index 000000000000..6d490df27963 --- /dev/null +++ b/tests/queries/0_stateless/04097_distributed_join_kinds.reference @@ -0,0 +1,195 @@ +-- INNER JOIN (broadcast) +Expression ((Project names + Projection)) + MergingAggregated (merge) + GatherExchange + Aggregating (partial) + Expression (Before GROUP BY) + JoinLogical + ScatterExchange (any scatter) + Expression (Change column names to column identifiers) + ReadFromMergeTree (default.dist_orders) + BroadcastExchange + Expression (Change column names to column identifiers) + ReadFromMergeTree (default.dist_items) +350 91612.5 +350 91612.5 +-- LEFT JOIN (broadcast) +Expression ((Project names + Projection)) + MergingAggregated (merge) + GatherExchange + Aggregating (partial) + Expression (Before GROUP BY) + JoinLogical + ScatterExchange (any scatter) + Expression (Change column names to column identifiers) + ReadFromMergeTree (default.dist_orders) + BroadcastExchange + Expression (Change column names to column identifiers) + ReadFromMergeTree (default.dist_items) +400 +400 +-- RIGHT JOIN (shuffle) +Expression ((Project names + Projection)) + MergingAggregated (merge) + GatherExchange + Aggregating (partial) + Expression (Before GROUP BY) + JoinLogical + ScatterExchange (by hash([__table1.order_id])) + Expression (Change column names to column identifiers) + ReadFromMergeTree (default.dist_orders) + ScatterExchange (by hash([__table2.order_id])) + Expression (Change column names to column identifiers) + ReadFromMergeTree (default.dist_items) +400 +400 +-- FULL JOIN (shuffle) +Expression ((Project names + Projection)) + MergingAggregated (merge) + GatherExchange + Aggregating (partial) + Expression (Before GROUP BY) + JoinLogical + ScatterExchange (by hash([__table1.order_id])) + Expression (Change column names to column identifiers) + ReadFromMergeTree (default.dist_orders) + ScatterExchange (by hash([__table2.order_id])) + Expression (Change column names to column identifiers) + ReadFromMergeTree (default.dist_items) +450 +450 +-- LEFT SEMI JOIN (broadcast) +Expression ((Project names + Projection)) + MergingAggregated (merge) + GatherExchange + Aggregating (partial) + Expression (Before GROUP BY) + JoinLogical + ScatterExchange (any scatter) + Expression (Change column names to column identifiers) + ReadFromMergeTree (default.dist_orders) + BroadcastExchange + Expression (Change column names to column identifiers) + ReadFromMergeTree (default.dist_items) +350 +350 +-- LEFT ANTI JOIN (broadcast) +Expression ((Project names + Projection)) + MergingAggregated (merge) + GatherExchange + Aggregating (partial) + Expression (Before GROUP BY) + JoinLogical + ScatterExchange (any scatter) + Expression (Change column names to column identifiers) + ReadFromMergeTree (default.dist_orders) + BroadcastExchange + Expression (Change column names to column identifiers) + ReadFromMergeTree (default.dist_items) +50 +50 +-- RIGHT SEMI JOIN (shuffle) +Expression ((Project names + Projection)) + MergingAggregated (merge) + GatherExchange + Aggregating (partial) + Expression (Before GROUP BY) + JoinLogical + ScatterExchange (by hash([__table1.order_id])) + Expression (Change column names to column identifiers) + ReadFromMergeTree (default.dist_orders) + ScatterExchange (by hash([__table2.order_id])) + Expression (Change column names to column identifiers) + ReadFromMergeTree (default.dist_items) +350 +350 +-- RIGHT ANTI JOIN (shuffle) +Expression ((Project names + Projection)) + MergingAggregated (merge) + GatherExchange + Aggregating (partial) + Expression (Before GROUP BY) + JoinLogical + ScatterExchange (by hash([__table1.order_id])) + Expression (Change column names to column identifiers) + ReadFromMergeTree (default.dist_orders) + ScatterExchange (by hash([__table2.order_id])) + Expression (Change column names to column identifiers) + ReadFromMergeTree (default.dist_items) +50 +50 +-- ANY INNER JOIN (broadcast) +Expression ((Project names + Projection)) + MergingAggregated (merge) + GatherExchange + Aggregating (partial) + Expression (Before GROUP BY) + JoinLogical + ScatterExchange (any scatter) + Expression (Change column names to column identifiers) + ReadFromMergeTree (default.dist_orders) + BroadcastExchange + Expression (Change column names to column identifiers) + ReadFromMergeTree (default.dist_items) +350 +350 +-- ANY LEFT JOIN (broadcast) +Expression ((Project names + Projection)) + MergingAggregated (merge) + GatherExchange + Aggregating (partial) + Expression (Before GROUP BY) + JoinLogical + ScatterExchange (any scatter) + Expression (Change column names to column identifiers) + ReadFromMergeTree (default.dist_orders) + BroadcastExchange + Expression (Change column names to column identifiers) + ReadFromMergeTree (default.dist_items) +400 +400 +-- ASOF JOIN (broadcast) +Expression (Project names) + GatherExchange (sorted by (__table1.symbol ASC)) + Sorting (Sorting for ORDER BY) + Expression ((Before ORDER BY + Projection)) + Aggregating + ShuffleExchange (by hash([__table1.symbol])) + Expression (Before GROUP BY) + JoinLogical + ScatterExchange (any scatter) + Expression (Change column names to column identifiers) + ReadFromMergeTree (default.dist_trades) + BroadcastExchange + Expression (Change column names to column identifiers) + ReadFromMergeTree (default.dist_quotes) +S0 40 4390 4370 +S1 40 4394 4382 +S2 40 4398 4384 +S3 40 4402 4386 +S4 40 4406 4388 +-- ASOF JOIN (shuffle) +Expression (Project names) + GatherExchange (sorted by (__table1.symbol ASC)) + Sorting (Sorting for ORDER BY) + Expression ((Before ORDER BY + Projection)) + Aggregating + ShuffleExchange (by hash([__table1.symbol])) + Expression (Before GROUP BY) + JoinLogical + ShuffleExchange (by hash([__table1.symbol])) + Expression (Change column names to column identifiers) + ReadFromMergeTree (default.dist_trades) + ShuffleExchange (by hash([__table2.symbol])) + Expression (Change column names to column identifiers) + ReadFromMergeTree (default.dist_quotes) +S0 40 4390 4370 +S1 40 4394 4382 +S2 40 4398 4384 +S3 40 4402 4386 +S4 40 4406 4388 +S0 40 4390 4370 +S1 40 4394 4382 +S2 40 4398 4384 +S3 40 4402 4386 +S4 40 4406 4388 diff --git a/tests/queries/0_stateless/04097_distributed_join_kinds.sql b/tests/queries/0_stateless/04097_distributed_join_kinds.sql new file mode 100644 index 000000000000..8d0b53168666 --- /dev/null +++ b/tests/queries/0_stateless/04097_distributed_join_kinds.sql @@ -0,0 +1,242 @@ +-- Test that the old heuristic-based distributed join (tryMakeDistributedJoin) +-- supports all join kinds and picks the correct distribution strategy. +-- +-- dist_items (right side) has 400 rows, well below the broadcast threshold +-- (20000), so broadcast is chosen when safe. For RIGHT and FULL joins, +-- broadcast is blocked because the right side can produce unmatched output +-- rows that would be duplicated across workers — shuffle is used instead. +-- +-- Each join kind is tested with: +-- 1. EXPLAIN to verify the chosen strategy (Broadcast vs Shuffle) +-- 2. Distributed execution result +-- 3. Single-node baseline for correctness comparison + +SET enable_analyzer = 1; +SET make_distributed_plan = 1; +SET enable_parallel_replicas = 0; +SET query_plan_use_new_logical_join_step = 1; +SET distributed_plan_default_shuffle_join_bucket_count = 4; +SET distributed_plan_force_exchange_kind = 'Persisted'; +-- Pin settings that affect plan shape to make EXPLAIN output stable. +-- Reset the global max_rows_to_group_by; distributed aggregation rejects a nonzero limit. +SET max_rows_to_group_by = 0; +SET enable_join_runtime_filters = 0; +SET optimize_move_to_prewhere = 0; +SET query_plan_convert_outer_join_to_inner_join = 0; + +DROP TABLE IF EXISTS dist_orders; +DROP TABLE IF EXISTS dist_items; + +CREATE TABLE dist_orders ( + order_id UInt64, + customer String +) ENGINE = MergeTree ORDER BY order_id + SETTINGS index_granularity = 8192, auto_statistics_types = ''; + +CREATE TABLE dist_items ( + item_id UInt64, + order_id UInt64, + amount Decimal(10, 2) +) ENGINE = MergeTree ORDER BY item_id + SETTINGS index_granularity = 8192, auto_statistics_types = ''; + +-- 4 parts for dist_orders: orders 0..399 +SYSTEM STOP MERGES dist_orders; +INSERT INTO dist_orders SELECT number, 'C' || toString(number % 10) FROM numbers(100); +INSERT INTO dist_orders SELECT number + 100, 'C' || toString((number + 100) % 10) FROM numbers(100); +INSERT INTO dist_orders SELECT number + 200, 'C' || toString((number + 200) % 10) FROM numbers(100); +INSERT INTO dist_orders SELECT number + 300, 'C' || toString((number + 300) % 10) FROM numbers(100); + +-- 4 parts for dist_items: items for orders 50..449 (overlap: 50..399 match, 400..449 don't) +SYSTEM STOP MERGES dist_items; +INSERT INTO dist_items SELECT number, number + 50, toDecimal64(number * 1.5, 2) FROM numbers(100); +INSERT INTO dist_items SELECT number + 100, number + 150, toDecimal64((number + 100) * 1.5, 2) FROM numbers(100); +INSERT INTO dist_items SELECT number + 200, number + 250, toDecimal64((number + 200) * 1.5, 2) FROM numbers(100); +INSERT INTO dist_items SELECT number + 300, number + 350, toDecimal64((number + 300) * 1.5, 2) FROM numbers(100); + + +-- INNER JOIN: broadcast (safe, right side small) +SELECT '-- INNER JOIN (broadcast)'; +EXPLAIN PLAN SELECT count(), sum(amount) +FROM dist_orders INNER JOIN dist_items ON dist_orders.order_id = dist_items.order_id; + +SELECT count(), sum(amount) +FROM dist_orders INNER JOIN dist_items ON dist_orders.order_id = dist_items.order_id; + +SELECT count(), sum(amount) +FROM dist_orders INNER JOIN dist_items ON dist_orders.order_id = dist_items.order_id +SETTINGS make_distributed_plan = 0; + + +-- LEFT JOIN: broadcast (safe, right side small) +SELECT '-- LEFT JOIN (broadcast)'; +EXPLAIN PLAN SELECT count() +FROM dist_orders LEFT JOIN dist_items ON dist_orders.order_id = dist_items.order_id; + +SELECT count() +FROM dist_orders LEFT JOIN dist_items ON dist_orders.order_id = dist_items.order_id; + +SELECT count() +FROM dist_orders LEFT JOIN dist_items ON dist_orders.order_id = dist_items.order_id +SETTINGS make_distributed_plan = 0; + + +-- RIGHT JOIN: shuffle (broadcast blocked — right side produces unmatched rows) +SELECT '-- RIGHT JOIN (shuffle)'; +EXPLAIN PLAN SELECT count() +FROM dist_orders RIGHT JOIN dist_items ON dist_orders.order_id = dist_items.order_id; + +SELECT count() +FROM dist_orders RIGHT JOIN dist_items ON dist_orders.order_id = dist_items.order_id; + +SELECT count() +FROM dist_orders RIGHT JOIN dist_items ON dist_orders.order_id = dist_items.order_id +SETTINGS make_distributed_plan = 0; + + +-- FULL JOIN: shuffle (broadcast blocked — both sides produce unmatched rows) +SELECT '-- FULL JOIN (shuffle)'; +EXPLAIN PLAN SELECT count() +FROM dist_orders FULL JOIN dist_items ON dist_orders.order_id = dist_items.order_id; + +SELECT count() +FROM dist_orders FULL JOIN dist_items ON dist_orders.order_id = dist_items.order_id; + +SELECT count() +FROM dist_orders FULL JOIN dist_items ON dist_orders.order_id = dist_items.order_id +SETTINGS make_distributed_plan = 0; + + +-- LEFT SEMI JOIN: broadcast (safe, right side small) +SELECT '-- LEFT SEMI JOIN (broadcast)'; +EXPLAIN PLAN SELECT count() +FROM dist_orders LEFT SEMI JOIN dist_items ON dist_orders.order_id = dist_items.order_id; + +SELECT count() +FROM dist_orders LEFT SEMI JOIN dist_items ON dist_orders.order_id = dist_items.order_id; + +SELECT count() +FROM dist_orders LEFT SEMI JOIN dist_items ON dist_orders.order_id = dist_items.order_id +SETTINGS make_distributed_plan = 0; + + +-- LEFT ANTI JOIN: broadcast (safe, right side small) +SELECT '-- LEFT ANTI JOIN (broadcast)'; +EXPLAIN PLAN SELECT count() +FROM dist_orders LEFT ANTI JOIN dist_items ON dist_orders.order_id = dist_items.order_id; + +SELECT count() +FROM dist_orders LEFT ANTI JOIN dist_items ON dist_orders.order_id = dist_items.order_id; + +SELECT count() +FROM dist_orders LEFT ANTI JOIN dist_items ON dist_orders.order_id = dist_items.order_id +SETTINGS make_distributed_plan = 0; + + +-- RIGHT SEMI JOIN: shuffle (broadcast blocked — kind is RIGHT) +SELECT '-- RIGHT SEMI JOIN (shuffle)'; +EXPLAIN PLAN SELECT count() +FROM dist_orders RIGHT SEMI JOIN dist_items ON dist_orders.order_id = dist_items.order_id; + +SELECT count() +FROM dist_orders RIGHT SEMI JOIN dist_items ON dist_orders.order_id = dist_items.order_id; + +SELECT count() +FROM dist_orders RIGHT SEMI JOIN dist_items ON dist_orders.order_id = dist_items.order_id +SETTINGS make_distributed_plan = 0; + + +-- RIGHT ANTI JOIN: shuffle (broadcast blocked — kind is RIGHT) +SELECT '-- RIGHT ANTI JOIN (shuffle)'; +EXPLAIN PLAN SELECT count() +FROM dist_orders RIGHT ANTI JOIN dist_items ON dist_orders.order_id = dist_items.order_id; + +SELECT count() +FROM dist_orders RIGHT ANTI JOIN dist_items ON dist_orders.order_id = dist_items.order_id; + +SELECT count() +FROM dist_orders RIGHT ANTI JOIN dist_items ON dist_orders.order_id = dist_items.order_id +SETTINGS make_distributed_plan = 0; + + +-- ANY INNER JOIN: broadcast (safe, right side small) +SELECT '-- ANY INNER JOIN (broadcast)'; +EXPLAIN PLAN SELECT count() +FROM dist_orders ANY INNER JOIN dist_items ON dist_orders.order_id = dist_items.order_id; + +SELECT count() +FROM dist_orders ANY INNER JOIN dist_items ON dist_orders.order_id = dist_items.order_id; + +SELECT count() +FROM dist_orders ANY INNER JOIN dist_items ON dist_orders.order_id = dist_items.order_id +SETTINGS make_distributed_plan = 0; + + +-- ANY LEFT JOIN: broadcast (safe, right side small) +SELECT '-- ANY LEFT JOIN (broadcast)'; +EXPLAIN PLAN SELECT count() +FROM dist_orders ANY LEFT JOIN dist_items ON dist_orders.order_id = dist_items.order_id; + +SELECT count() +FROM dist_orders ANY LEFT JOIN dist_items ON dist_orders.order_id = dist_items.order_id; + +SELECT count() +FROM dist_orders ANY LEFT JOIN dist_items ON dist_orders.order_id = dist_items.order_id +SETTINGS make_distributed_plan = 0; + + +-- ASOF JOIN: broadcast (equality predicate is sufficient for shuffle partitioning, +-- the HashJoin ASOF implementation sorts the right side internally per equality-key +-- bucket, so input order after shuffle does not matter) +DROP TABLE IF EXISTS dist_trades; +DROP TABLE IF EXISTS dist_quotes; + +CREATE TABLE dist_trades (symbol String, ts DateTime, price Decimal(10, 2)) +ENGINE = MergeTree ORDER BY (symbol, ts) +SETTINGS index_granularity = 8192, auto_statistics_types = ''; + +CREATE TABLE dist_quotes (symbol String, ts DateTime, bid Decimal(10, 2)) +ENGINE = MergeTree ORDER BY (symbol, ts) +SETTINGS index_granularity = 8192, auto_statistics_types = ''; + +-- Multiple symbols across multiple parts to exercise shuffle partitioning. +SYSTEM STOP MERGES dist_trades; +INSERT INTO dist_trades SELECT 'S' || toString(number % 5), toDateTime('2024-01-01 10:00:00') + number * 60, toDecimal64(100 + number * 0.1, 2) FROM numbers(100); +INSERT INTO dist_trades SELECT 'S' || toString(number % 5), toDateTime('2024-01-01 10:00:00') + (number + 100) * 60, toDecimal64(100 + (number + 100) * 0.1, 2) FROM numbers(100); + +SYSTEM STOP MERGES dist_quotes; +INSERT INTO dist_quotes SELECT 'S' || toString(number % 5), toDateTime('2024-01-01 09:58:00') + number * 30, toDecimal64(99.5 + number * 0.05, 2) FROM numbers(200); +INSERT INTO dist_quotes SELECT 'S' || toString(number % 5), toDateTime('2024-01-01 09:58:00') + (number + 200) * 30, toDecimal64(99.5 + (number + 200) * 0.05, 2) FROM numbers(200); + +SELECT '-- ASOF JOIN (broadcast)'; +EXPLAIN PLAN SELECT symbol, count(), sum(price), sum(bid) +FROM dist_trades ASOF LEFT JOIN dist_quotes ON dist_trades.symbol = dist_quotes.symbol AND dist_trades.ts >= dist_quotes.ts +GROUP BY symbol ORDER BY symbol; + +SELECT symbol, count(), sum(price), sum(bid) +FROM dist_trades ASOF LEFT JOIN dist_quotes ON dist_trades.symbol = dist_quotes.symbol AND dist_trades.ts >= dist_quotes.ts +GROUP BY symbol ORDER BY symbol; + +-- Force shuffle by setting broadcast threshold to 0. +SELECT '-- ASOF JOIN (shuffle)'; +EXPLAIN PLAN SELECT symbol, count(), sum(price), sum(bid) +FROM dist_trades ASOF LEFT JOIN dist_quotes ON dist_trades.symbol = dist_quotes.symbol AND dist_trades.ts >= dist_quotes.ts +GROUP BY symbol ORDER BY symbol +SETTINGS distributed_plan_max_rows_to_broadcast = 0; + +SELECT symbol, count(), sum(price), sum(bid) +FROM dist_trades ASOF LEFT JOIN dist_quotes ON dist_trades.symbol = dist_quotes.symbol AND dist_trades.ts >= dist_quotes.ts +GROUP BY symbol ORDER BY symbol +SETTINGS distributed_plan_max_rows_to_broadcast = 0; + +-- Single-node baseline. +SELECT symbol, count(), sum(price), sum(bid) +FROM dist_trades ASOF LEFT JOIN dist_quotes ON dist_trades.symbol = dist_quotes.symbol AND dist_trades.ts >= dist_quotes.ts +GROUP BY symbol ORDER BY symbol +SETTINGS make_distributed_plan = 0; + +DROP TABLE dist_trades; +DROP TABLE dist_quotes; + +DROP TABLE dist_orders; +DROP TABLE dist_items; diff --git a/tests/queries/0_stateless/04105_distributed_final_replacing.reference b/tests/queries/0_stateless/04105_distributed_final_replacing.reference new file mode 100644 index 000000000000..51f104a032a8 --- /dev/null +++ b/tests/queries/0_stateless/04105_distributed_final_replacing.reference @@ -0,0 +1,4 @@ +-- Local +100000 200000 100000 +-- Distributed +100000 200000 100000 diff --git a/tests/queries/0_stateless/04105_distributed_final_replacing.sql b/tests/queries/0_stateless/04105_distributed_final_replacing.sql new file mode 100644 index 000000000000..8e45f42a33da --- /dev/null +++ b/tests/queries/0_stateless/04105_distributed_final_replacing.sql @@ -0,0 +1,24 @@ +-- Tags: no-old-analyzer +-- Regression test: distributed query plan on SELECT FINAL from engines with specialized merging +-- (Replacing, Collapsing, ...) must not reroute same-sort-key rows to different buckets, or +-- deduplication is broken. + +DROP TABLE IF EXISTS t_replacing_final_correctness; + +CREATE TABLE t_replacing_final_correctness(pk UInt64, version UInt64, val String) ENGINE = ReplacingMergeTree(version) ORDER BY pk; + +SYSTEM STOP MERGES t_replacing_final_correctness; + +-- Two parts with full PK overlap and different versions. +INSERT INTO t_replacing_final_correctness SELECT number, 1, 'old' FROM numbers(100000); +INSERT INTO t_replacing_final_correctness SELECT number, 2, 'new' FROM numbers(100000); + +SELECT '-- Local'; +SELECT count(), sum(version), uniqExact(pk) FROM t_replacing_final_correctness FINAL; + +SELECT '-- Distributed'; +-- Reset the global max_rows_to_group_by; distributed aggregation rejects a nonzero limit. +SELECT count(), sum(version), uniqExact(pk) FROM t_replacing_final_correctness FINAL +SETTINGS make_distributed_plan = 1, enable_parallel_replicas = 0, max_rows_to_group_by = 0; + +DROP TABLE t_replacing_final_correctness; diff --git a/tests/queries/0_stateless/04305_distributed_shuffle_join_type_mixed.reference b/tests/queries/0_stateless/04305_distributed_shuffle_join_type_mixed.reference new file mode 100644 index 000000000000..f2b3ff1fe936 --- /dev/null +++ b/tests/queries/0_stateless/04305_distributed_shuffle_join_type_mixed.reference @@ -0,0 +1,17 @@ +-- Local +1000000 +-- Distributed plan +Expression ((Project names + Projection)) + MergingAggregated (merge) + GatherExchange + Aggregating (partial) + Expression (Before GROUP BY) + JoinLogical + ShuffleExchange (by hash([__table1.k])) + Expression (Change column names to column identifiers) + ReadFromMergeTree (default.t_shuffle_join_left) + ShuffleExchange (by hash([__table2.k])) + Expression (Change column names to column identifiers) + ReadFromMergeTree (default.t_shuffle_join_right) +-- Distributed +1000000 diff --git a/tests/queries/0_stateless/04305_distributed_shuffle_join_type_mixed.sql b/tests/queries/0_stateless/04305_distributed_shuffle_join_type_mixed.sql new file mode 100644 index 000000000000..cf4ed6dc0364 --- /dev/null +++ b/tests/queries/0_stateless/04305_distributed_shuffle_join_type_mixed.sql @@ -0,0 +1,43 @@ +-- Tags: no-old-analyzer +-- no-old-analyzer: make_distributed_plan requires the analyzer. + +-- Regression test: shuffle-join key types differ on left and right side. +-- Without casting to a common supertype, the scatter step on each side uses different +-- hashing because of different physical types, so matching rows are routed to different +-- buckets and the join silently drops them. + +-- Reset the global max_rows_to_group_by; distributed aggregation rejects a nonzero limit. +SET max_rows_to_group_by = 0; + +DROP TABLE IF EXISTS t_shuffle_join_left; +DROP TABLE IF EXISTS t_shuffle_join_right; + +CREATE TABLE t_shuffle_join_left (k Int8, v UInt32) ENGINE = MergeTree ORDER BY k; +CREATE TABLE t_shuffle_join_right (k Int64, v UInt32) ENGINE = MergeTree ORDER BY k; + +INSERT INTO t_shuffle_join_left SELECT (number % 100) - 50, number FROM numbers(10000); +INSERT INTO t_shuffle_join_right SELECT (number % 100) - 50, number * 2 FROM numbers(10000); + +SELECT '-- Local'; +SELECT count() FROM t_shuffle_join_left AS l JOIN t_shuffle_join_right AS r ON l.k = r.k; + +SELECT '-- Distributed plan'; +EXPLAIN SELECT count() FROM t_shuffle_join_left AS l JOIN t_shuffle_join_right AS r ON l.k = r.k +SETTINGS + make_distributed_plan = 1, + enable_parallel_replicas = 0, + distributed_plan_execute_locally = 1, + distributed_plan_max_rows_to_broadcast = 0, + enable_join_runtime_filters = 0; + +SELECT '-- Distributed'; +SELECT count() FROM t_shuffle_join_left AS l JOIN t_shuffle_join_right AS r ON l.k = r.k +SETTINGS + make_distributed_plan = 1, + enable_parallel_replicas = 0, + distributed_plan_execute_locally = 1, + distributed_plan_max_rows_to_broadcast = 0, + enable_join_runtime_filters = 0; + +DROP TABLE t_shuffle_join_left; +DROP TABLE t_shuffle_join_right; diff --git a/tests/queries/0_stateless/04306_distributed_aggregation_correctness_guards.reference b/tests/queries/0_stateless/04306_distributed_aggregation_correctness_guards.reference new file mode 100644 index 000000000000..01dd3c633a29 --- /dev/null +++ b/tests/queries/0_stateless/04306_distributed_aggregation_correctness_guards.reference @@ -0,0 +1,2 @@ +-- GROUPING SETS rejected +-- max_rows_to_group_by rejected diff --git a/tests/queries/0_stateless/04306_distributed_aggregation_correctness_guards.sql b/tests/queries/0_stateless/04306_distributed_aggregation_correctness_guards.sql new file mode 100644 index 000000000000..e536ffa5b122 --- /dev/null +++ b/tests/queries/0_stateless/04306_distributed_aggregation_correctness_guards.sql @@ -0,0 +1,25 @@ +-- Tags: no-old-analyzer +-- no-old-analyzer: make_distributed_plan requires the analyzer. + +-- Regression test: make_distributed_plan rejects aggregations it cannot distribute correctly, +-- rather than silently running them single-node. +-- * GROUPING SETS: shuffle scatters by the full key set, so subtotals (over key subsets) would be +-- produced independently in several buckets and duplicated. +-- * A global GROUP BY limit (max_rows_to_group_by) cannot be enforced once split per bucket. + +DROP TABLE IF EXISTS t_agg_guard; + +CREATE TABLE t_agg_guard (a UInt32, b UInt32, v UInt32) ENGINE = MergeTree ORDER BY (a, b); +INSERT INTO t_agg_guard SELECT number % 10, number % 7, number FROM numbers(100000); + +SET make_distributed_plan = 1, enable_parallel_replicas = 0, distributed_plan_execute_locally = 1, + distributed_plan_max_rows_to_broadcast = 1000000000, enable_join_runtime_filters = 0; + +SELECT '-- GROUPING SETS rejected'; +SELECT a, b, sum(v) AS s FROM t_agg_guard GROUP BY GROUPING SETS ((a), (b), ()); -- { serverError SUPPORT_IS_DISABLED } + +SELECT '-- max_rows_to_group_by rejected'; +SELECT a, sum(v) FROM t_agg_guard GROUP BY a +SETTINGS max_rows_to_group_by = 5; -- { serverError SUPPORT_IS_DISABLED } + +DROP TABLE t_agg_guard; diff --git a/tests/queries/0_stateless/04307_distributed_read_error_terminates.reference b/tests/queries/0_stateless/04307_distributed_read_error_terminates.reference new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tests/queries/0_stateless/04307_distributed_read_error_terminates.sql b/tests/queries/0_stateless/04307_distributed_read_error_terminates.sql new file mode 100644 index 000000000000..458134772560 --- /dev/null +++ b/tests/queries/0_stateless/04307_distributed_read_error_terminates.sql @@ -0,0 +1,20 @@ +-- Tags: no-old-analyzer +-- no-old-analyzer: make_distributed_plan requires the analyzer. + +-- Regression test: when a distributed read errors mid-flight, the query must terminate with that +-- error rather than hang. The local in-memory exchanges are cancelled on teardown so tasks waiting +-- for input do not block forever. max_rows_to_read trips during the distributed read while it is +-- still feeding the downstream aggregation. + +DROP TABLE IF EXISTS t_distr_read_error; + +CREATE TABLE t_distr_read_error (a UInt32, v UInt32) ENGINE = MergeTree ORDER BY a; +INSERT INTO t_distr_read_error SELECT number % 10, number FROM numbers(100000); + +SELECT a, sum(v) FROM t_distr_read_error GROUP BY a +SETTINGS make_distributed_plan = 1, enable_parallel_replicas = 0, distributed_plan_execute_locally = 1, + distributed_plan_max_rows_to_broadcast = 0, distributed_plan_default_reader_bucket_count = 4, + enable_join_runtime_filters = 0, max_rows_to_group_by = 0, + max_rows_to_read = 10, read_overflow_mode = 'throw'; -- { serverError TOO_MANY_ROWS } + +DROP TABLE t_distr_read_error; diff --git a/tests/queries/0_stateless/04308_distributed_totals_rollup_cube_not_distributed.reference b/tests/queries/0_stateless/04308_distributed_totals_rollup_cube_not_distributed.reference new file mode 100644 index 000000000000..bf7b5aaba123 --- /dev/null +++ b/tests/queries/0_stateless/04308_distributed_totals_rollup_cube_not_distributed.reference @@ -0,0 +1,3 @@ +-- WITH TOTALS +-- ROLLUP +-- CUBE diff --git a/tests/queries/0_stateless/04308_distributed_totals_rollup_cube_not_distributed.sql b/tests/queries/0_stateless/04308_distributed_totals_rollup_cube_not_distributed.sql new file mode 100644 index 000000000000..7fff5bd8f102 --- /dev/null +++ b/tests/queries/0_stateless/04308_distributed_totals_rollup_cube_not_distributed.sql @@ -0,0 +1,25 @@ +-- Tags: no-old-analyzer +-- no-old-analyzer: make_distributed_plan requires the analyzer. + +-- Regression test: WITH TOTALS / ROLLUP / CUBE produce extra streams (a totals stream, or subtotal +-- rows from a Rollup/Cube step) that the distributed exchange protocol does not carry. make_distributed_plan +-- rejects such plans rather than silently running them single-node. + +DROP TABLE IF EXISTS t_totals_guard; + +CREATE TABLE t_totals_guard (a UInt32, v UInt32) ENGINE = MergeTree ORDER BY a; +INSERT INTO t_totals_guard SELECT number % 10, number FROM numbers(100000); + +SET make_distributed_plan = 1, enable_parallel_replicas = 0, distributed_plan_execute_locally = 1, + distributed_plan_max_rows_to_broadcast = 0, enable_join_runtime_filters = 0, max_rows_to_group_by = 0; + +SELECT '-- WITH TOTALS'; +SELECT a, sum(v) FROM t_totals_guard GROUP BY a WITH TOTALS ORDER BY a; -- { serverError SUPPORT_IS_DISABLED } + +SELECT '-- ROLLUP'; +SELECT a, sum(v) FROM t_totals_guard GROUP BY a WITH ROLLUP ORDER BY a; -- { serverError SUPPORT_IS_DISABLED } + +SELECT '-- CUBE'; +SELECT a, sum(v) FROM t_totals_guard GROUP BY a WITH CUBE ORDER BY a; -- { serverError SUPPORT_IS_DISABLED } + +DROP TABLE t_totals_guard; diff --git a/tests/queries/0_stateless/04309_distributed_aggregation_persisted_exchange.reference b/tests/queries/0_stateless/04309_distributed_aggregation_persisted_exchange.reference new file mode 100644 index 000000000000..cf0981de79a3 --- /dev/null +++ b/tests/queries/0_stateless/04309_distributed_aggregation_persisted_exchange.reference @@ -0,0 +1,5 @@ +0 100 +1 100 +2 100 +3 100 +4 100 diff --git a/tests/queries/0_stateless/04309_distributed_aggregation_persisted_exchange.sql b/tests/queries/0_stateless/04309_distributed_aggregation_persisted_exchange.sql new file mode 100644 index 000000000000..f83f65843e78 --- /dev/null +++ b/tests/queries/0_stateless/04309_distributed_aggregation_persisted_exchange.sql @@ -0,0 +1,18 @@ +-- Tags: no-old-analyzer + +-- Regression test: distributed partial aggregation delivered through a persisted exchange must not +-- deadlock. The result reader drains final_result after the driver (the executor) has finished, so +-- the query's in-memory exchanges must outlive the executor rather than be removed on its completion. +-- A high distributed_plan_max_rows_to_broadcast keeps the aggregation on the partial+merge path. + +DROP TABLE IF EXISTS t_agg_persisted; + +CREATE TABLE t_agg_persisted (k UInt32, v UInt32) ENGINE = MergeTree ORDER BY k; +INSERT INTO t_agg_persisted SELECT number % 500, number FROM numbers(50000); + +SELECT k, count() AS c FROM t_agg_persisted GROUP BY k ORDER BY k LIMIT 5 +SETTINGS make_distributed_plan = 1, enable_parallel_replicas = 0, distributed_plan_execute_locally = 1, + distributed_plan_max_rows_to_broadcast = 1000000000, distributed_plan_force_exchange_kind = 'Persisted', + enable_join_runtime_filters = 0, max_rows_to_group_by = 0; + +DROP TABLE t_agg_persisted; diff --git a/tests/queries/0_stateless/04310_distributed_unserializable_step_rejected.reference b/tests/queries/0_stateless/04310_distributed_unserializable_step_rejected.reference new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tests/queries/0_stateless/04310_distributed_unserializable_step_rejected.sql b/tests/queries/0_stateless/04310_distributed_unserializable_step_rejected.sql new file mode 100644 index 000000000000..b15249382c68 --- /dev/null +++ b/tests/queries/0_stateless/04310_distributed_unserializable_step_rejected.sql @@ -0,0 +1,17 @@ +-- Tags: no-old-analyzer + +-- Regression test: make_distributed_plan fails early and explicitly when a stage fragment contains a +-- step that cannot be serialized for remote execution (here a window function), instead of failing +-- late mid-execution with a generic "Method serialize is not implemented" error. + +DROP TABLE IF EXISTS t_unserializable; + +CREATE TABLE t_unserializable (a UInt32, v UInt32) ENGINE = MergeTree ORDER BY a; +INSERT INTO t_unserializable SELECT number % 10, number FROM numbers(100000); + +SELECT a, sum(v) OVER (PARTITION BY a) AS w FROM t_unserializable +SETTINGS make_distributed_plan = 1, enable_parallel_replicas = 0, distributed_plan_execute_locally = 1, + distributed_plan_max_rows_to_broadcast = 0, distributed_plan_default_reader_bucket_count = 4, + enable_join_runtime_filters = 0, max_rows_to_group_by = 0; -- { serverError SUPPORT_IS_DISABLED } + +DROP TABLE t_unserializable; diff --git a/tests/queries/0_stateless/04319_distributed_plan_set_operation_const_columns.reference b/tests/queries/0_stateless/04319_distributed_plan_set_operation_const_columns.reference new file mode 100644 index 000000000000..3a48788e7543 --- /dev/null +++ b/tests/queries/0_stateless/04319_distributed_plan_set_operation_const_columns.reference @@ -0,0 +1 @@ +\N \N diff --git a/tests/queries/0_stateless/04319_distributed_plan_set_operation_const_columns.sql b/tests/queries/0_stateless/04319_distributed_plan_set_operation_const_columns.sql new file mode 100644 index 000000000000..da3c074ec357 --- /dev/null +++ b/tests/queries/0_stateless/04319_distributed_plan_set_operation_const_columns.sql @@ -0,0 +1,44 @@ +-- Tags: no-old-analyzer +-- no-old-analyzer: make_distributed_plan requires the analyzer. + +-- A distributed set operation (UNION / INTERSECT / EXCEPT) can produce a column as a constant in one +-- branch and as a full column (aliased from an exchange) in another. Plan serialization re-derives +-- constness per step, so the branches used to mismatch at the strict set-operation header check. +-- Constants are now materialized on every branch so they agree. + +DROP TABLE IF EXISTS t_union_const; +CREATE TABLE t_union_const (x UInt64) ENGINE = MergeTree ORDER BY tuple(); +INSERT INTO t_union_const SELECT number FROM numbers(1000000); + +-- Distributed aggregation cannot enforce a global max_rows_to_group_by, so pin it to 0. +SET max_rows_to_group_by = 0; + +SET make_distributed_plan = 1, enable_parallel_replicas = 0, distributed_plan_execute_locally = 1; + +-- UNION: a constant in one branch vs a full column aliased from an exchange in the other. +SELECT DISTINCT toFixedString(NULL, 'null'), minus(NULL, (SELECT NULL)) +FROM t_union_const WHERE x < 1 GROUP BY 1, NULL +UNION DISTINCT +SELECT DISTINCT toFixedString(NULL, 'null'), minus(NULL, (SELECT NULL)) +FROM t_union_const WHERE x < 9 GROUP BY 1, NULL; + +DROP TABLE t_union_const; + +-- INTERSECT / EXCEPT with a UNION of constants inside each input. The set operation must be the root +-- step to reproduce (a parent step would be distributed and reject the unserializable set operation), +-- so the result cannot be ordered; use FORMAT Null. +SET distributed_plan_max_rows_to_broadcast = 0; + +WITH cte AS (SELECT DISTINCT -9223372036854775808 UNION ALL SELECT DISTINCT NULL LIMIT 1025) +SELECT DISTINCT toNullable(NULL), * FROM cte LIMIT 65536 +INTERSECT DISTINCT +WITH cte AS (SELECT DISTINCT -9223372036854775808 UNION ALL SELECT DISTINCT NULL LIMIT 1025) +SELECT DISTINCT NULL, * FROM cte LIMIT 65536 +FORMAT Null; + +WITH cte AS (SELECT DISTINCT -9223372036854775808 UNION ALL SELECT DISTINCT NULL LIMIT 1025) +SELECT DISTINCT toNullable(NULL), * FROM cte LIMIT 65536 +EXCEPT DISTINCT +WITH cte AS (SELECT DISTINCT 0 UNION ALL SELECT DISTINCT NULL LIMIT 1025) +SELECT DISTINCT NULL, * FROM cte LIMIT 65536 +FORMAT Null; diff --git a/tests/queries/0_stateless/04320_distributed_plan_read_rejects.reference b/tests/queries/0_stateless/04320_distributed_plan_read_rejects.reference new file mode 100644 index 000000000000..e613ab682c66 --- /dev/null +++ b/tests/queries/0_stateless/04320_distributed_plan_read_rejects.reference @@ -0,0 +1,2 @@ +19999900000 +19999900000 diff --git a/tests/queries/0_stateless/04320_distributed_plan_read_rejects.sql b/tests/queries/0_stateless/04320_distributed_plan_read_rejects.sql new file mode 100644 index 000000000000..85c740037e9c --- /dev/null +++ b/tests/queries/0_stateless/04320_distributed_plan_read_rejects.sql @@ -0,0 +1,26 @@ +-- Tags: no-old-analyzer +-- no-old-analyzer: make_distributed_plan requires the analyzer. + +DROP TABLE IF EXISTS t_read_rejects; +CREATE TABLE t_read_rejects (x UInt64) ENGINE = MergeTree ORDER BY tuple(); +INSERT INTO t_read_rejects SELECT number FROM numbers(200000); + +-- Distributed aggregation cannot enforce a global max_rows_to_group_by, so pin it to 0 (randomized +-- settings set it nonzero, which would make make_distributed_plan reject the count/sum below). +SET max_rows_to_group_by = 0; +SET make_distributed_plan = 1, enable_parallel_replicas = 0, distributed_plan_execute_locally = 1, + distributed_plan_max_rows_to_broadcast = 0; + +-- A distributed read cannot reproduce the coordinator's part ordering, so the part-order virtual +-- columns are rejected at planning time (rather than silently returning worker-local values). +SELECT _part_index FROM t_read_rejects; -- { serverError SUPPORT_IS_DISABLED } +SELECT _part_starting_offset FROM t_read_rejects; -- { serverError SUPPORT_IS_DISABLED } + +-- _part_offset alone is per-part and order-independent, so it stays supported. +SELECT sum(_part_offset) FROM t_read_rejects; + +-- A per-block function must keep its global numbering: the GatherExchange is not pushed below it, +-- so the row numbers stay a single 0..N-1 sequence (sum is order-independent). +SELECT sum(rn) FROM (SELECT rowNumberInAllBlocks() AS rn FROM t_read_rejects); + +DROP TABLE t_read_rejects; diff --git a/tests/queries/0_stateless/04321_distributed_plan_count_implicit_projection.reference b/tests/queries/0_stateless/04321_distributed_plan_count_implicit_projection.reference new file mode 100644 index 000000000000..fc7bed0c3a6c --- /dev/null +++ b/tests/queries/0_stateless/04321_distributed_plan_count_implicit_projection.reference @@ -0,0 +1,3 @@ +200000 +99999 +99999 diff --git a/tests/queries/0_stateless/04321_distributed_plan_count_implicit_projection.sql b/tests/queries/0_stateless/04321_distributed_plan_count_implicit_projection.sql new file mode 100644 index 000000000000..c603a5f4e2e6 --- /dev/null +++ b/tests/queries/0_stateless/04321_distributed_plan_count_implicit_projection.sql @@ -0,0 +1,32 @@ +-- Tags: no-old-analyzer +-- no-old-analyzer: make_distributed_plan requires the analyzer. + +-- The implicit count/minmax projection counts rows from part metadata. A distributed read buckets the +-- part across workers; if the projection is left enabled it is replicated to every bucket and counts +-- the whole part each time, so the result is multiplied by the bucket count. These distributed counts +-- must match a single-node read. + +DROP TABLE IF EXISTS t_one; +DROP TABLE IF EXISTS t_cnt; +CREATE TABLE t_one (x UInt64) ENGINE = MergeTree ORDER BY tuple(); +CREATE TABLE t_cnt (k UInt64, v UInt64) ENGINE = MergeTree ORDER BY k; +INSERT INTO t_one SELECT number FROM numbers(200000); +INSERT INTO t_cnt SELECT number, number FROM numbers(200000); + +-- Pin the implicit projection on so the bug path is exercised; the fix disables it for distributed +-- plans regardless. Without pinning, randomized settings can turn it off and mask the regression. +-- Pin max_rows_to_group_by to 0 too: distributed aggregation rejects a nonzero limit (randomized). +SET max_rows_to_group_by = 0; +SET make_distributed_plan = 1, enable_parallel_replicas = 0, distributed_plan_execute_locally = 1, + distributed_plan_max_rows_to_broadcast = 0, distributed_plan_default_reader_bucket_count = 8, + optimize_use_implicit_projections = 1; + +-- Trivial count over a distributed read (counted from part metadata). +SELECT count() FROM (SELECT x FROM t_one); +-- Primary-key range: full granules counted from the index, only the boundary granule scanned. +SELECT count() FROM t_cnt WHERE k > 100000; +-- Non-index filter: a real per-row scan (always correct; guards against regression the other way). +SELECT count() FROM t_cnt WHERE v > 100000; + +DROP TABLE t_one; +DROP TABLE t_cnt; From 4087e7a7f27801e5f68b549dfafb8f2e2b9b0b46 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Thu, 2 Jul 2026 10:28:58 +0200 Subject: [PATCH 2/2] Fix after cherry-pick --- .../QueryPlan/BroadcastReceiveStep.cpp | 2 +- .../Optimizations/makeDistributed.cpp | 6 -- src/Processors/QueryPlan/QueryPlan.h | 4 ++ .../QueryPlan/ReadFromMergeTree.cpp | 71 +------------------ src/Processors/QueryPlan/ReadFromMergeTree.h | 7 ++ .../QueryPlan/ShuffleReceiveStep.cpp | 2 +- src/QueryPipeline/DistributedPlanExecutor.cpp | 2 - .../StatelessWorker/StatelessWorkerClient.cpp | 26 +++---- 8 files changed, 27 insertions(+), 93 deletions(-) diff --git a/src/Processors/QueryPlan/BroadcastReceiveStep.cpp b/src/Processors/QueryPlan/BroadcastReceiveStep.cpp index 633ad85be66a..e5a37f1c15c3 100644 --- a/src/Processors/QueryPlan/BroadcastReceiveStep.cpp +++ b/src/Processors/QueryPlan/BroadcastReceiveStep.cpp @@ -18,7 +18,7 @@ void BroadcastReceiveStep::initializePipeline(QueryPipelineBuilder & pipeline, c { const String bucket_id = settings.parameter_lookup->getParameter("bucket_id").safeGet(); - VectorWithMemoryTracking> pipelines; + std::vector> pipelines; /// Read all shards for (const String & shard_id : source_shards) diff --git a/src/Processors/QueryPlan/Optimizations/makeDistributed.cpp b/src/Processors/QueryPlan/Optimizations/makeDistributed.cpp index 4c436448749c..0108d4224e27 100644 --- a/src/Processors/QueryPlan/Optimizations/makeDistributed.cpp +++ b/src/Processors/QueryPlan/Optimizations/makeDistributed.cpp @@ -619,12 +619,6 @@ void optimizeExchanges(QueryPlan::Node & root) bool can_move_gather_up = true; - /// Per-block functions (`rowNumberInAllBlocks`, `blockNumber`, `nowInBlock`, ...) - /// depend on the whole block stream; below a gather they would run per shard and - /// produce different values. Keep such a step above the gather. - if (dag && dagContainsNonDeterministicFunction(*dag)) - can_move_gather_up = false; - /// Moving the sorted GatherExchange above the step is only valid if every sort column /// survives the step unchanged - otherwise GatherReceive would merge by a sort /// description that no longer matches the data. Expression/Filter may recompute or diff --git a/src/Processors/QueryPlan/QueryPlan.h b/src/Processors/QueryPlan/QueryPlan.h index 7aece7baa08b..1618d05e4e6f 100644 --- a/src/Processors/QueryPlan/QueryPlan.h +++ b/src/Processors/QueryPlan/QueryPlan.h @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -112,6 +113,9 @@ class QueryPlan void resolveStorages(const ContextPtr & context); void optimize(const QueryPlanOptimizationSettings & optimization_settings); + /// Converts the original plan to distributed plan and replaces the original plan with a plan that + /// contains a step that executes the distributed plan and a step that receives the result. + void convertToDistributed(const QueryPlanOptimizationSettings & optimization_settings); QueryPipelineBuilderPtr buildQueryPipeline( const QueryPlanOptimizationSettings & optimization_settings, diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index ecbc502532ed..0e20406785d7 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -3283,66 +3283,6 @@ bool ReadFromMergeTree::supportsSkipIndexesOnDataRead() const static const char * indexTypeToString(ReadFromMergeTree::IndexType type); -void ReadFromMergeTree::logPredicateStatistics(const AnalysisResult & result) const -{ - UInt64 sample_rate = context->getSettingsRef()[Setting::predicate_statistics_sample_rate]; - if (sample_rate == 0) - return; - - if (sample_rate > 1) - { - auto qid = CurrentThread::getQueryId(); - if (CityHash_v1_0_2::CityHash64(qid.data(), qid.size()) % sample_rate != 0) - return; - } - - auto predicate_stats_log = context->getPredicateStatisticsLog(); - if (!predicate_stats_log) - return; - - if (result.index_stats.empty()) - return; - - auto storage_id = data.getStorageID(); - if (storage_id.database_name.empty()) - return; - - PredicateStatisticsLogElement elem; - auto now = time(nullptr); - elem.event_date = static_cast(DateLUT::instance().toDayNum(now)); - elem.event_time = now; - elem.database = storage_id.database_name; - elem.table = storage_id.table_name; - elem.query_id = String(CurrentThread::getQueryId()); - - UInt64 prev_granules = 0; - for (const auto & stat : result.index_stats) - { - if (stat.type == IndexType::None) - { - prev_granules = stat.num_granules_after; - continue; - } - - if (!stat.part_name.empty()) - continue; - - UInt64 total = prev_granules > 0 ? prev_granules : stat.num_granules_after; - UInt64 after = stat.num_granules_after; - - elem.index_names.push_back(stat.name.empty() ? indexTypeToString(stat.type) : stat.name); - elem.index_types.push_back(indexTypeToString(stat.type)); - elem.total_granules.push_back(total); - elem.granules_after.push_back(after); - elem.index_selectivities.push_back(total > 0 ? static_cast(after) / static_cast(total) : 1.0); - - prev_granules = after; - } - - if (!elem.index_names.empty()) - predicate_stats_log->add(std::move(elem)); -} - MarkRanges filterMarkRangesForBucket(const MarkRanges & ranges, size_t & effective_bucket_index, size_t total_buckets) { MarkRanges result; @@ -3363,8 +3303,6 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, [[ma { auto & result = getAnalysisResult(); - logPredicateStatistics(result); - /// Filter ranges by 'bucket_id' parameter so that each distributed worker reads only its slice of the parts. if (distributed_read_bucket_count > 0 && settings.parameter_lookup) { @@ -4458,13 +4396,6 @@ Strings ReadFromMergeTree::getShardsForDistributedRead() const void ReadFromMergeTree::serialize(Serialization & ctx) const { - /// Serializing the STREAM modifier is not implemented yet, so reject it instead of silently - /// reading a plain snapshot. (Pinned block boundaries and part-order virtual columns are rejected - /// earlier in checkDistributedReadSupported.) - if (query_info.isStream()) - throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, - "make_distributed_plan does not support a distributed read with the STREAM modifier"); - /// Needed only for a bucketed read: it is pinned to the coordinator's part list and cannot /// re-derive read-in-order, deferred FINAL filters, a projection, or text index tasks. A /// non-bucket read is rebuilt and re-optimized on the worker, which re-derives them. @@ -4614,7 +4545,7 @@ std::unique_ptr ReadFromMergeTree::deserialize(Deserialization & MergeTreeData & table = dynamic_cast(*storage_ptr); MergeTreeDataSelectExecutor executor(table); - StorageSnapshotPtr storage_snapshot = table.getStorageSnapshot(table.getInMemoryMetadataPtr(ctx.context, false), ctx.context); + StorageSnapshotPtr storage_snapshot = table.getStorageSnapshot(table.getInMemoryMetadataPtr(), ctx.context); const auto & snapshot_data = assert_cast(*storage_snapshot->data); auto step = executor.readFromParts( diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.h b/src/Processors/QueryPlan/ReadFromMergeTree.h index bdcb8b6a3a55..061018a7bb7b 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.h +++ b/src/Processors/QueryPlan/ReadFromMergeTree.h @@ -385,8 +385,12 @@ class ReadFromMergeTree final : public SourceStepWithFilter bool isSkipIndexAvailableForTopK(const String & sort_column) const; const ProjectionIndexReadDescription & getProjectionIndexReadDescription() const { return projection_index_read_desc; } ProjectionIndexReadDescription & getProjectionIndexReadDescription() { return projection_index_read_desc; } + /// In distributed query plan, this step will be executed in a distributed manner - shards will be read in parallel. + void setDistributedRead(size_t bucket_count); /// Parts (by name) every worker buckets over, so the partition is identical across replicas. void setDistributedReadParts(Names part_names); + /// Makes a list of shards to read in parallel in distributed query plan + Strings getShardsForDistributedRead() const; bool canRemoveUnusedColumns() const override; RemovedUnusedColumns removeUnusedColumns(NameMultiSet required_outputs, bool remove_inputs) override; @@ -543,6 +547,9 @@ class ReadFromMergeTree final : public SourceStepWithFilter std::optional top_k_filter_info; ProjectionIndexReadDescription projection_index_read_desc; + /// This is set when this step is part of a distributed query plan and it will be executed in a distributed manner. + /// "bucket_id" task parameter will be used to determine what part of the data to read. + size_t distributed_read_bucket_count = 0; /// Coordinator-selected parts a distributed-read worker buckets over. Empty otherwise. Names distributed_read_part_names; }; diff --git a/src/Processors/QueryPlan/ShuffleReceiveStep.cpp b/src/Processors/QueryPlan/ShuffleReceiveStep.cpp index a48ec117c080..252e78f585d7 100644 --- a/src/Processors/QueryPlan/ShuffleReceiveStep.cpp +++ b/src/Processors/QueryPlan/ShuffleReceiveStep.cpp @@ -17,7 +17,7 @@ void ShuffleReceiveStep::initializePipeline(QueryPipelineBuilder & pipeline, con { const String bucket_id = settings.parameter_lookup->getParameter("bucket_id").safeGet(); - VectorWithMemoryTracking> pipelines; + std::vector> pipelines; /// Read all shards for (const String & shard_id : source_shards) diff --git a/src/QueryPipeline/DistributedPlanExecutor.cpp b/src/QueryPipeline/DistributedPlanExecutor.cpp index 8e8d793a6904..ac8ca476e0fd 100644 --- a/src/QueryPipeline/DistributedPlanExecutor.cpp +++ b/src/QueryPipeline/DistributedPlanExecutor.cpp @@ -627,12 +627,10 @@ void doExecuteTask(const DistributedQueryTaskDescription & task_description, Obj /// swaps the join sides while the others don't), breaking exchange partitioning. optimization_settings.join_swap_table = std::make_optional(false); optimization_settings.query_plan_optimize_join_order_limit = 0; - optimization_settings.query_plan_optimize_join_order_randomize = 0; optimization_settings.convert_join_to_in = false; optimization_settings.convert_outer_join_to_inner_join = false; optimization_settings.convert_any_join_to_semi_or_anti_join = false; optimization_settings.merge_filter_into_join_condition = false; - optimization_settings.top_k_through_join = false; /// The fragment's read is bucketed; re-introducing the implicit count projection would count the /// whole part per bucket. Keep it off so counts read the bucket's mark ranges. diff --git a/src/Server/StatelessWorker/StatelessWorkerClient.cpp b/src/Server/StatelessWorker/StatelessWorkerClient.cpp index 7e17a706faf3..6dd4696c263b 100644 --- a/src/Server/StatelessWorker/StatelessWorkerClient.cpp +++ b/src/Server/StatelessWorker/StatelessWorkerClient.cpp @@ -31,9 +31,9 @@ String doSendTask(const String & endpoint_uri, const String & task_id, std::func timeouts.receive_timeout = Poco::Timespan(100 * 1000 * 1000); ReadSettings read_settings; /// Not safe to retry: worker would schedule a duplicate task. - read_settings.http_settings.max_tries = 1; - read_settings.http_settings.retry_initial_backoff_ms = 500; - read_settings.http_settings.retry_max_backoff_ms = 1000; + read_settings.http_max_tries = 1; + read_settings.http_retry_initial_backoff_ms = 500; + read_settings.http_retry_max_backoff_ms = 1000; Poco::URI uri(endpoint_uri); uri.addQueryParameter("operation", "start"); @@ -99,16 +99,16 @@ DistributedQueryTaskStatus getTaskStatus(const String & endpoint_uri, const Stri /// server-side wait plus a small network margin, and a failed poll must not retry. timeouts.send_timeout = Poco::Timespan(2 * 1000 * 1000); timeouts.receive_timeout = Poco::Timespan((wait_for_ms + 2000) * 1000); - read_settings.http_settings.max_tries = 1; + read_settings.http_max_tries = 1; } else { timeouts.send_timeout = Poco::Timespan(100 * 1000 * 1000); timeouts.receive_timeout = Poco::Timespan(100 * 1000 * 1000); /// Safe to retry: read-only. - read_settings.http_settings.max_tries = 3; - read_settings.http_settings.retry_initial_backoff_ms = 200; - read_settings.http_settings.retry_max_backoff_ms = 1000; + read_settings.http_max_tries = 3; + read_settings.http_retry_initial_backoff_ms = 200; + read_settings.http_retry_max_backoff_ms = 1000; } Poco::URI uri(endpoint_uri); @@ -150,9 +150,9 @@ void cancelTask(const String & endpoint_uri, const String & task_id, const Conte timeouts.receive_timeout = Poco::Timespan(5 * 1000 * 1000); ReadSettings read_settings; /// Safe to retry: idempotent. - read_settings.http_settings.max_tries = 3; - read_settings.http_settings.retry_initial_backoff_ms = 200; - read_settings.http_settings.retry_max_backoff_ms = 1000; + read_settings.http_max_tries = 3; + read_settings.http_retry_initial_backoff_ms = 200; + read_settings.http_retry_max_backoff_ms = 1000; Poco::URI uri(endpoint_uri); uri.addQueryParameter("operation", "cancel"); @@ -187,9 +187,9 @@ void forgetTask(const String & endpoint_uri, const String & task_id, const Conte timeouts.receive_timeout = Poco::Timespan(100 * 1000 * 1000); ReadSettings read_settings; /// Safe to retry: idempotent. - read_settings.http_settings.max_tries = 3; - read_settings.http_settings.retry_initial_backoff_ms = 200; - read_settings.http_settings.retry_max_backoff_ms = 1000; + read_settings.http_max_tries = 3; + read_settings.http_retry_initial_backoff_ms = 200; + read_settings.http_retry_max_backoff_ms = 1000; Poco::URI uri(endpoint_uri); uri.addQueryParameter("operation", "forget");