Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -194,16 +194,14 @@ public static long getNextTriggerWatermarkWithOffset(
}

long triggerWatermark;
// consider the DST timezone
if (useDayLightSaving) {
long utcWindowStart =
getWindowStartWithOffset(
toUtcTimestampMills(currentWatermark, shiftTimezone), offset, interval);
triggerWatermark = toEpochMillsForTimer(utcWindowStart + interval - 1, shiftTimezone);
} else {
long start = getWindowStartWithOffset(currentWatermark, offset, interval);
triggerWatermark = start + interval - 1;
}
// Always compute the trigger watermark in the shifted-timezone space so that window
// boundaries align to local time for TIMESTAMP_LTZ columns.
// toUtcTimestampMills / toEpochMillsForTimer are no-ops for UTC, so this is safe for
// all timezone combinations including the DST case.
long utcWindowStart =
getWindowStartWithOffset(
toUtcTimestampMills(currentWatermark, shiftTimezone), offset, interval);
triggerWatermark = toEpochMillsForTimer(utcWindowStart + interval - 1, shiftTimezone);

if (triggerWatermark > currentWatermark) {
return triggerWatermark;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,66 @@ void testProcessingTimeTumblingWindows() throws Exception {
testHarness.close();
}

/**
* Regression test for data loss when using TIMESTAMP_LTZ with a non-UTC, non-DST timezone for a
* daily tumble window.
*/
@TestTemplate
void testEventTimeTumblingWindowsWithLtzAndNonDstTimezone() throws Exception {
// Only meaningful for non-UTC timezone: the bug was that the trigger watermark was
// computed in UTC space instead of local (shifted) space.
org.junit.jupiter.api.Assumptions.assumeTrue(
shiftTimeZone.equals(SHANGHAI_ZONE_ID),
"This regression test only applies to non-UTC timezone");

// 1-day tumble window in Asia/Shanghai (UTC+8).
final SliceAssigner assigner =
SliceAssigners.tumbling(2, shiftTimeZone, Duration.ofDays(1));
final SlicingSumAndCountAggsFunction aggsFunction =
new SlicingSumAndCountAggsFunction(assigner);
OneInputStreamOperator<RowData, RowData> operator =
buildWindowOperator(assigner, aggsFunction, null);

OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
createTestHarness(operator);
testHarness.setup(OUT_SERIALIZER);
testHarness.open();

ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

// Both records fall in local day [2020-10-10 00:00, 2020-10-11 00:00) Asia/Shanghai:
// 2020-10-10 00:07:59 Asia/Shanghai = 2020-10-09T16:07:59Z = epoch 1602259679000
// 2020-10-10 11:11:59 Asia/Shanghai = 2020-10-10T03:11:59Z = epoch 1602299519000
testHarness.processElement(insertRecord("key1", 1, fromEpochMillis(1602259679000L)));
testHarness.processElement(insertRecord("key1", 1, fromEpochMillis(1602299519000L)));

// Advance watermark past the records but not yet to the local-day boundary.
// Local midnight 2020-10-11 00:00:00 Asia/Shanghai = 2020-10-10T16:00:00Z.
// Trigger epoch = 2020-10-10T15:59:59.999Z = 1602345599999.
testHarness.processWatermark(new Watermark(1602309599999L)); // 2020-10-10T07:59:59.999Z
expectedOutput.add(new Watermark(1602309599999L));
ASSERTER.assertOutputEqualsSorted(
"No window should fire before local-day boundary.",
expectedOutput,
testHarness.getOutput());

// Advance watermark to exactly the local-day trigger epoch.
// Window [2020-10-10 00:00, 2020-10-11 00:00) Asia/Shanghai must fire.
testHarness.processWatermark(new Watermark(1602345599999L));

// Window boundaries expressed as shifted-UTC millis via localMills():
// start epoch = 2020-10-09T16:00:00Z = 1602259200000 → localMills = 1602288000000
// end epoch = 2020-10-10T16:00:00Z = 1602345600000 → localMills = 1602374400000
expectedOutput.add(
insertRecord(
"key1", 2L, 2L, localMills(1602259200000L), localMills(1602345600000L)));
expectedOutput.add(new Watermark(1602345599999L));
ASSERTER.assertOutputEqualsSorted(
"Window must fire at local-day boundary.", expectedOutput, testHarness.getOutput());

testHarness.close();
}

@TestTemplate
void testInvalidWindows() {
final SliceAssigner assigner =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.time.ZoneId;
import java.util.TimeZone;

import static org.apache.flink.table.runtime.util.TimeWindowUtil.getNextTriggerWatermarkWithOffset;
import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMills;
import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer;
import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills;
Expand Down Expand Up @@ -133,7 +134,54 @@ void testMaxWatermark() {
assertThat(toEpochMills(Long.MAX_VALUE, zoneId)).isEqualTo(Long.MAX_VALUE);
}

/**
* Verifies that getNextTriggerWatermarkWithOffset uses timezone-aware arithmetic for non-UTC,
* non-DST timezones (e.g., Asia/Shanghai, UTC+8).
*/
@Test
void testGetNextTriggerWatermarkForNonDstLtzTimezone() {
ZoneId shangHai = ZoneId.of("Asia/Shanghai"); // UTC+8, no DST
long oneDayMs = 86_400_000L;

// currentWatermark = 2020-10-09T16:07:59Z (= 2020-10-10 00:07:59 Asia/Shanghai)
long wm1 = epochOf("2020-10-09T16:07:59");
// The record belongs to local day [2020-10-10 00:00, 2020-10-11 00:00) Asia/Shanghai.
// The window's trigger epoch = 2020-10-10T15:59:59.999Z (= local 2020-10-10 23:59:59.999)
long expectedTrigger = epochOf("2020-10-10T15:59:59.999");

assertThat(getNextTriggerWatermarkWithOffset(wm1, oneDayMs, 0, shangHai, false))
.as("trigger must align to local midnight, not UTC midnight")
.isEqualTo(expectedTrigger);

// A watermark just below the trigger should still return the same trigger
assertThat(
getNextTriggerWatermarkWithOffset(
expectedTrigger - 1, oneDayMs, 0, shangHai, false))
.isEqualTo(expectedTrigger);

// A watermark exactly at the trigger should advance to the NEXT day's trigger
long nextDayTrigger = expectedTrigger + oneDayMs; // = 2020-10-11T15:59:59.999Z
assertThat(getNextTriggerWatermarkWithOffset(expectedTrigger, oneDayMs, 0, shangHai, false))
.isEqualTo(nextDayTrigger);
}

@Test
void testGetNextTriggerWatermarkForUtc() {
ZoneId utc = UTC_ZONE_ID;
long oneDayMs = 86_400_000L;

// For UTC the result must be the same as the old epoch-millis arithmetic
long wm = epochOf("2020-10-09T16:07:59");
long expectedTrigger = epochOf("2020-10-09T23:59:59.999");
assertThat(getNextTriggerWatermarkWithOffset(wm, oneDayMs, 0, utc, false))
.isEqualTo(expectedTrigger);
}

private static long utcMills(String utcDateTime) {
return LocalDateTime.parse(utcDateTime).atZone(UTC_ZONE_ID).toInstant().toEpochMilli();
}

private static long epochOf(String utcDateTime) {
return utcMills(utcDateTime);
}
}