Skip to content

Commit 4f892a8

Browse files
committed
feat: add new config props to define file ordering
1 parent 9163b74 commit 4f892a8

File tree

7 files changed

+176
-56
lines changed

7 files changed

+176
-56
lines changed

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/CommonSourceConfig.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.jsoniter.JsonIterator;
2222
import io.streamthoughts.kafka.connect.filepulse.fs.FileListFilter;
2323
import io.streamthoughts.kafka.connect.filepulse.fs.FileSystemListing;
24+
import io.streamthoughts.kafka.connect.filepulse.fs.TaskFileOrder;
2425
import io.streamthoughts.kafka.connect.filepulse.internal.StringUtils;
2526
import io.streamthoughts.kafka.connect.filepulse.offset.DefaultSourceOffsetPolicy;
2627
import io.streamthoughts.kafka.connect.filepulse.source.DefaultTaskPartitioner;
@@ -59,6 +60,9 @@ public class CommonSourceConfig extends AbstractConfig {
5960
public static final String TASKS_FILE_READER_CLASS_CONFIG = "tasks.reader.class";
6061
private static final String TASKS_FILE_READER_CLASS_DOC = "Class which is used by tasks to read an input file.";
6162

63+
public static final String TASKS_FILE_PROCESSING_ORDER_BY_CONFIG = "tasks.file.processing.order.by";
64+
private static final String TASKS_FILE_PROCESSING_ORDER_BY_DOC = "The strategy to be used for sorting files for processing. Valid values are: LAST_MODIFIED, URI, CONTENT_LENGTH, CONTENT_LENGTH_DESC.";
65+
6266
public static final String TASKS_HALT_ON_ERROR_CONFIG = "tasks.halt.on.error";
6367
private static final String TASKS_HALT_ON_ERROR_DOC = "Should a task halt when it encounters an error or continue to the next file.";
6468

@@ -204,7 +208,7 @@ public static ConfigDef getConfigDev() {
204208
ConfigDef.Importance.HIGH,
205209
FILTER_DOC,
206210
FILTERS_GROUP,
207-
-1,
211+
groupCounter++,
208212
ConfigDef.Width.NONE,
209213
FILTER_CONFIG
210214
)
@@ -214,10 +218,21 @@ public static ConfigDef getConfigDev() {
214218
DefaultTaskPartitioner.class,
215219
ConfigDef.Importance.HIGH,
216220
FILTER_DOC,
221+
TASK_PARTITIONER_CLASS_DOC,
222+
groupCounter++,
223+
ConfigDef.Width.NONE,
224+
TASK_PARTITIONER_CLASS_CONFIG
225+
)
226+
.define(
227+
TASKS_FILE_PROCESSING_ORDER_BY_CONFIG,
228+
ConfigDef.Type.STRING,
229+
TaskFileOrder.BuiltIn.LAST_MODIFIED.name(),
230+
ConfigDef.Importance.MEDIUM,
231+
TASKS_FILE_PROCESSING_ORDER_BY_DOC,
217232
FILTERS_GROUP,
218-
-1,
233+
groupCounter++,
219234
ConfigDef.Width.NONE,
220-
TASK_PARTITIONER_CLASS_DOC
235+
TASKS_FILE_PROCESSING_ORDER_BY_CONFIG
221236
);
222237
}
223238

@@ -233,6 +248,10 @@ public TaskPartitioner getTaskPartitioner() {
233248
return this.getConfiguredInstance(TASK_PARTITIONER_CLASS_CONFIG, TaskPartitioner.class);
234249
}
235250

251+
public TaskFileOrder getTaskFilerOrder() {
252+
return TaskFileOrder.findBuiltInByName(this.getString(TASKS_FILE_PROCESSING_ORDER_BY_CONFIG));
253+
}
254+
236255
public boolean isTaskHaltOnError() {
237256
return this.getBoolean(TASKS_HALT_ON_ERROR_CONFIG);
238257
}

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/DefaultFileSystemMonitor.java

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
import java.util.ArrayList;
4141
import java.util.Collection;
4242
import java.util.Collections;
43-
import java.util.Comparator;
4443
import java.util.Iterator;
4544
import java.util.LinkedList;
4645
import java.util.List;
@@ -52,7 +51,6 @@
5251
import java.util.concurrent.TimeoutException;
5352
import java.util.concurrent.atomic.AtomicBoolean;
5453
import java.util.function.Predicate;
55-
import java.util.stream.Collectors;
5654

5755
/**
5856
* A default {@link FileSystemMonitor} that can be used to trigger file
@@ -68,9 +66,6 @@ public class DefaultFileSystemMonitor implements FileSystemMonitor {
6866
private static final Duration DEFAULT_READ_END_LOG_TIMEOUT = Duration.ofSeconds(5);
6967
private static final int MAX_SCHEDULE_ATTEMPTS = 3;
7068

71-
private final static Comparator<FileObjectMeta> BY_LAST_MODIFIED =
72-
Comparator.comparingLong(FileObjectMeta::lastModified);
73-
7469
private final FileSystemListing<?> fsListing;
7570

7671
private final StateBackingStore<FileObject> store;
@@ -104,6 +99,8 @@ public class DefaultFileSystemMonitor implements FileSystemMonitor {
10499

105100
private final Predicate<FileObjectStatus> cleanablePredicate;
106101

102+
private final TaskFileOrder taskFileOrder;
103+
107104
/**
108105
* Creates a new {@link DefaultFileSystemMonitor} instance.
109106
*
@@ -118,16 +115,19 @@ public DefaultFileSystemMonitor(final Long allowTasksReconfigurationAfterTimeout
118115
final GenericFileCleanupPolicy cleanPolicy,
119116
final Predicate<FileObjectStatus> cleanablePredicate,
120117
final SourceOffsetPolicy offsetPolicy,
121-
final StateBackingStore<FileObject> store) {
118+
final StateBackingStore<FileObject> store,
119+
final TaskFileOrder taskFileOrder) {
122120
Objects.requireNonNull(fsListening, "'fsListening' should not be null");
123121
Objects.requireNonNull(cleanPolicy, "'cleanPolicy' should not be null");
124122
Objects.requireNonNull(offsetPolicy, "'offsetPolicy' should not be null");
125123
Objects.requireNonNull(store, "'store' should not null");
126124
Objects.requireNonNull(cleanablePredicate, "'cleanablePredicate' should not null");
125+
Objects.requireNonNull(taskFileOrder, "'taskFileOrder' should not null");
127126

128127
this.fsListing = fsListening;
129128
this.allowTasksReconfigurationAfterTimeoutMs = allowTasksReconfigurationAfterTimeoutMs;
130129
this.cleanablePredicate = cleanablePredicate;
130+
this.taskFileOrder = taskFileOrder;
131131

132132
if (cleanPolicy instanceof FileCleanupPolicy) {
133133
this.cleaner = new DelegateBatchFileCleanupPolicy((FileCleanupPolicy) cleanPolicy);
@@ -420,12 +420,7 @@ public List<FileObjectMeta> listFilesToSchedule(final int maxFilesToSchedule) {
420420
"or no object file was detected after starting the connector."
421421
);
422422
}
423-
424-
// Sort all object files by the last-modified date before returning
425-
return partitions
426-
.stream()
427-
.sorted(BY_LAST_MODIFIED)
428-
.collect(Collectors.toList());
423+
return taskFileOrder.sort(partitions);
429424
} finally {
430425
scanned.clear();
431426
taskReconfigurationRequested.set(false);

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/DelegateTaskFileURIProvider.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ public class DelegateTaskFileURIProvider implements TaskFileURIProvider {
5454

5555
private StateBackingStoreAccess sharedStore;
5656

57+
private TaskFileOrder taskFileOrder;
58+
5759
private StateSnapshot<FileObject> fileState;
5860

5961
private boolean isFirstCall = true;
@@ -80,6 +82,7 @@ public void configure(final Map<String, ?> configs) {
8082
fileSystemListing = config.getFileSystemListing();
8183
sourceOffsetPolicy = config.getSourceOffsetPolicy();
8284
fileSystemListing.setFilter(new CompositeFileListFilter(config.getFileSystemListingFilter()));
85+
taskFileOrder = config.getTaskFilerOrder();
8386
}
8487

8588
/**
@@ -110,7 +113,9 @@ public List<URI> nextURIs() {
110113
).values();
111114

112115
isFirstCall = false;
113-
return partitioner.partitionForTask(filtered, config.getTaskCount(), config.getTaskId());
116+
117+
List<FileObjectMeta> sorted = taskFileOrder.sort(filtered);
118+
return partitioner.partitionForTask(sorted, config.getTaskCount(), config.getTaskId());
114119
}
115120

116121
private void refreshState() {
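connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/TaskFileOrder.java

Lines changed: 98 additions & 0 deletions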
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Copyright 2022 StreamThoughts.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package io.streamthoughts.kafka.connect.filepulse.fs;
20+
21+
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
22+
23+
import java.util.Collection;
24+
import java.util.Comparator;
25+
import java.util.List;
26+
import java.util.function.Function;
27+
import java.util.stream.Collectors;
28+
29+
/**
30+
* Used to order files before they are assigned to tasks.
31+
*/
32+
public interface TaskFileOrder extends Function<Collection<FileObjectMeta>, List<FileObjectMeta>> {
33+
34+
static TaskFileOrder findBuiltInByName(final String name) {
35+
return BuiltIn.valueOf(name).get();
36+
}
37+
38+
List<FileObjectMeta> sort(final Collection<FileObjectMeta> objects);
39+
40+
/**
41+
* {@inheritDoc}
42+
*/
43+
@Override
44+
default List<FileObjectMeta> apply(final Collection<FileObjectMeta> objects) {
45+
return sort(objects);
46+
}
47+
48+
enum BuiltIn {
49+
50+
/** Sort all object files by the last-modified date. */
51+
LAST_MODIFIED(
52+
new AbstractTaskFileOrder(Comparator.comparingLong(FileObjectMeta::lastModified))
53+
),
54+
/** Sort all object files by URI. */
55+
URI(
56+
new AbstractTaskFileOrder(Comparator.comparing(FileObjectMeta::uri))
57+
),
58+
/** Sort all object files by content-length. */
59+
CONTENT_LENGTH(
60+
new AbstractTaskFileOrder(Comparator.comparingLong(FileObjectMeta::contentLength))
61+
),
62+
/** Sort all object files by content-length, in descending order (largest first). */
63+
CONTENT_LENGTH_DESC(
64+
new AbstractTaskFileOrder(Comparator.comparingLong(FileObjectMeta::contentLength).reversed())
65+
);
66+
67+
final TaskFileOrder order;
68+
69+
BuiltIn(TaskFileOrder order) {
70+
this.order = order;
71+
}
72+
73+
TaskFileOrder get() {
74+
return order;
75+
}
76+
77+
}
78+
79+
class AbstractTaskFileOrder implements TaskFileOrder {
80+
81+
private final Comparator<FileObjectMeta> comparator;
82+
83+
protected AbstractTaskFileOrder(Comparator<FileObjectMeta> comparator) {
84+
this.comparator = comparator;
85+
}
86+
87+
/**
88+
* {@inheritDoc}
89+
*/
90+
@Override
91+
public List<FileObjectMeta> sort(final Collection<FileObjectMeta> objects) {
92+
return objects
93+
.stream()
94+
.sorted(comparator)
95+
.collect(Collectors.toList());
96+
}
97+
}
98+
}

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/FilePulseSourceConnector.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,8 @@ public void start(final Map<String, String> props) {
121121
connectorConfig.getFsCleanupPolicy(),
122122
connectorConfig.getFsCleanupPolicyPredicate(),
123123
connectorConfig.getSourceOffsetPolicy(),
124-
sharedStore.get().getResource()
124+
sharedStore.get().getResource(),
125+
connectorConfig.getTaskFilerOrder()
125126
);
126127

127128
monitor.setFileSystemListingEnabled(!connectorConfig.isFileListingTaskDelegationEnabled());

connect-file-pulse-plugin/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/DefaultFileSystemMonitorTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,8 @@ private DefaultFileSystemMonitor newFileSystemMonitor(final MockFileCleaner clea
255255
cleaner,
256256
status -> List.of(FileObjectStatus.FAILED, FileObjectStatus.COMPLETED).contains(status),
257257
OFFSET_MANAGER,
258-
store
258+
store,
259+
TaskFileOrder.BuiltIn.LAST_MODIFIED.get()
259260
);
260261
}
261262

0 commit comments

Comments
 (0)