Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions include/paimon/defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,10 @@ struct PAIMON_EXPORT Options {
static const char SPILL_COMPRESSION_ZSTD_LEVEL[];
/// "cache-page-size" - Memory page size for caching. Default value is 64 kb.
static const char CACHE_PAGE_SIZE[];
/// "file.format.per.level" - Define different file format for different level, you can add the
/// conf like this: 'file.format.per.level' = '0:avro,3:parquet', if the file format for level
/// is not provided, the default format which set by FILE_FORMAT will be used.
static const char FILE_FORMAT_PER_LEVEL[];
};

static constexpr int64_t BATCH_WRITE_COMMIT_IDENTIFIER = std::numeric_limits<int64_t>::max();
Expand Down
6 changes: 6 additions & 0 deletions src/paimon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ set(PAIMON_CORE_SRCS
core/mergetree/compact/partial_update_merge_function.cpp
core/mergetree/compact/sort_merge_reader_with_loser_tree.cpp
core/mergetree/compact/sort_merge_reader_with_min_heap.cpp
core/mergetree/compact/lookup_merge_tree_compact_rewriter.cpp
core/mergetree/compact/changelog_merge_tree_rewriter.cpp
core/mergetree/merge_tree_writer.cpp
core/mergetree/levels.cpp
core/mergetree/lookup_levels.cpp
Expand Down Expand Up @@ -560,7 +562,10 @@ if(PAIMON_BUILD_TESTS)
core/mergetree/compact/aggregate/field_sum_agg_test.cpp
core/mergetree/compact/deduplicate_merge_function_test.cpp
core/mergetree/compact/first_row_merge_function_test.cpp
core/mergetree/compact/first_row_merge_function_wrapper_test.cpp
core/mergetree/compact/interval_partition_test.cpp
core/mergetree/compact/lookup_changelog_merge_function_wrapper_test.cpp
core/mergetree/compact/lookup_merge_tree_compact_rewriter_test.cpp
core/mergetree/compact/lookup_merge_function_test.cpp
core/mergetree/compact/partial_update_merge_function_test.cpp
core/mergetree/compact/reducer_merge_function_wrapper_test.cpp
Expand Down Expand Up @@ -621,6 +626,7 @@ if(PAIMON_BUILD_TESTS)
core/table/source/table_scan_test.cpp
core/tag/tag_test.cpp
core/utils/branch_manager_test.cpp
core/utils/file_store_path_factory_cache_test.cpp
core/utils/field_mapping_test.cpp
core/utils/fields_comparator_test.cpp
core/utils/file_store_path_factory_test.cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,38 @@ TEST(RowCompactedSerializerTest, TestNestedNullWithTimestampAndDecimal2) {
}
}

TEST(RowCompactedSerializerTest, TestListType) {
auto pool = GetDefaultPool();
// prepare data
auto inner_child1 = arrow::field("inner1", arrow::list(arrow::int32()));
auto arrow_type = arrow::struct_({inner_child1});
// each inner child per row
std::shared_ptr<arrow::Array> array = arrow::ipc::internal::json::ArrayFromJSON(arrow_type,
R"([
[[5, 6, 7]],
[[1, 2, 3]],
[[4]]
])")
.ValueOrDie();
auto struct_array = std::dynamic_pointer_cast<arrow::StructArray>(array);
ASSERT_TRUE(struct_array);
auto columnar_row =
std::make_shared<ColumnarRow>(/*struct_array=*/nullptr, struct_array->fields(), pool,
/*row_id=*/0);

// serialize and deserialize
ASSERT_OK_AND_ASSIGN(auto serializer,
RowCompactedSerializer::Create(arrow::schema(arrow_type->fields()), pool));
ASSERT_OK_AND_ASSIGN(auto bytes, serializer->SerializeToBytes(*columnar_row));
ASSERT_OK_AND_ASSIGN(auto row, serializer->Deserialize(bytes));

// check result
ASSERT_EQ(row->GetFieldCount(), 1);

// for inner_child1
ASSERT_EQ(row->GetArray(0)->ToIntArray().value(), std::vector<int32_t>({5, 6, 7}));
}

TEST(RowCompactedSerializerTest, TestSliceComparator) {
auto pool = GetDefaultPool();
arrow::FieldVector fields = {
Expand Down
1 change: 1 addition & 0 deletions src/paimon/common/defs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,5 @@ const char Options::LOOKUP_CACHE_BLOOM_FILTER_FPP[] = "lookup.cache.bloom.filter
const char Options::LOOKUP_CACHE_SPILL_COMPRESSION[] = "lookup.cache-spill-compression";
const char Options::SPILL_COMPRESSION_ZSTD_LEVEL[] = "spill-compression.zstd-level";
const char Options::CACHE_PAGE_SIZE[] = "cache-page-size";
const char Options::FILE_FORMAT_PER_LEVEL[] = "file.format.per.level";
} // namespace paimon
6 changes: 5 additions & 1 deletion src/paimon/common/io/cache/cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ std::shared_ptr<MemorySegment> CacheManager::GetPage(
auto ptr = std::make_shared<MemorySegment>(segment);
return std::make_shared<CacheValue>(ptr);
};
return cache->Get(key, supplier)->GetSegment();
auto cache_value = cache->Get(key, supplier);
if (!cache_value) {
return nullptr;
}
return cache_value->GetSegment();
}

void CacheManager::InvalidPage(const std::shared_ptr<CacheKey>& key) {
Expand Down
12 changes: 12 additions & 0 deletions src/paimon/common/sst/sst_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,16 @@ Result<std::shared_ptr<SstFileReader>> SstFileReader::Create(
auto trailer_data =
block_cache->GetBlock(index_block_handle->Offset() + index_block_handle->Size(),
BlockTrailer::ENCODED_LENGTH, true);
if (!trailer_data) {
return Status::Invalid("Read trailer error");
}
auto trailer_input = MemorySlice::Wrap(trailer_data)->ToInput();
auto trailer = BlockTrailer::ReadBlockTrailer(trailer_input);
auto block_data =
block_cache->GetBlock(index_block_handle->Offset(), index_block_handle->Size(), true);
if (!block_data) {
return Status::Invalid("Read block error");
}
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<MemorySegment> uncompressed_data,
DecompressBlock(block_data, trailer, pool));
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<BlockReader> reader,
Expand Down Expand Up @@ -124,9 +130,15 @@ Result<std::shared_ptr<BlockReader>> SstFileReader::ReadBlock(
const std::shared_ptr<BlockHandle>& handle, bool index) {
auto trailer_data = block_cache_->GetBlock(handle->Offset() + handle->Size(),
BlockTrailer::ENCODED_LENGTH, true);
if (!trailer_data) {
return Status::Invalid("Read trailer failed");
}
auto trailer_input = MemorySlice::Wrap(trailer_data)->ToInput();
auto trailer = BlockTrailer::ReadBlockTrailer(trailer_input);
auto block_data = block_cache_->GetBlock(handle->Offset(), handle->Size(), index);
if (!block_data) {
return Status::Invalid("Read block failed");
}
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<MemorySegment> uncompressed_data,
DecompressBlock(block_data, trailer, pool_));
return BlockReader::Create(MemorySlice::Wrap(uncompressed_data), comparator_);
Expand Down
2 changes: 1 addition & 1 deletion src/paimon/core/append/append_only_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ AppendOnlyWriter::SingleFileWriterCreator AppendOnlyWriter::GetDataFileWriterCre
::ArrowSchema arrow_schema;
ScopeGuard guard([&arrow_schema]() { ArrowSchemaRelease(&arrow_schema); });
PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*schema, &arrow_schema));
auto format = options_.GetWriteFileFormat();
auto format = options_.GetFileFormat();
PAIMON_ASSIGN_OR_RAISE(
std::shared_ptr<WriterBuilder> writer_builder,
format->CreateWriterBuilder(&arrow_schema, options_.GetWriteBatchSize()));
Expand Down
51 changes: 46 additions & 5 deletions src/paimon/core/core_options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ class ConfigParser {
std::string normalized_value = StringUtils::ToLowerCase(iter->second);
PAIMON_ASSIGN_OR_RAISE(*value, Factory::Get(normalized_value, config_map_));
} else {
PAIMON_ASSIGN_OR_RAISE(*value, Factory::Get(default_identifier, config_map_));
PAIMON_ASSIGN_OR_RAISE(
*value, Factory::Get(StringUtils::ToLowerCase(default_identifier), config_map_));
}
return Status::OK();
}
Expand Down Expand Up @@ -336,6 +337,7 @@ struct CoreOptions::Impl {
double lookup_cache_bloom_filter_fpp = 0.05;
CompressOptions lookup_compress_options{"zstd", 1};
int64_t cache_page_size = 64 * 1024; // 64KB
std::map<int32_t, std::shared_ptr<FileFormat>> file_format_per_level;
};

// Parse configurations from a map and return a populated CoreOptions object
Expand Down Expand Up @@ -596,6 +598,32 @@ Result<CoreOptions> CoreOptions::FromMap(
// Parse cache-page-size
PAIMON_RETURN_NOT_OK(parser.ParseMemorySize(Options::CACHE_PAGE_SIZE, &impl->cache_page_size));

// parse file.format.per.level
std::string file_format_per_level_str;
PAIMON_RETURN_NOT_OK(
parser.ParseString(Options::FILE_FORMAT_PER_LEVEL, &file_format_per_level_str));
if (!file_format_per_level_str.empty()) {
auto level2format =
StringUtils::Split(file_format_per_level_str, std::string(","), std::string(":"));
for (const auto& single_level : level2format) {
if (single_level.size() != 2) {
return Status::Invalid(
fmt::format("fail to parse key {}, value {} (usage example: 0:avro,3:parquet)",
Options::FILE_FORMAT_PER_LEVEL, file_format_per_level_str));
}
auto level = StringUtils::StringToValue<int32_t>(single_level[0]);
if (!level || level.value() < 0) {
return Status::Invalid(
fmt::format("fail to parse level {} from string to int in {}", single_level[0],
Options::FILE_FORMAT_PER_LEVEL));
}
std::shared_ptr<FileFormat> file_format;
PAIMON_RETURN_NOT_OK(parser.ParseObject<FileFormatFactory>(
"_no_use", /*default_identifier=*/single_level[1], &file_format));
impl->file_format_per_level[level.value()] = file_format;
}
}

return options;
}

Expand All @@ -617,7 +645,15 @@ int32_t CoreOptions::GetBucket() const {
return impl_->bucket;
}

std::shared_ptr<FileFormat> CoreOptions::GetWriteFileFormat() const {
std::shared_ptr<FileFormat> CoreOptions::GetWriteFileFormat(int32_t level) const {
auto iter = impl_->file_format_per_level.find(level);
if (iter != impl_->file_format_per_level.end()) {
return iter->second;
}
return impl_->file_format;
}

std::shared_ptr<FileFormat> CoreOptions::GetFileFormat() const {
return impl_->file_format;
}

Expand Down Expand Up @@ -807,9 +843,13 @@ const std::map<std::string, std::string>& CoreOptions::ToMap() const {
}

bool CoreOptions::NeedLookup() const {
return GetMergeEngine() == MergeEngine::FIRST_ROW ||
GetChangelogProducer() == ChangelogProducer::LOOKUP || DeletionVectorsEnabled() ||
impl_->force_lookup;
return GetLookupStrategy().need_lookup;
}

LookupStrategy CoreOptions::GetLookupStrategy() const {
return {GetMergeEngine() == MergeEngine::FIRST_ROW,
GetChangelogProducer() == ChangelogProducer::LOOKUP, DeletionVectorsEnabled(),
impl_->force_lookup};
}

bool CoreOptions::CompactionForceRewriteAllFiles() const {
Expand Down Expand Up @@ -966,4 +1006,5 @@ const CompressOptions& CoreOptions::GetLookupCompressOptions() const {
int32_t CoreOptions::GetCachePageSize() const {
return static_cast<int32_t>(impl_->cache_page_size);
}

} // namespace paimon
5 changes: 4 additions & 1 deletion src/paimon/core/core_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "paimon/core/options/changelog_producer.h"
#include "paimon/core/options/compress_options.h"
#include "paimon/core/options/external_path_strategy.h"
#include "paimon/core/options/lookup_strategy.h"
#include "paimon/core/options/merge_engine.h"
#include "paimon/core/options/sort_engine.h"
#include "paimon/format/file_format.h"
Expand All @@ -53,7 +54,8 @@ class PAIMON_EXPORT CoreOptions {
~CoreOptions();

int32_t GetBucket() const;
std::shared_ptr<FileFormat> GetWriteFileFormat() const;
std::shared_ptr<FileFormat> GetFileFormat() const;
std::shared_ptr<FileFormat> GetWriteFileFormat(int32_t level) const;
std::shared_ptr<FileSystem> GetFileSystem() const;
const std::string& GetFileCompression() const;
int32_t GetFileCompressionZstdLevel() const;
Expand Down Expand Up @@ -101,6 +103,7 @@ class PAIMON_EXPORT CoreOptions {
int64_t DeletionVectorTargetFileSize() const;
ChangelogProducer GetChangelogProducer() const;
bool NeedLookup() const;
LookupStrategy GetLookupStrategy() const;
bool FileIndexReadEnabled() const;

std::map<std::string, std::string> GetFieldsSequenceGroups() const;
Expand Down
29 changes: 25 additions & 4 deletions src/paimon/core/core_options_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ namespace paimon::test {
TEST(CoreOptionsTest, TestDefaultValue) {
ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap({}));
ASSERT_EQ(core_options.GetManifestFormat()->Identifier(), "avro");
ASSERT_EQ(core_options.GetWriteFileFormat()->Identifier(), "parquet");
ASSERT_EQ(core_options.GetFileFormat()->Identifier(), "parquet");
ASSERT_EQ(core_options.GetWriteFileFormat(0)->Identifier(), "parquet");
ASSERT_EQ(core_options.GetWriteFileFormat(3)->Identifier(), "parquet");
ASSERT_TRUE(core_options.GetFileSystem());
ASSERT_EQ(-1, core_options.GetBucket());
ASSERT_EQ(64 * 1024L, core_options.GetPageSize());
Expand Down Expand Up @@ -82,6 +84,10 @@ TEST(CoreOptionsTest, TestDefaultValue) {
ASSERT_EQ(2 * 1024 * 1024, core_options.DeletionVectorTargetFileSize());
ASSERT_EQ(ChangelogProducer::NONE, core_options.GetChangelogProducer());
ASSERT_FALSE(core_options.NeedLookup());
LookupStrategy expected_lookup_strategy = {/*is_first_row=*/false,
/*produce_changelog=*/false,
/*deletion_vector=*/false, /*force_lookup=*/false};
ASSERT_EQ(expected_lookup_strategy, core_options.GetLookupStrategy());
ASSERT_TRUE(core_options.GetFieldsSequenceGroups().empty());
ASSERT_FALSE(core_options.PartialUpdateRemoveRecordOnDelete());
ASSERT_TRUE(core_options.GetPartialUpdateRemoveRecordOnSequenceGroup().empty());
Expand Down Expand Up @@ -185,14 +191,17 @@ TEST(CoreOptionsTest, TestFromMap) {
{Options::LOOKUP_CACHE_BLOOM_FILTER_FPP, "0.5"},
{Options::LOOKUP_CACHE_SPILL_COMPRESSION, "lz4"},
{Options::SPILL_COMPRESSION_ZSTD_LEVEL, "2"},
{Options::CACHE_PAGE_SIZE, "6MB"}};
{Options::CACHE_PAGE_SIZE, "6MB"},
{Options::FILE_FORMAT_PER_LEVEL, "0:AVRO,3:parquet"}};

ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options));
auto fs = core_options.GetFileSystem();
ASSERT_TRUE(fs);

auto format = core_options.GetWriteFileFormat();
ASSERT_EQ(format->Identifier(), "orc");
ASSERT_EQ(core_options.GetFileFormat()->Identifier(), "orc");
ASSERT_EQ(core_options.GetWriteFileFormat(0)->Identifier(), "avro");
ASSERT_EQ(core_options.GetWriteFileFormat(1)->Identifier(), "orc");
ASSERT_EQ(core_options.GetWriteFileFormat(3)->Identifier(), "parquet");

auto manifest_format = core_options.GetManifestFormat();
ASSERT_EQ(manifest_format->Identifier(), "avro");
Expand Down Expand Up @@ -235,6 +244,11 @@ TEST(CoreOptionsTest, TestFromMap) {
ASSERT_EQ(4 * 1024 * 1024, core_options.DeletionVectorTargetFileSize());
ASSERT_EQ(ChangelogProducer::FULL_COMPACTION, core_options.GetChangelogProducer());
ASSERT_TRUE(core_options.NeedLookup());
LookupStrategy expected_lookup_strategy = {/*is_first_row=*/false,
/*produce_changelog=*/false,
/*deletion_vector=*/true, /*force_lookup=*/true};
ASSERT_EQ(expected_lookup_strategy, core_options.GetLookupStrategy());

std::map<std::string, std::string> seq_grp;
seq_grp["g_1,g_3"] = "c,d";
ASSERT_EQ(core_options.GetFieldsSequenceGroups(), seq_grp);
Expand Down Expand Up @@ -294,6 +308,13 @@ TEST(CoreOptionsTest, TestInvalidCase) {
"invalid changelog producer: invalid");
}

TEST(CoreOptionsTest, TestInvalidFileFormatPerLevel) {
ASSERT_NOK_WITH_MSG(CoreOptions::FromMap({{Options::FILE_FORMAT_PER_LEVEL, "0:AVRO:parquet"}}),
"fail to parse key file.format.per.level, value 0:AVRO:parquet");
ASSERT_NOK_WITH_MSG(CoreOptions::FromMap({{Options::FILE_FORMAT_PER_LEVEL, "aaa:avro"}}),
"fail to parse level aaa from string to int in file.format.per.level");
}

TEST(CoreOptionsTest, TestCreateExternalPath) {
std::map<std::string, std::string> options = {
{Options::DATA_FILE_EXTERNAL_PATHS,
Expand Down
2 changes: 1 addition & 1 deletion src/paimon/core/global_index/global_index_scan_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ Status GlobalIndexScanImpl::Scan() {
path_factory_,
FileStorePathFactory::Create(
root_path_, arrow_schema, table_schema_->PartitionKeys(),
options_.GetPartitionDefaultName(), options_.GetWriteFileFormat()->Identifier(),
options_.GetPartitionDefaultName(), options_.GetFileFormat()->Identifier(),
options_.DataFilePrefix(), options_.LegacyPartitionNameEnabled(), external_paths,
global_index_external_path, options_.IndexFileInDataFileDir(), pool_));

Expand Down
2 changes: 1 addition & 1 deletion src/paimon/core/global_index/global_index_write_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ Result<std::shared_ptr<GlobalIndexFileManager>> CreateGlobalIndexFileManager(
std::shared_ptr<FileStorePathFactory> path_factory,
FileStorePathFactory::Create(
table_path, all_arrow_schema, table_schema->PartitionKeys(),
core_options.GetPartitionDefaultName(), core_options.GetWriteFileFormat()->Identifier(),
core_options.GetPartitionDefaultName(), core_options.GetFileFormat()->Identifier(),
core_options.DataFilePrefix(), core_options.LegacyPartitionNameEnabled(),
external_paths, global_index_external_path, core_options.IndexFileInDataFileDir(),
pool));
Expand Down
4 changes: 2 additions & 2 deletions src/paimon/core/io/single_file_writer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ TEST(SingleFileWriterTest, TestSimple) {
ASSERT_OK_AND_ASSIGN(
CoreOptions options,
CoreOptions::FromMap({{Options::MANIFEST_FORMAT, "orc"}, {Options::FILE_FORMAT, "orc"}}));
auto file_format = options.GetWriteFileFormat();
auto file_format = options.GetWriteFileFormat(/*level=*/0);
auto file_system = options.GetFileSystem();
ArrowSchema arrow_schema;
ASSERT_TRUE(arrow::ExportType(*data_type, &arrow_schema).ok());
Expand Down Expand Up @@ -91,7 +91,7 @@ TEST(SingleFileWriterTest, TestInvalidConvert) {
ASSERT_OK_AND_ASSIGN(
CoreOptions options,
CoreOptions::FromMap({{Options::MANIFEST_FORMAT, "orc"}, {Options::FILE_FORMAT, "orc"}}));
auto file_format = options.GetWriteFileFormat();
auto file_format = options.GetWriteFileFormat(/*level=*/0);
auto file_system = options.GetFileSystem();
ArrowSchema arrow_schema;
ASSERT_TRUE(arrow::ExportType(*data_type, &arrow_schema).ok());
Expand Down
2 changes: 1 addition & 1 deletion src/paimon/core/key_value.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ struct KeyValue {
KeyValue() = default;

KeyValue(const RowKind* _value_kind, int64_t _sequence_number, int32_t _level,
std::shared_ptr<InternalRow>&& _key, std::unique_ptr<InternalRow>&& _value)
std::shared_ptr<InternalRow> _key, std::unique_ptr<InternalRow> _value)
: value_kind(_value_kind),
sequence_number(_sequence_number),
level(_level),
Expand Down
2 changes: 1 addition & 1 deletion src/paimon/core/manifest/manifest_entry_writer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class ManifestEntryWriterTest : public ::testing::Test {

EXPECT_OK_AND_ASSIGN(CoreOptions options,
CoreOptions::FromMap({{Options::FILE_FORMAT, "orc"}}));
auto file_format = options.GetWriteFileFormat();
auto file_format = options.GetWriteFileFormat(/*level=*/0);
std::shared_ptr<arrow::DataType> data_type =
VersionedObjectSerializer<ManifestEntry>::VersionType(ManifestEntry::DataType());
ArrowSchema arrow_schema;
Expand Down
Loading
Loading