Fix ByteBuffer corruption on reconnect causing IndexOutOfBoundsException #87

Open
jacomago wants to merge 4 commits into epics-base:master from jacomago:resubscribe-bug

Conversation

@jacomago jacomago commented Mar 27, 2026

Two bugs triggered by connection failures on Java 11+ (including Java 21):

  1. EventAddRequest.resubscribeSubscription(): a failed noSyncSend could leave requestMessage with position < capacity. The next reconnect would flip() that partial position into limit, and the reconnect after that would throw IndexOutOfBoundsException at putInt(8, ...) because limit < 12. Fix: reset limit and position to capacity at the top of resubscribeSubscription() so flip() in Transport.submit() always produces position=0, limit=capacity.

  2. CATransport.noSyncSend(): channel.socket().getOutputStream().flush() throws IllegalBlockingModeException when the SocketChannel is in non-blocking mode (as used by the Reactor), and is a no-op for TCP output in any mode. The uncaught RuntimeException bypassed the catch(IOException) handler, skipping close(true) and leaving the buffer in a partially-consumed state. Fix: remove the call.
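For readers following item 1, here is a minimal, self-contained sketch of the failure sequence. It assumes a 16-byte message; `MESSAGE_SIZE`, the class name, and `replay` are hypothetical and not taken from the CA code. It replays a partial send followed by two reconnects, with and without the reset to capacity described above.

```java
import java.nio.ByteBuffer;

public class FlipCorruptionSketch {
    // Hypothetical message size; in the real code the size comes from the type information.
    static final int MESSAGE_SIZE = 16;

    /** Replays a partial send followed by two reconnects and reports the outcome. */
    static String replay(boolean applyFix) {
        ByteBuffer requestMessage = ByteBuffer.allocate(MESSAGE_SIZE);
        requestMessage.position(requestMessage.capacity()); // fully-written state

        // First send attempt: flip() for writing, but only 4 bytes go out before the failure.
        requestMessage.flip();
        requestMessage.position(4); // position < capacity, limit == capacity

        try {
            for (int reconnect = 1; reconnect <= 2; reconnect++) {
                if (applyFix) {
                    // Reset to the fully-written state (limit first, then position).
                    requestMessage.limit(requestMessage.capacity());
                    requestMessage.position(requestMessage.capacity());
                }
                requestMessage.putInt(8, reconnect); // absolute put, bounded by limit
                requestMessage.flip();               // stand-in for the flip() in submit()
            }
            return "ok, limit=" + requestMessage.limit();
        } catch (IndexOutOfBoundsException e) {
            return "IndexOutOfBoundsException, limit=" + requestMessage.limit();
        }
    }

    public static void main(String[] args) {
        System.out.println("without fix: " + replay(false));
        System.out.println("with fix:    " + replay(true));
    }
}
```

Without the reset, the first reconnect's `flip()` turns the leftover position (4) into the limit, so the second reconnect's `putInt(8, ...)` is out of bounds. With the reset, every `flip()` yields position=0 and limit=capacity.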

Fixes #86

  • Adds tests for this problem
  • Refactors noSyncSend
  • Adds a new configuration variable, MAX_SEND_RETRIES, to cap the number of send retries

Contributor

@shroffk shroffk left a comment

I am hoping there was no edge case use for the explicit flush()

@kasemir
Contributor

kasemir commented Mar 30, 2026

I'm not familiar with the code. The comments are very good, so the next person looking at this gets a good idea of what we're trying to accomplish here.

Still, simply based on having used java.nio.Buffer in other code I'm surprised by this:

// ...Resetting to the fully-written state
...
requestMessage.position(requestMessage.capacity());

A Buffer tends to be sized to hold the largest possible message. For example, a buffer may have a capacity of 2048 bytes. To use the buffer, you clear it, place a message of, say, 10 bytes into it, call flip(), and write.

For the next use, clear() or maybe rewind() the buffer and fill it with content. position() then shows how much valid content there is; flip(), and so on.

I get it that we have a bug here where the buffer isn't properly reset. But to fix that, I would expect one of

requestMessage.position(SizeOfTheExpectedMessage);
requestMessage.clear();
requestMessage.rewind();

which reset the buffer either to "empty" or to the size of the expected message.
Setting it to requestMessage.capacity() sets it to the somewhat arbitrary size of the largest possible message.

Or do we know for sure that requestMessage will only hold one type of message, and requestMessage.capacity() is always the size of that same message?
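A minimal sketch of the two usage patterns contrasted in this comment, for illustration only. The class and method names are hypothetical, and the 2048-byte capacity is just the example figure used above. `prepare` shows the clear/fill/flip cycle on an oversized buffer; `flipFromCapacity` shows what the reset to `capacity()` marks as valid content.

```java
import java.nio.ByteBuffer;

public class BufferCycleSketch {
    /** Conventional cycle: clear, fill, flip. Returns the number of bytes ready to write. */
    static int prepare(ByteBuffer buffer, byte[] message) {
        buffer.clear();      // position=0, limit=capacity: treated as empty
        buffer.put(message); // position now equals the message length
        buffer.flip();       // limit=message length, position=0
        return buffer.remaining();
    }

    /** Reset to capacity and flip: the whole buffer is treated as valid content. */
    static int flipFromCapacity(ByteBuffer buffer) {
        buffer.limit(buffer.capacity());
        buffer.position(buffer.capacity());
        buffer.flip();       // limit=capacity, position=0
        return buffer.remaining();
    }

    public static void main(String[] args) {
        ByteBuffer big = ByteBuffer.allocate(2048);
        System.out.println(prepare(big, new byte[10])); // bytes of real content
        System.out.println(flipFromCapacity(big));      // every byte counted as content
    }
}
```

The second pattern is only equivalent to the first when the buffer was allocated at exactly the message size, which is the question raised here.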

@kasemir
Contributor

kasemir commented Mar 30, 2026

I am hoping there was no edge case use for the explicit flush()

Right. Who knows?
Could do this to stay on the safe side.

// This 'flush' seems to be unnecessary and even cause an exception ...
// but keeping it for compatibility with unknown side effects 
try
{
   ..flush();
}
catch (Exception ex)
{
  // Ignore
}
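The suggestion above catches Exception rather than IOException. A small sketch of why that distinction matters for the second bug in the description: IllegalBlockingModeException is unchecked, so a catch (IOException) handler never sees it. The class and method names are hypothetical, and the boolean flag is only a stand-in for the close(true) cleanup.

```java
import java.io.IOException;
import java.nio.channels.IllegalBlockingModeException;

public class CatchScopeSketch {
    /** Returns true if the IOException handler (stand-in for close(true)) ran. */
    static boolean cleanupRan(Exception toThrow) {
        boolean cleanedUp = false;
        try {
            try {
                if (toThrow instanceof IOException) throw (IOException) toThrow;
                if (toThrow instanceof RuntimeException) throw (RuntimeException) toThrow;
            } catch (IOException ex) {
                cleanedUp = true; // only reached for checked I/O failures
            }
        } catch (RuntimeException ex) {
            // The unchecked exception escaped the inner handler; no cleanup happened.
        }
        return cleanedUp;
    }

    public static void main(String[] args) {
        System.out.println(cleanupRan(new IOException("send failed")));
        System.out.println(cleanupRan(new IllegalBlockingModeException()));
    }
}
```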

@jacomago
Author

I'm currently trying to reproduce the bug so I can explicitly test this fix.

I think rewind makes more sense. But I just went through the code, and there is only one message of a specific size, because we get the size from the type information.

I guess that, since this is happening in resubscribe, the last event or events weren't cleared.

@jacomago
Author

jacomago commented Apr 1, 2026

I am hoping there was no edge case use for the explicit flush()

Right. Who knows? Could do this to stay on the safe side.

// This 'flush' seems to be unnecessary and even cause an exception ...
// but keeping it for compatibility with unknown side effects 
try
{
   ..flush();
}
catch (Exception ex)
{
  // Ignore
}

The flush does nothing: the OutputStream returned comes from Socket, which does not override flush(), and the default flush() implementation is empty.

jacomago and others added 3 commits April 1, 2026 10:33
Two bugs triggered by connection failures on Java 11+ (including Java 21):

1. EventAddRequest.resubscribeSubscription(): a failed noSyncSend could
   leave requestMessage with position < capacity.  The next reconnect would
   flip() that partial position into limit, and the reconnect after that
   would throw IndexOutOfBoundsException at putInt(8, ...) because
   limit < 12.  Fix: reset limit and position to capacity at the top of
   resubscribeSubscription() so flip() in Transport.submit() always
   produces position=0, limit=capacity.

2. CATransport.noSyncSend(): channel.socket().getOutputStream().flush()
   throws IllegalBlockingModeException when the SocketChannel is in
   non-blocking mode (as used by the Reactor), and is a no-op for TCP
   output in any mode.  The uncaught RuntimeException bypassed the
   catch(IOException) handler, skipping close(true) and leaving the
   buffer in a partially-consumed state.  Fix: remove the call.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Remove dead asyncCloseOnError parameter: it was never consulted —
  the method always called close(true) on error. Both call sites passed
  true anyway; the one that passed false (flushInternal) already had its
  own close() in the catch block.

- Replace the double loop (outer 16 KB parts chunking + inner unbounded
  retry) with a single while (buffer.hasRemaining()) loop. Non-blocking
  channel.write() already returns fewer bytes when the kernel send
  buffer is full, making the explicit chunking redundant.

- Enforce the retry limit: the exit condition (tries <= TRIES) had been
  commented out, making the loop infinite. Now throws IOException after
  MAX_SEND_RETRIES (10) attempts so a persistently full send buffer
  triggers a clean disconnect rather than blocking forever.

- Restore interrupt handling: InterruptedException was silently swallowed.
  Now restores the interrupted status and throws IOException.

- Promote the retry constant to a named static field.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
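A rough sketch of the loop shape this commit message describes, not the actual noSyncSend. It assumes a generic WritableByteChannel in place of the transport's SocketChannel, a 1 ms pause between attempts, and a retry counter that resets whenever bytes are written; all three are assumptions for illustration. The limit of 10 follows the message above, and a later commit in this PR changes the default.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

public class BoundedSendSketch {
    // Value taken from the commit message; treat it as illustrative.
    static final int MAX_SEND_RETRIES = 10;

    /** Writes the whole buffer, giving up after MAX_SEND_RETRIES consecutive empty writes. */
    static void send(WritableByteChannel channel, ByteBuffer buffer) throws IOException {
        int tries = 0;
        while (buffer.hasRemaining()) {
            int written = channel.write(buffer); // may write fewer bytes than remaining
            if (written > 0) {
                tries = 0; // assumption: progress resets the retry counter
                continue;
            }
            if (++tries > MAX_SEND_RETRIES) {
                throw new IOException("Send buffer still full after " + MAX_SEND_RETRIES + " retries");
            }
            try {
                Thread.sleep(1); // assumption: brief pause before retrying
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupted status
                throw new IOException("Interrupted while sending", e);
            }
        }
    }
}
```

A caller would be expected to treat the IOException as a signal to close the connection, matching the "clean disconnect" behaviour the message describes.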
@jacomago jacomago marked this pull request as ready for review April 7, 2026 10:49
Reverts default to infinite as before
@jacomago
Author

jacomago commented Apr 7, 2026

@kasemir @shroffk

Added some tests for the specific problem; I couldn't force it in the archiver.

Also did some refactoring of noSyncSend.

@jacomago
Author

jacomago commented Apr 7, 2026

FYI @ralphlange

@ralphlange
Contributor

That sounds like our crashes. Let me show this to my colleagues.

Development

Successfully merging this pull request may close these issues.

resubscribe failure
