Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ abstract class FlinkMetricsITCase {
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
.setConfiguration(reporter.addToConfiguration(new Configuration()))
.setConfiguration(reporter.addToConfiguration(buildTestConfig()))
.build());

private static final String CATALOG_NAME = "testcatalog";
Expand All @@ -93,6 +93,21 @@ abstract class FlinkMetricsITCase {

private TableEnvironment tEnv;

/**
* Builds a minimal Flink {@link Configuration} for use in tests.
*
* <p>The network buffer pool size is intentionally reduced from the default 64MB to 32MB to
* lower JVM direct memory pressure when multiple IT cases run sequentially in the same JVM
* fork. The default size is not needed for these tests, which do not exercise high-throughput
* network paths.
*/
private static Configuration buildTestConfig() {
Configuration config = new Configuration();
config.setString("taskmanager.memory.network.min", "32mb");
config.setString("taskmanager.memory.network.max", "32mb");
return config;
}

@BeforeAll
protected static void beforeAll() {
clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
Expand All @@ -102,6 +117,11 @@ protected static void beforeAll() {
try {
MINI_CLUSTER_EXTENSION.before();
} catch (Exception e) {
// JUnit 5 does not invoke @AfterAll when @BeforeAll throws, so we must explicitly
// call after() here to release any direct memory that was partially allocated by
// the NetworkBufferPool before the failure. Leaving it unreleased would exhaust
// JVM direct memory for subsequent test classes in the same JVM fork.
MINI_CLUSTER_EXTENSION.after();
throw new FlussRuntimeException("Fail to init Flink mini cluster", e);
}
}
Expand Down Expand Up @@ -129,17 +149,22 @@ void afterEach() {

@AfterAll
static void afterAll() throws Exception {
if (admin != null) {
admin.close();
admin = null;
}

if (conn != null) {
conn.close();
conn = null;
// Use try/finally to guarantee MINI_CLUSTER_EXTENSION.after() is always called,
// even if admin or conn cleanup throws. An unreleased MiniCluster holds JVM direct
// memory (NetworkBufferPool) that would cause OOM errors in subsequent test classes
// running in the same JVM fork.
try {
if (admin != null) {
admin.close();
admin = null;
}
if (conn != null) {
conn.close();
conn = null;
}
} finally {
MINI_CLUSTER_EXTENSION.after();
}

MINI_CLUSTER_EXTENSION.after();
}

protected long createTable(TablePath tablePath, TableDescriptor tableDescriptor)
Expand Down