diff --git a/src/core/com/cosylab/epics/caj/CAJContext.java b/src/core/com/cosylab/epics/caj/CAJContext.java index ee0c01dd..4a3ea30f 100644 --- a/src/core/com/cosylab/epics/caj/CAJContext.java +++ b/src/core/com/cosylab/epics/caj/CAJContext.java @@ -176,6 +176,12 @@ public class CAJContext extends Context implements CAContext, CAJConstants, Conf */ protected int maxArrayBytes = 16384; + /** + * Maximum number of times to retry a partial TCP send before giving up. + * A value of -1 means retry indefinitely (the default). + */ + protected int maxSendRetries = -1; + /** * Minimum interval in seconds between CA search broadcasts. Default is 0.1 seconds */ @@ -465,6 +471,11 @@ protected void loadConfiguration() catch (Exception ex) { logger.log(Level.WARNING, "Cannot parse EPICS_CA_MAX_ARRAY_BYTES='" + tmp + "'", ex); } + tmp = System.getenv("EPICS_CA_MAX_SEND_RETRIES"); + if (tmp != null) + try { maxSendRetries = Integer.parseInt(tmp); } + catch (Exception ex) + { logger.log(Level.WARNING, "Cannot parse EPICS_CA_MAX_SEND_RETRIES='" + tmp + "'", ex); } tmp = System.getenv("EPICS_CA_MAX_SEARCH_PERIOD"); if (tmp != null) @@ -485,9 +496,10 @@ protected void loadConfiguration() repeaterPort = jcaLibrary.getPropertyAsInt(contextClassName + ".repeater_port", repeaterPort); serverPort = jcaLibrary.getPropertyAsInt(contextClassName + ".server_port", serverPort); maxArrayBytes = jcaLibrary.getPropertyAsInt(contextClassName + ".max_array_bytes", maxArrayBytes); + maxSendRetries = jcaLibrary.getPropertyAsInt(contextClassName + ".max_send_retries", maxSendRetries); maxSearchInterval = jcaLibrary.getPropertyAsFloat(contextClassName + ".max_search_interval", maxSearchInterval); eventDispatcherClassName = jcaLibrary.getProperty(contextClassName + ".event_dispatcher"); - + // load CAJ specific configuration (overrides default) addressList = jcaLibrary.getProperty(thisClassName + ".addr_list", addressList); autoAddressList = jcaLibrary.getPropertyAsBoolean(thisClassName + ".auto_addr_list", autoAddressList); @@ -500,6 +512,7 @@ protected void loadConfiguration() repeaterPort = jcaLibrary.getPropertyAsInt(thisClassName + ".repeater_port", repeaterPort); serverPort = jcaLibrary.getPropertyAsInt(thisClassName + ".server_port", serverPort); maxArrayBytes = jcaLibrary.getPropertyAsInt(thisClassName + ".max_array_bytes", maxArrayBytes); + maxSendRetries = jcaLibrary.getPropertyAsInt(thisClassName + ".max_send_retries", maxSendRetries); minSearchInterval = jcaLibrary.getPropertyAsFloat(thisClassName + ".min_search_interval", minSearchInterval); maxSearchInterval = jcaLibrary.getPropertyAsFloat(thisClassName + ".max_search_interval", maxSearchInterval); } @@ -580,6 +593,13 @@ public void configure(Configuration configuration) maxArrayBytes = configuration.getAttributeAsInteger("max_array_bytes", maxArrayBytes); } + // max. send retries (-1 = infinite) + try { + maxSendRetries = configuration.getChild("max_send_retries", false).getValueAsInteger(); + } catch(Exception ex) { + maxSendRetries = configuration.getAttributeAsInteger("max_send_retries", maxSendRetries); + } + // max. search interval try { maxSearchInterval = configuration.getChild("max_search_interval", false).getValueAsFloat(); @@ -1322,6 +1342,7 @@ public void printInfo(PrintStream out) throws IllegalStateException { out.println("REPEATER_PORT : " + repeaterPort); out.println("SERVER_PORT : " + serverPort); out.println("MAX_ARRAY_BYTES : " + maxArrayBytes); + out.println("MAX_SEND_RETRIES : " + (maxSendRetries < 0 ? "infinite" : maxSendRetries)); out.println("MIN_SEARCH_INTERVAL : " + minSearchInterval); out.println("MAX_SEARCH_INTERVAL : " + maxSearchInterval); out.println("EVENT_DISPATCHER: " + eventDispatcher); @@ -1438,6 +1459,14 @@ public int getMaxArrayBytes() { return maxArrayBytes; } + /** + * Get maximum number of TCP send retries (-1 means infinite). + * @return max send retries, or -1 for unlimited. + */ + public int getMaxSendRetries() { + return maxSendRetries; + } + /** * Get repeater port. * @return repeater port. diff --git a/src/core/com/cosylab/epics/caj/impl/CATransport.java b/src/core/com/cosylab/epics/caj/impl/CATransport.java index f2a6cf3a..6a718a1e 100644 --- a/src/core/com/cosylab/epics/caj/impl/CATransport.java +++ b/src/core/com/cosylab/epics/caj/impl/CATransport.java @@ -630,96 +630,76 @@ protected void enableFlowControl() /** * Send a buffer through the transport. - * NOTE: TCP sent buffer/sending has to be synchronized. + * NOTE: TCP sent buffer/sending has to be synchronized. * @param buffer buffer to be sent - * @throws IOException + * @throws IOException */ - public void send(ByteBuffer buffer, boolean asyncCloseOnError) throws IOException + public void send(ByteBuffer buffer) throws IOException { sendLock.lock(); try { - noSyncSend(buffer, asyncCloseOnError); + noSyncSend(buffer); } - finally + finally { sendLock.unlock(); } } /** - * Send a buffer through the transport. - * NOTE: TCP sent buffer/sending has to be synchronized. - * @param buffer buffer to be sent - * @throws IOException + * Send a buffer through the transport without acquiring the send lock + * (caller is responsible for holding it). + * + *
Flips the buffer, then loops until all bytes are written. If the + * kernel TCP send buffer is full, waits briefly and retries. Throws + * {@link IOException} (and closes the transport) if the send buffer + * remains full after the configured max send retries (see + * {@link CAJContext#getMaxSendRetries()}; -1 means retry indefinitely), + * the channel is already closed, or the calling thread is interrupted. + * + * @param buffer fully-written buffer to send (will be flipped) + * @throws IOException on write error, persistent backpressure, or interrupt */ - // TODO optimize !!! - private void noSyncSend(ByteBuffer buffer, boolean asyncCloseOnError) throws IOException + private void noSyncSend(ByteBuffer buffer) throws IOException { try { - // prepare buffer buffer.flip(); - final int SEND_BUFFER_LIMIT = 16000; - int bufferLimit = buffer.limit(); + context.getLogger().finest("Sending " + buffer.limit() + " bytes to " + socketAddress + "."); - // TODO remove?! - context.getLogger().finest("Sending " + bufferLimit + " bytes to " + socketAddress + "."); - - // limit sending large buffers, split the into parts - int parts = (buffer.limit()-1) / SEND_BUFFER_LIMIT + 1; - for (int part = 1; part <= parts; part++) + int tries = 0; + final int maxRetries = context.getMaxSendRetries(); + while (buffer.hasRemaining()) { - if (parts > 1) - { - buffer.limit(Math.min(part * SEND_BUFFER_LIMIT, bufferLimit)); - context.getLogger().finest("[Parted] Sending (part " + part + "/" + parts + ") " + (buffer.limit()-buffer.position()) + " bytes to " + socketAddress + "."); - } - - final int TRIES = 10; - for (int tries = 0; /* tries <= TRIES */ ; tries++) + int bytesSent = channel.write(buffer); + if (bytesSent < 0) + throw new IOException("channel closed"); + + if (buffer.hasRemaining()) { - - // send - int bytesSent = channel.write(buffer); - if (bytesSent < 0) - throw new IOException("bytesSent < 0"); - - // bytesSend == buffer.position(), so there is no need for flip() - if (buffer.position() != buffer.limit()) - { - if (closed) - throw new IOException("transport closed on the client side"); - - if (tries >= TRIES) - { - context.getLogger().warning("Failed to send message to " + socketAddress + " - buffer full, will retry."); - - //if (tries >= 2*TRIES) - // throw new IOException("TCP send buffer persistently full, disconnecting!"); - - } - - // flush & wait for a while... - context.getLogger().finest("Send buffer full for " + socketAddress + ", waiting..."); - channel.socket().getOutputStream().flush(); - try { - Thread.sleep(Math.min(15000,10+tries*100)); - } catch (InterruptedException e) { - // noop - } - continue; + if (closed) + throw new IOException("transport closed on the client side"); + + tries++; + if (maxRetries >= 0 && tries > maxRetries) + throw new IOException("TCP send buffer persistently full, disconnecting " + socketAddress); + + context.getLogger().finest("Send buffer full for " + socketAddress + + ", waiting (attempt " + tries + + (maxRetries < 0 ? "" : "/" + maxRetries) + ")..."); + try { + Thread.sleep(Math.min(15000, 10 + tries * 100)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("interrupted while sending to " + socketAddress); } - else - break; } - } } - catch (IOException ioex) + catch (IOException ioex) { - // close connection close(true); throw ioex; } @@ -832,7 +812,7 @@ public boolean flushInternal() } try { - send(buf, false); + send(buf); } finally { // return back to the cache @@ -883,7 +863,7 @@ public void submit(Request requestMessage) throws IOException { { try { - noSyncSend(message, true); + noSyncSend(message); return; } finally diff --git a/src/core/com/cosylab/epics/caj/impl/requests/EventAddRequest.java b/src/core/com/cosylab/epics/caj/impl/requests/EventAddRequest.java index 713c132b..d0c14bb1 100644 --- a/src/core/com/cosylab/epics/caj/impl/requests/EventAddRequest.java +++ b/src/core/com/cosylab/epics/caj/impl/requests/EventAddRequest.java @@ -163,6 +163,18 @@ public void submit() throws IOException public void resubscribeSubscription(Transport transport) throws IOException { this.transport = transport; + // Reset buffer state: a prior failed send (IOException mid-write in noSyncSend) + // can leave position < capacity and limit = capacity. The next flip() would then + // set limit = that partial position, causing putInt(8,…) to throw + // IndexOutOfBoundsException on the reconnect after that. + // + // requestMessage is a fixed-size, single-purpose buffer allocated in the + // constructor (see EventAddRequest() lines ~107/114) to hold exactly one + // EventAdd message, so capacity() == message size is always true. Resetting + // to position == limit == capacity restores the fully-written state, so the + // flip() in Transport.submit() always produces position=0, limit=capacity. + requestMessage.limit(requestMessage.capacity()); + requestMessage.position(requestMessage.capacity()); // update channel sid requestMessage.putInt(8, channel.getServerChannelID()); // immediate send (increase priority - all subsequent sends will be done immediately). diff --git a/test/com/cosylab/epics/caj/impl/requests/test/EventAddRequestBufferTest.java b/test/com/cosylab/epics/caj/impl/requests/test/EventAddRequestBufferTest.java new file mode 100644 index 00000000..e6e378a0 --- /dev/null +++ b/test/com/cosylab/epics/caj/impl/requests/test/EventAddRequestBufferTest.java @@ -0,0 +1,158 @@ +package com.cosylab.epics.caj.impl.requests.test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.nio.ByteBuffer; + +import org.junit.Test; + +import com.cosylab.epics.caj.impl.CAConstants; + +/** + * Regression test for the ByteBuffer corruption bug in + * {@link com.cosylab.epics.caj.impl.requests.EventAddRequest#resubscribeSubscription}. + * + *
A failed {@code noSyncSend()} during a reconnect could leave + * {@code requestMessage} with {@code position < capacity} after the transport + * had partially consumed the buffer via {@code channel.write()}. On the next + * call to {@code resubscribeSubscription()}, {@code Transport.submit()} would + * call {@code flip()}, baking that partial position into {@code limit}. The + * reconnect after that would then throw {@link IndexOutOfBoundsException} at + * {@code requestMessage.putInt(8, ...)} because {@code limit < 12}. + * + *
The fix resets {@code limit} and {@code position} to {@code capacity} at + * the top of {@code resubscribeSubscription()} so that {@code flip()} always + * produces {@code position=0, limit=capacity}. Because {@code requestMessage} + * is a fixed-size, single-purpose buffer allocated in the + * {@code EventAddRequest} constructor to hold exactly one EventAdd message, + * {@code capacity} is always equal to the message size. + */ +public class EventAddRequestBufferTest { + + // EventAddRequest allocates CA_MESSAGE_HEADER_SIZE (16) + 16 payload bytes + // for the common case of dataCount < 0xFFFF. + private static final int MESSAGE_SIZE = CAConstants.CA_MESSAGE_HEADER_SIZE + 16; + + // Offset of the server channel ID (parameter1) in a standard CA header. + // putInt(8, sid) is the call that throws when limit < 12. + private static final int SID_OFFSET = 8; + + /** + * Reproduces the exact three-step sequence that caused the original + * {@link IndexOutOfBoundsException} in the field. + * + *
+ * Step 1 – constructor writes MESSAGE_SIZE bytes: + * position=MESSAGE_SIZE, limit=MESSAGE_SIZE (fully written) + * + * Step 2 – first resubscribeSubscription: submit() flips, channel.write() + * partially consumes N bytes, then noSyncSend() throws IOException: + * position=N, limit=MESSAGE_SIZE (partially consumed) + * + * Step 3 – second resubscribeSubscription: submit() flips the partial position: + * position=0, limit=N (corrupted — limit is now N, not MESSAGE_SIZE) + * + * Step 4 – third resubscribeSubscription: putInt(8, sid) with limit=N < 12: + * throws IndexOutOfBoundsException + *+ */ + @Test + public void bufferCorruptionAfterPartialSend_throwsOnThirdReconnect() { + ByteBuffer buf = simulateFullyWrittenBuffer(); + + // Step 2: submit flips, partial write advances position to 5, then IOException + buf.flip(); + buf.position(5); // position=5, limit=MESSAGE_SIZE + + // Step 3: second resubscribeSubscription — putInt(8,...) still ok here, + // then submit flips the partial position into limit + buf.putInt(SID_OFFSET, 0x00000001); // absolute put, limit=MESSAGE_SIZE >= 12: ok + buf.flip(); // position=0, limit=5 — corrupted + + assertEquals("limit should be the partial position after the corrupting flip", + 5, buf.limit()); + + // Step 4: third resubscribeSubscription — putInt(8,...) now throws + try { + buf.putInt(SID_OFFSET, 0x00000002); // limit=5, index 8 is out of bounds + fail("Expected IndexOutOfBoundsException — this is the original bug"); + } catch (IndexOutOfBoundsException e) { + // expected — this is the bug that was fixed + } + } + + /** + * Verifies that after applying the fix (resetting {@code limit} and + * {@code position} to {@code capacity}), the buffer is in the + * fully-written state and {@code flip()} produces a correct read view. + */ + @Test + public void fixResetsBufferToFullyWrittenState() { + ByteBuffer buf = simulateFullyWrittenBuffer(); + + // Drive the buffer into the corrupted state (position=0, limit=5) + buf.flip(); + buf.position(5); + buf.flip(); + + // Apply the fix + buf.limit(buf.capacity()); + buf.position(buf.capacity()); + + // putInt(8, sid) must succeed + buf.putInt(SID_OFFSET, 0xCAFEBABE); + + // submit() calls flip() — must yield position=0, limit=MESSAGE_SIZE + buf.flip(); + assertEquals("position should be 0 after flip", 0, buf.position()); + assertEquals("limit should equal capacity (full message) after flip", + MESSAGE_SIZE, buf.limit()); + } + + /** + * Verifies that the fix is idempotent: repeated calls to + * resubscribeSubscription (simulated by the reset + flip cycle) always + * leave the buffer in the same valid send state regardless of how many + * reconnects occur. + */ + @Test + public void fixIsIdempotentAcrossMultipleReconnects() { + ByteBuffer buf = simulateFullyWrittenBuffer(); + + for (int reconnect = 1; reconnect <= 5; reconnect++) { + // Apply fix + buf.limit(buf.capacity()); + buf.position(buf.capacity()); + + // Update SID — must not throw + buf.putInt(SID_OFFSET, reconnect); + + // submit() flips + buf.flip(); + + assertEquals("reconnect " + reconnect + ": position should be 0", + 0, buf.position()); + assertEquals("reconnect " + reconnect + ": limit should equal MESSAGE_SIZE", + MESSAGE_SIZE, buf.limit()); + + // Simulate a partial write consuming half the buffer + buf.position(MESSAGE_SIZE / 2); + } + } + + // ------------------------------------------------------------------------- + + /** Returns a ByteBuffer in the fully-written state the constructor leaves it in. */ + private static ByteBuffer simulateFullyWrittenBuffer() { + ByteBuffer buf = ByteBuffer.allocate(MESSAGE_SIZE); + // Fill with MESSAGE_SIZE bytes, mirroring what insertCAHeader + putFloat/putShort + // do in the EventAddRequest constructor. + for (int i = 0; i < MESSAGE_SIZE; i++) { + buf.put((byte) i); + } + assertEquals(MESSAGE_SIZE, buf.position()); + assertEquals(MESSAGE_SIZE, buf.limit()); + return buf; + } +} diff --git a/test/com/cosylab/epics/caj/impl/requests/test/EventAddRequestReconnectIT.java b/test/com/cosylab/epics/caj/impl/requests/test/EventAddRequestReconnectIT.java new file mode 100644 index 00000000..8887a6f9 --- /dev/null +++ b/test/com/cosylab/epics/caj/impl/requests/test/EventAddRequestReconnectIT.java @@ -0,0 +1,149 @@ +package com.cosylab.epics.caj.impl.requests.test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.lang.reflect.Field; +import java.nio.ByteBuffer; + +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +import com.cosylab.epics.caj.CAJChannel; +import com.cosylab.epics.caj.CAJContext; +import com.cosylab.epics.caj.CAJMonitor; +import com.cosylab.epics.caj.impl.CATransport; +import com.cosylab.epics.caj.impl.requests.AbstractCARequest; +import com.cosylab.epics.caj.impl.requests.EventAddRequest; + +import gov.aps.jca.Channel; +import gov.aps.jca.Monitor; + +/** + * Demonstrates the ByteBuffer corruption bug in + * {@link EventAddRequest#resubscribeSubscription} using a live IOC. + * + *
Requires a running IOC with a PV named {@value #PV_NAME}. + * + *
Bug summary: A failed {@code noSyncSend()} during a reconnect + * leaves {@code requestMessage} with {@code position < capacity} after the + * transport partially consumed the buffer. The next + * {@code resubscribeSubscription()} call passes the buffer to + * {@code Transport.submit()}, which calls {@code flip()}, baking that partial + * position into {@code limit}. The following reconnect then throws + * {@link IndexOutOfBoundsException} at {@code requestMessage.putInt(8, ...)} + * because {@code limit < 12}. This exception is caught and logged at SEVERE + * in {@code CAJChannel.resubscribeSubscriptions()}, silently leaving the + * monitor dead. + * + *
This test demonstrates the exact three-step sequence using reflection to + * place {@code requestMessage} into the corrupted state, then forces a + * reconnect via {@link CATransport#close(boolean)}. With the fix in place the + * channel reconnects cleanly; without the fix + * {@code resubscribeSubscription()} throws and the monitor is silently lost. + */ +@Ignore("Requires a running IOC with PV '" + EventAddRequestReconnectIT.PV_NAME + "'") +public class EventAddRequestReconnectIT { + + static final String PV_NAME = "eventAddRequesttest_1"; + + private CAJContext context; + private CAJChannel channel; + + @Before + public void setUp() throws Exception { + context = new CAJContext(); + channel = (CAJChannel) context.createChannel(PV_NAME); + context.pendIO(5.0); + assertEquals(Channel.CONNECTED, channel.getConnectionState()); + } + + @After + public void tearDown() throws Exception { + if (context != null && !context.isDestroyed()) + context.destroy(); + context = null; + } + + /** + * Reproduces the three-step buffer corruption sequence, then verifies + * that the fix allows a clean reconnect and the monitor survives. + * + *
Without the fix the reconnect in step 3 would log a SEVERE + * {@link IndexOutOfBoundsException} and the monitor would silently stop + * delivering updates. + */ + @Test + public void monitorSurvivesReconnectAfterPartialSendFailure() throws Exception { + // --- Step 1: create a monitor (allocates EventAddRequest internally) --- + CAJMonitor cajMonitor = (CAJMonitor) channel.addMonitor(Monitor.VALUE); + context.flushIO(); + assertNotNull(cajMonitor); + + // --- Step 2: reach inside and corrupt requestMessage --- + // Simulate the state left by a failed noSyncSend(): + // Transport.submit() called flip() (position=0, limit=capacity), + // channel.write() advanced position to 5, then threw IOException. + // Result: position=5, limit=capacity. + // + // Then the NEXT resubscribeSubscription called flip() on that state, + // leaving position=0, limit=5 — the buffer that triggers the bug. + + Field earField = CAJMonitor.class.getDeclaredField("eventAddRequest"); + earField.setAccessible(true); + EventAddRequest ear = (EventAddRequest) earField.get(cajMonitor); + assertNotNull("eventAddRequest must be set after addMonitor()", ear); + + // requestMessage is declared in AbstractCARequest + Field bufField = AbstractCARequest.class.getDeclaredField("requestMessage"); + bufField.setAccessible(true); + ByteBuffer requestMessage = (ByteBuffer) bufField.get(ear); + assertNotNull(requestMessage); + + int capacity = requestMessage.capacity(); + + // Place the buffer into the corrupted state (position=0, limit=5). + // This is the state that caused the third reconnect to throw. + requestMessage.limit(capacity); // ensure limit is sane before we start + requestMessage.position(capacity); + requestMessage.flip(); // position=0, limit=capacity — as after submit() + requestMessage.position(5); // partial write, then IOException + requestMessage.flip(); // position=0, limit=5 — the corrupted state + + assertEquals("pre-condition: limit should be the partial position", + 5, requestMessage.limit()); + + // --- Step 3: force a reconnect --- + // CAJChannel.resubscribeSubscriptions() will call + // ear.resubscribeSubscription(newTransport), which calls + // requestMessage.putInt(8, sid). With limit=5 and no fix, this throws. + // With the fix, limit and position are reset to capacity first. + CATransport transport = (CATransport) channel.getTransport(); + transport.close(true); // forces disconnect and search/reconnect + + assertEquals(Channel.DISCONNECTED, channel.getConnectionState()); + + Thread.sleep(3000); + + // With the fix: channel reconnects and monitor is active. + // Without the fix: channel reconnects but the SEVERE log shows: + // java.lang.IndexOutOfBoundsException + // at EventAddRequest.resubscribeSubscription(EventAddRequest.java:...) + // and the monitor silently stops delivering updates. + assertEquals("channel should reconnect after forced close", + Channel.CONNECTED, channel.getConnectionState()); + + // Verify the buffer is in the fully-consumed state after the send: + // fix resets to position=capacity, limit=capacity + // flip() → position=0, limit=capacity + // sendBuffer.put(msg) → position=capacity, limit=capacity (all bytes consumed) + // position=capacity, limit=capacity is the "ready for next fix-and-flip" state. + ByteBuffer buf = (ByteBuffer) bufField.get(ear); + assertEquals("after reconnect: buffer limit should equal full message capacity", + capacity, buf.limit()); + assertEquals("after reconnect: buffer position should equal capacity (fully consumed)", + capacity, buf.position()); + } +}