Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions include/paimon/catalog/identifier.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@

#pragma once

#include <optional>
#include <string>

#include "paimon/result.h"
#include "paimon/type_fwd.h"
#include "paimon/visibility.h"

Expand All @@ -27,18 +29,32 @@ namespace paimon {
/// Identifies a table by a database name and a table name.
///
/// In addition to the two raw names, the class declares accessors that derive a data table
/// name, an optional branch name and an optional system table name. Those accessors return
/// Result<>, so callers must be prepared for the derivation to fail.
class PAIMON_EXPORT Identifier {
public:
// Constants defined out of line.
// NOTE(review): going by the names these are the placeholder database name, the separator
// and branch prefix recognised inside a table name, and the default branch name -- confirm
// the actual values in the definition.
static const char kUnknownDatabase[];
static const char kSystemTableSplitter[];
static const char kSystemBranchPrefix[];
static const char kDefaultMainBranch[];

/// Construct from a table name only.
/// NOTE(review): presumably the database falls back to kUnknownDatabase -- confirm.
explicit Identifier(const std::string& table);
/// Construct from an explicit database name and table name.
Identifier(const std::string& database, const std::string& table);

// NOTE(review): this operator is not const-qualified, so it cannot be invoked on a const
// left-hand operand. Adding `const` requires changing the out-of-line definition as well.
bool operator==(const Identifier& other);
/// @return The database name.
const std::string& GetDatabaseName() const;
/// @return The table name.
const std::string& GetTableName() const;
/// @return The data table name derived from the table name, or an error status.
Result<std::string> GetDataTableName() const;
/// @return The branch name if one is present, std::nullopt otherwise, or an error status.
Result<std::optional<std::string>> GetBranchName() const;
/// @return A branch name, or an error status.
/// NOTE(review): presumably kDefaultMainBranch when no branch is present -- confirm.
Result<std::string> GetBranchNameOrDefault() const;
/// @return The system table name if one is present, std::nullopt otherwise, or an error
/// status.
Result<std::optional<std::string>> GetSystemTableName() const;
/// @return Whether this identifier refers to a system table, or an error status.
Result<bool> IsSystemTable() const;
/// @return A string representation of this identifier.
std::string ToString() const;

private:
// Const member returning Status; the `mutable` fields below are the only state a const
// member can modify.
// NOTE(review): presumably this parses table_ into data_table_/branch_/system_table_ on
// first use and records that in parsed_ -- confirm in the definition. Nothing in this
// header synchronises that state, so check thread-safety expectations for shared instances.
Status SplitTableName() const;

const std::string database_;
const std::string table_;
mutable bool parsed_ = false;
mutable std::string data_table_;
mutable std::optional<std::string> branch_;
mutable std::optional<std::string> system_table_;
};

} // namespace paimon
18 changes: 15 additions & 3 deletions include/paimon/schema/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,16 @@ class PAIMON_EXPORT Schema {
/// failure.
virtual Result<FieldType> GetFieldType(const std::string& field_name) const = 0;

/// Get an optional comment describing the schema object.
/// @return The comment if set, or std::nullopt otherwise.
virtual std::optional<std::string> Comment() const = 0;
};

/// Schema contract for data tables.
class PAIMON_EXPORT DataSchema : public Schema {
public:
~DataSchema() override = default;

/// Get the unique identifier of this table schema.
/// @return The schema id
virtual int64_t Id() const = 0;
Expand Down Expand Up @@ -86,10 +96,12 @@ class PAIMON_EXPORT Schema {
/// Get the table-level options associated with this schema.
/// @return A reference to the map of option key-value pairs (e.g., file format, filesystem).
virtual const std::map<std::string, std::string>& Options() const = 0;
};

/// Get an optional comment describing the table.
/// @return The table comment if set, or std::nullopt otherwise.
virtual std::optional<std::string> Comment() const = 0;
/// Schema contract for system tables.
///
/// Derives from Schema and declares nothing beyond a defaulted virtual destructor, so it
/// offers exactly the Schema interface while giving system-table schemas a type that is
/// distinct from DataSchema.
class PAIMON_EXPORT SystemSchema : public Schema {
public:
~SystemSchema() override = default;
};

} // namespace paimon
4 changes: 4 additions & 0 deletions include/paimon/table/source/table_read.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ class PAIMON_EXPORT TableRead {
protected:
explicit TableRead(const std::shared_ptr<MemoryPool>& memory_pool);

/// Get the memory pool held by this TableRead. Declared in the protected section, so it is
/// available to derived classes only.
/// @return A copy of the `pool_` shared pointer.
std::shared_ptr<MemoryPool> GetMemoryPool() const {
return pool_;
}

private:
std::shared_ptr<MemoryPool> pool_;
};
Expand Down
5 changes: 5 additions & 0 deletions src/paimon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,11 @@ set(PAIMON_CORE_SRCS
core/table/source/table_read.cpp
core/table/source/table_scan.cpp
core/table/source/data_evolution_batch_scan.cpp
core/table/system/options_system_table.cpp
core/table/system/system_table.cpp
core/table/system/system_table_read.cpp
core/table/system/system_table_scan.cpp
core/table/system/system_table_schema.cpp
core/tag/tag.cpp
core/utils/field_mapping.cpp
core/utils/file_store_path_factory.cpp
Expand Down
65 changes: 57 additions & 8 deletions src/paimon/core/catalog/file_system_catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
#include "paimon/common/utils/path_util.h"
#include "paimon/common/utils/string_utils.h"
#include "paimon/core/snapshot.h"
#include "paimon/core/table/system/system_table.h"
#include "paimon/core/table/system/system_table_schema.h"
#include "paimon/core/utils/branch_manager.h"
#include "paimon/core/utils/snapshot_manager.h"
#include "paimon/defs.h"
Expand Down Expand Up @@ -92,6 +94,19 @@ Result<bool> FileSystemCatalog::DatabaseExists(const std::string& db_name) const
}

Result<bool> FileSystemCatalog::TableExists(const Identifier& identifier) const {
PAIMON_ASSIGN_OR_RAISE(bool is_system_table, identifier.IsSystemTable());
if (is_system_table) {
PAIMON_ASSIGN_OR_RAISE(std::optional<std::string> system_table_name,
identifier.GetSystemTableName());
if (!system_table_name || !SystemTableLoader::IsSupported(system_table_name.value())) {
return false;
}
PAIMON_ASSIGN_OR_RAISE(std::string data_table_name, identifier.GetDataTableName());
Identifier data_identifier(identifier.GetDatabaseName(), data_table_name);
PAIMON_ASSIGN_OR_RAISE(std::optional<std::shared_ptr<TableSchema>> latest_schema,
TableSchemaExists(data_identifier));
return latest_schema != std::nullopt;
}
PAIMON_ASSIGN_OR_RAISE(std::optional<std::shared_ptr<TableSchema>> latest_schema,
TableSchemaExists(identifier));
return latest_schema != std::nullopt;
Expand All @@ -110,7 +125,8 @@ Status FileSystemCatalog::CreateTable(const Identifier& identifier, ArrowSchema*
const std::vector<std::string>& primary_keys,
const std::map<std::string, std::string>& options,
bool ignore_if_exists) {
if (IsSystemTable(identifier)) {
PAIMON_ASSIGN_OR_RAISE(bool is_system_table, IsSystemTable(identifier));
if (is_system_table) {
return Status::Invalid(
fmt::format("Cannot create table for system table {}, please use data table.",
identifier.ToString()));
Expand Down Expand Up @@ -148,7 +164,8 @@ Status FileSystemCatalog::CreateTable(const Identifier& identifier, ArrowSchema*

Result<std::optional<std::shared_ptr<TableSchema>>> FileSystemCatalog::TableSchemaExists(
const Identifier& identifier) const {
if (IsSystemTable(identifier)) {
PAIMON_ASSIGN_OR_RAISE(bool is_system_table, IsSystemTable(identifier));
if (is_system_table) {
return Status::NotImplemented(
"do not support checking TableSchemaExists for system table.");
}
Expand All @@ -168,12 +185,15 @@ bool FileSystemCatalog::IsSystemDatabase(const std::string& db_name) {
return db_name == SYSTEM_DATABASE_NAME;
}

bool FileSystemCatalog::IsSpecifiedSystemTable(const Identifier& identifier) {
return (identifier.GetTableName().find(SYSTEM_TABLE_SPLITTER) != std::string::npos);
Result<bool> FileSystemCatalog::IsSpecifiedSystemTable(const Identifier& identifier) {
return identifier.IsSystemTable();
}

bool FileSystemCatalog::IsSystemTable(const Identifier& identifier) {
return IsSystemDatabase(identifier.GetDatabaseName()) || IsSpecifiedSystemTable(identifier);
Result<bool> FileSystemCatalog::IsSystemTable(const Identifier& identifier) {
if (IsSystemDatabase(identifier.GetDatabaseName())) {
return true;
}
return IsSpecifiedSystemTable(identifier);
}

std::string FileSystemCatalog::NewDatabasePath(const std::string& warehouse,
Expand Down Expand Up @@ -237,6 +257,26 @@ Result<bool> FileSystemCatalog::TableExistsInFileSystem(const std::string& table

Result<std::shared_ptr<Schema>> FileSystemCatalog::LoadTableSchema(
const Identifier& identifier) const {
PAIMON_ASSIGN_OR_RAISE(bool is_system_table, identifier.IsSystemTable());
if (is_system_table) {
PAIMON_ASSIGN_OR_RAISE(std::optional<std::string> system_table_name,
identifier.GetSystemTableName());
if (!system_table_name || !SystemTableLoader::IsSupported(system_table_name.value())) {
return Status::NotExist(fmt::format("{} not exist", identifier.ToString()));
}
PAIMON_ASSIGN_OR_RAISE(std::string data_table_name, identifier.GetDataTableName());
Identifier data_identifier(identifier.GetDatabaseName(), data_table_name);
PAIMON_ASSIGN_OR_RAISE(std::optional<std::shared_ptr<TableSchema>> latest_schema,
TableSchemaExists(data_identifier));
if (!latest_schema) {
return Status::NotExist(fmt::format("{} not exist", data_identifier.ToString()));
}
PAIMON_ASSIGN_OR_RAISE(
std::shared_ptr<SystemTable> system_table,
SystemTableLoader::Load(system_table_name.value(), fs_, GetTableLocation(identifier),
latest_schema.value()));
return std::make_shared<SystemTableSchema>(system_table->ArrowSchema());
}
PAIMON_ASSIGN_OR_RAISE(std::optional<std::shared_ptr<TableSchema>> latest_schema,
TableSchemaExists(identifier));
if (!latest_schema) {
Expand All @@ -246,6 +286,12 @@ Result<std::shared_ptr<Schema>> FileSystemCatalog::LoadTableSchema(
}

/// Resolve `identifier` to a table handle.
///
/// Identifiers that are not system tables are delegated to Table::Create with the catalog's
/// file system and the table location. System-table identifiers get a Table built from the
/// schema returned by LoadTableSchema together with the identifier's database and table names.
/// @param identifier The table to look up.
/// @return The table handle, or the first error raised by IsSystemTable, LoadTableSchema or
/// Table::Create.
Result<std::shared_ptr<Table>> FileSystemCatalog::GetTable(const Identifier& identifier) const {
    PAIMON_ASSIGN_OR_RAISE(bool system_table_requested, identifier.IsSystemTable());
    if (!system_table_requested) {
        // Data-table path.
        return Table::Create(fs_, GetTableLocation(identifier), identifier);
    }

    // System-table path: the schema load also validates the system table name and the
    // existence of the underlying data table.
    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<Schema> system_schema, LoadTableSchema(identifier));
    const std::string& database_name = identifier.GetDatabaseName();
    const std::string& table_name = identifier.GetTableName();
    return std::make_shared<Table>(system_schema, database_name, table_name);
}

Expand Down Expand Up @@ -351,7 +397,8 @@ Status FileSystemCatalog::DropTableImpl(const Identifier& identifier,
}

Status FileSystemCatalog::DropTable(const Identifier& identifier, bool ignore_if_not_exists) {
if (IsSystemTable(identifier)) {
PAIMON_ASSIGN_OR_RAISE(bool is_system_table, IsSystemTable(identifier));
if (is_system_table) {
return Status::Invalid(fmt::format("Cannot drop system table {}.", identifier.ToString()));
}

Expand Down Expand Up @@ -414,7 +461,9 @@ Status FileSystemCatalog::DropTable(const Identifier& identifier, bool ignore_if

Status FileSystemCatalog::RenameTable(const Identifier& from_table, const Identifier& to_table,
bool ignore_if_not_exists) {
if (IsSystemTable(from_table) || IsSystemTable(to_table)) {
PAIMON_ASSIGN_OR_RAISE(bool is_from_system_table, IsSystemTable(from_table));
PAIMON_ASSIGN_OR_RAISE(bool is_to_system_table, IsSystemTable(to_table));
if (is_from_system_table || is_to_system_table) {
return Status::Invalid(fmt::format("Cannot rename system table {} or {}.",
from_table.ToString(), to_table.ToString()));
}
Expand Down
4 changes: 2 additions & 2 deletions src/paimon/core/catalog/file_system_catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ class FileSystemCatalog : public Catalog {
static std::string NewDatabasePath(const std::string& warehouse, const std::string& db_name);
static std::string NewDataTablePath(const std::string& warehouse, const Identifier& identifier);
static bool IsSystemDatabase(const std::string& db_name);
static bool IsSpecifiedSystemTable(const Identifier& identifier);
static bool IsSystemTable(const Identifier& identifier);
static Result<bool> IsSpecifiedSystemTable(const Identifier& identifier);
static Result<bool> IsSystemTable(const Identifier& identifier);
Result<std::optional<std::shared_ptr<TableSchema>>> TableSchemaExists(
const Identifier& identifier) const;

Expand Down
78 changes: 71 additions & 7 deletions src/paimon/core/catalog/file_system_catalog_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include "paimon/common/data/blob_utils.h"
#include "paimon/common/utils/path_util.h"
#include "paimon/core/core_options.h"
#include "paimon/core/schema/table_schema.h"
#include "paimon/core/table/system/system_table_schema.h"
#include "paimon/defs.h"
#include "paimon/fs/file_system.h"
#include "paimon/fs/file_system_factory.h"
Expand Down Expand Up @@ -149,6 +151,65 @@ TEST(FileSystemCatalogTest, TestCreateTable) {
ArrowSchemaRelease(&schema);
}

// Exercises catalog behaviour for the `$options` system table: existence checks, table
// location, schema loading, GetTable, and rejection of create/drop/rename on a system table.
TEST(FileSystemCatalogTest, TestOptionsSystemTableCatalog) {
// Catalog over the "local" file system, rooted in a unique test directory.
std::map<std::string, std::string> options;
options[Options::FILE_SYSTEM] = "local";
options[Options::FILE_FORMAT] = "orc";
options["custom.option"] = "custom-value";
ASSERT_OK_AND_ASSIGN(auto core_options, CoreOptions::FromMap(options));
auto dir = UniqueTestDirectory::Create();
ASSERT_TRUE(dir);
FileSystemCatalog catalog(core_options.GetFileSystem(), dir->Str());
ASSERT_OK(catalog.CreateDatabase("db1", options, /*ignore_if_exists=*/true));

// Create the data table db1.tbl1 with a single int32 column "f0".
auto typed_schema = arrow::schema({arrow::field("f0", arrow::int32())});
::ArrowSchema schema;
ASSERT_TRUE(arrow::ExportSchema(*typed_schema, &schema).ok());
ASSERT_OK(catalog.CreateTable(Identifier("db1", "tbl1"), &schema,
/*partition_keys=*/{}, /*primary_keys=*/{}, options,
/*ignore_if_exists=*/false));
ArrowSchemaRelease(&schema);

// TableExists: true for tbl1$options; false for an unknown system table name and for a
// system table over a data table that was never created.
Identifier options_identifier("db1", "tbl1$options");
ASSERT_OK_AND_ASSIGN(bool exists, catalog.TableExists(options_identifier));
ASSERT_TRUE(exists);
ASSERT_OK_AND_ASSIGN(exists, catalog.TableExists(Identifier("db1", "tbl1$unknown")));
ASSERT_FALSE(exists);
ASSERT_OK_AND_ASSIGN(exists, catalog.TableExists(Identifier("db1", "missing$options")));
ASSERT_FALSE(exists);
// The location keeps the full "tbl1$options" name under <dir>/db1.db.
ASSERT_EQ(catalog.GetTableLocation(options_identifier),
PathUtil::JoinPath(PathUtil::JoinPath(dir->Str(), "db1.db"), "tbl1$options"));

// LoadTableSchema returns a SystemSchema (concretely a SystemTableSchema) whose Arrow
// schema is two non-nullable string columns: "key" and "value".
ASSERT_OK_AND_ASSIGN(std::shared_ptr<Schema> system_schema,
catalog.LoadTableSchema(options_identifier));
ASSERT_TRUE(std::dynamic_pointer_cast<SystemSchema>(system_schema) != nullptr);
ASSERT_TRUE(std::dynamic_pointer_cast<SystemTableSchema>(system_schema) != nullptr);
ASSERT_OK_AND_ASSIGN(auto c_schema, system_schema->GetArrowSchema());
auto loaded_schema_result = arrow::ImportSchema(c_schema.get());
ASSERT_TRUE(loaded_schema_result.ok()) << loaded_schema_result.status().ToString();
auto loaded_schema = loaded_schema_result.ValueUnsafe();
ASSERT_EQ(loaded_schema->field_names(), (std::vector<std::string>{"key", "value"}));
ASSERT_EQ(loaded_schema->field(0)->type()->id(), arrow::Type::STRING);
ASSERT_EQ(loaded_schema->field(1)->type()->id(), arrow::Type::STRING);
ASSERT_FALSE(loaded_schema->field(0)->nullable());
ASSERT_FALSE(loaded_schema->field(1)->nullable());

// GetTable keeps the full system table name; schema loading fails with "not exist" for an
// unknown system table name or a missing data table.
ASSERT_OK_AND_ASSIGN(auto system_table, catalog.GetTable(options_identifier));
ASSERT_EQ(system_table->Name(), "tbl1$options");
ASSERT_NOK_WITH_MSG(catalog.LoadTableSchema(Identifier("db1", "tbl1$unknown")), "not exist");
ASSERT_NOK_WITH_MSG(catalog.LoadTableSchema(Identifier("db1", "missing$options")), "not exist");

// Create, drop and rename must all be rejected for a system table identifier.
::ArrowSchema system_create_schema;
ASSERT_TRUE(arrow::ExportSchema(*typed_schema, &system_create_schema).ok());
ASSERT_NOK_WITH_MSG(
catalog.CreateTable(options_identifier, &system_create_schema, {}, {}, options, false),
"Cannot create table for system table");
// The rejected CreateTable returns before using the schema (see the guard at the top of
// FileSystemCatalog::CreateTable), so the test releases it here.
ArrowSchemaRelease(&system_create_schema);
ASSERT_NOK_WITH_MSG(catalog.DropTable(options_identifier, false), "Cannot drop system table");
ASSERT_NOK_WITH_MSG(catalog.RenameTable(options_identifier, Identifier("db1", "tbl2"), false),
"Cannot rename system table");
}

TEST(FileSystemCatalogTest, TestCreateTableWithBlob) {
std::map<std::string, std::string> options;
options[Options::FILE_SYSTEM] = "local";
Expand Down Expand Up @@ -188,6 +249,8 @@ TEST(FileSystemCatalogTest, TestCreateTableWithBlob) {
ASSERT_EQ(table_names[0], "tbl1");
ASSERT_OK_AND_ASSIGN(std::shared_ptr<Schema> table_schema,
catalog.LoadTableSchema(Identifier("db1", "tbl1")));
ASSERT_TRUE(std::dynamic_pointer_cast<DataSchema>(table_schema) != nullptr);
ASSERT_TRUE(std::dynamic_pointer_cast<TableSchema>(table_schema) != nullptr);
ASSERT_OK_AND_ASSIGN(auto arrow_schema, table_schema->GetArrowSchema());
auto loaded_schema = arrow::ImportSchema(arrow_schema.get()).ValueOrDie();
ASSERT_TRUE(typed_schema.Equals(loaded_schema));
Expand Down Expand Up @@ -358,11 +421,13 @@ TEST(FileSystemCatalogTest, TestValidateTableSchema) {
ASSERT_NOK_WITH_MSG(catalog.LoadTableSchema(Identifier("db0", "tbl0")),
"Identifier{database=\'db0\', table=\'tbl0\'} not exist");
ASSERT_OK_AND_ASSIGN(std::shared_ptr<Schema> table_schema, catalog.LoadTableSchema(identifier));
ASSERT_EQ(0, table_schema->Id());
ASSERT_EQ(3, table_schema->HighestFieldId());
ASSERT_EQ(1, table_schema->PartitionKeys().size());
ASSERT_EQ(0, table_schema->PrimaryKeys().size());
ASSERT_EQ(-1, table_schema->NumBuckets());
auto data_schema = std::dynamic_pointer_cast<DataSchema>(table_schema);
ASSERT_TRUE(data_schema != nullptr);
ASSERT_EQ(0, data_schema->Id());
ASSERT_EQ(3, data_schema->HighestFieldId());
ASSERT_EQ(1, data_schema->PartitionKeys().size());
ASSERT_EQ(0, data_schema->PrimaryKeys().size());
ASSERT_EQ(-1, data_schema->NumBuckets());
ASSERT_FALSE(table_schema->Comment().has_value());
std::vector<std::string> field_names = table_schema->FieldNames();
std::vector<std::string> expected_field_names = {"f0", "f1", "f2", "f3"};
Expand Down Expand Up @@ -396,8 +461,7 @@ TEST(FileSystemCatalogTest, TestValidateTableSchema) {
ASSERT_NOK_WITH_MSG(catalog.LoadTableSchema(identifier),
"Identifier{database=\'db1\', table=\'tbl1\'} not exist");

ASSERT_NOK_WITH_MSG(catalog.LoadTableSchema(Identifier("db1", "tbl$11")),
"do not support checking TableSchemaExists for system table.");
ASSERT_NOK_WITH_MSG(catalog.LoadTableSchema(Identifier("db1", "tbl$11")), "not exist");
ArrowSchemaRelease(&schema);
}

Expand Down
Loading
Loading