diff --git a/src/paimon/format/parquet/parquet_file_batch_reader.cpp b/src/paimon/format/parquet/parquet_file_batch_reader.cpp
index f81c0bdc6..80761b0be 100644
--- a/src/paimon/format/parquet/parquet_file_batch_reader.cpp
+++ b/src/paimon/format/parquet/parquet_file_batch_reader.cpp
@@ -37,6 +37,7 @@
 #include "paimon/common/metrics/metrics_impl.h"
 #include "paimon/common/utils/arrow/status_utils.h"
 #include "paimon/common/utils/options_utils.h"
+#include "paimon/core/utils/duration.h"
 #include "paimon/format/parquet/parquet_field_id_converter.h"
 #include "paimon/format/parquet/parquet_format_defs.h"
 #include "paimon/format/parquet/parquet_timestamp_converter.h"
@@ -151,6 +152,9 @@ Status ParquetFileBatchReader::SetReadSchema(
     read_row_groups_ = row_groups;
     read_column_indices_ = column_indices;
 
+    metrics_->SetCounter(ParquetMetrics::READ_ROW_GROUPS_TOTAL, reader_->GetNumberOfRowGroups());
+    metrics_->SetCounter(ParquetMetrics::READ_ROW_GROUPS_FILTERED, row_groups.size());
+
     PAIMON_ASSIGN_OR_RAISE(std::set<int> ordered_row_groups,
                            reader_->FilterRowGroupsByReadRanges(read_ranges_, read_row_groups_));
     return reader_->PrepareForReadingLazy(ordered_row_groups, read_column_indices_);
@@ -221,6 +225,7 @@ Result<std::vector<int>> ParquetFileBatchReader::FilterRowGroupsByBitmap(
 }
 
 Result<std::pair<std::unique_ptr<ArrowArray>, std::unique_ptr<ArrowSchema>>> ParquetFileBatchReader::NextBatch() {
+    Duration timer;
     PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::RecordBatch> batch, reader_->Next());
     if (batch == nullptr) {
         return BatchReader::MakeEofBatch();
@@ -243,6 +248,14 @@ Result<std::pair<std::unique_ptr<ArrowArray>, std::unique_ptr<ArrowSchema>>> Pa
 
     std::unique_ptr<ArrowArray> c_array = std::make_unique<ArrowArray>();
     std::unique_ptr<ArrowSchema> c_schema = std::make_unique<ArrowSchema>();
     PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportArray(*array, c_array.get(), c_schema.get()));
+
+    read_rows_ += array->length();
+    read_batch_count_++;
+    read_next_batch_latency_ms_ += timer.Get();
+    metrics_->SetCounter(ParquetMetrics::READ_ROWS, read_rows_);
+    metrics_->SetCounter(ParquetMetrics::READ_BATCH_COUNT, read_batch_count_);
+    metrics_->SetCounter(ParquetMetrics::READ_NEXT_BATCH_LATENCY_MS, read_next_batch_latency_ms_);
+
     return make_pair(std::move(c_array), std::move(c_schema));
 }
diff --git a/src/paimon/format/parquet/parquet_file_batch_reader.h b/src/paimon/format/parquet/parquet_file_batch_reader.h
index 81fb2b8dc..7d18bafe7 100644
--- a/src/paimon/format/parquet/parquet_file_batch_reader.h
+++ b/src/paimon/format/parquet/parquet_file_batch_reader.h
@@ -174,6 +174,10 @@ class ParquetFileBatchReader : public PrefetchFileBatchReader {
 
     std::shared_ptr<Metrics> metrics_;
 
+    uint64_t read_rows_ = 0;
+    uint64_t read_batch_count_ = 0;
+    uint64_t read_next_batch_latency_ms_ = 0;
+
     // last time set read schema
     std::vector<int> read_row_groups_;
     std::vector<int> read_column_indices_;
diff --git a/src/paimon/format/parquet/parquet_format_defs.h b/src/paimon/format/parquet/parquet_format_defs.h
index 9022dfcf5..be1f7dc1a 100644
--- a/src/paimon/format/parquet/parquet_format_defs.h
+++ b/src/paimon/format/parquet/parquet_format_defs.h
@@ -61,6 +61,13 @@ static constexpr uint32_t DEFAULT_PARQUET_READ_PREDICATE_NODE_COUNT_LIMIT = 512;
 
 class ParquetMetrics {
 public:
     static inline const char WRITE_RECORD_COUNT[] = "parquet.write.record.count";
+
+    // read
+    static inline const char READ_ROW_GROUPS_TOTAL[] = "parquet.read.row_groups.total";
+    static inline const char READ_ROW_GROUPS_FILTERED[] = "parquet.read.row_groups.filtered";
+    static inline const char READ_ROWS[] = "parquet.read.rows";
+    static inline const char READ_BATCH_COUNT[] = "parquet.read.batch.count";
+    static inline const char READ_NEXT_BATCH_LATENCY_MS[] = "parquet.read.next_batch.latency.ms";
 };
 }  // namespace paimon::parquet