From e1f6d66ffd47b03a03983f600588f371207dff46 Mon Sep 17 00:00:00 2001 From: Au_Miner <358671982@qq.com> Date: Wed, 22 Apr 2026 20:27:33 +0800 Subject: [PATCH] [FLINK-37981][table] TableAsyncExecutionController sends Long.MAX watermark downstream in endInput --- .../TableKeyedAsyncWaitOperator.java | 4 ++ .../join/lookup/keyordered/EpochManager.java | 4 ++ .../TableAsyncExecutionController.java | 4 ++ .../TableKeyedAsyncWaitOperatorTest.java | 57 ++++++++++++++++--- 4 files changed, 61 insertions(+), 8 deletions(-) diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/TableKeyedAsyncWaitOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/TableKeyedAsyncWaitOperator.java index 9c514b868e526..b0be684b84a0c 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/TableKeyedAsyncWaitOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/TableKeyedAsyncWaitOperator.java @@ -241,6 +241,10 @@ public void endInput() throws Exception { // timer not in running will be forbidden to fire after this, so that when the async // operation is stuck, it results in deadlock due to what the timeout timer is not fired waitInFlightInputsFinished(); + // Send MAX_WATERMARK downstream if not yet received, ensuring end-of-event-time signal. + if (asyncExecutionController.getCurrentWatermark() < Long.MAX_VALUE) { + asyncExecutionController.submitWatermark(Watermark.MAX_WATERMARK); + } } public void invoke(AecRecord element) throws Exception { diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/keyordered/EpochManager.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/keyordered/EpochManager.java index 36992b33fe8e7..c5c65f1fba90d 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/keyordered/EpochManager.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/keyordered/EpochManager.java @@ -100,6 +100,10 @@ public void close() { outputQueue.clear(); } + public Watermark getCurrentWatermark() { + return currentWatermark; + } + @VisibleForTesting public Epoch getActiveEpoch() { return activeEpoch; diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/keyordered/TableAsyncExecutionController.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/keyordered/TableAsyncExecutionController.java index 8f5645172d77a..00da6056308bc 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/keyordered/TableAsyncExecutionController.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/keyordered/TableAsyncExecutionController.java @@ -197,6 +197,10 @@ public void close() { recordsBuffer.close(); } + public long getCurrentWatermark() { + return epochManager.getCurrentWatermark().getTimestamp(); + } + @VisibleForTesting public Epoch getActiveEpoch() { return epochManager.getActiveEpoch(); diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/lookup/TableKeyedAsyncWaitOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/lookup/TableKeyedAsyncWaitOperatorTest.java index a2a8a70b04f8c..30bedf25b070a 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/lookup/TableKeyedAsyncWaitOperatorTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/lookup/TableKeyedAsyncWaitOperatorTest.java @@ -111,7 +111,8 @@ void testMultiKeysWithWatermark() throws Exception { testHarness.processWatermark(new Watermark(initialTime + 8)); testHarness.endInput(); - epoch = new Epoch<>(new Watermark(initialTime + 8)); + // endInput() sends MAX_WATERMARK, so active epoch is now epoch_max + epoch = new Epoch<>(Watermark.MAX_WATERMARK); assertThat(aec.getActiveEpoch()).isEqualTo(epoch); expectedOutput.add(new StreamRecord<>(0, initialTime + 1)); @@ -125,6 +126,7 @@ void testMultiKeysWithWatermark() throws Exception { expectedOutput.add(new StreamRecord<>(1, initialTime + 7)); expectedOutput.add(new StreamRecord<>(0, initialTime + 8)); expectedOutput.add(new Watermark(initialTime + 8)); + expectedOutput.add(Watermark.MAX_WATERMARK); Queue expected = new LinkedList<>( @@ -165,6 +167,7 @@ void testOneKeyWithWatermark() throws Exception { expectedOutput.add(new StreamRecord<>(2, initialTime + 2)); expectedOutput.add(new Watermark(initialTime + 2)); expectedOutput.add(new StreamRecord<>(2, initialTime + 3)); + expectedOutput.add(Watermark.MAX_WATERMARK); TestHarnessUtil.assertOutputEquals( "output is not correct.", expectedOutput, testHarness.getOutput()); @@ -248,6 +251,7 @@ private void testMultiKeysMixWatermark(int capacity) throws Exception { expectedOutput.add(new Watermark(initialTime + 8)); expectedOutput.add(new StreamRecord<>(4, initialTime + 9)); expectedOutput.add(new StreamRecord<>(6, initialTime + 10)); + expectedOutput.add(Watermark.MAX_WATERMARK); Queue expected = new LinkedList<>( @@ -293,6 +297,7 @@ void testMultiKeysWithoutWatermark() throws Exception { expectedOutput.add(new StreamRecord<>(12, initialTime + 6)); expectedOutput.add(new StreamRecord<>(14, initialTime + 7)); expectedOutput.add(new StreamRecord<>(16, initialTime + 8)); + expectedOutput.add(Watermark.MAX_WATERMARK); testHarness.endInput(); @@ -325,17 +330,19 @@ public void testSnapshotAndRestore() throws Exception { testLazyAsyncFunction.countDown(); testHarness.endInput(); - ConcurrentLinkedQueue expected = new ConcurrentLinkedQueue<>(); - expected.add(new StreamRecord<>(1, initialTime + 1)); - expected.add(new StreamRecord<>(2, initialTime + 2)); - expected.add(new StreamRecord<>(3, initialTime + 3)); - expected.add(new StreamRecord<>(4, initialTime + 4)); + ConcurrentLinkedQueue expectedBeforeRestore = new ConcurrentLinkedQueue<>(); + expectedBeforeRestore.add(new StreamRecord<>(1, initialTime + 1)); + expectedBeforeRestore.add(new StreamRecord<>(2, initialTime + 2)); + expectedBeforeRestore.add(new StreamRecord<>(3, initialTime + 3)); + expectedBeforeRestore.add(new StreamRecord<>(4, initialTime + 4)); + // endInput() sends MAX_WATERMARK downstream when upstream didn't send it + expectedBeforeRestore.add(Watermark.MAX_WATERMARK); testHarness.getOutput().removeIf(record -> record instanceof CheckpointBarrier); TestHarnessUtil.assertOutputEqualsSorted( "StateAndRestored Test Output was not correct before restore.", - expected, + expectedBeforeRestore, testHarness.getOutput(), new StreamRecordComparator()); testHarness.close(); @@ -364,6 +371,11 @@ public void testSnapshotAndRestore() throws Exception { assertThat(epochWithMaxWatermark).isPresent(); assertThat(epochWithMaxWatermark.get().getOngoingRecordCount()).isEqualTo(0); + ConcurrentLinkedQueue expected = new ConcurrentLinkedQueue<>(); + expected.add(new StreamRecord<>(1, initialTime + 1)); + expected.add(new StreamRecord<>(2, initialTime + 2)); + expected.add(new StreamRecord<>(3, initialTime + 3)); + expected.add(new StreamRecord<>(4, initialTime + 4)); expected.add(new Watermark(1000L)); expected.add(new StreamRecord<>(5, initialTime + 5)); expected.add(new StreamRecord<>(6, initialTime + 6)); @@ -374,7 +386,8 @@ public void testSnapshotAndRestore() throws Exception { testLazyAsyncFunction.countDown(); testHarness.endInput(); - // TODO FLINK-37981 send Long.MAX watermark downstream + // MAX_WATERMARK was already sent via the epoch mechanism when epoch_1000L completed. + // After endInput, only epoch_max (Long.MAX_VALUE) remains as the active epoch. assertThat(unwrapEpochManager(testHarness).getOutputQueue().size()).isEqualTo(1); assertThat( unwrapEpochManager(testHarness) @@ -392,6 +405,33 @@ public void testSnapshotAndRestore() throws Exception { testHarness.close(); } + /** Verifies that MAX_WATERMARK is sent downstream when endInput() is called. */ + @Test + public void testEndInputSendsMaxWatermark() throws Exception { + try (final KeyedOneInputStreamOperatorTestHarness testHarness = + createKeyedTestHarness(noLockAsyncDouble2Function, TIMEOUT, 10)) { + testHarness.open(); + final long initialTime = 0L; + + testHarness.processElement(new StreamRecord<>(1, initialTime + 1)); + testHarness.processElement(new StreamRecord<>(2, initialTime + 2)); + testHarness.processWatermark(new Watermark(initialTime + 2)); + testHarness.processElement(new StreamRecord<>(3, initialTime + 3)); + // Note: processWatermark(MAX_WATERMARK) is NOT called before endInput() + + testHarness.endInput(); + + // MAX_WATERMARK must be sent downstream even if not explicitly received from upstream. + final long lastWatermarkTs = + testHarness.getOutput().stream() + .filter(e -> e instanceof Watermark) + .mapToLong(e -> ((Watermark) e).getTimestamp()) + .max() + .orElse(Long.MIN_VALUE); + assertThat(lastWatermarkTs).isEqualTo(Long.MAX_VALUE); + } + } + @Test public void testKeyedAsyncTimeoutFailure() throws Exception { testKeyedAsyncTimeout( @@ -438,6 +478,7 @@ private void testKeyedAsyncTimeout( final ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(expectedRecords); + expectedOutput.add(Watermark.MAX_WATERMARK); TestHarnessUtil.assertOutputEquals( "Output is not correct.", expectedOutput, testHarness.getOutput());