Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion src/core/com/cosylab/epics/caj/CAJContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,12 @@ public class CAJContext extends Context implements CAContext, CAJConstants, Conf
*/
protected int maxArrayBytes = 16384;

/**
* Maximum number of times to retry a partial TCP send before giving up.
* A value of -1 means retry indefinitely (the default).
*/
protected int maxSendRetries = -1;

/**
* Minimum interval in seconds between CA search broadcasts. Default is 0.1 seconds
*/
Expand Down Expand Up @@ -465,6 +471,11 @@ protected void loadConfiguration()
catch (Exception ex)
{ logger.log(Level.WARNING, "Cannot parse EPICS_CA_MAX_ARRAY_BYTES='" + tmp + "'", ex); }

tmp = System.getenv("EPICS_CA_MAX_SEND_RETRIES");
if (tmp != null)
try { maxSendRetries = Integer.parseInt(tmp); }
catch (Exception ex)
{ logger.log(Level.WARNING, "Cannot parse EPICS_CA_MAX_SEND_RETRIES='" + tmp + "'", ex); }

tmp = System.getenv("EPICS_CA_MAX_SEARCH_PERIOD");
if (tmp != null)
Expand All @@ -485,9 +496,10 @@ protected void loadConfiguration()
repeaterPort = jcaLibrary.getPropertyAsInt(contextClassName + ".repeater_port", repeaterPort);
serverPort = jcaLibrary.getPropertyAsInt(contextClassName + ".server_port", serverPort);
maxArrayBytes = jcaLibrary.getPropertyAsInt(contextClassName + ".max_array_bytes", maxArrayBytes);
maxSendRetries = jcaLibrary.getPropertyAsInt(contextClassName + ".max_send_retries", maxSendRetries);
maxSearchInterval = jcaLibrary.getPropertyAsFloat(contextClassName + ".max_search_interval", maxSearchInterval);
eventDispatcherClassName = jcaLibrary.getProperty(contextClassName + ".event_dispatcher");

// load CAJ specific configuration (overrides default)
addressList = jcaLibrary.getProperty(thisClassName + ".addr_list", addressList);
autoAddressList = jcaLibrary.getPropertyAsBoolean(thisClassName + ".auto_addr_list", autoAddressList);
Expand All @@ -500,6 +512,7 @@ protected void loadConfiguration()
repeaterPort = jcaLibrary.getPropertyAsInt(thisClassName + ".repeater_port", repeaterPort);
serverPort = jcaLibrary.getPropertyAsInt(thisClassName + ".server_port", serverPort);
maxArrayBytes = jcaLibrary.getPropertyAsInt(thisClassName + ".max_array_bytes", maxArrayBytes);
maxSendRetries = jcaLibrary.getPropertyAsInt(thisClassName + ".max_send_retries", maxSendRetries);
minSearchInterval = jcaLibrary.getPropertyAsFloat(thisClassName + ".min_search_interval", minSearchInterval);
maxSearchInterval = jcaLibrary.getPropertyAsFloat(thisClassName + ".max_search_interval", maxSearchInterval);
}
Expand Down Expand Up @@ -580,6 +593,13 @@ public void configure(Configuration configuration)
maxArrayBytes = configuration.getAttributeAsInteger("max_array_bytes", maxArrayBytes);
}

// max. send retries (-1 = infinite)
try {
maxSendRetries = configuration.getChild("max_send_retries", false).getValueAsInteger();
} catch(Exception ex) {
maxSendRetries = configuration.getAttributeAsInteger("max_send_retries", maxSendRetries);
}

// max. search interval
try {
maxSearchInterval = configuration.getChild("max_search_interval", false).getValueAsFloat();
Expand Down Expand Up @@ -1322,6 +1342,7 @@ public void printInfo(PrintStream out) throws IllegalStateException {
out.println("REPEATER_PORT : " + repeaterPort);
out.println("SERVER_PORT : " + serverPort);
out.println("MAX_ARRAY_BYTES : " + maxArrayBytes);
out.println("MAX_SEND_RETRIES : " + (maxSendRetries < 0 ? "infinite" : maxSendRetries));
out.println("MIN_SEARCH_INTERVAL : " + minSearchInterval);
out.println("MAX_SEARCH_INTERVAL : " + maxSearchInterval);
out.println("EVENT_DISPATCHER: " + eventDispatcher);
Expand Down Expand Up @@ -1438,6 +1459,14 @@ public int getMaxArrayBytes() {
return maxArrayBytes;
}

/**
* Get maximum number of TCP send retries (-1 means infinite).
* @return max send retries, or -1 for unlimited.
*/
public int getMaxSendRetries() {
return maxSendRetries;
}

/**
* Get repeater port.
* @return repeater port.
Expand Down
110 changes: 45 additions & 65 deletions src/core/com/cosylab/epics/caj/impl/CATransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -630,96 +630,76 @@ protected void enableFlowControl()

/**
* Send a buffer through the transport.
* NOTE: TCP sent buffer/sending has to be synchronized.
* NOTE: TCP sent buffer/sending has to be synchronized.
* @param buffer buffer to be sent
* @throws IOException
* @throws IOException
*/
public void send(ByteBuffer buffer, boolean asyncCloseOnError) throws IOException
public void send(ByteBuffer buffer) throws IOException
{
sendLock.lock();
try
{
noSyncSend(buffer, asyncCloseOnError);
noSyncSend(buffer);
}
finally
finally
{
sendLock.unlock();
}
}

/**
* Send a buffer through the transport.
* NOTE: TCP sent buffer/sending has to be synchronized.
* @param buffer buffer to be sent
* @throws IOException
* Send a buffer through the transport without acquiring the send lock
* (caller is responsible for holding it).
*
* <p>Flips the buffer, then loops until all bytes are written. If the
* kernel TCP send buffer is full, waits briefly and retries. Throws
* {@link IOException} (and closes the transport) if the send buffer
* remains full after the configured max send retries (see
* {@link CAJContext#getMaxSendRetries()}; -1 means retry indefinitely),
* the channel is already closed, or the calling thread is interrupted.
*
* @param buffer fully-written buffer to send (will be flipped)
* @throws IOException on write error, persistent backpressure, or interrupt
*/
// TODO optimize !!!
private void noSyncSend(ByteBuffer buffer, boolean asyncCloseOnError) throws IOException
private void noSyncSend(ByteBuffer buffer) throws IOException
{
try
{
// prepare buffer
buffer.flip();

final int SEND_BUFFER_LIMIT = 16000;
int bufferLimit = buffer.limit();
context.getLogger().finest("Sending " + buffer.limit() + " bytes to " + socketAddress + ".");

// TODO remove?!
context.getLogger().finest("Sending " + bufferLimit + " bytes to " + socketAddress + ".");

// limit sending large buffers, split the into parts
int parts = (buffer.limit()-1) / SEND_BUFFER_LIMIT + 1;
for (int part = 1; part <= parts; part++)
int tries = 0;
final int maxRetries = context.getMaxSendRetries();
while (buffer.hasRemaining())
{
if (parts > 1)
{
buffer.limit(Math.min(part * SEND_BUFFER_LIMIT, bufferLimit));
context.getLogger().finest("[Parted] Sending (part " + part + "/" + parts + ") " + (buffer.limit()-buffer.position()) + " bytes to " + socketAddress + ".");
}

final int TRIES = 10;
for (int tries = 0; /* tries <= TRIES */ ; tries++)
int bytesSent = channel.write(buffer);
if (bytesSent < 0)
throw new IOException("channel closed");

if (buffer.hasRemaining())
{

// send
int bytesSent = channel.write(buffer);
if (bytesSent < 0)
throw new IOException("bytesSent < 0");

// bytesSend == buffer.position(), so there is no need for flip()
if (buffer.position() != buffer.limit())
{
if (closed)
throw new IOException("transport closed on the client side");

if (tries >= TRIES)
{
context.getLogger().warning("Failed to send message to " + socketAddress + " - buffer full, will retry.");

//if (tries >= 2*TRIES)
// throw new IOException("TCP send buffer persistently full, disconnecting!");

}

// flush & wait for a while...
context.getLogger().finest("Send buffer full for " + socketAddress + ", waiting...");
channel.socket().getOutputStream().flush();
try {
Thread.sleep(Math.min(15000,10+tries*100));
} catch (InterruptedException e) {
// noop
}
continue;
if (closed)
throw new IOException("transport closed on the client side");

tries++;
if (maxRetries >= 0 && tries > maxRetries)
throw new IOException("TCP send buffer persistently full, disconnecting " + socketAddress);

context.getLogger().finest("Send buffer full for " + socketAddress
+ ", waiting (attempt " + tries
+ (maxRetries < 0 ? "" : "/" + maxRetries) + ")...");
try {
Thread.sleep(Math.min(15000, 10 + tries * 100));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("interrupted while sending to " + socketAddress);
}
else
break;
}

}
}
catch (IOException ioex)
catch (IOException ioex)
{
// close connection
close(true);
throw ioex;
}
Expand Down Expand Up @@ -832,7 +812,7 @@ public boolean flushInternal()
}

try {
send(buf, false);
send(buf);
}
finally {
// return back to the cache
Expand Down Expand Up @@ -883,7 +863,7 @@ public void submit(Request requestMessage) throws IOException {
{
try
{
noSyncSend(message, true);
noSyncSend(message);
return;
}
finally
Expand Down
12 changes: 12 additions & 0 deletions src/core/com/cosylab/epics/caj/impl/requests/EventAddRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,18 @@ public void submit() throws IOException
public void resubscribeSubscription(Transport transport) throws IOException
{
this.transport = transport;
// Reset buffer state: a prior failed send (IOException mid-write in noSyncSend)
// can leave position < capacity and limit = capacity. The next flip() would then
// set limit = that partial position, causing putInt(8,…) to throw
// IndexOutOfBoundsException on the reconnect after that.
//
// requestMessage is a fixed-size, single-purpose buffer allocated in the
// constructor (see EventAddRequest() lines ~107/114) to hold exactly one
// EventAdd message, so capacity() == message size is always true. Resetting
// to position == limit == capacity restores the fully-written state, so the
// flip() in Transport.submit() always produces position=0, limit=capacity.
requestMessage.limit(requestMessage.capacity());
requestMessage.position(requestMessage.capacity());
// update channel sid
requestMessage.putInt(8, channel.getServerChannelID());
// immediate send (increase priority - all subsequent sends will be done immediately).
Expand Down
Loading
Loading