Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -205,13 +205,15 @@ public void onEventTime(InternalTimer<Object, VoidNamespace> timer) throws Excep
}

// if we have more state at any side, then update the timer, else clean it up.
if (stateCleaningEnabled) {
if (lastUnprocessedTime < Long.MAX_VALUE || !rightState.isEmpty()) {
if (lastUnprocessedTime < Long.MAX_VALUE || !rightState.isEmpty()) {
if (stateCleaningEnabled) {
registerProcessingCleanupTimer();
} else {
cleanupLastTimer();
nextLeftIndex.clear();
}
} else {
// Both sides drained for this key — drop per-key bookkeeping so the state
// backend can reclaim the entry.
nextLeftIndex.clear();
cleanupLastTimer();
}
}

Expand Down Expand Up @@ -293,6 +295,14 @@ private void cleanupExpiredVersionInState(long currentWatermark, List<RowData> r
rightState.remove(rightTime);
i += 1;
}
// Remove the last remaining tombstone (DELETE/UPDATE_BEFORE) so that the per-key
// state can be fully cleaned up for deleted versioned rows.
if (indexToKeep >= 0 && indexToKeep == rightRowsSorted.size() - 1) {
RowData last = rightRowsSorted.get(indexToKeep);
if (!RowDataUtil.isAccumulateMsg(last) && getRightTime(last) <= currentWatermark) {
rightState.remove(getRightTime(last));
}
}
}

/**
Expand Down Expand Up @@ -441,4 +451,9 @@ static String getNextLeftIndexStateName() {
static String getRegisteredTimerStateName() {
return REGISTERED_TIMER_STATE_NAME;
}

@VisibleForTesting
static String getRightStateName() {
return RIGHT_STATE_NAME;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.table.runtime.operators.join.temporal;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.watermark.Watermark;
Expand Down Expand Up @@ -270,6 +271,61 @@ private void testRowTimeTemporalJoinOnUpsertSource(
testHarness.close();
}

/** FLINK-39210: per-key state must be cleaned up once both sides are drained. */
@Test
void testPerKeyStateIsCleanedUpWithoutStateRetention() throws Exception {
TemporalRowTimeJoinOperator joinOperator =
new TemporalRowTimeJoinOperator(rowType, rowType, joinCondition, 0, 0, 0, 0, true);
KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> testHarness =
createTestHarness(joinOperator);

testHarness.open();

testHarness.processElement1(insertRecord(1L, "k1", "1a1"));
testHarness.processElement2(insertRecord(2L, "k1", "1a2"));
// Tombstone the versioned build-side row for k1.
testHarness.processElement2(deleteRecord(3L, "k1", "1a2"));
testHarness.processElement1(insertRecord(4L, "k1", "1a4"));

testHarness.processWatermark1(new Watermark(10));
testHarness.processWatermark2(new Watermark(10));

// After the watermark passes both the probe and the tombstoned build side, the per-key
// bookkeeping must be fully cleaned up even when idle-state retention is disabled.
assertThat(
joinOperator
.getKeyedStateStore()
.getState(
new ValueStateDescriptor<>(
TemporalRowTimeJoinOperator
.getNextLeftIndexStateName(),
Types.LONG))
.value())
.isNull();
assertThat(
joinOperator
.getKeyedStateStore()
.getState(
new ValueStateDescriptor<>(
TemporalRowTimeJoinOperator
.getRegisteredTimerStateName(),
Types.LONG))
.value())
.isNull();
assertThat(
joinOperator
.getKeyedStateStore()
.getMapState(
new MapStateDescriptor<>(
TemporalRowTimeJoinOperator.getRightStateName(),
Types.LONG,
rowType))
.isEmpty())
.isTrue();

testHarness.close();
}

private KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData>
createTestHarness(TemporalRowTimeJoinOperator temporalJoinOperator) throws Exception {

Expand Down