diff --git a/channels/src/main/java/com/softwaremill/jox/Channel.java b/channels/src/main/java/com/softwaremill/jox/Channel.java index 971b18be..ce99babf 100644 --- a/channels/src/main/java/com/softwaremill/jox/Channel.java +++ b/channels/src/main/java/com/softwaremill/jox/Channel.java @@ -96,7 +96,8 @@ operations on these (previous) segments, and we'll end up wanting to remove such private final int capacity; final boolean isRendezvous; - private final boolean isUnlimited; + + // final boolean isUnlimited = capacity < 0; !isUnlimited = capacity >= 0 // mutable state @@ -163,15 +164,14 @@ operations on these (previous) segments, and we'll end up wanting to remove such * capacity is 0. */ private Channel(int capacity) { - if (capacity < UNLIMITED_CAPACITY) { + if (capacity < UNLIMITED_CAPACITY) throw new IllegalArgumentException( "Capacity must be 0 (rendezvous), positive (buffered) or -1 (unlimited" + " channels)."); - } this.capacity = capacity; isRendezvous = capacity == 0L; - isUnlimited = capacity == UNLIMITED_CAPACITY; + boolean isUnlimited = capacity == UNLIMITED_CAPACITY; var isRendezvousOrUnlimited = isRendezvous || isUnlimited; var firstSegment = @@ -198,8 +198,11 @@ private void processInitialBuffer() { var currentSegment = bufferEndSegment; // the number of segments where all cells are processed, or some are processed (last segment - // of the buffer) - var segmentsToProcess = (int) Math.ceil((double) capacity / Segment.SEGMENT_SIZE); + // of the buffer) = Math.ceil((double) capacity / Segment.SEGMENT_SIZE) + int segmentsToProcess = + capacity <= 0 + ? 0 + : (int) ((capacity + Segment.SEGMENT_SIZE - 1L) / Segment.SEGMENT_SIZE); for (int segmentId = 0; segmentId < segmentsToProcess; segmentId++) { currentSegment = @@ -353,8 +356,8 @@ private Object updateCellSend( var state = segment.getCell(i); if (state == null) { - // reading the buffer end & receiver's counter if needed - if (!isUnlimited && s >= (isRendezvous ? 0 : bufferEnd) && s >= receivers) { + // reading the buffer end & receiver's counter if needed: !isUnlimited + if (capacity >= 0 && s >= (isRendezvous ? 0 : bufferEnd) && s >= receivers) { // cell is empty, and no receiver, not in buffer -> suspend if (select != null) { // cell is empty, no receiver, and we are in a select -> store the select @@ -608,17 +611,20 @@ private Object updateCellReceive( } // else: CAS unsuccessful, repeat } else if (state instanceof CellState) { - if (state == INTERRUPTED_SEND) { - // cell interrupted -> trying with a new one - return ReceiveResult.FAILED; - } else if (state == RESUMING) { - // expandBuffer() is resuming the sender -> repeat - Thread.onSpinWait(); - } else if (state == CLOSED) { - return ReceiveResult.CLOSED; - } else { - throw new IllegalStateException( - "Unexpected state: " + state + " in channel: " + this); + switch (state) { + case CellState.INTERRUPTED_SEND -> { + // cell interrupted -> trying with a new one + return ReceiveResult.FAILED; + } + case CellState.RESUMING -> + // expandBuffer() is resuming the sender -> repeat + Thread.onSpinWait(); + case CellState.CLOSED -> { + return ReceiveResult.CLOSED; + } + default -> + throw new IllegalStateException( + "Unexpected state: " + state + " in channel: " + this); } } else { // buffered value segment.setCell(i, DONE); @@ -634,7 +640,7 @@ private Object updateCellReceive( // **************** private void expandBuffer() { - if (isRendezvous || isUnlimited) return; + if (capacity <= 0) return; // isRendezvous || isUnlimited while (true) { // reading the segment before the counter increment - this is needed to find the // required segment later @@ -738,23 +744,28 @@ private ExpandBufferResult updateCellExpandBuffer(Segment segment, int i) { // happened return ExpandBufferResult.DONE; } else if (state instanceof CellState) { - if (state == INTERRUPTED_SEND) { - // a sender was interrupted - restart - return ExpandBufferResult.FAILED; - } else if (state == INTERRUPTED_RECEIVE) { - // a receiver continuation must have been here before - another buffer expansion - // already happened - return ExpandBufferResult.DONE; - } else if (state == BROKEN) { - // the cell is broken, receive() started another buffer expansion - return ExpandBufferResult.DONE; - } else if (state == RESUMING) { - Thread.onSpinWait(); // receive() is resuming the sender -> repeat - } else if (state == CLOSED) { - return ExpandBufferResult.CLOSED; - } else { - throw new IllegalStateException( - "Unexpected state: " + state + " in channel: " + this); + switch (state) { + case CellState.INTERRUPTED_SEND -> { + // a sender was interrupted - restart + return ExpandBufferResult.FAILED; + } + case CellState.INTERRUPTED_RECEIVE -> { + // a receiver continuation must have been here before - another buffer + // expansion already happened + return ExpandBufferResult.DONE; + } + case CellState.BROKEN -> { + // the cell is broken, receive() started another buffer expansion + return ExpandBufferResult.DONE; + } + case CellState.RESUMING -> + Thread.onSpinWait(); // receive() is resuming the sender -> repeat + case CellState.CLOSED -> { + return ExpandBufferResult.CLOSED; + } + default -> + throw new IllegalStateException( + "Unexpected state: " + state + " in channel: " + this); } } else { // buffered value: if the ordering of operations was different, we would put @@ -909,18 +920,21 @@ private void updateCellClose(Segment segment, int i) { Thread.onSpinWait(); } } else if (state instanceof CellState) { - if (state == DONE || state == BROKEN) { - // nothing to do - a sender & receiver have already met - return; - } else if (state == INTERRUPTED_RECEIVE || state == INTERRUPTED_SEND) { - // nothing to do - segment counters already decremented or waiting to be - // decremented - return; - } else if (state == RESUMING) { - Thread.onSpinWait(); // receive() or expandBuffer() are resuming the cell - wait - } else { - throw new IllegalStateException( - "Unexpected state: " + state + " in channel: " + this); + switch (state) { + case CellState.DONE, CellState.BROKEN -> { + // nothing to do - a sender & receiver have already met + return; + } + case CellState.INTERRUPTED_RECEIVE, CellState.INTERRUPTED_SEND -> { + // nothing to do - segment counters already decremented or waiting to be + // decremented + return; + } + case CellState.RESUMING -> + Thread.onSpinWait();// receive() or expandBuffer() are resuming the cell - wait + default -> + throw new IllegalStateException( + "Unexpected state: " + state + " in channel: " + this); } } else { // buffered value: discarding @@ -1014,21 +1028,25 @@ private boolean hasValueToReceive(Segment segment, int i) { } else if (state instanceof StoredSelectClause ss) { return ss.isSender(); // as above } else if (state instanceof CellState) { - if (state == INTERRUPTED_SEND || state == INTERRUPTED_RECEIVE) { - // cell interrupted -> nothing to receive; in case of an interrupted receiver, - // the counter is already updated - return false; - } else if (state == RESUMING) { - // receive() or expandBuffer() is resuming the sender -> repeat - Thread.onSpinWait(); - } else if (state == CLOSED) { - return false; - } else if (state == DONE || state == BROKEN) { - // a concurrent receiver already finished / poisoned the cell - return false; - } else { - throw new IllegalStateException( - "Unexpected state: " + state + " in channel: " + this); + switch (state) { + case CellState.INTERRUPTED_SEND, CellState.INTERRUPTED_RECEIVE -> { + // cell interrupted -> nothing to receive; in case of an interrupted + // receiver, the counter is already updated + return false; + } + case CellState.RESUMING -> + // receive() or expandBuffer() is resuming the sender -> repeat + Thread.onSpinWait(); + case CellState.CLOSED -> { + return false; + } + case CellState.DONE, CellState.BROKEN -> { + // a concurrent receiver already finished / poisoned the cell + return false; + } + default -> + throw new IllegalStateException( + "Unexpected state: " + state + " in channel: " + this); } } else { // buffered value diff --git a/channels/src/main/java/com/softwaremill/jox/ChannelClosedException.java b/channels/src/main/java/com/softwaremill/jox/ChannelClosedException.java index df875f33..100c4a43 100644 --- a/channels/src/main/java/com/softwaremill/jox/ChannelClosedException.java +++ b/channels/src/main/java/com/softwaremill/jox/ChannelClosedException.java @@ -2,6 +2,8 @@ /** * Thrown by {@link Channel#send(Object)} and {@link Channel#receive()} when the channel is closed. + * + *
~ Abstract: use {@link ChannelDoneException} or {@link ChannelErrorException}
*/
public sealed class ChannelClosedException extends RuntimeException
permits ChannelDoneException, ChannelErrorException {
diff --git a/channels/src/main/java/com/softwaremill/jox/Segment.java b/channels/src/main/java/com/softwaremill/jox/Segment.java
index 20e6e9ce..a7868dd7 100644
--- a/channels/src/main/java/com/softwaremill/jox/Segment.java
+++ b/channels/src/main/java/com/softwaremill/jox/Segment.java
@@ -11,9 +11,7 @@ final class Segment {
static final Segment NULL_SEGMENT = new Segment(-1, null, 0, false);
/** Used in {@code next} to indicate that the segment is closed. */
- private enum State {
- CLOSED
- }
+ private static final Segment CLOSED = new Segment(-1, null, 0, false);
// immutable state
@@ -24,8 +22,8 @@ private enum State {
private final Object[] data = new Object[SEGMENT_SIZE];
- /** Possible values: {@code Segment} or {@code State.CLOSED} (union type). */
- private volatile Object next = null;
+ /** Possible values: {@code Segment} or {@link #CLOSED} (union type). */
+ private volatile Segment next;
private volatile Segment prev;
@@ -56,7 +54,7 @@ private enum State {
MethodHandles.Lookup l =
MethodHandles.privateLookupIn(Segment.class, MethodHandles.lookup());
DATA = MethodHandles.arrayElementVarHandle(Object[].class);
- NEXT = l.findVarHandle(Segment.class, "next", Object.class);
+ NEXT = l.findVarHandle(Segment.class, "next", Segment.class);
PREV = l.findVarHandle(Segment.class, "prev", Segment.class);
POINTERS_NOT_PROCESSED_NOT_INTERRUPTED =
l.findVarHandle(
@@ -88,7 +86,7 @@ void cleanPrev() {
Segment getNext() {
var s = next;
- return s == State.CLOSED ? null : (Segment) s;
+ return s == CLOSED ? null : s;
}
Segment getPrev() {
@@ -255,21 +253,21 @@ void remove() {
/**
* Closes the segment chain - sets the {@code next} pointer of the last segment to {@code
- * State.CLOSED}, and returns the last segment.
+ * CLOSED}, and returns the last segment.
*/
Segment close() {
var s = this;
while (true) {
var n = s.next;
if (n == null) { // this is the tail segment
- if (NEXT.compareAndSet(s, null, State.CLOSED)) {
+ if (NEXT.compareAndSet(s, null, CLOSED)) {
return s;
}
// else: try again
- } else if (n == State.CLOSED) {
+ } else if (n == CLOSED) {
return s;
} else {
- s = (Segment) n;
+ s = n;
}
}
}
@@ -284,9 +282,9 @@ private Segment aliveSegmentLeft() {
/** Should only be called, if this is not the tail segment. */
private Segment aliveSegmentRight() {
- var n = (Segment) next; // this is not the tail, so there's a next segment
+ var n = next; // this is not the tail, so there's a next segment
while (n.isRemoved() && !n.isTail()) {
- n = (Segment) n.next; // again, not tail
+ n = n.next; // again, not tail
}
return n;
}
@@ -323,7 +321,7 @@ private static Segment findSegment(Segment start, long id) {
var current = start;
while (current.getId() < id || current.isRemoved()) {
var n = current.next;
- if (n == State.CLOSED) {
+ if (n == CLOSED) {
// segment chain is closed, so we can't create a new segment
return null;
} else if (n == null) {
@@ -339,7 +337,7 @@ private static Segment findSegment(Segment start, long id) {
}
// else: try again with current
} else {
- current = (Segment) n;
+ current = n;
}
}
return current;
@@ -396,7 +394,7 @@ public String toString() {
+ "id="
+ id
+ ", next="
- + (n == null ? "null" : (n == State.CLOSED ? "closed" : ((Segment) n).id))
+ + (n == null ? "null" : (n == CLOSED ? "closed" : n.id))
+ ", prev="
+ (p == null ? "null" : p.id)
+ ", pointers="
diff --git a/channels/src/main/java/com/softwaremill/jox/SelectClause.java b/channels/src/main/java/com/softwaremill/jox/SelectClause.java
index 2f3c936a..e497879e 100644
--- a/channels/src/main/java/com/softwaremill/jox/SelectClause.java
+++ b/channels/src/main/java/com/softwaremill/jox/SelectClause.java
@@ -25,7 +25,7 @@ public abstract class SelectClause