diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/TimeWindowUtil.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/TimeWindowUtil.java index 354363c3f0e79..1db43d36c3678 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/TimeWindowUtil.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/TimeWindowUtil.java @@ -194,16 +194,14 @@ public static long getNextTriggerWatermarkWithOffset( } long triggerWatermark; - // consider the DST timezone - if (useDayLightSaving) { - long utcWindowStart = - getWindowStartWithOffset( - toUtcTimestampMills(currentWatermark, shiftTimezone), offset, interval); - triggerWatermark = toEpochMillsForTimer(utcWindowStart + interval - 1, shiftTimezone); - } else { - long start = getWindowStartWithOffset(currentWatermark, offset, interval); - triggerWatermark = start + interval - 1; - } + // Always compute the trigger watermark in the shifted-timezone space so that window + // boundaries align to local time for TIMESTAMP_LTZ columns. + // toUtcTimestampMills / toEpochMillsForTimer are no-ops for UTC, so this is safe for + // all timezone combinations including the DST case. 
+ long utcWindowStart = + getWindowStartWithOffset( + toUtcTimestampMills(currentWatermark, shiftTimezone), offset, interval); + triggerWatermark = toEpochMillsForTimer(utcWindowStart + interval - 1, shiftTimezone); if (triggerWatermark > currentWatermark) { return triggerWatermark; diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java index aaece6302f1a4..bb598691a9aac 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java @@ -795,6 +795,66 @@ void testProcessingTimeTumblingWindows() throws Exception { testHarness.close(); } + /** + * Regression test for data loss when using TIMESTAMP_LTZ with a non-UTC, non-DST timezone for a + * daily tumble window. + */ + @TestTemplate + void testEventTimeTumblingWindowsWithLtzAndNonDstTimezone() throws Exception { + // Only meaningful for non-UTC timezone: the bug was that the trigger watermark was + // computed in UTC space instead of local (shifted) space. + org.junit.jupiter.api.Assumptions.assumeTrue( + shiftTimeZone.equals(SHANGHAI_ZONE_ID), + "This regression test only applies to non-UTC timezone"); + + // 1-day tumble window in Asia/Shanghai (UTC+8). 
+ final SliceAssigner assigner = + SliceAssigners.tumbling(2, shiftTimeZone, Duration.ofDays(1)); + final SlicingSumAndCountAggsFunction aggsFunction = + new SlicingSumAndCountAggsFunction(assigner); + OneInputStreamOperator operator = + buildWindowOperator(assigner, aggsFunction, null); + + OneInputStreamOperatorTestHarness testHarness = + createTestHarness(operator); + testHarness.setup(OUT_SERIALIZER); + testHarness.open(); + + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); + + // Both records fall in local day [2020-10-10 00:00, 2020-10-11 00:00) Asia/Shanghai: + // 2020-10-10 00:07:59 Asia/Shanghai = 2020-10-09T16:07:59Z = epoch 1602259679000 + // 2020-10-10 11:11:59 Asia/Shanghai = 2020-10-10T03:11:59Z = epoch 1602299519000 + testHarness.processElement(insertRecord("key1", 1, fromEpochMillis(1602259679000L))); + testHarness.processElement(insertRecord("key1", 1, fromEpochMillis(1602299519000L))); + + // Advance watermark past the records but not yet to the local-day boundary. + // Local midnight 2020-10-11 00:00:00 Asia/Shanghai = 2020-10-10T16:00:00Z. + // Trigger epoch = 2020-10-10T15:59:59.999Z = 1602345599999. + testHarness.processWatermark(new Watermark(1602309599999L)); // 2020-10-10T05:59:59.999Z (13:59:59.999 Asia/Shanghai) + expectedOutput.add(new Watermark(1602309599999L)); + ASSERTER.assertOutputEqualsSorted( + "No window should fire before local-day boundary.", + expectedOutput, + testHarness.getOutput()); + + // Advance watermark to exactly the local-day trigger epoch. + // Window [2020-10-10 00:00, 2020-10-11 00:00) Asia/Shanghai must fire. 
+ testHarness.processWatermark(new Watermark(1602345599999L)); + + // Window boundaries expressed as shifted-UTC millis via localMills(): + // start epoch = 2020-10-09T16:00:00Z = 1602259200000 → localMills = 1602288000000 + // end epoch = 2020-10-10T16:00:00Z = 1602345600000 → localMills = 1602374400000 + expectedOutput.add( + insertRecord( + "key1", 2L, 2L, localMills(1602259200000L), localMills(1602345600000L))); + expectedOutput.add(new Watermark(1602345599999L)); + ASSERTER.assertOutputEqualsSorted( + "Window must fire at local-day boundary.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + @TestTemplate void testInvalidWindows() { final SliceAssigner assigner = diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/TimeWindowUtilTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/TimeWindowUtilTest.java index 2ed5ce9284071..78e8e9a4355dd 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/TimeWindowUtilTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/TimeWindowUtilTest.java @@ -24,6 +24,7 @@ import java.time.ZoneId; import java.util.TimeZone; +import static org.apache.flink.table.runtime.util.TimeWindowUtil.getNextTriggerWatermarkWithOffset; import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMills; import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer; import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills; @@ -133,7 +134,54 @@ void testMaxWatermark() { assertThat(toEpochMills(Long.MAX_VALUE, zoneId)).isEqualTo(Long.MAX_VALUE); } + /** + * Verifies that getNextTriggerWatermarkWithOffset uses timezone-aware arithmetic for non-UTC, + * non-DST timezones (e.g., Asia/Shanghai, UTC+8). 
+ */ + @Test + void testGetNextTriggerWatermarkForNonDstLtzTimezone() { + ZoneId shangHai = ZoneId.of("Asia/Shanghai"); // UTC+8, no DST + long oneDayMs = 86_400_000L; + + // currentWatermark = 2020-10-09T16:07:59Z (= 2020-10-10 00:07:59 Asia/Shanghai) + long wm1 = epochOf("2020-10-09T16:07:59"); + // The record belongs to local day [2020-10-10 00:00, 2020-10-11 00:00) Asia/Shanghai. + // The window's trigger epoch = 2020-10-10T15:59:59.999Z (= local 2020-10-10 23:59:59.999) + long expectedTrigger = epochOf("2020-10-10T15:59:59.999"); + + assertThat(getNextTriggerWatermarkWithOffset(wm1, oneDayMs, 0, shangHai, false)) + .as("trigger must align to local midnight, not UTC midnight") + .isEqualTo(expectedTrigger); + + // A watermark just below the trigger should still return the same trigger + assertThat( + getNextTriggerWatermarkWithOffset( + expectedTrigger - 1, oneDayMs, 0, shangHai, false)) + .isEqualTo(expectedTrigger); + + // A watermark exactly at the trigger should advance to the NEXT day's trigger + long nextDayTrigger = expectedTrigger + oneDayMs; // = 2020-10-11T15:59:59.999Z + assertThat(getNextTriggerWatermarkWithOffset(expectedTrigger, oneDayMs, 0, shangHai, false)) + .isEqualTo(nextDayTrigger); + } + + @Test + void testGetNextTriggerWatermarkForUtc() { + ZoneId utc = UTC_ZONE_ID; + long oneDayMs = 86_400_000L; + + // For UTC the result must be the same as the old epoch-millis arithmetic + long wm = epochOf("2020-10-09T16:07:59"); + long expectedTrigger = epochOf("2020-10-09T23:59:59.999"); + assertThat(getNextTriggerWatermarkWithOffset(wm, oneDayMs, 0, utc, false)) + .isEqualTo(expectedTrigger); + } + private static long utcMills(String utcDateTime) { return LocalDateTime.parse(utcDateTime).atZone(UTC_ZONE_ID).toInstant().toEpochMilli(); } + + private static long epochOf(String utcDateTime) { + return utcMills(utcDateTime); + } }