diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExplicitBucketHistogramAggregator.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExplicitBucketHistogramAggregator.java index 0d3ce15c1bf..b024b5caafc 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExplicitBucketHistogramAggregator.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExplicitBucketHistogramAggregator.java @@ -6,7 +6,6 @@ package io.opentelemetry.sdk.metrics.internal.aggregator; import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.internal.GuardedBy; import io.opentelemetry.context.Context; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; import io.opentelemetry.sdk.common.export.MemoryMode; @@ -27,6 +26,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.concurrent.locks.ReentrantLock; import javax.annotation.Nullable; /** @@ -92,22 +92,8 @@ static final class Handle extends AggregatorHandle { // read-only private final double[] boundaries; - private final Object lock = new Object(); - - @GuardedBy("lock") - private double sum; - - @GuardedBy("lock") - private double min; - - @GuardedBy("lock") - private double max; - - @GuardedBy("lock") - private long count; - - @GuardedBy("lock") - private final long[] counts; + private final Cell[] cells; + private final long[] countsArr; // Used only when MemoryMode = REUSABLE_DATA @Nullable private final MutableHistogramPointData reusablePoint; @@ -120,13 +106,13 @@ static final class Handle extends AggregatorHandle { super(reservoirFactory, /* isDoubleType= */ true); this.boundaryList = boundaryList; this.boundaries = boundaries; - this.counts = new long[this.boundaries.length + 1]; - this.sum = 0; - this.min = Double.MAX_VALUE; - this.max = -1; - this.count = 0; + this.cells = new Cell[Runtime.getRuntime().availableProcessors()]; + for (int i = 0; i < cells.length; i++) { + cells[i] = new Cell(boundaries.length + 1); + } + this.countsArr = new long[boundaries.length + 1]; if (memoryMode == MemoryMode.REUSABLE_DATA) { - this.reusablePoint = new MutableHistogramPointData(counts.length); + this.reusablePoint = new MutableHistogramPointData(countsArr.length); } else { this.reusablePoint = null; } @@ -147,8 +133,33 @@ protected HistogramPointData doAggregateThenMaybeResetDoubles( Attributes attributes, List exemplars, boolean reset) { - synchronized (lock) { + for (Cell cell : cells) { + cell.lock.lock(); + } + try { HistogramPointData pointData; + Arrays.fill(countsArr, 0); + double sum = 0; + long count = 0; + double min = Double.MAX_VALUE; + double max = -1; + + for (Cell cell : cells) { + sum += cell.sum; + min = Math.min(min, cell.min); + max = Math.max(max, cell.max); + for (int i = 0; i < cell.counts.length; i++) { + long currentCellCount = cell.counts[i]; + count += currentCellCount; + countsArr[i] += currentCellCount; + } + if (reset) { + cell.sum = 0; + cell.min = Double.MAX_VALUE; + cell.max = -1; + Arrays.fill(cell.counts, 0); + } + } if (reusablePoint == null) { pointData = ImmutableHistogramPointData.create( @@ -156,12 +167,12 @@ protected HistogramPointData doAggregateThenMaybeResetDoubles( epochNanos, attributes, sum, - this.count > 0, - this.min, - this.count > 0, - this.max, + count > 0, + min, + count > 0, + max, boundaryList, - PrimitiveLongList.wrap(Arrays.copyOf(counts, counts.length)), + PrimitiveLongList.wrap(Arrays.copyOf(countsArr, countsArr.length)), exemplars); } else /* REUSABLE_DATA */ { pointData = @@ -170,22 +181,19 @@ protected HistogramPointData doAggregateThenMaybeResetDoubles( epochNanos, attributes, sum, - this.count > 0, - this.min, - this.count > 0, - this.max, + count > 0, + min, + count > 0, + max, boundaryList, - counts, + countsArr, exemplars); } - if (reset) { - this.sum = 0; - this.min = Double.MAX_VALUE; - this.max = -1; - this.count = 0; - Arrays.fill(this.counts, 0); - } return pointData; + } finally { + for (Cell cell : cells) { + cell.lock.unlock(); + } } } @@ -193,12 +201,28 @@ protected HistogramPointData doAggregateThenMaybeResetDoubles( protected void doRecordDouble(double value) { int bucketIndex = ExplicitBucketHistogramUtils.findBucketIndex(this.boundaries, value); - synchronized (lock) { - this.sum += value; - this.min = Math.min(this.min, value); - this.max = Math.max(this.max, value); - this.count++; - this.counts[bucketIndex]++; + int cellIndex = Math.abs((int) (Thread.currentThread().getId() % cells.length)); + Cell cell = cells[cellIndex]; + cell.lock.lock(); + try { + cell.sum += value; + cell.min = Math.min(cell.min, value); + cell.max = Math.max(cell.max, value); + cell.counts[bucketIndex]++; + } finally { + cell.lock.unlock(); + } + } + + private static class Cell { + private final ReentrantLock lock = new ReentrantLock(); + private final long[] counts; + private double sum = 0; + private double min = Double.MAX_VALUE; + private double max = -1; + + private Cell(int buckets) { + this.counts = new long[buckets]; } } } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java index a746c00dec0..68444c1f84c 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java @@ -30,7 +30,7 @@ import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -238,18 +238,14 @@ AggregatorHandle maybeGetPooledAggregatorHandle() { * #releaseHolderForRecord(AggregatorHolder)} when record operation completes to signal to that * its safe to proceed with Collect operations. */ + @SuppressWarnings("ThreadPriorityCheck") private AggregatorHolder getHolderForRecord() { - do { - AggregatorHolder aggregatorHolder = this.aggregatorHolder; - int recordsInProgress = aggregatorHolder.activeRecordingThreads.addAndGet(2); - if (recordsInProgress % 2 == 0) { - return aggregatorHolder; - } else { - // Collect is in progress, decrement recordsInProgress to allow collect to proceed and - // re-read aggregatorHolder - aggregatorHolder.activeRecordingThreads.addAndGet(-2); - } - } while (true); + AggregatorHolder aggregatorHolder = this.aggregatorHolder; + while (!aggregatorHolder.tryAcquireForRecord()) { + aggregatorHolder = this.aggregatorHolder; + Thread.yield(); + } + return aggregatorHolder; } /** @@ -257,7 +253,7 @@ private AggregatorHolder getHolderForRecord() { * indicate that recording is complete, and it is safe to collect. */ private void releaseHolderForRecord(AggregatorHolder aggregatorHolder) { - aggregatorHolder.activeRecordingThreads.addAndGet(-2); + aggregatorHolder.releaseForRecord(); } @Override @@ -266,22 +262,15 @@ public MetricData collect( InstrumentationScopeInfo instrumentationScopeInfo, long startEpochNanos, long epochNanos) { - ConcurrentHashMap> aggregatorHandles; AggregatorHolder holder = this.aggregatorHolder; this.aggregatorHolder = (memoryMode == REUSABLE_DATA) ? new AggregatorHolder<>(previousCollectionAggregatorHandles) : new AggregatorHolder<>(); - // Increment recordsInProgress by 1, which produces an odd number acting as a signal that - // record operations should re-read the volatile this.aggregatorHolder. - // Repeatedly grab recordsInProgress until it is <= 1, which signals all active record - // operations are complete. - int recordsInProgress = holder.activeRecordingThreads.addAndGet(1); - while (recordsInProgress > 1) { - recordsInProgress = holder.activeRecordingThreads.get(); - } - aggregatorHandles = holder.aggregatorHandles; + holder.acquireForCollect(); + ConcurrentHashMap> aggregatorHandles = + holder.aggregatorHandles; List points; if (memoryMode == REUSABLE_DATA) { @@ -379,14 +368,38 @@ private static class AggregatorHolder { // (AggregatorHolder), and so if a recording thread encounters an odd value, // all it needs to do is release the "read lock" it just obtained (decrementing by 2), // and then grab and record against the new current interval (AggregatorHolder). - private final AtomicInteger activeRecordingThreads = new AtomicInteger(0); + private final ReentrantLock[] locks; private AggregatorHolder() { - aggregatorHandles = new ConcurrentHashMap<>(); + this(new ConcurrentHashMap<>()); } private AggregatorHolder(ConcurrentHashMap> aggregatorHandles) { this.aggregatorHandles = aggregatorHandles; + locks = new ReentrantLock[Runtime.getRuntime().availableProcessors()]; + for (int i = 0; i < locks.length; i++) { + locks[i] = new ReentrantLock(); + } + } + + private boolean tryAcquireForRecord() { + return forThread().tryLock(); + } + + private void releaseForRecord() { + forThread().unlock(); + } + + @SuppressWarnings("ThreadPriorityCheck") + private void acquireForCollect() { + for (int i = 0; i < locks.length; i++) { + locks[i].lock(); + } + } + + private ReentrantLock forThread() { + int index = Math.abs((int) (Thread.currentThread().getId() % locks.length)); + return locks[index]; } }