Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7889,6 +7889,9 @@ Multiple algorithms can be specified, e.g. 'dpsize,greedy'.
)", EXPERIMENTAL) \
DECLARE(Bool, allow_experimental_database_paimon_rest_catalog, false, R"(
Allow experimental database engine DataLakeCatalog with catalog_type = 'paimon_rest'
)", EXPERIMENTAL) \
DECLARE(Bool, allow_experimental_database_s3_tables, false, R"(
Allow experimental database engine DataLakeCatalog with catalog_type = 's3tables' (Amazon S3 Tables Iceberg REST with SigV4)
)", EXPERIMENTAL) \
DECLARE(UInt64, webassembly_udf_max_fuel, 100'000, R"(
Fuel limit per WebAssembly UDF instance execution. Each WebAssembly instruction consumes some amount of fuel.
Expand Down
1 change: 1 addition & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"optimize_read_in_window_order", true, false, "Disable this logic by default."},
{"correlated_subqueries_use_in_memory_buffer", false, true, "Use in-memory buffer for input of correlated subqueries by default."},
{"allow_experimental_database_paimon_rest_catalog", false, false, "New setting"},
{"allow_experimental_database_s3_tables", false, false, "New setting"},
{"allow_experimental_object_storage_queue_hive_partitioning", false, false, "New setting."},
{"type_json_use_partial_match_to_skip_paths_by_regexp", false, true, "Add new setting that allows to use partial match in regexp paths skip in JSON type parsing"},
{"max_insert_block_size_bytes", 0, 0, "New setting that allows to control the size of blocks in bytes during parsing of data in Row Input Format."},
Expand Down
3 changes: 2 additions & 1 deletion src/Core/SettingsEnums.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,8 @@ IMPLEMENT_SETTING_ENUM(
{"hive", DatabaseDataLakeCatalogType::ICEBERG_HIVE},
{"onelake", DatabaseDataLakeCatalogType::ICEBERG_ONELAKE},
{"biglake", DatabaseDataLakeCatalogType::ICEBERG_BIGLAKE},
{"paimon_rest", DatabaseDataLakeCatalogType::PAIMON_REST}})
{"paimon_rest", DatabaseDataLakeCatalogType::PAIMON_REST},
{"s3tables", DatabaseDataLakeCatalogType::S3_TABLES}})

IMPLEMENT_SETTING_ENUM(
FileCachePolicy,
Expand Down
1 change: 1 addition & 0 deletions src/Core/SettingsEnums.h
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ enum class DatabaseDataLakeCatalogType : uint8_t
ICEBERG_ONELAKE,
ICEBERG_BIGLAKE,
PAIMON_REST,
S3_TABLES,
};

DECLARE_SETTING_ENUM(DatabaseDataLakeCatalogType)
Expand Down
110 changes: 110 additions & 0 deletions src/Databases/DataLake/AWSV4Signer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
#include "config.h"

#if USE_AVRO && USE_SSL && USE_AWS_S3

#include <Databases/DataLake/AWSV4Signer.h>

#include <Common/Exception.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/String.h>

#include <aws/core/auth/signer/AWSAuthV4Signer.h>
#include <aws/core/http/standard/StandardHttpRequest.h>
#include <aws/core/http/URI.h>
#include <aws/core/utils/memory/AWSMemory.h>

#include <sstream>
#include <utility>

namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int S3_ERROR;
}
}

namespace DataLake
{
namespace
{

/// Translate a Poco HTTP method name (e.g. "GET") into the AWS SDK enum.
/// Throws BAD_ARGUMENTS for any method the signing path does not support.
Aws::Http::HttpMethod mapPocoMethodToAws(const String & method)
{
    using Aws::Http::HttpMethod;
    using Poco::Net::HTTPRequest;

    if (method == HTTPRequest::HTTP_GET)
        return HttpMethod::HTTP_GET;
    if (method == HTTPRequest::HTTP_POST)
        return HttpMethod::HTTP_POST;
    if (method == HTTPRequest::HTTP_PUT)
        return HttpMethod::HTTP_PUT;
    if (method == HTTPRequest::HTTP_DELETE)
        return HttpMethod::HTTP_DELETE;
    if (method == HTTPRequest::HTTP_HEAD)
        return HttpMethod::HTTP_HEAD;
    if (method == HTTPRequest::HTTP_PATCH)
        return HttpMethod::HTTP_PATCH;

    throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unsupported HTTP method for AWS SigV4 signing: {}", method);
}

}

/// Sign a Poco-style request with AWS SigV4: build a throwaway AWS-SDK request
/// mirroring it, let the SDK signer produce the signature, then copy the signed
/// headers back for the Poco-based HTTP client.
void signRequestWithAWSV4(
    const String & method,
    const Poco::URI & uri,
    const DB::HTTPHeaderEntries & extra_headers,
    const String & payload,
    Aws::Client::AWSAuthV4Signer & signer,
    const String & region,
    const String & service,
    DB::HTTPHeaderEntries & out_headers)
{
    const Aws::Http::URI aws_uri(uri.toString().c_str());
    Aws::Http::Standard::StandardHttpRequest aws_request(aws_uri, mapPocoMethodToAws(method));

    /// Forward caller-supplied headers, except any pre-existing Authorization
    /// header: the signer must generate that one itself.
    for (const auto & header : extra_headers)
    {
        if (Poco::icompare(header.name, "authorization") == 0)
            continue;
        aws_request.SetHeaderValue(
            Aws::String(header.name.c_str(), header.name.size()),
            Aws::String(header.value.c_str(), header.value.size()));
    }

    /// The body participates in the SigV4 payload hash, so attach it before signing.
    if (!payload.empty())
    {
        auto body = Aws::MakeShared<std::stringstream>("AWSV4Signer");
        body->write(payload.data(), static_cast<std::streamsize>(payload.size()));
        body->seekg(0);
        aws_request.AddContentBody(body);
    }

    static constexpr bool sign_body = true;
    if (!signer.SignRequest(aws_request, region.c_str(), service.c_str(), sign_body))
        throw DB::Exception(DB::ErrorCodes::S3_ERROR, "AWS SigV4 signing failed");

    /// Collect the signed headers in a single pass into a local buffer so that
    /// out_headers stays untouched if we throw below. Host is dropped because
    /// ReadWriteBufferFromHTTP derives it from the URI.
    DB::HTTPHeaderEntries signed_headers;
    bool has_authorization = false;
    for (const auto & [name, value] : aws_request.GetHeaders())
    {
        if (Poco::icompare(name, "authorization") == 0 && !value.empty())
            has_authorization = true;
        if (Poco::icompare(name, "host") == 0)
            continue;
        signed_headers.emplace_back(String(name.c_str(), name.size()), String(value.c_str(), value.size()));
    }

    /// SignRequest can "succeed" without producing an Authorization header when
    /// no usable credentials were resolved — treat that as a configuration error.
    if (!has_authorization)
        throw DB::Exception(
            DB::ErrorCodes::BAD_ARGUMENTS,
            "AWS credentials are missing or incomplete; cannot sign S3 Tables REST request");

    out_headers = std::move(signed_headers);
}

}

#endif
34 changes: 34 additions & 0 deletions src/Databases/DataLake/AWSV4Signer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#pragma once

#include "config.h"

#if USE_AVRO && USE_SSL && USE_AWS_S3

#include <Core/Types.h>
#include <IO/HTTPHeaderEntries.h>
#include <Poco/URI.h>

namespace Aws::Client
{
class AWSAuthV4Signer;
}

namespace DataLake
{

/// Sign a Poco-style HTTP request using the AWS SDK's AWSAuthV4Signer.
/// Builds a temporary Aws::Http::StandardHttpRequest, signs it, then extracts
/// the resulting headers into out_headers (excluding Host; ReadWriteBufferFromHTTP sets it from the URI).
///
/// method        - HTTP method name (e.g. "GET", "POST"); unsupported methods throw BAD_ARGUMENTS.
/// uri           - full request URI participating in the canonical request.
/// extra_headers - caller-provided headers to include in the signature (a pre-existing
///                 Authorization header is ignored so the signer produces its own).
/// payload       - request body; included in the SigV4 payload hash.
/// signer        - pre-configured AWS SDK signer (holds the credentials provider).
/// region        - AWS region used for the credential scope.
/// service       - AWS service name used for the credential scope.
/// out_headers   - receives the full signed header set; throws if signing fails or
///                 no Authorization header was produced (missing credentials).
void signRequestWithAWSV4(
    const String & method,
    const Poco::URI & uri,
    const DB::HTTPHeaderEntries & extra_headers,
    const String & payload,
    Aws::Client::AWSAuthV4Signer & signer,
    const String & region,
    const String & service,
    DB::HTTPHeaderEntries & out_headers);

}

#endif
54 changes: 51 additions & 3 deletions src/Databases/DataLake/DatabaseDataLake.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
#include <Databases/DataLake/RestCatalog.h>
#include <Databases/DataLake/GlueCatalog.h>
#include <Databases/DataLake/PaimonRestCatalog.h>
#if USE_AWS_S3 && USE_SSL
#include <Databases/DataLake/S3TablesCatalog.h>
#endif
#include <DataTypes/DataTypeString.h>

#include <Storages/ObjectStorage/S3/Configuration.h>
Expand Down Expand Up @@ -91,6 +94,7 @@ namespace Setting
extern const SettingsBool allow_experimental_database_glue_catalog;
extern const SettingsBool allow_experimental_database_hms_catalog;
extern const SettingsBool allow_experimental_database_paimon_rest_catalog;
extern const SettingsBool allow_experimental_database_s3_tables;
extern const SettingsBool use_hive_partitioning;
extern const SettingsBool parallel_replicas_for_cluster_engines;
extern const SettingsString cluster_for_parallel_replicas;
Expand Down Expand Up @@ -143,8 +147,20 @@ void DatabaseDataLake::validateSettings()
{
if (settings[DatabaseDataLakeSetting::region].value.empty())
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "`region` setting cannot be empty for Glue Catalog. "
ErrorCodes::BAD_ARGUMENTS, "`region` setting cannot be empty for Glue catalog. "
"Please specify 'SETTINGS region=<region_name>' in the CREATE DATABASE query");
}
else if (settings[DatabaseDataLakeSetting::catalog_type].value == DB::DatabaseDataLakeCatalogType::S3_TABLES)
{
if (settings[DatabaseDataLakeSetting::region].value.empty())
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "`region` setting cannot be empty for S3 Tables catalog. "
"Please specify 'SETTINGS region=<region_name>' in the CREATE DATABASE query");

if (settings[DatabaseDataLakeSetting::warehouse].value.empty())
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "`warehouse` setting cannot be empty for S3 Tables catalog. "
"Please specify 'SETTINGS warehouse=<table_bucket_arn>' in the CREATE DATABASE query");
}
else if (settings[DatabaseDataLakeSetting::warehouse].value.empty())
{
Expand Down Expand Up @@ -299,6 +315,23 @@ std::shared_ptr<DataLake::ICatalog> DatabaseDataLake::getCatalog() const
}
break;
}
case DB::DatabaseDataLakeCatalogType::S3_TABLES:
{
#if USE_AWS_S3 && USE_SSL
catalog_impl = std::make_shared<DataLake::S3TablesCatalog>(
settings[DatabaseDataLakeSetting::warehouse].value,
url,
settings[DatabaseDataLakeSetting::region].value,
catalog_parameters,
settings[DatabaseDataLakeSetting::namespaces].value,
Context::getGlobalContextInstance());
#else
throw Exception(
ErrorCodes::SUPPORT_IS_DISABLED,
"Amazon S3 Tables catalog requires ClickHouse built with USE_AWS_S3 and USE_SSL");
#endif
break;
}
}
return catalog_impl;
}
Expand Down Expand Up @@ -331,6 +364,7 @@ StorageObjectStorageConfigurationPtr DatabaseDataLake::getConfiguration(
case DatabaseDataLakeCatalogType::ICEBERG_HIVE:
case DatabaseDataLakeCatalogType::ICEBERG_REST:
case DatabaseDataLakeCatalogType::ICEBERG_BIGLAKE:
case DatabaseDataLakeCatalogType::S3_TABLES:
{
switch (type)
{
Expand Down Expand Up @@ -951,9 +985,10 @@ void registerDatabaseDataLake(DatabaseFactory & factory)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", database_engine_name);
}

if (database_engine_name == "Iceberg" && catalog_type != DatabaseDataLakeCatalogType::ICEBERG_REST)
if (database_engine_name == "Iceberg" && catalog_type != DatabaseDataLakeCatalogType::ICEBERG_REST
&& catalog_type != DatabaseDataLakeCatalogType::S3_TABLES)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `Iceberg` must have `rest` catalog type only");
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `Iceberg` must use `rest` or `s3tables` catalog type only");
}

for (auto & engine_arg : engine_args)
Expand Down Expand Up @@ -1039,6 +1074,19 @@ void registerDatabaseDataLake(DatabaseFactory & factory)
engine_func->name = "Paimon";
break;
}
case DatabaseDataLakeCatalogType::S3_TABLES:
{
if (!args.create_query.attach
&& !args.context->getSettingsRef()[Setting::allow_experimental_database_s3_tables])
{
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,
"DatabaseDataLake with S3 Tables catalog is experimental. "
"To allow its usage, enable setting allow_experimental_database_s3_tables");
}

engine_func->name = "Iceberg";
break;
}
case DatabaseDataLakeCatalogType::NONE:
break;
}
Expand Down
5 changes: 5 additions & 0 deletions src/Databases/DataLake/ICatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,11 @@ bool TableMetadata::hasStorageCredentials() const
return storage_credentials != nullptr;
}

/// Returns true when catalog-specific metadata was attached to this table
/// (data_lake_specific_metadata appears to be an optional-like member — it is
/// queried via has_value(); confirm the exact type in the class declaration).
bool TableMetadata::hasDataLakeSpecificProperties() const
{
return data_lake_specific_metadata.has_value();
}

std::string TableMetadata::getMetadataLocation(const std::string & iceberg_metadata_file_location) const
{
std::string metadata_location = iceberg_metadata_file_location;
Expand Down
35 changes: 25 additions & 10 deletions src/Databases/DataLake/RestCatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,12 @@ void RestCatalog::parseCatalogConfigurationSettings(const Poco::JSON::Object::Pt
result.default_base_location = object->get("default-base-location").extract<String>();
}

DB::HTTPHeaderEntries RestCatalog::getAuthHeaders(bool update_token) const
DB::HTTPHeaderEntries RestCatalog::getAuthHeaders(
bool update_token,
const String & /*method*/,
const Poco::URI & /*url*/,
const DB::HTTPHeaderEntries & /*extra_headers*/,
const String & /*body*/) const
{
/// Option 1: user specified auth header manually.
/// Header has format: 'Authorization: <scheme> <token>'.
Expand Down Expand Up @@ -387,7 +392,12 @@ BigLakeCatalog::BigLakeCatalog(
config = loadConfig();
}

DB::HTTPHeaderEntries BigLakeCatalog::getAuthHeaders(bool update_token) const
DB::HTTPHeaderEntries BigLakeCatalog::getAuthHeaders(
bool update_token,
const String & /*method*/,
const Poco::URI & /*url*/,
const DB::HTTPHeaderEntries & /*extra_headers*/,
const String & /*body*/) const
{
/// Google Cloud OAuth2 for BigLake.
/// Uses GCP metadata service or Application Default Credentials to get access token.
Expand Down Expand Up @@ -542,7 +552,7 @@ DB::ReadWriteBufferFromHTTPPtr RestCatalog::createReadBuffer(

auto create_buffer = [&](bool update_token)
{
auto result_headers = getAuthHeaders(update_token);
auto result_headers = getAuthHeaders(update_token, Poco::Net::HTTPRequest::HTTP_GET, url, headers, {});
std::move(headers.begin(), headers.end(), std::back_inserter(result_headers));

return DB::BuilderRWBufferFromHTTP(url)
Expand Down Expand Up @@ -978,9 +988,6 @@ void RestCatalog::sendRequest(const String & endpoint, Poco::JSON::Object::Ptr r
request_body->stringify(oss);
const std::string body_str = DB::removeEscapedSlashes(oss.str());

DB::HTTPHeaderEntries headers = getAuthHeaders(/* update_token = */ true);
headers.emplace_back("Content-Type", "application/json");

const auto & context = getContext();

DB::ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback;
Expand All @@ -994,6 +1001,12 @@ void RestCatalog::sendRequest(const String & endpoint, Poco::JSON::Object::Ptr r

/// enable_url_encoding=false to allow use tables with encoded sequences in names like 'foo%2Fbar'
Poco::URI url(endpoint, /* enable_url_encoding */ false);

DB::HTTPHeaderEntries extra_headers;
extra_headers.emplace_back("Content-Type", "application/json");

DB::HTTPHeaderEntries headers = getAuthHeaders(/* update_token = */ true, method, url, extra_headers, body_str);
headers.emplace_back("Content-Type", "application/json");
auto wb = DB::BuilderRWBufferFromHTTP(url)
.withConnectionGroup(DB::HTTPConnectionGroupType::HTTP)
.withMethod(method)
Expand All @@ -1014,7 +1027,7 @@ void RestCatalog::sendRequest(const String & endpoint, Poco::JSON::Object::Ptr r

void RestCatalog::createNamespaceIfNotExists(const String & namespace_name, const String & location) const
{
const std::string endpoint = fmt::format("{}/namespaces", base_url);
const std::string endpoint = base_url / config.prefix / "namespaces";

Poco::JSON::Object::Ptr request_body = new Poco::JSON::Object;
{
Expand Down Expand Up @@ -1046,7 +1059,7 @@ void RestCatalog::createTable(const String & namespace_name, const String & tabl

createNamespaceIfNotExists(namespace_name, metadata_content->getValue<String>("location"));

const std::string endpoint = fmt::format("{}/namespaces/{}/tables", base_url, namespace_name);
const std::string endpoint = base_url / config.prefix / "namespaces" / namespace_name / "tables";

Poco::JSON::Object::Ptr request_body = new Poco::JSON::Object;
request_body->set("name", table_name);
Expand Down Expand Up @@ -1083,7 +1096,7 @@ void RestCatalog::createTable(const String & namespace_name, const String & tabl

bool RestCatalog::updateMetadata(const String & namespace_name, const String & table_name, const String & /*new_metadata_path*/, Poco::JSON::Object::Ptr new_snapshot) const
{
const std::string endpoint = fmt::format("{}/namespaces/{}/tables/{}", base_url, namespace_name, table_name);
const std::string endpoint = base_url / config.prefix / "namespaces" / namespace_name / "tables" / table_name;

Poco::JSON::Object::Ptr request_body = new Poco::JSON::Object;
{
Expand Down Expand Up @@ -1153,7 +1166,9 @@ void RestCatalog::dropTable(const String & namespace_name, const String & table_
"Failed to drop table {}, namespace {} is filtered by `namespaces` database parameter",
table_name, namespace_name);

const std::string endpoint = fmt::format("{}/namespaces/{}/tables/{}?purgeRequested=False", base_url, namespace_name, table_name);
const std::string endpoint
= (base_url / config.prefix / "namespaces" / namespace_name / "tables" / table_name).string()
+ "?purgeRequested=False";

Poco::JSON::Object::Ptr request_body = nullptr;
try
Expand Down
Loading
Loading