From 94162be3c90b713645672b21433b764a56584e2e Mon Sep 17 00:00:00 2001 From: "A.Fink" Date: Sat, 8 Nov 2025 12:34:02 +0300 Subject: [PATCH 1/8] ChannelClosedException is abstract --- .../main/java/com/softwaremill/jox/ChannelClosedException.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/channels/src/main/java/com/softwaremill/jox/ChannelClosedException.java b/channels/src/main/java/com/softwaremill/jox/ChannelClosedException.java index df875f33..d6f25b57 100644 --- a/channels/src/main/java/com/softwaremill/jox/ChannelClosedException.java +++ b/channels/src/main/java/com/softwaremill/jox/ChannelClosedException.java @@ -3,7 +3,7 @@ /** * Thrown by {@link Channel#send(Object)} and {@link Channel#receive()} when the channel is closed. */ -public sealed class ChannelClosedException extends RuntimeException +public abstract sealed class ChannelClosedException extends RuntimeException permits ChannelDoneException, ChannelErrorException { public ChannelClosedException() {} From ce19478a6cbb140c09b7e5fec8ff3aa409b51664 Mon Sep 17 00:00:00 2001 From: "A.Fink" Date: Sat, 8 Nov 2025 12:57:10 +0300 Subject: [PATCH 2/8] micro-optimization 1) delete redundant field isUnlimited (-1 byte; simpler if) 2) integer division --- .../java/com/softwaremill/jox/Channel.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/channels/src/main/java/com/softwaremill/jox/Channel.java b/channels/src/main/java/com/softwaremill/jox/Channel.java index 971b18be..0b7ec66e 100644 --- a/channels/src/main/java/com/softwaremill/jox/Channel.java +++ b/channels/src/main/java/com/softwaremill/jox/Channel.java @@ -96,7 +96,8 @@ operations on these (previous) segments, and we'll end up wanting to remove such private final int capacity; final boolean isRendezvous; - private final boolean isUnlimited; + + // final boolean isUnlimited = capacity < 0; !isUnlimited = capacity >= 0 // mutable state @@ -163,15 +164,14 @@ operations on these (previous) segments, and we'll end up wanting to remove such * capacity is 0. */ private Channel(int capacity) { - if (capacity < UNLIMITED_CAPACITY) { + if (capacity < UNLIMITED_CAPACITY) throw new IllegalArgumentException( "Capacity must be 0 (rendezvous), positive (buffered) or -1 (unlimited" + " channels)."); - } this.capacity = capacity; isRendezvous = capacity == 0L; - isUnlimited = capacity == UNLIMITED_CAPACITY; + boolean isUnlimited = capacity == UNLIMITED_CAPACITY; var isRendezvousOrUnlimited = isRendezvous || isUnlimited; var firstSegment = @@ -198,8 +198,9 @@ private void processInitialBuffer() { var currentSegment = bufferEndSegment; // the number of segments where all cells are processed, or some are processed (last segment - // of the buffer) - var segmentsToProcess = (int) Math.ceil((double) capacity / Segment.SEGMENT_SIZE); + // of the buffer) = Math.ceil((double) capacity / Segment.SEGMENT_SIZE) + int segmentsToProcess = capacity <= 0 ? 0 + : (int)((capacity + Segment.SEGMENT_SIZE - 1L) / Segment.SEGMENT_SIZE); for (int segmentId = 0; segmentId < segmentsToProcess; segmentId++) { currentSegment = @@ -353,8 +354,8 @@ private Object updateCellSend( var state = segment.getCell(i); if (state == null) { - // reading the buffer end & receiver's counter if needed - if (!isUnlimited && s >= (isRendezvous ? 0 : bufferEnd) && s >= receivers) { + // reading the buffer end & receiver's counter if needed: !isUnlimited + if (capacity >= 0 && s >= (isRendezvous ? 0 : bufferEnd) && s >= receivers) { // cell is empty, and no receiver, not in buffer -> suspend if (select != null) { // cell is empty, no receiver, and we are in a select -> store the select @@ -634,7 +635,7 @@ private Object updateCellReceive( // **************** private void expandBuffer() { - if (isRendezvous || isUnlimited) return; + if (capacity <= 0) return; // isRendezvous || isUnlimited while (true) { // reading the segment before the counter increment - this is needed to find the // required segment later From c3a8647e3c8371a46ea6a0fa98a5fb4b6f3d88d8 Mon Sep 17 00:00:00 2001 From: "A.Fink" Date: Thu, 13 Nov 2025 23:36:20 +0300 Subject: [PATCH 3/8] fix test on Windows --- .../test/java/com/softwaremill/jox/flows/FlowIOTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flows/src/test/java/com/softwaremill/jox/flows/FlowIOTest.java b/flows/src/test/java/com/softwaremill/jox/flows/FlowIOTest.java index 3ab54e4c..88fb718b 100644 --- a/flows/src/test/java/com/softwaremill/jox/flows/FlowIOTest.java +++ b/flows/src/test/java/com/softwaremill/jox/flows/FlowIOTest.java @@ -8,8 +8,8 @@ import java.io.InputStream; import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; +import java.nio.file.FileSystemException; import java.nio.file.Files; -import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.nio.file.Paths; import java.util.List; @@ -395,8 +395,8 @@ void runToFile_throwExceptionIfFileCannotBeOpened() { Path path = folder.resolve("not-existing-file.txt"); var source = Flows.fromByteArrays(new byte[0]); - // when & then - assertThrows(NoSuchFileException.class, () -> source.runToFile(path)); + // when & then: Linux NoSuchFileException, windows: more generic FileSystemException + assertThrows(FileSystemException.class, () -> source.runToFile(path)); } @Test From 44de1687fcddf1cd1235360e0d2a2214cf856a4e Mon Sep 17 00:00:00 2001 From: "A.Fink" Date: Thu, 13 Nov 2025 23:36:45 +0300 Subject: [PATCH 4/8] remove unused flows MultiArrayIterator code --- .../com/softwaremill/jox/flows/MultiArrayIterator.java | 7 ------- 1 file changed, 7 deletions(-) diff --git a/flows/src/main/java/com/softwaremill/jox/flows/MultiArrayIterator.java b/flows/src/main/java/com/softwaremill/jox/flows/MultiArrayIterator.java index 791dea22..6b26e6cc 100644 --- a/flows/src/main/java/com/softwaremill/jox/flows/MultiArrayIterator.java +++ b/flows/src/main/java/com/softwaremill/jox/flows/MultiArrayIterator.java @@ -12,11 +12,9 @@ class MultiArrayIterator implements Iterator { private final List arrays; private int currentArrayIndex = 0; private int currentPosition = 0; - private final int totalLength; public MultiArrayIterator(List arrays) { this.arrays = arrays; - this.totalLength = arrays.stream().mapToInt(arr -> arr.length).sum(); } @Override @@ -63,11 +61,6 @@ public int available() { return remaining; } - /** Returns the total length of all arrays combined. */ - public int totalLength() { - return totalLength; - } - /** Creates an empty MultiArrayIterator. */ public static MultiArrayIterator empty() { return new MultiArrayIterator(List.of()); From 6c23d06dd6d3ef4f4e2160452c45f81afdf37969 Mon Sep 17 00:00:00 2001 From: "A.Fink" Date: Thu, 13 Nov 2025 23:37:19 +0300 Subject: [PATCH 5/8] rollback abstract ChannelClosedException: breaks contract --- .../java/com/softwaremill/jox/ChannelClosedException.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/channels/src/main/java/com/softwaremill/jox/ChannelClosedException.java b/channels/src/main/java/com/softwaremill/jox/ChannelClosedException.java index d6f25b57..100c4a43 100644 --- a/channels/src/main/java/com/softwaremill/jox/ChannelClosedException.java +++ b/channels/src/main/java/com/softwaremill/jox/ChannelClosedException.java @@ -2,8 +2,10 @@ /** * Thrown by {@link Channel#send(Object)} and {@link Channel#receive()} when the channel is closed. + * + *

~ Abstract: use {@link ChannelDoneException} or {@link ChannelErrorException} */ -public abstract sealed class ChannelClosedException extends RuntimeException +public sealed class ChannelClosedException extends RuntimeException permits ChannelDoneException, ChannelErrorException { public ChannelClosedException() {} From a347fedf195c9dc4c2b24ebd2f0007d97a1848bf Mon Sep 17 00:00:00 2001 From: "A.Fink" Date: Thu, 13 Nov 2025 23:45:51 +0300 Subject: [PATCH 6/8] =?UTF-8?q?if=20=E2=86=92=20switch?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/softwaremill/jox/Channel.java | 131 ++++++++++-------- 1 file changed, 74 insertions(+), 57 deletions(-) diff --git a/channels/src/main/java/com/softwaremill/jox/Channel.java b/channels/src/main/java/com/softwaremill/jox/Channel.java index 0b7ec66e..ce99babf 100644 --- a/channels/src/main/java/com/softwaremill/jox/Channel.java +++ b/channels/src/main/java/com/softwaremill/jox/Channel.java @@ -199,8 +199,10 @@ private void processInitialBuffer() { var currentSegment = bufferEndSegment; // the number of segments where all cells are processed, or some are processed (last segment // of the buffer) = Math.ceil((double) capacity / Segment.SEGMENT_SIZE) - int segmentsToProcess = capacity <= 0 ? 0 - : (int)((capacity + Segment.SEGMENT_SIZE - 1L) / Segment.SEGMENT_SIZE); + int segmentsToProcess = + capacity <= 0 + ? 0 + : (int) ((capacity + Segment.SEGMENT_SIZE - 1L) / Segment.SEGMENT_SIZE); for (int segmentId = 0; segmentId < segmentsToProcess; segmentId++) { currentSegment = @@ -609,17 +611,20 @@ private Object updateCellReceive( } // else: CAS unsuccessful, repeat } else if (state instanceof CellState) { - if (state == INTERRUPTED_SEND) { - // cell interrupted -> trying with a new one - return ReceiveResult.FAILED; - } else if (state == RESUMING) { - // expandBuffer() is resuming the sender -> repeat - Thread.onSpinWait(); - } else if (state == CLOSED) { - return ReceiveResult.CLOSED; - } else { - throw new IllegalStateException( - "Unexpected state: " + state + " in channel: " + this); + switch (state) { + case CellState.INTERRUPTED_SEND -> { + // cell interrupted -> trying with a new one + return ReceiveResult.FAILED; + } + case CellState.RESUMING -> + // expandBuffer() is resuming the sender -> repeat + Thread.onSpinWait(); + case CellState.CLOSED -> { + return ReceiveResult.CLOSED; + } + default -> + throw new IllegalStateException( + "Unexpected state: " + state + " in channel: " + this); } } else { // buffered value segment.setCell(i, DONE); @@ -739,23 +744,28 @@ private ExpandBufferResult updateCellExpandBuffer(Segment segment, int i) { // happened return ExpandBufferResult.DONE; } else if (state instanceof CellState) { - if (state == INTERRUPTED_SEND) { - // a sender was interrupted - restart - return ExpandBufferResult.FAILED; - } else if (state == INTERRUPTED_RECEIVE) { - // a receiver continuation must have been here before - another buffer expansion - // already happened - return ExpandBufferResult.DONE; - } else if (state == BROKEN) { - // the cell is broken, receive() started another buffer expansion - return ExpandBufferResult.DONE; - } else if (state == RESUMING) { - Thread.onSpinWait(); // receive() is resuming the sender -> repeat - } else if (state == CLOSED) { - return ExpandBufferResult.CLOSED; - } else { - throw new IllegalStateException( - "Unexpected state: " + state + " in channel: " + this); + switch (state) { + case CellState.INTERRUPTED_SEND -> { + // a sender was interrupted - restart + return ExpandBufferResult.FAILED; + } + case CellState.INTERRUPTED_RECEIVE -> { + // a receiver continuation must have been here before - another buffer + // expansion already happened + return ExpandBufferResult.DONE; + } + case CellState.BROKEN -> { + // the cell is broken, receive() started another buffer expansion + return ExpandBufferResult.DONE; + } + case CellState.RESUMING -> + Thread.onSpinWait(); // receive() is resuming the sender -> repeat + case CellState.CLOSED -> { + return ExpandBufferResult.CLOSED; + } + default -> + throw new IllegalStateException( + "Unexpected state: " + state + " in channel: " + this); } } else { // buffered value: if the ordering of operations was different, we would put @@ -910,18 +920,21 @@ private void updateCellClose(Segment segment, int i) { Thread.onSpinWait(); } } else if (state instanceof CellState) { - if (state == DONE || state == BROKEN) { - // nothing to do - a sender & receiver have already met - return; - } else if (state == INTERRUPTED_RECEIVE || state == INTERRUPTED_SEND) { - // nothing to do - segment counters already decremented or waiting to be - // decremented - return; - } else if (state == RESUMING) { - Thread.onSpinWait(); // receive() or expandBuffer() are resuming the cell - wait - } else { - throw new IllegalStateException( - "Unexpected state: " + state + " in channel: " + this); + switch (state) { + case CellState.DONE, CellState.BROKEN -> { + // nothing to do - a sender & receiver have already met + return; + } + case CellState.INTERRUPTED_RECEIVE, CellState.INTERRUPTED_SEND -> { + // nothing to do - segment counters already decremented or waiting to be + // decremented + return; + } + case CellState.RESUMING -> + Thread.onSpinWait();// receive() or expandBuffer() are resuming the cell - wait + default -> + throw new IllegalStateException( + "Unexpected state: " + state + " in channel: " + this); } } else { // buffered value: discarding @@ -1015,21 +1028,25 @@ private boolean hasValueToReceive(Segment segment, int i) { } else if (state instanceof StoredSelectClause ss) { return ss.isSender(); // as above } else if (state instanceof CellState) { - if (state == INTERRUPTED_SEND || state == INTERRUPTED_RECEIVE) { - // cell interrupted -> nothing to receive; in case of an interrupted receiver, - // the counter is already updated - return false; - } else if (state == RESUMING) { - // receive() or expandBuffer() is resuming the sender -> repeat - Thread.onSpinWait(); - } else if (state == CLOSED) { - return false; - } else if (state == DONE || state == BROKEN) { - // a concurrent receiver already finished / poisoned the cell - return false; - } else { - throw new IllegalStateException( - "Unexpected state: " + state + " in channel: " + this); + switch (state) { + case CellState.INTERRUPTED_SEND, CellState.INTERRUPTED_RECEIVE -> { + // cell interrupted -> nothing to receive; in case of an interrupted + // receiver, the counter is already updated + return false; + } + case CellState.RESUMING -> + // receive() or expandBuffer() is resuming the sender -> repeat + Thread.onSpinWait(); + case CellState.CLOSED -> { + return false; + } + case CellState.DONE, CellState.BROKEN -> { + // a concurrent receiver already finished / poisoned the cell + return false; + } + default -> + throw new IllegalStateException( + "Unexpected state: " + state + " in channel: " + this); } } else { // buffered value From a560a31a36b7c7c3bbd2fb40a218a0a9df18fe93 Mon Sep 17 00:00:00 2001 From: "A.Fink" Date: Fri, 14 Nov 2025 00:04:12 +0300 Subject: [PATCH 7/8] strict type Segment --- .../java/com/softwaremill/jox/Segment.java | 30 +++++++++---------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/channels/src/main/java/com/softwaremill/jox/Segment.java b/channels/src/main/java/com/softwaremill/jox/Segment.java index 20e6e9ce..a7868dd7 100644 --- a/channels/src/main/java/com/softwaremill/jox/Segment.java +++ b/channels/src/main/java/com/softwaremill/jox/Segment.java @@ -11,9 +11,7 @@ final class Segment { static final Segment NULL_SEGMENT = new Segment(-1, null, 0, false); /** Used in {@code next} to indicate that the segment is closed. */ - private enum State { - CLOSED - } + private static final Segment CLOSED = new Segment(-1, null, 0, false); // immutable state @@ -24,8 +22,8 @@ private enum State { private final Object[] data = new Object[SEGMENT_SIZE]; - /** Possible values: {@code Segment} or {@code State.CLOSED} (union type). */ - private volatile Object next = null; + /** Possible values: {@code Segment} or {@link #CLOSED} (union type). */ + private volatile Segment next; private volatile Segment prev; @@ -56,7 +54,7 @@ private enum State { MethodHandles.Lookup l = MethodHandles.privateLookupIn(Segment.class, MethodHandles.lookup()); DATA = MethodHandles.arrayElementVarHandle(Object[].class); - NEXT = l.findVarHandle(Segment.class, "next", Object.class); + NEXT = l.findVarHandle(Segment.class, "next", Segment.class); PREV = l.findVarHandle(Segment.class, "prev", Segment.class); POINTERS_NOT_PROCESSED_NOT_INTERRUPTED = l.findVarHandle( @@ -88,7 +86,7 @@ void cleanPrev() { Segment getNext() { var s = next; - return s == State.CLOSED ? null : (Segment) s; + return s == CLOSED ? null : s; } Segment getPrev() { @@ -255,21 +253,21 @@ void remove() { /** * Closes the segment chain - sets the {@code next} pointer of the last segment to {@code - * State.CLOSED}, and returns the last segment. + * CLOSED}, and returns the last segment. */ Segment close() { var s = this; while (true) { var n = s.next; if (n == null) { // this is the tail segment - if (NEXT.compareAndSet(s, null, State.CLOSED)) { + if (NEXT.compareAndSet(s, null, CLOSED)) { return s; } // else: try again - } else if (n == State.CLOSED) { + } else if (n == CLOSED) { return s; } else { - s = (Segment) n; + s = n; } } } @@ -284,9 +282,9 @@ private Segment aliveSegmentLeft() { /** Should only be called, if this is not the tail segment. */ private Segment aliveSegmentRight() { - var n = (Segment) next; // this is not the tail, so there's a next segment + var n = next; // this is not the tail, so there's a next segment while (n.isRemoved() && !n.isTail()) { - n = (Segment) n.next; // again, not tail + n = n.next; // again, not tail } return n; } @@ -323,7 +321,7 @@ private static Segment findSegment(Segment start, long id) { var current = start; while (current.getId() < id || current.isRemoved()) { var n = current.next; - if (n == State.CLOSED) { + if (n == CLOSED) { // segment chain is closed, so we can't create a new segment return null; } else if (n == null) { @@ -339,7 +337,7 @@ private static Segment findSegment(Segment start, long id) { } // else: try again with current } else { - current = (Segment) n; + current = n; } } return current; @@ -396,7 +394,7 @@ public String toString() { + "id=" + id + ", next=" - + (n == null ? "null" : (n == State.CLOSED ? "closed" : ((Segment) n).id)) + + (n == null ? "null" : (n == CLOSED ? "closed" : n.id)) + ", prev=" + (p == null ? "null" : p.id) + ", pointers=" From 657a8b855db4b3011ed41c9363cff46837d1572d Mon Sep 17 00:00:00 2001 From: "A.Fink" Date: Fri, 14 Nov 2025 00:06:03 +0300 Subject: [PATCH 8/8] delete unused DefaultClauseMarker --- .../java/com/softwaremill/jox/SelectClause.java | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/channels/src/main/java/com/softwaremill/jox/SelectClause.java b/channels/src/main/java/com/softwaremill/jox/SelectClause.java index 2f3c936a..e497879e 100644 --- a/channels/src/main/java/com/softwaremill/jox/SelectClause.java +++ b/channels/src/main/java/com/softwaremill/jox/SelectClause.java @@ -25,7 +25,7 @@ public abstract class SelectClause { abstract T transformedRawValue(Object rawValue); } -class DefaultClause extends SelectClause { +final class DefaultClause extends SelectClause { private final Supplier callback; public DefaultClause(Supplier callback) { @@ -39,7 +39,11 @@ Channel getChannel() { @Override Object register(SelectInstance select) { - return DefaultClauseMarker.DEFAULT; + /* + * Used as a result of {@link DefaultClause#register(SelectInstance)}, instead of {@code null}, to + * indicate that the default clause has been selected during registration. + */ + return this; } @Override @@ -47,11 +51,3 @@ T transformedRawValue(Object rawValue) { return callback.get(); } } - -/** - * Used as a result of {@link DefaultClause#register(SelectInstance)}, instead of {@code null}, to - * indicate that the default clause has been selected during registration. - */ -enum DefaultClauseMarker { - DEFAULT -}