Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,10 @@ public void endInput() throws Exception {
// timer not in running will be forbidden to fire after this, so that when the async
// operation is stuck, it results in deadlock due to what the timeout timer is not fired
waitInFlightInputsFinished();
// Send MAX_WATERMARK downstream if not yet received, ensuring end-of-event-time signal.
if (asyncExecutionController.getCurrentWatermark() < Long.MAX_VALUE) {
asyncExecutionController.submitWatermark(Watermark.MAX_WATERMARK);
}
}

public void invoke(AecRecord<IN, OUT> element) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ public void close() {
outputQueue.clear();
}

public Watermark getCurrentWatermark() {
return currentWatermark;
}

@VisibleForTesting
public Epoch<OUT> getActiveEpoch() {
return activeEpoch;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,10 @@ public void close() {
recordsBuffer.close();
}

public long getCurrentWatermark() {
return epochManager.getCurrentWatermark().getTimestamp();
}

@VisibleForTesting
public Epoch<OUT> getActiveEpoch() {
return epochManager.getActiveEpoch();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ void testMultiKeysWithWatermark() throws Exception {
testHarness.processWatermark(new Watermark(initialTime + 8));

testHarness.endInput();
epoch = new Epoch<>(new Watermark(initialTime + 8));
// endInput() sends MAX_WATERMARK, so active epoch is now epoch_max
epoch = new Epoch<>(Watermark.MAX_WATERMARK);
assertThat(aec.getActiveEpoch()).isEqualTo(epoch);

expectedOutput.add(new StreamRecord<>(0, initialTime + 1));
Expand All @@ -125,6 +126,7 @@ void testMultiKeysWithWatermark() throws Exception {
expectedOutput.add(new StreamRecord<>(1, initialTime + 7));
expectedOutput.add(new StreamRecord<>(0, initialTime + 8));
expectedOutput.add(new Watermark(initialTime + 8));
expectedOutput.add(Watermark.MAX_WATERMARK);

Queue<Object> expected =
new LinkedList<>(
Expand Down Expand Up @@ -165,6 +167,7 @@ void testOneKeyWithWatermark() throws Exception {
expectedOutput.add(new StreamRecord<>(2, initialTime + 2));
expectedOutput.add(new Watermark(initialTime + 2));
expectedOutput.add(new StreamRecord<>(2, initialTime + 3));
expectedOutput.add(Watermark.MAX_WATERMARK);

TestHarnessUtil.assertOutputEquals(
"output is not correct.", expectedOutput, testHarness.getOutput());
Expand Down Expand Up @@ -248,6 +251,7 @@ private void testMultiKeysMixWatermark(int capacity) throws Exception {
expectedOutput.add(new Watermark(initialTime + 8));
expectedOutput.add(new StreamRecord<>(4, initialTime + 9));
expectedOutput.add(new StreamRecord<>(6, initialTime + 10));
expectedOutput.add(Watermark.MAX_WATERMARK);

Queue<Object> expected =
new LinkedList<>(
Expand Down Expand Up @@ -293,6 +297,7 @@ void testMultiKeysWithoutWatermark() throws Exception {
expectedOutput.add(new StreamRecord<>(12, initialTime + 6));
expectedOutput.add(new StreamRecord<>(14, initialTime + 7));
expectedOutput.add(new StreamRecord<>(16, initialTime + 8));
expectedOutput.add(Watermark.MAX_WATERMARK);

testHarness.endInput();

Expand Down Expand Up @@ -325,17 +330,19 @@ public void testSnapshotAndRestore() throws Exception {
testLazyAsyncFunction.countDown();
testHarness.endInput();

ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
expected.add(new StreamRecord<>(1, initialTime + 1));
expected.add(new StreamRecord<>(2, initialTime + 2));
expected.add(new StreamRecord<>(3, initialTime + 3));
expected.add(new StreamRecord<>(4, initialTime + 4));
ConcurrentLinkedQueue<Object> expectedBeforeRestore = new ConcurrentLinkedQueue<>();
expectedBeforeRestore.add(new StreamRecord<>(1, initialTime + 1));
expectedBeforeRestore.add(new StreamRecord<>(2, initialTime + 2));
expectedBeforeRestore.add(new StreamRecord<>(3, initialTime + 3));
expectedBeforeRestore.add(new StreamRecord<>(4, initialTime + 4));
// endInput() sends MAX_WATERMARK downstream when upstream didn't send it
expectedBeforeRestore.add(Watermark.MAX_WATERMARK);

testHarness.getOutput().removeIf(record -> record instanceof CheckpointBarrier);

TestHarnessUtil.assertOutputEqualsSorted(
"StateAndRestored Test Output was not correct before restore.",
expected,
expectedBeforeRestore,
testHarness.getOutput(),
new StreamRecordComparator());
testHarness.close();
Expand Down Expand Up @@ -364,6 +371,11 @@ public void testSnapshotAndRestore() throws Exception {
assertThat(epochWithMaxWatermark).isPresent();
assertThat(epochWithMaxWatermark.get().getOngoingRecordCount()).isEqualTo(0);

ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
expected.add(new StreamRecord<>(1, initialTime + 1));
expected.add(new StreamRecord<>(2, initialTime + 2));
expected.add(new StreamRecord<>(3, initialTime + 3));
expected.add(new StreamRecord<>(4, initialTime + 4));
expected.add(new Watermark(1000L));
expected.add(new StreamRecord<>(5, initialTime + 5));
expected.add(new StreamRecord<>(6, initialTime + 6));
Expand All @@ -374,7 +386,8 @@ public void testSnapshotAndRestore() throws Exception {
testLazyAsyncFunction.countDown();
testHarness.endInput();

// TODO FLINK-37981 send Long.MAX watermark downstream
// MAX_WATERMARK was already sent via the epoch mechanism when epoch_1000L completed.
// After endInput, only epoch_max (Long.MAX_VALUE) remains as the active epoch.
assertThat(unwrapEpochManager(testHarness).getOutputQueue().size()).isEqualTo(1);
assertThat(
unwrapEpochManager(testHarness)
Expand All @@ -392,6 +405,33 @@ public void testSnapshotAndRestore() throws Exception {
testHarness.close();
}

/** Verifies that MAX_WATERMARK is sent downstream when endInput() is called. */
@Test
public void testEndInputSendsMaxWatermark() throws Exception {
try (final KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
createKeyedTestHarness(noLockAsyncDouble2Function, TIMEOUT, 10)) {
testHarness.open();
final long initialTime = 0L;

testHarness.processElement(new StreamRecord<>(1, initialTime + 1));
testHarness.processElement(new StreamRecord<>(2, initialTime + 2));
testHarness.processWatermark(new Watermark(initialTime + 2));
testHarness.processElement(new StreamRecord<>(3, initialTime + 3));
// Note: processWatermark(MAX_WATERMARK) is NOT called before endInput()

testHarness.endInput();

// MAX_WATERMARK must be sent downstream even if not explicitly received from upstream.
final long lastWatermarkTs =
testHarness.getOutput().stream()
.filter(e -> e instanceof Watermark)
.mapToLong(e -> ((Watermark) e).getTimestamp())
.max()
.orElse(Long.MIN_VALUE);
assertThat(lastWatermarkTs).isEqualTo(Long.MAX_VALUE);
}
}

@Test
public void testKeyedAsyncTimeoutFailure() throws Exception {
testKeyedAsyncTimeout(
Expand Down Expand Up @@ -438,6 +478,7 @@ private void testKeyedAsyncTimeout(

final ConcurrentLinkedQueue<Object> expectedOutput =
new ConcurrentLinkedQueue<>(expectedRecords);
expectedOutput.add(Watermark.MAX_WATERMARK);

TestHarnessUtil.assertOutputEquals(
"Output is not correct.", expectedOutput, testHarness.getOutput());
Expand Down