@@ -26,26 +26,72 @@ namespace CurrentMetrics
2626{
2727extern const Metric DT_NumMemTable;
2828extern const Metric DT_BytesMemTable;
29+ extern const Metric DT_BytesMemTableAllocated;
2930} // namespace CurrentMetrics
3031
3132namespace DB ::DM
3233{
3334
35+ // / Member functions of MemTableSet::Statistic ///
36+
37+ MemTableSet::Statistic::Statistic ()
38+ : holder_bytes(CurrentMetrics::DT_BytesMemTable, 0 )
39+ , holder_allocated_bytes(CurrentMetrics::DT_BytesMemTableAllocated, 0 )
40+ {}
41+
42+ void MemTableSet::Statistic::append (
43+ size_t rows_added,
44+ size_t bytes_added,
45+ size_t allocated_bytes_added,
46+ size_t deletes_added,
47+ size_t files_added)
48+ {
49+ column_files_count += files_added;
50+ rows += rows_added;
51+ bytes += bytes_added;
52+ allocated_bytes += allocated_bytes_added;
53+ deletes += deletes_added;
54+ // update the current metrics
55+ holder_bytes.changeTo (bytes.load ());
56+ holder_allocated_bytes.changeTo (allocated_bytes.load ());
57+ }
58+
59+ void MemTableSet::Statistic::resetTo (
60+ size_t new_column_files_count,
61+ size_t new_rows,
62+ size_t new_bytes,
63+ size_t new_allocated_bytes,
64+ size_t new_deletes)
65+ {
66+ column_files_count = new_column_files_count;
67+ rows = new_rows;
68+ bytes = new_bytes;
69+ allocated_bytes = new_allocated_bytes;
70+ deletes = new_deletes;
71+ // update the current metrics
72+ holder_bytes.changeTo (bytes.load ());
73+ holder_allocated_bytes.changeTo (allocated_bytes.load ());
74+ }
75+
76+ // / Member functions of MemTableSet ///
77+
3478MemTableSet::MemTableSet (const ColumnFiles & in_memory_files)
3579 : holder_counter(CurrentMetrics::DT_NumMemTable, 1 )
36- , holder_allocated_bytes(CurrentMetrics::DT_BytesMemTable, 0 )
3780 , column_files(in_memory_files)
3881 , log(Logger::get())
3982{
40- column_files_count = column_files.size ();
83+ size_t new_rows = 0 ;
84+ size_t new_bytes = 0 ;
85+ size_t new_alloc_bytes = 0 ;
86+ size_t new_deletes = 0 ;
4187 for (const auto & file : column_files)
4288 {
43- rows += file->getRows ();
44- bytes += file->getBytes ();
45- allocated_bytes += file->getAllocateBytes ();
46- deletes += file->getDeletes ();
89+ new_rows += file->getRows ();
90+ new_bytes += file->getBytes ();
91+ new_alloc_bytes += file->getAllocateBytes ();
92+ new_deletes += file->getDeletes ();
4793 }
48- holder_allocated_bytes. changeTo (allocated_bytes. load () );
94+ stat. resetTo (column_files. size (), new_rows, new_bytes, new_alloc_bytes, new_deletes );
4995}
5096
5197void MemTableSet::appendColumnFileInner (const ColumnFilePtr & column_file)
@@ -60,12 +106,12 @@ void MemTableSet::appendColumnFileInner(const ColumnFilePtr & column_file)
60106 }
61107
62108 column_files.push_back (column_file);
63- column_files_count = column_files. size ();
64-
65- rows += column_file->getRows ();
66- bytes += column_file->getBytes ();
67- allocated_bytes += column_file->getAllocateBytes ();
68- deletes += column_file-> getDeletes ( );
109+ stat. append (
110+ column_file-> getRows (),
111+ column_file->getBytes (),
112+ column_file->getAllocateBytes (),
113+ column_file->getDeletes (),
114+ /* files_added= */ 1 );
69115}
70116
71117std::pair</* New */ ColumnFiles, /* Flushed */ ColumnFiles> MemTableSet::diffColumnFiles (
@@ -229,9 +275,13 @@ void MemTableSet::appendToCache(DMContext & context, const Block & block, size_t
229275 if (unlikely (!append_res.success ))
230276 throw Exception (" Write to MemTableSet failed" , ErrorCodes::LOGICAL_ERROR);
231277 }
232- rows += limit;
233- bytes += append_bytes;
234- allocated_bytes += append_res.new_alloc_bytes ;
278+
279+ stat.append ( //
280+ limit,
281+ append_bytes,
282+ append_res.new_alloc_bytes ,
283+ /* deletes_added*/ 0 ,
284+ /* files_added*/ 0 );
235285}
236286
237287void MemTableSet::appendDeleteRange (const RowKeyRange & delete_range)
@@ -294,13 +344,18 @@ ColumnFileSetSnapshotPtr MemTableSet::createSnapshot(
294344 // This may indicate that you forget to acquire a lock -- there are modifications
295345 // while this function is still running...
296346 RUNTIME_CHECK (
297- total_rows == rows && total_deletes == deletes,
347+ total_rows == stat. rows && total_deletes == stat. deletes ,
298348 total_rows,
299- rows.load (),
349+ stat. rows .load (),
300350 total_deletes,
301- deletes.load ());
302-
303- return std::make_shared<ColumnFileSetSnapshot>(data_provider, std::move (column_files_snap), rows, bytes, deletes);
351+ stat.deletes .load ());
352+
353+ return std::make_shared<ColumnFileSetSnapshot>(
354+ data_provider,
355+ std::move (column_files_snap),
356+ stat.rows ,
357+ stat.bytes ,
358+ stat.deletes );
304359}
305360
306361ColumnFileFlushTaskPtr MemTableSet::buildFlushTask (
@@ -333,16 +388,16 @@ ColumnFileFlushTaskPtr MemTableSet::buildFlushTask(
333388 cur_rows_offset += column_file->getRows ();
334389 cur_deletes_offset += column_file->getDeletes ();
335390 }
336- if (unlikely (flush_task->getFlushRows () != rows || flush_task->getFlushDeletes () != deletes))
391+ if (unlikely (flush_task->getFlushRows () != stat. rows || flush_task->getFlushDeletes () != stat. deletes ))
337392 {
338393 LOG_ERROR (
339394 log,
340395 " Rows and deletes check failed. Actual: rows[{}], deletes[{}]. Expected: rows[{}], deletes[{}]. Column "
341396 " Files: {}" ,
342397 flush_task->getFlushRows (),
343398 flush_task->getFlushDeletes (),
344- rows.load (),
345- deletes.load (),
399+ stat. rows .load (),
400+ stat. deletes .load (),
346401 ColumnFile::filesToString (column_files));
347402 throw Exception (" Rows and deletes check failed." , ErrorCodes::LOGICAL_ERROR);
348403 }
@@ -378,11 +433,12 @@ void MemTableSet::removeColumnFilesInFlushTask(const ColumnFileFlushTask & flush
378433 new_deletes += column_file->getDeletes ();
379434 }
380435 column_files.swap (new_column_files);
381- column_files_count = column_files.size ();
382- rows = new_rows;
383- bytes = new_bytes;
384- allocated_bytes = new_alloc_bytes;
385- deletes = new_deletes;
436+ stat.resetTo ( //
437+ new_column_files.size (),
438+ new_rows,
439+ new_bytes,
440+ new_alloc_bytes,
441+ new_deletes);
386442}
387443
388444
0 commit comments