@@ -6,7 +6,6 @@
package io.opentelemetry.sdk.metrics.internal.aggregator;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.internal.GuardedBy;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.common.export.MemoryMode;
@@ -27,6 +26,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;

/**
@@ -92,22 +92,8 @@ static final class Handle extends AggregatorHandle<HistogramPointData> {
// read-only
private final double[] boundaries;

private final Object lock = new Object();

@GuardedBy("lock")
private double sum;

@GuardedBy("lock")
private double min;

@GuardedBy("lock")
private double max;

@GuardedBy("lock")
private long count;

@GuardedBy("lock")
private final long[] counts;
private final Cell[] cells;
private final long[] countsArr;

// Used only when MemoryMode = REUSABLE_DATA
@Nullable private final MutableHistogramPointData reusablePoint;
@@ -120,13 +106,13 @@ static final class Handle extends AggregatorHandle<HistogramPointData> {
super(reservoirFactory, /* isDoubleType= */ true);
this.boundaryList = boundaryList;
this.boundaries = boundaries;
this.counts = new long[this.boundaries.length + 1];
this.sum = 0;
this.min = Double.MAX_VALUE;
this.max = -1;
this.count = 0;
this.cells = new Cell[Runtime.getRuntime().availableProcessors()];
for (int i = 0; i < cells.length; i++) {
cells[i] = new Cell(boundaries.length + 1);
}
this.countsArr = new long[boundaries.length + 1];
if (memoryMode == MemoryMode.REUSABLE_DATA) {
this.reusablePoint = new MutableHistogramPointData(counts.length);
this.reusablePoint = new MutableHistogramPointData(countsArr.length);
} else {
this.reusablePoint = null;
}
@@ -147,21 +133,46 @@ protected HistogramPointData doAggregateThenMaybeResetDoubles(
Attributes attributes,
List<DoubleExemplarData> exemplars,
boolean reset) {
synchronized (lock) {
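// Take every cell's lock before merging, so no recording thread can mutate cell
// state while it is read (and optionally reset) below.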
for (Cell cell : cells) {
cell.lock.lock();
}
try {
HistogramPointData pointData;
Arrays.fill(countsArr, 0);
double sum = 0;
long count = 0;
double min = Double.MAX_VALUE;
double max = -1;

for (Cell cell : cells) {
sum += cell.sum;
min = Math.min(min, cell.min);
max = Math.max(max, cell.max);
for (int i = 0; i < cell.counts.length; i++) {
long currentCellCount = cell.counts[i];
count += currentCellCount;
countsArr[i] += currentCellCount;
}
if (reset) {
cell.sum = 0;
cell.min = Double.MAX_VALUE;
cell.max = -1;
Arrays.fill(cell.counts, 0);
}
}
if (reusablePoint == null) {
pointData =
ImmutableHistogramPointData.create(
startEpochNanos,
epochNanos,
attributes,
sum,
this.count > 0,
this.min,
this.count > 0,
this.max,
count > 0,
min,
count > 0,
max,
boundaryList,
PrimitiveLongList.wrap(Arrays.copyOf(counts, counts.length)),
PrimitiveLongList.wrap(Arrays.copyOf(countsArr, countsArr.length)),
exemplars);
} else /* REUSABLE_DATA */ {
pointData =
@@ -170,35 +181,48 @@ protected HistogramPointData doAggregateThenMaybeResetDoubles(
epochNanos,
attributes,
sum,
this.count > 0,
this.min,
this.count > 0,
this.max,
count > 0,
min,
count > 0,
max,
boundaryList,
counts,
countsArr,
exemplars);
}
if (reset) {
this.sum = 0;
this.min = Double.MAX_VALUE;
this.max = -1;
this.count = 0;
Arrays.fill(this.counts, 0);
}
return pointData;
} finally {
for (Cell cell : cells) {
cell.lock.unlock();
}
}
}

@Override
protected void doRecordDouble(double value) {
int bucketIndex = ExplicitBucketHistogramUtils.findBucketIndex(this.boundaries, value);

synchronized (lock) {
this.sum += value;
this.min = Math.min(this.min, value);
this.max = Math.max(this.max, value);
this.count++;
this.counts[bucketIndex]++;
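// Choose a cell from the current thread's id so concurrent recorders usually hit different locks.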
int cellIndex = Math.abs((int) (Thread.currentThread().getId() % cells.length));
Cell cell = cells[cellIndex];
cell.lock.lock();
try {
cell.sum += value;
cell.min = Math.min(cell.min, value);
cell.max = Math.max(cell.max, value);
cell.counts[bucketIndex]++;
} finally {
cell.lock.unlock();
}
}

private static class Cell {
private final ReentrantLock lock = new ReentrantLock();
private final long[] counts;
private double sum = 0;
private double min = Double.MAX_VALUE;
private double max = -1;

private Cell(int buckets) {
this.counts = new long[buckets];
}
}
}
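Condensing the change above: recording locks a single per-thread cell, while collection locks every cell, merges the partial state, and optionally resets it. A minimal self-contained sketch of that pattern (hypothetical class and field names, not the SDK code):

```java
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical, condensed illustration of the per-thread lock striping used above. */
final class StripedDoubleSum {
  private static final class Cell {
    final ReentrantLock lock = new ReentrantLock();
    double sum;
  }

  private final Cell[] cells;

  StripedDoubleSum() {
    cells = new Cell[Runtime.getRuntime().availableProcessors()];
    for (int i = 0; i < cells.length; i++) {
      cells[i] = new Cell();
    }
  }

  /** Record path: lock only the cell mapped to the current thread. */
  void record(double value) {
    Cell cell = cells[Math.abs((int) (Thread.currentThread().getId() % cells.length))];
    cell.lock.lock();
    try {
      cell.sum += value;
    } finally {
      cell.lock.unlock();
    }
  }

  /** Collect path: lock every cell, merge the partial state, optionally reset it. */
  double collect(boolean reset) {
    for (Cell cell : cells) {
      cell.lock.lock();
    }
    try {
      double total = 0;
      for (Cell cell : cells) {
        total += cell.sum;
        if (reset) {
          cell.sum = 0;
        }
      }
      return total;
    } finally {
      for (Cell cell : cells) {
        cell.lock.unlock();
      }
    }
  }
}
```

A recorder only ever holds one cell lock and never waits for a second one while holding it, so the collector acquiring all cells cannot form a lock-ordering cycle.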
@@ -30,7 +30,7 @@
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
@@ -238,26 +238,22 @@ AggregatorHandle<T> maybeGetPooledAggregatorHandle() {
* #releaseHolderForRecord(AggregatorHolder)} when the record operation completes to signal that
* it is safe to proceed with Collect operations.
*/
@SuppressWarnings("ThreadPriorityCheck")
private AggregatorHolder<T> getHolderForRecord() {
do {
AggregatorHolder<T> aggregatorHolder = this.aggregatorHolder;
int recordsInProgress = aggregatorHolder.activeRecordingThreads.addAndGet(2);
if (recordsInProgress % 2 == 0) {
return aggregatorHolder;
} else {
// Collect is in progress, decrement recordsInProgress to allow collect to proceed and
// re-read aggregatorHolder
aggregatorHolder.activeRecordingThreads.addAndGet(-2);
}
} while (true);
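// tryLock can fail because another recorder sharing this stripe holds it, or because
// collect() has locked every stripe of a holder it is retiring; collect installs a new
// holder before locking, so re-read the volatile field and retry.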
AggregatorHolder<T> aggregatorHolder = this.aggregatorHolder;
while (!aggregatorHolder.tryAcquireForRecord()) {
aggregatorHolder = this.aggregatorHolder;
Thread.yield();
}
return aggregatorHolder;
}

/**
* Called on the {@link AggregatorHolder} obtained from {@link #getHolderForRecord()} to
* indicate that recording is complete, and it is safe to collect.
*/
private void releaseHolderForRecord(AggregatorHolder<T> aggregatorHolder) {
aggregatorHolder.activeRecordingThreads.addAndGet(-2);
aggregatorHolder.releaseForRecord();
}

@Override
Expand All @@ -266,22 +262,15 @@ public MetricData collect(
InstrumentationScopeInfo instrumentationScopeInfo,
long startEpochNanos,
long epochNanos) {
ConcurrentHashMap<Attributes, AggregatorHandle<T>> aggregatorHandles;
AggregatorHolder<T> holder = this.aggregatorHolder;
this.aggregatorHolder =
(memoryMode == REUSABLE_DATA)
? new AggregatorHolder<>(previousCollectionAggregatorHandles)
: new AggregatorHolder<>();

// Increment recordsInProgress by 1, which produces an odd number acting as a signal that
// record operations should re-read the volatile this.aggregatorHolder.
// Repeatedly grab recordsInProgress until it is <= 1, which signals all active record
// operations are complete.
int recordsInProgress = holder.activeRecordingThreads.addAndGet(1);
while (recordsInProgress > 1) {
recordsInProgress = holder.activeRecordingThreads.get();
}
aggregatorHandles = holder.aggregatorHandles;
holder.acquireForCollect();
ConcurrentHashMap<Attributes, AggregatorHandle<T>> aggregatorHandles =
holder.aggregatorHandles;

List<T> points;
if (memoryMode == REUSABLE_DATA) {
@@ -379,14 +368,38 @@ private static class AggregatorHolder<T extends PointData> {
// (AggregatorHolder), and so if a recording thread encounters an odd value,
// all it needs to do is release the "read lock" it just obtained (decrementing by 2),
// and then grab and record against the new current interval (AggregatorHolder).
private final AtomicInteger activeRecordingThreads = new AtomicInteger(0);
Member Author: TODO: update javadoc comments to reflect current design before merging
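One possible wording for the updated comment (a hedged sketch, not the final text):

```java
// Recording threads acquire a single stripe lock on the current AggregatorHolder,
// chosen from the recording thread's id, so recordings on different stripes do not
// contend. Collect first publishes a fresh holder, then acquires every stripe of the
// old holder, which guarantees no recording is still in flight against it before its
// handles are read.
```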

private final ReentrantLock[] locks;
Member Author:
Thinking out loud:

Initially I went with a striped set of AtomicIntegers here: AtomicInteger[]

But a striped set of ReentrantLocks was more performant in the benchmark:

| Benchmark | Temporality | Cardinality | Instrument Type | With Striped Atomic Long | With Striped Reentrant Lock | Change % |
| --- | --- | --- | --- | --- | --- | --- |
| threads1 | DELTA | 1 | COUNTER_SUM | 10624.568 | 8113.116 | -23.6% |
| threads1 | DELTA | 1 | UP_DOWN_COUNTER_SUM | 7051.987 | 7116.562 | +0.9% |
| threads1 | DELTA | 1 | GAUGE_LAST_VALUE | 3004.456 | 3579.052 | +19.1% |
| threads1 | DELTA | 1 | HISTOGRAM_EXPLICIT | 6193.367 | 5108.527 | -17.5% |
| threads1 | DELTA | 1 | HISTOGRAM_BASE2_EXPONENTIAL | 4068.654 | 3767.116 | -7.4% |
| threads1 | DELTA | 100 | COUNTER_SUM | 7345.512 | 7705.679 | +4.9% |
| threads1 | DELTA | 100 | UP_DOWN_COUNTER_SUM | 6728.076 | 7870.198 | +17.0% |
| threads1 | DELTA | 100 | GAUGE_LAST_VALUE | 2816.403 | 2998.816 | +6.5% |
| threads1 | DELTA | 100 | HISTOGRAM_EXPLICIT | 6707.058 | 4143.646 | -38.2% |
| threads1 | DELTA | 100 | HISTOGRAM_BASE2_EXPONENTIAL | 3486.534 | 3668.988 | +5.2% |
| threads1 | CUMULATIVE | 1 | COUNTER_SUM | 14545.199 | 15211.867 | +4.6% |
| threads1 | CUMULATIVE | 1 | UP_DOWN_COUNTER_SUM | 14764.813 | 15272.696 | +3.4% |
| threads1 | CUMULATIVE | 1 | GAUGE_LAST_VALUE | 5054.376 | 7315.585 | +44.7% |
| threads1 | CUMULATIVE | 1 | HISTOGRAM_EXPLICIT | 8409.192 | 6646.129 | -21.0% |
| threads1 | CUMULATIVE | 1 | HISTOGRAM_BASE2_EXPONENTIAL | 3602.837 | 4202.725 | +16.7% |
| threads1 | CUMULATIVE | 100 | COUNTER_SUM | 7977.291 | 8553.166 | +7.2% |
| threads1 | CUMULATIVE | 100 | UP_DOWN_COUNTER_SUM | 8651.882 | 9148.823 | +5.7% |
| threads1 | CUMULATIVE | 100 | GAUGE_LAST_VALUE | 8066.028 | 8658.896 | +7.4% |
| threads1 | CUMULATIVE | 100 | HISTOGRAM_EXPLICIT | 6841.747 | 5590.956 | -18.3% |
| threads1 | CUMULATIVE | 100 | HISTOGRAM_BASE2_EXPONENTIAL | 3769.735 | 4061.193 | +7.7% |
| threads4 | DELTA | 1 | COUNTER_SUM | 1036.584 | 3642.403 | +251.4% |
| threads4 | DELTA | 1 | UP_DOWN_COUNTER_SUM | 953.509 | 2202.933 | +131.0% |
| threads4 | DELTA | 1 | GAUGE_LAST_VALUE | 1063.733 | 1886.270 | +77.3% |
| threads4 | DELTA | 1 | HISTOGRAM_EXPLICIT | 1657.467 | 2254.040 | +36.0% |
| threads4 | DELTA | 1 | HISTOGRAM_BASE2_EXPONENTIAL | 928.849 | 1623.778 | +74.8% |
| threads4 | DELTA | 100 | COUNTER_SUM | 1593.081 | 4987.529 | +213.1% |
| threads4 | DELTA | 100 | UP_DOWN_COUNTER_SUM | 3318.371 | 4880.035 | +47.1% |
| threads4 | DELTA | 100 | GAUGE_LAST_VALUE | 2630.933 | 3816.537 | +45.1% |
| threads4 | DELTA | 100 | HISTOGRAM_EXPLICIT | 2153.202 | 5494.981 | +155.2% |
| threads4 | DELTA | 100 | HISTOGRAM_BASE2_EXPONENTIAL | 2039.287 | 3385.470 | +66.0% |
| threads4 | CUMULATIVE | 1 | COUNTER_SUM | 4189.110 | 6061.089 | +44.7% |
| threads4 | CUMULATIVE | 1 | UP_DOWN_COUNTER_SUM | 4060.842 | 6635.251 | +63.4% |
| threads4 | CUMULATIVE | 1 | GAUGE_LAST_VALUE | 1386.341 | 1971.709 | +42.2% |
| threads4 | CUMULATIVE | 1 | HISTOGRAM_EXPLICIT | 1588.722 | 3745.206 | +135.7% |
| threads4 | CUMULATIVE | 1 | HISTOGRAM_BASE2_EXPONENTIAL | 1159.695 | 1818.146 | +56.8% |
| threads4 | CUMULATIVE | 100 | COUNTER_SUM | 4060.759 | 7510.675 | +85.0% |
| threads4 | CUMULATIVE | 100 | UP_DOWN_COUNTER_SUM | 4674.983 | 6670.130 | +42.7% |
| threads4 | CUMULATIVE | 100 | GAUGE_LAST_VALUE | 4033.773 | 6777.530 | +68.0% |
| threads4 | CUMULATIVE | 100 | HISTOGRAM_EXPLICIT | 3511.330 | 9629.410 | +174.3% |
| threads4 | CUMULATIVE | 100 | HISTOGRAM_BASE2_EXPONENTIAL | 3415.420 | 5692.648 | +66.7% |

But this might not be the whole picture:

  • The benchmarks show improvement in the threads4 cumulative case, despite the cumulative path being unchanged here. So there's definitely some noise / variance in the benchmark.
  • The lock approach should never have contention if each recording thread obtains its own lock, and this should be true some of the time. But it's not true all the time, since I use threadId % locks.length to select the lock. This means there can be cases where recording threads hash to the same lock and block waiting for each other to complete (see the sketch below). The AtomicInteger[] design may be more expensive, but it allows multiple recording threads to access the same underlying aggregator concurrently.
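To make the second bullet concrete, a tiny hypothetical demo (not SDK code) of the same threadId % stripes mapping: with one more recorder thread than stripes, at least two threads must map to the same index, so they serialize on one ReentrantLock, whereas an AtomicInteger[] cell would let both proceed concurrently.

```java
/** Hypothetical demo: with more live threads than stripes, at least two share a stripe. */
public class StripeCollisionDemo {
  public static void main(String[] args) {
    int stripes = Runtime.getRuntime().availableProcessors();
    for (int t = 0; t < stripes + 1; t++) {
      new Thread(
              () -> {
                // Same mapping as the PR: threadId % stripes.
                int index = Math.abs((int) (Thread.currentThread().getId() % stripes));
                System.out.println(Thread.currentThread().getId() + " -> stripe " + index);
              })
          .start();
    }
  }
}
```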


private AggregatorHolder() {
aggregatorHandles = new ConcurrentHashMap<>();
this(new ConcurrentHashMap<>());
}

private AggregatorHolder(ConcurrentHashMap<Attributes, AggregatorHandle<T>> aggregatorHandles) {
this.aggregatorHandles = aggregatorHandles;
locks = new ReentrantLock[Runtime.getRuntime().availableProcessors()];
for (int i = 0; i < locks.length; i++) {
locks[i] = new ReentrantLock();
}
}

private boolean tryAcquireForRecord() {
return forThread().tryLock();
}

private void releaseForRecord() {
forThread().unlock();
}

@SuppressWarnings("ThreadPriorityCheck")
Member Author: TODO: remove leftover annotation from a previous revision

private void acquireForCollect() {
for (int i = 0; i < locks.length; i++) {
locks[i].lock();
}
}

private ReentrantLock forThread() {
int index = Math.abs((int) (Thread.currentThread().getId() % locks.length));
return locks[index];
}
}
