Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions dbms/src/IO/FileProvider/ChecksumReadBufferBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,11 @@ std::unique_ptr<ReadBufferFromFileBase> ChecksumReadBufferBuilder::build(
std::unique_ptr<ReadBufferFromFileBase> ChecksumReadBufferBuilder::build(
String && data,
const String & file_name,
size_t estimated_size,
ChecksumAlgo checksum_algorithm,
size_t checksum_frame_size)
{
auto allocation_size = std::min(data.size(), checksum_frame_size);
auto file = std::make_shared<MemoryRandomAccessFile>(file_name, std::forward<String>(data));
auto allocation_size = std::min(estimated_size, checksum_frame_size);
switch (checksum_algorithm)
{
case ChecksumAlgo::None:
Expand Down
1 change: 0 additions & 1 deletion dbms/src/IO/FileProvider/ChecksumReadBufferBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ class ChecksumReadBufferBuilder
static std::unique_ptr<ReadBufferFromFileBase> build(
String && data,
const String & file_name,
size_t estimated_size,
ChecksumAlgo checksum_algorithm,
size_t checksum_frame_size);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,11 @@ std::unique_ptr<LegacyCompressedReadBufferFromFile> CompressedReadBufferFromFile
std::unique_ptr<CompressedReadBufferFromFile> CompressedReadBufferFromFileBuilder::build(
String && data,
const String & file_name,
size_t estimated_size,
ChecksumAlgo checksum_algorithm,
size_t checksum_frame_size)
{
auto file_in = ChecksumReadBufferBuilder::build(
std::move(data),
file_name,
estimated_size,
checksum_algorithm,
checksum_frame_size);
auto file_in
= ChecksumReadBufferBuilder::build(std::move(data), file_name, checksum_algorithm, checksum_frame_size);
return std::make_unique<CompressedReadBufferFromFileImpl<false>>(std::move(file_in));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ class CompressedReadBufferFromFileBuilder
static std::unique_ptr<CompressedReadBufferFromFile> build(
String && data,
const String & file_name,
size_t estimated_size,
ChecksumAlgo checksum_algorithm,
size_t checksum_frame_size);

Expand Down
26 changes: 14 additions & 12 deletions dbms/src/Server/DTTool/DTToolBench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ int benchEntry(const std::vector<std::string> & opts)
opt.emplace(std::map<std::string, std::string>{}, frame, algorithm);
if (version == 2)
{
// frame checksum
DB::STORAGE_FORMAT_CURRENT = DB::STORAGE_FORMAT_V3;
}
else if (version == 3)
Expand All @@ -443,21 +444,21 @@ int benchEntry(const std::vector<std::string> & opts)
= genBlocks(random_seed, num_rows, num_cols, field, sparse_ratio, logger);
}

size_t write_cost_ms = 0;
TableID table_id = 1;
auto settings = DB::Settings();
auto db_context = env.getContext();
auto path_pool
= std::make_shared<DB::StoragePathPool>(db_context->getPathPool().withTable("test", "t1", false));
auto storage_pool
= std::make_shared<DB::DM::StoragePool>(*db_context, NullspaceID, /*ns_id*/ 1, *path_pool, "test.t1");
= std::make_shared<DB::DM::StoragePool>(*db_context, NullspaceID, table_id, *path_pool, "test.t1");
auto dm_settings = DB::DM::DeltaMergeStore::Settings{};
auto dm_context = DB::DM::DMContext::createUnique(
*db_context,
path_pool,
storage_pool,
/*min_version_*/ 0,
NullspaceID,
/*physical_table_id*/ 1,
table_id,
/*pk_col_id*/ 0,
false,
1,
Expand All @@ -469,6 +470,7 @@ int benchEntry(const std::vector<std::string> & opts)
// Write
if (write_repeat > 0)
{
size_t write_cost_ms = 0;
LOG_INFO(logger, "start writing");
for (size_t i = 0; i < write_repeat; ++i)
{
Expand All @@ -489,15 +491,17 @@ int benchEntry(const std::vector<std::string> & opts)
write_cost_ms += duration;
LOG_INFO(logger, "attempt {} finished in {} ms", i, duration);
}
size_t effective_size_on_disk = dmfile->getBytesOnDisk();
LOG_INFO(
logger,
"average write time: {} ms",
(static_cast<double>(write_cost_ms) / static_cast<double>(repeat)));
LOG_INFO(
logger,
"throughput (MB/s): {:.3f}",
(static_cast<double>(effective_size) * 1'000 * static_cast<double>(repeat)
/ static_cast<double>(write_cost_ms) / 1024 / 1024));
"write throughput by uncompressed size: {:.3f}MiB/s;"
" write throughput by compressed size: {:.3f}MiB/s",
(effective_size * 1'000.0 * repeat / write_cost_ms / 1024 / 1024),
(effective_size_on_disk * 1'000.0 * repeat / write_cost_ms / 1024 / 1024));
}

// Read
Expand Down Expand Up @@ -548,12 +552,10 @@ int benchEntry(const std::vector<std::string> & opts)
LOG_INFO(logger, "average read time: {} ms", (static_cast<double>(read_cost_ms) / static_cast<double>(repeat)));
LOG_INFO(
logger,
"throughput by deserialized bytes (MB/s): {:.3f}"
" throughput by compressed bytes (MB/s): {:.3f}",
(static_cast<double>(effective_size_read) * 1'000 * static_cast<double>(repeat)
/ static_cast<double>(read_cost_ms) / 1024 / 1024),
(static_cast<double>(effective_size_on_disk) * 1'000 * static_cast<double>(repeat)
/ static_cast<double>(read_cost_ms) / 1024 / 1024));
"read throughput by uncompressed bytes: {:.3f}MiB/s;"
" read throughput by compressed bytes: {:.3f}MiB/s",
(effective_size_read * 1'000.0 * repeat / read_cost_ms / 1024 / 1024),
(effective_size_on_disk * 1'000.0 * repeat / read_cost_ms / 1024 / 1024));
}
catch (const boost::wrapexcept<boost::bad_any_cast> & e)
{
Expand Down
47 changes: 23 additions & 24 deletions dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,34 +106,33 @@ class MarkLoader
col_mark_fname);

const auto & merged_file_info = info_iter->second;
auto file_path = dmfile_meta->mergedPath(merged_file_info.number);
auto encrypt_path = dmfile_meta->encryptionMergedPath(merged_file_info.number);
auto offset = merged_file_info.offset;
auto data_size = merged_file_info.size;
const auto file_path = dmfile_meta->mergedPath(merged_file_info.number);
const auto offset = merged_file_info.offset;
const auto data_size = merged_file_info.size;

if (data_size == 0)
return res;

// First, read from merged file to get the raw data(contains the header)
// Note that we use min(`data_size`, checksum_frame_size) as the size of buffer size in order
// to minimize read amplification in the merged file.
auto buffer = ReadBufferFromRandomAccessFileBuilder::build(
reader.file_provider,
file_path,
encrypt_path,
reader.dmfile->getConfiguration()->getChecksumFrameLength(),
dmfile_meta->encryptionMergedPath(merged_file_info.number),
std::min(data_size, reader.dmfile->getConfiguration()->getChecksumFrameLength()),
read_limiter);
buffer.seek(offset);

// Read the raw data into memory. It is OK because the mark merged into
// merged_file is small enough.
String raw_data;
raw_data.resize(data_size);
String raw_data(data_size, '\0');
buffer.read(reinterpret_cast<char *>(raw_data.data()), data_size);

// Then read from the buffer based on the raw data
// Then read from the buffer based on the raw data. The buffer size is min(data.size(), checksum_frame_size)
auto buf = ChecksumReadBufferBuilder::build(
std::move(raw_data),
file_path, // just for debug, the buffer is part of the merged file
reader.dmfile->getConfiguration()->getChecksumFrameLength(),
reader.dmfile->getConfiguration()->getChecksumAlgorithm(),
reader.dmfile->getConfiguration()->getChecksumFrameLength());
buf->readBig(reinterpret_cast<char *>(res->data()), bytes_size);
Expand Down Expand Up @@ -234,9 +233,10 @@ std::unique_ptr<CompressedSeekableReaderBuffer> ColumnReadStream::buildColDataRe
{
const auto * dmfile_meta = typeid_cast<const DMFileMetaV2 *>(reader.dmfile->meta.get());
assert(dmfile_meta != nullptr);
const auto & info = dmfile_meta->merged_sub_file_infos.find(colDataFileName(file_name_base));
if (info == dmfile_meta->merged_sub_file_infos.end())
const auto & info_iter = dmfile_meta->merged_sub_file_infos.find(colDataFileName(file_name_base));
if (info_iter == dmfile_meta->merged_sub_file_infos.end())
{
// Not merged into merged file, read from the original data file.
return CompressedReadBufferFromFileBuilder::build(
reader.file_provider,
reader.dmfile->colDataPath(file_name_base),
Expand All @@ -247,32 +247,31 @@ std::unique_ptr<CompressedSeekableReaderBuffer> ColumnReadStream::buildColDataRe
reader.dmfile->getConfiguration()->getChecksumFrameLength());
}

assert(info != dmfile_meta->merged_sub_file_infos.end());
auto file_path = dmfile_meta->mergedPath(info->second.number);
auto encrypt_path = dmfile_meta->encryptionMergedPath(info->second.number);
auto offset = info->second.offset;
auto size = info->second.size;
assert(info_iter != dmfile_meta->merged_sub_file_infos.end());
auto file_path = dmfile_meta->mergedPath(info_iter->second.number);
const auto offset = info_iter->second.offset;
const auto data_size = info_iter->second.size;

// First, read from merged file to get the raw data(contains the header)
// Note that we use min(`data_size`, checksum_frame_size) as the size of buffer size in order
// to minimize read amplification in the merged file.
auto buffer = ReadBufferFromRandomAccessFileBuilder::build(
reader.file_provider,
file_path,
encrypt_path,
reader.dmfile->getConfiguration()->getChecksumFrameLength(),
dmfile_meta->encryptionMergedPath(info_iter->second.number),
std::min(data_size, reader.dmfile->getConfiguration()->getChecksumFrameLength()),
read_limiter);
buffer.seek(offset);

// Read the raw data into memory. It is OK because the mark merged into
// merged_file is small enough.
String raw_data;
raw_data.resize(size);
buffer.read(reinterpret_cast<char *>(raw_data.data()), size);
String raw_data(data_size, '\0');
buffer.read(reinterpret_cast<char *>(raw_data.data()), data_size);

// Then read from the buffer based on the raw data
// Then read from the buffer based on the raw data. The buffer size is min(data.size(), checksum_frame_size)
return CompressedReadBufferFromFileBuilder::build(
std::move(raw_data),
file_path,
reader.dmfile->getConfiguration()->getChecksumFrameLength(),
reader.dmfile->getConfiguration()->getChecksumAlgorithm(),
reader.dmfile->getConfiguration()->getChecksumFrameLength());
}
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/DeltaMerge/File/DMFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ class DMFile : private boost::noncopyable
friend class DMFileLocalIndexWriter;
friend class DMFileReader;
friend class MarkLoader;
friend class MinMaxIndexLoader;
friend class ColumnReadStream;
friend class DMFilePackFilter;
friend class DMFileBlockInputStreamBuilder;
Expand Down
Loading