Skip to content

Commit 5c983fa

Browse files
committed
Use dedicated class hierarchy to ensure the padding is applied
1 parent 0ee9e43 commit 5c983fa

12 files changed

+295
-67
lines changed

utils/queue-utils/src/main/java/datadog/common/queue/BaseQueue.java

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import static datadog.trace.util.BitUtils.nextPowerOfTwo;
44

5+
import datadog.common.queue.padding.PaddedSequence;
56
import java.lang.invoke.MethodHandles;
67
import java.lang.invoke.VarHandle;
78
import java.util.AbstractQueue;
@@ -16,18 +17,13 @@
1617
* @param <E> the type of elements held by this queue
1718
*/
1819
abstract class BaseQueue<E> extends AbstractQueue<E> implements NonBlockingQueue<E> {
19-
protected static final VarHandle HEAD_HANDLE;
20-
protected static final VarHandle TAIL_HANDLE;
2120
protected static final VarHandle ARRAY_HANDLE;
2221

2322
static {
2423
try {
25-
final MethodHandles.Lookup lookup = MethodHandles.lookup();
26-
HEAD_HANDLE = lookup.findVarHandle(BaseQueue.class, "head", long.class);
27-
TAIL_HANDLE = lookup.findVarHandle(BaseQueue.class, "tail", long.class);
2824
ARRAY_HANDLE = MethodHandles.arrayElementVarHandle(Object[].class);
29-
} catch (ReflectiveOperationException e) {
30-
throw new ExceptionInInitializerError(e);
25+
} catch (Throwable t) {
26+
throw new ExceptionInInitializerError(t);
3127
}
3228
}
3329

@@ -45,14 +41,10 @@ abstract class BaseQueue<E> extends AbstractQueue<E> implements NonBlockingQueue
4541
private long p0, p1, p2, p3, p4, p5, p6;
4642

4743
/** Next free slot for producer (single-threaded) */
48-
protected volatile long tail = 0L;
49-
50-
// Padding around tail
51-
@SuppressWarnings("unused")
52-
private long q0, q1, q2, q3, q4, q5, q6;
44+
protected final PaddedSequence tail = new PaddedSequence();
5345

5446
/** Next slot to consume (multi-threaded) */
55-
protected volatile long head = 0L;
47+
protected final PaddedSequence head = new PaddedSequence();
5648

5749
// Padding around head
5850
@SuppressWarnings("unused")
@@ -124,8 +116,8 @@ public final int capacity() {
124116

125117
@Override
126118
public final int size() {
127-
long currentTail = (long) TAIL_HANDLE.getVolatile(this);
128-
long currentHead = (long) HEAD_HANDLE.getVolatile(this);
119+
long currentTail = tail.getVolatile();
120+
long currentHead = head.getVolatile();
129121
return (int) (currentTail - currentHead);
130122
}
131123
}

utils/queue-utils/src/main/java/datadog/common/queue/MpscArrayQueueVarHandle.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,12 @@ public boolean offer(E e) {
4747
boolean parkOnSpin = (Thread.currentThread().getId() & 1) == 0;
4848

4949
while (true) {
50-
long currentTail = (long) TAIL_HANDLE.getVolatile(this);
50+
long currentTail = tail.getVolatile();
5151

5252
// Check if producer limit exceeded
5353
if (currentTail >= localProducerLimit) {
5454
// Refresh head only when necessary
55-
cachedHead = (long) HEAD_HANDLE.getVolatile(this);
55+
cachedHead = head.getVolatile();
5656
localProducerLimit = cachedHead + capacity;
5757

5858
if (currentTail >= localProducerLimit) {
@@ -64,7 +64,7 @@ public boolean offer(E e) {
6464
}
6565

6666
// Attempt to claim a slot
67-
if (TAIL_HANDLE.compareAndSet(this, currentTail, currentTail + 1)) {
67+
if (tail.compareAndSet(currentTail, currentTail + 1)) {
6868
final int index = (int) (currentTail & mask);
6969

7070
// Release-store ensures producer's write is visible to consumer
@@ -91,7 +91,7 @@ public boolean offer(E e) {
9191
public final E poll() {
9292
final Object[] localBuffer = this.buffer;
9393

94-
long currentHead = (long) HEAD_HANDLE.getOpaque(this);
94+
long currentHead = head.getOpaque();
9595
final int index = (int) (currentHead & mask);
9696

9797
// Acquire-load ensures visibility of producer write
@@ -104,15 +104,15 @@ public final E poll() {
104104
ARRAY_HANDLE.setOpaque(localBuffer, index, null);
105105

106106
// Advance head using opaque write (consumer-only)
107-
HEAD_HANDLE.setOpaque(this, currentHead + 1);
107+
head.setOpaque(currentHead + 1);
108108

109109
return (E) value;
110110
}
111111

112112
@Override
113113
@SuppressWarnings("unchecked")
114114
public final E peek() {
115-
final int index = (int) ((long) HEAD_HANDLE.getOpaque(this) & mask);
115+
final int index = (int) (head.getOpaque() & mask);
116116
return (E) ARRAY_HANDLE.getVolatile(buffer, index);
117117
}
118118
}

utils/queue-utils/src/main/java/datadog/common/queue/MpscBlockingConsumerArrayQueueVarHandle.java

Lines changed: 9 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package datadog.common.queue;
22

3-
import java.lang.invoke.MethodHandles;
4-
import java.lang.invoke.MethodHandles.Lookup;
5-
import java.lang.invoke.VarHandle;
3+
import datadog.common.queue.padding.PaddedThread;
64
import java.util.Objects;
75
import java.util.concurrent.TimeUnit;
86
import java.util.concurrent.locks.LockSupport;
@@ -18,29 +16,9 @@
1816
*/
1917
class MpscBlockingConsumerArrayQueueVarHandle<E> extends MpscArrayQueueVarHandle<E>
2018
implements BlockingConsumerNonBlockingQueue<E> {
21-
private static final VarHandle CONSUMER_THREAD_HANDLE;
22-
23-
static {
24-
try {
25-
final Lookup lookup = MethodHandles.lookup();
26-
CONSUMER_THREAD_HANDLE =
27-
lookup.findVarHandle(
28-
MpscBlockingConsumerArrayQueueVarHandle.class, "consumerThread", Thread.class);
29-
} catch (Throwable t) {
30-
throw new ExceptionInInitializerError(t);
31-
}
32-
}
33-
34-
// Padding to prevent false sharing
35-
@SuppressWarnings("unused")
36-
private long p0, p1, p2, p3, p4, p5, p6;
3719

3820
/** Reference to the waiting consumer thread (set atomically). */
39-
private volatile Thread consumerThread;
40-
41-
// Padding around consumerThread
42-
@SuppressWarnings("unused")
43-
private long q0, q1, q2, q3, q4, q5, q6;
21+
private volatile PaddedThread consumerThread;
4422

4523
/**
4624
* Creates a new MPSC queue.
@@ -65,12 +43,12 @@ public final boolean offer(E e) {
6543
boolean parkOnSpin = (Thread.currentThread().getId() & 1) == 0;
6644

6745
while (true) {
68-
long currentTail = (long) TAIL_HANDLE.getVolatile(this);
46+
long currentTail = tail.getVolatile();
6947

7048
// Check if producer limit exceeded
7149
if (currentTail >= localProducerLimit) {
7250
// Refresh head only when necessary
73-
cachedHead = (long) HEAD_HANDLE.getVolatile(this);
51+
cachedHead = head.getVolatile();
7452
localProducerLimit = cachedHead + capacity;
7553

7654
if (currentTail >= localProducerLimit) {
@@ -82,14 +60,14 @@ public final boolean offer(E e) {
8260
}
8361

8462
// Attempt to claim a slot
85-
if (TAIL_HANDLE.compareAndSet(this, currentTail, currentTail + 1)) {
63+
if (tail.compareAndSet(currentTail, currentTail + 1)) {
8664
final int index = (int) (currentTail & mask);
8765

8866
// Release-store ensures producer's write is visible to consumer
8967
ARRAY_HANDLE.setRelease(localBuffer, index, e);
9068

9169
// Atomically clear and unpark the consumer if waiting
92-
Thread c = (Thread) CONSUMER_THREAD_HANDLE.getAndSet(this, null);
70+
Thread c = consumerThread.getAndSet(null);
9371
if (c != null) {
9472
LockSupport.unpark(c);
9573
}
@@ -130,7 +108,7 @@ public final E poll(long timeout, @Nonnull TimeUnit unit) throws InterruptedExce
130108

131109
@Override
132110
public E take() throws InterruptedException {
133-
consumerThread = Thread.currentThread();
111+
consumerThread.setVolatile(Thread.currentThread());
134112
E e;
135113
while ((e = poll()) == null) {
136114
parkUntilNext(-1);
@@ -151,7 +129,7 @@ public E take() throws InterruptedException {
151129
private void parkUntilNext(long nanos) throws InterruptedException {
152130
Thread current = Thread.currentThread();
153131
// Publish the consumer thread (no ordering required)
154-
CONSUMER_THREAD_HANDLE.setOpaque(this, current);
132+
consumerThread.setOpaque(current);
155133
if (nanos <= 0) {
156134
LockSupport.park(this);
157135
} else {
@@ -163,6 +141,6 @@ private void parkUntilNext(long nanos) throws InterruptedException {
163141
}
164142

165143
// Cleanup (no fence needed, single consumer)
166-
CONSUMER_THREAD_HANDLE.setOpaque(this, null);
144+
consumerThread.setOpaque(null);
167145
}
168146
}

utils/queue-utils/src/main/java/datadog/common/queue/SpmcArrayQueueVarHandle.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@ public SpmcArrayQueueVarHandle(int requestedCapacity) {
3535
public boolean offer(E e) {
3636
Objects.requireNonNull(e);
3737

38-
long currentTail = tail;
38+
long currentTail = tail.getVolatile();
3939
long wrapPoint = currentTail - capacity;
40-
long currentHead = (long) HEAD_HANDLE.getVolatile(this);
40+
long currentHead = head.getVolatile();
4141

4242
if (wrapPoint >= currentHead) {
4343
return false; // queue full
@@ -49,7 +49,7 @@ public boolean offer(E e) {
4949
ARRAY_HANDLE.setRelease(this.buffer, index, e);
5050

5151
// Single-producer: simple volatile write to advance tail
52-
TAIL_HANDLE.setVolatile(this, currentTail + 1);
52+
tail.setVolatile(currentTail + 1);
5353
return true;
5454
}
5555

@@ -62,20 +62,20 @@ public E poll() {
6262
boolean parkOnSpin = (Thread.currentThread().getId() & 1) == 0;
6363

6464
while (true) {
65-
long currentHead = (long) HEAD_HANDLE.getVolatile(this);
65+
long currentHead = head.getVolatile();
6666
long limit = consumerLimit; // cached tail
6767

6868
if (currentHead >= limit) {
6969
// refresh limit once from tail volatile
70-
limit = (long) TAIL_HANDLE.getVolatile(this);
70+
limit = tail.getVolatile();
7171
if (currentHead >= limit) {
7272
return null; // queue empty
7373
}
7474
consumerLimit = limit; // update local cache
7575
}
7676

7777
// Attempt to claim this slot
78-
if (!HEAD_HANDLE.compareAndSet(this, currentHead, currentHead + 1)) {
78+
if (!head.compareAndSet(currentHead, currentHead + 1)) {
7979
// CAS failed. Backoff to reduce contention
8080
if ((spinCycles & 1) == 0) {
8181
Thread.onSpinWait();
@@ -107,8 +107,8 @@ public E poll() {
107107
@Override
108108
@SuppressWarnings("unchecked")
109109
public E peek() {
110-
long currentHead = (long) HEAD_HANDLE.getVolatile(this);
111-
long currentTail = (long) TAIL_HANDLE.getVolatile(this);
110+
long currentHead = head.getVolatile();
111+
long currentTail = tail.getVolatile();
112112

113113
if (currentHead >= currentTail) return null;
114114

utils/queue-utils/src/main/java/datadog/common/queue/SpscArrayQueueVarHandle.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,46 +25,46 @@ public SpscArrayQueueVarHandle(int requestedCapacity) {
2525
public boolean offer(E e) {
2626
Objects.requireNonNull(e);
2727

28-
final long currentTail = (long) TAIL_HANDLE.getOpaque(this);
28+
final long currentTail = tail.getOpaque();
2929
final int index = (int) (currentTail & mask);
3030

3131
if (currentTail - cachedHead >= capacity) {
3232
// Refresh cached head (read from consumer side)
33-
cachedHead = (long) HEAD_HANDLE.getVolatile(this);
33+
cachedHead = (long) head.getVolatile();
3434
if (currentTail - cachedHead >= capacity) {
3535
return false; // still full
3636
}
3737
}
3838

3939
ARRAY_HANDLE.setRelease(buffer, index, e); // publish value
40-
TAIL_HANDLE.setOpaque(this, currentTail + 1); // relaxed tail update
40+
tail.setOpaque(currentTail + 1); // relaxed tail update
4141
return true;
4242
}
4343

4444
@Override
4545
@SuppressWarnings("unchecked")
4646
public E poll() {
47-
final long currentHead = (long) HEAD_HANDLE.getOpaque(this);
47+
final long currentHead = head.getOpaque();
4848
final int index = (int) (currentHead & mask);
4949

5050
if (currentHead >= cachedTail) {
5151
// refresh tail cache
52-
cachedTail = (long) TAIL_HANDLE.getVolatile(this);
52+
cachedTail = tail.getVolatile();
5353
if (currentHead >= cachedTail) {
5454
return null; // still empty
5555
}
5656
}
5757

5858
Object value = ARRAY_HANDLE.getAcquire(buffer, index);
5959
ARRAY_HANDLE.setOpaque(buffer, index, null); // clear slot
60-
HEAD_HANDLE.setOpaque(this, currentHead + 1); // relaxed head update
60+
head.setOpaque(currentHead + 1); // relaxed head update
6161
return (E) value;
6262
}
6363

6464
@Override
6565
@SuppressWarnings("unchecked")
6666
public E peek() {
67-
final int index = (int) ((long) HEAD_HANDLE.getOpaque(this) & mask);
67+
final int index = (int) (head.getOpaque() & mask);
6868
return (E) ARRAY_HANDLE.getVolatile(buffer, index);
6969
}
7070
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package datadog.common.queue.padding;
2+
3+
/** Left-hand-side (LHS) padding to prevent false sharing. */
4+
class LhsPadding {
5+
/** 7 bytes padding field to occupy space on the left side of the value. */
6+
private long p1, p2, p3, p4, p5, p6, p7;
7+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package datadog.common.queue.padding;
2+
3+
/** Right-hand-side (RHS) padding to prevent false sharing. */
4+
class LongRhsPadding extends LongValue {
5+
/** 7 bytes fields to occupy space on the right side of the value. */
6+
private long p9, p10, p11, p12, p13, p14, p15;
7+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package datadog.common.queue.padding;
2+
3+
/**
4+
* Holds the actual sequence value, padded on the left to prevent false sharing with preceding
5+
* fields.
6+
*/
7+
class LongValue extends LhsPadding {
8+
/** The volatile value being protected from false sharing. */
9+
protected volatile long value;
10+
}

0 commit comments

Comments
 (0)