From e9f45f78134c4578502442793f042ece62874cdd Mon Sep 17 00:00:00 2001 From: Socrates Date: Thu, 30 Apr 2026 16:13:05 +0800 Subject: [PATCH 1/8] Add options system table framework --- include/paimon/catalog/identifier.h | 15 +++ include/paimon/table/source/table_read.h | 4 + src/paimon/CMakeLists.txt | 5 + .../core/catalog/file_system_catalog.cpp | 36 +++++- .../core/catalog/file_system_catalog_test.cpp | 46 +++++++ src/paimon/core/catalog/identifier.cpp | 72 +++++++++++ src/paimon/core/catalog/identifier_test.cpp | 40 ++++++ src/paimon/core/table/source/split.cpp | 25 +++- src/paimon/core/table/source/table_read.cpp | 15 +++ .../core/table/source/table_read_test.cpp | 62 ++++++++++ src/paimon/core/table/source/table_scan.cpp | 9 ++ .../table/system/options_system_table.cpp | 117 ++++++++++++++++++ .../core/table/system/options_system_table.h | 47 +++++++ src/paimon/core/table/system/system_table.cpp | 83 +++++++++++++ src/paimon/core/table/system/system_table.h | 66 ++++++++++ .../core/table/system/system_table_read.cpp | 42 +++++++ .../core/table/system/system_table_read.h | 40 ++++++ .../core/table/system/system_table_scan.cpp | 34 +++++ .../core/table/system/system_table_scan.h | 56 +++++++++ .../core/table/system/system_table_schema.cpp | 91 ++++++++++++++ .../core/table/system/system_table_schema.h | 55 ++++++++ 21 files changed, 957 insertions(+), 3 deletions(-) create mode 100644 src/paimon/core/table/system/options_system_table.cpp create mode 100644 src/paimon/core/table/system/options_system_table.h create mode 100644 src/paimon/core/table/system/system_table.cpp create mode 100644 src/paimon/core/table/system/system_table.h create mode 100644 src/paimon/core/table/system/system_table_read.cpp create mode 100644 src/paimon/core/table/system/system_table_read.h create mode 100644 src/paimon/core/table/system/system_table_scan.cpp create mode 100644 src/paimon/core/table/system/system_table_scan.h create mode 100644 src/paimon/core/table/system/system_table_schema.cpp create mode 100644 src/paimon/core/table/system/system_table_schema.h diff --git a/include/paimon/catalog/identifier.h b/include/paimon/catalog/identifier.h index 061c60a05..c15dc82a8 100644 --- a/include/paimon/catalog/identifier.h +++ b/include/paimon/catalog/identifier.h @@ -16,6 +16,7 @@ #pragma once +#include #include #include "paimon/type_fwd.h" @@ -27,6 +28,9 @@ namespace paimon { class PAIMON_EXPORT Identifier { public: static const char kUnknownDatabase[]; + static const char kSystemTableSplitter[]; + static const char kSystemBranchPrefix[]; + static const char kDefaultMainBranch[]; explicit Identifier(const std::string& table); Identifier(const std::string& database, const std::string& table); @@ -34,11 +38,22 @@ class PAIMON_EXPORT Identifier { bool operator==(const Identifier& other); const std::string& GetDatabaseName() const; const std::string& GetTableName() const; + const std::string& GetDataTableName() const; + const std::optional& GetBranchName() const; + std::string GetBranchNameOrDefault() const; + const std::optional& GetSystemTableName() const; + bool IsSystemTable() const; std::string ToString() const; private: + void SplitTableName() const; + const std::string database_; const std::string table_; + mutable bool parsed_ = false; + mutable std::string data_table_; + mutable std::optional branch_; + mutable std::optional system_table_; }; } // namespace paimon diff --git a/include/paimon/table/source/table_read.h b/include/paimon/table/source/table_read.h index f8327da1f..51c1b6efb 100644 --- a/include/paimon/table/source/table_read.h +++ b/include/paimon/table/source/table_read.h @@ -66,6 +66,10 @@ class PAIMON_EXPORT TableRead { protected: explicit TableRead(const std::shared_ptr& memory_pool); + std::shared_ptr GetMemoryPool() const { + return pool_; + } + private: std::shared_ptr pool_; }; diff --git a/src/paimon/CMakeLists.txt b/src/paimon/CMakeLists.txt index 1a5737160..ba3ad407b 100644 --- a/src/paimon/CMakeLists.txt +++ b/src/paimon/CMakeLists.txt @@ -311,6 +311,11 @@ set(PAIMON_CORE_SRCS core/table/source/table_read.cpp core/table/source/table_scan.cpp core/table/source/data_evolution_batch_scan.cpp + core/table/system/options_system_table.cpp + core/table/system/system_table.cpp + core/table/system/system_table_read.cpp + core/table/system/system_table_scan.cpp + core/table/system/system_table_schema.cpp core/tag/tag.cpp core/utils/field_mapping.cpp core/utils/file_store_path_factory.cpp diff --git a/src/paimon/core/catalog/file_system_catalog.cpp b/src/paimon/core/catalog/file_system_catalog.cpp index 3d07ade2c..a8193a120 100644 --- a/src/paimon/core/catalog/file_system_catalog.cpp +++ b/src/paimon/core/catalog/file_system_catalog.cpp @@ -30,6 +30,8 @@ #include "paimon/common/utils/path_util.h" #include "paimon/common/utils/string_utils.h" #include "paimon/core/snapshot.h" +#include "paimon/core/table/system/system_table.h" +#include "paimon/core/table/system/system_table_schema.h" #include "paimon/core/utils/branch_manager.h" #include "paimon/core/utils/snapshot_manager.h" #include "paimon/defs.h" @@ -92,6 +94,16 @@ Result FileSystemCatalog::DatabaseExists(const std::string& db_name) const } Result FileSystemCatalog::TableExists(const Identifier& identifier) const { + if (identifier.IsSystemTable()) { + if (!identifier.GetSystemTableName() || + !IsSupportedSystemTable(identifier.GetSystemTableName().value())) { + return false; + } + Identifier data_identifier(identifier.GetDatabaseName(), identifier.GetDataTableName()); + PAIMON_ASSIGN_OR_RAISE(std::optional> latest_schema, + TableSchemaExists(data_identifier)); + return latest_schema != std::nullopt; + } PAIMON_ASSIGN_OR_RAISE(std::optional> latest_schema, TableSchemaExists(identifier)); return latest_schema != std::nullopt; @@ -169,7 +181,7 @@ bool FileSystemCatalog::IsSystemDatabase(const std::string& db_name) { } bool FileSystemCatalog::IsSpecifiedSystemTable(const Identifier& identifier) { - return (identifier.GetTableName().find(SYSTEM_TABLE_SPLITTER) != std::string::npos); + return identifier.IsSystemTable(); } bool FileSystemCatalog::IsSystemTable(const Identifier& identifier) { @@ -237,6 +249,23 @@ Result FileSystemCatalog::TableExistsInFileSystem(const std::string& table Result> FileSystemCatalog::LoadTableSchema( const Identifier& identifier) const { + if (identifier.IsSystemTable()) { + if (!identifier.GetSystemTableName() || + !IsSupportedSystemTable(identifier.GetSystemTableName().value())) { + return Status::NotExist(fmt::format("{} not exist", identifier.ToString())); + } + Identifier data_identifier(identifier.GetDatabaseName(), identifier.GetDataTableName()); + PAIMON_ASSIGN_OR_RAISE(std::optional> latest_schema, + TableSchemaExists(data_identifier)); + if (!latest_schema) { + return Status::NotExist(fmt::format("{} not exist", data_identifier.ToString())); + } + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr system_table, + CreateSystemTable(identifier.GetSystemTableName().value(), fs_, + GetTableLocation(identifier), latest_schema.value())); + return std::make_shared(system_table->ArrowSchema()); + } PAIMON_ASSIGN_OR_RAISE(std::optional> latest_schema, TableSchemaExists(identifier)); if (!latest_schema) { @@ -246,6 +275,11 @@ Result> FileSystemCatalog::LoadTableSchema( } Result> FileSystemCatalog::GetTable(const Identifier& identifier) const { + if (identifier.IsSystemTable()) { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr schema, LoadTableSchema(identifier)); + return std::make_shared(schema, identifier.GetDatabaseName(), + identifier.GetTableName()); + } return Table::Create(fs_, GetTableLocation(identifier), identifier); } diff --git a/src/paimon/core/catalog/file_system_catalog_test.cpp b/src/paimon/core/catalog/file_system_catalog_test.cpp index f97801359..23ca50267 100644 --- a/src/paimon/core/catalog/file_system_catalog_test.cpp +++ b/src/paimon/core/catalog/file_system_catalog_test.cpp @@ -149,6 +149,52 @@ TEST(FileSystemCatalogTest, TestCreateTable) { ArrowSchemaRelease(&schema); } +TEST(FileSystemCatalogTest, TestOptionsSystemTableCatalog) { + std::map options; + options[Options::FILE_SYSTEM] = "local"; + options[Options::FILE_FORMAT] = "orc"; + options["custom.option"] = "custom-value"; + ASSERT_OK_AND_ASSIGN(auto core_options, CoreOptions::FromMap(options)); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + FileSystemCatalog catalog(core_options.GetFileSystem(), dir->Str()); + ASSERT_OK(catalog.CreateDatabase("db1", options, /*ignore_if_exists=*/true)); + + auto typed_schema = arrow::schema({arrow::field("f0", arrow::int32())}); + ::ArrowSchema schema; + ASSERT_TRUE(arrow::ExportSchema(*typed_schema, &schema).ok()); + ASSERT_OK(catalog.CreateTable(Identifier("db1", "tbl1"), &schema, + /*partition_keys=*/{}, /*primary_keys=*/{}, options, + /*ignore_if_exists=*/false)); + ArrowSchemaRelease(&schema); + + Identifier options_identifier("db1", "tbl1$options"); + ASSERT_OK_AND_ASSIGN(bool exists, catalog.TableExists(options_identifier)); + ASSERT_TRUE(exists); + ASSERT_OK_AND_ASSIGN(exists, catalog.TableExists(Identifier("db1", "tbl1$unknown"))); + ASSERT_FALSE(exists); + ASSERT_EQ(catalog.GetTableLocation(options_identifier), + PathUtil::JoinPath(PathUtil::JoinPath(dir->Str(), "db1.db"), "tbl1$options")); + + ASSERT_OK_AND_ASSIGN(std::shared_ptr system_schema, + catalog.LoadTableSchema(options_identifier)); + ASSERT_OK_AND_ASSIGN(auto c_schema, system_schema->GetArrowSchema()); + auto loaded_schema_result = arrow::ImportSchema(c_schema.get()); + ASSERT_TRUE(loaded_schema_result.ok()) << loaded_schema_result.status().ToString(); + auto loaded_schema = loaded_schema_result.ValueUnsafe(); + ASSERT_EQ(loaded_schema->field_names(), (std::vector{"key", "value"})); + + ::ArrowSchema system_create_schema; + ASSERT_TRUE(arrow::ExportSchema(*typed_schema, &system_create_schema).ok()); + ASSERT_NOK_WITH_MSG( + catalog.CreateTable(options_identifier, &system_create_schema, {}, {}, options, false), + "Cannot create table for system table"); + ArrowSchemaRelease(&system_create_schema); + ASSERT_NOK_WITH_MSG(catalog.DropTable(options_identifier, false), "Cannot drop system table"); + ASSERT_NOK_WITH_MSG(catalog.RenameTable(options_identifier, Identifier("db1", "tbl2"), false), + "Cannot rename system table"); +} + TEST(FileSystemCatalogTest, TestCreateTableWithBlob) { std::map options; options[Options::FILE_SYSTEM] = "local"; diff --git a/src/paimon/core/catalog/identifier.cpp b/src/paimon/core/catalog/identifier.cpp index 7ef7c04df..bcac95124 100644 --- a/src/paimon/core/catalog/identifier.cpp +++ b/src/paimon/core/catalog/identifier.cpp @@ -16,11 +16,20 @@ #include "paimon/catalog/identifier.h" +#include +#include +#include +#include + #include "fmt/format.h" +#include "paimon/common/utils/string_utils.h" namespace paimon { const char Identifier::kUnknownDatabase[] = "unknown"; +const char Identifier::kSystemTableSplitter[] = "$"; +const char Identifier::kSystemBranchPrefix[] = "branch_"; +const char Identifier::kDefaultMainBranch[] = "main"; Identifier::Identifier(const std::string& table) : Identifier(std::string(kUnknownDatabase), table) {} @@ -40,8 +49,71 @@ const std::string& Identifier::GetTableName() const { return table_; } +const std::string& Identifier::GetDataTableName() const { + SplitTableName(); + return data_table_; +} + +const std::optional& Identifier::GetBranchName() const { + SplitTableName(); + return branch_; +} + +std::string Identifier::GetBranchNameOrDefault() const { + const auto& branch = GetBranchName(); + return branch ? branch.value() : std::string(kDefaultMainBranch); +} + +const std::optional& Identifier::GetSystemTableName() const { + SplitTableName(); + return system_table_; +} + +bool Identifier::IsSystemTable() const { + return GetSystemTableName().has_value(); +} + std::string Identifier::ToString() const { return fmt::format("Identifier{{database='{}', table='{}'}}", database_, table_); } +void Identifier::SplitTableName() const { + if (parsed_) { + return; + } + std::string data_table; + std::optional branch; + std::optional system_table; + std::vector splits = + StringUtils::Split(table_, kSystemTableSplitter, /*ignore_empty=*/false); + if (splits.size() == 1) { + data_table = table_; + } else if (splits.size() == 2) { + data_table = splits[0]; + if (StringUtils::StartsWith(splits[1], kSystemBranchPrefix, /*start_pos=*/0)) { + branch = splits[1].substr(std::strlen(kSystemBranchPrefix)); + } else { + system_table = splits[1]; + } + } else if (splits.size() == 3) { + if (!StringUtils::StartsWith(splits[1], kSystemBranchPrefix, /*start_pos=*/0)) { + throw std::invalid_argument(fmt::format( + "System table can only contain one '$' separator, but this is: {}", table_)); + } + data_table = splits[0]; + branch = splits[1].substr(std::strlen(kSystemBranchPrefix)); + system_table = splits[2]; + } else { + throw std::invalid_argument(fmt::format("Invalid table name: {}", table_)); + } + if (data_table.empty() || (branch && branch->empty()) || + (system_table && system_table->empty())) { + throw std::invalid_argument(fmt::format("Invalid table name: {}", table_)); + } + data_table_ = std::move(data_table); + branch_ = std::move(branch); + system_table_ = std::move(system_table); + parsed_ = true; +} + } // namespace paimon diff --git a/src/paimon/core/catalog/identifier_test.cpp b/src/paimon/core/catalog/identifier_test.cpp index 8c1750158..0d7fb24a7 100644 --- a/src/paimon/core/catalog/identifier_test.cpp +++ b/src/paimon/core/catalog/identifier_test.cpp @@ -16,6 +16,8 @@ #include "paimon/catalog/identifier.h" +#include + #include "gtest/gtest.h" namespace paimon::test { @@ -52,4 +54,42 @@ TEST(IdentifierTest, EmptyDatabaseRemainsEmpty) { EXPECT_EQ(id.GetTableName(), "my_table"); } +TEST(IdentifierTest, ParseSystemTable) { + Identifier id("db", "tbl$options"); + EXPECT_EQ(id.GetTableName(), "tbl$options"); + EXPECT_EQ(id.GetDataTableName(), "tbl"); + EXPECT_FALSE(id.GetBranchName()); + ASSERT_TRUE(id.GetSystemTableName()); + EXPECT_EQ(id.GetSystemTableName().value(), "options"); + EXPECT_TRUE(id.IsSystemTable()); +} + +TEST(IdentifierTest, ParseBranchTable) { + Identifier id("db", "tbl$branch_dev"); + EXPECT_EQ(id.GetDataTableName(), "tbl"); + ASSERT_TRUE(id.GetBranchName()); + EXPECT_EQ(id.GetBranchName().value(), "dev"); + EXPECT_EQ(id.GetBranchNameOrDefault(), "dev"); + EXPECT_FALSE(id.GetSystemTableName()); + EXPECT_FALSE(id.IsSystemTable()); +} + +TEST(IdentifierTest, ParseBranchSystemTable) { + Identifier id("db", "tbl$branch_dev$options"); + EXPECT_EQ(id.GetDataTableName(), "tbl"); + ASSERT_TRUE(id.GetBranchName()); + EXPECT_EQ(id.GetBranchName().value(), "dev"); + ASSERT_TRUE(id.GetSystemTableName()); + EXPECT_EQ(id.GetSystemTableName().value(), "options"); + EXPECT_TRUE(id.IsSystemTable()); +} + +TEST(IdentifierTest, InvalidSystemTableName) { + Identifier invalid_middle("db", "tbl$bad$options"); + EXPECT_THROW(invalid_middle.IsSystemTable(), std::invalid_argument); + + Identifier too_many("db", "tbl$branch_dev$options$extra"); + EXPECT_THROW(too_many.IsSystemTable(), std::invalid_argument); +} + } // namespace paimon::test diff --git a/src/paimon/core/table/source/split.cpp b/src/paimon/core/table/source/split.cpp index 2d188cd53..c8c87edee 100644 --- a/src/paimon/core/table/source/split.cpp +++ b/src/paimon/core/table/source/split.cpp @@ -26,6 +26,7 @@ #include "paimon/core/table/source/data_split_impl.h" #include "paimon/core/table/source/deletion_file.h" #include "paimon/core/table/source/fallback_data_split.h" +#include "paimon/core/table/system/system_table_scan.h" #include "paimon/core/utils/object_serializer.h" #include "paimon/global_index/indexed_split.h" #include "paimon/io/byte_array_input_stream.h" @@ -162,8 +163,13 @@ Result Split::Serialize(const std::shared_ptr& split, } else { out.WriteValue(false); } + } else if (auto system_table_split = std::dynamic_pointer_cast(split)) { + out.WriteValue(SystemTableSplit::MAGIC); + out.WriteValue(SystemTableSplit::VERSION); + out.WriteString(system_table_split->TablePath()); } else { - return Status::Invalid("invalid split, cannot cast to DataSplit or IndexedSplit"); + return Status::Invalid( + "invalid split, cannot cast to DataSplit, IndexedSplit or SystemTableSplit"); } PAIMON_UNIQUE_PTR bytes = MemorySegmentUtils::CopyToBytes(out.Segments(), 0, out.CurrentSize(), pool.get()); @@ -217,6 +223,21 @@ Result> Split::Deserialize(const char* buffer, size_t len fmt::format("invalid IndexedSplit, remaining {} bytes after deserializing", stream_length - pos)); } + } else if (magic == SystemTableSplit::MAGIC) { + PAIMON_ASSIGN_OR_RAISE(int32_t version, in.ReadValue()); + if (version != SystemTableSplit::VERSION) { + return Status::Invalid( + fmt::format("Unsupported SystemTableSplit version: {}", version)); + } + PAIMON_ASSIGN_OR_RAISE(std::string table_path, in.ReadString()); + PAIMON_ASSIGN_OR_RAISE(int64_t pos, in.GetPos()); + PAIMON_ASSIGN_OR_RAISE(int64_t stream_length, in.Length()); + if (pos != stream_length) { + return Status::Invalid( + fmt::format("invalid SystemTableSplit, remaining {} bytes after deserializing", + stream_length - pos)); + } + return std::make_shared(table_path); } else if (magic == DataSplitImpl::MAGIC) { PAIMON_ASSIGN_OR_RAISE(std::shared_ptr data_split, ReadDataSplitWithoutMagicNumber(magic, &in, pool)); @@ -233,6 +254,6 @@ Result> Split::Deserialize(const char* buffer, size_t len stream_length - pos)); } } - return Status::Invalid("invalid split, must be DataSplit or IndexedSplit"); + return Status::Invalid("invalid split, must be DataSplit, IndexedSplit or SystemTableSplit"); } } // namespace paimon diff --git a/src/paimon/core/table/source/table_read.cpp b/src/paimon/core/table/source/table_read.cpp index cc7e646fa..8726ca35f 100644 --- a/src/paimon/core/table/source/table_read.cpp +++ b/src/paimon/core/table/source/table_read.cpp @@ -33,6 +33,8 @@ #include "paimon/core/table/source/append_only_table_read.h" #include "paimon/core/table/source/fallback_table_read.h" #include "paimon/core/table/source/key_value_table_read.h" +#include "paimon/core/table/system/system_table.h" +#include "paimon/core/table/system/system_table_read.h" #include "paimon/core/utils/branch_manager.h" #include "paimon/core/utils/file_store_path_factory.h" #include "paimon/defs.h" @@ -124,6 +126,19 @@ Result> TableRead::Create(std::unique_ptrGetMemoryPool(); auto executor = context->GetExecutor(); + PAIMON_ASSIGN_OR_RAISE( + CoreOptions tmp_core_options, + CoreOptions::FromMap(context->GetOptions(), context->GetSpecificFileSystem(), + context->GetFileSystemSchemeToIdentifierMap())); + PAIMON_ASSIGN_OR_RAISE(std::optional system_table_path, + TryParseSystemTablePath(context->GetPath())); + if (system_table_path) { + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr system_table, + LoadSystemTableFromPath(tmp_core_options.GetFileSystem(), context->GetPath())); + return std::make_unique(system_table, memory_pool); + } + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr internal_context, CreateInternalReadContext(context, context->GetBranch())); diff --git a/src/paimon/core/table/source/table_read_test.cpp b/src/paimon/core/table/source/table_read_test.cpp index 2b72dfdbc..33e57039f 100644 --- a/src/paimon/core/table/source/table_read_test.cpp +++ b/src/paimon/core/table/source/table_read_test.cpp @@ -20,7 +20,12 @@ #include #include +#include "arrow/api.h" +#include "arrow/c/abi.h" +#include "arrow/c/bridge.h" #include "gtest/gtest.h" +#include "paimon/catalog/catalog.h" +#include "paimon/catalog/identifier.h" #include "paimon/core/core_options.h" #include "paimon/core/operation/abstract_split_read.h" #include "paimon/core/operation/split_read.h" @@ -31,6 +36,9 @@ #include "paimon/predicate/predicate_builder.h" #include "paimon/read_context.h" #include "paimon/status.h" +#include "paimon/table/source/split.h" +#include "paimon/table/source/table_scan.h" +#include "paimon/testing/utils/read_result_collector.h" #include "paimon/testing/utils/testharness.h" namespace paimon::test { @@ -153,4 +161,58 @@ TEST(TableReadTest, TestMergeOptions) { {"manifest.format", "orc"}, {"file.format", "orc"}}; ASSERT_EQ(expected_options, core_options.ToMap()); } + +TEST(TableReadTest, TestReadOptionsSystemTable) { + std::map options = {{Options::FILE_SYSTEM, "local"}, + {Options::FILE_FORMAT, "orc"}, + {Options::MANIFEST_FORMAT, "orc"}, + {"custom.option", "custom-value"}}; + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(dir->Str(), options)); + ASSERT_OK(catalog->CreateDatabase("db1", options, /*ignore_if_exists=*/false)); + + auto typed_schema = arrow::schema({arrow::field("f0", arrow::int32())}); + ::ArrowSchema schema; + ASSERT_TRUE(arrow::ExportSchema(*typed_schema, &schema).ok()); + ASSERT_OK(catalog->CreateTable(Identifier("db1", "tbl1"), &schema, + /*partition_keys=*/{}, /*primary_keys=*/{}, options, + /*ignore_if_exists=*/false)); + ArrowSchemaRelease(&schema); + + std::string system_table_path = catalog->GetTableLocation(Identifier("db1", "tbl1$options")); + ScanContextBuilder scan_context_builder(system_table_path); + scan_context_builder.SetOptions(options); + ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + ASSERT_OK_AND_ASSIGN(auto plan, table_scan->CreatePlan()); + ASSERT_EQ(plan->Splits().size(), 1); + + ASSERT_OK_AND_ASSIGN(auto serialized_split, + Split::Serialize(plan->Splits()[0], GetDefaultPool())); + ASSERT_OK_AND_ASSIGN( + auto deserialized_split, + Split::Deserialize(serialized_split.data(), serialized_split.size(), GetDefaultPool())); + + ReadContextBuilder read_context_builder(system_table_path); + read_context_builder.SetOptions(options); + ASSERT_OK_AND_ASSIGN(auto read_context, read_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_read, TableRead::Create(std::move(read_context))); + ASSERT_OK_AND_ASSIGN(auto batch_reader, table_read->CreateReader({deserialized_split})); + ASSERT_OK_AND_ASSIGN(auto result, ReadResultCollector::CollectResult(batch_reader.get())); + ASSERT_TRUE(result); + ASSERT_EQ(result->type()->id(), arrow::Type::STRUCT); + + auto struct_array = std::static_pointer_cast(result->chunk(0)); + auto key_array = std::static_pointer_cast(struct_array->field(0)); + auto value_array = std::static_pointer_cast(struct_array->field(1)); + std::map actual; + for (int64_t i = 0; i < struct_array->length(); ++i) { + actual[key_array->GetString(i)] = value_array->GetString(i); + } + EXPECT_EQ(actual[Options::FILE_SYSTEM], "local"); + EXPECT_EQ(actual[Options::FILE_FORMAT], "orc"); + EXPECT_EQ(actual[Options::MANIFEST_FORMAT], "orc"); + EXPECT_EQ(actual["custom.option"], "custom-value"); +} } // namespace paimon::test diff --git a/src/paimon/core/table/source/table_scan.cpp b/src/paimon/core/table/source/table_scan.cpp index 43ac11b4f..43f0097a1 100644 --- a/src/paimon/core/table/source/table_scan.cpp +++ b/src/paimon/core/table/source/table_scan.cpp @@ -48,6 +48,7 @@ #include "paimon/core/table/source/merge_tree_split_generator.h" #include "paimon/core/table/source/snapshot/snapshot_reader.h" #include "paimon/core/table/source/split_generator.h" +#include "paimon/core/table/system/system_table.h" #include "paimon/core/utils/field_mapping.h" #include "paimon/core/utils/file_store_path_factory.h" #include "paimon/core/utils/index_file_path_factories.h" @@ -165,6 +166,14 @@ Result> TableScan::Create(std::unique_ptrGetOptions(), context->GetSpecificFileSystem())); + PAIMON_ASSIGN_OR_RAISE(std::optional system_table_path, + TryParseSystemTablePath(context->GetPath())); + if (system_table_path) { + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr system_table, + LoadSystemTableFromPath(tmp_options.GetFileSystem(), context->GetPath())); + return system_table->NewScan(); + } SchemaManager schema_manager(tmp_options.GetFileSystem(), context->GetPath()); PAIMON_ASSIGN_OR_RAISE(std::optional> latest_table_schema, schema_manager.Latest()); diff --git a/src/paimon/core/table/system/options_system_table.cpp b/src/paimon/core/table/system/options_system_table.cpp new file mode 100644 index 000000000..25b9d7b85 --- /dev/null +++ b/src/paimon/core/table/system/options_system_table.cpp @@ -0,0 +1,117 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/table/system/options_system_table.h" + +#include +#include +#include +#include +#include + +#include "arrow/api.h" +#include "arrow/c/bridge.h" +#include "paimon/common/metrics/metrics_impl.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/core/schema/table_schema.h" +#include "paimon/core/table/system/system_table_scan.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/result.h" +#include "paimon/status.h" + +namespace paimon { +namespace { + +std::shared_ptr OptionsSchema() { + return arrow::schema({arrow::field("key", arrow::utf8(), /*nullable=*/false), + arrow::field("value", arrow::utf8(), /*nullable=*/false)}); +} + +class OptionsBatchReader : public BatchReader { + public: + explicit OptionsBatchReader(std::map options) + : options_(std::move(options)) {} + + Result NextBatch() override { + if (emitted_) { + return BatchReader::MakeEofBatch(); + } + emitted_ = true; + + arrow::StringBuilder key_builder; + arrow::StringBuilder value_builder; + for (const auto& [key, value] : options_) { + PAIMON_RETURN_NOT_OK_FROM_ARROW(key_builder.Append(key)); + PAIMON_RETURN_NOT_OK_FROM_ARROW(value_builder.Append(value)); + } + std::shared_ptr key_array; + std::shared_ptr value_array; + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(key_array, key_builder.Finish()); + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(value_array, value_builder.Finish()); + auto struct_array = std::make_shared( + arrow::struct_(OptionsSchema()->fields()), key_array->length(), + std::vector>{key_array, value_array}); + + auto c_array = std::make_unique<::ArrowArray>(); + auto c_schema = std::make_unique<::ArrowSchema>(); + PAIMON_RETURN_NOT_OK_FROM_ARROW( + arrow::ExportArray(*struct_array, c_array.get(), c_schema.get())); + return std::make_pair(std::move(c_array), std::move(c_schema)); + } + + std::shared_ptr GetReaderMetrics() const override { + return std::make_shared(); + } + + void Close() override { + emitted_ = true; + } + + private: + std::map options_; + bool emitted_ = false; +}; + +} // namespace + +OptionsSystemTable::OptionsSystemTable(std::string table_path, + std::shared_ptr table_schema) + : table_path_(std::move(table_path)), table_schema_(std::move(table_schema)) {} + +std::string OptionsSystemTable::Name() const { + return NAME; +} + +std::shared_ptr OptionsSystemTable::ArrowSchema() const { + return OptionsSchema(); +} + +Result> OptionsSystemTable::NewScan() const { + return std::make_unique(table_path_); +} + +Result> OptionsSystemTable::NewReader( + const std::vector>& splits, + const std::shared_ptr& /*pool*/) const { + for (const auto& split : splits) { + if (!std::dynamic_pointer_cast(split)) { + return Status::Invalid("unsupported split for options system table"); + } + } + return std::make_unique(table_schema_->Options()); +} + +} // namespace paimon diff --git a/src/paimon/core/table/system/options_system_table.h b/src/paimon/core/table/system/options_system_table.h new file mode 100644 index 000000000..c520f4155 --- /dev/null +++ b/src/paimon/core/table/system/options_system_table.h @@ -0,0 +1,47 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "paimon/core/table/system/system_table.h" + +namespace paimon { +class TableSchema; + +class OptionsSystemTable : public SystemTable { + public: + static constexpr const char* NAME = "options"; + + OptionsSystemTable(std::string table_path, std::shared_ptr table_schema); + + std::string Name() const override; + std::shared_ptr ArrowSchema() const override; + Result> NewScan() const override; + Result> NewReader( + const std::vector>& splits, + const std::shared_ptr& pool) const override; + + private: + std::string table_path_; + std::shared_ptr table_schema_; +}; + +} // namespace paimon diff --git a/src/paimon/core/table/system/system_table.cpp b/src/paimon/core/table/system/system_table.cpp new file mode 100644 index 000000000..5ea1e4571 --- /dev/null +++ b/src/paimon/core/table/system/system_table.cpp @@ -0,0 +1,83 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/table/system/system_table.h" + +#include +#include +#include +#include +#include + +#include "paimon/catalog/identifier.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/common/utils/string_utils.h" +#include "paimon/core/schema/schema_manager.h" +#include "paimon/core/schema/table_schema.h" +#include "paimon/core/table/system/options_system_table.h" + +namespace paimon { + +bool IsSupportedSystemTable(const std::string& system_table_name) { + return StringUtils::ToLowerCase(system_table_name) == OptionsSystemTable::NAME; +} + +Result> CreateSystemTable( + const std::string& system_table_name, const std::shared_ptr& /*fs*/, + const std::string& table_path, const std::shared_ptr& table_schema) { + std::string normalized_name = StringUtils::ToLowerCase(system_table_name); + if (normalized_name == OptionsSystemTable::NAME) { + return std::make_shared(table_path, table_schema); + } + return Status::NotExist("unsupported system table: ", system_table_name); +} + +Result> TryParseSystemTablePath(const std::string& path) { + std::string table_name = PathUtil::GetName(path); + Identifier identifier(table_name); + try { + if (!identifier.IsSystemTable()) { + return std::nullopt; + } + std::string parent = PathUtil::GetParentDirPath(path); + SystemTablePath system_table_path; + system_table_path.table_path = PathUtil::JoinPath(parent, identifier.GetDataTableName()); + system_table_path.branch = identifier.GetBranchName(); + system_table_path.system_table_name = identifier.GetSystemTableName().value(); + return system_table_path; + } catch (const std::exception& e) { + return Status::Invalid(e.what()); + } +} + +Result> LoadSystemTableFromPath(const std::shared_ptr& fs, + const std::string& path) { + PAIMON_ASSIGN_OR_RAISE(std::optional system_table_path, + TryParseSystemTablePath(path)); + if (!system_table_path) { + return Status::Invalid("path is not a system table path: ", path); + } + const auto& parsed = system_table_path.value(); + SchemaManager schema_manager(fs, parsed.table_path, parsed.branch.value_or("")); + PAIMON_ASSIGN_OR_RAISE(std::optional> latest_schema, + schema_manager.Latest()); + if (!latest_schema) { + return Status::NotExist("base table schema not found for system table path: ", path); + } + return CreateSystemTable(parsed.system_table_name, fs, path, latest_schema.value()); +} + +} // namespace paimon diff --git a/src/paimon/core/table/system/system_table.h b/src/paimon/core/table/system/system_table.h new file mode 100644 index 000000000..4f49d18a7 --- /dev/null +++ b/src/paimon/core/table/system/system_table.h @@ -0,0 +1,66 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "arrow/api.h" +#include "paimon/reader/batch_reader.h" +#include "paimon/result.h" +#include "paimon/status.h" +#include "paimon/type_fwd.h" + +namespace paimon { +class FileSystem; +class MemoryPool; +class Split; +class TableScan; +class TableSchema; + +struct SystemTablePath { + std::string table_path; + std::optional branch; + std::string system_table_name; +}; + +class SystemTable { + public: + virtual ~SystemTable() = default; + + virtual std::string Name() const = 0; + virtual std::shared_ptr ArrowSchema() const = 0; + virtual Result> NewScan() const = 0; + virtual Result> NewReader( + const std::vector>& splits, + const std::shared_ptr& pool) const = 0; +}; + +Result> CreateSystemTable( + const std::string& system_table_name, const std::shared_ptr& fs, + const std::string& table_path, const std::shared_ptr& table_schema); + +bool IsSupportedSystemTable(const std::string& system_table_name); + +Result> TryParseSystemTablePath(const std::string& path); + +Result> LoadSystemTableFromPath(const std::shared_ptr& fs, + const std::string& path); + +} // namespace paimon diff --git a/src/paimon/core/table/system/system_table_read.cpp b/src/paimon/core/table/system/system_table_read.cpp new file mode 100644 index 000000000..96175b209 --- /dev/null +++ b/src/paimon/core/table/system/system_table_read.cpp @@ -0,0 +1,42 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/table/system/system_table_read.h" + +#include +#include +#include + +#include "paimon/core/table/system/system_table.h" + +namespace paimon { + +SystemTableRead::SystemTableRead(std::shared_ptr system_table, + const std::shared_ptr& memory_pool) + : TableRead(memory_pool), system_table_(std::move(system_table)) {} + +Result> SystemTableRead::CreateReader( + const std::vector>& splits) { + return system_table_->NewReader(splits, GetMemoryPool()); +} + +Result> SystemTableRead::CreateReader( + const std::shared_ptr& split) { + std::vector> splits = {split}; + return CreateReader(splits); +} + +} // namespace paimon diff --git a/src/paimon/core/table/system/system_table_read.h b/src/paimon/core/table/system/system_table_read.h new file mode 100644 index 000000000..4d995cfa5 --- /dev/null +++ b/src/paimon/core/table/system/system_table_read.h @@ -0,0 +1,40 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "paimon/table/source/table_read.h" + +namespace paimon { +class SystemTable; + +class SystemTableRead : public TableRead { + public: + SystemTableRead(std::shared_ptr system_table, + const std::shared_ptr& memory_pool); + + Result> CreateReader( + const std::vector>& splits) override; + Result> CreateReader(const std::shared_ptr& split) override; + + private: + std::shared_ptr system_table_; +}; + +} // namespace paimon diff --git a/src/paimon/core/table/system/system_table_scan.cpp b/src/paimon/core/table/system/system_table_scan.cpp new file mode 100644 index 000000000..64a619764 --- /dev/null +++ b/src/paimon/core/table/system/system_table_scan.cpp @@ -0,0 +1,34 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/table/system/system_table_scan.h" + +#include +#include +#include + +#include "paimon/core/table/source/plan_impl.h" + +namespace paimon { + +SystemTableScan::SystemTableScan(std::string table_path) : table_path_(std::move(table_path)) {} + +Result> SystemTableScan::CreatePlan() { + std::vector> splits = {std::make_shared(table_path_)}; + return std::make_shared(std::nullopt, splits); +} + +} // namespace paimon diff --git a/src/paimon/core/table/system/system_table_scan.h b/src/paimon/core/table/system/system_table_scan.h new file mode 100644 index 000000000..1deccb536 --- /dev/null +++ b/src/paimon/core/table/system/system_table_scan.h @@ -0,0 +1,56 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include "paimon/table/source/table_scan.h" + +namespace paimon { +class Plan; +class Split; + +class SystemTableSplit : public Split { + public: + static constexpr int64_t MAGIC = -739241698710434219L; + static constexpr int32_t VERSION = 1; + + explicit SystemTableSplit(std::string table_path) : table_path_(std::move(table_path)) {} + + const std::string& TablePath() const { + return table_path_; + } + + private: + std::string table_path_; +}; + +class SystemTableScan : public TableScan { + public: + explicit SystemTableScan(std::string table_path); + + Result> CreatePlan() override; + + private: + std::string table_path_; +}; + +} // namespace paimon diff --git a/src/paimon/core/table/system/system_table_schema.cpp b/src/paimon/core/table/system/system_table_schema.cpp new file mode 100644 index 000000000..d0ee48012 --- /dev/null +++ b/src/paimon/core/table/system/system_table_schema.cpp @@ -0,0 +1,91 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/table/system/system_table_schema.h" + +#include +#include +#include +#include +#include + +#include "arrow/c/bridge.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/common/utils/field_type_utils.h" +#include "paimon/status.h" + +namespace paimon { + +SystemTableSchema::SystemTableSchema(std::shared_ptr schema) + : schema_(std::move(schema)) { + field_names_ = schema_->field_names(); +} + +Result> SystemTableSchema::GetArrowSchema() const { + auto c_schema = std::make_unique<::ArrowSchema>(); + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*schema_, c_schema.get())); + return c_schema; +} + +Result SystemTableSchema::GetJsonSchema() const { + return Status::NotImplemented("system table JSON schema is not supported"); +} + +std::vector SystemTableSchema::FieldNames() const { + return field_names_; +} + +Result SystemTableSchema::GetFieldType(const std::string& field_name) const { + auto field = schema_->GetFieldByName(field_name); + if (!field) { + return Status::NotExist("field ", field_name, " not exist in system table schema"); + } + return FieldTypeUtils::ConvertToFieldType(field->type()->id()); +} + +int64_t SystemTableSchema::Id() const { + return 0; +} + +const std::vector& SystemTableSchema::PrimaryKeys() const { + return empty_keys_; +} + +const std::vector& SystemTableSchema::PartitionKeys() const { + return empty_keys_; +} + +const std::vector& SystemTableSchema::BucketKeys() const { + return empty_keys_; +} + +int32_t SystemTableSchema::NumBuckets() const { + return -1; +} + +int32_t SystemTableSchema::HighestFieldId() const { + return static_cast(schema_->num_fields() - 1); +} + +const std::map& SystemTableSchema::Options() const { + return empty_options_; +} + +std::optional SystemTableSchema::Comment() const { + return std::nullopt; +} + +} // namespace paimon diff --git a/src/paimon/core/table/system/system_table_schema.h b/src/paimon/core/table/system/system_table_schema.h new file mode 100644 index 000000000..54aaaaa2d --- /dev/null +++ b/src/paimon/core/table/system/system_table_schema.h @@ -0,0 +1,55 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include "arrow/api.h" +#include "paimon/result.h" +#include "paimon/schema/schema.h" + +namespace paimon { + +class SystemTableSchema : public Schema { + public: + explicit SystemTableSchema(std::shared_ptr schema); + + Result> GetArrowSchema() const override; + Result GetJsonSchema() const override; + std::vector FieldNames() const override; + Result GetFieldType(const std::string& field_name) const override; + int64_t Id() const override; + const std::vector& PrimaryKeys() const override; + const std::vector& PartitionKeys() const override; + const std::vector& BucketKeys() const override; + int32_t NumBuckets() const override; + int32_t HighestFieldId() const override; + const std::map& Options() const override; + std::optional Comment() const override; + + private: + std::shared_ptr schema_; + std::vector field_names_; + std::vector empty_keys_; + std::map empty_options_; +}; + +} // namespace paimon From 524d1a14fb3959aa974f70bde0bc907205d0183d Mon Sep 17 00:00:00 2001 From: Socrates Date: Thu, 30 Apr 2026 16:20:47 +0800 Subject: [PATCH 2/8] Fix system table path result construction --- src/paimon/core/table/system/system_table.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/paimon/core/table/system/system_table.cpp b/src/paimon/core/table/system/system_table.cpp index 5ea1e4571..1dbbbfe2f 100644 --- a/src/paimon/core/table/system/system_table.cpp +++ b/src/paimon/core/table/system/system_table.cpp @@ -50,14 +50,14 @@ Result> TryParseSystemTablePath(const std::string Identifier identifier(table_name); try { if (!identifier.IsSystemTable()) { - return std::nullopt; + return std::optional(); } std::string parent = PathUtil::GetParentDirPath(path); SystemTablePath system_table_path; system_table_path.table_path = PathUtil::JoinPath(parent, identifier.GetDataTableName()); system_table_path.branch = identifier.GetBranchName(); system_table_path.system_table_name = identifier.GetSystemTableName().value(); - return system_table_path; + return std::optional(std::move(system_table_path)); } catch (const std::exception& e) { return Status::Invalid(e.what()); } From 8eb9d96bec448ef073ef2b4e78321ddc2824eb6a Mon Sep 17 00:00:00 2001 From: Socrates Date: Thu, 30 Apr 2026 16:24:52 +0800 Subject: [PATCH 3/8] Fix options system table test build --- src/paimon/core/table/source/table_read_test.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/paimon/core/table/source/table_read_test.cpp b/src/paimon/core/table/source/table_read_test.cpp index 33e57039f..b50f61ded 100644 --- a/src/paimon/core/table/source/table_read_test.cpp +++ b/src/paimon/core/table/source/table_read_test.cpp @@ -35,6 +35,7 @@ #include "paimon/predicate/literal.h" #include "paimon/predicate/predicate_builder.h" #include "paimon/read_context.h" +#include "paimon/scan_context.h" #include "paimon/status.h" #include "paimon/table/source/split.h" #include "paimon/table/source/table_scan.h" @@ -198,7 +199,8 @@ TEST(TableReadTest, TestReadOptionsSystemTable) { read_context_builder.SetOptions(options); ASSERT_OK_AND_ASSIGN(auto read_context, read_context_builder.Finish()); ASSERT_OK_AND_ASSIGN(auto table_read, TableRead::Create(std::move(read_context))); - ASSERT_OK_AND_ASSIGN(auto batch_reader, table_read->CreateReader({deserialized_split})); + std::vector> splits = {deserialized_split}; + ASSERT_OK_AND_ASSIGN(auto batch_reader, table_read->CreateReader(splits)); ASSERT_OK_AND_ASSIGN(auto result, ReadResultCollector::CollectResult(batch_reader.get())); ASSERT_TRUE(result); ASSERT_EQ(result->type()->id(), arrow::Type::STRUCT); From dd382ce96cf639bcbf9c381480de4dced50d2621 Mon Sep 17 00:00:00 2001 From: Socrates Date: Thu, 30 Apr 2026 16:32:09 +0800 Subject: [PATCH 4/8] Cover system table edge cases --- .../core/catalog/file_system_catalog_test.cpp | 11 +++++++++++ src/paimon/core/catalog/identifier_test.cpp | 18 ++++++++++++++++++ .../core/table/source/table_read_test.cpp | 5 +++++ .../core/table/system/options_system_table.cpp | 3 +++ 4 files changed, 37 insertions(+) diff --git a/src/paimon/core/catalog/file_system_catalog_test.cpp b/src/paimon/core/catalog/file_system_catalog_test.cpp index 23ca50267..10705d2bd 100644 --- a/src/paimon/core/catalog/file_system_catalog_test.cpp +++ b/src/paimon/core/catalog/file_system_catalog_test.cpp @@ -173,6 +173,8 @@ TEST(FileSystemCatalogTest, TestOptionsSystemTableCatalog) { ASSERT_TRUE(exists); ASSERT_OK_AND_ASSIGN(exists, catalog.TableExists(Identifier("db1", "tbl1$unknown"))); ASSERT_FALSE(exists); + ASSERT_OK_AND_ASSIGN(exists, catalog.TableExists(Identifier("db1", "missing$options"))); + ASSERT_FALSE(exists); ASSERT_EQ(catalog.GetTableLocation(options_identifier), PathUtil::JoinPath(PathUtil::JoinPath(dir->Str(), "db1.db"), "tbl1$options")); @@ -183,6 +185,15 @@ TEST(FileSystemCatalogTest, TestOptionsSystemTableCatalog) { ASSERT_TRUE(loaded_schema_result.ok()) << loaded_schema_result.status().ToString(); auto loaded_schema = loaded_schema_result.ValueUnsafe(); ASSERT_EQ(loaded_schema->field_names(), (std::vector{"key", "value"})); + ASSERT_EQ(loaded_schema->field(0)->type()->id(), arrow::Type::STRING); + ASSERT_EQ(loaded_schema->field(1)->type()->id(), arrow::Type::STRING); + ASSERT_FALSE(loaded_schema->field(0)->nullable()); + ASSERT_FALSE(loaded_schema->field(1)->nullable()); + + ASSERT_OK_AND_ASSIGN(auto system_table, catalog.GetTable(options_identifier)); + ASSERT_EQ(system_table->Name(), "tbl1$options"); + ASSERT_NOK_WITH_MSG(catalog.LoadTableSchema(Identifier("db1", "tbl1$unknown")), "not exist"); + ASSERT_NOK_WITH_MSG(catalog.LoadTableSchema(Identifier("db1", "missing$options")), "not exist"); ::ArrowSchema system_create_schema; ASSERT_TRUE(arrow::ExportSchema(*typed_schema, &system_create_schema).ok()); diff --git a/src/paimon/core/catalog/identifier_test.cpp b/src/paimon/core/catalog/identifier_test.cpp index 0d7fb24a7..dc36b8836 100644 --- a/src/paimon/core/catalog/identifier_test.cpp +++ b/src/paimon/core/catalog/identifier_test.cpp @@ -54,6 +54,16 @@ TEST(IdentifierTest, EmptyDatabaseRemainsEmpty) { EXPECT_EQ(id.GetTableName(), "my_table"); } +TEST(IdentifierTest, ParseDataTable) { + Identifier id("db", "tbl"); + EXPECT_EQ(id.GetTableName(), "tbl"); + EXPECT_EQ(id.GetDataTableName(), "tbl"); + EXPECT_FALSE(id.GetBranchName()); + EXPECT_EQ(id.GetBranchNameOrDefault(), Identifier::kDefaultMainBranch); + EXPECT_FALSE(id.GetSystemTableName()); + EXPECT_FALSE(id.IsSystemTable()); +} + TEST(IdentifierTest, ParseSystemTable) { Identifier id("db", "tbl$options"); EXPECT_EQ(id.GetTableName(), "tbl$options"); @@ -92,4 +102,12 @@ TEST(IdentifierTest, InvalidSystemTableName) { EXPECT_THROW(too_many.IsSystemTable(), std::invalid_argument); } +TEST(IdentifierTest, InvalidEmptySystemTableNameParts) { + EXPECT_THROW(Identifier("db", "$options").IsSystemTable(), std::invalid_argument); + EXPECT_THROW(Identifier("db", "tbl$").IsSystemTable(), std::invalid_argument); + EXPECT_THROW(Identifier("db", "tbl$branch_").IsSystemTable(), std::invalid_argument); + EXPECT_THROW(Identifier("db", "tbl$branch_dev$").IsSystemTable(), std::invalid_argument); + EXPECT_THROW(Identifier("db", "tbl$$options").IsSystemTable(), std::invalid_argument); +} + } // namespace paimon::test diff --git a/src/paimon/core/table/source/table_read_test.cpp b/src/paimon/core/table/source/table_read_test.cpp index b50f61ded..8d65d5f48 100644 --- a/src/paimon/core/table/source/table_read_test.cpp +++ b/src/paimon/core/table/source/table_read_test.cpp @@ -17,8 +17,10 @@ #include "paimon/table/source/table_read.h" #include +#include #include #include +#include #include "arrow/api.h" #include "arrow/c/abi.h" @@ -201,6 +203,9 @@ TEST(TableReadTest, TestReadOptionsSystemTable) { ASSERT_OK_AND_ASSIGN(auto table_read, TableRead::Create(std::move(read_context))); std::vector> splits = {deserialized_split}; ASSERT_OK_AND_ASSIGN(auto batch_reader, table_read->CreateReader(splits)); + ASSERT_NOK_WITH_MSG(table_read->CreateReader(std::vector>{}), + "single split"); + ASSERT_NOK_WITH_MSG(table_read->CreateReader(std::make_shared()), "unsupported split"); ASSERT_OK_AND_ASSIGN(auto result, ReadResultCollector::CollectResult(batch_reader.get())); ASSERT_TRUE(result); ASSERT_EQ(result->type()->id(), arrow::Type::STRUCT); diff --git a/src/paimon/core/table/system/options_system_table.cpp b/src/paimon/core/table/system/options_system_table.cpp index 25b9d7b85..fd28c5163 100644 --- a/src/paimon/core/table/system/options_system_table.cpp +++ b/src/paimon/core/table/system/options_system_table.cpp @@ -106,6 +106,9 @@ Result> OptionsSystemTable::NewScan() const { Result> OptionsSystemTable::NewReader( const std::vector>& splits, const std::shared_ptr& /*pool*/) const { + if (splits.size() != 1) { + return Status::Invalid("options system table expects a single split"); + } for (const auto& split : splits) { if (!std::dynamic_pointer_cast(split)) { return Status::Invalid("unsupported split for options system table"); From efd25e83b01b022159f6fc975efd3e094e1b00e4 Mon Sep 17 00:00:00 2001 From: Socrates Date: Fri, 1 May 2026 11:17:43 +0800 Subject: [PATCH 5/8] Fix system table schema validation test --- src/paimon/core/catalog/file_system_catalog_test.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/paimon/core/catalog/file_system_catalog_test.cpp b/src/paimon/core/catalog/file_system_catalog_test.cpp index 10705d2bd..75a4a5f7d 100644 --- a/src/paimon/core/catalog/file_system_catalog_test.cpp +++ b/src/paimon/core/catalog/file_system_catalog_test.cpp @@ -453,8 +453,7 @@ TEST(FileSystemCatalogTest, TestValidateTableSchema) { ASSERT_NOK_WITH_MSG(catalog.LoadTableSchema(identifier), "Identifier{database=\'db1\', table=\'tbl1\'} not exist"); - ASSERT_NOK_WITH_MSG(catalog.LoadTableSchema(Identifier("db1", "tbl$11")), - "do not support checking TableSchemaExists for system table."); + ASSERT_NOK_WITH_MSG(catalog.LoadTableSchema(Identifier("db1", "tbl$11")), "not exist"); ArrowSchemaRelease(&schema); } From 6c612948acab12684de162fa61c9075871bd5155 Mon Sep 17 00:00:00 2001 From: Socrates Date: Wed, 6 May 2026 15:26:37 +0800 Subject: [PATCH 6/8] Address system table review comments --- include/paimon/catalog/identifier.h | 13 +- .../core/catalog/file_system_catalog.cpp | 51 +++++--- src/paimon/core/catalog/file_system_catalog.h | 4 +- src/paimon/core/catalog/identifier.cpp | 37 +++--- src/paimon/core/catalog/identifier_test.cpp | 104 +++++++++------- src/paimon/core/table/source/split.cpp | 25 +--- src/paimon/core/table/source/table_read.cpp | 4 +- .../core/table/source/table_read_test.cpp | 66 ---------- src/paimon/core/table/source/table_scan.cpp | 4 +- .../table/system/options_system_table.cpp | 32 +++-- .../core/table/system/options_system_table.h | 2 +- src/paimon/core/table/system/system_table.cpp | 50 ++++---- src/paimon/core/table/system/system_table.h | 15 ++- .../core/table/system/system_table_scan.cpp | 3 +- .../core/table/system/system_table_scan.h | 9 +- test/inte/read_inte_test.cpp | 117 +++++++++++++++++- 16 files changed, 305 insertions(+), 231 deletions(-) diff --git a/include/paimon/catalog/identifier.h b/include/paimon/catalog/identifier.h index c15dc82a8..dd86c9175 100644 --- a/include/paimon/catalog/identifier.h +++ b/include/paimon/catalog/identifier.h @@ -19,6 +19,7 @@ #include #include +#include "paimon/result.h" #include "paimon/type_fwd.h" #include "paimon/visibility.h" @@ -38,15 +39,15 @@ class PAIMON_EXPORT Identifier { bool operator==(const Identifier& other); const std::string& GetDatabaseName() const; const std::string& GetTableName() const; - const std::string& GetDataTableName() const; - const std::optional& GetBranchName() const; - std::string GetBranchNameOrDefault() const; - const std::optional& GetSystemTableName() const; - bool IsSystemTable() const; + Result GetDataTableName() const; + Result> GetBranchName() const; + Result GetBranchNameOrDefault() const; + Result> GetSystemTableName() const; + Result IsSystemTable() const; std::string ToString() const; private: - void SplitTableName() const; + Status SplitTableName() const; const std::string database_; const std::string table_; diff --git a/src/paimon/core/catalog/file_system_catalog.cpp b/src/paimon/core/catalog/file_system_catalog.cpp index a8193a120..c5bee3faf 100644 --- a/src/paimon/core/catalog/file_system_catalog.cpp +++ b/src/paimon/core/catalog/file_system_catalog.cpp @@ -94,12 +94,15 @@ Result FileSystemCatalog::DatabaseExists(const std::string& db_name) const } Result FileSystemCatalog::TableExists(const Identifier& identifier) const { - if (identifier.IsSystemTable()) { - if (!identifier.GetSystemTableName() || - !IsSupportedSystemTable(identifier.GetSystemTableName().value())) { + PAIMON_ASSIGN_OR_RAISE(bool is_system_table, identifier.IsSystemTable()); + if (is_system_table) { + PAIMON_ASSIGN_OR_RAISE(std::optional system_table_name, + identifier.GetSystemTableName()); + if (!system_table_name || !SystemTableLoader::IsSupported(system_table_name.value())) { return false; } - Identifier data_identifier(identifier.GetDatabaseName(), identifier.GetDataTableName()); + PAIMON_ASSIGN_OR_RAISE(std::string data_table_name, identifier.GetDataTableName()); + Identifier data_identifier(identifier.GetDatabaseName(), data_table_name); PAIMON_ASSIGN_OR_RAISE(std::optional> latest_schema, TableSchemaExists(data_identifier)); return latest_schema != std::nullopt; @@ -122,7 +125,8 @@ Status FileSystemCatalog::CreateTable(const Identifier& identifier, ArrowSchema* const std::vector& primary_keys, const std::map& options, bool ignore_if_exists) { - if (IsSystemTable(identifier)) { + PAIMON_ASSIGN_OR_RAISE(bool is_system_table, IsSystemTable(identifier)); + if (is_system_table) { return Status::Invalid( fmt::format("Cannot create table for system table {}, please use data table.", identifier.ToString())); @@ -160,7 +164,8 @@ Status FileSystemCatalog::CreateTable(const Identifier& identifier, ArrowSchema* Result>> FileSystemCatalog::TableSchemaExists( const Identifier& identifier) const { - if (IsSystemTable(identifier)) { + PAIMON_ASSIGN_OR_RAISE(bool is_system_table, IsSystemTable(identifier)); + if (is_system_table) { return Status::NotImplemented( "do not support checking TableSchemaExists for system table."); } @@ -180,12 +185,15 @@ bool FileSystemCatalog::IsSystemDatabase(const std::string& db_name) { return db_name == SYSTEM_DATABASE_NAME; } -bool FileSystemCatalog::IsSpecifiedSystemTable(const Identifier& identifier) { +Result FileSystemCatalog::IsSpecifiedSystemTable(const Identifier& identifier) { return identifier.IsSystemTable(); } -bool FileSystemCatalog::IsSystemTable(const Identifier& identifier) { - return IsSystemDatabase(identifier.GetDatabaseName()) || IsSpecifiedSystemTable(identifier); +Result FileSystemCatalog::IsSystemTable(const Identifier& identifier) { + if (IsSystemDatabase(identifier.GetDatabaseName())) { + return true; + } + return IsSpecifiedSystemTable(identifier); } std::string FileSystemCatalog::NewDatabasePath(const std::string& warehouse, @@ -249,12 +257,15 @@ Result FileSystemCatalog::TableExistsInFileSystem(const std::string& table Result> FileSystemCatalog::LoadTableSchema( const Identifier& identifier) const { - if (identifier.IsSystemTable()) { - if (!identifier.GetSystemTableName() || - !IsSupportedSystemTable(identifier.GetSystemTableName().value())) { + PAIMON_ASSIGN_OR_RAISE(bool is_system_table, identifier.IsSystemTable()); + if (is_system_table) { + PAIMON_ASSIGN_OR_RAISE(std::optional system_table_name, + identifier.GetSystemTableName()); + if (!system_table_name || !SystemTableLoader::IsSupported(system_table_name.value())) { return Status::NotExist(fmt::format("{} not exist", identifier.ToString())); } - Identifier data_identifier(identifier.GetDatabaseName(), identifier.GetDataTableName()); + PAIMON_ASSIGN_OR_RAISE(std::string data_table_name, identifier.GetDataTableName()); + Identifier data_identifier(identifier.GetDatabaseName(), data_table_name); PAIMON_ASSIGN_OR_RAISE(std::optional> latest_schema, TableSchemaExists(data_identifier)); if (!latest_schema) { @@ -262,8 +273,8 @@ Result> FileSystemCatalog::LoadTableSchema( } PAIMON_ASSIGN_OR_RAISE( std::shared_ptr system_table, - CreateSystemTable(identifier.GetSystemTableName().value(), fs_, - GetTableLocation(identifier), latest_schema.value())); + SystemTableLoader::Load(system_table_name.value(), fs_, GetTableLocation(identifier), + latest_schema.value())); return std::make_shared(system_table->ArrowSchema()); } PAIMON_ASSIGN_OR_RAISE(std::optional> latest_schema, @@ -275,7 +286,8 @@ Result> FileSystemCatalog::LoadTableSchema( } Result> FileSystemCatalog::GetTable(const Identifier& identifier) const { - if (identifier.IsSystemTable()) { + PAIMON_ASSIGN_OR_RAISE(bool is_system_table, identifier.IsSystemTable()); + if (is_system_table) { PAIMON_ASSIGN_OR_RAISE(std::shared_ptr schema, LoadTableSchema(identifier)); return std::make_shared
(schema, identifier.GetDatabaseName(), identifier.GetTableName()); @@ -385,7 +397,8 @@ Status FileSystemCatalog::DropTableImpl(const Identifier& identifier, } Status FileSystemCatalog::DropTable(const Identifier& identifier, bool ignore_if_not_exists) { - if (IsSystemTable(identifier)) { + PAIMON_ASSIGN_OR_RAISE(bool is_system_table, IsSystemTable(identifier)); + if (is_system_table) { return Status::Invalid(fmt::format("Cannot drop system table {}.", identifier.ToString())); } @@ -448,7 +461,9 @@ Status FileSystemCatalog::DropTable(const Identifier& identifier, bool ignore_if Status FileSystemCatalog::RenameTable(const Identifier& from_table, const Identifier& to_table, bool ignore_if_not_exists) { - if (IsSystemTable(from_table) || IsSystemTable(to_table)) { + PAIMON_ASSIGN_OR_RAISE(bool is_from_system_table, IsSystemTable(from_table)); + PAIMON_ASSIGN_OR_RAISE(bool is_to_system_table, IsSystemTable(to_table)); + if (is_from_system_table || is_to_system_table) { return Status::Invalid(fmt::format("Cannot rename system table {} or {}.", from_table.ToString(), to_table.ToString())); } diff --git a/src/paimon/core/catalog/file_system_catalog.h b/src/paimon/core/catalog/file_system_catalog.h index cf80d9401..0563a8973 100644 --- a/src/paimon/core/catalog/file_system_catalog.h +++ b/src/paimon/core/catalog/file_system_catalog.h @@ -69,8 +69,8 @@ class FileSystemCatalog : public Catalog { static std::string NewDatabasePath(const std::string& warehouse, const std::string& db_name); static std::string NewDataTablePath(const std::string& warehouse, const Identifier& identifier); static bool IsSystemDatabase(const std::string& db_name); - static bool IsSpecifiedSystemTable(const Identifier& identifier); - static bool IsSystemTable(const Identifier& identifier); + static Result IsSpecifiedSystemTable(const Identifier& identifier); + static Result IsSystemTable(const Identifier& identifier); Result>> TableSchemaExists( const Identifier& identifier) const; diff --git a/src/paimon/core/catalog/identifier.cpp b/src/paimon/core/catalog/identifier.cpp index bcac95124..3ec998c83 100644 --- a/src/paimon/core/catalog/identifier.cpp +++ b/src/paimon/core/catalog/identifier.cpp @@ -18,11 +18,12 @@ #include #include -#include #include #include "fmt/format.h" #include "paimon/common/utils/string_utils.h" +#include "paimon/result.h" +#include "paimon/status.h" namespace paimon { @@ -49,37 +50,38 @@ const std::string& Identifier::GetTableName() const { return table_; } -const std::string& Identifier::GetDataTableName() const { - SplitTableName(); +Result Identifier::GetDataTableName() const { + PAIMON_RETURN_NOT_OK(SplitTableName()); return data_table_; } -const std::optional& Identifier::GetBranchName() const { - SplitTableName(); +Result> Identifier::GetBranchName() const { + PAIMON_RETURN_NOT_OK(SplitTableName()); return branch_; } -std::string Identifier::GetBranchNameOrDefault() const { - const auto& branch = GetBranchName(); - return branch ? branch.value() : std::string(kDefaultMainBranch); +Result Identifier::GetBranchNameOrDefault() const { + PAIMON_ASSIGN_OR_RAISE(std::optional branch, GetBranchName()); + return branch.value_or(kDefaultMainBranch); } -const std::optional& Identifier::GetSystemTableName() const { - SplitTableName(); +Result> Identifier::GetSystemTableName() const { + PAIMON_RETURN_NOT_OK(SplitTableName()); return system_table_; } -bool Identifier::IsSystemTable() const { - return GetSystemTableName().has_value(); +Result Identifier::IsSystemTable() const { + PAIMON_ASSIGN_OR_RAISE(std::optional system_table, GetSystemTableName()); + return system_table.has_value(); } std::string Identifier::ToString() const { return fmt::format("Identifier{{database='{}', table='{}'}}", database_, table_); } -void Identifier::SplitTableName() const { +Status Identifier::SplitTableName() const { if (parsed_) { - return; + return Status::OK(); } std::string data_table; std::optional branch; @@ -97,23 +99,24 @@ void Identifier::SplitTableName() const { } } else if (splits.size() == 3) { if (!StringUtils::StartsWith(splits[1], kSystemBranchPrefix, /*start_pos=*/0)) { - throw std::invalid_argument(fmt::format( + return Status::Invalid(fmt::format( "System table can only contain one '$' separator, but this is: {}", table_)); } data_table = splits[0]; branch = splits[1].substr(std::strlen(kSystemBranchPrefix)); system_table = splits[2]; } else { - throw std::invalid_argument(fmt::format("Invalid table name: {}", table_)); + return Status::Invalid(fmt::format("Invalid table name: {}", table_)); } if (data_table.empty() || (branch && branch->empty()) || (system_table && system_table->empty())) { - throw std::invalid_argument(fmt::format("Invalid table name: {}", table_)); + return Status::Invalid(fmt::format("Invalid table name: {}", table_)); } data_table_ = std::move(data_table); branch_ = std::move(branch); system_table_ = std::move(system_table); parsed_ = true; + return Status::OK(); } } // namespace paimon diff --git a/src/paimon/core/catalog/identifier_test.cpp b/src/paimon/core/catalog/identifier_test.cpp index dc36b8836..4e9f5fa98 100644 --- a/src/paimon/core/catalog/identifier_test.cpp +++ b/src/paimon/core/catalog/identifier_test.cpp @@ -16,22 +16,21 @@ #include "paimon/catalog/identifier.h" -#include - #include "gtest/gtest.h" +#include "paimon/testing/utils/testharness.h" namespace paimon::test { TEST(IdentifierTest, ConstructorAndGetters) { Identifier id("test_db", "test_table"); - EXPECT_EQ(id.GetDatabaseName(), "test_db"); - EXPECT_EQ(id.GetTableName(), "test_table"); + ASSERT_EQ(id.GetDatabaseName(), "test_db"); + ASSERT_EQ(id.GetTableName(), "test_table"); } TEST(IdentifierTest, SingleArgumentConstructorUsesUnknownDatabase) { Identifier id("test_table"); - EXPECT_EQ(id.GetDatabaseName(), Identifier::kUnknownDatabase); - EXPECT_EQ(id.GetTableName(), "test_table"); + ASSERT_EQ(id.GetDatabaseName(), Identifier::kUnknownDatabase); + ASSERT_EQ(id.GetTableName(), "test_table"); } TEST(IdentifierTest, EqualityOperator) { @@ -39,75 +38,96 @@ TEST(IdentifierTest, EqualityOperator) { Identifier id2("db1", "table1"); Identifier id3("db2", "table2"); - EXPECT_TRUE(id1 == id2); - EXPECT_FALSE(id1 == id3); + ASSERT_TRUE(id1 == id2); + ASSERT_FALSE(id1 == id3); } TEST(IdentifierTest, ToString) { Identifier id("my_db", "my_table"); - EXPECT_EQ(id.ToString(), "Identifier{database='my_db', table='my_table'}"); + ASSERT_EQ(id.ToString(), "Identifier{database='my_db', table='my_table'}"); } TEST(IdentifierTest, EmptyDatabaseRemainsEmpty) { Identifier id("", "my_table"); - EXPECT_EQ(id.GetDatabaseName(), ""); - EXPECT_EQ(id.GetTableName(), "my_table"); + ASSERT_EQ(id.GetDatabaseName(), ""); + ASSERT_EQ(id.GetTableName(), "my_table"); } TEST(IdentifierTest, ParseDataTable) { Identifier id("db", "tbl"); - EXPECT_EQ(id.GetTableName(), "tbl"); - EXPECT_EQ(id.GetDataTableName(), "tbl"); - EXPECT_FALSE(id.GetBranchName()); - EXPECT_EQ(id.GetBranchNameOrDefault(), Identifier::kDefaultMainBranch); - EXPECT_FALSE(id.GetSystemTableName()); - EXPECT_FALSE(id.IsSystemTable()); + ASSERT_EQ(id.GetTableName(), "tbl"); + ASSERT_OK_AND_ASSIGN(std::string data_table_name, id.GetDataTableName()); + ASSERT_EQ(data_table_name, "tbl"); + ASSERT_OK_AND_ASSIGN(std::optional branch_name, id.GetBranchName()); + ASSERT_FALSE(branch_name); + ASSERT_OK_AND_ASSIGN(std::string branch_name_or_default, id.GetBranchNameOrDefault()); + ASSERT_EQ(branch_name_or_default, Identifier::kDefaultMainBranch); + ASSERT_OK_AND_ASSIGN(std::optional system_table_name, id.GetSystemTableName()); + ASSERT_FALSE(system_table_name); + ASSERT_OK_AND_ASSIGN(bool is_system_table, id.IsSystemTable()); + ASSERT_FALSE(is_system_table); } TEST(IdentifierTest, ParseSystemTable) { Identifier id("db", "tbl$options"); - EXPECT_EQ(id.GetTableName(), "tbl$options"); - EXPECT_EQ(id.GetDataTableName(), "tbl"); - EXPECT_FALSE(id.GetBranchName()); - ASSERT_TRUE(id.GetSystemTableName()); - EXPECT_EQ(id.GetSystemTableName().value(), "options"); - EXPECT_TRUE(id.IsSystemTable()); + ASSERT_EQ(id.GetTableName(), "tbl$options"); + ASSERT_OK_AND_ASSIGN(std::string data_table_name, id.GetDataTableName()); + ASSERT_EQ(data_table_name, "tbl"); + ASSERT_OK_AND_ASSIGN(std::optional branch_name, id.GetBranchName()); + ASSERT_FALSE(branch_name); + ASSERT_OK_AND_ASSIGN(std::optional system_table_name, id.GetSystemTableName()); + ASSERT_TRUE(system_table_name); + ASSERT_EQ(system_table_name.value(), "options"); + ASSERT_OK_AND_ASSIGN(bool is_system_table, id.IsSystemTable()); + ASSERT_TRUE(is_system_table); } TEST(IdentifierTest, ParseBranchTable) { Identifier id("db", "tbl$branch_dev"); - EXPECT_EQ(id.GetDataTableName(), "tbl"); - ASSERT_TRUE(id.GetBranchName()); - EXPECT_EQ(id.GetBranchName().value(), "dev"); - EXPECT_EQ(id.GetBranchNameOrDefault(), "dev"); - EXPECT_FALSE(id.GetSystemTableName()); - EXPECT_FALSE(id.IsSystemTable()); + ASSERT_OK_AND_ASSIGN(std::string data_table_name, id.GetDataTableName()); + ASSERT_EQ(data_table_name, "tbl"); + ASSERT_OK_AND_ASSIGN(std::optional branch_name, id.GetBranchName()); + ASSERT_TRUE(branch_name); + ASSERT_EQ(branch_name.value(), "dev"); + ASSERT_OK_AND_ASSIGN(std::string branch_name_or_default, id.GetBranchNameOrDefault()); + ASSERT_EQ(branch_name_or_default, "dev"); + ASSERT_OK_AND_ASSIGN(std::optional system_table_name, id.GetSystemTableName()); + ASSERT_FALSE(system_table_name); + ASSERT_OK_AND_ASSIGN(bool is_system_table, id.IsSystemTable()); + ASSERT_FALSE(is_system_table); } TEST(IdentifierTest, ParseBranchSystemTable) { Identifier id("db", "tbl$branch_dev$options"); - EXPECT_EQ(id.GetDataTableName(), "tbl"); - ASSERT_TRUE(id.GetBranchName()); - EXPECT_EQ(id.GetBranchName().value(), "dev"); - ASSERT_TRUE(id.GetSystemTableName()); - EXPECT_EQ(id.GetSystemTableName().value(), "options"); - EXPECT_TRUE(id.IsSystemTable()); + ASSERT_OK_AND_ASSIGN(std::string data_table_name, id.GetDataTableName()); + ASSERT_EQ(data_table_name, "tbl"); + ASSERT_OK_AND_ASSIGN(std::optional branch_name, id.GetBranchName()); + ASSERT_TRUE(branch_name); + ASSERT_EQ(branch_name.value(), "dev"); + ASSERT_OK_AND_ASSIGN(std::optional system_table_name, id.GetSystemTableName()); + ASSERT_TRUE(system_table_name); + ASSERT_EQ(system_table_name.value(), "options"); + ASSERT_OK_AND_ASSIGN(bool is_system_table, id.IsSystemTable()); + ASSERT_TRUE(is_system_table); } TEST(IdentifierTest, InvalidSystemTableName) { Identifier invalid_middle("db", "tbl$bad$options"); - EXPECT_THROW(invalid_middle.IsSystemTable(), std::invalid_argument); + ASSERT_NOK_WITH_MSG(invalid_middle.IsSystemTable(), + "System table can only contain one '$' separator"); Identifier too_many("db", "tbl$branch_dev$options$extra"); - EXPECT_THROW(too_many.IsSystemTable(), std::invalid_argument); + ASSERT_NOK_WITH_MSG(too_many.IsSystemTable(), "Invalid table name"); } TEST(IdentifierTest, InvalidEmptySystemTableNameParts) { - EXPECT_THROW(Identifier("db", "$options").IsSystemTable(), std::invalid_argument); - EXPECT_THROW(Identifier("db", "tbl$").IsSystemTable(), std::invalid_argument); - EXPECT_THROW(Identifier("db", "tbl$branch_").IsSystemTable(), std::invalid_argument); - EXPECT_THROW(Identifier("db", "tbl$branch_dev$").IsSystemTable(), std::invalid_argument); - EXPECT_THROW(Identifier("db", "tbl$$options").IsSystemTable(), std::invalid_argument); + ASSERT_NOK_WITH_MSG(Identifier("db", "$options").IsSystemTable(), "Invalid table name"); + ASSERT_NOK_WITH_MSG(Identifier("db", "tbl$").IsSystemTable(), "Invalid table name"); + ASSERT_NOK_WITH_MSG(Identifier("db", "tbl$branch_").IsSystemTable(), "Invalid table name"); + ASSERT_NOK_WITH_MSG(Identifier("db", "tbl$branch_dev$").IsSystemTable(), + "Invalid table name"); + ASSERT_NOK_WITH_MSG(Identifier("db", "tbl$$options").IsSystemTable(), + "System table can only contain one '$' separator"); } } // namespace paimon::test diff --git a/src/paimon/core/table/source/split.cpp b/src/paimon/core/table/source/split.cpp index c8c87edee..2d188cd53 100644 --- a/src/paimon/core/table/source/split.cpp +++ b/src/paimon/core/table/source/split.cpp @@ -26,7 +26,6 @@ #include "paimon/core/table/source/data_split_impl.h" #include "paimon/core/table/source/deletion_file.h" #include "paimon/core/table/source/fallback_data_split.h" -#include "paimon/core/table/system/system_table_scan.h" #include "paimon/core/utils/object_serializer.h" #include "paimon/global_index/indexed_split.h" #include "paimon/io/byte_array_input_stream.h" @@ -163,13 +162,8 @@ Result Split::Serialize(const std::shared_ptr& split, } else { out.WriteValue(false); } - } else if (auto system_table_split = std::dynamic_pointer_cast(split)) { - out.WriteValue(SystemTableSplit::MAGIC); - out.WriteValue(SystemTableSplit::VERSION); - out.WriteString(system_table_split->TablePath()); } else { - return Status::Invalid( - "invalid split, cannot cast to DataSplit, IndexedSplit or SystemTableSplit"); + return Status::Invalid("invalid split, cannot cast to DataSplit or IndexedSplit"); } PAIMON_UNIQUE_PTR bytes = MemorySegmentUtils::CopyToBytes(out.Segments(), 0, out.CurrentSize(), pool.get()); @@ -223,21 +217,6 @@ Result> Split::Deserialize(const char* buffer, size_t len fmt::format("invalid IndexedSplit, remaining {} bytes after deserializing", stream_length - pos)); } - } else if (magic == SystemTableSplit::MAGIC) { - PAIMON_ASSIGN_OR_RAISE(int32_t version, in.ReadValue()); - if (version != SystemTableSplit::VERSION) { - return Status::Invalid( - fmt::format("Unsupported SystemTableSplit version: {}", version)); - } - PAIMON_ASSIGN_OR_RAISE(std::string table_path, in.ReadString()); - PAIMON_ASSIGN_OR_RAISE(int64_t pos, in.GetPos()); - PAIMON_ASSIGN_OR_RAISE(int64_t stream_length, in.Length()); - if (pos != stream_length) { - return Status::Invalid( - fmt::format("invalid SystemTableSplit, remaining {} bytes after deserializing", - stream_length - pos)); - } - return std::make_shared(table_path); } else if (magic == DataSplitImpl::MAGIC) { PAIMON_ASSIGN_OR_RAISE(std::shared_ptr data_split, ReadDataSplitWithoutMagicNumber(magic, &in, pool)); @@ -254,6 +233,6 @@ Result> Split::Deserialize(const char* buffer, size_t len stream_length - pos)); } } - return Status::Invalid("invalid split, must be DataSplit, IndexedSplit or SystemTableSplit"); + return Status::Invalid("invalid split, must be DataSplit or IndexedSplit"); } } // namespace paimon diff --git a/src/paimon/core/table/source/table_read.cpp b/src/paimon/core/table/source/table_read.cpp index 8726ca35f..4431b9617 100644 --- a/src/paimon/core/table/source/table_read.cpp +++ b/src/paimon/core/table/source/table_read.cpp @@ -131,11 +131,11 @@ Result> TableRead::Create(std::unique_ptrGetOptions(), context->GetSpecificFileSystem(), context->GetFileSystemSchemeToIdentifierMap())); PAIMON_ASSIGN_OR_RAISE(std::optional system_table_path, - TryParseSystemTablePath(context->GetPath())); + SystemTableLoader::TryParsePath(context->GetPath())); if (system_table_path) { PAIMON_ASSIGN_OR_RAISE( std::shared_ptr system_table, - LoadSystemTableFromPath(tmp_core_options.GetFileSystem(), context->GetPath())); + SystemTableLoader::LoadFromPath(tmp_core_options.GetFileSystem(), context->GetPath())); return std::make_unique(system_table, memory_pool); } diff --git a/src/paimon/core/table/source/table_read_test.cpp b/src/paimon/core/table/source/table_read_test.cpp index 8d65d5f48..762e9362c 100644 --- a/src/paimon/core/table/source/table_read_test.cpp +++ b/src/paimon/core/table/source/table_read_test.cpp @@ -22,12 +22,7 @@ #include #include -#include "arrow/api.h" -#include "arrow/c/abi.h" -#include "arrow/c/bridge.h" #include "gtest/gtest.h" -#include "paimon/catalog/catalog.h" -#include "paimon/catalog/identifier.h" #include "paimon/core/core_options.h" #include "paimon/core/operation/abstract_split_read.h" #include "paimon/core/operation/split_read.h" @@ -37,11 +32,7 @@ #include "paimon/predicate/literal.h" #include "paimon/predicate/predicate_builder.h" #include "paimon/read_context.h" -#include "paimon/scan_context.h" #include "paimon/status.h" -#include "paimon/table/source/split.h" -#include "paimon/table/source/table_scan.h" -#include "paimon/testing/utils/read_result_collector.h" #include "paimon/testing/utils/testharness.h" namespace paimon::test { @@ -165,61 +156,4 @@ TEST(TableReadTest, TestMergeOptions) { ASSERT_EQ(expected_options, core_options.ToMap()); } -TEST(TableReadTest, TestReadOptionsSystemTable) { - std::map options = {{Options::FILE_SYSTEM, "local"}, - {Options::FILE_FORMAT, "orc"}, - {Options::MANIFEST_FORMAT, "orc"}, - {"custom.option", "custom-value"}}; - auto dir = UniqueTestDirectory::Create(); - ASSERT_TRUE(dir); - ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(dir->Str(), options)); - ASSERT_OK(catalog->CreateDatabase("db1", options, /*ignore_if_exists=*/false)); - - auto typed_schema = arrow::schema({arrow::field("f0", arrow::int32())}); - ::ArrowSchema schema; - ASSERT_TRUE(arrow::ExportSchema(*typed_schema, &schema).ok()); - ASSERT_OK(catalog->CreateTable(Identifier("db1", "tbl1"), &schema, - /*partition_keys=*/{}, /*primary_keys=*/{}, options, - /*ignore_if_exists=*/false)); - ArrowSchemaRelease(&schema); - - std::string system_table_path = catalog->GetTableLocation(Identifier("db1", "tbl1$options")); - ScanContextBuilder scan_context_builder(system_table_path); - scan_context_builder.SetOptions(options); - ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); - ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); - ASSERT_OK_AND_ASSIGN(auto plan, table_scan->CreatePlan()); - ASSERT_EQ(plan->Splits().size(), 1); - - ASSERT_OK_AND_ASSIGN(auto serialized_split, - Split::Serialize(plan->Splits()[0], GetDefaultPool())); - ASSERT_OK_AND_ASSIGN( - auto deserialized_split, - Split::Deserialize(serialized_split.data(), serialized_split.size(), GetDefaultPool())); - - ReadContextBuilder read_context_builder(system_table_path); - read_context_builder.SetOptions(options); - ASSERT_OK_AND_ASSIGN(auto read_context, read_context_builder.Finish()); - ASSERT_OK_AND_ASSIGN(auto table_read, TableRead::Create(std::move(read_context))); - std::vector> splits = {deserialized_split}; - ASSERT_OK_AND_ASSIGN(auto batch_reader, table_read->CreateReader(splits)); - ASSERT_NOK_WITH_MSG(table_read->CreateReader(std::vector>{}), - "single split"); - ASSERT_NOK_WITH_MSG(table_read->CreateReader(std::make_shared()), "unsupported split"); - ASSERT_OK_AND_ASSIGN(auto result, ReadResultCollector::CollectResult(batch_reader.get())); - ASSERT_TRUE(result); - ASSERT_EQ(result->type()->id(), arrow::Type::STRUCT); - - auto struct_array = std::static_pointer_cast(result->chunk(0)); - auto key_array = std::static_pointer_cast(struct_array->field(0)); - auto value_array = std::static_pointer_cast(struct_array->field(1)); - std::map actual; - for (int64_t i = 0; i < struct_array->length(); ++i) { - actual[key_array->GetString(i)] = value_array->GetString(i); - } - EXPECT_EQ(actual[Options::FILE_SYSTEM], "local"); - EXPECT_EQ(actual[Options::FILE_FORMAT], "orc"); - EXPECT_EQ(actual[Options::MANIFEST_FORMAT], "orc"); - EXPECT_EQ(actual["custom.option"], "custom-value"); -} } // namespace paimon::test diff --git a/src/paimon/core/table/source/table_scan.cpp b/src/paimon/core/table/source/table_scan.cpp index 43f0097a1..124a25c46 100644 --- a/src/paimon/core/table/source/table_scan.cpp +++ b/src/paimon/core/table/source/table_scan.cpp @@ -167,11 +167,11 @@ Result> TableScan::Create(std::unique_ptrGetOptions(), context->GetSpecificFileSystem())); PAIMON_ASSIGN_OR_RAISE(std::optional system_table_path, - TryParseSystemTablePath(context->GetPath())); + SystemTableLoader::TryParsePath(context->GetPath())); if (system_table_path) { PAIMON_ASSIGN_OR_RAISE( std::shared_ptr system_table, - LoadSystemTableFromPath(tmp_options.GetFileSystem(), context->GetPath())); + SystemTableLoader::LoadFromPath(tmp_options.GetFileSystem(), context->GetPath())); return system_table->NewScan(); } SchemaManager schema_manager(tmp_options.GetFileSystem(), context->GetPath()); diff --git a/src/paimon/core/table/system/options_system_table.cpp b/src/paimon/core/table/system/options_system_table.cpp index fd28c5163..85ee2d7d1 100644 --- a/src/paimon/core/table/system/options_system_table.cpp +++ b/src/paimon/core/table/system/options_system_table.cpp @@ -25,6 +25,7 @@ #include "arrow/api.h" #include "arrow/c/bridge.h" #include "paimon/common/metrics/metrics_impl.h" +#include "paimon/common/utils/arrow/mem_utils.h" #include "paimon/common/utils/arrow/status_utils.h" #include "paimon/core/schema/table_schema.h" #include "paimon/core/table/system/system_table_scan.h" @@ -42,8 +43,9 @@ std::shared_ptr OptionsSchema() { class OptionsBatchReader : public BatchReader { public: - explicit OptionsBatchReader(std::map options) - : options_(std::move(options)) {} + OptionsBatchReader(std::map options, + const std::shared_ptr& pool) + : arrow_pool_(GetArrowPool(pool)), options_(std::move(options)) {} Result NextBatch() override { if (emitted_) { @@ -51,16 +53,23 @@ class OptionsBatchReader : public BatchReader { } emitted_ = true; - arrow::StringBuilder key_builder; - arrow::StringBuilder value_builder; + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::unique_ptr key_array_builder, + arrow::MakeBuilder(arrow::utf8(), arrow_pool_.get())); + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::unique_ptr value_array_builder, + arrow::MakeBuilder(arrow::utf8(), arrow_pool_.get())); + auto* key_builder = dynamic_cast(key_array_builder.get()); + auto* value_builder = dynamic_cast(value_array_builder.get()); + if (key_builder == nullptr || value_builder == nullptr) { + return Status::Invalid("cannot create string builders for options system table"); + } for (const auto& [key, value] : options_) { - PAIMON_RETURN_NOT_OK_FROM_ARROW(key_builder.Append(key)); - PAIMON_RETURN_NOT_OK_FROM_ARROW(value_builder.Append(value)); + PAIMON_RETURN_NOT_OK_FROM_ARROW(key_builder->Append(key)); + PAIMON_RETURN_NOT_OK_FROM_ARROW(value_builder->Append(value)); } std::shared_ptr key_array; std::shared_ptr value_array; - PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(key_array, key_builder.Finish()); - PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(value_array, value_builder.Finish()); + PAIMON_RETURN_NOT_OK_FROM_ARROW(key_builder->Finish(&key_array)); + PAIMON_RETURN_NOT_OK_FROM_ARROW(value_builder->Finish(&value_array)); auto struct_array = std::make_shared( arrow::struct_(OptionsSchema()->fields()), key_array->length(), std::vector>{key_array, value_array}); @@ -81,6 +90,7 @@ class OptionsBatchReader : public BatchReader { } private: + std::unique_ptr arrow_pool_; std::map options_; bool emitted_ = false; }; @@ -92,7 +102,7 @@ OptionsSystemTable::OptionsSystemTable(std::string table_path, : table_path_(std::move(table_path)), table_schema_(std::move(table_schema)) {} std::string OptionsSystemTable::Name() const { - return NAME; + return kName; } std::shared_ptr OptionsSystemTable::ArrowSchema() const { @@ -105,7 +115,7 @@ Result> OptionsSystemTable::NewScan() const { Result> OptionsSystemTable::NewReader( const std::vector>& splits, - const std::shared_ptr& /*pool*/) const { + const std::shared_ptr& pool) const { if (splits.size() != 1) { return Status::Invalid("options system table expects a single split"); } @@ -114,7 +124,7 @@ Result> OptionsSystemTable::NewReader( return Status::Invalid("unsupported split for options system table"); } } - return std::make_unique(table_schema_->Options()); + return std::make_unique(table_schema_->Options(), pool); } } // namespace paimon diff --git a/src/paimon/core/table/system/options_system_table.h b/src/paimon/core/table/system/options_system_table.h index c520f4155..308f9e898 100644 --- a/src/paimon/core/table/system/options_system_table.h +++ b/src/paimon/core/table/system/options_system_table.h @@ -28,7 +28,7 @@ class TableSchema; class OptionsSystemTable : public SystemTable { public: - static constexpr const char* NAME = "options"; + static constexpr const char* kName = "options"; OptionsSystemTable(std::string table_path, std::shared_ptr table_schema); diff --git a/src/paimon/core/table/system/system_table.cpp b/src/paimon/core/table/system/system_table.cpp index 1dbbbfe2f..13b0e627d 100644 --- a/src/paimon/core/table/system/system_table.cpp +++ b/src/paimon/core/table/system/system_table.cpp @@ -18,7 +18,6 @@ #include #include -#include #include #include @@ -28,56 +27,59 @@ #include "paimon/core/schema/schema_manager.h" #include "paimon/core/schema/table_schema.h" #include "paimon/core/table/system/options_system_table.h" +#include "paimon/core/utils/branch_manager.h" namespace paimon { -bool IsSupportedSystemTable(const std::string& system_table_name) { - return StringUtils::ToLowerCase(system_table_name) == OptionsSystemTable::NAME; +bool SystemTableLoader::IsSupported(const std::string& system_table_name) { + return StringUtils::ToLowerCase(system_table_name) == OptionsSystemTable::kName; } -Result> CreateSystemTable( +Result> SystemTableLoader::Load( const std::string& system_table_name, const std::shared_ptr& /*fs*/, const std::string& table_path, const std::shared_ptr& table_schema) { std::string normalized_name = StringUtils::ToLowerCase(system_table_name); - if (normalized_name == OptionsSystemTable::NAME) { + if (normalized_name == OptionsSystemTable::kName) { return std::make_shared(table_path, table_schema); } - return Status::NotExist("unsupported system table: ", system_table_name); + return Status::NotImplemented("unsupported system table: ", system_table_name); } -Result> TryParseSystemTablePath(const std::string& path) { +Result> SystemTableLoader::TryParsePath(const std::string& path) { std::string table_name = PathUtil::GetName(path); Identifier identifier(table_name); - try { - if (!identifier.IsSystemTable()) { - return std::optional(); - } - std::string parent = PathUtil::GetParentDirPath(path); - SystemTablePath system_table_path; - system_table_path.table_path = PathUtil::JoinPath(parent, identifier.GetDataTableName()); - system_table_path.branch = identifier.GetBranchName(); - system_table_path.system_table_name = identifier.GetSystemTableName().value(); - return std::optional(std::move(system_table_path)); - } catch (const std::exception& e) { - return Status::Invalid(e.what()); + PAIMON_ASSIGN_OR_RAISE(bool is_system_table, identifier.IsSystemTable()); + if (!is_system_table) { + return std::optional(); } + PAIMON_ASSIGN_OR_RAISE(std::string data_table_name, identifier.GetDataTableName()); + PAIMON_ASSIGN_OR_RAISE(std::optional branch, identifier.GetBranchName()); + PAIMON_ASSIGN_OR_RAISE(std::optional system_table_name, + identifier.GetSystemTableName()); + std::string parent = PathUtil::GetParentDirPath(path); + SystemTablePath system_table_path; + system_table_path.table_path = PathUtil::JoinPath(parent, data_table_name); + system_table_path.branch = std::move(branch); + system_table_path.system_table_name = system_table_name.value(); + return std::optional(std::move(system_table_path)); } -Result> LoadSystemTableFromPath(const std::shared_ptr& fs, - const std::string& path) { +Result> SystemTableLoader::LoadFromPath( + const std::shared_ptr& fs, const std::string& path) { PAIMON_ASSIGN_OR_RAISE(std::optional system_table_path, - TryParseSystemTablePath(path)); + TryParsePath(path)); if (!system_table_path) { return Status::Invalid("path is not a system table path: ", path); } const auto& parsed = system_table_path.value(); - SchemaManager schema_manager(fs, parsed.table_path, parsed.branch.value_or("")); + SchemaManager schema_manager(fs, parsed.table_path, + parsed.branch.value_or(BranchManager::DEFAULT_MAIN_BRANCH)); PAIMON_ASSIGN_OR_RAISE(std::optional> latest_schema, schema_manager.Latest()); if (!latest_schema) { return Status::NotExist("base table schema not found for system table path: ", path); } - return CreateSystemTable(parsed.system_table_name, fs, path, latest_schema.value()); + return Load(parsed.system_table_name, fs, path, latest_schema.value()); } } // namespace paimon diff --git a/src/paimon/core/table/system/system_table.h b/src/paimon/core/table/system/system_table.h index 4f49d18a7..342f5d822 100644 --- a/src/paimon/core/table/system/system_table.h +++ b/src/paimon/core/table/system/system_table.h @@ -52,15 +52,18 @@ class SystemTable { const std::shared_ptr& pool) const = 0; }; -Result> CreateSystemTable( - const std::string& system_table_name, const std::shared_ptr& fs, - const std::string& table_path, const std::shared_ptr& table_schema); +class SystemTableLoader { + public: + static bool IsSupported(const std::string& system_table_name); -bool IsSupportedSystemTable(const std::string& system_table_name); + static Result> Load( + const std::string& system_table_name, const std::shared_ptr& fs, + const std::string& table_path, const std::shared_ptr& table_schema); -Result> TryParseSystemTablePath(const std::string& path); + static Result> TryParsePath(const std::string& path); -Result> LoadSystemTableFromPath(const std::shared_ptr& fs, + static Result> LoadFromPath(const std::shared_ptr& fs, const std::string& path); +}; } // namespace paimon diff --git a/src/paimon/core/table/system/system_table_scan.cpp b/src/paimon/core/table/system/system_table_scan.cpp index 64a619764..05f62ebd7 100644 --- a/src/paimon/core/table/system/system_table_scan.cpp +++ b/src/paimon/core/table/system/system_table_scan.cpp @@ -17,14 +17,13 @@ #include "paimon/core/table/system/system_table_scan.h" #include -#include #include #include "paimon/core/table/source/plan_impl.h" namespace paimon { -SystemTableScan::SystemTableScan(std::string table_path) : table_path_(std::move(table_path)) {} +SystemTableScan::SystemTableScan(const std::string& table_path) : table_path_(table_path) {} Result> SystemTableScan::CreatePlan() { std::vector> splits = {std::make_shared(table_path_)}; diff --git a/src/paimon/core/table/system/system_table_scan.h b/src/paimon/core/table/system/system_table_scan.h index 1deccb536..2acd57d2d 100644 --- a/src/paimon/core/table/system/system_table_scan.h +++ b/src/paimon/core/table/system/system_table_scan.h @@ -16,10 +16,8 @@ #pragma once -#include #include #include -#include #include #include "paimon/table/source/table_scan.h" @@ -30,10 +28,7 @@ class Split; class SystemTableSplit : public Split { public: - static constexpr int64_t MAGIC = -739241698710434219L; - static constexpr int32_t VERSION = 1; - - explicit SystemTableSplit(std::string table_path) : table_path_(std::move(table_path)) {} + explicit SystemTableSplit(const std::string& table_path) : table_path_(table_path) {} const std::string& TablePath() const { return table_path_; @@ -45,7 +40,7 @@ class SystemTableSplit : public Split { class SystemTableScan : public TableScan { public: - explicit SystemTableScan(std::string table_path); + explicit SystemTableScan(const std::string& table_path); Result> CreatePlan() override; diff --git a/test/inte/read_inte_test.cpp b/test/inte/read_inte_test.cpp index 0c824bc4d..da235b21f 100644 --- a/test/inte/read_inte_test.cpp +++ b/test/inte/read_inte_test.cpp @@ -16,6 +16,8 @@ #include #include +#include +#include #include #include #include @@ -29,13 +31,17 @@ #include "arrow/c/abi.h" #include "arrow/ipc/json_simple.h" #include "gtest/gtest.h" +#include "paimon/catalog/catalog.h" +#include "paimon/catalog/identifier.h" #include "paimon/common/data/binary_row.h" #include "paimon/common/factories/io_hook.h" #include "paimon/common/reader/complete_row_kind_batch_reader.h" #include "paimon/common/reader/concat_batch_reader.h" #include "paimon/common/table/special_fields.h" #include "paimon/common/types/data_field.h" +#include "paimon/common/utils/path_util.h" #include "paimon/common/utils/scope_guard.h" +#include "paimon/defs.h" #include "paimon/core/io/data_file_meta.h" #include "paimon/core/manifest/file_source.h" #include "paimon/core/stats/simple_stats.h" @@ -44,7 +50,6 @@ #include "paimon/core/table/source/fallback_data_split.h" #include "paimon/data/decimal.h" #include "paimon/data/timestamp.h" -#include "paimon/defs.h" #include "paimon/fs/file_system.h" #include "paimon/fs/local/local_file_system.h" #include "paimon/memory/memory_pool.h" @@ -54,8 +59,11 @@ #include "paimon/read_context.h" #include "paimon/reader/batch_reader.h" #include "paimon/result.h" +#include "paimon/scan_context.h" #include "paimon/status.h" +#include "paimon/table/source/plan.h" #include "paimon/table/source/data_split.h" +#include "paimon/table/source/table_scan.h" #include "paimon/table/source/table_read.h" #include "paimon/testing/utils/binary_row_generator.h" #include "paimon/testing/utils/io_exception_helper.h" @@ -189,10 +197,40 @@ class ReadInteTest : public testing::Test, public ::testing::WithParamInterface< return std::dynamic_pointer_cast(split); } - private: +private: std::shared_ptr pool_; }; +namespace { + +std::map CollectStringMap( + const std::shared_ptr& result) { + std::map values; + EXPECT_TRUE(result); + EXPECT_EQ(result->num_chunks(), 1); + if (!result || result->num_chunks() != 1) { + return values; + } + auto struct_array = std::dynamic_pointer_cast(result->chunk(0)); + EXPECT_TRUE(struct_array); + if (!struct_array) { + return values; + } + auto key_array = std::dynamic_pointer_cast(struct_array->field(0)); + auto value_array = std::dynamic_pointer_cast(struct_array->field(1)); + EXPECT_TRUE(key_array); + EXPECT_TRUE(value_array); + if (!key_array || !value_array) { + return values; + } + for (int64_t i = 0; i < struct_array->length(); ++i) { + values.emplace(key_array->GetString(i), value_array->GetString(i)); + } + return values; +} + +} // namespace + std::vector PrepareTestParam() { std::vector values = { TestParam{false, "false", "parquet", PrefetchCacheMode::ALWAYS}, @@ -434,6 +472,81 @@ TEST_P(ReadInteTest, TestReadOnlyPartitionField) { ASSERT_TRUE(result_array->Equals(expected_array)) << result_array->ToString(); } +TEST(SystemTableReadInteTest, TestReadOptionsSystemTable) { + std::map options = {{Options::FILE_SYSTEM, "local"}, + {Options::FILE_FORMAT, "orc"}, + {Options::MANIFEST_FORMAT, "orc"}, + {"custom.option", "custom-value"}}; + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + std::string warehouse = PathUtil::JoinPath(dir->Str(), "warehouse"); + ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(warehouse, options)); + ASSERT_OK(catalog->CreateDatabase("db1", options, /*ignore_if_exists=*/false)); + + auto typed_schema = arrow::schema({arrow::field("f0", arrow::int32())}); + ::ArrowSchema schema; + ASSERT_TRUE(arrow::ExportSchema(*typed_schema, &schema).ok()); + ASSERT_OK(catalog->CreateTable(Identifier("db1", "tbl1"), &schema, + /*partition_keys=*/{}, /*primary_keys=*/{}, options, + /*ignore_if_exists=*/false)); + ArrowSchemaRelease(&schema); + + std::string system_table_path = catalog->GetTableLocation(Identifier("db1", "tbl1$options")); + ScanContextBuilder scan_context_builder(system_table_path); + scan_context_builder.SetOptions(options); + ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + ASSERT_OK_AND_ASSIGN(auto plan, table_scan->CreatePlan()); + ASSERT_EQ(plan->Splits().size(), 1); + + ReadContextBuilder read_context_builder(system_table_path); + read_context_builder.SetOptions(options); + ASSERT_OK_AND_ASSIGN(auto read_context, read_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_read, TableRead::Create(std::move(read_context))); + ASSERT_OK_AND_ASSIGN(auto batch_reader, table_read->CreateReader(plan->Splits())); + ASSERT_OK_AND_ASSIGN(auto result, ReadResultCollector::CollectResult(batch_reader.get())); + ASSERT_TRUE(result); + + std::map expected = {{"custom.option", "custom-value"}, + {"file-system", "local"}, + {"file.format", "orc"}, + {"manifest.format", "orc"}}; + ASSERT_EQ(CollectStringMap(result), expected) << result->ToString(); +} + +TEST(SystemTableReadInteTest, TestReadBranchOptionsSystemTable) { + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + std::string source_path = + GetDataDir() + "/parquet/append_table_with_rt_branch.db/append_table_with_rt_branch"; + std::string table_path = PathUtil::JoinPath(dir->Str(), "branch_table"); + ASSERT_TRUE(TestUtil::CopyDirectory(std::filesystem::path(source_path), + std::filesystem::path(table_path))); + + std::map options = {{Options::FILE_SYSTEM, "local"}, + {Options::FILE_FORMAT, "parquet"}, + {Options::MANIFEST_FORMAT, "avro"}}; + std::string system_table_path = table_path + "$branch_rt$options"; + ScanContextBuilder scan_context_builder(system_table_path); + scan_context_builder.SetOptions(options); + ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + ASSERT_OK_AND_ASSIGN(auto plan, table_scan->CreatePlan()); + ASSERT_EQ(plan->Splits().size(), 1); + + ReadContextBuilder read_context_builder(system_table_path); + read_context_builder.SetOptions(options); + ASSERT_OK_AND_ASSIGN(auto read_context, read_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_read, TableRead::Create(std::move(read_context))); + ASSERT_OK_AND_ASSIGN(auto batch_reader, table_read->CreateReader(plan->Splits())); + ASSERT_OK_AND_ASSIGN(auto result, ReadResultCollector::CollectResult(batch_reader.get())); + + std::map expected = {{"bucket", "2"}, + {"file.format", "parquet"}, + {"manifest.format", "avro"}}; + ASSERT_EQ(CollectStringMap(result), expected) << result->ToString(); +} + TEST_P(ReadInteTest, TestAppendReadWithMultipleBuckets) { std::vector read_fields = { DataField(3, arrow::field("f3", arrow::float64())), From f947ba08f7ff600011d3afc6e1e997aee2c84a79 Mon Sep 17 00:00:00 2001 From: Socrates Date: Wed, 6 May 2026 15:51:37 +0800 Subject: [PATCH 7/8] Align system table schema and read interfaces --- include/paimon/schema/schema.h | 18 ++++++++++-- .../core/catalog/file_system_catalog_test.cpp | 18 ++++++++---- src/paimon/core/schema/table_schema.h | 2 +- src/paimon/core/table/source/table_read.cpp | 2 +- .../table/system/options_system_table.cpp | 2 +- .../core/table/system/options_system_table.h | 2 +- src/paimon/core/table/system/system_table.cpp | 10 +++++-- src/paimon/core/table/system/system_table.h | 6 ++-- .../core/table/system/system_table_read.cpp | 2 +- .../core/table/system/system_table_schema.cpp | 29 ------------------- .../core/table/system/system_table_schema.h | 11 +------ src/paimon/core/table/table_test.cpp | 10 +++++-- 12 files changed, 53 insertions(+), 59 deletions(-) diff --git a/include/paimon/schema/schema.h b/include/paimon/schema/schema.h index 9ebd9cb1a..5e046730c 100644 --- a/include/paimon/schema/schema.h +++ b/include/paimon/schema/schema.h @@ -57,6 +57,16 @@ class PAIMON_EXPORT Schema { /// failure. virtual Result GetFieldType(const std::string& field_name) const = 0; + /// Get an optional comment describing the schema object. + /// @return The comment if set, or std::nullopt otherwise. + virtual std::optional Comment() const = 0; +}; + +/// Schema contract for data tables. +class PAIMON_EXPORT DataSchema : public Schema { + public: + ~DataSchema() override = default; + /// Get the unique identifier of this table schema. /// @return The schema id virtual int64_t Id() const = 0; @@ -86,10 +96,12 @@ class PAIMON_EXPORT Schema { /// Get the table-level options associated with this schema. /// @return A reference to the map of option key-value pairs (e.g., file format, filesystem). virtual const std::map& Options() const = 0; +}; - /// Get an optional comment describing the table. - /// @return The table comment if set, or std::nullopt otherwise. - virtual std::optional Comment() const = 0; +/// Schema contract for system tables. +class PAIMON_EXPORT SystemSchema : public Schema { + public: + ~SystemSchema() override = default; }; } // namespace paimon diff --git a/src/paimon/core/catalog/file_system_catalog_test.cpp b/src/paimon/core/catalog/file_system_catalog_test.cpp index 75a4a5f7d..1c320f30d 100644 --- a/src/paimon/core/catalog/file_system_catalog_test.cpp +++ b/src/paimon/core/catalog/file_system_catalog_test.cpp @@ -25,6 +25,8 @@ #include "paimon/common/data/blob_utils.h" #include "paimon/common/utils/path_util.h" #include "paimon/core/core_options.h" +#include "paimon/core/schema/table_schema.h" +#include "paimon/core/table/system/system_table_schema.h" #include "paimon/defs.h" #include "paimon/fs/file_system.h" #include "paimon/fs/file_system_factory.h" @@ -180,6 +182,8 @@ TEST(FileSystemCatalogTest, TestOptionsSystemTableCatalog) { ASSERT_OK_AND_ASSIGN(std::shared_ptr system_schema, catalog.LoadTableSchema(options_identifier)); + ASSERT_TRUE(std::dynamic_pointer_cast(system_schema) != nullptr); + ASSERT_TRUE(std::dynamic_pointer_cast(system_schema) != nullptr); ASSERT_OK_AND_ASSIGN(auto c_schema, system_schema->GetArrowSchema()); auto loaded_schema_result = arrow::ImportSchema(c_schema.get()); ASSERT_TRUE(loaded_schema_result.ok()) << loaded_schema_result.status().ToString(); @@ -245,6 +249,8 @@ TEST(FileSystemCatalogTest, TestCreateTableWithBlob) { ASSERT_EQ(table_names[0], "tbl1"); ASSERT_OK_AND_ASSIGN(std::shared_ptr table_schema, catalog.LoadTableSchema(Identifier("db1", "tbl1"))); + ASSERT_TRUE(std::dynamic_pointer_cast(table_schema) != nullptr); + ASSERT_TRUE(std::dynamic_pointer_cast(table_schema) != nullptr); ASSERT_OK_AND_ASSIGN(auto arrow_schema, table_schema->GetArrowSchema()); auto loaded_schema = arrow::ImportSchema(arrow_schema.get()).ValueOrDie(); ASSERT_TRUE(typed_schema.Equals(loaded_schema)); @@ -415,11 +421,13 @@ TEST(FileSystemCatalogTest, TestValidateTableSchema) { ASSERT_NOK_WITH_MSG(catalog.LoadTableSchema(Identifier("db0", "tbl0")), "Identifier{database=\'db0\', table=\'tbl0\'} not exist"); ASSERT_OK_AND_ASSIGN(std::shared_ptr table_schema, catalog.LoadTableSchema(identifier)); - ASSERT_EQ(0, table_schema->Id()); - ASSERT_EQ(3, table_schema->HighestFieldId()); - ASSERT_EQ(1, table_schema->PartitionKeys().size()); - ASSERT_EQ(0, table_schema->PrimaryKeys().size()); - ASSERT_EQ(-1, table_schema->NumBuckets()); + auto data_schema = std::dynamic_pointer_cast(table_schema); + ASSERT_TRUE(data_schema != nullptr); + ASSERT_EQ(0, data_schema->Id()); + ASSERT_EQ(3, data_schema->HighestFieldId()); + ASSERT_EQ(1, data_schema->PartitionKeys().size()); + ASSERT_EQ(0, data_schema->PrimaryKeys().size()); + ASSERT_EQ(-1, data_schema->NumBuckets()); ASSERT_FALSE(table_schema->Comment().has_value()); std::vector field_names = table_schema->FieldNames(); std::vector expected_field_names = {"f0", "f1", "f2", "f3"}; diff --git a/src/paimon/core/schema/table_schema.h b/src/paimon/core/schema/table_schema.h index 60e11b638..161e10713 100644 --- a/src/paimon/core/schema/table_schema.h +++ b/src/paimon/core/schema/table_schema.h @@ -36,7 +36,7 @@ struct ArrowSchema; namespace paimon { /// Schema of a table, including schemaId and fieldId. -class TableSchema : public Schema, public Jsonizable { +class TableSchema : public DataSchema, public Jsonizable { public: static constexpr int64_t FIRST_SCHEMA_ID = 0; static constexpr int32_t PAIMON_07_VERSION = 1; diff --git a/src/paimon/core/table/source/table_read.cpp b/src/paimon/core/table/source/table_read.cpp index 4431b9617..9bf057e95 100644 --- a/src/paimon/core/table/source/table_read.cpp +++ b/src/paimon/core/table/source/table_read.cpp @@ -136,7 +136,7 @@ Result> TableRead::Create(std::unique_ptr system_table, SystemTableLoader::LoadFromPath(tmp_core_options.GetFileSystem(), context->GetPath())); - return std::make_unique(system_table, memory_pool); + return system_table->NewRead(memory_pool); } PAIMON_ASSIGN_OR_RAISE(std::shared_ptr internal_context, diff --git a/src/paimon/core/table/system/options_system_table.cpp b/src/paimon/core/table/system/options_system_table.cpp index 85ee2d7d1..ba00c4263 100644 --- a/src/paimon/core/table/system/options_system_table.cpp +++ b/src/paimon/core/table/system/options_system_table.cpp @@ -113,7 +113,7 @@ Result> OptionsSystemTable::NewScan() const { return std::make_unique(table_path_); } -Result> OptionsSystemTable::NewReader( +Result> OptionsSystemTable::CreateBatchReader( const std::vector>& splits, const std::shared_ptr& pool) const { if (splits.size() != 1) { diff --git a/src/paimon/core/table/system/options_system_table.h b/src/paimon/core/table/system/options_system_table.h index 308f9e898..8cec1832f 100644 --- a/src/paimon/core/table/system/options_system_table.h +++ b/src/paimon/core/table/system/options_system_table.h @@ -35,7 +35,7 @@ class OptionsSystemTable : public SystemTable { std::string Name() const override; std::shared_ptr ArrowSchema() const override; Result> NewScan() const override; - Result> NewReader( + Result> CreateBatchReader( const std::vector>& splits, const std::shared_ptr& pool) const override; diff --git a/src/paimon/core/table/system/system_table.cpp b/src/paimon/core/table/system/system_table.cpp index 13b0e627d..43a34fd77 100644 --- a/src/paimon/core/table/system/system_table.cpp +++ b/src/paimon/core/table/system/system_table.cpp @@ -27,10 +27,17 @@ #include "paimon/core/schema/schema_manager.h" #include "paimon/core/schema/table_schema.h" #include "paimon/core/table/system/options_system_table.h" +#include "paimon/core/table/system/system_table_read.h" #include "paimon/core/utils/branch_manager.h" namespace paimon { +Result> SystemTable::NewRead( + const std::shared_ptr& pool) const { + return std::make_unique( + std::const_pointer_cast(shared_from_this()), pool); +} + bool SystemTableLoader::IsSupported(const std::string& system_table_name) { return StringUtils::ToLowerCase(system_table_name) == OptionsSystemTable::kName; } @@ -66,8 +73,7 @@ Result> SystemTableLoader::TryParsePath(const std Result> SystemTableLoader::LoadFromPath( const std::shared_ptr& fs, const std::string& path) { - PAIMON_ASSIGN_OR_RAISE(std::optional system_table_path, - TryParsePath(path)); + PAIMON_ASSIGN_OR_RAISE(std::optional system_table_path, TryParsePath(path)); if (!system_table_path) { return Status::Invalid("path is not a system table path: ", path); } diff --git a/src/paimon/core/table/system/system_table.h b/src/paimon/core/table/system/system_table.h index 342f5d822..2a57b7e3c 100644 --- a/src/paimon/core/table/system/system_table.h +++ b/src/paimon/core/table/system/system_table.h @@ -31,6 +31,7 @@ namespace paimon { class FileSystem; class MemoryPool; class Split; +class SystemTableRead; class TableScan; class TableSchema; @@ -40,14 +41,15 @@ struct SystemTablePath { std::string system_table_name; }; -class SystemTable { +class SystemTable : public std::enable_shared_from_this { public: virtual ~SystemTable() = default; virtual std::string Name() const = 0; virtual std::shared_ptr ArrowSchema() const = 0; virtual Result> NewScan() const = 0; - virtual Result> NewReader( + Result> NewRead(const std::shared_ptr& pool) const; + virtual Result> CreateBatchReader( const std::vector>& splits, const std::shared_ptr& pool) const = 0; }; diff --git a/src/paimon/core/table/system/system_table_read.cpp b/src/paimon/core/table/system/system_table_read.cpp index 96175b209..55ad9c1bf 100644 --- a/src/paimon/core/table/system/system_table_read.cpp +++ b/src/paimon/core/table/system/system_table_read.cpp @@ -30,7 +30,7 @@ SystemTableRead::SystemTableRead(std::shared_ptr system_table, Result> SystemTableRead::CreateReader( const std::vector>& splits) { - return system_table_->NewReader(splits, GetMemoryPool()); + return system_table_->CreateBatchReader(splits, GetMemoryPool()); } Result> SystemTableRead::CreateReader( diff --git a/src/paimon/core/table/system/system_table_schema.cpp b/src/paimon/core/table/system/system_table_schema.cpp index d0ee48012..838224571 100644 --- a/src/paimon/core/table/system/system_table_schema.cpp +++ b/src/paimon/core/table/system/system_table_schema.cpp @@ -16,7 +16,6 @@ #include "paimon/core/table/system/system_table_schema.h" -#include #include #include #include @@ -56,34 +55,6 @@ Result SystemTableSchema::GetFieldType(const std::string& field_name) return FieldTypeUtils::ConvertToFieldType(field->type()->id()); } -int64_t SystemTableSchema::Id() const { - return 0; -} - -const std::vector& SystemTableSchema::PrimaryKeys() const { - return empty_keys_; -} - -const std::vector& SystemTableSchema::PartitionKeys() const { - return empty_keys_; -} - -const std::vector& SystemTableSchema::BucketKeys() const { - return empty_keys_; -} - -int32_t SystemTableSchema::NumBuckets() const { - return -1; -} - -int32_t SystemTableSchema::HighestFieldId() const { - return static_cast(schema_->num_fields() - 1); -} - -const std::map& SystemTableSchema::Options() const { - return empty_options_; -} - std::optional SystemTableSchema::Comment() const { return std::nullopt; } diff --git a/src/paimon/core/table/system/system_table_schema.h b/src/paimon/core/table/system/system_table_schema.h index 54aaaaa2d..1a95297e6 100644 --- a/src/paimon/core/table/system/system_table_schema.h +++ b/src/paimon/core/table/system/system_table_schema.h @@ -28,7 +28,7 @@ namespace paimon { -class SystemTableSchema : public Schema { +class SystemTableSchema : public SystemSchema { public: explicit SystemTableSchema(std::shared_ptr schema); @@ -36,20 +36,11 @@ class SystemTableSchema : public Schema { Result GetJsonSchema() const override; std::vector FieldNames() const override; Result GetFieldType(const std::string& field_name) const override; - int64_t Id() const override; - const std::vector& PrimaryKeys() const override; - const std::vector& PartitionKeys() const override; - const std::vector& BucketKeys() const override; - int32_t NumBuckets() const override; - int32_t HighestFieldId() const override; - const std::map& Options() const override; std::optional Comment() const override; private: std::shared_ptr schema_; std::vector field_names_; - std::vector empty_keys_; - std::map empty_options_; }; } // namespace paimon diff --git a/src/paimon/core/table/table_test.cpp b/src/paimon/core/table/table_test.cpp index c9fb5e749..b6e6ba1b8 100644 --- a/src/paimon/core/table/table_test.cpp +++ b/src/paimon/core/table/table_test.cpp @@ -20,6 +20,7 @@ #include "gtest/gtest.h" #include "paimon/core/schema/schema_manager.h" #include "paimon/core/schema/table_schema.h" +#include "paimon/schema/schema.h" #include "paimon/testing/utils/testharness.h" namespace paimon::test { @@ -50,9 +51,12 @@ TEST(TableTest, TestCreateWithUnknownDatabase) { std::shared_ptr latest_schema = table->LatestSchema(); ASSERT_NE(latest_schema, nullptr); - EXPECT_EQ(latest_schema->Id(), 0); - EXPECT_EQ(latest_schema->PartitionKeys(), partition_keys); - EXPECT_EQ(latest_schema->PrimaryKeys(), primary_keys); + auto data_schema = std::dynamic_pointer_cast(latest_schema); + ASSERT_TRUE(data_schema != nullptr); + ASSERT_TRUE(std::dynamic_pointer_cast(latest_schema) != nullptr); + EXPECT_EQ(data_schema->Id(), 0); + EXPECT_EQ(data_schema->PartitionKeys(), partition_keys); + EXPECT_EQ(data_schema->PrimaryKeys(), primary_keys); } TEST(TableTest, TestCreateFailedWithNonExistSchema) { From 99290768feeaea2fdecf95bec6479b3ca7a87587 Mon Sep 17 00:00:00 2001 From: Socrates Date: Wed, 6 May 2026 15:55:43 +0800 Subject: [PATCH 8/8] Fix pre-commit formatting issues --- src/paimon/core/catalog/identifier_test.cpp | 3 +-- test/inte/read_inte_test.cpp | 13 ++++++------- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/src/paimon/core/catalog/identifier_test.cpp b/src/paimon/core/catalog/identifier_test.cpp index 4e9f5fa98..836ecd4c6 100644 --- a/src/paimon/core/catalog/identifier_test.cpp +++ b/src/paimon/core/catalog/identifier_test.cpp @@ -124,8 +124,7 @@ TEST(IdentifierTest, InvalidEmptySystemTableNameParts) { ASSERT_NOK_WITH_MSG(Identifier("db", "$options").IsSystemTable(), "Invalid table name"); ASSERT_NOK_WITH_MSG(Identifier("db", "tbl$").IsSystemTable(), "Invalid table name"); ASSERT_NOK_WITH_MSG(Identifier("db", "tbl$branch_").IsSystemTable(), "Invalid table name"); - ASSERT_NOK_WITH_MSG(Identifier("db", "tbl$branch_dev$").IsSystemTable(), - "Invalid table name"); + ASSERT_NOK_WITH_MSG(Identifier("db", "tbl$branch_dev$").IsSystemTable(), "Invalid table name"); ASSERT_NOK_WITH_MSG(Identifier("db", "tbl$$options").IsSystemTable(), "System table can only contain one '$' separator"); } diff --git a/test/inte/read_inte_test.cpp b/test/inte/read_inte_test.cpp index da235b21f..23550928f 100644 --- a/test/inte/read_inte_test.cpp +++ b/test/inte/read_inte_test.cpp @@ -41,7 +41,6 @@ #include "paimon/common/types/data_field.h" #include "paimon/common/utils/path_util.h" #include "paimon/common/utils/scope_guard.h" -#include "paimon/defs.h" #include "paimon/core/io/data_file_meta.h" #include "paimon/core/manifest/file_source.h" #include "paimon/core/stats/simple_stats.h" @@ -50,6 +49,7 @@ #include "paimon/core/table/source/fallback_data_split.h" #include "paimon/data/decimal.h" #include "paimon/data/timestamp.h" +#include "paimon/defs.h" #include "paimon/fs/file_system.h" #include "paimon/fs/local/local_file_system.h" #include "paimon/memory/memory_pool.h" @@ -61,10 +61,10 @@ #include "paimon/result.h" #include "paimon/scan_context.h" #include "paimon/status.h" -#include "paimon/table/source/plan.h" #include "paimon/table/source/data_split.h" -#include "paimon/table/source/table_scan.h" +#include "paimon/table/source/plan.h" #include "paimon/table/source/table_read.h" +#include "paimon/table/source/table_scan.h" #include "paimon/testing/utils/binary_row_generator.h" #include "paimon/testing/utils/io_exception_helper.h" #include "paimon/testing/utils/read_result_collector.h" @@ -197,7 +197,7 @@ class ReadInteTest : public testing::Test, public ::testing::WithParamInterface< return std::dynamic_pointer_cast(split); } -private: + private: std::shared_ptr pool_; }; @@ -541,9 +541,8 @@ TEST(SystemTableReadInteTest, TestReadBranchOptionsSystemTable) { ASSERT_OK_AND_ASSIGN(auto batch_reader, table_read->CreateReader(plan->Splits())); ASSERT_OK_AND_ASSIGN(auto result, ReadResultCollector::CollectResult(batch_reader.get())); - std::map expected = {{"bucket", "2"}, - {"file.format", "parquet"}, - {"manifest.format", "avro"}}; + std::map expected = { + {"bucket", "2"}, {"file.format", "parquet"}, {"manifest.format", "avro"}}; ASSERT_EQ(CollectStringMap(result), expected) << result->ToString(); }