Skip to content

Commit 9163b74

Browse files
committed
fix(dataformat): fix invalig config and NPE in XMLNodeToStructConverter
1 parent f964de0 commit 9163b74

File tree

4 files changed

+10
-9
lines changed

4 files changed

+10
-9
lines changed

connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/xml/XMLCommonConfig.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public String getAttributePrefix() {
113113
return getString(withKeyPrefix(XML_ATTRIBUTE_PREFIX_CONFIG));
114114
}
115115

116-
public String getTextNodeValueFieldName() {
116+
public String getContentFieldName() {
117117
return getString(withKeyPrefix(XML_CONTENT_FIELD_NAME_CONFIG));
118118
}
119119

@@ -126,7 +126,7 @@ public String getXmlFieldCharactersStringReplacement() {
126126
}
127127

128128
public List<String> getForceContentFields() {
129-
return getList(withKeyPrefix(XML_CONTENT_FIELD_NAME_CONFIG));
129+
return getList(withKeyPrefix(XML_FORCE_CONTENT_FIELD_FOR_PATHS_CONFIG));
130130
}
131131

132132
public boolean isDataTypeInferenceEnabled() {

connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/xml/XMLNodeToStructConverter.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ private TypedValue convertObjectTree(final Node node,
165165
final String childNodeName = isTextNode(child) ? nodeName : determineNodeName(child);
166166
Optional<TypedValue> optional = readObjectNodeValue(
167167
child,
168+
childNodeName,
168169
currentForceArrayFields,
169170
currentForceContentFields
170171
);
@@ -184,20 +185,19 @@ private TypedValue convertObjectTree(final Node node,
184185
}
185186

186187
private Optional<TypedValue> readObjectNodeValue(final Node node,
188+
final String nodeName,
187189
final FieldPaths forceArrayFields,
188190
final FieldPaths forceContentFields) {
189191
if (isWhitespaceOrNewLineNodeElement(node) ||
190192
isNodeOfType(node, Node.DOCUMENT_TYPE_NODE)) {
191193
return Optional.empty();
192194
}
193195

194-
final boolean forceContentField = forceContentFields.anyMatches(determineNodeName(node));
195-
196196
if (isTextNode(node)) {
197197
final String text = node.getNodeValue();
198198
final TypedValue data = toTypedValue(text);
199199

200-
return readTextNodeValue(node, data, forceContentField);
200+
return readTextNodeValue(node, data, forceContentFields.anyMatches(nodeName));
201201
}
202202

203203
if (isElementNode(node)) {
@@ -206,7 +206,7 @@ private Optional<TypedValue> readObjectNodeValue(final Node node,
206206
if (childTextContent.isPresent()) {
207207
final String text = childTextContent.get();
208208
final TypedValue data = toTypedValue(text);
209-
return readTextNodeValue(node, data, forceContentField);
209+
return readTextNodeValue(node, data, forceContentFields.anyMatches(nodeName));
210210
}
211211
return Optional.of(convertObjectTree(node, forceArrayFields, forceContentFields));
212212
}
@@ -243,7 +243,8 @@ private Map<String, String> getNotExcludedNodeAttributes(final Node node) {
243243
final Map<String, String> attributes = new HashMap<>();
244244
for (int i = 0; i < nodeMap.getLength(); i++) {
245245
Attr attr = (Attr) nodeMap.item(i);
246-
if (!excludeAttributesInNamespaces.contains(attr.getNamespaceURI())) {
246+
String namespaceURI = attr.getNamespaceURI();
247+
if (namespaceURI == null || !excludeAttributesInNamespaces.contains(namespaceURI)) {
247248
String attrName = determineNodeName(attr);
248249
if (isNotXmlNamespace(attr)) {
249250
attributes.put(attributePrefix + attrName, attr.getNodeValue());

connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/xml/XMLFileInputIterator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ public XMLFileInputIterator(final XMLFileInputReaderConfig config,
8989
.setForceArrayFields(FieldPaths.from(config.forceArrayFields()))
9090
.setForceContentFields(FieldPaths.from(config.getForceContentFields()))
9191
.setTypeInferenceEnabled(config.isDataTypeInferenceEnabled())
92-
.setContentFieldName(config.getTextNodeValueFieldName())
92+
.setContentFieldName(config.getContentFieldName())
9393
.setFieldCharactersRegexPattern(config.getXmlFieldCharactersRegexPattern())
9494
.setFieldCharactersStringReplacement(config.getXmlFieldCharactersStringReplacement())
9595
.setAttributePrefix(config.getAttributePrefix());

connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/XmlToStructFilter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public void configure(final Map<String, ?> configs) {
7777
.setForceArrayFields(FieldPaths.from(filterConfig.forceArrayFields()))
7878
.setForceContentFields(FieldPaths.from(filterConfig.getForceContentFields()))
7979
.setTypeInferenceEnabled(filterConfig.isDataTypeInferenceEnabled())
80-
.setContentFieldName(filterConfig.getTextNodeValueFieldName())
80+
.setContentFieldName(filterConfig.getContentFieldName())
8181
.setFieldCharactersRegexPattern(filterConfig.getXmlFieldCharactersRegexPattern())
8282
.setFieldCharactersStringReplacement(filterConfig.getXmlFieldCharactersStringReplacement())
8383
.setAttributePrefix(filterConfig.getAttributePrefix());

0 commit comments

Comments
 (0)