From 867c559e3dc73c5d7f62d2aab3a2fb898b83756d Mon Sep 17 00:00:00 2001 From: feiniaofeiafei Date: Wed, 1 Jul 2026 11:22:07 +0800 Subject: [PATCH] [fix](connection) Prevent timeout checker from stopping after an exception (#65040) Problem Summary: The connection timeout checker is scheduled with `scheduleAtFixedRate`. If an unchecked exception escapes from the task, `ScheduledThreadPoolExecutor` suppresses all subsequent executions. As a result, expired connections may no longer be cleaned up. This PR: - Isolates exceptions from each `ConnectContext`, so one broken connection does not prevent other connections from being checked. - Adds an outer exception boundary to keep the periodic timeout checker alive. - Logs the stack trace together with the connection ID and user for diagnosis. - Adds a unit test verifying that timeout checking continues after one context throws an exception. This applies to both MySQL and Arrow Flight SQL connection pools. --- .../org/apache/doris/qe/ConnectPoolMgr.java | 7 +++- .../org/apache/doris/qe/ConnectScheduler.java | 10 ++++-- .../apache/doris/qe/ConnectSchedulerTest.java | 36 +++++++++++++++++++ 3 files changed, 49 insertions(+), 4 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectPoolMgr.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectPoolMgr.java index b64bcdd928454e..643908e52c15ed 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectPoolMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectPoolMgr.java @@ -53,7 +53,12 @@ public ConnectPoolMgr(int maxConnections) { public void timeoutChecker(long now) { for (ConnectContext connectContext : connectionMap.values()) { - connectContext.checkTimeout(now); + try { + connectContext.checkTimeout(now); + } catch (Throwable t) { + LOG.warn("failed to check timeout for connection, connectionId: {}, user: {}", + connectContext.getConnectionId(), connectContext.getQualifiedUser(), t); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java index 32ea481fa9fb7d..b22e89cf209567 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java @@ -160,9 +160,13 @@ public Map getUserConnectionMap() { private class TimeoutChecker extends TimerTask { @Override public void run() { - long now = System.currentTimeMillis(); - connectPoolMgr.timeoutChecker(now); - flightSqlConnectPoolMgr.timeoutChecker(now); + try { + long now = System.currentTimeMillis(); + connectPoolMgr.timeoutChecker(now); + flightSqlConnectPoolMgr.timeoutChecker(now); + } catch (Throwable t) { + LOG.warn("failed to check connection timeout", t); + } } } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectSchedulerTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectSchedulerTest.java index 9898a7cf1d4d7e..7f750d37dee6f4 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectSchedulerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectSchedulerTest.java @@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory; import java.nio.channels.SocketChannel; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; public class ConnectSchedulerTest { @@ -105,4 +106,39 @@ public void testSubmitTooMany() throws InterruptedException { ConnectContext context = new ConnectContext(); Assert.assertTrue(scheduler.submit(context)); } + + @Test + public void testTimeoutCheckerContinuesAfterContextException() { + ConnectPoolMgr connectPoolMgr = new ConnectPoolMgr(10); + ThrowingConnectContext throwingContext = new ThrowingConnectContext(); + CountingConnectContext countingContext = new CountingConnectContext(); + throwingContext.setConnectionId(1); + countingContext.setConnectionId(2); + connectPoolMgr.getConnectionMap().put(throwingContext.getConnectionId(), throwingContext); + connectPoolMgr.getConnectionMap().put(countingContext.getConnectionId(), countingContext); + + connectPoolMgr.timeoutChecker(System.currentTimeMillis()); + + Assert.assertEquals(1, throwingContext.checkCount.get()); + Assert.assertEquals(1, countingContext.checkCount.get()); + } + + private static class ThrowingConnectContext extends ConnectContext { + private final AtomicInteger checkCount = new AtomicInteger(0); + + @Override + public void checkTimeout(long now) { + checkCount.incrementAndGet(); + throw new RuntimeException("mock check timeout exception"); + } + } + + private static class CountingConnectContext extends ConnectContext { + private final AtomicInteger checkCount = new AtomicInteger(0); + + @Override + public void checkTimeout(long now) { + checkCount.incrementAndGet(); + } + } }