8 changes: 8 additions & 0 deletions src/main/java/io/confluent/connect/hdfs/DataWriter.java
@@ -464,6 +464,14 @@ public void open(Collection<TopicPartition> partitions) {
}
}

public void checkWritersForNecessityOfRotation() {
for (TopicPartitionWriter topicPartitionWriter : topicPartitionWriters.values()) {
if (topicPartitionWriter.shouldRotateAndMaybeUpdateTimers()) {
topicPartitionWriter.write();
}
}
}

public void close() {
// Close any writers we have. We may get assigned the same partitions and end up duplicating
// some effort since we'll have to reprocess those messages. It may be possible to hold on to
4 changes: 4 additions & 0 deletions src/main/java/io/confluent/connect/hdfs/HdfsSinkTask.java
@@ -143,6 +143,10 @@ public Map<TopicPartition, OffsetAndMetadata> preCommit(
// Although the connector manages offsets via files in HDFS, we still want to have Connect
// commit the consumer offsets for records this task has consumed from its topic partitions and
// committed to HDFS.

// Check whether any writers should rotate based on the rotation interval
hdfsWriter.checkWritersForNecessityOfRotation();

Map<TopicPartition, OffsetAndMetadata> result = new HashMap<>();
for (Map.Entry<TopicPartition, Long> entry : hdfsWriter.getCommittedOffsets().entrySet()) {
log.debug(
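For orientation, here is a hand-written sketch of the rotation path these two changes add, wrapped in a hypothetical helper class (WallClockRotationSketch / rotateIdleWritersIfDue are made-up names) so it compiles on its own. The real calls live in HdfsSinkTask.preCommit() and DataWriter.checkWritersForNecessityOfRotation(); Kafka Connect invokes preCommit() on its offset-commit schedule (offset.flush.interval.ms), so the check runs even when no new records arrive.

import java.util.Map;

import org.apache.kafka.common.TopicPartition;

import io.confluent.connect.hdfs.TopicPartitionWriter;

// Sketch only: a hypothetical helper mirroring the loop added to DataWriter above.
final class WallClockRotationSketch {
  static void rotateIdleWritersIfDue(Map<TopicPartition, TopicPartitionWriter> writers) {
    for (TopicPartitionWriter writer : writers.values()) {
      // Now public and wall-clock based: true once rotate.interval.ms has elapsed
      // since the last rotation, or a scheduled/size-based trigger fires.
      if (writer.shouldRotateAndMaybeUpdateTimers()) {
        // Even with an empty buffer, write() advances the writer's state machine
        // through SHOULD_ROTATE and commits the currently open temp file.
        writer.write();
      }
    }
  }
}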
38 changes: 14 additions & 24 deletions src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java
@@ -246,7 +246,7 @@ public TopicPartitionWriter(
}

// Initialize rotation timers
updateRotationTimers(null);
updateRotationTimers();
}

@SuppressWarnings("fallthrough")
@@ -290,12 +290,9 @@ public boolean recover() {
return true;
}

private void updateRotationTimers(SinkRecord currentRecord) {
private void updateRotationTimers() {
long now = time.milliseconds();
// Wallclock-based partitioners should be independent of the record argument.
lastRotate = isWallclockBased
? (Long) now
: currentRecord != null ? timestampExtractor.extract(currentRecord) : null;
lastRotate = now;
if (log.isDebugEnabled() && rotateIntervalMs > 0) {
log.debug(
"Update last rotation timer. Next rotation for {} will be in {}ms",
@@ -322,7 +319,6 @@ private void updateRotationTimers(SinkRecord currentRecord) {
@SuppressWarnings("fallthrough")
public void write() {
long now = time.milliseconds();
SinkRecord currentRecord = null;
if (failureTime > 0 && now - failureTime < timeoutMs) {
return;
}
@@ -331,7 +327,7 @@ public void write() {
if (!success) {
return;
}
updateRotationTimers(null);
updateRotationTimers();
}
while (!buffer.isEmpty()) {
try {
@@ -358,7 +354,6 @@ public void write() {
}
}
SinkRecord record = buffer.peek();
currentRecord = record;
Schema valueSchema = record.valueSchema();
if ((recordCounter <= 0 && currentSchema == null && valueSchema != null)
|| compatibility.shouldChangeSchema(record, null, currentSchema)) {
@@ -373,7 +368,7 @@ public void write() {
break;
}
} else {
if (shouldRotateAndMaybeUpdateTimers(currentRecord, now)) {
if (shouldRotateAndMaybeUpdateTimers(now)) {
log.info(
"Starting commit and rotation for topic partition {} with start offsets {} "
+ "and end offsets {}",
@@ -391,7 +386,7 @@ public void write() {
}
}
case SHOULD_ROTATE:
updateRotationTimers(currentRecord);
updateRotationTimers();
closeTempFile();
nextState();
case TEMP_FILE_CLOSED:
@@ -424,7 +419,7 @@ public void write() {
case WRITE_PARTITION_PAUSED:
// committing files after waiting for rotateIntervalMs time but less than flush.size
// records available
if (recordCounter == 0 || !shouldRotateAndMaybeUpdateTimers(currentRecord, now)) {
if (recordCounter == 0 || !shouldRotateAndMaybeUpdateTimers(now)) {
break;
}

@@ -434,7 +429,7 @@ public void write() {
);
nextState();
case SHOULD_ROTATE:
updateRotationTimers(currentRecord);
updateRotationTimers();
closeTempFile();
nextState();
case TEMP_FILE_CLOSED:
@@ -559,19 +554,14 @@ private void setState(State state) {
this.state = state;
}

private boolean shouldRotateAndMaybeUpdateTimers(SinkRecord currentRecord, long now) {
Long currentTimestamp = null;
if (isWallclockBased) {
currentTimestamp = now;
} else if (currentRecord != null) {
currentTimestamp = timestampExtractor.extract(currentRecord);
lastRotate = lastRotate == null ? currentTimestamp : lastRotate;
}
public boolean shouldRotateAndMaybeUpdateTimers() {
return shouldRotateAndMaybeUpdateTimers(time.milliseconds());
}

private boolean shouldRotateAndMaybeUpdateTimers(long now) {
boolean periodicRotation = rotateIntervalMs > 0
&& currentTimestamp != null
&& lastRotate != null
&& currentTimestamp - lastRotate >= rotateIntervalMs;
&& now - lastRotate >= rotateIntervalMs;
boolean scheduledRotation = rotateScheduleIntervalMs > 0 && now >= nextScheduledRotate;
boolean messageSizeRotation = recordCounter >= flushSize;

@@ -580,7 +570,7 @@ private boolean shouldRotateAndMaybeUpdateTimers(SinkRecord currentRecord, long now) {
+ "'{}', timestamp: '{}')? {}",
rotateIntervalMs,
lastRotate,
currentTimestamp,
now,
periodicRotation
);

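As a compact illustration of what the refactor above leaves behind, this is a hedged, standalone version of the rotation predicate. The parameter names mirror the writer's fields in the diff; the wrapper class and method name are made up, and the final OR-combination sits outside the hunk, so treat that line as an assumption.

// Sketch only: all timing is now wall-clock (time.milliseconds()); the
// record-timestamp path and the SinkRecord argument are gone.
final class RotationPredicateSketch {
  static boolean shouldRotate(long now, long lastRotate, long rotateIntervalMs,
                              long nextScheduledRotate, long rotateScheduleIntervalMs,
                              long recordCounter, long flushSize) {
    boolean periodicRotation = rotateIntervalMs > 0 && now - lastRotate >= rotateIntervalMs;
    boolean scheduledRotation = rotateScheduleIntervalMs > 0 && now >= nextScheduledRotate;
    boolean messageSizeRotation = recordCounter >= flushSize;
    // Assumption: combined with OR, as in the pre-existing code below this hunk.
    return periodicRotation || scheduledRotation || messageSizeRotation;
  }
}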
@@ -61,6 +61,7 @@
import io.confluent.connect.storage.wal.WAL;

import static io.confluent.connect.storage.StorageSinkConnectorConfig.FLUSH_SIZE_CONFIG;
import static io.confluent.connect.storage.StorageSinkConnectorConfig.ROTATE_INTERVAL_MS_CONFIG;
import static org.apache.kafka.common.utils.Time.SYSTEM;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -476,7 +477,7 @@ public void testWriteRecordTimeBasedPartitionFieldTimestampHours() throws Exception {
// Do not roll on size, only based on time.
localProps.put(FLUSH_SIZE_CONFIG, "1000");
localProps.put(
HdfsSinkConnectorConfig.ROTATE_INTERVAL_MS_CONFIG,
ROTATE_INTERVAL_MS_CONFIG,
String.valueOf(TimeUnit.MINUTES.toMillis(1))
);
setUp();
@@ -525,47 +526,37 @@ public void testWriteRecordTimeBasedPartitionFieldTimestampHours() throws Exception {
Collections.singleton(new TopicPartition(TOPIC, PARTITION))
));

// And one last record to flush the previous ones.
long timestampMuchLater = first.plusHours(6).getMillis();
Struct lastOne = createRecordWithTimestampField(schema, timestampMuchLater);
sinkRecords.addAll(createSinkRecords(
Collections.singletonList(lastOne),
schema,
19,
Collections.singleton(new TopicPartition(TOPIC, PARTITION))
));

for (SinkRecord record : sinkRecords) {
topicPartitionWriter.buffer(record);
}

topicPartitionWriter.recover();
topicPartitionWriter.write();
assertEquals(0, topicPartitionWriter.offset());
time.sleep(TimeUnit.MINUTES.toMillis(2));
assertTrue(topicPartitionWriter.shouldRotateAndMaybeUpdateTimers());
topicPartitionWriter.write();
topicPartitionWriter.close();

String encodedPartitionFirst = getTimebasedEncodedPartition(timestampFirst);
String encodedPartitionLater = getTimebasedEncodedPartition(timestampLater);

String topicsDir = this.topicsDir.get(TOPIC_PARTITION.topic());
String dirPrefixFirst = partitioner.generatePartitionedPath(TOPIC, encodedPartitionFirst);
Set<Path> expectedFiles = new HashSet<>();
String topicsDir = this.topicsDir.get(TOPIC_PARTITION.topic());
for (int i : new int[]{0, 3, 6}) {
expectedFiles.add(new Path(FileUtils.committedFileName(url, topicsDir, dirPrefixFirst, TOPIC_PARTITION, i, i + 2, extension, zeroPadFormat)));
}

expectedFiles.add(new Path(FileUtils.committedFileName(url, topicsDir, dirPrefixFirst, TOPIC_PARTITION, 0, 8, extension, zeroPadFormat)));
String dirPrefixLater = partitioner.generatePartitionedPath(TOPIC, encodedPartitionLater);
for (int i : new int[]{9, 12, 15}) {
expectedFiles.add(new Path(FileUtils.committedFileName(url, topicsDir, dirPrefixLater, TOPIC_PARTITION, i, i + 2, extension, zeroPadFormat)));
}
verify(expectedFiles, 3, records, schema);
expectedFiles.add(new Path(FileUtils.committedFileName(url, topicsDir, dirPrefixLater, TOPIC_PARTITION, 9, 17, extension, zeroPadFormat)));
verify(expectedFiles, 9, records, schema);
}

@Test
public void testWriteRecordTimeBasedPartitionRecordTimestampHours() throws Exception {
// Do not roll on size, only based on time.
localProps.put(FLUSH_SIZE_CONFIG, "1000");
localProps.put(
HdfsSinkConnectorConfig.ROTATE_INTERVAL_MS_CONFIG,
ROTATE_INTERVAL_MS_CONFIG,
String.valueOf(TimeUnit.MINUTES.toMillis(1))
);
setUp();
@@ -603,32 +594,31 @@ public void testWriteRecordTimeBasedPartitionRecordTimestampHours() throws Exception {

topicPartitionWriter.recover();
topicPartitionWriter.write();
time.sleep(TimeUnit.MINUTES.toMillis(2));
assertTrue(topicPartitionWriter.shouldRotateAndMaybeUpdateTimers());
topicPartitionWriter.write();
topicPartitionWriter.close();

String encodedPartitionFirst = getTimebasedEncodedPartition(timestampFirst);
String encodedPartitionLater = getTimebasedEncodedPartition(timestampLater);

String dirPrefixFirst = partitioner.generatePartitionedPath(TOPIC, encodedPartitionFirst);
String topicsDir = this.topicsDir.get(TOPIC_PARTITION.topic());
String dirPrefixFirst = partitioner.generatePartitionedPath(TOPIC, encodedPartitionFirst);
Set<Path> expectedFiles = new HashSet<>();
for (int i : new int[]{0, 3, 6}) {
expectedFiles.add(new Path(FileUtils.committedFileName(url, topicsDir, dirPrefixFirst, TOPIC_PARTITION, i, i + 2, extension, zeroPadFormat)));
}
expectedFiles.add(new Path(FileUtils.committedFileName(url, topicsDir, dirPrefixFirst, TOPIC_PARTITION, 0, 8, extension, zeroPadFormat)));

String dirPrefixLater = partitioner.generatePartitionedPath(TOPIC, encodedPartitionLater);
// Records 15,16,17 won't be flushed until a record with a higher timestamp arrives.
for (int i : new int[]{9, 12}) {
expectedFiles.add(new Path(FileUtils.committedFileName(url, topicsDir, dirPrefixLater, TOPIC_PARTITION, i, i + 2, extension, zeroPadFormat)));
}
verify(expectedFiles, 3, records, schema);
expectedFiles.add(new Path(FileUtils.committedFileName(url, topicsDir, dirPrefixLater, TOPIC_PARTITION, 9, 17, extension, zeroPadFormat)));
verify(expectedFiles, 9, records, schema);
}

@Test
public void testWriteRecordTimeBasedPartitionRecordTimestampDays() throws Exception {
// Do not roll on size, only based on time.
localProps.put(FLUSH_SIZE_CONFIG, "1000");
localProps.put(
HdfsSinkConnectorConfig.ROTATE_INTERVAL_MS_CONFIG,
ROTATE_INTERVAL_MS_CONFIG,
String.valueOf(TimeUnit.MINUTES.toMillis(1))
);
setUp();
@@ -658,7 +648,7 @@ public void testWriteRecordTimeBasedPartitionRecordTimestampDays() throws Exception {
long advanceMs = 20000;
long timestampFirst = first.getMillis();
Collection<SinkRecord> sinkRecords = createSinkRecordsWithTimestamp(records.subList(0, 9), schema, 0, timestampFirst, advanceMs);
long timestampLater = first.plusHours(2).getMillis();
long timestampLater = first.plusDays(1).getMillis();
sinkRecords.addAll(createSinkRecordsWithTimestamp(records.subList(9, 18), schema, 9, timestampLater, advanceMs));

for (SinkRecord record : sinkRecords) {
@@ -667,24 +657,90 @@ public void testWriteRecordTimeBasedPartitionRecordTimestampDays() throws Exception {

topicPartitionWriter.recover();
topicPartitionWriter.write();
time.sleep(TimeUnit.MINUTES.toMillis(2));
assertTrue(topicPartitionWriter.shouldRotateAndMaybeUpdateTimers());
topicPartitionWriter.write();
topicPartitionWriter.close();

String encodedPartitionFirst = getTimebasedEncodedPartition(timestampFirst);
String encodedPartitionLater = getTimebasedEncodedPartition(timestampLater);

String dirPrefixFirst = partitioner.generatePartitionedPath(TOPIC, encodedPartitionFirst);
String dirPrefixLater = partitioner.generatePartitionedPath(TOPIC, encodedPartitionLater);

Set<Path> expectedFiles = new HashSet<>();
String topicsDir = this.topicsDir.get(TOPIC_PARTITION.topic());
for (int i : new int[]{0, 3, 6}) {
expectedFiles.add(new Path(FileUtils.committedFileName(url, topicsDir, dirPrefixFirst, TOPIC_PARTITION, i, i + 2, extension, zeroPadFormat)));
expectedFiles.add(new Path(FileUtils.committedFileName(url, topicsDir, dirPrefixFirst, TOPIC_PARTITION, 0, 8, extension, zeroPadFormat)));
expectedFiles.add(new Path(FileUtils.committedFileName(url, topicsDir, dirPrefixLater, TOPIC_PARTITION, 9, 17, extension, zeroPadFormat)));

int expectedBatchSize = 9;
verify(expectedFiles, expectedBatchSize, records, schema);
}

@Test
public void testCloseTempFileByRotateIntervalWithoutReadingNewRecords() throws Exception {
setUp();
// Define the partitioner
partitioner = new DataWriter.PartitionerWrapper(
new io.confluent.connect.storage.partitioner.TimeBasedPartitioner<>()
);
parsedConfig.put(PartitionerConfig.PARTITION_DURATION_MS_CONFIG, TimeUnit.DAYS.toMillis(1));
parsedConfig.put(PartitionerConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "Record");
parsedConfig.put(PartitionerConfig.PATH_FORMAT_CONFIG, "'ds'=YYYYMMdd");
partitioner.configure(parsedConfig);

properties.put(FLUSH_SIZE_CONFIG, "1000");
properties.put(
ROTATE_INTERVAL_MS_CONFIG,
String.valueOf(TimeUnit.MINUTES.toMillis(1))
);
connectorConfig = new HdfsSinkConnectorConfig(properties);

TopicPartitionWriter topicPartitionWriter = new TopicPartitionWriter(
TOPIC_PARTITION,
storage,
writerProvider,
newWriterProvider,
partitioner,
connectorConfig,
context,
avroData,
time
);

Schema schema = createSchema();
List<Struct> records = new ArrayList<>();
for (int i = 0; i < 10; i++) {
records.add(createRecord(schema, i, 12.2f));
}

String dirPrefixLater = partitioner.generatePartitionedPath(TOPIC, encodedPartitionLater);
// Records 15,16,17 won't be flushed until a record with a higher timestamp arrives.
for (int i : new int[]{9, 12}) {
expectedFiles.add(new Path(FileUtils.committedFileName(url, topicsDir, dirPrefixLater, TOPIC_PARTITION, i, i + 2, extension, zeroPadFormat)));
long advanceMs = 20000;
DateTime first = new DateTime(2023, 5, 7, 9, 0, DateTimeZone.UTC);
long timestamp = first.getMillis();
Collection<SinkRecord> sinkRecords = createSinkRecordsWithTimestamp(
records, schema, 0, timestamp, advanceMs
);
for (SinkRecord record : sinkRecords) {
topicPartitionWriter.buffer(record);
}
verify(expectedFiles, 3, records, schema);

assertEquals(-1, topicPartitionWriter.offset());
topicPartitionWriter.write();
assertEquals(0, topicPartitionWriter.offset());
time.sleep(TimeUnit.MINUTES.toMillis(2));
assertTrue(topicPartitionWriter.shouldRotateAndMaybeUpdateTimers());
topicPartitionWriter.write();
assertEquals(10, topicPartitionWriter.offset());
topicPartitionWriter.close();

Set<Path> expectedFiles = new HashSet<>();
String topicsDir = this.topicsDir.get(TOPIC);
expectedFiles.add(new Path(url + "/" + topicsDir + "/" + TOPIC + "/ds=20230507" +
"/" + TOPIC + "+" + PARTITION + "+0000000000+0000000009" + extension));
int expectedBatchSize = 10;
verify(expectedFiles, expectedBatchSize, records, schema);
}

@Test
@@ -693,7 +749,7 @@ public void testWriteRecordTimeBasedPartitionWallclockMockedWithScheduleRotation
// Do not roll on size, only based on time.
localProps.put(FLUSH_SIZE_CONFIG, "1000");
localProps.put(
HdfsSinkConnectorConfig.ROTATE_INTERVAL_MS_CONFIG,
ROTATE_INTERVAL_MS_CONFIG,
String.valueOf(TimeUnit.HOURS.toMillis(1))
);
localProps.put(
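The new and updated tests above all follow the same pattern; condensed below, and assuming `time` is the MockTime the test harness injects into the writer and that rotate.interval.ms is one minute as configured in these tests.

// Write the buffered records; the rotation interval has not elapsed yet, so the
// temp file stays open and nothing is committed to its final path.
topicPartitionWriter.write();

// Advance the mocked clock past rotate.interval.ms without buffering new records.
time.sleep(TimeUnit.MINUTES.toMillis(2));

// The now-public check reports that rotation is due...
assertTrue(topicPartitionWriter.shouldRotateAndMaybeUpdateTimers());

// ...and a second write() with an empty buffer closes and commits the temp file.
topicPartitionWriter.write();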