-
Notifications
You must be signed in to change notification settings - Fork 2.5k
feat(partition-ttl): Introduce KeepByEventTimeStrategy to expire partitions by event time #19081
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,275 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.hudi.table.action.ttl.strategy; | ||
|
|
||
| import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator; | ||
| import org.apache.hudi.common.util.PartitionPathEncodeUtils; | ||
| import org.apache.hudi.table.HoodieTable; | ||
|
|
||
| import lombok.extern.slf4j.Slf4j; | ||
|
|
||
| import java.text.ParseException; | ||
| import java.time.ZoneOffset; | ||
| import java.time.format.DateTimeFormatter; | ||
| import java.time.format.DateTimeParseException; | ||
| import java.time.temporal.TemporalAccessor; | ||
| import java.util.List; | ||
| import java.util.stream.Collectors; | ||
|
|
||
| /** | ||
| * Event-time based partition TTL strategy: lifetime is read from the partition path, not from | ||
| * commit metadata. Late-arriving writes into an old partition do not extend its lifetime, and | ||
| * backfilled historic partitions are still considered old. Compare with {@link KeepByTimeStrategy} | ||
| * (last commit time) and {@link KeepByCreationTimeStrategy} (creation commit time). | ||
| * | ||
| * <h3>Supported path shapes</h3> | ||
| * The first-class, tested shapes are day and hour granularity. Each example shows the partition | ||
| * path on the left and the required {@code startIndex} on the right (see | ||
| * <i>Locating the time block</i> below); non-time segments may appear before and/or after the | ||
| * time block. | ||
| * <ul> | ||
| * <li>Day, {@code format=yyyy-MM-dd} | ||
| * <ul> | ||
| * <li>time only: {@code 2026-06-27}, {@code dt=2026-06-27} — startIndex {@code 0}</li> | ||
| * <li>prefix + time: {@code region=us/2026-06-27}, {@code region=us/dt=2026-06-27} — startIndex {@code 1}</li> | ||
| * <li>time + suffix: {@code 2026-06-27/source=app}, {@code dt=2026-06-27/source=app} — startIndex {@code 0}</li> | ||
| * <li>prefix + time + suffix: {@code region=us/dt=2026-06-27/source=app} — startIndex {@code 1}</li> | ||
| * </ul> | ||
| * </li> | ||
| * <li>Day, {@code format=yyyyMMdd} | ||
| * <ul> | ||
| * <li>time only: {@code 20260627}, {@code dt=20260627} — startIndex {@code 0}</li> | ||
| * <li>prefix + time: {@code region=us/20260627}, {@code region=us/dt=20260627} — startIndex {@code 1}</li> | ||
| * <li>time + suffix: {@code 20260627/source=app}, {@code dt=20260627/source=app} — startIndex {@code 0}</li> | ||
| * <li>prefix + time + suffix: {@code region=us/dt=20260627/source=app} — startIndex {@code 1}</li> | ||
| * </ul> | ||
| * </li> | ||
| * <li>Hour, {@code format=yyyy-MM-dd/HH} | ||
| * <ul> | ||
| * <li>time only: {@code 2026-06-27/12}, {@code dt=2026-06-27/hh=12} — startIndex {@code 0}</li> | ||
| * <li>prefix + time: {@code region=us/2026-06-27/12}, {@code region=us/dt=2026-06-27/hh=12} — startIndex {@code 1}</li> | ||
| * <li>time + suffix: {@code 2026-06-27/12/source=app}, {@code dt=2026-06-27/hh=12/source=app} — startIndex {@code 0}</li> | ||
| * <li>prefix + time + suffix: {@code region=us/dt=2026-06-27/hh=12/source=app} — startIndex {@code 1}</li> | ||
| * </ul> | ||
| * </li> | ||
| * <li>Hour, {@code format=yyyyMMdd/HH} | ||
| * <ul> | ||
| * <li>time only: {@code 20260627/12}, {@code dt=20260627/hh=12} — startIndex {@code 0}</li> | ||
| * <li>prefix + time: {@code region=us/20260627/12}, {@code region=us/dt=20260627/hh=12} — startIndex {@code 1}</li> | ||
| * <li>time + suffix: {@code 20260627/12/source=app}, {@code dt=20260627/hh=12/source=app} — startIndex {@code 0}</li> | ||
| * <li>prefix + time + suffix: {@code region=us/dt=20260627/hh=12/source=app} — startIndex {@code 1}</li> | ||
| * </ul> | ||
| * </li> | ||
| * </ul> | ||
| * Hive-style key names are not constrained: {@code dt=}, {@code day=}, {@code event_date=}, | ||
| * {@code hh=}, {@code hour=} all work; only the value after {@code =} is parsed. | ||
| * <p> | ||
| * Any {@link java.time.format.DateTimeFormatter} pattern works as long as the resulting | ||
| * {@link java.time.temporal.TemporalAccessor} can be resolved to either an {@link java.time.Instant} | ||
| * or a {@link java.time.LocalDate} (day-only patterns are anchored at UTC start-of-day). A | ||
| * {@code /} in the pattern means the time value spans that many consecutive path segments. | ||
| * Patterns missing day-of-month, e.g. month-only {@code yyyy-MM}, cannot be resolved and will | ||
| * raise the standard parse-failure error at runtime. | ||
| * | ||
| * <h3>Locating the time block</h3> | ||
| * The time block must occupy a <i>contiguous</i> segment range of the partition path. Only the | ||
| * segments before it count toward {@code startIndex}; segments after are ignored. Interleaved | ||
| * layouts such as {@code dt=20260627/source=app/hh=12} are not supported -- the time block must | ||
| * be in one piece. | ||
| * | ||
| * <h3>Time zone</h3> | ||
| * Both the partition's event time and the cutoff derived from {@code instantTime} are interpreted | ||
| * in UTC. Set {@code hoodie.table.timeline.timezone=UTC} so the timeline writes instants under the | ||
| * same convention; otherwise the cutoff drifts by the JVM's UTC offset -- a boundary effect at | ||
| * day granularity, a full-offset shift at hour granularity. | ||
| * | ||
| * <h3>Configuration</h3> | ||
| * All three knobs come with defaults, so a table whose partition path is purely a date in | ||
| * {@code yyyy-MM-dd} form works out of the box. | ||
| * <ul> | ||
| * <li>{@link org.apache.hudi.config.HoodieTTLConfig#EVENT_TIME_FORMAT} — date-time pattern of | ||
| * the time block in the partition path. Default {@code yyyy-MM-dd}. A {@code /} in the | ||
| * pattern means the time block spans that many consecutive segments.</li> | ||
| * <li>{@link org.apache.hudi.config.HoodieTTLConfig#EVENT_TIME_PARTITION_START_INDEX} — | ||
| * 0-based index of the first segment that carries the time block. Default {@code 0}. | ||
| * Raise it when non-time segments come before the time block (see examples above).</li> | ||
| * <li>{@link org.apache.hudi.config.HoodieTTLConfig#EVENT_TIME_DELETE_HIVE_DEFAULT_PARTITION} — | ||
| * whether to treat partitions containing {@code __HIVE_DEFAULT_PARTITION__} (i.e. data | ||
| * whose event-time column was {@code NULL}) as expired. Default {@code false}, i.e. such | ||
| * partitions are skipped with a WARN and the user keeps explicit control over them.</li> | ||
| * </ul> | ||
| */ | ||
| @Slf4j | ||
| public class KeepByEventTimeStrategy extends KeepByTimeStrategy { | ||
|
|
||
| private final String eventTimeFormat; | ||
| private final int startIndex; | ||
| private final boolean deleteHiveDefaultPartition; | ||
| private final boolean hiveStylePartitioning; | ||
|
|
||
| public KeepByEventTimeStrategy(HoodieTable hoodieTable, String instantTime) { | ||
| super(hoodieTable, instantTime); | ||
| // Defaults: format='yyyy-MM-dd', startIndex=0, deleteHiveDefaultPartition=false. The two | ||
| // guards below catch users who set the values explicitly to obviously-broken inputs. | ||
| this.eventTimeFormat = writeConfig.getPartitionTTLEventTimeFormat(); | ||
| if (eventTimeFormat == null || eventTimeFormat.isEmpty()) { | ||
| throw new IllegalArgumentException( | ||
| "hoodie.partition.ttl.strategy.event.time.format must not be empty."); | ||
| } | ||
| this.startIndex = writeConfig.getPartitionTTLEventTimePartitionStartIndex(); | ||
| if (startIndex < 0) { | ||
| throw new IllegalArgumentException( | ||
| "hoodie.partition.ttl.strategy.event.time.partition.start.index must be >= 0, got " + startIndex); | ||
| } | ||
| this.deleteHiveDefaultPartition = writeConfig.shouldDeleteHiveDefaultPartitionForEventTimeTTL(); | ||
| // Hive-style partitioning is a table-level property recorded at table creation; trust it | ||
| // rather than guessing per-segment from a stray '=' character in a value. | ||
| this.hiveStylePartitioning = Boolean.parseBoolean( | ||
| hoodieTable.getMetaClient().getTableConfig().getHiveStylePartitioningEnable()); | ||
| } | ||
|
|
||
| @Override | ||
| protected List<String> getExpiredPartitionsForTimeStrategy(List<String> partitionPathsForTTL) { | ||
| long cutoffMillis = resolveCutoffMillis(instantTime, ttlInMilis); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🤖 |
||
| DateTimeFormatter formatter = DateTimeFormatter.ofPattern(eventTimeFormat).withZone(ZoneOffset.UTC); | ||
| int segCount = segmentCount(eventTimeFormat); | ||
| return partitionPathsForTTL.stream().parallel() | ||
| .filter(path -> isPartitionExpiredByEventTime( | ||
| path, formatter, startIndex, segCount, cutoffMillis, deleteHiveDefaultPartition, hiveStylePartitioning)) | ||
| .collect(Collectors.toList()); | ||
| } | ||
|
|
||
| /** | ||
| * Number of '/'-separated path segments the configured format occupies. | ||
| * Example: {@code yyyy-MM-dd/HH} -> 2. | ||
| */ | ||
| static int segmentCount(String format) { | ||
| return format.split("/").length; | ||
| } | ||
|
|
||
| /** | ||
| * Resolve the "now" reference timestamp from {@code instantTime}, in UTC. | ||
| * <p> | ||
| * Anchoring on {@code instantTime} keeps the strategy idempotent across retries of the same | ||
| * replace commit. We parse it in UTC so the cutoff and the partition's event time (also parsed | ||
| * in UTC above) sit on the same axis -- otherwise expiry drifts by the JVM's UTC offset, which | ||
| * is negligible at day granularity but a full-offset shift at hour granularity. | ||
| */ | ||
| static long resolveCutoffMillis(String instantTime, long ttlInMillis) { | ||
| try { | ||
| return HoodieInstantTimeGenerator.parseDateFromInstantTime(instantTime, ZoneOffset.UTC).getTime() - ttlInMillis; | ||
| } catch (ParseException e) { | ||
| throw new IllegalStateException("Failed to parse instant time " + instantTime, e); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Decide whether a partition path is expired. | ||
| * <p> | ||
| * The strategy treats any partition that cannot be parsed under the configured format / | ||
| * start-index / hive-style as a hard error. Reasoning: this class derives lifetime from the | ||
| * path itself, so a partition we cannot parse has no defined lifetime, and silently skipping | ||
| * it would leave it in the table forever while the rest of TTL appears to succeed. Switch to | ||
| * {@code KEEP_BY_TIME} or {@code KEEP_BY_CREATION_TIME} (which key off commit metadata, not | ||
| * the path) if the table contains partitions that don't conform to a single event-time shape. | ||
| * <p> | ||
| * Package-private so unit tests can exercise it directly without spinning up a HoodieTable. | ||
| */ | ||
| static boolean isPartitionExpiredByEventTime(String partitionPath, | ||
| DateTimeFormatter formatter, | ||
| int startIndex, | ||
| int segCount, | ||
| long cutoffMillis, | ||
| boolean deleteHiveDefaultPartition, | ||
| boolean hiveStylePartitioning) { | ||
| String[] segments = partitionPath.split("/"); | ||
|
wangxianghu marked this conversation as resolved.
|
||
| if (segments.length < startIndex + segCount) { | ||
| throw new IllegalArgumentException(String.format( | ||
| "Partition '%s' has %d segment(s) but the configured event time spans %d segment(s) starting at index %d. " | ||
| + "Check hoodie.partition.ttl.strategy.event.time.format and event.time.partition.start.index, " | ||
| + "or switch to KEEP_BY_TIME / KEEP_BY_CREATION_TIME if not all partitions of this table follow an event-time shape.", | ||
| partitionPath, segments.length, segCount, startIndex)); | ||
| } | ||
|
|
||
| String[] timeSegs = new String[segCount]; | ||
| for (int i = 0; i < segCount; i++) { | ||
| String seg = segments[startIndex + i]; | ||
| if (hiveStylePartitioning) { | ||
| int eq = seg.indexOf('='); | ||
| if (eq < 0) { | ||
| throw new IllegalArgumentException(String.format( | ||
| "Partition '%s' segment '%s' has no hive-style 'field=value' prefix but " | ||
| + "hoodie.datasource.write.hive_style_partitioning=true on the table. " | ||
| + "Switch to KEEP_BY_TIME / KEEP_BY_CREATION_TIME if such legacy partitions must coexist.", | ||
| partitionPath, seg)); | ||
| } | ||
| timeSegs[i] = seg.substring(eq + 1); | ||
| } else { | ||
| timeSegs[i] = seg; | ||
| } | ||
| if (PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH.equals(timeSegs[i])) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🤖 This default-marker check runs per time segment inside the loop, so with |
||
| // Hive default partition is explicitly user-controlled (see config above); not a parse error. | ||
| if (deleteHiveDefaultPartition) { | ||
| log.info("Partition '{}' contains {} and delete switch is on; marking expired", | ||
| partitionPath, PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH); | ||
| return true; | ||
| } | ||
| log.warn("Skipping partition '{}': contains {} (set hoodie.partition.ttl.strategy." | ||
| + "event.time.delete.hive.default.partition=true to delete)", | ||
| partitionPath, PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH); | ||
| return false; | ||
| } | ||
| } | ||
|
|
||
| String timeStr = String.join("/", timeSegs); | ||
| Long eventMillis = parseEventMillis(timeStr, formatter); | ||
| if (eventMillis == null) { | ||
| throw new IllegalArgumentException(String.format( | ||
| "Partition '%s': cannot parse '%s' with pattern '%s'. " | ||
| + "Fix hoodie.partition.ttl.strategy.event.time.format, or switch to " | ||
| + "KEEP_BY_TIME / KEEP_BY_CREATION_TIME if such partitions must remain in the table.", | ||
| partitionPath, timeStr, formatter)); | ||
| } | ||
| return eventMillis < cutoffMillis; | ||
| } | ||
|
|
||
| /** | ||
| * Parse {@code timeStr} into epoch millis. Tries full date-time first; falls back to a | ||
| * date-only parse anchored at UTC start-of-day so day-level patterns (e.g. {@code yyyy-MM-dd}) | ||
| * also work. | ||
| */ | ||
| static Long parseEventMillis(String timeStr, DateTimeFormatter formatter) { | ||
| TemporalAccessor parsed; | ||
| try { | ||
| parsed = formatter.parse(timeStr); | ||
| } catch (DateTimeParseException e) { | ||
| return null; | ||
| } | ||
| try { | ||
| return java.time.Instant.from(parsed).toEpochMilli(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🤖 nit: |
||
| } catch (java.time.DateTimeException ignore) { | ||
| // Day-level pattern with no time-of-day: anchor at UTC start of day. | ||
| try { | ||
| return java.time.LocalDate.from(parsed).atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli(); | ||
| } catch (java.time.DateTimeException e) { | ||
| return null; | ||
| } | ||
| } | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.