Skip to content

Commit d17298f

Browse files
committed
fix(plugin): add generation-id prop to task-config to force reconfiguraton
1 parent 10259c5 commit d17298f

File tree

2 files changed

+19
-3
lines changed

2 files changed

+19
-3
lines changed

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/SourceTaskConfig.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ public class SourceTaskConfig extends CommonSourceConfig {
4747
private static final String OMIT_READ_COMMITTED_FILE_CONFIG = "ignore.committed.offsets";
4848
private static final String OMIT_READ_COMMITTED_FILE_DOC = "Should a task ignore committed offsets while scheduling a file (default : false).";
4949

50+
public static final String TASK_GENERATION_ID = "task.generation.id";
51+
private static final String TASK_GENERATION_DOC = "The task configuration generation id.";
52+
5053
private final EnrichedConnectorConfig enrichedConfig;
5154

5255
static ConfigDef getConf() {
@@ -64,6 +67,13 @@ static ConfigDef getConf() {
6467
false,
6568
ConfigDef.Importance.LOW,
6669
OMIT_READ_COMMITTED_FILE_DOC
70+
)
71+
.define(
72+
TASK_GENERATION_ID,
73+
ConfigDef.Type.INT,
74+
0,
75+
ConfigDef.Importance.LOW,
76+
TASK_GENERATION_DOC
6777
);
6878
}
6979

@@ -178,6 +188,10 @@ public FileInputReader reader() {
178188
return getConfiguredInstance(CommonSourceConfig.TASKS_FILE_READER_CLASS_CONFIG, FileInputReader.class);
179189
}
180190

191+
public int getTaskGenerationId() {
192+
return this.getInt(TASK_GENERATION_ID);
193+
}
194+
181195
public List<RecordFilter> filters() {
182196
final List<String> filterAliases = getList(FILTER_CONFIG);
183197
final List<RecordFilter> filters = new ArrayList<>(filterAliases.size());

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/FilePulseSourceConnector.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
158158
final List<Map<String, String>> taskConfigs = new ArrayList<>(maxTasks);
159159
IntStream.range(0, maxTasks)
160160
.forEachOrdered(i -> taskConfigs
161-
.add(createTaskConfig(i, maxTasks, null))
161+
.add(createTaskConfig(i, maxTasks, 0, null))
162162
);
163163
return taskConfigs;
164164
}
@@ -176,12 +176,12 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
176176
LOG.info("No object file was found - resetting all tasks with an empty config.");
177177
IntStream.range(0, maxTasks)
178178
.forEachOrdered(i -> taskConfigs
179-
.add(createTaskConfig(i, maxTasks, Collections.emptyList()))
179+
.add(createTaskConfig(i, maxTasks, taskConfigsGen, Collections.emptyList()))
180180
);
181181
} else {
182182
IntStream.range(0, partitioned.size())
183183
.forEachOrdered(i -> taskConfigs
184-
.add(createTaskConfig(i, partitioned.size(), partitioned.get(i)))
184+
.add(createTaskConfig(i, partitioned.size(), taskConfigsGen, partitioned.get(i)))
185185
);
186186
}
187187

@@ -204,8 +204,10 @@ private List<List<String>> partitionAndGet(int maxTasks) {
204204

205205
private Map<String, String> createTaskConfig(final int taskId,
206206
final int taskCount,
207+
final long taskConfigGen,
207208
final List<String> URIs) {
208209
final Map<String, String> taskConfig = new HashMap<>(configProperties);
210+
taskConfig.put(SourceTaskConfig.TASK_GENERATION_ID, String.valueOf(taskConfigGen));
209211
if (connectorConfig.isFileListingTaskDelegationEnabled()) {
210212
taskConfig.put(SourceTaskConfig.FILE_URIS_PROVIDER_CONFIG, DelegateTaskFileURIProvider.class.getName());
211213
taskConfig.put(SourceTaskConfig.TASK_PARTITIONER_CLASS_CONFIG, HashByURITaskPartitioner.class.getName());

0 commit comments

Comments
 (0)