Skip to content

Commit f964de0

Browse files
committed
feat(dataformat): add new XML config to force content field
1 parent 5f517f9 commit f964de0

File tree

8 files changed

+150
-53
lines changed

8 files changed

+150
-53
lines changed

checkstyle/suppressions.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,6 @@
6363
<suppress checks="LineLength" files="KafkaBasedLog.java"/>
6464
<suppress checks="NPathComplexity" files="KafkaBasedLog.java"/>
6565
<!-- Classes used for configuration -->
66-
<suppress checks="(LineLength|CyclomaticComplexity|NPathComplexity|JavaNCSS)" files=".*Config.java"/>
66+
<suppress checks="(MethodLength|LineLength|CyclomaticComplexity|NPathComplexity|JavaNCSS)" files=".*Config.java"/>
6767
<suppress checks="(LineLength|CyclomaticComplexity|NPathComplexity|JavaNCSS)" files="SchemaMerger.java"/>
6868
</suppressions>

connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/xml/XMLCommonConfig.java

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
public class XMLCommonConfig extends AbstractConfig {
3636

3737
public static final String XML_FORCE_ARRAY_ON_FIELDS_CONFIG = "xml.force.array.on.fields";
38-
private static final String XML_FORCE_ARRAY_ON_FIELDS_FORCE_DOC = "The comma-separated list of fields for which an array-type must be forced";
38+
private static final String XML_FORCE_ARRAY_ON_FIELDS_FORCE_DOC = "The comma-separated list of fields for which an array-type must be forced.";
3939

4040
public static final String XML_PARSER_VALIDATING_ENABLED_CONFIG = "xml.parser.validating.enabled";
4141
private static final String XML_PARSER_VALIDATING_ENABLED_DOC = " Specifies that the parser will validate documents as they are parsed (default: false).";
@@ -58,16 +58,19 @@ public class XMLCommonConfig extends AbstractConfig {
5858
public static final String XML_ATTRIBUTE_PREFIX_CONFIG = "xml.attribute.prefix";
5959
private static final String XML_ATTRIBUTE_PREFIX_DOC = "If set, the name of attributes will be prepended with the specified prefix when they are added to a record (default: '').";
6060

61-
public static final String XML_TEXT_NODE_VALUE_FIELD_NAME_CONFIG = "xml.text.node.value.field.name";
62-
private static final String XML_TEXT_NODE_VALUE_FIELD_NAME_DEFAULT = "value";
63-
private static final String XML_TEXT_NODE_VALUE_FIELD_NAME_DOC = "Specifies the name to be used for naming the field that will contain the value of a TextNode element having attributes. (default: 'value').";
61+
public static final String XML_CONTENT_FIELD_NAME_CONFIG = "xml.content.field.name";
62+
private static final String XML_CONTENT_FIELD_NAME_CONFIG_DEFAULT = "value";
63+
private static final String XML_CONTENT_FIELD_NAME_CONFIG_DOC = "Specifies the name to be used for naming the field that will contain the value of a TextNode element having attributes. (default: 'value').";
6464

6565
public static final String XML_FIELD_NAME_CHARACTERS_REGEX_PATTERN_CONFIG = "xml.field.name.characters.regex.pattern";
6666
private static final String XML_FIELD_NAME_CHARACTERS_REGEX_PATTERN_DOC = "Specifies the regex pattern to use for matching the characters in XML element name to replace when converting a document to a struct (default: '[.\\-]').";
6767

6868
public static final String XML_FIELD_NAME_CHARACTER_STRING_REPLACEMENT_CONFIG = "xml.field.name.characters.string.replacement";
6969
private static final String XML_FIELD_NAME_CHARACTER_STRING_REPLACEMENT_DOC = "Specifies the replacement string to be used when converting a document to a struct (default: '').";
7070

71+
public static final String XML_FORCE_CONTENT_FIELD_FOR_PATHS_CONFIG = "xml.force.content.field.for.paths";
72+
private static final String XML_FORCE_CONTENT_FIELD_FOR_PATHS_DOC = "The comma-separated list of field for which a content-field must be forced.";
73+
7174
private final String keyPrefix;
7275

7376
/**
@@ -111,7 +114,7 @@ public String getAttributePrefix() {
111114
}
112115

113116
public String getTextNodeValueFieldName() {
114-
return getString(withKeyPrefix(XML_TEXT_NODE_VALUE_FIELD_NAME_CONFIG));
117+
return getString(withKeyPrefix(XML_CONTENT_FIELD_NAME_CONFIG));
115118
}
116119

117120
public Pattern getXmlFieldCharactersRegexPattern() {
@@ -122,6 +125,10 @@ public String getXmlFieldCharactersStringReplacement() {
122125
return getString(withKeyPrefix(XML_FIELD_NAME_CHARACTER_STRING_REPLACEMENT_CONFIG));
123126
}
124127

128+
public List<String> getForceContentFields() {
129+
return getList(withKeyPrefix(XML_CONTENT_FIELD_NAME_CONFIG));
130+
}
131+
125132
public boolean isDataTypeInferenceEnabled() {
126133
return getBoolean(withKeyPrefix(XML_DATA_TYPE_INFERENCE_ENABLED_CONFIG));
127134
}
@@ -229,15 +236,15 @@ public static ConfigDef buildConfigDefWith(final String group,
229236
keyPrefix + XML_ATTRIBUTE_PREFIX_CONFIG
230237
)
231238
.define(
232-
keyPrefix + XML_TEXT_NODE_VALUE_FIELD_NAME_CONFIG,
239+
keyPrefix + XML_CONTENT_FIELD_NAME_CONFIG,
233240
ConfigDef.Type.STRING,
234-
XML_TEXT_NODE_VALUE_FIELD_NAME_DEFAULT,
241+
XML_CONTENT_FIELD_NAME_CONFIG_DEFAULT,
235242
ConfigDef.Importance.LOW,
236-
XML_TEXT_NODE_VALUE_FIELD_NAME_DOC,
243+
XML_CONTENT_FIELD_NAME_CONFIG_DOC,
237244
group,
238245
filterGroupCounter++,
239246
ConfigDef.Width.NONE,
240-
keyPrefix + XML_TEXT_NODE_VALUE_FIELD_NAME_CONFIG
247+
keyPrefix + XML_CONTENT_FIELD_NAME_CONFIG
241248
)
242249
.define(
243250
keyPrefix + XML_FIELD_NAME_CHARACTERS_REGEX_PATTERN_CONFIG,
@@ -260,6 +267,17 @@ public static ConfigDef buildConfigDefWith(final String group,
260267
filterGroupCounter++,
261268
ConfigDef.Width.NONE,
262269
keyPrefix + XML_FIELD_NAME_CHARACTER_STRING_REPLACEMENT_CONFIG
270+
)
271+
.define(
272+
keyPrefix + XML_FORCE_CONTENT_FIELD_FOR_PATHS_CONFIG,
273+
ConfigDef.Type.LIST,
274+
Collections.emptyList(),
275+
ConfigDef.Importance.LOW,
276+
XML_FORCE_CONTENT_FIELD_FOR_PATHS_DOC,
277+
group,
278+
filterGroupCounter++,
279+
ConfigDef.Width.NONE,
280+
keyPrefix + XML_FORCE_CONTENT_FIELD_FOR_PATHS_CONFIG
263281
);
264282

265283
for (ConfigDef.ConfigKey configKey : additional) {

connect-file-pulse-dataformat/src/main/java/io/streamthoughts/kafka/connect/filepulse/xml/XMLNodeToStructConverter.java

Lines changed: 37 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -60,27 +60,30 @@ public final class XMLNodeToStructConverter implements Function<Node, TypedStruc
6060
private static final String FIELD_CHARACTERS_STRING_REPLACEMENT_DEFAULT = "_";
6161

6262
private static final String XML_TEXT_NODE_VALUE_FIELD_NAME_DEFAULT = "value";
63+
public static final String DOCUMENT_NODE_NAME = "#document";
6364

6465
private boolean excludeEmptyElement = false;
6566

6667
private boolean excludeAllAttributes = false;
6768

6869
private boolean isTypeInferenceEnabled = false;
6970

70-
private String textNodeValueFieldName = XML_TEXT_NODE_VALUE_FIELD_NAME_DEFAULT;
71+
private String contentFieldName = XML_TEXT_NODE_VALUE_FIELD_NAME_DEFAULT;
7172

7273
private Set<String> excludeAttributesInNamespaces = Collections.emptySet();
7374

7475
private String attributePrefix = "";
7576

7677
private FieldPaths forceArrayFields = FieldPaths.empty();
7778

79+
private FieldPaths forceContentFields = FieldPaths.empty();
80+
7881
private Pattern fieldCharactersRegexPattern = FIELD_CHARACTERS_REGEX_PATTERN_DEFAULT;
7982

8083
private String fieldCharactersStringReplacement = FIELD_CHARACTERS_STRING_REPLACEMENT_DEFAULT;
8184

82-
public XMLNodeToStructConverter setTextNodeValueFieldName(String textNodeValueFieldName) {
83-
this.textNodeValueFieldName = textNodeValueFieldName;
85+
public XMLNodeToStructConverter setContentFieldName(String textNodeValueFieldName) {
86+
this.contentFieldName = textNodeValueFieldName;
8487
return this;
8588
}
8689

@@ -94,7 +97,6 @@ public XMLNodeToStructConverter setExcludeAllAttributes(boolean excludeAllAttrib
9497
return this;
9598
}
9699

97-
98100
public XMLNodeToStructConverter setFieldCharactersRegexPattern(Pattern fieldCharactersRegexPattern) {
99101
this.fieldCharactersRegexPattern = fieldCharactersRegexPattern;
100102
return this;
@@ -120,6 +122,11 @@ public XMLNodeToStructConverter setForceArrayFields(final FieldPaths forceArrayF
120122
return this;
121123
}
122124

125+
public XMLNodeToStructConverter setForceContentFields(final FieldPaths forceContentFields) {
126+
this.forceContentFields = forceContentFields;
127+
return this;
128+
}
129+
123130
public XMLNodeToStructConverter setTypeInferenceEnabled(final boolean isTypeInferenceEnabled) {
124131
this.isTypeInferenceEnabled = isTypeInferenceEnabled;
125132
return this;
@@ -133,25 +140,34 @@ public XMLNodeToStructConverter setTypeInferenceEnabled(final boolean isTypeInfe
133140
*/
134141
@Override
135142
public TypedStruct apply(final Node node) {
136-
return convertObjectTree(node, forceArrayFields).getStruct();
143+
return convertObjectTree(node, forceArrayFields, forceContentFields).getStruct();
137144
}
138145

139146
private TypedValue convertObjectTree(final Node node,
140-
final FieldPaths forceArrayFields) {
147+
final FieldPaths forceArrayFields,
148+
final FieldPaths forceContentFields) {
141149
Objects.requireNonNull(node, "'node' cannot be null");
142150

143151
final String nodeName = determineNodeName(node);
144-
final FieldPaths currentForceArrayFields = nodeName.equals("#document")
152+
final FieldPaths currentForceArrayFields = nodeName.equals(DOCUMENT_NODE_NAME)
145153
? forceArrayFields :
146154
forceArrayFields.next(sanitizeNodeName(nodeName));
147155

156+
final FieldPaths currentForceContentFields = nodeName.equals(DOCUMENT_NODE_NAME)
157+
? forceContentFields :
158+
forceContentFields.next(sanitizeNodeName(nodeName));
159+
148160
// Create a new Struct container object for holding all node elements, i.e., child nodes and attributes.
149161
TypedStruct container = TypedStruct.create();
150162
getNotExcludedNodeAttributes(node).forEach(container::put);
151163
for (Node child = node.getFirstChild(); child != null; child = child.getNextSibling()) {
152164
// Text nodes always return #text" as the node name, so it's best to use the parent node name instead.
153165
final String childNodeName = isTextNode(child) ? nodeName : determineNodeName(child);
154-
Optional<TypedValue> optional = readObjectNodeValue(child, currentForceArrayFields);
166+
Optional<TypedValue> optional = readObjectNodeValue(
167+
child,
168+
currentForceArrayFields,
169+
currentForceContentFields
170+
);
155171
if (optional.isPresent()) {
156172
final TypedValue nodeValue = optional.get();
157173
if (excludeEmptyElement &&
@@ -168,16 +184,20 @@ private TypedValue convertObjectTree(final Node node,
168184
}
169185

170186
private Optional<TypedValue> readObjectNodeValue(final Node node,
171-
final FieldPaths forceArrayFields) {
187+
final FieldPaths forceArrayFields,
188+
final FieldPaths forceContentFields) {
172189
if (isWhitespaceOrNewLineNodeElement(node) ||
173190
isNodeOfType(node, Node.DOCUMENT_TYPE_NODE)) {
174191
return Optional.empty();
175192
}
176193

194+
final boolean forceContentField = forceContentFields.anyMatches(determineNodeName(node));
195+
177196
if (isTextNode(node)) {
178197
final String text = node.getNodeValue();
179198
final TypedValue data = toTypedValue(text);
180-
return readTextNodeValue(node, data);
199+
200+
return readTextNodeValue(node, data, forceContentField);
181201
}
182202

183203
if (isElementNode(node)) {
@@ -186,9 +206,9 @@ private Optional<TypedValue> readObjectNodeValue(final Node node,
186206
if (childTextContent.isPresent()) {
187207
final String text = childTextContent.get();
188208
final TypedValue data = toTypedValue(text);
189-
return readTextNodeValue(node, data);
209+
return readTextNodeValue(node, data, forceContentField);
190210
}
191-
return Optional.of(convertObjectTree(node, forceArrayFields));
211+
return Optional.of(convertObjectTree(node, forceArrayFields, forceContentFields));
192212
}
193213

194214
throw new ReaderException("Unsupported node type '" + node.getNodeType() + "'");
@@ -198,17 +218,19 @@ private TypedValue toTypedValue(final String text) {
198218
return isTypeInferenceEnabled ? TypedValue.parse(text) : TypedValue.string(text);
199219
}
200220

201-
private Optional<TypedValue> readTextNodeValue(final Node node, final TypedValue data) {
221+
private Optional<TypedValue> readTextNodeValue(final Node node,
222+
final TypedValue data,
223+
boolean forceContentField) {
202224
// Check if TextNode as no attribute
203225
final Map<String, String> attributes = getNotExcludedNodeAttributes(node);
204-
if (attributes.isEmpty()) {
226+
if (attributes.isEmpty() && !forceContentField) {
205227
return Optional.of(data);
206228
}
207229

208230
// Else, create a Struct container
209231
final TypedStruct container = TypedStruct.create();
210232
attributes.forEach(container::put);
211-
container.put(textNodeValueFieldName, data);
233+
container.put(contentFieldName, data);
212234
return Optional.of(TypedValue.struct(container));
213235
}
214236

connect-file-pulse-dataformat/src/test/java/io/streamthoughts/kafka/connect/filepulse/xml/XMLNodeToStructConverterTest.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package io.streamthoughts.kafka.connect.filepulse.xml;
2020

21+
import io.streamthoughts.kafka.connect.filepulse.data.FieldPaths;
2122
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
2223
import org.junit.Assert;
2324
import org.junit.Test;
@@ -26,6 +27,7 @@
2627

2728
import java.io.ByteArrayInputStream;
2829
import java.io.IOException;
30+
import java.util.List;
2931

3032
public class XMLNodeToStructConverterTest {
3133

@@ -78,4 +80,56 @@ public void should_not_prefix_node_attributes() throws IOException, SAXException
7880
Assert.assertTrue(result.exists(path));
7981
Assert.assertEquals("text", result.find(path).getString());
8082
}
83+
84+
@Test
85+
public void should_convert_given_single_text_node_element_without_attr() throws Exception {
86+
// Given
87+
final byte[] bytes = "<root>test</root>".getBytes();
88+
final XMLNodeToStructConverter converter = new XMLNodeToStructConverter()
89+
.setContentFieldName("text");
90+
91+
// When
92+
final Document document = reader.parse(new ByteArrayInputStream(bytes));
93+
TypedStruct struct = converter.apply(document);
94+
95+
// Then
96+
Assert.assertNotNull(struct);
97+
Assert.assertEquals("test", struct.getString("root"));
98+
}
99+
100+
@Test
101+
public void should_convert_given_single_text_node_element_without_attr_and_force_content() throws Exception {
102+
// Given
103+
final byte[] bytes = "<root>test</root>".getBytes();
104+
final XMLNodeToStructConverter converter = new XMLNodeToStructConverter()
105+
.setContentFieldName("text")
106+
.setForceContentFields(FieldPaths.from(List.of("root")));
107+
108+
// When
109+
final Document document = reader.parse(new ByteArrayInputStream(bytes));
110+
TypedStruct struct = converter.apply(document);
111+
112+
// Then
113+
Assert.assertNotNull(struct);
114+
TypedStruct root = struct.getStruct("root");
115+
Assert.assertEquals("test", root.getString("text"));
116+
}
117+
118+
@Test
119+
public void should_convert_given_single_text_node_element_with_attrs() throws Exception {
120+
// Given
121+
final byte[] bytes = "<root attr=\"attr\">test</root>".getBytes();
122+
final XMLNodeToStructConverter converter = new XMLNodeToStructConverter()
123+
.setContentFieldName("text");
124+
125+
// When
126+
final Document document = reader.parse(new ByteArrayInputStream(bytes));
127+
TypedStruct struct = converter.apply(document);
128+
129+
// Then
130+
Assert.assertNotNull(struct);
131+
TypedStruct root = struct.getStruct("root");
132+
Assert.assertEquals("test", root.getString("text"));
133+
Assert.assertEquals("attr", root.getString("attr"));
134+
}
81135
}

connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/xml/XMLFileInputIterator.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,9 @@ public XMLFileInputIterator(final XMLFileInputReaderConfig config,
8787
.setExcludeAllAttributes(config.isNodeAttributesExcluded())
8888
.setExcludeAttributesInNamespaces(config.getExcludeNodeAttributesInNamespaces())
8989
.setForceArrayFields(FieldPaths.from(config.forceArrayFields()))
90+
.setForceContentFields(FieldPaths.from(config.getForceContentFields()))
9091
.setTypeInferenceEnabled(config.isDataTypeInferenceEnabled())
91-
.setTextNodeValueFieldName(config.getTextNodeValueFieldName())
92+
.setContentFieldName(config.getTextNodeValueFieldName())
9293
.setFieldCharactersRegexPattern(config.getXmlFieldCharactersRegexPattern())
9394
.setFieldCharactersStringReplacement(config.getXmlFieldCharactersStringReplacement())
9495
.setAttributePrefix(config.getAttributePrefix());
@@ -108,26 +109,26 @@ public XMLFileInputIterator(final XMLFileInputReaderConfig config,
108109
@Override
109110
public void warning(final SAXParseException e) {
110111
LOG.warn(
111-
"Handled XML parser warning on file {}. Error: {}",
112-
objectMeta.uri(),
113-
e.getLocalizedMessage()
112+
"Handled XML parser warning on file {}. Error: {}",
113+
objectMeta.uri(),
114+
e.getLocalizedMessage()
114115
);
115116
}
116117

117118
@Override
118119
public void error(final SAXParseException e) {
119120
LOG.warn(
120-
"Handled XML parser error on file {}. Error: {}",
121-
objectMeta.uri(),
122-
e.getLocalizedMessage()
121+
"Handled XML parser error on file {}. Error: {}",
122+
objectMeta.uri(),
123+
e.getLocalizedMessage()
123124
);
124125
}
125126

126127
@Override
127128
public void fatalError(final SAXParseException e) {
128129
throw new ReaderException(
129-
"Handled XML parser fatal error on file '" + objectMeta.uri() + "'",
130-
e
130+
"Handled XML parser fatal error on file '" + objectMeta.uri() + "'",
131+
e
131132
);
132133
}
133134
});

0 commit comments

Comments
 (0)