Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/paimon/format/parquet/parquet_file_batch_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "paimon/common/metrics/metrics_impl.h"
#include "paimon/common/utils/arrow/status_utils.h"
#include "paimon/common/utils/options_utils.h"
#include "paimon/core/utils/duration.h"
#include "paimon/format/parquet/parquet_field_id_converter.h"
#include "paimon/format/parquet/parquet_format_defs.h"
#include "paimon/format/parquet/parquet_timestamp_converter.h"
Expand Down Expand Up @@ -151,6 +152,9 @@ Status ParquetFileBatchReader::SetReadSchema(
read_row_groups_ = row_groups;
read_column_indices_ = column_indices;

metrics_->SetCounter(ParquetMetrics::READ_ROW_GROUPS_TOTAL, reader_->GetNumberOfRowGroups());
metrics_->SetCounter(ParquetMetrics::READ_ROW_GROUPS_FILTERED, row_groups.size());

PAIMON_ASSIGN_OR_RAISE(std::set<int32_t> ordered_row_groups,
reader_->FilterRowGroupsByReadRanges(read_ranges_, read_row_groups_));
return reader_->PrepareForReadingLazy(ordered_row_groups, read_column_indices_);
Expand Down Expand Up @@ -221,6 +225,7 @@ Result<std::vector<int32_t>> ParquetFileBatchReader::FilterRowGroupsByBitmap(
}

Result<BatchReader::ReadBatch> ParquetFileBatchReader::NextBatch() {
Duration timer;
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::RecordBatch> batch, reader_->Next());
if (batch == nullptr) {
return BatchReader::MakeEofBatch();
Expand All @@ -243,6 +248,14 @@ Result<BatchReader::ReadBatch> ParquetFileBatchReader::NextBatch() {
std::unique_ptr<ArrowArray> c_array = std::make_unique<ArrowArray>();
std::unique_ptr<ArrowSchema> c_schema = std::make_unique<ArrowSchema>();
PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportArray(*array, c_array.get(), c_schema.get()));

read_rows_ += array->length();
read_batch_count_++;
read_next_batch_latency_ms_ += timer.Get();
metrics_->SetCounter(ParquetMetrics::READ_ROWS, read_rows_);
metrics_->SetCounter(ParquetMetrics::READ_BATCH_COUNT, read_batch_count_);
metrics_->SetCounter(ParquetMetrics::READ_NEXT_BATCH_LATENCY_MS, read_next_batch_latency_ms_);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using a counter for READ_NEXT_BATCH_LATENCY_MS seems inappropriate. Since the prefetch reader aggregates metrics across all sub-readers, the resulting value is not very meaningful for a latency metric. If the goal is to capture end-to-end latency, I think it would be more appropriate to record it as a histogram at the framework level, rather than aggregating sub-reader metrics.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This issue also seems to exist in the ORC format; we will fix it in a future change.


return make_pair(std::move(c_array), std::move(c_schema));
}

Expand Down
4 changes: 4 additions & 0 deletions src/paimon/format/parquet/parquet_file_batch_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ class ParquetFileBatchReader : public PrefetchFileBatchReader {

std::shared_ptr<Metrics> metrics_;

uint64_t read_rows_ = 0;
uint64_t read_batch_count_ = 0;
uint64_t read_next_batch_latency_ms_ = 0;

// last time set read schema
std::vector<int32_t> read_row_groups_;
std::vector<int32_t> read_column_indices_;
Expand Down
7 changes: 7 additions & 0 deletions src/paimon/format/parquet/parquet_format_defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ static constexpr uint32_t DEFAULT_PARQUET_READ_PREDICATE_NODE_COUNT_LIMIT = 512;
// Metric name constants emitted by the Parquet format reader/writer.
// These string values are the externally visible metric keys — keep them
// stable; renaming a key silently breaks downstream dashboards/consumers.
class ParquetMetrics {
 public:
  // write-side: number of records written.
  static constexpr char WRITE_RECORD_COUNT[] = "parquet.write.record.count";

  // read-side
  // Total row groups present in the file before any filtering.
  static constexpr char READ_ROW_GROUPS_TOTAL[] = "parquet.read.row_groups.total";
  // Row groups remaining after filtering (i.e., selected for reading).
  static constexpr char READ_ROW_GROUPS_FILTERED[] = "parquet.read.row_groups.filtered";
  // Cumulative number of rows returned to the caller.
  static constexpr char READ_ROWS[] = "parquet.read.rows";
  // Cumulative number of batches returned to the caller.
  static constexpr char READ_BATCH_COUNT[] = "parquet.read.batch.count";
  // Cumulative latency of NextBatch() calls, in milliseconds.
  static constexpr char READ_NEXT_BATCH_LATENCY_MS[] = "parquet.read.next_batch.latency.ms";
};
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: for high-level categories, consider using . as the separator to represent hierarchy (e.g., parquet.read), while using - to separate words within the same level (e.g., batch-count).


} // namespace paimon::parquet
Loading