diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectPoolMgr.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectPoolMgr.java
index b64bcdd928454e..643908e52c15ed 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectPoolMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectPoolMgr.java
@@ -53,7 +53,12 @@ public ConnectPoolMgr(int maxConnections) {
 
     public void timeoutChecker(long now) {
         for (ConnectContext connectContext : connectionMap.values()) {
-            connectContext.checkTimeout(now);
+            try {
+                connectContext.checkTimeout(now);
+            } catch (Throwable t) {
+                LOG.warn("failed to check timeout for connection, connectionId: {}, user: {}",
+                        connectContext.getConnectionId(), connectContext.getQualifiedUser(), t);
+            }
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
index 32ea481fa9fb7d..b22e89cf209567 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
@@ -160,9 +160,13 @@ public Map getUserConnectionMap() {
     private class TimeoutChecker extends TimerTask {
         @Override
         public void run() {
-            long now = System.currentTimeMillis();
-            connectPoolMgr.timeoutChecker(now);
-            flightSqlConnectPoolMgr.timeoutChecker(now);
+            try {
+                long now = System.currentTimeMillis();
+                connectPoolMgr.timeoutChecker(now);
+                flightSqlConnectPoolMgr.timeoutChecker(now);
+            } catch (Throwable t) {
+                LOG.warn("failed to check connection timeout", t);
+            }
         }
     }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectSchedulerTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectSchedulerTest.java
index 9898a7cf1d4d7e..7f750d37dee6f4 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectSchedulerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectSchedulerTest.java
@@ -31,6 +31,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.nio.channels.SocketChannel;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 public class ConnectSchedulerTest {
@@ -105,4 +106,39 @@ public void testSubmitTooMany() throws InterruptedException {
         ConnectContext context = new ConnectContext();
         Assert.assertTrue(scheduler.submit(context));
     }
+
+    @Test
+    public void testTimeoutCheckerContinuesAfterContextException() {
+        ConnectPoolMgr connectPoolMgr = new ConnectPoolMgr(10);
+        ThrowingConnectContext throwingContext = new ThrowingConnectContext();
+        CountingConnectContext countingContext = new CountingConnectContext();
+        throwingContext.setConnectionId(1);
+        countingContext.setConnectionId(2);
+        connectPoolMgr.getConnectionMap().put(throwingContext.getConnectionId(), throwingContext);
+        connectPoolMgr.getConnectionMap().put(countingContext.getConnectionId(), countingContext);
+
+        connectPoolMgr.timeoutChecker(System.currentTimeMillis());
+
+        Assert.assertEquals(1, throwingContext.checkCount.get());
+        Assert.assertEquals(1, countingContext.checkCount.get());
+    }
+
+    private static class ThrowingConnectContext extends ConnectContext {
+        private final AtomicInteger checkCount = new AtomicInteger(0);
+
+        @Override
+        public void checkTimeout(long now) {
+            checkCount.incrementAndGet();
+            throw new RuntimeException("mock check timeout exception");
+        }
+    }
+
+    private static class CountingConnectContext extends ConnectContext {
+        private final AtomicInteger checkCount = new AtomicInteger(0);
+
+        @Override
+        public void checkTimeout(long now) {
+            checkCount.incrementAndGet();
+        }
+    }
 }