diff --git a/src/main/java/io/confluent/connect/hdfs/DataWriter.java b/src/main/java/io/confluent/connect/hdfs/DataWriter.java
index 26ee8bb06..584e6f10d 100644
--- a/src/main/java/io/confluent/connect/hdfs/DataWriter.java
+++ b/src/main/java/io/confluent/connect/hdfs/DataWriter.java
@@ -464,6 +464,14 @@ public void open(Collection<TopicPartition> partitions) {
     }
   }
 
+  public void checkWritersForNecessityOfRotation() {
+    for (TopicPartitionWriter topicPartitionWriter : topicPartitionWriters.values()) {
+      if (topicPartitionWriter.shouldRotateAndMaybeUpdateTimers()) {
+        topicPartitionWriter.write();
+      }
+    }
+  }
+
   public void close() {
     // Close any writers we have. We may get assigned the same partitions and end up duplicating
     // some effort since we'll have to reprocess those messages. It may be possible to hold on to
diff --git a/src/main/java/io/confluent/connect/hdfs/HdfsSinkTask.java b/src/main/java/io/confluent/connect/hdfs/HdfsSinkTask.java
index 28f9b91ec..ffb59810c 100644
--- a/src/main/java/io/confluent/connect/hdfs/HdfsSinkTask.java
+++ b/src/main/java/io/confluent/connect/hdfs/HdfsSinkTask.java
@@ -143,6 +143,10 @@ public Map<TopicPartition, OffsetAndMetadata> preCommit(
     // Although the connector manages offsets via files in HDFS, we still want to have Connect
     // commit the consumer offsets for records this task has consumed from its topic partitions and
     // committed to HDFS.
+
+    // Check whether any writers should rotate based on the rotation interval.
+    hdfsWriter.checkWritersForNecessityOfRotation();
+
     Map<TopicPartition, OffsetAndMetadata> result = new HashMap<>();
     for (Map.Entry<TopicPartition, Long> entry : hdfsWriter.getCommittedOffsets().entrySet()) {
       log.debug(
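Context for the two hunks above: Kafka Connect calls preCommit() on every offset-commit cycle, so routing the rotation check through it gives each TopicPartitionWriter a periodic wallclock tick even when no new records are being consumed. A minimal sketch of that pattern, assuming a hypothetical PartitionWriter interface in place of the real TopicPartitionWriter:

    import java.util.List;

    // Sketch only: PartitionWriter is a hypothetical stand-in for TopicPartitionWriter.
    interface PartitionWriter {
      boolean shouldRotate(); // mirrors shouldRotateAndMaybeUpdateTimers()
      void write();           // drains buffered records and commits any due files
    }

    final class RotationTick {
      private final List<PartitionWriter> writers;

      RotationTick(List<PartitionWriter> writers) {
        this.writers = writers;
      }

      // Invoked from the task's periodic preCommit() callback, as
      // checkWritersForNecessityOfRotation() is in the diff above.
      void run() {
        for (PartitionWriter writer : writers) {
          if (writer.shouldRotate()) {
            writer.write(); // flushes a temp file that would otherwise linger
          }
        }
      }
    }

The design choice is to piggyback on an existing periodic callback rather than introduce a dedicated timer thread, which keeps all writes on the task's own thread.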
diff --git a/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java b/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java
index dcb821d30..e5211418f 100644
--- a/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java
+++ b/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java
@@ -246,7 +246,7 @@ public TopicPartitionWriter(
     }
 
     // Initialize rotation timers
-    updateRotationTimers(null);
+    updateRotationTimers();
   }
 
   @SuppressWarnings("fallthrough")
@@ -290,12 +290,9 @@ public boolean recover() {
     return true;
   }
 
-  private void updateRotationTimers(SinkRecord currentRecord) {
+  private void updateRotationTimers() {
     long now = time.milliseconds();
-    // Wallclock-based partitioners should be independent of the record argument.
-    lastRotate = isWallclockBased
-        ? (Long) now
-        : currentRecord != null ? timestampExtractor.extract(currentRecord) : null;
+    lastRotate = now;
     if (log.isDebugEnabled() && rotateIntervalMs > 0) {
       log.debug(
           "Update last rotation timer. Next rotation for {} will be in {}ms",
@@ -322,7 +319,6 @@ private void updateRotationTimers(SinkRecord currentRecord) {
 
   @SuppressWarnings("fallthrough")
   public void write() {
     long now = time.milliseconds();
-    SinkRecord currentRecord = null;
     if (failureTime > 0 && now - failureTime < timeoutMs) {
       return;
     }
@@ -331,7 +327,7 @@ public void write() {
       if (!success) {
         return;
       }
-      updateRotationTimers(null);
+      updateRotationTimers();
     }
     while (!buffer.isEmpty()) {
       try {
@@ -358,7 +354,6 @@ public void write() {
           }
         }
         SinkRecord record = buffer.peek();
-        currentRecord = record;
         Schema valueSchema = record.valueSchema();
         if ((recordCounter <= 0 && currentSchema == null && valueSchema != null)
             || compatibility.shouldChangeSchema(record, null, currentSchema)) {
@@ -373,7 +368,7 @@ public void write() {
             break;
           }
         } else {
-          if (shouldRotateAndMaybeUpdateTimers(currentRecord, now)) {
+          if (shouldRotateAndMaybeUpdateTimers(now)) {
             log.info(
                 "Starting commit and rotation for topic partition {} with start offsets {} "
                     + "and end offsets {}",
@@ -391,7 +386,7 @@
             }
           }
         case SHOULD_ROTATE:
-          updateRotationTimers(currentRecord);
+          updateRotationTimers();
           closeTempFile();
           nextState();
         case TEMP_FILE_CLOSED:
@@ -424,7 +419,7 @@ public void write() {
         case WRITE_PARTITION_PAUSED:
          // committing files after waiting for rotateIntervalMs time but less than flush.size
          // records available
-          if (recordCounter == 0 || !shouldRotateAndMaybeUpdateTimers(currentRecord, now)) {
+          if (recordCounter == 0 || !shouldRotateAndMaybeUpdateTimers(now)) {
            break;
          }
          log.info(
@@ -434,7 +429,7 @@
          );
          nextState();
        case SHOULD_ROTATE:
-          updateRotationTimers(currentRecord);
+          updateRotationTimers();
          closeTempFile();
          nextState();
        case TEMP_FILE_CLOSED:
@@ -559,19 +554,14 @@ private void setState(State state) {
     this.state = state;
   }
 
-  private boolean shouldRotateAndMaybeUpdateTimers(SinkRecord currentRecord, long now) {
-    Long currentTimestamp = null;
-    if (isWallclockBased) {
-      currentTimestamp = now;
-    } else if (currentRecord != null) {
-      currentTimestamp = timestampExtractor.extract(currentRecord);
-      lastRotate = lastRotate == null ? currentTimestamp : lastRotate;
-    }
+  public boolean shouldRotateAndMaybeUpdateTimers() {
+    return shouldRotateAndMaybeUpdateTimers(time.milliseconds());
+  }
 
+  private boolean shouldRotateAndMaybeUpdateTimers(long now) {
     boolean periodicRotation = rotateIntervalMs > 0
-        && currentTimestamp != null && lastRotate != null
-        && currentTimestamp - lastRotate >= rotateIntervalMs;
+        && now - lastRotate >= rotateIntervalMs;
     boolean scheduledRotation = rotateScheduleIntervalMs > 0 && now >= nextScheduledRotate;
     boolean messageSizeRotation = recordCounter >= flushSize;
 
@@ -580,7 +570,7 @@ private boolean shouldRotateAndMaybeUpdateTimers(SinkRecord currentRecord, long
             + "'{}', timestamp: '{}')? {}",
         rotateIntervalMs,
         lastRotate,
-        currentTimestamp,
+        now,
         periodicRotation
     );
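The refactor above removes the record-timestamp path entirely: lastRotate now always holds wallclock time, so the null checks on currentTimestamp and lastRotate disappear and the rotation decision reduces to three independent triggers OR'ed together. A self-contained restatement of the simplified predicate, with hypothetical hard-coded field values standing in for the connector's configuration:

    import java.util.concurrent.TimeUnit;

    // Restatement of the simplified predicate; not the connector class itself.
    final class RotationPredicate {
      private final long rotateIntervalMs = TimeUnit.MINUTES.toMillis(1); // rotate.interval.ms
      private final long rotateScheduleIntervalMs = -1;                   // rotate.schedule.interval.ms (disabled)
      private final int flushSize = 1000;                                 // flush.size

      private long lastRotate;          // wallclock time of the last rotation
      private long nextScheduledRotate; // next absolute schedule boundary
      private int recordCounter;        // records written to the current temp file

      boolean shouldRotate(long now) {
        boolean periodicRotation = rotateIntervalMs > 0
            && now - lastRotate >= rotateIntervalMs;              // interval elapsed
        boolean scheduledRotation = rotateScheduleIntervalMs > 0
            && now >= nextScheduledRotate;                        // boundary crossed
        boolean messageSizeRotation = recordCounter >= flushSize; // enough records buffered
        return periodicRotation || scheduledRotation || messageSizeRotation;
      }
    }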
diff --git a/src/test/java/io/confluent/connect/hdfs/TopicPartitionWriterTest.java b/src/test/java/io/confluent/connect/hdfs/TopicPartitionWriterTest.java
index 2732b73b4..3c6cecaaa 100644
--- a/src/test/java/io/confluent/connect/hdfs/TopicPartitionWriterTest.java
+++ b/src/test/java/io/confluent/connect/hdfs/TopicPartitionWriterTest.java
@@ -61,6 +61,7 @@
 import io.confluent.connect.storage.wal.WAL;
 
 import static io.confluent.connect.storage.StorageSinkConnectorConfig.FLUSH_SIZE_CONFIG;
+import static io.confluent.connect.storage.StorageSinkConnectorConfig.ROTATE_INTERVAL_MS_CONFIG;
 import static org.apache.kafka.common.utils.Time.SYSTEM;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -476,7 +477,7 @@ public void testWriteRecordTimeBasedPartitionFieldTimestampHours() throws Except
     // Do not roll on size, only based on time.
     localProps.put(FLUSH_SIZE_CONFIG, "1000");
     localProps.put(
-        HdfsSinkConnectorConfig.ROTATE_INTERVAL_MS_CONFIG,
+        ROTATE_INTERVAL_MS_CONFIG,
         String.valueOf(TimeUnit.MINUTES.toMillis(1))
     );
     setUp();
@@ -525,39 +526,29 @@ public void testWriteRecordTimeBasedPartitionFieldTimestampHours() throws Except
         Collections.singleton(new TopicPartition(TOPIC, PARTITION))
     ));
 
-    // And one last record to flush the previous ones.
-    long timestampMuchLater = first.plusHours(6).getMillis();
-    Struct lastOne = createRecordWithTimestampField(schema, timestampMuchLater);
-    sinkRecords.addAll(createSinkRecords(
-        Collections.singletonList(lastOne),
-        schema,
-        19,
-        Collections.singleton(new TopicPartition(TOPIC, PARTITION))
-    ));
-
     for (SinkRecord record : sinkRecords) {
       topicPartitionWriter.buffer(record);
     }
 
     topicPartitionWriter.recover();
     topicPartitionWriter.write();
+    assertEquals(0, topicPartitionWriter.offset());
+    time.sleep(TimeUnit.MINUTES.toMillis(2));
+    boolean shouldRotate = topicPartitionWriter.shouldRotateAndMaybeUpdateTimers();
+    assertTrue(shouldRotate);
+    topicPartitionWriter.write();
     topicPartitionWriter.close();
 
     String encodedPartitionFirst = getTimebasedEncodedPartition(timestampFirst);
     String encodedPartitionLater = getTimebasedEncodedPartition(timestampLater);
 
+    String topicsDir = this.topicsDir.get(TOPIC_PARTITION.topic());
     String dirPrefixFirst = partitioner.generatePartitionedPath(TOPIC, encodedPartitionFirst);
     Set<Path> expectedFiles = new HashSet<>();
-    String topicsDir = this.topicsDir.get(TOPIC_PARTITION.topic());
-    for (int i : new int[]{0, 3, 6}) {
-      expectedFiles.add(new Path(FileUtils.committedFileName(url, topicsDir, dirPrefixFirst, TOPIC_PARTITION, i, i + 2, extension, zeroPadFormat)));
-    }
-
+    expectedFiles.add(new Path(FileUtils.committedFileName(url, topicsDir, dirPrefixFirst, TOPIC_PARTITION, 0, 8, extension, zeroPadFormat)));
     String dirPrefixLater = partitioner.generatePartitionedPath(TOPIC, encodedPartitionLater);
-    for (int i : new int[]{9, 12, 15}) {
-      expectedFiles.add(new Path(FileUtils.committedFileName(url, topicsDir, dirPrefixLater, TOPIC_PARTITION, i, i + 2, extension, zeroPadFormat)));
-    }
-    verify(expectedFiles, 3, records, schema);
+    expectedFiles.add(new Path(FileUtils.committedFileName(url, topicsDir, dirPrefixLater, TOPIC_PARTITION, 9, 17, extension, zeroPadFormat)));
+    verify(expectedFiles, 9, records, schema);
   }
 
   @Test
@@ -565,7 +556,7 @@ public void testWriteRecordTimeBasedPartitionRecordTimestampHours() throws Excep
     // Do not roll on size, only based on time.
     localProps.put(FLUSH_SIZE_CONFIG, "1000");
     localProps.put(
-        HdfsSinkConnectorConfig.ROTATE_INTERVAL_MS_CONFIG,
+        ROTATE_INTERVAL_MS_CONFIG,
         String.valueOf(TimeUnit.MINUTES.toMillis(1))
     );
     setUp();
@@ -603,24 +594,23 @@ public void testWriteRecordTimeBasedPartitionRecordTimestampHours() throws Excep
 
     topicPartitionWriter.recover();
     topicPartitionWriter.write();
+    time.sleep(TimeUnit.MINUTES.toMillis(2));
+    boolean shouldRotate = topicPartitionWriter.shouldRotateAndMaybeUpdateTimers();
+    assertTrue(shouldRotate);
+    topicPartitionWriter.write();
     topicPartitionWriter.close();
 
     String encodedPartitionFirst = getTimebasedEncodedPartition(timestampFirst);
     String encodedPartitionLater = getTimebasedEncodedPartition(timestampLater);
 
-    String dirPrefixFirst = partitioner.generatePartitionedPath(TOPIC, encodedPartitionFirst);
     String topicsDir = this.topicsDir.get(TOPIC_PARTITION.topic());
+    String dirPrefixFirst = partitioner.generatePartitionedPath(TOPIC, encodedPartitionFirst);
     Set<Path> expectedFiles = new HashSet<>();
-    for (int i : new int[]{0, 3, 6}) {
-      expectedFiles.add(new Path(FileUtils.committedFileName(url, topicsDir, dirPrefixFirst, TOPIC_PARTITION, i, i + 2, extension, zeroPadFormat)));
-    }
+    expectedFiles.add(new Path(FileUtils.committedFileName(url, topicsDir, dirPrefixFirst, TOPIC_PARTITION, 0, 8, extension, zeroPadFormat)));
 
     String dirPrefixLater = partitioner.generatePartitionedPath(TOPIC, encodedPartitionLater);
-    // Records 15,16,17 won't be flushed until a record with a higher timestamp arrives.
-    for (int i : new int[]{9, 12}) {
-      expectedFiles.add(new Path(FileUtils.committedFileName(url, topicsDir, dirPrefixLater, TOPIC_PARTITION, i, i + 2, extension, zeroPadFormat)));
-    }
-    verify(expectedFiles, 3, records, schema);
+    expectedFiles.add(new Path(FileUtils.committedFileName(url, topicsDir, dirPrefixLater, TOPIC_PARTITION, 9, 17, extension, zeroPadFormat)));
+    verify(expectedFiles, 9, records, schema);
   }
 
   @Test
@@ -628,7 +618,7 @@ public void testWriteRecordTimeBasedPartitionRecordTimestampDays() throws Except
     // Do not roll on size, only based on time.
     localProps.put(FLUSH_SIZE_CONFIG, "1000");
     localProps.put(
-        HdfsSinkConnectorConfig.ROTATE_INTERVAL_MS_CONFIG,
+        ROTATE_INTERVAL_MS_CONFIG,
         String.valueOf(TimeUnit.MINUTES.toMillis(1))
     );
     setUp();
@@ -658,7 +648,7 @@ public void testWriteRecordTimeBasedPartitionRecordTimestampDays() throws Except
     long advanceMs = 20000;
     long timestampFirst = first.getMillis();
     Collection<SinkRecord> sinkRecords = createSinkRecordsWithTimestamp(records.subList(0, 9), schema, 0, timestampFirst, advanceMs);
-    long timestampLater = first.plusHours(2).getMillis();
+    long timestampLater = first.plusDays(1).getMillis();
     sinkRecords.addAll(createSinkRecordsWithTimestamp(records.subList(9, 18), schema, 9, timestampLater, advanceMs));
 
     for (SinkRecord record : sinkRecords) {
@@ -667,24 +657,90 @@ public void testWriteRecordTimeBasedPartitionRecordTimestampDays() throws Except
 
     topicPartitionWriter.recover();
     topicPartitionWriter.write();
+    time.sleep(TimeUnit.MINUTES.toMillis(2));
+    boolean shouldRotate = topicPartitionWriter.shouldRotateAndMaybeUpdateTimers();
+    assertTrue(shouldRotate);
+    topicPartitionWriter.write();
     topicPartitionWriter.close();
 
     String encodedPartitionFirst = getTimebasedEncodedPartition(timestampFirst);
     String encodedPartitionLater = getTimebasedEncodedPartition(timestampLater);
 
     String dirPrefixFirst = partitioner.generatePartitionedPath(TOPIC, encodedPartitionFirst);
+    String dirPrefixLater = partitioner.generatePartitionedPath(TOPIC, encodedPartitionLater);
+
     Set<Path> expectedFiles = new HashSet<>();
     String topicsDir = this.topicsDir.get(TOPIC_PARTITION.topic());
-    for (int i : new int[]{0, 3, 6}) {
-      expectedFiles.add(new Path(FileUtils.committedFileName(url, topicsDir, dirPrefixFirst, TOPIC_PARTITION, i, i + 2, extension, zeroPadFormat)));
+    expectedFiles.add(new Path(FileUtils.committedFileName(url, topicsDir, dirPrefixFirst, TOPIC_PARTITION, 0, 8, extension, zeroPadFormat)));
+    expectedFiles.add(new Path(FileUtils.committedFileName(url, topicsDir, dirPrefixLater, TOPIC_PARTITION, 9, 17, extension, zeroPadFormat)));
+
+    int expectedBatchSize = 9;
+    verify(expectedFiles, expectedBatchSize, records, schema);
+  }
+
+  @Test
+  public void testCloseTempFileByRotateIntervalWithoutReadingNewRecords() throws Exception {
+    setUp();
+    // Define the partitioner
+    partitioner = new DataWriter.PartitionerWrapper(
+        new io.confluent.connect.storage.partitioner.TimeBasedPartitioner<>()
+    );
+    parsedConfig.put(PartitionerConfig.PARTITION_DURATION_MS_CONFIG, TimeUnit.DAYS.toMillis(1));
+    parsedConfig.put(PartitionerConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "Record");
+    parsedConfig.put(PartitionerConfig.PATH_FORMAT_CONFIG, "'ds'=YYYYMMdd");
+    partitioner.configure(parsedConfig);
+
+    properties.put(FLUSH_SIZE_CONFIG, "1000");
+    properties.put(
+        ROTATE_INTERVAL_MS_CONFIG,
+        String.valueOf(TimeUnit.MINUTES.toMillis(1))
+    );
+    connectorConfig = new HdfsSinkConnectorConfig(properties);
+
+    TopicPartitionWriter topicPartitionWriter = new TopicPartitionWriter(
+        TOPIC_PARTITION,
+        storage,
+        writerProvider,
+        newWriterProvider,
+        partitioner,
+        connectorConfig,
+        context,
+        avroData,
+        time
+    );
+
+    Schema schema = createSchema();
+    List<Struct> records = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      records.add(createRecord(schema, i, 12.2f));
     }
-    String dirPrefixLater = partitioner.generatePartitionedPath(TOPIC, encodedPartitionLater);
-    // Records 15,16,17 won't be flushed until a record with a higher timestamp arrives.
-    for (int i : new int[]{9, 12}) {
-      expectedFiles.add(new Path(FileUtils.committedFileName(url, topicsDir, dirPrefixLater, TOPIC_PARTITION, i, i + 2, extension, zeroPadFormat)));
+
+    long advanceMs = 20000;
+    DateTime first = new DateTime(2023, 5, 7, 9, 0, DateTimeZone.UTC);
+    long timestamp = first.getMillis();
+    Collection<SinkRecord> sinkRecords = createSinkRecordsWithTimestamp(
+        records, schema, 0, timestamp, advanceMs
+    );
+    for (SinkRecord record : sinkRecords) {
+      topicPartitionWriter.buffer(record);
     }
-    verify(expectedFiles, 3, records, schema);
+
+    assertEquals(-1, topicPartitionWriter.offset());
+    topicPartitionWriter.write();
+    assertEquals(0, topicPartitionWriter.offset());
+    time.sleep(TimeUnit.MINUTES.toMillis(2));
+    boolean shouldRotate = topicPartitionWriter.shouldRotateAndMaybeUpdateTimers();
+    assertTrue(shouldRotate);
+    topicPartitionWriter.write();
+    assertEquals(10, topicPartitionWriter.offset());
+    topicPartitionWriter.close();
+
+    Set<Path> expectedFiles = new HashSet<>();
+    String topicsDir = this.topicsDir.get(TOPIC);
+    expectedFiles.add(new Path(url + "/" + topicsDir + "/" + TOPIC + "/ds=20230507"
+        + "/" + TOPIC + "+" + PARTITION + "+0000000000+0000000009" + extension));
+    int expectedBatchSize = 10;
+    verify(expectedFiles, expectedBatchSize, records, schema);
   }
 
   @Test
@@ -693,7 +749,7 @@ public void testWriteRecordTimeBasedPartitionWallclockMockedWithScheduleRotation
     // Do not roll on size, only based on time.
     localProps.put(FLUSH_SIZE_CONFIG, "1000");
     localProps.put(
-        HdfsSinkConnectorConfig.ROTATE_INTERVAL_MS_CONFIG,
+        ROTATE_INTERVAL_MS_CONFIG,
         String.valueOf(TimeUnit.HOURS.toMillis(1))
     );
     localProps.put(
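The new and updated tests all follow the same shape: write once, advance the mocked clock past rotate.interval.ms, assert that the rotation predicate fires, then write again to commit the lingering temp file. Because rotation now depends only on wallclock time, a fake clock is all a test needs; a standalone illustration under that assumption (hypothetical class, not connector or test code):

    import java.util.concurrent.TimeUnit;

    public final class WallclockRotationDemo {
      private long nowMs;        // stand-in for org.apache.kafka.common.utils.MockTime
      private long lastRotateMs; // last rotation, also in fake wallclock time
      private final long rotateIntervalMs = TimeUnit.MINUTES.toMillis(1);

      void sleep(long ms) {
        nowMs += ms; // advance the fake clock, as time.sleep(...) does in the tests
      }

      boolean shouldRotate() {
        return nowMs - lastRotateMs >= rotateIntervalMs;
      }

      public static void main(String[] args) {
        WallclockRotationDemo demo = new WallclockRotationDemo();
        System.out.println(demo.shouldRotate());  // false: interval not yet elapsed
        demo.sleep(TimeUnit.MINUTES.toMillis(2)); // jump past rotate.interval.ms
        System.out.println(demo.shouldRotate());  // true: an idle partition would now flush
      }
    }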