Skip to content

Commit 8be8c8d

Browse files
committed
fix(plugin): fix KafkaFileObjectStateBackingStoreConfig
1 parent 3141794 commit 8be8c8d

File tree

2 files changed

+141
-20
lines changed

2 files changed

+141
-20
lines changed

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/state/KafkaFileObjectStateBackingStoreConfig.java

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -23,30 +23,31 @@
2323
import java.util.Map;
2424
import java.util.Optional;
2525
import org.apache.kafka.clients.CommonClientConfigs;
26-
import org.apache.kafka.clients.producer.ProducerConfig;
2726
import org.apache.kafka.common.config.AbstractConfig;
2827
import org.apache.kafka.common.config.ConfigDef;
2928

30-
public class KafkaFileObjectStateBackingStoreConfig extends AbstractConfig {
29+
public final class KafkaFileObjectStateBackingStoreConfig extends AbstractConfig {
3130

3231
private static final String GROUP = "KafkaFileObjectStateBackingStore";
3332

34-
public static final String TASKS_FILE_STATUS_STORAGE_TOPIC_CONFIG = "tasks.file.status.storage.topic";
33+
public static final String TASKS_FILE_STATUS_STORAGE_PREFIX = "tasks.file.status.storage.";
34+
35+
public static final String TASKS_FILE_STATUS_STORAGE_TOPIC_CONFIG = TASKS_FILE_STATUS_STORAGE_PREFIX + "topic";
3536
private static final String TASKS_FILE_STATUS_STORAGE_TOPIC_DOC = "The topic name which is used to report file states.";
3637
private static final String TASKS_FILE_STATUS_STORAGE_TOPIC_DEFAULT = "connect-file-pulse-status";
3738

38-
public static final String TASKS_FILE_STATUS_STORAGE_NAME_CONFIG = "tasks.file.status.storage.name";
39+
public static final String TASKS_FILE_STATUS_STORAGE_NAME_CONFIG = TASKS_FILE_STATUS_STORAGE_PREFIX + "name";
3940
private static final String TASKS_FILE_STATUS_STORAGE_NAME_DOC = "The reporter identifier to be used by tasks and connector to report and monitor file progression.";
4041

41-
public static final String TASKS_FILE_STATUS_STORAGE_BOOTSTRAP_SERVERS_CONFIG = "tasks.file.status.storage.bootstrap.servers";
42+
public static final String TASKS_FILE_STATUS_STORAGE_BOOTSTRAP_SERVERS_CONFIG = TASKS_FILE_STATUS_STORAGE_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
4243

43-
public static final String TASKS_FILE_STATUS_STORAGE_CONSUMER_ENABLED_CONFIG = "tasks.file.status.storage.consumer.enabled";
44+
public static final String TASKS_FILE_STATUS_STORAGE_CONSUMER_ENABLED_CONFIG = TASKS_FILE_STATUS_STORAGE_PREFIX + "consumer.enabled";
4445
public static final String TASKS_FILE_STATUS_STORAGE_CONSUMER_ENABLED_DOC = "Boolean to indicate if the storage should consume the status topic.";
4546

46-
public static final String TASKS_FILE_STATUS_STORAGE_TOPIC_PARTITIONS_CONFIG = "tasks.file.status.storage.topic.partitions";
47+
public static final String TASKS_FILE_STATUS_STORAGE_TOPIC_PARTITIONS_CONFIG = TASKS_FILE_STATUS_STORAGE_PREFIX + "topic.partitions";
4748
public static final String TASKS_FILE_STATUS_STORAGE_TOPIC_PARTITIONS_DOC = "The number of partitions to be used for the status storage topic.";
4849

49-
public static final String TASKS_FILE_STATUS_STORAGE_TOPIC_REPLICATION_FACTOR_CONFIG = "tasks.file.status.storage.topic.replication.factor";
50+
public static final String TASKS_FILE_STATUS_STORAGE_TOPIC_REPLICATION_FACTOR_CONFIG = TASKS_FILE_STATUS_STORAGE_PREFIX + "topic.replication.factor";
5051
public static final String TASKS_FILE_STATUS_STORAGE_TOPIC_REPLICATION_FACTOR_DOC = "The replication factor to be used for the status storage topic.";
5152

5253
/**
@@ -72,21 +73,21 @@ public String getTaskStorageName() {
7273

7374
public Map<String, Object> getConsumerTaskStorageConfigs() {
7475
final Map<String, Object> configs = new HashMap<>();
75-
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getInternalBootstrapServers());
76+
configs.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, getInternalBootstrapServers());
7677
configs.putAll(getInternalKafkaConsumerConfigs());
7778
return configs;
7879
}
7980

8081
public Map<String, Object> getProducerTaskStorageConfigs() {
8182
final Map<String, Object> configs = new HashMap<>();
82-
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getInternalBootstrapServers());
83+
configs.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, getInternalBootstrapServers());
8384
configs.putAll(getInternalKafkaProducerConfigs());
8485
return configs;
8586
}
8687

8788
public Map<String, Object> getAdminClientTaskStorageConfigs() {
8889
final Map<String, Object> configs = new HashMap<>();
89-
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getInternalBootstrapServers());
90+
configs.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, getInternalBootstrapServers());
9091
configs.putAll(getInternalKafkaAdminClientConfigs());
9192
return configs;
9293
}
@@ -96,21 +97,24 @@ private String getInternalBootstrapServers() {
9697
}
9798

9899
private Map<String, Object> getInternalKafkaAdminClientConfigs() {
99-
Map<String, Object> consumerConfigs = KafkaUtils.getAdminClientConfigs(originals());
100-
consumerConfigs.putAll(originalsWithPrefix("tasks.file.status.storage.admin."));
101-
return consumerConfigs;
100+
Map<String, Object> originals = originalsWithPrefix(TASKS_FILE_STATUS_STORAGE_PREFIX, true);
101+
Map<String, Object> adminClientConfigs = new HashMap<>(originals);
102+
adminClientConfigs.putAll(originalsWithPrefix(TASKS_FILE_STATUS_STORAGE_PREFIX + "admin."));
103+
return KafkaUtils.getAdminClientConfigs(originals);
102104
}
103105

104106
private Map<String, Object> getInternalKafkaConsumerConfigs() {
105-
Map<String, Object> consumerConfigs = KafkaUtils.getConsumerConfigs(originals());
106-
consumerConfigs.putAll(originalsWithPrefix("tasks.file.status.storage.consumer."));
107-
return consumerConfigs;
107+
Map<String, Object> originals = originalsWithPrefix(TASKS_FILE_STATUS_STORAGE_PREFIX, true);
108+
Map<String, Object> consumerConfigs = new HashMap<>(originals);
109+
consumerConfigs.putAll(originalsWithPrefix(TASKS_FILE_STATUS_STORAGE_PREFIX + "consumer."));
110+
return KafkaUtils.getConsumerConfigs(consumerConfigs);
108111
}
109112

110113
private Map<String, Object> getInternalKafkaProducerConfigs() {
111-
Map<String, Object> producerConfigs = KafkaUtils.getProducerConfigs(originals());
112-
producerConfigs.putAll(originalsWithPrefix("tasks.file.status.storage.producer."));
113-
return producerConfigs;
114+
Map<String, Object> originals = originalsWithPrefix(TASKS_FILE_STATUS_STORAGE_PREFIX, true);
115+
Map<String, Object> producerConfigs = new HashMap<>(originals);
116+
producerConfigs.putAll(originalsWithPrefix(TASKS_FILE_STATUS_STORAGE_PREFIX + "producer."));
117+
return KafkaUtils.getProducerConfigs(producerConfigs);
114118
}
115119

116120
Optional<Integer> getTopicPartitions() {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
* Copyright 2023 StreamThoughts.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package io.streamthoughts.kafka.connect.filepulse.state;
20+
21+
import java.util.Map;
22+
import org.apache.kafka.clients.consumer.ConsumerConfig;
23+
import org.apache.kafka.clients.producer.ProducerConfig;
24+
import org.junit.jupiter.api.Assertions;
25+
import org.junit.jupiter.api.Test;
26+
27+
class KafkaFileObjectStateBackingStoreConfigTest {
28+
29+
@Test
30+
void should_return_default_topic_configs_given_valid_props() {
31+
// GIVEN
32+
var config = new KafkaFileObjectStateBackingStoreConfig(Map.of(
33+
KafkaFileObjectStateBackingStoreConfig.TASKS_FILE_STATUS_STORAGE_NAME_CONFIG, "???",
34+
KafkaFileObjectStateBackingStoreConfig.TASKS_FILE_STATUS_STORAGE_BOOTSTRAP_SERVERS_CONFIG, "???"
35+
));
36+
37+
// WHEN
38+
String topic = config.getTaskStorageTopic();
39+
40+
// THEN
41+
Assertions.assertEquals("connect-file-pulse-status", topic);
42+
}
43+
44+
@Test
45+
void should_return_override_topic_configs_given_valid_props() {
46+
// GIVEN
47+
var config = new KafkaFileObjectStateBackingStoreConfig(Map.of(
48+
KafkaFileObjectStateBackingStoreConfig.TASKS_FILE_STATUS_STORAGE_NAME_CONFIG, "???",
49+
KafkaFileObjectStateBackingStoreConfig.TASKS_FILE_STATUS_STORAGE_TOPIC_CONFIG, "test",
50+
KafkaFileObjectStateBackingStoreConfig.TASKS_FILE_STATUS_STORAGE_BOOTSTRAP_SERVERS_CONFIG, "???"
51+
));
52+
53+
// WHEN
54+
String topic = config.getTaskStorageTopic();
55+
56+
// THEN
57+
Assertions.assertEquals("test", topic);
58+
}
59+
60+
@Test
61+
void should_return_name_configs_given_valid_props() {
62+
// GIVEN
63+
var config = new KafkaFileObjectStateBackingStoreConfig(Map.of(
64+
KafkaFileObjectStateBackingStoreConfig.TASKS_FILE_STATUS_STORAGE_NAME_CONFIG, "test",
65+
KafkaFileObjectStateBackingStoreConfig.TASKS_FILE_STATUS_STORAGE_BOOTSTRAP_SERVERS_CONFIG, "???"
66+
));
67+
68+
// WHEN
69+
String name = config.getTaskStorageName();
70+
71+
// THEN
72+
Assertions.assertEquals("test", name);
73+
}
74+
75+
@Test
76+
void should_return_consumer_configs_given_valid_props() {
77+
// GIVEN
78+
var config = new KafkaFileObjectStateBackingStoreConfig(Map.of(
79+
KafkaFileObjectStateBackingStoreConfig.TASKS_FILE_STATUS_STORAGE_NAME_CONFIG, "???",
80+
KafkaFileObjectStateBackingStoreConfig.TASKS_FILE_STATUS_STORAGE_BOOTSTRAP_SERVERS_CONFIG, "???",
81+
"tasks.file.status.storage.random", "???",
82+
"tasks.file.status.storage." + ConsumerConfig.CLIENT_ID_CONFIG, "???",
83+
"tasks.file.status.storage.consumer." + ConsumerConfig.GROUP_ID_CONFIG, "???"
84+
));
85+
86+
// WHEN
87+
Map<String, Object> consumerConfigs = config.getConsumerTaskStorageConfigs();
88+
89+
// THEN
90+
Assertions.assertEquals(3, consumerConfigs.size());
91+
Assertions.assertNotNull(consumerConfigs.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
92+
Assertions.assertNotNull(consumerConfigs.get(ConsumerConfig.CLIENT_ID_CONFIG));
93+
Assertions.assertNotNull(consumerConfigs.get(ConsumerConfig.GROUP_ID_CONFIG));
94+
}
95+
96+
@Test
97+
void should_return_producer_configs_given_valid_props() {
98+
// GIVEN
99+
var config = new KafkaFileObjectStateBackingStoreConfig(Map.of(
100+
KafkaFileObjectStateBackingStoreConfig.TASKS_FILE_STATUS_STORAGE_NAME_CONFIG, "???",
101+
KafkaFileObjectStateBackingStoreConfig.TASKS_FILE_STATUS_STORAGE_BOOTSTRAP_SERVERS_CONFIG, "???",
102+
"tasks.file.status.storage.random", "???",
103+
"tasks.file.status.storage." + ProducerConfig.CLIENT_ID_CONFIG, "???",
104+
"tasks.file.status.storage.producer." + ProducerConfig.ACKS_CONFIG, "???"
105+
));
106+
107+
// WHEN
108+
Map<String, Object> producerConfigs = config.getProducerTaskStorageConfigs();
109+
110+
// THEN
111+
Assertions.assertEquals(3, producerConfigs.size());
112+
Assertions.assertNotNull(producerConfigs.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
113+
Assertions.assertNotNull(producerConfigs.get(ProducerConfig.CLIENT_ID_CONFIG));
114+
Assertions.assertNotNull(producerConfigs.get(ProducerConfig.ACKS_CONFIG));
115+
}
116+
117+
}

0 commit comments

Comments
 (0)