diff --git a/include/paimon/catalog/identifier.h b/include/paimon/catalog/identifier.h index 061c60a05..dd86c9175 100644 --- a/include/paimon/catalog/identifier.h +++ b/include/paimon/catalog/identifier.h @@ -16,8 +16,10 @@ #pragma once +#include #include +#include "paimon/result.h" #include "paimon/type_fwd.h" #include "paimon/visibility.h" @@ -27,6 +29,9 @@ namespace paimon { class PAIMON_EXPORT Identifier { public: static const char kUnknownDatabase[]; + static const char kSystemTableSplitter[]; + static const char kSystemBranchPrefix[]; + static const char kDefaultMainBranch[]; explicit Identifier(const std::string& table); Identifier(const std::string& database, const std::string& table); @@ -34,11 +39,22 @@ class PAIMON_EXPORT Identifier { bool operator==(const Identifier& other); const std::string& GetDatabaseName() const; const std::string& GetTableName() const; + Result GetDataTableName() const; + Result> GetBranchName() const; + Result GetBranchNameOrDefault() const; + Result> GetSystemTableName() const; + Result IsSystemTable() const; std::string ToString() const; private: + Status SplitTableName() const; + const std::string database_; const std::string table_; + mutable bool parsed_ = false; + mutable std::string data_table_; + mutable std::optional branch_; + mutable std::optional system_table_; }; } // namespace paimon diff --git a/include/paimon/schema/schema.h b/include/paimon/schema/schema.h index 9ebd9cb1a..5e046730c 100644 --- a/include/paimon/schema/schema.h +++ b/include/paimon/schema/schema.h @@ -57,6 +57,16 @@ class PAIMON_EXPORT Schema { /// failure. virtual Result GetFieldType(const std::string& field_name) const = 0; + /// Get an optional comment describing the schema object. + /// @return The comment if set, or std::nullopt otherwise. + virtual std::optional Comment() const = 0; +}; + +/// Schema contract for data tables. +class PAIMON_EXPORT DataSchema : public Schema { + public: + ~DataSchema() override = default; + /// Get the unique identifier of this table schema. /// @return The schema id virtual int64_t Id() const = 0; @@ -86,10 +96,12 @@ class PAIMON_EXPORT Schema { /// Get the table-level options associated with this schema. /// @return A reference to the map of option key-value pairs (e.g., file format, filesystem). virtual const std::map& Options() const = 0; +}; - /// Get an optional comment describing the table. - /// @return The table comment if set, or std::nullopt otherwise. - virtual std::optional Comment() const = 0; +/// Schema contract for system tables. +class PAIMON_EXPORT SystemSchema : public Schema { + public: + ~SystemSchema() override = default; }; } // namespace paimon diff --git a/include/paimon/table/source/table_read.h b/include/paimon/table/source/table_read.h index f8327da1f..51c1b6efb 100644 --- a/include/paimon/table/source/table_read.h +++ b/include/paimon/table/source/table_read.h @@ -66,6 +66,10 @@ class PAIMON_EXPORT TableRead { protected: explicit TableRead(const std::shared_ptr& memory_pool); + std::shared_ptr GetMemoryPool() const { + return pool_; + } + private: std::shared_ptr pool_; }; diff --git a/src/paimon/CMakeLists.txt b/src/paimon/CMakeLists.txt index 1301169b5..42adad419 100644 --- a/src/paimon/CMakeLists.txt +++ b/src/paimon/CMakeLists.txt @@ -313,6 +313,11 @@ set(PAIMON_CORE_SRCS core/table/source/table_read.cpp core/table/source/table_scan.cpp core/table/source/data_evolution_batch_scan.cpp + core/table/system/options_system_table.cpp + core/table/system/system_table.cpp + core/table/system/system_table_read.cpp + core/table/system/system_table_scan.cpp + core/table/system/system_table_schema.cpp core/tag/tag.cpp core/utils/field_mapping.cpp core/utils/file_store_path_factory.cpp diff --git a/src/paimon/core/catalog/file_system_catalog.cpp b/src/paimon/core/catalog/file_system_catalog.cpp index 3d07ade2c..c5bee3faf 100644 --- a/src/paimon/core/catalog/file_system_catalog.cpp +++ b/src/paimon/core/catalog/file_system_catalog.cpp @@ -30,6 +30,8 @@ #include "paimon/common/utils/path_util.h" #include "paimon/common/utils/string_utils.h" #include "paimon/core/snapshot.h" +#include "paimon/core/table/system/system_table.h" +#include "paimon/core/table/system/system_table_schema.h" #include "paimon/core/utils/branch_manager.h" #include "paimon/core/utils/snapshot_manager.h" #include "paimon/defs.h" @@ -92,6 +94,19 @@ Result FileSystemCatalog::DatabaseExists(const std::string& db_name) const } Result FileSystemCatalog::TableExists(const Identifier& identifier) const { + PAIMON_ASSIGN_OR_RAISE(bool is_system_table, identifier.IsSystemTable()); + if (is_system_table) { + PAIMON_ASSIGN_OR_RAISE(std::optional system_table_name, + identifier.GetSystemTableName()); + if (!system_table_name || !SystemTableLoader::IsSupported(system_table_name.value())) { + return false; + } + PAIMON_ASSIGN_OR_RAISE(std::string data_table_name, identifier.GetDataTableName()); + Identifier data_identifier(identifier.GetDatabaseName(), data_table_name); + PAIMON_ASSIGN_OR_RAISE(std::optional> latest_schema, + TableSchemaExists(data_identifier)); + return latest_schema != std::nullopt; + } PAIMON_ASSIGN_OR_RAISE(std::optional> latest_schema, TableSchemaExists(identifier)); return latest_schema != std::nullopt; @@ -110,7 +125,8 @@ Status FileSystemCatalog::CreateTable(const Identifier& identifier, ArrowSchema* const std::vector& primary_keys, const std::map& options, bool ignore_if_exists) { - if (IsSystemTable(identifier)) { + PAIMON_ASSIGN_OR_RAISE(bool is_system_table, IsSystemTable(identifier)); + if (is_system_table) { return Status::Invalid( fmt::format("Cannot create table for system table {}, please use data table.", identifier.ToString())); @@ -148,7 +164,8 @@ Status FileSystemCatalog::CreateTable(const Identifier& identifier, ArrowSchema* Result>> FileSystemCatalog::TableSchemaExists( const Identifier& identifier) const { - if (IsSystemTable(identifier)) { + PAIMON_ASSIGN_OR_RAISE(bool is_system_table, IsSystemTable(identifier)); + if (is_system_table) { return Status::NotImplemented( "do not support checking TableSchemaExists for system table."); } @@ -168,12 +185,15 @@ bool FileSystemCatalog::IsSystemDatabase(const std::string& db_name) { return db_name == SYSTEM_DATABASE_NAME; } -bool FileSystemCatalog::IsSpecifiedSystemTable(const Identifier& identifier) { - return (identifier.GetTableName().find(SYSTEM_TABLE_SPLITTER) != std::string::npos); +Result FileSystemCatalog::IsSpecifiedSystemTable(const Identifier& identifier) { + return identifier.IsSystemTable(); } -bool FileSystemCatalog::IsSystemTable(const Identifier& identifier) { - return IsSystemDatabase(identifier.GetDatabaseName()) || IsSpecifiedSystemTable(identifier); +Result FileSystemCatalog::IsSystemTable(const Identifier& identifier) { + if (IsSystemDatabase(identifier.GetDatabaseName())) { + return true; + } + return IsSpecifiedSystemTable(identifier); } std::string FileSystemCatalog::NewDatabasePath(const std::string& warehouse, @@ -237,6 +257,26 @@ Result FileSystemCatalog::TableExistsInFileSystem(const std::string& table Result> FileSystemCatalog::LoadTableSchema( const Identifier& identifier) const { + PAIMON_ASSIGN_OR_RAISE(bool is_system_table, identifier.IsSystemTable()); + if (is_system_table) { + PAIMON_ASSIGN_OR_RAISE(std::optional system_table_name, + identifier.GetSystemTableName()); + if (!system_table_name || !SystemTableLoader::IsSupported(system_table_name.value())) { + return Status::NotExist(fmt::format("{} not exist", identifier.ToString())); + } + PAIMON_ASSIGN_OR_RAISE(std::string data_table_name, identifier.GetDataTableName()); + Identifier data_identifier(identifier.GetDatabaseName(), data_table_name); + PAIMON_ASSIGN_OR_RAISE(std::optional> latest_schema, + TableSchemaExists(data_identifier)); + if (!latest_schema) { + return Status::NotExist(fmt::format("{} not exist", data_identifier.ToString())); + } + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr system_table, + SystemTableLoader::Load(system_table_name.value(), fs_, GetTableLocation(identifier), + latest_schema.value())); + return std::make_shared(system_table->ArrowSchema()); + } PAIMON_ASSIGN_OR_RAISE(std::optional> latest_schema, TableSchemaExists(identifier)); if (!latest_schema) { @@ -246,6 +286,12 @@ Result> FileSystemCatalog::LoadTableSchema( } Result> FileSystemCatalog::GetTable(const Identifier& identifier) const { + PAIMON_ASSIGN_OR_RAISE(bool is_system_table, identifier.IsSystemTable()); + if (is_system_table) { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr schema, LoadTableSchema(identifier)); + return std::make_shared(schema, identifier.GetDatabaseName(), + identifier.GetTableName()); + } return Table::Create(fs_, GetTableLocation(identifier), identifier); } @@ -351,7 +397,8 @@ Status FileSystemCatalog::DropTableImpl(const Identifier& identifier, } Status FileSystemCatalog::DropTable(const Identifier& identifier, bool ignore_if_not_exists) { - if (IsSystemTable(identifier)) { + PAIMON_ASSIGN_OR_RAISE(bool is_system_table, IsSystemTable(identifier)); + if (is_system_table) { return Status::Invalid(fmt::format("Cannot drop system table {}.", identifier.ToString())); } @@ -414,7 +461,9 @@ Status FileSystemCatalog::DropTable(const Identifier& identifier, bool ignore_if Status FileSystemCatalog::RenameTable(const Identifier& from_table, const Identifier& to_table, bool ignore_if_not_exists) { - if (IsSystemTable(from_table) || IsSystemTable(to_table)) { + PAIMON_ASSIGN_OR_RAISE(bool is_from_system_table, IsSystemTable(from_table)); + PAIMON_ASSIGN_OR_RAISE(bool is_to_system_table, IsSystemTable(to_table)); + if (is_from_system_table || is_to_system_table) { return Status::Invalid(fmt::format("Cannot rename system table {} or {}.", from_table.ToString(), to_table.ToString())); } diff --git a/src/paimon/core/catalog/file_system_catalog.h b/src/paimon/core/catalog/file_system_catalog.h index cf80d9401..0563a8973 100644 --- a/src/paimon/core/catalog/file_system_catalog.h +++ b/src/paimon/core/catalog/file_system_catalog.h @@ -69,8 +69,8 @@ class FileSystemCatalog : public Catalog { static std::string NewDatabasePath(const std::string& warehouse, const std::string& db_name); static std::string NewDataTablePath(const std::string& warehouse, const Identifier& identifier); static bool IsSystemDatabase(const std::string& db_name); - static bool IsSpecifiedSystemTable(const Identifier& identifier); - static bool IsSystemTable(const Identifier& identifier); + static Result IsSpecifiedSystemTable(const Identifier& identifier); + static Result IsSystemTable(const Identifier& identifier); Result>> TableSchemaExists( const Identifier& identifier) const; diff --git a/src/paimon/core/catalog/file_system_catalog_test.cpp b/src/paimon/core/catalog/file_system_catalog_test.cpp index f97801359..1c320f30d 100644 --- a/src/paimon/core/catalog/file_system_catalog_test.cpp +++ b/src/paimon/core/catalog/file_system_catalog_test.cpp @@ -25,6 +25,8 @@ #include "paimon/common/data/blob_utils.h" #include "paimon/common/utils/path_util.h" #include "paimon/core/core_options.h" +#include "paimon/core/schema/table_schema.h" +#include "paimon/core/table/system/system_table_schema.h" #include "paimon/defs.h" #include "paimon/fs/file_system.h" #include "paimon/fs/file_system_factory.h" @@ -149,6 +151,65 @@ TEST(FileSystemCatalogTest, TestCreateTable) { ArrowSchemaRelease(&schema); } +TEST(FileSystemCatalogTest, TestOptionsSystemTableCatalog) { + std::map options; + options[Options::FILE_SYSTEM] = "local"; + options[Options::FILE_FORMAT] = "orc"; + options["custom.option"] = "custom-value"; + ASSERT_OK_AND_ASSIGN(auto core_options, CoreOptions::FromMap(options)); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + FileSystemCatalog catalog(core_options.GetFileSystem(), dir->Str()); + ASSERT_OK(catalog.CreateDatabase("db1", options, /*ignore_if_exists=*/true)); + + auto typed_schema = arrow::schema({arrow::field("f0", arrow::int32())}); + ::ArrowSchema schema; + ASSERT_TRUE(arrow::ExportSchema(*typed_schema, &schema).ok()); + ASSERT_OK(catalog.CreateTable(Identifier("db1", "tbl1"), &schema, + /*partition_keys=*/{}, /*primary_keys=*/{}, options, + /*ignore_if_exists=*/false)); + ArrowSchemaRelease(&schema); + + Identifier options_identifier("db1", "tbl1$options"); + ASSERT_OK_AND_ASSIGN(bool exists, catalog.TableExists(options_identifier)); + ASSERT_TRUE(exists); + ASSERT_OK_AND_ASSIGN(exists, catalog.TableExists(Identifier("db1", "tbl1$unknown"))); + ASSERT_FALSE(exists); + ASSERT_OK_AND_ASSIGN(exists, catalog.TableExists(Identifier("db1", "missing$options"))); + ASSERT_FALSE(exists); + ASSERT_EQ(catalog.GetTableLocation(options_identifier), + PathUtil::JoinPath(PathUtil::JoinPath(dir->Str(), "db1.db"), "tbl1$options")); + + ASSERT_OK_AND_ASSIGN(std::shared_ptr system_schema, + catalog.LoadTableSchema(options_identifier)); + ASSERT_TRUE(std::dynamic_pointer_cast(system_schema) != nullptr); + ASSERT_TRUE(std::dynamic_pointer_cast(system_schema) != nullptr); + ASSERT_OK_AND_ASSIGN(auto c_schema, system_schema->GetArrowSchema()); + auto loaded_schema_result = arrow::ImportSchema(c_schema.get()); + ASSERT_TRUE(loaded_schema_result.ok()) << loaded_schema_result.status().ToString(); + auto loaded_schema = loaded_schema_result.ValueUnsafe(); + ASSERT_EQ(loaded_schema->field_names(), (std::vector{"key", "value"})); + ASSERT_EQ(loaded_schema->field(0)->type()->id(), arrow::Type::STRING); + ASSERT_EQ(loaded_schema->field(1)->type()->id(), arrow::Type::STRING); + ASSERT_FALSE(loaded_schema->field(0)->nullable()); + ASSERT_FALSE(loaded_schema->field(1)->nullable()); + + ASSERT_OK_AND_ASSIGN(auto system_table, catalog.GetTable(options_identifier)); + ASSERT_EQ(system_table->Name(), "tbl1$options"); + ASSERT_NOK_WITH_MSG(catalog.LoadTableSchema(Identifier("db1", "tbl1$unknown")), "not exist"); + ASSERT_NOK_WITH_MSG(catalog.LoadTableSchema(Identifier("db1", "missing$options")), "not exist"); + + ::ArrowSchema system_create_schema; + ASSERT_TRUE(arrow::ExportSchema(*typed_schema, &system_create_schema).ok()); + ASSERT_NOK_WITH_MSG( + catalog.CreateTable(options_identifier, &system_create_schema, {}, {}, options, false), + "Cannot create table for system table"); + ArrowSchemaRelease(&system_create_schema); + ASSERT_NOK_WITH_MSG(catalog.DropTable(options_identifier, false), "Cannot drop system table"); + ASSERT_NOK_WITH_MSG(catalog.RenameTable(options_identifier, Identifier("db1", "tbl2"), false), + "Cannot rename system table"); +} + TEST(FileSystemCatalogTest, TestCreateTableWithBlob) { std::map options; options[Options::FILE_SYSTEM] = "local"; @@ -188,6 +249,8 @@ TEST(FileSystemCatalogTest, TestCreateTableWithBlob) { ASSERT_EQ(table_names[0], "tbl1"); ASSERT_OK_AND_ASSIGN(std::shared_ptr table_schema, catalog.LoadTableSchema(Identifier("db1", "tbl1"))); + ASSERT_TRUE(std::dynamic_pointer_cast(table_schema) != nullptr); + ASSERT_TRUE(std::dynamic_pointer_cast(table_schema) != nullptr); ASSERT_OK_AND_ASSIGN(auto arrow_schema, table_schema->GetArrowSchema()); auto loaded_schema = arrow::ImportSchema(arrow_schema.get()).ValueOrDie(); ASSERT_TRUE(typed_schema.Equals(loaded_schema)); @@ -358,11 +421,13 @@ TEST(FileSystemCatalogTest, TestValidateTableSchema) { ASSERT_NOK_WITH_MSG(catalog.LoadTableSchema(Identifier("db0", "tbl0")), "Identifier{database=\'db0\', table=\'tbl0\'} not exist"); ASSERT_OK_AND_ASSIGN(std::shared_ptr table_schema, catalog.LoadTableSchema(identifier)); - ASSERT_EQ(0, table_schema->Id()); - ASSERT_EQ(3, table_schema->HighestFieldId()); - ASSERT_EQ(1, table_schema->PartitionKeys().size()); - ASSERT_EQ(0, table_schema->PrimaryKeys().size()); - ASSERT_EQ(-1, table_schema->NumBuckets()); + auto data_schema = std::dynamic_pointer_cast(table_schema); + ASSERT_TRUE(data_schema != nullptr); + ASSERT_EQ(0, data_schema->Id()); + ASSERT_EQ(3, data_schema->HighestFieldId()); + ASSERT_EQ(1, data_schema->PartitionKeys().size()); + ASSERT_EQ(0, data_schema->PrimaryKeys().size()); + ASSERT_EQ(-1, data_schema->NumBuckets()); ASSERT_FALSE(table_schema->Comment().has_value()); std::vector field_names = table_schema->FieldNames(); std::vector expected_field_names = {"f0", "f1", "f2", "f3"}; @@ -396,8 +461,7 @@ TEST(FileSystemCatalogTest, TestValidateTableSchema) { ASSERT_NOK_WITH_MSG(catalog.LoadTableSchema(identifier), "Identifier{database=\'db1\', table=\'tbl1\'} not exist"); - ASSERT_NOK_WITH_MSG(catalog.LoadTableSchema(Identifier("db1", "tbl$11")), - "do not support checking TableSchemaExists for system table."); + ASSERT_NOK_WITH_MSG(catalog.LoadTableSchema(Identifier("db1", "tbl$11")), "not exist"); ArrowSchemaRelease(&schema); } diff --git a/src/paimon/core/catalog/identifier.cpp b/src/paimon/core/catalog/identifier.cpp index 7ef7c04df..3ec998c83 100644 --- a/src/paimon/core/catalog/identifier.cpp +++ b/src/paimon/core/catalog/identifier.cpp @@ -16,11 +16,21 @@ #include "paimon/catalog/identifier.h" +#include +#include +#include + #include "fmt/format.h" +#include "paimon/common/utils/string_utils.h" +#include "paimon/result.h" +#include "paimon/status.h" namespace paimon { const char Identifier::kUnknownDatabase[] = "unknown"; +const char Identifier::kSystemTableSplitter[] = "$"; +const char Identifier::kSystemBranchPrefix[] = "branch_"; +const char Identifier::kDefaultMainBranch[] = "main"; Identifier::Identifier(const std::string& table) : Identifier(std::string(kUnknownDatabase), table) {} @@ -40,8 +50,73 @@ const std::string& Identifier::GetTableName() const { return table_; } +Result Identifier::GetDataTableName() const { + PAIMON_RETURN_NOT_OK(SplitTableName()); + return data_table_; +} + +Result> Identifier::GetBranchName() const { + PAIMON_RETURN_NOT_OK(SplitTableName()); + return branch_; +} + +Result Identifier::GetBranchNameOrDefault() const { + PAIMON_ASSIGN_OR_RAISE(std::optional branch, GetBranchName()); + return branch.value_or(kDefaultMainBranch); +} + +Result> Identifier::GetSystemTableName() const { + PAIMON_RETURN_NOT_OK(SplitTableName()); + return system_table_; +} + +Result Identifier::IsSystemTable() const { + PAIMON_ASSIGN_OR_RAISE(std::optional system_table, GetSystemTableName()); + return system_table.has_value(); +} + std::string Identifier::ToString() const { return fmt::format("Identifier{{database='{}', table='{}'}}", database_, table_); } +Status Identifier::SplitTableName() const { + if (parsed_) { + return Status::OK(); + } + std::string data_table; + std::optional branch; + std::optional system_table; + std::vector splits = + StringUtils::Split(table_, kSystemTableSplitter, /*ignore_empty=*/false); + if (splits.size() == 1) { + data_table = table_; + } else if (splits.size() == 2) { + data_table = splits[0]; + if (StringUtils::StartsWith(splits[1], kSystemBranchPrefix, /*start_pos=*/0)) { + branch = splits[1].substr(std::strlen(kSystemBranchPrefix)); + } else { + system_table = splits[1]; + } + } else if (splits.size() == 3) { + if (!StringUtils::StartsWith(splits[1], kSystemBranchPrefix, /*start_pos=*/0)) { + return Status::Invalid(fmt::format( + "System table can only contain one '$' separator, but this is: {}", table_)); + } + data_table = splits[0]; + branch = splits[1].substr(std::strlen(kSystemBranchPrefix)); + system_table = splits[2]; + } else { + return Status::Invalid(fmt::format("Invalid table name: {}", table_)); + } + if (data_table.empty() || (branch && branch->empty()) || + (system_table && system_table->empty())) { + return Status::Invalid(fmt::format("Invalid table name: {}", table_)); + } + data_table_ = std::move(data_table); + branch_ = std::move(branch); + system_table_ = std::move(system_table); + parsed_ = true; + return Status::OK(); +} + } // namespace paimon diff --git a/src/paimon/core/catalog/identifier_test.cpp b/src/paimon/core/catalog/identifier_test.cpp index 8c1750158..836ecd4c6 100644 --- a/src/paimon/core/catalog/identifier_test.cpp +++ b/src/paimon/core/catalog/identifier_test.cpp @@ -17,19 +17,20 @@ #include "paimon/catalog/identifier.h" #include "gtest/gtest.h" +#include "paimon/testing/utils/testharness.h" namespace paimon::test { TEST(IdentifierTest, ConstructorAndGetters) { Identifier id("test_db", "test_table"); - EXPECT_EQ(id.GetDatabaseName(), "test_db"); - EXPECT_EQ(id.GetTableName(), "test_table"); + ASSERT_EQ(id.GetDatabaseName(), "test_db"); + ASSERT_EQ(id.GetTableName(), "test_table"); } TEST(IdentifierTest, SingleArgumentConstructorUsesUnknownDatabase) { Identifier id("test_table"); - EXPECT_EQ(id.GetDatabaseName(), Identifier::kUnknownDatabase); - EXPECT_EQ(id.GetTableName(), "test_table"); + ASSERT_EQ(id.GetDatabaseName(), Identifier::kUnknownDatabase); + ASSERT_EQ(id.GetTableName(), "test_table"); } TEST(IdentifierTest, EqualityOperator) { @@ -37,19 +38,95 @@ TEST(IdentifierTest, EqualityOperator) { Identifier id2("db1", "table1"); Identifier id3("db2", "table2"); - EXPECT_TRUE(id1 == id2); - EXPECT_FALSE(id1 == id3); + ASSERT_TRUE(id1 == id2); + ASSERT_FALSE(id1 == id3); } TEST(IdentifierTest, ToString) { Identifier id("my_db", "my_table"); - EXPECT_EQ(id.ToString(), "Identifier{database='my_db', table='my_table'}"); + ASSERT_EQ(id.ToString(), "Identifier{database='my_db', table='my_table'}"); } TEST(IdentifierTest, EmptyDatabaseRemainsEmpty) { Identifier id("", "my_table"); - EXPECT_EQ(id.GetDatabaseName(), ""); - EXPECT_EQ(id.GetTableName(), "my_table"); + ASSERT_EQ(id.GetDatabaseName(), ""); + ASSERT_EQ(id.GetTableName(), "my_table"); +} + +TEST(IdentifierTest, ParseDataTable) { + Identifier id("db", "tbl"); + ASSERT_EQ(id.GetTableName(), "tbl"); + ASSERT_OK_AND_ASSIGN(std::string data_table_name, id.GetDataTableName()); + ASSERT_EQ(data_table_name, "tbl"); + ASSERT_OK_AND_ASSIGN(std::optional branch_name, id.GetBranchName()); + ASSERT_FALSE(branch_name); + ASSERT_OK_AND_ASSIGN(std::string branch_name_or_default, id.GetBranchNameOrDefault()); + ASSERT_EQ(branch_name_or_default, Identifier::kDefaultMainBranch); + ASSERT_OK_AND_ASSIGN(std::optional system_table_name, id.GetSystemTableName()); + ASSERT_FALSE(system_table_name); + ASSERT_OK_AND_ASSIGN(bool is_system_table, id.IsSystemTable()); + ASSERT_FALSE(is_system_table); +} + +TEST(IdentifierTest, ParseSystemTable) { + Identifier id("db", "tbl$options"); + ASSERT_EQ(id.GetTableName(), "tbl$options"); + ASSERT_OK_AND_ASSIGN(std::string data_table_name, id.GetDataTableName()); + ASSERT_EQ(data_table_name, "tbl"); + ASSERT_OK_AND_ASSIGN(std::optional branch_name, id.GetBranchName()); + ASSERT_FALSE(branch_name); + ASSERT_OK_AND_ASSIGN(std::optional system_table_name, id.GetSystemTableName()); + ASSERT_TRUE(system_table_name); + ASSERT_EQ(system_table_name.value(), "options"); + ASSERT_OK_AND_ASSIGN(bool is_system_table, id.IsSystemTable()); + ASSERT_TRUE(is_system_table); +} + +TEST(IdentifierTest, ParseBranchTable) { + Identifier id("db", "tbl$branch_dev"); + ASSERT_OK_AND_ASSIGN(std::string data_table_name, id.GetDataTableName()); + ASSERT_EQ(data_table_name, "tbl"); + ASSERT_OK_AND_ASSIGN(std::optional branch_name, id.GetBranchName()); + ASSERT_TRUE(branch_name); + ASSERT_EQ(branch_name.value(), "dev"); + ASSERT_OK_AND_ASSIGN(std::string branch_name_or_default, id.GetBranchNameOrDefault()); + ASSERT_EQ(branch_name_or_default, "dev"); + ASSERT_OK_AND_ASSIGN(std::optional system_table_name, id.GetSystemTableName()); + ASSERT_FALSE(system_table_name); + ASSERT_OK_AND_ASSIGN(bool is_system_table, id.IsSystemTable()); + ASSERT_FALSE(is_system_table); +} + +TEST(IdentifierTest, ParseBranchSystemTable) { + Identifier id("db", "tbl$branch_dev$options"); + ASSERT_OK_AND_ASSIGN(std::string data_table_name, id.GetDataTableName()); + ASSERT_EQ(data_table_name, "tbl"); + ASSERT_OK_AND_ASSIGN(std::optional branch_name, id.GetBranchName()); + ASSERT_TRUE(branch_name); + ASSERT_EQ(branch_name.value(), "dev"); + ASSERT_OK_AND_ASSIGN(std::optional system_table_name, id.GetSystemTableName()); + ASSERT_TRUE(system_table_name); + ASSERT_EQ(system_table_name.value(), "options"); + ASSERT_OK_AND_ASSIGN(bool is_system_table, id.IsSystemTable()); + ASSERT_TRUE(is_system_table); +} + +TEST(IdentifierTest, InvalidSystemTableName) { + Identifier invalid_middle("db", "tbl$bad$options"); + ASSERT_NOK_WITH_MSG(invalid_middle.IsSystemTable(), + "System table can only contain one '$' separator"); + + Identifier too_many("db", "tbl$branch_dev$options$extra"); + ASSERT_NOK_WITH_MSG(too_many.IsSystemTable(), "Invalid table name"); +} + +TEST(IdentifierTest, InvalidEmptySystemTableNameParts) { + ASSERT_NOK_WITH_MSG(Identifier("db", "$options").IsSystemTable(), "Invalid table name"); + ASSERT_NOK_WITH_MSG(Identifier("db", "tbl$").IsSystemTable(), "Invalid table name"); + ASSERT_NOK_WITH_MSG(Identifier("db", "tbl$branch_").IsSystemTable(), "Invalid table name"); + ASSERT_NOK_WITH_MSG(Identifier("db", "tbl$branch_dev$").IsSystemTable(), "Invalid table name"); + ASSERT_NOK_WITH_MSG(Identifier("db", "tbl$$options").IsSystemTable(), + "System table can only contain one '$' separator"); } } // namespace paimon::test diff --git a/src/paimon/core/schema/table_schema.h b/src/paimon/core/schema/table_schema.h index 60e11b638..161e10713 100644 --- a/src/paimon/core/schema/table_schema.h +++ b/src/paimon/core/schema/table_schema.h @@ -36,7 +36,7 @@ struct ArrowSchema; namespace paimon { /// Schema of a table, including schemaId and fieldId. -class TableSchema : public Schema, public Jsonizable { +class TableSchema : public DataSchema, public Jsonizable { public: static constexpr int64_t FIRST_SCHEMA_ID = 0; static constexpr int32_t PAIMON_07_VERSION = 1; diff --git a/src/paimon/core/table/source/table_read.cpp b/src/paimon/core/table/source/table_read.cpp index cc7e646fa..9bf057e95 100644 --- a/src/paimon/core/table/source/table_read.cpp +++ b/src/paimon/core/table/source/table_read.cpp @@ -33,6 +33,8 @@ #include "paimon/core/table/source/append_only_table_read.h" #include "paimon/core/table/source/fallback_table_read.h" #include "paimon/core/table/source/key_value_table_read.h" +#include "paimon/core/table/system/system_table.h" +#include "paimon/core/table/system/system_table_read.h" #include "paimon/core/utils/branch_manager.h" #include "paimon/core/utils/file_store_path_factory.h" #include "paimon/defs.h" @@ -124,6 +126,19 @@ Result> TableRead::Create(std::unique_ptrGetMemoryPool(); auto executor = context->GetExecutor(); + PAIMON_ASSIGN_OR_RAISE( + CoreOptions tmp_core_options, + CoreOptions::FromMap(context->GetOptions(), context->GetSpecificFileSystem(), + context->GetFileSystemSchemeToIdentifierMap())); + PAIMON_ASSIGN_OR_RAISE(std::optional system_table_path, + SystemTableLoader::TryParsePath(context->GetPath())); + if (system_table_path) { + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr system_table, + SystemTableLoader::LoadFromPath(tmp_core_options.GetFileSystem(), context->GetPath())); + return system_table->NewRead(memory_pool); + } + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr internal_context, CreateInternalReadContext(context, context->GetBranch())); diff --git a/src/paimon/core/table/source/table_read_test.cpp b/src/paimon/core/table/source/table_read_test.cpp index 2b72dfdbc..762e9362c 100644 --- a/src/paimon/core/table/source/table_read_test.cpp +++ b/src/paimon/core/table/source/table_read_test.cpp @@ -17,8 +17,10 @@ #include "paimon/table/source/table_read.h" #include +#include #include #include +#include #include "gtest/gtest.h" #include "paimon/core/core_options.h" @@ -153,4 +155,5 @@ TEST(TableReadTest, TestMergeOptions) { {"manifest.format", "orc"}, {"file.format", "orc"}}; ASSERT_EQ(expected_options, core_options.ToMap()); } + } // namespace paimon::test diff --git a/src/paimon/core/table/source/table_scan.cpp b/src/paimon/core/table/source/table_scan.cpp index 43ac11b4f..124a25c46 100644 --- a/src/paimon/core/table/source/table_scan.cpp +++ b/src/paimon/core/table/source/table_scan.cpp @@ -48,6 +48,7 @@ #include "paimon/core/table/source/merge_tree_split_generator.h" #include "paimon/core/table/source/snapshot/snapshot_reader.h" #include "paimon/core/table/source/split_generator.h" +#include "paimon/core/table/system/system_table.h" #include "paimon/core/utils/field_mapping.h" #include "paimon/core/utils/file_store_path_factory.h" #include "paimon/core/utils/index_file_path_factories.h" @@ -165,6 +166,14 @@ Result> TableScan::Create(std::unique_ptrGetOptions(), context->GetSpecificFileSystem())); + PAIMON_ASSIGN_OR_RAISE(std::optional system_table_path, + SystemTableLoader::TryParsePath(context->GetPath())); + if (system_table_path) { + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr system_table, + SystemTableLoader::LoadFromPath(tmp_options.GetFileSystem(), context->GetPath())); + return system_table->NewScan(); + } SchemaManager schema_manager(tmp_options.GetFileSystem(), context->GetPath()); PAIMON_ASSIGN_OR_RAISE(std::optional> latest_table_schema, schema_manager.Latest()); diff --git a/src/paimon/core/table/system/options_system_table.cpp b/src/paimon/core/table/system/options_system_table.cpp new file mode 100644 index 000000000..ba00c4263 --- /dev/null +++ b/src/paimon/core/table/system/options_system_table.cpp @@ -0,0 +1,130 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/table/system/options_system_table.h" + +#include +#include +#include +#include +#include + +#include "arrow/api.h" +#include "arrow/c/bridge.h" +#include "paimon/common/metrics/metrics_impl.h" +#include "paimon/common/utils/arrow/mem_utils.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/core/schema/table_schema.h" +#include "paimon/core/table/system/system_table_scan.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/result.h" +#include "paimon/status.h" + +namespace paimon { +namespace { + +std::shared_ptr OptionsSchema() { + return arrow::schema({arrow::field("key", arrow::utf8(), /*nullable=*/false), + arrow::field("value", arrow::utf8(), /*nullable=*/false)}); +} + +class OptionsBatchReader : public BatchReader { + public: + OptionsBatchReader(std::map options, + const std::shared_ptr& pool) + : arrow_pool_(GetArrowPool(pool)), options_(std::move(options)) {} + + Result NextBatch() override { + if (emitted_) { + return BatchReader::MakeEofBatch(); + } + emitted_ = true; + + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::unique_ptr key_array_builder, + arrow::MakeBuilder(arrow::utf8(), arrow_pool_.get())); + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::unique_ptr value_array_builder, + arrow::MakeBuilder(arrow::utf8(), arrow_pool_.get())); + auto* key_builder = dynamic_cast(key_array_builder.get()); + auto* value_builder = dynamic_cast(value_array_builder.get()); + if (key_builder == nullptr || value_builder == nullptr) { + return Status::Invalid("cannot create string builders for options system table"); + } + for (const auto& [key, value] : options_) { + PAIMON_RETURN_NOT_OK_FROM_ARROW(key_builder->Append(key)); + PAIMON_RETURN_NOT_OK_FROM_ARROW(value_builder->Append(value)); + } + std::shared_ptr key_array; + std::shared_ptr value_array; + PAIMON_RETURN_NOT_OK_FROM_ARROW(key_builder->Finish(&key_array)); + PAIMON_RETURN_NOT_OK_FROM_ARROW(value_builder->Finish(&value_array)); + auto struct_array = std::make_shared( + arrow::struct_(OptionsSchema()->fields()), key_array->length(), + std::vector>{key_array, value_array}); + + auto c_array = std::make_unique<::ArrowArray>(); + auto c_schema = std::make_unique<::ArrowSchema>(); + PAIMON_RETURN_NOT_OK_FROM_ARROW( + arrow::ExportArray(*struct_array, c_array.get(), c_schema.get())); + return std::make_pair(std::move(c_array), std::move(c_schema)); + } + + std::shared_ptr GetReaderMetrics() const override { + return std::make_shared(); + } + + void Close() override { + emitted_ = true; + } + + private: + std::unique_ptr arrow_pool_; + std::map options_; + bool emitted_ = false; +}; + +} // namespace + +OptionsSystemTable::OptionsSystemTable(std::string table_path, + std::shared_ptr table_schema) + : table_path_(std::move(table_path)), table_schema_(std::move(table_schema)) {} + +std::string OptionsSystemTable::Name() const { + return kName; +} + +std::shared_ptr OptionsSystemTable::ArrowSchema() const { + return OptionsSchema(); +} + +Result> OptionsSystemTable::NewScan() const { + return std::make_unique(table_path_); +} + +Result> OptionsSystemTable::CreateBatchReader( + const std::vector>& splits, + const std::shared_ptr& pool) const { + if (splits.size() != 1) { + return Status::Invalid("options system table expects a single split"); + } + for (const auto& split : splits) { + if (!std::dynamic_pointer_cast(split)) { + return Status::Invalid("unsupported split for options system table"); + } + } + return std::make_unique(table_schema_->Options(), pool); +} + +} // namespace paimon diff --git a/src/paimon/core/table/system/options_system_table.h b/src/paimon/core/table/system/options_system_table.h new file mode 100644 index 000000000..8cec1832f --- /dev/null +++ b/src/paimon/core/table/system/options_system_table.h @@ -0,0 +1,47 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "paimon/core/table/system/system_table.h" + +namespace paimon { +class TableSchema; + +class OptionsSystemTable : public SystemTable { + public: + static constexpr const char* kName = "options"; + + OptionsSystemTable(std::string table_path, std::shared_ptr table_schema); + + std::string Name() const override; + std::shared_ptr ArrowSchema() const override; + Result> NewScan() const override; + Result> CreateBatchReader( + const std::vector>& splits, + const std::shared_ptr& pool) const override; + + private: + std::string table_path_; + std::shared_ptr table_schema_; +}; + +} // namespace paimon diff --git a/src/paimon/core/table/system/system_table.cpp b/src/paimon/core/table/system/system_table.cpp new file mode 100644 index 000000000..43a34fd77 --- /dev/null +++ b/src/paimon/core/table/system/system_table.cpp @@ -0,0 +1,91 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/table/system/system_table.h" + +#include +#include +#include +#include + +#include "paimon/catalog/identifier.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/common/utils/string_utils.h" +#include "paimon/core/schema/schema_manager.h" +#include "paimon/core/schema/table_schema.h" +#include "paimon/core/table/system/options_system_table.h" +#include "paimon/core/table/system/system_table_read.h" +#include "paimon/core/utils/branch_manager.h" + +namespace paimon { + +Result> SystemTable::NewRead( + const std::shared_ptr& pool) const { + return std::make_unique( + std::const_pointer_cast(shared_from_this()), pool); +} + +bool SystemTableLoader::IsSupported(const std::string& system_table_name) { + return StringUtils::ToLowerCase(system_table_name) == OptionsSystemTable::kName; +} + +Result> SystemTableLoader::Load( + const std::string& system_table_name, const std::shared_ptr& /*fs*/, + const std::string& table_path, const std::shared_ptr& table_schema) { + std::string normalized_name = StringUtils::ToLowerCase(system_table_name); + if (normalized_name == OptionsSystemTable::kName) { + return std::make_shared(table_path, table_schema); + } + return Status::NotImplemented("unsupported system table: ", system_table_name); +} + +Result> SystemTableLoader::TryParsePath(const std::string& path) { + std::string table_name = PathUtil::GetName(path); + Identifier identifier(table_name); + PAIMON_ASSIGN_OR_RAISE(bool is_system_table, identifier.IsSystemTable()); + if (!is_system_table) { + return std::optional(); + } + PAIMON_ASSIGN_OR_RAISE(std::string data_table_name, identifier.GetDataTableName()); + PAIMON_ASSIGN_OR_RAISE(std::optional branch, identifier.GetBranchName()); + PAIMON_ASSIGN_OR_RAISE(std::optional system_table_name, + identifier.GetSystemTableName()); + std::string parent = PathUtil::GetParentDirPath(path); + SystemTablePath system_table_path; + system_table_path.table_path = PathUtil::JoinPath(parent, data_table_name); + system_table_path.branch = std::move(branch); + system_table_path.system_table_name = system_table_name.value(); + return std::optional(std::move(system_table_path)); +} + +Result> SystemTableLoader::LoadFromPath( + const std::shared_ptr& fs, const std::string& path) { + PAIMON_ASSIGN_OR_RAISE(std::optional system_table_path, TryParsePath(path)); + if (!system_table_path) { + return Status::Invalid("path is not a system table path: ", path); + } + const auto& parsed = system_table_path.value(); + SchemaManager schema_manager(fs, parsed.table_path, + parsed.branch.value_or(BranchManager::DEFAULT_MAIN_BRANCH)); + PAIMON_ASSIGN_OR_RAISE(std::optional> latest_schema, + schema_manager.Latest()); + if (!latest_schema) { + return Status::NotExist("base table schema not found for system table path: ", path); + } + return Load(parsed.system_table_name, fs, path, latest_schema.value()); +} + +} // namespace paimon diff --git a/src/paimon/core/table/system/system_table.h b/src/paimon/core/table/system/system_table.h new file mode 100644 index 000000000..2a57b7e3c --- /dev/null +++ b/src/paimon/core/table/system/system_table.h @@ -0,0 +1,71 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "arrow/api.h" +#include "paimon/reader/batch_reader.h" +#include "paimon/result.h" +#include "paimon/status.h" +#include "paimon/type_fwd.h" + +namespace paimon { +class FileSystem; +class MemoryPool; +class Split; +class SystemTableRead; +class TableScan; +class TableSchema; + +struct SystemTablePath { + std::string table_path; + std::optional branch; + std::string system_table_name; +}; + +class SystemTable : public std::enable_shared_from_this { + public: + virtual ~SystemTable() = default; + + virtual std::string Name() const = 0; + virtual std::shared_ptr ArrowSchema() const = 0; + virtual Result> NewScan() const = 0; + Result> NewRead(const std::shared_ptr& pool) const; + virtual Result> CreateBatchReader( + const std::vector>& splits, + const std::shared_ptr& pool) const = 0; +}; + +class SystemTableLoader { + public: + static bool IsSupported(const std::string& system_table_name); + + static Result> Load( + const std::string& system_table_name, const std::shared_ptr& fs, + const std::string& table_path, const std::shared_ptr& table_schema); + + static Result> TryParsePath(const std::string& path); + + static Result> LoadFromPath(const std::shared_ptr& fs, + const std::string& path); +}; + +} // namespace paimon diff --git a/src/paimon/core/table/system/system_table_read.cpp b/src/paimon/core/table/system/system_table_read.cpp new file mode 100644 index 000000000..55ad9c1bf --- /dev/null +++ b/src/paimon/core/table/system/system_table_read.cpp @@ -0,0 +1,42 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/table/system/system_table_read.h" + +#include +#include +#include + +#include "paimon/core/table/system/system_table.h" + +namespace paimon { + +SystemTableRead::SystemTableRead(std::shared_ptr system_table, + const std::shared_ptr& memory_pool) + : TableRead(memory_pool), system_table_(std::move(system_table)) {} + +Result> SystemTableRead::CreateReader( + const std::vector>& splits) { + return system_table_->CreateBatchReader(splits, GetMemoryPool()); +} + +Result> SystemTableRead::CreateReader( + const std::shared_ptr& split) { + std::vector> splits = {split}; + return CreateReader(splits); +} + +} // namespace paimon diff --git a/src/paimon/core/table/system/system_table_read.h b/src/paimon/core/table/system/system_table_read.h new file mode 100644 index 000000000..4d995cfa5 --- /dev/null +++ b/src/paimon/core/table/system/system_table_read.h @@ -0,0 +1,40 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "paimon/table/source/table_read.h" + +namespace paimon { +class SystemTable; + +class SystemTableRead : public TableRead { + public: + SystemTableRead(std::shared_ptr system_table, + const std::shared_ptr& memory_pool); + + Result> CreateReader( + const std::vector>& splits) override; + Result> CreateReader(const std::shared_ptr& split) override; + + private: + std::shared_ptr system_table_; +}; + +} // namespace paimon diff --git a/src/paimon/core/table/system/system_table_scan.cpp b/src/paimon/core/table/system/system_table_scan.cpp new file mode 100644 index 000000000..05f62ebd7 --- /dev/null +++ b/src/paimon/core/table/system/system_table_scan.cpp @@ -0,0 +1,33 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/table/system/system_table_scan.h" + +#include +#include + +#include "paimon/core/table/source/plan_impl.h" + +namespace paimon { + +SystemTableScan::SystemTableScan(const std::string& table_path) : table_path_(table_path) {} + +Result> SystemTableScan::CreatePlan() { + std::vector> splits = {std::make_shared(table_path_)}; + return std::make_shared(std::nullopt, splits); +} + +} // namespace paimon diff --git a/src/paimon/core/table/system/system_table_scan.h b/src/paimon/core/table/system/system_table_scan.h new file mode 100644 index 000000000..2acd57d2d --- /dev/null +++ b/src/paimon/core/table/system/system_table_scan.h @@ -0,0 +1,51 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include "paimon/table/source/table_scan.h" + +namespace paimon { +class Plan; +class Split; + +class SystemTableSplit : public Split { + public: + explicit SystemTableSplit(const std::string& table_path) : table_path_(table_path) {} + + const std::string& TablePath() const { + return table_path_; + } + + private: + std::string table_path_; +}; + +class SystemTableScan : public TableScan { + public: + explicit SystemTableScan(const std::string& table_path); + + Result> CreatePlan() override; + + private: + std::string table_path_; +}; + +} // namespace paimon diff --git a/src/paimon/core/table/system/system_table_schema.cpp b/src/paimon/core/table/system/system_table_schema.cpp new file mode 100644 index 000000000..838224571 --- /dev/null +++ b/src/paimon/core/table/system/system_table_schema.cpp @@ -0,0 +1,62 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/table/system/system_table_schema.h" + +#include +#include +#include +#include + +#include "arrow/c/bridge.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/common/utils/field_type_utils.h" +#include "paimon/status.h" + +namespace paimon { + +SystemTableSchema::SystemTableSchema(std::shared_ptr schema) + : schema_(std::move(schema)) { + field_names_ = schema_->field_names(); +} + +Result> SystemTableSchema::GetArrowSchema() const { + auto c_schema = std::make_unique<::ArrowSchema>(); + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*schema_, c_schema.get())); + return c_schema; +} + +Result SystemTableSchema::GetJsonSchema() const { + return Status::NotImplemented("system table JSON schema is not supported"); +} + +std::vector SystemTableSchema::FieldNames() const { + return field_names_; +} + +Result SystemTableSchema::GetFieldType(const std::string& field_name) const { + auto field = schema_->GetFieldByName(field_name); + if (!field) { + return Status::NotExist("field ", field_name, " not exist in system table schema"); + } + return FieldTypeUtils::ConvertToFieldType(field->type()->id()); +} + +std::optional SystemTableSchema::Comment() const { + return std::nullopt; +} + +} // namespace paimon diff --git a/src/paimon/core/table/system/system_table_schema.h b/src/paimon/core/table/system/system_table_schema.h new file mode 100644 index 000000000..1a95297e6 --- /dev/null +++ b/src/paimon/core/table/system/system_table_schema.h @@ -0,0 +1,46 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include "arrow/api.h" +#include "paimon/result.h" +#include "paimon/schema/schema.h" + +namespace paimon { + +class SystemTableSchema : public SystemSchema { + public: + explicit SystemTableSchema(std::shared_ptr schema); + + Result> GetArrowSchema() const override; + Result GetJsonSchema() const override; + std::vector FieldNames() const override; + Result GetFieldType(const std::string& field_name) const override; + std::optional Comment() const override; + + private: + std::shared_ptr schema_; + std::vector field_names_; +}; + +} // namespace paimon diff --git a/src/paimon/core/table/table_test.cpp b/src/paimon/core/table/table_test.cpp index c9fb5e749..b6e6ba1b8 100644 --- a/src/paimon/core/table/table_test.cpp +++ b/src/paimon/core/table/table_test.cpp @@ -20,6 +20,7 @@ #include "gtest/gtest.h" #include "paimon/core/schema/schema_manager.h" #include "paimon/core/schema/table_schema.h" +#include "paimon/schema/schema.h" #include "paimon/testing/utils/testharness.h" namespace paimon::test { @@ -50,9 +51,12 @@ TEST(TableTest, TestCreateWithUnknownDatabase) { std::shared_ptr latest_schema = table->LatestSchema(); ASSERT_NE(latest_schema, nullptr); - EXPECT_EQ(latest_schema->Id(), 0); - EXPECT_EQ(latest_schema->PartitionKeys(), partition_keys); - EXPECT_EQ(latest_schema->PrimaryKeys(), primary_keys); + auto data_schema = std::dynamic_pointer_cast(latest_schema); + ASSERT_TRUE(data_schema != nullptr); + ASSERT_TRUE(std::dynamic_pointer_cast(latest_schema) != nullptr); + EXPECT_EQ(data_schema->Id(), 0); + EXPECT_EQ(data_schema->PartitionKeys(), partition_keys); + EXPECT_EQ(data_schema->PrimaryKeys(), primary_keys); } TEST(TableTest, TestCreateFailedWithNonExistSchema) { diff --git a/test/inte/read_inte_test.cpp b/test/inte/read_inte_test.cpp index 0c824bc4d..23550928f 100644 --- a/test/inte/read_inte_test.cpp +++ b/test/inte/read_inte_test.cpp @@ -16,6 +16,8 @@ #include #include +#include +#include #include #include #include @@ -29,12 +31,15 @@ #include "arrow/c/abi.h" #include "arrow/ipc/json_simple.h" #include "gtest/gtest.h" +#include "paimon/catalog/catalog.h" +#include "paimon/catalog/identifier.h" #include "paimon/common/data/binary_row.h" #include "paimon/common/factories/io_hook.h" #include "paimon/common/reader/complete_row_kind_batch_reader.h" #include "paimon/common/reader/concat_batch_reader.h" #include "paimon/common/table/special_fields.h" #include "paimon/common/types/data_field.h" +#include "paimon/common/utils/path_util.h" #include "paimon/common/utils/scope_guard.h" #include "paimon/core/io/data_file_meta.h" #include "paimon/core/manifest/file_source.h" @@ -54,9 +59,12 @@ #include "paimon/read_context.h" #include "paimon/reader/batch_reader.h" #include "paimon/result.h" +#include "paimon/scan_context.h" #include "paimon/status.h" #include "paimon/table/source/data_split.h" +#include "paimon/table/source/plan.h" #include "paimon/table/source/table_read.h" +#include "paimon/table/source/table_scan.h" #include "paimon/testing/utils/binary_row_generator.h" #include "paimon/testing/utils/io_exception_helper.h" #include "paimon/testing/utils/read_result_collector.h" @@ -193,6 +201,36 @@ class ReadInteTest : public testing::Test, public ::testing::WithParamInterface< std::shared_ptr pool_; }; +namespace { + +std::map CollectStringMap( + const std::shared_ptr& result) { + std::map values; + EXPECT_TRUE(result); + EXPECT_EQ(result->num_chunks(), 1); + if (!result || result->num_chunks() != 1) { + return values; + } + auto struct_array = std::dynamic_pointer_cast(result->chunk(0)); + EXPECT_TRUE(struct_array); + if (!struct_array) { + return values; + } + auto key_array = std::dynamic_pointer_cast(struct_array->field(0)); + auto value_array = std::dynamic_pointer_cast(struct_array->field(1)); + EXPECT_TRUE(key_array); + EXPECT_TRUE(value_array); + if (!key_array || !value_array) { + return values; + } + for (int64_t i = 0; i < struct_array->length(); ++i) { + values.emplace(key_array->GetString(i), value_array->GetString(i)); + } + return values; +} + +} // namespace + std::vector PrepareTestParam() { std::vector values = { TestParam{false, "false", "parquet", PrefetchCacheMode::ALWAYS}, @@ -434,6 +472,80 @@ TEST_P(ReadInteTest, TestReadOnlyPartitionField) { ASSERT_TRUE(result_array->Equals(expected_array)) << result_array->ToString(); } +TEST(SystemTableReadInteTest, TestReadOptionsSystemTable) { + std::map options = {{Options::FILE_SYSTEM, "local"}, + {Options::FILE_FORMAT, "orc"}, + {Options::MANIFEST_FORMAT, "orc"}, + {"custom.option", "custom-value"}}; + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + std::string warehouse = PathUtil::JoinPath(dir->Str(), "warehouse"); + ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(warehouse, options)); + ASSERT_OK(catalog->CreateDatabase("db1", options, /*ignore_if_exists=*/false)); + + auto typed_schema = arrow::schema({arrow::field("f0", arrow::int32())}); + ::ArrowSchema schema; + ASSERT_TRUE(arrow::ExportSchema(*typed_schema, &schema).ok()); + ASSERT_OK(catalog->CreateTable(Identifier("db1", "tbl1"), &schema, + /*partition_keys=*/{}, /*primary_keys=*/{}, options, + /*ignore_if_exists=*/false)); + ArrowSchemaRelease(&schema); + + std::string system_table_path = catalog->GetTableLocation(Identifier("db1", "tbl1$options")); + ScanContextBuilder scan_context_builder(system_table_path); + scan_context_builder.SetOptions(options); + ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + ASSERT_OK_AND_ASSIGN(auto plan, table_scan->CreatePlan()); + ASSERT_EQ(plan->Splits().size(), 1); + + ReadContextBuilder read_context_builder(system_table_path); + read_context_builder.SetOptions(options); + ASSERT_OK_AND_ASSIGN(auto read_context, read_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_read, TableRead::Create(std::move(read_context))); + ASSERT_OK_AND_ASSIGN(auto batch_reader, table_read->CreateReader(plan->Splits())); + ASSERT_OK_AND_ASSIGN(auto result, ReadResultCollector::CollectResult(batch_reader.get())); + ASSERT_TRUE(result); + + std::map expected = {{"custom.option", "custom-value"}, + {"file-system", "local"}, + {"file.format", "orc"}, + {"manifest.format", "orc"}}; + ASSERT_EQ(CollectStringMap(result), expected) << result->ToString(); +} + +TEST(SystemTableReadInteTest, TestReadBranchOptionsSystemTable) { + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + std::string source_path = + GetDataDir() + "/parquet/append_table_with_rt_branch.db/append_table_with_rt_branch"; + std::string table_path = PathUtil::JoinPath(dir->Str(), "branch_table"); + ASSERT_TRUE(TestUtil::CopyDirectory(std::filesystem::path(source_path), + std::filesystem::path(table_path))); + + std::map options = {{Options::FILE_SYSTEM, "local"}, + {Options::FILE_FORMAT, "parquet"}, + {Options::MANIFEST_FORMAT, "avro"}}; + std::string system_table_path = table_path + "$branch_rt$options"; + ScanContextBuilder scan_context_builder(system_table_path); + scan_context_builder.SetOptions(options); + ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + ASSERT_OK_AND_ASSIGN(auto plan, table_scan->CreatePlan()); + ASSERT_EQ(plan->Splits().size(), 1); + + ReadContextBuilder read_context_builder(system_table_path); + read_context_builder.SetOptions(options); + ASSERT_OK_AND_ASSIGN(auto read_context, read_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_read, TableRead::Create(std::move(read_context))); + ASSERT_OK_AND_ASSIGN(auto batch_reader, table_read->CreateReader(plan->Splits())); + ASSERT_OK_AND_ASSIGN(auto result, ReadResultCollector::CollectResult(batch_reader.get())); + + std::map expected = { + {"bucket", "2"}, {"file.format", "parquet"}, {"manifest.format", "avro"}}; + ASSERT_EQ(CollectStringMap(result), expected) << result->ToString(); +} + TEST_P(ReadInteTest, TestAppendReadWithMultipleBuckets) { std::vector read_fields = { DataField(3, arrow::field("f3", arrow::float64())),