Skip to content

Commit 6f6decc

Browse files
committed
fix(plugin): files should be cleanup when an error occurred while converting data to connect record
1 parent b308e70 commit 6f6decc

File tree

2 files changed

+13
-4
lines changed

2 files changed

+13
-4
lines changed

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/DefaultFileRecordsPollingConsumer.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ public RecordsIterable<FileRecord<TypedStruct>> next() {
182182
latestPolledRecord = filtered.last();
183183
}
184184

185-
// Return record to the connect SourceTask
185+
// Return record to SourceTask
186186
return filtered;
187187
} catch (final ConnectFilePulseException e) {
188188
exception = e;
@@ -325,7 +325,7 @@ private FileInputIterator<FileRecord<TypedStruct>> openAndGetIteratorOrNullIfCom
325325
}
326326
}
327327

328-
// Quickly check if we can considered this file completed based on the content-length.
328+
// Quickly check if we can consider this file completed based on the content-length.
329329
boolean isAlreadyCompleted = committedOffset.position() >= metadata.contentLength();
330330
if (!ignoreCommittedOffsets && isAlreadyCompleted) {
331331
LOG.warn(
@@ -358,6 +358,10 @@ private FileInputIterator<FileRecord<TypedStruct>> getOrCloseIteratorIfNoMoreRec
358358
return null;
359359
}
360360

361+
public void closeCurrentIterator(final Exception cause) {
362+
closeIterator(currentIterator, cause);
363+
}
364+
361365
private void closeIterator(final FileInputIterator<FileRecord<TypedStruct>> iterator,
362366
final Exception cause) {
363367
final FileContext context = iterator.context();

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/FilePulseSourceTask.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -295,10 +295,15 @@ private SourceRecord buildSourceRecord(final FileContext context,
295295
return result;
296296

297297
} catch (final Throwable t) {
298-
throw new ConnectFilePulseException(
299-
"Failed to convert data into connect record: '" + context.metadata().uri() + "'",
298+
var exception = new ConnectFilePulseException(String.format(
299+
"Failed to convert data into Kafka Connect record at offset %s from object-file: %s'",
300+
context.offset(),
301+
context.metadata()),
300302
t
301303
);
304+
// Close internal iterator for the current object-file so that it will be marked as failed
305+
consumer.closeCurrentIterator(exception);
306+
throw exception;
302307
}
303308
}
304309

0 commit comments

Comments
 (0)