From a66bb1a56e5ea0e885160a19d025fda20bd8ca26 Mon Sep 17 00:00:00 2001 From: Jack Berg Date: Thu, 12 Feb 2026 10:59:50 -0600 Subject: [PATCH 1/3] striped atomicinteger in sync metric storage --- .../DefaultSynchronousMetricStorage.java | 67 ++++++++++++------- 1 file changed, 43 insertions(+), 24 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java index a746c00dec0..230d5887b64 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java @@ -238,18 +238,15 @@ AggregatorHandle maybeGetPooledAggregatorHandle() { * #releaseHolderForRecord(AggregatorHolder)} when record operation completes to signal to that * its safe to proceed with Collect operations. */ + @SuppressWarnings("ThreadPriorityCheck") private AggregatorHolder getHolderForRecord() { - do { - AggregatorHolder aggregatorHolder = this.aggregatorHolder; - int recordsInProgress = aggregatorHolder.activeRecordingThreads.addAndGet(2); - if (recordsInProgress % 2 == 0) { - return aggregatorHolder; - } else { - // Collect is in progress, decrement recordsInProgress to allow collect to proceed and - // re-read aggregatorHolder - aggregatorHolder.activeRecordingThreads.addAndGet(-2); - } - } while (true); + AggregatorHolder aggregatorHolder = this.aggregatorHolder; + while (!aggregatorHolder.tryAcquireForRecord()) { + aggregatorHolder.releaseForRecord(); + aggregatorHolder = this.aggregatorHolder; + Thread.yield(); + } + return aggregatorHolder; } /** @@ -257,7 +254,7 @@ private AggregatorHolder getHolderForRecord() { * indicate that recording is complete, and it is safe to collect. 
*/ private void releaseHolderForRecord(AggregatorHolder aggregatorHolder) { - aggregatorHolder.activeRecordingThreads.addAndGet(-2); + aggregatorHolder.releaseForRecord(); } @Override @@ -266,22 +263,14 @@ public MetricData collect( InstrumentationScopeInfo instrumentationScopeInfo, long startEpochNanos, long epochNanos) { - ConcurrentHashMap> aggregatorHandles; AggregatorHolder holder = this.aggregatorHolder; this.aggregatorHolder = (memoryMode == REUSABLE_DATA) ? new AggregatorHolder<>(previousCollectionAggregatorHandles) : new AggregatorHolder<>(); - // Increment recordsInProgress by 1, which produces an odd number acting as a signal that - // record operations should re-read the volatile this.aggregatorHolder. - // Repeatedly grab recordsInProgress until it is <= 1, which signals all active record - // operations are complete. - int recordsInProgress = holder.activeRecordingThreads.addAndGet(1); - while (recordsInProgress > 1) { - recordsInProgress = holder.activeRecordingThreads.get(); - } - aggregatorHandles = holder.aggregatorHandles; + holder.acquireForCollect(); + ConcurrentHashMap> aggregatorHandles = holder.aggregatorHandles; List points; if (memoryMode == REUSABLE_DATA) { @@ -379,14 +368,44 @@ private static class AggregatorHolder { // (AggregatorHolder), and so if a recording thread encounters an odd value, // all it needs to do is release the "read lock" it just obtained (decrementing by 2), // and then grab and record against the new current interval (AggregatorHolder). 
- private final AtomicInteger activeRecordingThreads = new AtomicInteger(0); + private final AtomicInteger[] activeRecordingThreads; private AggregatorHolder() { - aggregatorHandles = new ConcurrentHashMap<>(); + this(new ConcurrentHashMap<>()); } private AggregatorHolder(ConcurrentHashMap> aggregatorHandles) { this.aggregatorHandles = aggregatorHandles; + activeRecordingThreads = new AtomicInteger[Runtime.getRuntime().availableProcessors()]; + for (int i = 0; i < activeRecordingThreads.length; i++) { + activeRecordingThreads[i] = new AtomicInteger(0); + } + } + + private boolean tryAcquireForRecord() { + return forThread().addAndGet(2) % 2 == 0; + } + + private void releaseForRecord() { + forThread().addAndGet(-2); + } + + @SuppressWarnings("ThreadPriorityCheck") + private void acquireForCollect() { + for (int i = 0; i < activeRecordingThreads.length; i++) { + activeRecordingThreads[i].addAndGet(1); + } + for (int i = 0; i < activeRecordingThreads.length; i++) { + AtomicInteger val = activeRecordingThreads[i]; + while (val.get() > 1) { + Thread.yield(); + } + } + } + + private AtomicInteger forThread() { + int index = Math.abs((int) (Thread.currentThread().getId() % activeRecordingThreads.length)); + return activeRecordingThreads[index]; } } From dd6313443ee5caa8e6b67eed3126fcea38c7221d Mon Sep 17 00:00:00 2001 From: Jack Berg Date: Thu, 12 Feb 2026 13:33:50 -0600 Subject: [PATCH 2/3] Split explicit bucket aggregator into cells --- ...ubleExplicitBucketHistogramAggregator.java | 118 +++++++++++------- .../DefaultSynchronousMetricStorage.java | 3 +- 2 files changed, 73 insertions(+), 48 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExplicitBucketHistogramAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExplicitBucketHistogramAggregator.java index 0d3ce15c1bf..b024b5caafc 100644 --- 
a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExplicitBucketHistogramAggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExplicitBucketHistogramAggregator.java @@ -6,7 +6,6 @@ package io.opentelemetry.sdk.metrics.internal.aggregator; import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.internal.GuardedBy; import io.opentelemetry.context.Context; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; import io.opentelemetry.sdk.common.export.MemoryMode; @@ -27,6 +26,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.concurrent.locks.ReentrantLock; import javax.annotation.Nullable; /** @@ -92,22 +92,8 @@ static final class Handle extends AggregatorHandle { // read-only private final double[] boundaries; - private final Object lock = new Object(); - - @GuardedBy("lock") - private double sum; - - @GuardedBy("lock") - private double min; - - @GuardedBy("lock") - private double max; - - @GuardedBy("lock") - private long count; - - @GuardedBy("lock") - private final long[] counts; + private final Cell[] cells; + private final long[] countsArr; // Used only when MemoryMode = REUSABLE_DATA @Nullable private final MutableHistogramPointData reusablePoint; @@ -120,13 +106,13 @@ static final class Handle extends AggregatorHandle { super(reservoirFactory, /* isDoubleType= */ true); this.boundaryList = boundaryList; this.boundaries = boundaries; - this.counts = new long[this.boundaries.length + 1]; - this.sum = 0; - this.min = Double.MAX_VALUE; - this.max = -1; - this.count = 0; + this.cells = new Cell[Runtime.getRuntime().availableProcessors()]; + for (int i = 0; i < cells.length; i++) { + cells[i] = new Cell(boundaries.length + 1); + } + this.countsArr = new long[boundaries.length + 1]; if (memoryMode == MemoryMode.REUSABLE_DATA) { - this.reusablePoint = new MutableHistogramPointData(counts.length); + 
this.reusablePoint = new MutableHistogramPointData(countsArr.length); } else { this.reusablePoint = null; } @@ -147,8 +133,33 @@ protected HistogramPointData doAggregateThenMaybeResetDoubles( Attributes attributes, List exemplars, boolean reset) { - synchronized (lock) { + for (Cell cell : cells) { + cell.lock.lock(); + } + try { HistogramPointData pointData; + Arrays.fill(countsArr, 0); + double sum = 0; + long count = 0; + double min = Double.MAX_VALUE; + double max = -1; + + for (Cell cell : cells) { + sum += cell.sum; + min = Math.min(min, cell.min); + max = Math.max(max, cell.max); + for (int i = 0; i < cell.counts.length; i++) { + long currentCellCount = cell.counts[i]; + count += currentCellCount; + countsArr[i] += currentCellCount; + } + if (reset) { + cell.sum = 0; + cell.min = Double.MAX_VALUE; + cell.max = -1; + Arrays.fill(cell.counts, 0); + } + } if (reusablePoint == null) { pointData = ImmutableHistogramPointData.create( @@ -156,12 +167,12 @@ protected HistogramPointData doAggregateThenMaybeResetDoubles( epochNanos, attributes, sum, - this.count > 0, - this.min, - this.count > 0, - this.max, + count > 0, + min, + count > 0, + max, boundaryList, - PrimitiveLongList.wrap(Arrays.copyOf(counts, counts.length)), + PrimitiveLongList.wrap(Arrays.copyOf(countsArr, countsArr.length)), exemplars); } else /* REUSABLE_DATA */ { pointData = @@ -170,22 +181,19 @@ protected HistogramPointData doAggregateThenMaybeResetDoubles( epochNanos, attributes, sum, - this.count > 0, - this.min, - this.count > 0, - this.max, + count > 0, + min, + count > 0, + max, boundaryList, - counts, + countsArr, exemplars); } - if (reset) { - this.sum = 0; - this.min = Double.MAX_VALUE; - this.max = -1; - this.count = 0; - Arrays.fill(this.counts, 0); - } return pointData; + } finally { + for (Cell cell : cells) { + cell.lock.unlock(); + } } } @@ -193,12 +201,28 @@ protected HistogramPointData doAggregateThenMaybeResetDoubles( protected void doRecordDouble(double value) { int bucketIndex = 
ExplicitBucketHistogramUtils.findBucketIndex(this.boundaries, value); - synchronized (lock) { - this.sum += value; - this.min = Math.min(this.min, value); - this.max = Math.max(this.max, value); - this.count++; - this.counts[bucketIndex]++; + int cellIndex = Math.abs((int) (Thread.currentThread().getId() % cells.length)); + Cell cell = cells[cellIndex]; + cell.lock.lock(); + try { + cell.sum += value; + cell.min = Math.min(cell.min, value); + cell.max = Math.max(cell.max, value); + cell.counts[bucketIndex]++; + } finally { + cell.lock.unlock(); + } + } + + private static class Cell { + private final ReentrantLock lock = new ReentrantLock(); + private final long[] counts; + private double sum = 0; + private double min = Double.MAX_VALUE; + private double max = -1; + + private Cell(int buckets) { + this.counts = new long[buckets]; } } } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java index 230d5887b64..3faff6eda50 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java @@ -270,7 +270,8 @@ public MetricData collect( : new AggregatorHolder<>(); holder.acquireForCollect(); - ConcurrentHashMap> aggregatorHandles = holder.aggregatorHandles; + ConcurrentHashMap> aggregatorHandles = + holder.aggregatorHandles; List points; if (memoryMode == REUSABLE_DATA) { From 68238cb16cb2e8b80ef1c715e8e8fd4dc859a6ec Mon Sep 17 00:00:00 2001 From: Jack Berg Date: Thu, 12 Feb 2026 13:58:54 -0600 Subject: [PATCH 3/3] Use striped reentrant lock in sync metric storage instead of striped AtomicInteger --- .../DefaultSynchronousMetricStorage.java | 31 +++++++------------ 1 file changed, 12 insertions(+), 19 deletions(-) diff --git 
a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java index 3faff6eda50..68444c1f84c 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java @@ -30,7 +30,7 @@ import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -242,7 +242,6 @@ AggregatorHandle maybeGetPooledAggregatorHandle() { private AggregatorHolder getHolderForRecord() { AggregatorHolder aggregatorHolder = this.aggregatorHolder; while (!aggregatorHolder.tryAcquireForRecord()) { - aggregatorHolder.releaseForRecord(); aggregatorHolder = this.aggregatorHolder; Thread.yield(); } @@ -369,7 +368,7 @@ private static class AggregatorHolder { // (AggregatorHolder), and so if a recording thread encounters an odd value, // all it needs to do is release the "read lock" it just obtained (decrementing by 2), // and then grab and record against the new current interval (AggregatorHolder). 
- private final AtomicInteger[] activeRecordingThreads; + private final ReentrantLock[] locks; private AggregatorHolder() { this(new ConcurrentHashMap<>()); @@ -377,36 +376,30 @@ private AggregatorHolder() { private AggregatorHolder(ConcurrentHashMap> aggregatorHandles) { this.aggregatorHandles = aggregatorHandles; - activeRecordingThreads = new AtomicInteger[Runtime.getRuntime().availableProcessors()]; - for (int i = 0; i < activeRecordingThreads.length; i++) { - activeRecordingThreads[i] = new AtomicInteger(0); + locks = new ReentrantLock[Runtime.getRuntime().availableProcessors()]; + for (int i = 0; i < locks.length; i++) { + locks[i] = new ReentrantLock(); } } private boolean tryAcquireForRecord() { - return forThread().addAndGet(2) % 2 == 0; + return forThread().tryLock(); } private void releaseForRecord() { - forThread().addAndGet(-2); + forThread().unlock(); } @SuppressWarnings("ThreadPriorityCheck") private void acquireForCollect() { - for (int i = 0; i < activeRecordingThreads.length; i++) { - activeRecordingThreads[i].addAndGet(1); - } - for (int i = 0; i < activeRecordingThreads.length; i++) { - AtomicInteger val = activeRecordingThreads[i]; - while (val.get() > 1) { - Thread.yield(); - } + for (int i = 0; i < locks.length; i++) { + locks[i].lock(); } } - private AtomicInteger forThread() { - int index = Math.abs((int) (Thread.currentThread().getId() % activeRecordingThreads.length)); - return activeRecordingThreads[index]; + private ReentrantLock forThread() { + int index = Math.abs((int) (Thread.currentThread().getId() % locks.length)); + return locks[index]; } }