Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions programs/server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,11 @@
#include <Compression/CompressionCodecEncrypted.h>
#include <Parsers/ASTAlterQuery.h>
#include <Server/CloudPlacementInfo.h>
#include <Server/DistributedQuery/ExchangeConnections.h>
#include <Server/DistributedQuery/ExchangeServer.h>
#include <Server/HTTP/HTTPServer.h>
#include <Server/HTTP/HTTPServerConnectionFactory.h>
#include <Server/StatelessWorker/StatelessWorkerEndpoint.h>
#include <Server/MySQLHandlerFactory.h>
#include <Server/PostgreSQLHandlerFactory.h>
#include <Server/ProtocolServerAdapter.h>
Expand Down Expand Up @@ -1938,6 +1941,82 @@ try
LOG_DEBUG(log, "Initializing interserver credentials.");
global_context->updateInterserverCredentials(config());

std::shared_ptr<StatelessWorkerEndpoint> stateless_worker_endpoint_ptr{nullptr};
String stateless_worker_endpoint_name;
if (config().getBool("stateless_worker_server.enabled", false))
{
String stateless_worker_endpoint = config().getString("stateless_worker_server.endpoint", "localhost");
stateless_worker_endpoint_ptr = std::make_shared<StatelessWorkerEndpoint>();
stateless_worker_endpoint_name = stateless_worker_endpoint_ptr->getId(stateless_worker_endpoint);
global_context->getInterserverIOHandler().addEndpoint(stateless_worker_endpoint_name, stateless_worker_endpoint_ptr);
LOG_DEBUG(log, "Added stateless worker endpoint '{}'.", stateless_worker_endpoint_name);
}

SCOPE_EXIT({
if (stateless_worker_endpoint_ptr)
{
/// Remove the same endpoint that was registered (the configured name may differ from "localhost").
LOG_DEBUG(log, "Shutting down stateless worker endpoint '{}'.", stateless_worker_endpoint_name);
global_context->getInterserverIOHandler().removeEndpointIfExists(stateless_worker_endpoint_name);

stateless_worker_endpoint_ptr->blocker.cancelForever();
stateless_worker_endpoint_ptr->shutdown();
/// Acquire the lock to wait for all in-flight requests to finish.
std::lock_guard lock(stateless_worker_endpoint_ptr->rwlock);
}
stateless_worker_endpoint_ptr.reset();
});

#ifdef OS_LINUX
ExchangeConnectionsPtr exchange_connections_ptr = ExchangeConnections::instance();
std::vector<std::shared_ptr<ExchangeServer>> exchange_servers;
if (auto streaming_exchange_port = config().getUInt("distributed_query.streaming_exchange_port", 0))
{
if (streaming_exchange_port > 65535)
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER,
"`distributed_query.streaming_exchange_port` must be in range 1..65535, got {}", streaming_exchange_port);

/// The exchange handshake is unauthenticated, so the listener is never bound to all interfaces
/// implicitly: the streaming exchange is enabled only when explicit listen host(s) are given.
Strings exchange_listen_hosts = DB::getMultipleValuesFromConfig(config(), "distributed_query", "streaming_exchange_listen_host");
if (exchange_listen_hosts.empty())
{
LOG_ERROR(log, "`distributed_query.streaming_exchange_port` is set but no "
"`distributed_query.streaming_exchange_listen_host` is configured; the streaming exchange "
"server is not started. Specify a listen host to enable it.");
}
else
{
for (const auto & listen_host : exchange_listen_hosts)
{
try
{
exchange_servers.emplace_back(std::make_shared<ExchangeServer>(listen_host, streaming_exchange_port, exchange_connections_ptr));
exchange_servers.back()->start();
}
catch (Poco::Exception & e)
{
LOG_INFO(log, "Failed to start exchange server on {}:{}: {}",
listen_host, streaming_exchange_port, e.displayText());
}
}
if (exchange_servers.empty())
throw Exception(ErrorCodes::NETWORK_ERROR, "Failed to start ExchangeServer on port {}", streaming_exchange_port);
}
}

SCOPE_EXIT({
for (auto & exchange_server_ptr : exchange_servers)
{
exchange_server_ptr->stop();
exchange_server_ptr.reset();
}
});
#else
if (config().getUInt("distributed_query.streaming_exchange_port", 0))
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "ExchangeServer is not supported on non-linux platform");
#endif

/// Set up caches.

const size_t max_cache_size = static_cast<size_t>(static_cast<double>(physical_server_memory) * server_settings[ServerSetting::cache_size_to_ram_max_ratio]);
Expand Down
19 changes: 19 additions & 0 deletions programs/server/config.xml
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,25 @@
<password></password>
</interserver_http_credentials>-->

<!-- Multi-stage distributed query execution (experimental, enabled per query with the
`make_distributed_plan` setting). When a query uses streaming exchanges, workers connect
directly to each other on this port to pass intermediate results.

Security: the streaming exchange handshake is NOT authenticated. The port must be treated
like the interserver port: reachable only from trusted ClickHouse nodes in the same cluster,
never from untrusted networks. For that reason the listener is started only when an explicit
`streaming_exchange_listen_host` is configured; setting only the port logs an error and does
not start the listener. Bind it to a private interface and protect it with a firewall.
Exchanges are keyed by a random per-query identifier, which makes blind injection into a
running query impractical, but this is defense in depth, not an access-control mechanism.
-->
<!--
<distributed_query>
<streaming_exchange_port>9223</streaming_exchange_port>
<streaming_exchange_listen_host>127.0.0.1</streaming_exchange_listen_host>
</distributed_query>
-->

<!-- Listen specified address.
Use :: (wildcard IPv6 address), if you want to accept connections both with IPv4 and IPv6 from everywhere.
Notes:
Expand Down
16 changes: 16 additions & 0 deletions src/Analyzer/TableExpressionModifiers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

#include <Common/SipHash.h>

#include <IO/ReadBuffer.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>
Expand Down Expand Up @@ -40,6 +42,20 @@ void TableExpressionModifiers::updateTreeHash(SipHash & hash_state) const
}
}

void serializeRational(TableExpressionModifiers::Rational val, WriteBuffer & out)
{
writeIntBinary(val.numerator, out);
writeIntBinary(val.denominator, out);
}

TableExpressionModifiers::Rational deserializeRational(ReadBuffer & in)
{
TableExpressionModifiers::Rational val;
readIntBinary(val.numerator, in);
readIntBinary(val.denominator, in);
return val;
}

String TableExpressionModifiers::formatForErrorMessage() const
{
WriteBufferFromOwnString buffer;
Expand Down
6 changes: 6 additions & 0 deletions src/Analyzer/TableExpressionModifiers.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
namespace DB
{

class ReadBuffer;
class WriteBuffer;

/** Modifiers that can be used for table, table function and subquery in JOIN TREE.
*
* Example: SELECT * FROM test_table SAMPLE 0.1 OFFSET 0.1 FINAL
Expand Down Expand Up @@ -74,6 +77,9 @@ class TableExpressionModifiers
std::optional<Rational> sample_offset_ratio;
};

void serializeRational(TableExpressionModifiers::Rational val, WriteBuffer & out);
TableExpressionModifiers::Rational deserializeRational(ReadBuffer & in);

inline bool operator==(const TableExpressionModifiers & lhs, const TableExpressionModifiers & rhs)
{
return lhs.hasFinal() == rhs.hasFinal() && lhs.getSampleSizeRatio() == rhs.getSampleSizeRatio() && lhs.getSampleOffsetRatio() == rhs.getSampleOffsetRatio();
Expand Down
2 changes: 2 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,8 @@ if (TARGET ch_contrib::ssh)
add_object_library(clickhouse_server_ssh Server/SSH)
endif()
add_object_library(clickhouse_server_embedded_client Server/ClientEmbedded)
add_object_library(clickhouse_server_statelessworker Server/StatelessWorker)
add_object_library(clickhouse_server_distributedquery Server/DistributedQuery)
add_object_library(clickhouse_formats Formats)
add_object_library(clickhouse_processors Processors)
add_object_library(clickhouse_processors_executors Processors/Executors)
Expand Down
2 changes: 1 addition & 1 deletion src/Client/BuzzHouse/Generator/SessionSettings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ std::unordered_map<String, CHSetting> serverSettings = {
{"distributed_insert_skip_read_only_replicas", trueOrFalseSettingNoOracle},
{"distributed_plan_default_reader_bucket_count",
CHSetting(
[](RandomGenerator & rg, FuzzConfig &) { return std::to_string(rg.thresholdGenerator<uint64_t>(0.2, 0.2, 0, 128)); }, {}, false)},
[](RandomGenerator & rg, FuzzConfig &) { return std::to_string(rg.thresholdGenerator<uint64_t>(0.2, 0.2, 1, 128)); }, {}, false)},
{"distributed_plan_default_shuffle_join_bucket_count",
CHSetting(
[](RandomGenerator & rg, FuzzConfig &) { return std::to_string(rg.thresholdGenerator<uint64_t>(0.2, 0.2, 0, 128)); }, {}, false)},
Expand Down
3 changes: 3 additions & 0 deletions src/Common/CurrentMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,9 @@
M(StatelessWorkerThreads, "Number of threads in the stateless worker thread pool.") \
M(StatelessWorkerThreadsActive, "Number of threads in the stateless worker thread pool running a task.") \
M(StatelessWorkerThreadsScheduled, "Number of queued or active jobs in the stateless worker thread pool.") \
M(ExchangeServerThreads, "Number of threads in the distributed exchange server handshake thread pool.") \
M(ExchangeServerThreadsActive, "Number of threads in the distributed exchange server handshake thread pool running a task.") \
M(ExchangeServerThreadsScheduled, "Number of queued or active jobs in the distributed exchange server handshake thread pool.") \
M(ReadonlyDisks, "Number of disks that were marked as readonly during disk check.") \
M(BrokenDisks, "Number of disks disks that were marked as broken during disk check.") \
M(TaskTrackerThreads, "Number of threads used by the distributed query remote task tracker.") \
Expand Down
1 change: 1 addition & 0 deletions src/Common/setThreadName.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ namespace DB
M(DISTRIBUTED_SCHEDULE_POOL, "BgDistSchPool") \
M(DISTRIBUTED_SINK, "DistrOutStrProc") \
M(DISTRIBUTED_INDEX_ANALYSIS, "DistIdxAnalysis") \
M(DISTRIBUTED_QUERY_TASK, "DistQueryTask") \
M(DROP_TABLES, "DropTables") \
M(DWARF_DECODER, "DWARFDecoder") \
M(ERROR_LOG, "ErrorLog") \
Expand Down
4 changes: 3 additions & 1 deletion src/Core/ProtocolDefines.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ static constexpr auto DBMS_MIN_REVISION_WITH_QUERY_AND_LINE_NUMBERS = 54475;

static constexpr auto DBMS_MERGE_TREE_PART_INFO_VERSION = 1;

static constexpr auto DBMS_QUERY_PLAN_SERIALIZATION_VERSION = 0;
static constexpr auto DBMS_QUERY_PLAN_SERIALIZATION_VERSION = 1;
/// Version 1 added the initiator's settings changes to the task.
static constexpr auto DBMS_DISTRIBUTED_TASK_SERIALIZATION_VERSION = 1;

static constexpr auto DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET = 54441;

Expand Down
2 changes: 1 addition & 1 deletion src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7803,7 +7803,7 @@ Run all tasks of a distributed query plan locally. Useful for testing and debugg
DECLARE(NonZeroUInt64, distributed_plan_default_shuffle_join_bucket_count, 8, R"(
Default number of buckets for distributed shuffle-hash-join.
)", EXPERIMENTAL) \
DECLARE(UInt64, distributed_plan_default_reader_bucket_count, 8, R"(
DECLARE(NonZeroUInt64, distributed_plan_default_reader_bucket_count, 8, R"(
Default number of tasks for parallel reading in distributed query. Tasks are spread across between replicas.
)", EXPERIMENTAL) \
DECLARE(Bool, distributed_plan_optimize_exchanges, true, R"(
Expand Down
26 changes: 26 additions & 0 deletions src/Processors/QueryPlan/BroadcastExchangeStep.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#include <Processors/QueryPlan/BroadcastExchangeStep.h>
#include <Processors/QueryPlan/BroadcastSendStep.h>
#include <Processors/QueryPlan/BroadcastReceiveStep.h>

namespace DB
{

namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}

std::pair<QueryPlanStepPtr, QueryPlanStepPtr> BroadcastExchangeStep::createSinkAndSourcePair(const String & exchange_id, const Strings & source_shards) const
{
if (source_shards.size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "BroadcastExchangeStep should have one source shard, got {}", source_shards.size());

size_t num_buckets = getResultBucketCount();
auto sink = std::make_unique<BroadcastSendStep>(input_headers.front(), exchange_id, num_buckets);

auto source = std::make_unique<BroadcastReceiveStep>(output_header, exchange_id, source_shards);

return {std::move(sink), std::move(source)};
}

}
46 changes: 46 additions & 0 deletions src/Processors/QueryPlan/BroadcastExchangeStep.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#pragma once

#include <Processors/QueryPlan/LogicalExchangeStep.h>

namespace DB
{

/// Copies the data from 1 logical streams into N logical streams.
class BroadcastExchangeStep final : public LogicalExchangeStep
{
public:
BroadcastExchangeStep(SharedHeader input_header_, size_t result_bucket_count_)
: LogicalExchangeStep(input_header_)
, result_bucket_count(result_bucket_count_)
{
}

String getName() const override { return "BroadcastExchange"; }

void transformPipeline(QueryPipelineBuilder & /*pipeline*/, const BuildQueryPipelineSettings &) override
{
/// Doesn't change the pipeline if executed directly
}

size_t getSourceBucketCount() const override
{
return 1;
}

size_t getResultBucketCount() const override
{
return result_bucket_count;
}

std::pair<QueryPlanStepPtr, QueryPlanStepPtr> createSinkAndSourcePair(const String & exchange_id, const Strings & source_shards) const override;

private:
void updateOutputHeader() override
{
output_header = input_headers.front();
}

const size_t result_bucket_count;
};

}
65 changes: 65 additions & 0 deletions src/Processors/QueryPlan/BroadcastReceiveStep.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#include <Processors/QueryPlan/BroadcastReceiveStep.h>
#include <Processors/QueryPlan/QueryPlanStepRegistry.h>
#include <Processors/QueryPlan/Serialization.h>
#include <Processors/QueryPlan/IParameterLookup.h>
#include <Processors/QueryPlan/ExchangeLookup.h>
#include <Processors/QueryPlan/LogicalExchangeStep.h>
#include <Processors/ISource.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <QueryPipeline/Pipe.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>


namespace DB
{

void BroadcastReceiveStep::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings)
{
const String bucket_id = settings.parameter_lookup->getParameter("bucket_id").safeGet<String>();

std::vector<std::unique_ptr<QueryPipelineBuilder>> pipelines;

/// Read all shards
for (const String & shard_id : source_shards)
{
std::unique_ptr<QueryPipelineBuilder> pipeline_ptr = std::make_unique<QueryPipelineBuilder>();
pipeline_ptr->init(Pipe(settings.exchange_lookup->createSource(output_header, ExchangeStreamId(exchange_id, shard_id, bucket_id))));
pipelines.emplace_back(std::move(pipeline_ptr));
}

pipeline = QueryPipelineBuilder::unitePipelines(std::move(pipelines), 0, &processors);
}

void BroadcastReceiveStep::serialize(Serialization & ctx) const
{
writeStringBinary(exchange_id, ctx.out);
writeVarUInt(source_shards.size(), ctx.out);
for (const String & shard_id : source_shards)
writeStringBinary(shard_id, ctx.out);
}

std::unique_ptr<IQueryPlanStep> BroadcastReceiveStep::deserialize(Deserialization & ctx)
{
String exchange_id;
readStringBinary(exchange_id, ctx.in);
size_t shard_id_count = 0;
readVarUInt(shard_id_count, ctx.in);
Strings list_of_shard_ids;
list_of_shard_ids.reserve(shard_id_count);
for (size_t i = 0; i < shard_id_count; ++i)
{
String shard_id;
readStringBinary(shard_id, ctx.in);
list_of_shard_ids.push_back(std::move(shard_id));
}
return std::make_unique<BroadcastReceiveStep>(ctx.output_header, exchange_id, list_of_shard_ids);
}

void registerBroadcastReceiveStep(QueryPlanStepRegistry & registry);
void registerBroadcastReceiveStep(QueryPlanStepRegistry & registry)
{
registry.registerStep("BroadcastReceive", BroadcastReceiveStep::deserialize);
}

}
Loading
Loading