Skip to content

Commit 6247fb0

Browse files
committed
feat: add settings and topic blacklist
1 parent 7beda27 commit 6247fb0

File tree

6 files changed

+68
-0
lines changed

6 files changed

+68
-0
lines changed

src/main/java/com/devshawn/kafka/dsf/StateManager.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ private DesiredPlan generatePlan() {
7878

7979
private void planTopics(DesiredState desiredState, DesiredPlan.Builder desiredPlan) {
8080
List<TopicListing> topics = kafkaService.getTopics();
81+
List<String> prefixesToIgnore = getPrefixedTopicsToIgnore(desiredState);
8182

8283
desiredState.getTopics().forEach((key, value) -> {
8384
TopicPlan.Builder topicPlan = new TopicPlan.Builder()
@@ -99,6 +100,12 @@ private void planTopics(DesiredState desiredState, DesiredPlan.Builder desiredPl
99100
});
100101

101102
topics.forEach(currentTopic -> {
103+
boolean shouldIgnore = prefixesToIgnore.stream().anyMatch(it -> currentTopic.name().startsWith(it));
104+
if (shouldIgnore) {
105+
log.info("[PLAN] Ignoring topic {} due to prefix", currentTopic.name());
106+
return;
107+
}
108+
102109
if (desiredState.getTopics().getOrDefault(currentTopic.name(), null) == null) {
103110
TopicPlan topicPlan = new TopicPlan.Builder()
104111
.setName(currentTopic.name())
@@ -246,6 +253,14 @@ private void applyAcls(DesiredPlan desiredPlan) {
246253
});
247254
}
248255

256+
private List<String> getPrefixedTopicsToIgnore(DesiredState desiredState) {
257+
try {
258+
return desiredState.getSettings().get().getTopics().get().getBlacklist().get().getPrefixed();
259+
} catch (NoSuchElementException ex) {
260+
return Collections.emptyList();
261+
}
262+
}
263+
249264
private void initializeLogger(boolean verbose) {
250265
Logger root = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
251266
Logger kafka = (Logger) LoggerFactory.getLogger("org.apache.kafka");

src/main/java/com/devshawn/kafka/dsf/domain/state/DesiredState.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,14 @@
44
import org.inferred.freebuilder.FreeBuilder;
55

66
import java.util.Map;
7+
import java.util.Optional;
78

89
@FreeBuilder
910
@JsonDeserialize(builder = DesiredState.Builder.class)
1011
public interface DesiredState {
1112

13+
Optional<Settings> getSettings();
14+
1215
Map<String, TopicDetails> getTopics();
1316

1417
Map<String, AclDetails> getAcls();
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package com.devshawn.kafka.dsf.domain.state;
2+
3+
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
4+
import org.inferred.freebuilder.FreeBuilder;
5+
6+
import java.util.Optional;
7+
8+
@FreeBuilder
9+
@JsonDeserialize(builder = Settings.Builder.class)
10+
public interface Settings {
11+
12+
Optional<SettingsTopics> getTopics();
13+
14+
class Builder extends Settings_Builder {
15+
}
16+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package com.devshawn.kafka.dsf.domain.state;
2+
3+
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
4+
import org.inferred.freebuilder.FreeBuilder;
5+
6+
import java.util.Optional;
7+
8+
@FreeBuilder
9+
@JsonDeserialize(builder = SettingsTopics.Builder.class)
10+
public interface SettingsTopics {
11+
12+
Optional<SettingsTopicsBlacklist> getBlacklist();
13+
14+
class Builder extends SettingsTopics_Builder {
15+
}
16+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package com.devshawn.kafka.dsf.domain.state;
2+
3+
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
4+
import org.inferred.freebuilder.FreeBuilder;
5+
6+
import java.util.List;
7+
8+
@FreeBuilder
9+
@JsonDeserialize(builder = SettingsTopicsBlacklist.Builder.class)
10+
public interface SettingsTopicsBlacklist {
11+
12+
List<String> getPrefixed();
13+
14+
class Builder extends SettingsTopicsBlacklist_Builder {
15+
}
16+
}

src/main/java/com/devshawn/kafka/dsf/service/ParserService.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException;
1111
import com.fasterxml.jackson.databind.exc.ValueInstantiationException;
1212
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
13+
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
1314
import org.slf4j.Logger;
1415
import org.slf4j.LoggerFactory;
1516

@@ -30,6 +31,7 @@ public ParserService(File file) {
3031
this.objectMapper = new ObjectMapper(new YAMLFactory());
3132
objectMapper.enable(DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY);
3233
objectMapper.enable(JsonParser.Feature.STRICT_DUPLICATE_DETECTION);
34+
objectMapper.registerModule(new Jdk8Module());
3335
this.file = file;
3436
}
3537

0 commit comments

Comments
 (0)