From 49895aec79ca15b8b0ad2c7a02ba3b7fae5cd3d2 Mon Sep 17 00:00:00 2001 From: Au_Miner <358671982@qq.com> Date: Wed, 22 Apr 2026 11:48:42 +0800 Subject: [PATCH] [FLINK-39210][table] Fixed Memory Leak Issue in Flink SQL with Multiple Temporal Joins --- .../temporal/TemporalRowTimeJoinOperator.java | 25 +++++++-- .../TemporalRowTimeJoinOperatorTest.java | 56 +++++++++++++++++++ 2 files changed, 76 insertions(+), 5 deletions(-) diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java index 940e15c2b4d03..65172fa70a40f 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java @@ -205,13 +205,15 @@ public void onEventTime(InternalTimer timer) throws Excep } // if we have more state at any side, then update the timer, else clean it up. - if (stateCleaningEnabled) { - if (lastUnprocessedTime < Long.MAX_VALUE || !rightState.isEmpty()) { + if (lastUnprocessedTime < Long.MAX_VALUE || !rightState.isEmpty()) { + if (stateCleaningEnabled) { registerProcessingCleanupTimer(); - } else { - cleanupLastTimer(); - nextLeftIndex.clear(); } + } else { + // Both sides drained for this key — drop per-key bookkeeping so the state + // backend can reclaim the entry. + nextLeftIndex.clear(); + cleanupLastTimer(); } } @@ -293,6 +295,14 @@ private void cleanupExpiredVersionInState(long currentWatermark, List r rightState.remove(rightTime); i += 1; } + // Remove the last remaining tombstone (DELETE/UPDATE_BEFORE) so that the per-key + // state can be fully cleaned up for deleted versioned rows. + if (indexToKeep >= 0 && indexToKeep == rightRowsSorted.size() - 1) { + RowData last = rightRowsSorted.get(indexToKeep); + if (!RowDataUtil.isAccumulateMsg(last) && getRightTime(last) <= currentWatermark) { + rightState.remove(getRightTime(last)); + } + } } /** @@ -441,4 +451,9 @@ static String getNextLeftIndexStateName() { static String getRegisteredTimerStateName() { return REGISTERED_TIMER_STATE_NAME; } + + @VisibleForTesting + static String getRightStateName() { + return RIGHT_STATE_NAME; + } } diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperatorTest.java index bff3b8ae73aaf..6003ae13f21d3 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperatorTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperatorTest.java @@ -18,6 +18,7 @@ package org.apache.flink.table.runtime.operators.join.temporal; +import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.streaming.api.watermark.Watermark; @@ -270,6 +271,61 @@ private void testRowTimeTemporalJoinOnUpsertSource( testHarness.close(); } + /** FLINK-39210: per-key state must be cleaned up once both sides are drained. */ + @Test + void testPerKeyStateIsCleanedUpWithoutStateRetention() throws Exception { + TemporalRowTimeJoinOperator joinOperator = + new TemporalRowTimeJoinOperator(rowType, rowType, joinCondition, 0, 0, 0, 0, true); + KeyedTwoInputStreamOperatorTestHarness testHarness = + createTestHarness(joinOperator); + + testHarness.open(); + + testHarness.processElement1(insertRecord(1L, "k1", "1a1")); + testHarness.processElement2(insertRecord(2L, "k1", "1a2")); + // Tombstone the versioned build-side row for k1. + testHarness.processElement2(deleteRecord(3L, "k1", "1a2")); + testHarness.processElement1(insertRecord(4L, "k1", "1a4")); + + testHarness.processWatermark1(new Watermark(10)); + testHarness.processWatermark2(new Watermark(10)); + + // After the watermark passes both the probe and the tombstoned build side, the per-key + // bookkeeping must be fully cleaned up even when idle-state retention is disabled. + assertThat( + joinOperator + .getKeyedStateStore() + .getState( + new ValueStateDescriptor<>( + TemporalRowTimeJoinOperator + .getNextLeftIndexStateName(), + Types.LONG)) + .value()) + .isNull(); + assertThat( + joinOperator + .getKeyedStateStore() + .getState( + new ValueStateDescriptor<>( + TemporalRowTimeJoinOperator + .getRegisteredTimerStateName(), + Types.LONG)) + .value()) + .isNull(); + assertThat( + joinOperator + .getKeyedStateStore() + .getMapState( + new MapStateDescriptor<>( + TemporalRowTimeJoinOperator.getRightStateName(), + Types.LONG, + rowType)) + .isEmpty()) + .isTrue(); + + testHarness.close(); + } + private KeyedTwoInputStreamOperatorTestHarness createTestHarness(TemporalRowTimeJoinOperator temporalJoinOperator) throws Exception {