From e871c91fe7f083a4385be37ebe281f065c34700b Mon Sep 17 00:00:00 2001
From: "andrey.starikov" <1andreystarikov@gmail.com>
Date: Sat, 26 Aug 2023 20:27:54 +0300
Subject: [PATCH] Fix files getting stuck in the temporary folder when no new
 messages arrive in the topic even though rotate.interval.ms is set. The
 flush.size property now also works correctly.

---
 .../io/confluent/connect/hdfs/DataWriter.java |   8 ++
 .../confluent/connect/hdfs/HdfsSinkTask.java  |   4 +
 .../connect/hdfs/TopicPartitionWriter.java    |  38 ++---
 .../hdfs/TopicPartitionWriterTest.java        | 136 ++++++++++++------
 4 files changed, 122 insertions(+), 64 deletions(-)

diff --git a/src/main/java/io/confluent/connect/hdfs/DataWriter.java b/src/main/java/io/confluent/connect/hdfs/DataWriter.java
index 26ee8bb06..584e6f10d 100644
--- a/src/main/java/io/confluent/connect/hdfs/DataWriter.java
+++ b/src/main/java/io/confluent/connect/hdfs/DataWriter.java
@@ -464,6 +464,14 @@ public void open(Collection<TopicPartition> partitions) {
     }
   }
 
+  public void checkWritersForNecessityOfRotation() {
+    for (TopicPartitionWriter topicPartitionWriter : topicPartitionWriters.values()) {
+      if (topicPartitionWriter.shouldRotateAndMaybeUpdateTimers()) {
+        topicPartitionWriter.write();
+      }
+    }
+  }
+
   public void close() {
     // Close any writers we have. We may get assigned the same partitions and end up duplicating
     // some effort since we'll have to reprocess those messages. It may be possible to hold on to
diff --git a/src/main/java/io/confluent/connect/hdfs/HdfsSinkTask.java b/src/main/java/io/confluent/connect/hdfs/HdfsSinkTask.java
index 28f9b91ec..ffb59810c 100644
--- a/src/main/java/io/confluent/connect/hdfs/HdfsSinkTask.java
+++ b/src/main/java/io/confluent/connect/hdfs/HdfsSinkTask.java
@@ -143,6 +143,10 @@ public Map<TopicPartition, OffsetAndMetadata> preCommit(
     // Although the connector manages offsets via files in HDFS, we still want to have Connect
     // commit the consumer offsets for records this task has consumed from its topic partitions and
     // committed to HDFS.
+
+    // Check whether any writers should rotate based on the rotation interval
+    hdfsWriter.checkWritersForNecessityOfRotation();
+
     Map<TopicPartition, OffsetAndMetadata> result = new HashMap<>();
     for (Map.Entry<TopicPartition, Long> entry : hdfsWriter.getCommittedOffsets().entrySet()) {
       log.debug(
diff --git a/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java b/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java
index dcb821d30..e5211418f 100644
--- a/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java
+++ b/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java
@@ -246,7 +246,7 @@ public TopicPartitionWriter(
     }
 
     // Initialize rotation timers
-    updateRotationTimers(null);
+    updateRotationTimers();
   }
 
   @SuppressWarnings("fallthrough")
@@ -290,12 +290,9 @@ public boolean recover() {
     return true;
   }
 
-  private void updateRotationTimers(SinkRecord currentRecord) {
+  private void updateRotationTimers() {
     long now = time.milliseconds();
-    // Wallclock-based partitioners should be independent of the record argument.
-    lastRotate = isWallclockBased
-        ? (Long) now
-        : currentRecord != null ? timestampExtractor.extract(currentRecord) : null;
+    lastRotate = time.milliseconds();
     if (log.isDebugEnabled() && rotateIntervalMs > 0) {
       log.debug(
           "Update last rotation timer. Next rotation for {} will be in {}ms",
@@ -322,7 +319,6 @@ private void updateRotationTimers(SinkRecord currentRecord) {
   @SuppressWarnings("fallthrough")
   public void write() {
     long now = time.milliseconds();
-    SinkRecord currentRecord = null;
     if (failureTime > 0 && now - failureTime < timeoutMs) {
       return;
     }
@@ -331,7 +327,7 @@ public void write() {
       if (!success) {
         return;
       }
-      updateRotationTimers(null);
+      updateRotationTimers();
     }
     while (!buffer.isEmpty()) {
       try {
@@ -358,7 +354,6 @@ public void write() {
           }
         }
         SinkRecord record = buffer.peek();
-        currentRecord = record;
         Schema valueSchema = record.valueSchema();
         if ((recordCounter <= 0 && currentSchema == null && valueSchema != null)
             || compatibility.shouldChangeSchema(record, null, currentSchema)) {
@@ -373,7 +368,7 @@ public void write() {
             break;
           }
         } else {
-          if (shouldRotateAndMaybeUpdateTimers(currentRecord, now)) {
+          if (shouldRotateAndMaybeUpdateTimers(now)) {
             log.info(
                 "Starting commit and rotation for topic partition {} with start offsets {} "
                     + "and end offsets {}",
@@ -391,7 +386,7 @@ public void write() {
             }
           }
         case SHOULD_ROTATE:
-          updateRotationTimers(currentRecord);
+          updateRotationTimers();
           closeTempFile();
           nextState();
         case TEMP_FILE_CLOSED:
@@ -424,7 +419,7 @@ public void write() {
       case WRITE_PARTITION_PAUSED:
         // committing files after waiting for rotateIntervalMs time but less than flush.size
         // records available
-        if (recordCounter == 0 || !shouldRotateAndMaybeUpdateTimers(currentRecord, now)) {
+        if (recordCounter == 0 || !shouldRotateAndMaybeUpdateTimers(now)) {
           break;
         }
 
@@ -434,7 +429,7 @@ public void write() {
         );
         nextState();
       case SHOULD_ROTATE:
-        updateRotationTimers(currentRecord);
+        updateRotationTimers();
         closeTempFile();
         nextState();
       case TEMP_FILE_CLOSED:
@@ -559,19 +554,14 @@ private void setState(State state) {
     this.state = state;
   }
 
-  private boolean shouldRotateAndMaybeUpdateTimers(SinkRecord currentRecord, long now) {
-    Long currentTimestamp = null;
-    if (isWallclockBased) {
-      currentTimestamp = now;
-    } else if (currentRecord != null) {
-      currentTimestamp = timestampExtractor.extract(currentRecord);
-      lastRotate = lastRotate == null ? currentTimestamp : lastRotate;
-    }
+  public boolean shouldRotateAndMaybeUpdateTimers() {
+    return shouldRotateAndMaybeUpdateTimers(time.milliseconds());
+  }
 
+  private boolean shouldRotateAndMaybeUpdateTimers(long now) {
     boolean periodicRotation = rotateIntervalMs > 0
-        && currentTimestamp != null && lastRotate != null
-        && currentTimestamp - lastRotate >= rotateIntervalMs;
+        && now - lastRotate >= rotateIntervalMs;
     boolean scheduledRotation = rotateScheduleIntervalMs > 0 && now >= nextScheduledRotate;
     boolean messageSizeRotation = recordCounter >= flushSize;
 
@@ -580,7 +570,7 @@ private boolean shouldRotateAndMaybeUpdateTimers(SinkRecord currentRecord, long
             + "'{}', timestamp: '{}')? {}",
         rotateIntervalMs,
         lastRotate,
-        currentTimestamp,
+        now,
         periodicRotation
     );
 
diff --git a/src/test/java/io/confluent/connect/hdfs/TopicPartitionWriterTest.java b/src/test/java/io/confluent/connect/hdfs/TopicPartitionWriterTest.java
index 2732b73b4..3c6cecaaa 100644
--- a/src/test/java/io/confluent/connect/hdfs/TopicPartitionWriterTest.java
+++ b/src/test/java/io/confluent/connect/hdfs/TopicPartitionWriterTest.java
@@ -61,6 +61,7 @@ import io.confluent.connect.storage.wal.WAL;
 
 import static io.confluent.connect.storage.StorageSinkConnectorConfig.FLUSH_SIZE_CONFIG;
+import static io.confluent.connect.storage.StorageSinkConnectorConfig.ROTATE_INTERVAL_MS_CONFIG;
 import static org.apache.kafka.common.utils.Time.SYSTEM;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -476,7 +477,7 @@ public void testWriteRecordTimeBasedPartitionFieldTimestampHours() throws Except
     // Do not roll on size, only based on time.
     localProps.put(FLUSH_SIZE_CONFIG, "1000");
     localProps.put(
-        HdfsSinkConnectorConfig.ROTATE_INTERVAL_MS_CONFIG,
+        ROTATE_INTERVAL_MS_CONFIG,
         String.valueOf(TimeUnit.MINUTES.toMillis(1))
     );
     setUp();
@@ -525,39 +526,29 @@ public void testWriteRecordTimeBasedPartitionFieldTimestampHours() throws Except
         Collections.singleton(new TopicPartition(TOPIC, PARTITION))
     ));
 
-    // And one last record to flush the previous ones.
-    long timestampMuchLater = first.plusHours(6).getMillis();
-    Struct lastOne = createRecordWithTimestampField(schema, timestampMuchLater);
-    sinkRecords.addAll(createSinkRecords(
-        Collections.singletonList(lastOne),
-        schema,
-        19,
-        Collections.singleton(new TopicPartition(TOPIC, PARTITION))
-    ));
-
     for (SinkRecord record : sinkRecords) {
       topicPartitionWriter.buffer(record);
     }
 
     topicPartitionWriter.recover();
     topicPartitionWriter.write();
+    assertEquals(0, topicPartitionWriter.offset());
+    time.sleep(TimeUnit.MINUTES.toMillis(2));
+    boolean shouldRotateAndMaybeUpdateTimers = topicPartitionWriter.shouldRotateAndMaybeUpdateTimers();
+    assertEquals(true, shouldRotateAndMaybeUpdateTimers);
+    topicPartitionWriter.write();
     topicPartitionWriter.close();
 
     String encodedPartitionFirst = getTimebasedEncodedPartition(timestampFirst);
     String encodedPartitionLater = getTimebasedEncodedPartition(timestampLater);
 
+    String topicsDir = this.topicsDir.get(TOPIC_PARTITION.topic());
     String dirPrefixFirst = partitioner.generatePartitionedPath(TOPIC, encodedPartitionFirst);
     Set<Path> expectedFiles = new HashSet<>();
-    String topicsDir = this.topicsDir.get(TOPIC_PARTITION.topic());
-    for (int i : new int[]{0, 3, 6}) {
-      expectedFiles.add(new Path(FileUtils.committedFileName(url, topicsDir, dirPrefixFirst, TOPIC_PARTITION, i, i + 2, extension, zeroPadFormat)));
-    }
-
+    expectedFiles.add(new Path(FileUtils.committedFileName(url, topicsDir, dirPrefixFirst, TOPIC_PARTITION, 0, 8, extension, zeroPadFormat)));
     String dirPrefixLater = partitioner.generatePartitionedPath(TOPIC, encodedPartitionLater);
-    for (int i : new int[]{9, 12, 15}) {
-      expectedFiles.add(new Path(FileUtils.committedFileName(url, topicsDir, dirPrefixLater, TOPIC_PARTITION, i, i + 2, extension, zeroPadFormat)));
-    }
-    verify(expectedFiles, 3, records, schema);
+    expectedFiles.add(new Path(FileUtils.committedFileName(url, topicsDir, dirPrefixLater, TOPIC_PARTITION, 9, 17, extension, zeroPadFormat)));
+    verify(expectedFiles, 9, records, schema);
   }
 
   @Test
@@ -565,7 +556,7 @@ public void testWriteRecordTimeBasedPartitionRecordTimestampHours() throws Excep
     // Do not roll on size, only based on time.
     localProps.put(FLUSH_SIZE_CONFIG, "1000");
     localProps.put(
-        HdfsSinkConnectorConfig.ROTATE_INTERVAL_MS_CONFIG,
+        ROTATE_INTERVAL_MS_CONFIG,
         String.valueOf(TimeUnit.MINUTES.toMillis(1))
     );
     setUp();
@@ -603,24 +594,23 @@ public void testWriteRecordTimeBasedPartitionRecordTimestampHours() throws Excep
     topicPartitionWriter.recover();
     topicPartitionWriter.write();
+    time.sleep(TimeUnit.MINUTES.toMillis(2));
+    boolean shouldRotateAndMaybeUpdateTimers = topicPartitionWriter.shouldRotateAndMaybeUpdateTimers();
+    assertEquals(true, shouldRotateAndMaybeUpdateTimers);
+    topicPartitionWriter.write();
     topicPartitionWriter.close();
 
     String encodedPartitionFirst = getTimebasedEncodedPartition(timestampFirst);
     String encodedPartitionLater = getTimebasedEncodedPartition(timestampLater);
 
-    String dirPrefixFirst = partitioner.generatePartitionedPath(TOPIC, encodedPartitionFirst);
     String topicsDir = this.topicsDir.get(TOPIC_PARTITION.topic());
+    String dirPrefixFirst = partitioner.generatePartitionedPath(TOPIC, encodedPartitionFirst);
     Set<Path> expectedFiles = new HashSet<>();
-    for (int i : new int[]{0, 3, 6}) {
-      expectedFiles.add(new Path(FileUtils.committedFileName(url, topicsDir, dirPrefixFirst, TOPIC_PARTITION, i, i + 2, extension, zeroPadFormat)));
-    }
+    expectedFiles.add(new Path(FileUtils.committedFileName(url, topicsDir, dirPrefixFirst, TOPIC_PARTITION, 0, 8, extension, zeroPadFormat)));
     String dirPrefixLater = partitioner.generatePartitionedPath(TOPIC, encodedPartitionLater);
-    // Records 15,16,17 won't be flushed until a record with a higher timestamp arrives.
-    for (int i : new int[]{9, 12}) {
-      expectedFiles.add(new Path(FileUtils.committedFileName(url, topicsDir, dirPrefixLater, TOPIC_PARTITION, i, i + 2, extension, zeroPadFormat)));
-    }
-    verify(expectedFiles, 3, records, schema);
+    expectedFiles.add(new Path(FileUtils.committedFileName(url, topicsDir, dirPrefixLater, TOPIC_PARTITION, 9, 17, extension, zeroPadFormat)));
+    verify(expectedFiles, 9, records, schema);
   }
 
   @Test
@@ -628,7 +618,7 @@ public void testWriteRecordTimeBasedPartitionRecordTimestampDays() throws Except
     // Do not roll on size, only based on time.
     localProps.put(FLUSH_SIZE_CONFIG, "1000");
     localProps.put(
-        HdfsSinkConnectorConfig.ROTATE_INTERVAL_MS_CONFIG,
+        ROTATE_INTERVAL_MS_CONFIG,
         String.valueOf(TimeUnit.MINUTES.toMillis(1))
     );
     setUp();
@@ -658,7 +648,7 @@ public void testWriteRecordTimeBasedPartitionRecordTimestampDays() throws Except
     long advanceMs = 20000;
     long timestampFirst = first.getMillis();
     Collection<SinkRecord> sinkRecords = createSinkRecordsWithTimestamp(records.subList(0, 9), schema, 0, timestampFirst, advanceMs);
-    long timestampLater = first.plusHours(2).getMillis();
+    long timestampLater = first.plusDays(1).getMillis();
     sinkRecords.addAll(createSinkRecordsWithTimestamp(records.subList(9, 18), schema, 9, timestampLater, advanceMs));
 
     for (SinkRecord record : sinkRecords) {
@@ -667,24 +657,90 @@ public void testWriteRecordTimeBasedPartitionRecordTimestampDays() throws Except
     topicPartitionWriter.recover();
     topicPartitionWriter.write();
+    time.sleep(TimeUnit.MINUTES.toMillis(2));
+    boolean shouldRotateAndMaybeUpdateTimers = topicPartitionWriter.shouldRotateAndMaybeUpdateTimers();
+    assertEquals(true, shouldRotateAndMaybeUpdateTimers);
+    topicPartitionWriter.write();
     topicPartitionWriter.close();
 
     String encodedPartitionFirst = getTimebasedEncodedPartition(timestampFirst);
     String encodedPartitionLater = getTimebasedEncodedPartition(timestampLater);
 
     String dirPrefixFirst = partitioner.generatePartitionedPath(TOPIC, encodedPartitionFirst);
+    String dirPrefixLater = partitioner.generatePartitionedPath(TOPIC, encodedPartitionLater);
+    Set<Path> expectedFiles = new HashSet<>();
     String topicsDir = this.topicsDir.get(TOPIC_PARTITION.topic());
-    for (int i : new int[]{0, 3, 6}) {
-      expectedFiles.add(new Path(FileUtils.committedFileName(url, topicsDir, dirPrefixFirst, TOPIC_PARTITION, i, i + 2, extension, zeroPadFormat)));
+    expectedFiles.add(new Path(FileUtils.committedFileName(url, topicsDir, dirPrefixFirst, TOPIC_PARTITION, 0, 8, extension, zeroPadFormat)));
+    expectedFiles.add(new Path(FileUtils.committedFileName(url, topicsDir, dirPrefixLater, TOPIC_PARTITION, 9, 17, extension, zeroPadFormat)));
+
+    int expectedBatchSize = 9;
+    verify(expectedFiles, expectedBatchSize, records, schema);
+  }
+
+  @Test
+  public void testCloseTempFileByRotateIntervalWithoutReadingNewRecords() throws Exception {
+    setUp();
+    // Define the partitioner
+    partitioner = new DataWriter.PartitionerWrapper(
+        new io.confluent.connect.storage.partitioner.TimeBasedPartitioner<>()
+    );
+    parsedConfig.put(PartitionerConfig.PARTITION_DURATION_MS_CONFIG, TimeUnit.DAYS.toMillis(1));
+    parsedConfig.put(PartitionerConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "Record");
+    parsedConfig.put(PartitionerConfig.PATH_FORMAT_CONFIG, "'ds'=YYYYMMdd");
+    partitioner.configure(parsedConfig);
+
+    properties.put(FLUSH_SIZE_CONFIG, "1000");
+    properties.put(
+        ROTATE_INTERVAL_MS_CONFIG,
+        String.valueOf(TimeUnit.MINUTES.toMillis(1))
+    );
+    connectorConfig = new HdfsSinkConnectorConfig(properties);
+
+    TopicPartitionWriter topicPartitionWriter = new TopicPartitionWriter(
+        TOPIC_PARTITION,
+        storage,
+        writerProvider,
+        newWriterProvider,
+        partitioner,
+        connectorConfig,
+        context,
+        avroData,
+        time
+    );
+
+    Schema schema = createSchema();
+    List<Struct> records = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      records.add(createRecord(schema, i, 12.2f));
     }
-    String dirPrefixLater = partitioner.generatePartitionedPath(TOPIC, encodedPartitionLater);
-    // Records 15,16,17 won't be flushed until a record with a higher timestamp arrives.
-    for (int i : new int[]{9, 12}) {
-      expectedFiles.add(new Path(FileUtils.committedFileName(url, topicsDir, dirPrefixLater, TOPIC_PARTITION, i, i + 2, extension, zeroPadFormat)));
+    long advanceMs = 20000;
+    DateTime first = new DateTime(2023, 5, 7, 9, 0, DateTimeZone.UTC);
+    long timestamp = first.getMillis();
+    Collection<SinkRecord> sinkRecords = createSinkRecordsWithTimestamp(
+        records, schema, 0, timestamp, advanceMs
+    );
+    for (SinkRecord record : sinkRecords) {
+      topicPartitionWriter.buffer(record);
     }
-    verify(expectedFiles, 3, records, schema);
+
+    assertEquals(-1, topicPartitionWriter.offset());
+    topicPartitionWriter.write();
+    assertEquals(0, topicPartitionWriter.offset());
+    time.sleep(TimeUnit.MINUTES.toMillis(2));
+    boolean shouldRotateAndMaybeUpdateTimers = topicPartitionWriter.shouldRotateAndMaybeUpdateTimers();
+    assertEquals(true, shouldRotateAndMaybeUpdateTimers);
+    topicPartitionWriter.write();
+    assertEquals(10, topicPartitionWriter.offset());
+    topicPartitionWriter.close();
+
+    Set<Path> expectedFiles = new HashSet<>();
+    String topicsDir = this.topicsDir.get(TOPIC);
+    expectedFiles.add(new Path(url + "/" + topicsDir + "/" + TOPIC + "/ds=20230507"
+        + "/" + TOPIC + "+" + PARTITION + "+0000000000+0000000009" + extension));
+    int expectedBatchSize = 10;
+    verify(expectedFiles, expectedBatchSize, records, schema);
   }
 
   @Test
@@ -693,7 +749,7 @@ public void testWriteRecordTimeBasedPartitionWallclockMockedWithScheduleRotation
     // Do not roll on size, only based on time.
     localProps.put(FLUSH_SIZE_CONFIG, "1000");
     localProps.put(
-        HdfsSinkConnectorConfig.ROTATE_INTERVAL_MS_CONFIG,
+        ROTATE_INTERVAL_MS_CONFIG,
         String.valueOf(TimeUnit.HOURS.toMillis(1))
     );
     localProps.put(
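
Note (illustration, not part of the patch): with this change preCommit() drives
time-based rotation via checkWritersForNecessityOfRotation(), so a partition that
stops receiving records is still committed once rotate.interval.ms elapses, instead
of waiting for a new record to arrive while flush.size is never reached. Below is a
minimal sketch of connector settings that exercise the fixed behavior; flush.size
and rotate.interval.ms are the connector's real config keys, while the topic name
and the values are made-up examples.

    import java.util.HashMap;
    import java.util.Map;

    // Sketch only: example HDFS sink settings affected by this fix.
    public class RotationConfigExample {
      public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("connector.class", "io.confluent.connect.hdfs.HdfsSinkConnector");
        props.put("topics", "events");             // hypothetical topic
        props.put("flush.size", "1000");           // commit after 1000 buffered records...
        props.put("rotate.interval.ms", "60000");  // ...or 60s after the last rotation;
                                                   // with this patch the timer fires even
                                                   // when no new records arrive
        props.forEach((k, v) -> System.out.println(k + "=" + v));
      }
    }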