Skip to content

Commit c79ab59

Browse files
committed
Add metrics
Signed-off-by: JaySon-Huang <tshent@qq.com>
1 parent 92bfd28 commit c79ab59

File tree

8 files changed

+85
-50
lines changed

8 files changed

+85
-50
lines changed

dbms/src/Common/CurrentMetrics.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,10 @@
8888
M(ConnectionPoolSize) \
8989
M(MemoryTrackingQueryStorageTask) \
9090
M(MemoryTrackingFetchPages) \
91-
M(MemoryTrackingSharedColumnData)
91+
M(MemoryTrackingSharedColumnData) \
92+
M(DT_NumSegments) \
93+
M(DT_NumMemTable) \
94+
M(DT_BytesMemTable)
9295

9396
namespace CurrentMetrics
9497
{

dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -73,19 +73,6 @@ class ColumnFile
7373
INMEMORY_FILE = 4,
7474
};
7575

76-
struct Cache
77-
{
78-
explicit Cache(const Block & header)
79-
: block(header.cloneWithColumns(header.cloneEmptyColumns()))
80-
{}
81-
explicit Cache(Block && block)
82-
: block(std::move(block))
83-
{}
84-
85-
std::mutex mutex;
86-
Block block;
87-
};
88-
using CachePtr = std::shared_ptr<Cache>;
8976
using ColIdToOffset = std::unordered_map<ColId, size_t>;
9077

9178
public:
@@ -138,7 +125,13 @@ class ColumnFile
138125
/// been persisted in the disk and their data will be immutable.
139126
virtual bool isAppendable() const { return false; }
140127
virtual void disableAppend() {}
141-
virtual bool append(
128+
129+
struct AppendResult
130+
{
131+
bool success = false; // whether the append is successful
132+
size_t new_alloc_bytes = 0; // the new allocated bytes after append
133+
};
134+
virtual AppendResult append(
142135
const DMContext & /*dm_context*/,
143136
const Block & /*data*/,
144137
size_t /*offset*/,

dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.cpp

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ void ColumnFileInMemory::fillColumns(const ColumnDefines & col_defs, size_t col_
4141
// Copy data from cache
4242
const auto & type = getDataType(cd.id);
4343
auto col_data = type->createColumn();
44+
col_data->reserve(rows);
4445
col_data->insertRangeFrom(*(cache->block.getByPosition(col_offset).column), 0, rows);
4546
// Cast if need
4647
auto col_converted = convertColumnByColumnDefineIfNeed(type, std::move(col_data), cd);
@@ -64,24 +65,30 @@ ColumnFileReaderPtr ColumnFileInMemory::getReader(
6465
return std::make_shared<ColumnFileInMemoryReader>(*this, col_defs);
6566
}
6667

67-
bool ColumnFileInMemory::append(
68+
void ColumnFileInMemory::disableAppend()
69+
{
70+
disable_append = true;
71+
// TODO: Call shrinkToFit() to release the extra memory of the cache block.
72+
}
73+
74+
ColumnFile::AppendResult ColumnFileInMemory::append(
6875
const DMContext & context,
6976
const Block & data,
7077
size_t offset,
7178
size_t limit,
7279
size_t data_bytes)
7380
{
7481
if (disable_append)
75-
return false;
82+
return AppendResult{false, 0};
7683

7784
std::scoped_lock lock(cache->mutex);
7885
if (!isSameSchema(cache->block, data))
79-
return false;
86+
return AppendResult{false, 0};
8087

8188
// check whether this instance overflows
8289
if (cache->block.rows() >= context.delta_cache_limit_rows
8390
|| cache->block.bytes() >= context.delta_cache_limit_bytes)
84-
return false;
91+
return AppendResult{false, 0};
8592

8693
size_t new_alloc_block_bytes = 0;
8794
for (size_t i = 0; i < cache->block.columns(); ++i)
@@ -96,15 +103,7 @@ bool ColumnFileInMemory::append(
96103

97104
rows += limit;
98105
bytes += data_bytes;
99-
LOG_INFO(
100-
Logger::get(),
101-
"Append rows to ColumnFileInMemory, new_rows={} new_bytes={} new_alloc_bytes={} total_rows={} total_bytes={}",
102-
limit - offset,
103-
data_bytes,
104-
new_alloc_block_bytes,
105-
rows,
106-
bytes);
107-
return true;
106+
return AppendResult{true, new_alloc_block_bytes};
108107
}
109108

110109
Block ColumnFileInMemory::readDataForFlush() const

dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,20 @@ class ColumnFileInMemory : public ColumnFile
3030
friend class ColumnFileInMemoryReader;
3131
friend struct Remote::Serializer;
3232

33+
struct Cache
34+
{
35+
explicit Cache(const Block & header)
36+
: block(header.cloneWithColumns(header.cloneEmptyColumns()))
37+
{}
38+
explicit Cache(Block && block)
39+
: block(std::move(block))
40+
{}
41+
42+
std::mutex mutex;
43+
Block block;
44+
};
45+
using CachePtr = std::shared_ptr<Cache>;
46+
3347
private:
3448
ColumnFileSchemaPtr schema;
3549

@@ -83,9 +97,13 @@ class ColumnFileInMemory : public ColumnFile
8397
ReadTag) const override;
8498

8599
bool isAppendable() const override { return !disable_append; }
86-
void disableAppend() override { disable_append = true; }
87-
bool append(const DMContext & dm_context, const Block & data, size_t offset, size_t limit, size_t data_bytes)
88-
override;
100+
void disableAppend() override;
101+
AppendResult append(
102+
const DMContext & dm_context,
103+
const Block & data,
104+
size_t offset,
105+
size_t limit,
106+
size_t data_bytes) override;
89107

90108
Block readDataForFlush() const;
91109

dbms/src/Storages/DeltaMerge/Delta/MemTableSet.cpp

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,32 @@
2222
#include <Storages/DeltaMerge/WriteBatchesImpl.h>
2323
#include <Storages/PathPool.h>
2424

25+
namespace CurrentMetrics
26+
{
27+
extern const Metric DT_NumMemTable;
28+
extern const Metric DT_BytesMemTable;
29+
} // namespace CurrentMetrics
30+
2531
namespace DB::DM
2632
{
2733

34+
MemTableSet::MemTableSet(const ColumnFiles & in_memory_files)
35+
: holder_counter(CurrentMetrics::DT_NumMemTable, 1)
36+
, holder_allocated_bytes(CurrentMetrics::DT_BytesMemTable, 0)
37+
, column_files(in_memory_files)
38+
, log(Logger::get())
39+
{
40+
column_files_count = column_files.size();
41+
for (const auto & file : column_files)
42+
{
43+
rows += file->getRows();
44+
bytes += file->getBytes();
45+
allocated_bytes += file->getAllocateBytes();
46+
deletes += file->getDeletes();
47+
}
48+
holder_allocated_bytes.changeTo(allocated_bytes.load());
49+
}
50+
2851
void MemTableSet::appendColumnFileInner(const ColumnFilePtr & column_file)
2952
{
3053
if (!column_files.empty())
@@ -182,16 +205,16 @@ void MemTableSet::appendColumnFile(const ColumnFilePtr & column_file)
182205
void MemTableSet::appendToCache(DMContext & context, const Block & block, size_t offset, size_t limit)
183206
{
184207
// If the `column_files` is not empty, and the last `column_file` is a `ColumnInMemoryFile`, we will merge the newly block into the last `column_file`.
185-
bool success = false;
208+
ColumnFile::AppendResult append_res;
186209
size_t append_bytes = block.bytes(offset, limit);
187210
if (!column_files.empty())
188211
{
189212
auto & last_column_file = column_files.back();
190213
if (last_column_file->isAppendable())
191-
success = last_column_file->append(context, block, offset, limit, append_bytes);
214+
append_res = last_column_file->append(context, block, offset, limit, append_bytes);
192215
}
193216

194-
if (!success)
217+
if (!append_res.success)
195218
{
196219
/// Otherwise, create a new `ColumnInMemoryFile` and write into it.
197220

@@ -202,13 +225,13 @@ void MemTableSet::appendToCache(DMContext & context, const Block & block, size_t
202225
// Must append the empty `new_column_file` to `column_files` before appending data to it,
203226
// because `appendColumnFileInner` will update stats related to `column_files` but we will update stats relate to `new_column_file` here.
204227
appendColumnFileInner(new_column_file);
205-
success = new_column_file->append(context, block, offset, limit, append_bytes);
206-
if (unlikely(!success))
228+
append_res = new_column_file->append(context, block, offset, limit, append_bytes);
229+
if (unlikely(!append_res.success))
207230
throw Exception("Write to MemTableSet failed", ErrorCodes::LOGICAL_ERROR);
208231
}
209232
rows += limit;
210233
bytes += append_bytes;
211-
234+
allocated_bytes += append_res.new_alloc_bytes;
212235
}
213236

214237
void MemTableSet::appendDeleteRange(const RowKeyRange & delete_range)

dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ class MemTableSet
3737
#else
3838
public:
3939
#endif
40+
// Keep track of the number of mem-table in memory.
41+
CurrentMetrics::Increment holder_counter;
42+
CurrentMetrics::Increment holder_allocated_bytes;
43+
4044
// Note that we must update `column_files_count` for outer thread-safe after `column_files` changed
4145
ColumnFiles column_files;
4246

@@ -54,18 +58,7 @@ class MemTableSet
5458
void appendColumnFileInner(const ColumnFilePtr & column_file);
5559

5660
public:
57-
explicit MemTableSet(const ColumnFiles & in_memory_files = {})
58-
: column_files(in_memory_files)
59-
, log(Logger::get())
60-
{
61-
column_files_count = column_files.size();
62-
for (const auto & file : column_files)
63-
{
64-
rows += file->getRows();
65-
bytes += file->getBytes();
66-
deletes += file->getDeletes();
67-
}
68-
}
61+
explicit MemTableSet(const ColumnFiles & in_memory_files = {});
6962

7063
/**
7164
* Resets the logger by using the one from the segment.

dbms/src/Storages/DeltaMerge/Segment.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ extern const Metric DT_SnapshotOfPlaceIndex;
128128
extern const Metric DT_SnapshotOfReplayVersionChain;
129129
extern const Metric DT_SnapshotOfSegmentIngest;
130130
extern const Metric DT_SnapshotOfBitmapFilter;
131+
extern const Metric DT_NumSegments;
131132
} // namespace CurrentMetrics
132133

133134
namespace DB
@@ -295,7 +296,8 @@ Segment::Segment( //
295296
PageIdU64 next_segment_id_,
296297
const DeltaValueSpacePtr & delta_,
297298
const StableValueSpacePtr & stable_)
298-
: epoch(epoch_)
299+
: holder_counter(CurrentMetrics::NumSegments)
300+
, epoch(epoch_)
299301
, rowkey_range(rowkey_range_)
300302
, is_common_handle(rowkey_range.is_common_handle)
301303
, rowkey_column_size(rowkey_range.rowkey_column_size)

dbms/src/Storages/DeltaMerge/Segment.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -837,6 +837,10 @@ class Segment
837837
#else
838838
public:
839839
#endif
840+
841+
// Keep track of the number of segments in memory.
842+
CurrentMetrics::Increment holder_counter;
843+
840844
/// The version of this segment. After split / merge / mergeDelta / replaceData, epoch got increased by 1.
841845
const UInt64 epoch;
842846

0 commit comments

Comments
 (0)