diff --git a/src/iceberg/catalog/rest/catalog_properties.cc b/src/iceberg/catalog/rest/catalog_properties.cc index 9f12c492c..0e417e6c3 100644 --- a/src/iceberg/catalog/rest/catalog_properties.cc +++ b/src/iceberg/catalog/rest/catalog_properties.cc @@ -19,6 +19,8 @@ #include "iceberg/catalog/rest/catalog_properties.h" +#include +#include #include namespace iceberg::rest { @@ -47,4 +49,16 @@ Result RestCatalogProperties::Uri() const { return it->second; } +Result RestCatalogProperties::SnapshotLoadingMode() const { + std::string upper = StringUtils::ToUpper(Get(kSnapshotLoadingMode)); + if (upper == "ALL") { + return SnapshotMode::kAll; + } else if (upper == "REFS") { + return SnapshotMode::kRefs; + } else { + return InvalidArgument("Invalid snapshot loading mode: '{}'.", + Get(kSnapshotLoadingMode)); + } +} + } // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/catalog_properties.h b/src/iceberg/catalog/rest/catalog_properties.h index be054dfa1..a00aa87d6 100644 --- a/src/iceberg/catalog/rest/catalog_properties.h +++ b/src/iceberg/catalog/rest/catalog_properties.h @@ -31,6 +31,9 @@ namespace iceberg::rest { +/// \brief Snapshot loading mode for REST catalog. +enum class SnapshotMode : uint8_t { kAll, kRefs }; + /// \brief Configuration class for a REST Catalog. class ICEBERG_REST_EXPORT RestCatalogProperties : public ConfigBase { @@ -46,6 +49,8 @@ class ICEBERG_REST_EXPORT RestCatalogProperties inline static Entry kWarehouse{"warehouse", ""}; /// \brief The optional prefix for REST API paths. inline static Entry kPrefix{"prefix", ""}; + /// \brief The snapshot loading mode (ALL or REFS). + inline static Entry kSnapshotLoadingMode{"snapshot-loading-mode", "ALL"}; /// \brief The prefix for HTTP headers. inline static constexpr std::string_view kHeaderPrefix = "header."; @@ -62,6 +67,12 @@ class ICEBERG_REST_EXPORT RestCatalogProperties /// \brief Get the URI of the REST catalog server. /// \return The URI if configured, or an error if not set or empty. Result Uri() const; + + /// \brief Get the snapshot loading mode. + /// \return SnapshotMode::kAll if configured as "ALL", SnapshotMode::kRefs if + /// "REFS", or an error if the value is invalid. Parsing is + /// case-insensitive to match Java behavior. + Result SnapshotLoadingMode() const; }; } // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/rest_catalog.cc b/src/iceberg/catalog/rest/rest_catalog.cc index caef50411..40e112db7 100644 --- a/src/iceberg/catalog/rest/rest_catalog.cc +++ b/src/iceberg/catalog/rest/rest_catalog.cc @@ -161,13 +161,17 @@ Result> RestCatalog::Make( paths, ResourcePaths::Make(std::string(TrimTrailingSlash(final_uri)), final_config.Get(RestCatalogProperties::kPrefix))); + // Get snapshot loading mode + ICEBERG_ASSIGN_OR_RAISE(auto snapshot_mode, final_config.SnapshotLoadingMode()); + auto client = std::make_unique(final_config.ExtractHeaders()); ICEBERG_ASSIGN_OR_RAISE(auto catalog_session, auth_manager->CatalogSession(*client, final_config.configs())); - return std::shared_ptr(new RestCatalog( - std::move(final_config), std::move(file_io), std::move(client), std::move(paths), - std::move(endpoints), std::move(auth_manager), std::move(catalog_session))); + return std::shared_ptr( + new RestCatalog(std::move(final_config), std::move(file_io), std::move(client), + std::move(paths), std::move(endpoints), std::move(auth_manager), + std::move(catalog_session), snapshot_mode)); } RestCatalog::RestCatalog(RestCatalogProperties config, std::shared_ptr file_io, @@ -175,7 +179,8 @@ RestCatalog::RestCatalog(RestCatalogProperties config, std::shared_ptr f std::unique_ptr paths, std::unordered_set endpoints, std::unique_ptr auth_manager, - std::shared_ptr catalog_session) + std::shared_ptr catalog_session, + SnapshotMode snapshot_mode) : config_(std::move(config)), file_io_(std::move(file_io)), client_(std::move(client)), @@ -183,7 +188,8 @@ RestCatalog::RestCatalog(RestCatalogProperties config, std::shared_ptr f name_(config_.Get(RestCatalogProperties::kName)), supported_endpoints_(std::move(endpoints)), auth_manager_(std::move(auth_manager)), - catalog_session_(std::move(catalog_session)) { + catalog_session_(std::move(catalog_session)), + snapshot_mode_(snapshot_mode) { ICEBERG_DCHECK(catalog_session_ != nullptr, "catalog_session must not be null"); } @@ -442,9 +448,17 @@ Result RestCatalog::LoadTableInternal( const TableIdentifier& identifier) const { ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::LoadTable()); ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier)); + + std::unordered_map params; + if (snapshot_mode_ == SnapshotMode::kRefs) { + params["snapshots"] = "refs"; + } else { + params["snapshots"] = "all"; + } + ICEBERG_ASSIGN_OR_RAISE( const auto response, - client_->Get(path, /*params=*/{}, /*headers=*/{}, *TableErrorHandler::Instance(), + client_->Get(path, params, /*headers=*/{}, *TableErrorHandler::Instance(), *catalog_session_)); return response.body(); } @@ -453,7 +467,6 @@ Result> RestCatalog::LoadTable(const TableIdentifier& ide ICEBERG_ASSIGN_OR_RAISE(const auto body, LoadTableInternal(identifier)); ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(body)); ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json)); - return Table::Make(identifier, std::move(load_result.metadata), std::move(load_result.metadata_location), file_io_, shared_from_this()); diff --git a/src/iceberg/catalog/rest/rest_catalog.h b/src/iceberg/catalog/rest/rest_catalog.h index 5cc61eae2..38230a5e2 100644 --- a/src/iceberg/catalog/rest/rest_catalog.h +++ b/src/iceberg/catalog/rest/rest_catalog.h @@ -109,7 +109,8 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog, std::unique_ptr client, std::unique_ptr paths, std::unordered_set endpoints, std::unique_ptr auth_manager, - std::shared_ptr catalog_session); + std::shared_ptr catalog_session, + SnapshotMode snapshot_mode); Result LoadTableInternal(const TableIdentifier& identifier) const; @@ -127,6 +128,7 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog, std::unordered_set supported_endpoints_; std::unique_ptr auth_manager_; std::shared_ptr catalog_session_; + SnapshotMode snapshot_mode_; }; } // namespace iceberg::rest diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index c4ec29c48..768e0507e 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -225,7 +225,7 @@ if(ICEBERG_BUILD_REST) if(ICEBERG_BUILD_REST_INTEGRATION_TESTS) add_rest_iceberg_test(rest_catalog_integration_test SOURCES - rest_catalog_test.cc + rest_catalog_integration_test.cc util/cmd_util.cc util/docker_compose_util.cc) endif() diff --git a/src/iceberg/test/meson.build b/src/iceberg/test/meson.build index 71ab6942e..df2d5db8e 100644 --- a/src/iceberg/test/meson.build +++ b/src/iceberg/test/meson.build @@ -120,7 +120,7 @@ if get_option('rest').enabled() iceberg_tests += { 'rest_integration_test': { 'sources': files( - 'rest_catalog_test.cc', + 'rest_catalog_integration_test.cc', 'util/cmd_util.cc', 'util/docker_compose_util.cc', ), diff --git a/src/iceberg/test/rest_catalog_integration_test.cc b/src/iceberg/test/rest_catalog_integration_test.cc new file mode 100644 index 000000000..b364ffd36 --- /dev/null +++ b/src/iceberg/test/rest_catalog_integration_test.cc @@ -0,0 +1,479 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "iceberg/catalog/rest/auth/auth_session.h" +#include "iceberg/catalog/rest/catalog_properties.h" +#include "iceberg/catalog/rest/error_handlers.h" +#include "iceberg/catalog/rest/http_client.h" +#include "iceberg/catalog/rest/json_serde_internal.h" +#include "iceberg/catalog/rest/rest_catalog.h" +#include "iceberg/partition_spec.h" +#include "iceberg/result.h" +#include "iceberg/schema.h" +#include "iceberg/sort_order.h" +#include "iceberg/table.h" +#include "iceberg/table_identifier.h" +#include "iceberg/table_requirement.h" +#include "iceberg/table_update.h" +#include "iceberg/test/matchers.h" +#include "iceberg/test/std_io.h" +#include "iceberg/test/test_resource.h" +#include "iceberg/test/util/docker_compose_util.h" +#include "iceberg/transaction.h" + +namespace iceberg::rest { + +namespace { + +constexpr uint16_t kRestCatalogPort = 8181; +constexpr int kMaxRetries = 60; +constexpr int kRetryDelayMs = 1000; + +constexpr std::string_view kDockerProjectName = "iceberg-rest-catalog-service"; +constexpr std::string_view kCatalogName = "test_catalog"; +constexpr std::string_view kWarehouseName = "default"; +constexpr std::string_view kLocalhostUri = "http://localhost"; + +/// \brief Check if a localhost port is ready to accept connections. +bool CheckServiceReady(uint16_t port) { + int sock = socket(AF_INET, SOCK_STREAM, 0); + if (sock < 0) return false; + + struct timeval timeout{ + .tv_sec = 1, + .tv_usec = 0, + }; + setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)); + + sockaddr_in addr{ + .sin_family = AF_INET, + .sin_port = htons(port), + .sin_addr = {.s_addr = htonl(INADDR_LOOPBACK)}, + }; + bool result = + (connect(sock, reinterpret_cast(&addr), sizeof(addr)) == 0); + close(sock); + return result; +} + +std::string CatalogUri() { return std::format("{}:{}", kLocalhostUri, kRestCatalogPort); } + +} // namespace + +/// \brief Integration test fixture for REST catalog with Docker Compose. +class RestCatalogIntegrationTest : public ::testing::Test { + protected: + static void SetUpTestSuite() { + docker_compose_ = std::make_unique( + std::string{kDockerProjectName}, GetResourcePath("iceberg-rest-fixture")); + docker_compose_->Up(); + + std::println("[INFO] Waiting for REST catalog at localhost:{}...", kRestCatalogPort); + for (int i = 0; i < kMaxRetries; ++i) { + if (CheckServiceReady(kRestCatalogPort)) { + std::println("[INFO] REST catalog is ready!"); + return; + } + std::println("[INFO] Retrying... (attempt {}/{})", i + 1, kMaxRetries); + std::this_thread::sleep_for(std::chrono::milliseconds(kRetryDelayMs)); + } + throw std::runtime_error("REST catalog failed to start within timeout"); + } + + static void TearDownTestSuite() { docker_compose_.reset(); } + + /// Create a catalog with default configuration. + Result> CreateCatalog() { + return CreateCatalogWithProperties({}); + } + + /// Create a catalog with additional properties merged on top of defaults. + Result> CreateCatalogWithProperties( + const std::unordered_map& extra) { + auto config = RestCatalogProperties::default_properties(); + config.Set(RestCatalogProperties::kUri, CatalogUri()) + .Set(RestCatalogProperties::kName, std::string(kCatalogName)) + .Set(RestCatalogProperties::kWarehouse, std::string(kWarehouseName)); + for (const auto& [k, v] : extra) { + config.mutable_configs()[k] = v; + } + return RestCatalog::Make(config, std::make_shared()); + } + + /// Create a catalog configured with a specific snapshot loading mode. + Result> CreateCatalogWithSnapshotMode( + const std::string& mode) { + return CreateCatalogWithProperties( + {{RestCatalogProperties::kSnapshotLoadingMode.key(), mode}}); + } + + /// Convenience: create a namespace and return the catalog. + Result> CreateCatalogAndNamespace(const Namespace& ns) { + ICEBERG_ASSIGN_OR_RAISE(auto catalog, CreateCatalog()); + auto status = catalog->CreateNamespace(ns, {}); + if (!status.has_value()) { + return std::unexpected(status.error()); + } + return catalog; + } + + /// Default two-column schema used across tests. + static std::shared_ptr DefaultSchema() { + return std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional(2, "data", string())}, + /*schema_id=*/1); + } + + /// Create a table with default schema and no partitioning. + Result> CreateDefaultTable( + const std::shared_ptr& catalog, const TableIdentifier& table_id, + const std::unordered_map& props = {}) { + return catalog->CreateTable(table_id, DefaultSchema(), PartitionSpec::Unpartitioned(), + SortOrder::Unsorted(), "", props); + } + + static inline std::unique_ptr docker_compose_; +}; + +TEST_F(RestCatalogIntegrationTest, MakeCatalogSuccess) { + ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalog()); + EXPECT_EQ(catalog->name(), kCatalogName); +} + +TEST_F(RestCatalogIntegrationTest, FetchServerConfigDirect) { + HttpClient client({}); + auto noop_session = auth::AuthSession::MakeDefault({}); + std::string config_url = std::format("{}/v1/config", CatalogUri()); + + ICEBERG_UNWRAP_OR_FAIL(const auto response, + client.Get(config_url, {}, /*headers=*/{}, + *DefaultErrorHandler::Instance(), *noop_session)); + ICEBERG_UNWRAP_OR_FAIL(auto json, FromJsonString(response.body())); + + EXPECT_TRUE(json.contains("defaults")); + EXPECT_TRUE(json.contains("overrides")); + + if (json.contains("endpoints")) { + EXPECT_TRUE(json["endpoints"].is_array()); + ICEBERG_UNWRAP_OR_FAIL(auto config, CatalogConfigFromJson(json)); + std::println("[INFO] Server provided {} endpoints", config.endpoints.size()); + EXPECT_GT(config.endpoints.size(), 0); + } +} + +// -- Namespace operations -- + +TEST_F(RestCatalogIntegrationTest, ListNamespaces) { + ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalog()); + ICEBERG_UNWRAP_OR_FAIL(auto result, catalog->ListNamespaces(Namespace{.levels = {}})); + EXPECT_TRUE(result.empty()); +} + +TEST_F(RestCatalogIntegrationTest, CreateNamespace) { + ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalog()); + + Namespace ns{.levels = {"test_ns"}}; + ASSERT_THAT(catalog->CreateNamespace(ns, {}), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto namespaces, + catalog->ListNamespaces(Namespace{.levels = {}})); + EXPECT_EQ(namespaces.size(), 1); + EXPECT_EQ(namespaces[0].levels, std::vector{"test_ns"}); +} + +TEST_F(RestCatalogIntegrationTest, CreateNamespaceWithProperties) { + ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalog()); + + Namespace ns{.levels = {"test_ns_props"}}; + std::unordered_map properties{ + {"owner", "test_user"}, {"description", "Test namespace with properties"}}; + ASSERT_THAT(catalog->CreateNamespace(ns, properties), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto props, catalog->GetNamespaceProperties(ns)); + EXPECT_EQ(props.at("owner"), "test_user"); + EXPECT_EQ(props.at("description"), "Test namespace with properties"); +} + +TEST_F(RestCatalogIntegrationTest, CreateNestedNamespace) { + ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalog()); + + Namespace parent{.levels = {"parent"}}; + Namespace child{.levels = {"parent", "child"}}; + ASSERT_THAT(catalog->CreateNamespace(parent, {}), IsOk()); + ASSERT_THAT(catalog->CreateNamespace(child, {}), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto children, catalog->ListNamespaces(parent)); + EXPECT_EQ(children.size(), 1); + EXPECT_EQ(children[0].levels, (std::vector{"parent", "child"})); +} + +TEST_F(RestCatalogIntegrationTest, GetNamespaceProperties) { + ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalog()); + + Namespace ns{.levels = {"test_get_props"}}; + ASSERT_THAT(catalog->CreateNamespace(ns, {{"key1", "value1"}, {"key2", "value2"}}), + IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto props, catalog->GetNamespaceProperties(ns)); + EXPECT_EQ(props.at("key1"), "value1"); + EXPECT_EQ(props.at("key2"), "value2"); +} + +TEST_F(RestCatalogIntegrationTest, NamespaceExists) { + ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalog()); + + Namespace ns{.levels = {"non_existent"}}; + ICEBERG_UNWRAP_OR_FAIL(auto before, catalog->NamespaceExists(ns)); + EXPECT_FALSE(before); + + ASSERT_THAT(catalog->CreateNamespace(ns, {}), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto after, catalog->NamespaceExists(ns)); + EXPECT_TRUE(after); +} + +TEST_F(RestCatalogIntegrationTest, UpdateNamespaceProperties) { + ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalog()); + + Namespace ns{.levels = {"test_update"}}; + ASSERT_THAT(catalog->CreateNamespace(ns, {{"key1", "value1"}, {"key2", "value2"}}), + IsOk()); + + ASSERT_THAT(catalog->UpdateNamespaceProperties( + ns, {{"key1", "updated_value1"}, {"key3", "value3"}}, {"key2"}), + IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto props, catalog->GetNamespaceProperties(ns)); + EXPECT_EQ(props.at("key1"), "updated_value1"); + EXPECT_EQ(props.at("key3"), "value3"); + EXPECT_EQ(props.count("key2"), 0); +} + +TEST_F(RestCatalogIntegrationTest, DropNamespace) { + ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalog()); + + Namespace ns{.levels = {"test_drop"}}; + ASSERT_THAT(catalog->CreateNamespace(ns, {}), IsOk()); + ASSERT_THAT(catalog->DropNamespace(ns), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto exists, catalog->NamespaceExists(ns)); + EXPECT_FALSE(exists); +} + +TEST_F(RestCatalogIntegrationTest, DropNonExistentNamespace) { + ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalog()); + EXPECT_THAT(catalog->DropNamespace(Namespace{.levels = {"nonexistent"}}), + IsError(ErrorKind::kNoSuchNamespace)); +} + +// -- Table operations -- + +TEST_F(RestCatalogIntegrationTest, CreateTable) { + ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalog()); + + // Build nested namespace hierarchy + Namespace ns{.levels = {"test_create_table", "apple", "ios"}}; + ASSERT_THAT(catalog->CreateNamespace(Namespace{.levels = {"test_create_table"}}, {}), + IsOk()); + ASSERT_THAT( + catalog->CreateNamespace(Namespace{.levels = {"test_create_table", "apple"}}, {}), + IsOk()); + ASSERT_THAT(catalog->CreateNamespace(ns, {{"owner", "ray"}, {"community", "apache"}}), + IsOk()); + + TableIdentifier table_id{.ns = ns, .name = "t1"}; + ICEBERG_UNWRAP_OR_FAIL(auto table, CreateDefaultTable(catalog, table_id)); + + EXPECT_EQ(table->name().ns.levels, + (std::vector{"test_create_table", "apple", "ios"})); + EXPECT_EQ(table->name().name, "t1"); + + // Duplicate creation should fail + EXPECT_THAT(CreateDefaultTable(catalog, table_id), IsError(ErrorKind::kAlreadyExists)); +} + +TEST_F(RestCatalogIntegrationTest, ListTables) { + Namespace ns{.levels = {"test_list_tables"}}; + ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalogAndNamespace(ns)); + + ICEBERG_UNWRAP_OR_FAIL(auto empty_list, catalog->ListTables(ns)); + EXPECT_TRUE(empty_list.empty()); + + TableIdentifier t1{.ns = ns, .name = "table1"}; + TableIdentifier t2{.ns = ns, .name = "table2"}; + ASSERT_THAT(CreateDefaultTable(catalog, t1), IsOk()); + ASSERT_THAT(CreateDefaultTable(catalog, t2), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto tables, catalog->ListTables(ns)); + EXPECT_THAT(tables, testing::UnorderedElementsAre( + testing::Field(&TableIdentifier::name, "table1"), + testing::Field(&TableIdentifier::name, "table2"))); +} + +TEST_F(RestCatalogIntegrationTest, LoadTable) { + Namespace ns{.levels = {"test_load_table"}}; + ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalogAndNamespace(ns)); + + TableIdentifier table_id{.ns = ns, .name = "test_table"}; + ASSERT_THAT(CreateDefaultTable(catalog, table_id, {{"key1", "value1"}}), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto loaded, catalog->LoadTable(table_id)); + EXPECT_EQ(loaded->name().ns.levels, std::vector{"test_load_table"}); + EXPECT_EQ(loaded->name().name, "test_table"); + EXPECT_NE(loaded->metadata(), nullptr); + + ICEBERG_UNWRAP_OR_FAIL(auto schema, loaded->schema()); + ASSERT_EQ(schema->fields().size(), 2); + EXPECT_EQ(schema->fields()[0].name(), "id"); + EXPECT_EQ(schema->fields()[1].name(), "data"); +} + +TEST_F(RestCatalogIntegrationTest, DropTable) { + Namespace ns{.levels = {"test_drop_table"}}; + ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalogAndNamespace(ns)); + + TableIdentifier table_id{.ns = ns, .name = "table_to_drop"}; + ASSERT_THAT(CreateDefaultTable(catalog, table_id), IsOk()); + + ASSERT_THAT(catalog->DropTable(table_id, /*purge=*/false), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto exists, catalog->TableExists(table_id)); + EXPECT_FALSE(exists); +} + +TEST_F(RestCatalogIntegrationTest, RenameTable) { + Namespace ns{.levels = {"test_rename_table"}}; + ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalogAndNamespace(ns)); + + TableIdentifier old_id{.ns = ns, .name = "old_table"}; + TableIdentifier new_id{.ns = ns, .name = "new_table"}; + ASSERT_THAT(CreateDefaultTable(catalog, old_id), IsOk()); + ASSERT_THAT(catalog->RenameTable(old_id, new_id), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto old_exists, catalog->TableExists(old_id)); + EXPECT_FALSE(old_exists); + ICEBERG_UNWRAP_OR_FAIL(auto new_exists, catalog->TableExists(new_id)); + EXPECT_TRUE(new_exists); +} + +TEST_F(RestCatalogIntegrationTest, UpdateTable) { + Namespace ns{.levels = {"test_update_table"}}; + ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalogAndNamespace(ns)); + + TableIdentifier table_id{.ns = ns, .name = "t1"}; + ICEBERG_UNWRAP_OR_FAIL(auto table, CreateDefaultTable(catalog, table_id)); + + std::vector> requirements; + requirements.push_back(std::make_unique(table->uuid())); + + std::vector> updates; + updates.push_back(std::make_unique( + std::unordered_map{{"key1", "value1"}})); + + ICEBERG_UNWRAP_OR_FAIL(auto updated, + catalog->UpdateTable(table_id, requirements, updates)); + EXPECT_EQ(updated->metadata()->properties.configs().at("key1"), "value1"); +} + +TEST_F(RestCatalogIntegrationTest, RegisterTable) { + Namespace ns{.levels = {"test_register_table"}}; + ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalogAndNamespace(ns)); + + TableIdentifier table_id{.ns = ns, .name = "t1"}; + ICEBERG_UNWRAP_OR_FAIL(auto table, CreateDefaultTable(catalog, table_id)); + std::string metadata_location(table->metadata_file_location()); + + ASSERT_THAT(catalog->DropTable(table_id, /*purge=*/false), IsOk()); + + TableIdentifier new_id{.ns = ns, .name = "t2"}; + ICEBERG_UNWRAP_OR_FAIL(auto registered, + catalog->RegisterTable(new_id, metadata_location)); + EXPECT_EQ(table->metadata_file_location(), registered->metadata_file_location()); + EXPECT_NE(table->name(), registered->name()); +} + +TEST_F(RestCatalogIntegrationTest, StageCreateTable) { + Namespace ns{.levels = {"test_stage_create"}}; + ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalogAndNamespace(ns)); + + TableIdentifier table_id{.ns = ns, .name = "staged_table"}; + ICEBERG_UNWRAP_OR_FAIL( + auto txn, + catalog->StageCreateTable(table_id, DefaultSchema(), PartitionSpec::Unpartitioned(), + SortOrder::Unsorted(), "", {{"key1", "value1"}})); + + EXPECT_EQ(txn->table()->name(), table_id); + + // Not yet visible in catalog + ICEBERG_UNWRAP_OR_FAIL(auto before, catalog->TableExists(table_id)); + EXPECT_FALSE(before); + + ICEBERG_UNWRAP_OR_FAIL(auto committed, txn->Commit()); + + ICEBERG_UNWRAP_OR_FAIL(auto after, catalog->TableExists(table_id)); + EXPECT_TRUE(after); + EXPECT_EQ(committed->metadata()->properties.configs().at("key1"), "value1"); +} + +// -- Snapshot loading mode -- + +TEST_F(RestCatalogIntegrationTest, LoadTableWithSnapshotModeAll) { + Namespace ns{.levels = {"test_snapshot_all"}}; + ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalogWithSnapshotMode("ALL")); + ASSERT_THAT(catalog->CreateNamespace(ns, {}), IsOk()); + + TableIdentifier table_id{.ns = ns, .name = "all_mode_table"}; + ASSERT_THAT(CreateDefaultTable(catalog, table_id), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto loaded, catalog->LoadTable(table_id)); + EXPECT_NE(loaded->metadata(), nullptr); + EXPECT_FALSE(loaded->metadata()->schemas.empty()); +} + +TEST_F(RestCatalogIntegrationTest, LoadTableWithSnapshotModeRefs) { + Namespace ns{.levels = {"test_snapshot_refs"}}; + ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalogWithSnapshotMode("REFS")); + ASSERT_THAT(catalog->CreateNamespace(ns, {}), IsOk()); + + TableIdentifier table_id{.ns = ns, .name = "refs_mode_table"}; + ASSERT_THAT(CreateDefaultTable(catalog, table_id), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto loaded, catalog->LoadTable(table_id)); + EXPECT_NE(loaded->metadata(), nullptr); + EXPECT_FALSE(loaded->metadata()->schemas.empty()); +} + +} // namespace iceberg::rest diff --git a/src/iceberg/test/rest_catalog_test.cc b/src/iceberg/test/rest_catalog_test.cc deleted file mode 100644 index d4a9477bb..000000000 --- a/src/iceberg/test/rest_catalog_test.cc +++ /dev/null @@ -1,693 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "iceberg/catalog/rest/rest_catalog.h" - -#include - -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include - -#include "iceberg/catalog/rest/auth/auth_session.h" -#include "iceberg/catalog/rest/catalog_properties.h" -#include "iceberg/catalog/rest/error_handlers.h" -#include "iceberg/catalog/rest/http_client.h" -#include "iceberg/catalog/rest/json_serde_internal.h" -#include "iceberg/partition_spec.h" -#include "iceberg/result.h" -#include "iceberg/schema.h" -#include "iceberg/sort_order.h" -#include "iceberg/table.h" -#include "iceberg/table_identifier.h" -#include "iceberg/table_requirement.h" -#include "iceberg/table_update.h" -#include "iceberg/test/matchers.h" -#include "iceberg/test/std_io.h" -#include "iceberg/test/test_resource.h" -#include "iceberg/test/util/docker_compose_util.h" -#include "iceberg/transaction.h" - -namespace iceberg::rest { - -namespace { - -constexpr uint16_t kRestCatalogPort = 8181; -constexpr int kMaxRetries = 60; // Wait up to 60 seconds -constexpr int kRetryDelayMs = 1000; - -constexpr std::string_view kDockerProjectName = "iceberg-rest-catalog-service"; -constexpr std::string_view kCatalogName = "test_catalog"; -constexpr std::string_view kWarehouseName = "default"; -constexpr std::string_view kLocalhostUri = "http://localhost"; - -/// \brief Check if a localhost port is ready to accept connections -/// \param port Port number to check -/// \return true if the port is accessible on localhost, false otherwise -bool CheckServiceReady(uint16_t port) { - int sock = socket(AF_INET, SOCK_STREAM, 0); - if (sock < 0) { - return false; - } - - struct timeval timeout{ - .tv_sec = 1, - .tv_usec = 0, - }; - setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)); - - sockaddr_in addr{ - .sin_family = AF_INET, - .sin_port = htons(port), - .sin_addr = {.s_addr = htonl(INADDR_LOOPBACK)} // 127.0.0.1 - }; - bool result = - (connect(sock, reinterpret_cast(&addr), sizeof(addr)) == 0); - close(sock); - return result; -} - -} // namespace - -/// \brief Integration test fixture for REST catalog with automatic Docker Compose setup。 -class RestCatalogIntegrationTest : public ::testing::Test { - protected: - static void SetUpTestSuite() { - std::string project_name{kDockerProjectName}; - std::filesystem::path resources_dir = GetResourcePath("iceberg-rest-fixture"); - - // Create and start DockerCompose - docker_compose_ = std::make_unique(project_name, resources_dir); - docker_compose_->Up(); - - // Wait for REST catalog to be ready on localhost - std::println("[INFO] Waiting for REST catalog to be ready at localhost:{}...", - kRestCatalogPort); - for (int i = 0; i < kMaxRetries; ++i) { - if (CheckServiceReady(kRestCatalogPort)) { - std::println("[INFO] REST catalog is ready!"); - return; - } - std::println( - "[INFO] Waiting for 1s for REST catalog to be ready... (attempt {}/{})", i + 1, - kMaxRetries); - std::this_thread::sleep_for(std::chrono::milliseconds(kRetryDelayMs)); - } - throw std::runtime_error("REST catalog failed to start within {} seconds"); - } - - static void TearDownTestSuite() { docker_compose_.reset(); } - - void SetUp() override {} - - void TearDown() override {} - - // Helper function to create a REST catalog instance - Result> CreateCatalog() { - auto config = RestCatalogProperties::default_properties(); - config - .Set(RestCatalogProperties::kUri, - std::format("{}:{}", kLocalhostUri, kRestCatalogPort)) - .Set(RestCatalogProperties::kName, std::string(kCatalogName)) - .Set(RestCatalogProperties::kWarehouse, std::string(kWarehouseName)); - auto file_io = std::make_shared(); - return RestCatalog::Make(config, std::make_shared()); - } - - // Helper function to create a default schema for testing - std::shared_ptr CreateDefaultSchema() { - return std::make_shared( - std::vector{SchemaField::MakeRequired(1, "id", int32()), - SchemaField::MakeOptional(2, "data", string())}, - /*schema_id=*/1); - } - - static inline std::unique_ptr docker_compose_; -}; - -TEST_F(RestCatalogIntegrationTest, MakeCatalogSuccess) { - auto catalog_result = CreateCatalog(); - ASSERT_THAT(catalog_result, IsOk()); - - auto& catalog = catalog_result.value(); - EXPECT_EQ(catalog->name(), kCatalogName); -} - -/// Verifies that the server's /v1/config endpoint returns a valid response -/// and that the endpoints field (if present) can be parsed correctly. -TEST_F(RestCatalogIntegrationTest, FetchServerConfigDirect) { - // Create HTTP client and fetch config directly - HttpClient client({}); - auto noop_session = auth::AuthSession::MakeDefault({}); - std::string config_url = - std::format("{}:{}/v1/config", kLocalhostUri, kRestCatalogPort); - - auto response_result = client.Get(config_url, {}, /*headers=*/{}, - *DefaultErrorHandler::Instance(), *noop_session); - ASSERT_THAT(response_result, IsOk()); - auto json_result = FromJsonString(response_result->body()); - ASSERT_THAT(json_result, IsOk()); - auto& json = json_result.value(); - - EXPECT_TRUE(json.contains("defaults")); - EXPECT_TRUE(json.contains("overrides")); - - if (json.contains("endpoints")) { - EXPECT_TRUE(json["endpoints"].is_array()); - - // Parse the config to ensure all endpoints are valid - auto config_result = CatalogConfigFromJson(json); - ASSERT_THAT(config_result, IsOk()); - auto& config = config_result.value(); - std::println("[INFO] Server provided {} endpoints", config.endpoints.size()); - EXPECT_GT(config.endpoints.size(), 0) - << "Server should provide at least one endpoint"; - } else { - std::println( - "[INFO] Server did not provide endpoints field, client will use default " - "endpoints"); - } -} - -TEST_F(RestCatalogIntegrationTest, ListNamespaces) { - auto catalog_result = CreateCatalog(); - ASSERT_THAT(catalog_result, IsOk()); - auto& catalog = catalog_result.value(); - - Namespace root{.levels = {}}; - auto result = catalog->ListNamespaces(root); - EXPECT_THAT(result, IsOk()); - EXPECT_TRUE(result->empty()); -} - -TEST_F(RestCatalogIntegrationTest, CreateNamespace) { - auto catalog_result = CreateCatalog(); - ASSERT_THAT(catalog_result, IsOk()); - auto& catalog = catalog_result.value(); - - // Create a simple namespace - Namespace ns{.levels = {"test_ns"}}; - auto status = catalog->CreateNamespace(ns, {}); - EXPECT_THAT(status, IsOk()); - - // Verify it was created by listing - Namespace root{.levels = {}}; - auto list_result = catalog->ListNamespaces(root); - ASSERT_THAT(list_result, IsOk()); - EXPECT_EQ(list_result->size(), 1); - EXPECT_EQ(list_result->at(0).levels, std::vector{"test_ns"}); -} - -TEST_F(RestCatalogIntegrationTest, CreateNamespaceWithProperties) { - auto catalog_result = CreateCatalog(); - ASSERT_THAT(catalog_result, IsOk()); - auto& catalog = catalog_result.value(); - - // Create namespace with properties - Namespace ns{.levels = {"test_ns_props"}}; - std::unordered_map properties{ - {"owner", "test_user"}, {"description", "Test namespace with properties"}}; - auto status = catalog->CreateNamespace(ns, properties); - EXPECT_THAT(status, IsOk()); - - // Verify properties were set - auto props_result = catalog->GetNamespaceProperties(ns); - ASSERT_THAT(props_result, IsOk()); - EXPECT_EQ(props_result->at("owner"), "test_user"); - EXPECT_EQ(props_result->at("description"), "Test namespace with properties"); -} - -TEST_F(RestCatalogIntegrationTest, CreateNestedNamespace) { - auto catalog_result = CreateCatalog(); - ASSERT_THAT(catalog_result, IsOk()); - auto& catalog = catalog_result.value(); - - // Create parent namespace - Namespace parent{.levels = {"parent"}}; - auto status = catalog->CreateNamespace(parent, {}); - EXPECT_THAT(status, IsOk()); - - // Create nested namespace - Namespace child{.levels = {"parent", "child"}}; - status = catalog->CreateNamespace(child, {}); - EXPECT_THAT(status, IsOk()); - - // Verify nested namespace exists - auto list_result = catalog->ListNamespaces(parent); - ASSERT_THAT(list_result, IsOk()); - EXPECT_EQ(list_result->size(), 1); - EXPECT_EQ(list_result->at(0).levels, (std::vector{"parent", "child"})); -} - -TEST_F(RestCatalogIntegrationTest, GetNamespaceProperties) { - auto catalog_result = CreateCatalog(); - ASSERT_THAT(catalog_result, IsOk()); - auto& catalog = catalog_result.value(); - - // Create namespace with properties - Namespace ns{.levels = {"test_get_props"}}; - std::unordered_map properties{{"key1", "value1"}, - {"key2", "value2"}}; - auto status = catalog->CreateNamespace(ns, properties); - EXPECT_THAT(status, IsOk()); - - // Get properties - auto props_result = catalog->GetNamespaceProperties(ns); - ASSERT_THAT(props_result, IsOk()); - EXPECT_EQ(props_result->at("key1"), "value1"); - EXPECT_EQ(props_result->at("key2"), "value2"); -} - -TEST_F(RestCatalogIntegrationTest, NamespaceExists) { - auto catalog_result = CreateCatalog(); - ASSERT_THAT(catalog_result, IsOk()); - auto& catalog = catalog_result.value(); - - // Check non-existent namespace - Namespace ns{.levels = {"non_existent"}}; - auto exists_result = catalog->NamespaceExists(ns); - ASSERT_THAT(exists_result, IsOk()); - EXPECT_FALSE(*exists_result); - - // Create namespace - auto status = catalog->CreateNamespace(ns, {}); - EXPECT_THAT(status, IsOk()); - - // Check it now exists - exists_result = catalog->NamespaceExists(ns); - ASSERT_THAT(exists_result, IsOk()); - EXPECT_TRUE(exists_result.value()); -} - -TEST_F(RestCatalogIntegrationTest, UpdateNamespaceProperties) { - auto catalog_result = CreateCatalog(); - ASSERT_THAT(catalog_result, IsOk()); - auto& catalog = catalog_result.value(); - - // Create namespace with initial properties - Namespace ns{.levels = {"test_update"}}; - std::unordered_map initial_props{{"key1", "value1"}, - {"key2", "value2"}}; - auto status = catalog->CreateNamespace(ns, initial_props); - EXPECT_THAT(status, IsOk()); - - // Update properties: modify key1, add key3, remove key2 - std::unordered_map updates{{"key1", "updated_value1"}, - {"key3", "value3"}}; - std::unordered_set removals{"key2"}; - status = catalog->UpdateNamespaceProperties(ns, updates, removals); - EXPECT_THAT(status, IsOk()); - - // Verify updated properties - auto props_result = catalog->GetNamespaceProperties(ns); - ASSERT_THAT(props_result, IsOk()); - EXPECT_EQ(props_result->at("key1"), "updated_value1"); - EXPECT_EQ(props_result->at("key3"), "value3"); - EXPECT_EQ(props_result->count("key2"), 0); // Should be removed -} - -TEST_F(RestCatalogIntegrationTest, DropNamespace) { - auto catalog_result = CreateCatalog(); - ASSERT_THAT(catalog_result, IsOk()); - auto& catalog = catalog_result.value(); - - // Create namespace - Namespace ns{.levels = {"test_drop"}}; - auto status = catalog->CreateNamespace(ns, {}); - EXPECT_THAT(status, IsOk()); - - // Verify it exists - auto exists_result = catalog->NamespaceExists(ns); - ASSERT_THAT(exists_result, IsOk()); - EXPECT_TRUE(*exists_result); - - // Drop namespace - status = catalog->DropNamespace(ns); - EXPECT_THAT(status, IsOk()); - - // Verify it no longer exists - exists_result = catalog->NamespaceExists(ns); - ASSERT_THAT(exists_result, IsOk()); - EXPECT_FALSE(exists_result.value()); -} - -TEST_F(RestCatalogIntegrationTest, DropNonExistentNamespace) { - auto catalog_result = CreateCatalog(); - ASSERT_THAT(catalog_result, IsOk()); - auto& catalog = catalog_result.value(); - - Namespace ns{.levels = {"nonexistent_namespace"}}; - auto status = catalog->DropNamespace(ns); - - // Should return NoSuchNamespace error - EXPECT_THAT(status, IsError(ErrorKind::kNoSuchNamespace)); -} - -TEST_F(RestCatalogIntegrationTest, CreateTable) { - auto catalog_result = CreateCatalog(); - ASSERT_THAT(catalog_result, IsOk()); - auto& catalog = catalog_result.value(); - - // Create nested namespace with properties - Namespace ns{.levels = {"test_create_table", "apple", "ios"}}; - std::unordered_map ns_properties{{"owner", "ray"}, - {"community", "apache"}}; - - // Create parent namespaces first - auto status = catalog->CreateNamespace(Namespace{.levels = {"test_create_table"}}, {}); - EXPECT_THAT(status, IsOk()); - status = - catalog->CreateNamespace(Namespace{.levels = {"test_create_table", "apple"}}, {}); - EXPECT_THAT(status, IsOk()); - status = catalog->CreateNamespace(ns, ns_properties); - EXPECT_THAT(status, IsOk()); - - auto schema = CreateDefaultSchema(); - auto partition_spec = PartitionSpec::Unpartitioned(); - auto sort_order = SortOrder::Unsorted(); - - // Create table - TableIdentifier table_id{.ns = ns, .name = "t1"}; - std::unordered_map table_properties; - auto table_result = catalog->CreateTable(table_id, schema, partition_spec, sort_order, - "", table_properties); - ASSERT_THAT(table_result, IsOk()); - auto& table = table_result.value(); - - // Verify table - EXPECT_EQ(table->name().ns.levels, - (std::vector{"test_create_table", "apple", "ios"})); - EXPECT_EQ(table->name().name, "t1"); - - // Verify that creating the same table again fails - auto duplicate_result = catalog->CreateTable(table_id, schema, partition_spec, - sort_order, "", table_properties); - EXPECT_THAT(duplicate_result, IsError(ErrorKind::kAlreadyExists)); - EXPECT_THAT(duplicate_result, - HasErrorMessage("Table already exists: test_create_table.apple.ios.t1")); -} - -TEST_F(RestCatalogIntegrationTest, ListTables) { - auto catalog_result = CreateCatalog(); - ASSERT_THAT(catalog_result, IsOk()); - auto& catalog = catalog_result.value(); - - // Create namespace - Namespace ns{.levels = {"test_list_tables"}}; - auto status = catalog->CreateNamespace(ns, {}); - EXPECT_THAT(status, IsOk()); - - // Initially no tables - auto list_result = catalog->ListTables(ns); - ASSERT_THAT(list_result, IsOk()); - EXPECT_TRUE(list_result.value().empty()); - - // Create tables - auto schema = CreateDefaultSchema(); - auto partition_spec = PartitionSpec::Unpartitioned(); - auto sort_order = SortOrder::Unsorted(); - std::unordered_map table_properties; - - TableIdentifier table1{.ns = ns, .name = "table1"}; - auto create_result = catalog->CreateTable(table1, schema, partition_spec, sort_order, - "", table_properties); - ASSERT_THAT(create_result, IsOk()); - - TableIdentifier table2{.ns = ns, .name = "table2"}; - create_result = catalog->CreateTable(table2, schema, partition_spec, sort_order, "", - table_properties); - ASSERT_THAT(create_result, IsOk()); - - // List and varify tables - list_result = catalog->ListTables(ns); - ASSERT_THAT(list_result, IsOk()); - EXPECT_THAT(list_result.value(), testing::UnorderedElementsAre( - testing::Field(&TableIdentifier::name, "table1"), - testing::Field(&TableIdentifier::name, "table2"))); -} - -TEST_F(RestCatalogIntegrationTest, LoadTable) { - auto catalog_result = CreateCatalog(); - ASSERT_THAT(catalog_result, IsOk()); - auto& catalog = catalog_result.value(); - - // Create namespace and table first - Namespace ns{.levels = {"test_load_table"}}; - auto status = catalog->CreateNamespace(ns, {}); - EXPECT_THAT(status, IsOk()); - - // Create schema, partition spec, and sort order using helper functions - auto schema = CreateDefaultSchema(); - auto partition_spec = PartitionSpec::Unpartitioned(); - auto sort_order = SortOrder::Unsorted(); - - // Create table - TableIdentifier table_id{.ns = ns, .name = "test_table"}; - std::unordered_map table_properties{{"key1", "value1"}}; - auto create_result = catalog->CreateTable(table_id, schema, partition_spec, sort_order, - "", table_properties); - ASSERT_THAT(create_result, IsOk()); - - // Load the table - auto load_result = catalog->LoadTable(table_id); - ASSERT_THAT(load_result, IsOk()); - auto& loaded_table = load_result.value(); - - // Verify loaded table properties - EXPECT_EQ(loaded_table->name().ns.levels, std::vector{"test_load_table"}); - EXPECT_EQ(loaded_table->name().name, "test_table"); - EXPECT_NE(loaded_table->metadata(), nullptr); - - // Verify schema - auto loaded_schema_result = loaded_table->schema(); - ASSERT_THAT(loaded_schema_result, IsOk()); - auto loaded_schema = loaded_schema_result.value(); - EXPECT_EQ(loaded_schema->fields().size(), 2); - EXPECT_EQ(loaded_schema->fields()[0].name(), "id"); - EXPECT_EQ(loaded_schema->fields()[1].name(), "data"); -} - -TEST_F(RestCatalogIntegrationTest, DropTable) { - auto catalog_result = CreateCatalog(); - ASSERT_THAT(catalog_result, IsOk()); - auto& catalog = catalog_result.value(); - - // Create namespace and table first - Namespace ns{.levels = {"test_drop_table"}}; - auto status = catalog->CreateNamespace(ns, {}); - EXPECT_THAT(status, IsOk()); - - // Create table - auto schema = CreateDefaultSchema(); - auto partition_spec = PartitionSpec::Unpartitioned(); - auto sort_order = SortOrder::Unsorted(); - - TableIdentifier table_id{.ns = ns, .name = "table_to_drop"}; - std::unordered_map table_properties; - auto create_result = catalog->CreateTable(table_id, schema, partition_spec, sort_order, - "", table_properties); - ASSERT_THAT(create_result, IsOk()); - - // Verify table exists - auto load_result = catalog->TableExists(table_id); - ASSERT_THAT(load_result, IsOk()); - EXPECT_TRUE(load_result.value()); - - // Drop the table - status = catalog->DropTable(table_id, /*purge=*/false); - ASSERT_THAT(status, IsOk()); - - // Verify table no longer exists - load_result = catalog->TableExists(table_id); - ASSERT_THAT(load_result, IsOk()); - EXPECT_FALSE(load_result.value()); -} - -TEST_F(RestCatalogIntegrationTest, RenameTable) { - auto catalog_result = CreateCatalog(); - ASSERT_THAT(catalog_result, IsOk()); - auto& catalog = catalog_result.value(); - - // Create namespace - Namespace ns{.levels = {"test_rename_table"}}; - auto status = catalog->CreateNamespace(ns, {}); - EXPECT_THAT(status, IsOk()); - - // Create table - auto schema = CreateDefaultSchema(); - auto partition_spec = PartitionSpec::Unpartitioned(); - auto sort_order = SortOrder::Unsorted(); - - TableIdentifier old_table_id{.ns = ns, .name = "old_table"}; - std::unordered_map table_properties; - auto table_result = catalog->CreateTable(old_table_id, schema, partition_spec, - sort_order, "", table_properties); - ASSERT_THAT(table_result, IsOk()); - - // Rename table - TableIdentifier new_table_id{.ns = ns, .name = "new_table"}; - status = catalog->RenameTable(old_table_id, new_table_id); - ASSERT_THAT(status, IsOk()); - - // Verify old table no longer exists - auto exists_result = catalog->TableExists(old_table_id); - ASSERT_THAT(exists_result, IsOk()); - EXPECT_FALSE(exists_result.value()); - - // Verify new table exists - exists_result = catalog->TableExists(new_table_id); - ASSERT_THAT(exists_result, IsOk()); - EXPECT_TRUE(exists_result.value()); -} - -TEST_F(RestCatalogIntegrationTest, UpdateTable) { - auto catalog_result = CreateCatalog(); - ASSERT_THAT(catalog_result, IsOk()); - auto& catalog = catalog_result.value(); - - // Create namespace - Namespace ns{.levels = {"test_update_table"}}; - auto status = catalog->CreateNamespace(ns, {}); - EXPECT_THAT(status, IsOk()); - - // Create table - auto schema = CreateDefaultSchema(); - auto partition_spec = PartitionSpec::Unpartitioned(); - auto sort_order = SortOrder::Unsorted(); - - TableIdentifier table_id{.ns = ns, .name = "t1"}; - std::unordered_map table_properties; - auto table_result = catalog->CreateTable(table_id, schema, partition_spec, sort_order, - "", table_properties); - ASSERT_THAT(table_result, IsOk()); - auto& table = table_result.value(); - - // Update table properties - std::vector> requirements; - requirements.push_back(std::make_unique(table->uuid())); - - std::vector> updates; - updates.push_back(std::make_unique( - std::unordered_map{{"key1", "value1"}})); - - auto update_result = catalog->UpdateTable(table_id, requirements, updates); - ASSERT_THAT(update_result, IsOk()); - auto& updated_table = update_result.value(); - - // Verify the property was set - auto& props = updated_table->metadata()->properties.configs(); - EXPECT_EQ(props.at("key1"), "value1"); -} - -TEST_F(RestCatalogIntegrationTest, RegisterTable) { - auto catalog_result = CreateCatalog(); - ASSERT_THAT(catalog_result, IsOk()); - auto& catalog = catalog_result.value(); - - // Create namespace - Namespace ns{.levels = {"test_register_table"}}; - auto status = catalog->CreateNamespace(ns, {}); - EXPECT_THAT(status, IsOk()); - - // Create table - auto schema = CreateDefaultSchema(); - auto partition_spec = PartitionSpec::Unpartitioned(); - auto sort_order = SortOrder::Unsorted(); - - TableIdentifier table_id{.ns = ns, .name = "t1"}; - std::unordered_map table_properties; - auto table_result = catalog->CreateTable(table_id, schema, partition_spec, sort_order, - "", table_properties); - ASSERT_THAT(table_result, IsOk()); - auto& table = table_result.value(); - std::string metadata_location(table->metadata_file_location()); - - // Drop table (without purge, to keep metadata file) - status = catalog->DropTable(table_id, /*purge=*/false); - ASSERT_THAT(status, IsOk()); - - // Register table with new name - TableIdentifier new_table_id{.ns = ns, .name = "t2"}; - auto register_result = catalog->RegisterTable(new_table_id, metadata_location); - ASSERT_THAT(register_result, IsOk()); - auto& registered_table = register_result.value(); - - EXPECT_EQ(table->metadata_file_location(), registered_table->metadata_file_location()); - EXPECT_NE(table->name(), registered_table->name()); -} - -TEST_F(RestCatalogIntegrationTest, StageCreateTable) { - auto catalog_result = CreateCatalog(); - ASSERT_THAT(catalog_result, IsOk()); - auto& catalog = catalog_result.value(); - - // Create namespace - Namespace ns{.levels = {"test_stage_create"}}; - auto status = catalog->CreateNamespace(ns, {}); - EXPECT_THAT(status, IsOk()); - - // Stage create table - auto schema = CreateDefaultSchema(); - auto partition_spec = PartitionSpec::Unpartitioned(); - auto sort_order = SortOrder::Unsorted(); - - TableIdentifier table_id{.ns = ns, .name = "staged_table"}; - std::unordered_map table_properties{{"key1", "value1"}}; - auto txn_result = catalog->StageCreateTable(table_id, schema, partition_spec, - sort_order, "", table_properties); - ASSERT_THAT(txn_result, IsOk()); - auto& txn = txn_result.value(); - - // Verify the staged table in transaction - EXPECT_NE(txn->table(), nullptr); - EXPECT_EQ(txn->table()->name(), table_id); - - // Table should NOT exist in catalog yet (staged but not committed) - auto exists_result = catalog->TableExists(table_id); - ASSERT_THAT(exists_result, IsOk()); - EXPECT_FALSE(exists_result.value()); - - // Commit the transaction - auto commit_result = txn->Commit(); - ASSERT_THAT(commit_result, IsOk()); - auto& committed_table = commit_result.value(); - - // Verify table now exists - exists_result = catalog->TableExists(table_id); - ASSERT_THAT(exists_result, IsOk()); - EXPECT_TRUE(exists_result.value()); - - // Verify table properties - EXPECT_EQ(committed_table->name(), table_id); - auto& props = committed_table->metadata()->properties.configs(); - EXPECT_EQ(props.at("key1"), "value1"); -} - -} // namespace iceberg::rest diff --git a/src/iceberg/test/rest_util_test.cc b/src/iceberg/test/rest_util_test.cc index e11f00154..c4b6aa855 100644 --- a/src/iceberg/test/rest_util_test.cc +++ b/src/iceberg/test/rest_util_test.cc @@ -19,8 +19,11 @@ #include "iceberg/catalog/rest/rest_util.h" +#include + #include +#include "iceberg/catalog/rest/catalog_properties.h" #include "iceberg/catalog/rest/endpoint.h" #include "iceberg/table_identifier.h" #include "iceberg/test/matchers.h" @@ -154,4 +157,50 @@ TEST(RestUtilTest, MergeConfigs) { EXPECT_EQ(merged_empty["key"], "value"); } +struct SnapshotModeValidCase { + std::string input; + SnapshotMode expected; +}; + +class SnapshotLoadingModeValidTest + : public ::testing::TestWithParam {}; + +TEST_P(SnapshotLoadingModeValidTest, ParsesCorrectly) { + auto config = RestCatalogProperties::default_properties(); + config.Set(RestCatalogProperties::kSnapshotLoadingMode, GetParam().input); + auto result = config.SnapshotLoadingMode(); + ASSERT_THAT(result, IsOk()); + EXPECT_EQ(result.value(), GetParam().expected); +} + +INSTANTIATE_TEST_SUITE_P(RestCatalogProperties, SnapshotLoadingModeValidTest, + ::testing::Values( + // Exact uppercase + SnapshotModeValidCase{"ALL", SnapshotMode::kAll}, + SnapshotModeValidCase{"REFS", SnapshotMode::kRefs}, + // Lowercase (Java parity: toUpperCase before parsing) + SnapshotModeValidCase{"all", SnapshotMode::kAll}, + SnapshotModeValidCase{"refs", SnapshotMode::kRefs}, + // Mixed case + SnapshotModeValidCase{"All", SnapshotMode::kAll}, + SnapshotModeValidCase{"Refs", SnapshotMode::kRefs})); + +class SnapshotLoadingModeInvalidTest : public ::testing::TestWithParam {}; + +TEST_P(SnapshotLoadingModeInvalidTest, ReturnsError) { + auto config = RestCatalogProperties::default_properties(); + config.Set(RestCatalogProperties::kSnapshotLoadingMode, GetParam()); + EXPECT_THAT(config.SnapshotLoadingMode(), IsError(ErrorKind::kInvalidArgument)); +} + +INSTANTIATE_TEST_SUITE_P(RestCatalogProperties, SnapshotLoadingModeInvalidTest, + ::testing::Values("INVALID", "none", "")); + +TEST(RestCatalogPropertiesTest, SnapshotLoadingModeDefaultIsAll) { + auto config = RestCatalogProperties::default_properties(); + auto result = config.SnapshotLoadingMode(); + ASSERT_THAT(result, IsOk()); + EXPECT_EQ(result.value(), SnapshotMode::kAll); +} + } // namespace iceberg::rest