diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 21e87bee4..89797a38d 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -58,6 +58,7 @@ set(ICEBERG_SOURCES manifest/v3_metadata.cc metadata_columns.cc metrics_config.cc + metrics_reporters.cc name_mapping.cc partition_field.cc partition_spec.cc diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index bfc502fd8..b0edaf650 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -76,6 +76,7 @@ iceberg_sources = files( 'manifest/v3_metadata.cc', 'metadata_columns.cc', 'metrics_config.cc', + 'metrics_reporters.cc', 'name_mapping.cc', 'partition_field.cc', 'partition_spec.cc', diff --git a/src/iceberg/metrics_reporter.h b/src/iceberg/metrics_reporter.h new file mode 100644 index 000000000..16999f67d --- /dev/null +++ b/src/iceberg/metrics_reporter.h @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "iceberg/iceberg_export.h" + +namespace iceberg { + +/// \brief Duration type for metrics reporting in milliseconds. 
+using DurationMs = std::chrono::milliseconds; + +/// \brief Report generated after a table scan operation. +/// +/// Contains metrics about the planning and execution of a table scan, +/// including information about manifests and data files processed. +struct ICEBERG_EXPORT ScanReport { + /// \brief The fully qualified name of the table that was scanned. + std::string table_name; + + /// \brief ID of the snapshot that was scanned; stays -1 if never set. + int64_t snapshot_id = -1; + + /// \brief Filter expression used in the scan, if any. + std::string filter; + + /// \brief Schema ID recorded for the scan; stays -1 if never set. + int32_t schema_id = -1; + + /// \brief Total duration of the entire scan operation, in milliseconds. + DurationMs total_duration{0}; + + /// \brief Duration spent planning the scan, in milliseconds. + DurationMs total_planning_duration{0}; + + /// \brief Number of data files in the scan result. + int64_t result_data_files = 0; + + /// \brief Number of delete files in the scan result. + int64_t result_delete_files = 0; + + /// \brief Total number of data manifests. + int64_t total_data_manifests = 0; + + /// \brief Number of data files that were skipped. + int64_t skipped_data_files = 0; + + /// \brief Number of delete files that were skipped. + int64_t skipped_delete_files = 0; + + /// \brief Number of data manifests that were scanned. + int64_t scanned_data_manifests = 0; + + /// \brief Number of data manifests that were skipped due to filtering. + int64_t skipped_data_manifests = 0; + + /// \brief Total number of delete manifests. + int64_t total_delete_manifests = 0; + + /// \brief Number of delete manifests that were scanned. + int64_t scanned_delete_manifests = 0; + + /// \brief Number of delete manifests that were skipped. + int64_t skipped_delete_manifests = 0; +}; + +/// \brief Report generated after a commit operation. +/// +/// Contains metrics about the changes made in a commit, including +/// files added/removed and retry information. 
+struct ICEBERG_EXPORT CommitReport { + /// \brief The fully qualified name of the table that was modified. + std::string table_name; + + /// \brief The snapshot ID created by this commit; stays -1 if never set. + int64_t snapshot_id = -1; + + /// \brief The sequence number assigned to this commit; stays -1 if never set. + int64_t sequence_number = -1; + + /// \brief The operation that was performed (append, overwrite, delete, etc.). + std::string operation; + + /// \brief Number of commit attempts (1 = success on first try). + int32_t attempts = 1; + + /// \brief Number of data files added in this commit. + int64_t added_data_files = 0; + + /// \brief Number of data files removed in this commit. + int64_t removed_data_files = 0; + + /// \brief Total number of data files after this commit. + int64_t total_data_files = 0; + + /// \brief Number of delete files added in this commit. + int64_t added_delete_files = 0; + + /// \brief Number of delete files removed in this commit. + int64_t removed_delete_files = 0; + + /// \brief Total number of delete files after this commit. + int64_t total_delete_files = 0; + + /// \brief Number of records added in this commit. + int64_t added_records = 0; + + /// \brief Number of records removed in this commit. + int64_t removed_records = 0; + + /// \brief Size in bytes of files added. + int64_t added_files_size = 0; + + /// \brief Size in bytes of files removed. + int64_t removed_files_size = 0; +}; + +/// \brief The kind of a metrics report: scan or commit. +enum class MetricsReportType { + kScanReport, + kCommitReport, +}; + +/// \brief Get the string representation of a metrics report type ("scan" or "commit"). +ICEBERG_EXPORT constexpr std::string_view ToString(MetricsReportType type) noexcept { + switch (type) { + case MetricsReportType::kScanReport: + return "scan"; + case MetricsReportType::kCommitReport: + return "commit"; + } + std::unreachable(); +} + +/// \brief A metrics report, which can be either a ScanReport or CommitReport. 
+/// +/// This variant type allows handling both report types uniformly through +/// the MetricsReporter interface. +using MetricsReport = std::variant; + +/// \brief Get the type of a metrics report. +/// +/// \param report The metrics report to get the type of. +/// \return kScanReport when the variant holds a ScanReport, kCommitReport otherwise. +ICEBERG_EXPORT inline MetricsReportType GetReportType(const MetricsReport& report) { + return std::visit( + [](const auto& r) -> MetricsReportType { + using T = std::decay_t; + if constexpr (std::is_same_v) { + return MetricsReportType::kScanReport; + } else { + return MetricsReportType::kCommitReport; + } + }, + report); +} + +/// \brief Interface for reporting metrics from Iceberg operations. +/// +/// Implementations of this interface can be used to collect and report +/// metrics about scan and commit operations. Common implementations include +/// logging reporters, metrics collectors, and the noop reporter that discards every report. +class ICEBERG_EXPORT MetricsReporter { + public: + virtual ~MetricsReporter() = default; + + /// \brief Handle a single metrics report. + /// + /// Implementations should handle the report according to their purpose + /// (e.g., logging, sending to a metrics service, etc.). + /// + /// \param report The metrics report to process. + virtual void Report(const MetricsReport& report) = 0; +}; + +} // namespace iceberg diff --git a/src/iceberg/metrics_reporters.cc b/src/iceberg/metrics_reporters.cc new file mode 100644 index 000000000..eb521c017 --- /dev/null +++ b/src/iceberg/metrics_reporters.cc @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/metrics_reporters.h" + +#include + +#include "iceberg/util/string_util.h" + +namespace iceberg { + +namespace { + +/// \brief Registry type for MetricsReporter factories with heterogeneous lookup support. +using MetricsReporterRegistry = std::unordered_map; + +/// \brief Get the set of built-in metrics reporter type names (currently only "noop"). +const std::unordered_set& DefaultReporterTypes() { + static const std::unordered_set kReporterTypes = { + std::string(kMetricsReporterTypeNoop), + }; + return kReporterTypes; +} + +/// \brief Infer the reporter type from properties: the lower-cased "metrics.reporter.type" value, or "noop" when it is missing or empty. +std::string InferReporterType( + const std::unordered_map& properties) { + auto it = properties.find(std::string(kMetricsReporterType)); + if (it != properties.end() && !it->second.empty()) { + return StringUtils::ToLower(it->second); + } + // Key absent or value empty: default to the noop reporter + return std::string(kMetricsReporterTypeNoop); +} + +/// \brief Metrics reporter that does nothing. +/// +/// This is the default reporter used when no reporter is configured. +/// It silently discards all reports. +class NoopMetricsReporter : public MetricsReporter { + public: + static Result> Make( + [[maybe_unused]] std::string_view name, + [[maybe_unused]] const std::unordered_map& properties) { + return std::make_unique(); + } + + void Report([[maybe_unused]] const MetricsReport& report) override { + // Intentionally empty - noop implementation discards all reports + } +}; + +/// \brief Template helper that wraps a reporter type's static Make() in a MetricsReporterFactory. 
+template +MetricsReporterFactory MakeReporterFactory() { + return + [](std::string_view name, const std::unordered_map& props) + -> Result> { return T::Make(name, props); }; +} + +/// \brief Create the default registry, containing only the built-in noop reporter. +MetricsReporterRegistry CreateDefaultRegistry() { + return { + {std::string(kMetricsReporterTypeNoop), MakeReporterFactory()}, + }; +} + +/// \brief Get the global registry of metrics reporter factories, built from the defaults on first call. +MetricsReporterRegistry& GetRegistry() { + static MetricsReporterRegistry registry = CreateDefaultRegistry(); + return registry; +} + +} // namespace + +void MetricsReporters::Register(std::string_view reporter_type, + MetricsReporterFactory factory) { + GetRegistry()[StringUtils::ToLower(reporter_type)] = std::move(factory); +} + +Result> MetricsReporters::Load( + std::string_view name, + const std::unordered_map& properties) { + std::string reporter_type = InferReporterType(properties); + + auto& registry = GetRegistry(); + auto it = registry.find(reporter_type); + if (it == registry.end()) { + if (DefaultReporterTypes().contains(reporter_type)) { + return NotImplemented("Metrics reporter type '{}' is not yet supported", + reporter_type); + } + return InvalidArgument("Unknown metrics reporter type: '{}'", reporter_type); + } + + return it->second(name, properties); +} + +} // namespace iceberg diff --git a/src/iceberg/metrics_reporters.h b/src/iceberg/metrics_reporters.h new file mode 100644 index 000000000..95d2e7a91 --- /dev/null +++ b/src/iceberg/metrics_reporters.h @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/metrics_reporters.h +/// \brief Factory for creating MetricsReporter instances. + +#include +#include +#include +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/metrics_reporter.h" +#include "iceberg/result.h" + +namespace iceberg { + +/// \brief Property key for configuring the metrics reporter type. +/// +/// Set this property in table properties to specify which metrics reporter +/// implementation to use. The value is lower-cased and must match a registered reporter type. +inline constexpr std::string_view kMetricsReporterType = "metrics.reporter.type"; + +/// \brief Property value for the noop metrics reporter. +inline constexpr std::string_view kMetricsReporterTypeNoop = "noop"; + +/// \brief Function type for creating MetricsReporter instances. +/// +/// \param name The name identifier for the reporter. +/// \param properties Configuration properties for the reporter. +/// \return A new MetricsReporter instance or an error. +using MetricsReporterFactory = std::function>( + std::string_view name, + const std::unordered_map& properties)>; + +/// \brief Factory class for creating and managing MetricsReporter instances. +/// +/// This class provides a registry-based factory for creating MetricsReporter +/// implementations. Custom reporter implementations can be registered using +/// the Register() method. +class ICEBERG_EXPORT MetricsReporters { + public: + /// \brief Load a metrics reporter based on properties. 
+ /// + /// This method looks up the "metrics.reporter.type" property to determine + /// which reporter implementation to create. If missing or empty, returns a + /// NoopMetricsReporter. + /// + /// \param name Name identifier for the reporter. + /// \param properties Configuration properties containing reporter type. + /// \return A new MetricsReporter instance, or an error if the type is not registered. + static Result> Load( + std::string_view name, + const std::unordered_map& properties); + + /// \brief Register a factory for a metrics reporter type. + /// + /// This method is not thread-safe. All registrations should be done during + /// application startup before any concurrent access to Load(). + /// + /// \param reporter_type Case-insensitive type identifier (e.g., "noop"). + /// \param factory Factory function that produces the reporter; replaces any factory already registered for this type. + static void Register(std::string_view reporter_type, MetricsReporterFactory factory); +}; + +} // namespace iceberg diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index fdd88888e..873827080 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -76,6 +76,7 @@ add_iceberg_test(table_test SOURCES location_provider_test.cc metrics_config_test.cc + metrics_reporter_test.cc snapshot_summary_builder_test.cc snapshot_test.cc snapshot_util_test.cc diff --git a/src/iceberg/test/meson.build b/src/iceberg/test/meson.build index 71ab6942e..cbfd79a46 100644 --- a/src/iceberg/test/meson.build +++ b/src/iceberg/test/meson.build @@ -49,6 +49,7 @@ iceberg_tests = { 'sources': files( 'location_provider_test.cc', 'metrics_config_test.cc', + 'metrics_reporter_test.cc', 'snapshot_test.cc', 'snapshot_util_test.cc', 'table_metadata_builder_test.cc', diff --git a/src/iceberg/test/metrics_reporter_test.cc b/src/iceberg/test/metrics_reporter_test.cc new file mode 100644 index 000000000..5a0c17957 --- /dev/null +++ b/src/iceberg/test/metrics_reporter_test.cc @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/metrics_reporter.h" + +#include +#include +#include +#include + +#include + +#include "iceberg/metrics_reporters.h" + +namespace iceberg { + +class CollectingMetricsReporter : public MetricsReporter { + public: + static Result> Make( + [[maybe_unused]] std::string_view name, + [[maybe_unused]] const std::unordered_map& properties) { + return std::make_unique(); + } + + void Report(const MetricsReport& report) override { reports_.push_back(report); } + + const std::vector& reports() const { return reports_; } + + private: + std::vector reports_; +}; + +TEST(CustomMetricsReporterTest, RegisterAndLoad) { + // Register a custom reporter under the type name "collecting" + MetricsReporters::Register( + "collecting", + [](std::string_view name, const std::unordered_map& props) + -> Result> { + return CollectingMetricsReporter::Make(name, props); + }); + + // Load the custom reporter by setting the reporter-type property + std::unordered_map properties = { + {std::string(kMetricsReporterType), "collecting"}}; + auto result = MetricsReporters::Load("test", properties); + + ASSERT_TRUE(result.has_value()); + ASSERT_NE(result.value(), nullptr); + + // Downcast to the concrete reporter, send one report, and verify it was recorded + auto* reporter = dynamic_cast(result.value().get()); + ASSERT_NE(reporter, nullptr); + + 
ScanReport scan_report{.table_name = "test.table"}; + reporter->Report(scan_report); + + EXPECT_EQ(reporter->reports().size(), 1); + EXPECT_EQ(GetReportType(reporter->reports()[0]), MetricsReportType::kScanReport); +} + +TEST(CustomMetricsReporterTest, RegisterCaseInsensitive) { + // Register with an uppercase type name + MetricsReporters::Register( + "UPPERCASE", + [](std::string_view, const std::unordered_map&) + -> Result> { + return std::make_unique(); + }); + + // Load with the lowercase spelling of the same type name + std::unordered_map properties = { + {std::string(kMetricsReporterType), "uppercase"}}; + auto result = MetricsReporters::Load("test", properties); + + ASSERT_TRUE(result.has_value()); + ASSERT_NE(result.value(), nullptr); +} + +} // namespace iceberg