Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,12 @@ public ConnectPoolMgr(int maxConnections) {

public void timeoutChecker(long now) {
for (ConnectContext connectContext : connectionMap.values()) {
connectContext.checkTimeout(now);
try {
connectContext.checkTimeout(now);
} catch (Throwable t) {
LOG.warn("failed to check timeout for connection, connectionId: {}, user: {}",
connectContext.getConnectionId(), connectContext.getQualifiedUser(), t);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,13 @@ public Map<String, AtomicInteger> getUserConnectionMap() {
private class TimeoutChecker extends TimerTask {
@Override
public void run() {
long now = System.currentTimeMillis();
connectPoolMgr.timeoutChecker(now);
flightSqlConnectPoolMgr.timeoutChecker(now);
try {
long now = System.currentTimeMillis();
connectPoolMgr.timeoutChecker(now);
flightSqlConnectPoolMgr.timeoutChecker(now);
} catch (Throwable t) {
LOG.warn("failed to check connection timeout", t);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.slf4j.LoggerFactory;

import java.nio.channels.SocketChannel;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class ConnectSchedulerTest {
Expand Down Expand Up @@ -105,4 +106,39 @@ public void testSubmitTooMany() throws InterruptedException {
ConnectContext context = new ConnectContext();
Assert.assertTrue(scheduler.submit(context));
}

@Test
public void testTimeoutCheckerContinuesAfterContextException() {
ConnectPoolMgr connectPoolMgr = new ConnectPoolMgr(10);
ThrowingConnectContext throwingContext = new ThrowingConnectContext();
CountingConnectContext countingContext = new CountingConnectContext();
throwingContext.setConnectionId(1);
countingContext.setConnectionId(2);
connectPoolMgr.getConnectionMap().put(throwingContext.getConnectionId(), throwingContext);
connectPoolMgr.getConnectionMap().put(countingContext.getConnectionId(), countingContext);

connectPoolMgr.timeoutChecker(System.currentTimeMillis());

Assert.assertEquals(1, throwingContext.checkCount.get());
Assert.assertEquals(1, countingContext.checkCount.get());
}

private static class ThrowingConnectContext extends ConnectContext {
private final AtomicInteger checkCount = new AtomicInteger(0);

@Override
public void checkTimeout(long now) {
checkCount.incrementAndGet();
throw new RuntimeException("mock check timeout exception");
}
}

private static class CountingConnectContext extends ConnectContext {
private final AtomicInteger checkCount = new AtomicInteger(0);

@Override
public void checkTimeout(long now) {
checkCount.incrementAndGet();
}
}
}
Loading