Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 82 additions & 64 deletions channels/src/main/java/com/softwaremill/jox/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ operations on these (previous) segments, and we'll end up wanting to remove such

private final int capacity;
final boolean isRendezvous;
private final boolean isUnlimited;

// final boolean isUnlimited = capacity < 0; !isUnlimited = capacity >= 0

// mutable state

Expand Down Expand Up @@ -163,15 +164,14 @@ operations on these (previous) segments, and we'll end up wanting to remove such
* capacity is 0.
*/
private Channel(int capacity) {
if (capacity < UNLIMITED_CAPACITY) {
if (capacity < UNLIMITED_CAPACITY)
throw new IllegalArgumentException(
"Capacity must be 0 (rendezvous), positive (buffered) or -1 (unlimited"
+ " channels).");
}

this.capacity = capacity;
isRendezvous = capacity == 0L;
isUnlimited = capacity == UNLIMITED_CAPACITY;
boolean isUnlimited = capacity == UNLIMITED_CAPACITY;
var isRendezvousOrUnlimited = isRendezvous || isUnlimited;

var firstSegment =
Expand All @@ -198,8 +198,11 @@ private void processInitialBuffer() {

var currentSegment = bufferEndSegment;
// the number of segments where all cells are processed, or some are processed (last segment
// of the buffer)
var segmentsToProcess = (int) Math.ceil((double) capacity / Segment.SEGMENT_SIZE);
// of the buffer) = Math.ceil((double) capacity / Segment.SEGMENT_SIZE)
int segmentsToProcess =
capacity <= 0
? 0
: (int) ((capacity + Segment.SEGMENT_SIZE - 1L) / Segment.SEGMENT_SIZE);

for (int segmentId = 0; segmentId < segmentsToProcess; segmentId++) {
currentSegment =
Expand Down Expand Up @@ -353,8 +356,8 @@ private Object updateCellSend(
var state = segment.getCell(i);

if (state == null) {
// reading the buffer end & receiver's counter if needed
if (!isUnlimited && s >= (isRendezvous ? 0 : bufferEnd) && s >= receivers) {
// reading the buffer end & receiver's counter if needed: !isUnlimited
if (capacity >= 0 && s >= (isRendezvous ? 0 : bufferEnd) && s >= receivers) {
// cell is empty, and no receiver, not in buffer -> suspend
if (select != null) {
// cell is empty, no receiver, and we are in a select -> store the select
Expand Down Expand Up @@ -608,17 +611,20 @@ private Object updateCellReceive(
}
// else: CAS unsuccessful, repeat
} else if (state instanceof CellState) {
if (state == INTERRUPTED_SEND) {
// cell interrupted -> trying with a new one
return ReceiveResult.FAILED;
} else if (state == RESUMING) {
// expandBuffer() is resuming the sender -> repeat
Thread.onSpinWait();
} else if (state == CLOSED) {
return ReceiveResult.CLOSED;
} else {
throw new IllegalStateException(
"Unexpected state: " + state + " in channel: " + this);
switch (state) {
case CellState.INTERRUPTED_SEND -> {
// cell interrupted -> trying with a new one
return ReceiveResult.FAILED;
}
case CellState.RESUMING ->
// expandBuffer() is resuming the sender -> repeat
Thread.onSpinWait();
case CellState.CLOSED -> {
return ReceiveResult.CLOSED;
}
default ->
throw new IllegalStateException(
"Unexpected state: " + state + " in channel: " + this);
}
} else { // buffered value
segment.setCell(i, DONE);
Expand All @@ -634,7 +640,7 @@ private Object updateCellReceive(
// ****************

private void expandBuffer() {
if (isRendezvous || isUnlimited) return;
if (capacity <= 0) return; // isRendezvous || isUnlimited
while (true) {
// reading the segment before the counter increment - this is needed to find the
// required segment later
Expand Down Expand Up @@ -738,23 +744,28 @@ private ExpandBufferResult updateCellExpandBuffer(Segment segment, int i) {
// happened
return ExpandBufferResult.DONE;
} else if (state instanceof CellState) {
if (state == INTERRUPTED_SEND) {
// a sender was interrupted - restart
return ExpandBufferResult.FAILED;
} else if (state == INTERRUPTED_RECEIVE) {
// a receiver continuation must have been here before - another buffer expansion
// already happened
return ExpandBufferResult.DONE;
} else if (state == BROKEN) {
// the cell is broken, receive() started another buffer expansion
return ExpandBufferResult.DONE;
} else if (state == RESUMING) {
Thread.onSpinWait(); // receive() is resuming the sender -> repeat
} else if (state == CLOSED) {
return ExpandBufferResult.CLOSED;
} else {
throw new IllegalStateException(
"Unexpected state: " + state + " in channel: " + this);
switch (state) {
case CellState.INTERRUPTED_SEND -> {
// a sender was interrupted - restart
return ExpandBufferResult.FAILED;
}
case CellState.INTERRUPTED_RECEIVE -> {
// a receiver continuation must have been here before - another buffer
// expansion already happened
return ExpandBufferResult.DONE;
}
case CellState.BROKEN -> {
// the cell is broken, receive() started another buffer expansion
return ExpandBufferResult.DONE;
}
case CellState.RESUMING ->
Thread.onSpinWait(); // receive() is resuming the sender -> repeat
case CellState.CLOSED -> {
return ExpandBufferResult.CLOSED;
}
default ->
throw new IllegalStateException(
"Unexpected state: " + state + " in channel: " + this);
}
} else {
// buffered value: if the ordering of operations was different, we would put
Expand Down Expand Up @@ -909,18 +920,21 @@ private void updateCellClose(Segment segment, int i) {
Thread.onSpinWait();
}
} else if (state instanceof CellState) {
if (state == DONE || state == BROKEN) {
// nothing to do - a sender & receiver have already met
return;
} else if (state == INTERRUPTED_RECEIVE || state == INTERRUPTED_SEND) {
// nothing to do - segment counters already decremented or waiting to be
// decremented
return;
} else if (state == RESUMING) {
Thread.onSpinWait(); // receive() or expandBuffer() are resuming the cell - wait
} else {
throw new IllegalStateException(
"Unexpected state: " + state + " in channel: " + this);
switch (state) {
case CellState.DONE, CellState.BROKEN -> {
// nothing to do - a sender & receiver have already met
return;
}
case CellState.INTERRUPTED_RECEIVE, CellState.INTERRUPTED_SEND -> {
// nothing to do - segment counters already decremented or waiting to be
// decremented
return;
}
case CellState.RESUMING ->
Thread.onSpinWait();// receive() or expandBuffer() are resuming the cell - wait
default ->
throw new IllegalStateException(
"Unexpected state: " + state + " in channel: " + this);
}
} else {
// buffered value: discarding
Expand Down Expand Up @@ -1014,21 +1028,25 @@ private boolean hasValueToReceive(Segment segment, int i) {
} else if (state instanceof StoredSelectClause ss) {
return ss.isSender(); // as above
} else if (state instanceof CellState) {
if (state == INTERRUPTED_SEND || state == INTERRUPTED_RECEIVE) {
// cell interrupted -> nothing to receive; in case of an interrupted receiver,
// the counter is already updated
return false;
} else if (state == RESUMING) {
// receive() or expandBuffer() is resuming the sender -> repeat
Thread.onSpinWait();
} else if (state == CLOSED) {
return false;
} else if (state == DONE || state == BROKEN) {
// a concurrent receiver already finished / poisoned the cell
return false;
} else {
throw new IllegalStateException(
"Unexpected state: " + state + " in channel: " + this);
switch (state) {
case CellState.INTERRUPTED_SEND, CellState.INTERRUPTED_RECEIVE -> {
// cell interrupted -> nothing to receive; in case of an interrupted
// receiver, the counter is already updated
return false;
}
case CellState.RESUMING ->
// receive() or expandBuffer() is resuming the sender -> repeat
Thread.onSpinWait();
case CellState.CLOSED -> {
return false;
}
case CellState.DONE, CellState.BROKEN -> {
// a concurrent receiver already finished / poisoned the cell
return false;
}
default ->
throw new IllegalStateException(
"Unexpected state: " + state + " in channel: " + this);
}
} else {
// buffered value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

/**
* Thrown by {@link Channel#send(Object)} and {@link Channel#receive()} when the channel is closed.
*
* <p>~ Abstract: use {@link ChannelDoneException} or {@link ChannelErrorException}
*/
public sealed class ChannelClosedException extends RuntimeException
permits ChannelDoneException, ChannelErrorException {
Expand Down
30 changes: 14 additions & 16 deletions channels/src/main/java/com/softwaremill/jox/Segment.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@ final class Segment {
static final Segment NULL_SEGMENT = new Segment(-1, null, 0, false);

/** Used in {@code next} to indicate that the segment is closed. */
private enum State {
CLOSED
}
private static final Segment CLOSED = new Segment(-1, null, 0, false);

// immutable state

Expand All @@ -24,8 +22,8 @@ private enum State {

private final Object[] data = new Object[SEGMENT_SIZE];

/** Possible values: {@code Segment} or {@code State.CLOSED} (union type). */
private volatile Object next = null;
/** Possible values: {@code Segment} or {@link #CLOSED} (union type). */
private volatile Segment next;

private volatile Segment prev;

Expand Down Expand Up @@ -56,7 +54,7 @@ private enum State {
MethodHandles.Lookup l =
MethodHandles.privateLookupIn(Segment.class, MethodHandles.lookup());
DATA = MethodHandles.arrayElementVarHandle(Object[].class);
NEXT = l.findVarHandle(Segment.class, "next", Object.class);
NEXT = l.findVarHandle(Segment.class, "next", Segment.class);
PREV = l.findVarHandle(Segment.class, "prev", Segment.class);
POINTERS_NOT_PROCESSED_NOT_INTERRUPTED =
l.findVarHandle(
Expand Down Expand Up @@ -88,7 +86,7 @@ void cleanPrev() {

Segment getNext() {
var s = next;
return s == State.CLOSED ? null : (Segment) s;
return s == CLOSED ? null : s;
}

Segment getPrev() {
Expand Down Expand Up @@ -255,21 +253,21 @@ void remove() {

/**
* Closes the segment chain - sets the {@code next} pointer of the last segment to {@code
* State.CLOSED}, and returns the last segment.
* CLOSED}, and returns the last segment.
*/
Segment close() {
var s = this;
while (true) {
var n = s.next;
if (n == null) { // this is the tail segment
if (NEXT.compareAndSet(s, null, State.CLOSED)) {
if (NEXT.compareAndSet(s, null, CLOSED)) {
return s;
}
// else: try again
} else if (n == State.CLOSED) {
} else if (n == CLOSED) {
return s;
} else {
s = (Segment) n;
s = n;
}
}
}
Expand All @@ -284,9 +282,9 @@ private Segment aliveSegmentLeft() {

/** Should only be called, if this is not the tail segment. */
private Segment aliveSegmentRight() {
var n = (Segment) next; // this is not the tail, so there's a next segment
var n = next; // this is not the tail, so there's a next segment
while (n.isRemoved() && !n.isTail()) {
n = (Segment) n.next; // again, not tail
n = n.next; // again, not tail
}
return n;
}
Expand Down Expand Up @@ -323,7 +321,7 @@ private static Segment findSegment(Segment start, long id) {
var current = start;
while (current.getId() < id || current.isRemoved()) {
var n = current.next;
if (n == State.CLOSED) {
if (n == CLOSED) {
// segment chain is closed, so we can't create a new segment
return null;
} else if (n == null) {
Expand All @@ -339,7 +337,7 @@ private static Segment findSegment(Segment start, long id) {
}
// else: try again with current
} else {
current = (Segment) n;
current = n;
}
}
return current;
Expand Down Expand Up @@ -396,7 +394,7 @@ public String toString() {
+ "id="
+ id
+ ", next="
+ (n == null ? "null" : (n == State.CLOSED ? "closed" : ((Segment) n).id))
+ (n == null ? "null" : (n == CLOSED ? "closed" : n.id))
+ ", prev="
+ (p == null ? "null" : p.id)
+ ", pointers="
Expand Down
16 changes: 6 additions & 10 deletions channels/src/main/java/com/softwaremill/jox/SelectClause.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public abstract class SelectClause<T> {
abstract T transformedRawValue(Object rawValue);
}

class DefaultClause<T> extends SelectClause<T> {
final class DefaultClause<T> extends SelectClause<T> {
private final Supplier<T> callback;

public DefaultClause(Supplier<T> callback) {
Expand All @@ -39,19 +39,15 @@ Channel<?> getChannel() {

@Override
Object register(SelectInstance select) {
return DefaultClauseMarker.DEFAULT;
/*
* Used as a result of {@link DefaultClause#register(SelectInstance)}, instead of {@code null}, to
* indicate that the default clause has been selected during registration.
*/
return this;
}

@Override
T transformedRawValue(Object rawValue) {
return callback.get();
}
}

/**
* Used as a result of {@link DefaultClause#register(SelectInstance)}, instead of {@code null}, to
* indicate that the default clause has been selected during registration.
*/
enum DefaultClauseMarker {
DEFAULT
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,9 @@ class MultiArrayIterator implements Iterator<Byte> {
private final List<byte[]> arrays;
private int currentArrayIndex = 0;
private int currentPosition = 0;
private final int totalLength;

public MultiArrayIterator(List<byte[]> arrays) {
this.arrays = arrays;
this.totalLength = arrays.stream().mapToInt(arr -> arr.length).sum();
}

@Override
Expand Down Expand Up @@ -63,11 +61,6 @@ public int available() {
return remaining;
}

/** Returns the total length of all arrays combined. */
public int totalLength() {
return totalLength;
}

/** Creates an empty MultiArrayIterator. */
public static MultiArrayIterator empty() {
return new MultiArrayIterator(List.of());
Expand Down
Loading
Loading