From 4b2019e0e3cc55244ae7583a4b57b3a92fd39820 Mon Sep 17 00:00:00 2001 From: Usiel Riedl Date: Wed, 31 Jan 2024 17:55:11 +0800 Subject: [PATCH] Ensure rotation of files By falling back to wallclock time when there is no record (i.e. when the upstream `DataWriter` calls `write()`, when the buffer is empty) we initiate a rotation of tmp files to committed files if any of the time intervals trigger. --- .../confluent/connect/hdfs/TopicPartitionWriter.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java b/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java index 9bb8b9390..4f0dc5a97 100644 --- a/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java +++ b/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java @@ -316,9 +316,9 @@ public boolean recover() { private void updateRotationTimers(SinkRecord currentRecord) { long now = time.milliseconds(); // Wallclock-based partitioners should be independent of the record argument. - lastRotate = isWallclockBased + lastRotate = isWallclockBased || currentRecord == null ? (Long) now - : currentRecord != null ? timestampExtractor.extract(currentRecord) : null; + : timestampExtractor.extract(currentRecord); if (log.isDebugEnabled() && rotateIntervalMs > 0) { log.debug( "Update last rotation timer. Next rotation for {} will be in {}ms", @@ -600,10 +600,10 @@ private void setState(State state) { } private boolean shouldRotateAndMaybeUpdateTimers(SinkRecord currentRecord, long now) { - Long currentTimestamp = null; - if (isWallclockBased) { + Long currentTimestamp; + if (isWallclockBased || currentRecord == null) { currentTimestamp = now; - } else if (currentRecord != null) { + } else { currentTimestamp = timestampExtractor.extract(currentRecord); lastRotate = lastRotate == null ? currentTimestamp : lastRotate; }