diff --git a/bin/zkEnv.sh b/bin/zkEnv.sh index 40889874818..b14e63d558d 100755 --- a/bin/zkEnv.sh +++ b/bin/zkEnv.sh @@ -113,6 +113,11 @@ for d in "$ZOOBINDIR"/../zookeeper-metrics-providers/zookeeper-prometheus-metric CLASSPATH="$d:$CLASSPATH" done +#make it work for developers +for d in "$ZOOBINDIR"/../zookeeper-metrics-providers/zookeeper-timeline-metrics/target/lib/*.jar; do + CLASSPATH="$d:$CLASSPATH" +done + #make it work for developers CLASSPATH="$ZOOBINDIR/../build/classes:$CLASSPATH" @@ -122,6 +127,9 @@ CLASSPATH="$ZOOBINDIR/../zookeeper-server/target/classes:$CLASSPATH" #make it work for developers CLASSPATH="$ZOOBINDIR/../zookeeper-metrics-providers/zookeeper-prometheus-metrics/target/classes:$CLASSPATH" +#make it work for developers +CLASSPATH="$ZOOBINDIR/../zookeeper-metrics-providers/zookeeper-timeline-metrics/target/classes:$CLASSPATH" + case "$(uname)" in CYGWIN* | MINGW*) cygwin=true ;; *) cygwin=false ;; diff --git a/zookeeper-assembly/pom.xml b/zookeeper-assembly/pom.xml index 317233d6f3b..03def0d2398 100644 --- a/zookeeper-assembly/pom.xml +++ b/zookeeper-assembly/pom.xml @@ -75,6 +75,11 @@ zookeeper-prometheus-metrics ${project.version} + + org.apache.zookeeper + zookeeper-timeline-metrics + ${project.version} + org.apache.zookeeper zookeeper-recipes diff --git a/zookeeper-metrics-providers/pom.xml b/zookeeper-metrics-providers/pom.xml index 054763d003a..60140bfaeca 100644 --- a/zookeeper-metrics-providers/pom.xml +++ b/zookeeper-metrics-providers/pom.xml @@ -33,6 +33,7 @@ zookeeper-prometheus-metrics + zookeeper-timeline-metrics diff --git a/zookeeper-metrics-providers/zookeeper-timeline-metrics/pom.xml b/zookeeper-metrics-providers/zookeeper-timeline-metrics/pom.xml new file mode 100755 index 00000000000..fae23b7475d --- /dev/null +++ b/zookeeper-metrics-providers/zookeeper-timeline-metrics/pom.xml @@ -0,0 +1,99 @@ + + + + 4.0.0 + + org.apache.zookeeper + zookeeper-metrics-providers + 3.10.0-SNAPSHOT + + + zookeeper-timeline-metrics + jar + 
Apache ZooKeeper - Timeline Metrics Provider + ZooKeeper Timeline Metrics Provider implementation + + + + + + + + org.apache.zookeeper + zookeeper + ${project.version} + + + + + org.slf4j + slf4j-api + + + + + io.dropwizard.metrics + metrics-core + test + + + + + org.mockito + mockito-core + test + + + org.junit.jupiter + junit-jupiter-engine + test + + + org.junit.platform + junit-platform-runner + test + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-dependencies + package + + copy-dependencies + + + ${project.build.directory}/lib + false + true + false + + + + + + + diff --git a/zookeeper-metrics-providers/zookeeper-timeline-metrics/src/main/java/org/apache/zookeeper/metrics/timeline/MetricSnapshot.java b/zookeeper-metrics-providers/zookeeper-timeline-metrics/src/main/java/org/apache/zookeeper/metrics/timeline/MetricSnapshot.java new file mode 100644 index 00000000000..1f79a4caf41 --- /dev/null +++ b/zookeeper-metrics-providers/zookeeper-timeline-metrics/src/main/java/org/apache/zookeeper/metrics/timeline/MetricSnapshot.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.zookeeper.metrics.timeline; + +import java.util.HashMap; +import java.util.Map; + +/** + * Represents a point-in-time snapshot of ZooKeeper metrics. + * + *

This class is a data transfer object that captures metric values at a specific + * timestamp for export to Timeline/Ambari Metrics Collector. It contains three types + * of metrics: counters (long values), gauges (double values), and summaries (double values).

+ * + * + *

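Instances are populated through {@code addCounter}, {@code addGauge} and {@code addSummary} and are then sent to the + * Timeline sink for persistence and visualization. The timestamp, hostname and appId are fixed at construction, + * but the metric maps stay mutable: the getters return the backing {@code HashMap}s directly, with no synchronization.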

+ * + * @see TimelineMetricsProvider + * @see TimelineMetricsSink + */ +public class MetricSnapshot { + + private final long timestamp; + private final String hostname; + private final String appId; + + // Separate collections for different metric types + private final Map counters = new HashMap<>(); + private final Map gauges = new HashMap<>(); + private final Map summaries = new HashMap<>(); + + /** + * Creates a new metric snapshot. + * + * @param timestamp the timestamp in milliseconds since epoch + * @param hostname the hostname of the ZooKeeper server + * @param appId the application ID (typically "zookeeper") + */ + public MetricSnapshot(long timestamp, String hostname, String appId) { + this.timestamp = timestamp; + this.hostname = hostname; + this.appId = appId; + } + + /** + * Adds a counter metric to the snapshot. + * + *

Counters represent monotonically increasing values such as total requests, + * total bytes received, etc.

+ * + * @param name the metric name + * @param value the counter value + */ + public void addCounter(String name, long value) { + counters.put(name, value); + } + + /** + * Adds a gauge metric to the snapshot. + * + *

Gauges represent current values that can increase or decrease, such as + * number of active connections, queue size, etc.

+ * + * @param name the metric name + * @param value the gauge value + */ + public void addGauge(String name, double value) { + gauges.put(name, value); + } + + /** + * Adds a summary metric to the snapshot. + * + *

Summaries represent computed statistics such as averages, minimums, maximums, + * and percentiles. The existing {@link org.apache.zookeeper.server.metric.AvgMinMaxCounter} + * and {@link org.apache.zookeeper.server.metric.AvgMinMaxPercentileCounter} classes + * already compute these values and provide them as separate metrics (e.g., "latency_avg", + * "latency_min", "latency_max", "latency_p99").

+ * + * @param name the metric name (e.g., "request_latency_avg") + * @param value the computed statistic value + */ + public void addSummary(String name, double value) { + summaries.put(name, value); + } + + /** + * Returns the total number of metrics in this snapshot. + * + * @return the sum of counters, gauges, and summaries + */ + public int getMetricCount() { + return counters.size() + gauges.size() + summaries.size(); + } + + /** + * Returns the timestamp of this snapshot. + * + * @return timestamp in milliseconds since epoch + */ + public long getTimestamp() { + return timestamp; + } + + /** + * Returns the hostname of the ZooKeeper server. + * + * @return the hostname + */ + public String getHostname() { + return hostname; + } + + /** + * Returns the application ID. + * + * @return the application ID (typically "zookeeper") + */ + public String getAppId() { + return appId; + } + + /** + * Returns all counter metrics in this snapshot. + * + * @return a view of the counters map + */ + public Map getCounters() { + return counters; + } + + /** + * Returns all gauge metrics in this snapshot. + * + * @return a view of the gauges map + */ + public Map getGauges() { + return gauges; + } + + /** + * Returns all summary metrics in this snapshot. + * + * @return a view of the summaries map + */ + public Map getSummaries() { + return summaries; + } + + @Override + public String toString() { + return String.format("MetricSnapshot{timestamp=%d, hostname='%s', appId='%s', " + + "counters=%d, gauges=%d, summaries=%d}", + timestamp, hostname, appId, counters.size(), gauges.size(), summaries.size()); + } + + /** + * Helper method to repeat a character n times (Java 8 compatible). + */ + private String repeatChar(char c, int count) { + StringBuilder sb = new StringBuilder(count); + for (int i = 0; i < count; i++) { + sb.append(c); + } + return sb.toString(); + } + + /** + * Prints all metrics in this snapshot to a formatted string. + * + *

This method is useful for debugging and logging. It prints all counters, + * gauges, and summaries in a human-readable format.

+ * + * @return a formatted string containing all metrics + */ + public String printAllMetrics() { + StringBuilder sb = new StringBuilder(); + sb.append(repeatChar('=', 80)).append("\n"); + sb.append("MetricSnapshot Details\n"); + sb.append(repeatChar('=', 80)).append("\n"); + sb.append(String.format("Timestamp: %d (%s)%n", timestamp, + new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new java.util.Date(timestamp)))); + sb.append(String.format("Hostname: %s%n", hostname)); + sb.append(String.format("AppId: %s%n", appId)); + sb.append(String.format("Total Metrics: %d (Counters: %d, Gauges: %d, Summaries: %d)%n", + getMetricCount(), counters.size(), gauges.size(), summaries.size())); + sb.append(repeatChar('=', 80)).append("\n\n"); + + // Print Counters + if (!counters.isEmpty()) { + sb.append("COUNTERS (").append(counters.size()).append("):\n"); + sb.append(repeatChar('-', 80)).append("\n"); + counters.entrySet().stream() + .sorted(Map.Entry.comparingByKey()) + .forEach(entry -> sb.append(String.format(" %-50s : %,d%n", + entry.getKey(), entry.getValue()))); + sb.append("\n"); + } + + // Print Gauges + if (!gauges.isEmpty()) { + sb.append("GAUGES (").append(gauges.size()).append("):\n"); + sb.append(repeatChar('-', 80)).append("\n"); + gauges.entrySet().stream() + .sorted(Map.Entry.comparingByKey()) + .forEach(entry -> sb.append(String.format(" %-50s : %.2f%n", + entry.getKey(), entry.getValue()))); + sb.append("\n"); + } + + // Print Summaries + if (!summaries.isEmpty()) { + sb.append("SUMMARIES (").append(summaries.size()).append("):\n"); + sb.append(repeatChar('-', 80)).append("\n"); + summaries.entrySet().stream() + .sorted(Map.Entry.comparingByKey()) + .forEach(entry -> sb.append(String.format(" %-50s : %.2f%n", + entry.getKey(), entry.getValue()))); + sb.append("\n"); + } + + sb.append(repeatChar('=', 80)).append("\n"); + return sb.toString(); + } +} diff --git 
a/zookeeper-metrics-providers/zookeeper-timeline-metrics/src/main/java/org/apache/zookeeper/metrics/timeline/TimelineMetricsProvider.java b/zookeeper-metrics-providers/zookeeper-timeline-metrics/src/main/java/org/apache/zookeeper/metrics/timeline/TimelineMetricsProvider.java new file mode 100644 index 00000000000..2d81b320581 --- /dev/null +++ b/zookeeper-metrics-providers/zookeeper-timeline-metrics/src/main/java/org/apache/zookeeper/metrics/timeline/TimelineMetricsProvider.java @@ -0,0 +1,604 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper.metrics.timeline; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Objects; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import org.apache.zookeeper.metrics.Counter; +import org.apache.zookeeper.metrics.CounterSet; +import org.apache.zookeeper.metrics.Gauge; +import org.apache.zookeeper.metrics.GaugeSet; +import org.apache.zookeeper.metrics.MetricsContext; +import org.apache.zookeeper.metrics.MetricsProvider; +import org.apache.zookeeper.metrics.MetricsProviderLifeCycleException; +import org.apache.zookeeper.metrics.Summary; +import org.apache.zookeeper.metrics.SummarySet; +import org.apache.zookeeper.server.metric.AvgMinMaxCounter; +import org.apache.zookeeper.server.metric.AvgMinMaxCounterSet; +import org.apache.zookeeper.server.metric.AvgMinMaxPercentileCounter; +import org.apache.zookeeper.server.metric.AvgMinMaxPercentileCounterSet; +import org.apache.zookeeper.server.metric.SimpleCounter; +import org.apache.zookeeper.server.metric.SimpleCounterSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * MetricsProvider implementation that sends ZooKeeper metrics to Timeline collectors. + * + *

This provider periodically samples metrics from its internal {@link MetricsContext} + * and sends them to an external Timeline metrics sink (such as Ambari Metrics Collector). + * The sink implementation is loaded dynamically at runtime, allowing ZooKeeper to + * remain independent of specific metrics collection systems.

+ * + *

Configuration:

+ *

This provider is configured via zoo.cfg with the following properties:

+ *
+ * # Enable Timeline metrics provider
+ * metricsProvider.className=org.apache.zookeeper.metrics.timeline.TimelineMetricsProvider
+ *
+ * # Sink class (loaded from external JAR on classpath)
+ * metricsProvider.timeline.sink.class=org.apache.hadoop.metrics2.sink.timeline.ZooKeeperTimelineMetricsSink
+ *
+ * # Collection settings
+ * metricsProvider.timeline.collection.period=60
+ * metricsProvider.timeline.hostname=zk1.example.com
+ * metricsProvider.timeline.appId=zookeeper
+ *
+ * # All other metricsProvider.timeline.* properties are passed to the sink
+ * metricsProvider.timeline.collector.hosts=collector1.example.com,collector2.example.com
+ * metricsProvider.timeline.collector.protocol=http
+ * metricsProvider.timeline.collector.port=6188
+ * 
+ * + *

Lifecycle:

+ *
    + *
  1. ZooKeeper instantiates this class via reflection
  2. + *
  3. {@link #configure(Properties)} loads configuration and sink class
  4. + *
  5. {@link #start()} begins periodic metric collection
  6. + *
  7. Metrics are collected every N seconds and sent to sink
  8. + *
  9. {@link #stop()} shuts down collection and closes sink
  10. + *
+ * + * @see TimelineMetricsSink + * @see MetricSnapshot + */ +public class TimelineMetricsProvider implements MetricsProvider { + + private static final Logger LOG = LoggerFactory.getLogger(TimelineMetricsProvider.class); + + // Configuration property keys + private static final String SINK_CLASS_PROPERTY = "timeline.sink.class"; + private static final String COLLECTION_PERIOD_PROPERTY = "timeline.collection.period"; + private static final String HOSTNAME_PROPERTY = "timeline.hostname"; + private static final String APP_ID_PROPERTY = "timeline.appId"; + + // Default values + private static final String DEFAULT_SINK_CLASS = + "org.apache.hadoop.metrics2.sink.timeline.ZooKeeperTimelineMetricsSink"; + private static final int DEFAULT_COLLECTION_PERIOD_SECONDS = 60; + private static final String DEFAULT_APP_ID = "zookeeper"; + + // Instance fields + private final TimelineMetricsContext rootContext = new TimelineMetricsContext(); + private ScheduledExecutorService scheduler; + private TimelineMetricsSink sink; + private int collectionPeriodSeconds; + private String hostname; + private String appId; + private volatile boolean started = false; + + /** + * Default constructor required by MetricsProvider contract. + */ + public TimelineMetricsProvider() { + // Empty constructor - initialization happens in configure() + } + + /** + * Configure the provider with properties from zoo.cfg. + * + *

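This method reads the {@code timeline.*} settings, loads the sink class dynamically and hands it the + * complete {@code configuration} object, unfiltered. If the sink class is missing from the classpath or + * cannot be instantiated or configured, a warning is logged and Timeline metrics are disabled instead of failing; + * other errors (for example a non-numeric collection period) are rethrown as {@link MetricsProviderLifeCycleException}.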

+ * + * @param configuration Properties from zoo.cfg + * @throws MetricsProviderLifeCycleException if configuration fails + */ + @Override + public void configure(Properties configuration) throws MetricsProviderLifeCycleException { + try { + // Load basic configuration + this.collectionPeriodSeconds = Integer.parseInt( + configuration.getProperty(COLLECTION_PERIOD_PROPERTY, + String.valueOf(DEFAULT_COLLECTION_PERIOD_SECONDS))); + + this.appId = configuration.getProperty(APP_ID_PROPERTY, DEFAULT_APP_ID); + + this.hostname = configuration.getProperty(HOSTNAME_PROPERTY); + if (hostname == null || hostname.trim().isEmpty()) { + this.hostname = getLocalHostname(); + } + + LOG.info("Configuring TimelineMetricsProvider: hostname={}, appId={}, collectionPeriod={} seconds", + hostname, appId, collectionPeriodSeconds); + + // Try to load and configure sink - but don't fail if it's not available + String sinkClassName = configuration.getProperty(SINK_CLASS_PROPERTY, DEFAULT_SINK_CLASS); + try { + this.sink = loadSink(sinkClassName); + this.sink.configure(configuration); + LOG.info("Successfully configured TimelineMetricsProvider with sink: {}", sinkClassName); + } catch (ClassNotFoundException e) { + LOG.warn("Timeline sink class not found: {}. Timeline metrics will be disabled. " + + "To enable Timeline metrics, ensure the sink implementation JAR is available on the classpath. " + + "ZooKeeper will continue to operate normally without Timeline metrics.", sinkClassName); + this.sink = null; + } catch (Exception e) { + LOG.warn("Failed to configure Timeline sink: {}. Timeline metrics will be disabled. " + + "ZooKeeper will continue to operate normally without Timeline metrics.", e.getMessage(), e); + this.sink = null; + } + + } catch (Exception e) { + LOG.error("Failed to configure TimelineMetricsProvider", e); + throw new MetricsProviderLifeCycleException("Configuration failed", e); + } + } + + /** + * Start the provider and begin periodic metric collection. + * + *

This method creates a scheduled executor that collects metrics + * every N seconds (configured via metricsProvider.timeline.collection.period). The + * collection runs on a daemon thread to avoid blocking ZooKeeper shutdown.

+ * + * @throws MetricsProviderLifeCycleException if startup fails + */ + @Override + public void start() throws MetricsProviderLifeCycleException { + if (started) { + LOG.warn("TimelineMetricsProvider already started"); + return; + } + + // If sink is not available, don't start the scheduler + if (sink == null) { + LOG.warn("Timeline sink not configured. Metric collection will not start. " + + "ZooKeeper will continue to operate normally without Timeline metrics."); + return; + } + + try { + // Create scheduler with daemon thread + this.scheduler = Executors.newScheduledThreadPool(1, r -> { + Thread t = new Thread(r, "TimelineMetricsCollector"); + t.setDaemon(true); + return t; + }); + + // Schedule periodic collection + scheduler.scheduleAtFixedRate( + this::collectAndSend, + 0, // Initial delay + collectionPeriodSeconds, + TimeUnit.SECONDS + ); + + started = true; + LOG.info("Started TimelineMetricsProvider - collecting metrics every {} seconds", + collectionPeriodSeconds); + + } catch (Exception e) { + LOG.error("Failed to start TimelineMetricsProvider", e); + throw new MetricsProviderLifeCycleException("Startup failed", e); + } + } + + /** + * Stop the provider and release all resources. + * + *

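This method shuts down the scheduler, closes the sink, and clears all + * registered metrics. It returns immediately unless {@link #start()} completed successfully, so it can be called multiple times safely; + * note that a sink that was configured but never started is therefore not closed.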

+ */ + @Override + public void stop() { + if (!started) { + return; + } + + LOG.info("Stopping TimelineMetricsProvider"); + + // Shutdown scheduler + if (scheduler != null) { + scheduler.shutdown(); + try { + if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) { + scheduler.shutdownNow(); + } + } catch (InterruptedException e) { + scheduler.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + + // Close sink + if (sink != null) { + try { + sink.close(); + } catch (Exception e) { + LOG.error("Error closing Timeline sink", e); + } + } + + // Clear all metrics from context + rootContext.clear(); + + started = false; + LOG.info("Stopped TimelineMetricsProvider"); + } + + /** + * Returns the root metrics context. + * + *

This provider maintains its own {@link TimelineMetricsContext} that stores + * all registered metrics. Components can register counters, gauges, summaries, etc. + * which will be automatically collected and sent to Timeline.

+ * + * @return the root metrics context + */ + @Override + public MetricsContext getRootContext() { + return rootContext; + } + + /** + * Dumps all current metric values. + * + *

This method is called by legacy monitoring commands. It iterates through + * all metrics stored in the context and provides their current values.

+ * + * @param sink the receiver of metric name-value pairs + */ + @Override + public void dump(BiConsumer sink) { + rootContext.dump(sink); + } + + /** + * Resets all metric values. + * + *

This resets all counters and summaries to their initial state. + * Gauges are not reset as they represent current values.

+ */ + @Override + public void resetAllValues() { + rootContext.reset(); + } + + /** + * Collects metrics from the context and sends to sink. + * + *

This method is called periodically by the scheduler. It creates a snapshot + * of all current metric values and sends it to the configured sink.

+ * + *

Exceptions are caught and logged to prevent them from stopping + * the scheduled collection.

+ */ + private void collectAndSend() { + try { + if (sink == null) { + LOG.debug("Timeline sink is null, skipping metric collection"); + return; + } + + // Create snapshot + MetricSnapshot snapshot = new MetricSnapshot( + System.currentTimeMillis(), + hostname, + appId + ); + + // Dump all metrics from context to snapshot + rootContext.dumpToSnapshot(snapshot); + + // Send to Timeline + sink.send(snapshot); + + if (LOG.isDebugEnabled()) { + LOG.debug("Sent {} metrics to Timeline", snapshot.getMetricCount()); + LOG.debug("{}", snapshot.printAllMetrics()); + } + + } catch (Exception e) { + LOG.error("Failed to collect and send metrics", e); + } + } + + /** + * Loads the Timeline sink class dynamically via reflection. + * + *

The sink class must be available on the classpath (typically from + * an external JAR). This allows ZooKeeper to remain independent of + * specific metrics collection systems.

+ * + * @param className the fully qualified class name of the sink + * @return an instance of the sink + * @throws ClassNotFoundException if the class cannot be found + * @throws Exception if the class cannot be instantiated + */ + private TimelineMetricsSink loadSink(String className) throws ClassNotFoundException, Exception { + LOG.info("Loading Timeline sink class: {}", className); + + try { + Class clazz = Class.forName(className); + Object instance = clazz.getDeclaredConstructor().newInstance(); + + if (!(instance instanceof TimelineMetricsSink)) { + throw new IllegalArgumentException( + "Class " + className + " does not implement TimelineMetricsSink"); + } + + return (TimelineMetricsSink) instance; + + } catch (ClassNotFoundException e) { + // Re-throw ClassNotFoundException so it can be caught separately in configure() + throw e; + } catch (Exception e) { + throw new Exception("Failed to instantiate Timeline sink: " + className, e); + } + } + + /** + * Gets the local hostname. + * + * @return the hostname, or "unknown" if it cannot be determined + */ + private String getLocalHostname() { + try { + return InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + LOG.warn("Unable to determine hostname, using 'unknown'", e); + return "unknown"; + } + } + + /** + * Internal MetricsContext implementation that stores all metrics. + * + *

This context reuses existing metric implementations from zookeeper-server + * (SimpleCounter, AvgMinMaxCounter, etc.) to ensure consistent behavior with + * other metrics providers.

+ */ + private static class TimelineMetricsContext implements MetricsContext { + + private final ConcurrentMap counters = new ConcurrentHashMap<>(); + private final ConcurrentMap counterSets = new ConcurrentHashMap<>(); + private final ConcurrentMap gauges = new ConcurrentHashMap<>(); + private final ConcurrentMap gaugeSets = new ConcurrentHashMap<>(); + private final ConcurrentMap basicSummaries = new ConcurrentHashMap<>(); + private final ConcurrentMap summaries = new ConcurrentHashMap<>(); + private final ConcurrentMap basicSummarySets = new ConcurrentHashMap<>(); + private final ConcurrentMap summarySets = new ConcurrentHashMap<>(); + + @Override + public MetricsContext getContext(String name) { + // No hierarchy yet - return this + return this; + } + + @Override + public Counter getCounter(String name) { + return counters.computeIfAbsent(name, SimpleCounter::new); + } + + @Override + public CounterSet getCounterSet(String name) { + Objects.requireNonNull(name, "Cannot register a CounterSet with null name"); + return counterSets.computeIfAbsent(name, SimpleCounterSet::new); + } + + @Override + public void registerGauge(String name, Gauge gauge) { + Objects.requireNonNull(gauge, "Cannot register a null Gauge for " + name); + gauges.put(name, gauge); + } + + @Override + public void unregisterGauge(String name) { + gauges.remove(name); + } + + @Override + public void registerGaugeSet(String name, GaugeSet gaugeSet) { + Objects.requireNonNull(name, "Cannot register a GaugeSet with null name"); + Objects.requireNonNull(gaugeSet, "Cannot register a null GaugeSet for " + name); + gaugeSets.put(name, gaugeSet); + } + + @Override + public void unregisterGaugeSet(String name) { + Objects.requireNonNull(name, "Cannot unregister GaugeSet with null name"); + gaugeSets.remove(name); + } + + @Override + public Summary getSummary(String name, DetailLevel detailLevel) { + if (detailLevel == DetailLevel.BASIC) { + return basicSummaries.computeIfAbsent(name, (n) -> { + if 
(summaries.containsKey(n)) { + throw new IllegalArgumentException("Already registered a non basic summary as " + n); + } + return new AvgMinMaxCounter(name); + }); + } else { + return summaries.computeIfAbsent(name, (n) -> { + if (basicSummaries.containsKey(n)) { + throw new IllegalArgumentException("Already registered a basic summary as " + n); + } + return new AvgMinMaxPercentileCounter(name); + }); + } + } + + @Override + public SummarySet getSummarySet(String name, DetailLevel detailLevel) { + if (detailLevel == DetailLevel.BASIC) { + return basicSummarySets.computeIfAbsent(name, (n) -> { + if (summarySets.containsKey(n)) { + throw new IllegalArgumentException("Already registered a non basic summary set as " + n); + } + return new AvgMinMaxCounterSet(name); + }); + } else { + return summarySets.computeIfAbsent(name, (n) -> { + if (basicSummarySets.containsKey(n)) { + throw new IllegalArgumentException("Already registered a basic summary set as " + n); + } + return new AvgMinMaxPercentileCounterSet(name); + }); + } + } + + /** + * Dumps all metrics to a MetricSnapshot for Timeline export. + */ + void dumpToSnapshot(MetricSnapshot snapshot) { + // Dump gauges + gauges.forEach((name, gauge) -> { + Number value = gauge.get(); + if (value != null) { + snapshot.addGauge(name, value.doubleValue()); + } + }); + + // Dump gauge sets + gaugeSets.forEach((name, gaugeSet) -> + gaugeSet.values().forEach((key, value) -> { + if (key != null) { + snapshot.addGauge(key + "_" + name, value != null ? 
value.doubleValue() : 0); + } + }) + ); + + // Dump counters + counters.values().forEach(counter -> { + counter.values().forEach((name, value) -> { + snapshot.addCounter(name, ((Number) value).longValue()); + }); + }); + + // Dump counter sets + counterSets.values().forEach(counterSet -> { + counterSet.values().forEach((name, value) -> { + snapshot.addCounter(name, ((Number) value).longValue()); + }); + }); + + // Dump basic summaries (avg, min, max) + basicSummaries.values().forEach(summary -> { + summary.values().forEach((name, value) -> { + snapshot.addSummary(name, ((Number) value).doubleValue()); + }); + }); + + // Dump advanced summaries (avg, min, max, percentiles) + summaries.values().forEach(summary -> { + summary.values().forEach((name, value) -> { + snapshot.addSummary(name, ((Number) value).doubleValue()); + }); + }); + + // Dump basic summary sets + basicSummarySets.values().forEach(summarySet -> { + summarySet.values().forEach((name, value) -> { + snapshot.addSummary(name, ((Number) value).doubleValue()); + }); + }); + + // Dump advanced summary sets + summarySets.values().forEach(summarySet -> { + summarySet.values().forEach((name, value) -> { + snapshot.addSummary(name, ((Number) value).doubleValue()); + }); + }); + } + + /** + * Dumps all metrics for legacy monitoring commands. + */ + void dump(BiConsumer sink) { + gauges.forEach((name, gauge) -> { + Number value = gauge.get(); + if (value != null) { + sink.accept(name, value); + } + }); + + gaugeSets.forEach((name, gaugeSet) -> + gaugeSet.values().forEach((key, value) -> { + if (key != null) { + sink.accept(key + "_" + name, value != null ? 
value : 0); + } + }) + ); + + counters.values().forEach(counter -> counter.values().forEach(sink)); + counterSets.values().forEach(counterSet -> counterSet.values().forEach(sink)); + basicSummaries.values().forEach(summary -> summary.values().forEach(sink)); + summaries.values().forEach(summary -> summary.values().forEach(sink)); + basicSummarySets.values().forEach(summarySet -> summarySet.values().forEach(sink)); + summarySets.values().forEach(summarySet -> summarySet.values().forEach(sink)); + } + + /** + * Resets all metrics to their initial state. + */ + void reset() { + counters.values().forEach(SimpleCounter::reset); + counterSets.values().forEach(SimpleCounterSet::reset); + basicSummaries.values().forEach(AvgMinMaxCounter::reset); + summaries.values().forEach(AvgMinMaxPercentileCounter::reset); + basicSummarySets.values().forEach(AvgMinMaxCounterSet::reset); + summarySets.values().forEach(AvgMinMaxPercentileCounterSet::reset); + // No need to reset gauges - they're read-only + } + + /** + * Clears all metrics from the context. + */ + void clear() { + gauges.clear(); + gaugeSets.clear(); + counters.clear(); + counterSets.clear(); + basicSummaries.clear(); + summaries.clear(); + basicSummarySets.clear(); + summarySets.clear(); + } + } +} diff --git a/zookeeper-metrics-providers/zookeeper-timeline-metrics/src/main/java/org/apache/zookeeper/metrics/timeline/TimelineMetricsSink.java b/zookeeper-metrics-providers/zookeeper-timeline-metrics/src/main/java/org/apache/zookeeper/metrics/timeline/TimelineMetricsSink.java new file mode 100644 index 00000000000..a68623fa963 --- /dev/null +++ b/zookeeper-metrics-providers/zookeeper-timeline-metrics/src/main/java/org/apache/zookeeper/metrics/timeline/TimelineMetricsSink.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.metrics.timeline; + +import java.util.Properties; + +/** + * Interface for Timeline metrics sinks. + * + *

+ * <p>This interface defines the contract between ZooKeeper's metrics collection
+ * system and external Timeline metrics collectors (such as Ambari Metrics Collector).
+ * Implementations of this interface are loaded dynamically at runtime, allowing
+ * ZooKeeper to remain independent of specific metrics collection systems.</p>
+ *
+ * <p>The typical lifecycle is:</p>
+ * <ol>
+ *   <li>Sink is instantiated via reflection (Class.forName)</li>
+ *   <li>{@link #configure(Properties)} is called with configuration</li>
+ *   <li>{@link #send(MetricSnapshot)} is called periodically with metrics</li>
+ *   <li>{@link #close()} is called during shutdown</li>
+ * </ol>
+ *
+ * <p>Example implementation in external JAR:</p>
+ * <pre>{@code
+ * public class MyTimelineSink implements TimelineMetricsSink {
+ *     public void configure(Properties config) throws Exception {
+ *         // Initialize HTTP client, load collector addresses, etc.
+ *     }
+ *
+ *     public void send(MetricSnapshot snapshot) throws Exception {
+ *         // Transform snapshot to target format and send via HTTP
+ *     }
+ *
+ *     public void close() throws Exception {
+ *         // Cleanup resources
+ *     }
+ * }
+ * }</pre>
+ *
+ * @see TimelineMetricsProvider
+ * @see MetricSnapshot
+ */
+public interface TimelineMetricsSink {
+
+    /**
+     * Configure the sink with the provided properties.
+     *
+     * <p>This method is called once during initialization, before any metrics
+     * are sent. Implementations should use this method to:</p>
+     * <ul>
+     *   <li>Load collector addresses and connection settings</li>
+     *   <li>Initialize HTTP clients or other communication mechanisms</li>
+     *   <li>Set up SSL/TLS if required</li>
+     *   <li>Validate configuration parameters</li>
+     * </ul>
+     *
+     * @param config Configuration properties from zoo.cfg. All properties
+     *               with the "metricsProvider.timeline." prefix are passed to the sink.
+     * @throws Exception if configuration fails. The exception will be logged
+     *                   and ZooKeeper startup will continue without Timeline metrics.
+     */
+    void configure(Properties config) throws Exception;
+
+    /**
+     * Send a snapshot of metrics to the Timeline collector.
+     *
+     * <p>This method is called periodically (typically every 60 seconds) with
+     * a snapshot of all current metric values. Implementations should:</p>
+     * <ul>
+     *   <li>Transform the snapshot to the target format (e.g., JSON)</li>
+     *   <li>Send metrics to the collector via HTTP POST or other protocol</li>
+     *   <li>Handle transient failures gracefully (retry, cache, etc.)</li>
+     *   <li>Return quickly to avoid blocking metric collection</li>
+     * </ul>
+     *
+     * <p><b>Note:</b> This method may be called from a scheduled executor thread.
+     * Implementations should be thread-safe and avoid blocking operations
+     * that could delay subsequent metric collections.</p>
+     *
+     * @param snapshot A snapshot of all metrics at a specific point in time.
+     *                 Contains counters, gauges, and summary statistics.
+     * @throws Exception if sending fails. Exceptions are logged but do not
+     *                   stop metric collection. The next snapshot will be
+     *                   attempted on schedule.
+     */
+    void send(MetricSnapshot snapshot) throws Exception;
+
+    /**
+     * Close the sink and release all resources.
+     *
+     * <p>This method is called during ZooKeeper shutdown. Implementations should:</p>
+     * <ul>
+     *   <li>Flush any cached metrics</li>
+     *   <li>Close HTTP connections</li>
+     *   <li>Shutdown thread pools</li>
+     *   <li>Release any other resources</li>
+     * </ul>
+     *
+     * <p>This method should complete quickly (within a few seconds) to avoid
+     * delaying ZooKeeper shutdown.</p>
+     *
+     * @throws Exception if cleanup fails. Exceptions are logged but do not
+     *                   prevent ZooKeeper shutdown.
+     */
+    void close() throws Exception;
+}
diff --git a/zookeeper-metrics-providers/zookeeper-timeline-metrics/src/test/java/org/apache/zookeeper/metrics/timeline/MockTimelineMetricsSink.java b/zookeeper-metrics-providers/zookeeper-timeline-metrics/src/test/java/org/apache/zookeeper/metrics/timeline/MockTimelineMetricsSink.java
new file mode 100644
index 00000000000..eca801be744
--- /dev/null
+++ b/zookeeper-metrics-providers/zookeeper-timeline-metrics/src/test/java/org/apache/zookeeper/metrics/timeline/MockTimelineMetricsSink.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.metrics.timeline;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Mock implementation of TimelineMetricsSink for testing purposes.
+ *
+ *

This mock captures all snapshots sent to it and provides methods + * to verify the snapshots in tests.

+ */ +public class MockTimelineMetricsSink implements TimelineMetricsSink { + + private final List snapshots = new ArrayList<>(); + private Properties configuration; + private boolean configured = false; + private boolean closed = false; + private volatile boolean throwOnSend = false; + private volatile boolean throwOnConfigure = false; + + @Override + public void configure(Properties configuration) throws Exception { + if (throwOnConfigure) { + throw new Exception("Mock configuration failure"); + } + this.configuration = configuration; + this.configured = true; + } + + @Override + public void send(MetricSnapshot snapshot) throws Exception { + if (throwOnSend) { + throw new Exception("Mock send failure"); + } + synchronized (snapshots) { + snapshots.add(snapshot); + } + } + + @Override + public void close() throws Exception { + this.closed = true; + } + + /** + * Returns all captured snapshots. + */ + public List getSnapshots() { + synchronized (snapshots) { + return new ArrayList<>(snapshots); + } + } + + /** + * Returns the last captured snapshot, or null if none. + */ + public MetricSnapshot getLastSnapshot() { + synchronized (snapshots) { + return snapshots.isEmpty() ? null : snapshots.get(snapshots.size() - 1); + } + } + + /** + * Returns the number of snapshots captured. + */ + public int getSnapshotCount() { + synchronized (snapshots) { + return snapshots.size(); + } + } + + /** + * Clears all captured snapshots. + */ + public void clearSnapshots() { + synchronized (snapshots) { + snapshots.clear(); + } + } + + /** + * Returns the configuration passed to configure(). + */ + public Properties getConfiguration() { + return configuration; + } + + /** + * Returns whether configure() was called. + */ + public boolean isConfigured() { + return configured; + } + + /** + * Returns whether close() was called. + */ + public boolean isClosed() { + return closed; + } + + /** + * Sets whether send() should throw an exception. 
+ */ + public void setThrowOnSend(boolean throwOnSend) { + this.throwOnSend = throwOnSend; + } + + /** + * Sets whether configure() should throw an exception. + */ + public void setThrowOnConfigure(boolean throwOnConfigure) { + this.throwOnConfigure = throwOnConfigure; + } +} diff --git a/zookeeper-metrics-providers/zookeeper-timeline-metrics/src/test/java/org/apache/zookeeper/metrics/timeline/TimelineMetricsProviderConfigTest.java b/zookeeper-metrics-providers/zookeeper-timeline-metrics/src/test/java/org/apache/zookeeper/metrics/timeline/TimelineMetricsProviderConfigTest.java new file mode 100644 index 00000000000..e6c7a22a85d --- /dev/null +++ b/zookeeper-metrics-providers/zookeeper-timeline-metrics/src/test/java/org/apache/zookeeper/metrics/timeline/TimelineMetricsProviderConfigTest.java @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.zookeeper.metrics.timeline;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.util.Properties;
+import org.apache.zookeeper.metrics.MetricsProviderLifeCycleException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for Timeline Metrics Provider configuration.
+ *
+ *

These tests verify configuration handling, validation, and error cases + * for the TimelineMetricsProvider.

+ */ +public class TimelineMetricsProviderConfigTest extends TimelineMetricsTestBase { + + private TimelineMetricsProvider provider; + + @AfterEach + public void tearDown() { + if (provider != null) { + provider.stop(); + } + } + + @Test + public void testValidConfig() throws Exception { + provider = new TimelineMetricsProvider(); + Properties configuration = new Properties(); + configuration.setProperty("timeline.sink.class", + "org.apache.zookeeper.metrics.timeline.MockTimelineMetricsSink"); + configuration.setProperty("timeline.collection.period", "60"); + configuration.setProperty("timeline.hostname", "test-host"); + configuration.setProperty("timeline.appId", "test-zookeeper"); + + provider.configure(configuration); + provider.start(); + + assertNotNull(provider.getRootContext()); + } + + @Test + public void testDefaultCollectionPeriod() throws Exception { + provider = new TimelineMetricsProvider(); + Properties configuration = new Properties(); + configuration.setProperty("timeline.sink.class", + "org.apache.zookeeper.metrics.timeline.MockTimelineMetricsSink"); + // Don't set collection.period - should use default + + provider.configure(configuration); + provider.start(); + + assertNotNull(provider.getRootContext()); + } + + @Test + public void testDefaultAppId() throws Exception { + provider = new TimelineMetricsProvider(); + Properties configuration = new Properties(); + configuration.setProperty("timeline.sink.class", + "org.apache.zookeeper.metrics.timeline.MockTimelineMetricsSink"); + // Don't set appId - should use default "zookeeper" + + provider.configure(configuration); + provider.start(); + + assertNotNull(provider.getRootContext()); + } + + @Test + public void testAutoDetectHostname() throws Exception { + provider = new TimelineMetricsProvider(); + Properties configuration = new Properties(); + configuration.setProperty("timeline.sink.class", + "org.apache.zookeeper.metrics.timeline.MockTimelineMetricsSink"); + // Don't set hostname - should 
auto-detect + + provider.configure(configuration); + provider.start(); + + assertNotNull(provider.getRootContext()); + } + + @Test + public void testMissingSinkClass() throws Exception { + provider = new TimelineMetricsProvider(); + Properties configuration = new Properties(); + // Use default sink class that doesn't exist + configuration.setProperty("timeline.collection.period", "60"); + + // Should not throw during configure + provider.configure(configuration); + + // Should not throw during start (just logs warning) + provider.start(); + + assertNotNull(provider.getRootContext()); + } + + @Test + public void testInvalidSinkClass() throws Exception { + provider = new TimelineMetricsProvider(); + Properties configuration = new Properties(); + configuration.setProperty("timeline.sink.class", "com.example.NonExistentClass"); + configuration.setProperty("timeline.collection.period", "60"); + + // Should not throw during configure + provider.configure(configuration); + + // Should not throw during start (just logs warning and continues without sink) + provider.start(); + + assertNotNull(provider.getRootContext()); + } + + @Test + public void testInvalidCollectionPeriod() throws Exception { + provider = new TimelineMetricsProvider(); + Properties configuration = new Properties(); + configuration.setProperty("timeline.sink.class", + "org.apache.zookeeper.metrics.timeline.MockTimelineMetricsSink"); + configuration.setProperty("timeline.collection.period", "invalid"); + + // Should throw NumberFormatException wrapped in MetricsProviderLifeCycleException + try { + provider.configure(configuration); + assertTrue(false, "Should have thrown exception for invalid collection period"); + } catch (MetricsProviderLifeCycleException e) { + assertNotNull(e.getCause()); + assertTrue(e.getCause() instanceof NumberFormatException); + } + } + + @Test + public void testZeroCollectionPeriod() throws Exception { + provider = new TimelineMetricsProvider(); + Properties configuration = new 
Properties(); + configuration.setProperty("timeline.sink.class", + "org.apache.zookeeper.metrics.timeline.MockTimelineMetricsSink"); + configuration.setProperty("timeline.collection.period", "0"); + + provider.configure(configuration); + // Zero period is invalid for ScheduledThreadPoolExecutor + try { + provider.start(); + assertTrue(false, "Should have thrown exception for zero collection period"); + } catch (MetricsProviderLifeCycleException e) { + assertNotNull(e.getCause()); + assertTrue(e.getCause() instanceof IllegalArgumentException); + } + } + + @Test + public void testNegativeCollectionPeriod() throws Exception { + provider = new TimelineMetricsProvider(); + Properties configuration = new Properties(); + configuration.setProperty("timeline.sink.class", + "org.apache.zookeeper.metrics.timeline.MockTimelineMetricsSink"); + configuration.setProperty("timeline.collection.period", "-1"); + + provider.configure(configuration); + // Negative period is invalid for ScheduledThreadPoolExecutor + try { + provider.start(); + assertTrue(false, "Should have thrown exception for negative collection period"); + } catch (MetricsProviderLifeCycleException e) { + assertNotNull(e.getCause()); + assertTrue(e.getCause() instanceof IllegalArgumentException); + } + } + + @Test + public void testStopWithoutStart() throws Exception { + provider = new TimelineMetricsProvider(); + Properties configuration = new Properties(); + configuration.setProperty("timeline.sink.class", + "org.apache.zookeeper.metrics.timeline.MockTimelineMetricsSink"); + + provider.configure(configuration); + // Stop without starting should not throw + provider.stop(); + } + + @Test + public void testMultipleStopCalls() throws Exception { + provider = new TimelineMetricsProvider(); + Properties configuration = new Properties(); + configuration.setProperty("timeline.sink.class", + "org.apache.zookeeper.metrics.timeline.MockTimelineMetricsSink"); + + provider.configure(configuration); + provider.start(); + + // 
Multiple stop calls should be safe + provider.stop(); + provider.stop(); + provider.stop(); + } + + @Test + public void testCustomHostname() throws Exception { + provider = new TimelineMetricsProvider(); + Properties configuration = new Properties(); + configuration.setProperty("timeline.sink.class", + "org.apache.zookeeper.metrics.timeline.MockTimelineMetricsSink"); + configuration.setProperty("timeline.hostname", "custom-host.example.com"); + + provider.configure(configuration); + provider.start(); + + assertNotNull(provider.getRootContext()); + } + + @Test + public void testCustomAppId() throws Exception { + provider = new TimelineMetricsProvider(); + Properties configuration = new Properties(); + configuration.setProperty("timeline.sink.class", + "org.apache.zookeeper.metrics.timeline.MockTimelineMetricsSink"); + configuration.setProperty("timeline.appId", "custom-app"); + + provider.configure(configuration); + provider.start(); + + assertNotNull(provider.getRootContext()); + } + + @Test + public void testEmptyHostname() throws Exception { + provider = new TimelineMetricsProvider(); + Properties configuration = new Properties(); + configuration.setProperty("timeline.sink.class", + "org.apache.zookeeper.metrics.timeline.MockTimelineMetricsSink"); + configuration.setProperty("timeline.hostname", ""); + + // Should auto-detect hostname when empty string is provided + provider.configure(configuration); + provider.start(); + + assertNotNull(provider.getRootContext()); + } + + @Test + public void testSinkReceivesConfiguration() throws Exception { + provider = new TimelineMetricsProvider(); + Properties configuration = new Properties(); + configuration.setProperty("timeline.sink.class", + "org.apache.zookeeper.metrics.timeline.MockTimelineMetricsSink"); + configuration.setProperty("timeline.collection.period", "5"); + configuration.setProperty("timeline.custom.property", "test-value"); + + provider.configure(configuration); + provider.start(); + + // Give it time to 
initialize + Thread.sleep(100); + + // The mock sink should have received the full configuration + assertNotNull(provider.getRootContext()); + } + + @Test + public void testProviderWithoutSink() throws Exception { + provider = new TimelineMetricsProvider(); + Properties configuration = new Properties(); + // Intentionally use a class that doesn't exist + configuration.setProperty("timeline.sink.class", "does.not.Exist"); + configuration.setProperty("timeline.collection.period", "1"); + + provider.configure(configuration); + provider.start(); + + // Provider should still work, just without sink + assertNotNull(provider.getRootContext()); + + // Can still register metrics + provider.getRootContext().getCounter("test").add(1); + + // Can still dump metrics + int[] count = {0}; + provider.dump((k, v) -> count[0]++); + assertTrue(count[0] > 0); + } +} diff --git a/zookeeper-metrics-providers/zookeeper-timeline-metrics/src/test/java/org/apache/zookeeper/metrics/timeline/TimelineMetricsProviderTest.java b/zookeeper-metrics-providers/zookeeper-timeline-metrics/src/test/java/org/apache/zookeeper/metrics/timeline/TimelineMetricsProviderTest.java new file mode 100644 index 00000000000..a44f778d762 --- /dev/null +++ b/zookeeper-metrics-providers/zookeeper-timeline-metrics/src/test/java/org/apache/zookeeper/metrics/timeline/TimelineMetricsProviderTest.java @@ -0,0 +1,420 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.metrics.timeline; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.zookeeper.metrics.Counter; +import org.apache.zookeeper.metrics.CounterSet; +import org.apache.zookeeper.metrics.Gauge; +import org.apache.zookeeper.metrics.GaugeSet; +import org.apache.zookeeper.metrics.MetricsContext; +import org.apache.zookeeper.metrics.Summary; +import org.apache.zookeeper.metrics.SummarySet; +import org.apache.zookeeper.server.util.QuotaMetricsUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** + * Tests for Timeline Metrics Provider. + * + *

These tests verify the functionality of the TimelineMetricsProvider including + * metric registration, collection, and export to Timeline sinks.

+ */ +public class TimelineMetricsProviderTest extends TimelineMetricsTestBase { + + private TimelineMetricsProvider provider; + private MockTimelineMetricsSink mockSink; + + @BeforeEach + public void setup() throws Exception { + provider = new TimelineMetricsProvider(); + Properties configuration = new Properties(); + configuration.setProperty("timeline.sink.class", + "org.apache.zookeeper.metrics.timeline.MockTimelineMetricsSink"); + configuration.setProperty("timeline.collection.period", "1"); // 1 second for fast tests + configuration.setProperty("timeline.hostname", "test-host"); + configuration.setProperty("timeline.appId", "test-zookeeper"); + provider.configure(configuration); + provider.start(); + + // Give the provider time to start and get the sink reference + Thread.sleep(100); + } + + @AfterEach + public void tearDown() { + if (provider != null) { + provider.stop(); + } + } + + @Test + public void testCounters() throws Exception { + Counter counter = provider.getRootContext().getCounter("test_counter"); + counter.add(10); + + int[] count = { 0 }; + provider.dump((k, v) -> { + if (k.equals("test_counter")) { + assertEquals(10, ((Number) v).intValue()); + count[0]++; + } + }); + assertEquals(1, count[0]); + + // Adding more should work + counter.add(5); + count[0] = 0; + provider.dump((k, v) -> { + if (k.equals("test_counter")) { + assertEquals(15, ((Number) v).intValue()); + count[0]++; + } + }); + assertEquals(1, count[0]); + + // We always must get the same object + assertSame(counter, provider.getRootContext().getCounter("test_counter")); + } + + @Test + public void testCounterSet() throws Exception { + final String name = QuotaMetricsUtils.QUOTA_EXCEEDED_ERROR_PER_NAMESPACE; + final CounterSet counterSet = provider.getRootContext().getCounterSet(name); + final String[] keys = { "ns1", "ns2" }; + + // Update counters + counterSet.inc("ns1"); + counterSet.add("ns1", 2); + counterSet.inc("ns2"); + + // Validate with dump - CounterSet uses format 
"key_name" based on SimpleCounterSet + final Map expectedMetrics = new HashMap<>(); + expectedMetrics.put("ns1_" + name, 3L); + expectedMetrics.put("ns2_" + name, 1L); + + validateWithDump(expectedMetrics); + + // Validate registering with same name returns same object + assertSame(counterSet, provider.getRootContext().getCounterSet(name)); + } + + @Test + public void testCounterSet_nullName() { + assertThrows(NullPointerException.class, + () -> provider.getRootContext().getCounterSet(null)); + } + + @Test + public void testGauge() throws Exception { + int[] values = { 78, -89 }; + int[] callCounts = { 0, 0 }; + + Gauge gauge0 = () -> { + callCounts[0]++; + return values[0]; + }; + + Gauge gauge1 = () -> { + callCounts[1]++; + return values[1]; + }; + + provider.getRootContext().registerGauge("test_gauge", gauge0); + + int[] count = { 0 }; + provider.dump((k, v) -> { + if (k.equals("test_gauge")) { + assertEquals(values[0], ((Number) v).intValue()); + count[0]++; + } + }); + assertEquals(1, callCounts[0]); + assertEquals(0, callCounts[1]); + assertEquals(1, count[0]); + + // Unregister and verify + provider.getRootContext().unregisterGauge("test_gauge"); + count[0] = 0; + provider.dump((k, v) -> { + if (k.equals("test_gauge")) { + count[0]++; + } + }); + assertEquals(0, count[0]); + + // Register new gauge + provider.getRootContext().registerGauge("test_gauge", gauge1); + count[0] = 0; + provider.dump((k, v) -> { + if (k.equals("test_gauge")) { + assertEquals(values[1], ((Number) v).intValue()); + count[0]++; + } + }); + assertEquals(1, callCounts[1]); + assertEquals(1, count[0]); + } + + @Test + public void testGauge_nullGauge() { + assertThrows(NullPointerException.class, + () -> provider.getRootContext().registerGauge("test", null)); + } + + @Test + public void testBasicSummary() throws Exception { + Summary summary = provider.getRootContext().getSummary("test_summary", + MetricsContext.DetailLevel.BASIC); + summary.add(10); + summary.add(20); + summary.add(30); 
+ + int[] count = { 0 }; + provider.dump((k, v) -> { + count[0]++; + double value = ((Number) v).doubleValue(); + + // AvgMinMaxCounter formats keys as "prefix_name", e.g., "avg_test_summary" + switch (k) { + case "avg_test_summary": + assertEquals(20.0, value, 0.01); + break; + case "min_test_summary": + assertEquals(10.0, value, 0.01); + break; + case "max_test_summary": + assertEquals(30.0, value, 0.01); + break; + case "cnt_test_summary": + assertEquals(3.0, value, 0.01); + break; + case "sum_test_summary": + assertEquals(60.0, value, 0.01); + break; + } + }); + assertTrue(count[0] >= 5); // Should have at least avg, min, max, cnt, sum + + // We always must get the same object + assertSame(summary, provider.getRootContext().getSummary("test_summary", + MetricsContext.DetailLevel.BASIC)); + + // Cannot get same name with different detail level + assertThrows(IllegalArgumentException.class, + () -> provider.getRootContext().getSummary("test_summary", + MetricsContext.DetailLevel.ADVANCED)); + } + + @Test + public void testAdvancedSummary() throws Exception { + Summary summary = provider.getRootContext().getSummary("test_advanced_summary", + MetricsContext.DetailLevel.ADVANCED); + + // Add values + for (int i = 1; i <= 100; i++) { + summary.add(i); + } + + int[] count = { 0 }; + provider.dump((k, v) -> { + if (k.contains("test_advanced_summary")) { + count[0]++; + } + }); + + // Should have avg, min, max, cnt, sum, and percentiles (p50, p95, p99, p999) + // AvgMinMaxPercentileCounter provides 9 metrics total + assertTrue(count[0] >= 5); + + // We always must get the same object + assertSame(summary, provider.getRootContext().getSummary("test_advanced_summary", + MetricsContext.DetailLevel.ADVANCED)); + + // Cannot get same name with different detail level + assertThrows(IllegalArgumentException.class, + () -> provider.getRootContext().getSummary("test_advanced_summary", + MetricsContext.DetailLevel.BASIC)); + } + + @Test + public void testSummarySet() throws 
Exception { + final String name = "test_summary_set"; + final String[] keys = { "key1", "key2" }; + + final SummarySet summarySet = provider.getRootContext().getSummarySet(name, + MetricsContext.DetailLevel.BASIC); + + // Update summaries + summarySet.add("key1", 10); + summarySet.add("key1", 20); + summarySet.add("key2", 5); + + int[] count = { 0 }; + provider.dump((k, v) -> { + if (k.contains("key1") || k.contains("key2")) { + count[0]++; + } + }); + + assertTrue(count[0] > 0); + + // Validate registering with same name returns same object + assertSame(summarySet, provider.getRootContext().getSummarySet(name, + MetricsContext.DetailLevel.BASIC)); + + // Cannot get same name with different detail level + assertThrows(IllegalArgumentException.class, + () -> provider.getRootContext().getSummarySet(name, + MetricsContext.DetailLevel.ADVANCED)); + } + + @Test + public void testGaugeSet() throws Exception { + final String name = QuotaMetricsUtils.QUOTA_BYTES_LIMIT_PER_NAMESPACE; + final Map metricsMap = new HashMap<>(); + metricsMap.put("ns1", 100.0); + metricsMap.put("ns2", 200.0); + + final AtomicInteger callCount = new AtomicInteger(0); + final GaugeSet gaugeSet = () -> { + callCount.incrementAndGet(); + return metricsMap; + }; + + provider.getRootContext().registerGaugeSet(name, gaugeSet); + + // Validate with dump + int[] count = { 0 }; + provider.dump((k, v) -> { + if (k.contains("ns1") || k.contains("ns2")) { + count[0]++; + } + }); + + assertTrue(count[0] >= 2); + assertTrue(callCount.get() >= 1); + + // Unregister + callCount.set(0); + provider.getRootContext().unregisterGaugeSet(name); + + count[0] = 0; + provider.dump((k, v) -> { + if (k.contains(name)) { + count[0]++; + } + }); + assertEquals(0, count[0]); + assertEquals(0, callCount.get()); + } + + @Test + public void testGaugeSet_nullName() { + assertThrows(NullPointerException.class, + () -> provider.getRootContext().registerGaugeSet(null, () -> null)); + } + + @Test + public void 
testGaugeSet_nullGaugeSet() { + assertThrows(NullPointerException.class, + () -> provider.getRootContext().registerGaugeSet("test", null)); + } + + @Test + public void testGaugeSet_unregisterNull() { + assertThrows(NullPointerException.class, + () -> provider.getRootContext().unregisterGaugeSet(null)); + } + + @Test + public void testMetricCollection() throws Exception { + // Register some metrics + Counter counter = provider.getRootContext().getCounter("collection_test"); + counter.add(42); + + Gauge gauge = () -> 123; + provider.getRootContext().registerGauge("collection_gauge", gauge); + + // Wait for at least one collection cycle (1 second + some buffer) + Thread.sleep(1500); + + // Verify metrics were collected + Map metrics = new HashMap<>(); + provider.dump(metrics::put); + + assertTrue(metrics.containsKey("collection_test")); + assertEquals(42L, ((Number) metrics.get("collection_test")).longValue()); + assertTrue(metrics.containsKey("collection_gauge")); + assertEquals(123, ((Number) metrics.get("collection_gauge")).intValue()); + } + + @Test + public void testResetAllValues() throws Exception { + Counter counter = provider.getRootContext().getCounter("reset_test"); + counter.add(100); + + Summary summary = provider.getRootContext().getSummary("reset_summary", + MetricsContext.DetailLevel.BASIC); + summary.add(50); + + // Verify initial values + Map metrics = new HashMap<>(); + provider.dump(metrics::put); + assertEquals(100L, ((Number) metrics.get("reset_test")).longValue()); + + // Reset + provider.resetAllValues(); + + // Verify reset + metrics.clear(); + provider.dump(metrics::put); + assertEquals(0L, ((Number) metrics.get("reset_test")).longValue()); + } + + @Test + public void testGetContext() { + MetricsContext context1 = provider.getRootContext(); + MetricsContext context2 = provider.getRootContext().getContext("subcontext"); + + // Timeline provider uses flat namespace, so sub-contexts return same context + assertSame(context1, context2); + } + + 
private void validateWithDump(final Map expectedMetrics) {
+        // Asserts that every entry of expectedMetrics appears in the provider's dump output
+        // under the same key and with the same value when both are compared as long.
+        // NOTE(review): raw Map as shown; value.longValue() suggests Number-typed values and
+        // type arguments that were lost in extraction - confirm against the original file.
+        final Map returnedMetrics = new HashMap<>();
+        provider.dump(returnedMetrics::put);
+
+        expectedMetrics.forEach((key, value) -> {
+            // Presence is checked first so a missing metric is reported by name.
+            assertTrue(returnedMetrics.containsKey(key),
+                    "Expected metric " + key + " not found");
+            assertEquals(value.longValue(),
+                    ((Number) returnedMetrics.get(key)).longValue(),
+                    "Value mismatch for metric " + key);
+        });
+    }
+}
diff --git a/zookeeper-metrics-providers/zookeeper-timeline-metrics/src/test/java/org/apache/zookeeper/metrics/timeline/TimelineMetricsTestBase.java b/zookeeper-metrics-providers/zookeeper-timeline-metrics/src/test/java/org/apache/zookeeper/metrics/timeline/TimelineMetricsTestBase.java
new file mode 100644
index 00000000000..aa0a8aff947
--- /dev/null
+++ b/zookeeper-metrics-providers/zookeeper-timeline-metrics/src/test/java/org/apache/zookeeper/metrics/timeline/TimelineMetricsTestBase.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.metrics.timeline;
+
+/**
+ * Base class for Timeline metrics unit tests.
+ *
+ *

Provides shared helper utilities, such as polling until an asynchronous condition holds, for Timeline metrics tests.

+ */
+public abstract class TimelineMetricsTestBase {
+
+    /** Pause between two consecutive evaluations of the awaited condition, in milliseconds. */
+    private static final long POLL_INTERVAL_MS = 50L;
+
+    /** Nanoseconds per millisecond, used to convert {@link System#nanoTime()} deltas. */
+    private static final long NANOS_PER_MILLI = 1_000_000L;
+
+    /**
+     * Helper method to wait for async metric collection to complete.
+     *
+     * <p>Polls {@code condition} until it returns {@code true}, sleeping
+     * {@value #POLL_INTERVAL_MS} ms between attempts. Elapsed time is measured with
+     * {@link System#nanoTime()} rather than {@link System#currentTimeMillis()}, so a
+     * wall-clock adjustment during the wait can neither shorten nor extend the timeout.
+     *
+     * @param maxWaitMs maximum time to wait in milliseconds
+     * @param condition condition to wait for
+     * @throws InterruptedException if interrupted while waiting
+     * @throws AssertionError if the condition is still false once more than
+     *                        {@code maxWaitMs} milliseconds have elapsed
+     */
+    protected void waitFor(long maxWaitMs, BooleanSupplier condition) throws InterruptedException {
+        final long startNanos = System.nanoTime();
+        while (!condition.getAsBoolean()) {
+            // Subtracting two nanoTime() readings is safe even if the counter wraps.
+            final long elapsedMs = (System.nanoTime() - startNanos) / NANOS_PER_MILLI;
+            if (elapsedMs > maxWaitMs) {
+                throw new AssertionError("Condition not met within " + maxWaitMs + "ms");
+            }
+            Thread.sleep(POLL_INTERVAL_MS);
+        }
+    }
+
+    /**
+     * Functional interface for boolean condition checking.
+     */
+    @FunctionalInterface
+    protected interface BooleanSupplier {
+        boolean getAsBoolean();
+    }
+}