From 0c9a998ec44726e8db3393225bf745605aebf89b Mon Sep 17 00:00:00 2001 From: Prajwal-banakar Date: Fri, 13 Mar 2026 08:36:02 +0000 Subject: [PATCH] Fix OOM when startTaskManager in FlinkMetricsITCase --- .../flink/metrics/FlinkMetricsITCase.java | 47 ++++++++++++++----- 1 file changed, 36 insertions(+), 11 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/metrics/FlinkMetricsITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/metrics/FlinkMetricsITCase.java index f13e4860e6..66a349a95e 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/metrics/FlinkMetricsITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/metrics/FlinkMetricsITCase.java @@ -81,7 +81,7 @@ abstract class FlinkMetricsITCase { new MiniClusterResourceConfiguration.Builder() .setNumberTaskManagers(1) .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM) - .setConfiguration(reporter.addToConfiguration(new Configuration())) + .setConfiguration(reporter.addToConfiguration(buildTestConfig())) .build()); private static final String CATALOG_NAME = "testcatalog"; @@ -93,6 +93,21 @@ abstract class FlinkMetricsITCase { private TableEnvironment tEnv; + /** + * Builds a minimal Flink {@link Configuration} for use in tests. + * + *

The network buffer pool size is intentionally reduced from the default 64MB to 32MB to + * lower JVM direct memory pressure when multiple IT cases run sequentially in the same JVM + * fork. The default size is not needed for these tests, which do not exercise high-throughput + * network paths. + */ + private static Configuration buildTestConfig() { + Configuration config = new Configuration(); + config.setString("taskmanager.memory.network.min", "32mb"); + config.setString("taskmanager.memory.network.max", "32mb"); + return config; + } + @BeforeAll protected static void beforeAll() { clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig(); @@ -102,6 +117,11 @@ protected static void beforeAll() { try { MINI_CLUSTER_EXTENSION.before(); } catch (Exception e) { + // JUnit 5 does not invoke @AfterAll when @BeforeAll throws, so we must explicitly + // call after() here to release any direct memory that was partially allocated by + // the NetworkBufferPool before the failure. Leaving it unreleased would exhaust + // JVM direct memory for subsequent test classes in the same JVM fork. + MINI_CLUSTER_EXTENSION.after(); throw new FlussRuntimeException("Fail to init Flink mini cluster", e); } } @@ -129,17 +149,22 @@ void afterEach() { @AfterAll static void afterAll() throws Exception { - if (admin != null) { - admin.close(); - admin = null; - } - - if (conn != null) { - conn.close(); - conn = null; + // Use try/finally to guarantee MINI_CLUSTER_EXTENSION.after() is always called, + // even if admin or conn cleanup throws. An unreleased MiniCluster holds JVM direct + // memory (NetworkBufferPool) that would cause OOM errors in subsequent test classes + // running in the same JVM fork. + try { + if (admin != null) { + admin.close(); + admin = null; + } + if (conn != null) { + conn.close(); + conn = null; + } + } finally { + MINI_CLUSTER_EXTENSION.after(); } - - MINI_CLUSTER_EXTENSION.after(); } protected long createTable(TablePath tablePath, TableDescriptor tableDescriptor)